- # ==================== ocr_service.py ====================
- import cv2
- import easyocr
- import numpy as np
- import re
- import requests
- from io import BytesIO
- from PIL import Image
- from typing import List, Tuple, Dict, Optional
- import logging
- logger = logging.getLogger(__name__)
class OCRService:
    """Service for extracting text from product images using OCR.

    Images are downloaded over HTTP, preprocessed with OpenCV for both
    horizontal and vertical text layouts, read with EasyOCR, and the raw
    detections are cleaned before being returned.
    """

    def __init__(self):
        # Reader is created lazily: easyocr.Reader loads large model files
        # at construction time, so defer that cost until first use.
        self.reader = None

    def _get_reader(self):
        """Lazily create and cache the EasyOCR reader (English, CPU-only)."""
        if self.reader is None:
            self.reader = easyocr.Reader(['en'], gpu=False)
        return self.reader

    def download_image(self, image_url: str) -> Optional[np.ndarray]:
        """Download an image from *image_url* and return it in OpenCV BGR format.

        Returns:
            The image as a BGR ``np.ndarray``, or ``None`` (with the error
            logged) on any network or decode failure.
        """
        try:
            response = requests.get(image_url, timeout=10)
            response.raise_for_status()

            # Normalize to RGB before the array conversion: PIL may return
            # RGBA, palette, or grayscale images, for which a bare RGB2BGR
            # conversion would fail or yield the wrong channel count.
            pil_image = Image.open(BytesIO(response.content)).convert("RGB")
            return cv2.cvtColor(np.array(pil_image), cv2.COLOR_RGB2BGR)
        except Exception as e:
            logger.error(f"Error downloading image from {image_url}: {str(e)}")
            return None

    def preprocess_horizontal(self, image: np.ndarray) -> np.ndarray:
        """Binarize *image* for horizontal text via blur + Otsu thresholding."""
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        # Light blur suppresses noise so Otsu picks a stable global threshold.
        enhanced = cv2.GaussianBlur(gray, (5, 5), 0)
        _, binary = cv2.threshold(enhanced, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
        return binary

    def preprocess_vertical(self, image: np.ndarray) -> np.ndarray:
        """Binarize *image* for vertical text via histogram equalization + adaptive threshold."""
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        # Equalization boosts low-contrast labels; the adaptive (inverted)
        # threshold tolerates uneven lighting along tall text columns.
        enhanced = cv2.equalizeHist(gray)
        thresh = cv2.adaptiveThreshold(
            enhanced, 255, cv2.ADAPTIVE_THRESH_MEAN_C, cv2.THRESH_BINARY_INV, 15, 10
        )
        return thresh

    def detect_text_regions(self, image: np.ndarray, preprocess_func) -> List[Tuple]:
        """Find candidate text regions in *image* using external contours.

        Args:
            image: BGR source image.
            preprocess_func: Callable that maps the BGR image to a binary mask
                (one of the ``preprocess_*`` methods).

        Returns:
            List of ``(x, y, w, h, aspect_ratio)`` tuples, where
            ``aspect_ratio`` is height/width.
        """
        processed = preprocess_func(image)
        contours, _ = cv2.findContours(processed, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        text_regions = []
        for contour in contours:
            x, y, w, h = cv2.boundingRect(contour)
            if w > 30 and h > 30:  # Filter out small regions (likely noise).
                aspect_ratio = h / w
                text_regions.append((x, y, w, h, aspect_ratio))
        return text_regions

    def classify_and_extract_text(self, image: np.ndarray, regions: List[Tuple]) -> List[Tuple]:
        """Run OCR over each region, rotating tall regions for vertical text.

        Returns:
            List of ``(text, confidence)`` tuples from EasyOCR.
        """
        reader = self._get_reader()
        all_detected_text = []

        for (x, y, w, h, aspect_ratio) in regions:
            roi = image[y:y + h, x:x + w]
            # Tall-and-narrow regions are assumed to hold vertical text;
            # rotate them so EasyOCR reads them left-to-right.
            if aspect_ratio > 1.5:
                roi = cv2.rotate(roi, cv2.ROTATE_90_CLOCKWISE)

            results = reader.readtext(roi, detail=1)
            for _, text, confidence in results:
                all_detected_text.append((text, confidence))

        return all_detected_text

    def clean_ocr_output(self, ocr_results: List[Tuple], confidence_threshold: float = 0.40) -> List[Tuple]:
        """Filter and normalize raw OCR detections.

        Drops detections below *confidence_threshold*, strips characters
        outside a conservative allow-list, collapses whitespace, and discards
        empty or lone-digit results.

        Args:
            ocr_results: ``(text, confidence)`` tuples from EasyOCR.
            confidence_threshold: Minimum confidence to keep a detection.

        Returns:
            Cleaned ``(text, confidence)`` tuples, in input order.
        """
        cleaned_results = []
        for text, confidence in ocr_results:
            if confidence < confidence_threshold:
                continue

            # Keep only letters, digits, whitespace, and a few punctuation
            # marks common on product labels; collapse runs of whitespace.
            cleaned_text = re.sub(r"[^A-Za-z0-9\s\.\,\(\)\-\%\/]", "", text)
            cleaned_text = re.sub(r"\s+", " ", cleaned_text).strip()

            # A lone digit carries no useful label information.
            # (The original also checked "contains a digit and len < 2",
            # but that condition is fully subsumed by this one.)
            if len(cleaned_text) == 1 and cleaned_text.isdigit():
                continue

            if cleaned_text:
                cleaned_results.append((cleaned_text, confidence))

        return cleaned_results

    def process_image(self, image_url: str) -> Dict:
        """Download *image_url*, run both OCR passes, and return cleaned text.

        Returns:
            Dict with ``detected_text`` (list of ``{"text", "confidence"}``
            dicts), ``extracted_attributes`` (always empty here), and an
            ``error`` key on failure.
        """
        try:
            image = self.download_image(image_url)
            if image is None:
                # Use the module logger (not print) so failures reach the
                # configured logging sinks like every other error path.
                logger.error("Unable to download the image from %s", image_url)
                return {
                    "detected_text": [],
                    "extracted_attributes": {},
                    "error": "Failed to download image"
                }

            # Run two independent passes tuned for horizontal and vertical
            # text, then merge the detections.
            horizontal_regions = self.detect_text_regions(image, self.preprocess_horizontal)
            horizontal_text = self.classify_and_extract_text(image, horizontal_regions)

            vertical_regions = self.detect_text_regions(image, self.preprocess_vertical)
            vertical_text = self.classify_and_extract_text(image, vertical_regions)

            all_text = horizontal_text + vertical_text

            cleaned_results = self.clean_ocr_output(all_text, confidence_threshold=0.40)

            detected_text = [
                {"text": text, "confidence": float(confidence)}
                for text, confidence in cleaned_results
            ]

            return {
                "detected_text": detected_text,
                "extracted_attributes": {}
            }

        except Exception as e:
            logger.error(f"Error processing image: {str(e)}")
            return {
                "detected_text": [],
                "extracted_attributes": {},
                "error": str(e)
            }