12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943 |
- # # attribute_scorer_integrated.py
- # """
- # Enhanced AttributeQualityScorer with ProductContentRule integration
- # """
- # import re
- # import time
- # import logging
- # from typing import Dict, List, Tuple
- # from rapidfuzz import fuzz, process
- # from collections import defaultdict
- # logger = logging.getLogger(__name__)
- # class AttributeQualityScorer:
- # """
- # Optimized Product Quality Scorer with safe handling of optional components:
- # - Mandatory Fields (16%)
- # - Standardization (12%)
- # - Missing Values (8%)
- # - Consistency (4%)
- # - SEO Discoverability (8%)
- # - Content Rules Compliance (12%)
- # - Title Quality (8%)
- # - Description Quality (12%)
- # - Image Quality (20%)
- # Optional components do not inflate final score; weights are rescaled proportionally.
- # """
- # def __init__(self, use_ai: bool = True, use_seo: bool = True):
- # # spaCy for NER
- # self.nlp = None
- # try:
- # import spacy
- # self.nlp = spacy.load("en_core_web_sm")
- # logger.info("spaCy model loaded successfully")
- # except Exception as e:
- # logger.warning(f"spaCy not loaded: {e}")
- # # AI Service
- # self.use_ai = use_ai
- # self.ai_service = None
- # if use_ai:
- # try:
- # from .gemini_service import GeminiAttributeService
- # self.ai_service = GeminiAttributeService()
- # except Exception as e:
- # logger.warning(f"AI service not available: {e}")
- # self.use_ai = False
- # # Image scorer
- # self.image_scorer = None
- # try:
- # from .image_scorer import ImageQualityScorer
- # self.image_scorer = ImageQualityScorer()
- # except Exception as e:
- # logger.warning(f"Image scorer not available: {e}")
- # # SEO scorer
- # self.use_seo = use_seo
- # self.seo_scorer = None
- # if use_seo:
- # try:
- # from .seo_scorer import SEODiscoverabilityScorer
- # self.seo_scorer = SEODiscoverabilityScorer()
- # except Exception as e:
- # logger.warning(f"SEO scorer not available: {e}")
- # self.use_seo = False
- # # Title/Description scorer
- # self.title_desc_scorer = None
- # try:
- # from .title_description_scorer import TitleDescriptionScorer
- # self.title_desc_scorer = TitleDescriptionScorer(use_ai=use_ai)
- # except Exception as e:
- # logger.warning(f"Title/Description scorer not available: {e}")
- # # Content Rules scorer
- # self.content_rules_scorer = None
- # try:
- # from .content_rules_scorer import ContentRulesScorer
- # self.content_rules_scorer = ContentRulesScorer()
- # except Exception as e:
- # logger.warning(f"Content Rules scorer not available: {e}")
- # # Base weights
- # self.weights = {
- # 'mandatory_fields': 0.16,
- # 'standardization': 0.12,
- # 'missing_values': 0.08,
- # 'consistency': 0.04,
- # 'seo_discoverability': 0.08,
- # 'content_rules_compliance': 0.12,
- # 'title_quality': 0.08,
- # 'description_quality': 0.12,
- # 'image_quality': 0.20
- # }
- # def score_product(
- # self,
- # product: Dict,
- # category_rules: List[Dict],
- # content_rules: List[Dict] = None,
- # generate_ai_suggestions: bool = True
- # ) -> Dict:
- # start_time = time.time()
- # attributes = product.get('attributes', {})
- # category = product.get('category', '')
- # scores = {k: 0 for k in self.weights.keys()}
- # issues, suggestions = [], []
- # # --- 1. Mandatory Fields ---
- # try:
- # score, i, s = self._check_mandatory_fields(attributes, category_rules)
- # scores['mandatory_fields'] = score
- # issues.extend(i)
- # suggestions.extend(s)
- # except Exception as e:
- # logger.error(f"[Mandatory Fields] {e}", exc_info=True)
- # # --- 2. Standardization ---
- # try:
- # score, i, s = self._check_standardization(attributes, category_rules)
- # scores['standardization'] = score
- # issues.extend(i)
- # suggestions.extend(s)
- # except Exception as e:
- # logger.error(f"[Standardization] {e}", exc_info=True)
- # # --- 3. Missing Values ---
- # try:
- # score, i, s = self._check_missing_values(attributes, category_rules)
- # scores['missing_values'] = score
- # issues.extend(i)
- # suggestions.extend(s)
- # except Exception as e:
- # logger.error(f"[Missing Values] {e}", exc_info=True)
- # # --- 4. Consistency ---
- # try:
- # score, i = self._check_consistency(attributes, product.get('title', ''), product.get('description', ''))
- # scores['consistency'] = score
- # issues.extend(i)
- # except Exception as e:
- # logger.error(f"[Consistency] {e}", exc_info=True)
- # # --- 5. SEO ---
- # seo_result = None
- # if self.use_seo and self.seo_scorer:
- # try:
- # seo_result = self.seo_scorer.score_seo(product, category_rules)
- # scores['seo_discoverability'] = seo_result['seo_score']
- # issues.extend(seo_result['issues'])
- # suggestions.extend(seo_result['suggestions'])
- # except Exception as e:
- # logger.error(f"[SEO] {e}", exc_info=True)
- # # --- 6. Content Rules ---
- # content_rules_result = None
- # if content_rules and self.content_rules_scorer:
- # try:
- # content_rules_result = self.content_rules_scorer.score_content_fields(product, content_rules)
- # scores['content_rules_compliance'] = content_rules_result['overall_content_score']
- # issues.extend(content_rules_result['issues'])
- # suggestions.extend(content_rules_result['suggestions'])
- # except Exception as e:
- # logger.error(f"[Content Rules] {e}", exc_info=True)
- # # --- 7. Title & Description ---
- # title_desc_result = None
- # if self.title_desc_scorer:
- # try:
- # title_desc_result = self.title_desc_scorer.score_title_and_description(product, category_rules)
- # scores['title_quality'] = title_desc_result['title_score']
- # scores['description_quality'] = title_desc_result['description_score']
- # issues.extend(title_desc_result['issues'])
- # suggestions.extend(title_desc_result['suggestions'])
- # except Exception as e:
- # logger.error(f"[Title/Desc] {e}", exc_info=True)
- # # --- 8. Image Quality (always present in breakdown) ---
- # image_result = None
- # if self.image_scorer:
- # images = product.get("images", [])
- # if images:
- # try:
- # image_result = self.image_scorer.score_images(images)
- # scores['image_quality'] = image_result.get("overall_image_score", 0)
- # except Exception as e:
- # logger.error(f"[Image] {e}", exc_info=True)
- # scores['image_quality'] = None
- # else:
- # scores['image_quality'] = None # explicitly mark as unavailable
- # else:
- # scores['image_quality'] = None
- # # --- 9. Weight Rescaling for applicable numeric components ---
- # numeric_scores = {k: v for k, v in scores.items() if isinstance(v, (int, float))}
- # applicable_weights = {k: self.weights[k] for k in numeric_scores}
- # total_weight = sum(applicable_weights.values())
- # final_score = sum(numeric_scores[k] * (applicable_weights[k]/total_weight) for k in numeric_scores) if numeric_scores else None
-
-
- # if image_result:
- # result['image_details'] = image_result
- # # AI Suggestions
- # if generate_ai_suggestions and self.use_ai and self.ai_service:
- # try:
- # ai_suggestions = self.ai_service.generate_comprehensive_suggestions(
- # product, issues, category_rules, scores
- # )
- # result['ai_suggestions'] = ai_suggestions
- # except Exception as e:
- # logger.error(f"[AI Suggestions] {e}", exc_info=True)
- # result['ai_suggestions'] = {'error': str(e)}
- # return result
- # # ========== ATTRIBUTE VALIDATION METHODS (unchanged) ==========
-
- # def _check_mandatory_fields(
- # self,
- # attributes: Dict,
- # rules: List[Dict]
- # ) -> Tuple[float, List[str], List[str]]:
- # """Check if all mandatory fields are present and valid"""
- # mandatory_rules = [r for r in rules if r.get('is_mandatory', False)]
-
- # if not mandatory_rules:
- # return 100.0, [], []
-
- # present_count = 0
- # issues = []
- # suggestions = []
-
- # for rule in mandatory_rules:
- # attr_name = rule['attribute_name']
-
- # if attr_name in attributes and attributes[attr_name]:
- # value = str(attributes[attr_name]).strip()
-
- # if not value:
- # issues.append(f"Mandatory field '{attr_name}' is empty")
- # suggestions.append(f"Provide a non-empty value for {attr_name}")
- # continue
-
- # # Check length constraints
- # min_len = rule.get('min_length')
- # max_len = rule.get('max_length')
-
- # if min_len and len(value) < min_len:
- # issues.append(f"'{attr_name}' too short (min: {min_len} chars)")
- # suggestions.append(f"Expand {attr_name} to at least {min_len} characters")
- # continue
-
- # if max_len and len(value) > max_len:
- # issues.append(f"'{attr_name}' too long (max: {max_len} chars)")
- # suggestions.append(f"Shorten {attr_name} to {max_len} characters or less")
- # continue
-
- # # Check regex pattern if provided
- # regex = rule.get('validation_regex')
- # if regex:
- # try:
- # if not re.match(regex, value):
- # issues.append(f"'{attr_name}' format invalid")
- # suggestions.append(f"Ensure {attr_name} matches required format")
- # continue
- # except re.error:
- # logger.warning(f"Invalid regex pattern for {attr_name}: {regex}")
-
- # present_count += 1
- # else:
- # issues.append(f"Missing mandatory field: {attr_name}")
- # desc = rule.get('description', '')
- # if desc:
- # suggestions.append(f"Add {attr_name}: {desc}")
- # else:
- # suggestions.append(f"Add required attribute: {attr_name}")
-
- # score = (present_count / len(mandatory_rules)) * 100 if mandatory_rules else 100.0
- # return score, issues, suggestions
-
- # def _check_standardization(
- # self,
- # attributes: Dict,
- # rules: List[Dict]
- # ) -> Tuple[float, List[str], List[str]]:
- # """Check if attribute values match standardized valid values"""
- # standardized_rules = [r for r in rules if r.get('valid_values')]
-
- # if not standardized_rules:
- # return 100.0, [], []
-
- # correct_count = 0
- # issues = []
- # suggestions = []
-
- # for rule in standardized_rules:
- # attr_name = rule['attribute_name']
- # valid_values = rule['valid_values']
-
- # if not valid_values:
- # continue
-
- # if attr_name not in attributes or not attributes[attr_name]:
- # continue
-
- # actual_value = str(attributes[attr_name]).strip()
-
- # if not actual_value:
- # continue
-
- # # Exact match
- # if actual_value in valid_values:
- # correct_count += 1
- # continue
-
- # # Case-insensitive match
- # lower_valid = {v.lower(): v for v in valid_values}
- # if actual_value.lower() in lower_valid:
- # correct_count += 1
- # correct_value = lower_valid[actual_value.lower()]
- # if actual_value != correct_value:
- # issues.append(
- # f"{attr_name}: Case mismatch - '{actual_value}' should be '{correct_value}'"
- # )
- # suggestions.append(f"Correct capitalization of {attr_name} to: {correct_value}")
- # continue
-
- # # Fuzzy matching
- # best_match = process.extractOne(actual_value, valid_values, scorer=fuzz.ratio)
-
- # if best_match and best_match[1] >= 80:
- # correct_count += 1
- # if best_match[1] < 100:
- # issues.append(
- # f"{attr_name}: '{actual_value}' likely means '{best_match[0]}' "
- # f"(confidence: {best_match[1]}%)"
- # )
- # suggestions.append(f"Standardize {attr_name} to: {best_match[0]}")
- # else:
- # issues.append(
- # f"{attr_name}: '{actual_value}' not recognized. "
- # f"Valid: {', '.join(valid_values[:3])}"
- # )
- # suggestions.append(f"Change {attr_name} to one of: {', '.join(valid_values[:3])}")
-
- # score = (correct_count / len(standardized_rules)) * 100 if standardized_rules else 100.0
- # return score, issues, suggestions
-
- # def _check_missing_values(
- # self,
- # attributes: Dict,
- # rules: List[Dict]
- # ) -> Tuple[float, List[str], List[str]]:
- # """Check for placeholder values or empty attributes"""
- # placeholder_patterns = [
- # r'^n/?a$', r'^none$', r'^null$', r'^-+$', r'^\.+$',
- # r'^tbd$', r'^to be determined$', r'^unknown$', r'^na$',
- # r'^todo$', r'^pending$', r'^\?+$', r'^xxx+$', r'^placeholder$'
- # ]
-
- # total_attrs = len(rules)
- # valid_count = 0
- # issues = []
- # suggestions = []
-
- # for rule in rules:
- # attr_name = rule['attribute_name']
-
- # if attr_name not in attributes:
- # continue
-
- # value = str(attributes[attr_name]).strip()
-
- # if not value:
- # issues.append(f"'{attr_name}' is empty")
- # suggestions.append(f"Provide a valid value for {attr_name}")
- # continue
-
- # value_lower = value.lower()
- # is_placeholder = any(
- # re.match(pattern, value_lower, re.IGNORECASE)
- # for pattern in placeholder_patterns
- # )
-
- # if is_placeholder:
- # issues.append(f"'{attr_name}' contains placeholder: '{value}'")
- # suggestions.append(f"Replace placeholder in {attr_name} with actual data")
- # continue
-
- # # Check for suspiciously short values on mandatory fields
- # if rule.get('is_mandatory') and len(value) < 2:
- # issues.append(f"'{attr_name}' suspiciously short: '{value}'")
- # suggestions.append(f"Provide more detailed {attr_name}")
- # continue
-
- # valid_count += 1
-
- # score = (valid_count / total_attrs) * 100 if total_attrs > 0 else 100.0
- # return score, issues, suggestions
-
- # def _check_consistency(
- # self,
- # attributes: Dict,
- # title: str,
- # description: str
- # ) -> Tuple[float, List[str]]:
- # """Check if attribute values are mentioned in title/description"""
- # issues = []
- # consistency_count = 0
- # total_checks = 0
-
- # check_attrs = ['brand', 'color', 'size', 'material', 'model', 'weight', 'dimensions']
- # combined_text = f"{title} {description}".lower()
-
- # for attr in check_attrs:
- # if attr not in attributes or not attributes[attr]:
- # continue
-
- # total_checks += 1
- # attr_value = str(attributes[attr]).lower().strip()
-
- # if len(attr_value) < 2:
- # consistency_count += 1
- # continue
-
- # if attr_value in combined_text:
- # consistency_count += 1
- # continue
-
- # words_in_attr = set(attr_value.split())
- # words_in_text = set(combined_text.split())
-
- # if words_in_attr.issubset(words_in_text):
- # consistency_count += 1
- # continue
-
- # text_words = combined_text.split()
- # if text_words:
- # match = process.extractOne(attr_value, text_words, scorer=fuzz.ratio)
-
- # if match and match[1] >= 80:
- # consistency_count += 1
- # continue
-
- # issues.append(
- # f"'{attr.title()}': '{attributes[attr]}' not mentioned in title/description"
- # )
-
- # score = (consistency_count / total_checks) * 100 if total_checks > 0 else 100.0
- # return score, issues
-
- # # ========== UTILITY METHODS (unchanged) ==========
-
- # def extract_attributes_from_text(self, text: str, category: str = '') -> Dict:
- # """Extract attributes from unstructured text using NER and patterns"""
- # extracted = {}
-
- # if self.nlp:
- # try:
- # doc = self.nlp(text)
- # orgs = [ent.text for ent in doc.ents if ent.label_ == 'ORG']
- # if orgs:
- # extracted['brand'] = orgs[0]
- # quantities = [ent.text for ent in doc.ents if ent.label_ == 'QUANTITY']
- # if quantities:
- # extracted['size'] = quantities[0]
- # except Exception as e:
- # logger.warning(f"spaCy extraction failed: {e}")
-
- # # Pattern-based extraction for colors
- # color_patterns = [
- # 'black', 'white', 'red', 'blue', 'green', 'yellow', 'orange',
- # 'purple', 'pink', 'brown', 'gray', 'grey', 'silver', 'gold',
- # 'rose gold', 'space gray', 'navy', 'beige', 'tan'
- # ]
- # text_lower = text.lower()
- # for color in color_patterns:
- # if color in text_lower:
- # extracted['color'] = color.title()
- # break
-
- # # Use AI for enhanced extraction if available
- # if self.use_ai and self.ai_service and len(extracted) < 3:
- # try:
- # ai_extracted = self.ai_service.extract_attributes_with_ai(
- # text, '', category
- # )
- # for key, value in ai_extracted.items():
- # if key not in extracted and value:
- # extracted[key] = value
- # except Exception as e:
- # logger.warning(f"AI extraction failed: {e}")
-
- # return extracted
-
- # def get_score_interpretation(self, score: float) -> Dict[str, str]:
- # """Get human-readable interpretation of score"""
- # if score >= 90:
- # return {
- # 'grade': 'A',
- # 'status': 'Excellent',
- # 'color': 'green',
- # 'recommendation': 'Product listing is of high quality. Minor tweaks only.'
- # }
- # elif score >= 80:
- # return {
- # 'grade': 'B',
- # 'status': 'Good',
- # 'color': 'lightgreen',
- # 'recommendation': 'Good quality. Address minor issues to reach excellence.'
- # }
- # elif score >= 70:
- # return {
- # 'grade': 'C',
- # 'status': 'Fair',
- # 'color': 'yellow',
- # 'recommendation': 'Acceptable but needs improvement. Review suggestions.'
- # }
- # elif score >= 60:
- # return {
- # 'grade': 'D',
- # 'status': 'Poor',
- # 'color': 'orange',
- # 'recommendation': 'Significant issues found. Requires immediate attention.'
- # }
- # else:
- # return {
- # 'grade': 'F',
- # 'status': 'Critical',
- # 'color': 'red',
- # 'recommendation': 'Critical quality issues. Major revision needed.'
- # }
- # attribute_scorer_integrated.py
- """
- Enhanced AttributeQualityScorer with ProductContentRule integration
- Consistent breakdown and component status for unavailable scores
- """
- import re
- import time
- import logging
- from typing import Dict, List, Tuple
- from rapidfuzz import fuzz, process
- logger = logging.getLogger(__name__)
class AttributeQualityScorer:
    """
    Product quality scorer combining attribute checks with optional sub-scorers.

    Component weights (sum to 100%):
    - Mandatory Fields (16%)
    - Standardization (12%)
    - Missing Values (8%)
    - Consistency (4%)
    - SEO Discoverability (8%)
    - Content Rules Compliance (12%)
    - Title Quality (8%)
    - Description Quality (12%)
    - Image Quality (20%)

    A component that produces no numeric score (scorer unavailable, no input,
    or an error) is reported as ``None`` in the breakdown and left out of the
    final score; the remaining weights are rescaled proportionally.
    """

    # Attributes whose values are expected to be mentioned in title/description.
    _CONSISTENCY_ATTRS = (
        'brand', 'color', 'size', 'material', 'model', 'weight', 'dimensions',
    )

    # Values that count as "no real data"; compiled once instead of per call.
    _PLACEHOLDER_PATTERNS = tuple(
        re.compile(pattern, re.IGNORECASE)
        for pattern in (
            r'^n/?a$', r'^none$', r'^null$', r'^-+$', r'^\.+$',
            r'^tbd$', r'^to be determined$', r'^unknown$', r'^na$',
            r'^todo$', r'^pending$', r'^\?+$', r'^xxx+$', r'^placeholder$',
        )
    )

    def __init__(self, use_ai: bool = True, use_seo: bool = True):
        """
        Set up the scorer and every optional dependency it can find.

        Each optional component is imported lazily; any failure is logged as a
        warning and the component is left as ``None`` so scoring can proceed
        without it.

        Args:
            use_ai: Try to initialise the Gemini service; reset to False on
                the instance if initialisation fails.
            use_seo: Try to initialise the SEO scorer; reset to False on the
                instance if initialisation fails.
        """
        # Load spaCy model (optional, for advanced NER)
        self.nlp = None
        try:
            import spacy
            self.nlp = spacy.load("en_core_web_sm")
            logger.info("spaCy model loaded successfully")
        except Exception as e:
            logger.warning(f"spaCy model not loaded: {e}. Will use fallback methods.")

        # Initialize AI service
        self.use_ai = use_ai
        self.ai_service = None
        if use_ai:
            try:
                from .gemini_service import GeminiAttributeService
                self.ai_service = GeminiAttributeService()
                logger.info("Gemini AI service initialized")
            except Exception as e:
                logger.warning(f"Gemini service not available: {e}")
                self.use_ai = False

        # Initialize Image Scorer
        self.image_scorer = None
        try:
            from .image_scorer import ImageQualityScorer
            self.image_scorer = ImageQualityScorer()
            logger.info("Image scorer initialized")
        except Exception as e:
            logger.warning(f"Image scorer not available: {e}")

        # Initialize SEO scorer
        self.use_seo = use_seo
        self.seo_scorer = None
        if use_seo:
            try:
                from .seo_scorer import SEODiscoverabilityScorer
                self.seo_scorer = SEODiscoverabilityScorer()
                logger.info("SEO scorer initialized")
            except Exception as e:
                logger.warning(f"SEO scorer not available: {e}")
                self.use_seo = False

        # Initialize Title/Description scorer
        self.title_desc_scorer = None
        try:
            from .title_description_scorer import TitleDescriptionScorer
            self.title_desc_scorer = TitleDescriptionScorer(use_ai=use_ai)
            logger.info("Title/Description scorer initialized")
        except Exception as e:
            logger.warning(f"Title/Description scorer not available: {e}")

        # Initialize Content Rules scorer
        self.content_rules_scorer = None
        try:
            from .content_rules_scorer import ContentRulesScorer
            self.content_rules_scorer = ContentRulesScorer()
            logger.info("Content Rules scorer initialized")
        except Exception as e:
            logger.warning(f"Content Rules scorer not available: {e}")

        # Component weights (total = 100%)
        self.weights = {
            'mandatory_fields': 0.16,
            'standardization': 0.12,
            'missing_values': 0.08,
            'consistency': 0.04,
            'seo_discoverability': 0.08,
            'content_rules_compliance': 0.12,
            'title_quality': 0.08,
            'description_quality': 0.12,
            'image_quality': 0.20
        }

    def score_product(
        self,
        product: Dict,
        category_rules: List[Dict],
        content_rules: List[Dict] = None,
        generate_ai_suggestions: bool = True
    ) -> Dict:
        """
        Score a product across all components and return a consistent breakdown.

        Args:
            product: Product data; the keys read here are 'attributes',
                'title', 'description' and 'images'.
            category_rules: Attribute rules passed to the attribute checks and
                to the SEO and title/description scorers.
            content_rules: Rules for the content-rules scorer; when empty or
                None that component is not scored.
            generate_ai_suggestions: Ask the AI service for suggestions when
                one is available.

        Returns:
            Dict with 'final_score' (rounded to 2 decimals, or None when no
            component produced a number), 'max_score', 'breakdown',
            'component_status', 'issues', 'suggestions', 'weights' and
            'processing_time', plus 'ai_suggestions' when requested and
            available.
        """
        started = time.perf_counter()
        # A product may carry `attributes: None`; treat it like "no attributes"
        # rather than letting every attribute check fail.
        attributes = product.get('attributes') or {}

        scores = {k: None for k in self.weights}
        component_status = {k: "Not evaluated yet" for k in self.weights}
        issues: List[str] = []
        suggestions: List[str] = []

        # --- 1-4. Built-in attribute checks (same run/record/except shape) ---
        attribute_checks = (
            ('mandatory_fields', 'Mandatory Fields',
             lambda: self._check_mandatory_fields(attributes, category_rules)),
            ('standardization', 'Standardization',
             lambda: self._check_standardization(attributes, category_rules)),
            ('missing_values', 'Missing Values',
             lambda: self._check_missing_values(attributes, category_rules)),
            ('consistency', 'Consistency',
             lambda: self._check_consistency(
                 attributes, product.get('title', ''), product.get('description', ''))),
        )
        for key, label, check in attribute_checks:
            try:
                outcome = check()
                scores[key] = outcome[0]
                issues.extend(outcome[1])
                # The consistency check returns no suggestions element.
                if len(outcome) > 2:
                    suggestions.extend(outcome[2])
                component_status[key] = "Scored successfully"
            except Exception as e:
                logger.error(f"[{label}] {e}", exc_info=True)
                component_status[key] = f"Error: {e}"

        # --- 5. SEO Discoverability ---
        if self.use_seo and self.seo_scorer:
            try:
                seo_result = self.seo_scorer.score_seo(product, category_rules)
                scores['seo_discoverability'] = seo_result.get('seo_score', None)
                issues.extend(seo_result.get('issues', []))
                suggestions.extend(seo_result.get('suggestions', []))
                component_status['seo_discoverability'] = "Scored successfully"
            except Exception as e:
                logger.error(f"[SEO] {e}", exc_info=True)
                component_status['seo_discoverability'] = f"Error: {e}"
        else:
            component_status['seo_discoverability'] = "SEO scorer not available"

        # --- 6. Content Rules Compliance ---
        # Distinguish "caller gave no rules" from "rules given but scorer
        # missing" so the status does not blame the caller for a load failure.
        if not content_rules:
            component_status['content_rules_compliance'] = "No content rules provided"
        elif not self.content_rules_scorer:
            component_status['content_rules_compliance'] = "Content rules scorer not available"
        else:
            try:
                content_result = self.content_rules_scorer.score_content_fields(product, content_rules)
                scores['content_rules_compliance'] = content_result.get('overall_content_score', None)
                issues.extend(content_result.get('issues', []))
                suggestions.extend(content_result.get('suggestions', []))
                component_status['content_rules_compliance'] = "Scored successfully"
            except Exception as e:
                logger.error(f"[Content Rules] {e}", exc_info=True)
                component_status['content_rules_compliance'] = f"Error: {e}"

        # --- 7. Title & Description ---
        if self.title_desc_scorer:
            try:
                td_result = self.title_desc_scorer.score_title_and_description(product, category_rules)
                scores['title_quality'] = td_result.get('title_score', None)
                scores['description_quality'] = td_result.get('description_score', None)
                issues.extend(td_result.get('issues', []))
                suggestions.extend(td_result.get('suggestions', []))
                component_status['title_quality'] = "Scored successfully"
                component_status['description_quality'] = "Scored successfully"
            except Exception as e:
                logger.error(f"[Title/Description] {e}", exc_info=True)
                component_status['title_quality'] = f"Error: {e}"
                component_status['description_quality'] = f"Error: {e}"
        else:
            component_status['title_quality'] = "Title/Description scorer not available"
            component_status['description_quality'] = "Title/Description scorer not available"

        # --- 8. Image Quality ---
        if self.image_scorer:
            try:
                images = product.get('images', [])
                if images:
                    img_result = self.image_scorer.score_images(images)
                    scores['image_quality'] = img_result.get("overall_image_score", None)
                    component_status['image_quality'] = "Scored successfully"
                else:
                    scores['image_quality'] = None
                    component_status['image_quality'] = "No images provided"
            except Exception as e:
                logger.error(f"[Image] {e}", exc_info=True)
                scores['image_quality'] = None
                component_status['image_quality'] = f"Error: {e}"
        else:
            component_status['image_quality'] = "Image scorer not available"

        # --- Final score: weighted mean over components with a numeric score ---
        numeric_scores = {k: v for k, v in scores.items() if isinstance(v, (int, float))}
        total_weight = sum(self.weights[k] for k in numeric_scores)
        if numeric_scores and total_weight > 0:
            final_score = sum(
                value * (self.weights[k] / total_weight)
                for k, value in numeric_scores.items()
            )
        else:
            final_score = None

        result = {
            'final_score': round(final_score, 2) if final_score is not None else None,
            'max_score': 100.0,
            'breakdown': scores,
            'component_status': component_status,
            'issues': issues,
            'suggestions': suggestions,
            # Copy so callers mutating the result cannot alter the scorer.
            'weights': dict(self.weights),
            'processing_time': round(time.perf_counter() - started, 3)
        }

        # AI Suggestions
        if generate_ai_suggestions and self.use_ai and self.ai_service:
            try:
                result['ai_suggestions'] = self.ai_service.generate_comprehensive_suggestions(
                    product, issues, category_rules, scores
                )
            except Exception as e:
                logger.error(f"[AI Suggestions] {e}", exc_info=True)
                result['ai_suggestions'] = {'error': str(e)}
        return result

    # ================= ATTRIBUTE CHECK METHODS =================

    def _check_mandatory_fields(
        self,
        attributes: Dict,
        rules: List[Dict]
    ) -> Tuple[float, List[str], List[str]]:
        """
        Check that every mandatory attribute is present and valid.

        A rule is mandatory when its 'is_mandatory' is truthy. A value passes
        when it is truthy, non-blank after stripping, within the rule's
        optional 'min_length'/'max_length', and matches the optional
        'validation_regex' from the start of the string. An uncompilable regex
        is logged and the value is accepted.

        Returns:
            (score, issues, suggestions); score is the percentage of mandatory
            rules satisfied, or 100.0 when there are no mandatory rules.
        """
        mandatory_rules = [r for r in rules if r.get('is_mandatory', False)]
        if not mandatory_rules:
            return 100.0, [], []

        present_count = 0
        issues: List[str] = []
        suggestions: List[str] = []

        for rule in mandatory_rules:
            attr_name = rule['attribute_name']
            raw_value = attributes.get(attr_name)

            # Falsy values (None, '', 0, empty containers) count as missing.
            if not raw_value:
                issues.append(f"Missing mandatory field: {attr_name}")
                desc = rule.get('description', '')
                suggestions.append(
                    f"Add {attr_name}: {desc}" if desc else f"Add required attribute: {attr_name}"
                )
                continue

            value = str(raw_value).strip()
            if not value:
                issues.append(f"Mandatory field '{attr_name}' is empty")
                suggestions.append(f"Provide a non-empty value for {attr_name}")
                continue

            min_len = rule.get('min_length')
            max_len = rule.get('max_length')
            if min_len and len(value) < min_len:
                issues.append(f"'{attr_name}' too short (min: {min_len} chars)")
                suggestions.append(f"Expand {attr_name} to at least {min_len} characters")
                continue
            if max_len and len(value) > max_len:
                issues.append(f"'{attr_name}' too long (max: {max_len} chars)")
                suggestions.append(f"Shorten {attr_name} to {max_len} characters or less")
                continue

            regex = rule.get('validation_regex')
            if regex:
                try:
                    if not re.match(regex, value):
                        issues.append(f"'{attr_name}' format invalid")
                        suggestions.append(f"Ensure {attr_name} matches required format")
                        continue
                except re.error:
                    # A broken rule should not penalise the product.
                    logger.warning(f"Invalid regex pattern for {attr_name}: {regex}")

            present_count += 1

        score = (present_count / len(mandatory_rules)) * 100
        return score, issues, suggestions

    def _check_standardization(
        self,
        attributes: Dict,
        rules: List[Dict]
    ) -> Tuple[float, List[str], List[str]]:
        """
        Check attribute values against each rule's 'valid_values'.

        Matching is tried in order: exact, case-insensitive, then fuzzy
        (``fuzz.ratio`` >= 80). Case-insensitive and fuzzy matches count as
        correct but still raise an issue with the canonical value. Valid
        values are compared as strings, so non-string entries are supported.

        Returns:
            (score, issues, suggestions); score is the percentage of rules
            with valid values whose attribute matched. Rules whose attribute
            is absent or blank stay in the denominator. 100.0 when no rule
            defines valid values.
        """
        standardized_rules = [r for r in rules if r.get('valid_values')]
        if not standardized_rules:
            return 100.0, [], []

        correct_count = 0
        issues: List[str] = []
        suggestions: List[str] = []

        for rule in standardized_rules:
            attr_name = rule['attribute_name']
            raw_value = attributes.get(attr_name)
            if not raw_value:
                continue

            actual_value = str(raw_value).strip()
            if not actual_value:
                # Blank values are reported by the missing-values check.
                continue

            # Normalise to strings so numeric valid values compare and join.
            valid_values = [str(v) for v in rule['valid_values']]

            if actual_value in valid_values:
                correct_count += 1
                continue

            lower_valid = {v.lower(): v for v in valid_values}
            correct_value = lower_valid.get(actual_value.lower())
            if correct_value is not None:
                correct_count += 1
                if actual_value != correct_value:
                    issues.append(
                        f"{attr_name}: Case mismatch - '{actual_value}' should be '{correct_value}'"
                    )
                    suggestions.append(f"Correct capitalization of {attr_name} to: {correct_value}")
                continue

            best_match = process.extractOne(actual_value, valid_values, scorer=fuzz.ratio)
            if best_match and best_match[1] >= 80:
                correct_count += 1
                if best_match[1] < 100:
                    issues.append(
                        f"{attr_name}: '{actual_value}' likely means '{best_match[0]}' "
                        f"(confidence: {best_match[1]}%)"
                    )
                    suggestions.append(f"Standardize {attr_name} to: {best_match[0]}")
            else:
                sample = ', '.join(valid_values[:3])
                issues.append(f"{attr_name}: '{actual_value}' not recognized. Valid: {sample}")
                suggestions.append(f"Change {attr_name} to one of: {sample}")

        score = (correct_count / len(standardized_rules)) * 100
        return score, issues, suggestions

    def _check_missing_values(
        self,
        attributes: Dict,
        rules: List[Dict]
    ) -> Tuple[float, List[str], List[str]]:
        """
        Flag empty, placeholder, or suspiciously short attribute values.

        Every rule counts toward the denominator; an attribute absent from
        ``attributes`` adds no issue here but also earns no credit. Values are
        stringified before checking, so ``None`` becomes 'None' and is
        reported as a placeholder.

        Returns:
            (score, issues, suggestions); score is the percentage of rules
            whose attribute holds a usable value, or 100.0 for no rules.
        """
        total_attrs = len(rules)
        valid_count = 0
        issues: List[str] = []
        suggestions: List[str] = []

        for rule in rules:
            attr_name = rule['attribute_name']
            if attr_name not in attributes:
                continue

            value = str(attributes[attr_name]).strip()
            if not value:
                issues.append(f"'{attr_name}' is empty")
                suggestions.append(f"Provide a valid value for {attr_name}")
                continue

            if any(pattern.match(value) for pattern in self._PLACEHOLDER_PATTERNS):
                issues.append(f"'{attr_name}' contains placeholder: '{value}'")
                suggestions.append(f"Replace placeholder in {attr_name} with actual data")
                continue

            # Single-character values are only suspicious on mandatory fields.
            if rule.get('is_mandatory') and len(value) < 2:
                issues.append(f"'{attr_name}' suspiciously short: '{value}'")
                suggestions.append(f"Provide more detailed {attr_name}")
                continue

            valid_count += 1

        score = (valid_count / total_attrs) * 100 if total_attrs > 0 else 100.0
        return score, issues, suggestions

    def _check_consistency(
        self,
        attributes: Dict,
        title: str,
        description: str
    ) -> Tuple[float, List[str]]:
        """
        Check that key attribute values are mentioned in the title/description.

        Only the attributes in ``_CONSISTENCY_ATTRS`` that have a truthy value
        are checked. A value is consistent when it is shorter than two
        characters, appears as a substring of the lower-cased text, has all
        its words present in the text, or fuzzy-matches a single text word
        (``fuzz.ratio`` >= 80). A ``None`` title or description is treated as
        empty text.

        Returns:
            (score, issues); score is the percentage of checked attributes
            found consistent, or 100.0 when nothing was checked.
        """
        issues: List[str] = []
        consistency_count = 0
        total_checks = 0

        # `or ''` keeps a None title/description from becoming the text "none".
        combined_text = f"{title or ''} {description or ''}".lower()
        # The text is the same for every attribute, so tokenise it once.
        text_words = combined_text.split()
        words_in_text = set(text_words)

        for attr in self._CONSISTENCY_ATTRS:
            if not attributes.get(attr):
                continue

            total_checks += 1
            attr_value = str(attributes[attr]).lower().strip()

            if len(attr_value) < 2 or attr_value in combined_text:
                consistency_count += 1
                continue

            if set(attr_value.split()).issubset(words_in_text):
                consistency_count += 1
                continue

            if text_words:
                match = process.extractOne(attr_value, text_words, scorer=fuzz.ratio)
                if match and match[1] >= 80:
                    consistency_count += 1
                    continue

            issues.append(f"'{attr.title()}': '{attributes[attr]}' not mentioned in title/description")

        score = (consistency_count / total_checks) * 100 if total_checks > 0 else 100.0
        return score, issues
|