| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682268326842685268626872688268926902691269226932694269526962697269826992700270127022703270427052706270727082709271027112712271327142715271627172718271927202721272227232724272527262727272827292730273127322733273427352736273727382739274027412742274327442745274627472748274927502751275227532754275527562757275827592760276127622763276427652766276727682769277027712772277327742775277627772778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327732783279328032813282328332843285328632873288328932903291329232933294329532963297329832993300330133023303330433053306330733083309331033113312331333143315331633173318331933203321332233233324332533263327332833293330333133323333333433353336333733383339334033413342334333443345334633473348334933503351335233533354335533563357335833593360336133623363336433653366336733683369337033713372337333743375337633773378337933803381338233833384338533863387338833893390339133923393339433953396339733983399340034013402340334043405340634073408340934103411341234133414341534163417341834193420342134223423342434253426342734283429343034313432343334343435343634373438343934403441344234433444344534463447344834493450345134523453345434553456345734583459346034613462346334643465346634673468346934703471347234733474347534763477347834793480348134823483348434853486348734883489349034913492349334943495349634973498349935003501350235033504350535063507350835093510351135123513351435153516351735183519352035213522352335243525352635273528352935303531353235333534353535363537353835393540354135423543354435453546354735483549355035513552355335543555355635573558355935603561356235633564356535663567356835693570357135723573357435753576357735783579358035813582358335843585358635873588358935903591359235933594359535963597359835993600360136023603360436053606360736083609361036113612361336143615361636173618361936203621362236233624362536263627362836293630363136323633363436353636363736383639364036413642364336443645364636473648364936503651365236533654365536563657365836593660366136623663366436653666366736683669367036713672367336743675367636773678367936803681368236833684368536863687368836893690369136923693369436953696369736983699370037013702370337043705370637073708370937103711371237133714371537163717371837193720372137223723372437253726372737283729373037313732373337343735373637373738373937403741374237433744374537463747374837493750375137523753375437553756375737583759376037613762376337643765376637673768376937703771377237733774377537763777 |
- # # import requests
- # # import json
- # # from typing import Dict, List, Optional
- # # from django.conf import settings
- # # class ProductAttributeService:
- # # """Service class for extracting product attributes using Groq LLM."""
- # # @staticmethod
- # # def combine_product_text(
- # # title: Optional[str] = None,
- # # short_desc: Optional[str] = None,
- # # long_desc: Optional[str] = None
- # # ) -> str:
- # # """Combine product metadata into a single text block."""
- # # parts = []
- # # if title:
- # # parts.append(str(title).strip())
- # # if short_desc:
- # # parts.append(str(short_desc).strip())
- # # if long_desc:
- # # parts.append(str(long_desc).strip())
- # # combined = " ".join(parts).strip()
- # # if not combined:
- # # return "No product information available"
- # # return combined
- # # @staticmethod
- # # def extract_attributes(
- # # product_text: str,
- # # mandatory_attrs: Dict[str, List[str]],
- # # model: str = None,
- # # extract_additional: bool = True
- # # ) -> dict:
- # # """Use Groq LLM to extract attributes from any product type."""
-
- # # if model is None:
- # # model = settings.SUPPORTED_MODELS[0]
- # # # Check if product text is empty or minimal
- # # if not product_text or product_text == "No product information available":
- # # return ProductAttributeService._create_error_response(
- # # "No product information provided",
- # # mandatory_attrs,
- # # extract_additional
- # # )
- # # # Create structured prompt for mandatory attributes
- # # mandatory_attr_list = []
- # # for attr_name, allowed_values in mandatory_attrs.items():
- # # mandatory_attr_list.append(f"{attr_name}: {', '.join(allowed_values)}")
- # # mandatory_attr_text = "\n".join(mandatory_attr_list)
- # # additional_instruction = ""
- # # if extract_additional:
- # # additional_instruction = """
- # # 2. Extract ADDITIONAL attributes: Identify any other relevant attributes from the product text
- # # (such as Material, Size, Color, Brand, Dimensions, Weight, Features, Specifications, etc.)
- # # and their values. Extract attributes that are specific and relevant to this product type."""
- # # output_format = {
- # # "mandatory": {attr: "value" for attr in mandatory_attrs.keys()},
- # # "additional": {} if extract_additional else None
- # # }
- # # if not extract_additional:
- # # output_format.pop("additional")
- # # prompt = f"""
- # # You are an intelligent product attribute extractor that works with ANY product type.
- # # TASK:
- # # 1. Extract MANDATORY attributes: For each mandatory attribute, select the most appropriate value
- # # from the provided list. Choose the value that best matches the product description.
- # # {additional_instruction}
- # # Product Text:
- # # {product_text}
- # # Mandatory Attribute Lists (MUST select one value for each):
- # # {mandatory_attr_text}
- # # CRITICAL INSTRUCTIONS:
- # # - Return ONLY valid JSON, nothing else
- # # - No explanations, no markdown, no text before or after the JSON
- # # - For mandatory attributes, choose EXACTLY ONE value from the provided list that best matches
- # # - If a mandatory attribute cannot be determined from the product text, use "Not Specified"
- # # - Work with whatever information is available - the product text may be incomplete (only title, or only description, etc.)
- # # {f"- For additional attributes, extract any relevant information found in the product text" if extract_additional else ""}
- # # - Be precise and only extract information that is explicitly stated or clearly implied
- # # Required Output Format (ONLY THIS, NO OTHER TEXT):
- # # {json.dumps(output_format, indent=2)}
- # # """
- # # payload = {
- # # "model": model,
- # # "messages": [
- # # {
- # # "role": "system",
- # # "content": f"You are a precise attribute extraction model. Return ONLY valid JSON with {'mandatory and additional' if extract_additional else 'mandatory'} sections. No explanations, no markdown, no other text."
- # # },
- # # {"role": "user", "content": prompt}
- # # ],
- # # "temperature": 0.0,
- # # "max_tokens": 1500
- # # }
- # # headers = {
- # # "Authorization": f"Bearer {settings.GROQ_API_KEY}",
- # # "Content-Type": "application/json",
- # # }
- # # try:
- # # response = requests.post(
- # # settings.GROQ_API_URL,
- # # headers=headers,
- # # json=payload,
- # # timeout=30
- # # )
- # # response.raise_for_status()
- # # result_text = response.json()["choices"][0]["message"]["content"].strip()
- # # # Clean the response
- # # result_text = ProductAttributeService._clean_json_response(result_text)
- # # # Parse JSON
- # # parsed = json.loads(result_text)
- # # # Validate and restructure if needed
- # # parsed = ProductAttributeService._validate_response_structure(
- # # parsed, mandatory_attrs, extract_additional
- # # )
- # # return parsed
- # # except requests.exceptions.RequestException as e:
- # # return ProductAttributeService._create_error_response(
- # # str(e), mandatory_attrs, extract_additional
- # # )
- # # except json.JSONDecodeError as e:
- # # return ProductAttributeService._create_error_response(
- # # f"Invalid JSON: {str(e)}", mandatory_attrs, extract_additional, result_text
- # # )
- # # except Exception as e:
- # # return ProductAttributeService._create_error_response(
- # # str(e), mandatory_attrs, extract_additional
- # # )
- # # @staticmethod
- # # def _clean_json_response(text: str) -> str:
- # # """Clean LLM response to extract valid JSON."""
- # # start_idx = text.find('{')
- # # end_idx = text.rfind('}')
- # # if start_idx != -1 and end_idx != -1:
- # # text = text[start_idx:end_idx + 1]
- # # if "```json" in text:
- # # text = text.split("```json")[1].split("```")[0].strip()
- # # elif "```" in text:
- # # text = text.split("```")[1].split("```")[0].strip()
- # # if text.startswith("json"):
- # # text = text[4:].strip()
- # # return text
- # # @staticmethod
- # # def _validate_response_structure(
- # # parsed: dict,
- # # mandatory_attrs: Dict[str, List[str]],
- # # extract_additional: bool
- # # ) -> dict:
- # # """Validate and fix the response structure."""
- # # expected_sections = ["mandatory"]
- # # if extract_additional:
- # # expected_sections.append("additional")
- # # if not all(section in parsed for section in expected_sections):
- # # if isinstance(parsed, dict):
- # # mandatory_keys = set(mandatory_attrs.keys())
- # # mandatory = {k: v for k, v in parsed.items() if k in mandatory_keys}
- # # additional = {k: v for k, v in parsed.items() if k not in mandatory_keys}
- # # result = {"mandatory": mandatory}
- # # if extract_additional:
- # # result["additional"] = additional
- # # return result
- # # else:
- # # return ProductAttributeService._create_error_response(
- # # "Invalid response structure",
- # # mandatory_attrs,
- # # extract_additional,
- # # str(parsed)
- # # )
- # # return parsed
- # # @staticmethod
- # # def _create_error_response(
- # # error: str,
- # # mandatory_attrs: Dict[str, List[str]],
- # # extract_additional: bool,
- # # raw_output: Optional[str] = None
- # # ) -> dict:
- # # """Create a standardized error response."""
- # # response = {
- # # "mandatory": {attr: "Not Specified" for attr in mandatory_attrs.keys()},
- # # "error": error
- # # }
- # # if extract_additional:
- # # response["additional"] = {}
- # # if raw_output:
- # # response["raw_output"] = raw_output
- # # return response
- # import requests
- # import json
- # from typing import Dict, List, Optional
- # from django.conf import settings
- # from concurrent.futures import ThreadPoolExecutor, as_completed
- # class ProductAttributeService:
- # """Service class for extracting product attributes using Groq LLM."""
- # @staticmethod
- # def combine_product_text(
- # title: Optional[str] = None,
- # short_desc: Optional[str] = None,
- # long_desc: Optional[str] = None
- # ) -> str:
- # """Combine product metadata into a single text block."""
- # parts = []
- # if title:
- # parts.append(str(title).strip())
- # if short_desc:
- # parts.append(str(short_desc).strip())
- # if long_desc:
- # parts.append(str(long_desc).strip())
- # combined = " ".join(parts).strip()
- # if not combined:
- # return "No product information available"
- # return combined
- # @staticmethod
- # def extract_attributes(
- # product_text: str,
- # mandatory_attrs: Dict[str, List[str]],
- # model: str = None,
- # extract_additional: bool = True
- # ) -> dict:
- # """Use Groq LLM to extract attributes from any product type."""
-
- # if model is None:
- # model = settings.SUPPORTED_MODELS[0]
- # # Check if product text is empty or minimal
- # if not product_text or product_text == "No product information available":
- # return ProductAttributeService._create_error_response(
- # "No product information provided",
- # mandatory_attrs,
- # extract_additional
- # )
- # # Create structured prompt for mandatory attributes
- # mandatory_attr_list = []
- # for attr_name, allowed_values in mandatory_attrs.items():
- # mandatory_attr_list.append(f"{attr_name}: {', '.join(allowed_values)}")
- # mandatory_attr_text = "\n".join(mandatory_attr_list)
- # additional_instruction = ""
- # if extract_additional:
- # additional_instruction = """
- # 2. Extract ADDITIONAL attributes: Identify any other relevant attributes from the product text
- # (such as Material, Size, Color, Brand, Dimensions, Weight, Features, Specifications, etc.)
- # and their values. Extract attributes that are specific and relevant to this product type."""
- # output_format = {
- # "mandatory": {attr: "value" for attr in mandatory_attrs.keys()},
- # "additional": {} if extract_additional else None
- # }
- # if not extract_additional:
- # output_format.pop("additional")
- # prompt = f"""
- # You are an intelligent product attribute extractor that works with ANY product type.
- # TASK:
- # 1. Extract MANDATORY attributes: For each mandatory attribute, select the most appropriate value
- # from the provided list. Choose the value that best matches the product description.
- # {additional_instruction}
- # Product Text:
- # {product_text}
- # Mandatory Attribute Lists (MUST select one value for each):
- # {mandatory_attr_text}
- # CRITICAL INSTRUCTIONS:
- # - Return ONLY valid JSON, nothing else
- # - No explanations, no markdown, no text before or after the JSON
- # - For mandatory attributes, choose EXACTLY ONE value from the provided list that best matches
- # - If a mandatory attribute cannot be determined from the product text, use "Not Specified"
- # - Work with whatever information is available - the product text may be incomplete (only title, or only description, etc.)
- # {f"- For additional attributes, extract any relevant information found in the product text" if extract_additional else ""}
- # - Be precise and only extract information that is explicitly stated or clearly implied
- # Required Output Format (ONLY THIS, NO OTHER TEXT):
- # {json.dumps(output_format, indent=2)}
- # """
- # payload = {
- # "model": model,
- # "messages": [
- # {
- # "role": "system",
- # "content": f"You are a precise attribute extraction model. Return ONLY valid JSON with {'mandatory and additional' if extract_additional else 'mandatory'} sections. No explanations, no markdown, no other text."
- # },
- # {"role": "user", "content": prompt}
- # ],
- # "temperature": 0.0,
- # "max_tokens": 1500
- # }
- # headers = {
- # "Authorization": f"Bearer {settings.GROQ_API_KEY}",
- # "Content-Type": "application/json",
- # }
- # try:
- # response = requests.post(
- # settings.GROQ_API_URL,
- # headers=headers,
- # json=payload,
- # timeout=30
- # )
- # response.raise_for_status()
- # result_text = response.json()["choices"][0]["message"]["content"].strip()
- # # Clean the response
- # result_text = ProductAttributeService._clean_json_response(result_text)
- # # Parse JSON
- # parsed = json.loads(result_text)
- # # Validate and restructure if needed
- # parsed = ProductAttributeService._validate_response_structure(
- # parsed, mandatory_attrs, extract_additional
- # )
- # return parsed
- # except requests.exceptions.RequestException as e:
- # return ProductAttributeService._create_error_response(
- # str(e), mandatory_attrs, extract_additional
- # )
- # except json.JSONDecodeError as e:
- # return ProductAttributeService._create_error_response(
- # f"Invalid JSON: {str(e)}", mandatory_attrs, extract_additional, result_text
- # )
- # except Exception as e:
- # return ProductAttributeService._create_error_response(
- # str(e), mandatory_attrs, extract_additional
- # )
- # @staticmethod
- # def extract_attributes_batch(
- # products: List[Dict],
- # mandatory_attrs: Dict[str, List[str]],
- # model: str = None,
- # extract_additional: bool = True,
- # max_workers: int = 5
- # ) -> Dict:
- # """
- # Extract attributes for multiple products in parallel.
-
- # Args:
- # products: List of product dictionaries with keys: product_id, title, short_desc, long_desc
- # mandatory_attrs: Dictionary of mandatory attributes
- # model: Groq model to use
- # extract_additional: Whether to extract additional attributes
- # max_workers: Maximum number of parallel workers
-
- # Returns:
- # Dictionary with results, total_products, successful, and failed counts
- # """
- # results = []
- # successful = 0
- # failed = 0
- # def process_product(product_data):
- # """Process a single product."""
- # product_id = product_data.get('product_id', f"product_{len(results)}")
-
- # try:
- # product_text = ProductAttributeService.combine_product_text(
- # title=product_data.get('title'),
- # short_desc=product_data.get('short_desc'),
- # long_desc=product_data.get('long_desc')
- # )
-
- # result = ProductAttributeService.extract_attributes(
- # product_text=product_text,
- # mandatory_attrs=mandatory_attrs,
- # model=model,
- # extract_additional=extract_additional
- # )
-
- # result['product_id'] = product_id
-
- # # Check if extraction was successful
- # if 'error' not in result:
- # return result, True
- # else:
- # return result, False
-
- # except Exception as e:
- # return {
- # 'product_id': product_id,
- # 'mandatory': {attr: "Not Specified" for attr in mandatory_attrs.keys()},
- # 'additional': {} if extract_additional else None,
- # 'error': f"Processing error: {str(e)}"
- # }, False
- # # Process products in parallel
- # with ThreadPoolExecutor(max_workers=max_workers) as executor:
- # future_to_product = {
- # executor.submit(process_product, product): product
- # for product in products
- # }
-
- # for future in as_completed(future_to_product):
- # try:
- # result, success = future.result()
- # results.append(result)
- # if success:
- # successful += 1
- # else:
- # failed += 1
- # except Exception as e:
- # failed += 1
- # results.append({
- # 'product_id': 'unknown',
- # 'mandatory': {attr: "Not Specified" for attr in mandatory_attrs.keys()},
- # 'additional': {} if extract_additional else None,
- # 'error': f"Unexpected error: {str(e)}"
- # })
- # return {
- # 'results': results,
- # 'total_products': len(products),
- # 'successful': successful,
- # 'failed': failed
- # }
- # @staticmethod
- # def _clean_json_response(text: str) -> str:
- # """Clean LLM response to extract valid JSON."""
- # start_idx = text.find('{')
- # end_idx = text.rfind('}')
- # if start_idx != -1 and end_idx != -1:
- # text = text[start_idx:end_idx + 1]
- # if "```json" in text:
- # text = text.split("```json")[1].split("```")[0].strip()
- # elif "```" in text:
- # text = text.split("```")[1].split("```")[0].strip()
- # if text.startswith("json"):
- # text = text[4:].strip()
- # return text
- # @staticmethod
- # def _validate_response_structure(
- # parsed: dict,
- # mandatory_attrs: Dict[str, List[str]],
- # extract_additional: bool
- # ) -> dict:
- # """Validate and fix the response structure."""
- # expected_sections = ["mandatory"]
- # if extract_additional:
- # expected_sections.append("additional")
- # if not all(section in parsed for section in expected_sections):
- # if isinstance(parsed, dict):
- # mandatory_keys = set(mandatory_attrs.keys())
- # mandatory = {k: v for k, v in parsed.items() if k in mandatory_keys}
- # additional = {k: v for k, v in parsed.items() if k not in mandatory_keys}
- # result = {"mandatory": mandatory}
- # if extract_additional:
- # result["additional"] = additional
- # return result
- # else:
- # return ProductAttributeService._create_error_response(
- # "Invalid response structure",
- # mandatory_attrs,
- # extract_additional,
- # str(parsed)
- # )
- # return parsed
- # @staticmethod
- # def _create_error_response(
- # error: str,
- # mandatory_attrs: Dict[str, List[str]],
- # extract_additional: bool,
- # raw_output: Optional[str] = None
- # ) -> dict:
- # """Create a standardized error response."""
- # response = {
- # "mandatory": {attr: "Not Specified" for attr in mandatory_attrs.keys()},
- # "error": error
- # }
- # if extract_additional:
- # response["additional"] = {}
- # if raw_output:
- # response["raw_output"] = raw_output
- # return response
- # # ==================== services.py ====================
- # import requests
- # import json
- # from typing import Dict, List, Optional
- # from django.conf import settings
- # from concurrent.futures import ThreadPoolExecutor, as_completed
- # from .ocr_service import OCRService
- # class ProductAttributeService:
- # """Service class for extracting product attributes using Groq LLM."""
- # @staticmethod
- # def combine_product_text(
- # title: Optional[str] = None,
- # short_desc: Optional[str] = None,
- # long_desc: Optional[str] = None,
- # ocr_text: Optional[str] = None
- # ) -> str:
- # """Combine product metadata into a single text block."""
- # parts = []
- # if title:
- # parts.append(f"Title: {str(title).strip()}")
- # if short_desc:
- # parts.append(f"Description: {str(short_desc).strip()}")
- # if long_desc:
- # parts.append(f"Details: {str(long_desc).strip()}")
- # if ocr_text:
- # parts.append(f"OCR Text: {ocr_text}")
-
- # combined = "\n".join(parts).strip()
-
- # if not combined:
- # return "No product information available"
-
- # return combined
- # @staticmethod
- # def extract_attributes_from_ocr(ocr_results: Dict, model: str = None) -> Dict:
- # """Extract structured attributes from OCR text using LLM."""
- # if model is None:
- # model = settings.SUPPORTED_MODELS[0]
-
- # detected_text = ocr_results.get('detected_text', [])
- # if not detected_text:
- # return {}
-
- # # Format OCR text for prompt
- # ocr_text = "\n".join([f"Text: {item['text']}, Confidence: {item['confidence']:.2f}"
- # for item in detected_text])
-
- # prompt = f"""
- # You are an AI model that extracts structured attributes from OCR text detected on product images.
- # Given the OCR detections below, infer the possible product attributes and return them as a clean JSON object.
- # OCR Text:
- # {ocr_text}
- # Extract relevant attributes like:
- # - brand
- # - model_number
- # - size (waist_size, length, etc.)
- # - collection
- # - any other relevant product information
- # Return a JSON object with only the attributes you can confidently identify.
- # If an attribute is not present, do not include it in the response.
- # """
-
- # payload = {
- # "model": model,
- # "messages": [
- # {
- # "role": "system",
- # "content": "You are a helpful AI that extracts structured data from OCR output. Return only valid JSON."
- # },
- # {"role": "user", "content": prompt}
- # ],
- # "temperature": 0.2,
- # "max_tokens": 500
- # }
-
- # headers = {
- # "Authorization": f"Bearer {settings.GROQ_API_KEY}",
- # "Content-Type": "application/json",
- # }
-
- # try:
- # response = requests.post(
- # settings.GROQ_API_URL,
- # headers=headers,
- # json=payload,
- # timeout=30
- # )
- # response.raise_for_status()
- # result_text = response.json()["choices"][0]["message"]["content"].strip()
-
- # # Clean and parse JSON
- # result_text = ProductAttributeService._clean_json_response(result_text)
- # parsed = json.loads(result_text)
-
- # return parsed
- # except Exception as e:
- # return {"error": f"Failed to extract attributes from OCR: {str(e)}"}
- # @staticmethod
- # def extract_attributes(
- # product_text: str,
- # mandatory_attrs: Dict[str, List[str]],
- # model: str = None,
- # extract_additional: bool = True
- # ) -> dict:
- # """Use Groq LLM to extract attributes from any product type."""
-
- # if model is None:
- # model = settings.SUPPORTED_MODELS[0]
- # # Check if product text is empty or minimal
- # if not product_text or product_text == "No product information available":
- # return ProductAttributeService._create_error_response(
- # "No product information provided",
- # mandatory_attrs,
- # extract_additional
- # )
- # # Create structured prompt for mandatory attributes
- # mandatory_attr_list = []
- # for attr_name, allowed_values in mandatory_attrs.items():
- # mandatory_attr_list.append(f"{attr_name}: {', '.join(allowed_values)}")
- # mandatory_attr_text = "\n".join(mandatory_attr_list)
- # additional_instruction = ""
- # if extract_additional:
- # additional_instruction = """
- # 2. Extract ADDITIONAL attributes: Identify any other relevant attributes from the product text
- # (such as Material, Size, Color, Brand, Dimensions, Weight, Features, Specifications, etc.)
- # and their values. Extract attributes that are specific and relevant to this product type."""
- # output_format = {
- # "mandatory": {attr: "value" for attr in mandatory_attrs.keys()},
- # "additional": {} if extract_additional else None
- # }
- # if not extract_additional:
- # output_format.pop("additional")
- # prompt = f"""
- # You are an intelligent product attribute extractor that works with ANY product type.
- # TASK:
- # 1. Extract MANDATORY attributes: For each mandatory attribute, select the most appropriate value
- # from the provided list. Choose the value that best matches the product description.
- # {additional_instruction}
- # Product Text:
- # {product_text}
- # Mandatory Attribute Lists (MUST select one value for each):
- # {mandatory_attr_text}
- # CRITICAL INSTRUCTIONS:
- # - Return ONLY valid JSON, nothing else
- # - No explanations, no markdown, no text before or after the JSON
- # - For mandatory attributes, choose EXACTLY ONE value from the provided list that best matches
- # - If a mandatory attribute cannot be determined from the product text, use "Not Specified"
- # - Work with whatever information is available - the product text may be incomplete
- # {f"- For additional attributes, extract any relevant information found in the product text" if extract_additional else ""}
- # - Be precise and only extract information that is explicitly stated or clearly implied
- # Required Output Format (ONLY THIS, NO OTHER TEXT):
- # {json.dumps(output_format, indent=2)}
- # """
- # payload = {
- # "model": model,
- # "messages": [
- # {
- # "role": "system",
- # "content": f"You are a precise attribute extraction model. Return ONLY valid JSON with {'mandatory and additional' if extract_additional else 'mandatory'} sections. No explanations, no markdown, no other text."
- # },
- # {"role": "user", "content": prompt}
- # ],
- # "temperature": 0.0,
- # "max_tokens": 1500
- # }
- # headers = {
- # "Authorization": f"Bearer {settings.GROQ_API_KEY}",
- # "Content-Type": "application/json",
- # }
- # try:
- # response = requests.post(
- # settings.GROQ_API_URL,
- # headers=headers,
- # json=payload,
- # timeout=30
- # )
- # response.raise_for_status()
- # result_text = response.json()["choices"][0]["message"]["content"].strip()
- # # Clean the response
- # result_text = ProductAttributeService._clean_json_response(result_text)
- # # Parse JSON
- # parsed = json.loads(result_text)
- # # Validate and restructure if needed
- # parsed = ProductAttributeService._validate_response_structure(
- # parsed, mandatory_attrs, extract_additional
- # )
- # return parsed
- # except requests.exceptions.RequestException as e:
- # return ProductAttributeService._create_error_response(
- # str(e), mandatory_attrs, extract_additional
- # )
- # except json.JSONDecodeError as e:
- # return ProductAttributeService._create_error_response(
- # f"Invalid JSON: {str(e)}", mandatory_attrs, extract_additional, result_text
- # )
- # except Exception as e:
- # return ProductAttributeService._create_error_response(
- # str(e), mandatory_attrs, extract_additional
- # )
- # @staticmethod
- # def extract_attributes_batch(
- # products: List[Dict],
- # mandatory_attrs: Dict[str, List[str]],
- # model: str = None,
- # extract_additional: bool = True,
- # process_image: bool = True,
- # max_workers: int = 5
- # ) -> Dict:
- # """Extract attributes for multiple products in parallel."""
- # results = []
- # successful = 0
- # failed = 0
-
- # ocr_service = OCRService()
- # def process_product(product_data):
- # """Process a single product."""
- # product_id = product_data.get('product_id', f"product_{len(results)}")
-
- # try:
- # # Process image if URL is provided
- # ocr_results = None
- # ocr_text = None
-
- # if process_image and product_data.get('image_url'):
- # ocr_results = ocr_service.process_image(product_data['image_url'])
-
- # # Extract attributes from OCR
- # if ocr_results and ocr_results.get('detected_text'):
- # ocr_attrs = ProductAttributeService.extract_attributes_from_ocr(
- # ocr_results, model
- # )
- # ocr_results['extracted_attributes'] = ocr_attrs
-
- # # Format OCR text for combining with product text
- # ocr_text = "\n".join([
- # f"{item['text']} (confidence: {item['confidence']:.2f})"
- # for item in ocr_results['detected_text']
- # ])
-
- # # Combine all product information
- # product_text = ProductAttributeService.combine_product_text(
- # title=product_data.get('title'),
- # short_desc=product_data.get('short_desc'),
- # long_desc=product_data.get('long_desc'),
- # ocr_text=ocr_text
- # )
-
- # # Extract attributes from combined text
- # result = ProductAttributeService.extract_attributes(
- # product_text=product_text,
- # mandatory_attrs=mandatory_attrs,
- # model=model,
- # extract_additional=extract_additional
- # )
-
- # result['product_id'] = product_id
-
- # # Add OCR results if available
- # if ocr_results:
- # result['ocr_results'] = ocr_results
-
- # # Check if extraction was successful
- # if 'error' not in result:
- # return result, True
- # else:
- # return result, False
-
- # except Exception as e:
- # return {
- # 'product_id': product_id,
- # 'mandatory': {attr: "Not Specified" for attr in mandatory_attrs.keys()},
- # 'additional': {} if extract_additional else None,
- # 'error': f"Processing error: {str(e)}"
- # }, False
- # # Process products in parallel
- # with ThreadPoolExecutor(max_workers=max_workers) as executor:
- # future_to_product = {
- # executor.submit(process_product, product): product
- # for product in products
- # }
-
- # for future in as_completed(future_to_product):
- # try:
- # result, success = future.result()
- # results.append(result)
- # if success:
- # successful += 1
- # else:
- # failed += 1
- # except Exception as e:
- # failed += 1
- # results.append({
- # 'product_id': 'unknown',
- # 'mandatory': {attr: "Not Specified" for attr in mandatory_attrs.keys()},
- # 'additional': {} if extract_additional else None,
- # 'error': f"Unexpected error: {str(e)}"
- # })
- # return {
- # 'results': results,
- # 'total_products': len(products),
- # 'successful': successful,
- # 'failed': failed
- # }
- # @staticmethod
- # def _clean_json_response(text: str) -> str:
- # """Clean LLM response to extract valid JSON."""
- # start_idx = text.find('{')
- # end_idx = text.rfind('}')
- # if start_idx != -1 and end_idx != -1:
- # text = text[start_idx:end_idx + 1]
- # if "```json" in text:
- # text = text.split("```json")[1].split("```")[0].strip()
- # elif "```" in text:
- # text = text.split("```")[1].split("```")[0].strip()
- # if text.startswith("json"):
- # text = text[4:].strip()
- # return text
- # @staticmethod
- # def _validate_response_structure(
- # parsed: dict,
- # mandatory_attrs: Dict[str, List[str]],
- # extract_additional: bool
- # ) -> dict:
- # """Validate and fix the response structure."""
- # expected_sections = ["mandatory"]
- # if extract_additional:
- # expected_sections.append("additional")
- # if not all(section in parsed for section in expected_sections):
- # if isinstance(parsed, dict):
- # mandatory_keys = set(mandatory_attrs.keys())
- # mandatory = {k: v for k, v in parsed.items() if k in mandatory_keys}
- # additional = {k: v for k, v in parsed.items() if k not in mandatory_keys}
- # result = {"mandatory": mandatory}
- # if extract_additional:
- # result["additional"] = additional
- # return result
- # else:
- # return ProductAttributeService._create_error_response(
- # "Invalid response structure",
- # mandatory_attrs,
- # extract_additional,
- # str(parsed)
- # )
- # return parsed
- # @staticmethod
- # def _create_error_response(
- # error: str,
- # mandatory_attrs: Dict[str, List[str]],
- # extract_additional: bool,
- # raw_output: Optional[str] = None
- # ) -> dict:
- # """Create a standardized error response."""
- # response = {
- # "mandatory": {attr: "Not Specified" for attr in mandatory_attrs.keys()},
- # "error": error
- # }
- # if extract_additional:
- # response["additional"] = {}
- # if raw_output:
- # response["raw_output"] = raw_output
- # return response
- # # ==================== services.py ====================
- # import requests
- # import json
- # from typing import Dict, List, Optional, Tuple
- # from django.conf import settings
- # from concurrent.futures import ThreadPoolExecutor, as_completed
- # from sentence_transformers import SentenceTransformer, util
- # import numpy as np
- # from .ocr_service import OCRService
- # # Initialize embedding model for normalization
- # model_embedder = SentenceTransformer("all-MiniLM-L6-v2")
- # class ProductAttributeService:
- # """Service class for extracting product attributes using Groq LLM."""
- # @staticmethod
- # def combine_product_text(
- # title: Optional[str] = None,
- # short_desc: Optional[str] = None,
- # long_desc: Optional[str] = None,
- # ocr_text: Optional[str] = None
- # ) -> Tuple[str, Dict[str, str]]:
- # """
- # Combine product metadata into a single text block.
- # Returns: (combined_text, source_map) where source_map tracks which text came from where
- # """
- # parts = []
- # source_map = {}
-
- # if title:
- # title_str = str(title).strip()
- # parts.append(f"Title: {title_str}")
- # source_map['title'] = title_str
- # if short_desc:
- # short_str = str(short_desc).strip()
- # parts.append(f"Description: {short_str}")
- # source_map['short_desc'] = short_str
- # if long_desc:
- # long_str = str(long_desc).strip()
- # parts.append(f"Details: {long_str}")
- # source_map['long_desc'] = long_str
- # if ocr_text:
- # parts.append(f"OCR Text: {ocr_text}")
- # source_map['ocr_text'] = ocr_text
-
- # combined = "\n".join(parts).strip()
-
- # if not combined:
- # return "No product information available", {}
-
- # return combined, source_map
- # @staticmethod
- # def find_value_source(value: str, source_map: Dict[str, str]) -> str:
- # """
- # Find which source(s) contain the given value.
- # Returns the source name(s) where the value appears.
- # """
- # value_lower = value.lower()
- # # Split value into tokens for better matching
- # value_tokens = set(value_lower.replace("-", " ").split())
-
- # sources_found = []
- # source_scores = {}
-
- # for source_name, source_text in source_map.items():
- # source_lower = source_text.lower()
-
- # # Check for exact phrase match first
- # if value_lower in source_lower:
- # source_scores[source_name] = 1.0
- # continue
-
- # # Check for token matches
- # token_matches = sum(1 for token in value_tokens if token in source_lower)
- # if token_matches > 0:
- # source_scores[source_name] = token_matches / len(value_tokens)
-
- # # Return source with highest score, or all sources if multiple have same score
- # if source_scores:
- # max_score = max(source_scores.values())
- # sources_found = [s for s, score in source_scores.items() if score == max_score]
-
- # # Prioritize: title > short_desc > long_desc > ocr_text
- # priority = ['title', 'short_desc', 'long_desc', 'ocr_text']
- # for p in priority:
- # if p in sources_found:
- # return p
-
- # return sources_found[0] if sources_found else "Not found"
-
- # return "Not found"
- # @staticmethod
- # def extract_attributes_from_ocr(ocr_results: Dict, model: str = None) -> Dict:
- # """Extract structured attributes from OCR text using LLM."""
- # if model is None:
- # model = settings.SUPPORTED_MODELS[0]
-
- # detected_text = ocr_results.get('detected_text', [])
- # if not detected_text:
- # return {}
-
- # # Format OCR text for prompt
- # ocr_text = "\n".join([f"Text: {item['text']}, Confidence: {item['confidence']:.2f}"
- # for item in detected_text])
-
- # prompt = f"""
- # You are an AI model that extracts structured attributes from OCR text detected on product images.
- # Given the OCR detections below, infer the possible product attributes and return them as a clean JSON object.
- # OCR Text:
- # {ocr_text}
- # Extract relevant attributes like:
- # - brand
- # - model_number
- # - size (waist_size, length, etc.)
- # - collection
- # - any other relevant product information
- # Return a JSON object with only the attributes you can confidently identify.
- # If an attribute is not present, do not include it in the response.
- # """
-
- # payload = {
- # "model": model,
- # "messages": [
- # {
- # "role": "system",
- # "content": "You are a helpful AI that extracts structured data from OCR output. Return only valid JSON."
- # },
- # {"role": "user", "content": prompt}
- # ],
- # "temperature": 0.2,
- # "max_tokens": 500
- # }
-
- # headers = {
- # "Authorization": f"Bearer {settings.GROQ_API_KEY}",
- # "Content-Type": "application/json",
- # }
-
- # try:
- # response = requests.post(
- # settings.GROQ_API_URL,
- # headers=headers,
- # json=payload,
- # timeout=30
- # )
- # response.raise_for_status()
- # result_text = response.json()["choices"][0]["message"]["content"].strip()
-
- # # Clean and parse JSON
- # result_text = ProductAttributeService._clean_json_response(result_text)
- # parsed = json.loads(result_text)
-
- # return parsed
- # except Exception as e:
- # return {"error": f"Failed to extract attributes from OCR: {str(e)}"}
- # @staticmethod
- # def calculate_attribute_relationships(
- # mandatory_attrs: Dict[str, List[str]],
- # product_text: str
- # ) -> Dict[str, float]:
- # """
- # Calculate semantic relationships between attribute values across different attributes.
- # Returns a matrix of cross-attribute value similarities.
- # """
- # pt_emb = model_embedder.encode(product_text, convert_to_tensor=True)
- # # Calculate similarities between all attribute values and product text
- # attr_scores = {}
- # for attr, values in mandatory_attrs.items():
- # attr_scores[attr] = {}
- # for val in values:
- # contexts = [val, f"for {val}", f"use in {val}", f"suitable for {val}"]
- # ctx_embs = [model_embedder.encode(c, convert_to_tensor=True) for c in contexts]
- # sem_sim = max(float(util.cos_sim(pt_emb, ce).item()) for ce in ctx_embs)
- # attr_scores[attr][val] = sem_sim
- # # Calculate cross-attribute value relationships
- # relationships = {}
- # attr_list = list(mandatory_attrs.keys())
- # for i, attr1 in enumerate(attr_list):
- # for attr2 in attr_list[i+1:]:
- # # Calculate pairwise similarities between values of different attributes
- # for val1 in mandatory_attrs[attr1]:
- # for val2 in mandatory_attrs[attr2]:
- # emb1 = model_embedder.encode(val1, convert_to_tensor=True)
- # emb2 = model_embedder.encode(val2, convert_to_tensor=True)
- # sim = float(util.cos_sim(emb1, emb2).item())
- # # Store bidirectional relationships
- # key1 = f"{attr1}:{val1}->{attr2}:{val2}"
- # key2 = f"{attr2}:{val2}->{attr1}:{val1}"
- # relationships[key1] = sim
- # relationships[key2] = sim
- # return relationships
- # @staticmethod
- # def calculate_value_clusters(
- # values: List[str],
- # scores: List[Tuple[str, float]],
- # cluster_threshold: float = 0.4
- # ) -> List[List[str]]:
- # """
- # Group values into semantic clusters based on their similarity to each other.
- # Returns clusters of related values.
- # """
- # if len(values) <= 1:
- # return [[val] for val, _ in scores]
- # # Get embeddings for all values
- # embeddings = [model_embedder.encode(val, convert_to_tensor=True) for val in values]
- # # Calculate pairwise similarities
- # similarity_matrix = np.zeros((len(values), len(values)))
- # for i in range(len(values)):
- # for j in range(i+1, len(values)):
- # sim = float(util.cos_sim(embeddings[i], embeddings[j]).item())
- # similarity_matrix[i][j] = sim
- # similarity_matrix[j][i] = sim
- # # Simple clustering: group values with high similarity
- # clusters = []
- # visited = set()
- # for i, (val, score) in enumerate(scores):
- # if i in visited:
- # continue
- # cluster = [val]
- # visited.add(i)
- # # Find similar values
- # for j in range(len(values)):
- # if j not in visited and similarity_matrix[i][j] >= cluster_threshold:
- # cluster.append(values[j])
- # visited.add(j)
- # clusters.append(cluster)
- # return clusters
- # @staticmethod
- # def get_dynamic_threshold(
- # attr: str,
- # val: str,
- # base_score: float,
- # extracted_attrs: Dict[str, List[Dict[str, str]]],
- # relationships: Dict[str, float],
- # mandatory_attrs: Dict[str, List[str]],
- # base_threshold: float = 0.65,
- # boost_factor: float = 0.15
- # ) -> float:
- # """
- # Calculate dynamic threshold based on relationships with already-extracted attributes.
- # """
- # threshold = base_threshold
- # # Check relationships with already extracted attributes
- # max_relationship = 0.0
- # for other_attr, other_values_list in extracted_attrs.items():
- # if other_attr == attr:
- # continue
- # for other_val_dict in other_values_list:
- # other_val = other_val_dict['value']
- # key = f"{attr}:{val}->{other_attr}:{other_val}"
- # if key in relationships:
- # max_relationship = max(max_relationship, relationships[key])
- # # If strong relationship exists, lower threshold
- # if max_relationship > 0.6:
- # threshold = base_threshold - (boost_factor * max_relationship)
- # return max(0.3, threshold)
- # @staticmethod
- # def get_adaptive_margin(
- # scores: List[Tuple[str, float]],
- # base_margin: float = 0.15,
- # max_margin: float = 0.22
- # ) -> float:
- # """
- # Calculate adaptive margin based on score distribution.
- # """
- # if len(scores) < 2:
- # return base_margin
- # score_values = [s for _, s in scores]
- # best_score = score_values[0]
- # # If best score is very low, use adaptive margin but be more conservative
- # if best_score < 0.5:
- # # Calculate score spread in top 3-4 scores only (more selective)
- # top_scores = score_values[:min(4, len(score_values))]
- # score_range = max(top_scores) - min(top_scores)
- # # Very controlled margin increase
- # if score_range < 0.30:
- # # Much more conservative scaling
- # score_factor = (0.5 - best_score) * 0.35
- # adaptive = base_margin + score_factor + (0.30 - score_range) * 0.2
- # return min(adaptive, max_margin)
- # return base_margin
- # @staticmethod
- # def _lexical_evidence(product_text: str, label: str) -> float:
- # """Calculate lexical overlap between product text and label."""
- # pt = product_text.lower()
- # tokens = [t for t in label.lower().replace("-", " ").split() if t]
- # if not tokens:
- # return 0.0
- # hits = sum(1 for t in tokens if t in pt)
- # return hits / len(tokens)
- # @staticmethod
- # def normalize_against_product_text(
- # product_text: str,
- # mandatory_attrs: Dict[str, List[str]],
- # source_map: Dict[str, str],
- # threshold_abs: float = 0.65,
- # margin: float = 0.15,
- # allow_multiple: bool = False,
- # sem_weight: float = 0.8,
- # lex_weight: float = 0.2,
- # extracted_attrs: Optional[Dict[str, List[Dict[str, str]]]] = None,
- # relationships: Optional[Dict[str, float]] = None,
- # use_dynamic_thresholds: bool = True,
- # use_adaptive_margin: bool = True,
- # use_semantic_clustering: bool = True
- # ) -> dict:
- # """
- # Score each allowed value against the product_text with dynamic thresholds.
- # Returns dict with values in array format: [{"value": "...", "source": "..."}]
- # """
- # if extracted_attrs is None:
- # extracted_attrs = {}
- # if relationships is None:
- # relationships = {}
- # pt_emb = model_embedder.encode(product_text, convert_to_tensor=True)
- # extracted = {}
- # for attr, allowed_values in mandatory_attrs.items():
- # scores: List[Tuple[str, float]] = []
- # for val in allowed_values:
- # contexts = [val, f"for {val}", f"use in {val}", f"suitable for {val}", f"{val} room"]
- # ctx_embs = [model_embedder.encode(c, convert_to_tensor=True) for c in contexts]
- # sem_sim = max(float(util.cos_sim(pt_emb, ce).item()) for ce in ctx_embs)
- # lex_score = ProductAttributeService._lexical_evidence(product_text, val)
- # final_score = sem_weight * sem_sim + lex_weight * lex_score
- # scores.append((val, final_score))
- # scores.sort(key=lambda x: x[1], reverse=True)
- # best_val, best_score = scores[0]
- # # Calculate adaptive margin if enabled
- # effective_margin = margin
- # if allow_multiple and use_adaptive_margin:
- # effective_margin = ProductAttributeService.get_adaptive_margin(scores, margin)
- # if not allow_multiple:
- # source = ProductAttributeService.find_value_source(best_val, source_map)
- # extracted[attr] = [{"value": best_val, "source": source}]
- # else:
- # candidates = [best_val]
- # use_base_threshold = best_score >= threshold_abs
- # # Get semantic clusters if enabled
- # clusters = []
- # if use_semantic_clustering:
- # clusters = ProductAttributeService.calculate_value_clusters(
- # allowed_values, scores, cluster_threshold=0.4
- # )
- # best_cluster = next((c for c in clusters if best_val in c), [best_val])
- # for val, sc in scores[1:]:
- # # Calculate dynamic threshold for this value
- # if use_dynamic_thresholds and extracted_attrs:
- # dynamic_thresh = ProductAttributeService.get_dynamic_threshold(
- # attr, val, sc, extracted_attrs, relationships,
- # mandatory_attrs, threshold_abs
- # )
- # else:
- # dynamic_thresh = threshold_abs
- # within_margin = (best_score - sc) <= effective_margin
- # above_threshold = sc >= dynamic_thresh
- # # Check if in same semantic cluster as best value
- # in_cluster = False
- # if use_semantic_clustering and clusters:
- # in_cluster = any(best_val in c and val in c for c in clusters)
- # if use_base_threshold:
- # # Best score is good, require threshold OR (cluster + margin)
- # if above_threshold and within_margin:
- # candidates.append(val)
- # elif in_cluster and within_margin:
- # candidates.append(val)
- # else:
- # # Best score is low, use margin OR cluster logic
- # if within_margin:
- # candidates.append(val)
- # elif in_cluster and (best_score - sc) <= effective_margin * 2.0:
- # # Extended margin for cluster members
- # candidates.append(val)
- # # Map each candidate to its source and create array format
- # extracted[attr] = []
- # for candidate in candidates:
- # source = ProductAttributeService.find_value_source(candidate, source_map)
- # extracted[attr].append({"value": candidate, "source": source})
- # return extracted
- # @staticmethod
- # def extract_attributes(
- # product_text: str,
- # mandatory_attrs: Dict[str, List[str]],
- # source_map: Dict[str, str] = None,
- # model: str = None,
- # extract_additional: bool = True,
- # multiple: Optional[List[str]] = None,
- # threshold_abs: float = 0.65,
- # margin: float = 0.15,
- # use_dynamic_thresholds: bool = True,
- # use_adaptive_margin: bool = True,
- # use_semantic_clustering: bool = True
- # ) -> dict:
- # """
- # Use Groq LLM to extract attributes from any product type with enhanced multi-value selection.
- # Now returns values in array format: [{"value": "...", "source": "..."}]
- # """
-
- # if model is None:
- # model = settings.SUPPORTED_MODELS[0]
- # if multiple is None:
- # multiple = []
- # if source_map is None:
- # source_map = {}
- # # Check if product text is empty or minimal
- # if not product_text or product_text == "No product information available":
- # return ProductAttributeService._create_error_response(
- # "No product information provided",
- # mandatory_attrs,
- # extract_additional
- # )
- # # Create structured prompt for mandatory attributes
- # mandatory_attr_list = []
- # for attr_name, allowed_values in mandatory_attrs.items():
- # mandatory_attr_list.append(f"{attr_name}: {', '.join(allowed_values)}")
- # mandatory_attr_text = "\n".join(mandatory_attr_list)
- # additional_instruction = ""
- # if extract_additional:
- # additional_instruction = """
- # 2. Extract ADDITIONAL attributes: Identify any other relevant attributes from the product text
- # that are NOT in the mandatory list. Only include attributes where you can find actual values
- # in the product text. Do NOT include attributes with "Not Specified" or empty values.
-
- # Examples of attributes to look for (only if present): Brand, Material, Size, Color, Dimensions,
- # Weight, Features, Style, Theme, Pattern, Finish, Care Instructions, etc."""
- # output_format = {
- # "mandatory": {attr: "value or list of values" for attr in mandatory_attrs.keys()},
- # }
- # if extract_additional:
- # output_format["additional"] = {
- # "example_attribute_1": "actual value found",
- # "example_attribute_2": "actual value found"
- # }
- # output_format["additional"]["_note"] = "Only include attributes with actual values found in text"
- # prompt = f"""
- # You are an intelligent product attribute extractor that works with ANY product type.
- # TASK:
- # 1. Extract MANDATORY attributes: For each mandatory attribute, select the most appropriate value(s)
- # from the provided list. Choose the value(s) that best match the product description.
- # {additional_instruction}
- # Product Text:
- # {product_text}
- # Mandatory Attribute Lists (MUST select from these allowed values):
- # {mandatory_attr_text}
- # CRITICAL INSTRUCTIONS:
- # - Return ONLY valid JSON, nothing else
- # - No explanations, no markdown, no text before or after the JSON
- # - For mandatory attributes, choose the value(s) from the provided list that best match
- # - If a mandatory attribute cannot be determined from the product text, use "Not Specified"
- # - Prefer exact matches from the allowed values list over generic synonyms
- # - If multiple values are plausible, you MAY return more than one
- # {f"- For additional attributes: ONLY include attributes where you found actual values in the product text. DO NOT include attributes with 'Not Specified', 'None', 'N/A', or empty values. If you cannot find a value for an attribute, simply don't include that attribute." if extract_additional else ""}
- # - Be precise and only extract information that is explicitly stated or clearly implied
- # Required Output Format:
- # {json.dumps(output_format, indent=2)}
- # """
- # payload = {
- # "model": model,
- # "messages": [
- # {
- # "role": "system",
- # "content": f"You are a precise attribute extraction model. Return ONLY valid JSON with {'mandatory and additional' if extract_additional else 'mandatory'} sections. No explanations, no markdown, no other text."
- # },
- # {"role": "user", "content": prompt}
- # ],
- # "temperature": 0.0,
- # "max_tokens": 1500
- # }
- # headers = {
- # "Authorization": f"Bearer {settings.GROQ_API_KEY}",
- # "Content-Type": "application/json",
- # }
- # try:
- # response = requests.post(
- # settings.GROQ_API_URL,
- # headers=headers,
- # json=payload,
- # timeout=30
- # )
- # response.raise_for_status()
- # result_text = response.json()["choices"][0]["message"]["content"].strip()
- # # Clean the response
- # result_text = ProductAttributeService._clean_json_response(result_text)
- # # Parse JSON
- # parsed = json.loads(result_text)
- # # Validate and restructure with source tracking
- # parsed = ProductAttributeService._validate_response_structure(
- # parsed, mandatory_attrs, extract_additional, source_map
- # )
- # # Clean up and add source tracking to additional attributes in array format
- # if extract_additional and "additional" in parsed:
- # cleaned_additional = {}
- # for k, v in parsed["additional"].items():
- # if v and v not in ["Not Specified", "None", "N/A", "", "not specified", "none", "n/a"]:
- # if not (isinstance(v, str) and v.lower() in ["not specified", "none", "n/a", ""]):
- # # Convert to array format if not already
- # if isinstance(v, list):
- # cleaned_additional[k] = []
- # for item in v:
- # if isinstance(item, dict) and "value" in item:
- # if "source" not in item:
- # item["source"] = ProductAttributeService.find_value_source(
- # item["value"], source_map
- # )
- # cleaned_additional[k].append(item)
- # else:
- # source = ProductAttributeService.find_value_source(str(item), source_map)
- # cleaned_additional[k].append({"value": str(item), "source": source})
- # else:
- # source = ProductAttributeService.find_value_source(str(v), source_map)
- # cleaned_additional[k] = [{"value": str(v), "source": source}]
- # parsed["additional"] = cleaned_additional
- # # Calculate attribute relationships if using dynamic thresholds
- # relationships = {}
- # if use_dynamic_thresholds:
- # relationships = ProductAttributeService.calculate_attribute_relationships(
- # mandatory_attrs, product_text
- # )
- # # Process attributes in order, allowing earlier ones to influence later ones
- # extracted_so_far = {}
- # for attr in mandatory_attrs.keys():
- # allow_multiple = attr in multiple
- # result = ProductAttributeService.normalize_against_product_text(
- # product_text=product_text,
- # mandatory_attrs={attr: mandatory_attrs[attr]},
- # source_map=source_map,
- # threshold_abs=threshold_abs,
- # margin=margin,
- # allow_multiple=allow_multiple,
- # extracted_attrs=extracted_so_far,
- # relationships=relationships,
- # use_dynamic_thresholds=use_dynamic_thresholds,
- # use_adaptive_margin=use_adaptive_margin,
- # use_semantic_clustering=use_semantic_clustering
- # )
- # # Result is already in array format from normalize_against_product_text
- # parsed["mandatory"][attr] = result[attr]
- # extracted_so_far[attr] = result[attr]
- # return parsed
- # except requests.exceptions.RequestException as e:
- # return ProductAttributeService._create_error_response(
- # str(e), mandatory_attrs, extract_additional
- # )
- # except json.JSONDecodeError as e:
- # return ProductAttributeService._create_error_response(
- # f"Invalid JSON: {str(e)}", mandatory_attrs, extract_additional, result_text
- # )
- # except Exception as e:
- # return ProductAttributeService._create_error_response(
- # str(e), mandatory_attrs, extract_additional
- # )
- # @staticmethod
- # def extract_attributes_batch(
- # products: List[Dict],
- # mandatory_attrs: Dict[str, List[str]],
- # model: str = None,
- # extract_additional: bool = True,
- # process_image: bool = True,
- # max_workers: int = 5,
- # multiple: Optional[List[str]] = None,
- # threshold_abs: float = 0.65,
- # margin: float = 0.15,
- # use_dynamic_thresholds: bool = True,
- # use_adaptive_margin: bool = True,
- # use_semantic_clustering: bool = True
- # ) -> Dict:
- # """Extract attributes for multiple products in parallel with enhanced multi-value selection and source tracking."""
- # results = []
- # successful = 0
- # failed = 0
-
- # ocr_service = OCRService()
- # if multiple is None:
- # multiple = []
- # def process_product(product_data):
- # """Process a single product."""
- # product_id = product_data.get('product_id', f"product_{len(results)}")
-
- # try:
- # # Process image if URL is provided
- # ocr_results = None
- # ocr_text = None
-
- # if process_image and product_data.get('image_url'):
- # ocr_results = ocr_service.process_image(product_data['image_url'])
-
- # # Extract attributes from OCR
- # if ocr_results and ocr_results.get('detected_text'):
- # ocr_attrs = ProductAttributeService.extract_attributes_from_ocr(
- # ocr_results, model
- # )
- # ocr_results['extracted_attributes'] = ocr_attrs
-
- # # Format OCR text for combining with product text
- # ocr_text = "\n".join([
- # f"{item['text']} (confidence: {item['confidence']:.2f})"
- # for item in ocr_results['detected_text']
- # ])
-
- # # Combine all product information with source tracking
- # product_text, source_map = ProductAttributeService.combine_product_text(
- # title=product_data.get('title'),
- # short_desc=product_data.get('short_desc'),
- # long_desc=product_data.get('long_desc'),
- # ocr_text=ocr_text
- # )
-
- # # Extract attributes from combined text with enhanced features
- # result = ProductAttributeService.extract_attributes(
- # product_text=product_text,
- # mandatory_attrs=mandatory_attrs,
- # source_map=source_map,
- # model=model,
- # extract_additional=extract_additional,
- # multiple=multiple,
- # threshold_abs=threshold_abs,
- # margin=margin,
- # use_dynamic_thresholds=use_dynamic_thresholds,
- # use_adaptive_margin=use_adaptive_margin,
- # use_semantic_clustering=use_semantic_clustering
- # )
-
- # result['product_id'] = product_id
-
- # # Add OCR results if available
- # if ocr_results:
- # result['ocr_results'] = ocr_results
-
- # # Check if extraction was successful
- # if 'error' not in result:
- # return result, True
- # else:
- # return result, False
-
- # except Exception as e:
- # return {
- # 'product_id': product_id,
- # 'mandatory': {attr: [{"value": "Not Specified", "source": "error"}] for attr in mandatory_attrs.keys()},
- # 'additional': {} if extract_additional else None,
- # 'error': f"Processing error: {str(e)}"
- # }, False
- # # Process products in parallel
- # with ThreadPoolExecutor(max_workers=max_workers) as executor:
- # future_to_product = {
- # executor.submit(process_product, product): product
- # for product in products
- # }
-
- # for future in as_completed(future_to_product):
- # try:
- # result, success = future.result()
- # results.append(result)
- # if success:
- # successful += 1
- # else:
- # failed += 1
- # except Exception as e:
- # failed += 1
- # results.append({
- # 'product_id': 'unknown',
- # 'mandatory': {attr: [{"value": "Not Specified", "source": "error"}] for attr in mandatory_attrs.keys()},
- # 'additional': {} if extract_additional else None,
- # 'error': f"Unexpected error: {str(e)}"
- # })
- # return {
- # 'results': results,
- # 'total_products': len(products),
- # 'successful': successful,
- # 'failed': failed
- # }
- # @staticmethod
- # def _clean_json_response(text: str) -> str:
- # """Clean LLM response to extract valid JSON."""
- # start_idx = text.find('{')
- # end_idx = text.rfind('}')
- # if start_idx != -1 and end_idx != -1:
- # text = text[start_idx:end_idx + 1]
- # if "```json" in text:
- # text = text.split("```json")[1].split("```")[0].strip()
- # elif "```" in text:
- # text = text.split("```")[1].split("```")[0].strip()
- # if text.startswith("json"):
- # text = text[4:].strip()
- # return text
- # @staticmethod
- # def _validate_response_structure(
- # parsed: dict,
- # mandatory_attrs: Dict[str, List[str]],
- # extract_additional: bool,
- # source_map: Dict[str, str] = None
- # ) -> dict:
- # """Validate and fix the response structure, ensuring array format with source tracking."""
- # if source_map is None:
- # source_map = {}
-
- # expected_sections = ["mandatory"]
- # if extract_additional:
- # expected_sections.append("additional")
- # if not all(section in parsed for section in expected_sections):
- # if isinstance(parsed, dict):
- # mandatory_keys = set(mandatory_attrs.keys())
- # mandatory = {k: v for k, v in parsed.items() if k in mandatory_keys}
- # additional = {k: v for k, v in parsed.items() if k not in mandatory_keys}
- # result = {"mandatory": mandatory}
- # if extract_additional:
- # result["additional"] = additional
- # parsed = result
- # else:
- # return ProductAttributeService._create_error_response(
- # "Invalid response structure",
- # mandatory_attrs,
- # extract_additional,
- # str(parsed)
- # )
- # # Convert mandatory attributes to array format with source tracking
- # if "mandatory" in parsed:
- # converted_mandatory = {}
- # for attr, value in parsed["mandatory"].items():
- # if isinstance(value, list):
- # # Already in array format, ensure each item has source
- # converted_mandatory[attr] = []
- # for item in value:
- # if isinstance(item, dict) and "value" in item:
- # # Already has proper structure
- # if "source" not in item:
- # item["source"] = ProductAttributeService.find_value_source(
- # item["value"], source_map
- # )
- # converted_mandatory[attr].append(item)
- # else:
- # # Convert string to proper format
- # source = ProductAttributeService.find_value_source(str(item), source_map)
- # converted_mandatory[attr].append({"value": str(item), "source": source})
- # else:
- # # Single value - convert to array format
- # source = ProductAttributeService.find_value_source(str(value), source_map)
- # converted_mandatory[attr] = [{"value": str(value), "source": source}]
-
- # parsed["mandatory"] = converted_mandatory
- # return parsed
- # @staticmethod
- # def _create_error_response(
- # error: str,
- # mandatory_attrs: Dict[str, List[str]],
- # extract_additional: bool,
- # raw_output: Optional[str] = None
- # ) -> dict:
- # """Create a standardized error response in array format."""
- # response = {
- # "mandatory": {attr: [{"value": "Not Specified", "source": "error"}] for attr in mandatory_attrs.keys()},
- # "error": error
- # }
- # if extract_additional:
- # response["additional"] = {}
- # if raw_output:
- # response["raw_output"] = raw_output
- # return response
- # # ==================== services.py ====================
- # import requests
- # import json
- # from typing import Dict, List, Optional, Tuple
- # from django.conf import settings
- # from concurrent.futures import ThreadPoolExecutor, as_completed
- # from sentence_transformers import SentenceTransformer, util
- # import numpy as np
- # from .ocr_service import OCRService
- # # Initialize embedding model for normalization
- # model_embedder = SentenceTransformer("all-MiniLM-L6-v2")
- # class ProductAttributeService:
- # """Service class for extracting product attributes using Groq LLM."""
- # @staticmethod
- # def combine_product_text(
- # title: Optional[str] = None,
- # short_desc: Optional[str] = None,
- # long_desc: Optional[str] = None,
- # ocr_text: Optional[str] = None
- # ) -> Tuple[str, Dict[str, str]]:
- # """
- # Combine product metadata into a single text block.
- # Returns: (combined_text, source_map) where source_map tracks which text came from where
- # """
- # parts = []
- # source_map = {}
-
- # if title:
- # title_str = str(title).strip()
- # parts.append(f"Title: {title_str}")
- # source_map['title'] = title_str
- # if short_desc:
- # short_str = str(short_desc).strip()
- # parts.append(f"Description: {short_str}")
- # source_map['short_desc'] = short_str
- # if long_desc:
- # long_str = str(long_desc).strip()
- # parts.append(f"Details: {long_str}")
- # source_map['long_desc'] = long_str
- # if ocr_text:
- # parts.append(f"OCR Text: {ocr_text}")
- # source_map['ocr_text'] = ocr_text
-
- # combined = "\n".join(parts).strip()
-
- # if not combined:
- # return "No product information available", {}
-
- # return combined, source_map
- # @staticmethod
- # def find_value_source(value: str, source_map: Dict[str, str]) -> str:
- # """
- # Find which source(s) contain the given value.
- # Returns the source name(s) where the value appears.
- # """
- # value_lower = value.lower()
- # # Split value into tokens for better matching
- # value_tokens = set(value_lower.replace("-", " ").split())
-
- # sources_found = []
- # source_scores = {}
-
- # for source_name, source_text in source_map.items():
- # source_lower = source_text.lower()
-
- # # Check for exact phrase match first
- # if value_lower in source_lower:
- # source_scores[source_name] = 1.0
- # continue
-
- # # Check for token matches
- # token_matches = sum(1 for token in value_tokens if token in source_lower)
- # if token_matches > 0:
- # source_scores[source_name] = token_matches / len(value_tokens)
-
- # # Return source with highest score, or all sources if multiple have same score
- # if source_scores:
- # max_score = max(source_scores.values())
- # sources_found = [s for s, score in source_scores.items() if score == max_score]
-
- # # Prioritize: title > short_desc > long_desc > ocr_text
- # priority = ['title', 'short_desc', 'long_desc', 'ocr_text']
- # for p in priority:
- # if p in sources_found:
- # return p
-
- # return sources_found[0] if sources_found else "Not found"
-
- # return "Not found"
- # @staticmethod
- # def format_visual_attributes(visual_attributes: Dict) -> Dict:
- # """
- # Convert visual attributes to array format with source tracking.
- # Source is always 'image' for visual attributes.
- # """
- # formatted = {}
-
- # for key, value in visual_attributes.items():
- # if isinstance(value, list):
- # # Already a list (like color_palette)
- # formatted[key] = [{"value": str(item), "source": "image"} for item in value]
- # elif isinstance(value, dict):
- # # Nested dictionary - format recursively
- # nested_formatted = {}
- # for nested_key, nested_value in value.items():
- # if isinstance(nested_value, list):
- # nested_formatted[nested_key] = [{"value": str(item), "source": "image"} for item in nested_value]
- # else:
- # nested_formatted[nested_key] = [{"value": str(nested_value), "source": "image"}]
- # formatted[key] = nested_formatted
- # else:
- # # Single value
- # formatted[key] = [{"value": str(value), "source": "image"}]
-
- # return formatted
- # @staticmethod
- # def extract_attributes_from_ocr(ocr_results: Dict, model: str = None) -> Dict:
- # """Extract structured attributes from OCR text using LLM."""
- # if model is None:
- # model = settings.SUPPORTED_MODELS[0]
-
- # detected_text = ocr_results.get('detected_text', [])
- # if not detected_text:
- # return {}
-
- # # Format OCR text for prompt
- # ocr_text = "\n".join([f"Text: {item['text']}, Confidence: {item['confidence']:.2f}"
- # for item in detected_text])
-
- # prompt = f"""
- # You are an AI model that extracts structured attributes from OCR text detected on product images.
- # Given the OCR detections below, infer the possible product attributes and return them as a clean JSON object.
- # OCR Text:
- # {ocr_text}
- # Extract relevant attributes like:
- # - brand
- # - model_number
- # - size (waist_size, length, etc.)
- # - collection
- # - any other relevant product information
- # Return a JSON object with only the attributes you can confidently identify.
- # If an attribute is not present, do not include it in the response.
- # """
-
- # payload = {
- # "model": model,
- # "messages": [
- # {
- # "role": "system",
- # "content": "You are a helpful AI that extracts structured data from OCR output. Return only valid JSON."
- # },
- # {"role": "user", "content": prompt}
- # ],
- # "temperature": 0.2,
- # "max_tokens": 500
- # }
-
- # headers = {
- # "Authorization": f"Bearer {settings.GROQ_API_KEY}",
- # "Content-Type": "application/json",
- # }
-
- # try:
- # response = requests.post(
- # settings.GROQ_API_URL,
- # headers=headers,
- # json=payload,
- # timeout=30
- # )
- # response.raise_for_status()
- # result_text = response.json()["choices"][0]["message"]["content"].strip()
-
- # # Clean and parse JSON
- # result_text = ProductAttributeService._clean_json_response(result_text)
- # parsed = json.loads(result_text)
-
- # # Convert to array format with source tracking
- # formatted_attributes = {}
- # for key, value in parsed.items():
- # if key == "error":
- # continue
-
- # # Handle nested dictionaries (like size)
- # if isinstance(value, dict):
- # nested_formatted = {}
- # for nested_key, nested_value in value.items():
- # nested_formatted[nested_key] = [{"value": str(nested_value), "source": "image"}]
- # formatted_attributes[key] = nested_formatted
- # elif isinstance(value, list):
- # # Already a list, convert each item
- # formatted_attributes[key] = [{"value": str(item), "source": "image"} for item in value]
- # else:
- # # Single value
- # formatted_attributes[key] = [{"value": str(value), "source": "image"}]
-
- # return formatted_attributes
- # except Exception as e:
- # return {"error": f"Failed to extract attributes from OCR: {str(e)}"}
- # @staticmethod
- # def calculate_attribute_relationships(
- # mandatory_attrs: Dict[str, List[str]],
- # product_text: str
- # ) -> Dict[str, float]:
- # """
- # Calculate semantic relationships between attribute values across different attributes.
- # Returns a matrix of cross-attribute value similarities.
- # """
- # pt_emb = model_embedder.encode(product_text, convert_to_tensor=True)
- # # Calculate similarities between all attribute values and product text
- # attr_scores = {}
- # for attr, values in mandatory_attrs.items():
- # attr_scores[attr] = {}
- # for val in values:
- # contexts = [val, f"for {val}", f"use in {val}", f"suitable for {val}"]
- # ctx_embs = [model_embedder.encode(c, convert_to_tensor=True) for c in contexts]
- # sem_sim = max(float(util.cos_sim(pt_emb, ce).item()) for ce in ctx_embs)
- # attr_scores[attr][val] = sem_sim
- # # Calculate cross-attribute value relationships
- # relationships = {}
- # attr_list = list(mandatory_attrs.keys())
- # for i, attr1 in enumerate(attr_list):
- # for attr2 in attr_list[i+1:]:
- # # Calculate pairwise similarities between values of different attributes
- # for val1 in mandatory_attrs[attr1]:
- # for val2 in mandatory_attrs[attr2]:
- # emb1 = model_embedder.encode(val1, convert_to_tensor=True)
- # emb2 = model_embedder.encode(val2, convert_to_tensor=True)
- # sim = float(util.cos_sim(emb1, emb2).item())
- # # Store bidirectional relationships
- # key1 = f"{attr1}:{val1}->{attr2}:{val2}"
- # key2 = f"{attr2}:{val2}->{attr1}:{val1}"
- # relationships[key1] = sim
- # relationships[key2] = sim
- # return relationships
- # @staticmethod
- # def calculate_value_clusters(
- # values: List[str],
- # scores: List[Tuple[str, float]],
- # cluster_threshold: float = 0.4
- # ) -> List[List[str]]:
- # """
- # Group values into semantic clusters based on their similarity to each other.
- # Returns clusters of related values.
- # """
- # if len(values) <= 1:
- # return [[val] for val, _ in scores]
- # # Get embeddings for all values
- # embeddings = [model_embedder.encode(val, convert_to_tensor=True) for val in values]
- # # Calculate pairwise similarities
- # similarity_matrix = np.zeros((len(values), len(values)))
- # for i in range(len(values)):
- # for j in range(i+1, len(values)):
- # sim = float(util.cos_sim(embeddings[i], embeddings[j]).item())
- # similarity_matrix[i][j] = sim
- # similarity_matrix[j][i] = sim
- # # Simple clustering: group values with high similarity
- # clusters = []
- # visited = set()
- # for i, (val, score) in enumerate(scores):
- # if i in visited:
- # continue
- # cluster = [val]
- # visited.add(i)
- # # Find similar values
- # for j in range(len(values)):
- # if j not in visited and similarity_matrix[i][j] >= cluster_threshold:
- # cluster.append(values[j])
- # visited.add(j)
- # clusters.append(cluster)
- # return clusters
- # @staticmethod
- # def get_dynamic_threshold(
- # attr: str,
- # val: str,
- # base_score: float,
- # extracted_attrs: Dict[str, List[Dict[str, str]]],
- # relationships: Dict[str, float],
- # mandatory_attrs: Dict[str, List[str]],
- # base_threshold: float = 0.65,
- # boost_factor: float = 0.15
- # ) -> float:
- # """
- # Calculate dynamic threshold based on relationships with already-extracted attributes.
- # """
- # threshold = base_threshold
- # # Check relationships with already extracted attributes
- # max_relationship = 0.0
- # for other_attr, other_values_list in extracted_attrs.items():
- # if other_attr == attr:
- # continue
- # for other_val_dict in other_values_list:
- # other_val = other_val_dict['value']
- # key = f"{attr}:{val}->{other_attr}:{other_val}"
- # if key in relationships:
- # max_relationship = max(max_relationship, relationships[key])
- # # If strong relationship exists, lower threshold
- # if max_relationship > 0.6:
- # threshold = base_threshold - (boost_factor * max_relationship)
- # return max(0.3, threshold)
- # @staticmethod
- # def get_adaptive_margin(
- # scores: List[Tuple[str, float]],
- # base_margin: float = 0.15,
- # max_margin: float = 0.22
- # ) -> float:
- # """
- # Calculate adaptive margin based on score distribution.
- # """
- # if len(scores) < 2:
- # return base_margin
- # score_values = [s for _, s in scores]
- # best_score = score_values[0]
- # # If best score is very low, use adaptive margin but be more conservative
- # if best_score < 0.5:
- # # Calculate score spread in top 3-4 scores only (more selective)
- # top_scores = score_values[:min(4, len(score_values))]
- # score_range = max(top_scores) - min(top_scores)
- # # Very controlled margin increase
- # if score_range < 0.30:
- # # Much more conservative scaling
- # score_factor = (0.5 - best_score) * 0.35
- # adaptive = base_margin + score_factor + (0.30 - score_range) * 0.2
- # return min(adaptive, max_margin)
- # return base_margin
- # @staticmethod
- # def _lexical_evidence(product_text: str, label: str) -> float:
- # """Calculate lexical overlap between product text and label."""
- # pt = product_text.lower()
- # tokens = [t for t in label.lower().replace("-", " ").split() if t]
- # if not tokens:
- # return 0.0
- # hits = sum(1 for t in tokens if t in pt)
- # return hits / len(tokens)
- # @staticmethod
- # def normalize_against_product_text(
- # product_text: str,
- # mandatory_attrs: Dict[str, List[str]],
- # source_map: Dict[str, str],
- # threshold_abs: float = 0.65,
- # margin: float = 0.15,
- # allow_multiple: bool = False,
- # sem_weight: float = 0.8,
- # lex_weight: float = 0.2,
- # extracted_attrs: Optional[Dict[str, List[Dict[str, str]]]] = None,
- # relationships: Optional[Dict[str, float]] = None,
- # use_dynamic_thresholds: bool = True,
- # use_adaptive_margin: bool = True,
- # use_semantic_clustering: bool = True
- # ) -> dict:
- # """
- # Score each allowed value against the product_text with dynamic thresholds.
- # Returns dict with values in array format: [{"value": "...", "source": "..."}]
- # """
- # if extracted_attrs is None:
- # extracted_attrs = {}
- # if relationships is None:
- # relationships = {}
- # pt_emb = model_embedder.encode(product_text, convert_to_tensor=True)
- # extracted = {}
- # for attr, allowed_values in mandatory_attrs.items():
- # scores: List[Tuple[str, float]] = []
- # for val in allowed_values:
- # contexts = [val, f"for {val}", f"use in {val}", f"suitable for {val}", f"{val} room"]
- # ctx_embs = [model_embedder.encode(c, convert_to_tensor=True) for c in contexts]
- # sem_sim = max(float(util.cos_sim(pt_emb, ce).item()) for ce in ctx_embs)
- # lex_score = ProductAttributeService._lexical_evidence(product_text, val)
- # final_score = sem_weight * sem_sim + lex_weight * lex_score
- # scores.append((val, final_score))
- # scores.sort(key=lambda x: x[1], reverse=True)
- # best_val, best_score = scores[0]
- # # Calculate adaptive margin if enabled
- # effective_margin = margin
- # if allow_multiple and use_adaptive_margin:
- # effective_margin = ProductAttributeService.get_adaptive_margin(scores, margin)
- # if not allow_multiple:
- # source = ProductAttributeService.find_value_source(best_val, source_map)
- # extracted[attr] = [{"value": best_val, "source": source}]
- # else:
- # candidates = [best_val]
- # use_base_threshold = best_score >= threshold_abs
- # # Get semantic clusters if enabled
- # clusters = []
- # if use_semantic_clustering:
- # clusters = ProductAttributeService.calculate_value_clusters(
- # allowed_values, scores, cluster_threshold=0.4
- # )
- # best_cluster = next((c for c in clusters if best_val in c), [best_val])
- # for val, sc in scores[1:]:
- # # Calculate dynamic threshold for this value
- # if use_dynamic_thresholds and extracted_attrs:
- # dynamic_thresh = ProductAttributeService.get_dynamic_threshold(
- # attr, val, sc, extracted_attrs, relationships,
- # mandatory_attrs, threshold_abs
- # )
- # else:
- # dynamic_thresh = threshold_abs
- # within_margin = (best_score - sc) <= effective_margin
- # above_threshold = sc >= dynamic_thresh
- # # Check if in same semantic cluster as best value
- # in_cluster = False
- # if use_semantic_clustering and clusters:
- # in_cluster = any(best_val in c and val in c for c in clusters)
- # if use_base_threshold:
- # # Best score is good, require threshold OR (cluster + margin)
- # if above_threshold and within_margin:
- # candidates.append(val)
- # elif in_cluster and within_margin:
- # candidates.append(val)
- # else:
- # # Best score is low, use margin OR cluster logic
- # if within_margin:
- # candidates.append(val)
- # elif in_cluster and (best_score - sc) <= effective_margin * 2.0:
- # # Extended margin for cluster members
- # candidates.append(val)
- # # Map each candidate to its source and create array format
- # extracted[attr] = []
- # for candidate in candidates:
- # source = ProductAttributeService.find_value_source(candidate, source_map)
- # extracted[attr].append({"value": candidate, "source": source})
- # return extracted
- # @staticmethod
- # def extract_attributes(
- # product_text: str,
- # mandatory_attrs: Dict[str, List[str]],
- # source_map: Dict[str, str] = None,
- # model: str = None,
- # extract_additional: bool = True,
- # multiple: Optional[List[str]] = None,
- # threshold_abs: float = 0.65,
- # margin: float = 0.15,
- # use_dynamic_thresholds: bool = True,
- # use_adaptive_margin: bool = True,
- # use_semantic_clustering: bool = True
- # ) -> dict:
- # """
- # Use Groq LLM to extract attributes from any product type with enhanced multi-value selection.
- # Now returns values in array format: [{"value": "...", "source": "..."}]
- # """
-
- # if model is None:
- # model = settings.SUPPORTED_MODELS[0]
- # if multiple is None:
- # multiple = []
- # if source_map is None:
- # source_map = {}
- # # Check if product text is empty or minimal
- # if not product_text or product_text == "No product information available":
- # return ProductAttributeService._create_error_response(
- # "No product information provided",
- # mandatory_attrs,
- # extract_additional
- # )
- # # Create structured prompt for mandatory attributes
- # mandatory_attr_list = []
- # for attr_name, allowed_values in mandatory_attrs.items():
- # mandatory_attr_list.append(f"{attr_name}: {', '.join(allowed_values)}")
- # mandatory_attr_text = "\n".join(mandatory_attr_list)
- # additional_instruction = ""
- # if extract_additional:
- # additional_instruction = """
- # 2. Extract ADDITIONAL attributes: Identify any other relevant attributes from the product text
- # that are NOT in the mandatory list. Only include attributes where you can find actual values
- # in the product text. Do NOT include attributes with "Not Specified" or empty values.
-
- # Examples of attributes to look for (only if present): Brand, Material, Size, Color, Dimensions,
- # Weight, Features, Style, Theme, Pattern, Finish, Care Instructions, etc."""
- # output_format = {
- # "mandatory": {attr: "value or list of values" for attr in mandatory_attrs.keys()},
- # }
- # if extract_additional:
- # output_format["additional"] = {
- # "example_attribute_1": "actual value found",
- # "example_attribute_2": "actual value found"
- # }
- # output_format["additional"]["_note"] = "Only include attributes with actual values found in text"
- # prompt = f"""
- # You are an intelligent product attribute extractor that works with ANY product type.
- # TASK:
- # 1. Extract MANDATORY attributes: For each mandatory attribute, select the most appropriate value(s)
- # from the provided list. Choose the value(s) that best match the product description.
- # {additional_instruction}
- # Product Text:
- # {product_text}
- # Mandatory Attribute Lists (MUST select from these allowed values):
- # {mandatory_attr_text}
- # CRITICAL INSTRUCTIONS:
- # - Return ONLY valid JSON, nothing else
- # - No explanations, no markdown, no text before or after the JSON
- # - For mandatory attributes, choose the value(s) from the provided list that best match
- # - If a mandatory attribute cannot be determined from the product text, use "Not Specified"
- # - Prefer exact matches from the allowed values list over generic synonyms
- # - If multiple values are plausible, you MAY return more than one
- # {f"- For additional attributes: ONLY include attributes where you found actual values in the product text. DO NOT include attributes with 'Not Specified', 'None', 'N/A', or empty values. If you cannot find a value for an attribute, simply don't include that attribute." if extract_additional else ""}
- # - Be precise and only extract information that is explicitly stated or clearly implied
- # Required Output Format:
- # {json.dumps(output_format, indent=2)}
- # """
- # payload = {
- # "model": model,
- # "messages": [
- # {
- # "role": "system",
- # "content": f"You are a precise attribute extraction model. Return ONLY valid JSON with {'mandatory and additional' if extract_additional else 'mandatory'} sections. No explanations, no markdown, no other text."
- # },
- # {"role": "user", "content": prompt}
- # ],
- # "temperature": 0.0,
- # "max_tokens": 1500
- # }
- # headers = {
- # "Authorization": f"Bearer {settings.GROQ_API_KEY}",
- # "Content-Type": "application/json",
- # }
- # try:
- # response = requests.post(
- # settings.GROQ_API_URL,
- # headers=headers,
- # json=payload,
- # timeout=30
- # )
- # response.raise_for_status()
- # result_text = response.json()["choices"][0]["message"]["content"].strip()
- # # Clean the response
- # result_text = ProductAttributeService._clean_json_response(result_text)
- # # Parse JSON
- # parsed = json.loads(result_text)
- # # Validate and restructure with source tracking
- # parsed = ProductAttributeService._validate_response_structure(
- # parsed, mandatory_attrs, extract_additional, source_map
- # )
- # # Clean up and add source tracking to additional attributes in array format
- # if extract_additional and "additional" in parsed:
- # cleaned_additional = {}
- # for k, v in parsed["additional"].items():
- # if v and v not in ["Not Specified", "None", "N/A", "", "not specified", "none", "n/a"]:
- # if not (isinstance(v, str) and v.lower() in ["not specified", "none", "n/a", ""]):
- # # Convert to array format if not already
- # if isinstance(v, list):
- # cleaned_additional[k] = []
- # for item in v:
- # if isinstance(item, dict) and "value" in item:
- # if "source" not in item:
- # item["source"] = ProductAttributeService.find_value_source(
- # item["value"], source_map
- # )
- # cleaned_additional[k].append(item)
- # else:
- # source = ProductAttributeService.find_value_source(str(item), source_map)
- # cleaned_additional[k].append({"value": str(item), "source": source})
- # else:
- # source = ProductAttributeService.find_value_source(str(v), source_map)
- # cleaned_additional[k] = [{"value": str(v), "source": source}]
- # parsed["additional"] = cleaned_additional
- # # Calculate attribute relationships if using dynamic thresholds
- # relationships = {}
- # if use_dynamic_thresholds:
- # relationships = ProductAttributeService.calculate_attribute_relationships(
- # mandatory_attrs, product_text
- # )
- # # Process attributes in order, allowing earlier ones to influence later ones
- # extracted_so_far = {}
- # for attr in mandatory_attrs.keys():
- # allow_multiple = attr in multiple
- # result = ProductAttributeService.normalize_against_product_text(
- # product_text=product_text,
- # mandatory_attrs={attr: mandatory_attrs[attr]},
- # source_map=source_map,
- # threshold_abs=threshold_abs,
- # margin=margin,
- # allow_multiple=allow_multiple,
- # extracted_attrs=extracted_so_far,
- # relationships=relationships,
- # use_dynamic_thresholds=use_dynamic_thresholds,
- # use_adaptive_margin=use_adaptive_margin,
- # use_semantic_clustering=use_semantic_clustering
- # )
- # # Result is already in array format from normalize_against_product_text
- # parsed["mandatory"][attr] = result[attr]
- # extracted_so_far[attr] = result[attr]
- # return parsed
- # except requests.exceptions.RequestException as e:
- # return ProductAttributeService._create_error_response(
- # str(e), mandatory_attrs, extract_additional
- # )
- # except json.JSONDecodeError as e:
- # return ProductAttributeService._create_error_response(
- # f"Invalid JSON: {str(e)}", mandatory_attrs, extract_additional, result_text
- # )
- # except Exception as e:
- # return ProductAttributeService._create_error_response(
- # str(e), mandatory_attrs, extract_additional
- # )
- # @staticmethod
- # def extract_attributes_batch(
- # products: List[Dict],
- # mandatory_attrs: Dict[str, List[str]],
- # model: str = None,
- # extract_additional: bool = True,
- # process_image: bool = True,
- # max_workers: int = 5,
- # multiple: Optional[List[str]] = None,
- # threshold_abs: float = 0.65,
- # margin: float = 0.15,
- # use_dynamic_thresholds: bool = True,
- # use_adaptive_margin: bool = True,
- # use_semantic_clustering: bool = True
- # ) -> Dict:
- # """Extract attributes for multiple products in parallel with enhanced multi-value selection and source tracking."""
- # results = []
- # successful = 0
- # failed = 0
-
- # ocr_service = OCRService()
- # if multiple is None:
- # multiple = []
- # def process_product(product_data):
- # """Process a single product."""
- # product_id = product_data.get('product_id', f"product_{len(results)}")
-
- # try:
- # # Process image if URL is provided
- # ocr_results = None
- # ocr_text = None
-
- # if process_image and product_data.get('image_url'):
- # ocr_results = ocr_service.process_image(product_data['image_url'])
-
- # # Extract attributes from OCR
- # if ocr_results and ocr_results.get('detected_text'):
- # ocr_attrs = ProductAttributeService.extract_attributes_from_ocr(
- # ocr_results, model
- # )
- # ocr_results['extracted_attributes'] = ocr_attrs
-
- # # Format OCR text for combining with product text
- # ocr_text = "\n".join([
- # f"{item['text']} (confidence: {item['confidence']:.2f})"
- # for item in ocr_results['detected_text']
- # ])
-
- # # Combine all product information with source tracking
- # product_text, source_map = ProductAttributeService.combine_product_text(
- # title=product_data.get('title'),
- # short_desc=product_data.get('short_desc'),
- # long_desc=product_data.get('long_desc'),
- # ocr_text=ocr_text
- # )
-
- # # Extract attributes from combined text with enhanced features
- # result = ProductAttributeService.extract_attributes(
- # product_text=product_text,
- # mandatory_attrs=mandatory_attrs,
- # source_map=source_map,
- # model=model,
- # extract_additional=extract_additional,
- # multiple=multiple,
- # threshold_abs=threshold_abs,
- # margin=margin,
- # use_dynamic_thresholds=use_dynamic_thresholds,
- # use_adaptive_margin=use_adaptive_margin,
- # use_semantic_clustering=use_semantic_clustering
- # )
-
- # result['product_id'] = product_id
-
- # # Add OCR results if available (already in correct format)
- # if ocr_results:
- # result['ocr_results'] = ocr_results
-
- # # Check if extraction was successful
- # if 'error' not in result:
- # return result, True
- # else:
- # return result, False
-
- # except Exception as e:
- # return {
- # 'product_id': product_id,
- # 'mandatory': {attr: [{"value": "Not Specified", "source": "error"}] for attr in mandatory_attrs.keys()},
- # 'additional': {} if extract_additional else None,
- # 'error': f"Processing error: {str(e)}"
- # }, False
- # # Process products in parallel
- # with ThreadPoolExecutor(max_workers=max_workers) as executor:
- # future_to_product = {
- # executor.submit(process_product, product): product
- # for product in products
- # }
-
- # for future in as_completed(future_to_product):
- # try:
- # result, success = future.result()
- # results.append(result)
- # if success:
- # successful += 1
- # else:
- # failed += 1
- # except Exception as e:
- # failed += 1
- # results.append({
- # 'product_id': 'unknown',
- # 'mandatory': {attr: [{"value": "Not Specified", "source": "error"}] for attr in mandatory_attrs.keys()},
- # 'additional': {} if extract_additional else None,
- # 'error': f"Unexpected error: {str(e)}"
- # })
- # return {
- # 'results': results,
- # 'total_products': len(products),
- # 'successful': successful,
- # 'failed': failed
- # }
- # @staticmethod
- # def _clean_json_response(text: str) -> str:
- # """Clean LLM response to extract valid JSON."""
- # start_idx = text.find('{')
- # end_idx = text.rfind('}')
- # if start_idx != -1 and end_idx != -1:
- # text = text[start_idx:end_idx + 1]
- # if "```json" in text:
- # text = text.split("```json")[1].split("```")[0].strip()
- # elif "```" in text:
- # text = text.split("```")[1].split("```")[0].strip()
- # if text.startswith("json"):
- # text = text[4:].strip()
- # return text
- # @staticmethod
- # def _validate_response_structure(
- # parsed: dict,
- # mandatory_attrs: Dict[str, List[str]],
- # extract_additional: bool,
- # source_map: Dict[str, str] = None
- # ) -> dict:
- # """Validate and fix the response structure, ensuring array format with source tracking."""
- # if source_map is None:
- # source_map = {}
-
- # expected_sections = ["mandatory"]
- # if extract_additional:
- # expected_sections.append("additional")
- # if not all(section in parsed for section in expected_sections):
- # if isinstance(parsed, dict):
- # mandatory_keys = set(mandatory_attrs.keys())
- # mandatory = {k: v for k, v in parsed.items() if k in mandatory_keys}
- # additional = {k: v for k, v in parsed.items() if k not in mandatory_keys}
- # result = {"mandatory": mandatory}
- # if extract_additional:
- # result["additional"] = additional
- # parsed = result
- # else:
- # return ProductAttributeService._create_error_response(
- # "Invalid response structure",
- # mandatory_attrs,
- # extract_additional,
- # str(parsed)
- # )
- # # Convert mandatory attributes to array format with source tracking
- # if "mandatory" in parsed:
- # converted_mandatory = {}
- # for attr, value in parsed["mandatory"].items():
- # if isinstance(value, list):
- # # Already in array format, ensure each item has source
- # converted_mandatory[attr] = []
- # for item in value:
- # if isinstance(item, dict) and "value" in item:
- # # Already has proper structure
- # if "source" not in item:
- # item["source"] = ProductAttributeService.find_value_source(
- # item["value"], source_map
- # )
- # converted_mandatory[attr].append(item)
- # else:
- # # Convert string to proper format
- # source = ProductAttributeService.find_value_source(str(item), source_map)
- # converted_mandatory[attr].append({"value": str(item), "source": source})
- # else:
- # # Single value - convert to array format
- # source = ProductAttributeService.find_value_source(str(value), source_map)
- # converted_mandatory[attr] = [{"value": str(value), "source": source}]
-
- # parsed["mandatory"] = converted_mandatory
- # return parsed
- # @staticmethod
- # def _create_error_response(
- # error: str,
- # mandatory_attrs: Dict[str, List[str]],
- # extract_additional: bool,
- # raw_output: Optional[str] = None
- # ) -> dict:
- # """Create a standardized error response in array format."""
- # response = {
- # "mandatory": {attr: [{"value": "Not Specified", "source": "error"}] for attr in mandatory_attrs.keys()},
- # "error": error
- # }
- # if extract_additional:
- # response["additional"] = {}
- # if raw_output:
- # response["raw_output"] = raw_output
- # return response
- # ==================== services.py ====================
- import requests
- import json
- import re
- from typing import Dict, List, Optional, Tuple
- from django.conf import settings
- from concurrent.futures import ThreadPoolExecutor, as_completed
- from sentence_transformers import SentenceTransformer, util
- import numpy as np
- from .ocr_service import OCRService
- # Initialize embedding model for normalization
- model_embedder = SentenceTransformer("all-MiniLM-L6-v2")
- class ProductAttributeService:
- """Service class for extracting product attributes using Groq LLM."""
- @staticmethod
- def normalize_dimension_text(text: str) -> str:
- """
- Normalize dimension text to match format like '16x20', '20x30', etc.
- Handles formats like '16 x 20', '16x1.5x20', '16 x 1.5 x 20 Inches'
- Returns the normalized dimension (e.g., '16x20') or empty string if not found.
- """
- if not text:
- return ""
-
- # Convert to lowercase and remove common units
- text = text.lower()
- text = re.sub(r'\s*(inches|inch|in|cm|centimeters|mm|millimeters)\s*', '', text, flags=re.IGNORECASE)
-
- # Extract all numbers from the text
- numbers = re.findall(r'\d+\.?\d*', text)
-
- if not numbers:
- return ""
-
- # Convert to floats first to handle decimals properly
- float_numbers = []
- for num in numbers:
- try:
- float_numbers.append(float(num))
- except:
- continue
-
- if len(float_numbers) < 2:
- return ""
-
- # If we have 3 dimensions, remove the smallest one (usually thickness/depth)
- # This handles cases like "16 x 1.5 x 20" -> keep 16 and 20, drop 1.5
- if len(float_numbers) >= 3:
- # Sort and remove the smallest
- float_numbers_sorted = sorted(float_numbers, reverse=True)
- float_numbers = float_numbers_sorted[:2]
-
- # Convert to integers for dimension matching
- int_numbers = [int(num) for num in float_numbers]
-
- # Sort to ensure consistent order (smaller x larger)
- int_numbers.sort()
-
- # Return formatted dimension
- return f"{int_numbers[0]}x{int_numbers[1]}"
- @staticmethod
- def normalize_value_for_matching(value: str, attr_name: str = "") -> str:
- """
- Normalize a value based on its attribute type for better matching.
- Currently handles dimensions specially, can be extended for other attributes.
- """
- # Check if this is a dimension-related attribute
- dimension_keywords = ['dimension', 'size', 'measurement']
- if any(keyword in attr_name.lower() for keyword in dimension_keywords):
- normalized = ProductAttributeService.normalize_dimension_text(value)
- if normalized:
- return normalized
-
- # For other attributes, just return cleaned value
- return value.strip()
- @staticmethod
- def combine_product_text(
- title: Optional[str] = None,
- short_desc: Optional[str] = None,
- long_desc: Optional[str] = None,
- ocr_text: Optional[str] = None
- ) -> Tuple[str, Dict[str, str]]:
- """
- Combine product metadata into a single text block.
- Returns: (combined_text, source_map) where source_map tracks which text came from where
- """
- parts = []
- source_map = {}
-
- if title:
- title_str = str(title).strip()
- parts.append(f"Title: {title_str}")
- source_map['title'] = title_str
- if short_desc:
- short_str = str(short_desc).strip()
- parts.append(f"Description: {short_str}")
- source_map['short_desc'] = short_str
- if long_desc:
- long_str = str(long_desc).strip()
- parts.append(f"Details: {long_str}")
- source_map['long_desc'] = long_str
- if ocr_text:
- parts.append(f"OCR Text: {ocr_text}")
- source_map['ocr_text'] = ocr_text
-
- combined = "\n".join(parts).strip()
-
- if not combined:
- return "No product information available", {}
-
- return combined, source_map
- @staticmethod
- def find_value_source(value: str, source_map: Dict[str, str], attr_name: str = "") -> str:
- """
- Find which source(s) contain the given value.
- Returns the source name(s) where the value appears.
- Now handles normalized matching for dimensions.
- """
- value_lower = value.lower()
- # Split value into tokens for better matching
- value_tokens = set(value_lower.replace("-", " ").replace("x", " ").split())
-
- # Check if this is a dimension-related attribute
- is_dimension_attr = any(keyword in attr_name.lower() for keyword in ['dimension', 'size', 'measurement'])
-
- sources_found = []
- source_scores = {}
-
- for source_name, source_text in source_map.items():
- source_lower = source_text.lower()
-
- # Check for exact phrase match first
- if value_lower in source_lower:
- source_scores[source_name] = 1.0
- continue
-
- # For dimensions, check normalized match
- if is_dimension_attr:
- # Normalize the value (e.g., "16x20" stays "16x20")
- normalized_value = ProductAttributeService.normalize_dimension_text(value)
- if not normalized_value:
- normalized_value = value.replace("x", " ").strip()
-
- # Normalize the source text to extract dimensions
- normalized_source = ProductAttributeService.normalize_dimension_text(source_text)
-
- # Direct match
- if normalized_value == normalized_source:
- source_scores[source_name] = 0.95
- continue
-
- # Also check if the dimension numbers appear in the source
- # Extract dimension parts (e.g., "16x20" -> ["16", "20"])
- dim_parts = normalized_value.split("x") if "x" in normalized_value else []
- if len(dim_parts) == 2:
- # Check if both numbers appear in the source
- if all(part in source_text for part in dim_parts):
- source_scores[source_name] = 0.85
- continue
-
- # Check for token matches
- token_matches = sum(1 for token in value_tokens if token and token in source_lower)
- if token_matches > 0 and len(value_tokens) > 0:
- source_scores[source_name] = token_matches / len(value_tokens)
-
- # Return source with highest score, or all sources if multiple have same score
- if source_scores:
- max_score = max(source_scores.values())
- sources_found = [s for s, score in source_scores.items() if score == max_score]
-
- # Prioritize: title > short_desc > long_desc > ocr_text
- priority = ['title', 'short_desc', 'long_desc', 'ocr_text']
- for p in priority:
- if p in sources_found:
- return p
-
- return sources_found[0] if sources_found else "Not found"
-
- return "Not found"
- @staticmethod
- def format_visual_attributes(visual_attributes: Dict) -> Dict:
- """
- Convert visual attributes to array format with source tracking.
- Source is always 'image' for visual attributes.
- """
- formatted = {}
-
- for key, value in visual_attributes.items():
- if isinstance(value, list):
- # Already a list (like color_palette)
- formatted[key] = [{"value": str(item), "source": "image"} for item in value]
- elif isinstance(value, dict):
- # Nested dictionary - format recursively
- nested_formatted = {}
- for nested_key, nested_value in value.items():
- if isinstance(nested_value, list):
- nested_formatted[nested_key] = [{"value": str(item), "source": "image"} for item in nested_value]
- else:
- nested_formatted[nested_key] = [{"value": str(nested_value), "source": "image"}]
- formatted[key] = nested_formatted
- else:
- # Single value
- formatted[key] = [{"value": str(value), "source": "image"}]
-
- return formatted
- @staticmethod
- def extract_attributes_from_ocr(ocr_results: Dict, model: str = None) -> Dict:
- """Extract structured attributes from OCR text using LLM."""
- if model is None:
- model = settings.SUPPORTED_MODELS[0]
-
- detected_text = ocr_results.get('detected_text', [])
- if not detected_text:
- return {}
-
- # Format OCR text for prompt
- ocr_text = "\n".join([f"Text: {item['text']}, Confidence: {item['confidence']:.2f}"
- for item in detected_text])
-
- prompt = f"""
- You are an AI model that extracts structured attributes from OCR text detected on product images.
- Given the OCR detections below, infer the possible product attributes and return them as a clean JSON object.
- OCR Text:
- {ocr_text}
- Extract relevant attributes like:
- - brand
- - model_number
- - size (waist_size, length, etc.)
- - collection
- - any other relevant product information
- Return a JSON object with only the attributes you can confidently identify.
- If an attribute is not present, do not include it in the response.
- """
-
- payload = {
- "model": model,
- "messages": [
- {
- "role": "system",
- "content": "You are a helpful AI that extracts structured data from OCR output. Return only valid JSON."
- },
- {"role": "user", "content": prompt}
- ],
- "temperature": 0.2,
- "max_tokens": 500
- }
-
- headers = {
- "Authorization": f"Bearer {settings.GROQ_API_KEY}",
- "Content-Type": "application/json",
- }
-
- try:
- response = requests.post(
- settings.GROQ_API_URL,
- headers=headers,
- json=payload,
- timeout=30
- )
- response.raise_for_status()
- result_text = response.json()["choices"][0]["message"]["content"].strip()
-
- # Clean and parse JSON
- result_text = ProductAttributeService._clean_json_response(result_text)
- parsed = json.loads(result_text)
-
- # Convert to array format with source tracking
- formatted_attributes = {}
- for key, value in parsed.items():
- if key == "error":
- continue
-
- # Handle nested dictionaries (like size)
- if isinstance(value, dict):
- nested_formatted = {}
- for nested_key, nested_value in value.items():
- nested_formatted[nested_key] = [{"value": str(nested_value), "source": "image"}]
- formatted_attributes[key] = nested_formatted
- elif isinstance(value, list):
- # Already a list, convert each item
- formatted_attributes[key] = [{"value": str(item), "source": "image"} for item in value]
- else:
- # Single value
- formatted_attributes[key] = [{"value": str(value), "source": "image"}]
-
- return formatted_attributes
- except Exception as e:
- return {"error": f"Failed to extract attributes from OCR: {str(e)}"}
- @staticmethod
- def calculate_attribute_relationships(
- mandatory_attrs: Dict[str, List[str]],
- product_text: str
- ) -> Dict[str, float]:
- """
- Calculate semantic relationships between attribute values across different attributes.
- Returns a matrix of cross-attribute value similarities.
- """
- pt_emb = model_embedder.encode(product_text, convert_to_tensor=True)
- # Calculate similarities between all attribute values and product text
- attr_scores = {}
- for attr, values in mandatory_attrs.items():
- attr_scores[attr] = {}
- for val in values:
- contexts = [val, f"for {val}", f"use in {val}", f"suitable for {val}"]
- ctx_embs = [model_embedder.encode(c, convert_to_tensor=True) for c in contexts]
- sem_sim = max(float(util.cos_sim(pt_emb, ce).item()) for ce in ctx_embs)
- attr_scores[attr][val] = sem_sim
- # Calculate cross-attribute value relationships
- relationships = {}
- attr_list = list(mandatory_attrs.keys())
- for i, attr1 in enumerate(attr_list):
- for attr2 in attr_list[i+1:]:
- # Calculate pairwise similarities between values of different attributes
- for val1 in mandatory_attrs[attr1]:
- for val2 in mandatory_attrs[attr2]:
- emb1 = model_embedder.encode(val1, convert_to_tensor=True)
- emb2 = model_embedder.encode(val2, convert_to_tensor=True)
- sim = float(util.cos_sim(emb1, emb2).item())
- # Store bidirectional relationships
- key1 = f"{attr1}:{val1}->{attr2}:{val2}"
- key2 = f"{attr2}:{val2}->{attr1}:{val1}"
- relationships[key1] = sim
- relationships[key2] = sim
- return relationships
- @staticmethod
- def calculate_value_clusters(
- values: List[str],
- scores: List[Tuple[str, float]],
- cluster_threshold: float = 0.4
- ) -> List[List[str]]:
- """
- Group values into semantic clusters based on their similarity to each other.
- Returns clusters of related values.
- """
- if len(values) <= 1:
- return [[val] for val, _ in scores]
- # Get embeddings for all values
- embeddings = [model_embedder.encode(val, convert_to_tensor=True) for val in values]
- # Calculate pairwise similarities
- similarity_matrix = np.zeros((len(values), len(values)))
- for i in range(len(values)):
- for j in range(i+1, len(values)):
- sim = float(util.cos_sim(embeddings[i], embeddings[j]).item())
- similarity_matrix[i][j] = sim
- similarity_matrix[j][i] = sim
- # Simple clustering: group values with high similarity
- clusters = []
- visited = set()
- for i, (val, score) in enumerate(scores):
- if i in visited:
- continue
- cluster = [val]
- visited.add(i)
- # Find similar values
- for j in range(len(values)):
- if j not in visited and similarity_matrix[i][j] >= cluster_threshold:
- cluster.append(values[j])
- visited.add(j)
- clusters.append(cluster)
- return clusters
- @staticmethod
- def get_dynamic_threshold(
- attr: str,
- val: str,
- base_score: float,
- extracted_attrs: Dict[str, List[Dict[str, str]]],
- relationships: Dict[str, float],
- mandatory_attrs: Dict[str, List[str]],
- base_threshold: float = 0.65,
- boost_factor: float = 0.15
- ) -> float:
- """
- Calculate dynamic threshold based on relationships with already-extracted attributes.
- """
- threshold = base_threshold
- # Check relationships with already extracted attributes
- max_relationship = 0.0
- for other_attr, other_values_list in extracted_attrs.items():
- if other_attr == attr:
- continue
- for other_val_dict in other_values_list:
- other_val = other_val_dict['value']
- key = f"{attr}:{val}->{other_attr}:{other_val}"
- if key in relationships:
- max_relationship = max(max_relationship, relationships[key])
- # If strong relationship exists, lower threshold
- if max_relationship > 0.6:
- threshold = base_threshold - (boost_factor * max_relationship)
- return max(0.3, threshold)
- @staticmethod
- def get_adaptive_margin(
- scores: List[Tuple[str, float]],
- base_margin: float = 0.15,
- max_margin: float = 0.22
- ) -> float:
- """
- Calculate adaptive margin based on score distribution.
- """
- if len(scores) < 2:
- return base_margin
- score_values = [s for _, s in scores]
- best_score = score_values[0]
- # If best score is very low, use adaptive margin but be more conservative
- if best_score < 0.5:
- # Calculate score spread in top 3-4 scores only (more selective)
- top_scores = score_values[:min(4, len(score_values))]
- score_range = max(top_scores) - min(top_scores)
- # Very controlled margin increase
- if score_range < 0.30:
- # Much more conservative scaling
- score_factor = (0.5 - best_score) * 0.35
- adaptive = base_margin + score_factor + (0.30 - score_range) * 0.2
- return min(adaptive, max_margin)
- return base_margin
- @staticmethod
- def _lexical_evidence(product_text: str, label: str) -> float:
- """Calculate lexical overlap between product text and label."""
- pt = product_text.lower()
- tokens = [t for t in label.lower().replace("-", " ").split() if t]
- if not tokens:
- return 0.0
- hits = sum(1 for t in tokens if t in pt)
- return hits / len(tokens)
- @staticmethod
- def normalize_against_product_text(
- product_text: str,
- mandatory_attrs: Dict[str, List[str]],
- source_map: Dict[str, str],
- threshold_abs: float = 0.65,
- margin: float = 0.15,
- allow_multiple: bool = False,
- sem_weight: float = 0.8,
- lex_weight: float = 0.2,
- extracted_attrs: Optional[Dict[str, List[Dict[str, str]]]] = None,
- relationships: Optional[Dict[str, float]] = None,
- use_dynamic_thresholds: bool = True,
- use_adaptive_margin: bool = True,
- use_semantic_clustering: bool = True
- ) -> dict:
- """
- Score each allowed value against the product_text with dynamic thresholds.
- Returns dict with values in array format: [{"value": "...", "source": "..."}]
- """
- if extracted_attrs is None:
- extracted_attrs = {}
- if relationships is None:
- relationships = {}
- pt_emb = model_embedder.encode(product_text, convert_to_tensor=True)
- extracted = {}
- for attr, allowed_values in mandatory_attrs.items():
- scores: List[Tuple[str, float]] = []
-
- # Normalize product text for dimension matching if needed
- normalized_product_text = product_text
- is_dimension_attr = any(keyword in attr.lower() for keyword in ['dimension', 'size', 'measurement'])
- for val in allowed_values:
- # For dimension attributes, try normalized matching first
- if is_dimension_attr:
- normalized_val = ProductAttributeService.normalize_dimension_text(val)
- normalized_pt = ProductAttributeService.normalize_dimension_text(product_text)
-
- # If we find exact normalized match, give it highest score
- if normalized_val and normalized_pt and normalized_val == normalized_pt:
- scores.append((val, 0.99))
- continue
- # Also check if dimension appears anywhere in the text
- elif normalized_val and normalized_val in normalized_pt:
- scores.append((val, 0.95))
- continue
-
- # Standard semantic matching
- contexts = [val, f"for {val}", f"use in {val}", f"suitable for {val}", f"{val} room"]
- ctx_embs = [model_embedder.encode(c, convert_to_tensor=True) for c in contexts]
- sem_sim = max(float(util.cos_sim(pt_emb, ce).item()) for ce in ctx_embs)
- lex_score = ProductAttributeService._lexical_evidence(product_text, val)
- final_score = sem_weight * sem_sim + lex_weight * lex_score
- scores.append((val, final_score))
- scores.sort(key=lambda x: x[1], reverse=True)
- best_val, best_score = scores[0]
- # Calculate adaptive margin if enabled
- effective_margin = margin
- if allow_multiple and use_adaptive_margin:
- effective_margin = ProductAttributeService.get_adaptive_margin(scores, margin)
- # Special handling for dimension attributes with exact matches
- # If we have a very high score (0.90+), it means we found an exact/normalized match
- # In this case, don't apply multiple selection logic - just return the best match
- if is_dimension_attr and best_score >= 0.90:
- source = ProductAttributeService.find_value_source(best_val, source_map, attr)
- extracted[attr] = [{"value": best_val, "source": source}]
- continue
- if not allow_multiple:
- source = ProductAttributeService.find_value_source(best_val, source_map, attr)
- extracted[attr] = [{"value": best_val, "source": source}]
- else:
- candidates = [best_val]
- use_base_threshold = best_score >= threshold_abs
- # Get semantic clusters if enabled
- clusters = []
- if use_semantic_clustering:
- clusters = ProductAttributeService.calculate_value_clusters(
- allowed_values, scores, cluster_threshold=0.4
- )
- best_cluster = next((c for c in clusters if best_val in c), [best_val])
- for val, sc in scores[1:]:
- # Skip values with very low scores - they're likely not relevant
- # For dimension attributes, be even more strict
- min_score = 0.4 if is_dimension_attr else 0.3
- if sc < min_score:
- continue
-
- # Calculate dynamic threshold for this value
- if use_dynamic_thresholds and extracted_attrs:
- dynamic_thresh = ProductAttributeService.get_dynamic_threshold(
- attr, val, sc, extracted_attrs, relationships,
- mandatory_attrs, threshold_abs
- )
- else:
- dynamic_thresh = threshold_abs
- within_margin = (best_score - sc) <= effective_margin
- above_threshold = sc >= dynamic_thresh
- # Check if in same semantic cluster as best value
- in_cluster = False
- if use_semantic_clustering and clusters:
- in_cluster = any(best_val in c and val in c for c in clusters)
- if use_base_threshold:
- # Best score is good, require threshold OR (cluster + margin)
- if above_threshold and within_margin:
- candidates.append(val)
- elif in_cluster and within_margin:
- candidates.append(val)
- else:
- # Best score is low, use margin OR cluster logic
- if within_margin:
- candidates.append(val)
- elif in_cluster and (best_score - sc) <= effective_margin * 2.0:
- # Extended margin for cluster members
- candidates.append(val)
- # Map each candidate to its source and create array format
- extracted[attr] = []
- for candidate in candidates:
- source = ProductAttributeService.find_value_source(candidate, source_map, attr)
- extracted[attr].append({"value": candidate, "source": source})
- return extracted
- @staticmethod
- def extract_attributes(
- product_text: str,
- mandatory_attrs: Dict[str, List[str]],
- source_map: Dict[str, str] = None,
- model: str = None,
- extract_additional: bool = True,
- multiple: Optional[List[str]] = None,
- threshold_abs: float = 0.65,
- margin: float = 0.15,
- use_dynamic_thresholds: bool = True,
- use_adaptive_margin: bool = True,
- use_semantic_clustering: bool = True
- ) -> dict:
- """
- Use Groq LLM to extract attributes from any product type with enhanced multi-value selection.
- Now returns values in array format: [{"value": "...", "source": "..."}]
- """
-
- if model is None:
- model = settings.SUPPORTED_MODELS[0]
- if multiple is None:
- multiple = []
- if source_map is None:
- source_map = {}
- # Check if product text is empty or minimal
- if not product_text or product_text == "No product information available":
- return ProductAttributeService._create_error_response(
- "No product information provided",
- mandatory_attrs,
- extract_additional
- )
- # Create structured prompt for mandatory attributes
- mandatory_attr_list = []
- for attr_name, allowed_values in mandatory_attrs.items():
- mandatory_attr_list.append(f"{attr_name}: {', '.join(allowed_values)}")
- mandatory_attr_text = "\n".join(mandatory_attr_list)
- additional_instruction = ""
- if extract_additional:
- additional_instruction = """
- 2. Extract ADDITIONAL attributes: Identify any other relevant attributes from the product text
- that are NOT in the mandatory list. Only include attributes where you can find actual values
- in the product text. Do NOT include attributes with "Not Specified" or empty values.
-
- Examples of attributes to look for (only if present): Brand, Material, Size, Color, Dimensions,
- Weight, Features, Style, Theme, Pattern, Finish, Care Instructions, etc."""
- output_format = {
- "mandatory": {attr: "value or list of values" for attr in mandatory_attrs.keys()},
- }
- if extract_additional:
- output_format["additional"] = {
- "example_attribute_1": "actual value found",
- "example_attribute_2": "actual value found"
- }
- output_format["additional"]["_note"] = "Only include attributes with actual values found in text"
- prompt = f"""
- You are an intelligent product attribute extractor that works with ANY product type.
- TASK:
- 1. Extract MANDATORY attributes: For each mandatory attribute, select the most appropriate value(s)
- from the provided list. Choose the value(s) that best match the product description.
- {additional_instruction}
- Product Text:
- {product_text}
- Mandatory Attribute Lists (MUST select from these allowed values):
- {mandatory_attr_text}
- CRITICAL INSTRUCTIONS:
- - Return ONLY valid JSON, nothing else
- - No explanations, no markdown, no text before or after the JSON
- - For mandatory attributes, choose the value(s) from the provided list that best match
- - If a mandatory attribute cannot be determined from the product text, use "Not Specified"
- - Prefer exact matches from the allowed values list over generic synonyms
- - If multiple values are plausible, you MAY return more than one
- {f"- For additional attributes: ONLY include attributes where you found actual values in the product text. DO NOT include attributes with 'Not Specified', 'None', 'N/A', or empty values. If you cannot find a value for an attribute, simply don't include that attribute." if extract_additional else ""}
- - Be precise and only extract information that is explicitly stated or clearly implied
- Required Output Format:
- {json.dumps(output_format, indent=2)}
- """
- payload = {
- "model": model,
- "messages": [
- {
- "role": "system",
- "content": f"You are a precise attribute extraction model. Return ONLY valid JSON with {'mandatory and additional' if extract_additional else 'mandatory'} sections. No explanations, no markdown, no other text."
- },
- {"role": "user", "content": prompt}
- ],
- "temperature": 0.0,
- "max_tokens": 1500
- }
- headers = {
- "Authorization": f"Bearer {settings.GROQ_API_KEY}",
- "Content-Type": "application/json",
- }
- try:
- response = requests.post(
- settings.GROQ_API_URL,
- headers=headers,
- json=payload,
- timeout=30
- )
- response.raise_for_status()
- result_text = response.json()["choices"][0]["message"]["content"].strip()
- # Clean the response
- result_text = ProductAttributeService._clean_json_response(result_text)
- # Parse JSON
- parsed = json.loads(result_text)
- # Validate and restructure with source tracking
- parsed = ProductAttributeService._validate_response_structure(
- parsed, mandatory_attrs, extract_additional, source_map
- )
- # Clean up and add source tracking to additional attributes in array format
- if extract_additional and "additional" in parsed:
- cleaned_additional = {}
- for k, v in parsed["additional"].items():
- if v and v not in ["Not Specified", "None", "N/A", "", "not specified", "none", "n/a"]:
- if not (isinstance(v, str) and v.lower() in ["not specified", "none", "n/a", ""]):
- # Convert to array format if not already
- if isinstance(v, list):
- cleaned_additional[k] = []
- for item in v:
- if isinstance(item, dict) and "value" in item:
- if "source" not in item:
- item["source"] = ProductAttributeService.find_value_source(
- item["value"], source_map, k
- )
- cleaned_additional[k].append(item)
- else:
- source = ProductAttributeService.find_value_source(str(item), source_map, k)
- cleaned_additional[k].append({"value": str(item), "source": source})
- else:
- source = ProductAttributeService.find_value_source(str(v), source_map, k)
- cleaned_additional[k] = [{"value": str(v), "source": source}]
- parsed["additional"] = cleaned_additional
- # Calculate attribute relationships if using dynamic thresholds
- relationships = {}
- if use_dynamic_thresholds:
- relationships = ProductAttributeService.calculate_attribute_relationships(
- mandatory_attrs, product_text
- )
- # Process attributes in order, allowing earlier ones to influence later ones
- extracted_so_far = {}
- for attr in mandatory_attrs.keys():
- allow_multiple = attr in multiple
- result = ProductAttributeService.normalize_against_product_text(
- product_text=product_text,
- mandatory_attrs={attr: mandatory_attrs[attr]},
- source_map=source_map,
- threshold_abs=threshold_abs,
- margin=margin,
- allow_multiple=allow_multiple,
- extracted_attrs=extracted_so_far,
- relationships=relationships,
- use_dynamic_thresholds=use_dynamic_thresholds,
- use_adaptive_margin=use_adaptive_margin,
- use_semantic_clustering=use_semantic_clustering
- )
- # Result is already in array format from normalize_against_product_text
- parsed["mandatory"][attr] = result[attr]
- extracted_so_far[attr] = result[attr]
- return parsed
- except requests.exceptions.RequestException as e:
- return ProductAttributeService._create_error_response(
- str(e), mandatory_attrs, extract_additional
- )
- except json.JSONDecodeError as e:
- return ProductAttributeService._create_error_response(
- f"Invalid JSON: {str(e)}", mandatory_attrs, extract_additional, result_text
- )
- except Exception as e:
- return ProductAttributeService._create_error_response(
- str(e), mandatory_attrs, extract_additional
- )
- @staticmethod
- def extract_attributes_batch(
- products: List[Dict],
- mandatory_attrs: Dict[str, List[str]],
- model: str = None,
- extract_additional: bool = True,
- process_image: bool = True,
- max_workers: int = 5,
- multiple: Optional[List[str]] = None,
- threshold_abs: float = 0.65,
- margin: float = 0.15,
- use_dynamic_thresholds: bool = True,
- use_adaptive_margin: bool = True,
- use_semantic_clustering: bool = True
- ) -> Dict:
- """Extract attributes for multiple products in parallel with enhanced multi-value selection and source tracking."""
- results = []
- successful = 0
- failed = 0
-
- ocr_service = OCRService()
- if multiple is None:
- multiple = []
- def process_product(product_data):
- """Process a single product."""
- product_id = product_data.get('product_id', f"product_{len(results)}")
-
- try:
- # Process image if URL is provided
- ocr_results = None
- ocr_text = None
-
- if process_image and product_data.get('image_url'):
- ocr_results = ocr_service.process_image(product_data['image_url'])
-
- # Extract attributes from OCR
- if ocr_results and ocr_results.get('detected_text'):
- ocr_attrs = ProductAttributeService.extract_attributes_from_ocr(
- ocr_results, model
- )
- ocr_results['extracted_attributes'] = ocr_attrs
-
- # Format OCR text for combining with product text
- ocr_text = "\n".join([
- f"{item['text']} (confidence: {item['confidence']:.2f})"
- for item in ocr_results['detected_text']
- ])
-
- # Combine all product information with source tracking
- product_text, source_map = ProductAttributeService.combine_product_text(
- title=product_data.get('title'),
- short_desc=product_data.get('short_desc'),
- long_desc=product_data.get('long_desc'),
- ocr_text=ocr_text
- )
-
- # Extract attributes from combined text with enhanced features
- result = ProductAttributeService.extract_attributes(
- product_text=product_text,
- mandatory_attrs=mandatory_attrs,
- source_map=source_map,
- model=model,
- extract_additional=extract_additional,
- multiple=multiple,
- threshold_abs=threshold_abs,
- margin=margin,
- use_dynamic_thresholds=use_dynamic_thresholds,
- use_adaptive_margin=use_adaptive_margin,
- use_semantic_clustering=use_semantic_clustering
- )
-
- result['product_id'] = product_id
-
- # Add OCR results if available (already in correct format)
- if ocr_results:
- result['ocr_results'] = ocr_results
-
- # Check if extraction was successful
- if 'error' not in result:
- return result, True
- else:
- return result, False
-
- except Exception as e:
- return {
- 'product_id': product_id,
- 'mandatory': {attr: [{"value": "Not Specified", "source": "error"}] for attr in mandatory_attrs.keys()},
- 'additional': {} if extract_additional else None,
- 'error': f"Processing error: {str(e)}"
- }, False
- # Process products in parallel
- with ThreadPoolExecutor(max_workers=max_workers) as executor:
- future_to_product = {
- executor.submit(process_product, product): product
- for product in products
- }
-
- for future in as_completed(future_to_product):
- try:
- result, success = future.result()
- results.append(result)
- if success:
- successful += 1
- else:
- failed += 1
- except Exception as e:
- failed += 1
- results.append({
- 'product_id': 'unknown',
- 'mandatory': {attr: [{"value": "Not Specified", "source": "error"}] for attr in mandatory_attrs.keys()},
- 'additional': {} if extract_additional else None,
- 'error': f"Unexpected error: {str(e)}"
- })
- return {
- 'results': results,
- 'total_products': len(products),
- 'successful': successful,
- 'failed': failed
- }
- @staticmethod
- def _clean_json_response(text: str) -> str:
- """Clean LLM response to extract valid JSON."""
- start_idx = text.find('{')
- end_idx = text.rfind('}')
- if start_idx != -1 and end_idx != -1:
- text = text[start_idx:end_idx + 1]
- if "```json" in text:
- text = text.split("```json")[1].split("```")[0].strip()
- elif "```" in text:
- text = text.split("```")[1].split("```")[0].strip()
- if text.startswith("json"):
- text = text[4:].strip()
- return text
- @staticmethod
- def _validate_response_structure(
- parsed: dict,
- mandatory_attrs: Dict[str, List[str]],
- extract_additional: bool,
- source_map: Dict[str, str] = None
- ) -> dict:
- """Validate and fix the response structure, ensuring array format with source tracking."""
- if source_map is None:
- source_map = {}
-
- expected_sections = ["mandatory"]
- if extract_additional:
- expected_sections.append("additional")
- if not all(section in parsed for section in expected_sections):
- if isinstance(parsed, dict):
- mandatory_keys = set(mandatory_attrs.keys())
- mandatory = {k: v for k, v in parsed.items() if k in mandatory_keys}
- additional = {k: v for k, v in parsed.items() if k not in mandatory_keys}
- result = {"mandatory": mandatory}
- if extract_additional:
- result["additional"] = additional
- parsed = result
- else:
- return ProductAttributeService._create_error_response(
- "Invalid response structure",
- mandatory_attrs,
- extract_additional,
- str(parsed)
- )
- # Convert mandatory attributes to array format with source tracking
- if "mandatory" in parsed:
- converted_mandatory = {}
- for attr, value in parsed["mandatory"].items():
- if isinstance(value, list):
- # Already in array format, ensure each item has source
- converted_mandatory[attr] = []
- for item in value:
- if isinstance(item, dict) and "value" in item:
- # Already has proper structure
- if "source" not in item:
- item["source"] = ProductAttributeService.find_value_source(
- item["value"], source_map, attr
- )
- converted_mandatory[attr].append(item)
- else:
- # Convert string to proper format
- source = ProductAttributeService.find_value_source(str(item), source_map, attr)
- converted_mandatory[attr].append({"value": str(item), "source": source})
- else:
- # Single value - convert to array format
- source = ProductAttributeService.find_value_source(str(value), source_map, attr)
- converted_mandatory[attr] = [{"value": str(value), "source": source}]
-
- parsed["mandatory"] = converted_mandatory
- return parsed
- @staticmethod
- def _create_error_response(
- error: str,
- mandatory_attrs: Dict[str, List[str]],
- extract_additional: bool,
- raw_output: Optional[str] = None
- ) -> dict:
- """Create a standardized error response in array format."""
- response = {
- "mandatory": {attr: [{"value": "Not Specified", "source": "error"}] for attr in mandatory_attrs.keys()},
- "error": error
- }
- if extract_additional:
- response["additional"] = {}
- if raw_output:
- response["raw_output"] = raw_output
- return response
|