- # # # ==================== services.py (LEGACY - superseded by the optimized version below) ====================
- # # import requests
- # # import json
- # # import re
- # # from typing import Dict, List, Optional, Tuple
- # # from django.conf import settings
- # # from concurrent.futures import ThreadPoolExecutor, as_completed
- # # from sentence_transformers import SentenceTransformer, util
- # # import numpy as np
- # # from .ocr_service import OCRService
- # # # Initialize embedding model for normalization
- # # model_embedder = SentenceTransformer("all-MiniLM-L6-v2")
- # # class ProductAttributeService:
- # # """Service class for extracting product attributes using Groq LLM."""
- # # @staticmethod
- # # def normalize_dimension_text(text: str) -> str:
- # # """
- # # Normalize dimension text to match format like '16x20', '20x30', etc.
- # # Handles formats like '16 x 20', '16x1.5x20', '16 x 1.5 x 20 Inches'
- # # Returns the normalized dimension (e.g., '16x20') or empty string if not found.
- # # """
- # # if not text:
- # # return ""
-
- # # # Convert to lowercase and remove common units
- # # text = text.lower()
- # # text = re.sub(r'\s*(inches|inch|in|cm|centimeters|mm|millimeters)\s*', '', text, flags=re.IGNORECASE)
-
- # # # Extract all numbers from the text
- # # numbers = re.findall(r'\d+\.?\d*', text)
-
- # # if not numbers:
- # # return ""
-
- # # # Convert to floats first to handle decimals properly
- # # float_numbers = []
- # # for num in numbers:
- # # try:
- # # float_numbers.append(float(num))
- # # except ValueError:  # skip tokens that cannot be parsed as numbers
- # # continue
-
- # # if len(float_numbers) < 2:
- # # return ""
-
- # # # If we have 3 dimensions, it's likely Width x Depth x Height
- # # # For wall art, depth is usually small (< 5), so we keep first and last
- # # if len(float_numbers) == 3:
- # # # Keep first and last values (width and height), skip middle (depth)
- # # float_numbers = [float_numbers[0], float_numbers[2]]
- # # elif len(float_numbers) > 3:
- # # # If more than 3 dimensions, keep the two largest
- # # float_numbers = sorted(float_numbers)[-2:]
- # # else:
- # # # Just 2 dimensions, use as is
- # # float_numbers = float_numbers[:2]
-
- # # # Format numbers: use integer if whole, else one decimal
- # # formatted_numbers = []
- # # for num in float_numbers:
- # # if num.is_integer():
- # # formatted_numbers.append(str(int(num)))
- # # else:
- # # formatted_numbers.append(f"{num:.1f}")
-
- # # # Sort to ensure consistent order (smaller x larger)
- # # formatted_numbers.sort(key=lambda x: float(x))
-
- # # # Return formatted dimension
- # # return f"{formatted_numbers[0]}x{formatted_numbers[1]}"
-
-
- # # @staticmethod
- # # def normalize_value_for_matching(value: str, attr_name: str = "") -> str:
- # # """
- # # Normalize a value based on its attribute type for better matching.
- # # Currently handles dimensions specially, can be extended for other attributes.
- # # """
- # # # Check if this is a dimension-related attribute
- # # dimension_keywords = ['dimension', 'size', 'measurement']
- # # if any(keyword in attr_name.lower() for keyword in dimension_keywords):
- # # normalized = ProductAttributeService.normalize_dimension_text(value)
- # # if normalized:
- # # return normalized
-
- # # # For other attributes, just return cleaned value
- # # return value.strip()
- # # @staticmethod
- # # def combine_product_text(
- # # title: Optional[str] = None,
- # # short_desc: Optional[str] = None,
- # # long_desc: Optional[str] = None,
- # # ocr_text: Optional[str] = None
- # # ) -> Tuple[str, Dict[str, str]]:
- # # """
- # # Combine product metadata into a single text block.
- # # Returns: (combined_text, source_map) where source_map tracks which text came from where
- # # """
- # # parts = []
- # # source_map = {}
-
- # # if title:
- # # title_str = str(title).strip()
- # # parts.append(f"Title: {title_str}")
- # # source_map['title'] = title_str
- # # if short_desc:
- # # short_str = str(short_desc).strip()
- # # parts.append(f"Description: {short_str}")
- # # source_map['short_desc'] = short_str
- # # if long_desc:
- # # long_str = str(long_desc).strip()
- # # parts.append(f"Details: {long_str}")
- # # source_map['long_desc'] = long_str
- # # if ocr_text:
- # # parts.append(f"OCR Text: {ocr_text}")
- # # source_map['ocr_text'] = ocr_text
-
- # # combined = "\n".join(parts).strip()
-
- # # if not combined:
- # # return "No product information available", {}
-
- # # return combined, source_map
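- # # # Illustrative usage (added for clarity): the combined block plus the source map used for tracking.
- # # #   >>> text, sources = ProductAttributeService.combine_product_text(
- # # #   ...     title="Sunset Canvas Print", short_desc="16x20 wall art")
- # # #   >>> print(text)
- # # #   Title: Sunset Canvas Print
- # # #   Description: 16x20 wall art
- # # #   >>> sources
- # # #   {'title': 'Sunset Canvas Print', 'short_desc': '16x20 wall art'}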
- # # @staticmethod
- # # def find_value_source(value: str, source_map: Dict[str, str], attr_name: str = "") -> str:
- # # """
- # # Find which source(s) contain the given value.
- # # Returns the source name(s) where the value appears.
- # # Now handles normalized matching for dimensions.
- # # """
- # # value_lower = value.lower()
- # # # Split value into tokens for better matching
- # # value_tokens = set(value_lower.replace("-", " ").replace("x", " ").split())
-
- # # # Check if this is a dimension-related attribute
- # # is_dimension_attr = any(keyword in attr_name.lower() for keyword in ['dimension', 'size', 'measurement'])
-
- # # sources_found = []
- # # source_scores = {}
-
- # # for source_name, source_text in source_map.items():
- # # source_lower = source_text.lower()
-
- # # # Check for exact phrase match first
- # # if value_lower in source_lower:
- # # source_scores[source_name] = 1.0
- # # continue
-
- # # # For dimensions, check normalized match
- # # if is_dimension_attr:
- # # # Normalize the value (e.g., "16x20" stays "16x20")
- # # normalized_value = ProductAttributeService.normalize_dimension_text(value)
- # # if not normalized_value:
- # # normalized_value = value.replace("x", " ").strip()
-
- # # # Normalize the source text to extract dimensions
- # # normalized_source = ProductAttributeService.normalize_dimension_text(source_text)
-
- # # # Direct match
- # # if normalized_value == normalized_source:
- # # source_scores[source_name] = 0.95
- # # continue
-
- # # # Also check if the dimension numbers appear in the source
- # # # Extract dimension parts (e.g., "16x20" -> ["16", "20"])
- # # dim_parts = normalized_value.split("x") if "x" in normalized_value else []
- # # if len(dim_parts) == 2:
- # # # Check if both numbers appear in the source
- # # if all(part in source_text for part in dim_parts):
- # # source_scores[source_name] = 0.85
- # # continue
-
- # # # Check for token matches
- # # token_matches = sum(1 for token in value_tokens if token and token in source_lower)
- # # if token_matches > 0 and len(value_tokens) > 0:
- # # source_scores[source_name] = token_matches / len(value_tokens)
-
- # # # Return source with highest score, or all sources if multiple have same score
- # # if source_scores:
- # # max_score = max(source_scores.values())
- # # sources_found = [s for s, score in source_scores.items() if score == max_score]
-
- # # # Prioritize: title > short_desc > long_desc > ocr_text
- # # priority = ['title', 'short_desc', 'long_desc', 'ocr_text']
- # # for p in priority:
- # # if p in sources_found:
- # # return p
-
- # # return sources_found[0] if sources_found else "Not found"
-
- # # return "Not found"
- # # @staticmethod
- # # def format_visual_attributes(visual_attributes: Dict) -> Dict:
- # # """
- # # Convert visual attributes to array format with source tracking.
- # # Source is always 'image' for visual attributes.
- # # """
- # # formatted = {}
-
- # # for key, value in visual_attributes.items():
- # # if isinstance(value, list):
- # # # Already a list (like color_palette)
- # # formatted[key] = [{"value": str(item), "source": "image"} for item in value]
- # # elif isinstance(value, dict):
- # # # Nested dictionary - format recursively
- # # nested_formatted = {}
- # # for nested_key, nested_value in value.items():
- # # if isinstance(nested_value, list):
- # # nested_formatted[nested_key] = [{"value": str(item), "source": "image"} for item in nested_value]
- # # else:
- # # nested_formatted[nested_key] = [{"value": str(nested_value), "source": "image"}]
- # # formatted[key] = nested_formatted
- # # else:
- # # # Single value
- # # formatted[key] = [{"value": str(value), "source": "image"}]
-
- # # return formatted
- # # @staticmethod
- # # def extract_attributes_from_ocr(ocr_results: Dict, model: str = None) -> Dict:
- # # """Extract structured attributes from OCR text using LLM."""
- # # if model is None:
- # # model = settings.SUPPORTED_MODELS[0]
-
- # # detected_text = ocr_results.get('detected_text', [])
- # # if not detected_text:
- # # return {}
-
- # # # Format OCR text for prompt
- # # ocr_text = "\n".join([f"Text: {item['text']}, Confidence: {item['confidence']:.2f}"
- # # for item in detected_text])
-
- # # prompt = f"""
- # # You are an AI model that extracts structured attributes from OCR text detected on product images.
- # # Given the OCR detections below, infer the possible product attributes and return them as a clean JSON object.
- # # OCR Text:
- # # {ocr_text}
- # # Extract relevant attributes like:
- # # - brand
- # # - model_number
- # # - size (waist_size, length, etc.)
- # # - collection
- # # - any other relevant product information
- # # Return a JSON object with only the attributes you can confidently identify.
- # # If an attribute is not present, do not include it in the response.
- # # """
-
- # # payload = {
- # # "model": model,
- # # "messages": [
- # # {
- # # "role": "system",
- # # "content": "You are a helpful AI that extracts structured data from OCR output. Return only valid JSON."
- # # },
- # # {"role": "user", "content": prompt}
- # # ],
- # # "temperature": 0.2,
- # # "max_tokens": 500
- # # }
-
- # # headers = {
- # # "Authorization": f"Bearer {settings.GROQ_API_KEY}",
- # # "Content-Type": "application/json",
- # # }
-
- # # try:
- # # response = requests.post(
- # # settings.GROQ_API_URL,
- # # headers=headers,
- # # json=payload,
- # # timeout=30
- # # )
- # # response.raise_for_status()
- # # result_text = response.json()["choices"][0]["message"]["content"].strip()
-
- # # # Clean and parse JSON
- # # result_text = ProductAttributeService._clean_json_response(result_text)
- # # parsed = json.loads(result_text)
-
- # # # Convert to array format with source tracking
- # # formatted_attributes = {}
- # # for key, value in parsed.items():
- # # if key == "error":
- # # continue
-
- # # # Handle nested dictionaries (like size)
- # # if isinstance(value, dict):
- # # nested_formatted = {}
- # # for nested_key, nested_value in value.items():
- # # nested_formatted[nested_key] = [{"value": str(nested_value), "source": "image"}]
- # # formatted_attributes[key] = nested_formatted
- # # elif isinstance(value, list):
- # # # Already a list, convert each item
- # # formatted_attributes[key] = [{"value": str(item), "source": "image"} for item in value]
- # # else:
- # # # Single value
- # # formatted_attributes[key] = [{"value": str(value), "source": "image"}]
-
- # # return formatted_attributes
- # # except Exception as e:
- # # return {"error": f"Failed to extract attributes from OCR: {str(e)}"}
- # # @staticmethod
- # # def calculate_attribute_relationships(
- # # mandatory_attrs: Dict[str, List[str]],
- # # product_text: str
- # # ) -> Dict[str, float]:
- # # """
- # # Calculate semantic relationships between attribute values across different attributes.
- # # Returns a matrix of cross-attribute value similarities.
- # # """
- # # pt_emb = model_embedder.encode(product_text, convert_to_tensor=True)
- # # # Calculate similarities between all attribute values and product text
- # # attr_scores = {}
- # # for attr, values in mandatory_attrs.items():
- # # attr_scores[attr] = {}
- # # for val in values:
- # # contexts = [val, f"for {val}", f"use in {val}", f"suitable for {val}"]
- # # ctx_embs = [model_embedder.encode(c, convert_to_tensor=True) for c in contexts]
- # # sem_sim = max(float(util.cos_sim(pt_emb, ce).item()) for ce in ctx_embs)
- # # attr_scores[attr][val] = sem_sim
- # # # Calculate cross-attribute value relationships
- # # relationships = {}
- # # attr_list = list(mandatory_attrs.keys())
- # # for i, attr1 in enumerate(attr_list):
- # # for attr2 in attr_list[i+1:]:
- # # # Calculate pairwise similarities between values of different attributes
- # # for val1 in mandatory_attrs[attr1]:
- # # for val2 in mandatory_attrs[attr2]:
- # # emb1 = model_embedder.encode(val1, convert_to_tensor=True)
- # # emb2 = model_embedder.encode(val2, convert_to_tensor=True)
- # # sim = float(util.cos_sim(emb1, emb2).item())
- # # # Store bidirectional relationships
- # # key1 = f"{attr1}:{val1}->{attr2}:{val2}"
- # # key2 = f"{attr2}:{val2}->{attr1}:{val1}"
- # # relationships[key1] = sim
- # # relationships[key2] = sim
- # # return relationships
- # # @staticmethod
- # # def calculate_value_clusters(
- # # values: List[str],
- # # scores: List[Tuple[str, float]],
- # # cluster_threshold: float = 0.4
- # # ) -> List[List[str]]:
- # # """
- # # Group values into semantic clusters based on their similarity to each other.
- # # Returns clusters of related values.
- # # """
- # # if len(values) <= 1:
- # # return [[val] for val, _ in scores]
- # # # Get embeddings for all values
- # # embeddings = [model_embedder.encode(val, convert_to_tensor=True) for val in values]
- # # # Calculate pairwise similarities
- # # similarity_matrix = np.zeros((len(values), len(values)))
- # # for i in range(len(values)):
- # # for j in range(i+1, len(values)):
- # # sim = float(util.cos_sim(embeddings[i], embeddings[j]).item())
- # # similarity_matrix[i][j] = sim
- # # similarity_matrix[j][i] = sim
- # # # Simple clustering: group values with high similarity
- # # clusters = []
- # # visited = set()
- # # for i, (val, score) in enumerate(scores):
- # # if i in visited:
- # # continue
- # # cluster = [val]
- # # visited.add(i)
- # # # Find similar values
- # # for j in range(len(values)):
- # # if j not in visited and similarity_matrix[i][j] >= cluster_threshold:
- # # cluster.append(values[j])
- # # visited.add(j)
- # # clusters.append(cluster)
- # # return clusters
- # # @staticmethod
- # # def get_dynamic_threshold(
- # # attr: str,
- # # val: str,
- # # base_score: float,
- # # extracted_attrs: Dict[str, List[Dict[str, str]]],
- # # relationships: Dict[str, float],
- # # mandatory_attrs: Dict[str, List[str]],
- # # base_threshold: float = 0.65,
- # # boost_factor: float = 0.15
- # # ) -> float:
- # # """
- # # Calculate dynamic threshold based on relationships with already-extracted attributes.
- # # """
- # # threshold = base_threshold
- # # # Check relationships with already extracted attributes
- # # max_relationship = 0.0
- # # for other_attr, other_values_list in extracted_attrs.items():
- # # if other_attr == attr:
- # # continue
- # # for other_val_dict in other_values_list:
- # # other_val = other_val_dict['value']
- # # key = f"{attr}:{val}->{other_attr}:{other_val}"
- # # if key in relationships:
- # # max_relationship = max(max_relationship, relationships[key])
- # # # If strong relationship exists, lower threshold
- # # if max_relationship > 0.6:
- # # threshold = base_threshold - (boost_factor * max_relationship)
- # # return max(0.3, threshold)
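- # # # Worked example (added for clarity): with the defaults base_threshold=0.65 and
- # # # boost_factor=0.15, a strongest cross-attribute relationship of 0.8 (> 0.6) lowers the
- # # # threshold to 0.65 - 0.15 * 0.8 = 0.53; relationships of 0.6 or below keep it at 0.65.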
- # # @staticmethod
- # # def get_adaptive_margin(
- # # scores: List[Tuple[str, float]],
- # # base_margin: float = 0.15,
- # # max_margin: float = 0.22
- # # ) -> float:
- # # """
- # # Calculate adaptive margin based on score distribution.
- # # """
- # # if len(scores) < 2:
- # # return base_margin
- # # score_values = [s for _, s in scores]
- # # best_score = score_values[0]
- # # # If best score is very low, use adaptive margin but be more conservative
- # # if best_score < 0.5:
- # # # Calculate score spread in top 3-4 scores only (more selective)
- # # top_scores = score_values[:min(4, len(score_values))]
- # # score_range = max(top_scores) - min(top_scores)
- # # # Very controlled margin increase
- # # if score_range < 0.30:
- # # # Much more conservative scaling
- # # score_factor = (0.5 - best_score) * 0.35
- # # adaptive = base_margin + score_factor + (0.30 - score_range) * 0.2
- # # return min(adaptive, max_margin)
- # # return base_margin
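- # # # Worked example (added for clarity): with scores [0.42, 0.40, 0.35, 0.33] and the
- # # # defaults base_margin=0.15, max_margin=0.22, the best score 0.42 is below 0.5 and the
- # # # top-4 spread is 0.42 - 0.33 = 0.09 < 0.30, so the margin becomes
- # # # 0.15 + (0.5 - 0.42) * 0.35 + (0.30 - 0.09) * 0.2 = 0.22, capped at max_margin.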
- # # @staticmethod
- # # def _lexical_evidence(product_text: str, label: str) -> float:
- # # """Calculate lexical overlap between product text and label."""
- # # pt = product_text.lower()
- # # tokens = [t for t in label.lower().replace("-", " ").split() if t]
- # # if not tokens:
- # # return 0.0
- # # hits = sum(1 for t in tokens if t in pt)
- # # return hits / len(tokens)
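- # # # Illustrative example (added for clarity): overlap is counted per token, not per phrase.
- # # #   >>> ProductAttributeService._lexical_evidence("Perfect for your living space", "Living Room")
- # # #   0.5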
- # # @staticmethod
- # # def normalize_against_product_text(
- # # product_text: str,
- # # mandatory_attrs: Dict[str, List[str]],
- # # source_map: Dict[str, str],
- # # threshold_abs: float = 0.65,
- # # margin: float = 0.15,
- # # allow_multiple: bool = False,
- # # sem_weight: float = 0.8,
- # # lex_weight: float = 0.2,
- # # extracted_attrs: Optional[Dict[str, List[Dict[str, str]]]] = None,
- # # relationships: Optional[Dict[str, float]] = None,
- # # use_dynamic_thresholds: bool = True,
- # # use_adaptive_margin: bool = True,
- # # use_semantic_clustering: bool = True
- # # ) -> dict:
- # # """
- # # Score each allowed value against the product_text with dynamic thresholds.
- # # Returns dict with values in array format: [{"value": "...", "source": "..."}]
- # # """
- # # if extracted_attrs is None:
- # # extracted_attrs = {}
- # # if relationships is None:
- # # relationships = {}
- # # pt_emb = model_embedder.encode(product_text, convert_to_tensor=True)
- # # extracted = {}
- # # for attr, allowed_values in mandatory_attrs.items():
- # # scores: List[Tuple[str, float]] = []
-
- # # # Check if this is a dimension attribute
- # # is_dimension_attr = any(keyword in attr.lower() for keyword in ['dimension', 'size', 'measurement'])
-
- # # # Normalize product text once for dimension matching
- # # normalized_product_text = ProductAttributeService.normalize_dimension_text(product_text) if is_dimension_attr else ""
- # # for val in allowed_values:
- # # # For dimension attributes, try exact normalized matching first
- # # if is_dimension_attr:
- # # # Normalize the allowed value from the list
- # # normalized_val = ProductAttributeService.normalize_dimension_text(val)
-
- # # # If we have both normalized values and they match exactly, give highest score
- # # if normalized_val and normalized_product_text and normalized_val == normalized_product_text:
- # # scores.append((val, 1.0))
- # # continue
-
- # # # Also check if the normalized value appears in the original product text
- # # # This handles cases where the format might be slightly different
- # # if normalized_val:
- # # # Extract just the numbers for flexible matching
- # # val_numbers = normalized_val.split('x')
- # # # Check if both numbers appear in the product text in close proximity
- # # text_lower = product_text.lower()
- # # if all(num in text_lower for num in val_numbers):
- # # # Calculate proximity score
- # # idx1 = text_lower.find(val_numbers[0])
- # # idx2 = text_lower.find(val_numbers[1])
- # # if idx1 != -1 and idx2 != -1:
- # # distance = abs(idx2 - idx1)
- # # # If numbers are close together (within 20 characters), high score
- # # if distance < 20:
- # # scores.append((val, 0.95))
- # # continue
-
- # # # Standard semantic matching for all attributes
- # # contexts = [val, f"for {val}", f"use in {val}", f"suitable for {val}", f"{val} room"]
- # # ctx_embs = [model_embedder.encode(c, convert_to_tensor=True) for c in contexts]
- # # sem_sim = max(float(util.cos_sim(pt_emb, ce).item()) for ce in ctx_embs)
- # # lex_score = ProductAttributeService._lexical_evidence(product_text, val)
- # # final_score = sem_weight * sem_sim + lex_weight * lex_score
- # # scores.append((val, final_score))
- # # scores.sort(key=lambda x: x[1], reverse=True)
- # # best_val, best_score = scores[0]
- # # # Calculate adaptive margin if enabled
- # # effective_margin = margin
- # # if allow_multiple and use_adaptive_margin:
- # # effective_margin = ProductAttributeService.get_adaptive_margin(scores, margin)
- # # # Special handling for dimension attributes with exact matches
- # # # If we have a very high score (0.90+), it means we found an exact/normalized match
- # # # In this case, don't apply multiple selection logic - just return the best match
- # # if is_dimension_attr and best_score >= 0.90:
- # # source = ProductAttributeService.find_value_source(best_val, source_map, attr)
- # # extracted[attr] = [{"value": best_val, "source": source}]
- # # continue
- # # if not allow_multiple:
- # # source = ProductAttributeService.find_value_source(best_val, source_map, attr)
- # # extracted[attr] = [{"value": best_val, "source": source}]
- # # else:
- # # candidates = [best_val]
- # # use_base_threshold = best_score >= threshold_abs
- # # # Get semantic clusters if enabled
- # # clusters = []
- # # if use_semantic_clustering:
- # # clusters = ProductAttributeService.calculate_value_clusters(
- # # allowed_values, scores, cluster_threshold=0.4
- # # )
- # # best_cluster = next((c for c in clusters if best_val in c), [best_val])
- # # for val, sc in scores[1:]:
- # # # Skip values with very low scores
- # # min_score = 0.4 if is_dimension_attr else 0.3
- # # if sc < min_score:
- # # continue
-
- # # # Calculate dynamic threshold for this value
- # # if use_dynamic_thresholds and extracted_attrs:
- # # dynamic_thresh = ProductAttributeService.get_dynamic_threshold(
- # # attr, val, sc, extracted_attrs, relationships,
- # # mandatory_attrs, threshold_abs
- # # )
- # # else:
- # # dynamic_thresh = threshold_abs
- # # within_margin = (best_score - sc) <= effective_margin
- # # above_threshold = sc >= dynamic_thresh
- # # # Check if in same semantic cluster as best value
- # # in_cluster = False
- # # if use_semantic_clustering and clusters:
- # # in_cluster = any(best_val in c and val in c for c in clusters)
- # # if use_base_threshold:
- # # # Best score is good, require threshold OR (cluster + margin)
- # # if above_threshold and within_margin:
- # # candidates.append(val)
- # # elif in_cluster and within_margin:
- # # candidates.append(val)
- # # else:
- # # # Best score is low, use margin OR cluster logic
- # # if within_margin:
- # # candidates.append(val)
- # # elif in_cluster and (best_score - sc) <= effective_margin * 2.0:
- # # # Extended margin for cluster members
- # # candidates.append(val)
- # # # Map each candidate to its source and create array format
- # # extracted[attr] = []
- # # for candidate in candidates:
- # # source = ProductAttributeService.find_value_source(candidate, source_map, attr)
- # # extracted[attr].append({"value": candidate, "source": source})
- # # return extracted
- # # @staticmethod
- # # def extract_attributes(
- # # product_text: str,
- # # mandatory_attrs: Dict[str, List[str]],
- # # source_map: Dict[str, str] = None,
- # # model: str = None,
- # # extract_additional: bool = True,
- # # multiple: Optional[List[str]] = None,
- # # threshold_abs: float = 0.65,
- # # margin: float = 0.15,
- # # use_dynamic_thresholds: bool = True,
- # # use_adaptive_margin: bool = True,
- # # use_semantic_clustering: bool = True
- # # ) -> dict:
- # # """
- # # Use Groq LLM to extract attributes from any product type with enhanced multi-value selection.
- # # Now returns values in array format: [{"value": "...", "source": "..."}]
- # # """
-
- # # if model is None:
- # # model = settings.SUPPORTED_MODELS[0]
- # # if multiple is None:
- # # multiple = []
- # # if source_map is None:
- # # source_map = {}
- # # # Check if product text is empty or minimal
- # # if not product_text or product_text == "No product information available":
- # # return ProductAttributeService._create_error_response(
- # # "No product information provided",
- # # mandatory_attrs,
- # # extract_additional
- # # )
- # # # Create structured prompt for mandatory attributes
- # # mandatory_attr_list = []
- # # for attr_name, allowed_values in mandatory_attrs.items():
- # # mandatory_attr_list.append(f"{attr_name}: {', '.join(allowed_values)}")
- # # mandatory_attr_text = "\n".join(mandatory_attr_list)
- # # additional_instruction = ""
- # # if extract_additional:
- # # additional_instruction = """
- # # 2. Extract ADDITIONAL attributes: Identify any other relevant attributes from the product text
- # # that are NOT in the mandatory list. Only include attributes where you can find actual values
- # # in the product text. Do NOT include attributes with "Not Specified" or empty values.
-
- # # Examples of attributes to look for (only if present): Brand, Material, Size, Color, Dimensions,
- # # Weight, Features, Style, Theme, Pattern, Finish, Care Instructions, etc."""
- # # output_format = {
- # # "mandatory": {attr: "value or list of values" for attr in mandatory_attrs.keys()},
- # # }
- # # if extract_additional:
- # # output_format["additional"] = {
- # # "example_attribute_1": "actual value found",
- # # "example_attribute_2": "actual value found"
- # # }
- # # output_format["additional"]["_note"] = "Only include attributes with actual values found in text"
- # # prompt = f"""
- # # You are an intelligent product attribute extractor that works with ANY product type.
- # # TASK:
- # # 1. Extract MANDATORY attributes: For each mandatory attribute, select the most appropriate value(s)
- # # from the provided list. Choose the value(s) that best match the product description.
- # # {additional_instruction}
- # # Product Text:
- # # {product_text}
- # # Mandatory Attribute Lists (MUST select from these allowed values):
- # # {mandatory_attr_text}
- # # CRITICAL INSTRUCTIONS:
- # # - Return ONLY valid JSON, nothing else
- # # - No explanations, no markdown, no text before or after the JSON
- # # - For mandatory attributes, choose the value(s) from the provided list that best match
- # # - If a mandatory attribute cannot be determined from the product text, use "Not Specified"
- # # - Prefer exact matches from the allowed values list over generic synonyms
- # # - If multiple values are plausible, you MAY return more than one
- # # {f"- For additional attributes: ONLY include attributes where you found actual values in the product text. DO NOT include attributes with 'Not Specified', 'None', 'N/A', or empty values. If you cannot find a value for an attribute, simply don't include that attribute." if extract_additional else ""}
- # # - Be precise and only extract information that is explicitly stated or clearly implied
- # # Required Output Format:
- # # {json.dumps(output_format, indent=2)}
- # # """
- # # payload = {
- # # "model": model,
- # # "messages": [
- # # {
- # # "role": "system",
- # # "content": f"You are a precise attribute extraction model. Return ONLY valid JSON with {'mandatory and additional' if extract_additional else 'mandatory'} sections. No explanations, no markdown, no other text."
- # # },
- # # {"role": "user", "content": prompt}
- # # ],
- # # "temperature": 0.0,
- # # "max_tokens": 1500
- # # }
- # # headers = {
- # # "Authorization": f"Bearer {settings.GROQ_API_KEY}",
- # # "Content-Type": "application/json",
- # # }
- # # try:
- # # response = requests.post(
- # # settings.GROQ_API_URL,
- # # headers=headers,
- # # json=payload,
- # # timeout=30
- # # )
- # # response.raise_for_status()
- # # result_text = response.json()["choices"][0]["message"]["content"].strip()
- # # # Clean the response
- # # result_text = ProductAttributeService._clean_json_response(result_text)
- # # # Parse JSON
- # # parsed = json.loads(result_text)
- # # # Validate and restructure with source tracking
- # # parsed = ProductAttributeService._validate_response_structure(
- # # parsed, mandatory_attrs, extract_additional, source_map
- # # )
- # # # Clean up and add source tracking to additional attributes in array format
- # # if extract_additional and "additional" in parsed:
- # # cleaned_additional = {}
- # # for k, v in parsed["additional"].items():
- # # if v and v not in ["Not Specified", "None", "N/A", "", "not specified", "none", "n/a"]:
- # # if not (isinstance(v, str) and v.lower() in ["not specified", "none", "n/a", ""]):
- # # # Convert to array format if not already
- # # if isinstance(v, list):
- # # cleaned_additional[k] = []
- # # for item in v:
- # # if isinstance(item, dict) and "value" in item:
- # # if "source" not in item:
- # # item["source"] = ProductAttributeService.find_value_source(
- # # item["value"], source_map, k
- # # )
- # # cleaned_additional[k].append(item)
- # # else:
- # # source = ProductAttributeService.find_value_source(str(item), source_map, k)
- # # cleaned_additional[k].append({"value": str(item), "source": source})
- # # else:
- # # source = ProductAttributeService.find_value_source(str(v), source_map, k)
- # # cleaned_additional[k] = [{"value": str(v), "source": source}]
- # # parsed["additional"] = cleaned_additional
- # # # Calculate attribute relationships if using dynamic thresholds
- # # relationships = {}
- # # if use_dynamic_thresholds:
- # # relationships = ProductAttributeService.calculate_attribute_relationships(
- # # mandatory_attrs, product_text
- # # )
- # # # Process attributes in order, allowing earlier ones to influence later ones
- # # extracted_so_far = {}
- # # for attr in mandatory_attrs.keys():
- # # allow_multiple = attr in multiple
- # # result = ProductAttributeService.normalize_against_product_text(
- # # product_text=product_text,
- # # mandatory_attrs={attr: mandatory_attrs[attr]},
- # # source_map=source_map,
- # # threshold_abs=threshold_abs,
- # # margin=margin,
- # # allow_multiple=allow_multiple,
- # # extracted_attrs=extracted_so_far,
- # # relationships=relationships,
- # # use_dynamic_thresholds=use_dynamic_thresholds,
- # # use_adaptive_margin=use_adaptive_margin,
- # # use_semantic_clustering=use_semantic_clustering
- # # )
- # # # Result is already in array format from normalize_against_product_text
- # # parsed["mandatory"][attr] = result[attr]
- # # extracted_so_far[attr] = result[attr]
- # # return parsed
- # # except requests.exceptions.RequestException as e:
- # # return ProductAttributeService._create_error_response(
- # # str(e), mandatory_attrs, extract_additional
- # # )
- # # except json.JSONDecodeError as e:
- # # return ProductAttributeService._create_error_response(
- # # f"Invalid JSON: {str(e)}", mandatory_attrs, extract_additional, result_text
- # # )
- # # except Exception as e:
- # # return ProductAttributeService._create_error_response(
- # # str(e), mandatory_attrs, extract_additional
- # # )
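- # # # Minimal usage sketch (added for illustration; the attribute names, allowed values,
- # # # and exact output depend on the configured Groq model and are hypothetical):
- # # #   >>> text, sources = ProductAttributeService.combine_product_text(
- # # #   ...     title="Sunset Canvas Print 16x20",
- # # #   ...     short_desc="Perfect for the living room")
- # # #   >>> result = ProductAttributeService.extract_attributes(
- # # #   ...     product_text=text,
- # # #   ...     mandatory_attrs={"Room Type": ["Living Room", "Bedroom"],
- # # #   ...                      "Dimensions": ["16x20", "20x30"]},
- # # #   ...     source_map=sources,
- # # #   ...     multiple=["Room Type"])
- # # #   >>> # result["mandatory"] then holds entries like
- # # #   >>> # {"Dimensions": [{"value": "16x20", "source": "title"}], ...}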
- # # @staticmethod
- # # def extract_attributes_batch(
- # # products: List[Dict],
- # # mandatory_attrs: Dict[str, List[str]],
- # # model: str = None,
- # # extract_additional: bool = True,
- # # process_image: bool = True,
- # # max_workers: int = 5,
- # # multiple: Optional[List[str]] = None,
- # # threshold_abs: float = 0.65,
- # # margin: float = 0.15,
- # # use_dynamic_thresholds: bool = True,
- # # use_adaptive_margin: bool = True,
- # # use_semantic_clustering: bool = True
- # # ) -> Dict:
- # # """Extract attributes for multiple products in parallel with enhanced multi-value selection and source tracking."""
- # # results = []
- # # successful = 0
- # # failed = 0
-
- # # ocr_service = OCRService()
- # # if multiple is None:
- # # multiple = []
- # # def process_product(product_data):
- # # """Process a single product."""
- # # product_id = product_data.get('product_id', f"product_{len(results)}")
-
- # # try:
- # # # Process image if URL is provided
- # # ocr_results = None
- # # ocr_text = None
-
- # # if process_image and product_data.get('image_url'):
- # # ocr_results = ocr_service.process_image(product_data['image_url'])
-
- # # # Extract attributes from OCR
- # # if ocr_results and ocr_results.get('detected_text'):
- # # ocr_attrs = ProductAttributeService.extract_attributes_from_ocr(
- # # ocr_results, model
- # # )
- # # ocr_results['extracted_attributes'] = ocr_attrs
-
- # # # Format OCR text for combining with product text
- # # ocr_text = "\n".join([
- # # f"{item['text']} (confidence: {item['confidence']:.2f})"
- # # for item in ocr_results['detected_text']
- # # ])
-
- # # # Combine all product information with source tracking
- # # product_text, source_map = ProductAttributeService.combine_product_text(
- # # title=product_data.get('title'),
- # # short_desc=product_data.get('short_desc'),
- # # long_desc=product_data.get('long_desc'),
- # # ocr_text=ocr_text
- # # )
-
- # # # Extract attributes from combined text with enhanced features
- # # result = ProductAttributeService.extract_attributes(
- # # product_text=product_text,
- # # mandatory_attrs=mandatory_attrs,
- # # source_map=source_map,
- # # model=model,
- # # extract_additional=extract_additional,
- # # multiple=multiple,
- # # threshold_abs=threshold_abs,
- # # margin=margin,
- # # use_dynamic_thresholds=use_dynamic_thresholds,
- # # use_adaptive_margin=use_adaptive_margin,
- # # use_semantic_clustering=use_semantic_clustering
- # # )
-
- # # result['product_id'] = product_id
-
- # # # Add OCR results if available (already in correct format)
- # # if ocr_results:
- # # result['ocr_results'] = ocr_results
-
- # # # Check if extraction was successful
- # # if 'error' not in result:
- # # return result, True
- # # else:
- # # return result, False
-
- # # except Exception as e:
- # # return {
- # # 'product_id': product_id,
- # # 'mandatory': {attr: [{"value": "Not Specified", "source": "error"}] for attr in mandatory_attrs.keys()},
- # # 'additional': {} if extract_additional else None,
- # # 'error': f"Processing error: {str(e)}"
- # # }, False
- # # # Process products in parallel
- # # with ThreadPoolExecutor(max_workers=max_workers) as executor:
- # # future_to_product = {
- # # executor.submit(process_product, product): product
- # # for product in products
- # # }
-
- # # for future in as_completed(future_to_product):
- # # try:
- # # result, success = future.result()
- # # results.append(result)
- # # if success:
- # # successful += 1
- # # else:
- # # failed += 1
- # # except Exception as e:
- # # failed += 1
- # # results.append({
- # # 'product_id': 'unknown',
- # # 'mandatory': {attr: [{"value": "Not Specified", "source": "error"}] for attr in mandatory_attrs.keys()},
- # # 'additional': {} if extract_additional else None,
- # # 'error': f"Unexpected error: {str(e)}"
- # # })
- # # return {
- # # 'results': results,
- # # 'total_products': len(products),
- # # 'successful': successful,
- # # 'failed': failed
- # # }
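- # # # Minimal usage sketch (added for illustration; the product fields and image URL are hypothetical):
- # # #   >>> batch = ProductAttributeService.extract_attributes_batch(
- # # #   ...     products=[{"product_id": "sku-1",
- # # #   ...                "title": "Sunset Canvas Print 16x20",
- # # #   ...                "image_url": "https://example.com/sku-1.jpg"}],
- # # #   ...     mandatory_attrs={"Dimensions": ["16x20", "20x30"]},
- # # #   ...     max_workers=2)
- # # #   >>> sorted(batch.keys())
- # # #   ['failed', 'results', 'successful', 'total_products']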
- # # @staticmethod
- # # def _clean_json_response(text: str) -> str:
- # # """Clean LLM response to extract valid JSON."""
- # # # Strip markdown code fences first so prose after the fence cannot confuse the brace search
- # # if "```json" in text:
- # # text = text.split("```json")[1].split("```")[0].strip()
- # # elif "```" in text:
- # # text = text.split("```")[1].split("```")[0].strip()
- # # if text.startswith("json"):
- # # text = text[4:].strip()
- # # # Then keep only the outermost JSON object
- # # start_idx = text.find('{')
- # # end_idx = text.rfind('}')
- # # if start_idx != -1 and end_idx != -1:
- # # text = text[start_idx:end_idx + 1]
- # # return text
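- # # # Illustrative example (added for clarity): a fenced LLM reply reduces to bare JSON.
- # # #   >>> ProductAttributeService._clean_json_response('```json\n{"brand": "Acme"}\n```')
- # # #   '{"brand": "Acme"}'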
- # # @staticmethod
- # # def _validate_response_structure(
- # # parsed: dict,
- # # mandatory_attrs: Dict[str, List[str]],
- # # extract_additional: bool,
- # # source_map: Dict[str, str] = None
- # # ) -> dict:
- # # """Validate and fix the response structure, ensuring array format with source tracking."""
- # # if source_map is None:
- # # source_map = {}
-
- # # expected_sections = ["mandatory"]
- # # if extract_additional:
- # # expected_sections.append("additional")
- # # if not all(section in parsed for section in expected_sections):
- # # if isinstance(parsed, dict):
- # # mandatory_keys = set(mandatory_attrs.keys())
- # # mandatory = {k: v for k, v in parsed.items() if k in mandatory_keys}
- # # additional = {k: v for k, v in parsed.items() if k not in mandatory_keys}
- # # result = {"mandatory": mandatory}
- # # if extract_additional:
- # # result["additional"] = additional
- # # parsed = result
- # # else:
- # # return ProductAttributeService._create_error_response(
- # # "Invalid response structure",
- # # mandatory_attrs,
- # # extract_additional,
- # # str(parsed)
- # # )
- # # # Convert mandatory attributes to array format with source tracking
- # # if "mandatory" in parsed:
- # # converted_mandatory = {}
- # # for attr, value in parsed["mandatory"].items():
- # # if isinstance(value, list):
- # # # Already in array format, ensure each item has source
- # # converted_mandatory[attr] = []
- # # for item in value:
- # # if isinstance(item, dict) and "value" in item:
- # # # Already has proper structure
- # # if "source" not in item:
- # # item["source"] = ProductAttributeService.find_value_source(
- # # item["value"], source_map, attr
- # # )
- # # converted_mandatory[attr].append(item)
- # # else:
- # # # Convert string to proper format
- # # source = ProductAttributeService.find_value_source(str(item), source_map, attr)
- # # converted_mandatory[attr].append({"value": str(item), "source": source})
- # # else:
- # # # Single value - convert to array format
- # # source = ProductAttributeService.find_value_source(str(value), source_map, attr)
- # # converted_mandatory[attr] = [{"value": str(value), "source": source}]
-
- # # parsed["mandatory"] = converted_mandatory
- # # return parsed
- # # @staticmethod
- # # def _create_error_response(
- # # error: str,
- # # mandatory_attrs: Dict[str, List[str]],
- # # extract_additional: bool,
- # # raw_output: Optional[str] = None
- # # ) -> dict:
- # # """Create a standardized error response in array format."""
- # # response = {
- # # "mandatory": {attr: [{"value": "Not Specified", "source": "error"}] for attr in mandatory_attrs.keys()},
- # # "error": error
- # # }
- # # if extract_additional:
- # # response["additional"] = {}
- # # if raw_output:
- # # response["raw_output"] = raw_output
- # # return response
- # # ==================== services.py (OPTIMIZED) ====================
- # import requests
- # import json
- # import re
- # import hashlib
- # import logging
- # from typing import Dict, List, Optional, Tuple
- # from django.conf import settings
- # from concurrent.futures import ThreadPoolExecutor, as_completed
- # from sentence_transformers import SentenceTransformer, util
- # import numpy as np
- # from .ocr_service import OCRService
- # logger = logging.getLogger(__name__)
- # # Initialize embedding model for normalization (SINGLETON)
- # model_embedder = SentenceTransformer("all-MiniLM-L6-v2")
- # # ==================== CACHING CLASSES ====================
- # class SimpleCache:
- # """
- # In-memory cache for attribute extraction results.
- # No Redis required - uses Python dict with automatic size management.
- # """
- # _cache = {}
- # _max_size = 1000 # Maximum number of cached items
-
- # @classmethod
- # def get(cls, key: str) -> Optional[Dict]:
- # """Get cached value by key"""
- # return cls._cache.get(key)
-
- # @classmethod
- # def set(cls, key: str, value: Dict):
- # """Set cache value with automatic LRU cleanup"""
- # # Simple LRU: clear oldest 20% if cache is full
- # if len(cls._cache) >= cls._max_size:
- # items = list(cls._cache.items())
- # # Keep newest 80%
- # cls._cache = dict(items[int(cls._max_size * 0.2):])
- # logger.info(f"Cache cleaned: kept {len(cls._cache)} items")
-
- # cls._cache[key] = value
-
- # @classmethod
- # def clear(cls):
- # """Clear entire cache"""
- # cls._cache.clear()
- # logger.info("Cache cleared")
-
- # @classmethod
- # def get_stats(cls) -> Dict:
- # """Get cache statistics"""
- # return {
- # "size": len(cls._cache),
- # "max_size": cls._max_size,
- # "usage_percent": round(len(cls._cache) / cls._max_size * 100, 2)
- # }
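- # # Illustrative usage (added for clarity; the key is hypothetical and the stats assume an empty cache):
- # #   >>> SimpleCache.set("attr_abc123", {"mandatory": {}})
- # #   >>> SimpleCache.get("attr_abc123")
- # #   {'mandatory': {}}
- # #   >>> SimpleCache.get_stats()
- # #   {'size': 1, 'max_size': 1000, 'usage_percent': 0.1}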
- # class EmbeddingCache:
- # """
- # Cache for sentence transformer embeddings.
- # Significantly reduces embedding computation time.
- # """
- # _cache = {}
- # _max_size = 500
- # _hit_count = 0
- # _miss_count = 0
-
- # @classmethod
- # def get_embedding(cls, text: str, model):
- # """Get or compute embedding with caching"""
- # if text in cls._cache:
- # cls._hit_count += 1
- # return cls._cache[text]
-
- # # Cache miss - compute embedding
- # cls._miss_count += 1
-
- # # Auto-cleanup if cache is full
- # if len(cls._cache) >= cls._max_size:
- # items = list(cls._cache.items())
- # cls._cache = dict(items[int(cls._max_size * 0.3):])
- # logger.info(f"Embedding cache cleaned: kept {len(cls._cache)} items")
-
- # # Compute and cache
- # embedding = model.encode(text, convert_to_tensor=True)
- # cls._cache[text] = embedding
- # return embedding
-
- # @classmethod
- # def clear(cls):
- # """Clear embedding cache"""
- # cls._cache.clear()
- # cls._hit_count = 0
- # cls._miss_count = 0
- # logger.info("Embedding cache cleared")
-
- # @classmethod
- # def get_stats(cls) -> Dict:
- # """Get cache statistics"""
- # total = cls._hit_count + cls._miss_count
- # hit_rate = (cls._hit_count / total * 100) if total > 0 else 0
- # return {
- # "size": len(cls._cache),
- # "max_size": cls._max_size,
- # "hits": cls._hit_count,
- # "misses": cls._miss_count,
- # "hit_rate_percent": round(hit_rate, 2)
- # }
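- # # Illustrative usage (added for clarity; counts assume a freshly cleared cache): the second
- # # call for the same text is served from the cache instead of re-encoding.
- # #   >>> emb = EmbeddingCache.get_embedding("living room wall art", model_embedder)   # miss
- # #   >>> emb = EmbeddingCache.get_embedding("living room wall art", model_embedder)   # hit
- # #   >>> stats = EmbeddingCache.get_stats()
- # #   >>> (stats["hits"], stats["misses"])
- # #   (1, 1)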
- # # ==================== MAIN SERVICE CLASS ====================
- # class ProductAttributeService:
- # """Service class for extracting product attributes using Groq LLM."""
- # @staticmethod
- # def _generate_cache_key(product_text: str, mandatory_attrs: Dict) -> str:
- # """
- # Generate a unique cache key from product text and attributes.
- # Uses MD5 hash for consistent short keys.
- # """
- # # Sort attributes for consistent hashing
- # attrs_str = json.dumps(mandatory_attrs, sort_keys=True)
- # content = f"{product_text}:{attrs_str}"
- # return f"attr_{hashlib.md5(content.encode()).hexdigest()}"
- # @staticmethod
- # def normalize_dimension_text(text: str) -> str:
- # """
- # Normalize dimension text to match format like '16x20', '20x30', etc.
- # Handles formats like '16 x 20', '16x1.5x20', '16 x 1.5 x 20 Inches'
- # Returns the normalized dimension (e.g., '16x20') or empty string if not found.
- # """
- # if not text:
- # return ""
-
- # # Convert to lowercase and remove common units
- # text = text.lower()
- # text = re.sub(r'\s*(inches|inch|in|cm|centimeters|mm|millimeters)\s*', '', text, flags=re.IGNORECASE)
-
- # # Extract all numbers from the text
- # numbers = re.findall(r'\d+\.?\d*', text)
-
- # if not numbers:
- # return ""
-
- # # Convert to floats first to handle decimals properly
- # float_numbers = []
- # for num in numbers:
- # try:
- # float_numbers.append(float(num))
- # except ValueError:  # skip tokens that cannot be parsed as numbers
- # continue
-
- # if len(float_numbers) < 2:
- # return ""
-
- # # If we have 3 dimensions, it's likely Width x Depth x Height
- # # For wall art, depth is usually small (< 5), so we keep first and last
- # if len(float_numbers) == 3:
- # # Keep first and last values (width and height), skip middle (depth)
- # float_numbers = [float_numbers[0], float_numbers[2]]
- # elif len(float_numbers) > 3:
- # # If more than 3 dimensions, keep the two largest
- # float_numbers = sorted(float_numbers)[-2:]
- # else:
- # # Just 2 dimensions, use as is
- # float_numbers = float_numbers[:2]
-
- # # Format numbers: use integer if whole, else one decimal
- # formatted_numbers = []
- # for num in float_numbers:
- # if num.is_integer():
- # formatted_numbers.append(str(int(num)))
- # else:
- # formatted_numbers.append(f"{num:.1f}")
-
- # # Sort to ensure consistent order (smaller x larger)
- # formatted_numbers.sort(key=lambda x: float(x))
-
- # # Return formatted dimension
- # return f"{formatted_numbers[0]}x{formatted_numbers[1]}"
-
- # @staticmethod
- # def normalize_value_for_matching(value: str, attr_name: str = "") -> str:
- # """
- # Normalize a value based on its attribute type for better matching.
- # Currently handles dimensions specially, can be extended for other attributes.
- # """
- # # Check if this is a dimension-related attribute
- # dimension_keywords = ['dimension', 'size', 'measurement']
- # if any(keyword in attr_name.lower() for keyword in dimension_keywords):
- # normalized = ProductAttributeService.normalize_dimension_text(value)
- # if normalized:
- # return normalized
-
- # # For other attributes, just return cleaned value
- # return value.strip()
- # @staticmethod
- # def combine_product_text(
- # title: Optional[str] = None,
- # short_desc: Optional[str] = None,
- # long_desc: Optional[str] = None,
- # ocr_text: Optional[str] = None
- # ) -> Tuple[str, Dict[str, str]]:
- # """
- # Combine product metadata into a single text block.
- # Returns: (combined_text, source_map) where source_map tracks which text came from where
- # """
- # parts = []
- # source_map = {}
-
- # if title:
- # title_str = str(title).strip()
- # parts.append(f"Title: {title_str}")
- # source_map['title'] = title_str
- # if short_desc:
- # short_str = str(short_desc).strip()
- # parts.append(f"Description: {short_str}")
- # source_map['short_desc'] = short_str
- # if long_desc:
- # long_str = str(long_desc).strip()
- # parts.append(f"Details: {long_str}")
- # source_map['long_desc'] = long_str
- # if ocr_text:
- # parts.append(f"OCR Text: {ocr_text}")
- # source_map['ocr_text'] = ocr_text
-
- # combined = "\n".join(parts).strip()
-
- # if not combined:
- # return "No product information available", {}
-
- # return combined, source_map
- # @staticmethod
- # def find_value_source(value: str, source_map: Dict[str, str], attr_name: str = "") -> str:
- # """
- # Find which source(s) contain the given value.
- # Returns the source name(s) where the value appears.
- # Now handles normalized matching for dimensions.
- # """
- # value_lower = value.lower()
- # # Split value into tokens for better matching
- # value_tokens = set(value_lower.replace("-", " ").replace("x", " ").split())
-
- # # Check if this is a dimension-related attribute
- # is_dimension_attr = any(keyword in attr_name.lower() for keyword in ['dimension', 'size', 'measurement'])
-
- # sources_found = []
- # source_scores = {}
-
- # for source_name, source_text in source_map.items():
- # source_lower = source_text.lower()
-
- # # Check for exact phrase match first
- # if value_lower in source_lower:
- # source_scores[source_name] = 1.0
- # continue
-
- # # For dimensions, check normalized match
- # if is_dimension_attr:
- # # Normalize the value (e.g., "16x20" stays "16x20")
- # normalized_value = ProductAttributeService.normalize_dimension_text(value)
- # if not normalized_value:
- # normalized_value = value.replace("x", " ").strip()
-
- # # Normalize the source text to extract dimensions
- # normalized_source = ProductAttributeService.normalize_dimension_text(source_text)
-
- # # Direct match
- # if normalized_value == normalized_source:
- # source_scores[source_name] = 0.95
- # continue
-
- # # Also check if the dimension numbers appear in the source
- # # Extract dimension parts (e.g., "16x20" -> ["16", "20"])
- # dim_parts = normalized_value.split("x") if "x" in normalized_value else []
- # if len(dim_parts) == 2:
- # # Check if both numbers appear in the source
- # if all(part in source_text for part in dim_parts):
- # source_scores[source_name] = 0.85
- # continue
-
- # # Check for token matches
- # token_matches = sum(1 for token in value_tokens if token and token in source_lower)
- # if token_matches > 0 and len(value_tokens) > 0:
- # source_scores[source_name] = token_matches / len(value_tokens)
-
- # # Return source with highest score, or all sources if multiple have same score
- # if source_scores:
- # max_score = max(source_scores.values())
- # sources_found = [s for s, score in source_scores.items() if score == max_score]
-
- # # Prioritize: title > short_desc > long_desc > ocr_text
- # priority = ['title', 'short_desc', 'long_desc', 'ocr_text']
- # for p in priority:
- # if p in sources_found:
- # return p
-
- # return sources_found[0] if sources_found else "Not found"
-
- # return "Not found"
- # @staticmethod
- # def format_visual_attributes(visual_attributes: Dict) -> Dict:
- # """
- # Convert visual attributes to array format with source tracking.
- # Source is always 'image' for visual attributes.
- # """
- # formatted = {}
-
- # for key, value in visual_attributes.items():
- # if isinstance(value, list):
- # # Already a list (like color_palette)
- # formatted[key] = [{"value": str(item), "source": "image"} for item in value]
- # elif isinstance(value, dict):
- # # Nested dictionary - format recursively
- # nested_formatted = {}
- # for nested_key, nested_value in value.items():
- # if isinstance(nested_value, list):
- # nested_formatted[nested_key] = [{"value": str(item), "source": "image"} for item in nested_value]
- # else:
- # nested_formatted[nested_key] = [{"value": str(nested_value), "source": "image"}]
- # formatted[key] = nested_formatted
- # else:
- # # Single value
- # formatted[key] = [{"value": str(value), "source": "image"}]
-
- # return formatted
- # @staticmethod
- # def extract_attributes_from_ocr(ocr_results: Dict, model: str = None) -> Dict:
- # """Extract structured attributes from OCR text using LLM."""
- # if model is None:
- # model = settings.SUPPORTED_MODELS[0]
-
- # detected_text = ocr_results.get('detected_text', [])
- # if not detected_text:
- # return {}
-
- # # Format OCR text for prompt
- # ocr_text = "\n".join([f"Text: {item['text']}, Confidence: {item['confidence']:.2f}"
- # for item in detected_text])
-
- # prompt = f"""
- # You are an AI model that extracts structured attributes from OCR text detected on product images.
- # Given the OCR detections below, infer the possible product attributes and return them as a clean JSON object.
- # OCR Text:
- # {ocr_text}
- # Extract relevant attributes like:
- # - brand
- # - model_number
- # - size (waist_size, length, etc.)
- # - collection
- # - any other relevant product information
- # Return a JSON object with only the attributes you can confidently identify.
- # If an attribute is not present, do not include it in the response.
- # """
-
- # payload = {
- # "model": model,
- # "messages": [
- # {
- # "role": "system",
- # "content": "You are a helpful AI that extracts structured data from OCR output. Return only valid JSON."
- # },
- # {"role": "user", "content": prompt}
- # ],
- # "temperature": 0.2,
- # "max_tokens": 500
- # }
-
- # headers = {
- # "Authorization": f"Bearer {settings.GROQ_API_KEY}",
- # "Content-Type": "application/json",
- # }
-
- # try:
- # response = requests.post(
- # settings.GROQ_API_URL,
- # headers=headers,
- # json=payload,
- # timeout=30
- # )
- # response.raise_for_status()
- # result_text = response.json()["choices"][0]["message"]["content"].strip()
-
- # # Clean and parse JSON
- # result_text = ProductAttributeService._clean_json_response(result_text)
- # parsed = json.loads(result_text)
-
- # # Convert to array format with source tracking
- # formatted_attributes = {}
- # for key, value in parsed.items():
- # if key == "error":
- # continue
-
- # # Handle nested dictionaries (like size)
- # if isinstance(value, dict):
- # nested_formatted = {}
- # for nested_key, nested_value in value.items():
- # nested_formatted[nested_key] = [{"value": str(nested_value), "source": "image"}]
- # formatted_attributes[key] = nested_formatted
- # elif isinstance(value, list):
- # # Already a list, convert each item
- # formatted_attributes[key] = [{"value": str(item), "source": "image"} for item in value]
- # else:
- # # Single value
- # formatted_attributes[key] = [{"value": str(value), "source": "image"}]
-
- # return formatted_attributes
- # except Exception as e:
- # logger.error(f"OCR attribute extraction failed: {str(e)}")
- # return {"error": f"Failed to extract attributes from OCR: {str(e)}"}
- # @staticmethod
- # def calculate_attribute_relationships(
- # mandatory_attrs: Dict[str, List[str]],
- # product_text: str
- # ) -> Dict[str, float]:
- # """
- # Calculate semantic relationships between attribute values across different attributes.
- # Returns a matrix of cross-attribute value similarities.
- # """
- # # USE EMBEDDING CACHE
- # pt_emb = EmbeddingCache.get_embedding(product_text, model_embedder)
- # # Calculate similarities between all attribute values and product text
- # attr_scores = {}
- # for attr, values in mandatory_attrs.items():
- # attr_scores[attr] = {}
- # for val in values:
- # contexts = [val, f"for {val}", f"use in {val}", f"suitable for {val}"]
- # # USE EMBEDDING CACHE FOR CONTEXTS
- # ctx_embs = [EmbeddingCache.get_embedding(c, model_embedder) for c in contexts]
- # sem_sim = max(float(util.cos_sim(pt_emb, ce).item()) for ce in ctx_embs)
- # attr_scores[attr][val] = sem_sim
- # # Calculate cross-attribute value relationships
- # relationships = {}
- # attr_list = list(mandatory_attrs.keys())
- # for i, attr1 in enumerate(attr_list):
- # for attr2 in attr_list[i+1:]:
- # # Calculate pairwise similarities between values of different attributes
- # for val1 in mandatory_attrs[attr1]:
- # for val2 in mandatory_attrs[attr2]:
- # # USE EMBEDDING CACHE
- # emb1 = EmbeddingCache.get_embedding(val1, model_embedder)
- # emb2 = EmbeddingCache.get_embedding(val2, model_embedder)
- # sim = float(util.cos_sim(emb1, emb2).item())
- # # Store bidirectional relationships
- # key1 = f"{attr1}:{val1}->{attr2}:{val2}"
- # key2 = f"{attr2}:{val2}->{attr1}:{val1}"
- # relationships[key1] = sim
- # relationships[key2] = sim
- # return relationships
- # @staticmethod
- # def calculate_value_clusters(
- # values: List[str],
- # scores: List[Tuple[str, float]],
- # cluster_threshold: float = 0.4
- # ) -> List[List[str]]:
- # """
- # Group values into semantic clusters based on their similarity to each other.
- # Returns clusters of related values.
- # """
- # if len(values) <= 1:
- # return [[val] for val, _ in scores]
- # # Get embeddings for all values - USE CACHE
- # embeddings = [EmbeddingCache.get_embedding(val, model_embedder) for val in values]
- # # Calculate pairwise similarities
- # similarity_matrix = np.zeros((len(values), len(values)))
- # for i in range(len(values)):
- # for j in range(i+1, len(values)):
- # sim = float(util.cos_sim(embeddings[i], embeddings[j]).item())
- # similarity_matrix[i][j] = sim
- # similarity_matrix[j][i] = sim
- # # Simple clustering: group values with high similarity
- # clusters = []
- # visited = set()
- # for i, (val, score) in enumerate(scores):
- # if i in visited:
- # continue
- # cluster = [val]
- # visited.add(i)
- # # Find similar values
- # for j in range(len(values)):
- # if j not in visited and similarity_matrix[i][j] >= cluster_threshold:
- # cluster.append(values[j])
- # visited.add(j)
- # clusters.append(cluster)
- # return clusters
- # @staticmethod
- # def get_dynamic_threshold(
- # attr: str,
- # val: str,
- # base_score: float,
- # extracted_attrs: Dict[str, List[Dict[str, str]]],
- # relationships: Dict[str, float],
- # mandatory_attrs: Dict[str, List[str]],
- # base_threshold: float = 0.65,
- # boost_factor: float = 0.15
- # ) -> float:
- # """
- # Calculate dynamic threshold based on relationships with already-extracted attributes.
- # """
- # threshold = base_threshold
- # # Check relationships with already extracted attributes
- # max_relationship = 0.0
- # for other_attr, other_values_list in extracted_attrs.items():
- # if other_attr == attr:
- # continue
- # for other_val_dict in other_values_list:
- # other_val = other_val_dict['value']
- # key = f"{attr}:{val}->{other_attr}:{other_val}"
- # if key in relationships:
- # max_relationship = max(max_relationship, relationships[key])
- # # If strong relationship exists, lower threshold
- # if max_relationship > 0.6:
- # threshold = base_threshold - (boost_factor * max_relationship)
- # return max(0.3, threshold)
- # @staticmethod
- # def get_adaptive_margin(
- # scores: List[Tuple[str, float]],
- # base_margin: float = 0.15,
- # max_margin: float = 0.22
- # ) -> float:
- # """
- # Calculate adaptive margin based on score distribution.
- # """
- # if len(scores) < 2:
- # return base_margin
- # score_values = [s for _, s in scores]
- # best_score = score_values[0]
- # # If best score is very low, use adaptive margin but be more conservative
- # if best_score < 0.5:
- # # Calculate score spread in top 3-4 scores only (more selective)
- # top_scores = score_values[:min(4, len(score_values))]
- # score_range = max(top_scores) - min(top_scores)
- # # Very controlled margin increase
- # if score_range < 0.30:
- # # Much more conservative scaling
- # score_factor = (0.5 - best_score) * 0.35
- # adaptive = base_margin + score_factor + (0.30 - score_range) * 0.2
- # return min(adaptive, max_margin)
- # return base_margin
- # @staticmethod
- # def _lexical_evidence(product_text: str, label: str) -> float:
- # """Calculate lexical overlap between product text and label."""
- # pt = product_text.lower()
- # tokens = [t for t in label.lower().replace("-", " ").split() if t]
- # if not tokens:
- # return 0.0
- # hits = sum(1 for t in tokens if t in pt)
- # return hits / len(tokens)
- # @staticmethod
- # def normalize_against_product_text(
- # product_text: str,
- # mandatory_attrs: Dict[str, List[str]],
- # source_map: Dict[str, str],
- # threshold_abs: float = 0.65,
- # margin: float = 0.15,
- # allow_multiple: bool = False,
- # sem_weight: float = 0.8,
- # lex_weight: float = 0.2,
- # extracted_attrs: Optional[Dict[str, List[Dict[str, str]]]] = None,
- # relationships: Optional[Dict[str, float]] = None,
- # use_dynamic_thresholds: bool = True,
- # use_adaptive_margin: bool = True,
- # use_semantic_clustering: bool = True
- # ) -> dict:
- # """
- # Score each allowed value against the product_text with dynamic thresholds.
- # Returns dict with values in array format: [{"value": "...", "source": "..."}]
-
- # ⚡ OPTIMIZED: Uses EmbeddingCache for faster computation
- # """
- # if extracted_attrs is None:
- # extracted_attrs = {}
- # if relationships is None:
- # relationships = {}
- # # USE EMBEDDING CACHE - CRITICAL OPTIMIZATION
- # pt_emb = EmbeddingCache.get_embedding(product_text, model_embedder)
- # extracted = {}
- # for attr, allowed_values in mandatory_attrs.items():
- # scores: List[Tuple[str, float]] = []
-
- # # Check if this is a dimension attribute
- # is_dimension_attr = any(keyword in attr.lower() for keyword in ['dimension', 'size', 'measurement'])
-
- # # Normalize product text once for dimension matching
- # normalized_product_text = ProductAttributeService.normalize_dimension_text(product_text) if is_dimension_attr else ""
- # for val in allowed_values:
- # # For dimension attributes, try exact normalized matching first
- # if is_dimension_attr:
- # # Normalize the allowed value from the list
- # normalized_val = ProductAttributeService.normalize_dimension_text(val)
-
- # # If we have both normalized values and they match exactly, give highest score
- # if normalized_val and normalized_product_text and normalized_val == normalized_product_text:
- # scores.append((val, 1.0))
- # continue
-
- # # Also check if the normalized value appears in the original product text
- # if normalized_val:
- # val_numbers = normalized_val.split('x')
- # text_lower = product_text.lower()
- # if all(num in text_lower for num in val_numbers):
- # idx1 = text_lower.find(val_numbers[0])
- # idx2 = text_lower.find(val_numbers[1])
- # if idx1 != -1 and idx2 != -1:
- # distance = abs(idx2 - idx1)
- # if distance < 20:
- # scores.append((val, 0.95))
- # continue
-
- # # Standard semantic matching - USE EMBEDDING CACHE
- # contexts = [val, f"for {val}", f"use in {val}", f"suitable for {val}", f"{val} room"]
- # ctx_embs = [EmbeddingCache.get_embedding(c, model_embedder) for c in contexts]
- # sem_sim = max(float(util.cos_sim(pt_emb, ce).item()) for ce in ctx_embs)
- # lex_score = ProductAttributeService._lexical_evidence(product_text, val)
- # final_score = sem_weight * sem_sim + lex_weight * lex_score
- # scores.append((val, final_score))
- # scores.sort(key=lambda x: x[1], reverse=True)
- # best_val, best_score = scores[0]
- # # Calculate adaptive margin if enabled
- # effective_margin = margin
- # if allow_multiple and use_adaptive_margin:
- # effective_margin = ProductAttributeService.get_adaptive_margin(scores, margin)
- # # Special handling for dimension attributes with exact matches
- # if is_dimension_attr and best_score >= 0.90:
- # source = ProductAttributeService.find_value_source(best_val, source_map, attr)
- # extracted[attr] = [{"value": best_val, "source": source}]
- # continue
- # if not allow_multiple:
- # source = ProductAttributeService.find_value_source(best_val, source_map, attr)
- # extracted[attr] = [{"value": best_val, "source": source}]
- # else:
- # candidates = [best_val]
- # use_base_threshold = best_score >= threshold_abs
- # # Get semantic clusters if enabled
- # clusters = []
- # if use_semantic_clustering:
- # clusters = ProductAttributeService.calculate_value_clusters(
- # allowed_values, scores, cluster_threshold=0.4
- # )
- # best_cluster = next((c for c in clusters if best_val in c), [best_val])
- # for val, sc in scores[1:]:
- # min_score = 0.4 if is_dimension_attr else 0.3
- # if sc < min_score:
- # continue
-
- # if use_dynamic_thresholds and extracted_attrs:
- # dynamic_thresh = ProductAttributeService.get_dynamic_threshold(
- # attr, val, sc, extracted_attrs, relationships,
- # mandatory_attrs, threshold_abs
- # )
- # else:
- # dynamic_thresh = threshold_abs
- # within_margin = (best_score - sc) <= effective_margin
- # above_threshold = sc >= dynamic_thresh
- # in_cluster = False
- # if use_semantic_clustering and clusters:
- # in_cluster = any(best_val in c and val in c for c in clusters)
- # if use_base_threshold:
- # if above_threshold and within_margin:
- # candidates.append(val)
- # elif in_cluster and within_margin:
- # candidates.append(val)
- # else:
- # if within_margin:
- # candidates.append(val)
- # elif in_cluster and (best_score - sc) <= effective_margin * 2.0:
- # candidates.append(val)
- # extracted[attr] = []
- # for candidate in candidates:
- # source = ProductAttributeService.find_value_source(candidate, source_map, attr)
- # extracted[attr].append({"value": candidate, "source": source})
- # return extracted
- # @staticmethod
- # def extract_attributes(
- # product_text: str,
- # mandatory_attrs: Dict[str, List[str]],
- # source_map: Dict[str, str] = None,
- # model: str = None,
- # extract_additional: bool = True,
- # multiple: Optional[List[str]] = None,
- # threshold_abs: float = 0.65,
- # margin: float = 0.15,
- # use_dynamic_thresholds: bool = True,
- # use_adaptive_margin: bool = True,
- # use_semantic_clustering: bool = True,
- # use_cache: bool = True # ⚡ NEW: Enable/disable caching
- # ) -> dict:
- # """
- # Use Groq LLM to extract attributes from any product type with enhanced multi-value selection.
- # Now returns values in array format: [{"value": "...", "source": "..."}]
-
- # ⚡ OPTIMIZED: Added caching layer for faster repeated requests
- # """
-
- # if model is None:
- # model = settings.SUPPORTED_MODELS[0]
- # if multiple is None:
- # multiple = []
- # if source_map is None:
- # source_map = {}
- # # Check if product text is empty or minimal
- # if not product_text or product_text == "No product information available":
- # return ProductAttributeService._create_error_response(
- # "No product information provided",
- # mandatory_attrs,
- # extract_additional
- # )
- # # ⚡ CHECK CACHE FIRST
- # if use_cache:
- # cache_key = ProductAttributeService._generate_cache_key(product_text, mandatory_attrs)
- # cached_result = SimpleCache.get(cache_key)
- # if cached_result:
- # logger.info(f"✓ Cache hit - returning cached result")
- # return cached_result
- # # Create structured prompt for mandatory attributes
- # mandatory_attr_list = []
- # for attr_name, allowed_values in mandatory_attrs.items():
- # mandatory_attr_list.append(f"{attr_name}: {', '.join(allowed_values)}")
- # mandatory_attr_text = "\n".join(mandatory_attr_list)
- # additional_instruction = ""
- # if extract_additional:
- # additional_instruction = """
- # 2. Extract ADDITIONAL attributes: Identify any other relevant attributes from the product text
- # that are NOT in the mandatory list. Only include attributes where you can find actual values
- # in the product text. Do NOT include attributes with "Not Specified" or empty values.
-
- # Examples of attributes to look for (only if present): Brand, Material, Size, Color, Dimensions,
- # Weight, Features, Style, Theme, Pattern, Finish, Care Instructions, etc."""
- # output_format = {
- # "mandatory": {attr: "value or list of values" for attr in mandatory_attrs.keys()},
- # }
- # if extract_additional:
- # output_format["additional"] = {
- # "example_attribute_1": "actual value found",
- # "example_attribute_2": "actual value found"
- # }
- # output_format["additional"]["_note"] = "Only include attributes with actual values found in text"
- # prompt = f"""
- # You are an intelligent product attribute extractor that works with ANY product type.
- # TASK:
- # 1. Extract MANDATORY attributes: For each mandatory attribute, select the most appropriate value(s)
- # from the provided list. Choose the value(s) that best match the product description.
- # {additional_instruction}
- # Product Text:
- # {product_text}
- # Mandatory Attribute Lists (MUST select from these allowed values):
- # {mandatory_attr_text}
- # CRITICAL INSTRUCTIONS:
- # - Return ONLY valid JSON, nothing else
- # - No explanations, no markdown, no text before or after the JSON
- # - For mandatory attributes, choose the value(s) from the provided list that best match
- # - If a mandatory attribute cannot be determined from the product text, use "Not Specified"
- # - Prefer exact matches from the allowed values list over generic synonyms
- # - If multiple values are plausible, you MAY return more than one
- # {f"- For additional attributes: ONLY include attributes where you found actual values in the product text. DO NOT include attributes with 'Not Specified', 'None', 'N/A', or empty values. If you cannot find a value for an attribute, simply don't include that attribute." if extract_additional else ""}
- # - Be precise and only extract information that is explicitly stated or clearly implied
- # Required Output Format:
- # {json.dumps(output_format, indent=2)}
- # """
- # payload = {
- # "model": model,
- # "messages": [
- # {
- # "role": "system",
- # "content": f"You are a precise attribute extraction model. Return ONLY valid JSON with {'mandatory and additional' if extract_additional else 'mandatory'} sections. No explanations, no markdown, no other text."
- # },
- # {"role": "user", "content": prompt}
- # ],
- # "temperature": 0.0,
- # "max_tokens": 1500
- # }
- # headers = {
- # "Authorization": f"Bearer {settings.GROQ_API_KEY}",
- # "Content-Type": "application/json",
- # }
- # try:
- # response = requests.post(
- # settings.GROQ_API_URL,
- # headers=headers,
- # json=payload,
- # timeout=30
- # )
- # response.raise_for_status()
- # result_text = response.json()["choices"][0]["message"]["content"].strip()
- # # Clean the response
- # result_text = ProductAttributeService._clean_json_response(result_text)
- # # Parse JSON
- # parsed = json.loads(result_text)
- # # Validate and restructure with source tracking
- # parsed = ProductAttributeService._validate_response_structure(
- # parsed, mandatory_attrs, extract_additional, source_map
- # )
- # # Clean up and add source tracking to additional attributes in array format
- # if extract_additional and "additional" in parsed:
- # cleaned_additional = {}
- # for k, v in parsed["additional"].items():
- # if v and v not in ["Not Specified", "None", "N/A", "", "not specified", "none", "n/a"]:
- # if not (isinstance(v, str) and v.lower() in ["not specified", "none", "n/a", ""]):
- # # Convert to array format if not already
- # if isinstance(v, list):
- # cleaned_additional[k] = []
- # for item in v:
- # if isinstance(item, dict) and "value" in item:
- # if "source" not in item:
- # item["source"] = ProductAttributeService.find_value_source(
- # item["value"], source_map, k
- # )
- # cleaned_additional[k].append(item)
- # else:
- # source = ProductAttributeService.find_value_source(str(item), source_map, k)
- # cleaned_additional[k].append({"value": str(item), "source": source})
- # else:
- # source = ProductAttributeService.find_value_source(str(v), source_map, k)
- # cleaned_additional[k] = [{"value": str(v), "source": source}]
- # parsed["additional"] = cleaned_additional
- # # Calculate attribute relationships if using dynamic thresholds
- # relationships = {}
- # if use_dynamic_thresholds:
- # relationships = ProductAttributeService.calculate_attribute_relationships(
- # mandatory_attrs, product_text
- # )
- # # Process attributes in order, allowing earlier ones to influence later ones
- # extracted_so_far = {}
- # for attr in mandatory_attrs.keys():
- # allow_multiple = attr in multiple
- # result = ProductAttributeService.normalize_against_product_text(
- # product_text=product_text,
- # mandatory_attrs={attr: mandatory_attrs[attr]},
- # source_map=source_map,
- # threshold_abs=threshold_abs,
- # margin=margin,
- # allow_multiple=allow_multiple,
- # extracted_attrs=extracted_so_far,
- # relationships=relationships,
- # use_dynamic_thresholds=use_dynamic_thresholds,
- # use_adaptive_margin=use_adaptive_margin,
- # use_semantic_clustering=use_semantic_clustering
- # )
- # # Result is already in array format from normalize_against_product_text
- # parsed["mandatory"][attr] = result[attr]
- # extracted_so_far[attr] = result[attr]
- # # ⚡ CACHE THE RESULT
- # if use_cache:
- # SimpleCache.set(cache_key, parsed)
- # logger.info(f"✓ Cached extraction result")
- # return parsed
- # except requests.exceptions.RequestException as e:
- # logger.error(f"Request exception: {str(e)}")
- # return ProductAttributeService._create_error_response(
- # str(e), mandatory_attrs, extract_additional
- # )
- # except json.JSONDecodeError as e:
- # logger.error(f"JSON decode error: {str(e)}")
- # return ProductAttributeService._create_error_response(
- # f"Invalid JSON: {str(e)}", mandatory_attrs, extract_additional, result_text
- # )
- # except Exception as e:
- # logger.error(f"Unexpected error: {str(e)}")
- # return ProductAttributeService._create_error_response(
- # str(e), mandatory_attrs, extract_additional
- # )
- # @staticmethod
- # def extract_attributes_batch(
- # products: List[Dict],
- # mandatory_attrs: Dict[str, List[str]],
- # model: str = None,
- # extract_additional: bool = True,
- # process_image: bool = True,
- # max_workers: int = 5,
- # multiple: Optional[List[str]] = None,
- # threshold_abs: float = 0.65,
- # margin: float = 0.15,
- # use_dynamic_thresholds: bool = True,
- # use_adaptive_margin: bool = True,
- # use_semantic_clustering: bool = True,
- # use_cache: bool = True # ⚡ NEW: Enable caching for batch processing
- # ) -> Dict:
- # """
- # Extract attributes for multiple products in parallel with enhanced multi-value selection and source tracking.
- # ⚡ OPTIMIZED: Added caching support for batch operations
- # """
- # results = []
- # successful = 0
- # failed = 0
-
- # ocr_service = OCRService()
- # if multiple is None:
- # multiple = []
- # def process_product(product_data):
- # """Process a single product."""
- # product_id = product_data.get('product_id', f"product_{len(results)}")
-
- # try:
- # # Process image if URL is provided
- # ocr_results = None
- # ocr_text = None
-
- # if process_image and product_data.get('image_url'):
- # ocr_results = ocr_service.process_image(product_data['image_url'])
-
- # # Extract attributes from OCR
- # if ocr_results and ocr_results.get('detected_text'):
- # ocr_attrs = ProductAttributeService.extract_attributes_from_ocr(
- # ocr_results, model
- # )
- # ocr_results['extracted_attributes'] = ocr_attrs
-
- # # Format OCR text for combining with product text
- # ocr_text = "\n".join([
- # f"{item['text']} (confidence: {item['confidence']:.2f})"
- # for item in ocr_results['detected_text']
- # ])
-
- # # Combine all product information with source tracking
- # product_text, source_map = ProductAttributeService.combine_product_text(
- # title=product_data.get('title'),
- # short_desc=product_data.get('short_desc'),
- # long_desc=product_data.get('long_desc'),
- # ocr_text=ocr_text
- # )
-
- # # Extract attributes from combined text with enhanced features
- # result = ProductAttributeService.extract_attributes(
- # product_text=product_text,
- # mandatory_attrs=mandatory_attrs,
- # source_map=source_map,
- # model=model,
- # extract_additional=extract_additional,
- # multiple=multiple,
- # threshold_abs=threshold_abs,
- # margin=margin,
- # use_dynamic_thresholds=use_dynamic_thresholds,
- # use_adaptive_margin=use_adaptive_margin,
- # use_semantic_clustering=use_semantic_clustering,
- # use_cache=use_cache # ⚡ Pass cache flag
- # )
-
- # result['product_id'] = product_id
-
- # # Add OCR results if available (already in correct format)
- # if ocr_results:
- # result['ocr_results'] = ocr_results
-
- # # Check if extraction was successful
- # if 'error' not in result:
- # return result, True
- # else:
- # return result, False
-
- # except Exception as e:
- # logger.error(f"Error processing product {product_id}: {str(e)}")
- # return {
- # 'product_id': product_id,
- # 'mandatory': {attr: [{"value": "Not Specified", "source": "error"}] for attr in mandatory_attrs.keys()},
- # 'additional': {} if extract_additional else None,
- # 'error': f"Processing error: {str(e)}"
- # }, False
- # # Process products in parallel
- # with ThreadPoolExecutor(max_workers=max_workers) as executor:
- # future_to_product = {
- # executor.submit(process_product, product): product
- # for product in products
- # }
-
- # for future in as_completed(future_to_product):
- # try:
- # result, success = future.result()
- # results.append(result)
- # if success:
- # successful += 1
- # else:
- # failed += 1
- # except Exception as e:
- # logger.error(f"Future execution error: {str(e)}")
- # failed += 1
- # results.append({
- # 'product_id': 'unknown',
- # 'mandatory': {attr: [{"value": "Not Specified", "source": "error"}] for attr in mandatory_attrs.keys()},
- # 'additional': {} if extract_additional else None,
- # 'error': f"Unexpected error: {str(e)}"
- # })
- # return {
- # 'results': results,
- # 'total_products': len(products),
- # 'successful': successful,
- # 'failed': failed,
- # 'cache_stats': SimpleCache.get_stats(), # ⚡ Include cache statistics
- # 'embedding_cache_stats': EmbeddingCache.get_stats() # ⚡ Include embedding cache stats
- # }
- # @staticmethod
- # def _clean_json_response(text: str) -> str:
- # """Clean LLM response to extract valid JSON."""
- # start_idx = text.find('{')
- # end_idx = text.rfind('}')
- # if start_idx != -1 and end_idx != -1:
- # text = text[start_idx:end_idx + 1]
- # if "```json" in text:
- # text = text.split("```json")[1].split("```")[0].strip()
- # elif "```" in text:
- # text = text.split("```")[1].split("```")[0].strip()
- # if text.startswith("json"):
- # text = text[4:].strip()
- # return text
- # @staticmethod
- # def _validate_response_structure(
- # parsed: dict,
- # mandatory_attrs: Dict[str, List[str]],
- # extract_additional: bool,
- # source_map: Dict[str, str] = None
- # ) -> dict:
- # """Validate and fix the response structure, ensuring array format with source tracking."""
- # if source_map is None:
- # source_map = {}
-
- # expected_sections = ["mandatory"]
- # if extract_additional:
- # expected_sections.append("additional")
- # if not all(section in parsed for section in expected_sections):
- # if isinstance(parsed, dict):
- # mandatory_keys = set(mandatory_attrs.keys())
- # mandatory = {k: v for k, v in parsed.items() if k in mandatory_keys}
- # additional = {k: v for k, v in parsed.items() if k not in mandatory_keys}
- # result = {"mandatory": mandatory}
- # if extract_additional:
- # result["additional"] = additional
- # parsed = result
- # else:
- # return ProductAttributeService._create_error_response(
- # "Invalid response structure",
- # mandatory_attrs,
- # extract_additional,
- # str(parsed)
- # )
- # # Convert mandatory attributes to array format with source tracking
- # if "mandatory" in parsed:
- # converted_mandatory = {}
- # for attr, value in parsed["mandatory"].items():
- # if isinstance(value, list):
- # # Already in array format, ensure each item has source
- # converted_mandatory[attr] = []
- # for item in value:
- # if isinstance(item, dict) and "value" in item:
- # # Already has proper structure
- # if "source" not in item:
- # item["source"] = ProductAttributeService.find_value_source(
- # item["value"], source_map, attr
- # )
- # converted_mandatory[attr].append(item)
- # else:
- # # Convert string to proper format
- # source = ProductAttributeService.find_value_source(str(item), source_map, attr)
- # converted_mandatory[attr].append({"value": str(item), "source": source})
- # else:
- # # Single value - convert to array format
- # source = ProductAttributeService.find_value_source(str(value), source_map, attr)
- # converted_mandatory[attr] = [{"value": str(value), "source": source}]
-
- # parsed["mandatory"] = converted_mandatory
- # return parsed
- # @staticmethod
- # def _create_error_response(
- # error: str,
- # mandatory_attrs: Dict[str, List[str]],
- # extract_additional: bool,
- # raw_output: Optional[str] = None
- # ) -> dict:
- # """Create a standardized error response in array format."""
- # response = {
- # "mandatory": {attr: [{"value": "Not Specified", "source": "error"}] for attr in mandatory_attrs.keys()},
- # "error": error
- # }
- # if extract_additional:
- # response["additional"] = {}
- # if raw_output:
- # response["raw_output"] = raw_output
- # return response
- # @staticmethod
- # def get_cache_stats() -> Dict:
- # """
- # Get statistics for both caches.
- # ⚡ NEW: Utility method to monitor cache performance
- # """
- # return {
- # "simple_cache": SimpleCache.get_stats(),
- # "embedding_cache": EmbeddingCache.get_stats()
- # }
- # @staticmethod
- # def clear_all_caches():
- # """
- # Clear both caches.
- # ⚡ NEW: Utility method to reset caches when needed
- # """
- # SimpleCache.clear()
- # EmbeddingCache.clear()
- # logger.info("All caches cleared")
- # ==================== services.py (PERFORMANCE OPTIMIZED) ====================
- import requests
- import json
- import re
- import hashlib
- import logging
- from typing import Dict, List, Optional, Tuple
- from django.conf import settings
- from concurrent.futures import ThreadPoolExecutor, as_completed
- from sentence_transformers import SentenceTransformer, util
- import numpy as np
- logger = logging.getLogger(__name__)
- # ⚡ CRITICAL FIX: Initialize embedding model ONCE at module level
- # Set TOKENIZERS_PARALLELISM before the model is loaded; it silences the HuggingFace
- # tokenizers fork/parallelism warning. The "Batches: 100%" progress bars are suppressed
- # separately via show_progress_bar=False in EmbeddingCache.get_embedding below.
- import os
- os.environ['TOKENIZERS_PARALLELISM'] = 'false'
- print("Loading sentence transformer model (one-time initialization)...")
- model_embedder = SentenceTransformer("all-MiniLM-L6-v2")
- print("✓ Model loaded successfully")
- # ==================== CACHING CLASSES ====================
- class SimpleCache:
- """In-memory cache for attribute extraction results."""
- _cache = {}
- _max_size = 1000
-
- @classmethod
- def get(cls, key: str) -> Optional[Dict]:
- return cls._cache.get(key)
-
- @classmethod
- def set(cls, key: str, value: Dict):
- if len(cls._cache) >= cls._max_size:
- items = list(cls._cache.items())
- cls._cache = dict(items[int(cls._max_size * 0.2):])
- cls._cache[key] = value
-
- @classmethod
- def clear(cls):
- cls._cache.clear()
-
- @classmethod
- def get_stats(cls) -> Dict:
- return {
- "size": len(cls._cache),
- "max_size": cls._max_size,
- "usage_percent": round(len(cls._cache) / cls._max_size * 100, 2)
- }
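-
- # Illustrative sketch (not part of the service code): how SimpleCache behaves at the
- # boundaries. The key and payload below are hypothetical.
- #
- #   SimpleCache.set("attr_abc123", {"mandatory": {}})
- #   SimpleCache.get("attr_abc123")   # -> {"mandatory": {}}
- #   SimpleCache.get("missing-key")   # -> None
- #   SimpleCache.get_stats()          # -> {"size": 1, "max_size": 1000, "usage_percent": 0.1}
- #
- # When the cache reaches _max_size, set() drops the oldest ~20% of entries
- # (dict insertion order) before storing the new value.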
- class EmbeddingCache:
- """Cache for sentence transformer embeddings."""
- _cache = {}
- _max_size = 500
- _hit_count = 0
- _miss_count = 0
-
- @classmethod
- def get_embedding(cls, text: str, model):
- """Get or compute embedding with caching"""
- if text in cls._cache:
- cls._hit_count += 1
- return cls._cache[text]
-
- cls._miss_count += 1
-
- if len(cls._cache) >= cls._max_size:
- items = list(cls._cache.items())
- cls._cache = dict(items[int(cls._max_size * 0.3):])
-
- # ⚡ CRITICAL: Disable verbose output
- import warnings
- with warnings.catch_warnings():
- warnings.simplefilter("ignore")
- embedding = model.encode(text, convert_to_tensor=True, show_progress_bar=False)
-
- cls._cache[text] = embedding
- return embedding
-
- @classmethod
- def clear(cls):
- cls._cache.clear()
- cls._hit_count = 0
- cls._miss_count = 0
-
- @classmethod
- def get_stats(cls) -> Dict:
- total = cls._hit_count + cls._miss_count
- hit_rate = (cls._hit_count / total * 100) if total > 0 else 0
- return {
- "size": len(cls._cache),
- "hits": cls._hit_count,
- "misses": cls._miss_count,
- "hit_rate_percent": round(hit_rate, 2)
- }
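-
- # Illustrative sketch: repeated lookups of the same text hit the cache, so only the
- # first call pays for model.encode(). The text below is hypothetical.
- #
- #   e1 = EmbeddingCache.get_embedding("modern wall art", model_embedder)  # miss -> encode
- #   e2 = EmbeddingCache.get_embedding("modern wall art", model_embedder)  # hit  -> cached tensor
- #   EmbeddingCache.get_stats()  # -> {"size": 1, "hits": 1, "misses": 1, "hit_rate_percent": 50.0}
- #
- # Eviction keeps the newest ~70% of entries (insertion order) once _max_size is reached.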
- # ==================== MAIN SERVICE CLASS ====================
- class ProductAttributeService:
- """Service class for extracting product attributes using Groq LLM."""
- @staticmethod
- def _generate_cache_key(product_text: str, mandatory_attrs: Dict) -> str:
- """Generate cache key from product text and attributes."""
- attrs_str = json.dumps(mandatory_attrs, sort_keys=True)
- content = f"{product_text}:{attrs_str}"
- return f"attr_{hashlib.md5(content.encode()).hexdigest()}"
- @staticmethod
- def normalize_dimension_text(text: str) -> str:
- """Normalize dimension text to format like '16x20'."""
- if not text:
- return ""
-
- text = text.lower()
- # Word boundaries keep unit tokens like "in" from being stripped out of ordinary words
- text = re.sub(r'\b(inches|inch|in|cm|centimeters|mm|millimeters)\b', '', text)
-
- numbers = re.findall(r'\d+\.?\d*', text)
- if not numbers:
- return ""
-
- float_numbers = []
- for num in numbers:
- try:
- float_numbers.append(float(num))
- except ValueError:
- continue
-
- if len(float_numbers) < 2:
- return ""
-
- if len(float_numbers) == 3:
- float_numbers = [float_numbers[0], float_numbers[2]]
- elif len(float_numbers) > 3:
- float_numbers = sorted(float_numbers)[-2:]
- else:
- float_numbers = float_numbers[:2]
-
- formatted_numbers = []
- for num in float_numbers:
- if num.is_integer():
- formatted_numbers.append(str(int(num)))
- else:
- formatted_numbers.append(f"{num:.1f}")
-
- formatted_numbers.sort(key=lambda x: float(x))
- return f"{formatted_numbers[0]}x{formatted_numbers[1]}"
-
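- # Worked examples (doctest-style sketch; assumes the class is imported from this module):
- #
- #   >>> ProductAttributeService.normalize_dimension_text("16 x 1.5 x 20 Inches")
- #   '16x20'    # three values: the middle (depth) is dropped
- #   >>> ProductAttributeService.normalize_dimension_text("30 x 20 cm")
- #   '20x30'    # output is always smaller x larger
- #   >>> ProductAttributeService.normalize_dimension_text("16 inch")
- #   ''         # fewer than two numbers -> no dimension
-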
- @staticmethod
- def normalize_value_for_matching(value: str, attr_name: str = "") -> str:
- """Normalize a value based on its attribute type."""
- dimension_keywords = ['dimension', 'size', 'measurement']
- if any(keyword in attr_name.lower() for keyword in dimension_keywords):
- normalized = ProductAttributeService.normalize_dimension_text(value)
- if normalized:
- return normalized
- return value.strip()
- @staticmethod
- def combine_product_text(
- title: Optional[str] = None,
- short_desc: Optional[str] = None,
- long_desc: Optional[str] = None,
- ocr_text: Optional[str] = None
- ) -> Tuple[str, Dict[str, str]]:
- """Combine product metadata into a single text block."""
- parts = []
- source_map = {}
-
- if title:
- title_str = str(title).strip()
- parts.append(f"Title: {title_str}")
- source_map['title'] = title_str
- if short_desc:
- short_str = str(short_desc).strip()
- parts.append(f"Description: {short_str}")
- source_map['short_desc'] = short_str
- if long_desc:
- long_str = str(long_desc).strip()
- parts.append(f"Details: {long_str}")
- source_map['long_desc'] = long_str
- if ocr_text:
- parts.append(f"OCR Text: {ocr_text}")
- source_map['ocr_text'] = ocr_text
-
- combined = "\n".join(parts).strip()
-
- if not combined:
- return "No product information available", {}
-
- return combined, source_map
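- # Illustrative call (hypothetical values):
- #
- #   text, sources = ProductAttributeService.combine_product_text(
- #       title="Abstract Canvas Print 16x20",
- #       short_desc="Modern wall art for the living room",
- #   )
- #   # text    -> "Title: Abstract Canvas Print 16x20\nDescription: Modern wall art for the living room"
- #   # sources -> {"title": "Abstract Canvas Print 16x20", "short_desc": "Modern wall art for the living room"}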
- @staticmethod
- def find_value_source(value: str, source_map: Dict[str, str], attr_name: str = "") -> str:
- """Find which source(s) contain the given value."""
- value_lower = value.lower()
- value_tokens = set(value_lower.replace("-", " ").replace("x", " ").split())
-
- is_dimension_attr = any(keyword in attr_name.lower() for keyword in ['dimension', 'size', 'measurement'])
-
- sources_found = []
- source_scores = {}
-
- for source_name, source_text in source_map.items():
- source_lower = source_text.lower()
-
- if value_lower in source_lower:
- source_scores[source_name] = 1.0
- continue
-
- if is_dimension_attr:
- normalized_value = ProductAttributeService.normalize_dimension_text(value)
- if not normalized_value:
- normalized_value = value.replace("x", " ").strip()
-
- normalized_source = ProductAttributeService.normalize_dimension_text(source_text)
-
- if normalized_value == normalized_source:
- source_scores[source_name] = 0.95
- continue
-
- dim_parts = normalized_value.split("x") if "x" in normalized_value else []
- if len(dim_parts) == 2:
- if all(part in source_text for part in dim_parts):
- source_scores[source_name] = 0.85
- continue
-
- token_matches = sum(1 for token in value_tokens if token and token in source_lower)
- if token_matches > 0 and len(value_tokens) > 0:
- source_scores[source_name] = token_matches / len(value_tokens)
-
- if source_scores:
- max_score = max(source_scores.values())
- sources_found = [s for s, score in source_scores.items() if score == max_score]
-
- priority = ['title', 'short_desc', 'long_desc', 'ocr_text']
- for p in priority:
- if p in sources_found:
- return p
-
- return sources_found[0] if sources_found else "Not found"
-
- return "Not found"
- @staticmethod
- def format_visual_attributes(visual_attributes: Dict) -> Dict:
- """Convert visual attributes to array format with source tracking."""
- formatted = {}
-
- for key, value in visual_attributes.items():
- if isinstance(value, list):
- formatted[key] = [{"value": str(item), "source": "image"} for item in value]
- elif isinstance(value, dict):
- nested_formatted = {}
- for nested_key, nested_value in value.items():
- if isinstance(nested_value, list):
- nested_formatted[nested_key] = [{"value": str(item), "source": "image"} for item in nested_value]
- else:
- nested_formatted[nested_key] = [{"value": str(nested_value), "source": "image"}]
- formatted[key] = nested_formatted
- else:
- formatted[key] = [{"value": str(value), "source": "image"}]
-
- return formatted
- @staticmethod
- def extract_attributes_from_ocr(ocr_results: Dict, model: str = None) -> Dict:
- """Extract structured attributes from OCR text using LLM."""
- if model is None:
- model = settings.SUPPORTED_MODELS[0]
-
- detected_text = ocr_results.get('detected_text', [])
- if not detected_text:
- return {}
-
- ocr_text = "\n".join([f"Text: {item['text']}, Confidence: {item['confidence']:.2f}"
- for item in detected_text])
-
- prompt = f"""
- You are an AI model that extracts structured attributes from OCR text detected on product images.
- Given the OCR detections below, infer the possible product attributes and return them as a clean JSON object.
- OCR Text:
- {ocr_text}
- Extract relevant attributes like:
- - brand
- - model_number
- - size (waist_size, length, etc.)
- - collection
- - any other relevant product information
- Return a JSON object with only the attributes you can confidently identify.
- If an attribute is not present, do not include it in the response.
- """
-
- payload = {
- "model": model,
- "messages": [
- {
- "role": "system",
- "content": "You are a helpful AI that extracts structured data from OCR output. Return only valid JSON."
- },
- {"role": "user", "content": prompt}
- ],
- "temperature": 0.2,
- "max_tokens": 500
- }
-
- headers = {
- "Authorization": f"Bearer {settings.GROQ_API_KEY}",
- "Content-Type": "application/json",
- }
-
- try:
- response = requests.post(
- settings.GROQ_API_URL,
- headers=headers,
- json=payload,
- timeout=30
- )
- response.raise_for_status()
- result_text = response.json()["choices"][0]["message"]["content"].strip()
-
- result_text = ProductAttributeService._clean_json_response(result_text)
- parsed = json.loads(result_text)
-
- formatted_attributes = {}
- for key, value in parsed.items():
- if key == "error":
- continue
-
- if isinstance(value, dict):
- nested_formatted = {}
- for nested_key, nested_value in value.items():
- nested_formatted[nested_key] = [{"value": str(nested_value), "source": "image"}]
- formatted_attributes[key] = nested_formatted
- elif isinstance(value, list):
- formatted_attributes[key] = [{"value": str(item), "source": "image"} for item in value]
- else:
- formatted_attributes[key] = [{"value": str(value), "source": "image"}]
-
- return formatted_attributes
- except Exception as e:
- logger.error(f"OCR attribute extraction failed: {str(e)}")
- return {"error": f"Failed to extract attributes from OCR: {str(e)}"}
- @staticmethod
- def calculate_attribute_relationships(
- mandatory_attrs: Dict[str, List[str]],
- product_text: str
- ) -> Dict[str, float]:
- """Calculate semantic relationships between attribute values."""
- pt_emb = EmbeddingCache.get_embedding(product_text, model_embedder)
- attr_scores = {}
- for attr, values in mandatory_attrs.items():
- attr_scores[attr] = {}
- for val in values:
- contexts = [val, f"for {val}", f"use in {val}", f"suitable for {val}"]
- ctx_embs = [EmbeddingCache.get_embedding(c, model_embedder) for c in contexts]
- sem_sim = max(float(util.cos_sim(pt_emb, ce).item()) for ce in ctx_embs)
- attr_scores[attr][val] = sem_sim
- relationships = {}
- attr_list = list(mandatory_attrs.keys())
- for i, attr1 in enumerate(attr_list):
- for attr2 in attr_list[i+1:]:
- for val1 in mandatory_attrs[attr1]:
- for val2 in mandatory_attrs[attr2]:
- emb1 = EmbeddingCache.get_embedding(val1, model_embedder)
- emb2 = EmbeddingCache.get_embedding(val2, model_embedder)
- sim = float(util.cos_sim(emb1, emb2).item())
- key1 = f"{attr1}:{val1}->{attr2}:{val2}"
- key2 = f"{attr2}:{val2}->{attr1}:{val1}"
- relationships[key1] = sim
- relationships[key2] = sim
- return relationships
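- # The relationship map is keyed in both directions; attribute names and values below
- # are hypothetical:
- #
- #   relationships["room_type:Bedroom->style:Modern"]  # cosine similarity of the two value embeddings
- #   relationships["style:Modern->room_type:Bedroom"]  # same score, stored for the reverse lookup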
- @staticmethod
- def calculate_value_clusters(
- values: List[str],
- scores: List[Tuple[str, float]],
- cluster_threshold: float = 0.4
- ) -> List[List[str]]:
- """Group values into semantic clusters."""
- if len(values) <= 1:
- return [[val] for val, _ in scores]
- embeddings = [EmbeddingCache.get_embedding(val, model_embedder) for val in values]
- similarity_matrix = np.zeros((len(values), len(values)))
- for i in range(len(values)):
- for j in range(i+1, len(values)):
- sim = float(util.cos_sim(embeddings[i], embeddings[j]).item())
- similarity_matrix[i][j] = sim
- similarity_matrix[j][i] = sim
- clusters = []
- visited = set()
- # `scores` may be sorted differently from `values`, so look up each value's
- # position in `values` before indexing the similarity matrix.
- for val, _ in scores:
- i = values.index(val)
- if i in visited:
- continue
- cluster = [val]
- visited.add(i)
- for j in range(len(values)):
- if j not in visited and similarity_matrix[i][j] >= cluster_threshold:
- cluster.append(values[j])
- visited.add(j)
- clusters.append(cluster)
- return clusters
- @staticmethod
- def get_dynamic_threshold(
- attr: str,
- val: str,
- base_score: float,
- extracted_attrs: Dict[str, List[Dict[str, str]]],
- relationships: Dict[str, float],
- mandatory_attrs: Dict[str, List[str]],
- base_threshold: float = 0.65,
- boost_factor: float = 0.15
- ) -> float:
- """Calculate dynamic threshold based on relationships."""
- threshold = base_threshold
- max_relationship = 0.0
- for other_attr, other_values_list in extracted_attrs.items():
- if other_attr == attr:
- continue
- for other_val_dict in other_values_list:
- other_val = other_val_dict['value']
- key = f"{attr}:{val}->{other_attr}:{other_val}"
- if key in relationships:
- max_relationship = max(max_relationship, relationships[key])
- if max_relationship > 0.6:
- threshold = base_threshold - (boost_factor * max_relationship)
- return max(0.3, threshold)
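- # Worked example: with base_threshold=0.65 and boost_factor=0.15, a related value that was
- # already extracted with similarity 0.8 lowers the bar to
- #   0.65 - (0.15 * 0.8) = 0.53
- # The floor of 0.3 keeps strongly related values from bypassing scoring entirely.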
- @staticmethod
- def get_adaptive_margin(
- scores: List[Tuple[str, float]],
- base_margin: float = 0.15,
- max_margin: float = 0.22
- ) -> float:
- """Calculate adaptive margin based on score distribution."""
- if len(scores) < 2:
- return base_margin
- score_values = [s for _, s in scores]
- best_score = score_values[0]
- if best_score < 0.5:
- top_scores = score_values[:min(4, len(score_values))]
- score_range = max(top_scores) - min(top_scores)
- if score_range < 0.30:
- score_factor = (0.5 - best_score) * 0.35
- adaptive = base_margin + score_factor + (0.30 - score_range) * 0.2
- return min(adaptive, max_margin)
- return base_margin
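- # Worked example (hypothetical scores): with sorted scores [0.42, 0.40, 0.38, 0.35],
- # best_score=0.42 and top-4 range=0.07, so
- #   margin = 0.15 + (0.5 - 0.42) * 0.35 + (0.30 - 0.07) * 0.2 = 0.224  -> capped at 0.22
- # A confident best score (>= 0.5) keeps the base margin of 0.15.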
- @staticmethod
- def _lexical_evidence(product_text: str, label: str) -> float:
- """Calculate lexical overlap between product text and label."""
- pt = product_text.lower()
- tokens = [t for t in label.lower().replace("-", " ").split() if t]
- if not tokens:
- return 0.0
- hits = sum(1 for t in tokens if t in pt)
- return hits / len(tokens)
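- # Example (hypothetical strings): _lexical_evidence("modern canvas wall art", "Wall Art") -> 1.0,
- # while "Framed Art" -> 0.5 because only "art" appears in the text.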
- @staticmethod
- def normalize_against_product_text(
- product_text: str,
- mandatory_attrs: Dict[str, List[str]],
- source_map: Dict[str, str],
- threshold_abs: float = 0.65,
- margin: float = 0.15,
- allow_multiple: bool = False,
- sem_weight: float = 0.8,
- lex_weight: float = 0.2,
- extracted_attrs: Optional[Dict[str, List[Dict[str, str]]]] = None,
- relationships: Optional[Dict[str, float]] = None,
- use_dynamic_thresholds: bool = True,
- use_adaptive_margin: bool = True,
- use_semantic_clustering: bool = True
- ) -> dict:
- """Score each allowed value against the product_text."""
- if extracted_attrs is None:
- extracted_attrs = {}
- if relationships is None:
- relationships = {}
- pt_emb = EmbeddingCache.get_embedding(product_text, model_embedder)
- extracted = {}
- for attr, allowed_values in mandatory_attrs.items():
- scores: List[Tuple[str, float]] = []
-
- is_dimension_attr = any(keyword in attr.lower() for keyword in ['dimension', 'size', 'measurement'])
- normalized_product_text = ProductAttributeService.normalize_dimension_text(product_text) if is_dimension_attr else ""
- for val in allowed_values:
- if is_dimension_attr:
- normalized_val = ProductAttributeService.normalize_dimension_text(val)
-
- if normalized_val and normalized_product_text and normalized_val == normalized_product_text:
- scores.append((val, 1.0))
- continue
-
- if normalized_val:
- val_numbers = normalized_val.split('x')
- text_lower = product_text.lower()
- if all(num in text_lower for num in val_numbers):
- idx1 = text_lower.find(val_numbers[0])
- idx2 = text_lower.find(val_numbers[1])
- if idx1 != -1 and idx2 != -1:
- distance = abs(idx2 - idx1)
- if distance < 20:
- scores.append((val, 0.95))
- continue
-
- contexts = [val, f"for {val}", f"use in {val}", f"suitable for {val}", f"{val} room"]
- ctx_embs = [EmbeddingCache.get_embedding(c, model_embedder) for c in contexts]
- sem_sim = max(float(util.cos_sim(pt_emb, ce).item()) for ce in ctx_embs)
- lex_score = ProductAttributeService._lexical_evidence(product_text, val)
- final_score = sem_weight * sem_sim + lex_weight * lex_score
- scores.append((val, final_score))
- scores.sort(key=lambda x: x[1], reverse=True)
- if not scores:
- # No allowed values were provided for this attribute
- extracted[attr] = []
- continue
- best_val, best_score = scores[0]
- effective_margin = margin
- if allow_multiple and use_adaptive_margin:
- effective_margin = ProductAttributeService.get_adaptive_margin(scores, margin)
- if is_dimension_attr and best_score >= 0.90:
- source = ProductAttributeService.find_value_source(best_val, source_map, attr)
- extracted[attr] = [{"value": best_val, "source": source}]
- continue
- if not allow_multiple:
- source = ProductAttributeService.find_value_source(best_val, source_map, attr)
- extracted[attr] = [{"value": best_val, "source": source}]
- else:
- candidates = [best_val]
- use_base_threshold = best_score >= threshold_abs
- clusters = []
- if use_semantic_clustering:
- clusters = ProductAttributeService.calculate_value_clusters(
- allowed_values, scores, cluster_threshold=0.4
- )
- best_cluster = next((c for c in clusters if best_val in c), [best_val])
- for val, sc in scores[1:]:
- min_score = 0.4 if is_dimension_attr else 0.3
- if sc < min_score:
- continue
-
- if use_dynamic_thresholds and extracted_attrs:
- dynamic_thresh = ProductAttributeService.get_dynamic_threshold(
- attr, val, sc, extracted_attrs, relationships,
- mandatory_attrs, threshold_abs
- )
- else:
- dynamic_thresh = threshold_abs
- within_margin = (best_score - sc) <= effective_margin
- above_threshold = sc >= dynamic_thresh
- in_cluster = False
- if use_semantic_clustering and clusters:
- in_cluster = any(best_val in c and val in c for c in clusters)
- if use_base_threshold:
- if above_threshold and within_margin:
- candidates.append(val)
- elif in_cluster and within_margin:
- candidates.append(val)
- else:
- if within_margin:
- candidates.append(val)
- elif in_cluster and (best_score - sc) <= effective_margin * 2.0:
- candidates.append(val)
- extracted[attr] = []
- for candidate in candidates:
- source = ProductAttributeService.find_value_source(candidate, source_map, attr)
- extracted[attr].append({"value": candidate, "source": source})
- return extracted
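- # Scoring sketch: for non-dimension attributes each allowed value receives
- #   final_score = 0.8 * max cosine(product_text, context phrasings) + 0.2 * lexical overlap
- # e.g. (hypothetical numbers) sem_sim=0.62 and lex_score=1.0 give 0.8*0.62 + 0.2*1.0 = 0.696,
- # which clears the default threshold_abs of 0.65 even though the embedding score alone would not.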
- @staticmethod
- def extract_attributes(
- product_text: str,
- mandatory_attrs: Dict[str, List[str]],
- source_map: Dict[str, str] = None,
- model: str = None,
- extract_additional: bool = True,
- multiple: Optional[List[str]] = None,
- threshold_abs: float = 0.65,
- margin: float = 0.15,
- use_dynamic_thresholds: bool = True,
- use_adaptive_margin: bool = True,
- use_semantic_clustering: bool = True,
- use_cache: bool = True
- ) -> dict:
- """Extract attributes from product text using Groq LLM."""
-
- if model is None:
- model = settings.SUPPORTED_MODELS[0]
- if multiple is None:
- multiple = []
- if source_map is None:
- source_map = {}
- if not product_text or product_text == "No product information available":
- return ProductAttributeService._create_error_response(
- "No product information provided",
- mandatory_attrs,
- extract_additional
- )
- # ⚡ CHECK CACHE FIRST
- if use_cache:
- cache_key = ProductAttributeService._generate_cache_key(product_text, mandatory_attrs)
- cached_result = SimpleCache.get(cache_key)
- if cached_result:
- logger.info(f"✓ Cache hit")
- return cached_result
- mandatory_attr_list = []
- for attr_name, allowed_values in mandatory_attrs.items():
- mandatory_attr_list.append(f"{attr_name}: {', '.join(allowed_values)}")
- mandatory_attr_text = "\n".join(mandatory_attr_list)
- additional_instruction = ""
- if extract_additional:
- additional_instruction = """
- 2. Extract ADDITIONAL attributes: Identify any other relevant attributes from the product text
- that are NOT in the mandatory list. Only include attributes where you can find actual values
- in the product text. Do NOT include attributes with "Not Specified" or empty values.
-
- Examples of attributes to look for (only if present): Brand, Material, Size, Color, Dimensions,
- Weight, Features, Style, Theme, Pattern, Finish, Care Instructions, etc."""
- output_format = {
- "mandatory": {attr: "value or list of values" for attr in mandatory_attrs.keys()},
- }
- if extract_additional:
- output_format["additional"] = {
- "example_attribute_1": "actual value found",
- "example_attribute_2": "actual value found"
- }
- output_format["additional"]["_note"] = "Only include attributes with actual values found in text"
- prompt = f"""
- You are an intelligent product attribute extractor that works with ANY product type.
- TASK:
- 1. Extract MANDATORY attributes: For each mandatory attribute, select the most appropriate value(s)
- from the provided list. Choose the value(s) that best match the product description.
- {additional_instruction}
- Product Text:
- {product_text}
- Mandatory Attribute Lists (MUST select from these allowed values):
- {mandatory_attr_text}
- CRITICAL INSTRUCTIONS:
- - Return ONLY valid JSON, nothing else
- - No explanations, no markdown, no text before or after the JSON
- - For mandatory attributes, choose the value(s) from the provided list that best match
- - If a mandatory attribute cannot be determined from the product text, use "Not Specified"
- - Prefer exact matches from the allowed values list over generic synonyms
- - If multiple values are plausible, you MAY return more than one
- {f"- For additional attributes: ONLY include attributes where you found actual values in the product text. DO NOT include attributes with 'Not Specified', 'None', 'N/A', or empty values. If you cannot find a value for an attribute, simply don't include that attribute." if extract_additional else ""}
- - Be precise and only extract information that is explicitly stated or clearly implied
- Required Output Format:
- {json.dumps(output_format, indent=2)}
- """
- payload = {
- "model": model,
- "messages": [
- {
- "role": "system",
- "content": f"You are a precise attribute extraction model. Return ONLY valid JSON with {'mandatory and additional' if extract_additional else 'mandatory'} sections. No explanations, no markdown, no other text."
- },
- {"role": "user", "content": prompt}
- ],
- "temperature": 0.0,
- "max_tokens": 1500
- }
- headers = {
- "Authorization": f"Bearer {settings.GROQ_API_KEY}",
- "Content-Type": "application/json",
- }
- try:
- response = requests.post(
- settings.GROQ_API_URL,
- headers=headers,
- json=payload,
- timeout=30
- )
- response.raise_for_status()
- result_text = response.json()["choices"][0]["message"]["content"].strip()
- result_text = ProductAttributeService._clean_json_response(result_text)
- parsed = json.loads(result_text)
- parsed = ProductAttributeService._validate_response_structure(
- parsed, mandatory_attrs, extract_additional, source_map
- )
            if extract_additional and "additional" in parsed:
                cleaned_additional = {}
                for k, v in parsed["additional"].items():
                    # Drop empty values and "Not Specified"/"None"/"N/A" placeholders.
                    if not v or (isinstance(v, str) and v.lower() in ["not specified", "none", "n/a", ""]):
                        continue
                    if isinstance(v, list):
                        cleaned_additional[k] = []
                        for item in v:
                            if isinstance(item, dict) and "value" in item:
                                if "source" not in item:
                                    item["source"] = ProductAttributeService.find_value_source(
                                        item["value"], source_map, k
                                    )
                                cleaned_additional[k].append(item)
                            else:
                                source = ProductAttributeService.find_value_source(str(item), source_map, k)
                                cleaned_additional[k].append({"value": str(item), "source": source})
                    else:
                        source = ProductAttributeService.find_value_source(str(v), source_map, k)
                        cleaned_additional[k] = [{"value": str(v), "source": source}]
                parsed["additional"] = cleaned_additional
            relationships = {}
            if use_dynamic_thresholds:
                relationships = ProductAttributeService.calculate_attribute_relationships(
                    mandatory_attrs, product_text
                )

            # Normalize mandatory attributes one at a time so attributes extracted
            # earlier can inform the normalization of later ones.
            extracted_so_far = {}
            for attr in mandatory_attrs.keys():
                allow_multiple = attr in multiple
                result = ProductAttributeService.normalize_against_product_text(
                    product_text=product_text,
                    mandatory_attrs={attr: mandatory_attrs[attr]},
                    source_map=source_map,
                    threshold_abs=threshold_abs,
                    margin=margin,
                    allow_multiple=allow_multiple,
                    extracted_attrs=extracted_so_far,
                    relationships=relationships,
                    use_dynamic_thresholds=use_dynamic_thresholds,
                    use_adaptive_margin=use_adaptive_margin,
                    use_semantic_clustering=use_semantic_clustering
                )
                parsed["mandatory"][attr] = result[attr]
                extracted_so_far[attr] = result[attr]
            # ⚡ CACHE THE RESULT
            if use_cache:
                SimpleCache.set(cache_key, parsed)

            return parsed

        except requests.exceptions.RequestException as e:
            logger.error(f"Request exception: {str(e)}")
            return ProductAttributeService._create_error_response(
                str(e), mandatory_attrs, extract_additional
            )
        except json.JSONDecodeError as e:
            logger.error(f"JSON decode error: {str(e)}")
            return ProductAttributeService._create_error_response(
                f"Invalid JSON: {str(e)}", mandatory_attrs, extract_additional, result_text
            )
        except Exception as e:
            logger.error(f"Unexpected error: {str(e)}")
            return ProductAttributeService._create_error_response(
                str(e), mandatory_attrs, extract_additional
            )
    @staticmethod
    def _clean_json_response(text: str) -> str:
        """Clean an LLM response down to the JSON payload."""
        # Trim anything outside the outermost braces.
        start_idx = text.find('{')
        end_idx = text.rfind('}')
        if start_idx != -1 and end_idx != -1:
            text = text[start_idx:end_idx + 1]
        # Strip markdown code fences if the model wrapped its output.
        if "```json" in text:
            text = text.split("```json")[1].split("```")[0].strip()
        elif "```" in text:
            text = text.split("```")[1].split("```")[0].strip()
        # Remove a stray leading "json" language tag.
        if text.startswith("json"):
            text = text[4:].strip()
        return text
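    # Illustrative examples (not from the original file) of inputs this helper is
    # meant to handle, derived from the logic above:
    #   '```json\n{"Color": "Red"}\n```'         -> '{"Color": "Red"}'
    #   'Here is the JSON: {"Size": "M"} done'   -> '{"Size": "M"}'
    #   'json{"Fit": "Slim"}'                    -> '{"Fit": "Slim"}'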
    @staticmethod
    def _validate_response_structure(
        parsed: dict,
        mandatory_attrs: Dict[str, List[str]],
        extract_additional: bool,
        source_map: Optional[Dict[str, str]] = None
    ) -> dict:
        """Validate and fix the response structure."""
        if source_map is None:
            source_map = {}

        expected_sections = ["mandatory"]
        if extract_additional:
            expected_sections.append("additional")

        # If the model returned a flat dict instead of the expected sections,
        # split its keys into mandatory/additional buckets.
        if not all(section in parsed for section in expected_sections):
            if isinstance(parsed, dict):
                mandatory_keys = set(mandatory_attrs.keys())
                mandatory = {k: v for k, v in parsed.items() if k in mandatory_keys}
                additional = {k: v for k, v in parsed.items() if k not in mandatory_keys}
                result = {"mandatory": mandatory}
                if extract_additional:
                    result["additional"] = additional
                parsed = result
            else:
                return ProductAttributeService._create_error_response(
                    "Invalid response structure",
                    mandatory_attrs,
                    extract_additional,
                    str(parsed)
                )

        # Normalize every mandatory value to a list of {"value", "source"} dicts.
        if "mandatory" in parsed:
            converted_mandatory = {}
            for attr, value in parsed["mandatory"].items():
                if isinstance(value, list):
                    converted_mandatory[attr] = []
                    for item in value:
                        if isinstance(item, dict) and "value" in item:
                            if "source" not in item:
                                item["source"] = ProductAttributeService.find_value_source(
                                    item["value"], source_map, attr
                                )
                            converted_mandatory[attr].append(item)
                        else:
                            source = ProductAttributeService.find_value_source(str(item), source_map, attr)
                            converted_mandatory[attr].append({"value": str(item), "source": source})
                else:
                    source = ProductAttributeService.find_value_source(str(value), source_map, attr)
                    converted_mandatory[attr] = [{"value": str(value), "source": source}]

            parsed["mandatory"] = converted_mandatory
        return parsed
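    # Example (illustrative, not from the original source) of the normalized shape
    # produced above: every mandatory attribute becomes a list of value/source dicts,
    # e.g. a raw {"Color": "Red"} ends up as
    #   {"mandatory": {"Color": [{"value": "Red", "source": "<matched field>"}]}}
    # where "source" is whatever find_value_source locates in source_map.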
    @staticmethod
    def _create_error_response(
        error: str,
        mandatory_attrs: Dict[str, List[str]],
        extract_additional: bool,
        raw_output: Optional[str] = None
    ) -> dict:
        """Create a standardized error response."""
        response = {
            "mandatory": {attr: [{"value": "Not Specified", "source": "error"}] for attr in mandatory_attrs.keys()},
            "error": error
        }
        if extract_additional:
            response["additional"] = {}
        if raw_output:
            response["raw_output"] = raw_output
        return response
    @staticmethod
    def get_cache_stats() -> Dict:
        """Get statistics for both caches."""
        return {
            "simple_cache": SimpleCache.get_stats(),
            "embedding_cache": EmbeddingCache.get_stats()
        }
    @staticmethod
    def clear_all_caches():
        """Clear both caches."""
        SimpleCache.clear()
        EmbeddingCache.clear()
        logger.info("All caches cleared")
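
# --- Usage sketch (illustrative only, not part of the original service) ------
# A minimal example of the cache-management entry points defined above; the
# extraction method itself and the Django settings (GROQ_API_KEY, GROQ_API_URL)
# are assumed to be configured elsewhere in this project.
if __name__ == "__main__":
    # Inspect hit/miss statistics for both the response cache and the
    # embedding cache.
    print(ProductAttributeService.get_cache_stats())

    # Drop all cached responses and embeddings, e.g. after attribute lists
    # or prompt templates have changed.
    ProductAttributeService.clear_all_caches()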