| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867 |
- ################## VERSION WORKING GOOD, BUT COMMENTING TO GET MULTIPLE VALUES WITH SEMANTIC MATCH ALSO #################
- # import json
- # import hashlib
- # import logging
- # import time
- # from functools import wraps
- # from typing import Dict, List, Optional, Tuple
- # import requests
- # from django.conf import settings
- # from .llm_load_balancer import call_llm_with_load_balancer
- # from .cache_config import (
- # is_caching_enabled,
- # ENABLE_ATTRIBUTE_EXTRACTION_CACHE,
- # ATTRIBUTE_CACHE_MAX_SIZE,
- # )
- # logger = logging.getLogger(__name__)
- # # --------------------------------------------------------------------------- #
- # # CACHES
- # # --------------------------------------------------------------------------- #
- # class SimpleCache:
- # _cache = {}
- # _max_size = ATTRIBUTE_CACHE_MAX_SIZE
- # @classmethod
- # def get(cls, key: str) -> Optional[Dict]:
- # if not ENABLE_ATTRIBUTE_EXTRACTION_CACHE: return None
- # return cls._cache.get(key)
- # @classmethod
- # def set(cls, key: str, value: Dict):
- # if not ENABLE_ATTRIBUTE_EXTRACTION_CACHE: return
- # if len(cls._cache) >= cls._max_size:
- # items = list(cls._cache.items())
- # cls._cache = dict(items[int(cls._max_size * 0.2):])
- # cls._cache[key] = value
- # @classmethod
- # def clear(cls): cls._cache.clear()
- # @classmethod
- # def get_stats(cls) -> Dict:
- # return {
- # "enabled": ENABLE_ATTRIBUTE_EXTRACTION_CACHE,
- # "size": len(cls._cache),
- # "max_size": cls._max_size,
- # "usage_percent": round(len(cls._cache)/cls._max_size*100, 2) if cls._max_size else 0
- # }
- # # --------------------------------------------------------------------------- #
- # # RETRY DECORATOR
- # # --------------------------------------------------------------------------- #
- # def retry(max_attempts=3, delay=1.0):
- # def decorator(f):
- # @wraps(f)
- # def wrapper(*args, **kwargs):
- # last_exc = None
- # for i in range(max_attempts):
- # try:
- # return f(*args, **kwargs)
- # except Exception as e:
- # last_exc = e
- # if i < max_attempts - 1:
- # wait = delay * (2 ** i)
- # logger.warning(f"Retry {i+1}/{max_attempts} after {wait}s: {e}")
- # time.sleep(wait)
- # raise last_exc or RuntimeError("Retry failed")
- # return wrapper
- # return decorator
- # # --------------------------------------------------------------------------- #
- # # MAIN SERVICE
- # # --------------------------------------------------------------------------- #
- # class ProductAttributeService:
- # @staticmethod
- # def combine_product_text(title=None, short_desc=None, long_desc=None, ocr_text=None) -> Tuple[str, Dict[str, str]]:
- # parts = []
- # source_map = {}
- # if title:
- # t = str(title).strip()
- # parts.append(f"Title: {t}")
- # source_map["title"] = t
- # if short_desc:
- # s = str(short_desc).strip()
- # parts.append(f"Description: {s}")
- # source_map["short_desc"] = s
- # if long_desc:
- # l = str(long_desc).strip()
- # parts.append(f"Details: {l}")
- # source_map["long_desc"] = l
- # if ocr_text:
- # parts.append(f"OCR Text: {ocr_text}")
- # source_map["ocr_text"] = ocr_text
- # combined = "\n".join(parts).strip()
- # return (combined or "No product information", source_map)
- # @staticmethod
- # def _cache_key(product_text: str, mandatory_attrs: Dict, extract_additional: bool, multiple: List[str], user_values: Dict = None) -> str:
- # payload = {
- # "text": product_text,
- # "attrs": mandatory_attrs,
- # "extra": extract_additional,
- # "multiple": sorted(multiple),
- # "user_values": user_values or {}
- # }
- # return f"attr_{hashlib.md5(json.dumps(payload, sort_keys=True).encode()).hexdigest()}"
- # @staticmethod
- # def _clean_json(text: str) -> str:
- # start = text.find("{")
- # end = text.rfind("}") + 1
- # if start != -1 and end > start:
- # text = text[start:end]
- # if "```json" in text:
- # text = text.split("```json", 1)[1].split("```", 1)[0]
- # elif "```" in text:
- # text = text.split("```", 1)[1].split("```", 1)[0]
- # if text.lstrip().startswith("json"): text = text[4:]
- # return text.strip()
- # @staticmethod
- # def format_visual_attributes(visual_attributes: Dict) -> Dict:
- # formatted = {}
- # for key, value in visual_attributes.items():
- # if isinstance(value, list):
- # formatted[key] = [{"value": str(item), "source": "image"} for item in value]
- # elif isinstance(value, dict):
- # nested = {}
- # for sub_key, sub_val in value.items():
- # if isinstance(sub_val, list):
- # nested[sub_key] = [{"value": str(v), "source": "image"} for v in sub_val]
- # else:
- # nested[sub_key] = [{"value": str(sub_val), "source": "image"}]
- # formatted[key] = nested
- # else:
- # formatted[key] = [{"value": str(value), "source": "image"}]
- # return formatted
- # # @staticmethod
- # # @retry(max_attempts=3, delay=1.0)
- # # def _call_llm(payload: dict) -> str:
- # # headers = {"Authorization": f"Bearer {settings.GROQ_API_KEY}", "Content-Type": "application/json"}
- # # resp = requests.post(settings.GROQ_API_URL, headers=headers, json=payload, timeout=30)
- # # resp.raise_for_status()
- # # return resp.json()["choices"][0]["message"]["content"]
- # # At the top of services.py, add this import
- # # from . import call_llm_with_load_balancer, get_load_balancer_stats
- # # Replace the existing _call_llm method with this:
- # @staticmethod
- # @retry(max_attempts=3, delay=3.0)
- # def _call_llm(payload: dict) -> str:
- # """
- # Call LLM using load balancer with multiple API keys
- # Automatically handles rate limiting and failover
- # """
- # return call_llm_with_load_balancer(payload)
- # @staticmethod
- # def extract_attributes(
- # product_text: str,
- # mandatory_attrs: Dict[str, List[str]],
- # source_map: Dict[str, str] = None,
- # model: str = None,
- # extract_additional: bool = True,
- # multiple: Optional[List[str]] = None,
- # use_cache: Optional[bool] = None,
- # user_entered_values: Optional[Dict[str, str]] = None, # NEW PARAMETER
- # ) -> dict:
- # if model is None: model = settings.SUPPORTED_MODELS[0]
- # if multiple is None: multiple = []
- # if source_map is None: source_map = {}
- # if user_entered_values is None: user_entered_values = {}
- # if use_cache is None: use_cache = ENABLE_ATTRIBUTE_EXTRACTION_CACHE
- # if not is_caching_enabled(): use_cache = False
- # cache_key = None
- # if use_cache:
- # cache_key = ProductAttributeService._cache_key(
- # product_text, mandatory_attrs, extract_additional, multiple, user_entered_values
- # )
- # cached = SimpleCache.get(cache_key)
- # if cached:
- # logger.info(f"CACHE HIT {cache_key[:16]}...")
- # return cached
- # # --------------------------- BUILD USER VALUES SECTION ---------------------------
- # user_values_section = ""
- # if user_entered_values:
- # user_lines = []
- # for attr, value in user_entered_values.items():
- # user_lines.append(f" - {attr}: {value}")
- # user_values_section = f"""
- # USER MANUALLY ENTERED VALUES:
- # {chr(10).join(user_lines)}
- # IMPORTANT INSTRUCTIONS FOR USER VALUES:
- # 1. Choose the BEST value (could be user's value, or from allowed list, or inferred)
- # 2. Always provide a "reason" field explaining your decision. Your reason should be valid and from the product text. Not always exact word to be matched from the product text, you can infer understanding the product text.
- # 3. DO NOT hallucinate - be honest if user's value seems wrong based on product evidence
- # 4. If user's value is not in the allowed list but seems correct, chose the most nearest value from the allowed list with proper reasoning why it chose it. Also in this case give the most apt value that should be added in the possible list in the reason so that user can edit it later.
- # """
- # # --------------------------- PROMPT ---------------------------
- # allowed_lines = [f"{attr}: {', '.join(vals)}" for attr, vals in mandatory_attrs.items()]
- # allowed_text = "\n".join(allowed_lines)
- # allowed_sources = list(source_map.keys()) + ["title", "description", "inferred"]
- # source_hint = "|".join(allowed_sources)
- # multiple_text = f"\nMULTIPLE ALLOWED FOR: {', '.join(multiple)}" if multiple else ""
- # if extract_additional:
- # additional_instructions = """
- # For the 'additional' section, identify any other important product attributes and their values (e.g., 'Color', 'Material', 'Weight' etc according to the product text) that are present in the PRODUCT TEXT but not in the Mandatory Attribute list.
- # For each additional attribute, use the best available value from the PRODUCT TEXT and specify the 'source'.
- # Strictly Extract other key attributes other than mandatory attributes from the text.
- # """
- # output_example_additional = """
- # "additional": {
- # "Additional_Attr_1": [{
- # "value": "Value 1",
- # "source": "<{source_hint}>",
- # "reason": "Why this attribute and value were identified"
- # }]
- # }
- # """
- # else:
- # additional_instructions = """
- # Do not identify or include any additional attributes. The 'additional' section must be an empty object {}.
- # """
- # output_example_additional = ' "additional": {}'
- # prompt = f"""
- # You are a product-attribute classifier and validator.
- # Understand the product text very deeply. If the same product is available somewhere online, use that knowledge to predict accurate attribute values.
- # Do not depend only on word-by-word matching from the product text - interpret the meaning and suggest attributes intelligently.
- # Pick the *closest meaning* value from the allowed list, even if not an exact word match.
- # I want values for all mandatory attributes.
- # If a value is not found anywhere, the source should be "inferred".
- # Note: Source means from where you have concluded the result. Choose one of these value <{source_hint}>
- # ALLOWED VALUES (MANDATORY):
- # {allowed_text}
- # Note: "Strictly" return multiple values for these attributes: {multiple_text}. These values must be most possible values from the list and should be max 2 values.
- # {user_values_section}
- # {additional_instructions}
- # PRODUCT TEXT:
- # {product_text}
- # OUTPUT (strict JSON only):
- # {{
- # "mandatory": {{
- # "<attr>": [{{
- # "value": "<chosen_value>",
- # "source": "<{source_hint}>",
- # "reason": "Explanation of why this value was chosen. If user provided a value, explain why you agreed/disagreed with it.",
- # "original_value": "<user_entered_value_if_provided>",
- # "decision": "accepted|rejected|not_provided"
- # }}]
- # }},
- # {output_example_additional}
- # }}
- # RULES:
- # - For each mandatory attribute with a user-entered value, include "original_value" and "decision" fields
- # - "decision" values: "accepted" (used user's value), "rejected" (used different value), "not_provided" (no user value given)
- # - "reason" must explain your choice, especially when rejecting user input
- # - For 'multiple' attributes, always give multiple values for those attributes, choose wisely and max 2 values per attribute that are very close.
- # - Source must be one of: {source_hint}
- # - Be honest and specific in your reasoning.
- # - Return ONLY valid JSON
- # """
- # payload = {
- # "model": model,
- # "messages": [
- # {"role": "system", "content": "You are a JSON-only extractor and validator. Always provide clear reasoning for your decisions."},
- # {"role": "user", "content": prompt},
- # ],
- # "temperature": 0.3,
- # "max_tokens": 2000, # Increased for reasoning
- # }
- # try:
- # raw = ProductAttributeService._call_llm(payload)
- # logger.info("Raw LLM response received")
- # cleaned = ProductAttributeService._clean_json(raw)
- # parsed = json.loads(cleaned)
- # except Exception as exc:
- # logger.error(f"LLM failed: {exc}")
- # return {
- # "mandatory": {
- # a: [{
- # "value": "Not Specified",
- # "source": "llm_error",
- # "reason": f"LLM processing failed: {str(exc)}"
- # }] for a in mandatory_attrs
- # },
- # "additional": {} if not extract_additional else {},
- # "error": str(exc)
- # }
- # if use_cache and cache_key:
- # SimpleCache.set(cache_key, parsed)
- # logger.info(f"CACHE SET {cache_key[:16]}...")
- # return parsed
- # @staticmethod
- # def get_cache_stats() -> Dict:
- # return {
- # "global_enabled": is_caching_enabled(),
- # "result_cache": SimpleCache.get_stats(),
- # }
- # @staticmethod
- # def clear_all_caches():
- # SimpleCache.clear()
- # logger.info("All caches cleared")
- ################## EDITING PROMPT OF ABOVE VERSION ONLY #################
- import json
- import hashlib
- import logging
- import time
- from functools import wraps
- from typing import Dict, List, Optional, Tuple
- import requests
- from django.conf import settings
- from .llm_load_balancer import call_llm_with_load_balancer
- from .cache_config import (
- is_caching_enabled,
- ENABLE_ATTRIBUTE_EXTRACTION_CACHE,
- ATTRIBUTE_CACHE_MAX_SIZE,
- )
- logger = logging.getLogger(__name__)
- # --------------------------------------------------------------------------- #
- # CACHES
- # --------------------------------------------------------------------------- #
- class SimpleCache:
- _cache = {}
- _max_size = ATTRIBUTE_CACHE_MAX_SIZE
- @classmethod
- def get(cls, key: str) -> Optional[Dict]:
- if not ENABLE_ATTRIBUTE_EXTRACTION_CACHE: return None
- return cls._cache.get(key)
- @classmethod
- def set(cls, key: str, value: Dict):
- if not ENABLE_ATTRIBUTE_EXTRACTION_CACHE: return
- if len(cls._cache) >= cls._max_size:
- items = list(cls._cache.items())
- cls._cache = dict(items[int(cls._max_size * 0.2):])
- cls._cache[key] = value
- @classmethod
- def clear(cls): cls._cache.clear()
- @classmethod
- def get_stats(cls) -> Dict:
- return {
- "enabled": ENABLE_ATTRIBUTE_EXTRACTION_CACHE,
- "size": len(cls._cache),
- "max_size": cls._max_size,
- "usage_percent": round(len(cls._cache)/cls._max_size*100, 2) if cls._max_size else 0
- }
- # --------------------------------------------------------------------------- #
- # RETRY DECORATOR
- # --------------------------------------------------------------------------- #
- def retry(max_attempts=3, delay=1.0):
- def decorator(f):
- @wraps(f)
- def wrapper(*args, **kwargs):
- last_exc = None
- for i in range(max_attempts):
- try:
- return f(*args, **kwargs)
- except Exception as e:
- last_exc = e
- if i < max_attempts - 1:
- wait = delay * (2 ** i)
- logger.warning(f"Retry {i+1}/{max_attempts} after {wait}s: {e}")
- time.sleep(wait)
- raise last_exc or RuntimeError("Retry failed")
- return wrapper
- return decorator
- # --------------------------------------------------------------------------- #
- # MAIN SERVICE
- # --------------------------------------------------------------------------- #
- class ProductAttributeService:
- @staticmethod
- def combine_product_text(title=None, short_desc=None, long_desc=None, ocr_text=None) -> Tuple[str, Dict[str, str]]:
- parts = []
- source_map = {}
- if title:
- t = str(title).strip()
- parts.append(f"Title: {t}")
- source_map["title"] = t
- if short_desc:
- s = str(short_desc).strip()
- parts.append(f"Description: {s}")
- source_map["short_desc"] = s
- if long_desc:
- l = str(long_desc).strip()
- parts.append(f"Details: {l}")
- source_map["long_desc"] = l
- if ocr_text:
- parts.append(f"OCR Text: {ocr_text}")
- source_map["ocr_text"] = ocr_text
- combined = "\n".join(parts).strip()
- return (combined or "No product information", source_map)
- @staticmethod
- def _cache_key(product_text: str, mandatory_attrs: Dict, extract_additional: bool, multiple: List[str], user_values: Dict = None) -> str:
- payload = {
- "text": product_text,
- "attrs": mandatory_attrs,
- "extra": extract_additional,
- "multiple": sorted(multiple),
- "user_values": user_values or {}
- }
- return f"attr_{hashlib.md5(json.dumps(payload, sort_keys=True).encode()).hexdigest()}"
- # @staticmethod
- # def _clean_json(text: str) -> str:
- # start = text.find("{")
- # end = text.rfind("}") + 1
- # if start != -1 and end > start:
- # text = text[start:end]
- # if "```json" in text:
- # text = text.split("```json", 1)[1].split("```", 1)[0]
- # elif "```" in text:
- # text = text.split("```", 1)[1].split("```", 1)[0]
- # if text.lstrip().startswith("json"): text = text[4:]
- # return text.strip()
-
- @staticmethod
- def _clean_json(text: str) -> str:
- text = text.strip()
- # Extract JSON block if wrapped in ```json or ```
- if "```json" in text:
- text = text.split("```json", 1)[1].split("```", 1)[0]
- elif "```" in text:
- parts = text.split("```", 2)
- if len(parts) > 1:
- text = parts[1]
- # Find first { and last }
- start = text.find("{")
- end = text.rfind("}") + 1
- if start == -1 or end <= start:
- raise ValueError("No JSON object found in LLM response")
- text = text[start:end]
- return text.strip()
- @staticmethod
- def format_visual_attributes(visual_attributes: Dict) -> Dict:
- formatted = {}
- for key, value in visual_attributes.items():
- if isinstance(value, list):
- formatted[key] = [{"value": str(item), "source": "image"} for item in value]
- elif isinstance(value, dict):
- nested = {}
- for sub_key, sub_val in value.items():
- if isinstance(sub_val, list):
- nested[sub_key] = [{"value": str(v), "source": "image"} for v in sub_val]
- else:
- nested[sub_key] = [{"value": str(sub_val), "source": "image"}]
- formatted[key] = nested
- else:
- formatted[key] = [{"value": str(value), "source": "image"}]
- return formatted
- # @staticmethod
- # @retry(max_attempts=3, delay=1.0)
- # def _call_llm(payload: dict) -> str:
- # headers = {"Authorization": f"Bearer {settings.GROQ_API_KEY}", "Content-Type": "application/json"}
- # resp = requests.post(settings.GROQ_API_URL, headers=headers, json=payload, timeout=30)
- # resp.raise_for_status()
- # return resp.json()["choices"][0]["message"]["content"]
- # At the top of services.py, add this import
- # from . import call_llm_with_load_balancer, get_load_balancer_stats
- # Replace the existing _call_llm method with this:
- @staticmethod
- @retry(max_attempts=3, delay=3.0)
- def _call_llm(payload: dict) -> str:
- """
- Call LLM using load balancer with multiple API keys
- Automatically handles rate limiting and failover
- """
- return call_llm_with_load_balancer(payload)
- @staticmethod
- def extract_attributes(
- product_text: str,
- mandatory_attrs: Dict[str, List[str]],
- source_map: Dict[str, str] = None,
- model: str = None,
- extract_additional: bool = True,
- multiple: Optional[List[str]] = None,
- use_cache: Optional[bool] = None,
- user_entered_values: Optional[Dict[str, str]] = None, # NEW PARAMETER
- ) -> dict:
- if model is None: model = settings.SUPPORTED_MODELS[0]
- if multiple is None: multiple = []
- if source_map is None: source_map = {}
- if user_entered_values is None: user_entered_values = {}
- if use_cache is None: use_cache = ENABLE_ATTRIBUTE_EXTRACTION_CACHE
- if not is_caching_enabled(): use_cache = False
- cache_key = None
- if use_cache:
- cache_key = ProductAttributeService._cache_key(
- product_text, mandatory_attrs, extract_additional, multiple, user_entered_values
- )
- cached = SimpleCache.get(cache_key)
- if cached:
- logger.info(f"CACHE HIT {cache_key[:16]}...")
- return cached
- # --------------------------- BUILD USER VALUES SECTION ---------------------------
- user_values_section = ""
- if user_entered_values:
- user_lines = []
- for attr, value in user_entered_values.items():
- user_lines.append(f" - {attr}: {value}")
- user_values_section = f"""
- USER MANUALLY ENTERED VALUES:
- {chr(10).join(user_lines)}
- IMPORTANT INSTRUCTIONS FOR USER VALUES:
- 1. Choose the BEST value (could be user's value, or from allowed list, or inferred)
- 2. Always provide a "reason" field explaining your decision. Your reason should be valid and from the product text. Not always exact word to be matched from the product text, you can infer understanding the product text.
- 3. DO NOT hallucinate - be honest if user's value seems wrong based on product evidence
- 4. If user's value is not in the allowed list but seems correct, chose the most nearest value from the allowed list with proper reasoning why it chose it. Also in this case give the most apt value that should be added in the possible list in the reason so that user can edit it later.
- """
- # --------------------------- PROMPT ---------------------------
- allowed_lines = [f"{attr}: {', '.join(vals)}" for attr, vals in mandatory_attrs.items()]
- allowed_text = "\n".join(allowed_lines)
- print(f"allowed text is {allowed_text}")
-
- allowed_sources = list(source_map.keys()) + ["title", "description", "inferred"]
- print(f"allowed_sources is {allowed_sources}")
- source_hint = "|".join(allowed_sources)
- multiple_text = f"\nMULTIPLE ALLOWED FOR: {', '.join(multiple)}" if multiple else ""
- if extract_additional:
- additional_instructions = """
- For the 'additional' section, identify any other important product attributes and their values (e.g., 'Color', 'Material', 'Weight' etc according to the product text) that are present in the PRODUCT TEXT but not in the Mandatory Attribute list.
- For each additional attribute, use the best available value from the PRODUCT TEXT and specify the 'source'.
- Strictly Extract other key attributes other than mandatory attributes from the text.
- """
- output_example_additional = """
- "additional": {
- "Additional_Attr_1": [{
- "value": "Value 1",
- "source": "<{source_hint}>",
- "reason": "Why this attribute and value were identified"
- }]
- }
- """
- else:
- additional_instructions = """
- Do not identify or include any additional attributes. The 'additional' section must be an empty object {}.
- """
- output_example_additional = ' "additional": {}'
- prompt = f"""
- You are an expert product attribute extraction system. Your task is to classify products into predefined categories with 100% accuracy.
- CORE RULES - FOLLOW STRICTLY:
- ***CRITICAL T-SHIRT RULE – ALWAYS APPLY FIRST***
- For ANY generic short-sleeve or long-sleeve T-shirt that is pulled over the head
- (no zipper, no buttons, no special silhouette mentioned) you **MUST** classify
- style_clothing_top as "Pullover". This overrides everything else for basic tees.
- Examples: crew neck tee, v-neck tee, graphic tee, plain tee → ALWAYS Pullover.
-
- 1. You MUST ONLY use values from the ALLOWED VALUES lists provided below
- 2. NEVER invent, create, or infer values that don't exist in the allowed lists
- 3. NEVER use synonyms, variations, or similar words - ONLY exact matches from the lists
- 4. Each value you return MUST be an EXACT COPY from the allowed list (case-sensitive)
- 5. If you cannot find a perfect match, choose the CLOSEST semantic match from the allowed list
- 6. When unsure, default to the most generic option from the allowed list
- 7. CRITICAL GUARDRAIL: NEVER use a value allowed for one attribute (e.g., T-Shirts from t_shirt_type) as the value for a different attribute (e.g., style_clothing_top). The lists are entirely separate.
- 8. NEVER invent, create, or infer values that don't exist in the allowed lists.
- 9. Special Instruction for T-Shirts: If the product text describes a generic T-shirt (Crew Neck, V-Neck, etc.) which is a basic shirt worn by pulling it over the head, you MUST use the value Pullover. This is the most appropriate generic style available in this specific list.
- =====================
- ATTRIBUTE DEFINITIONS + ALLOWED VALUES
- =====================
- 1️⃣ **style_clothing_top** (Mandatory)
- Defines the overall silhouette or construction of the top.
- Allowed values + definitions:
- - Bandeau: Strapless, tight top covering just bust.
- - Blouse: Loose, dressy top, usually with sleeves.
- - Camisole: Thin-strapped sleeveless lightweight top.
- - Chemise: Straight, loose unshaped top.
- - Cocoon: Rounded, oversized body silhouette.
- - Corset: Structured top shaping waist with boning.
- - Crop: Short top exposing midriff above waist.
- - Cutout: Top with intentionally open fabric areas.
- - Duster: Long open-front flowy layer.
- - Flounce: Top featuring decorative ruffles.
- - Full Zip: Zipper opening entire length front.
- - Guide: Top with reference markers or functional guides.
- - Half Zip: Zipper halfway down front.
- - High-Low: Back hem longer than front.
- - Hoodie: Top with a hood attached.
- - Muscle: Sleeveless with wide armholes, athletic.
- - Peasant: Loose boho style with gathered neckline/sleeves.
- - Peplum: Fitted top with flared waist ruffle.
- - Pullover: Worn by pulling over head; no front opening.
- - Quarter Zip: Short zipper from collar ~¼ length down.
- - Raglan: Sleeves extend to collar with diagonal seams.
- - Ringer: Contrast colored sleeve cuffs + neckline band.
- - Rugby: Thick striped or solid collared sport-style top.
- - Smocked: Gathered elastic shirring for stretch texture.
- - Swing: A-line flare from bust downward.
- - Torsette: Corset-like, bust exposed for layering.
- - Tube: Strapless elongated top; longer than bandeau.
- - Zip-Up: Top with zipper closure (partial/full).
- 2️⃣ **shirt_neck_style** (Mandatory)
- Describes the neckline’s shape and construction.
- Allowed values + definitions:
- - Boat Neck: Wide neckline shoulder-to-shoulder.
- - Caged Neck: Multiple straps forming cage-like design.
- - Choker Neck: Tight high neck like choker band.
- - Collared: Fold-over collar shirt/polo style.
- - Cowl Neck: Draped soft neckline folds.
- - Crew Neck: Close-fitting round classic neckline.
- - Deep V-Neck: Deep V shape below chest level.
- - Drape Neck: Softly draped neckline less than cowl.
- - Funnel Neck: Short stand-up collar not folded.
- - Halter: Straps around neck leaving shoulders bare.
- - Henley: Round neck with button placket.
- - High Neck: More neck coverage, no fold.
- - Hooded: Neck includes a hood.
- - Jewel Neck: High round neck at base of throat.
- - Keyhole Neck: Slit/hole opening at neckline.
- - Lace Neckline: Lace material used around neckline.
- - Mock Neck: Short raised collar, not folded.
- - Notch Neck: Small V cut in round neckline.
- - Open: General wide/open neckline shape.
- - Plunge: Very deep V/U revealing cleavage.
- - Roll Neck: Loose rolled turtleneck style.
- - Round Neck: Standard circular neckline.
- - Round Neckline: Same as Round Neck.
- - Scoop Neck: Wide deep U-shaped neckline.
- - Scrunch Neck: Gathered/scrunched fabric at neckline.
- - Slit Neck: Small vertical slit opening at front.
- - Square Neckline: Straight edged square neck.
- - Sweetheart: Heart-shaped neckline contour.
- - Tie Neck: Ties or bow at neckline.
- - Turtleneck: High folded collar covering full neck.
- - V-Neck: V-shaped neckline (not deep).
- - Wide Neck: Broad neckline toward shoulders.
- - Zip Mock Neck: Mock neck with zipper.
- 3️⃣ **t_shirt_type** (Mandatory)
- Defines the category/design purpose of the T-shirt.
- Allowed values + definitions:
- - Babydoll T-Shirt: Feminine fitted shirt with flared hem.
- - Classic T-Shirt: Standard basic crew tee.
- - Graphic Tees: T-shirts featuring printed graphics.
- - Pocket Tee: T-shirt having a chest pocket.
- - T-Shirts: General type when no specific style is clear.
- - Tank Tops: Sleeveless shirts with shoulder straps.
- =====================
- MANDATORY ATTRIBUTE RULES
- =====================
- MULTI-VALUE ATTRIBUTES (Return exactly 2 most relevant values ONLY):
- {multiple_text}
- SINGLE-VALUE ATTRIBUTES (Return exactly 1 value for all other attributes)
- =====================
- INPUT PRODUCT DETAILS
- =====================
- PRODUCT INFORMATION TO ANALYZE:
- {product_text}
- {additional_instructions}
- =====================
- EXTRACTION STRATEGY
- =====================
- Step 1: Read product text carefully
- Step 2: Identify strong style/type/neck indicators
- Step 3: Choose closest exact match from allowed list
- Step 4: VALIDATE chosen values exist in allowed list
- =====================
- FORBIDDEN ACTIONS
- =====================
- :x: NEVER invent values
- :x: NEVER modify spelling/capitalization
- :x: NEVER use non-allowed synonyms
- :x: NEVER return attributes outside allowed lists
- :x: NEVER use descriptive words like “short sleeve”, “women top”, “graphic print” unless EXACT in list
- =====================
- OUTPUT FORMAT
- =====================
- Return ONLY this JSON structure — no markdown, no explanation:
- {{
- "mandatory": {{
- "style_clothing_top": [{{
- "value": "<exact value from allowed list>",
- "source": "{source_hint}",
- "reason": "Explain specific mapping from product text"
- }}],
- "shirt_neck_style": [{{
- "value": "<exact value from allowed list>",
- "source": "{source_hint}",
- "reason": "Explain specific mapping from product text"
- }}],
- "t_shirt_type": [{{
- "value": "<exact value from allowed list>",
- "source": "{source_hint}",
- "reason": "Explain specific mapping from product text"
- }}]
- }},
- {output_example_additional}
- }}
- =====================
- FINAL VALIDATION BEFORE RESPONDING
- =====================
- ✓ All values EXACT from allowed list
- ✓ Multi-value attributes have exactly 2 values
- ✓ Single-value attributes have exactly 1 value
- ✓ “source” must be one of: {source_hint}
- ✓ Reasons clearly explain mapping
- ✓ Pure JSON — no markdown wrapper
- """
- print(f"Prompt to the llm is: {prompt}")
- payload = {
- "model": model,
- "messages": [
- {"role": "system", "content": "You are a JSON-only extractor and validator. Always provide clear reasoning for your decisions."},
- {"role": "user", "content": prompt},
- ],
- "temperature": 0.0,
- "max_tokens": 2000, # Increased for reasoning
- }
- try:
- raw = ProductAttributeService._call_llm(payload)
- logger.info("Raw LLM response received")
- print(raw)
- cleaned = ProductAttributeService._clean_json(raw)
- parsed = json.loads(cleaned)
- except Exception as exc:
- logger.error(f"LLM failed: {exc}")
- return {
- "mandatory": {
- a: [{
- "value": "Not Specified",
- "source": "llm_error",
- "reason": f"LLM processing failed: {str(exc)}"
- }] for a in mandatory_attrs
- },
- "additional": {} if not extract_additional else {},
- "error": str(exc)
- }
- if use_cache and cache_key:
- SimpleCache.set(cache_key, parsed)
- logger.info(f"CACHE SET {cache_key[:16]}...")
- return parsed
- @staticmethod
- def get_cache_stats() -> Dict:
- return {
- "global_enabled": is_caching_enabled(),
- "result_cache": SimpleCache.get_stats(),
- }
- @staticmethod
- def clear_all_caches():
- SimpleCache.clear()
- logger.info("All caches cleared")
- # IMPORTANT INSTRUCTIONS FOR USER VALUES:
- # 1. Compare the user-entered value with what you find in the product text
- # 2. Evaluate if the user value is correct, partially correct, or incorrect for this product
- # 3. Choose the BEST value (could be user's value, or from allowed list, or inferred)
- # 4. Always provide a "reason" field explaining your decision
- # 5. DO NOT hallucinate - be honest if user's value seems wrong based on product evidence
- # 6. If user's value is not in the allowed list but seems correct, chose the most nearest value from the allowed list with proper reasoning.
|