# # ==================== services.py (WITH USER VALUE REASONING) ==================== # import json # import hashlib # import logging # import time # from functools import wraps # from typing import Dict, List, Optional, Tuple # import requests # from django.conf import settings # from .llm_load_balancer import call_llm_with_load_balancer # from .cache_config import ( # is_caching_enabled, # ENABLE_ATTRIBUTE_EXTRACTION_CACHE, # ATTRIBUTE_CACHE_MAX_SIZE, # ) # logger = logging.getLogger(__name__) # # --------------------------------------------------------------------------- # # # CACHES # # --------------------------------------------------------------------------- # # class SimpleCache: # _cache = {} # _max_size = ATTRIBUTE_CACHE_MAX_SIZE # @classmethod # def get(cls, key: str) -> Optional[Dict]: # if not ENABLE_ATTRIBUTE_EXTRACTION_CACHE: return None # return cls._cache.get(key) # @classmethod # def set(cls, key: str, value: Dict): # if not ENABLE_ATTRIBUTE_EXTRACTION_CACHE: return # if len(cls._cache) >= cls._max_size: # items = list(cls._cache.items()) # cls._cache = dict(items[int(cls._max_size * 0.2):]) # cls._cache[key] = value # @classmethod # def clear(cls): cls._cache.clear() # @classmethod # def get_stats(cls) -> Dict: # return { # "enabled": ENABLE_ATTRIBUTE_EXTRACTION_CACHE, # "size": len(cls._cache), # "max_size": cls._max_size, # "usage_percent": round(len(cls._cache)/cls._max_size*100, 2) if cls._max_size else 0 # } # # --------------------------------------------------------------------------- # # # RETRY DECORATOR # # --------------------------------------------------------------------------- # # def retry(max_attempts=3, delay=1.0): # def decorator(f): # @wraps(f) # def wrapper(*args, **kwargs): # last_exc = None # for i in range(max_attempts): # try: # return f(*args, **kwargs) # except Exception as e: # last_exc = e # if i < max_attempts - 1: # wait = delay * (2 ** i) # logger.warning(f"Retry {i+1}/{max_attempts} after {wait}s: {e}") # time.sleep(wait) # raise last_exc or RuntimeError("Retry failed") # return wrapper # return decorator # # --------------------------------------------------------------------------- # # # MAIN SERVICE # # --------------------------------------------------------------------------- # # class ProductAttributeService: # @staticmethod # def combine_product_text(title=None, short_desc=None, long_desc=None, ocr_text=None) -> Tuple[str, Dict[str, str]]: # parts = [] # source_map = {} # if title: # t = str(title).strip() # parts.append(f"Title: {t}") # source_map["title"] = t # if short_desc: # s = str(short_desc).strip() # parts.append(f"Description: {s}") # source_map["short_desc"] = s # if long_desc: # l = str(long_desc).strip() # parts.append(f"Details: {l}") # source_map["long_desc"] = l # if ocr_text: # parts.append(f"OCR Text: {ocr_text}") # source_map["ocr_text"] = ocr_text # combined = "\n".join(parts).strip() # return (combined or "No product information", source_map) # @staticmethod # def _cache_key(product_text: str, mandatory_attrs: Dict, extract_additional: bool, multiple: List[str], user_values: Dict = None) -> str: # payload = { # "text": product_text, # "attrs": mandatory_attrs, # "extra": extract_additional, # "multiple": sorted(multiple), # "user_values": user_values or {} # } # return f"attr_{hashlib.md5(json.dumps(payload, sort_keys=True).encode()).hexdigest()}" # @staticmethod # def _clean_json(text: str) -> str: # start = text.find("{") # end = text.rfind("}") + 1 # if start != -1 and end > start: # text = text[start:end] # if "```json" in text: # text = text.split("```json", 1)[1].split("```", 1)[0] # elif "```" in text: # text = text.split("```", 1)[1].split("```", 1)[0] # if text.lstrip().startswith("json"): text = text[4:] # return text.strip() # @staticmethod # def format_visual_attributes(visual_attributes: Dict) -> Dict: # formatted = {} # for key, value in visual_attributes.items(): # if isinstance(value, list): # formatted[key] = [{"value": str(item), "source": "image"} for item in value] # elif isinstance(value, dict): # nested = {} # for sub_key, sub_val in value.items(): # if isinstance(sub_val, list): # nested[sub_key] = [{"value": str(v), "source": "image"} for v in sub_val] # else: # nested[sub_key] = [{"value": str(sub_val), "source": "image"}] # formatted[key] = nested # else: # formatted[key] = [{"value": str(value), "source": "image"}] # return formatted # # @staticmethod # # @retry(max_attempts=3, delay=1.0) # # def _call_llm(payload: dict) -> str: # # headers = {"Authorization": f"Bearer {settings.GROQ_API_KEY}", "Content-Type": "application/json"} # # resp = requests.post(settings.GROQ_API_URL, headers=headers, json=payload, timeout=30) # # resp.raise_for_status() # # return resp.json()["choices"][0]["message"]["content"] # # At the top of services.py, add this import # # from . import call_llm_with_load_balancer, get_load_balancer_stats # # Replace the existing _call_llm method with this: # @staticmethod # @retry(max_attempts=3, delay=3.0) # def _call_llm(payload: dict) -> str: # """ # Call LLM using load balancer with multiple API keys # Automatically handles rate limiting and failover # """ # return call_llm_with_load_balancer(payload) # @staticmethod # def extract_attributes( # product_text: str, # mandatory_attrs: Dict[str, List[str]], # source_map: Dict[str, str] = None, # model: str = None, # extract_additional: bool = True, # multiple: Optional[List[str]] = None, # use_cache: Optional[bool] = None, # user_entered_values: Optional[Dict[str, str]] = None, # NEW PARAMETER # ) -> dict: # if model is None: model = settings.SUPPORTED_MODELS[0] # if multiple is None: multiple = [] # if source_map is None: source_map = {} # if user_entered_values is None: user_entered_values = {} # if use_cache is None: use_cache = ENABLE_ATTRIBUTE_EXTRACTION_CACHE # if not is_caching_enabled(): use_cache = False # cache_key = None # if use_cache: # cache_key = ProductAttributeService._cache_key( # product_text, mandatory_attrs, extract_additional, multiple, user_entered_values # ) # cached = SimpleCache.get(cache_key) # if cached: # logger.info(f"CACHE HIT {cache_key[:16]}...") # return cached # # --------------------------- BUILD USER VALUES SECTION --------------------------- # user_values_section = "" # if user_entered_values: # user_lines = [] # for attr, value in user_entered_values.items(): # user_lines.append(f" - {attr}: {value}") # user_values_section = f""" # USER MANUALLY ENTERED VALUES: # {chr(10).join(user_lines)} # IMPORTANT INSTRUCTIONS FOR USER VALUES: # 1. Compare the user-entered value with what you find in the product text # 2. Evaluate if the user value is correct, partially correct, or incorrect for this product # 3. Choose the BEST value (could be user's value, or from allowed list, or inferred) # 4. Always provide a "reason" field explaining your decision # 5. DO NOT hallucinate - be honest if user's value seems wrong based on product evidence # 6. If user's value is not in the allowed list but seems correct, chose the most nearest value from the allowed list with proper reasoning. # """ # # --------------------------- PROMPT --------------------------- # allowed_lines = [f"{attr}: {', '.join(vals)}" for attr, vals in mandatory_attrs.items()] # allowed_text = "\n".join(allowed_lines) # allowed_sources = list(source_map.keys()) + ["title", "description", "inferred"] # source_hint = "|".join(allowed_sources) # multiple_text = f"\nMULTIPLE ALLOWED FOR: {', '.join(multiple)}" if multiple else "" # print("Multiple text for attr: ") # print(multiple_text) # additional_instructions = """ # For the 'additional' section, identify any other important product attributes and their values (e.g., 'Color', 'Material', 'Weight' etc) that are present in the PRODUCT TEXT but not in the Mandatory Attribute list. # For each additional attribute, use the best available value from the PRODUCT TEXT and specify the 'source'. # """ if extract_additional else "" # prompt = f""" # You are a product-attribute classifier and validator. # Understand the product text very deeply. If the same product is available somewhere online, use that knowledge to predict accurate attribute values. # Do not depend only on word-by-word matching from the product text - interpret the meaning and suggest attributes intelligently. # Pick the *closest meaning* value from the allowed list, even if not an exact word match. # I want values for all mandatory attributes. # If a value is not found anywhere, the source should be "inferred". # Note: Source means from where you have concluded the result. Choose one of these value <{source_hint}> # ALLOWED VALUES (MANDATORY): # {allowed_text} # Note: "Strictly" return multiple values for these attributes: {multiple_text}. These values must be most possible values from the list and should be max 2 values. # {user_values_section} # {additional_instructions} # PRODUCT TEXT: # {product_text} # OUTPUT (strict JSON only): # {{ # "mandatory": {{ # "": [{{ # "value": "", # "source": "<{source_hint}>", # "reason": "Explanation of why this value was chosen. If user provided a value, explain why you agreed/disagreed with it.", # "original_value": "", # "decision": "accepted|rejected" # }}] # }}, # "additional": {{ # "Additional_Attr_1": [{{ # "value": "Value 1", # "source": "<{source_hint}>", # "reason": "Why this attribute and value were identified" # }}] # }} # }} # RULES: # - For each mandatory attribute with a user-entered value, include "original_value" and "decision" fields # - "decision" values: "accepted" (used user's value), "rejected" (used different value), "not_provided" (no user value given) # - "reason" must explain your choice, especially when rejecting user input # - For 'additional' attributes: Strictly Extract other key attributes other than mandatory attributes from the text. # - For 'multiple' attributes, always give multiple value for those attribues, choose wisely and max 2 multiple attribute that are very close. # - Source must be one of: {source_hint} # - Be honest and specific in your reasoning. # - Return ONLY valid JSON # """ # payload = { # "model": model, # "messages": [ # {"role": "system", "content": "You are a JSON-only extractor and validator. Always provide clear reasoning for your decisions."}, # {"role": "user", "content": prompt}, # ], # "temperature": 0.3, # "max_tokens": 2000, # Increased for reasoning # } # try: # raw = ProductAttributeService._call_llm(payload) # logger.info("Raw LLM response received") # print(raw) # cleaned = ProductAttributeService._clean_json(raw) # parsed = json.loads(cleaned) # except Exception as exc: # logger.error(f"LLM failed: {exc}") # return { # "mandatory": { # a: [{ # "value": "Not Specified", # "source": "llm_error", # "reason": f"LLM processing failed: {str(exc)}" # }] for a in mandatory_attrs # }, # "additional": {} if not extract_additional else {}, # "error": str(exc) # } # if use_cache and cache_key: # SimpleCache.set(cache_key, parsed) # logger.info(f"CACHE SET {cache_key[:16]}...") # return parsed # @staticmethod # def get_cache_stats() -> Dict: # return { # "global_enabled": is_caching_enabled(), # "result_cache": SimpleCache.get_stats(), # } # @staticmethod # def clear_all_caches(): # SimpleCache.clear() # logger.info("All caches cleared") import json import hashlib import logging import time from functools import wraps from typing import Dict, List, Optional, Tuple import requests from django.conf import settings from .llm_load_balancer import call_llm_with_load_balancer from .cache_config import ( is_caching_enabled, ENABLE_ATTRIBUTE_EXTRACTION_CACHE, ATTRIBUTE_CACHE_MAX_SIZE, ) logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # # CACHES # --------------------------------------------------------------------------- # class SimpleCache: _cache = {} _max_size = ATTRIBUTE_CACHE_MAX_SIZE @classmethod def get(cls, key: str) -> Optional[Dict]: if not ENABLE_ATTRIBUTE_EXTRACTION_CACHE: return None return cls._cache.get(key) @classmethod def set(cls, key: str, value: Dict): if not ENABLE_ATTRIBUTE_EXTRACTION_CACHE: return if len(cls._cache) >= cls._max_size: items = list(cls._cache.items()) cls._cache = dict(items[int(cls._max_size * 0.2):]) cls._cache[key] = value @classmethod def clear(cls): cls._cache.clear() @classmethod def get_stats(cls) -> Dict: return { "enabled": ENABLE_ATTRIBUTE_EXTRACTION_CACHE, "size": len(cls._cache), "max_size": cls._max_size, "usage_percent": round(len(cls._cache)/cls._max_size*100, 2) if cls._max_size else 0 } # --------------------------------------------------------------------------- # # RETRY DECORATOR # --------------------------------------------------------------------------- # def retry(max_attempts=3, delay=1.0): def decorator(f): @wraps(f) def wrapper(*args, **kwargs): last_exc = None for i in range(max_attempts): try: return f(*args, **kwargs) except Exception as e: last_exc = e if i < max_attempts - 1: wait = delay * (2 ** i) logger.warning(f"Retry {i+1}/{max_attempts} after {wait}s: {e}") time.sleep(wait) raise last_exc or RuntimeError("Retry failed") return wrapper return decorator # --------------------------------------------------------------------------- # # MAIN SERVICE # --------------------------------------------------------------------------- # class ProductAttributeService: @staticmethod def combine_product_text(title=None, short_desc=None, long_desc=None, ocr_text=None) -> Tuple[str, Dict[str, str]]: parts = [] source_map = {} if title: t = str(title).strip() parts.append(f"Title: {t}") source_map["title"] = t if short_desc: s = str(short_desc).strip() parts.append(f"Description: {s}") source_map["short_desc"] = s if long_desc: l = str(long_desc).strip() parts.append(f"Details: {l}") source_map["long_desc"] = l if ocr_text: parts.append(f"OCR Text: {ocr_text}") source_map["ocr_text"] = ocr_text combined = "\n".join(parts).strip() return (combined or "No product information", source_map) @staticmethod def _cache_key(product_text: str, mandatory_attrs: Dict, extract_additional: bool, multiple: List[str], user_values: Dict = None) -> str: payload = { "text": product_text, "attrs": mandatory_attrs, "extra": extract_additional, "multiple": sorted(multiple), "user_values": user_values or {} } return f"attr_{hashlib.md5(json.dumps(payload, sort_keys=True).encode()).hexdigest()}" @staticmethod def _clean_json(text: str) -> str: start = text.find("{") end = text.rfind("}") + 1 if start != -1 and end > start: text = text[start:end] if "```json" in text: text = text.split("```json", 1)[1].split("```", 1)[0] elif "```" in text: text = text.split("```", 1)[1].split("```", 1)[0] if text.lstrip().startswith("json"): text = text[4:] return text.strip() @staticmethod def format_visual_attributes(visual_attributes: Dict) -> Dict: formatted = {} for key, value in visual_attributes.items(): if isinstance(value, list): formatted[key] = [{"value": str(item), "source": "image"} for item in value] elif isinstance(value, dict): nested = {} for sub_key, sub_val in value.items(): if isinstance(sub_val, list): nested[sub_key] = [{"value": str(v), "source": "image"} for v in sub_val] else: nested[sub_key] = [{"value": str(sub_val), "source": "image"}] formatted[key] = nested else: formatted[key] = [{"value": str(value), "source": "image"}] return formatted # @staticmethod # @retry(max_attempts=3, delay=1.0) # def _call_llm(payload: dict) -> str: # headers = {"Authorization": f"Bearer {settings.GROQ_API_KEY}", "Content-Type": "application/json"} # resp = requests.post(settings.GROQ_API_URL, headers=headers, json=payload, timeout=30) # resp.raise_for_status() # return resp.json()["choices"][0]["message"]["content"] # At the top of services.py, add this import # from . import call_llm_with_load_balancer, get_load_balancer_stats # Replace the existing _call_llm method with this: @staticmethod @retry(max_attempts=3, delay=3.0) def _call_llm(payload: dict) -> str: """ Call LLM using load balancer with multiple API keys Automatically handles rate limiting and failover """ return call_llm_with_load_balancer(payload) @staticmethod def extract_attributes( product_text: str, mandatory_attrs: Dict[str, List[str]], source_map: Dict[str, str] = None, model: str = None, extract_additional: bool = True, multiple: Optional[List[str]] = None, use_cache: Optional[bool] = None, user_entered_values: Optional[Dict[str, str]] = None, # NEW PARAMETER ) -> dict: if model is None: model = settings.SUPPORTED_MODELS[0] if multiple is None: multiple = [] if source_map is None: source_map = {} if user_entered_values is None: user_entered_values = {} if use_cache is None: use_cache = ENABLE_ATTRIBUTE_EXTRACTION_CACHE if not is_caching_enabled(): use_cache = False cache_key = None if use_cache: cache_key = ProductAttributeService._cache_key( product_text, mandatory_attrs, extract_additional, multiple, user_entered_values ) cached = SimpleCache.get(cache_key) if cached: logger.info(f"CACHE HIT {cache_key[:16]}...") return cached # --------------------------- BUILD USER VALUES SECTION --------------------------- user_values_section = "" if user_entered_values: user_lines = [] for attr, value in user_entered_values.items(): user_lines.append(f" - {attr}: {value}") user_values_section = f""" USER MANUALLY ENTERED VALUES: {chr(10).join(user_lines)} IMPORTANT INSTRUCTIONS FOR USER VALUES: 1. Choose the BEST value (could be user's value, or from allowed list, or inferred) 2. Always provide a "reason" field explaining your decision. Your reason should be valid and from the product text. Not always exact word to be matched from the product text, you can infer understanding the product text. 3. DO NOT hallucinate - be honest if user's value seems wrong based on product evidence 4. If user's value is not in the allowed list but seems correct, chose the most nearest value from the allowed list with proper reasoning. """ # --------------------------- PROMPT --------------------------- allowed_lines = [f"{attr}: {', '.join(vals)}" for attr, vals in mandatory_attrs.items()] allowed_text = "\n".join(allowed_lines) allowed_sources = list(source_map.keys()) + ["title", "description", "inferred"] source_hint = "|".join(allowed_sources) multiple_text = f"\nMULTIPLE ALLOWED FOR: {', '.join(multiple)}" if multiple else "" if extract_additional: additional_instructions = """ For the 'additional' section, identify any other important product attributes and their values (e.g., 'Color', 'Material', 'Weight' etc according to the product text) that are present in the PRODUCT TEXT but not in the Mandatory Attribute list. For each additional attribute, use the best available value from the PRODUCT TEXT and specify the 'source'. Strictly Extract other key attributes other than mandatory attributes from the text. """ output_example_additional = """ "additional": { "Additional_Attr_1": [{ "value": "Value 1", "source": "<{source_hint}>", "reason": "Why this attribute and value were identified" }] } """ else: additional_instructions = """ Do not identify or include any additional attributes. The 'additional' section must be an empty object {}. """ output_example_additional = ' "additional": {}' prompt = f""" You are a product-attribute classifier and validator. Understand the product text very deeply. If the same product is available somewhere online, use that knowledge to predict accurate attribute values. Do not depend only on word-by-word matching from the product text - interpret the meaning and suggest attributes intelligently. Pick the *closest meaning* value from the allowed list, even if not an exact word match. I want values for all mandatory attributes. If a value is not found anywhere, the source should be "inferred". Note: Source means from where you have concluded the result. Choose one of these value <{source_hint}> ALLOWED VALUES (MANDATORY): {allowed_text} Note: "Strictly" return multiple values for these attributes: {multiple_text}. These values must be most possible values from the list and should be max 2 values. {user_values_section} {additional_instructions} PRODUCT TEXT: {product_text} OUTPUT (strict JSON only): {{ "mandatory": {{ "": [{{ "value": "", "source": "<{source_hint}>", "reason": "Explanation of why this value was chosen. If user provided a value, explain why you agreed/disagreed with it.", "original_value": "", "decision": "accepted|rejected|not_provided" }}] }}, {output_example_additional} }} RULES: - For each mandatory attribute with a user-entered value, include "original_value" and "decision" fields - "decision" values: "accepted" (used user's value), "rejected" (used different value), "not_provided" (no user value given) - "reason" must explain your choice, especially when rejecting user input - For 'multiple' attributes, always give multiple values for those attributes, choose wisely and max 2 values per attribute that are very close. - Source must be one of: {source_hint} - Be honest and specific in your reasoning. - Return ONLY valid JSON """ payload = { "model": model, "messages": [ {"role": "system", "content": "You are a JSON-only extractor and validator. Always provide clear reasoning for your decisions."}, {"role": "user", "content": prompt}, ], "temperature": 0.3, "max_tokens": 2000, # Increased for reasoning } try: raw = ProductAttributeService._call_llm(payload) logger.info("Raw LLM response received") cleaned = ProductAttributeService._clean_json(raw) parsed = json.loads(cleaned) except Exception as exc: logger.error(f"LLM failed: {exc}") return { "mandatory": { a: [{ "value": "Not Specified", "source": "llm_error", "reason": f"LLM processing failed: {str(exc)}" }] for a in mandatory_attrs }, "additional": {} if not extract_additional else {}, "error": str(exc) } if use_cache and cache_key: SimpleCache.set(cache_key, parsed) logger.info(f"CACHE SET {cache_key[:16]}...") return parsed @staticmethod def get_cache_stats() -> Dict: return { "global_enabled": is_caching_enabled(), "result_cache": SimpleCache.get_stats(), } @staticmethod def clear_all_caches(): SimpleCache.clear() logger.info("All caches cleared") # IMPORTANT INSTRUCTIONS FOR USER VALUES: # 1. Compare the user-entered value with what you find in the product text # 2. Evaluate if the user value is correct, partially correct, or incorrect for this product # 3. Choose the BEST value (could be user's value, or from allowed list, or inferred) # 4. Always provide a "reason" field explaining your decision # 5. DO NOT hallucinate - be honest if user's value seems wrong based on product evidence # 6. If user's value is not in the allowed list but seems correct, chose the most nearest value from the allowed list with proper reasoning.