# ################## EDITING PROMPT OF ABOVE VERSION ONLY ################# # import json # import hashlib # import logging # import time # from functools import wraps # from typing import Dict, List, Optional, Tuple # import requests # from django.conf import settings # from .llm_load_balancer import call_llm_with_load_balancer # from .cache_config import ( # is_caching_enabled, # ENABLE_ATTRIBUTE_EXTRACTION_CACHE, # ATTRIBUTE_CACHE_MAX_SIZE, # ) # logger = logging.getLogger(__name__) # # --------------------------------------------------------------------------- # # # CACHES # # --------------------------------------------------------------------------- # # class SimpleCache: # _cache = {} # _max_size = ATTRIBUTE_CACHE_MAX_SIZE # @classmethod # def get(cls, key: str) -> Optional[Dict]: # if not ENABLE_ATTRIBUTE_EXTRACTION_CACHE: return None # return cls._cache.get(key) # @classmethod # def set(cls, key: str, value: Dict): # if not ENABLE_ATTRIBUTE_EXTRACTION_CACHE: return # if len(cls._cache) >= cls._max_size: # items = list(cls._cache.items()) # cls._cache = dict(items[int(cls._max_size * 0.2):]) # cls._cache[key] = value # @classmethod # def clear(cls): cls._cache.clear() # @classmethod # def get_stats(cls) -> Dict: # return { # "enabled": ENABLE_ATTRIBUTE_EXTRACTION_CACHE, # "size": len(cls._cache), # "max_size": cls._max_size, # "usage_percent": round(len(cls._cache)/cls._max_size*100, 2) if cls._max_size else 0 # } # # --------------------------------------------------------------------------- # # # RETRY DECORATOR # # --------------------------------------------------------------------------- # # def retry(max_attempts=3, delay=1.0): # def decorator(f): # @wraps(f) # def wrapper(*args, **kwargs): # last_exc = None # for i in range(max_attempts): # try: # return f(*args, **kwargs) # except Exception as e: # last_exc = e # if i < max_attempts - 1: # wait = delay * (2 ** i) # logger.warning(f"Retry {i+1}/{max_attempts} after {wait}s: {e}") # time.sleep(wait) # 
raise last_exc or RuntimeError("Retry failed") # return wrapper # return decorator # # --------------------------------------------------------------------------- # # # MAIN SERVICE # # --------------------------------------------------------------------------- # # class ProductAttributeService: # @staticmethod # def combine_product_text(title=None, short_desc=None, long_desc=None, ocr_text=None) -> Tuple[str, Dict[str, str]]: # parts = [] # source_map = {} # if title: # t = str(title).strip() # parts.append(f"Title: {t}") # source_map["title"] = t # if short_desc: # s = str(short_desc).strip() # parts.append(f"Description: {s}") # source_map["short_desc"] = s # if long_desc: # l = str(long_desc).strip() # parts.append(f"Details: {l}") # source_map["long_desc"] = l # if ocr_text: # parts.append(f"OCR Text: {ocr_text}") # source_map["ocr_text"] = ocr_text # combined = "\n".join(parts).strip() # return (combined or "No product information", source_map) # @staticmethod # def _cache_key(product_text: str, mandatory_attrs: Dict, extract_additional: bool, multiple: List[str], user_values: Dict = None) -> str: # payload = { # "text": product_text, # "attrs": mandatory_attrs, # "extra": extract_additional, # "multiple": sorted(multiple), # "user_values": user_values or {} # } # return f"attr_{hashlib.md5(json.dumps(payload, sort_keys=True).encode()).hexdigest()}" # # @staticmethod # # def _clean_json(text: str) -> str: # # start = text.find("{") # # end = text.rfind("}") + 1 # # if start != -1 and end > start: # # text = text[start:end] # # if "```json" in text: # # text = text.split("```json", 1)[1].split("```", 1)[0] # # elif "```" in text: # # text = text.split("```", 1)[1].split("```", 1)[0] # # if text.lstrip().startswith("json"): text = text[4:] # # return text.strip() # @staticmethod # def _clean_json(text: str) -> str: # text = text.strip() # # Extract JSON block if wrapped in ```json or ``` # if "```json" in text: # text = text.split("```json", 1)[1].split("```", 
1)[0] # elif "```" in text: # parts = text.split("```", 2) # if len(parts) > 1: # text = parts[1] # # Find first { and last } # start = text.find("{") # end = text.rfind("}") + 1 # if start == -1 or end <= start: # raise ValueError("No JSON object found in LLM response") # text = text[start:end] # return text.strip() # @staticmethod # def format_visual_attributes(visual_attributes: Dict) -> Dict: # formatted = {} # for key, value in visual_attributes.items(): # if isinstance(value, list): # formatted[key] = [{"value": str(item), "source": "image"} for item in value] # elif isinstance(value, dict): # nested = {} # for sub_key, sub_val in value.items(): # if isinstance(sub_val, list): # nested[sub_key] = [{"value": str(v), "source": "image"} for v in sub_val] # else: # nested[sub_key] = [{"value": str(sub_val), "source": "image"}] # formatted[key] = nested # else: # formatted[key] = [{"value": str(value), "source": "image"}] # return formatted # # @staticmethod # # @retry(max_attempts=3, delay=1.0) # # def _call_llm(payload: dict) -> str: # # headers = {"Authorization": f"Bearer {settings.GROQ_API_KEY}", "Content-Type": "application/json"} # # resp = requests.post(settings.GROQ_API_URL, headers=headers, json=payload, timeout=30) # # resp.raise_for_status() # # return resp.json()["choices"][0]["message"]["content"] # # At the top of services.py, add this import # # from . 
import call_llm_with_load_balancer, get_load_balancer_stats # # Replace the existing _call_llm method with this: # @staticmethod # @retry(max_attempts=3, delay=3.0) # def _call_llm(payload: dict) -> str: # """ # Call LLM using load balancer with multiple API keys # Automatically handles rate limiting and failover # """ # return call_llm_with_load_balancer(payload) # @staticmethod # def extract_attributes( # product_text: str, # mandatory_attrs: Dict[str, List[str]], # source_map: Dict[str, str] = None, # model: str = None, # extract_additional: bool = True, # multiple: Optional[List[str]] = None, # use_cache: Optional[bool] = None, # user_entered_values: Optional[Dict[str, str]] = None, # NEW PARAMETER # ) -> dict: # if model is None: model = settings.SUPPORTED_MODELS[0] # if multiple is None: multiple = [] # if source_map is None: source_map = {} # if user_entered_values is None: user_entered_values = {} # if use_cache is None: use_cache = ENABLE_ATTRIBUTE_EXTRACTION_CACHE # if not is_caching_enabled(): use_cache = False # cache_key = None # if use_cache: # cache_key = ProductAttributeService._cache_key( # product_text, mandatory_attrs, extract_additional, multiple, user_entered_values # ) # cached = SimpleCache.get(cache_key) # if cached: # logger.info(f"CACHE HIT {cache_key[:16]}...") # return cached # # --------------------------- BUILD USER VALUES SECTION --------------------------- # user_values_section = "" # if user_entered_values: # user_lines = [] # for attr, value in user_entered_values.items(): # user_lines.append(f" - {attr}: {value}") # user_values_section = f""" # USER MANUALLY ENTERED VALUES: # {chr(10).join(user_lines)} # IMPORTANT INSTRUCTIONS FOR USER VALUES: # 1. Choose the BEST value (could be user's value, or from allowed list, or inferred) # 2. Always provide a "reason" field explaining your decision. Your reason should be valid and from the product text. 
Not always exact word to be matched from the product text, you can infer understanding the product text. # 3. DO NOT hallucinate - be honest if user's value seems wrong based on product evidence # 4. If user's value is not in the allowed list but seems correct, chose the most nearest value from the allowed list with proper reasoning why it chose it. Also in this case give the most apt value that should be added in the possible list in the reason so that user can edit it later. # """ # # --------------------------- PROMPT --------------------------- # allowed_lines = [f"{attr}: {', '.join(vals)}" for attr, vals in mandatory_attrs.items()] # allowed_text = "\n".join(allowed_lines) # print(f"allowed text is {allowed_text}") # allowed_sources = list(source_map.keys()) + ["title", "description", "inferred"] # print(f"allowed_sources is {allowed_sources}") # source_hint = "|".join(allowed_sources) # multiple_text = f"\nMULTIPLE ALLOWED FOR: {', '.join(multiple)}" if multiple else "" # if extract_additional: # additional_instructions = """ # For the 'additional' section, identify any other important product attributes and their values (e.g., 'Color', 'Material', 'Weight' etc according to the product text) that are present in the PRODUCT TEXT but not in the Mandatory Attribute list. # For each additional attribute, use the best available value from the PRODUCT TEXT and specify the 'source'. # Strictly Extract other key attributes other than mandatory attributes from the text. # """ # output_example_additional = """ # "additional": { # "Additional_Attr_1": [{ # "value": "Value 1", # "source": "<{source_hint}>", # "reason": "Why this attribute and value were identified" # }] # } # """ # else: # additional_instructions = """ # Do not identify or include any additional attributes. The 'additional' section must be an empty object {}. # """ # output_example_additional = ' "additional": {}' # prompt = f""" # You are an expert product attribute extraction system. 
Your task is to classify products into predefined categories with 100% accuracy. # CORE RULES - FOLLOW STRICTLY: # ***CRITICAL T-SHIRT RULE – ALWAYS APPLY FIRST*** # For ANY generic short-sleeve or long-sleeve T-shirt that is pulled over the head # (no zipper, no buttons, no special silhouette mentioned) you **MUST** classify # style_clothing_top as "Pullover". This overrides everything else for basic tees. # Examples: crew neck tee, v-neck tee, graphic tee, plain tee → ALWAYS Pullover. # 1. You MUST ONLY use values from the ALLOWED VALUES lists provided below # 2. NEVER invent, create, or infer values that don't exist in the allowed lists # 3. NEVER use synonyms, variations, or similar words - ONLY exact matches from the lists # 4. Each value you return MUST be an EXACT COPY from the allowed list (case-sensitive) # 5. If you cannot find a perfect match, choose the CLOSEST semantic match from the allowed list # 6. When unsure, default to the most generic option from the allowed list # 7. CRITICAL GUARDRAIL: NEVER use a value allowed for one attribute (e.g., T-Shirts from t_shirt_type) as the value for a different attribute (e.g., style_clothing_top). The lists are entirely separate. # 8. NEVER invent, create, or infer values that don't exist in the allowed lists. # 9. Special Instruction for T-Shirts: If the product text describes a generic T-shirt (Crew Neck, V-Neck, etc.) which is a basic shirt worn by pulling it over the head, you MUST use the value Pullover. This is the most appropriate generic style available in this specific list. # ===================== # ATTRIBUTE DEFINITIONS + ALLOWED VALUES # ===================== # 1️⃣ **style_clothing_top** (Mandatory) # Defines the overall silhouette or construction of the top. # Allowed values + definitions: # - Bandeau: Strapless, tight top covering just bust. # - Blouse: Loose, dressy top, usually with sleeves. # - Camisole: Thin-strapped sleeveless lightweight top. # - Chemise: Straight, loose unshaped top. 
# - Cocoon: Rounded, oversized body silhouette. # - Corset: Structured top shaping waist with boning. # - Crop: Short top exposing midriff above waist. # - Cutout: Top with intentionally open fabric areas. # - Duster: Long open-front flowy layer. # - Flounce: Top featuring decorative ruffles. # - Full Zip: Zipper opening entire length front. # - Guide: Top with reference markers or functional guides. # - Half Zip: Zipper halfway down front. # - High-Low: Back hem longer than front. # - Hoodie: Top with a hood attached. # - Muscle: Sleeveless with wide armholes, athletic. # - Peasant: Loose boho style with gathered neckline/sleeves. # - Peplum: Fitted top with flared waist ruffle. # - Pullover: Worn by pulling over head; no front opening. # - Quarter Zip: Short zipper from collar ~¼ length down. # - Raglan: Sleeves extend to collar with diagonal seams. # - Ringer: Contrast colored sleeve cuffs + neckline band. # - Rugby: Thick striped or solid collared sport-style top. # - Smocked: Gathered elastic shirring for stretch texture. # - Swing: A-line flare from bust downward. # - Torsette: Corset-like, bust exposed for layering. # - Tube: Strapless elongated top; longer than bandeau. # - Zip-Up: Top with zipper closure (partial/full). # 2️⃣ **shirt_neck_style** (Mandatory) # Describes the neckline’s shape and construction. # Allowed values + definitions: # - Boat Neck: Wide neckline shoulder-to-shoulder. # - Caged Neck: Multiple straps forming cage-like design. # - Choker Neck: Tight high neck like choker band. # - Collared: Fold-over collar shirt/polo style. # - Cowl Neck: Draped soft neckline folds. # - Crew Neck: Close-fitting round classic neckline. # - Deep V-Neck: Deep V shape below chest level. # - Drape Neck: Softly draped neckline less than cowl. # - Funnel Neck: Short stand-up collar not folded. # - Halter: Straps around neck leaving shoulders bare. # - Henley: Round neck with button placket. # - High Neck: More neck coverage, no fold. 
# - Hooded: Neck includes a hood. # - Jewel Neck: High round neck at base of throat. # - Keyhole Neck: Slit/hole opening at neckline. # - Lace Neckline: Lace material used around neckline. # - Mock Neck: Short raised collar, not folded. # - Notch Neck: Small V cut in round neckline. # - Open: General wide/open neckline shape. # - Plunge: Very deep V/U revealing cleavage. # - Roll Neck: Loose rolled turtleneck style. # - Round Neck: Standard circular neckline. # - Round Neckline: Same as Round Neck. # - Scoop Neck: Wide deep U-shaped neckline. # - Scrunch Neck: Gathered/scrunched fabric at neckline. # - Slit Neck: Small vertical slit opening at front. # - Square Neckline: Straight edged square neck. # - Sweetheart: Heart-shaped neckline contour. # - Tie Neck: Ties or bow at neckline. # - Turtleneck: High folded collar covering full neck. # - V-Neck: V-shaped neckline (not deep). # - Wide Neck: Broad neckline toward shoulders. # - Zip Mock Neck: Mock neck with zipper. # 3️⃣ **t_shirt_type** (Mandatory) # Defines the category/design purpose of the T-shirt. # Allowed values + definitions: # - Babydoll T-Shirt: Feminine fitted shirt with flared hem. # - Classic T-Shirt: Standard basic crew tee. # - Graphic Tees: T-shirts featuring printed graphics. # - Pocket Tee: T-shirt having a chest pocket. # - T-Shirts: General type when no specific style is clear. # - Tank Tops: Sleeveless shirts with shoulder straps. 
# ===================== # MANDATORY ATTRIBUTE RULES # ===================== # MULTI-VALUE ATTRIBUTES (Return exactly 2 most relevant values ONLY): # {multiple_text} # SINGLE-VALUE ATTRIBUTES (Return exactly 1 value for all other attributes) # ===================== # INPUT PRODUCT DETAILS # ===================== # PRODUCT INFORMATION TO ANALYZE: # {product_text} # {additional_instructions} # ===================== # EXTRACTION STRATEGY # ===================== # Step 1: Read product text carefully # Step 2: Identify strong style/type/neck indicators # Step 3: Choose closest exact match from allowed list # Step 4: VALIDATE chosen values exist in allowed list # ===================== # FORBIDDEN ACTIONS # ===================== # :x: NEVER invent values # :x: NEVER modify spelling/capitalization # :x: NEVER use non-allowed synonyms # :x: NEVER return attributes outside allowed lists # :x: NEVER use descriptive words like “short sleeve”, “women top”, “graphic print” unless EXACT in list # ===================== # OUTPUT FORMAT # ===================== # Return ONLY this JSON structure — no markdown, no explanation: # {{ # "mandatory": {{ # "style_clothing_top": [{{ # "value": "", # "source": "{source_hint}", # "reason": "Explain specific mapping from product text" # }}], # "shirt_neck_style": [{{ # "value": "", # "source": "{source_hint}", # "reason": "Explain specific mapping from product text" # }}], # "t_shirt_type": [{{ # "value": "", # "source": "{source_hint}", # "reason": "Explain specific mapping from product text" # }}] # }}, # {output_example_additional} # }} # ===================== # FINAL VALIDATION BEFORE RESPONDING # ===================== # ✓ All values EXACT from allowed list # ✓ Multi-value attributes have exactly 2 values # ✓ Single-value attributes have exactly 1 value # ✓ “source” must be one of: {source_hint} # ✓ Reasons clearly explain mapping # ✓ Pure JSON — no markdown wrapper # """ # print(f"Prompt to the llm is: {prompt}") # payload = { # "model": 
model, # "messages": [ # {"role": "system", "content": "You are a JSON-only extractor and validator. Always provide clear reasoning for your decisions."}, # {"role": "user", "content": prompt}, # ], # "temperature": 0.0, # "max_tokens": 2000, # Increased for reasoning # } # try: # raw = ProductAttributeService._call_llm(payload) # logger.info("Raw LLM response received") # print(raw) # cleaned = ProductAttributeService._clean_json(raw) # parsed = json.loads(cleaned) # except Exception as exc: # logger.error(f"LLM failed: {exc}") # return { # "mandatory": { # a: [{ # "value": "Not Specified", # "source": "llm_error", # "reason": f"LLM processing failed: {str(exc)}" # }] for a in mandatory_attrs # }, # "additional": {} if not extract_additional else {}, # "error": str(exc) # } # if use_cache and cache_key: # SimpleCache.set(cache_key, parsed) # logger.info(f"CACHE SET {cache_key[:16]}...") # return parsed # @staticmethod # def get_cache_stats() -> Dict: # return { # "global_enabled": is_caching_enabled(), # "result_cache": SimpleCache.get_stats(), # } # @staticmethod # def clear_all_caches(): # SimpleCache.clear() # logger.info("All caches cleared") # # IMPORTANT INSTRUCTIONS FOR USER VALUES: # # 1. Compare the user-entered value with what you find in the product text # # 2. Evaluate if the user value is correct, partially correct, or incorrect for this product # # 3. Choose the BEST value (could be user's value, or from allowed list, or inferred) # # 4. Always provide a "reason" field explaining your decision # # 5. DO NOT hallucinate - be honest if user's value seems wrong based on product evidence # # 6. If user's value is not in the allowed list but seems correct, chose the most nearest value from the allowed list with proper reasoning. 
import json
import hashlib
import logging
import time
import difflib
import re
from functools import wraps
from typing import Dict, List, Optional, Tuple

import requests
from django.conf import settings

from .llm_load_balancer import call_llm_with_load_balancer
from .cache_config import (
    is_caching_enabled,
    ENABLE_ATTRIBUTE_EXTRACTION_CACHE,
    ATTRIBUTE_CACHE_MAX_SIZE,
)

logger = logging.getLogger(__name__)


# --------------------------------------------------------------------------- #
# CACHES
# --------------------------------------------------------------------------- #
class SimpleCache:
    """In-process result cache backed by a class-level dict.

    The dict is a class attribute, so every caller in the process shares the
    same entries. All operations become no-ops (``get`` returns ``None``) when
    ``ENABLE_ATTRIBUTE_EXTRACTION_CACHE`` is falsy.
    """

    _cache = {}
    _max_size = ATTRIBUTE_CACHE_MAX_SIZE

    # Fraction of ``_max_size`` evicted (oldest first) when the cache is full.
    _EVICT_FRACTION = 0.2

    @classmethod
    def get(cls, key: str) -> Optional[Dict]:
        """Return the cached value for *key*, or ``None`` if absent/disabled."""
        if not ENABLE_ATTRIBUTE_EXTRACTION_CACHE:
            return None
        return cls._cache.get(key)

    @classmethod
    def set(cls, key: str, value: Dict):
        """Store *value* under *key*, evicting the oldest entries when full.

        Eviction removes the oldest ~20% of ``_max_size`` entries (dicts keep
        insertion order), and always at least one entry so that small
        ``_max_size`` values cannot let the cache grow without bound.
        Overwriting an existing key never triggers eviction because it does
        not increase the cache size.
        """
        if not ENABLE_ATTRIBUTE_EXTRACTION_CACHE:
            return
        if key not in cls._cache and len(cls._cache) >= cls._max_size:
            evict_count = max(1, int(cls._max_size * cls._EVICT_FRACTION))
            # Evict in place (instead of rebinding ``cls._cache``) so the
            # same dict object stays shared by every reference to it.
            for stale_key in list(cls._cache)[:evict_count]:
                del cls._cache[stale_key]
        cls._cache[key] = value

    @classmethod
    def clear(cls):
        """Remove every cached entry."""
        cls._cache.clear()

    @classmethod
    def get_stats(cls) -> Dict:
        """Return the enabled flag, current size, capacity and usage percent."""
        size = len(cls._cache)
        return {
            "enabled": ENABLE_ATTRIBUTE_EXTRACTION_CACHE,
            "size": size,
            "max_size": cls._max_size,
            # Guard against a zero/None capacity to avoid ZeroDivisionError.
            "usage_percent": round(size / cls._max_size * 100, 2) if cls._max_size else 0,
        }


# --------------------------------------------------------------------------- #
# RETRY DECORATOR
# --------------------------------------------------------------------------- #
def retry(max_attempts=3, delay=0.5):
    """Build a decorator that retries the wrapped callable on any ``Exception``.

    Args:
        max_attempts: Total number of calls to make before giving up.
        delay: Base wait in seconds; the wait before retry ``i`` (0-based) is
            ``delay * 2 ** i`` (exponential backoff).

    Returns:
        A decorator. The decorated function returns the first successful
        result, or re-raises the last exception once all attempts failed.
        If ``max_attempts`` is not positive the function is never called and
        ``RuntimeError("Retry failed")`` is raised.
    """
    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            last_exc = None
            for attempt in range(max_attempts):
                try:
                    return f(*args, **kwargs)
                except Exception as e:
                    last_exc = e
                    # No sleep after the final attempt; fall through to raise.
                    if attempt < max_attempts - 1:
                        wait = delay * (2 ** attempt)
                        logger.warning(
                            "Retry %s/%s after %ss: %s",
                            attempt + 1, max_attempts, wait, e,
                        )
                        time.sleep(wait)
            raise last_exc or RuntimeError("Retry failed")
        return wrapper
    return decorator


# --------------------------------------------------------------------------- #
# MAIN
# SERVICE
# --------------------------------------------------------------------------- #
class ProductAttributeService:
    """Extract product attributes from text with an LLM and validate them.

    ``extract_attributes`` is the entry point: it builds a prompt, calls the
    LLM through the load balancer, parses the JSON reply and then forces every
    mandatory attribute value onto the caller-supplied allowed list.
    """

    # Requested attribute names that select the table-lamp prompt.
    _LAMP_KEYS = ("lamp_type", "switch_type", "power_source", "Recommended_Room", "Age_Group")
    # Requested attribute names that select the clothing/T-shirt prompt.
    _CLOTHING_KEYS = ("style_clothing_top", "shirt_neck_style", "t_shirt_type")

    # Matched values that are only accepted when the value itself appears in
    # the product text (guardrail against unsupported LLM picks).
    _EVIDENCE_REQUIRED_VALUES = frozenset({"Peasant", "Chemise", "Corset", "Bandeau"})

    # Substrings of the product text that mark a basic tee/top.
    _TEE_KEYWORDS = ("t-shirt", "tee", "top", "shirt")

    # Per-item fallback used when an LLM value cannot be matched:
    # attribute -> (value, reason). Applied only if the value is allowed.
    _ITEM_FALLBACKS = {
        "style_clothing_top": ("Pullover", "Fallback: Standard Tee implies Pullover."),
        "shirt_neck_style": ("Round Neck", "Fallback: Defaulting to Round Neck."),
        "t_shirt_type": ("T-Shirts", "Fallback: Generic T-Shirt."),
    }

    # Last-resort defaults, in priority order, used when an attribute ends up
    # with no validated item at all. First allowed entry for the key wins.
    _FORCED_DEFAULTS = (
        ("style_clothing_top", "Pullover"),
        ("shirt_neck_style", "Round Neck"),
        ("t_shirt_type", "T-Shirts"),
        ("t_shirt_type", "Classic T-Shirt"),
        ("power_source", "Corded Electric"),
    )

    _LAMP_INTRO = "You are an expert Home Decor Attribute Extractor."
    _LAMP_GUIDANCE = "\n".join([
        "=== TABLE LAMP ATTRIBUTE DEFINITIONS ===",
        "1. **lamp_type**: 'Stick', 'Table', 'Novelty', 'Desk/Task', 'Tiffany'.",
        "2. **switch_type**: 'Pull Chain', 'Rocker', 'Rotary Socket', 'Touch', 'Push Button'.",
        "3. **power_source**: 'Corded Electric', 'Battery Powered', 'USB'.",
        "4. **Age_Group**: 'Child' (Kids/Nursery), 'Adult', 'Teen'.",
        "5. **Recommended_Room**: 'Kids Room' (if child/themed), 'Bedroom', 'Office'.",
        "",
        "=== INSTRUCTIONS ===",
        "1. Extract EXACT values from Allowed List.",
        "2. **Mapping**:",
        "   - \"Kids/Nursery\" -> Age_Group='Child', Room='Kids Room'.",
        "   - \"Plug-in/Cord\" -> power_source='Corded Electric'.",
        "3. **Brevity**: Keep \"reason\" short.",
    ])

    _CLOTHING_INTRO = "You are an expert Fashion Attribute Extractor."
    _CLOTHING_GUIDANCE = "\n".join([
        "=== CLOTHING ATTRIBUTE HIERARCHY ===",
        "1. **style_clothing_top**:",
        "   - **Specifics**: 'Blouse', 'Camisole', 'Peplum', 'Crop', 'Tank', 'Tube', 'Tunic'.",
        "   - **Generic**: **'Pullover'** (Standard T-shirts, no zipper/buttons).",
        "2. **shirt_neck_style**:",
        "   - 'Round Neck' (Standard), 'Crew Neck', 'V-Neck'.",
        "3. **t_shirt_type**:",
        "   - **Graphic Tees** (Has print/image).",
        "   - **Classic T-Shirt** (Solid/Simple).",
        "   - **Pocket Tee** (Has pocket).",
        "",
        "=== INSTRUCTIONS ===",
        "1. Extract attributes based on definitions.",
        "2. **Hierarchy**: Check for Specific Styles ('Blouse', 'Peplum') first.",
        "3. **Defaulting**: If it's a standard Tee/Top with no specific style -> Select **'Pullover'**.",
        "4. **Anti-Hallucination**: Do NOT select 'Peasant' or 'Loose' unless explicitly stated. "
        "'Graphic' is NOT a style.",
    ])

    @staticmethod
    def combine_product_text(title=None, short_desc=None, long_desc=None, ocr_text=None) -> Tuple[str, Dict[str, str]]:
        """Merge the available product fields into one labelled text blob.

        Title and descriptions are converted with ``str()`` and stripped; the
        OCR text is inserted as given. Falsy fields are skipped.

        Returns:
            ``(combined_text, source_map)`` where ``source_map`` maps the
            field name (``title``, ``short_desc``, ``long_desc``,
            ``ocr_text``) to the text used. ``combined_text`` is
            ``"No product information"`` when nothing was supplied.
        """
        parts = []
        source_map = {}

        for field, label, raw in (
            ("title", "Title", title),
            ("short_desc", "Description", short_desc),
            ("long_desc", "Details", long_desc),
        ):
            if raw:
                text = str(raw).strip()
                parts.append(f"{label}: {text}")
                source_map[field] = text

        if ocr_text:
            parts.append(f"OCR Text: {ocr_text}")
            source_map["ocr_text"] = ocr_text

        combined = "\n".join(parts).strip()
        return (combined or "No product information", source_map)

    @staticmethod
    def _cache_key(product_text: str, mandatory_attrs: Dict, extract_additional: bool,
                   multiple: List[str], user_values: Optional[Dict] = None) -> str:
        """Return a deterministic cache key for one extraction request.

        The key is ``"attr_"`` plus the MD5 hex digest of the JSON-serialised
        inputs (keys sorted, ``multiple`` sorted). MD5 is used here only as a
        fingerprint, not for security.
        """
        payload = {
            "text": product_text,
            "attrs": mandatory_attrs,
            "extra": extract_additional,
            "multiple": sorted(multiple),
            "user_values": user_values or {},
        }
        digest = hashlib.md5(json.dumps(payload, sort_keys=True).encode()).hexdigest()
        return f"attr_{digest}"

    @staticmethod
    def _clean_json(text: str) -> str:
        """Extract the outermost ``{...}`` span from a raw LLM reply.

        A Markdown code fence (with or without the ``json`` tag) is unwrapped
        first; then everything from the first ``{`` to the last ``}`` is kept.

        Raises:
            ValueError: If no ``{...}`` span can be found.
        """
        text = text.strip()
        if "```json" in text:
            text = text.split("```json", 1)[1].split("```", 1)[0]
        elif "```" in text:
            parts = text.split("```", 2)
            if len(parts) > 1:
                text = parts[1]

        start = text.find("{")
        end = text.rfind("}") + 1
        if start == -1 or end <= start:
            raise ValueError("No JSON object found in LLM response")
        return text[start:end].strip()

    @staticmethod
    def _find_best_match(value: str, allowed_list: List[str]) -> Optional[str]:
        """Map a free-form value onto the allowed list, ignoring case.

        Tried in order: exact match; substring match in either direction
        (only for values longer than 3 characters); fuzzy match via
        ``difflib.get_close_matches`` with cutoff 0.5. Empty allowed entries
        are ignored so they can never match by substring.

        Returns:
            The allowed entry in its original spelling, or ``None``.
        """
        if not value or not allowed_list:
            return None

        value_lower = str(value).lower()
        # Lower-cased spelling -> original spelling; first occurrence wins.
        by_lower = {}
        for allowed in allowed_list:
            if allowed:
                by_lower.setdefault(str(allowed).lower(), allowed)

        # 1. Exact match
        if value_lower in by_lower:
            return by_lower[value_lower]

        # 2. Substring match (skipped for very short values to avoid noise)
        if len(value_lower) > 3:
            for allowed_lower, allowed in by_lower.items():
                if value_lower in allowed_lower or allowed_lower in value_lower:
                    return allowed

        # 3. Fuzzy match, compared in lower case like the steps above
        close = difflib.get_close_matches(value_lower, list(by_lower), n=1, cutoff=0.5)
        return by_lower[close[0]] if close else None

    @staticmethod
    def format_visual_attributes(visual_attributes: Dict) -> Dict:
        """Wrap visual attribute values as ``{"value", "source": "image"}`` items.

        Scalars become a one-item list, lists become one item per element and
        dicts are formatted one level deep with the same rules per sub-key.
        """
        def as_items(raw):
            values = raw if isinstance(raw, list) else [raw]
            return [{"value": str(v), "source": "image"} for v in values]

        formatted = {}
        for key, value in visual_attributes.items():
            if isinstance(value, dict):
                formatted[key] = {sub_key: as_items(sub_val) for sub_key, sub_val in value.items()}
            else:
                formatted[key] = as_items(value)
        return formatted

    @staticmethod
    @retry(max_attempts=3, delay=0.5)
    def _call_llm(payload: dict) -> str:
        """Send *payload* to the LLM via the load balancer, retrying on error."""
        return call_llm_with_load_balancer(payload)

    @staticmethod
    def _select_prompt_profile(requested_keys: List[str]) -> Tuple[str, str, str]:
        """Pick the prompt variant from the requested attribute names.

        Returns:
            ``(intro, guidance, reason_hint)``. Lamp keys are checked before
            clothing keys; anything else gets the generic prompt.
        """
        keys_set = set(requested_keys)
        if any(k in keys_set for k in ProductAttributeService._LAMP_KEYS):
            logger.info("Selecting TABLE LAMP prompt.")
            return (ProductAttributeService._LAMP_INTRO,
                    ProductAttributeService._LAMP_GUIDANCE,
                    "Brief evidence")
        if any(k in keys_set for k in ProductAttributeService._CLOTHING_KEYS):
            logger.info("Selecting T-SHIRT prompt.")
            return (ProductAttributeService._CLOTHING_INTRO,
                    ProductAttributeService._CLOTHING_GUIDANCE,
                    "Brief evidence")
        logger.info("Selecting GENERIC prompt.")
        return ("Extract attributes.", "", "Brief reasoning")

    @staticmethod
    def _normalize_items(items) -> List[Dict]:
        """Coerce whatever the LLM returned for one attribute into item dicts.

        A single dict or scalar is wrapped in a list; non-dict elements are
        turned into ``{"value": element}``; ``None`` entries are dropped.
        """
        if items is None:
            return []
        if not isinstance(items, list):
            items = [items]
        normalized = []
        for item in items:
            if isinstance(item, dict):
                normalized.append(item)
            elif item is not None:
                normalized.append({"value": item})
        return normalized

    @staticmethod
    def _validate_item(key: str, item: Dict, valid_options: List[str],
                       product_text_lower: str) -> Optional[Dict]:
        """Force one LLM item onto the allowed list, or return ``None``.

        The item dict is updated in place and returned when a value could be
        settled by (1) matching the LLM value, (2) finding an allowed value
        as a substring of the product text, or (3) a per-attribute fallback.
        """
        raw_value = item.get("value")
        # The LLM may return null or a number here; never call .strip() on it.
        raw_value = "" if raw_value is None else str(raw_value).strip()

        # 1. Match the LLM value against the allowed list.
        match = ProductAttributeService._find_best_match(raw_value, valid_options)
        # 2. Guardrail: some values need literal evidence in the text.
        if (match in ProductAttributeService._EVIDENCE_REQUIRED_VALUES
                and match.lower() not in product_text_lower):
            match = None
        if match:
            item["value"] = match
            return item

        # 3. Fallback: first allowed value that occurs in the product text.
        keyword = next(
            (opt for opt in valid_options if opt and opt.lower() in product_text_lower),
            None,
        )
        if keyword:
            item["value"] = keyword
            item["source"] = "inferred_from_text"
            item["reason"] = f"Found keyword '{keyword}'."
            return item

        # 4. Attribute-specific fallback value.
        fallback = ProductAttributeService._ITEM_FALLBACKS.get(key)
        if fallback is None or fallback[0] not in valid_options:
            return None
        if key == "style_clothing_top" and not any(
            word in product_text_lower for word in ProductAttributeService._TEE_KEYWORDS
        ):
            # "Pullover" is only implied when the text mentions a tee/top.
            return None
        item["value"], item["reason"] = fallback
        return item

    @staticmethod
    def _forced_default(key: str, valid_options: List[str]) -> Dict:
        """Return the last-resort item for an attribute with no valid value."""
        for attr, default_val in ProductAttributeService._FORCED_DEFAULTS:
            if attr == key and default_val in valid_options:
                return {
                    "value": default_val,
                    "source": "system_default",
                    "reason": "Forced valid default to avoid Not Specified.",
                }
        return {
            "value": "Not Specified",
            "source": "system",
            "reason": "No match found and no safe default available.",
        }

    @staticmethod
    def _validate_mandatory(raw_mandatory, mandatory_attrs: Dict[str, List[str]],
                            product_text_lower: str) -> Dict[str, List[Dict]]:
        """Validate the LLM's ``mandatory`` section against the allowed lists.

        Iterates over the *requested* attributes, so every key of
        ``mandatory_attrs`` is present in the result even if the LLM omitted
        it; keys the LLM invented are dropped. Duplicate values for one
        attribute are collapsed to the first occurrence.
        """
        if not isinstance(raw_mandatory, dict):
            raw_mandatory = {}

        validated = {}
        for key, valid_options in mandatory_attrs.items():
            accepted = []
            seen_values = set()
            for item in ProductAttributeService._normalize_items(raw_mandatory.get(key)):
                checked = ProductAttributeService._validate_item(
                    key, item, valid_options, product_text_lower
                )
                if checked is None or checked["value"] in seen_values:
                    continue
                seen_values.add(checked["value"])
                accepted.append(checked)
            validated[key] = accepted or [
                ProductAttributeService._forced_default(key, valid_options)
            ]
        return validated

    @staticmethod
    def extract_attributes(
        product_text: str,
        mandatory_attrs: Dict[str, List[str]],
        source_map: Dict[str, str] = None,
        model: str = None,
        extract_additional: bool = True,
        multiple: Optional[List[str]] = None,
        use_cache: Optional[bool] = None,
        user_entered_values: Optional[Dict[str, str]] = None,
    ) -> dict:
        """Extract mandatory (and optionally additional) attributes via the LLM.

        Args:
            product_text: Combined product text to analyse.
            mandatory_attrs: Attribute name -> list of allowed values.
            source_map: Source name -> text; its keys are offered to the LLM
                as valid ``source`` labels.
            model: LLM model name; defaults to ``settings.SUPPORTED_MODELS[0]``.
            extract_additional: Whether the LLM should also report attributes
                outside ``mandatory_attrs``.
            multiple: Attribute names announced to the LLM as multi-valued.
            use_cache: Overrides the module cache flag; always disabled when
                ``is_caching_enabled()`` is false.
            user_entered_values: Attribute -> user-supplied value, passed to
                the LLM for validation.

        Returns:
            A dict with ``"mandatory"`` (one validated item list per requested
            attribute) and ``"additional"``. If the LLM call or JSON parsing
            fails, every mandatory attribute is ``"Not Specified"`` with
            source ``"llm_error"`` and an ``"error"`` message is included;
            such error results are never cached.
        """
        if model is None:
            model = settings.SUPPORTED_MODELS[0]
        if multiple is None:
            multiple = []
        if source_map is None:
            source_map = {}
        if user_entered_values is None:
            user_entered_values = {}
        if use_cache is None:
            use_cache = ENABLE_ATTRIBUTE_EXTRACTION_CACHE
        if not is_caching_enabled():
            use_cache = False

        cache_key = None
        if use_cache:
            cache_key = ProductAttributeService._cache_key(
                product_text, mandatory_attrs, extract_additional, multiple, user_entered_values
            )
            cached = SimpleCache.get(cache_key)
            if cached is not None:
                logger.info("CACHE HIT %s...", cache_key[:16])
                # NOTE(review): the cached dict is returned by reference, so
                # a caller mutating it also changes the cached entry.
                return cached

        # --------------------------- PREPARE DATA ---------------------------
        allowed_text_dynamic = "\n".join(
            f"{attr}: {', '.join(vals)}" for attr, vals in mandatory_attrs.items()
        )
        product_text_lower = product_text.lower()

        allowed_sources = list(source_map.keys()) + ["title", "description", "inferred"]
        source_hint = "|".join(allowed_sources)
        multiple_text = f"\nMULTIPLE ALLOWED FOR: {', '.join(multiple)}" if multiple else ""

        user_values_section = ""
        if user_entered_values:
            user_lines = "\n".join(
                f" - {attr}: {value}" for attr, value in user_entered_values.items()
            )
            user_values_section = (
                f"USER MANUALLY ENTERED VALUES:\n{user_lines}\n"
                "IMPORTANT: Validate user values against product text. If correct, select them."
            )

        if extract_additional:
            additional_instructions = "For the 'additional' section, identify other key attributes."
            output_example_additional = "\n".join([
                '  "additional": {',
                '    "Attribute_Name": [{"value": "Extracted Value", "source": "", "reason": "Brief reason"}]',
                "  }",
            ])
        else:
            additional_instructions = "The 'additional' section must be an empty object {}."
            output_example_additional = '  "additional": {}'

        # --------------------------- PROMPT SELECTION ---------------------------
        intro, guidance, reason_hint = ProductAttributeService._select_prompt_profile(
            list(mandatory_attrs.keys())
        )

        output_format = "\n".join([
            "=== OUTPUT FORMAT ===",
            "Return pure JSON:",
            "{",
            '  "mandatory": {',
            '    "attribute_name": [{',
            '      "value": "",',
            f'      "source": "{source_hint}",',
            f'      "reason": "{reason_hint}"',
            "    }]",
            "  },",
            output_example_additional,
            "}",
        ])

        # All three prompt variants share everything after the guidance block.
        prompt = "\n".join([
            intro,
            user_values_section,
            guidance,
            "=== REQUIRED ATTRIBUTES & ALLOWED VALUES ===",
            allowed_text_dynamic,
            multiple_text,
            "=== INPUT DATA ===",
            product_text,
            additional_instructions,
            output_format,
        ])

        payload = {
            "model": model,
            "messages": [
                {"role": "system", "content": "You are a strict JSON extractor."},
                {"role": "user", "content": prompt},
            ],
            "temperature": 0.0,
            "max_tokens": 800,
        }

        try:
            raw = ProductAttributeService._call_llm(payload)
            cleaned = ProductAttributeService._clean_json(raw)
            parsed = json.loads(cleaned)

            # --------------------------- VALIDATION & RECOVERY ---------------------------
            parsed["mandatory"] = ProductAttributeService._validate_mandatory(
                parsed.get("mandatory"), mandatory_attrs, product_text_lower
            )
            # Callers always get an "additional" key; it is emptied when
            # additional extraction was not requested, as the prompt demands.
            if not extract_additional:
                parsed["additional"] = {}
            else:
                parsed.setdefault("additional", {})
        except Exception as exc:
            # Boundary handler: any LLM/transport/parsing failure is turned
            # into a well-formed "Not Specified" result instead of raising.
            logger.error("LLM failed: %s", exc)
            return {
                "mandatory": {
                    attr: [{
                        "value": "Not Specified",
                        "source": "llm_error",
                        "reason": f"LLM processing failed: {str(exc)}",
                    }]
                    for attr in mandatory_attrs
                },
                "additional": {},
                "error": str(exc),
            }

        if use_cache and cache_key:
            SimpleCache.set(cache_key, parsed)
        return parsed

    @staticmethod
    def get_cache_stats() -> Dict:
        """Return the global cache switch and the result-cache statistics."""
        return {
            "global_enabled": is_caching_enabled(),
            "result_cache": SimpleCache.get_stats(),
        }

    @staticmethod
    def clear_all_caches():
        """Empty the result cache and log that it happened."""
        SimpleCache.clear()
        logger.info("All caches cleared")