# attribute_scorer_integrated.py
"""
Enhanced AttributeQualityScorer with ProductContentRule integration
"""

import re
import time
import logging
from typing import Dict, List, Optional, Tuple

from rapidfuzz import fuzz, process

logger = logging.getLogger(__name__)


class AttributeQualityScorer:
    """
    Complete Product Quality Scorer with ALL components INCLUDING
    ProductContentRule validation:
    - Mandatory Fields (20%)
    - Standardization (15%)
    - Missing Values (10%)
    - Consistency (5%)
    - SEO Discoverability (10%)
    - Content Rules Compliance (15%) ← NEW: Validates against ProductContentRule
    - Title Quality (10%)
    - Description Quality (15%)
    """

    def __init__(self, use_ai: bool = True, use_seo: bool = True):
        # Load spaCy model (optional, for advanced NER)
        self.nlp = None
        try:
            import spacy
            self.nlp = spacy.load("en_core_web_sm")
            logger.info("spaCy model loaded successfully")
        except Exception as e:
            logger.warning(f"spaCy model not loaded: {e}. Will use fallback methods.")

        # Initialize AI service
        self.use_ai = use_ai
        self.ai_service = None
        if use_ai:
            try:
                from .gemini_service import GeminiAttributeService
                self.ai_service = GeminiAttributeService()
                logger.info("Gemini AI service initialized")
            except Exception as e:
                logger.warning(f"Gemini service not available: {e}")
                self.use_ai = False

        # Initialize SEO scorer
        self.use_seo = use_seo
        self.seo_scorer = None
        if use_seo:
            try:
                from .seo_scorer import SEODiscoverabilityScorer
                self.seo_scorer = SEODiscoverabilityScorer()
                logger.info("SEO scorer initialized")
            except Exception as e:
                logger.warning(f"SEO scorer not available: {e}")
                self.use_seo = False

        # Initialize Title/Description scorer
        self.title_desc_scorer = None
        try:
            from .title_description_scorer import TitleDescriptionScorer
            self.title_desc_scorer = TitleDescriptionScorer(use_ai=use_ai)
            logger.info("Title/Description scorer initialized")
        except Exception as e:
            logger.warning(f"Title/Description scorer not available: {e}")

        # Initialize Content Rules scorer ← NEW
        self.content_rules_scorer = None
        try:
            from .content_rules_scorer import ContentRulesScorer
            self.content_rules_scorer = ContentRulesScorer()
            logger.info("Content Rules scorer initialized")
        except Exception as e:
            logger.warning(f"Content Rules scorer not available: {e}")

        # UPDATED WEIGHTS (Total = 100%)
        self.weights = {
            'mandatory_fields': 0.20,          # 20% (reduced from 25%)
            'standardization': 0.15,           # 15% (reduced from 20%)
            'missing_values': 0.10,            # 10% (reduced from 13%)
            'consistency': 0.05,               # 5% (reduced from 7%)
            'seo_discoverability': 0.10,       # 10%
            'content_rules_compliance': 0.15,  # 15% ← NEW: ProductContentRule validation
            'title_quality': 0.10,             # 10%
            'description_quality': 0.15        # 15%
        }
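
        # Added sanity check (not in the original file): the class docstring
        # and the inline comments both state that these weights total 100%,
        # so fail fast if a future edit breaks that invariant.
        assert abs(sum(self.weights.values()) - 1.0) < 1e-9, \
            "Component weights must sum to 1.0"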

    def score_product(
        self,
        product: Dict,
        category_rules: List[Dict],
        content_rules: Optional[List[Dict]] = None,  # ← NEW parameter
        generate_ai_suggestions: bool = True
    ) -> Dict:
        """
        Complete product scoring with all components including
        ProductContentRule validation

        Args:
            product: Product dict with sku, category, title, description, attributes
            category_rules: List of CategoryAttributeRule dicts
            content_rules: List of ProductContentRule dicts (NEW)
            generate_ai_suggestions: Whether to call AI for suggestions

        Returns:
            Dict with final_score, breakdown, issues, suggestions, etc.
        """
        start_time = time.time()
        attributes = product.get('attributes', {})
        category = product.get('category', '')

        # Initialize scores
        scores = {
            'mandatory_fields': 0,
            'standardization': 0,
            'missing_values': 0,
            'consistency': 0,
            'seo_discoverability': 0,
            'content_rules_compliance': 0,  # ← NEW
            'title_quality': 0,
            'description_quality': 0
        }
        issues = []
        suggestions = []

        # --- 1. Mandatory Fields (20%) ---
        try:
            mandatory_score, mandatory_issues, mandatory_suggestions = self._check_mandatory_fields(
                attributes, category_rules
            )
            scores['mandatory_fields'] = mandatory_score
            issues.extend(mandatory_issues)
            suggestions.extend(mandatory_suggestions)
        except Exception as e:
            logger.error(f"[Mandatory Fields] Error: {e}", exc_info=True)
            scores['mandatory_fields'] = 0

        # --- 2. Standardization (15%) ---
        try:
            std_score, std_issues, std_suggestions = self._check_standardization(
                attributes, category_rules
            )
            scores['standardization'] = std_score
            issues.extend(std_issues)
            suggestions.extend(std_suggestions)
        except Exception as e:
            logger.error(f"[Standardization] Error: {e}", exc_info=True)
            scores['standardization'] = 0

        # --- 3. Missing Values (10%) ---
        try:
            missing_score, missing_issues, missing_suggestions = self._check_missing_values(
                attributes, category_rules
            )
            scores['missing_values'] = missing_score
            issues.extend(missing_issues)
            suggestions.extend(missing_suggestions)
        except Exception as e:
            logger.error(f"[Missing Values] Error: {e}", exc_info=True)
            scores['missing_values'] = 0

        # --- 4. Consistency (5%) ---
        try:
            consistency_score, consistency_issues = self._check_consistency(
                attributes,
                product.get('title', ''),
                product.get('description', '')
            )
            scores['consistency'] = consistency_score
            issues.extend(consistency_issues)
        except Exception as e:
            logger.error(f"[Consistency] Error: {e}", exc_info=True)
            scores['consistency'] = 0

        # --- 5. SEO Discoverability (10%) ---
        seo_result = None
        if self.use_seo and self.seo_scorer:
            try:
                seo_result = self.seo_scorer.score_seo(product, category_rules)
                scores['seo_discoverability'] = seo_result['seo_score']
                issues.extend(seo_result['issues'])
                suggestions.extend(seo_result['suggestions'])
            except Exception as e:
                logger.error(f"[SEO Scoring] Error: {e}", exc_info=True)
                scores['seo_discoverability'] = 0

        # --- 6. Content Rules Compliance (15%) ← NEW ---
        content_rules_result = None
        if content_rules and self.content_rules_scorer:
            try:
                content_rules_result = self.content_rules_scorer.score_content_fields(
                    product, content_rules
                )
                scores['content_rules_compliance'] = content_rules_result['overall_content_score']
                issues.extend(content_rules_result['issues'])
                suggestions.extend(content_rules_result['suggestions'])
            except Exception as e:
                logger.error(f"[Content Rules] Error: {e}", exc_info=True)
                scores['content_rules_compliance'] = 0
        else:
            # If no content rules provided, give neutral score
            scores['content_rules_compliance'] = 100.0

        # --- 7. Title & Description Quality (25% combined) ---
        title_desc_result = None
        if self.title_desc_scorer:
            try:
                title_desc_result = self.title_desc_scorer.score_title_and_description(
                    product,
                    category_rules
                )
                scores['title_quality'] = title_desc_result['title_score']
                scores['description_quality'] = title_desc_result['description_score']
                issues.extend(title_desc_result['issues'])
                suggestions.extend(title_desc_result['suggestions'])
            except Exception as e:
                logger.error(f"[Title/Description Scoring] Error: {e}", exc_info=True)
                scores['title_quality'] = 0
                scores['description_quality'] = 0

        # --- Final Score Calculation ---
        final_score = sum(scores[key] * self.weights[key] for key in scores)

        result = {
            'final_score': round(final_score, 2),
            'max_score': 100.0,
            'breakdown': scores,
            'issues': issues,
            'suggestions': suggestions,
            'weights': self.weights,
            'processing_time': round(time.time() - start_time, 3)
        }

        # Add component-specific details
        if seo_result:
            result['seo_details'] = {
                'breakdown': seo_result['breakdown'],
                'extracted_keywords': seo_result.get('extracted_keywords', []),
                'missing_high_value_terms': seo_result.get('missing_high_value_terms', [])
            }

        if content_rules_result:
            result['content_rules_details'] = {
                'field_scores': content_rules_result['field_scores'],
                'rules_applied': content_rules_result['rules_applied']
            }

        if title_desc_result:
            result['title_description_details'] = {
                'title_breakdown': title_desc_result.get('title_breakdown', {}),
                'description_breakdown': title_desc_result.get('description_breakdown', {}),
                'ai_improvements': title_desc_result.get('ai_improvements')
            }

        # --- AI Suggestions (Comprehensive) ---
        if generate_ai_suggestions and self.use_ai and self.ai_service:
            try:
                logger.info(f"Generating comprehensive AI suggestions for SKU: {product.get('sku')}")
                ai_suggestions = self.ai_service.generate_comprehensive_suggestions(
                    product,
                    issues,
                    category_rules,
                    scores
                )
                result['ai_suggestions'] = ai_suggestions
            except Exception as e:
                logger.error(f"[AI Suggestions] Error: {e}", exc_info=True)
                result['ai_suggestions'] = {'error': str(e)}

        return result

    # ========== ATTRIBUTE VALIDATION METHODS (unchanged) ==========

    def _check_mandatory_fields(
        self,
        attributes: Dict,
        rules: List[Dict]
    ) -> Tuple[float, List[str], List[str]]:
        """Check if all mandatory fields are present and valid"""
        mandatory_rules = [r for r in rules if r.get('is_mandatory', False)]
        if not mandatory_rules:
            return 100.0, [], []

        present_count = 0
        issues = []
        suggestions = []

        for rule in mandatory_rules:
            attr_name = rule['attribute_name']
            if attr_name in attributes and attributes[attr_name]:
                value = str(attributes[attr_name]).strip()
                if not value:
                    issues.append(f"Mandatory field '{attr_name}' is empty")
                    suggestions.append(f"Provide a non-empty value for {attr_name}")
                    continue

                # Check length constraints
                min_len = rule.get('min_length')
                max_len = rule.get('max_length')
                if min_len and len(value) < min_len:
                    issues.append(f"'{attr_name}' too short (min: {min_len} chars)")
                    suggestions.append(f"Expand {attr_name} to at least {min_len} characters")
                    continue
                if max_len and len(value) > max_len:
                    issues.append(f"'{attr_name}' too long (max: {max_len} chars)")
                    suggestions.append(f"Shorten {attr_name} to {max_len} characters or less")
                    continue

                # Check regex pattern if provided
                regex = rule.get('validation_regex')
                if regex:
                    try:
                        if not re.match(regex, value):
                            issues.append(f"'{attr_name}' format invalid")
                            suggestions.append(f"Ensure {attr_name} matches required format")
                            continue
                    except re.error:
                        logger.warning(f"Invalid regex pattern for {attr_name}: {regex}")

                present_count += 1
            else:
                issues.append(f"Missing mandatory field: {attr_name}")
                desc = rule.get('description', '')
                if desc:
                    suggestions.append(f"Add {attr_name}: {desc}")
                else:
                    suggestions.append(f"Add required attribute: {attr_name}")

        score = (present_count / len(mandatory_rules)) * 100 if mandatory_rules else 100.0
        return score, issues, suggestions

    def _check_standardization(
        self,
        attributes: Dict,
        rules: List[Dict]
    ) -> Tuple[float, List[str], List[str]]:
        """Check if attribute values match standardized valid values"""
        standardized_rules = [r for r in rules if r.get('valid_values')]
        if not standardized_rules:
            return 100.0, [], []

        correct_count = 0
        issues = []
        suggestions = []

        for rule in standardized_rules:
            attr_name = rule['attribute_name']
            valid_values = rule['valid_values']
            if not valid_values:  # Skip if empty valid_values list
                continue
            if attr_name not in attributes or not attributes[attr_name]:
                continue

            actual_value = str(attributes[attr_name]).strip()
            if not actual_value:
                continue

            # Exact match
            if actual_value in valid_values:
                correct_count += 1
                continue

            # Case-insensitive match
            lower_valid = {v.lower(): v for v in valid_values}
            if actual_value.lower() in lower_valid:
                correct_count += 1
                correct_value = lower_valid[actual_value.lower()]
                if actual_value != correct_value:
                    issues.append(
                        f"{attr_name}: Case mismatch - '{actual_value}' should be '{correct_value}'"
                    )
                    suggestions.append(f"Correct capitalization of {attr_name} to: {correct_value}")
                continue

            # Fuzzy matching
            best_match = process.extractOne(actual_value, valid_values, scorer=fuzz.ratio)
            if best_match and best_match[1] >= 80:
                correct_count += 1
                if best_match[1] < 100:
                    issues.append(
                        f"{attr_name}: '{actual_value}' likely means '{best_match[0]}' "
                        f"(confidence: {best_match[1]}%)"
                    )
                    suggestions.append(f"Standardize {attr_name} to: {best_match[0]}")
            else:
                issues.append(
                    f"{attr_name}: '{actual_value}' not recognized. "
                    f"Valid: {', '.join(valid_values[:3])}"
                )
                suggestions.append(f"Change {attr_name} to one of: {', '.join(valid_values[:3])}")

        score = (correct_count / len(standardized_rules)) * 100 if standardized_rules else 100.0
        return score, issues, suggestions

    def _check_missing_values(
        self,
        attributes: Dict,
        rules: List[Dict]
    ) -> Tuple[float, List[str], List[str]]:
        """Check for placeholder values or empty attributes"""
        placeholder_patterns = [
            r'^n/?a$', r'^none$', r'^null$', r'^-+$', r'^\.+$',
            r'^tbd$', r'^to be determined$', r'^unknown$', r'^na$',
            r'^todo$', r'^pending$', r'^\?+$', r'^xxx+$', r'^placeholder$'
        ]

        total_attrs = len(rules)
        valid_count = 0
        issues = []
        suggestions = []

        for rule in rules:
            attr_name = rule['attribute_name']
            if attr_name not in attributes:
                continue

            value = str(attributes[attr_name]).strip()
            if not value:
                issues.append(f"'{attr_name}' is empty")
                suggestions.append(f"Provide a valid value for {attr_name}")
                continue

            value_lower = value.lower()
            is_placeholder = any(
                re.match(pattern, value_lower, re.IGNORECASE)
                for pattern in placeholder_patterns
            )
            if is_placeholder:
                issues.append(f"'{attr_name}' contains placeholder: '{value}'")
                suggestions.append(f"Replace placeholder in {attr_name} with actual data")
                continue

            # Check for suspiciously short values on mandatory fields
            if rule.get('is_mandatory') and len(value) < 2:
                issues.append(f"'{attr_name}' suspiciously short: '{value}'")
                suggestions.append(f"Provide more detailed {attr_name}")
                continue

            valid_count += 1

        score = (valid_count / total_attrs) * 100 if total_attrs > 0 else 100.0
        return score, issues, suggestions

    def _check_consistency(
        self,
        attributes: Dict,
        title: str,
        description: str
    ) -> Tuple[float, List[str]]:
        """Check if attribute values are mentioned in title/description"""
        issues = []
        consistency_count = 0
        total_checks = 0

        check_attrs = ['brand', 'color', 'size', 'material', 'model', 'weight', 'dimensions']
        combined_text = f"{title} {description}".lower()

        for attr in check_attrs:
            if attr not in attributes or not attributes[attr]:
                continue

            total_checks += 1
            attr_value = str(attributes[attr]).lower().strip()

            # Skip very short values (likely abbreviations or numbers)
            if len(attr_value) < 2:
                consistency_count += 1
                continue

            # Direct substring match
            if attr_value in combined_text:
                consistency_count += 1
                continue

            # Word boundary match (for multi-word attributes)
            words_in_attr = set(attr_value.split())
            words_in_text = set(combined_text.split())
            if words_in_attr.issubset(words_in_text):
                consistency_count += 1
                continue

            # Fuzzy matching as last resort
            text_words = combined_text.split()
            if text_words:
                match = process.extractOne(attr_value, text_words, scorer=fuzz.ratio)
                if match and match[1] >= 80:
                    consistency_count += 1
                    continue

            # If we get here, attribute is not mentioned
            issues.append(
                f"'{attr.title()}': '{attributes[attr]}' not mentioned in title/description"
            )

        score = (consistency_count / total_checks) * 100 if total_checks > 0 else 100.0
        return score, issues

    # ========== UTILITY METHODS (unchanged) ==========

    def extract_attributes_from_text(self, text: str, category: str = '') -> Dict:
        """Extract attributes from unstructured text using NER and patterns"""
        extracted = {}

        # Try spaCy NER if available
        if self.nlp:
            try:
                doc = self.nlp(text)
                # Extract organizations as potential brands
                orgs = [ent.text for ent in doc.ents if ent.label_ == 'ORG']
                if orgs:
                    extracted['brand'] = orgs[0]
                # Extract quantities as potential sizes
                quantities = [ent.text for ent in doc.ents if ent.label_ == 'QUANTITY']
                if quantities:
                    extracted['size'] = quantities[0]
            except Exception as e:
                logger.warning(f"spaCy extraction failed: {e}")

        # Pattern-based extraction for colors
        color_patterns = [
            'black', 'white', 'red', 'blue', 'green', 'yellow', 'orange',
            'purple', 'pink', 'brown', 'gray', 'grey', 'silver', 'gold',
            'rose gold', 'space gray', 'navy', 'beige', 'tan'
        ]
        text_lower = text.lower()
        for color in color_patterns:
            if color in text_lower:
                extracted['color'] = color.title()
                break

        # Use AI for enhanced extraction if available
        if self.use_ai and self.ai_service and len(extracted) < 3:
            try:
                ai_extracted = self.ai_service.extract_attributes_with_ai(
                    text, '', category
                )
                # Merge AI results (don't override existing)
                for key, value in ai_extracted.items():
                    if key not in extracted and value:
                        extracted[key] = value
            except Exception as e:
                logger.warning(f"AI extraction failed: {e}")

        return extracted

    def get_score_interpretation(self, score: float) -> Dict[str, str]:
        """Get human-readable interpretation of score"""
        if score >= 90:
            return {
                'grade': 'A',
                'status': 'Excellent',
                'color': 'green',
                'recommendation': 'Product listing is of high quality. Minor tweaks only.'
            }
        elif score >= 80:
            return {
                'grade': 'B',
                'status': 'Good',
                'color': 'lightgreen',
                'recommendation': 'Good quality. Address minor issues to reach excellence.'
            }
        elif score >= 70:
            return {
                'grade': 'C',
                'status': 'Fair',
                'color': 'yellow',
                'recommendation': 'Acceptable but needs improvement. Review suggestions.'
            }
        elif score >= 60:
            return {
                'grade': 'D',
                'status': 'Poor',
                'color': 'orange',
                'recommendation': 'Significant issues found. Requires immediate attention.'
            }
        else:
            return {
                'grade': 'F',
                'status': 'Critical',
                'color': 'red',
                'recommendation': 'Critical quality issues. Major revision needed.'
            }
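

if __name__ == "__main__":
    # Minimal usage sketch (added; not part of the original module). The
    # product and rule dicts below are hypothetical sample data -- real
    # CategoryAttributeRule / ProductContentRule records would come from the
    # surrounding application. AI and SEO are disabled so the sketch runs
    # without the optional services.
    logging.basicConfig(level=logging.INFO)

    scorer = AttributeQualityScorer(use_ai=False, use_seo=False)

    sample_product = {
        'sku': 'DEMO-001',
        'category': 'Electronics',
        'title': 'Acme Wireless Mouse - Black',
        'description': 'Acme wireless mouse in black with a USB receiver.',
        'attributes': {'brand': 'Acme', 'color': 'black', 'size': 'n/a'},
    }
    sample_category_rules = [
        {'attribute_name': 'brand', 'is_mandatory': True},
        {'attribute_name': 'color', 'is_mandatory': True,
         'valid_values': ['Black', 'White', 'Silver']},
        {'attribute_name': 'size', 'is_mandatory': False},
    ]

    report = scorer.score_product(
        sample_product, sample_category_rules, generate_ai_suggestions=False
    )
    interpretation = scorer.get_score_interpretation(report['final_score'])
    print(f"Final score: {report['final_score']} (grade {interpretation['grade']})")
    for issue in report['issues']:
        print(f"- {issue}")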