# attribute_scorer.py (Enhanced with SEO)
# NOTE(review): removed the fully commented-out previous revision of this file
# that preceded the active code — version history belongs in source control.
import re
import time
from typing import Dict, List, Tuple
from rapidfuzz import fuzz, process
import spacy
from collections import defaultdict
import logging

logger = logging.getLogger(__name__)


class AttributeQualityScorer:
    """
    Score product attribute quality across five weighted dimensions —
    mandatory fields, standardization, missing values, consistency, and SEO
    discoverability — with optional AI-powered suggestions via a Gemini
    service. Every optional collaborator (spaCy, Gemini, SEO scorer) degrades
    gracefully: load failures disable the feature instead of raising.
    """

    # Anchored patterns for common placeholder values, compiled once.
    # BUG FIX: the previous inline list was missing a comma after r'^\.+'
    # (the two adjacent literals silently concatenated into the dead pattern
    # '^\.+^tbd') and had lost its '$' anchors, so re.match(r'^none', ...)
    # also flagged legitimate values like "nonetheless".
    _PLACEHOLDER_PATTERNS = [
        re.compile(p, re.IGNORECASE) for p in (
            r'^n/?a$', r'^none$', r'^null$', r'^-+$', r'^\.+$',
            r'^tbd$', r'^to be determined$', r'^unknown$', r'^na$',
            r'^todo$', r'^pending$', r'^\?+$', r'^xxx+$', r'^placeholder$',
        )
    ]

    def __init__(self, use_ai: bool = True, use_seo: bool = True):
        """
        Args:
            use_ai: attempt to initialize the Gemini suggestion service.
            use_seo: attempt to initialize the SEO discoverability scorer.
        """
        # Define every optional collaborator up front so attribute access is
        # always safe, even when a feature flag is off (previously
        # self.ai_service / self.seo_scorer were undefined in that case).
        self.nlp = None
        self.ai_service = None
        self.seo_scorer = None

        # Load spaCy model (optional).
        try:
            self.nlp = spacy.load("en_core_web_sm")
        except Exception as e:
            logger.warning(f"spaCy model not loaded: {e}")
            self.nlp = None

        # Initialize Gemini service (optional).
        self.use_ai = use_ai
        if use_ai:
            try:
                from .gemini_service import GeminiAttributeService
                self.ai_service = GeminiAttributeService()
            except Exception as e:
                logger.warning(f"Gemini service not available: {e}")
                self.use_ai = False
                self.ai_service = None

        # Initialize SEO scorer (optional).
        self.use_seo = use_seo
        if use_seo:
            try:
                from .seo_scorer import SEODiscoverabilityScorer
                self.seo_scorer = SEODiscoverabilityScorer()
            except Exception as e:
                logger.warning(f"SEO scorer not available: {e}")
                self.use_seo = False
                self.seo_scorer = None

        # Dimension weights (sum to 1.0).
        # NOTE(review): when SEO scoring is disabled the 15% SEO weight still
        # applies with a 0 score, capping final_score at 85 — confirm intended.
        self.weights = {
            'mandatory_fields': 0.34,
            'standardization': 0.26,
            'missing_values': 0.17,
            'consistency': 0.08,
            'seo_discoverability': 0.15
        }

    def score_product(self, product: Dict, category_rules: List[Dict],
                      generate_ai_suggestions: bool = True) -> Dict:
        """
        Score one product against its category rules.

        Args:
            product: dict with at least 'attributes'; 'category', 'title',
                'description', and 'sku' are read when present.
            category_rules: list of rule dicts (keys observed here:
                'attribute_name', 'is_mandatory', 'min_length', 'max_length',
                'validation_regex', 'valid_values', 'description').
            generate_ai_suggestions: when True and the AI service is
                available, an AI-suggestion attempt is always made.

        Returns:
            dict with 'final_score', 'max_score', 'breakdown', 'issues',
            'suggestions', 'weights', 'processing_time', and optionally
            'seo_details' and 'ai_suggestions'.

        Each dimension is scored inside its own try/except so one failing
        checker zeroes only its own dimension instead of aborting the product.
        """
        start_time = time.time()
        attributes = product.get('attributes', {})
        category = product.get('category', '')

        scores = {
            'mandatory_fields': 0,
            'standardization': 0,
            'missing_values': 0,
            'consistency': 0,
            'seo_discoverability': 0
        }
        issues = []
        suggestions = []

        # --- Mandatory Fields ---
        try:
            mandatory_score, mandatory_issues, mandatory_suggestions = self._check_mandatory_fields(attributes, category_rules)
            scores['mandatory_fields'] = mandatory_score
            issues.extend(mandatory_issues)
            suggestions.extend(mandatory_suggestions)
        except Exception as e:
            logger.error(f"[Mandatory Fields] Error: {e}")
            scores['mandatory_fields'] = 0

        # --- Standardization ---
        try:
            std_score, std_issues, std_suggestions = self._check_standardization(attributes, category_rules)
            scores['standardization'] = std_score
            issues.extend(std_issues)
            suggestions.extend(std_suggestions)
        except Exception as e:
            logger.error(f"[Standardization] Error: {e}")
            scores['standardization'] = 0

        # --- Missing Values ---
        try:
            missing_score, missing_issues, missing_suggestions = self._check_missing_values(attributes, category_rules)
            scores['missing_values'] = missing_score
            issues.extend(missing_issues)
            suggestions.extend(missing_suggestions)
        except Exception as e:
            logger.error(f"[Missing Values] Error: {e}")
            scores['missing_values'] = 0

        # --- Consistency ---
        try:
            consistency_score, consistency_issues = self._check_consistency(attributes, product.get('title', ''), product.get('description', ''))
            scores['consistency'] = consistency_score
            issues.extend(consistency_issues)
        except Exception as e:
            logger.error(f"[Consistency] Error: {e}")
            scores['consistency'] = 0

        # --- SEO & Discoverability ---
        seo_result = None
        if self.use_seo and self.seo_scorer:
            try:
                seo_result = self.seo_scorer.score_seo(product, category_rules)
                scores['seo_discoverability'] = seo_result['seo_score']
                issues.extend(seo_result['issues'])
                suggestions.extend(seo_result['suggestions'])
            except Exception as e:
                logger.error(f"[SEO Scoring] Error: {e}")
                scores['seo_discoverability'] = 0

        # --- Final Score: weighted sum over all dimensions ---
        final_score = sum(scores[key] * self.weights[key] for key in scores)

        result = {
            'final_score': round(final_score, 2),
            'max_score': 100.0,
            'breakdown': scores,
            'issues': issues,
            'suggestions': suggestions,
            'weights': self.weights,
            'processing_time': round(time.time() - start_time, 3)
        }

        # Attach SEO-specific details when SEO scoring succeeded.
        if seo_result:
            result['seo_details'] = {
                'breakdown': seo_result['breakdown'],
                'extracted_keywords': seo_result.get('extracted_keywords', []),
                'missing_high_value_terms': seo_result.get('missing_high_value_terms', [])
            }

        # --- AI Suggestions (guaranteed attempt when enabled) ---
        if generate_ai_suggestions and self.use_ai:
            try:
                logger.info(f"Generating AI suggestions for SKU: {product.get('sku')}, issues count: {len(issues)}")
                ai_suggestions = self.ai_service.generate_attribute_suggestions(
                    product,
                    issues,
                    category_rules
                )
                if not ai_suggestions:
                    logger.warning(f"AI service returned empty suggestions for SKU: {product.get('sku')}")
                    ai_suggestions = {"note": "No AI suggestions generated"}
                result['ai_suggestions'] = ai_suggestions
            except Exception as e:
                logger.error(f"[AI Suggestions] Error: {e}")
                result['ai_suggestions'] = {'error': str(e)}

        return result

    def _check_mandatory_fields(self, attributes: Dict, rules: List[Dict]) -> Tuple[float, List, List]:
        """
        Validate presence, length bounds, and regex format of mandatory
        attributes. Returns (score 0-100, issues, suggestions).
        """
        mandatory_rules = [r for r in rules if r.get('is_mandatory', False)]
        if not mandatory_rules:
            return 100.0, [], []

        present_count = 0
        issues = []
        suggestions = []

        for rule in mandatory_rules:
            attr_name = rule['attribute_name']

            if attr_name in attributes and attributes[attr_name]:
                value = str(attributes[attr_name]).strip()
                if not value:
                    issues.append(f"Mandatory field '{attr_name}' is empty")
                    suggestions.append(f"Provide a non-empty value for {attr_name}")
                    continue

                # Length constraints.
                min_len = rule.get('min_length')
                max_len = rule.get('max_length')
                if min_len and len(value) < min_len:
                    issues.append(f"'{attr_name}' too short (min: {min_len} chars)")
                    suggestions.append(f"Expand {attr_name} to at least {min_len} characters")
                    continue
                if max_len and len(value) > max_len:
                    # BUG FIX: message previously hard-coded "48,734 chars"
                    # instead of reporting the rule's actual max_length.
                    issues.append(f"'{attr_name}' too long (max: {max_len} chars)")
                    suggestions.append(f"Shorten {attr_name} to {max_len} characters or less")
                    continue

                # Regex format constraint (anchored at start via re.match).
                regex = rule.get('validation_regex')
                if regex and not re.match(regex, value):
                    issues.append(f"'{attr_name}' format invalid")
                    suggestions.append(f"Ensure {attr_name} matches required format")
                    continue

                present_count += 1
            else:
                issues.append(f"Missing mandatory field: {attr_name}")
                desc = rule.get('description', '')
                if desc:
                    suggestions.append(f"Add {attr_name}: {desc}")
                else:
                    suggestions.append(f"Add required attribute: {attr_name}")

        score = (present_count / len(mandatory_rules)) * 100 if mandatory_rules else 100.0
        return score, issues, suggestions

    def _check_standardization(self, attributes: Dict, rules: List[Dict]) -> Tuple[float, List, List]:
        """
        Check attribute values against each rule's 'valid_values' list using
        exact, case-insensitive, then fuzzy matching (>= 80 confidence counts
        as correct but still raises a standardization suggestion).
        Returns (score 0-100, issues, suggestions).
        """
        standardized_rules = [r for r in rules if r.get('valid_values')]
        if not standardized_rules:
            return 100.0, [], []

        correct_count = 0
        issues = []
        suggestions = []

        for rule in standardized_rules:
            attr_name = rule['attribute_name']
            valid_values = rule['valid_values']

            # Absent/empty attributes are ignored here (handled by the
            # missing-values checker), not penalized.
            if attr_name not in attributes or not attributes[attr_name]:
                continue
            actual_value = str(attributes[attr_name]).strip()
            if not actual_value:
                continue

            # Exact match (case-sensitive).  (Comment fixed: this branch was
            # previously mislabeled "case-insensitive".)
            if actual_value in valid_values:
                correct_count += 1
                continue

            # Case-insensitive match — counted correct, with a capitalization
            # suggestion when the casing differs.
            lower_valid = {v.lower(): v for v in valid_values}
            if actual_value.lower() in lower_valid:
                correct_count += 1
                if actual_value != lower_valid[actual_value.lower()]:
                    issues.append(f"{attr_name}: Case mismatch - '{actual_value}' should be '{lower_valid[actual_value.lower()]}'")
                    suggestions.append(f"Correct capitalization of {attr_name} to: {lower_valid[actual_value.lower()]}")
                continue

            # Fuzzy matching: keep the best result over several scorers.
            best_match = None
            best_score = 0
            for scorer in [fuzz.ratio, fuzz.partial_ratio, fuzz.token_sort_ratio]:
                match = process.extractOne(actual_value, valid_values, scorer=scorer)
                if match and match[1] > best_score:
                    best_match = match
                    best_score = match[1]

            if best_match and best_score >= 80:
                correct_count += 1
                if best_score < 100:
                    issues.append(f"{attr_name}: '{actual_value}' likely means '{best_match[0]}' (confidence: {best_score}%)")
                    suggestions.append(f"Standardize {attr_name} to: {best_match[0]}")
            else:
                issues.append(f"{attr_name}: '{actual_value}' not recognized. Valid: {', '.join(valid_values[:5])}")
                suggestions.append(f"Change {attr_name} to one of: {', '.join(valid_values[:3])}")

        score = (correct_count / len(standardized_rules)) * 100 if standardized_rules else 100.0
        return score, issues, suggestions

    def _check_missing_values(self, attributes: Dict, rules: List[Dict]) -> Tuple[float, List, List]:
        """
        Detect empty values, placeholder values (see _PLACEHOLDER_PATTERNS),
        and suspiciously short mandatory values.
        Returns (score 0-100, issues, suggestions).

        NOTE(review): attributes absent from `attributes` are skipped without
        an issue but still count against total_attrs, lowering the score —
        confirm that asymmetry is intended.
        """
        total_attrs = len(rules)
        valid_count = 0
        issues = []
        suggestions = []

        for rule in rules:
            attr_name = rule['attribute_name']
            if attr_name not in attributes:
                continue

            value = str(attributes[attr_name]).strip()

            # Empty value.
            if not value:
                issues.append(f"'{attr_name}' is empty")
                suggestions.append(f"Provide a valid value for {attr_name}")
                continue

            # Placeholder value (anchored, case-insensitive patterns).
            value_lower = value.lower()
            is_placeholder = any(pattern.match(value_lower) for pattern in self._PLACEHOLDER_PATTERNS)
            if is_placeholder:
                issues.append(f"'{attr_name}' contains placeholder: '{value}'")
                suggestions.append(f"Replace placeholder in {attr_name} with actual data")
                continue

            # Single-character mandatory values are suspicious.
            if len(value) < 2 and rule.get('is_mandatory'):
                issues.append(f"'{attr_name}' suspiciously short: '{value}'")
                suggestions.append(f"Provide more detailed {attr_name}")
                continue

            valid_count += 1

        score = (valid_count / total_attrs) * 100 if total_attrs > 0 else 100.0
        return score, issues, suggestions

    def _check_consistency(self, attributes: Dict, title: str, description: str) -> Tuple[float, List]:
        """
        Verify that key attributes (brand, color, size, ...) are mentioned in
        the title/description via substring, word-subset, then fuzzy word
        matching (>= 70% of attribute words fuzzy-matched at >= 80).
        Returns (score 0-100, issues).
        """
        issues = []
        consistency_count = 0
        total_checks = 0

        check_attrs = ['brand', 'color', 'size', 'material', 'model', 'weight', 'dimensions']
        combined_text = f"{title} {description}".lower()

        for attr in check_attrs:
            if attr not in attributes or not attributes[attr]:
                continue

            total_checks += 1
            attr_value = str(attributes[attr]).lower().strip()

            # Very short values are too ambiguous to check; count as consistent.
            if len(attr_value) < 2:
                consistency_count += 1
                continue

            # Direct substring match.
            if attr_value in combined_text:
                consistency_count += 1
                continue

            # Word-subset match.
            words_in_text = set(combined_text.split())
            words_in_attr = set(attr_value.split())
            if words_in_attr.issubset(words_in_text):
                consistency_count += 1
                continue

            # Fuzzy word matching.
            text_words = combined_text.split()
            matches = 0
            for attr_word in words_in_attr:
                match = process.extractOne(attr_word, text_words, scorer=fuzz.ratio)
                if match and match[1] >= 80:
                    matches += 1
            # attr_value is non-empty here, so words_in_attr is never empty.
            if matches / len(words_in_attr) >= 0.7:
                consistency_count += 1
                continue

            issues.append(f"'{attr.title()}': '{attributes[attr]}' not clearly mentioned in title/description")

        score = (consistency_count / total_checks) * 100 if total_checks > 0 else 100.0
        return score, issues

    def extract_attributes_from_text(self, text: str, category: str = '') -> Dict:
        """
        Extract attributes from free text, trying spaCy NER first and falling
        back to the AI service when fewer than 3 attributes were found.
        """
        extracted = {}

        if self.nlp:
            extracted = self._extract_with_spacy(text)

        # Guard on ai_service as well as the flag: the service can be None
        # when its import/initialization failed.
        if self.use_ai and self.ai_service and len(extracted) < 3:
            try:
                ai_extracted = self.ai_service.extract_attributes_with_ai(text, '', category)
                extracted.update({k: v for k, v in ai_extracted.items() if v})
            except Exception as e:
                logger.error(f"AI extraction failed: {e}")

        return extracted

    def _extract_with_spacy(self, text: str) -> Dict:
        """
        Extract brand/size/product_type via spaCy NER labels (ORG, QUANTITY,
        PRODUCT) plus keyword-based color detection; for each key, return the
        most frequent candidate.
        """
        doc = self.nlp(text)
        extracted = defaultdict(list)

        for ent in doc.ents:
            if ent.label_ == 'ORG':
                extracted['brand'].append(ent.text)
            elif ent.label_ == 'QUANTITY':
                extracted['size'].append(ent.text)
            elif ent.label_ == 'PRODUCT':
                extracted['product_type'].append(ent.text)

        # Color detection on whole words only.
        # BUG FIX: plain substring matching also fired on words merely
        # containing a color name (e.g. 'red' in 'bored' or 'infrared').
        colors = ['red', 'blue', 'green', 'black', 'white', 'yellow', 'orange',
                  'purple', 'pink', 'brown', 'gray', 'grey', 'silver', 'gold']
        text_lower = text.lower()
        for color in colors:
            if re.search(rf'\b{color}\b', text_lower):
                extracted['color'].append(color.title())

        # Keep the most common candidate per attribute.
        result = {}
        for key, values in extracted.items():
            if values:
                result[key] = max(set(values), key=values.count)
        return result