# attribute_scorer_integrated.py
"""
Enhanced AttributeQualityScorer with ProductContentRule integration.

Provides a consistent per-component breakdown and a component status map for
unavailable scores. Optional components (SEO, content rules, title/description,
images) never inflate the final score: weights are rescaled proportionally
over the components that actually produced a numeric score.
"""
import difflib
import logging
import re
import time
from typing import Dict, List, Optional, Tuple

# rapidfuzz is preferred for fuzzy matching but, like every other dependency
# in this module, its absence must not make the scorer unusable: fall back to
# the standard library's difflib when it is not installed.
try:
    from rapidfuzz import fuzz, process
    _HAS_RAPIDFUZZ = True
except ImportError:
    _HAS_RAPIDFUZZ = False

logger = logging.getLogger(__name__)


def _best_fuzzy_match(query: str, choices: List[str]) -> Optional[Tuple[str, float]]:
    """Return ``(best_choice, similarity)`` for *query* among *choices*.

    Similarity is on a 0-100 scale (rapidfuzz convention). Uses rapidfuzz when
    available; otherwise falls back to difflib's SequenceMatcher so the scorer
    degrades gracefully instead of crashing at import time. Returns ``None``
    when *choices* is empty.
    """
    if not choices:
        return None
    if _HAS_RAPIDFUZZ:
        match = process.extractOne(query, choices, scorer=fuzz.ratio)
        return (match[0], float(match[1])) if match else None
    best = max(choices, key=lambda c: difflib.SequenceMatcher(None, query, c).ratio())
    ratio = difflib.SequenceMatcher(None, query, best).ratio() * 100.0
    return best, ratio


class AttributeQualityScorer:
    """
    Complete Product Quality Scorer with ALL components INCLUDING
    ProductContentRule validation:

    - Mandatory Fields (16%)
    - Standardization (12%)
    - Missing Values (8%)
    - Consistency (4%)
    - SEO Discoverability (8%)
    - Content Rules Compliance (12%)
    - Title Quality (8%)
    - Description Quality (12%)
    - Image Quality (20%)

    Components that cannot be evaluated score ``None`` and are excluded from
    the final score; the remaining weights are rescaled proportionally.
    """

    def __init__(self, use_ai: bool = True, use_seo: bool = True):
        # Load spaCy model (optional, for advanced NER)
        self.nlp = None
        try:
            import spacy
            self.nlp = spacy.load("en_core_web_sm")
            logger.info("spaCy model loaded successfully")
        except Exception as e:
            logger.warning(f"spaCy model not loaded: {e}. Will use fallback methods.")

        # Initialize AI service (optional)
        self.use_ai = use_ai
        self.ai_service = None
        if use_ai:
            try:
                from .gemini_service import GeminiAttributeService
                self.ai_service = GeminiAttributeService()
                logger.info("Gemini AI service initialized")
            except Exception as e:
                logger.warning(f"Gemini service not available: {e}")
                self.use_ai = False

        # Initialize Image Scorer (optional)
        self.image_scorer = None
        try:
            from .image_scorer import ImageQualityScorer
            self.image_scorer = ImageQualityScorer()
            logger.info("Image scorer initialized")
        except Exception as e:
            logger.warning(f"Image scorer not available: {e}")

        # Initialize SEO scorer (optional)
        self.use_seo = use_seo
        self.seo_scorer = None
        if use_seo:
            try:
                from .seo_scorer import SEODiscoverabilityScorer
                self.seo_scorer = SEODiscoverabilityScorer()
                logger.info("SEO scorer initialized")
            except Exception as e:
                logger.warning(f"SEO scorer not available: {e}")
                self.use_seo = False

        # Initialize Title/Description scorer (optional)
        self.title_desc_scorer = None
        try:
            from .title_description_scorer import TitleDescriptionScorer
            self.title_desc_scorer = TitleDescriptionScorer(use_ai=use_ai)
            logger.info("Title/Description scorer initialized")
        except Exception as e:
            logger.warning(f"Title/Description scorer not available: {e}")

        # Initialize Content Rules scorer (optional)
        self.content_rules_scorer = None
        try:
            from .content_rules_scorer import ContentRulesScorer
            self.content_rules_scorer = ContentRulesScorer()
            logger.info("Content Rules scorer initialized")
        except Exception as e:
            logger.warning(f"Content Rules scorer not available: {e}")

        # Base component weights (sum to 1.0). Rescaled at scoring time over
        # the components that actually produced a numeric score.
        self.weights = {
            'mandatory_fields': 0.16,
            'standardization': 0.12,
            'missing_values': 0.08,
            'consistency': 0.04,
            'seo_discoverability': 0.08,
            'content_rules_compliance': 0.12,
            'title_quality': 0.08,
            'description_quality': 0.12,
            'image_quality': 0.20,
        }

    def score_product(
        self,
        product: Dict,
        category_rules: List[Dict],
        content_rules: Optional[List[Dict]] = None,
        generate_ai_suggestions: bool = True,
    ) -> Dict:
        """
        Score a product across all quality components.

        Args:
            product: Product dict; reads 'attributes', 'title', 'description'
                and 'images' (the full dict is passed through to sub-scorers).
            category_rules: Attribute rules (mandatory flags, valid_values,
                length/regex constraints, descriptions).
            content_rules: Optional ProductContentRule definitions; when
                omitted the content-rules component is excluded from scoring.
            generate_ai_suggestions: When True and the AI service is
                available, attach AI-generated suggestions to the result.

        Returns:
            Dict with 'final_score' (None when nothing could be scored),
            'max_score', 'breakdown' (per-component score or None),
            'component_status', 'issues', 'suggestions', 'weights' and
            'processing_time'.
        """
        start_time = time.time()
        attributes = product.get('attributes', {})

        # Every component starts unscored (None); only components that end up
        # with a numeric score participate in the weighted final score.
        scores = {k: None for k in self.weights}
        component_status = {k: "Not evaluated yet" for k in self.weights}
        issues: List[str] = []
        suggestions: List[str] = []

        # --- 1. Mandatory Fields ---
        try:
            score, comp_issues, comp_suggestions = self._check_mandatory_fields(attributes, category_rules)
            scores['mandatory_fields'] = score
            issues.extend(comp_issues)
            suggestions.extend(comp_suggestions)
            component_status['mandatory_fields'] = "Scored successfully"
        except Exception as e:
            logger.error(f"[Mandatory Fields] {e}", exc_info=True)
            component_status['mandatory_fields'] = f"Error: {str(e)}"

        # --- 2. Standardization ---
        try:
            score, comp_issues, comp_suggestions = self._check_standardization(attributes, category_rules)
            scores['standardization'] = score
            issues.extend(comp_issues)
            suggestions.extend(comp_suggestions)
            component_status['standardization'] = "Scored successfully"
        except Exception as e:
            logger.error(f"[Standardization] {e}", exc_info=True)
            component_status['standardization'] = f"Error: {str(e)}"

        # --- 3. Missing Values ---
        try:
            score, comp_issues, comp_suggestions = self._check_missing_values(attributes, category_rules)
            scores['missing_values'] = score
            issues.extend(comp_issues)
            suggestions.extend(comp_suggestions)
            component_status['missing_values'] = "Scored successfully"
        except Exception as e:
            logger.error(f"[Missing Values] {e}", exc_info=True)
            component_status['missing_values'] = f"Error: {str(e)}"

        # --- 4. Consistency ---
        try:
            score, comp_issues = self._check_consistency(
                attributes, product.get('title', ''), product.get('description', '')
            )
            scores['consistency'] = score
            issues.extend(comp_issues)
            component_status['consistency'] = "Scored successfully"
        except Exception as e:
            logger.error(f"[Consistency] {e}", exc_info=True)
            component_status['consistency'] = f"Error: {str(e)}"

        # --- 5. SEO Discoverability (optional) ---
        if self.use_seo and self.seo_scorer:
            try:
                seo_result = self.seo_scorer.score_seo(product, category_rules)
                scores['seo_discoverability'] = seo_result.get('seo_score', None)
                issues.extend(seo_result.get('issues', []))
                suggestions.extend(seo_result.get('suggestions', []))
                component_status['seo_discoverability'] = "Scored successfully"
            except Exception as e:
                logger.error(f"[SEO] {e}", exc_info=True)
                component_status['seo_discoverability'] = f"Error: {str(e)}"
        else:
            component_status['seo_discoverability'] = "SEO scorer not available"

        # --- 6. Content Rules Compliance (optional) ---
        if content_rules and self.content_rules_scorer:
            try:
                content_result = self.content_rules_scorer.score_content_fields(product, content_rules)
                scores['content_rules_compliance'] = content_result.get('overall_content_score', None)
                issues.extend(content_result.get('issues', []))
                suggestions.extend(content_result.get('suggestions', []))
                component_status['content_rules_compliance'] = "Scored successfully"
            except Exception as e:
                logger.error(f"[Content Rules] {e}", exc_info=True)
                component_status['content_rules_compliance'] = f"Error: {str(e)}"
        else:
            scores['content_rules_compliance'] = None
            component_status['content_rules_compliance'] = "No content rules provided"

        # --- 7. Title & Description (optional) ---
        if self.title_desc_scorer:
            try:
                td_result = self.title_desc_scorer.score_title_and_description(product, category_rules)
                scores['title_quality'] = td_result.get('title_score', None)
                scores['description_quality'] = td_result.get('description_score', None)
                issues.extend(td_result.get('issues', []))
                suggestions.extend(td_result.get('suggestions', []))
                component_status['title_quality'] = "Scored successfully"
                component_status['description_quality'] = "Scored successfully"
            except Exception as e:
                logger.error(f"[Title/Description] {e}", exc_info=True)
                component_status['title_quality'] = f"Error: {str(e)}"
                component_status['description_quality'] = f"Error: {str(e)}"
        else:
            component_status['title_quality'] = "Title/Description scorer not available"
            component_status['description_quality'] = "Title/Description scorer not available"

        # --- 8. Image Quality (optional) ---
        if self.image_scorer:
            try:
                images = product.get('images', [])
                if images:
                    img_result = self.image_scorer.score_images(images)
                    scores['image_quality'] = img_result.get("overall_image_score", None)
                    component_status['image_quality'] = "Scored successfully"
                else:
                    scores['image_quality'] = None
                    component_status['image_quality'] = "No images provided"
            except Exception as e:
                logger.error(f"[Image] {e}", exc_info=True)
                scores['image_quality'] = None
                component_status['image_quality'] = f"Error: {str(e)}"
        else:
            scores['image_quality'] = None
            component_status['image_quality'] = "Image scorer not available"

        # --- Final score: rescale weights over the numeric components only ---
        numeric_scores = {k: v for k, v in scores.items() if isinstance(v, (int, float))}
        applicable_weights = {k: self.weights[k] for k in numeric_scores}
        total_weight = sum(applicable_weights.values())
        if numeric_scores and total_weight > 0:
            final_score = sum(
                numeric_scores[k] * (applicable_weights[k] / total_weight) for k in numeric_scores
            )
        else:
            final_score = None

        result = {
            'final_score': round(final_score, 2) if final_score is not None else None,
            'max_score': 100.0,
            'breakdown': scores,
            'component_status': component_status,
            'issues': issues,
            'suggestions': suggestions,
            'weights': self.weights,
            'processing_time': round(time.time() - start_time, 3),
        }

        # Optional AI-generated suggestions: best effort; failures are
        # reported inside the result instead of raised to the caller.
        if generate_ai_suggestions and self.use_ai and self.ai_service:
            try:
                ai_suggestions = self.ai_service.generate_comprehensive_suggestions(
                    product, issues, category_rules, scores
                )
                result['ai_suggestions'] = ai_suggestions
            except Exception as e:
                logger.error(f"[AI Suggestions] {e}", exc_info=True)
                result['ai_suggestions'] = {'error': str(e)}

        return result

    # ================= ATTRIBUTE CHECK METHODS =================

    def _check_mandatory_fields(
        self, attributes: Dict, rules: List[Dict]
    ) -> Tuple[float, List[str], List[str]]:
        """Score presence and validity of mandatory attributes (0-100).

        Returns (score, issues, suggestions). A mandatory attribute counts as
        present only if it is non-empty and passes the rule's length and
        regex constraints.
        """
        mandatory_rules = [r for r in rules if r.get('is_mandatory', False)]
        if not mandatory_rules:
            return 100.0, [], []
        present_count, issues, suggestions = 0, [], []
        for rule in mandatory_rules:
            attr_name = rule['attribute_name']
            if attr_name in attributes and attributes[attr_name]:
                value = str(attributes[attr_name]).strip()
                if not value:
                    issues.append(f"Mandatory field '{attr_name}' is empty")
                    suggestions.append(f"Provide a non-empty value for {attr_name}")
                    continue
                min_len = rule.get('min_length')
                max_len = rule.get('max_length')
                if min_len and len(value) < min_len:
                    issues.append(f"'{attr_name}' too short (min: {min_len} chars)")
                    suggestions.append(f"Expand {attr_name} to at least {min_len} characters")
                    continue
                if max_len and len(value) > max_len:
                    # BUG FIX: report the rule's actual limit instead of the
                    # hard-coded "60,226 chars" garbage value.
                    issues.append(f"'{attr_name}' too long (max: {max_len} chars)")
                    suggestions.append(f"Shorten {attr_name} to {max_len} characters or less")
                    continue
                regex = rule.get('validation_regex')
                if regex:
                    try:
                        if not re.match(regex, value):
                            issues.append(f"'{attr_name}' format invalid")
                            suggestions.append(f"Ensure {attr_name} matches required format")
                            continue
                    except re.error:
                        # A broken rule pattern must not fail the product.
                        logger.warning(f"Invalid regex pattern for {attr_name}: {regex}")
                present_count += 1
            else:
                issues.append(f"Missing mandatory field: {attr_name}")
                desc = rule.get('description', '')
                suggestions.append(
                    f"Add {attr_name}: {desc}" if desc else f"Add required attribute: {attr_name}"
                )
        score = (present_count / len(mandatory_rules)) * 100
        return score, issues, suggestions

    def _check_standardization(
        self, attributes: Dict, rules: List[Dict]
    ) -> Tuple[float, List[str], List[str]]:
        """Score how well attribute values match standardized valid values (0-100).

        Exact matches, case-insensitive matches and fuzzy matches (>= 80%
        similarity) all count as correct; the latter two are still flagged
        with a normalization suggestion.
        """
        standardized_rules = [r for r in rules if r.get('valid_values')]
        if not standardized_rules:
            return 100.0, [], []
        correct_count, issues, suggestions = 0, [], []
        for rule in standardized_rules:
            attr_name = rule['attribute_name']
            valid_values = rule['valid_values']
            if not valid_values or attr_name not in attributes or not attributes[attr_name]:
                continue
            actual_value = str(attributes[attr_name]).strip()
            if not actual_value:
                # Whitespace-only values are reported by the missing-values
                # check; skip them here instead of fuzzy-matching empties.
                continue
            # Exact match
            if actual_value in valid_values:
                correct_count += 1
                continue
            # Case-insensitive match: counted correct but flagged
            lower_valid = {v.lower(): v for v in valid_values}
            if actual_value.lower() in lower_valid:
                correct_count += 1
                correct_value = lower_valid[actual_value.lower()]
                if actual_value != correct_value:
                    issues.append(
                        f"{attr_name}: Case mismatch - '{actual_value}' should be '{correct_value}'"
                    )
                    suggestions.append(f"Correct capitalization of {attr_name} to: {correct_value}")
                continue
            # Fuzzy match: counted correct at >= 80% similarity but flagged
            best_match = _best_fuzzy_match(actual_value, valid_values)
            if best_match and best_match[1] >= 80:
                correct_count += 1
                if best_match[1] < 100:
                    issues.append(
                        f"{attr_name}: '{actual_value}' likely means '{best_match[0]}' "
                        f"(confidence: {best_match[1]}%)"
                    )
                    suggestions.append(f"Standardize {attr_name} to: {best_match[0]}")
            else:
                issues.append(
                    f"{attr_name}: '{actual_value}' not recognized. "
                    f"Valid: {', '.join(valid_values[:3])}"
                )
                suggestions.append(f"Change {attr_name} to one of: {', '.join(valid_values[:3])}")
        score = (correct_count / len(standardized_rules)) * 100
        return score, issues, suggestions

    def _check_missing_values(
        self, attributes: Dict, rules: List[Dict]
    ) -> Tuple[float, List[str], List[str]]:
        """Score attributes for emptiness and placeholder values (0-100)."""
        placeholder_patterns = [
            r'^n/?a$', r'^none$', r'^null$', r'^-+$', r'^\.+$',
            r'^tbd$', r'^to be determined$', r'^unknown$', r'^na$',
            r'^todo$', r'^pending$', r'^\?+$', r'^xxx+$', r'^placeholder$',
        ]
        total_attrs = len(rules)
        valid_count, issues, suggestions = 0, [], []
        for rule in rules:
            attr_name = rule['attribute_name']
            if attr_name not in attributes:
                # Absent attributes count against the score here; the
                # mandatory-fields check is what reports them.
                continue
            value = str(attributes[attr_name]).strip()
            if not value:
                issues.append(f"'{attr_name}' is empty")
                suggestions.append(f"Provide a valid value for {attr_name}")
                continue
            value_lower = value.lower()
            is_placeholder = any(
                re.match(pattern, value_lower, re.IGNORECASE)
                for pattern in placeholder_patterns
            )
            if is_placeholder:
                issues.append(f"'{attr_name}' contains placeholder: '{value}'")
                suggestions.append(f"Replace placeholder in {attr_name} with actual data")
                continue
            # Suspiciously short values on mandatory fields
            if rule.get('is_mandatory') and len(value) < 2:
                issues.append(f"'{attr_name}' suspiciously short: '{value}'")
                suggestions.append(f"Provide more detailed {attr_name}")
                continue
            valid_count += 1
        score = (valid_count / total_attrs) * 100 if total_attrs > 0 else 100.0
        return score, issues, suggestions

    def _check_consistency(
        self, attributes: Dict, title: str, description: str
    ) -> Tuple[float, List[str]]:
        """Score whether key attribute values are echoed in title/description (0-100)."""
        issues: List[str] = []
        consistency_count = 0
        total_checks = 0
        check_attrs = ['brand', 'color', 'size', 'material', 'model', 'weight', 'dimensions']
        combined_text = f"{title} {description}".lower()
        for attr in check_attrs:
            if attr not in attributes or not attributes[attr]:
                continue
            total_checks += 1
            attr_value = str(attributes[attr]).lower().strip()
            # Trivial values and direct substring hits count as consistent.
            if len(attr_value) < 2 or attr_value in combined_text:
                consistency_count += 1
                continue
            # Every word of the attribute value appearing somewhere counts too.
            words_in_attr = set(attr_value.split())
            words_in_text = set(combined_text.split())
            if words_in_attr.issubset(words_in_text):
                consistency_count += 1
                continue
            # Fuzzy word-level match as a last resort.
            text_words = combined_text.split()
            if text_words:
                match = _best_fuzzy_match(attr_value, text_words)
                if match and match[1] >= 80:
                    consistency_count += 1
                    continue
            issues.append(
                f"'{attr.title()}': '{attributes[attr]}' not mentioned in title/description"
            )
        score = (consistency_count / total_checks) * 100 if total_checks > 0 else 100.0
        return score, issues

    # ================= UTILITY METHODS =================

    def extract_attributes_from_text(self, text: str, category: str = '') -> Dict:
        """Extract attributes from unstructured text using NER and patterns.

        Uses spaCy NER when available (brand from ORG entities, size from
        QUANTITY), simple substring matching for colors, and optionally the
        AI service when fewer than three attributes were found.
        """
        extracted = {}
        if self.nlp:
            try:
                doc = self.nlp(text)
                orgs = [ent.text for ent in doc.ents if ent.label_ == 'ORG']
                if orgs:
                    extracted['brand'] = orgs[0]
                quantities = [ent.text for ent in doc.ents if ent.label_ == 'QUANTITY']
                if quantities:
                    extracted['size'] = quantities[0]
            except Exception as e:
                logger.warning(f"spaCy extraction failed: {e}")
        # Pattern-based extraction for colors (first hit in list order wins)
        color_patterns = [
            'black', 'white', 'red', 'blue', 'green', 'yellow', 'orange',
            'purple', 'pink', 'brown', 'gray', 'grey', 'silver', 'gold',
            'rose gold', 'space gray', 'navy', 'beige', 'tan',
        ]
        text_lower = text.lower()
        for color in color_patterns:
            if color in text_lower:
                extracted['color'] = color.title()
                break
        # Use AI for enhanced extraction if available and little was found
        if self.use_ai and self.ai_service and len(extracted) < 3:
            try:
                ai_extracted = self.ai_service.extract_attributes_with_ai(text, '', category)
                for key, value in ai_extracted.items():
                    if key not in extracted and value:
                        extracted[key] = value
            except Exception as e:
                logger.warning(f"AI extraction failed: {e}")
        return extracted

    def get_score_interpretation(self, score: float) -> Dict[str, str]:
        """Return a human-readable grade/status/color/recommendation for a 0-100 score."""
        if score >= 90:
            return {
                'grade': 'A',
                'status': 'Excellent',
                'color': 'green',
                'recommendation': 'Product listing is of high quality. Minor tweaks only.',
            }
        elif score >= 80:
            return {
                'grade': 'B',
                'status': 'Good',
                'color': 'lightgreen',
                'recommendation': 'Good quality. Address minor issues to reach excellence.',
            }
        elif score >= 70:
            return {
                'grade': 'C',
                'status': 'Fair',
                'color': 'yellow',
                'recommendation': 'Acceptable but needs improvement. Review suggestions.',
            }
        elif score >= 60:
            return {
                'grade': 'D',
                'status': 'Poor',
                'color': 'orange',
                'recommendation': 'Significant issues found. Requires immediate attention.',
            }
        else:
            return {
                'grade': 'F',
                'status': 'Critical',
                'color': 'red',
                'recommendation': 'Critical quality issues. Major revision needed.',
            }