12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828 |
- # # attribute_scorer.py (Enhanced)
- # import re
- # import time
- # from typing import Dict, List, Tuple
- # from rapidfuzz import fuzz, process
- # import spacy
- # from collections import defaultdict
- # import logging
- # logger = logging.getLogger(__name__)
- # class AttributeQualityScorer:
- # """
- # Enhanced scorer with AI-powered suggestions and robust error handling
- # """
-
- # def __init__(self, use_ai: bool = True):
- # # Load spaCy model
- # try:
- # self.nlp = spacy.load("en_core_web_sm")
- # except Exception as e:
- # logger.warning(f"spaCy model not loaded: {e}")
- # self.nlp = None
-
- # # Initialize Gemini service
- # self.use_ai = use_ai
- # if use_ai:
- # try:
- # from .gemini_service import GeminiAttributeService
- # self.ai_service = GeminiAttributeService()
- # except Exception as e:
- # logger.warning(f"Gemini service not available: {e}")
- # self.use_ai = False
- # self.ai_service = None
-
- # # Enhanced weights
- # self.weights = {
- # 'mandatory_fields': 0.40,
- # 'standardization': 0.30,
- # 'missing_values': 0.20,
- # 'consistency': 0.10
- # }
-
- # def score_product(self, product: Dict, category_rules: List[Dict], generate_ai_suggestions: bool = True) -> Dict:
- # """
- # Enhanced scoring with AI suggestions and guaranteed AI call
- # """
- # start_time = time.time()
- # attributes = product.get('attributes', {})
- # category = product.get('category', '')
- # # Initialize scores
- # scores = {
- # 'mandatory_fields': 0,
- # 'standardization': 0,
- # 'missing_values': 0,
- # 'consistency': 0
- # }
- # issues = []
- # suggestions = []
- # # --- Mandatory Fields ---
- # try:
- # mandatory_score, mandatory_issues, mandatory_suggestions = self._check_mandatory_fields(attributes, category_rules)
- # scores['mandatory_fields'] = mandatory_score
- # issues.extend(mandatory_issues)
- # suggestions.extend(mandatory_suggestions)
- # except Exception as e:
- # logger.error(f"[Mandatory Fields] Error: {e}")
- # scores['mandatory_fields'] = 0
- # # --- Standardization ---
- # try:
- # std_score, std_issues, std_suggestions = self._check_standardization(attributes, category_rules)
- # scores['standardization'] = std_score
- # issues.extend(std_issues)
- # suggestions.extend(std_suggestions)
- # except Exception as e:
- # logger.error(f"[Standardization] Error: {e}")
- # scores['standardization'] = 0
- # # --- Missing Values ---
- # try:
- # missing_score, missing_issues, missing_suggestions = self._check_missing_values(attributes, category_rules)
- # scores['missing_values'] = missing_score
- # issues.extend(missing_issues)
- # suggestions.extend(missing_suggestions)
- # except Exception as e:
- # logger.error(f"[Missing Values] Error: {e}")
- # scores['missing_values'] = 0
- # # --- Consistency ---
- # try:
- # consistency_score, consistency_issues = self._check_consistency(attributes, product.get('title', ''), product.get('description', ''))
- # scores['consistency'] = consistency_score
- # issues.extend(consistency_issues)
- # except Exception as e:
- # logger.error(f"[Consistency] Error: {e}")
- # scores['consistency'] = 0
- # # --- Final Score ---
- # final_score = sum(scores[key] * self.weights[key] for key in scores)
- # result = {
- # 'final_score': round(final_score, 2),
- # 'max_score': 100.0,
- # 'breakdown': scores,
- # 'issues': issues,
- # 'suggestions': suggestions,
- # 'weights': self.weights,
- # 'processing_time': round(time.time() - start_time, 3)
- # }
- # # --- AI Suggestions (Guaranteed Attempt) ---
- # if generate_ai_suggestions and self.use_ai:
- # try:
- # logger.info(f"Generating AI suggestions for SKU: {product.get('sku')}, issues count: {len(issues)}")
- # ai_suggestions = self.ai_service.generate_attribute_suggestions(
- # product,
- # issues,
- # category_rules
- # )
- # if not ai_suggestions:
- # logger.warning(f"AI service returned empty suggestions for SKU: {product.get('sku')}")
- # ai_suggestions = {"note": "No AI suggestions generated"}
- # result['ai_suggestions'] = ai_suggestions
- # except Exception as e:
- # logger.error(f"[AI Suggestions] Error: {e}")
- # result['ai_suggestions'] = {'error': str(e)}
- # return result
-
- # def _check_mandatory_fields(self, attributes: Dict, rules: List[Dict]) -> Tuple[float, List, List]:
- # """Enhanced mandatory field validation with detailed feedback"""
- # mandatory_rules = [r for r in rules if r.get('is_mandatory', False)]
-
- # if not mandatory_rules:
- # return 100.0, [], []
-
- # present_count = 0
- # issues = []
- # suggestions = []
-
- # for rule in mandatory_rules:
- # attr_name = rule['attribute_name']
-
- # # Check presence and validity
- # if attr_name in attributes and attributes[attr_name]:
- # value = str(attributes[attr_name]).strip()
-
- # if not value:
- # issues.append(f"Mandatory field '{attr_name}' is empty")
- # suggestions.append(f"Provide a non-empty value for {attr_name}")
- # continue
-
- # # Check length constraints
- # min_len = rule.get('min_length')
- # max_len = rule.get('max_length')
-
- # if min_len and len(value) < min_len:
- # issues.append(f"'{attr_name}' too short (min: {min_len} chars)")
- # suggestions.append(f"Expand {attr_name} to at least {min_len} characters")
- # continue
-
- # if max_len and len(value) > max_len:
- # issues.append(f"'{attr_name}' too long (max: {max_len} chars)")
- # suggestions.append(f"Shorten {attr_name} to {max_len} characters or less")
- # continue
-
- # # Check regex pattern
- # regex = rule.get('validation_regex')
- # if regex and not re.match(regex, value):
- # issues.append(f"'{attr_name}' format invalid")
- # suggestions.append(f"Ensure {attr_name} matches required format")
- # continue
-
- # present_count += 1
- # else:
- # issues.append(f"Missing mandatory field: {attr_name}")
- # desc = rule.get('description', '')
- # if desc:
- # suggestions.append(f"Add {attr_name}: {desc}")
- # else:
- # suggestions.append(f"Add required attribute: {attr_name}")
-
- # score = (present_count / len(mandatory_rules)) * 100 if mandatory_rules else 100.0
- # return score, issues, suggestions
-
- # def _check_standardization(self, attributes: Dict, rules: List[Dict]) -> Tuple[float, List, List]:
- # """Enhanced standardization with better fuzzy matching"""
- # standardized_rules = [r for r in rules if r.get('valid_values')]
-
- # if not standardized_rules:
- # return 100.0, [], []
-
- # correct_count = 0
- # issues = []
- # suggestions = []
-
- # for rule in standardized_rules:
- # attr_name = rule['attribute_name']
- # valid_values = rule['valid_values']
-
- # if attr_name not in attributes or not attributes[attr_name]:
- # continue
-
- # actual_value = str(attributes[attr_name]).strip()
-
- # if not actual_value:
- # continue
-
- # # Exact match (case-insensitive)
- # if actual_value in valid_values:
- # correct_count += 1
- # continue
-
- # # Case-insensitive match
- # lower_valid = {v.lower(): v for v in valid_values}
- # if actual_value.lower() in lower_valid:
- # correct_count += 1
- # if actual_value != lower_valid[actual_value.lower()]:
- # issues.append(f"{attr_name}: Case mismatch - '{actual_value}' should be '{lower_valid[actual_value.lower()]}'")
- # suggestions.append(f"Correct capitalization of {attr_name} to: {lower_valid[actual_value.lower()]}")
- # continue
-
- # # Fuzzy matching with multiple scorers
- # best_match = None
- # best_score = 0
-
- # for scorer in [fuzz.ratio, fuzz.partial_ratio, fuzz.token_sort_ratio]:
- # match = process.extractOne(actual_value, valid_values, scorer=scorer)
- # if match and match[1] > best_score:
- # best_match = match
- # best_score = match[1]
-
- # if best_match and best_score >= 80:
- # correct_count += 1
- # if best_score < 100:
- # issues.append(f"{attr_name}: '{actual_value}' likely means '{best_match[0]}' (confidence: {best_score}%)")
- # suggestions.append(f"Standardize {attr_name} to: {best_match[0]}")
- # else:
- # issues.append(f"{attr_name}: '{actual_value}' not recognized. Valid: {', '.join(valid_values[:5])}")
- # suggestions.append(f"Change {attr_name} to one of: {', '.join(valid_values[:3])}")
-
- # score = (correct_count / len(standardized_rules)) * 100 if standardized_rules else 100.0
- # return score, issues, suggestions
-
- # def _check_missing_values(self, attributes: Dict, rules: List[Dict]) -> Tuple[float, List, List]:
- # """Enhanced placeholder detection"""
- # placeholder_patterns = [
- # r'^n/?a$', r'^none$', r'^null$', r'^-+$', r'^\.+$',
- # r'^tbd$', r'^to be determined$', r'^unknown$', r'^na$',
- # r'^todo$', r'^pending$', r'^\?+$', r'^xxx+$', r'^placeholder$'
- # ]
-
- # total_attrs = len(rules)
- # valid_count = 0
- # issues = []
- # suggestions = []
-
- # for rule in rules:
- # attr_name = rule['attribute_name']
-
- # if attr_name not in attributes:
- # continue
-
- # value = str(attributes[attr_name]).strip()
-
- # # Check if empty
- # if not value:
- # issues.append(f"'{attr_name}' is empty")
- # suggestions.append(f"Provide a valid value for {attr_name}")
- # continue
-
- # # Check if placeholder
- # value_lower = value.lower()
- # is_placeholder = any(re.match(pattern, value_lower, re.IGNORECASE) for pattern in placeholder_patterns)
-
- # if is_placeholder:
- # issues.append(f"'{attr_name}' contains placeholder: '{value}'")
- # suggestions.append(f"Replace placeholder in {attr_name} with actual data")
- # continue
-
- # # Check for suspicious patterns
- # if len(value) < 2 and rule.get('is_mandatory'):
- # issues.append(f"'{attr_name}' suspiciously short: '{value}'")
- # suggestions.append(f"Provide more detailed {attr_name}")
- # continue
-
- # valid_count += 1
-
- # score = (valid_count / total_attrs) * 100 if total_attrs > 0 else 100.0
- # return score, issues, suggestions
-
- # def _check_consistency(self, attributes: Dict, title: str, description: str) -> Tuple[float, List]:
- # """Enhanced consistency checking with context awareness"""
- # issues = []
- # consistency_count = 0
- # total_checks = 0
-
- # check_attrs = ['brand', 'color', 'size', 'material', 'model', 'weight', 'dimensions']
- # combined_text = f"{title} {description}".lower()
-
- # for attr in check_attrs:
- # if attr not in attributes or not attributes[attr]:
- # continue
-
- # total_checks += 1
- # attr_value = str(attributes[attr]).lower().strip()
-
- # # Skip very short values
- # if len(attr_value) < 2:
- # consistency_count += 1
- # continue
-
- # # Direct substring match
- # if attr_value in combined_text:
- # consistency_count += 1
- # continue
-
- # # Word boundary match
- # words_in_text = set(combined_text.split())
- # words_in_attr = set(attr_value.split())
-
- # if words_in_attr.issubset(words_in_text):
- # consistency_count += 1
- # continue
-
- # # Fuzzy word matching
- # text_words = combined_text.split()
- # matches = 0
- # for attr_word in words_in_attr:
- # match = process.extractOne(attr_word, text_words, scorer=fuzz.ratio)
- # if match and match[1] >= 80:
- # matches += 1
-
- # if matches / len(words_in_attr) >= 0.7:
- # consistency_count += 1
- # continue
-
- # issues.append(f"'{attr.title()}': '{attributes[attr]}' not clearly mentioned in title/description")
-
- # score = (consistency_count / total_checks) * 100 if total_checks > 0 else 100.0
- # return score, issues
-
- # def extract_attributes_from_text(self, text: str, category: str = '') -> Dict:
- # """Enhanced attribute extraction with AI fallback"""
- # extracted = {}
-
- # # Try spaCy first
- # if self.nlp:
- # extracted = self._extract_with_spacy(text)
-
- # # Use AI if available and spaCy found little
- # if self.use_ai and len(extracted) < 3:
- # try:
- # ai_extracted = self.ai_service.extract_attributes_with_ai(text, '', category)
- # extracted.update({k: v for k, v in ai_extracted.items() if v})
- # except Exception as e:
- # logger.error(f"AI extraction failed: {e}")
-
- # return extracted
-
- # def _extract_with_spacy(self, text: str) -> Dict:
- # """Extract using spaCy NER"""
- # doc = self.nlp(text)
- # extracted = defaultdict(list)
-
- # for ent in doc.ents:
- # if ent.label_ == 'ORG':
- # extracted['brand'].append(ent.text)
- # elif ent.label_ == 'QUANTITY':
- # extracted['size'].append(ent.text)
- # elif ent.label_ == 'PRODUCT':
- # extracted['product_type'].append(ent.text)
-
- # # Color detection
- # colors = ['red', 'blue', 'green', 'black', 'white', 'yellow', 'orange',
- # 'purple', 'pink', 'brown', 'gray', 'grey', 'silver', 'gold']
- # text_lower = text.lower()
- # for color in colors:
- # if color in text_lower:
- # extracted['color'].append(color.title())
-
- # # Return most common value
- # result = {}
- # for key, values in extracted.items():
- # if values:
- # result[key] = max(set(values), key=values.count)
-
- # return result
- # attribute_scorer.py (Enhanced with SEO)
- import re
- import time
- from typing import Dict, List, Tuple
- from rapidfuzz import fuzz, process
- import spacy
- from collections import defaultdict
- import logging
- logger = logging.getLogger(__name__)
class AttributeQualityScorer:
    """
    Enhanced scorer with AI-powered suggestions, robust error handling, and SEO scoring.

    Runs four rule-based checks (mandatory fields, standardization,
    missing/placeholder values, title/description consistency) plus an
    optional SEO check, and combines their 0-100 scores into one weighted
    ``final_score``. Each check is isolated: a failure in one is logged and
    scored as 0 without aborting the others.
    """

    # Values that carry no information. The pattern is applied with
    # ``fullmatch`` so that real values which merely *start* like a
    # placeholder ("Navy", "Natural", "-5C") are not flagged.
    _PLACEHOLDER_RE = re.compile(
        r'(?:n/?a|none|null|-+|\.+|tbd|to be determined|unknown|'
        r'todo|pending|\?+|xxx+|placeholder)',
        re.IGNORECASE,
    )

    # Colour names recognised by the heuristic extractor. Matched on word
    # boundaries so "powered" or "featured" do not count as "red".
    _COLOR_RE = re.compile(
        r'\b(?:red|blue|green|black|white|yellow|orange|purple|pink|brown|'
        r'gray|grey|silver|gold)\b'
    )

    # Key of the optional SEO component in ``weights`` and score breakdowns.
    _SEO_KEY = 'seo_discoverability'

    def __init__(self, use_ai: bool = True, use_seo: bool = True):
        """
        Load the optional collaborators and set the component weights.

        Args:
            use_ai: Try to create the Gemini attribute service. If that
                fails, a warning is logged and ``self.use_ai`` becomes False.
            use_seo: Try to create the SEO scorer. If that fails, a warning
                is logged and ``self.use_seo`` becomes False.
        """
        # Always define the optional collaborators so that reading them is
        # safe even when the corresponding feature is switched off.
        self.ai_service = None
        self.seo_scorer = None

        # Load spaCy model
        try:
            self.nlp = spacy.load("en_core_web_sm")
        except Exception as e:
            logger.warning(f"spaCy model not loaded: {e}")
            self.nlp = None

        # Initialize Gemini service
        self.use_ai = use_ai
        if use_ai:
            try:
                from .gemini_service import GeminiAttributeService
                self.ai_service = GeminiAttributeService()
            except Exception as e:
                logger.warning(f"Gemini service not available: {e}")
                self.use_ai = False
                self.ai_service = None

        # Initialize SEO scorer
        self.use_seo = use_seo
        if use_seo:
            try:
                from .seo_scorer import SEODiscoverabilityScorer
                self.seo_scorer = SEODiscoverabilityScorer()
            except Exception as e:
                logger.warning(f"SEO scorer not available: {e}")
                self.use_seo = False
                self.seo_scorer = None

        # Component weights including SEO (total = 100%)
        self.weights = {
            'mandatory_fields': 0.34,
            'standardization': 0.26,
            'missing_values': 0.17,
            'consistency': 0.08,
            'seo_discoverability': 0.15,
        }

    def _effective_weights(self, seo_active: bool) -> Dict[str, float]:
        """
        Return a copy of the weights actually used for the final score.

        When SEO scoring is not active its weight is set to 0 and the other
        weights are rescaled to sum to 1, so a product that passes every
        remaining check can still reach the reported maximum of 100.
        """
        if seo_active:
            return dict(self.weights)

        remaining = {k: w for k, w in self.weights.items() if k != self._SEO_KEY}
        total = sum(remaining.values())
        if total <= 0:
            # Nothing sensible to rescale; fall back to the configured values.
            return dict(self.weights)

        effective = {k: w / total for k, w in remaining.items()}
        effective[self._SEO_KEY] = 0.0
        return effective

    def score_product(self, product: Dict, category_rules: List[Dict], generate_ai_suggestions: bool = True) -> Dict:
        """
        Score one product against its category rules.

        Args:
            product: Product dict. Reads ``attributes``, ``title``,
                ``description`` and ``sku``.
            category_rules: Rule dicts for the product's category.
            generate_ai_suggestions: When True and the AI service is
                available, attach its suggestions under ``ai_suggestions``.

        Returns:
            Dict with ``final_score`` (rounded to 2 decimals), ``max_score``,
            ``breakdown`` (per-component scores), ``issues``, ``suggestions``,
            ``weights`` (the weights actually applied) and
            ``processing_time``; plus ``seo_details`` when SEO scoring
            succeeded and ``ai_suggestions`` when AI suggestions were
            requested and the service is enabled.
        """
        start_time = time.time()
        # Tolerate an explicit ``"attributes": None`` in the payload.
        attributes = product.get('attributes') or {}

        scores = {
            'mandatory_fields': 0,
            'standardization': 0,
            'missing_values': 0,
            'consistency': 0,
            'seo_discoverability': 0
        }
        issues = []
        suggestions = []

        # (score key, log label, callable returning (score, issues, suggestions)).
        # The consistency check produces no suggestions, so it is padded.
        checks = (
            ('mandatory_fields', 'Mandatory Fields',
             lambda: self._check_mandatory_fields(attributes, category_rules)),
            ('standardization', 'Standardization',
             lambda: self._check_standardization(attributes, category_rules)),
            ('missing_values', 'Missing Values',
             lambda: self._check_missing_values(attributes, category_rules)),
            ('consistency', 'Consistency',
             lambda: self._check_consistency(
                 attributes, product.get('title', ''), product.get('description', '')
             ) + ([],)),
        )
        for key, label, run_check in checks:
            try:
                check_score, check_issues, check_suggestions = run_check()
                scores[key] = check_score
                issues.extend(check_issues)
                suggestions.extend(check_suggestions)
            except Exception as e:
                # A failing check scores 0 but must not abort the others.
                logger.error(f"[{label}] Error: {e}")
                scores[key] = 0

        # --- SEO & Discoverability ---
        seo_result = None
        seo_active = bool(self.use_seo and self.seo_scorer)
        if seo_active:
            try:
                seo_result = self.seo_scorer.score_seo(product, category_rules)
                scores['seo_discoverability'] = seo_result['seo_score']
                issues.extend(seo_result['issues'])
                suggestions.extend(seo_result['suggestions'])
            except Exception as e:
                logger.error(f"[SEO Scoring] Error: {e}")
                scores['seo_discoverability'] = 0

        # --- Final Score ---
        weights = self._effective_weights(seo_active)
        final_score = sum(scores[key] * weights[key] for key in scores)
        result = {
            'final_score': round(final_score, 2),
            'max_score': 100.0,
            'breakdown': scores,
            'issues': issues,
            'suggestions': suggestions,
            'weights': weights,
            'processing_time': round(time.time() - start_time, 3)
        }

        # Add SEO-specific details
        if seo_result:
            result['seo_details'] = {
                'breakdown': seo_result['breakdown'],
                'extracted_keywords': seo_result.get('extracted_keywords', []),
                'missing_high_value_terms': seo_result.get('missing_high_value_terms', [])
            }

        # --- AI Suggestions (Guaranteed Attempt) ---
        if generate_ai_suggestions and self.use_ai:
            try:
                logger.info(f"Generating AI suggestions for SKU: {product.get('sku')}, issues count: {len(issues)}")
                ai_suggestions = self.ai_service.generate_attribute_suggestions(
                    product,
                    issues,
                    category_rules
                )
                if not ai_suggestions:
                    logger.warning(f"AI service returned empty suggestions for SKU: {product.get('sku')}")
                    ai_suggestions = {"note": "No AI suggestions generated"}
                result['ai_suggestions'] = ai_suggestions
            except Exception as e:
                logger.error(f"[AI Suggestions] Error: {e}")
                result['ai_suggestions'] = {'error': str(e)}
        return result

    def _check_mandatory_fields(self, attributes: Dict, rules: List[Dict]) -> Tuple[float, List, List]:
        """
        Validate every rule flagged ``is_mandatory``.

        A field passes when it is present with a truthy value, is non-blank
        after stripping, respects ``min_length``/``max_length`` and matches
        ``validation_regex`` (anchored at the start via ``re.match``). Note
        that falsy values such as ``0`` or ``False`` count as missing.

        Returns:
            ``(score, issues, suggestions)`` where ``score`` is the
            percentage of mandatory rules that passed (100.0 if there are
            no mandatory rules).
        """
        mandatory_rules = [r for r in rules if r.get('is_mandatory', False)]
        if not mandatory_rules:
            return 100.0, [], []

        present_count = 0
        issues = []
        suggestions = []

        for rule in mandatory_rules:
            attr_name = rule['attribute_name']

            if not attributes.get(attr_name):
                issues.append(f"Missing mandatory field: {attr_name}")
                desc = rule.get('description', '')
                if desc:
                    suggestions.append(f"Add {attr_name}: {desc}")
                else:
                    suggestions.append(f"Add required attribute: {attr_name}")
                continue

            value = str(attributes[attr_name]).strip()

            # Reached for whitespace-only strings, which are truthy.
            if not value:
                issues.append(f"Mandatory field '{attr_name}' is empty")
                suggestions.append(f"Provide a non-empty value for {attr_name}")
                continue

            # Check length constraints
            min_len = rule.get('min_length')
            max_len = rule.get('max_length')

            if min_len and len(value) < min_len:
                issues.append(f"'{attr_name}' too short (min: {min_len} chars)")
                suggestions.append(f"Expand {attr_name} to at least {min_len} characters")
                continue

            if max_len and len(value) > max_len:
                issues.append(f"'{attr_name}' too long (max: {max_len} chars)")
                suggestions.append(f"Shorten {attr_name} to {max_len} characters or less")
                continue

            # Check regex pattern
            regex = rule.get('validation_regex')
            if regex and not re.match(regex, value):
                issues.append(f"'{attr_name}' format invalid")
                suggestions.append(f"Ensure {attr_name} matches required format")
                continue

            present_count += 1

        score = (present_count / len(mandatory_rules)) * 100
        return score, issues, suggestions

    def _check_standardization(self, attributes: Dict, rules: List[Dict]) -> Tuple[float, List, List]:
        """
        Check attribute values against each rule's ``valid_values`` list.

        A value counts as correct on an exact match, a case-insensitive
        match, or a fuzzy match scoring at least 80 with any of the three
        rapidfuzz scorers tried. Non-exact matches still add an issue and a
        suggestion with the canonical spelling. Rules whose attribute is
        absent or empty are skipped but remain in the denominator.

        Returns:
            ``(score, issues, suggestions)`` where ``score`` is the
            percentage of rules with ``valid_values`` that were satisfied
            (100.0 if there are none).
        """
        standardized_rules = [r for r in rules if r.get('valid_values')]
        if not standardized_rules:
            return 100.0, [], []

        correct_count = 0
        issues = []
        suggestions = []

        for rule in standardized_rules:
            attr_name = rule['attribute_name']
            # Coerce to str so vocabularies containing numbers can be
            # compared with the stringified attribute and joined in messages.
            valid_values = [str(v) for v in rule['valid_values']]

            if not attributes.get(attr_name):
                continue

            actual_value = str(attributes[attr_name]).strip()
            if not actual_value:
                continue

            # Exact (case-sensitive) match
            if actual_value in valid_values:
                correct_count += 1
                continue

            # Case-insensitive match
            lower_valid = {v.lower(): v for v in valid_values}
            if actual_value.lower() in lower_valid:
                canonical = lower_valid[actual_value.lower()]
                correct_count += 1
                if actual_value != canonical:
                    issues.append(f"{attr_name}: Case mismatch - '{actual_value}' should be '{canonical}'")
                    suggestions.append(f"Correct capitalization of {attr_name} to: {canonical}")
                continue

            # Fuzzy matching: keep the best candidate across several scorers
            best_match = None
            best_score = 0
            for scorer in [fuzz.ratio, fuzz.partial_ratio, fuzz.token_sort_ratio]:
                match = process.extractOne(actual_value, valid_values, scorer=scorer)
                if match and match[1] > best_score:
                    best_match = match
                    best_score = match[1]

            if best_match and best_score >= 80:
                correct_count += 1
                if best_score < 100:
                    issues.append(f"{attr_name}: '{actual_value}' likely means '{best_match[0]}' (confidence: {best_score}%)")
                    suggestions.append(f"Standardize {attr_name} to: {best_match[0]}")
            else:
                issues.append(f"{attr_name}: '{actual_value}' not recognized. Valid: {', '.join(valid_values[:5])}")
                suggestions.append(f"Change {attr_name} to one of: {', '.join(valid_values[:3])}")

        score = (correct_count / len(standardized_rules)) * 100
        return score, issues, suggestions

    def _check_missing_values(self, attributes: Dict, rules: List[Dict]) -> Tuple[float, List, List]:
        """
        Detect empty, placeholder and suspiciously short values.

        For every rule whose attribute is present, the stringified and
        stripped value is rejected when it is empty, when the whole value is
        a known placeholder (e.g. "N/A", "tbd", "---", "???"), or when it is
        a single character on a mandatory rule. Attributes absent from
        ``attributes`` are skipped but remain in the denominator.

        Returns:
            ``(score, issues, suggestions)`` where ``score`` is the
            percentage of all rules with a valid value (100.0 if ``rules``
            is empty).
        """
        total_attrs = len(rules)
        valid_count = 0
        issues = []
        suggestions = []

        for rule in rules:
            attr_name = rule['attribute_name']

            if attr_name not in attributes:
                continue

            value = str(attributes[attr_name]).strip()

            # Check if empty
            if not value:
                issues.append(f"'{attr_name}' is empty")
                suggestions.append(f"Provide a valid value for {attr_name}")
                continue

            # Check if placeholder: the entire value must be a placeholder,
            # so "Navy" is not mistaken for "na".
            if self._PLACEHOLDER_RE.fullmatch(value):
                issues.append(f"'{attr_name}' contains placeholder: '{value}'")
                suggestions.append(f"Replace placeholder in {attr_name} with actual data")
                continue

            # Check for suspicious patterns
            if len(value) < 2 and rule.get('is_mandatory'):
                issues.append(f"'{attr_name}' suspiciously short: '{value}'")
                suggestions.append(f"Provide more detailed {attr_name}")
                continue

            valid_count += 1

        score = (valid_count / total_attrs) * 100 if total_attrs > 0 else 100.0
        return score, issues, suggestions

    def _check_consistency(self, attributes: Dict, title: str, description: str) -> Tuple[float, List]:
        """
        Check that key attribute values are reflected in title/description.

        Only a fixed set of attributes is examined, and only when present
        with a truthy value. A value is consistent when it is shorter than
        two characters, appears as a substring of the combined text, has all
        its words in the text, or at least 70% of its words fuzzily match a
        word of the text (ratio >= 80).

        Returns:
            ``(score, issues)`` where ``score`` is the percentage of
            examined attributes found consistent (100.0 if none were
            examined).
        """
        issues = []
        consistency_count = 0
        total_checks = 0

        check_attrs = ['brand', 'color', 'size', 'material', 'model', 'weight', 'dimensions']
        # ``or ''`` keeps a None title/description from becoming the word "none".
        combined_text = f"{title or ''} {description or ''}".lower()
        # Tokenise once; the text does not change between attributes.
        text_words = combined_text.split()
        words_in_text = set(text_words)

        for attr in check_attrs:
            if not attributes.get(attr):
                continue

            total_checks += 1
            attr_value = str(attributes[attr]).lower().strip()

            # Very short values are too ambiguous to verify; accept them.
            if len(attr_value) < 2:
                consistency_count += 1
                continue

            # Direct substring match
            if attr_value in combined_text:
                consistency_count += 1
                continue

            # Whole-word match
            words_in_attr = set(attr_value.split())
            if words_in_attr.issubset(words_in_text):
                consistency_count += 1
                continue

            # Fuzzy word matching
            matches = 0
            for attr_word in words_in_attr:
                match = process.extractOne(attr_word, text_words, scorer=fuzz.ratio)
                if match and match[1] >= 80:
                    matches += 1

            if matches / len(words_in_attr) >= 0.7:
                consistency_count += 1
                continue

            issues.append(f"'{attr.title()}': '{attributes[attr]}' not clearly mentioned in title/description")

        score = (consistency_count / total_checks) * 100 if total_checks > 0 else 100.0
        return score, issues

    def extract_attributes_from_text(self, text: str, category: str = '') -> Dict:
        """
        Extract attributes from free text, using AI when spaCy finds little.

        spaCy extraction runs first when a model is loaded. If the AI
        service is enabled and fewer than three attributes were found, the
        AI result is merged in; its non-empty values override spaCy's.
        AI failures are logged and the spaCy result is returned as is.
        """
        extracted = {}

        # Try spaCy first
        if self.nlp:
            extracted = self._extract_with_spacy(text)

        # Use AI if available and spaCy found little
        if self.use_ai and len(extracted) < 3:
            try:
                ai_extracted = self.ai_service.extract_attributes_with_ai(text, '', category)
                extracted.update({k: v for k, v in ai_extracted.items() if v})
            except Exception as e:
                logger.error(f"AI extraction failed: {e}")

        return extracted

    def _extract_with_spacy(self, text: str) -> Dict:
        """
        Extract brand, size, product type and colour candidates from text.

        ORG, QUANTITY and PRODUCT entities from ``self.nlp`` become
        ``brand``, ``size`` and ``product_type`` candidates; colours are
        found by whole-word matching against a fixed colour list.

        Returns:
            Dict mapping each attribute to its most frequent candidate.
            Ties go to the candidate detected first, so the result is
            deterministic.
        """
        doc = self.nlp(text)
        extracted = defaultdict(list)

        label_to_attr = {'ORG': 'brand', 'QUANTITY': 'size', 'PRODUCT': 'product_type'}
        for ent in doc.ents:
            attr = label_to_attr.get(ent.label_)
            if attr is not None:
                extracted[attr].append(ent.text)

        # Colour detection on word boundaries, in order of appearance.
        for color in self._COLOR_RE.findall(text.lower()):
            extracted['color'].append(color.title())

        # Most common value; iterating the list (not a set) makes ties
        # resolve to the first detected candidate.
        result = {}
        for key, values in extracted.items():
            if values:
                result[key] = max(values, key=values.count)

        return result
|