@@ -1,2303 +1,3 @@
-
-
-
-# # # ==================== services.py ====================
-# # import requests
-# # import json
-# # import re
-# # from typing import Dict, List, Optional, Tuple
-# # from django.conf import settings
-# # from concurrent.futures import ThreadPoolExecutor, as_completed
-# # from sentence_transformers import SentenceTransformer, util
-# # import numpy as np
-# # from .ocr_service import OCRService
-
-
-# # # Initialize embedding model for normalization
-# # model_embedder = SentenceTransformer("all-MiniLM-L6-v2")
-
-
-# # class ProductAttributeService:
-# # """Service class for extracting product attributes using Groq LLM."""
-
-# # @staticmethod
-# # def normalize_dimension_text(text: str) -> str:
-# # """
-# # Normalize dimension text to match format like '16x20', '20x30', etc.
-# # Handles formats like '16 x 20', '16x1.5x20', '16 x 1.5 x 20 Inches'
-# # Returns the normalized dimension (e.g., '16x20') or empty string if not found.
-# # """
-# # if not text:
-# # return ""
-
-# # # Convert to lowercase and remove common units
-# # text = text.lower()
-# # text = re.sub(r'\s*(inches|inch|in|cm|centimeters|mm|millimeters)\s*', '', text, flags=re.IGNORECASE)
-
-# # # Extract all numbers from the text
-# # numbers = re.findall(r'\d+\.?\d*', text)
-
-# # if not numbers:
-# # return ""
-
-# # # Convert to floats first to handle decimals properly
-# # float_numbers = []
-# # for num in numbers:
-# # try:
-# # float_numbers.append(float(num))
-# # except:
-# # continue
-
-# # if len(float_numbers) < 2:
-# # return ""
-
-# # # If we have 3 dimensions, it's likely Width x Depth x Height
-# # # For wall art, depth is usually small (< 5), so we keep first and last
-# # if len(float_numbers) == 3:
-# # # Keep first and last values (width and height), skip middle (depth)
-# # float_numbers = [float_numbers[0], float_numbers[2]]
-# # elif len(float_numbers) > 3:
-# # # If more than 3 dimensions, keep the two largest
-# # float_numbers = sorted(float_numbers)[-2:]
-# # else:
-# # # Just 2 dimensions, use as is
-# # float_numbers = float_numbers[:2]
-
-# # # Format numbers: use integer if whole, else one decimal
-# # formatted_numbers = []
-# # for num in float_numbers:
-# # if num.is_integer():
-# # formatted_numbers.append(str(int(num)))
-# # else:
-# # formatted_numbers.append(f"{num:.1f}")
-
-# # # Sort to ensure consistent order (smaller x larger)
-# # formatted_numbers.sort(key=lambda x: float(x))
-
-# # # Return formatted dimension
-# # return f"{formatted_numbers[0]}x{formatted_numbers[1]}"
-
-
-
-# # @staticmethod
-# # def normalize_value_for_matching(value: str, attr_name: str = "") -> str:
-# # """
-# # Normalize a value based on its attribute type for better matching.
-# # Currently handles dimensions specially, can be extended for other attributes.
-# # """
-# # # Check if this is a dimension-related attribute
-# # dimension_keywords = ['dimension', 'size', 'measurement']
-# # if any(keyword in attr_name.lower() for keyword in dimension_keywords):
-# # normalized = ProductAttributeService.normalize_dimension_text(value)
-# # if normalized:
-# # return normalized
-
-# # # For other attributes, just return cleaned value
-# # return value.strip()
-
-# # @staticmethod
-# # def combine_product_text(
-# # title: Optional[str] = None,
-# # short_desc: Optional[str] = None,
-# # long_desc: Optional[str] = None,
-# # ocr_text: Optional[str] = None
-# # ) -> Tuple[str, Dict[str, str]]:
-# # """
-# # Combine product metadata into a single text block.
-# # Returns: (combined_text, source_map) where source_map tracks which text came from where
-# # """
-# # parts = []
-# # source_map = {}
-
-# # if title:
-# # title_str = str(title).strip()
-# # parts.append(f"Title: {title_str}")
-# # source_map['title'] = title_str
-# # if short_desc:
-# # short_str = str(short_desc).strip()
-# # parts.append(f"Description: {short_str}")
-# # source_map['short_desc'] = short_str
-# # if long_desc:
-# # long_str = str(long_desc).strip()
-# # parts.append(f"Details: {long_str}")
-# # source_map['long_desc'] = long_str
-# # if ocr_text:
-# # parts.append(f"OCR Text: {ocr_text}")
-# # source_map['ocr_text'] = ocr_text
-
-# # combined = "\n".join(parts).strip()
-
-# # if not combined:
-# # return "No product information available", {}
-
-# # return combined, source_map
-
-# # @staticmethod
-# # def find_value_source(value: str, source_map: Dict[str, str], attr_name: str = "") -> str:
-# # """
-# # Find which source(s) contain the given value.
-# # Returns the source name(s) where the value appears.
-# # Now handles normalized matching for dimensions.
-# # """
-# # value_lower = value.lower()
-# # # Split value into tokens for better matching
-# # value_tokens = set(value_lower.replace("-", " ").replace("x", " ").split())
-
-# # # Check if this is a dimension-related attribute
-# # is_dimension_attr = any(keyword in attr_name.lower() for keyword in ['dimension', 'size', 'measurement'])
-
-# # sources_found = []
-# # source_scores = {}
-
-# # for source_name, source_text in source_map.items():
-# # source_lower = source_text.lower()
-
-# # # Check for exact phrase match first
-# # if value_lower in source_lower:
-# # source_scores[source_name] = 1.0
-# # continue
-
-# # # For dimensions, check normalized match
-# # if is_dimension_attr:
-# # # Normalize the value (e.g., "16x20" stays "16x20")
-# # normalized_value = ProductAttributeService.normalize_dimension_text(value)
-# # if not normalized_value:
-# # normalized_value = value.replace("x", " ").strip()
-
-# # # Normalize the source text to extract dimensions
-# # normalized_source = ProductAttributeService.normalize_dimension_text(source_text)
-
-# # # Direct match
-# # if normalized_value == normalized_source:
-# # source_scores[source_name] = 0.95
-# # continue
-
-# # # Also check if the dimension numbers appear in the source
-# # # Extract dimension parts (e.g., "16x20" -> ["16", "20"])
-# # dim_parts = normalized_value.split("x") if "x" in normalized_value else []
-# # if len(dim_parts) == 2:
-# # # Check if both numbers appear in the source
-# # if all(part in source_text for part in dim_parts):
-# # source_scores[source_name] = 0.85
-# # continue
-
-# # # Check for token matches
-# # token_matches = sum(1 for token in value_tokens if token and token in source_lower)
-# # if token_matches > 0 and len(value_tokens) > 0:
-# # source_scores[source_name] = token_matches / len(value_tokens)
-
-# # # Return source with highest score, or all sources if multiple have same score
-# # if source_scores:
-# # max_score = max(source_scores.values())
-# # sources_found = [s for s, score in source_scores.items() if score == max_score]
-
-# # # Prioritize: title > short_desc > long_desc > ocr_text
-# # priority = ['title', 'short_desc', 'long_desc', 'ocr_text']
-# # for p in priority:
-# # if p in sources_found:
-# # return p
-
-# # return sources_found[0] if sources_found else "Not found"
-
-# # return "Not found"
-
-# # @staticmethod
-# # def format_visual_attributes(visual_attributes: Dict) -> Dict:
-# # """
-# # Convert visual attributes to array format with source tracking.
-# # Source is always 'image' for visual attributes.
-# # """
-# # formatted = {}
-
-# # for key, value in visual_attributes.items():
-# # if isinstance(value, list):
-# # # Already a list (like color_palette)
-# # formatted[key] = [{"value": str(item), "source": "image"} for item in value]
-# # elif isinstance(value, dict):
-# # # Nested dictionary - format recursively
-# # nested_formatted = {}
-# # for nested_key, nested_value in value.items():
-# # if isinstance(nested_value, list):
-# # nested_formatted[nested_key] = [{"value": str(item), "source": "image"} for item in nested_value]
-# # else:
-# # nested_formatted[nested_key] = [{"value": str(nested_value), "source": "image"}]
-# # formatted[key] = nested_formatted
-# # else:
-# # # Single value
-# # formatted[key] = [{"value": str(value), "source": "image"}]
-
-# # return formatted
-
-# # @staticmethod
|
|
|
|
|
-# # def extract_attributes_from_ocr(ocr_results: Dict, model: str = None) -> Dict:
|
|
|
|
|
-# # """Extract structured attributes from OCR text using LLM."""
|
|
|
|
|
-# # if model is None:
|
|
|
|
|
-# # model = settings.SUPPORTED_MODELS[0]
|
|
|
|
|
-
|
|
|
|
|
-# # detected_text = ocr_results.get('detected_text', [])
|
|
|
|
|
-# # if not detected_text:
|
|
|
|
|
-# # return {}
|
|
|
|
|
-
|
|
|
|
|
-# # # Format OCR text for prompt
|
|
|
|
|
-# # ocr_text = "\n".join([f"Text: {item['text']}, Confidence: {item['confidence']:.2f}"
|
|
|
|
|
-# # for item in detected_text])
|
|
|
|
|
-
|
|
|
|
|
-# # prompt = f"""
|
|
|
|
|
-# # You are an AI model that extracts structured attributes from OCR text detected on product images.
|
|
|
|
|
-# # Given the OCR detections below, infer the possible product attributes and return them as a clean JSON object.
|
|
|
|
|
-
|
|
|
|
|
-# # OCR Text:
|
|
|
|
|
-# # {ocr_text}
|
|
|
|
|
-
|
|
|
|
|
-# # Extract relevant attributes like:
|
|
|
|
|
-# # - brand
|
|
|
|
|
-# # - model_number
|
|
|
|
|
-# # - size (waist_size, length, etc.)
|
|
|
|
|
-# # - collection
|
|
|
|
|
-# # - any other relevant product information
|
|
|
|
|
-
|
|
|
|
|
-# # Return a JSON object with only the attributes you can confidently identify.
|
|
|
|
|
-# # If an attribute is not present, do not include it in the response.
|
|
|
|
|
-# # """
|
|
|
|
|
-
|
|
|
|
|
-# # payload = {
|
|
|
|
|
-# # "model": model,
|
|
|
|
|
-# # "messages": [
|
|
|
|
|
-# # {
|
|
|
|
|
-# # "role": "system",
|
|
|
|
|
-# # "content": "You are a helpful AI that extracts structured data from OCR output. Return only valid JSON."
|
|
|
|
|
-# # },
|
|
|
|
|
-# # {"role": "user", "content": prompt}
|
|
|
|
|
-# # ],
|
|
|
|
|
-# # "temperature": 0.2,
|
|
|
|
|
-# # "max_tokens": 500
|
|
|
|
|
-# # }
|
|
|
|
|
-
|
|
|
|
|
-# # headers = {
|
|
|
|
|
-# # "Authorization": f"Bearer {settings.GROQ_API_KEY}",
|
|
|
|
|
-# # "Content-Type": "application/json",
|
|
|
|
|
-# # }
|
|
|
|
|
-
|
|
|
|
|
-# # try:
|
|
|
|
|
-# # response = requests.post(
|
|
|
|
|
-# # settings.GROQ_API_URL,
|
|
|
|
|
-# # headers=headers,
|
|
|
|
|
-# # json=payload,
|
|
|
|
|
-# # timeout=30
|
|
|
|
|
-# # )
|
|
|
|
|
-# # response.raise_for_status()
|
|
|
|
|
-# # result_text = response.json()["choices"][0]["message"]["content"].strip()
|
|
|
|
|
-
|
|
|
|
|
-# # # Clean and parse JSON
|
|
|
|
|
-# # result_text = ProductAttributeService._clean_json_response(result_text)
|
|
|
|
|
-# # parsed = json.loads(result_text)
|
|
|
|
|
-
|
|
|
|
|
-# # # Convert to array format with source tracking
|
|
|
|
|
-# # formatted_attributes = {}
|
|
|
|
|
-# # for key, value in parsed.items():
|
|
|
|
|
-# # if key == "error":
|
|
|
|
|
-# # continue
|
|
|
|
|
-
|
|
|
|
|
-# # # Handle nested dictionaries (like size)
|
|
|
|
|
-# # if isinstance(value, dict):
|
|
|
|
|
-# # nested_formatted = {}
|
|
|
|
|
-# # for nested_key, nested_value in value.items():
|
|
|
|
|
-# # nested_formatted[nested_key] = [{"value": str(nested_value), "source": "image"}]
|
|
|
|
|
-# # formatted_attributes[key] = nested_formatted
|
|
|
|
|
-# # elif isinstance(value, list):
|
|
|
|
|
-# # # Already a list, convert each item
|
|
|
|
|
-# # formatted_attributes[key] = [{"value": str(item), "source": "image"} for item in value]
|
|
|
|
|
-# # else:
|
|
|
|
|
-# # # Single value
|
|
|
|
|
-# # formatted_attributes[key] = [{"value": str(value), "source": "image"}]
|
|
|
|
|
-
|
|
|
|
|
-# # return formatted_attributes
|
|
|
|
|
-# # except Exception as e:
|
|
|
|
|
-# # return {"error": f"Failed to extract attributes from OCR: {str(e)}"}
|
|
|
|
|
-
|
|
|
|
|
-# # @staticmethod
|
|
|
|
|
-# # def calculate_attribute_relationships(
|
|
|
|
|
-# # mandatory_attrs: Dict[str, List[str]],
|
|
|
|
|
-# # product_text: str
|
|
|
|
|
-# # ) -> Dict[str, float]:
|
|
|
|
|
-# # """
|
|
|
|
|
-# # Calculate semantic relationships between attribute values across different attributes.
|
|
|
|
|
-# # Returns a matrix of cross-attribute value similarities.
|
|
|
|
|
-# # """
|
|
|
|
|
-# # pt_emb = model_embedder.encode(product_text, convert_to_tensor=True)
|
|
|
|
|
-
|
|
|
|
|
-# # # Calculate similarities between all attribute values and product text
|
|
|
|
|
-# # attr_scores = {}
|
|
|
|
|
-# # for attr, values in mandatory_attrs.items():
|
|
|
|
|
-# # attr_scores[attr] = {}
|
|
|
|
|
-# # for val in values:
|
|
|
|
|
-# # contexts = [val, f"for {val}", f"use in {val}", f"suitable for {val}"]
|
|
|
|
|
-# # ctx_embs = [model_embedder.encode(c, convert_to_tensor=True) for c in contexts]
|
|
|
|
|
-# # sem_sim = max(float(util.cos_sim(pt_emb, ce).item()) for ce in ctx_embs)
|
|
|
|
|
-# # attr_scores[attr][val] = sem_sim
|
|
|
|
|
-
|
|
|
|
|
-# # # Calculate cross-attribute value relationships
|
|
|
|
|
-# # relationships = {}
|
|
|
|
|
-# # attr_list = list(mandatory_attrs.keys())
|
|
|
|
|
-
|
|
|
|
|
-# # for i, attr1 in enumerate(attr_list):
|
|
|
|
|
-# # for attr2 in attr_list[i+1:]:
|
|
|
|
|
-# # # Calculate pairwise similarities between values of different attributes
|
|
|
|
|
-# # for val1 in mandatory_attrs[attr1]:
|
|
|
|
|
-# # for val2 in mandatory_attrs[attr2]:
|
|
|
|
|
-# # emb1 = model_embedder.encode(val1, convert_to_tensor=True)
|
|
|
|
|
-# # emb2 = model_embedder.encode(val2, convert_to_tensor=True)
|
|
|
|
|
-# # sim = float(util.cos_sim(emb1, emb2).item())
|
|
|
|
|
-
|
|
|
|
|
-# # # Store bidirectional relationships
|
|
|
|
|
-# # key1 = f"{attr1}:{val1}->{attr2}:{val2}"
|
|
|
|
|
-# # key2 = f"{attr2}:{val2}->{attr1}:{val1}"
|
|
|
|
|
-# # relationships[key1] = sim
|
|
|
|
|
-# # relationships[key2] = sim
|
|
|
|
|
-
|
|
|
|
|
-# # return relationships
|
|
|
|
|
-
|
|
|
|
|
-# # @staticmethod
|
|
|
|
|
-# # def calculate_value_clusters(
|
|
|
|
|
-# # values: List[str],
|
|
|
|
|
-# # scores: List[Tuple[str, float]],
|
|
|
|
|
-# # cluster_threshold: float = 0.4
|
|
|
|
|
-# # ) -> List[List[str]]:
|
|
|
|
|
-# # """
|
|
|
|
|
-# # Group values into semantic clusters based on their similarity to each other.
|
|
|
|
|
-# # Returns clusters of related values.
|
|
|
|
|
-# # """
|
|
|
|
|
-# # if len(values) <= 1:
|
|
|
|
|
-# # return [[val] for val, _ in scores]
|
|
|
|
|
-
|
|
|
|
|
-# # # Get embeddings for all values
|
|
|
|
|
-# # embeddings = [model_embedder.encode(val, convert_to_tensor=True) for val in values]
|
|
|
|
|
-
|
|
|
|
|
-# # # Calculate pairwise similarities
|
|
|
|
|
-# # similarity_matrix = np.zeros((len(values), len(values)))
|
|
|
|
|
-# # for i in range(len(values)):
|
|
|
|
|
-# # for j in range(i+1, len(values)):
|
|
|
|
|
-# # sim = float(util.cos_sim(embeddings[i], embeddings[j]).item())
|
|
|
|
|
-# # similarity_matrix[i][j] = sim
|
|
|
|
|
-# # similarity_matrix[j][i] = sim
|
|
|
|
|
-
|
|
|
|
|
-# # # Simple clustering: group values with high similarity
|
|
|
|
|
-# # clusters = []
|
|
|
|
|
-# # visited = set()
|
|
|
|
|
-
|
|
|
|
|
-# # for i, (val, score) in enumerate(scores):
|
|
|
|
|
-# # if i in visited:
|
|
|
|
|
-# # continue
|
|
|
|
|
-
|
|
|
|
|
-# # cluster = [val]
|
|
|
|
|
-# # visited.add(i)
|
|
|
|
|
-
|
|
|
|
|
-# # # Find similar values
|
|
|
|
|
-# # for j in range(len(values)):
|
|
|
|
|
-# # if j not in visited and similarity_matrix[i][j] >= cluster_threshold:
|
|
|
|
|
-# # cluster.append(values[j])
|
|
|
|
|
-# # visited.add(j)
|
|
|
|
|
-
|
|
|
|
|
-# # clusters.append(cluster)
|
|
|
|
|
-
|
|
|
|
|
-# # return clusters
|
|
|
|
|
-
|
|
|
|
|
-# # @staticmethod
|
|
|
|
|
-# # def get_dynamic_threshold(
|
|
|
|
|
-# # attr: str,
|
|
|
|
|
-# # val: str,
|
|
|
|
|
-# # base_score: float,
|
|
|
|
|
-# # extracted_attrs: Dict[str, List[Dict[str, str]]],
|
|
|
|
|
-# # relationships: Dict[str, float],
|
|
|
|
|
-# # mandatory_attrs: Dict[str, List[str]],
|
|
|
|
|
-# # base_threshold: float = 0.65,
|
|
|
|
|
-# # boost_factor: float = 0.15
|
|
|
|
|
-# # ) -> float:
|
|
|
|
|
-# # """
|
|
|
|
|
-# # Calculate dynamic threshold based on relationships with already-extracted attributes.
|
|
|
|
|
-# # """
|
|
|
|
|
-# # threshold = base_threshold
|
|
|
|
|
-
|
|
|
|
|
-# # # Check relationships with already extracted attributes
|
|
|
|
|
-# # max_relationship = 0.0
|
|
|
|
|
-# # for other_attr, other_values_list in extracted_attrs.items():
|
|
|
|
|
-# # if other_attr == attr:
|
|
|
|
|
-# # continue
|
|
|
|
|
-
|
|
|
|
|
-# # for other_val_dict in other_values_list:
|
|
|
|
|
-# # other_val = other_val_dict['value']
|
|
|
|
|
-# # key = f"{attr}:{val}->{other_attr}:{other_val}"
|
|
|
|
|
-# # if key in relationships:
|
|
|
|
|
-# # max_relationship = max(max_relationship, relationships[key])
|
|
|
|
|
-
|
|
|
|
|
-# # # If strong relationship exists, lower threshold
|
|
|
|
|
-# # if max_relationship > 0.6:
|
|
|
|
|
-# # threshold = base_threshold - (boost_factor * max_relationship)
|
|
|
|
|
-
|
|
|
|
|
-# # return max(0.3, threshold)
|
|
|
|
|
-
|
|
|
|
|
-# # @staticmethod
|
|
|
|
|
-# # def get_adaptive_margin(
|
|
|
|
|
-# # scores: List[Tuple[str, float]],
|
|
|
|
|
-# # base_margin: float = 0.15,
|
|
|
|
|
-# # max_margin: float = 0.22
|
|
|
|
|
-# # ) -> float:
|
|
|
|
|
-# # """
|
|
|
|
|
-# # Calculate adaptive margin based on score distribution.
|
|
|
|
|
-# # """
|
|
|
|
|
-# # if len(scores) < 2:
|
|
|
|
|
-# # return base_margin
|
|
|
|
|
-
|
|
|
|
|
-# # score_values = [s for _, s in scores]
|
|
|
|
|
-# # best_score = score_values[0]
|
|
|
|
|
-
|
|
|
|
|
-# # # If best score is very low, use adaptive margin but be more conservative
|
|
|
|
|
-# # if best_score < 0.5:
|
|
|
|
|
-# # # Calculate score spread in top 3-4 scores only (more selective)
|
|
|
|
|
-# # top_scores = score_values[:min(4, len(score_values))]
|
|
|
|
|
-# # score_range = max(top_scores) - min(top_scores)
|
|
|
|
|
-
|
|
|
|
|
-# # # Very controlled margin increase
|
|
|
|
|
-# # if score_range < 0.30:
|
|
|
|
|
-# # # Much more conservative scaling
|
|
|
|
|
-# # score_factor = (0.5 - best_score) * 0.35
|
|
|
|
|
-# # adaptive = base_margin + score_factor + (0.30 - score_range) * 0.2
|
|
|
|
|
-# # return min(adaptive, max_margin)
|
|
|
|
|
-
|
|
|
|
|
-# # return base_margin
|
|
|
|
|
-
|
|
|
|
|
-# # @staticmethod
|
|
|
|
|
-# # def _lexical_evidence(product_text: str, label: str) -> float:
|
|
|
|
|
-# # """Calculate lexical overlap between product text and label."""
|
|
|
|
|
-# # pt = product_text.lower()
|
|
|
|
|
-# # tokens = [t for t in label.lower().replace("-", " ").split() if t]
|
|
|
|
|
-# # if not tokens:
|
|
|
|
|
-# # return 0.0
|
|
|
|
|
-# # hits = sum(1 for t in tokens if t in pt)
|
|
|
|
|
-# # return hits / len(tokens)
|
|
|
|
|
-
|
|
|
|
|
-# # @staticmethod
|
|
|
|
|
-# # def normalize_against_product_text(
|
|
|
|
|
-# # product_text: str,
|
|
|
|
|
-# # mandatory_attrs: Dict[str, List[str]],
|
|
|
|
|
-# # source_map: Dict[str, str],
|
|
|
|
|
-# # threshold_abs: float = 0.65,
|
|
|
|
|
-# # margin: float = 0.15,
|
|
|
|
|
-# # allow_multiple: bool = False,
|
|
|
|
|
-# # sem_weight: float = 0.8,
|
|
|
|
|
-# # lex_weight: float = 0.2,
|
|
|
|
|
-# # extracted_attrs: Optional[Dict[str, List[Dict[str, str]]]] = None,
|
|
|
|
|
-# # relationships: Optional[Dict[str, float]] = None,
|
|
|
|
|
-# # use_dynamic_thresholds: bool = True,
|
|
|
|
|
-# # use_adaptive_margin: bool = True,
|
|
|
|
|
-# # use_semantic_clustering: bool = True
|
|
|
|
|
-# # ) -> dict:
|
|
|
|
|
-# # """
|
|
|
|
|
-# # Score each allowed value against the product_text with dynamic thresholds.
|
|
|
|
|
-# # Returns dict with values in array format: [{"value": "...", "source": "..."}]
|
|
|
|
|
-# # """
|
|
|
|
|
-# # if extracted_attrs is None:
|
|
|
|
|
-# # extracted_attrs = {}
|
|
|
|
|
-# # if relationships is None:
|
|
|
|
|
-# # relationships = {}
|
|
|
|
|
-
|
|
|
|
|
-# # pt_emb = model_embedder.encode(product_text, convert_to_tensor=True)
|
|
|
|
|
-# # extracted = {}
|
|
|
|
|
-
|
|
|
|
|
-# # for attr, allowed_values in mandatory_attrs.items():
|
|
|
|
|
-# # scores: List[Tuple[str, float]] = []
|
|
|
|
|
-
|
|
|
|
|
-# # # Check if this is a dimension attribute
|
|
|
|
|
-# # is_dimension_attr = any(keyword in attr.lower() for keyword in ['dimension', 'size', 'measurement'])
|
|
|
|
|
-
|
|
|
|
|
-# # # Normalize product text once for dimension matching
|
|
|
|
|
-# # normalized_product_text = ProductAttributeService.normalize_dimension_text(product_text) if is_dimension_attr else ""
|
|
|
|
|
-
|
|
|
|
|
-# # for val in allowed_values:
|
|
|
|
|
-# # # For dimension attributes, try exact normalized matching first
|
|
|
|
|
-# # if is_dimension_attr:
|
|
|
|
|
-# # # Normalize the allowed value from the list
|
|
|
|
|
-# # normalized_val = ProductAttributeService.normalize_dimension_text(val)
|
|
|
|
|
-
|
|
|
|
|
-# # # If we have both normalized values and they match exactly, give highest score
|
|
|
|
|
-# # if normalized_val and normalized_product_text and normalized_val == normalized_product_text:
|
|
|
|
|
-# # scores.append((val, 1.0))
|
|
|
|
|
-# # continue
|
|
|
|
|
-
|
|
|
|
|
-# # # Also check if the normalized value appears in the original product text
|
|
|
|
|
-# # # This handles cases where the format might be slightly different
|
|
|
|
|
-# # if normalized_val:
|
|
|
|
|
-# # # Extract just the numbers for flexible matching
|
|
|
|
|
-# # val_numbers = normalized_val.split('x')
|
|
|
|
|
-# # # Check if both numbers appear in the product text in close proximity
|
|
|
|
|
-# # text_lower = product_text.lower()
|
|
|
|
|
-# # if all(num in text_lower for num in val_numbers):
|
|
|
|
|
-# # # Calculate proximity score
|
|
|
|
|
-# # idx1 = text_lower.find(val_numbers[0])
|
|
|
|
|
-# # idx2 = text_lower.find(val_numbers[1])
|
|
|
|
|
-# # if idx1 != -1 and idx2 != -1:
|
|
|
|
|
-# # distance = abs(idx2 - idx1)
|
|
|
|
|
-# # # If numbers are close together (within 20 characters), high score
|
|
|
|
|
-# # if distance < 20:
|
|
|
|
|
-# # scores.append((val, 0.95))
|
|
|
|
|
-# # continue
|
|
|
|
|
-
|
|
|
|
|
-# # # Standard semantic matching for all attributes
|
|
|
|
|
-# # contexts = [val, f"for {val}", f"use in {val}", f"suitable for {val}", f"{val} room"]
|
|
|
|
|
-# # ctx_embs = [model_embedder.encode(c, convert_to_tensor=True) for c in contexts]
|
|
|
|
|
-# # sem_sim = max(float(util.cos_sim(pt_emb, ce).item()) for ce in ctx_embs)
|
|
|
|
|
-
|
|
|
|
|
-# # lex_score = ProductAttributeService._lexical_evidence(product_text, val)
|
|
|
|
|
-# # final_score = sem_weight * sem_sim + lex_weight * lex_score
|
|
|
|
|
-# # scores.append((val, final_score))
|
|
|
|
|
-
|
|
|
|
|
-# # scores.sort(key=lambda x: x[1], reverse=True)
|
|
|
|
|
-# # best_val, best_score = scores[0]
|
|
|
|
|
-
|
|
|
|
|
-# # # Calculate adaptive margin if enabled
|
|
|
|
|
-# # effective_margin = margin
|
|
|
|
|
-# # if allow_multiple and use_adaptive_margin:
|
|
|
|
|
-# # effective_margin = ProductAttributeService.get_adaptive_margin(scores, margin)
|
|
|
|
|
-
|
|
|
|
|
-# # # Special handling for dimension attributes with exact matches
|
|
|
|
|
-# # # If we have a very high score (0.90+), it means we found an exact/normalized match
|
|
|
|
|
-# # # In this case, don't apply multiple selection logic - just return the best match
|
|
|
|
|
-# # if is_dimension_attr and best_score >= 0.90:
|
|
|
|
|
-# # source = ProductAttributeService.find_value_source(best_val, source_map, attr)
|
|
|
|
|
-# # extracted[attr] = [{"value": best_val, "source": source}]
|
|
|
|
|
-# # continue
|
|
|
|
|
-
|
|
|
|
|
-# # if not allow_multiple:
|
|
|
|
|
-# # source = ProductAttributeService.find_value_source(best_val, source_map, attr)
|
|
|
|
|
-# # extracted[attr] = [{"value": best_val, "source": source}]
|
|
|
|
|
-# # else:
|
|
|
|
|
-# # candidates = [best_val]
|
|
|
|
|
-# # use_base_threshold = best_score >= threshold_abs
|
|
|
|
|
-
|
|
|
|
|
-# # # Get semantic clusters if enabled
|
|
|
|
|
-# # clusters = []
|
|
|
|
|
-# # if use_semantic_clustering:
|
|
|
|
|
-# # clusters = ProductAttributeService.calculate_value_clusters(
|
|
|
|
|
-# # allowed_values, scores, cluster_threshold=0.4
|
|
|
|
|
-# # )
|
|
|
|
|
-# # best_cluster = next((c for c in clusters if best_val in c), [best_val])
|
|
|
|
|
-
|
|
|
|
|
-# # for val, sc in scores[1:]:
|
|
|
|
|
-# # # Skip values with very low scores
|
|
|
|
|
-# # min_score = 0.4 if is_dimension_attr else 0.3
|
|
|
|
|
-# # if sc < min_score:
|
|
|
|
|
-# # continue
|
|
|
|
|
-
|
|
|
|
|
-# # # Calculate dynamic threshold for this value
|
|
|
|
|
-# # if use_dynamic_thresholds and extracted_attrs:
|
|
|
|
|
-# # dynamic_thresh = ProductAttributeService.get_dynamic_threshold(
|
|
|
|
|
-# # attr, val, sc, extracted_attrs, relationships,
|
|
|
|
|
-# # mandatory_attrs, threshold_abs
|
|
|
|
|
-# # )
|
|
|
|
|
-# # else:
|
|
|
|
|
-# # dynamic_thresh = threshold_abs
|
|
|
|
|
-
|
|
|
|
|
-# # within_margin = (best_score - sc) <= effective_margin
|
|
|
|
|
-# # above_threshold = sc >= dynamic_thresh
|
|
|
|
|
-
|
|
|
|
|
-# # # Check if in same semantic cluster as best value
|
|
|
|
|
-# # in_cluster = False
|
|
|
|
|
-# # if use_semantic_clustering and clusters:
|
|
|
|
|
-# # in_cluster = any(best_val in c and val in c for c in clusters)
|
|
|
|
|
-
|
|
|
|
|
-# # if use_base_threshold:
|
|
|
|
|
-# # # Best score is good, require threshold OR (cluster + margin)
|
|
|
|
|
-# # if above_threshold and within_margin:
|
|
|
|
|
-# # candidates.append(val)
|
|
|
|
|
-# # elif in_cluster and within_margin:
|
|
|
|
|
-# # candidates.append(val)
|
|
|
|
|
-# # else:
|
|
|
|
|
-# # # Best score is low, use margin OR cluster logic
|
|
|
|
|
-# # if within_margin:
|
|
|
|
|
-# # candidates.append(val)
|
|
|
|
|
-# # elif in_cluster and (best_score - sc) <= effective_margin * 2.0:
|
|
|
|
|
-# # # Extended margin for cluster members
|
|
|
|
|
-# # candidates.append(val)
|
|
|
|
|
-
|
|
|
|
|
-# # # Map each candidate to its source and create array format
|
|
|
|
|
-# # extracted[attr] = []
|
|
|
|
|
-# # for candidate in candidates:
|
|
|
|
|
-# # source = ProductAttributeService.find_value_source(candidate, source_map, attr)
|
|
|
|
|
-# # extracted[attr].append({"value": candidate, "source": source})
|
|
|
|
|
-
|
|
|
|
|
-# # return extracted
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-# # @staticmethod
|
|
|
|
|
-# # def extract_attributes(
|
|
|
|
|
-# # product_text: str,
|
|
|
|
|
-# # mandatory_attrs: Dict[str, List[str]],
|
|
|
|
|
-# # source_map: Dict[str, str] = None,
|
|
|
|
|
-# # model: str = None,
|
|
|
|
|
-# # extract_additional: bool = True,
|
|
|
|
|
-# # multiple: Optional[List[str]] = None,
|
|
|
|
|
-# # threshold_abs: float = 0.65,
|
|
|
|
|
-# # margin: float = 0.15,
|
|
|
|
|
-# # use_dynamic_thresholds: bool = True,
|
|
|
|
|
-# # use_adaptive_margin: bool = True,
|
|
|
|
|
-# # use_semantic_clustering: bool = True
|
|
|
|
|
-# # ) -> dict:
|
|
|
|
|
-# # """
|
|
|
|
|
-# # Use Groq LLM to extract attributes from any product type with enhanced multi-value selection.
|
|
|
|
|
-# # Now returns values in array format: [{"value": "...", "source": "..."}]
|
|
|
|
|
-# # """
|
|
|
|
|
-
|
|
|
|
|
-# # if model is None:
|
|
|
|
|
-# # model = settings.SUPPORTED_MODELS[0]
|
|
|
|
|
-
|
|
|
|
|
-# # if multiple is None:
|
|
|
|
|
-# # multiple = []
|
|
|
|
|
-
|
|
|
|
|
-# # if source_map is None:
|
|
|
|
|
-# # source_map = {}
|
|
|
|
|
-
|
|
|
|
|
-# # # Check if product text is empty or minimal
|
|
|
|
|
-# # if not product_text or product_text == "No product information available":
|
|
|
|
|
-# # return ProductAttributeService._create_error_response(
|
|
|
|
|
-# # "No product information provided",
|
|
|
|
|
-# # mandatory_attrs,
|
|
|
|
|
-# # extract_additional
|
|
|
|
|
-# # )
|
|
|
|
|
-
|
|
|
|
|
-# # # Create structured prompt for mandatory attributes
|
|
|
|
|
-# # mandatory_attr_list = []
|
|
|
|
|
-# # for attr_name, allowed_values in mandatory_attrs.items():
|
|
|
|
|
-# # mandatory_attr_list.append(f"{attr_name}: {', '.join(allowed_values)}")
|
|
|
|
|
-# # mandatory_attr_text = "\n".join(mandatory_attr_list)
|
|
|
|
|
-
|
|
|
|
|
-# # additional_instruction = ""
|
|
|
|
|
-# # if extract_additional:
|
|
|
|
|
-# # additional_instruction = """
|
|
|
|
|
-# # 2. Extract ADDITIONAL attributes: Identify any other relevant attributes from the product text
|
|
|
|
|
-# # that are NOT in the mandatory list. Only include attributes where you can find actual values
|
|
|
|
|
-# # in the product text. Do NOT include attributes with "Not Specified" or empty values.
|
|
|
|
|
-
|
|
|
|
|
-# # Examples of attributes to look for (only if present): Brand, Material, Size, Color, Dimensions,
|
|
|
|
|
-# # Weight, Features, Style, Theme, Pattern, Finish, Care Instructions, etc."""
|
|
|
|
|
-
|
|
|
|
|
-# # output_format = {
|
|
|
|
|
-# # "mandatory": {attr: "value or list of values" for attr in mandatory_attrs.keys()},
|
|
|
|
|
-# # }
|
|
|
|
|
-
|
|
|
|
|
-# # if extract_additional:
|
|
|
|
|
-# # output_format["additional"] = {
|
|
|
|
|
-# # "example_attribute_1": "actual value found",
|
|
|
|
|
-# # "example_attribute_2": "actual value found"
|
|
|
|
|
-# # }
|
|
|
|
|
-# # output_format["additional"]["_note"] = "Only include attributes with actual values found in text"
|
|
|
|
|
-
|
|
|
|
|
-# # prompt = f"""
|
|
|
|
|
-# # You are an intelligent product attribute extractor that works with ANY product type.
|
|
|
|
|
-
|
|
|
|
|
-# # TASK:
|
|
|
|
|
-# # 1. Extract MANDATORY attributes: For each mandatory attribute, select the most appropriate value(s)
|
|
|
|
|
-# # from the provided list. Choose the value(s) that best match the product description.
|
|
|
|
|
-# # {additional_instruction}
|
|
|
|
|
-
|
|
|
|
|
-# # Product Text:
|
|
|
|
|
-# # {product_text}
|
|
|
|
|
-
|
|
|
|
|
-# # Mandatory Attribute Lists (MUST select from these allowed values):
|
|
|
|
|
-# # {mandatory_attr_text}
|
|
|
|
|
-
|
|
|
|
|
-# # CRITICAL INSTRUCTIONS:
|
|
|
|
|
-# # - Return ONLY valid JSON, nothing else
|
|
|
|
|
-# # - No explanations, no markdown, no text before or after the JSON
|
|
|
|
|
-# # - For mandatory attributes, choose the value(s) from the provided list that best match
|
|
|
|
|
-# # - If a mandatory attribute cannot be determined from the product text, use "Not Specified"
|
|
|
|
|
-# # - Prefer exact matches from the allowed values list over generic synonyms
|
|
|
|
|
-# # - If multiple values are plausible, you MAY return more than one
|
|
|
|
|
-# # {f"- For additional attributes: ONLY include attributes where you found actual values in the product text. DO NOT include attributes with 'Not Specified', 'None', 'N/A', or empty values. If you cannot find a value for an attribute, simply don't include that attribute." if extract_additional else ""}
|
|
|
|
|
-# # - Be precise and only extract information that is explicitly stated or clearly implied
|
|
|
|
|
-
|
|
|
|
|
-# # Required Output Format:
|
|
|
|
|
-# # {json.dumps(output_format, indent=2)}
|
|
|
|
|
-# # """
|
|
|
|
|
-
|
|
|
|
|
-# # payload = {
|
|
|
|
|
-# # "model": model,
|
|
|
|
|
-# # "messages": [
|
|
|
|
|
-# # {
|
|
|
|
|
-# # "role": "system",
|
|
|
|
|
-# # "content": f"You are a precise attribute extraction model. Return ONLY valid JSON with {'mandatory and additional' if extract_additional else 'mandatory'} sections. No explanations, no markdown, no other text."
|
|
|
|
|
-# # },
|
|
|
|
|
-# # {"role": "user", "content": prompt}
|
|
|
|
|
-# # ],
|
|
|
|
|
-# # "temperature": 0.0,
|
|
|
|
|
-# # "max_tokens": 1500
|
|
|
|
|
-# # }
|
|
|
|
|
-
|
|
|
|
|
-# # headers = {
|
|
|
|
|
-# # "Authorization": f"Bearer {settings.GROQ_API_KEY}",
|
|
|
|
|
-# # "Content-Type": "application/json",
|
|
|
|
|
-# # }
|
|
|
|
|
-
|
|
|
|
|
-# # try:
|
|
|
|
|
-# # response = requests.post(
|
|
|
|
|
-# # settings.GROQ_API_URL,
|
|
|
|
|
-# # headers=headers,
|
|
|
|
|
-# # json=payload,
|
|
|
|
|
-# # timeout=30
|
|
|
|
|
-# # )
|
|
|
|
|
-# # response.raise_for_status()
|
|
|
|
|
-# # result_text = response.json()["choices"][0]["message"]["content"].strip()
|
|
|
|
|
-
|
|
|
|
|
-# # # Clean the response
|
|
|
|
|
-# # result_text = ProductAttributeService._clean_json_response(result_text)
|
|
|
|
|
-
|
|
|
|
|
-# # # Parse JSON
|
|
|
|
|
-# # parsed = json.loads(result_text)
|
|
|
|
|
-
|
|
|
|
|
-# # # Validate and restructure with source tracking
|
|
|
|
|
-# # parsed = ProductAttributeService._validate_response_structure(
|
|
|
|
|
-# # parsed, mandatory_attrs, extract_additional, source_map
|
|
|
|
|
-# # )
|
|
|
|
|
-
|
|
|
|
|
-# # # Clean up and add source tracking to additional attributes in array format
|
|
|
|
|
-# # if extract_additional and "additional" in parsed:
|
|
|
|
|
-# # cleaned_additional = {}
|
|
|
|
|
-# # for k, v in parsed["additional"].items():
|
|
|
|
|
-# # if v and v not in ["Not Specified", "None", "N/A", "", "not specified", "none", "n/a"]:
|
|
|
|
|
-# # if not (isinstance(v, str) and v.lower() in ["not specified", "none", "n/a", ""]):
|
|
|
|
|
-# # # Convert to array format if not already
|
|
|
|
|
-# # if isinstance(v, list):
|
|
|
|
|
-# # cleaned_additional[k] = []
|
|
|
|
|
-# # for item in v:
|
|
|
|
|
-# # if isinstance(item, dict) and "value" in item:
|
|
|
|
|
-# # if "source" not in item:
|
|
|
|
|
-# # item["source"] = ProductAttributeService.find_value_source(
|
|
|
|
|
-# # item["value"], source_map, k
|
|
|
|
|
-# # )
|
|
|
|
|
-# # cleaned_additional[k].append(item)
|
|
|
|
|
-# # else:
|
|
|
|
|
-# # source = ProductAttributeService.find_value_source(str(item), source_map, k)
|
|
|
|
|
-# # cleaned_additional[k].append({"value": str(item), "source": source})
|
|
|
|
|
-# # else:
|
|
|
|
|
-# # source = ProductAttributeService.find_value_source(str(v), source_map, k)
|
|
|
|
|
-# # cleaned_additional[k] = [{"value": str(v), "source": source}]
|
|
|
|
|
-# # parsed["additional"] = cleaned_additional
|
|
|
|
|
-
|
|
|
|
|
-# # # Calculate attribute relationships if using dynamic thresholds
|
|
|
|
|
-# # relationships = {}
|
|
|
|
|
-# # if use_dynamic_thresholds:
|
|
|
|
|
-# # relationships = ProductAttributeService.calculate_attribute_relationships(
|
|
|
|
|
-# # mandatory_attrs, product_text
|
|
|
|
|
-# # )
|
|
|
|
|
-
|
|
|
|
|
-# # # Process attributes in order, allowing earlier ones to influence later ones
|
|
|
|
|
-# # extracted_so_far = {}
|
|
|
|
|
-# # for attr in mandatory_attrs.keys():
|
|
|
|
|
-# # allow_multiple = attr in multiple
|
|
|
|
|
-
|
|
|
|
|
-# # result = ProductAttributeService.normalize_against_product_text(
|
|
|
|
|
-# # product_text=product_text,
|
|
|
|
|
-# # mandatory_attrs={attr: mandatory_attrs[attr]},
|
|
|
|
|
-# # source_map=source_map,
|
|
|
|
|
-# # threshold_abs=threshold_abs,
|
|
|
|
|
-# # margin=margin,
|
|
|
|
|
-# # allow_multiple=allow_multiple,
|
|
|
|
|
-# # extracted_attrs=extracted_so_far,
|
|
|
|
|
-# # relationships=relationships,
|
|
|
|
|
-# # use_dynamic_thresholds=use_dynamic_thresholds,
|
|
|
|
|
-# # use_adaptive_margin=use_adaptive_margin,
|
|
|
|
|
-# # use_semantic_clustering=use_semantic_clustering
|
|
|
|
|
-# # )
|
|
|
|
|
-
|
|
|
|
|
-# # # Result is already in array format from normalize_against_product_text
|
|
|
|
|
-# # parsed["mandatory"][attr] = result[attr]
|
|
|
|
|
-# # extracted_so_far[attr] = result[attr]
|
|
|
|
|
-
|
|
|
|
|
-# # return parsed
|
|
|
|
|
-
|
|
|
|
|
-# # except requests.exceptions.RequestException as e:
|
|
|
|
|
-# # return ProductAttributeService._create_error_response(
|
|
|
|
|
-# # str(e), mandatory_attrs, extract_additional
|
|
|
|
|
-# # )
|
|
|
|
|
-# # except json.JSONDecodeError as e:
|
|
|
|
|
-# # return ProductAttributeService._create_error_response(
|
|
|
|
|
-# # f"Invalid JSON: {str(e)}", mandatory_attrs, extract_additional, result_text
|
|
|
|
|
-# # )
|
|
|
|
|
-# # except Exception as e:
|
|
|
|
|
-# # return ProductAttributeService._create_error_response(
|
|
|
|
|
-# # str(e), mandatory_attrs, extract_additional
|
|
|
|
|
-# # )
|
|
|
|
|
-
|
|
|
|
|
-# # @staticmethod
|
|
|
|
|
-# # def extract_attributes_batch(
|
|
|
|
|
-# # products: List[Dict],
|
|
|
|
|
-# # mandatory_attrs: Dict[str, List[str]],
|
|
|
|
|
-# # model: str = None,
|
|
|
|
|
-# # extract_additional: bool = True,
|
|
|
|
|
-# # process_image: bool = True,
|
|
|
|
|
-# # max_workers: int = 5,
|
|
|
|
|
-# # multiple: Optional[List[str]] = None,
|
|
|
|
|
-# # threshold_abs: float = 0.65,
|
|
|
|
|
-# # margin: float = 0.15,
|
|
|
|
|
-# # use_dynamic_thresholds: bool = True,
|
|
|
|
|
-# # use_adaptive_margin: bool = True,
|
|
|
|
|
-# # use_semantic_clustering: bool = True
|
|
|
|
|
-# # ) -> Dict:
|
|
|
|
|
-# # """Extract attributes for multiple products in parallel with enhanced multi-value selection and source tracking."""
|
|
|
|
|
-# # results = []
|
|
|
|
|
-# # successful = 0
|
|
|
|
|
-# # failed = 0
|
|
|
|
|
-
|
|
|
|
|
-# # ocr_service = OCRService()
|
|
|
|
|
-
|
|
|
|
|
-# # if multiple is None:
|
|
|
|
|
-# # multiple = []
|
|
|
|
|
-
|
|
|
|
|
-# # def process_product(product_data):
|
|
|
|
|
-# # """Process a single product."""
|
|
|
|
|
-# # product_id = product_data.get('product_id', f"product_{len(results)}")
|
|
|
|
|
-
|
|
|
|
|
-# # try:
|
|
|
|
|
-# # # Process image if URL is provided
|
|
|
|
|
-# # ocr_results = None
|
|
|
|
|
-# # ocr_text = None
|
|
|
|
|
-
|
|
|
|
|
-# # if process_image and product_data.get('image_url'):
|
|
|
|
|
-# # ocr_results = ocr_service.process_image(product_data['image_url'])
|
|
|
|
|
-
|
|
|
|
|
-# # # Extract attributes from OCR
|
|
|
|
|
-# # if ocr_results and ocr_results.get('detected_text'):
|
|
|
|
|
-# # ocr_attrs = ProductAttributeService.extract_attributes_from_ocr(
|
|
|
|
|
-# # ocr_results, model
|
|
|
|
|
-# # )
|
|
|
|
|
-# # ocr_results['extracted_attributes'] = ocr_attrs
|
|
|
|
|
-
|
|
|
|
|
-# # # Format OCR text for combining with product text
|
|
|
|
|
-# # ocr_text = "\n".join([
|
|
|
|
|
-# # f"{item['text']} (confidence: {item['confidence']:.2f})"
|
|
|
|
|
-# # for item in ocr_results['detected_text']
|
|
|
|
|
-# # ])
|
|
|
|
|
-
|
|
|
|
|
-# # # Combine all product information with source tracking
|
|
|
|
|
-# # product_text, source_map = ProductAttributeService.combine_product_text(
|
|
|
|
|
-# # title=product_data.get('title'),
|
|
|
|
|
-# # short_desc=product_data.get('short_desc'),
|
|
|
|
|
-# # long_desc=product_data.get('long_desc'),
|
|
|
|
|
-# # ocr_text=ocr_text
|
|
|
|
|
-# # )
|
|
|
|
|
-
|
|
|
|
|
-# # # Extract attributes from combined text with enhanced features
|
|
|
|
|
-# # result = ProductAttributeService.extract_attributes(
|
|
|
|
|
-# # product_text=product_text,
|
|
|
|
|
-# # mandatory_attrs=mandatory_attrs,
|
|
|
|
|
-# # source_map=source_map,
|
|
|
|
|
-# # model=model,
|
|
|
|
|
-# # extract_additional=extract_additional,
|
|
|
|
|
-# # multiple=multiple,
|
|
|
|
|
-# # threshold_abs=threshold_abs,
|
|
|
|
|
-# # margin=margin,
|
|
|
|
|
-# # use_dynamic_thresholds=use_dynamic_thresholds,
|
|
|
|
|
-# # use_adaptive_margin=use_adaptive_margin,
|
|
|
|
|
-# # use_semantic_clustering=use_semantic_clustering
|
|
|
|
|
-# # )
|
|
|
|
|
-
|
|
|
|
|
-# # result['product_id'] = product_id
|
|
|
|
|
-
|
|
|
|
|
-# # # Add OCR results if available (already in correct format)
|
|
|
|
|
-# # if ocr_results:
|
|
|
|
|
-# # result['ocr_results'] = ocr_results
|
|
|
|
|
-
|
|
|
|
|
-# # # Check if extraction was successful
|
|
|
|
|
-# # if 'error' not in result:
|
|
|
|
|
-# # return result, True
|
|
|
|
|
-# # else:
|
|
|
|
|
-# # return result, False
|
|
|
|
|
-
|
|
|
|
|
-# # except Exception as e:
|
|
|
|
|
-# # return {
|
|
|
|
|
-# # 'product_id': product_id,
|
|
|
|
|
-# # 'mandatory': {attr: [{"value": "Not Specified", "source": "error"}] for attr in mandatory_attrs.keys()},
|
|
|
|
|
-# # 'additional': {} if extract_additional else None,
|
|
|
|
|
-# # 'error': f"Processing error: {str(e)}"
|
|
|
|
|
-# # }, False
|
|
|
|
|
-
|
|
|
|
|
-# # # Process products in parallel
|
|
|
|
|
-# # with ThreadPoolExecutor(max_workers=max_workers) as executor:
|
|
|
|
|
-# # future_to_product = {
|
|
|
|
|
-# # executor.submit(process_product, product): product
|
|
|
|
|
-# # for product in products
|
|
|
|
|
-# # }
|
|
|
|
|
-
|
|
|
|
|
-# # for future in as_completed(future_to_product):
|
|
|
|
|
-# # try:
|
|
|
|
|
-# # result, success = future.result()
|
|
|
|
|
-# # results.append(result)
|
|
|
|
|
-# # if success:
|
|
|
|
|
-# # successful += 1
|
|
|
|
|
-# # else:
|
|
|
|
|
-# # failed += 1
|
|
|
|
|
-# # except Exception as e:
|
|
|
|
|
-# # failed += 1
|
|
|
|
|
-# # results.append({
|
|
|
|
|
-# # 'product_id': 'unknown',
|
|
|
|
|
-# # 'mandatory': {attr: [{"value": "Not Specified", "source": "error"}] for attr in mandatory_attrs.keys()},
|
|
|
|
|
-# # 'additional': {} if extract_additional else None,
|
|
|
|
|
-# # 'error': f"Unexpected error: {str(e)}"
|
|
|
|
|
-# # })
|
|
|
|
|
-
|
|
|
|
|
-# # return {
|
|
|
|
|
-# # 'results': results,
|
|
|
|
|
-# # 'total_products': len(products),
|
|
|
|
|
-# # 'successful': successful,
|
|
|
|
|
-# # 'failed': failed
|
|
|
|
|
-# # }
|
|
|
|
|
-
|
|
|
|
|
-# # @staticmethod
|
|
|
|
|
-# # def _clean_json_response(text: str) -> str:
|
|
|
|
|
-# # """Clean LLM response to extract valid JSON."""
|
|
|
|
|
-# # start_idx = text.find('{')
|
|
|
|
|
-# # end_idx = text.rfind('}')
|
|
|
|
|
-
|
|
|
|
|
-# # if start_idx != -1 and end_idx != -1:
|
|
|
|
|
-# # text = text[start_idx:end_idx + 1]
|
|
|
|
|
-
|
|
|
|
|
-# # if "```json" in text:
|
|
|
|
|
-# # text = text.split("```json")[1].split("```")[0].strip()
|
|
|
|
|
-# # elif "```" in text:
|
|
|
|
|
-# # text = text.split("```")[1].split("```")[0].strip()
|
|
|
|
|
-# # if text.startswith("json"):
|
|
|
|
|
-# # text = text[4:].strip()
|
|
|
|
|
-
|
|
|
|
|
-# # return text
|
|
|
|
|
-
|
|
|
|
|
-# # @staticmethod
|
|
|
|
|
-# # def _validate_response_structure(
|
|
|
|
|
-# # parsed: dict,
|
|
|
|
|
-# # mandatory_attrs: Dict[str, List[str]],
|
|
|
|
|
-# # extract_additional: bool,
|
|
|
|
|
-# # source_map: Dict[str, str] = None
|
|
|
|
|
-# # ) -> dict:
|
|
|
|
|
-# # """Validate and fix the response structure, ensuring array format with source tracking."""
|
|
|
|
|
-# # if source_map is None:
|
|
|
|
|
-# # source_map = {}
|
|
|
|
|
-
|
|
|
|
|
-# # expected_sections = ["mandatory"]
|
|
|
|
|
-# # if extract_additional:
|
|
|
|
|
-# # expected_sections.append("additional")
|
|
|
|
|
-
|
|
|
|
|
-# # if not all(section in parsed for section in expected_sections):
|
|
|
|
|
-# # if isinstance(parsed, dict):
|
|
|
|
|
-# # mandatory_keys = set(mandatory_attrs.keys())
|
|
|
|
|
-# # mandatory = {k: v for k, v in parsed.items() if k in mandatory_keys}
|
|
|
|
|
-# # additional = {k: v for k, v in parsed.items() if k not in mandatory_keys}
|
|
|
|
|
-
|
|
|
|
|
-# # result = {"mandatory": mandatory}
|
|
|
|
|
-# # if extract_additional:
|
|
|
|
|
-# # result["additional"] = additional
|
|
|
|
|
-# # parsed = result
|
|
|
|
|
-# # else:
|
|
|
|
|
-# # return ProductAttributeService._create_error_response(
|
|
|
|
|
-# # "Invalid response structure",
|
|
|
|
|
-# # mandatory_attrs,
|
|
|
|
|
-# # extract_additional,
|
|
|
|
|
-# # str(parsed)
|
|
|
|
|
-# # )
|
|
|
|
|
-
|
|
|
|
|
-# # # Convert mandatory attributes to array format with source tracking
|
|
|
|
|
-# # if "mandatory" in parsed:
|
|
|
|
|
-# # converted_mandatory = {}
|
|
|
|
|
-# # for attr, value in parsed["mandatory"].items():
|
|
|
|
|
-# # if isinstance(value, list):
|
|
|
|
|
-# # # Already in array format, ensure each item has source
|
|
|
|
|
-# # converted_mandatory[attr] = []
|
|
|
|
|
-# # for item in value:
|
|
|
|
|
-# # if isinstance(item, dict) and "value" in item:
|
|
|
|
|
-# # # Already has proper structure
|
|
|
|
|
-# # if "source" not in item:
|
|
|
|
|
-# # item["source"] = ProductAttributeService.find_value_source(
|
|
|
|
|
-# # item["value"], source_map, attr
|
|
|
|
|
-# # )
|
|
|
|
|
-# # converted_mandatory[attr].append(item)
|
|
|
|
|
-# # else:
|
|
|
|
|
-# # # Convert string to proper format
|
|
|
|
|
-# # source = ProductAttributeService.find_value_source(str(item), source_map, attr)
|
|
|
|
|
-# # converted_mandatory[attr].append({"value": str(item), "source": source})
|
|
|
|
|
-# # else:
|
|
|
|
|
-# # # Single value - convert to array format
|
|
|
|
|
-# # source = ProductAttributeService.find_value_source(str(value), source_map, attr)
|
|
|
|
|
-# # converted_mandatory[attr] = [{"value": str(value), "source": source}]
|
|
|
|
|
-
|
|
|
|
|
-# # parsed["mandatory"] = converted_mandatory
|
|
|
|
|
-
|
|
|
|
|
-# # return parsed
|
|
|
|
|
-
|
|
|
|
|
-# # @staticmethod
|
|
|
|
|
-# # def _create_error_response(
|
|
|
|
|
-# # error: str,
|
|
|
|
|
-# # mandatory_attrs: Dict[str, List[str]],
|
|
|
|
|
-# # extract_additional: bool,
|
|
|
|
|
-# # raw_output: Optional[str] = None
|
|
|
|
|
-# # ) -> dict:
|
|
|
|
|
-# # """Create a standardized error response in array format."""
|
|
|
|
|
-# # response = {
|
|
|
|
|
-# # "mandatory": {attr: [{"value": "Not Specified", "source": "error"}] for attr in mandatory_attrs.keys()},
|
|
|
|
|
-# # "error": error
|
|
|
|
|
-# # }
|
|
|
|
|
-# # if extract_additional:
|
|
|
|
|
-# # response["additional"] = {}
|
|
|
|
|
-# # if raw_output:
|
|
|
|
|
-# # response["raw_output"] = raw_output
|
|
|
|
|
-# # return response
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-# # ==================== services.py (OPTIMIZED) ====================
-# import requests
-# import json
-# import re
-# import hashlib
-# import logging
-# from typing import Dict, List, Optional, Tuple
-# from django.conf import settings
-# from concurrent.futures import ThreadPoolExecutor, as_completed
-# from sentence_transformers import SentenceTransformer, util
-# import numpy as np
-# from .ocr_service import OCRService
-
-# logger = logging.getLogger(__name__)
-
-# # Initialize embedding model for normalization (SINGLETON)
-# model_embedder = SentenceTransformer("all-MiniLM-L6-v2")
-
-
-# # ==================== CACHING CLASSES ====================
-
-# class SimpleCache:
-# """
-# In-memory cache for attribute extraction results.
-# No Redis required - uses Python dict with automatic size management.
-# """
-# _cache = {}
-# _max_size = 1000 # Maximum number of cached items
-
-# @classmethod
-# def get(cls, key: str) -> Optional[Dict]:
-# """Get cached value by key"""
-# return cls._cache.get(key)
-
-# @classmethod
-# def set(cls, key: str, value: Dict):
-# """Set cache value with automatic LRU cleanup"""
-# # Simple LRU: clear oldest 20% if cache is full
-# if len(cls._cache) >= cls._max_size:
-# items = list(cls._cache.items())
-# # Keep newest 80%
-# cls._cache = dict(items[int(cls._max_size * 0.2):])
-# logger.info(f"Cache cleaned: kept {len(cls._cache)} items")
-
-# cls._cache[key] = value
-
-# @classmethod
-# def clear(cls):
-# """Clear entire cache"""
-# cls._cache.clear()
-# logger.info("Cache cleared")
-
-# @classmethod
-# def get_stats(cls) -> Dict:
-# """Get cache statistics"""
-# return {
-# "size": len(cls._cache),
-# "max_size": cls._max_size,
-# "usage_percent": round(len(cls._cache) / cls._max_size * 100, 2)
-# }
-
-
-# class EmbeddingCache:
-# """
-# Cache for sentence transformer embeddings.
-# Significantly reduces embedding computation time.
-# """
-# _cache = {}
-# _max_size = 500
-# _hit_count = 0
-# _miss_count = 0
-
-# @classmethod
-# def get_embedding(cls, text: str, model):
-# """Get or compute embedding with caching"""
-# if text in cls._cache:
-# cls._hit_count += 1
-# return cls._cache[text]
-
-# # Cache miss - compute embedding
-# cls._miss_count += 1
-
-# # Auto-cleanup if cache is full
-# if len(cls._cache) >= cls._max_size:
-# items = list(cls._cache.items())
-# cls._cache = dict(items[int(cls._max_size * 0.3):])
-# logger.info(f"Embedding cache cleaned: kept {len(cls._cache)} items")
-
-# # Compute and cache
-# embedding = model.encode(text, convert_to_tensor=True)
-# cls._cache[text] = embedding
-# return embedding
-
-# @classmethod
-# def clear(cls):
-# """Clear embedding cache"""
-# cls._cache.clear()
-# cls._hit_count = 0
-# cls._miss_count = 0
-# logger.info("Embedding cache cleared")
-
-# @classmethod
-# def get_stats(cls) -> Dict:
-# """Get cache statistics"""
-# total = cls._hit_count + cls._miss_count
-# hit_rate = (cls._hit_count / total * 100) if total > 0 else 0
-# return {
-# "size": len(cls._cache),
-# "max_size": cls._max_size,
-# "hits": cls._hit_count,
-# "misses": cls._miss_count,
-# "hit_rate_percent": round(hit_rate, 2)
-# }
-
-
-# # ==================== MAIN SERVICE CLASS ====================
-
-# class ProductAttributeService:
-# """Service class for extracting product attributes using Groq LLM."""
-
-# @staticmethod
-# def _generate_cache_key(product_text: str, mandatory_attrs: Dict) -> str:
-# """
-# Generate a unique cache key from product text and attributes.
-# Uses MD5 hash for consistent short keys.
-# """
-# # Sort attributes for consistent hashing
-# attrs_str = json.dumps(mandatory_attrs, sort_keys=True)
-# content = f"{product_text}:{attrs_str}"
-# return f"attr_{hashlib.md5(content.encode()).hexdigest()}"
-
-# @staticmethod
|
|
|
|
|
-# def normalize_dimension_text(text: str) -> str:
|
|
|
|
|
-# """
|
|
|
|
|
-# Normalize dimension text to match format like '16x20', '20x30', etc.
|
|
|
|
|
-# Handles formats like '16 x 20', '16x1.5x20', '16 x 1.5 x 20 Inches'
|
|
|
|
|
-# Returns the normalized dimension (e.g., '16x20') or empty string if not found.
|
|
|
|
|
-# """
|
|
|
|
|
-# if not text:
|
|
|
|
|
-# return ""
|
|
|
|
|
-
|
|
|
|
|
-# # Convert to lowercase and remove common units
|
|
|
|
|
-# text = text.lower()
|
|
|
|
# ==================== services.py (PERFORMANCE OPTIMIZED) ====================

import requests