# ==================== ocr_service.py ====================
import logging
import re
from io import BytesIO
from typing import Dict, List, Optional, Tuple

import cv2
import easyocr
import numpy as np
import requests
from PIL import Image

logger = logging.getLogger(__name__)


class OCRService:
    """Service for extracting text from product images using OCR.

    Pipeline: download image -> detect candidate text regions (two
    preprocessing strategies, one tuned for horizontal and one for vertical
    text) -> run EasyOCR on each region -> clean and filter the results.
    """

    def __init__(self):
        # EasyOCR model loading is slow, so the reader is created lazily.
        self.reader = None

    def _get_reader(self):
        """Lazily create and cache the EasyOCR reader (English, CPU-only)."""
        if self.reader is None:
            self.reader = easyocr.Reader(['en'], gpu=False)
        return self.reader

    def download_image(self, image_url: str) -> Optional[np.ndarray]:
        """Download an image from a URL and return it in OpenCV BGR format.

        Returns:
            The image as a BGR ``np.ndarray``, or ``None`` on any
            download/decoding failure (logged, never raised).
        """
        try:
            response = requests.get(image_url, timeout=10)
            response.raise_for_status()
            # BUG FIX: force RGB mode before the numpy/cv2 conversion.
            # RGBA, palette ("P") or grayscale ("L") images — common for
            # PNGs — would otherwise break cv2.COLOR_RGB2BGR (wrong channel
            # count) or produce corrupted colors.
            pil_image = Image.open(BytesIO(response.content)).convert("RGB")
            return cv2.cvtColor(np.array(pil_image), cv2.COLOR_RGB2BGR)
        except Exception:
            # Best-effort by design: callers treat None as "no image".
            logger.exception("Error downloading image from %s", image_url)
            return None

    def preprocess_horizontal(self, image: np.ndarray) -> np.ndarray:
        """Preprocess for horizontal text: blur + global Otsu threshold."""
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        enhanced = cv2.GaussianBlur(gray, (5, 5), 0)
        _, binary = cv2.threshold(
            enhanced, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU
        )
        return binary

    def preprocess_vertical(self, image: np.ndarray) -> np.ndarray:
        """Preprocess for vertical text: histogram EQ + adaptive threshold."""
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        enhanced = cv2.equalizeHist(gray)
        thresh = cv2.adaptiveThreshold(
            enhanced, 255,
            cv2.ADAPTIVE_THRESH_MEAN_C,
            cv2.THRESH_BINARY_INV,
            15, 10,
        )
        return thresh

    def detect_text_regions(self, image: np.ndarray, preprocess_func) -> List[Tuple]:
        """Detect candidate text regions via external contours.

        Args:
            image: BGR source image.
            preprocess_func: callable producing a binary image for contouring.

        Returns:
            List of ``(x, y, w, h, aspect_ratio)`` tuples where
            ``aspect_ratio = h / w`` (tall regions have ratio > 1).
        """
        processed = preprocess_func(image)
        contours, _ = cv2.findContours(
            processed, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
        )
        text_regions = []
        for contour in contours:
            x, y, w, h = cv2.boundingRect(contour)
            # Drop tiny regions that are almost certainly noise, not text.
            if w > 30 and h > 30:
                aspect_ratio = h / w
                text_regions.append((x, y, w, h, aspect_ratio))
        return text_regions

    def classify_and_extract_text(self, image: np.ndarray, regions: List[Tuple]) -> List[Tuple]:
        """Run OCR on each region, rotating tall (vertical-text) regions.

        Regions with ``aspect_ratio > 1.5`` are assumed to contain vertical
        text and are rotated 90° clockwise before OCR.

        Returns:
            List of ``(text, confidence)`` tuples from EasyOCR.
        """
        reader = self._get_reader()
        all_detected_text = []
        for (x, y, w, h, aspect_ratio) in regions:
            roi = image[y:y + h, x:x + w]
            if aspect_ratio > 1.5:  # Tall region -> treat as vertical text
                roi = cv2.rotate(roi, cv2.ROTATE_90_CLOCKWISE)
            results = reader.readtext(roi, detail=1)
            for _, text, confidence in results:
                all_detected_text.append((text, confidence))
        return all_detected_text

    def clean_ocr_output(self, ocr_results: List[Tuple],
                         confidence_threshold: float = 0.40) -> List[Tuple]:
        """Filter low-confidence detections and strip unwanted characters.

        Args:
            ocr_results: ``(text, confidence)`` tuples from OCR.
            confidence_threshold: detections below this are discarded.

        Returns:
            Cleaned ``(text, confidence)`` tuples; empty strings and lone
            digits are dropped.
        """
        cleaned_results = []
        for text, confidence in ocr_results:
            if confidence < confidence_threshold:
                continue
            # Keep alphanumerics plus a small set of punctuation; collapse
            # runs of whitespace.
            cleaned_text = re.sub(r"[^A-Za-z0-9\s\.\,\(\)\-\%\/]", "", text)
            cleaned_text = re.sub(r"\s+", " ", cleaned_text).strip()
            # Single stray digits are OCR noise, not useful attributes.
            # (The original also checked `any(isdigit) and len < 2`, but
            # that branch was unreachable: a length-1 string containing an
            # ASCII digit is itself a digit and is already skipped here.)
            if len(cleaned_text) == 1 and cleaned_text.isdigit():
                continue
            if cleaned_text:
                cleaned_results.append((cleaned_text, confidence))
        return cleaned_results

    def process_image(self, image_url: str) -> Dict:
        """Download an image and extract cleaned OCR text from it.

        Returns:
            Dict with ``detected_text`` (list of ``{"text", "confidence"}``
            dicts) and ``extracted_attributes``; an ``error`` key is added
            on failure. Never raises.
        """
        try:
            image = self.download_image(image_url)
            if image is None:
                # BUG FIX: was a bare print(); use the module logger like
                # every other failure path in this file.
                logger.error("Unable to download the image from %s", image_url)
                return {
                    "detected_text": [],
                    "extracted_attributes": {},
                    "error": "Failed to download image"
                }

            # Run both preprocessing strategies and OCR their regions.
            horizontal_regions = self.detect_text_regions(
                image, self.preprocess_horizontal
            )
            horizontal_text = self.classify_and_extract_text(
                image, horizontal_regions
            )

            vertical_regions = self.detect_text_regions(
                image, self.preprocess_vertical
            )
            vertical_text = self.classify_and_extract_text(
                image, vertical_regions
            )

            all_text = horizontal_text + vertical_text
            cleaned_results = self.clean_ocr_output(
                all_text, confidence_threshold=0.40
            )

            detected_text = [
                {"text": text, "confidence": float(confidence)}
                for text, confidence in cleaned_results
            ]

            return {
                "detected_text": detected_text,
                "extracted_attributes": {}
            }
        except Exception as e:
            logger.exception("Error processing image")
            return {
                "detected_text": [],
                "extracted_attributes": {},
                "error": str(e)
            }