# attribute_scorer_integrated.py
"""
Enhanced AttributeQualityScorer with ProductContentRule integration
"""
import re
import time
import logging
from typing import Dict, List, Optional, Tuple

from rapidfuzz import fuzz, process

logger = logging.getLogger(__name__)

class AttributeQualityScorer:
    """
    Complete Product Quality Scorer with ALL components INCLUDING ProductContentRule validation:
    - Mandatory Fields (20%)
    - Standardization (15%)
    - Missing Values (10%)
    - Consistency (5%)
    - SEO Discoverability (10%)
    - Content Rules Compliance (15%) ← NEW: Validates against ProductContentRule
    - Title Quality (10%)
    - Description Quality (15%)
    """

    def __init__(self, use_ai: bool = True, use_seo: bool = True):
        # Load spaCy model (optional, for advanced NER)
        self.nlp = None
        try:
            import spacy
            self.nlp = spacy.load("en_core_web_sm")
            logger.info("spaCy model loaded successfully")
        except Exception as e:
            logger.warning(f"spaCy model not loaded: {e}. Will use fallback methods.")

        # Initialize AI service
        self.use_ai = use_ai
        self.ai_service = None
        if use_ai:
            try:
                from .gemini_service import GeminiAttributeService
                self.ai_service = GeminiAttributeService()
                logger.info("Gemini AI service initialized")
            except Exception as e:
                logger.warning(f"Gemini service not available: {e}")
                self.use_ai = False

        # Initialize SEO scorer
        self.use_seo = use_seo
        self.seo_scorer = None
        if use_seo:
            try:
                from .seo_scorer import SEODiscoverabilityScorer
                self.seo_scorer = SEODiscoverabilityScorer()
                logger.info("SEO scorer initialized")
            except Exception as e:
                logger.warning(f"SEO scorer not available: {e}")
                self.use_seo = False

        # Initialize Title/Description scorer
        self.title_desc_scorer = None
        try:
            from .title_description_scorer import TitleDescriptionScorer
            self.title_desc_scorer = TitleDescriptionScorer(use_ai=use_ai)
            logger.info("Title/Description scorer initialized")
        except Exception as e:
            logger.warning(f"Title/Description scorer not available: {e}")

        # Initialize Content Rules scorer ← NEW
        self.content_rules_scorer = None
        try:
            from .content_rules_scorer import ContentRulesScorer
            self.content_rules_scorer = ContentRulesScorer()
            logger.info("Content Rules scorer initialized")
        except Exception as e:
            logger.warning(f"Content Rules scorer not available: {e}")

        # UPDATED WEIGHTS (Total = 100%)
        self.weights = {
            'mandatory_fields': 0.20,          # 20% (reduced from 25%)
            'standardization': 0.15,           # 15% (reduced from 20%)
            'missing_values': 0.10,            # 10% (reduced from 13%)
            'consistency': 0.05,               # 5% (reduced from 7%)
            'seo_discoverability': 0.10,       # 10%
            'content_rules_compliance': 0.15,  # 15% ← NEW: ProductContentRule validation
            'title_quality': 0.10,             # 10%
            'description_quality': 0.15        # 15%
        }
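        # Illustrative worked example (not part of the original code): with
        # component scores mandatory=90, standardization=80, missing=100,
        # consistency=100, seo=70, content_rules=100, title=85, description=75,
        # the weighted final score computed in score_product() is
        # 90*0.20 + 80*0.15 + 100*0.10 + 100*0.05 + 70*0.10
        #   + 100*0.15 + 85*0.10 + 75*0.15 = 86.75.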

    def score_product(
        self,
        product: Dict,
        category_rules: List[Dict],
        content_rules: Optional[List[Dict]] = None,  # ← NEW parameter
        generate_ai_suggestions: bool = True
    ) -> Dict:
        """
        Complete product scoring with all components including ProductContentRule validation

        Args:
            product: Product dict with sku, category, title, description, attributes
            category_rules: List of CategoryAttributeRule dicts
            content_rules: List of ProductContentRule dicts (NEW)
            generate_ai_suggestions: Whether to call AI for suggestions

        Returns:
            Dict with final_score, breakdown, issues, suggestions, etc.
        """
        start_time = time.time()
        attributes = product.get('attributes', {})
        category = product.get('category', '')

        # Initialize scores
        scores = {
            'mandatory_fields': 0,
            'standardization': 0,
            'missing_values': 0,
            'consistency': 0,
            'seo_discoverability': 0,
            'content_rules_compliance': 0,  # ← NEW
            'title_quality': 0,
            'description_quality': 0
        }

        issues = []
        suggestions = []

        # --- 1. Mandatory Fields (20%) ---
        try:
            mandatory_score, mandatory_issues, mandatory_suggestions = self._check_mandatory_fields(
                attributes, category_rules
            )
            scores['mandatory_fields'] = mandatory_score
            issues.extend(mandatory_issues)
            suggestions.extend(mandatory_suggestions)
        except Exception as e:
            logger.error(f"[Mandatory Fields] Error: {e}", exc_info=True)
            scores['mandatory_fields'] = 0

        # --- 2. Standardization (15%) ---
        try:
            std_score, std_issues, std_suggestions = self._check_standardization(
                attributes, category_rules
            )
            scores['standardization'] = std_score
            issues.extend(std_issues)
            suggestions.extend(std_suggestions)
        except Exception as e:
            logger.error(f"[Standardization] Error: {e}", exc_info=True)
            scores['standardization'] = 0

        # --- 3. Missing Values (10%) ---
        try:
            missing_score, missing_issues, missing_suggestions = self._check_missing_values(
                attributes, category_rules
            )
            scores['missing_values'] = missing_score
            issues.extend(missing_issues)
            suggestions.extend(missing_suggestions)
        except Exception as e:
            logger.error(f"[Missing Values] Error: {e}", exc_info=True)
            scores['missing_values'] = 0

        # --- 4. Consistency (5%) ---
        try:
            consistency_score, consistency_issues = self._check_consistency(
                attributes,
                product.get('title', ''),
                product.get('description', '')
            )
            scores['consistency'] = consistency_score
            issues.extend(consistency_issues)
        except Exception as e:
            logger.error(f"[Consistency] Error: {e}", exc_info=True)
            scores['consistency'] = 0

        # --- 5. SEO Discoverability (10%) ---
        seo_result = None
        if self.use_seo and self.seo_scorer:
            try:
                seo_result = self.seo_scorer.score_seo(product, category_rules)
                scores['seo_discoverability'] = seo_result['seo_score']
                issues.extend(seo_result['issues'])
                suggestions.extend(seo_result['suggestions'])
            except Exception as e:
                logger.error(f"[SEO Scoring] Error: {e}", exc_info=True)
                scores['seo_discoverability'] = 0

        # --- 6. Content Rules Compliance (15%) ← NEW ---
        content_rules_result = None
        if content_rules and self.content_rules_scorer:
            try:
                content_rules_result = self.content_rules_scorer.score_content_fields(
                    product, content_rules
                )
                scores['content_rules_compliance'] = content_rules_result['overall_content_score']
                issues.extend(content_rules_result['issues'])
                suggestions.extend(content_rules_result['suggestions'])
            except Exception as e:
                logger.error(f"[Content Rules] Error: {e}", exc_info=True)
                scores['content_rules_compliance'] = 0
        else:
            # No content rules provided: award full credit so the product is not penalized
            scores['content_rules_compliance'] = 100.0

        # --- 7. Title & Description Quality (25% combined) ---
        title_desc_result = None
        if self.title_desc_scorer:
            try:
                title_desc_result = self.title_desc_scorer.score_title_and_description(
                    product,
                    category_rules
                )
                scores['title_quality'] = title_desc_result['title_score']
                scores['description_quality'] = title_desc_result['description_score']
                issues.extend(title_desc_result['issues'])
                suggestions.extend(title_desc_result['suggestions'])
            except Exception as e:
                logger.error(f"[Title/Description Scoring] Error: {e}", exc_info=True)
                scores['title_quality'] = 0
                scores['description_quality'] = 0

        # --- Final Score Calculation ---
        final_score = sum(scores[key] * self.weights[key] for key in scores)

        result = {
            'final_score': round(final_score, 2),
            'max_score': 100.0,
            'breakdown': scores,
            'issues': issues,
            'suggestions': suggestions,
            'weights': self.weights,
            'processing_time': round(time.time() - start_time, 3)
        }

        # Add component-specific details
        if seo_result:
            result['seo_details'] = {
                'breakdown': seo_result['breakdown'],
                'extracted_keywords': seo_result.get('extracted_keywords', []),
                'missing_high_value_terms': seo_result.get('missing_high_value_terms', [])
            }

        if content_rules_result:
            result['content_rules_details'] = {
                'field_scores': content_rules_result['field_scores'],
                'rules_applied': content_rules_result['rules_applied']
            }

        if title_desc_result:
            result['title_description_details'] = {
                'title_breakdown': title_desc_result.get('title_breakdown', {}),
                'description_breakdown': title_desc_result.get('description_breakdown', {}),
                'ai_improvements': title_desc_result.get('ai_improvements')
            }

        # --- AI Suggestions (Comprehensive) ---
        if generate_ai_suggestions and self.use_ai and self.ai_service:
            try:
                logger.info(f"Generating comprehensive AI suggestions for SKU: {product.get('sku')}")
                ai_suggestions = self.ai_service.generate_comprehensive_suggestions(
                    product,
                    issues,
                    category_rules,
                    scores
                )
                result['ai_suggestions'] = ai_suggestions
            except Exception as e:
                logger.error(f"[AI Suggestions] Error: {e}", exc_info=True)
                result['ai_suggestions'] = {'error': str(e)}

        return result

    # ========== ATTRIBUTE VALIDATION METHODS (unchanged) ==========

    def _check_mandatory_fields(
        self,
        attributes: Dict,
        rules: List[Dict]
    ) -> Tuple[float, List[str], List[str]]:
        """Check if all mandatory fields are present and valid"""
        mandatory_rules = [r for r in rules if r.get('is_mandatory', False)]

        if not mandatory_rules:
            return 100.0, [], []

        present_count = 0
        issues = []
        suggestions = []

        for rule in mandatory_rules:
            attr_name = rule['attribute_name']

            if attr_name in attributes and attributes[attr_name]:
                value = str(attributes[attr_name]).strip()

                if not value:
                    issues.append(f"Mandatory field '{attr_name}' is empty")
                    suggestions.append(f"Provide a non-empty value for {attr_name}")
                    continue

                # Check length constraints
                min_len = rule.get('min_length')
                max_len = rule.get('max_length')

                if min_len and len(value) < min_len:
                    issues.append(f"'{attr_name}' too short (min: {min_len} chars)")
                    suggestions.append(f"Expand {attr_name} to at least {min_len} characters")
                    continue

                if max_len and len(value) > max_len:
                    issues.append(f"'{attr_name}' too long (max: {max_len} chars)")
                    suggestions.append(f"Shorten {attr_name} to {max_len} characters or less")
                    continue

                # Check regex pattern if provided
                regex = rule.get('validation_regex')
                if regex:
                    try:
                        if not re.match(regex, value):
                            issues.append(f"'{attr_name}' format invalid")
                            suggestions.append(f"Ensure {attr_name} matches required format")
                            continue
                    except re.error:
                        logger.warning(f"Invalid regex pattern for {attr_name}: {regex}")

                present_count += 1
            else:
                issues.append(f"Missing mandatory field: {attr_name}")
                desc = rule.get('description', '')
                if desc:
                    suggestions.append(f"Add {attr_name}: {desc}")
                else:
                    suggestions.append(f"Add required attribute: {attr_name}")

        score = (present_count / len(mandatory_rules)) * 100 if mandatory_rules else 100.0
        return score, issues, suggestions

    def _check_standardization(
        self,
        attributes: Dict,
        rules: List[Dict]
    ) -> Tuple[float, List[str], List[str]]:
        """Check if attribute values match standardized valid values"""
        standardized_rules = [r for r in rules if r.get('valid_values')]

        if not standardized_rules:
            return 100.0, [], []

        correct_count = 0
        issues = []
        suggestions = []

        for rule in standardized_rules:
            attr_name = rule['attribute_name']
            valid_values = rule['valid_values']

            if not valid_values:
                continue

            if attr_name not in attributes or not attributes[attr_name]:
                continue

            actual_value = str(attributes[attr_name]).strip()

            if not actual_value:
                continue

            # Exact match
            if actual_value in valid_values:
                correct_count += 1
                continue

            # Case-insensitive match
            lower_valid = {v.lower(): v for v in valid_values}
            if actual_value.lower() in lower_valid:
                correct_count += 1
                correct_value = lower_valid[actual_value.lower()]
                if actual_value != correct_value:
                    issues.append(
                        f"{attr_name}: Case mismatch - '{actual_value}' should be '{correct_value}'"
                    )
                    suggestions.append(f"Correct capitalization of {attr_name} to: {correct_value}")
                continue

            # Fuzzy matching
            best_match = process.extractOne(actual_value, valid_values, scorer=fuzz.ratio)
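            # Illustrative example (assumption, not from the original code):
            # an actual_value of 'Blck' against the valid value 'Black' scores
            # roughly 89 with fuzz.ratio, so it clears the 80 threshold below
            # and is counted as correct but flagged for standardization.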

            if best_match and best_match[1] >= 80:
                correct_count += 1
                if best_match[1] < 100:
                    issues.append(
                        f"{attr_name}: '{actual_value}' likely means '{best_match[0]}' "
                        f"(confidence: {best_match[1]:.0f}%)"
                    )
                    suggestions.append(f"Standardize {attr_name} to: {best_match[0]}")
            else:
                issues.append(
                    f"{attr_name}: '{actual_value}' not recognized. "
                    f"Valid: {', '.join(valid_values[:3])}"
                )
                suggestions.append(f"Change {attr_name} to one of: {', '.join(valid_values[:3])}")

        score = (correct_count / len(standardized_rules)) * 100 if standardized_rules else 100.0
        return score, issues, suggestions

    def _check_missing_values(
        self,
        attributes: Dict,
        rules: List[Dict]
    ) -> Tuple[float, List[str], List[str]]:
        """Check for placeholder values or empty attributes"""
        placeholder_patterns = [
            r'^n/?a$', r'^none$', r'^null$', r'^-+$', r'^\.+$',
            r'^tbd$', r'^to be determined$', r'^unknown$', r'^na$',
            r'^todo$', r'^pending$', r'^\?+$', r'^xxx+$', r'^placeholder$'
        ]
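        # These patterns catch common filler values such as 'N/A', 'n/a',
        # 'none', 'TBD', '---', '...', '???', and 'xxx'; matching below is
        # case-insensitive.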

        total_attrs = len(rules)
        valid_count = 0
        issues = []
        suggestions = []

        for rule in rules:
            attr_name = rule['attribute_name']

            if attr_name not in attributes:
                continue

            value = str(attributes[attr_name]).strip()

            if not value:
                issues.append(f"'{attr_name}' is empty")
                suggestions.append(f"Provide a valid value for {attr_name}")
                continue

            value_lower = value.lower()
            is_placeholder = any(
                re.match(pattern, value_lower, re.IGNORECASE)
                for pattern in placeholder_patterns
            )

            if is_placeholder:
                issues.append(f"'{attr_name}' contains placeholder: '{value}'")
                suggestions.append(f"Replace placeholder in {attr_name} with actual data")
                continue

            # Check for suspiciously short values on mandatory fields
            if rule.get('is_mandatory') and len(value) < 2:
                issues.append(f"'{attr_name}' suspiciously short: '{value}'")
                suggestions.append(f"Provide more detailed {attr_name}")
                continue

            valid_count += 1

        score = (valid_count / total_attrs) * 100 if total_attrs > 0 else 100.0
        return score, issues, suggestions

    def _check_consistency(
        self,
        attributes: Dict,
        title: str,
        description: str
    ) -> Tuple[float, List[str]]:
        """Check if attribute values are mentioned in title/description"""
        issues = []
        consistency_count = 0
        total_checks = 0

        check_attrs = ['brand', 'color', 'size', 'material', 'model', 'weight', 'dimensions']
        combined_text = f"{title} {description}".lower()
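        # Illustrative example (assumption): attributes {'color': 'Black'} with
        # title 'Acme Wireless Mouse - Black' count as consistent via the
        # substring check below; multi-word values fall back to the word-set
        # and fuzzy checks.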

        for attr in check_attrs:
            if attr not in attributes or not attributes[attr]:
                continue

            total_checks += 1
            attr_value = str(attributes[attr]).lower().strip()

            # Skip very short values (likely abbreviations or numbers)
            if len(attr_value) < 2:
                consistency_count += 1
                continue

            # Direct substring match
            if attr_value in combined_text:
                consistency_count += 1
                continue

            # Word boundary match (for multi-word attributes)
            words_in_attr = set(attr_value.split())
            words_in_text = set(combined_text.split())

            if words_in_attr.issubset(words_in_text):
                consistency_count += 1
                continue

            # Fuzzy matching as last resort
            text_words = combined_text.split()
            if text_words:
                match = process.extractOne(attr_value, text_words, scorer=fuzz.ratio)

                if match and match[1] >= 80:
                    consistency_count += 1
                    continue

            # If we get here, the attribute is not mentioned
            issues.append(
                f"'{attr.title()}': '{attributes[attr]}' not mentioned in title/description"
            )

        score = (consistency_count / total_checks) * 100 if total_checks > 0 else 100.0
        return score, issues

    # ========== UTILITY METHODS (unchanged) ==========

    def extract_attributes_from_text(self, text: str, category: str = '') -> Dict:
        """Extract attributes from unstructured text using NER and patterns"""
        extracted = {}

        # Try spaCy NER if available
        if self.nlp:
            try:
                doc = self.nlp(text)
                # Extract organizations as potential brands
                orgs = [ent.text for ent in doc.ents if ent.label_ == 'ORG']
                if orgs:
                    extracted['brand'] = orgs[0]
                # Extract quantities as potential sizes
                quantities = [ent.text for ent in doc.ents if ent.label_ == 'QUANTITY']
                if quantities:
                    extracted['size'] = quantities[0]
            except Exception as e:
                logger.warning(f"spaCy extraction failed: {e}")

        # Pattern-based extraction for colors
        # Multi-word colors come first so that, e.g., 'rose gold' is not
        # reported as plain 'gold' by the first-match break below.
        color_patterns = [
            'rose gold', 'space gray',
            'black', 'white', 'red', 'blue', 'green', 'yellow', 'orange',
            'purple', 'pink', 'brown', 'gray', 'grey', 'silver', 'gold',
            'navy', 'beige', 'tan'
        ]
        text_lower = text.lower()
        for color in color_patterns:
            if color in text_lower:
                extracted['color'] = color.title()
                break

        # Use AI for enhanced extraction if available
        if self.use_ai and self.ai_service and len(extracted) < 3:
            try:
                ai_extracted = self.ai_service.extract_attributes_with_ai(
                    text, '', category
                )
                # Merge AI results (don't override existing values)
                for key, value in ai_extracted.items():
                    if key not in extracted and value:
                        extracted[key] = value
            except Exception as e:
                logger.warning(f"AI extraction failed: {e}")

        return extracted

    def get_score_interpretation(self, score: float) -> Dict[str, str]:
        """Get human-readable interpretation of score"""
        if score >= 90:
            return {
                'grade': 'A',
                'status': 'Excellent',
                'color': 'green',
                'recommendation': 'Product listing is of high quality. Minor tweaks only.'
            }
        elif score >= 80:
            return {
                'grade': 'B',
                'status': 'Good',
                'color': 'lightgreen',
                'recommendation': 'Good quality. Address minor issues to reach excellence.'
            }
        elif score >= 70:
            return {
                'grade': 'C',
                'status': 'Fair',
                'color': 'yellow',
                'recommendation': 'Acceptable but needs improvement. Review suggestions.'
            }
        elif score >= 60:
            return {
                'grade': 'D',
                'status': 'Poor',
                'color': 'orange',
                'recommendation': 'Significant issues found. Requires immediate attention.'
            }
        else:
            return {
                'grade': 'F',
                'status': 'Critical',
                'color': 'red',
                'recommendation': 'Critical quality issues. Major revision needed.'
            }
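

# --- Minimal usage sketch (illustrative) ---
# The sample product and rules below are hypothetical and exist only to show
# how score_product and get_score_interpretation are expected to be called;
# they are not part of the original module. AI, SEO, title/description, and
# content-rules sub-scorers are optional and simply skipped if unavailable.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    scorer = AttributeQualityScorer(use_ai=False, use_seo=False)
    sample_product = {
        'sku': 'SKU-001',
        'category': 'Electronics',
        'title': 'Acme Wireless Mouse - Black',
        'description': 'Black wireless mouse by Acme with a USB receiver.',
        'attributes': {'brand': 'Acme', 'color': 'Black'}
    }
    sample_rules = [
        {'attribute_name': 'brand', 'is_mandatory': True},
        {'attribute_name': 'color', 'is_mandatory': True,
         'valid_values': ['Black', 'White', 'Silver']},
    ]
    result = scorer.score_product(
        sample_product,
        sample_rules,
        content_rules=None,
        generate_ai_suggestions=False
    )
    print(result['final_score'])
    print(scorer.get_score_interpretation(result['final_score']))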