# Source file: ocr_service.py (~6.5 KB)
  1. # ==================== ocr_service.py ====================
  2. import cv2
  3. import easyocr
  4. import numpy as np
  5. import re
  6. import requests
  7. from io import BytesIO
  8. from PIL import Image
  9. from typing import List, Tuple, Dict, Optional
  10. import logging
  11. logger = logging.getLogger(__name__)
class OCRService:
    """Service for extracting text from product images using OCR."""

    # 🔥 Class-level cache (persists across requests)
    # NOTE(review): shared by every instance and not lock-protected, so two
    # concurrent first requests may each load the model — confirm acceptable.
    _shared_reader = None

    def __init__(self):
        # Don't initialize here - use lazy loading with class cache
        pass
  19. def _get_reader(self):
  20. """🔥 Lazy load EasyOCR reader with class-level caching."""
  21. if OCRService._shared_reader is None:
  22. import time
  23. start = time.time()
  24. logger.info("📥 Loading EasyOCR model...")
  25. OCRService._shared_reader = easyocr.Reader(['en'], gpu=False)
  26. load_time = time.time() - start
  27. logger.info(f"✓ EasyOCR loaded in {load_time:.1f}s and cached in memory")
  28. else:
  29. logger.debug("✓ Using cached EasyOCR reader")
  30. return OCRService._shared_reader
  31. def download_image(self, image_url: str) -> Optional[np.ndarray]:
  32. """Download image from URL and convert to OpenCV format."""
  33. try:
  34. response = requests.get(image_url, timeout=10)
  35. response.raise_for_status()
  36. # Convert to PIL Image then to OpenCV format
  37. pil_image = Image.open(BytesIO(response.content))
  38. image = cv2.cvtColor(np.array(pil_image), cv2.COLOR_RGB2BGR)
  39. return image
  40. except Exception as e:
  41. logger.error(f"Error downloading image from {image_url}: {str(e)}")
  42. return None
  43. def preprocess_horizontal(self, image: np.ndarray) -> np.ndarray:
  44. """Preprocess image for horizontal text."""
  45. gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
  46. enhanced = cv2.GaussianBlur(gray, (5, 5), 0)
  47. _, binary = cv2.threshold(enhanced, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
  48. return binary
  49. def preprocess_vertical(self, image: np.ndarray) -> np.ndarray:
  50. """Preprocess image for vertical text."""
  51. gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
  52. enhanced = cv2.equalizeHist(gray)
  53. thresh = cv2.adaptiveThreshold(
  54. enhanced, 255, cv2.ADAPTIVE_THRESH_MEAN_C, cv2.THRESH_BINARY_INV, 15, 10
  55. )
  56. return thresh
  57. def detect_text_regions(self, image: np.ndarray, preprocess_func) -> List[Tuple]:
  58. """Detect text regions using contours."""
  59. processed = preprocess_func(image)
  60. contours, _ = cv2.findContours(processed, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
  61. text_regions = []
  62. for contour in contours:
  63. x, y, w, h = cv2.boundingRect(contour)
  64. if w > 30 and h > 30: # Filter small regions
  65. aspect_ratio = h / w
  66. text_regions.append((x, y, w, h, aspect_ratio))
  67. return text_regions
  68. def classify_and_extract_text(self, image: np.ndarray, regions: List[Tuple]) -> List[Tuple]:
  69. """Classify regions as horizontal or vertical and extract text."""
  70. reader = self._get_reader()
  71. all_detected_text = []
  72. for (x, y, w, h, aspect_ratio) in regions:
  73. roi = image[y:y + h, x:x + w]
  74. if aspect_ratio > 1.5: # Vertical text
  75. roi = cv2.rotate(roi, cv2.ROTATE_90_CLOCKWISE)
  76. results = reader.readtext(roi, detail=1)
  77. for _, text, confidence in results:
  78. all_detected_text.append((text, confidence))
  79. return all_detected_text
  80. def clean_ocr_output(self, ocr_results: List[Tuple], confidence_threshold: float = 0.40) -> List[Tuple]:
  81. """Clean OCR results by removing unwanted characters and low-confidence detections."""
  82. cleaned_results = []
  83. for text, confidence in ocr_results:
  84. if confidence < confidence_threshold:
  85. continue
  86. # Remove unwanted characters using regex
  87. cleaned_text = re.sub(r"[^A-Za-z0-9\s\.\,\(\)\-\%\/]", "", text)
  88. cleaned_text = re.sub(r"\s+", " ", cleaned_text).strip()
  89. # Remove unwanted numeric characters like single digits
  90. if len(cleaned_text) == 1 and cleaned_text.isdigit():
  91. continue
  92. if any(char.isdigit() for char in cleaned_text) and len(cleaned_text) < 2:
  93. continue
  94. if len(cleaned_text.strip()) > 0:
  95. cleaned_results.append((cleaned_text.strip(), confidence))
  96. return cleaned_results
  97. def process_image(self, image_url: str) -> Dict:
  98. """Main method to process image and extract text."""
  99. try:
  100. # Download image
  101. image = self.download_image(image_url)
  102. if image is None:
  103. print("Unable to download the image...")
  104. return {
  105. "detected_text": [],
  106. "extracted_attributes": {},
  107. "error": "Failed to download image"
  108. }
  109. # Detect and process horizontal text
  110. horizontal_regions = self.detect_text_regions(image, self.preprocess_horizontal)
  111. horizontal_text = self.classify_and_extract_text(image, horizontal_regions)
  112. # Detect and process vertical text
  113. vertical_regions = self.detect_text_regions(image, self.preprocess_vertical)
  114. vertical_text = self.classify_and_extract_text(image, vertical_regions)
  115. # Combine results
  116. all_text = horizontal_text + vertical_text
  117. # Clean results
  118. cleaned_results = self.clean_ocr_output(all_text, confidence_threshold=0.40)
  119. # Format for response
  120. detected_text = [
  121. {"text": text, "confidence": float(confidence)}
  122. for text, confidence in cleaned_results
  123. ]
  124. return {
  125. "detected_text": detected_text,
  126. "extracted_attributes": {}
  127. }
  128. except Exception as e:
  129. logger.error(f"Error processing image: {str(e)}")
  130. return {
  131. "detected_text": [],
  132. "extracted_attributes": {},
  133. "error": str(e)
  134. }