|
@@ -0,0 +1,942 @@
|
|
|
|
|
+# # import requests
|
|
|
|
|
+# # import json
|
|
|
|
|
+# # from typing import Dict, List, Optional
|
|
|
|
|
+# # from django.conf import settings
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+# # class ProductAttributeService:
|
|
|
|
|
+# # """Service class for extracting product attributes using Groq LLM."""
|
|
|
|
|
+
|
|
|
|
|
+# # @staticmethod
|
|
|
|
|
+# # def combine_product_text(
|
|
|
|
|
+# # title: Optional[str] = None,
|
|
|
|
|
+# # short_desc: Optional[str] = None,
|
|
|
|
|
+# # long_desc: Optional[str] = None
|
|
|
|
|
+# # ) -> str:
|
|
|
|
|
+# # """Combine product metadata into a single text block."""
|
|
|
|
|
+# # parts = []
|
|
|
|
|
+# # if title:
|
|
|
|
|
+# # parts.append(str(title).strip())
|
|
|
|
|
+# # if short_desc:
|
|
|
|
|
+# # parts.append(str(short_desc).strip())
|
|
|
|
|
+# # if long_desc:
|
|
|
|
|
+# # parts.append(str(long_desc).strip())
|
|
|
|
|
+
|
|
|
|
|
+# # combined = " ".join(parts).strip()
|
|
|
|
|
+
|
|
|
|
|
+# # if not combined:
|
|
|
|
|
+# # return "No product information available"
|
|
|
|
|
+
|
|
|
|
|
+# # return combined
|
|
|
|
|
+
|
|
|
|
|
+# # @staticmethod
|
|
|
|
|
+# # def extract_attributes(
|
|
|
|
|
+# # product_text: str,
|
|
|
|
|
+# # mandatory_attrs: Dict[str, List[str]],
|
|
|
|
|
+# # model: str = None,
|
|
|
|
|
+# # extract_additional: bool = True
|
|
|
|
|
+# # ) -> dict:
|
|
|
|
|
+# # """Use Groq LLM to extract attributes from any product type."""
|
|
|
|
|
+
|
|
|
|
|
+# # if model is None:
|
|
|
|
|
+# # model = settings.SUPPORTED_MODELS[0]
|
|
|
|
|
+
|
|
|
|
|
+# # # Check if product text is empty or minimal
|
|
|
|
|
+# # if not product_text or product_text == "No product information available":
|
|
|
|
|
+# # return ProductAttributeService._create_error_response(
|
|
|
|
|
+# # "No product information provided",
|
|
|
|
|
+# # mandatory_attrs,
|
|
|
|
|
+# # extract_additional
|
|
|
|
|
+# # )
|
|
|
|
|
+
|
|
|
|
|
+# # # Create structured prompt for mandatory attributes
|
|
|
|
|
+# # mandatory_attr_list = []
|
|
|
|
|
+# # for attr_name, allowed_values in mandatory_attrs.items():
|
|
|
|
|
+# # mandatory_attr_list.append(f"{attr_name}: {', '.join(allowed_values)}")
|
|
|
|
|
+# # mandatory_attr_text = "\n".join(mandatory_attr_list)
|
|
|
|
|
+
|
|
|
|
|
+# # additional_instruction = ""
|
|
|
|
|
+# # if extract_additional:
|
|
|
|
|
+# # additional_instruction = """
|
|
|
|
|
+# # 2. Extract ADDITIONAL attributes: Identify any other relevant attributes from the product text
|
|
|
|
|
+# # (such as Material, Size, Color, Brand, Dimensions, Weight, Features, Specifications, etc.)
|
|
|
|
|
+# # and their values. Extract attributes that are specific and relevant to this product type."""
|
|
|
|
|
+
|
|
|
|
|
+# # output_format = {
|
|
|
|
|
+# # "mandatory": {attr: "value" for attr in mandatory_attrs.keys()},
|
|
|
|
|
+# # "additional": {} if extract_additional else None
|
|
|
|
|
+# # }
|
|
|
|
|
+
|
|
|
|
|
+# # if not extract_additional:
|
|
|
|
|
+# # output_format.pop("additional")
|
|
|
|
|
+
|
|
|
|
|
+# # prompt = f"""
|
|
|
|
|
+# # You are an intelligent product attribute extractor that works with ANY product type.
|
|
|
|
|
+
|
|
|
|
|
+# # TASK:
|
|
|
|
|
+# # 1. Extract MANDATORY attributes: For each mandatory attribute, select the most appropriate value
|
|
|
|
|
+# # from the provided list. Choose the value that best matches the product description.
|
|
|
|
|
+# # {additional_instruction}
|
|
|
|
|
+
|
|
|
|
|
+# # Product Text:
|
|
|
|
|
+# # {product_text}
|
|
|
|
|
+
|
|
|
|
|
+# # Mandatory Attribute Lists (MUST select one value for each):
|
|
|
|
|
+# # {mandatory_attr_text}
|
|
|
|
|
+
|
|
|
|
|
+# # CRITICAL INSTRUCTIONS:
|
|
|
|
|
+# # - Return ONLY valid JSON, nothing else
|
|
|
|
|
+# # - No explanations, no markdown, no text before or after the JSON
|
|
|
|
|
+# # - For mandatory attributes, choose EXACTLY ONE value from the provided list that best matches
|
|
|
|
|
+# # - If a mandatory attribute cannot be determined from the product text, use "Not Specified"
|
|
|
|
|
+# # - Work with whatever information is available - the product text may be incomplete (only title, or only description, etc.)
|
|
|
|
|
+# # {f"- For additional attributes, extract any relevant information found in the product text" if extract_additional else ""}
|
|
|
|
|
+# # - Be precise and only extract information that is explicitly stated or clearly implied
|
|
|
|
|
+
|
|
|
|
|
+# # Required Output Format (ONLY THIS, NO OTHER TEXT):
|
|
|
|
|
+# # {json.dumps(output_format, indent=2)}
|
|
|
|
|
+# # """
|
|
|
|
|
+
|
|
|
|
|
+# # payload = {
|
|
|
|
|
+# # "model": model,
|
|
|
|
|
+# # "messages": [
|
|
|
|
|
+# # {
|
|
|
|
|
+# # "role": "system",
|
|
|
|
|
+# # "content": f"You are a precise attribute extraction model. Return ONLY valid JSON with {'mandatory and additional' if extract_additional else 'mandatory'} sections. No explanations, no markdown, no other text."
|
|
|
|
|
+# # },
|
|
|
|
|
+# # {"role": "user", "content": prompt}
|
|
|
|
|
+# # ],
|
|
|
|
|
+# # "temperature": 0.0,
|
|
|
|
|
+# # "max_tokens": 1500
|
|
|
|
|
+# # }
|
|
|
|
|
+
|
|
|
|
|
+# # headers = {
|
|
|
|
|
+# # "Authorization": f"Bearer {settings.GROQ_API_KEY}",
|
|
|
|
|
+# # "Content-Type": "application/json",
|
|
|
|
|
+# # }
|
|
|
|
|
+
|
|
|
|
|
+# # try:
|
|
|
|
|
+# # response = requests.post(
|
|
|
|
|
+# # settings.GROQ_API_URL,
|
|
|
|
|
+# # headers=headers,
|
|
|
|
|
+# # json=payload,
|
|
|
|
|
+# # timeout=30
|
|
|
|
|
+# # )
|
|
|
|
|
+# # response.raise_for_status()
|
|
|
|
|
+# # result_text = response.json()["choices"][0]["message"]["content"].strip()
|
|
|
|
|
+
|
|
|
|
|
+# # # Clean the response
|
|
|
|
|
+# # result_text = ProductAttributeService._clean_json_response(result_text)
|
|
|
|
|
+
|
|
|
|
|
+# # # Parse JSON
|
|
|
|
|
+# # parsed = json.loads(result_text)
|
|
|
|
|
+
|
|
|
|
|
+# # # Validate and restructure if needed
|
|
|
|
|
+# # parsed = ProductAttributeService._validate_response_structure(
|
|
|
|
|
+# # parsed, mandatory_attrs, extract_additional
|
|
|
|
|
+# # )
|
|
|
|
|
+
|
|
|
|
|
+# # return parsed
|
|
|
|
|
+
|
|
|
|
|
+# # except requests.exceptions.RequestException as e:
|
|
|
|
|
+# # return ProductAttributeService._create_error_response(
|
|
|
|
|
+# # str(e), mandatory_attrs, extract_additional
|
|
|
|
|
+# # )
|
|
|
|
|
+# # except json.JSONDecodeError as e:
|
|
|
|
|
+# # return ProductAttributeService._create_error_response(
|
|
|
|
|
+# # f"Invalid JSON: {str(e)}", mandatory_attrs, extract_additional, result_text
|
|
|
|
|
+# # )
|
|
|
|
|
+# # except Exception as e:
|
|
|
|
|
+# # return ProductAttributeService._create_error_response(
|
|
|
|
|
+# # str(e), mandatory_attrs, extract_additional
|
|
|
|
|
+# # )
|
|
|
|
|
+
|
|
|
|
|
+# # @staticmethod
|
|
|
|
|
+# # def _clean_json_response(text: str) -> str:
|
|
|
|
|
+# # """Clean LLM response to extract valid JSON."""
|
|
|
|
|
+# # start_idx = text.find('{')
|
|
|
|
|
+# # end_idx = text.rfind('}')
|
|
|
|
|
+
|
|
|
|
|
+# # if start_idx != -1 and end_idx != -1:
|
|
|
|
|
+# # text = text[start_idx:end_idx + 1]
|
|
|
|
|
+
|
|
|
|
|
+# # if "```json" in text:
|
|
|
|
|
+# # text = text.split("```json")[1].split("```")[0].strip()
|
|
|
|
|
+# # elif "```" in text:
|
|
|
|
|
+# # text = text.split("```")[1].split("```")[0].strip()
|
|
|
|
|
+# # if text.startswith("json"):
|
|
|
|
|
+# # text = text[4:].strip()
|
|
|
|
|
+
|
|
|
|
|
+# # return text
|
|
|
|
|
+
|
|
|
|
|
+# # @staticmethod
|
|
|
|
|
+# # def _validate_response_structure(
|
|
|
|
|
+# # parsed: dict,
|
|
|
|
|
+# # mandatory_attrs: Dict[str, List[str]],
|
|
|
|
|
+# # extract_additional: bool
|
|
|
|
|
+# # ) -> dict:
|
|
|
|
|
+# # """Validate and fix the response structure."""
|
|
|
|
|
+# # expected_sections = ["mandatory"]
|
|
|
|
|
+# # if extract_additional:
|
|
|
|
|
+# # expected_sections.append("additional")
|
|
|
|
|
+
|
|
|
|
|
+# # if not all(section in parsed for section in expected_sections):
|
|
|
|
|
+# # if isinstance(parsed, dict):
|
|
|
|
|
+# # mandatory_keys = set(mandatory_attrs.keys())
|
|
|
|
|
+# # mandatory = {k: v for k, v in parsed.items() if k in mandatory_keys}
|
|
|
|
|
+# # additional = {k: v for k, v in parsed.items() if k not in mandatory_keys}
|
|
|
|
|
+
|
|
|
|
|
+# # result = {"mandatory": mandatory}
|
|
|
|
|
+# # if extract_additional:
|
|
|
|
|
+# # result["additional"] = additional
|
|
|
|
|
+# # return result
|
|
|
|
|
+# # else:
|
|
|
|
|
+# # return ProductAttributeService._create_error_response(
|
|
|
|
|
+# # "Invalid response structure",
|
|
|
|
|
+# # mandatory_attrs,
|
|
|
|
|
+# # extract_additional,
|
|
|
|
|
+# # str(parsed)
|
|
|
|
|
+# # )
|
|
|
|
|
+
|
|
|
|
|
+# # return parsed
|
|
|
|
|
+
|
|
|
|
|
+# # @staticmethod
|
|
|
|
|
+# # def _create_error_response(
|
|
|
|
|
+# # error: str,
|
|
|
|
|
+# # mandatory_attrs: Dict[str, List[str]],
|
|
|
|
|
+# # extract_additional: bool,
|
|
|
|
|
+# # raw_output: Optional[str] = None
|
|
|
|
|
+# # ) -> dict:
|
|
|
|
|
+# # """Create a standardized error response."""
|
|
|
|
|
+# # response = {
|
|
|
|
|
+# # "mandatory": {attr: "Not Specified" for attr in mandatory_attrs.keys()},
|
|
|
|
|
+# # "error": error
|
|
|
|
|
+# # }
|
|
|
|
|
+# # if extract_additional:
|
|
|
|
|
+# # response["additional"] = {}
|
|
|
|
|
+# # if raw_output:
|
|
|
|
|
+# # response["raw_output"] = raw_output
|
|
|
|
|
+# # return response
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+# import requests
|
|
|
|
|
+# import json
|
|
|
|
|
+# from typing import Dict, List, Optional
|
|
|
|
|
+# from django.conf import settings
|
|
|
|
|
+# from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+# class ProductAttributeService:
|
|
|
|
|
+# """Service class for extracting product attributes using Groq LLM."""
|
|
|
|
|
+
|
|
|
|
|
+# @staticmethod
|
|
|
|
|
+# def combine_product_text(
|
|
|
|
|
+# title: Optional[str] = None,
|
|
|
|
|
+# short_desc: Optional[str] = None,
|
|
|
|
|
+# long_desc: Optional[str] = None
|
|
|
|
|
+# ) -> str:
|
|
|
|
|
+# """Combine product metadata into a single text block."""
|
|
|
|
|
+# parts = []
|
|
|
|
|
+# if title:
|
|
|
|
|
+# parts.append(str(title).strip())
|
|
|
|
|
+# if short_desc:
|
|
|
|
|
+# parts.append(str(short_desc).strip())
|
|
|
|
|
+# if long_desc:
|
|
|
|
|
+# parts.append(str(long_desc).strip())
|
|
|
|
|
+
|
|
|
|
|
+# combined = " ".join(parts).strip()
|
|
|
|
|
+
|
|
|
|
|
+# if not combined:
|
|
|
|
|
+# return "No product information available"
|
|
|
|
|
+
|
|
|
|
|
+# return combined
|
|
|
|
|
+
|
|
|
|
|
+# @staticmethod
|
|
|
|
|
+# def extract_attributes(
|
|
|
|
|
+# product_text: str,
|
|
|
|
|
+# mandatory_attrs: Dict[str, List[str]],
|
|
|
|
|
+# model: str = None,
|
|
|
|
|
+# extract_additional: bool = True
|
|
|
|
|
+# ) -> dict:
|
|
|
|
|
+# """Use Groq LLM to extract attributes from any product type."""
|
|
|
|
|
+
|
|
|
|
|
+# if model is None:
|
|
|
|
|
+# model = settings.SUPPORTED_MODELS[0]
|
|
|
|
|
+
|
|
|
|
|
+# # Check if product text is empty or minimal
|
|
|
|
|
+# if not product_text or product_text == "No product information available":
|
|
|
|
|
+# return ProductAttributeService._create_error_response(
|
|
|
|
|
+# "No product information provided",
|
|
|
|
|
+# mandatory_attrs,
|
|
|
|
|
+# extract_additional
|
|
|
|
|
+# )
|
|
|
|
|
+
|
|
|
|
|
+# # Create structured prompt for mandatory attributes
|
|
|
|
|
+# mandatory_attr_list = []
|
|
|
|
|
+# for attr_name, allowed_values in mandatory_attrs.items():
|
|
|
|
|
+# mandatory_attr_list.append(f"{attr_name}: {', '.join(allowed_values)}")
|
|
|
|
|
+# mandatory_attr_text = "\n".join(mandatory_attr_list)
|
|
|
|
|
+
|
|
|
|
|
+# additional_instruction = ""
|
|
|
|
|
+# if extract_additional:
|
|
|
|
|
+# additional_instruction = """
|
|
|
|
|
+# 2. Extract ADDITIONAL attributes: Identify any other relevant attributes from the product text
|
|
|
|
|
+# (such as Material, Size, Color, Brand, Dimensions, Weight, Features, Specifications, etc.)
|
|
|
|
|
+# and their values. Extract attributes that are specific and relevant to this product type."""
|
|
|
|
|
+
|
|
|
|
|
+# output_format = {
|
|
|
|
|
+# "mandatory": {attr: "value" for attr in mandatory_attrs.keys()},
|
|
|
|
|
+# "additional": {} if extract_additional else None
|
|
|
|
|
+# }
|
|
|
|
|
+
|
|
|
|
|
+# if not extract_additional:
|
|
|
|
|
+# output_format.pop("additional")
|
|
|
|
|
+
|
|
|
|
|
+# prompt = f"""
|
|
|
|
|
+# You are an intelligent product attribute extractor that works with ANY product type.
|
|
|
|
|
+
|
|
|
|
|
+# TASK:
|
|
|
|
|
+# 1. Extract MANDATORY attributes: For each mandatory attribute, select the most appropriate value
|
|
|
|
|
+# from the provided list. Choose the value that best matches the product description.
|
|
|
|
|
+# {additional_instruction}
|
|
|
|
|
+
|
|
|
|
|
+# Product Text:
|
|
|
|
|
+# {product_text}
|
|
|
|
|
+
|
|
|
|
|
+# Mandatory Attribute Lists (MUST select one value for each):
|
|
|
|
|
+# {mandatory_attr_text}
|
|
|
|
|
+
|
|
|
|
|
+# CRITICAL INSTRUCTIONS:
|
|
|
|
|
+# - Return ONLY valid JSON, nothing else
|
|
|
|
|
+# - No explanations, no markdown, no text before or after the JSON
|
|
|
|
|
+# - For mandatory attributes, choose EXACTLY ONE value from the provided list that best matches
|
|
|
|
|
+# - If a mandatory attribute cannot be determined from the product text, use "Not Specified"
|
|
|
|
|
+# - Work with whatever information is available - the product text may be incomplete (only title, or only description, etc.)
|
|
|
|
|
+# {f"- For additional attributes, extract any relevant information found in the product text" if extract_additional else ""}
|
|
|
|
|
+# - Be precise and only extract information that is explicitly stated or clearly implied
|
|
|
|
|
+
|
|
|
|
|
+# Required Output Format (ONLY THIS, NO OTHER TEXT):
|
|
|
|
|
+# {json.dumps(output_format, indent=2)}
|
|
|
|
|
+# """
|
|
|
|
|
+
|
|
|
|
|
+# payload = {
|
|
|
|
|
+# "model": model,
|
|
|
|
|
+# "messages": [
|
|
|
|
|
+# {
|
|
|
|
|
+# "role": "system",
|
|
|
|
|
+# "content": f"You are a precise attribute extraction model. Return ONLY valid JSON with {'mandatory and additional' if extract_additional else 'mandatory'} sections. No explanations, no markdown, no other text."
|
|
|
|
|
+# },
|
|
|
|
|
+# {"role": "user", "content": prompt}
|
|
|
|
|
+# ],
|
|
|
|
|
+# "temperature": 0.0,
|
|
|
|
|
+# "max_tokens": 1500
|
|
|
|
|
+# }
|
|
|
|
|
+
|
|
|
|
|
+# headers = {
|
|
|
|
|
+# "Authorization": f"Bearer {settings.GROQ_API_KEY}",
|
|
|
|
|
+# "Content-Type": "application/json",
|
|
|
|
|
+# }
|
|
|
|
|
+
|
|
|
|
|
+# try:
|
|
|
|
|
+# response = requests.post(
|
|
|
|
|
+# settings.GROQ_API_URL,
|
|
|
|
|
+# headers=headers,
|
|
|
|
|
+# json=payload,
|
|
|
|
|
+# timeout=30
|
|
|
|
|
+# )
|
|
|
|
|
+# response.raise_for_status()
|
|
|
|
|
+# result_text = response.json()["choices"][0]["message"]["content"].strip()
|
|
|
|
|
+
|
|
|
|
|
+# # Clean the response
|
|
|
|
|
+# result_text = ProductAttributeService._clean_json_response(result_text)
|
|
|
|
|
+
|
|
|
|
|
+# # Parse JSON
|
|
|
|
|
+# parsed = json.loads(result_text)
|
|
|
|
|
+
|
|
|
|
|
+# # Validate and restructure if needed
|
|
|
|
|
+# parsed = ProductAttributeService._validate_response_structure(
|
|
|
|
|
+# parsed, mandatory_attrs, extract_additional
|
|
|
|
|
+# )
|
|
|
|
|
+
|
|
|
|
|
+# return parsed
|
|
|
|
|
+
|
|
|
|
|
+# except requests.exceptions.RequestException as e:
|
|
|
|
|
+# return ProductAttributeService._create_error_response(
|
|
|
|
|
+# str(e), mandatory_attrs, extract_additional
|
|
|
|
|
+# )
|
|
|
|
|
+# except json.JSONDecodeError as e:
|
|
|
|
|
+# return ProductAttributeService._create_error_response(
|
|
|
|
|
+# f"Invalid JSON: {str(e)}", mandatory_attrs, extract_additional, result_text
|
|
|
|
|
+# )
|
|
|
|
|
+# except Exception as e:
|
|
|
|
|
+# return ProductAttributeService._create_error_response(
|
|
|
|
|
+# str(e), mandatory_attrs, extract_additional
|
|
|
|
|
+# )
|
|
|
|
|
+
|
|
|
|
|
+# @staticmethod
|
|
|
|
|
+# def extract_attributes_batch(
|
|
|
|
|
+# products: List[Dict],
|
|
|
|
|
+# mandatory_attrs: Dict[str, List[str]],
|
|
|
|
|
+# model: str = None,
|
|
|
|
|
+# extract_additional: bool = True,
|
|
|
|
|
+# max_workers: int = 5
|
|
|
|
|
+# ) -> Dict:
|
|
|
|
|
+# """
|
|
|
|
|
+# Extract attributes for multiple products in parallel.
|
|
|
|
|
+
|
|
|
|
|
+# Args:
|
|
|
|
|
+# products: List of product dictionaries with keys: product_id, title, short_desc, long_desc
|
|
|
|
|
+# mandatory_attrs: Dictionary of mandatory attributes
|
|
|
|
|
+# model: Groq model to use
|
|
|
|
|
+# extract_additional: Whether to extract additional attributes
|
|
|
|
|
+# max_workers: Maximum number of parallel workers
|
|
|
|
|
+
|
|
|
|
|
+# Returns:
|
|
|
|
|
+# Dictionary with results, total_products, successful, and failed counts
|
|
|
|
|
+# """
|
|
|
|
|
+# results = []
|
|
|
|
|
+# successful = 0
|
|
|
|
|
+# failed = 0
|
|
|
|
|
+
|
|
|
|
|
+# def process_product(product_data):
|
|
|
|
|
+# """Process a single product."""
|
|
|
|
|
+# product_id = product_data.get('product_id', f"product_{len(results)}")
|
|
|
|
|
+
|
|
|
|
|
+# try:
|
|
|
|
|
+# product_text = ProductAttributeService.combine_product_text(
|
|
|
|
|
+# title=product_data.get('title'),
|
|
|
|
|
+# short_desc=product_data.get('short_desc'),
|
|
|
|
|
+# long_desc=product_data.get('long_desc')
|
|
|
|
|
+# )
|
|
|
|
|
+
|
|
|
|
|
+# result = ProductAttributeService.extract_attributes(
|
|
|
|
|
+# product_text=product_text,
|
|
|
|
|
+# mandatory_attrs=mandatory_attrs,
|
|
|
|
|
+# model=model,
|
|
|
|
|
+# extract_additional=extract_additional
|
|
|
|
|
+# )
|
|
|
|
|
+
|
|
|
|
|
+# result['product_id'] = product_id
|
|
|
|
|
+
|
|
|
|
|
+# # Check if extraction was successful
|
|
|
|
|
+# if 'error' not in result:
|
|
|
|
|
+# return result, True
|
|
|
|
|
+# else:
|
|
|
|
|
+# return result, False
|
|
|
|
|
+
|
|
|
|
|
+# except Exception as e:
|
|
|
|
|
+# return {
|
|
|
|
|
+# 'product_id': product_id,
|
|
|
|
|
+# 'mandatory': {attr: "Not Specified" for attr in mandatory_attrs.keys()},
|
|
|
|
|
+# 'additional': {} if extract_additional else None,
|
|
|
|
|
+# 'error': f"Processing error: {str(e)}"
|
|
|
|
|
+# }, False
|
|
|
|
|
+
|
|
|
|
|
+# # Process products in parallel
|
|
|
|
|
+# with ThreadPoolExecutor(max_workers=max_workers) as executor:
|
|
|
|
|
+# future_to_product = {
|
|
|
|
|
+# executor.submit(process_product, product): product
|
|
|
|
|
+# for product in products
|
|
|
|
|
+# }
|
|
|
|
|
+
|
|
|
|
|
+# for future in as_completed(future_to_product):
|
|
|
|
|
+# try:
|
|
|
|
|
+# result, success = future.result()
|
|
|
|
|
+# results.append(result)
|
|
|
|
|
+# if success:
|
|
|
|
|
+# successful += 1
|
|
|
|
|
+# else:
|
|
|
|
|
+# failed += 1
|
|
|
|
|
+# except Exception as e:
|
|
|
|
|
+# failed += 1
|
|
|
|
|
+# results.append({
|
|
|
|
|
+# 'product_id': 'unknown',
|
|
|
|
|
+# 'mandatory': {attr: "Not Specified" for attr in mandatory_attrs.keys()},
|
|
|
|
|
+# 'additional': {} if extract_additional else None,
|
|
|
|
|
+# 'error': f"Unexpected error: {str(e)}"
|
|
|
|
|
+# })
|
|
|
|
|
+
|
|
|
|
|
+# return {
|
|
|
|
|
+# 'results': results,
|
|
|
|
|
+# 'total_products': len(products),
|
|
|
|
|
+# 'successful': successful,
|
|
|
|
|
+# 'failed': failed
|
|
|
|
|
+# }
|
|
|
|
|
+
|
|
|
|
|
+# @staticmethod
|
|
|
|
|
+# def _clean_json_response(text: str) -> str:
|
|
|
|
|
+# """Clean LLM response to extract valid JSON."""
|
|
|
|
|
+# start_idx = text.find('{')
|
|
|
|
|
+# end_idx = text.rfind('}')
|
|
|
|
|
+
|
|
|
|
|
+# if start_idx != -1 and end_idx != -1:
|
|
|
|
|
+# text = text[start_idx:end_idx + 1]
|
|
|
|
|
+
|
|
|
|
|
+# if "```json" in text:
|
|
|
|
|
+# text = text.split("```json")[1].split("```")[0].strip()
|
|
|
|
|
+# elif "```" in text:
|
|
|
|
|
+# text = text.split("```")[1].split("```")[0].strip()
|
|
|
|
|
+# if text.startswith("json"):
|
|
|
|
|
+# text = text[4:].strip()
|
|
|
|
|
+
|
|
|
|
|
+# return text
|
|
|
|
|
+
|
|
|
|
|
+# @staticmethod
|
|
|
|
|
+# def _validate_response_structure(
|
|
|
|
|
+# parsed: dict,
|
|
|
|
|
+# mandatory_attrs: Dict[str, List[str]],
|
|
|
|
|
+# extract_additional: bool
|
|
|
|
|
+# ) -> dict:
|
|
|
|
|
+# """Validate and fix the response structure."""
|
|
|
|
|
+# expected_sections = ["mandatory"]
|
|
|
|
|
+# if extract_additional:
|
|
|
|
|
+# expected_sections.append("additional")
|
|
|
|
|
+
|
|
|
|
|
+# if not all(section in parsed for section in expected_sections):
|
|
|
|
|
+# if isinstance(parsed, dict):
|
|
|
|
|
+# mandatory_keys = set(mandatory_attrs.keys())
|
|
|
|
|
+# mandatory = {k: v for k, v in parsed.items() if k in mandatory_keys}
|
|
|
|
|
+# additional = {k: v for k, v in parsed.items() if k not in mandatory_keys}
|
|
|
|
|
+
|
|
|
|
|
+# result = {"mandatory": mandatory}
|
|
|
|
|
+# if extract_additional:
|
|
|
|
|
+# result["additional"] = additional
|
|
|
|
|
+# return result
|
|
|
|
|
+# else:
|
|
|
|
|
+# return ProductAttributeService._create_error_response(
|
|
|
|
|
+# "Invalid response structure",
|
|
|
|
|
+# mandatory_attrs,
|
|
|
|
|
+# extract_additional,
|
|
|
|
|
+# str(parsed)
|
|
|
|
|
+# )
|
|
|
|
|
+
|
|
|
|
|
+# return parsed
|
|
|
|
|
+
|
|
|
|
|
+# @staticmethod
|
|
|
|
|
+# def _create_error_response(
|
|
|
|
|
+# error: str,
|
|
|
|
|
+# mandatory_attrs: Dict[str, List[str]],
|
|
|
|
|
+# extract_additional: bool,
|
|
|
|
|
+# raw_output: Optional[str] = None
|
|
|
|
|
+# ) -> dict:
|
|
|
|
|
+# """Create a standardized error response."""
|
|
|
|
|
+# response = {
|
|
|
|
|
+# "mandatory": {attr: "Not Specified" for attr in mandatory_attrs.keys()},
|
|
|
|
|
+# "error": error
|
|
|
|
|
+# }
|
|
|
|
|
+# if extract_additional:
|
|
|
|
|
+# response["additional"] = {}
|
|
|
|
|
+# if raw_output:
|
|
|
|
|
+# response["raw_output"] = raw_output
|
|
|
|
|
+# return response
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+# ==================== services.py ====================
|
|
|
|
|
+import requests
|
|
|
|
|
+import json
|
|
|
|
|
+from typing import Dict, List, Optional
|
|
|
|
|
+from django.conf import settings
|
|
|
|
|
+from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
|
|
|
+from .ocr_service import OCRService
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+class ProductAttributeService:
|
|
|
|
|
+ """Service class for extracting product attributes using Groq LLM."""
|
|
|
|
|
+
|
|
|
|
|
@staticmethod
def combine_product_text(
    title: Optional[str] = None,
    short_desc: Optional[str] = None,
    long_desc: Optional[str] = None,
    ocr_text: Optional[str] = None
) -> str:
    """Combine product metadata into a single labelled text block.

    Each non-empty field becomes one line of the form ``"<Label>: <value>"``
    using the labels ``Title``, ``Description``, ``Details`` and
    ``OCR Text``, in that order, joined with newlines.

    Args:
        title: Product title.
        short_desc: Short description.
        long_desc: Long description.
        ocr_text: Text recognised on the product image.

    Returns:
        The combined text, or the sentinel string
        ``"No product information available"`` when no field carries any
        non-whitespace content.
    """
    labelled_fields = (
        ("Title", title),
        ("Description", short_desc),
        ("Details", long_desc),
        ("OCR Text", ocr_text),
    )

    parts = []
    for label, value in labelled_fields:
        if not value:
            continue
        # Strip BEFORE deciding whether to keep the field: a whitespace-only
        # value must not yield a bare "Title:" line, which would defeat the
        # "no information" sentinel below.
        text = str(value).strip()
        if text:
            parts.append(f"{label}: {text}")

    if not parts:
        return "No product information available"

    return "\n".join(parts)
|
|
|
|
|
+
|
|
|
|
|
@staticmethod
def extract_attributes_from_ocr(ocr_results: Dict, model: str = None) -> Dict:
    """Extract structured attributes from OCR text using the LLM.

    Args:
        ocr_results: OCR output. Its ``'detected_text'`` entry is read as a
            list of items, each providing ``'text'`` and a numeric
            ``'confidence'``.
        model: Model identifier sent in the request. Defaults to
            ``settings.SUPPORTED_MODELS[0]`` when ``None``.

    Returns:
        ``{}`` when there is no detected text (including when
        ``ocr_results`` is ``None`` or empty); otherwise the JSON value
        parsed from the model's reply. Any failure while formatting the OCR
        items, calling the API or parsing the reply is reported as
        ``{"error": "Failed to extract attributes from OCR: ..."}``
        rather than raised.
    """
    if model is None:
        model = settings.SUPPORTED_MODELS[0]

    detected_text = (ocr_results or {}).get('detected_text', [])
    if not detected_text:
        return {}

    try:
        # Formatting lives inside the try block so that a malformed detection
        # (missing key, non-numeric confidence) is reported through the same
        # error dict as an API failure instead of escaping as an exception.
        ocr_text = "\n".join(
            f"Text: {item['text']}, Confidence: {item['confidence']:.2f}"
            for item in detected_text
        )

        prompt = f"""
You are an AI model that extracts structured attributes from OCR text detected on product images.
Given the OCR detections below, infer the possible product attributes and return them as a clean JSON object.

OCR Text:
{ocr_text}

Extract relevant attributes like:
- brand
- model_number
- size (waist_size, length, etc.)
- collection
- any other relevant product information

Return a JSON object with only the attributes you can confidently identify.
If an attribute is not present, do not include it in the response.
"""

        payload = {
            "model": model,
            "messages": [
                {
                    "role": "system",
                    "content": "You are a helpful AI that extracts structured data from OCR output. Return only valid JSON."
                },
                {"role": "user", "content": prompt}
            ],
            "temperature": 0.2,
            "max_tokens": 500
        }

        headers = {
            "Authorization": f"Bearer {settings.GROQ_API_KEY}",
            "Content-Type": "application/json",
        }

        response = requests.post(
            settings.GROQ_API_URL,
            headers=headers,
            json=payload,
            timeout=30
        )
        response.raise_for_status()
        result_text = response.json()["choices"][0]["message"]["content"].strip()

        # The reply may wrap the JSON in extra text; clean it before parsing.
        result_text = ProductAttributeService._clean_json_response(result_text)
        return json.loads(result_text)
    except Exception as e:
        # Best-effort step: callers receive an error dict, never an exception.
        return {"error": f"Failed to extract attributes from OCR: {str(e)}"}
|
|
|
|
|
+
|
|
|
|
|
@staticmethod
def extract_attributes(
    product_text: str,
    mandatory_attrs: Dict[str, List[str]],
    model: str = None,
    extract_additional: bool = True
) -> dict:
    """Use the Groq LLM to extract attributes from any product type.

    Args:
        product_text: Combined product text to analyse.
        mandatory_attrs: Mapping of attribute name to its allowed values;
            the model is asked to pick one value per attribute.
        model: Model identifier sent in the request. Defaults to
            ``settings.SUPPORTED_MODELS[0]`` when ``None``.
        extract_additional: When true, the model is also asked for an
            ``"additional"`` section of free-form attributes.

    Returns:
        On success, the parsed reply after it has been passed through
        ``_validate_response_structure``. When the text is empty (or is the
        ``"No product information available"`` sentinel), or when the
        request or JSON parsing fails, the dict built by
        ``_create_error_response``.
    """
    if model is None:
        model = settings.SUPPORTED_MODELS[0]

    # Nothing to analyse: skip the API call entirely.
    if not product_text or product_text == "No product information available":
        return ProductAttributeService._create_error_response(
            "No product information provided",
            mandatory_attrs,
            extract_additional
        )

    # One "name: v1, v2, ..." line per mandatory attribute. Values go through
    # str() so numeric allowed values do not break the join.
    mandatory_attr_text = "\n".join(
        f"{attr_name}: {', '.join(map(str, allowed_values))}"
        for attr_name, allowed_values in mandatory_attrs.items()
    )

    additional_instruction = ""
    if extract_additional:
        additional_instruction = """
2. Extract ADDITIONAL attributes: Identify any other relevant attributes from the product text
   (such as Material, Size, Color, Brand, Dimensions, Weight, Features, Specifications, etc.)
   and their values. Extract attributes that are specific and relevant to this product type."""

    # Skeleton of the JSON the model must return.
    output_format = {"mandatory": {attr: "value" for attr in mandatory_attrs}}
    if extract_additional:
        output_format["additional"] = {}

    prompt = f"""
You are an intelligent product attribute extractor that works with ANY product type.

TASK:
1. Extract MANDATORY attributes: For each mandatory attribute, select the most appropriate value
   from the provided list. Choose the value that best matches the product description.
{additional_instruction}

Product Text:
{product_text}

Mandatory Attribute Lists (MUST select one value for each):
{mandatory_attr_text}

CRITICAL INSTRUCTIONS:
- Return ONLY valid JSON, nothing else
- No explanations, no markdown, no text before or after the JSON
- For mandatory attributes, choose EXACTLY ONE value from the provided list that best matches
- If a mandatory attribute cannot be determined from the product text, use "Not Specified"
- Work with whatever information is available - the product text may be incomplete
{f"- For additional attributes, extract any relevant information found in the product text" if extract_additional else ""}
- Be precise and only extract information that is explicitly stated or clearly implied

Required Output Format (ONLY THIS, NO OTHER TEXT):
{json.dumps(output_format, indent=2)}
"""

    payload = {
        "model": model,
        "messages": [
            {
                "role": "system",
                "content": f"You are a precise attribute extraction model. Return ONLY valid JSON with {'mandatory and additional' if extract_additional else 'mandatory'} sections. No explanations, no markdown, no other text."
            },
            {"role": "user", "content": prompt}
        ],
        "temperature": 0.0,
        "max_tokens": 1500
    }

    headers = {
        "Authorization": f"Bearer {settings.GROQ_API_KEY}",
        "Content-Type": "application/json",
    }

    # Bound before the try block: the JSONDecodeError handler reads it, and
    # that exception can be raised by response.json() before the assignment
    # below has run.
    result_text = None

    try:
        response = requests.post(
            settings.GROQ_API_URL,
            headers=headers,
            json=payload,
            timeout=30
        )
        response.raise_for_status()
        result_text = response.json()["choices"][0]["message"]["content"].strip()

        # Clean the reply, then parse it.
        result_text = ProductAttributeService._clean_json_response(result_text)
        parsed = json.loads(result_text)

        # Validate and restructure if needed.
        return ProductAttributeService._validate_response_structure(
            parsed, mandatory_attrs, extract_additional
        )

    except requests.exceptions.RequestException as e:
        return ProductAttributeService._create_error_response(
            str(e), mandatory_attrs, extract_additional
        )
    except json.JSONDecodeError as e:
        # Pass the raw text along so the unparseable reply can be inspected.
        return ProductAttributeService._create_error_response(
            f"Invalid JSON: {str(e)}", mandatory_attrs, extract_additional, result_text
        )
    except Exception as e:
        # Boundary handler: any other failure (e.g. unexpected reply shape)
        # is reported through the error response instead of propagating.
        return ProductAttributeService._create_error_response(
            str(e), mandatory_attrs, extract_additional
        )
|
|
|
|
|
+
|
|
|
|
|
@staticmethod
def extract_attributes_batch(
    products: List[Dict],
    mandatory_attrs: Dict[str, List[str]],
    model: str = None,
    extract_additional: bool = True,
    process_image: bool = True,
    max_workers: int = 5
) -> Dict:
    """Extract attributes for multiple products in parallel.

    Each product is handled on a worker thread: its image (if any) is run
    through OCR, the OCR text is merged with the textual fields, and the
    combined text is sent to ``extract_attributes``.

    Args:
        products: Product dicts. The keys read here are ``product_id``,
            ``image_url``, ``title``, ``short_desc`` and ``long_desc``;
            all are optional.
        mandatory_attrs: Mapping of mandatory attribute name to its
            candidate values; forwarded to ``extract_attributes``.
        model: Model identifier forwarded to the extraction helpers.
        extract_additional: Forwarded to ``extract_attributes``; also
            controls whether failure records carry ``{}`` or ``None``
            under ``'additional'``.
        process_image: When false, OCR is skipped entirely and no
            ``OCRService`` is constructed.
        max_workers: Size of the thread pool.

    Returns:
        A dict with ``results`` (one entry per product, in input order),
        ``total_products``, ``successful`` and ``failed``. A product counts
        as failed when its result contains an ``'error'`` key.
    """
    # Only build the OCR service when it can actually be used.
    # NOTE(review): the single instance is shared by all worker threads --
    # this assumes OCRService.process_image is safe to call concurrently.
    ocr_service = OCRService() if process_image else None

    def product_id_for(index, product_data):
        """Return the product's own id, or a stable positional fallback."""
        # The fallback is derived from the input position, not from shared
        # mutable state, so it is unique and deterministic across threads.
        fallback_id = f"product_{index}"
        if isinstance(product_data, dict):
            return product_data.get('product_id', fallback_id)
        return fallback_id

    def failure_record(product_id, message):
        """Build the placeholder result used when a product cannot be processed."""
        return {
            'product_id': product_id,
            'mandatory': {attr: "Not Specified" for attr in mandatory_attrs},
            'additional': {} if extract_additional else None,
            'error': message
        }

    def process_product(index, product_data):
        """Process a single product and return ``(result, success)``."""
        product_id = product_id_for(index, product_data)

        try:
            ocr_results = None
            ocr_text = None

            # Process image if URL is provided
            if process_image and product_data.get('image_url'):
                ocr_results = ocr_service.process_image(product_data['image_url'])

                # Extract attributes from OCR
                if ocr_results and ocr_results.get('detected_text'):
                    ocr_attrs = ProductAttributeService.extract_attributes_from_ocr(
                        ocr_results, model
                    )
                    ocr_results['extracted_attributes'] = ocr_attrs

                    # Format OCR text for combining with product text
                    ocr_text = "\n".join(
                        f"{item['text']} (confidence: {item['confidence']:.2f})"
                        for item in ocr_results['detected_text']
                    )

            # Combine all product information
            product_text = ProductAttributeService.combine_product_text(
                title=product_data.get('title'),
                short_desc=product_data.get('short_desc'),
                long_desc=product_data.get('long_desc'),
                ocr_text=ocr_text
            )

            # Extract attributes from combined text
            result = ProductAttributeService.extract_attributes(
                product_text=product_text,
                mandatory_attrs=mandatory_attrs,
                model=model,
                extract_additional=extract_additional
            )

            result['product_id'] = product_id

            # Add OCR results if available
            if ocr_results:
                result['ocr_results'] = ocr_results

            # Extraction helpers signal failure through an 'error' key.
            return result, 'error' not in result

        except Exception as e:
            return failure_record(product_id, f"Processing error: {str(e)}"), False

    results = []
    successful = 0
    failed = 0

    # Process products in parallel. Futures are collected in submission
    # order so the output list lines up with the input list.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        submitted = [
            (index, product, executor.submit(process_product, index, product))
            for index, product in enumerate(products)
        ]

        for index, product, future in submitted:
            try:
                result, success = future.result()
            except Exception as e:
                # process_product already guards its body; this is a last
                # line of defence that still reports the right product.
                result = failure_record(
                    product_id_for(index, product),
                    f"Unexpected error: {str(e)}"
                )
                success = False

            results.append(result)
            if success:
                successful += 1
            else:
                failed += 1

    return {
        'results': results,
        'total_products': len(products),
        'successful': successful,
        'failed': failed
    }
|
|
|
|
|
+
|
|
|
|
|
@staticmethod
def _clean_json_response(text: str) -> str:
    """Clean an LLM response so that it contains only the JSON payload.

    Several extraction strategies are tried and the first candidate that
    actually parses as JSON is returned:

    1. the span from the first ``{`` to the last ``}``;
    2. the content of a Markdown code fence found inside that span;
    3. the brace span of the first Markdown code fence in the raw text.

    Validating each candidate keeps a well-formed object intact even when
    one of its string values contains a literal code fence, and still
    recovers fenced payloads that are surrounded by prose with braces.

    Args:
        text: Raw text returned by the model.

    Returns:
        The first candidate that parses as JSON. If none parses, the
        fence-stripped brace span is returned so the caller's
        ``json.loads`` fails on it and can report it as raw output.
    """
    def outer_object(chunk):
        """Return the first-'{' to last-'}' span, or *chunk* if there is none."""
        start_idx = chunk.find('{')
        end_idx = chunk.rfind('}')
        # end_idx > start_idx also rejects a '}' that precedes the first '{'.
        if start_idx != -1 and end_idx > start_idx:
            return chunk[start_idx:end_idx + 1]
        return chunk

    def fenced_body(chunk):
        """Return the content of the first code fence, or *chunk* if unfenced."""
        if "```json" in chunk:
            return chunk.split("```json", 1)[1].split("```", 1)[0].strip()
        if "```" in chunk:
            body = chunk.split("```", 2)[1].strip()
            # A bare fence may still carry the language tag on its own line.
            if body.startswith("json"):
                body = body[4:].strip()
            return body
        return chunk

    spanned = outer_object(text)
    fallback = fenced_body(spanned)
    candidates = (spanned, fallback, outer_object(fenced_body(text)))

    for candidate in candidates:
        try:
            json.loads(candidate)
        except ValueError:
            # json.JSONDecodeError is a ValueError subclass.
            continue
        return candidate

    return fallback
|
|
|
|
|
+
|
|
|
|
|
@staticmethod
def _validate_response_structure(
    parsed: dict,
    mandatory_attrs: Dict[str, List[str]],
    extract_additional: bool
) -> dict:
    """Validate and fix the response structure.

    The expected shape is ``{"mandatory": {...}}`` plus ``"additional"``
    when ``extract_additional`` is true. A response that already has every
    expected section is returned unchanged. Otherwise the response is
    rebuilt: any section that is already present as a dict is kept, and
    the remaining top-level keys are sorted into ``mandatory`` (keys named
    in ``mandatory_attrs``) or ``additional`` (everything else).

    Args:
        parsed: Decoded JSON returned by the model.
        mandatory_attrs: Mapping whose keys are the mandatory attribute names.
        extract_additional: Whether an ``"additional"`` section is expected.

    Returns:
        A dict with the expected sections, or the standard error response
        when ``parsed`` is not a JSON object.
    """
    # Check the type first: membership tests on a scalar (e.g. the JSON
    # value ``5`` or ``null``) would raise TypeError instead of yielding
    # the structured error.
    if not isinstance(parsed, dict):
        return ProductAttributeService._create_error_response(
            "Invalid response structure",
            mandatory_attrs,
            extract_additional,
            str(parsed)
        )

    expected_sections = ["mandatory"]
    if extract_additional:
        expected_sections.append("additional")

    if all(section in parsed for section in expected_sections):
        return parsed

    mandatory_keys = set(mandatory_attrs.keys())
    sections = {"mandatory": {}, "additional": {}}
    flat_items = {}

    for key, value in parsed.items():
        if key in sections and isinstance(value, dict):
            # Keep a section the model did provide rather than re-bucketing
            # it as if it were a flat attribute.
            sections[key] = dict(value)
        else:
            flat_items[key] = value

    for key, value in flat_items.items():
        target = "mandatory" if key in mandatory_keys else "additional"
        # Values inside an explicit section win over stray top-level ones.
        sections[target].setdefault(key, value)

    result = {"mandatory": sections["mandatory"]}
    if extract_additional:
        result["additional"] = sections["additional"]
    return result
|
|
|
|
|
+
|
|
|
|
|
@staticmethod
def _create_error_response(
    error: str,
    mandatory_attrs: Dict[str, List[str]],
    extract_additional: bool,
    raw_output: Optional[str] = None
) -> dict:
    """Create a standardized error response.

    Args:
        error: Message stored under ``"error"``.
        mandatory_attrs: Mandatory attribute names (a mapping keyed by name,
            or any iterable of names). ``None`` is treated as empty so that
            building an error report can never fail on its own.
        extract_additional: When true, an empty ``"additional"`` section is
            included.
        raw_output: Unparsed model output; stored under ``"raw_output"``
            only when non-empty.

    Returns:
        A dict with every mandatory attribute set to ``"Not Specified"``,
        the error message, and the optional sections described above.
    """
    # Iterating the mapping directly yields its keys and also accepts a
    # plain list of names.
    attr_names = mandatory_attrs if mandatory_attrs is not None else ()

    response = {
        "mandatory": dict.fromkeys(attr_names, "Not Specified"),
        "error": error
    }
    if extract_additional:
        response["additional"] = {}
    if raw_output:
        response["raw_output"] = raw_output
    return response
|
|
|
|
|
+
|