# """ # import time # start_time = time.time() # try: # # Download image # image = self.download_image(image_url) # if image is None: # return { # "visual_attributes": {}, # "error": "Failed to download image" # } # visual_attributes = {} # detailed_predictions = {} # # Step 1: Detect product category # detected_category, detected_product_type, category_confidence = self.detect_product_category(image) # # If confidence is too low, return minimal info # if category_confidence < 0.10: # logger.warning(f"Low confidence in category detection ({category_confidence:.3f}). Returning basic attributes only.") # colors = self.extract_dominant_colors(image, n_colors=3) # if colors: # visual_attributes["primary_color"] = colors[0]["name"] # visual_attributes["color_palette"] = [c["name"] for c in colors] # return { # "visual_attributes": visual_attributes, # "category_confidence": category_confidence, # "processing_time": round(time.time() - start_time, 2) # } # # Add detected product type # visual_attributes["product_type"] = detected_product_type # visual_attributes["category"] = detected_category # # Step 2: Extract color (universal attribute) # colors = self.extract_dominant_colors(image, n_colors=3) # if colors: # visual_attributes["primary_color"] = colors[0]["name"] # visual_attributes["color_palette"] = [c["name"] for c in colors] # # Step 3: Extract category-specific attributes # if detected_category in self.CATEGORY_ATTRIBUTES: # category_config = self.CATEGORY_ATTRIBUTES[detected_category] # for attr_name, attr_values in category_config["attributes"].items(): # # Use higher confidence threshold for category-specific attributes # result = self.classify_with_clip(image, attr_values, attr_name, confidence_threshold=0.20) # if result["predictions"]: # # Only add if confidence is reasonable # best_prediction = result["predictions"][0] # if best_prediction["confidence"] > 0.20: # visual_attributes[attr_name] = best_prediction["value"] # detailed_predictions[attr_name] = result # processing_time = time.time() - start_time # return { # "visual_attributes": visual_attributes, # "detailed_predictions": detailed_predictions, # "category_confidence": category_confidence, # "processing_time": round(processing_time, 2) # } # except Exception as e: # logger.error(f"Error processing image: {str(e)}") # return { # "visual_attributes": {}, # "error": str(e), # "processing_time": round(time.time() - start_time, 2) # } # ==================== visual_processing_service.py (FIXED - Smart Subcategory Detection) ==================== import torch import numpy as np import requests from io import BytesIO from PIL import Image from typing import Dict, List, Optional, Tuple import logging from transformers import CLIPProcessor, CLIPModel from sklearn.cluster import KMeans logger = logging.getLogger(__name__) class VisualProcessingService: """Service for extracting visual attributes from product images using CLIP with smart subcategory detection.""" # Class-level caching (shared across instances) _clip_model = None _clip_processor = None _device = None # Define hierarchical category structure with subcategories CATEGORY_ATTRIBUTES = { "clothing": { "subcategories": { "tops": { "products": ["t-shirt", "shirt", "blouse", "top", "sweater", "hoodie", "tank top", "polo shirt"], "attributes": { "pattern": ["solid color", "striped", "checkered", "graphic print", "floral", "geometric", "plain", "logo print"], "material": ["cotton", "polyester", "silk", "wool", "linen", "blend", "knit"], "style": ["casual", "formal", "sporty", 
"streetwear", "elegant", "vintage", "minimalist"], "fit": ["slim fit", "regular fit", "loose fit", "oversized", "fitted"], "neckline": ["crew neck", "v-neck", "round neck", "collar", "scoop neck", "henley"], "sleeve_type": ["short sleeve", "long sleeve", "sleeveless", "3/4 sleeve", "cap sleeve"], "closure_type": ["button-up", "zipper", "pull-on", "snap button"] } }, "bottoms": { "products": ["jeans", "pants", "trousers", "shorts", "chinos", "cargo pants", "leggings"], "attributes": { "pattern": ["solid color", "distressed", "faded", "plain", "washed", "dark wash", "light wash"], "material": ["denim", "cotton", "polyester", "wool", "blend", "twill", "corduroy"], "style": ["casual", "formal", "sporty", "vintage", "modern", "workwear"], "fit": ["slim fit", "regular fit", "loose fit", "skinny", "bootcut", "straight leg", "relaxed fit"], "rise": ["high rise", "mid rise", "low rise"], "closure_type": ["button fly", "zipper fly", "elastic waist", "drawstring"], "length": ["full length", "cropped", "ankle length", "capri"] } }, "dresses_skirts": { "products": ["dress", "skirt", "gown", "sundress", "maxi dress", "mini skirt"], "attributes": { "pattern": ["solid color", "floral", "striped", "geometric", "plain", "printed", "polka dot"], "material": ["cotton", "silk", "polyester", "linen", "blend", "chiffon", "satin"], "style": ["casual", "formal", "cocktail", "bohemian", "vintage", "elegant", "party"], "fit": ["fitted", "loose", "a-line", "bodycon", "flowy", "wrap"], "neckline": ["crew neck", "v-neck", "scoop neck", "halter", "off-shoulder", "sweetheart"], "sleeve_type": ["short sleeve", "long sleeve", "sleeveless", "3/4 sleeve", "flutter sleeve"], "length": ["mini", "midi", "maxi", "knee-length", "floor-length"] } }, "outerwear": { "products": ["jacket", "coat", "blazer", "windbreaker", "parka", "bomber jacket", "denim jacket"], "attributes": { "pattern": ["solid color", "plain", "quilted", "textured"], "material": ["leather", "denim", "wool", "polyester", "cotton", "nylon", "fleece"], "style": ["casual", "formal", "sporty", "vintage", "military", "biker"], "fit": ["slim fit", "regular fit", "oversized", "cropped"], "closure_type": ["zipper", "button", "snap button", "toggle"], "length": ["cropped", "hip length", "thigh length", "knee length"] } } } }, "footwear": { "products": ["sneakers", "boots", "sandals", "heels", "loafers", "flats", "slippers"], "attributes": { "material": ["leather", "canvas", "suede", "synthetic", "rubber", "mesh"], "style": ["casual", "formal", "athletic", "vintage", "modern"], "closure_type": ["lace-up", "slip-on", "velcro", "buckle", "zipper"], "toe_style": ["round toe", "pointed toe", "square toe", "open toe", "closed toe"] } }, "tools": { "products": ["screwdriver", "hammer", "wrench", "pliers", "drill", "saw", "measuring tape"], "attributes": { "material": ["steel", "aluminum", "plastic", "rubber", "chrome", "iron"], "type": ["manual", "electric", "pneumatic", "cordless", "corded"], "finish": ["chrome plated", "powder coated", "stainless steel", "painted"], "handle_type": ["rubber grip", "plastic", "wooden", "ergonomic", "cushioned"] } }, "electronics": { "products": ["phone", "laptop", "tablet", "headphones", "speaker", "camera", "smartwatch", "earbuds"], "attributes": { "material": ["plastic", "metal", "glass", "aluminum", "rubber", "silicone"], "style": ["modern", "minimalist", "sleek", "industrial", "vintage"], "finish": ["matte", "glossy", "metallic", "textured", "transparent"], "connectivity": ["wireless", "wired", "bluetooth", "USB-C", "USB"] } }, "furniture": 
{ "products": ["chair", "table", "sofa", "bed", "desk", "shelf", "cabinet", "bench"], "attributes": { "material": ["wood", "metal", "glass", "plastic", "fabric", "leather", "rattan"], "style": ["modern", "traditional", "industrial", "rustic", "contemporary", "vintage", "scandinavian"], "finish": ["natural wood", "painted", "stained", "laminated", "upholstered", "polished"] } } } def __init__(self): pass @classmethod def _get_device(cls): """Get optimal device.""" if cls._device is None: cls._device = torch.device("cuda" if torch.cuda.is_available() else "cpu") logger.info(f"Visual Processing using device: {cls._device}") return cls._device @classmethod def _get_clip_model(cls): """Lazy load CLIP model with class-level caching.""" if cls._clip_model is None: logger.info("Loading CLIP model (this may take a few minutes on first use)...") cls._clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32") cls._clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32") device = cls._get_device() cls._clip_model.to(device) cls._clip_model.eval() logger.info("✓ CLIP model loaded successfully") return cls._clip_model, cls._clip_processor def download_image(self, image_url: str) -> Optional[Image.Image]: """Download image from URL.""" try: response = requests.get(image_url, timeout=10) response.raise_for_status() image = Image.open(BytesIO(response.content)).convert('RGB') return image except Exception as e: logger.error(f"Error downloading image from {image_url}: {str(e)}") return None def extract_dominant_colors(self, image: Image.Image, n_colors: int = 3) -> List[Dict]: """Extract dominant colors using K-means clustering.""" try: # Resize for faster processing img_small = image.resize((150, 150)) img_array = np.array(img_small) pixels = img_array.reshape(-1, 3) # K-means clustering kmeans = KMeans(n_clusters=n_colors, random_state=42, n_init=5) kmeans.fit(pixels) colors = [] labels_counts = np.bincount(kmeans.labels_) for i, center in enumerate(kmeans.cluster_centers_): rgb = tuple(center.astype(int)) color_name = self._get_color_name_simple(rgb) percentage = float(labels_counts[i] / len(kmeans.labels_) * 100) colors.append({ "name": color_name, "rgb": rgb, "percentage": round(percentage, 2) }) # Sort by percentage (most dominant first) colors.sort(key=lambda x: x['percentage'], reverse=True) return colors except Exception as e: logger.error(f"Error extracting colors: {str(e)}") return [] def _get_color_name_simple(self, rgb: Tuple[int, int, int]) -> str: """Map RGB values to basic color names.""" r, g, b = rgb # Define color ranges with priorities colors = { 'black': (r < 50 and g < 50 and b < 50), 'white': (r > 200 and g > 200 and b > 200), 'gray': (abs(r - g) < 30 and abs(g - b) < 30 and abs(r - b) < 30 and 50 <= r <= 200), 'red': (r > 150 and g < 100 and b < 100), 'green': (g > 150 and r < 100 and b < 100), 'blue': (b > 150 and r < 100 and g < 100), 'yellow': (r > 200 and g > 200 and b < 100), 'orange': (r > 200 and 100 < g < 200 and b < 100), 'purple': (r > 100 and b > 100 and g < 100), 'pink': (r > 200 and 100 < g < 200 and 100 < b < 200), 'brown': (50 < r < 150 and 30 < g < 100 and b < 80), 'cyan': (r < 100 and g > 150 and b > 150), 'beige': (180 < r < 240 and 160 < g < 220 and 120 < b < 180), } for color_name, condition in colors.items(): if condition: return color_name # Fallback to dominant channel if r > g and r > b: return 'red' elif g > r and g > b: return 'green' elif b > r and b > g: return 'blue' else: return 'gray' def classify_with_clip( self, 

    def classify_with_clip(
        self,
        image: Image.Image,
        candidates: List[str],
        attribute_name: str,
        confidence_threshold: float = 0.15
    ) -> Dict:
        """Use CLIP to classify image against candidate labels."""
        try:
            model, processor = self._get_clip_model()
            device = self._get_device()

            # Prepare inputs
            inputs = processor(
                text=candidates,
                images=image,
                return_tensors="pt",
                padding=True
            )
            # Move to device
            inputs = {k: v.to(device) for k, v in inputs.items()}

            # Get predictions
            with torch.no_grad():
                outputs = model(**inputs)
                logits_per_image = outputs.logits_per_image
                probs = logits_per_image.softmax(dim=1).cpu()

            # Get top predictions
            top_k = min(3, len(candidates))
            top_probs, top_indices = torch.topk(probs[0], k=top_k)

            results = []
            for prob, idx in zip(top_probs, top_indices):
                if prob.item() > confidence_threshold:
                    results.append({
                        "value": candidates[idx.item()],
                        "confidence": round(float(prob.item()), 3)
                    })
            return {
                "attribute": attribute_name,
                "predictions": results
            }
        except Exception as e:
            logger.error(f"Error in CLIP classification for {attribute_name}: {str(e)}")
            return {"attribute": attribute_name, "predictions": []}
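
    # Illustrative return shape for classify_with_clip (confidence values made up):
    #   classify_with_clip(img, ["solid color", "striped"], "pattern")
    #   -> {"attribute": "pattern",
    #       "predictions": [{"value": "striped", "confidence": 0.81},
    #                       {"value": "solid color", "confidence": 0.19}]}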

    def detect_category_and_subcategory(self, image: Image.Image) -> Tuple[str, str, str, float]:
        """
        Hierarchically detect category, subcategory, and specific product.
        Returns: (category, subcategory, product_type, confidence)
        """
        # Step 1: Detect the main category (clothing vs everything else)
        main_categories = list(self.CATEGORY_ATTRIBUTES.keys())
        category_prompts = [f"a photo of {cat}" for cat in main_categories]
        result = self.classify_with_clip(image, category_prompts, "main_category", confidence_threshold=0.10)
        if not result["predictions"]:
            return "unknown", "unknown", "unknown", 0.0

        detected_category = result["predictions"][0]["value"].replace("a photo of ", "")
        category_confidence = result["predictions"][0]["confidence"]
        logger.info(f"Step 1 - Main category detected: {detected_category} (confidence: {category_confidence:.3f})")

        # Step 2: For clothing, detect subcategory (tops/bottoms/dresses/outerwear)
        if detected_category == "clothing":
            subcategories = self.CATEGORY_ATTRIBUTES["clothing"]["subcategories"]

            # Collect all products grouped by subcategory
            all_products = []
            product_to_subcategory = {}
            for subcat, subcat_data in subcategories.items():
                for product in subcat_data["products"]:
                    prompt = f"a photo of {product}"
                    all_products.append(prompt)
                    product_to_subcategory[prompt] = subcat

            # Detect the specific product type (this also implies the subcategory)
            product_result = self.classify_with_clip(
                image, all_products, "product_type", confidence_threshold=0.12
            )
            if product_result["predictions"]:
                best_match = product_result["predictions"][0]
                product_prompt = best_match["value"]
                product_type = product_prompt.replace("a photo of ", "")
                subcategory = product_to_subcategory[product_prompt]
                product_confidence = best_match["confidence"]
                logger.info(f"Step 2 - Detected: {subcategory} > {product_type} (confidence: {product_confidence:.3f})")
                return detected_category, subcategory, product_type, product_confidence
            else:
                logger.warning("Could not detect specific product type for clothing")
                return detected_category, "unknown", "unknown", category_confidence

        # Step 2 (non-clothing): detect the product type directly
        else:
            category_data = self.CATEGORY_ATTRIBUTES[detected_category]
            # Check if this category has subcategories or direct products
            if "products" in category_data:
                products = category_data["products"]
                product_prompts = [f"a photo of {p}" for p in products]
                product_result = self.classify_with_clip(
                    image, product_prompts, "product_type", confidence_threshold=0.12
                )
                if product_result["predictions"]:
                    best_match = product_result["predictions"][0]
                    product_type = best_match["value"].replace("a photo of ", "")
                    logger.info(f"Step 2 - Detected: {detected_category} > {product_type}")
                    return detected_category, "none", product_type, best_match["confidence"]
            return detected_category, "unknown", "unknown", category_confidence

    def process_image(
        self,
        image_url: str,
        product_type_hint: Optional[str] = None
    ) -> Dict:
        """
        Main method to process image and extract visual attributes.
        Uses hierarchical detection to extract only relevant attributes.
        """
        import time
        start_time = time.time()
        try:
            # Download image
            image = self.download_image(image_url)
            if image is None:
                return {
                    "visual_attributes": {},
                    "error": "Failed to download image"
                }

            visual_attributes = {}
            detailed_predictions = {}

            # Step 1: Detect category, subcategory, and product type
            category, subcategory, product_type, confidence = self.detect_category_and_subcategory(image)

            # Low confidence check
            if confidence < 0.10:
                logger.warning(f"Low confidence in detection ({confidence:.3f}). Returning basic attributes only.")
                colors = self.extract_dominant_colors(image, n_colors=3)
                if colors:
                    visual_attributes["primary_color"] = colors[0]["name"]
                    visual_attributes["color_palette"] = [c["name"] for c in colors]
                return {
                    "visual_attributes": visual_attributes,
                    "detection_confidence": confidence,
                    "warning": "Low confidence detection",
                    "processing_time": round(time.time() - start_time, 2)
                }

            # Add detected metadata
            visual_attributes["product_type"] = product_type
            visual_attributes["category"] = category
            if subcategory != "none" and subcategory != "unknown":
                visual_attributes["subcategory"] = subcategory

            # Step 2: Extract color information (universal)
            colors = self.extract_dominant_colors(image, n_colors=3)
            if colors:
                visual_attributes["primary_color"] = colors[0]["name"]
                visual_attributes["color_palette"] = [c["name"] for c in colors[:3]]
                visual_attributes["color_distribution"] = [
                    {"color": c["name"], "percentage": c["percentage"]}
                    for c in colors
                ]

            # Step 3: Get the right attribute configuration based on subcategory
            attributes_config = None
            if category == "clothing":
                if subcategory in self.CATEGORY_ATTRIBUTES["clothing"]["subcategories"]:
                    attributes_config = self.CATEGORY_ATTRIBUTES["clothing"]["subcategories"][subcategory]["attributes"]
                    logger.info(f"Using attributes for subcategory: {subcategory}")
                else:
                    logger.warning(f"Unknown subcategory: {subcategory}. Skipping attribute extraction.")
            elif category in self.CATEGORY_ATTRIBUTES:
                if "attributes" in self.CATEGORY_ATTRIBUTES[category]:
                    attributes_config = self.CATEGORY_ATTRIBUTES[category]["attributes"]
                    logger.info(f"Using attributes for category: {category}")

            # Step 4: Extract category-specific attributes
            if attributes_config:
                for attr_name, attr_values in attributes_config.items():
                    result = self.classify_with_clip(
                        image, attr_values, attr_name, confidence_threshold=0.20
                    )
                    if result["predictions"]:
                        best_prediction = result["predictions"][0]
                        # Only add attributes with reasonable confidence
                        if best_prediction["confidence"] > 0.20:
                            visual_attributes[attr_name] = best_prediction["value"]
                        # Store detailed predictions for debugging
                        detailed_predictions[attr_name] = result

            processing_time = time.time() - start_time
            logger.info(f"✓ Processing complete in {processing_time:.2f}s. Extracted {len(visual_attributes)} attributes.")
            return {
                "visual_attributes": visual_attributes,
                "detailed_predictions": detailed_predictions,
                "detection_confidence": confidence,
                "processing_time": round(processing_time, 2)
            }
        except Exception as e:
            logger.error(f"Error processing image: {str(e)}")
            return {
                "visual_attributes": {},
                "error": str(e),
                "processing_time": round(time.time() - start_time, 2)
            }
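
# Minimal usage sketch for the active service (the URL below is a placeholder,
# not a real product image; substitute one of your own before running):
if __name__ == "__main__":
    import json

    logging.basicConfig(level=logging.INFO)
    service = VisualProcessingService()
    output = service.process_image("https://example.com/product.jpg")
    # default=str handles numpy scalar types that json cannot serialize natively
    print(json.dumps(output, indent=2, default=str))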

# # ==================== visual_processing_service_enhanced.py ====================
# """
# Enhanced Visual Processing Service combining CLIP's speed with BLIP-2's comprehensive taxonomy.
#
# Features:
# - Fast CLIP-based classification
# - 70+ product categories across multiple domains
# - Two-stage classification with validation
# - Enhanced color normalization
# - Category-specific attribute detection
# - Confidence-based fallback mechanisms
# - Optional center cropping for better focus
#
# Usage:
#     service = VisualProcessingService()
#     result = service.process_image("https://example.com/product.jpg")
# """
# import torch
# import cv2
# import numpy as np
# import requests
# from io import BytesIO
# from PIL import Image
# from typing import Dict, List, Optional, Tuple
# import logging
# from transformers import CLIPProcessor, CLIPModel
# from sklearn.cluster import KMeans
#
# logger = logging.getLogger(__name__)
#
#
# class VisualProcessingService:
#     """Enhanced service for extracting visual attributes from product images using CLIP."""
#
#     # Class-level caching (shared across instances)
#     _clip_model = None
#     _clip_processor = None
#     _device = None
#
#     # ==================== EXPANDED TAXONOMY ====================
#
#     # Base color vocabulary
#     COLORS = ["black", "white", "red", "blue", "green", "yellow", "gray",
#               "brown", "pink", "purple", "orange", "beige", "navy", "teal"]
#
#     # Pattern vocabulary
#     PATTERNS = ["solid", "striped", "checked", "plaid", "floral", "graphic",
#                 "polka dot", "camo", "tie-dye", "abstract", "geometric"]
#
#     # Material vocabulary (extended)
#     MATERIALS = ["cotton", "polyester", "denim", "leather", "wool", "canvas",
#                  "silicone", "metal", "fabric", "rubber", "plastic", "wood",
#                  "glass", "ceramic", "steel", "foam", "aluminum", "carbon fiber"]
#
#     # Style vocabulary
#     STYLES = ["casual", "formal", "sporty", "streetwear", "elegant", "vintage",
#               "modern", "bohemian", "minimalist", "industrial", "rustic", "contemporary"]
#
#     # Fit vocabulary
#     FITS = ["slim fit", "regular fit", "loose fit", "oversized", "tailored",
#             "relaxed", "athletic fit"]
#
#     # Brand vocabulary (common marketplace brands)
#     BRANDS = ["nike", "adidas", "sony", "samsung", "apple", "generic", "lego",
#               "hasbro", "lg", "panasonic", "microsoft"]
#
#     # Age group vocabulary
#     AGE_GROUPS = ["baby", "toddler", "child", "teen", "adult", "all ages"]
#
#     # Comprehensive category-specific attributes
#     CATEGORY_ATTRIBUTES = {
#         # ==================== CLOTHING ====================
#         "clothing": {
#             "products": ["t-shirt", "shirt", "dress", "pants", "jeans", "shorts",
#                          "skirt", "jacket", "coat", "sweater", "hoodie", "top",
#                          "blouse", "cardigan", "blazer"],
#             "attributes": {
#                 "color": COLORS,
#                 "pattern": PATTERNS,
#                 "material": ["cotton", "polyester", "denim", "leather", "silk",
#                              "wool", "linen", "blend", "canvas"],
#                 "style": STYLES,
#                 "fit": FITS,
#                 "neckline": ["crew neck", "v-neck", "round neck", "collar",
#                              "scoop neck", "boat neck", "turtleneck"],
#                 "sleeve_type": ["short sleeve", "long sleeve", "sleeveless",
#                                 "3/4 sleeve", "cap sleeve"],
#                 "closure_type": ["button", "zipper", "pull-on", "snap", "tie", "buckle"]
#             }
#         },
#         # ==================== FOOTWEAR ====================
#         "footwear": {
#             "products": ["shoes", "sneakers", "sandals", "boots", "slippers",
#                          "heels", "loafers"],
#             "attributes": {
#                 "color": COLORS,
#                 "material": ["leather", "synthetic", "canvas", "rubber", "suede", "fabric"],
#                 "type": ["sneakers", "sandals", "formal", "boots", "sports", "casual"],
#                 "style": STYLES,
#                 "closure_type": ["lace-up", "slip-on", "velcro", "zipper", "buckle"]
#             }
#         },
#         # ==================== ACCESSORIES ====================
#         "accessories": {
#             "products": ["watch", "bag", "backpack", "handbag", "wallet", "belt",
#                          "sunglasses", "hat", "scarf"],
#             "attributes": {
#                 "color": COLORS,
#                 "material": ["leather", "fabric", "metal", "plastic", "canvas", "synthetic"],
#                 "style": STYLES,
#                 "type": ["backpack", "tote", "crossbody", "messenger", "duffel"]
#             }
#         },
#         # ==================== JEWELRY ====================
#         "jewelry": {
#             "products": ["necklace", "ring", "bracelet", "earrings", "pendant", "chain"],
#             "attributes": {
#                 "material": ["gold", "silver", "platinum", "stainless steel",
#                              "plastic", "beads", "leather"],
#                 "style": ["modern", "vintage", "minimalist", "statement", "elegant"],
#                 "type": ["chain", "band", "solitaire", "hoop", "stud"]
#             }
#         },
#         # ==================== ELECTRONICS ====================
#         "electronics": {
#             "products": ["phone", "smartphone", "tablet", "laptop", "headphones",
#                          "camera", "tv", "monitor", "keyboard", "mouse", "speaker",
#                          "smartwatch", "charger"],
#             "attributes": {
#                 "color": COLORS,
#                 "material": ["plastic", "metal", "glass", "aluminum", "rubber"],
#                 "style": ["modern", "minimalist", "sleek", "industrial"],
#                 "finish": ["matte", "glossy", "metallic", "textured"],
#                 "type": ["over-ear", "in-ear", "on-ear", "wireless", "wired"],
#                 "brand": BRANDS
#             }
#         },
#         # ==================== FURNITURE ====================
#         "furniture": {
#             "products": ["chair", "table", "sofa", "bed", "desk", "shelf",
#                          "cabinet", "dresser", "bench", "stool", "bookshelf"],
#             "attributes": {
#                 "color": COLORS,
#                 "material": ["wood", "metal", "glass", "plastic", "fabric", "leather"],
#                 "style": ["modern", "traditional", "industrial", "rustic",
#                           "contemporary", "vintage", "minimalist"],
#                 "finish": ["natural wood", "painted", "stained", "laminated", "upholstered"]
#             }
#         },
#         # ==================== HOME DECOR ====================
#         "home_decor": {
#             "products": ["painting", "canvas", "wall art", "frame", "vase", "lamp",
#                          "mirror", "clock", "sculpture", "poster", "cushion", "rug"],
#             "attributes": {
#                 "color": COLORS,
#                 "style": ["modern", "abstract", "traditional", "contemporary",
#                           "vintage", "minimalist", "bohemian"],
#                 "material": ["canvas", "wood", "metal", "glass", "ceramic", "paper", "fabric"],
#                 "finish": ["glossy", "matte", "textured", "framed"],
#                 "theme": ["nature", "geometric", "floral", "landscape", "abstract"]
#             }
#         },
#         # ==================== KITCHEN ====================
#         "kitchen": {
#             "products": ["pot", "pan", "knife", "utensil", "plate", "bowl", "cup",
#                          "mug", "bottle", "container", "cutting board"],
#             "attributes": {
#                 "material": ["stainless steel", "aluminum", "ceramic", "glass",
#                              "plastic", "wood", "silicone"],
#                 "finish": ["non-stick", "stainless", "enameled", "anodized"],
#                 "type": ["frypan", "saucepan", "chef knife", "utility", "mixing"]
#             }
#         },
#         # ==================== APPLIANCES ====================
#         "appliances": {
#             "products": ["microwave", "blender", "vacuum", "fan", "toaster",
#                          "coffee maker", "iron", "hair dryer"],
#             "attributes": {
#                 "color": COLORS,
["upright", "robot", "handheld", "ceiling", "table", "tower"], # "power": ["low", "medium", "high", "variable"], # "brand": BRANDS # } # }, # # ==================== BEAUTY & PERSONAL CARE ==================== # "beauty": { # "products": ["lipstick", "perfume", "lotion", "hair dryer", "makeup", # "skincare", "nail polish", "shampoo"], # "attributes": { # "color": COLORS, # "type": ["eau de parfum", "eau de toilette", "body spray", # "body lotion", "face cream"], # "finish": ["matte", "glossy", "satin", "shimmer"] # } # }, # # ==================== TOYS ==================== # "toys": { # "products": ["doll", "puzzle", "board game", "action figure", "plush toy", # "toy car", "lego", "building blocks"], # "attributes": { # "color": COLORS, # "age_group": AGE_GROUPS, # "material": ["plastic", "wood", "fabric", "metal", "foam"], # "type": ["educational", "plush", "action", "vehicle", "puzzle", "board game"], # "brand": BRANDS # } # }, # # ==================== SPORTS & OUTDOOR ==================== # "sports": { # "products": ["bicycle", "football", "basketball", "tennis racket", # "yoga mat", "helmet", "skateboard", "dumbbells", "ball"], # "attributes": { # "color": COLORS, # "material": ["steel", "aluminum", "carbon fiber", "rubber", # "leather", "synthetic", "foam", "composite"], # "sport_type": ["football", "basketball", "tennis", "cycling", # "yoga", "gym", "outdoor", "fitness"], # "type": ["mountain", "road", "hybrid", "bmx", "indoor", "outdoor"], # "brand": BRANDS # } # }, # # ==================== PET SUPPLIES ==================== # "pet_supplies": { # "products": ["pet bed", "pet toy", "leash", "pet bowl", "collar", # "pet carrier"], # "attributes": { # "color": COLORS, # "material": ["fabric", "plastic", "metal", "nylon", "leather"], # "size": ["small", "medium", "large", "extra large"] # } # }, # # ==================== BABY PRODUCTS ==================== # "baby": { # "products": ["stroller", "baby bottle", "diaper", "crib", "high chair", # "baby carrier"], # "attributes": { # "color": COLORS, # "material": MATERIALS, # "type": ["full-size", "umbrella", "jogging", "disposable", "cloth"], # "age_group": ["newborn", "baby", "toddler"] # } # }, # # ==================== TOOLS & HARDWARE ==================== # "tools": { # "products": ["hammer", "drill", "screwdriver", "wrench", "saw", # "pliers", "measuring tape", "level"], # "attributes": { # "material": ["steel", "aluminum", "plastic", "wood", "rubber", # "chrome", "fiberglass"], # "type": ["manual", "electric", "cordless", "corded", "pneumatic"], # "finish": ["chrome plated", "powder coated", "stainless steel"], # "brand": BRANDS # } # }, # # ==================== BOOKS & MEDIA ==================== # "books_media": { # "products": ["book", "magazine", "dvd", "video game", "cd", "vinyl"], # "attributes": { # "type": ["paperback", "hardcover", "ebook", "audiobook"], # "genre": ["fiction", "non-fiction", "educational", "kids", # "action", "adventure", "sports", "rpg"] # } # }, # # ==================== AUTOMOTIVE ==================== # "automotive": { # "products": ["car accessory", "tire", "car seat", "steering wheel cover", # "floor mat"], # "attributes": { # "color": COLORS, # "material": ["rubber", "plastic", "fabric", "leather", "vinyl"], # "type": ["universal", "custom fit"] # } # }, # # ==================== OFFICE SUPPLIES ==================== # "office": { # "products": ["pen", "notebook", "folder", "desk organizer", "stapler", # "calculator", "paper"], # "attributes": { # "color": COLORS, # "material": ["paper", "plastic", "metal", 
"cardboard"], # "type": ["ruled", "blank", "grid", "dot grid"] # } # }, # # ==================== GARDEN & OUTDOOR ==================== # "garden": { # "products": ["plant pot", "garden tool", "watering can", "planter", # "garden hose", "lawn mower"], # "attributes": { # "color": COLORS, # "material": ["ceramic", "plastic", "metal", "terracotta", "wood"], # "type": ["indoor", "outdoor", "hanging", "standing"] # } # } # } # # Attribute-specific confidence thresholds # CONFIDENCE_THRESHOLDS = { # "color": 0.20, # "pattern": 0.25, # "material": 0.30, # "style": 0.20, # "fit": 0.25, # "brand": 0.40, # "type": 0.22, # "finish": 0.28, # "neckline": 0.23, # "sleeve_type": 0.23 # } # def __init__(self): # pass # @classmethod # def _get_device(cls): # """Get optimal device.""" # if cls._device is None: # cls._device = torch.device("cuda" if torch.cuda.is_available() else "cpu") # logger.info(f"Visual Processing using device: {cls._device}") # return cls._device # @classmethod # def _get_clip_model(cls): # """Lazy load CLIP model with class-level caching.""" # if cls._clip_model is None: # logger.info("Loading CLIP model (this may take a few minutes on first use)...") # cls._clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32") # cls._clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32") # device = cls._get_device() # cls._clip_model.to(device) # cls._clip_model.eval() # logger.info("✓ CLIP model loaded successfully") # return cls._clip_model, cls._clip_processor # def center_crop(self, image: Image.Image, rel_crop: float = 0.7) -> Image.Image: # """ # Center-crop to focus on the product area if there is too much background. # Args: # image: PIL Image # rel_crop: Relative crop size (0.7 = 70% of min dimension) # """ # w, h = image.size # side = int(min(w, h) * rel_crop) # left = (w - side) // 2 # top = (h - side) // 2 # return image.crop((left, top, left + side, top + side)) # def download_image(self, image_url: str, apply_crop: bool = False, # max_size: Tuple[int, int] = (1024, 1024)) -> Optional[Image.Image]: # """ # Download image from URL with optional preprocessing. # Args: # image_url: URL of the image # apply_crop: Whether to apply center crop # max_size: Maximum dimensions for resizing # """ # try: # response = requests.get(image_url, timeout=10) # response.raise_for_status() # image = Image.open(BytesIO(response.content)).convert('RGB') # # Resize if too large # if image.size[0] > max_size[0] or image.size[1] > max_size[1]: # image.thumbnail(max_size, Image.Resampling.LANCZOS) # # Optional center crop # if apply_crop: # image = self.center_crop(image, rel_crop=0.7) # return image # except Exception as e: # logger.error(f"Error downloading image from {image_url}: {str(e)}") # return None # def normalize_color(self, word: str) -> str: # """ # Enhanced color normalization with aliases and modifiers. 
#
#     def extract_dominant_colors(self, image: Image.Image, n_colors: int = 3) -> List[Dict]:
#         """Extract dominant colors using K-means clustering."""
#         try:
#             # Resize for faster processing
#             img_small = image.resize((150, 150))
#             img_array = np.array(img_small)
#             pixels = img_array.reshape(-1, 3)
#
#             # Sample if too many pixels
#             if len(pixels) > 10000:
#                 indices = np.random.choice(len(pixels), 10000, replace=False)
#                 pixels = pixels[indices]
#
#             # K-means clustering
#             kmeans = KMeans(n_clusters=n_colors, random_state=42, n_init=5, max_iter=100)
#             kmeans.fit(pixels)
#
#             colors = []
#             labels_counts = np.bincount(kmeans.labels_)
#             for i, center in enumerate(kmeans.cluster_centers_):
#                 rgb = tuple(center.astype(int))
#                 color_name = self._get_color_name_simple(rgb)
#                 percentage = float(labels_counts[i] / len(kmeans.labels_) * 100)
#                 colors.append({
#                     "name": color_name,
#                     "rgb": rgb,
#                     "percentage": percentage
#                 })
#             colors.sort(key=lambda x: x['percentage'], reverse=True)
#             return colors
#         except Exception as e:
#             logger.error(f"Error extracting colors: {str(e)}")
#             return []
#
#     def _get_color_name_simple(self, rgb: Tuple[int, int, int]) -> str:
#         """Simple RGB to color name mapping."""
#         r, g, b = rgb
#
#         # Define color ranges
#         colors = {
#             'black': (r < 50 and g < 50 and b < 50),
#             'white': (r > 200 and g > 200 and b > 200),
#             'gray': (abs(r - g) < 30 and abs(g - b) < 30 and abs(r - b) < 30 and 50 <= r <= 200),
#             'red': (r > 150 and g < 100 and b < 100),
#             'green': (g > 150 and r < 100 and b < 100),
#             'blue': (b > 150 and r < 100 and g < 100),
#             'yellow': (r > 200 and g > 200 and b < 100),
#             'orange': (r > 200 and 100 < g < 200 and b < 100),
#             'purple': (r > 100 and b > 100 and g < 100),
#             'pink': (r > 200 and 100 < g < 200 and 100 < b < 200),
#             'brown': (50 < r < 150 and 30 < g < 100 and b < 80),
#             'beige': (150 < r < 220 and 140 < g < 200 and 100 < b < 180),
#         }
#         for color_name, condition in colors.items():
#             if condition:
#                 return color_name
#
#         # Fallback based on dominant channel
#         if r > g and r > b:
#             return 'red'
#         elif g > r and g > b:
#             return 'green'
#         elif b > r and b > g:
#             return 'blue'
#         else:
#             return 'gray'
#
#     def classify_with_clip(
#         self,
#         image: Image.Image,
#         candidates: List[str],
#         attribute_name: str,
#         confidence_threshold: Optional[float] = None
#     ) -> Dict:
#         """
#         Use CLIP to classify image against candidate labels.
#
#         Args:
#             image: PIL Image
#             candidates: List of text labels to classify against
#             attribute_name: Name of the attribute being classified
#             confidence_threshold: Override default threshold
#         """
#         try:
#             model, processor = self._get_clip_model()
#             device = self._get_device()
#
#             # Use attribute-specific threshold if not provided
#             if confidence_threshold is None:
#                 confidence_threshold = self.CONFIDENCE_THRESHOLDS.get(attribute_name, 0.20)
#
#             # Prepare inputs
#             inputs = processor(
#                 text=candidates,
#                 images=image,
#                 return_tensors="pt",
#                 padding=True
#             )
#             # Move to device
#             inputs = {k: v.to(device) for k, v in inputs.items()}
#
#             # Get predictions
#             with torch.no_grad():
#                 outputs = model(**inputs)
#                 logits_per_image = outputs.logits_per_image
#                 probs = logits_per_image.softmax(dim=1).cpu()
#
#             # Get top predictions
#             top_k = min(3, len(candidates))
#             top_probs, top_indices = torch.topk(probs[0], k=top_k)
#
#             results = []
#             for prob, idx in zip(top_probs, top_indices):
#                 if prob.item() > confidence_threshold:
#                     value = candidates[idx.item()]
#                     # Apply color normalization if color attribute
#                     if attribute_name == "color":
#                         value = self.normalize_color(value)
#                     results.append({
#                         "value": value,
#                         "confidence": float(prob.item())
#                     })
#             return {
#                 "attribute": attribute_name,
#                 "predictions": results
#             }
#         except Exception as e:
#             logger.error(f"Error in CLIP classification for {attribute_name}: {str(e)}")
#             return {"attribute": attribute_name, "predictions": []}
#
#     def detect_category_hierarchical(self, image: Image.Image) -> Tuple[str, str, float]:
#         """
#         Two-stage hierarchical product detection:
#         1. Detect broad category
#         2. Detect specific product within that category
#
#         Returns:
#             (category, product_type, confidence)
#         """
#         # Stage 1: Detect broad category
#         category_names = list(self.CATEGORY_ATTRIBUTES.keys())
#         category_labels = [f"a photo of {cat.replace('_', ' ')}" for cat in category_names]
#         category_result = self.classify_with_clip(
#             image, category_labels, "category_detection", confidence_threshold=0.15
#         )
#         if not category_result["predictions"]:
#             return "unknown", "unknown", 0.0
#
#         # Extract category
#         best_category_match = category_result["predictions"][0]
#         detected_category = category_names[category_labels.index(best_category_match["value"])]
#         category_confidence = best_category_match["confidence"]
#
#         # Stage 2: Detect specific product within category
#         products_in_category = self.CATEGORY_ATTRIBUTES[detected_category]["products"]
#         product_labels = [f"a photo of a {p}" for p in products_in_category]
#         product_result = self.classify_with_clip(
#             image, product_labels, "product_detection", confidence_threshold=0.15
#         )
#         if product_result["predictions"]:
#             best_product = product_result["predictions"][0]
#             product_type = products_in_category[product_labels.index(best_product["value"])]
#             product_confidence = best_product["confidence"]
#             # Combined confidence (geometric mean for balance)
#             combined_confidence = (category_confidence * product_confidence) ** 0.5
#             logger.info(f"Detected: {detected_category} → {product_type} (confidence: {combined_confidence:.3f})")
#             return detected_category, product_type, combined_confidence
#         return detected_category, "unknown", category_confidence * 0.5
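#
#     # Note on the geometric mean above: it keeps the combined score honest when
#     # the two stages disagree, e.g. category 0.60 and product 0.40 combine to
#     # (0.60 * 0.40) ** 0.5 ≈ 0.49 rather than averaging up to 0.50, and a
#     # near-zero stage drags the combined confidence toward zero.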
#
#     def detect_category_flat(self, image: Image.Image) -> Tuple[str, str, float]:
#         """
#         Single-stage flat product detection across all categories.
#         Faster but potentially less accurate.
#
#         Returns:
#             (category, product_type, confidence)
#         """
#         # Collect all products with their categories
#         all_products = []
#         product_to_category = {}
#         for category, data in self.CATEGORY_ATTRIBUTES.items():
#             for product in data["products"]:
#                 label = f"a photo of a {product}"
#                 all_products.append(label)
#                 product_to_category[label] = category
#
#         # Classify
#         result = self.classify_with_clip(
#             image, all_products, "product_detection", confidence_threshold=0.15
#         )
#         if result["predictions"]:
#             best_match = result["predictions"][0]
#             product_label = best_match["value"]
#             category = product_to_category[product_label]
#             product_type = product_label.replace("a photo of a ", "")
#             confidence = best_match["confidence"]
#             logger.info(f"Detected: {category} → {product_type} (confidence: {confidence:.3f})")
#             return category, product_type, confidence
#         return "unknown", "unknown", 0.0
#
#     def process_image(
#         self,
#         image_url: str,
#         product_type_hint: Optional[str] = None,
#         apply_crop: bool = False,
#         detection_mode: str = "hierarchical"
#     ) -> Dict:
#         """
#         Main method to process image and extract visual attributes.
#
#         Args:
#             image_url: URL of the product image
#             product_type_hint: Optional hint about product type
#             apply_crop: Whether to apply center crop for better focus
#             detection_mode: "hierarchical" (slower, more accurate) or "flat" (faster)
#         """
#         import time
#         start_time = time.time()
#         try:
#             # Download image
#             image = self.download_image(image_url, apply_crop=apply_crop)
#             if image is None:
#                 return {
#                     "visual_attributes": {},
#                     "error": "Failed to download image"
#                 }
#
#             visual_attributes = {}
#             detailed_predictions = {}
#
#             # Step 1: Detect product category and type
#             if detection_mode == "hierarchical":
#                 detected_category, detected_product_type, category_confidence = \
#                     self.detect_category_hierarchical(image)
#             else:
#                 detected_category, detected_product_type, category_confidence = \
#                     self.detect_category_flat(image)
#
#             # If confidence is too low, return minimal info
#             if category_confidence < 0.12:
#                 logger.warning(f"Low confidence ({category_confidence:.3f}). Returning basic attributes only.")
#                 colors = self.extract_dominant_colors(image, n_colors=3)
#                 if colors:
#                     visual_attributes["primary_color"] = colors[0]["name"]
#                     visual_attributes["color_palette"] = [c["name"] for c in colors]
#                 return {
#                     "visual_attributes": visual_attributes,
#                     "category_confidence": category_confidence,
#                     "processing_time": round(time.time() - start_time, 2),
#                     "warning": "Low confidence detection"
#                 }
#
#             # Add detected information
#             visual_attributes["product_type"] = detected_product_type
#             visual_attributes["category"] = detected_category
#             visual_attributes["detection_confidence"] = round(category_confidence, 3)
#
#             # Step 2: Extract universal color attribute
#             colors = self.extract_dominant_colors(image, n_colors=3)
#             if colors:
#                 visual_attributes["primary_color"] = colors[0]["name"]
#                 visual_attributes["color_palette"] = [c["name"] for c in colors]
#                 visual_attributes["color_distribution"] = [
#                     {"name": c["name"], "percentage": round(c["percentage"], 1)}
#                     for c in colors
#                 ]
#
#             # Step 3: Extract category-specific attributes
#             if detected_category in self.CATEGORY_ATTRIBUTES:
#                 category_config = self.CATEGORY_ATTRIBUTES[detected_category]
#                 for attr_name, attr_values in category_config["attributes"].items():
#                     # Skip color since we already extracted it
#                     if attr_name == "color":
#                         continue
#
#                     # Get attribute-specific threshold
#                     threshold = self.CONFIDENCE_THRESHOLDS.get(attr_name, 0.20)
#
#                     # Classify
#                     result = self.classify_with_clip(
#                         image, attr_values, attr_name, confidence_threshold=threshold
#                     )
#                     detailed_predictions[attr_name] = result
#
#                     # Only add if confidence is reasonable
#                     if result["predictions"]:
#                         best_prediction = result["predictions"][0]
#                         if best_prediction["confidence"] > threshold:
#                             visual_attributes[attr_name] = best_prediction["value"]
#
#             processing_time = time.time() - start_time
#             return {
#                 "visual_attributes": visual_attributes,
#                 "detailed_predictions": detailed_predictions,
#                 "detection_confidence": round(category_confidence, 3),
#                 "processing_time": round(processing_time, 2),
#                 "metadata": {
#                     "detection_mode": detection_mode,
#                     "crop_applied": apply_crop,
#                     "image_size": image.size
#                 }
#             }
#         except Exception as e:
#             logger.error(f"Error processing image: {str(e)}")
#             import traceback
#             traceback.print_exc()
#             return {
#                 "visual_attributes": {},
#                 "error": str(e),
#                 "processing_time": round(time.time() - start_time, 2)
#             }
#
#     def batch_process_images(
#         self,
#         image_urls: List[str],
#         detection_mode: str = "flat"
#     ) -> List[Dict]:
#         """
#         Process multiple images in batch.
#
#         Args:
#             image_urls: List of image URLs
#             detection_mode: Detection mode to use
#         """
#         results = []
#         for i, url in enumerate(image_urls):
#             logger.info(f"Processing image {i+1}/{len(image_urls)}: {url}")
#             result = self.process_image(url, detection_mode=detection_mode)
#             results.append(result)
#         return results
#
#     @classmethod
#     def cleanup_models(cls):
#         """Free up memory by unloading models."""
#         if cls._clip_model is not None:
#             del cls._clip_model
#             del cls._clip_processor
#             cls._clip_model = None
#             cls._clip_processor = None
#             if torch.cuda.is_available():
#                 torch.cuda.empty_cache()
#             logger.info("Models unloaded and memory freed")
#
#     def get_supported_categories(self) -> List[str]:
#         """Get list of all supported product categories."""
#         return list(self.CATEGORY_ATTRIBUTES.keys())
#
#     def get_category_products(self, category: str) -> List[str]:
#         """Get list of products in a specific category."""
#         return self.CATEGORY_ATTRIBUTES.get(category, {}).get("products", [])
#
#     def get_category_attributes(self, category: str) -> Dict[str, List[str]]:
#         """Get attribute schema for a specific category."""
#         return self.CATEGORY_ATTRIBUTES.get(category, {}).get("attributes", {})
#
#     def get_statistics(self) -> Dict:
#         """Get statistics about the taxonomy."""
#         total_products = sum(
#             len(data["products"])
#             for data in self.CATEGORY_ATTRIBUTES.values()
#         )
#         total_attributes = sum(
#             len(data["attributes"])
#             for data in self.CATEGORY_ATTRIBUTES.values()
#         )
#         return {
#             "total_categories": len(self.CATEGORY_ATTRIBUTES),
#             "total_products": total_products,
#             "total_unique_attributes": len(set(
#                 attr
#                 for data in self.CATEGORY_ATTRIBUTES.values()
#                 for attr in data["attributes"].keys()
#             )),
#             "categories": list(self.CATEGORY_ATTRIBUTES.keys())
#         }
#
#
# # ==================== USAGE EXAMPLES ====================
#
# def example_basic_usage():
#     """Basic usage example."""
#     print("=== Basic Usage Example ===\n")
#
#     # Initialize service
#     service = VisualProcessingService()
#
#     # Process single image (hierarchical mode - more accurate)
#     result = service.process_image(
#         "https://example.com/product.jpg",
#         detection_mode="hierarchical"
#     )
#     print("Product Type:", result["visual_attributes"].get("product_type"))
#     print("Category:", result["visual_attributes"].get("category"))
#     print("Primary Color:", result["visual_attributes"].get("primary_color"))
#     print("Detection Confidence:", result.get("detection_confidence"))
#     print("Processing Time:", result["processing_time"], "seconds")
#     print("\nAll Attributes:")
#     for key, value in result["visual_attributes"].items():
#         print(f"  {key}: {value}")
#
#
# def example_fast_mode():
#     """Fast processing mode example."""
#     print("\n=== Fast Mode Example ===\n")
#     service = VisualProcessingService()
#
#     # Fast mode (flat detection)
#     result = service.process_image(
#         "https://example.com/product.jpg",
#         detection_mode="flat"  # Faster, single-stage detection
#     )
#     print("Processing Time:", result["processing_time"], "seconds")
#     print("Detected:", result["visual_attributes"])
#
#
# def example_with_cropping():
#     """Example with center cropping for busy backgrounds."""
#     print("\n=== With Center Cropping ===\n")
#     service = VisualProcessingService()
#
#     # Apply center crop to focus on product
#     result = service.process_image(
#         "https://example.com/product-with-background.jpg",
#         apply_crop=True,  # Enable center cropping
#         detection_mode="hierarchical"
#     )
#     print("Crop Applied:", result["metadata"]["crop_applied"])
#     print("Detected:", result["visual_attributes"])
#
#
# def example_batch_processing():
#     """Batch processing example."""
#     print("\n=== Batch Processing ===\n")
#     service = VisualProcessingService()
#     image_urls = [
#         "https://example.com/product1.jpg",
#         "https://example.com/product2.jpg",
#         "https://example.com/product3.jpg"
#     ]
#     results = service.batch_process_images(image_urls, detection_mode="flat")
#     for i, result in enumerate(results):
#         print(f"\nProduct {i+1}:")
#         print(f"  Type: {result['visual_attributes'].get('product_type')}")
#         print(f"  Category: {result['visual_attributes'].get('category')}")
#         print(f"  Time: {result['processing_time']}s")
#
#
# def example_category_info():
#     """Get information about supported categories."""
#     print("\n=== Category Information ===\n")
#     service = VisualProcessingService()
#
#     # Get statistics
#     stats = service.get_statistics()
#     print("Statistics:")
#     print(f"  Total Categories: {stats['total_categories']}")
#     print(f"  Total Products: {stats['total_products']}")
#     print(f"  Unique Attributes: {stats['total_unique_attributes']}")
#
#     # Get all categories
#     categories = service.get_supported_categories()
#     print(f"\nSupported Categories ({len(categories)}):")
#     for cat in categories:
#         products = service.get_category_products(cat)
#         print(f"  {cat}: {len(products)} products")
#
#     # Get attributes for a specific category
#     print("\nClothing Category Attributes:")
#     clothing_attrs = service.get_category_attributes("clothing")
#     for attr, values in clothing_attrs.items():
#         print(f"  {attr}: {len(values)} options")
#
#
# def example_detailed_predictions():
#     """Example showing detailed predictions with confidence scores."""
#     print("\n=== Detailed Predictions ===\n")
#     service = VisualProcessingService()
#     result = service.process_image(
#         "https://example.com/product.jpg",
#         detection_mode="hierarchical"
#     )
#     print("Visual Attributes (Best Predictions):")
#     for key, value in result["visual_attributes"].items():
#         print(f"  {key}: {value}")
#     print("\nDetailed Predictions (Top 3 for each attribute):")
#     for attr_name, predictions in result.get("detailed_predictions", {}).items():
#         print(f"\n  {attr_name}:")
#         for pred in predictions.get("predictions", []):
#             print(f"    - {pred['value']}: {pred['confidence']:.3f}")
#
#
# def example_color_distribution():
#     """Example showing color palette extraction."""
#     print("\n=== Color Distribution ===\n")
#     service = VisualProcessingService()
#     result = service.process_image("https://example.com/product.jpg")
#     print("Primary Color:", result["visual_attributes"].get("primary_color"))
#     print("\nColor Palette:")
#     for color in result["visual_attributes"].get("color_palette", []):
#         print(f"  - {color}")
#     print("\nColor Distribution:")
#     for color_info in result["visual_attributes"].get("color_distribution", []):
#         print(f"  {color_info['name']}: {color_info['percentage']}%")
#
#
# def example_error_handling():
#     """Example showing error handling."""
#     print("\n=== Error Handling ===\n")
#     service = VisualProcessingService()
#
#     # Invalid URL
#     result = service.process_image("https://invalid-url.com/nonexistent.jpg")
#     if "error" in result:
#         print("Error occurred:", result["error"])
#     else:
#         print("Processing successful")
#
#     # Low confidence warning
#     result = service.process_image("https://example.com/ambiguous-product.jpg")
#     if "warning" in result:
#         print("Warning:", result["warning"])
#         print("Confidence:", result.get("category_confidence"))
#
#
# def example_cleanup():
#     """Example showing model cleanup."""
#     print("\n=== Model Cleanup ===\n")
#     service = VisualProcessingService()
#
#     # Process some images
#     result = service.process_image("https://example.com/product.jpg")
#     print("Processed successfully")
#
#     # Clean up models when done (frees memory)
#     VisualProcessingService.cleanup_models()
#     print("Models cleaned up and memory freed")
#
#
# # ==================== PRODUCTION USAGE ====================
#
# def production_example():
#     """
#     Production-ready example with proper error handling and logging.
#     """
#     import logging
#
#     # Setup logging
#     logging.basicConfig(
#         level=logging.INFO,
#         format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
#     )
#     service = VisualProcessingService()
#
#     def process_product_image(image_url: str, product_id: str) -> Dict:
#         """
#         Process a product image with full error handling.
#         """
#         try:
#             # Process with hierarchical mode for best accuracy
#             result = service.process_image(
#                 image_url,
#                 detection_mode="hierarchical",
#                 apply_crop=False  # Set True if images have busy backgrounds
#             )
#
#             # Check for errors
#             if "error" in result:
#                 logger.error(f"Failed to process {product_id}: {result['error']}")
#                 return {
#                     "product_id": product_id,
#                     "status": "error",
#                     "error": result["error"]
#                 }
#
#             # Check confidence
#             confidence = result.get("detection_confidence", 0)
#             if confidence < 0.15:
#                 logger.warning(f"Low confidence for {product_id}: {confidence}")
#                 return {
#                     "product_id": product_id,
#                     "status": "low_confidence",
#                     "confidence": confidence,
#                     "partial_attributes": result["visual_attributes"]
#                 }
#
#             # Success
#             return {
#                 "product_id": product_id,
#                 "status": "success",
#                 "attributes": result["visual_attributes"],
#                 "confidence": confidence,
#                 "processing_time": result["processing_time"]
#             }
#         except Exception as e:
#             logger.exception(f"Unexpected error processing {product_id}")
#             return {
#                 "product_id": product_id,
#                 "status": "exception",
#                 "error": str(e)
#             }
#
#     # Process products
#     products = [
#         {"id": "PROD001", "image_url": "https://example.com/tshirt.jpg"},
#         {"id": "PROD002", "image_url": "https://example.com/laptop.jpg"},
#         {"id": "PROD003", "image_url": "https://example.com/chair.jpg"}
#     ]
#     results = []
#     for product in products:
#         result = process_product_image(product["image_url"], product["id"])
#         results.append(result)
#
#         # Print summary
#         if result["status"] == "success":
#             attrs = result["attributes"]
#             print(f"\n✓ {product['id']} ({result['processing_time']}s):")
#             print(f"  Type: {attrs.get('product_type')}")
#             print(f"  Category: {attrs.get('category')}")
#             print(f"  Color: {attrs.get('primary_color')}")
#         else:
#             print(f"\n✗ {product['id']}: {result['status']}")
#     return results
#
#
# # ==================== MAIN ====================
#
# if __name__ == "__main__":
#     # Run examples
#     print("Enhanced Visual Processing Service")
#     print("=" * 60)
#
#     # Show statistics
#     service = VisualProcessingService()
#     stats = service.get_statistics()
#     print(f"\nTaxonomy Coverage:")
#     print(f"  Categories: {stats['total_categories']}")
#     print(f"  Products: {stats['total_products']}")
#     print(f"  Attributes: {stats['total_unique_attributes']}")
#
#     print("\n" + "=" * 60)
#     print("Run individual examples by calling the example functions:")
#     print("  - example_basic_usage()")
#     print("  - example_fast_mode()")
#     print("  - example_with_cropping()")
#     print("  - example_batch_processing()")
#     print("  - example_category_info()")
#     print("  - example_detailed_predictions()")
#     print("  - example_color_distribution()")
#     print("  - production_example()")
#     print("=" * 60)
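#
#
# # ==================== LOCAL FILE VARIANT (SKETCH) ====================
# # A minimal sketch for reusing the active VisualProcessingService on images
# # that are already on disk, bypassing download_image(); `local_path` is a
# # hypothetical placeholder, and the helper below is not part of the service API.
#
# def process_local_image(service: VisualProcessingService, local_path: str) -> Dict:
#     img = Image.open(local_path).convert("RGB")
#     category, subcategory, product_type, conf = service.detect_category_and_subcategory(img)
#     colors = service.extract_dominant_colors(img, n_colors=3)
#     return {
#         "category": category,
#         "subcategory": subcategory,
#         "product_type": product_type,
#         "confidence": conf,
#         "colors": colors,
#     }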