# image_scorer.py (22 KB)


# image_scorer.py - product image quality scoring
# (FIXED: JSON serialization of numpy values + NoneType image format handling)
import io
import logging
import os
from typing import Dict, List, Optional, Tuple

import cv2
import numpy as np
import webcolors
from PIL import Image
from sklearn.cluster import KMeans
  11. logger = logging.getLogger(__name__)
  12. class ImageQualityScorer:
  13. """
  14. Image Quality Scorer for Product Images
  15. Evaluates: resolution, blur, background, size, format
  16. """
  17. def __init__(self, use_ai: bool = True):
  18. self.use_ai = use_ai
  19. self.ai_service = None
  20. # Initialize AI service if available
  21. if use_ai:
  22. try:
  23. from .gemini_service import GeminiAttributeService
  24. self.ai_service = GeminiAttributeService()
  25. except Exception as e:
  26. logger.warning(f"Gemini service not available: {e}")
  27. self.use_ai = False
  28. # Image scoring weights
  29. self.image_weights = {
  30. 'resolution': 0.25, # 25%
  31. 'clarity': 0.25, # 25%
  32. 'background': 0.20, # 20%
  33. 'size': 0.15, # 15%
  34. 'format': 0.15 # 15%
  35. }
  36. # Standards
  37. self.min_width = 500
  38. self.recommended_width = 1000
  39. self.min_height = 500
  40. self.recommended_height = 1000
  41. self.min_dpi = 72
  42. self.recommended_dpi = 150
  43. self.min_blur_variance = 100
  44. self.recommended_blur_variance = 500
  45. self.recommended_formats = ['JPEG', 'PNG', 'WEBP', 'JPG']
  46. self.max_file_size_mb = 5
  47. def _convert_to_json_serializable(self, obj):
  48. """Convert numpy types to native Python types for JSON serialization"""
  49. if isinstance(obj, np.integer):
  50. return int(obj)
  51. elif isinstance(obj, np.floating):
  52. return float(obj)
  53. elif isinstance(obj, np.ndarray):
  54. return obj.tolist()
  55. elif isinstance(obj, dict):
  56. return {key: self._convert_to_json_serializable(value) for key, value in obj.items()}
  57. elif isinstance(obj, (list, tuple)):
  58. return [self._convert_to_json_serializable(item) for item in obj]
  59. return obj
  60. def score_image(self, product: Dict, image_data: bytes = None, image_path: str = None) -> Dict:
  61. """
  62. Main scoring function for product images
  63. Args:
  64. product: Product dictionary with metadata
  65. image_data: Raw image bytes (optional)
  66. image_path: Path to image file (optional)
  67. Returns:
  68. Dictionary with scores, issues, and suggestions
  69. """
  70. logger.info(f"[IMAGE SCORER] Starting image scoring for SKU: {product.get('sku')}")
  71. try:
  72. # Load image
  73. if image_data:
  74. logger.info("[IMAGE SCORER] Loading image from bytes")
  75. image = Image.open(io.BytesIO(image_data)).convert("RGB")
  76. elif image_path:
  77. logger.info(f"[IMAGE SCORER] Loading image from path: {image_path}")
  78. if not os.path.exists(image_path):
  79. logger.error(f"[IMAGE SCORER] File not found: {image_path}")
  80. return {
  81. 'image_score': 0.0,
  82. 'breakdown': {},
  83. 'issues': [f'Image file not found: {image_path}'],
  84. 'suggestions': ['Verify image file exists at the specified path'],
  85. 'image_metadata': {}
  86. }
  87. image = Image.open(image_path).convert("RGB")
  88. logger.info(f"[IMAGE SCORER] Image loaded successfully: {image.size}")
  89. else:
  90. logger.warning("[IMAGE SCORER] No image provided")
  91. return {
  92. 'image_score': 0.0,
  93. 'breakdown': {},
  94. 'issues': ['No image provided'],
  95. 'suggestions': ['Upload a product image'],
  96. 'image_metadata': {}
  97. }
  98. image_np = np.array(image)
  99. logger.info(f"[IMAGE SCORER] Image converted to numpy array: {image_np.shape}")
  100. # Extract metadata
  101. metadata = self._extract_metadata(image, image_data or image_path)
  102. logger.info(f"[IMAGE SCORER] Metadata extracted: {metadata}")
  103. # Score components
  104. scores = {}
  105. issues = []
  106. suggestions = []
  107. # 1. Resolution (25%)
  108. logger.info("[IMAGE SCORER] Checking resolution...")
  109. res_score, res_issues, res_suggestions = self._check_resolution(image, metadata)
  110. scores['resolution'] = res_score
  111. issues.extend(res_issues)
  112. suggestions.extend(res_suggestions)
  113. logger.info(f"[IMAGE SCORER] Resolution score: {res_score}")
  114. # 2. Clarity/Blur (25%)
  115. logger.info("[IMAGE SCORER] Checking clarity...")
  116. clarity_score, clarity_issues, clarity_suggestions = self._check_clarity(image_np)
  117. scores['clarity'] = clarity_score
  118. issues.extend(clarity_issues)
  119. suggestions.extend(clarity_suggestions)
  120. logger.info(f"[IMAGE SCORER] Clarity score: {clarity_score}")
  121. # 3. Background (20%)
  122. logger.info("[IMAGE SCORER] Checking background...")
  123. bg_score, bg_issues, bg_suggestions, bg_info = self._check_background(image_np)
  124. scores['background'] = bg_score
  125. issues.extend(bg_issues)
  126. suggestions.extend(bg_suggestions)
  127. logger.info(f"[IMAGE SCORER] Background score: {bg_score}")
  128. # 4. Size (15%)
  129. logger.info("[IMAGE SCORER] Checking size...")
  130. size_score, size_issues, size_suggestions = self._check_size(image, metadata)
  131. scores['size'] = size_score
  132. issues.extend(size_issues)
  133. suggestions.extend(size_suggestions)
  134. logger.info(f"[IMAGE SCORER] Size score: {size_score}")
  135. # 5. Format (15%)
  136. logger.info("[IMAGE SCORER] Checking format...")
  137. format_score, format_issues, format_suggestions = self._check_format(image, metadata)
  138. scores['format'] = format_score
  139. issues.extend(format_issues)
  140. suggestions.extend(format_suggestions)
  141. logger.info(f"[IMAGE SCORER] Format score: {format_score}")
  142. # Calculate final score
  143. final_score = sum(scores[key] * self.image_weights[key] for key in scores)
  144. logger.info(f"[IMAGE SCORER] ✓ Final image score: {final_score}")
  145. # Convert all numpy types to native Python types for JSON serialization
  146. result = {
  147. 'image_score': round(float(final_score), 2),
  148. 'breakdown': {k: round(float(v), 2) for k, v in scores.items()},
  149. 'issues': issues,
  150. 'suggestions': suggestions,
  151. 'image_metadata': self._convert_to_json_serializable({**metadata, **bg_info}),
  152. 'ai_improvements': self._get_ai_improvements(product, scores, issues) if self.use_ai else None
  153. }
  154. return result
  155. except Exception as e:
  156. logger.error(f"[IMAGE SCORER] ✗ Image scoring error: {e}", exc_info=True)
  157. return {
  158. 'image_score': 0.0,
  159. 'breakdown': {},
  160. 'issues': [f"Image processing failed: {str(e)}"],
  161. 'suggestions': ['Ensure image is valid and accessible'],
  162. 'image_metadata': {}
  163. }
  164. def _extract_metadata(self, image: Image.Image, source) -> Dict:
  165. """Extract image metadata with safe handling of None values"""
  166. logger.info("[IMAGE SCORER] Extracting metadata...")
  167. width, height = image.size
  168. logger.info(f"[IMAGE SCORER] Image dimensions: {width}x{height}")
  169. # Get format - handle None case
  170. # img_format = image.format
  171. img_format="JPG"
  172. if img_format is None:
  173. # Try to detect from file extension
  174. if isinstance(source, str):
  175. ext = os.path.splitext(source)[1].upper().lstrip('.')
  176. img_format = ext if ext else 'UNKNOWN'
  177. logger.warning(f"[IMAGE SCORER] Format not in image metadata, detected from extension: {img_format}")
  178. else:
  179. img_format = 'UNKNOWN'
  180. logger.warning("[IMAGE SCORER] Format is None and cannot detect from source")
  181. logger.info(f"[IMAGE SCORER] Image format: {img_format}")
  182. # Get DPI
  183. dpi = image.info.get('dpi', (None, None))
  184. if not dpi or dpi == (None, None):
  185. # Try EXIF
  186. try:
  187. import piexif
  188. exif_data = piexif.load(image.info.get('exif', b''))
  189. x_res = exif_data['0th'].get(piexif.ImageIFD.XResolution, None)
  190. y_res = exif_data['0th'].get(piexif.ImageIFD.YResolution, None)
  191. if x_res and y_res:
  192. dpi = (int(x_res[0] / x_res[1]), int(y_res[0] / y_res[1]))
  193. else:
  194. dpi = (None, None)
  195. except Exception:
  196. dpi = (None, None)
  197. logger.info(f"[IMAGE SCORER] DPI: {dpi}")
  198. # Get file size
  199. file_size_mb = None
  200. if isinstance(source, bytes):
  201. file_size_mb = len(source) / (1024 * 1024)
  202. elif isinstance(source, str):
  203. if os.path.exists(source):
  204. file_size_mb = os.path.getsize(source) / (1024 * 1024)
  205. logger.info(f"[IMAGE SCORER] File size: {file_size_mb:.2f} MB" if file_size_mb else "[IMAGE SCORER] File size: Unknown")
  206. return {
  207. 'width': int(width), # Ensure native Python int
  208. 'height': int(height), # Ensure native Python int
  209. 'dpi': dpi,
  210. 'format': str(img_format), # Ensure string
  211. 'mode': str(image.mode),
  212. 'file_size_mb': round(float(file_size_mb), 2) if file_size_mb else None
  213. }
  214. def _check_resolution(self, image: Image.Image, metadata: Dict) -> Tuple[float, List[str], List[str]]:
  215. """Check image resolution (DPI)"""
  216. issues = []
  217. suggestions = []
  218. dpi = metadata.get('dpi', (None, None))
  219. if not dpi or dpi == (None, None) or dpi[0] is None:
  220. suggestions.append("DPI information not available in image, ensure high-quality source")
  221. score = 70.0
  222. else:
  223. avg_dpi = (dpi[0] + dpi[1]) / 2 if dpi[1] else dpi[0]
  224. if avg_dpi < self.min_dpi:
  225. issues.append(f"Image: Low resolution ({avg_dpi} DPI, minimum {self.min_dpi})")
  226. suggestions.append(f"Use images with at least {self.recommended_dpi} DPI")
  227. score = (avg_dpi / self.min_dpi) * 50
  228. elif avg_dpi < self.recommended_dpi:
  229. suggestions.append(f"Resolution acceptable but could be better (current: {avg_dpi} DPI)")
  230. score = 50 + ((avg_dpi - self.min_dpi) / (self.recommended_dpi - self.min_dpi)) * 50
  231. else:
  232. score = 100.0
  233. return float(score), issues, suggestions
  234. def _check_clarity(self, image_np: np.ndarray) -> Tuple[float, List[str], List[str]]:
  235. """Check image clarity using Laplacian variance (blur detection)"""
  236. issues = []
  237. suggestions = []
  238. try:
  239. gray = cv2.cvtColor(image_np, cv2.COLOR_RGB2GRAY)
  240. blur_variance = cv2.Laplacian(gray, cv2.CV_64F).var()
  241. blur_variance = float(blur_variance) # Convert to native Python float
  242. if blur_variance < self.min_blur_variance:
  243. issues.append(f"Image: Blurry/low clarity (variance: {blur_variance:.2f})")
  244. suggestions.append("Use sharp, well-focused images (variance should be > 500)")
  245. score = (blur_variance / self.min_blur_variance) * 50
  246. elif blur_variance < self.recommended_blur_variance:
  247. suggestions.append(f"Image clarity acceptable but could be sharper (variance: {blur_variance:.2f})")
  248. score = 50 + ((blur_variance - self.min_blur_variance) / (self.recommended_blur_variance - self.min_blur_variance)) * 50
  249. else:
  250. score = 100.0
  251. except Exception as e:
  252. logger.warning(f"Blur detection error: {e}")
  253. score = 70.0
  254. suggestions.append("Unable to assess image clarity")
  255. return float(score), issues, suggestions
  256. def _check_background(self, image_np: np.ndarray) -> Tuple[float, List[str], List[str], Dict]:
  257. """Check background color and coverage"""
  258. issues = []
  259. suggestions = []
  260. bg_info = {}
  261. try:
  262. pixels = image_np.reshape(-1, 3)
  263. kmeans = KMeans(n_clusters=3, random_state=0, n_init=10).fit(pixels)
  264. # Get dominant color
  265. dominant_idx = np.argmax(np.bincount(kmeans.labels_))
  266. dominant_color = tuple(int(x) for x in kmeans.cluster_centers_[dominant_idx].astype(int))
  267. # Color name and hex
  268. color_name = self._closest_color_name(dominant_color)
  269. hex_code = webcolors.rgb_to_hex(dominant_color)
  270. # Background coverage
  271. bg_pixels = int(np.sum(kmeans.labels_ == dominant_idx))
  272. total_pixels = int(len(kmeans.labels_))
  273. background_coverage = float(100 * bg_pixels / total_pixels)
  274. bg_info = {
  275. 'dominant_color_rgb': list(dominant_color), # Convert tuple to list for JSON
  276. 'dominant_color_hex': str(hex_code),
  277. 'dominant_color_name': str(color_name),
  278. 'background_coverage': round(background_coverage, 2)
  279. }
  280. # Score based on white/light background preference
  281. score_components = []
  282. # 1. Check if background is white/light (preferred for e-commerce)
  283. if color_name.lower() in ['white', 'whitesmoke', 'snow', 'ivory', 'linen']:
  284. score_components.append(100.0)
  285. elif sum(dominant_color) / 3 > 200: # Light color
  286. score_components.append(85.0)
  287. elif color_name.lower() in ['lightgray', 'lightgrey', 'gainsboro']:
  288. score_components.append(75.0)
  289. suggestions.append("Consider using pure white background for better product visibility")
  290. else:
  291. issues.append(f"Image: Non-white background ({color_name})")
  292. suggestions.append("Use white or light neutral background for e-commerce standards")
  293. score_components.append(50.0)
  294. # 2. Check coverage (background should be dominant)
  295. if background_coverage > 60:
  296. score_components.append(100.0)
  297. elif background_coverage > 40:
  298. score_components.append(80.0)
  299. else:
  300. suggestions.append(f"Background coverage low ({background_coverage:.1f}%), product may be too small")
  301. score_components.append(60.0)
  302. final_score = float(np.mean(score_components))
  303. except Exception as e:
  304. logger.warning(f"Background analysis error: {e}")
  305. final_score = 70.0
  306. suggestions.append("Unable to analyze background")
  307. return final_score, issues, suggestions, bg_info
  308. def _check_size(self, image: Image.Image, metadata: Dict) -> Tuple[float, List[str], List[str]]:
  309. """Check image dimensions"""
  310. issues = []
  311. suggestions = []
  312. width = metadata['width']
  313. height = metadata['height']
  314. score_components = []
  315. # Width check
  316. if width < self.min_width:
  317. issues.append(f"Image: Width too small ({width}px, minimum {self.min_width}px)")
  318. suggestions.append(f"Use images at least {self.recommended_width}x{self.recommended_height}px")
  319. score_components.append((width / self.min_width) * 50)
  320. elif width < self.recommended_width:
  321. suggestions.append(f"Image width acceptable but could be larger (current: {width}px)")
  322. score_components.append(50 + ((width - self.min_width) / (self.recommended_width - self.min_width)) * 50)
  323. else:
  324. score_components.append(100.0)
  325. # Height check
  326. if height < self.min_height:
  327. issues.append(f"Image: Height too small ({height}px, minimum {self.min_height}px)")
  328. score_components.append((height / self.min_height) * 50)
  329. elif height < self.recommended_height:
  330. score_components.append(50 + ((height - self.min_height) / (self.recommended_height - self.min_height)) * 50)
  331. else:
  332. score_components.append(100.0)
  333. # Aspect ratio check (should be roughly square or standard format)
  334. aspect_ratio = width / height
  335. if 0.75 <= aspect_ratio <= 1.33: # 4:3 to 3:4 range
  336. score_components.append(100.0)
  337. else:
  338. suggestions.append(f"Image aspect ratio unusual ({aspect_ratio:.2f}), consider standard format")
  339. score_components.append(80.0)
  340. final_score = float(np.mean(score_components))
  341. return final_score, issues, suggestions
  342. def _check_format(self, image: Image.Image, metadata: Dict) -> Tuple[float, List[str], List[str]]:
  343. """Check image format and file size - FIXED to handle None"""
  344. issues = []
  345. suggestions = []
  346. score_components = []
  347. # Format check - FIXED: safe handling of None
  348. img_format = metadata.get('format')
  349. if img_format is None or img_format == 'UNKNOWN':
  350. logger.warning("[IMAGE SCORER] Image format is None/Unknown")
  351. suggestions.append("Image format could not be determined, ensure proper file format")
  352. score_components.append(70.0)
  353. else:
  354. img_format_upper = str(img_format).upper() # Ensure string and uppercase
  355. if img_format_upper in self.recommended_formats:
  356. score_components.append(100.0)
  357. elif img_format_upper in ['JPG', 'JPEG']: # JPG vs JPEG
  358. score_components.append(100.0)
  359. elif img_format_upper in ['GIF', 'BMP', 'TIFF']:
  360. suggestions.append(f"Image format {img_format_upper} acceptable but consider JPEG/PNG/WEBP")
  361. score_components.append(75.0)
  362. else:
  363. issues.append(f"Image: Uncommon format ({img_format_upper})")
  364. suggestions.append("Use standard formats: JPEG, PNG, or WEBP")
  365. score_components.append(50.0)
  366. # File size check
  367. file_size_mb = metadata.get('file_size_mb')
  368. if file_size_mb:
  369. if file_size_mb <= self.max_file_size_mb:
  370. score_components.append(100.0)
  371. elif file_size_mb <= self.max_file_size_mb * 1.5:
  372. suggestions.append(f"Image file size large ({file_size_mb:.2f}MB), consider optimization")
  373. score_components.append(80.0)
  374. else:
  375. issues.append(f"Image: File size too large ({file_size_mb:.2f}MB, max {self.max_file_size_mb}MB)")
  376. suggestions.append("Compress image to reduce file size")
  377. score_components.append(50.0)
  378. else:
  379. score_components.append(85.0) # Default if size unknown
  380. final_score = float(np.mean(score_components))
  381. return final_score, issues, suggestions
  382. def _closest_color_name(self, rgb_color: tuple) -> str:
  383. """Convert RGB to closest CSS3 color name"""
  384. min_distance = float('inf')
  385. closest_name = 'unknown'
  386. try:
  387. for name in webcolors.names():
  388. r, g, b = webcolors.name_to_rgb(name)
  389. distance = (r - rgb_color[0])**2 + (g - rgb_color[1])**2 + (b - rgb_color[2])**2
  390. if distance < min_distance:
  391. min_distance = distance
  392. closest_name = name
  393. except Exception as e:
  394. logger.warning(f"Color name detection error: {e}")
  395. return closest_name
  396. def _get_ai_improvements(self, product: Dict, scores: Dict, issues: List[str]) -> Dict:
  397. """Use Gemini AI to suggest image improvements"""
  398. if not self.use_ai or not self.ai_service:
  399. return None
  400. try:
  401. if not issues:
  402. return {"note": "No improvements needed"}
  403. prompt = f"""Analyze this product image quality report and suggest improvements.
  404. PRODUCT: {product.get('title', 'Unknown')}
  405. CATEGORY: {product.get('category', 'Unknown')}
  406. SCORES:
  407. {chr(10).join(f"• {k}: {v:.1f}/100" for k, v in scores.items())}
  408. ISSUES:
  409. {chr(10).join(f"• {issue}" for issue in issues[:10])}
  410. Return ONLY this JSON:
  411. {{
  412. "priority_fixes": ["fix1", "fix2", "fix3"],
  413. "recommended_specs": {{"width": 1200, "height": 1200, "format": "JPEG", "background": "white"}},
  414. "improvement_notes": ["note1", "note2"],
  415. "confidence": "high/medium/low"
  416. }}"""
  417. response = self.ai_service._call_gemini_api(prompt, max_tokens=1024)
  418. if response and response.candidates:
  419. return self.ai_service._parse_response(response.text)
  420. return {"error": "No AI response"}
  421. except Exception as e:
  422. logger.error(f"AI improvement error: {e}")
  423. return {"error": str(e)}