|
@@ -1,8 +1,8 @@
|
|
|
# # attribute_scorer_integrated.py
|
|
|
# """
|
|
|
-# Enhanced AttributeQualityScorer with Title/Description scoring integrated
|
|
|
+# Enhanced AttributeQualityScorer with ProductContentRule integration
|
|
|
# """
|
|
|
-# import re # <-- ADDED: Required for regex patterns
|
|
|
+# import re
|
|
|
# import time
|
|
|
# import logging
|
|
|
# from typing import Dict, List, Tuple
|
|
@@ -11,157 +11,141 @@
|
|
|
|
|
|
# logger = logging.getLogger(__name__)
|
|
|
|
|
|
-# class AttributeQualityScorer:
|
|
|
+# class AttributeQualityScorer:
|
|
|
# """
|
|
|
-# Complete Product Quality Scorer with ALL components:
|
|
|
-# - Mandatory Fields (25%)
|
|
|
-# - Standardization (20%)
|
|
|
-# - Missing Values (13%)
|
|
|
-# - Consistency (7%)
|
|
|
-# - SEO Discoverability (10%)
|
|
|
-# - Title Quality (10%)
|
|
|
-# - Description Quality (15%)
|
|
|
+# Optimized Product Quality Scorer with safe handling of optional components:
|
|
|
+# - Mandatory Fields (16%)
|
|
|
+# - Standardization (12%)
|
|
|
+# - Missing Values (8%)
|
|
|
+# - Consistency (4%)
|
|
|
+# - SEO Discoverability (8%)
|
|
|
+# - Content Rules Compliance (12%)
|
|
|
+# - Title Quality (8%)
|
|
|
+# - Description Quality (12%)
|
|
|
+# - Image Quality (20%)
|
|
|
+# Optional components do not inflate final score; weights are rescaled proportionally.
|
|
|
# """
|
|
|
-
|
|
|
+
|
|
|
# def __init__(self, use_ai: bool = True, use_seo: bool = True):
|
|
|
-# # Load spaCy model (optional, for advanced NER)
|
|
|
+# # spaCy for NER
|
|
|
# self.nlp = None
|
|
|
# try:
|
|
|
# import spacy
|
|
|
# self.nlp = spacy.load("en_core_web_sm")
|
|
|
# logger.info("spaCy model loaded successfully")
|
|
|
# except Exception as e:
|
|
|
-# logger.warning(f"spaCy model not loaded: {e}. Will use fallback methods.")
|
|
|
-
|
|
|
-# # Initialize AI service
|
|
|
+# logger.warning(f"spaCy not loaded: {e}")
|
|
|
+
|
|
|
+# # AI Service
|
|
|
# self.use_ai = use_ai
|
|
|
# self.ai_service = None
|
|
|
# if use_ai:
|
|
|
# try:
|
|
|
# from .gemini_service import GeminiAttributeService
|
|
|
# self.ai_service = GeminiAttributeService()
|
|
|
-# logger.info("Gemini AI service initialized")
|
|
|
# except Exception as e:
|
|
|
-# logger.warning(f"Gemini service not available: {e}")
|
|
|
+# logger.warning(f"AI service not available: {e}")
|
|
|
# self.use_ai = False
|
|
|
-
|
|
|
-# # Initialize SEO scorer
|
|
|
+
|
|
|
+# # Image scorer
|
|
|
+# self.image_scorer = None
|
|
|
+# try:
|
|
|
+# from .image_scorer import ImageQualityScorer
|
|
|
+# self.image_scorer = ImageQualityScorer()
|
|
|
+# except Exception as e:
|
|
|
+# logger.warning(f"Image scorer not available: {e}")
|
|
|
+
|
|
|
+# # SEO scorer
|
|
|
# self.use_seo = use_seo
|
|
|
# self.seo_scorer = None
|
|
|
# if use_seo:
|
|
|
# try:
|
|
|
# from .seo_scorer import SEODiscoverabilityScorer
|
|
|
# self.seo_scorer = SEODiscoverabilityScorer()
|
|
|
-# logger.info("SEO scorer initialized")
|
|
|
# except Exception as e:
|
|
|
# logger.warning(f"SEO scorer not available: {e}")
|
|
|
# self.use_seo = False
|
|
|
-
|
|
|
-# # Initialize Title/Description scorer
|
|
|
+
|
|
|
+# # Title/Description scorer
|
|
|
# self.title_desc_scorer = None
|
|
|
# try:
|
|
|
# from .title_description_scorer import TitleDescriptionScorer
|
|
|
# self.title_desc_scorer = TitleDescriptionScorer(use_ai=use_ai)
|
|
|
-# logger.info("Title/Description scorer initialized")
|
|
|
# except Exception as e:
|
|
|
# logger.warning(f"Title/Description scorer not available: {e}")
|
|
|
-
|
|
|
-# # UPDATED WEIGHTS (Total = 100%)
|
|
|
+
|
|
|
+# # Content Rules scorer
|
|
|
+# self.content_rules_scorer = None
|
|
|
+# try:
|
|
|
+# from .content_rules_scorer import ContentRulesScorer
|
|
|
+# self.content_rules_scorer = ContentRulesScorer()
|
|
|
+# except Exception as e:
|
|
|
+# logger.warning(f"Content Rules scorer not available: {e}")
|
|
|
+
|
|
|
+# # Base weights
|
|
|
# self.weights = {
|
|
|
-# 'mandatory_fields': 0.25, # 25%
|
|
|
-# 'standardization': 0.20, # 20%
|
|
|
-# 'missing_values': 0.13, # 13%
|
|
|
-# 'consistency': 0.07, # 7%
|
|
|
-# 'seo_discoverability': 0.10, # 10%
|
|
|
-# 'title_quality': 0.10, # 10% (NEW)
|
|
|
-# 'description_quality': 0.15 # 15% (NEW)
|
|
|
+# 'mandatory_fields': 0.16,
|
|
|
+# 'standardization': 0.12,
|
|
|
+# 'missing_values': 0.08,
|
|
|
+# 'consistency': 0.04,
|
|
|
+# 'seo_discoverability': 0.08,
|
|
|
+# 'content_rules_compliance': 0.12,
|
|
|
+# 'title_quality': 0.08,
|
|
|
+# 'description_quality': 0.12,
|
|
|
+# 'image_quality': 0.20
|
|
|
# }
|
|
|
-
|
|
|
+
|
|
|
# def score_product(
|
|
|
-# self,
|
|
|
-# product: Dict,
|
|
|
-# category_rules: List[Dict],
|
|
|
+# self,
|
|
|
+# product: Dict,
|
|
|
+# category_rules: List[Dict],
|
|
|
+# content_rules: List[Dict] = None,
|
|
|
# generate_ai_suggestions: bool = True
|
|
|
# ) -> Dict:
|
|
|
-# """
|
|
|
-# Complete product scoring with all components
|
|
|
-
|
|
|
-# Args:
|
|
|
-# product: Product dict with sku, category, title, description, attributes
|
|
|
-# category_rules: List of CategoryAttributeRule dicts
|
|
|
-# generate_ai_suggestions: Whether to call AI for suggestions
|
|
|
-
|
|
|
-# Returns:
|
|
|
-# Dict with final_score, breakdown, issues, suggestions, etc.
|
|
|
-# """
|
|
|
+
|
|
|
# start_time = time.time()
|
|
|
# attributes = product.get('attributes', {})
|
|
|
# category = product.get('category', '')
|
|
|
-
|
|
|
-# # Initialize scores
|
|
|
-# scores = {
|
|
|
-# 'mandatory_fields': 0,
|
|
|
-# 'standardization': 0,
|
|
|
-# 'missing_values': 0,
|
|
|
-# 'consistency': 0,
|
|
|
-# 'seo_discoverability': 0,
|
|
|
-# 'title_quality': 0,
|
|
|
-# 'description_quality': 0
|
|
|
-# }
|
|
|
-
|
|
|
-# issues = []
|
|
|
-# suggestions = []
|
|
|
-
|
|
|
-# # --- 1. Mandatory Fields (25%) ---
|
|
|
+
|
|
|
+# scores = {k: 0 for k in self.weights.keys()}
|
|
|
+# issues, suggestions = [], []
|
|
|
+
|
|
|
+# # --- 1. Mandatory Fields ---
|
|
|
# try:
|
|
|
-# mandatory_score, mandatory_issues, mandatory_suggestions = self._check_mandatory_fields(
|
|
|
-# attributes, category_rules
|
|
|
-# )
|
|
|
-# scores['mandatory_fields'] = mandatory_score
|
|
|
-# issues.extend(mandatory_issues)
|
|
|
-# suggestions.extend(mandatory_suggestions)
|
|
|
+# score, i, s = self._check_mandatory_fields(attributes, category_rules)
|
|
|
+# scores['mandatory_fields'] = score
|
|
|
+# issues.extend(i)
|
|
|
+# suggestions.extend(s)
|
|
|
# except Exception as e:
|
|
|
-# logger.error(f"[Mandatory Fields] Error: {e}", exc_info=True)
|
|
|
-# scores['mandatory_fields'] = 0
|
|
|
-
|
|
|
-# # --- 2. Standardization (20%) ---
|
|
|
+# logger.error(f"[Mandatory Fields] {e}", exc_info=True)
|
|
|
+
|
|
|
+# # --- 2. Standardization ---
|
|
|
# try:
|
|
|
-# std_score, std_issues, std_suggestions = self._check_standardization(
|
|
|
-# attributes, category_rules
|
|
|
-# )
|
|
|
-# scores['standardization'] = std_score
|
|
|
-# issues.extend(std_issues)
|
|
|
-# suggestions.extend(std_suggestions)
|
|
|
+# score, i, s = self._check_standardization(attributes, category_rules)
|
|
|
+# scores['standardization'] = score
|
|
|
+# issues.extend(i)
|
|
|
+# suggestions.extend(s)
|
|
|
# except Exception as e:
|
|
|
-# logger.error(f"[Standardization] Error: {e}", exc_info=True)
|
|
|
-# scores['standardization'] = 0
|
|
|
-
|
|
|
-# # --- 3. Missing Values (13%) ---
|
|
|
+# logger.error(f"[Standardization] {e}", exc_info=True)
|
|
|
+
|
|
|
+# # --- 3. Missing Values ---
|
|
|
# try:
|
|
|
-# missing_score, missing_issues, missing_suggestions = self._check_missing_values(
|
|
|
-# attributes, category_rules
|
|
|
-# )
|
|
|
-# scores['missing_values'] = missing_score
|
|
|
-# issues.extend(missing_issues)
|
|
|
-# suggestions.extend(missing_suggestions)
|
|
|
+# score, i, s = self._check_missing_values(attributes, category_rules)
|
|
|
+# scores['missing_values'] = score
|
|
|
+# issues.extend(i)
|
|
|
+# suggestions.extend(s)
|
|
|
# except Exception as e:
|
|
|
-# logger.error(f"[Missing Values] Error: {e}", exc_info=True)
|
|
|
-# scores['missing_values'] = 0
|
|
|
-
|
|
|
-# # --- 4. Consistency (7%) ---
|
|
|
+# logger.error(f"[Missing Values] {e}", exc_info=True)
|
|
|
+
|
|
|
+# # --- 4. Consistency ---
|
|
|
# try:
|
|
|
-# consistency_score, consistency_issues = self._check_consistency(
|
|
|
-# attributes,
|
|
|
-# product.get('title', ''),
|
|
|
-# product.get('description', '')
|
|
|
-# )
|
|
|
-# scores['consistency'] = consistency_score
|
|
|
-# issues.extend(consistency_issues)
|
|
|
+# score, i = self._check_consistency(attributes, product.get('title', ''), product.get('description', ''))
|
|
|
+# scores['consistency'] = score
|
|
|
+# issues.extend(i)
|
|
|
# except Exception as e:
|
|
|
-# logger.error(f"[Consistency] Error: {e}", exc_info=True)
|
|
|
-# scores['consistency'] = 0
|
|
|
-
|
|
|
-# # --- 5. SEO Discoverability (10%) ---
|
|
|
+# logger.error(f"[Consistency] {e}", exc_info=True)
|
|
|
+
|
|
|
+# # --- 5. SEO ---
|
|
|
# seo_result = None
|
|
|
# if self.use_seo and self.seo_scorer:
|
|
|
# try:
|
|
@@ -170,84 +154,80 @@
|
|
|
# issues.extend(seo_result['issues'])
|
|
|
# suggestions.extend(seo_result['suggestions'])
|
|
|
# except Exception as e:
|
|
|
-# logger.error(f"[SEO Scoring] Error: {e}", exc_info=True)
|
|
|
-# scores['seo_discoverability'] = 0
|
|
|
-
|
|
|
-# # --- 6. Title & Description Quality (25% combined) ---
|
|
|
+# logger.error(f"[SEO] {e}", exc_info=True)
|
|
|
+
|
|
|
+# # --- 6. Content Rules ---
|
|
|
+# content_rules_result = None
|
|
|
+# if content_rules and self.content_rules_scorer:
|
|
|
+# try:
|
|
|
+# content_rules_result = self.content_rules_scorer.score_content_fields(product, content_rules)
|
|
|
+# scores['content_rules_compliance'] = content_rules_result['overall_content_score']
|
|
|
+# issues.extend(content_rules_result['issues'])
|
|
|
+# suggestions.extend(content_rules_result['suggestions'])
|
|
|
+# except Exception as e:
|
|
|
+# logger.error(f"[Content Rules] {e}", exc_info=True)
|
|
|
+
|
|
|
+# # --- 7. Title & Description ---
|
|
|
# title_desc_result = None
|
|
|
# if self.title_desc_scorer:
|
|
|
# try:
|
|
|
-# title_desc_result = self.title_desc_scorer.score_title_and_description(
|
|
|
-# product,
|
|
|
-# category_rules
|
|
|
-# )
|
|
|
+# title_desc_result = self.title_desc_scorer.score_title_and_description(product, category_rules)
|
|
|
# scores['title_quality'] = title_desc_result['title_score']
|
|
|
# scores['description_quality'] = title_desc_result['description_score']
|
|
|
# issues.extend(title_desc_result['issues'])
|
|
|
# suggestions.extend(title_desc_result['suggestions'])
|
|
|
# except Exception as e:
|
|
|
-# logger.error(f"[Title/Description Scoring] Error: {e}", exc_info=True)
|
|
|
-# scores['title_quality'] = 0
|
|
|
-# scores['description_quality'] = 0
|
|
|
-
|
|
|
-# # --- Final Score Calculation ---
|
|
|
-# final_score = sum(scores[key] * self.weights[key] for key in scores)
|
|
|
-
|
|
|
-# result = {
|
|
|
-# 'final_score': round(final_score, 2),
|
|
|
-# 'max_score': 100.0,
|
|
|
-# 'breakdown': scores,
|
|
|
-# 'issues': issues,
|
|
|
-# 'suggestions': suggestions,
|
|
|
-# 'weights': self.weights,
|
|
|
-# 'processing_time': round(time.time() - start_time, 3)
|
|
|
-# }
|
|
|
-
|
|
|
-# # Add component-specific details
|
|
|
-# if seo_result:
|
|
|
-# result['seo_details'] = {
|
|
|
-# 'breakdown': seo_result['breakdown'],
|
|
|
-# 'extracted_keywords': seo_result.get('extracted_keywords', []),
|
|
|
-# 'missing_high_value_terms': seo_result.get('missing_high_value_terms', [])
|
|
|
-# }
|
|
|
-
|
|
|
-# if title_desc_result:
|
|
|
-# result['title_description_details'] = {
|
|
|
-# 'title_breakdown': title_desc_result.get('title_breakdown', {}),
|
|
|
-# 'description_breakdown': title_desc_result.get('description_breakdown', {}),
|
|
|
-# 'ai_improvements': title_desc_result.get('ai_improvements')
|
|
|
-# }
|
|
|
-
|
|
|
-# # --- AI Suggestions (Comprehensive) ---
|
|
|
+# logger.error(f"[Title/Desc] {e}", exc_info=True)
|
|
|
+
|
|
|
+# # --- 8. Image Quality (always present in breakdown) ---
|
|
|
+# image_result = None
|
|
|
+# if self.image_scorer:
|
|
|
+# images = product.get("images", [])
|
|
|
+# if images:
|
|
|
+# try:
|
|
|
+# image_result = self.image_scorer.score_images(images)
|
|
|
+# scores['image_quality'] = image_result.get("overall_image_score", 0)
|
|
|
+# except Exception as e:
|
|
|
+# logger.error(f"[Image] {e}", exc_info=True)
|
|
|
+# scores['image_quality'] = None
|
|
|
+# else:
|
|
|
+# scores['image_quality'] = None # explicitly mark as unavailable
|
|
|
+# else:
|
|
|
+# scores['image_quality'] = None
|
|
|
+
|
|
|
+
|
|
|
+# # --- 9. Weight Rescaling for applicable numeric components ---
|
|
|
+# numeric_scores = {k: v for k, v in scores.items() if isinstance(v, (int, float))}
|
|
|
+# applicable_weights = {k: self.weights[k] for k in numeric_scores}
|
|
|
+# total_weight = sum(applicable_weights.values())
|
|
|
+# final_score = sum(numeric_scores[k] * (applicable_weights[k]/total_weight) for k in numeric_scores) if numeric_scores else None
|
|
|
+
|
|
|
+
|
|
|
+# if image_result:
|
|
|
+# result['image_details'] = image_result
|
|
|
+
|
|
|
+# # AI Suggestions
|
|
|
# if generate_ai_suggestions and self.use_ai and self.ai_service:
|
|
|
# try:
|
|
|
-# logger.info(f"Generating comprehensive AI suggestions for SKU: {product.get('sku')}")
|
|
|
# ai_suggestions = self.ai_service.generate_comprehensive_suggestions(
|
|
|
-# product,
|
|
|
-# issues,
|
|
|
-# category_rules,
|
|
|
-# scores
|
|
|
+# product, issues, category_rules, scores
|
|
|
# )
|
|
|
# result['ai_suggestions'] = ai_suggestions
|
|
|
# except Exception as e:
|
|
|
-# logger.error(f"[AI Suggestions] Error: {e}", exc_info=True)
|
|
|
+# logger.error(f"[AI Suggestions] {e}", exc_info=True)
|
|
|
# result['ai_suggestions'] = {'error': str(e)}
|
|
|
-
|
|
|
+
|
|
|
# return result
|
|
|
-
|
|
|
-# # ========== ATTRIBUTE VALIDATION METHODS ==========
|
|
|
+
|
|
|
+
|
|
|
+# # ========== ATTRIBUTE VALIDATION METHODS (unchanged) ==========
|
|
|
|
|
|
# def _check_mandatory_fields(
|
|
|
# self,
|
|
|
# attributes: Dict,
|
|
|
# rules: List[Dict]
|
|
|
# ) -> Tuple[float, List[str], List[str]]:
|
|
|
-# """
|
|
|
-# Check if all mandatory fields are present and valid
|
|
|
-
|
|
|
-# Returns:
|
|
|
-# Tuple of (score, issues, suggestions)
|
|
|
-# """
|
|
|
+# """Check if all mandatory fields are present and valid"""
|
|
|
# mandatory_rules = [r for r in rules if r.get('is_mandatory', False)]
|
|
|
|
|
|
# if not mandatory_rules:
|
|
@@ -310,12 +290,7 @@
|
|
|
# attributes: Dict,
|
|
|
# rules: List[Dict]
|
|
|
# ) -> Tuple[float, List[str], List[str]]:
|
|
|
-# """
|
|
|
-# Check if attribute values match standardized valid values
|
|
|
-
|
|
|
-# Returns:
|
|
|
-# Tuple of (score, issues, suggestions)
|
|
|
-# """
|
|
|
+# """Check if attribute values match standardized valid values"""
|
|
|
# standardized_rules = [r for r in rules if r.get('valid_values')]
|
|
|
|
|
|
# if not standardized_rules:
|
|
@@ -329,7 +304,7 @@
|
|
|
# attr_name = rule['attribute_name']
|
|
|
# valid_values = rule['valid_values']
|
|
|
|
|
|
-# if not valid_values: # Skip if empty valid_values list
|
|
|
+# if not valid_values:
|
|
|
# continue
|
|
|
|
|
|
# if attr_name not in attributes or not attributes[attr_name]:
|
|
@@ -383,12 +358,7 @@
|
|
|
# attributes: Dict,
|
|
|
# rules: List[Dict]
|
|
|
# ) -> Tuple[float, List[str], List[str]]:
|
|
|
-# """
|
|
|
-# Check for placeholder values or empty attributes
|
|
|
-
|
|
|
-# Returns:
|
|
|
-# Tuple of (score, issues, suggestions)
|
|
|
-# """
|
|
|
+# """Check for placeholder values or empty attributes"""
|
|
|
# placeholder_patterns = [
|
|
|
# r'^n/?a$', r'^none$', r'^null$', r'^-+$', r'^\.+$',
|
|
|
# r'^tbd$', r'^to be determined$', r'^unknown$', r'^na$',
|
|
@@ -441,12 +411,7 @@
|
|
|
# title: str,
|
|
|
# description: str
|
|
|
# ) -> Tuple[float, List[str]]:
|
|
|
-# """
|
|
|
-# Check if attribute values are mentioned in title/description
|
|
|
-
|
|
|
-# Returns:
|
|
|
-# Tuple of (score, issues)
|
|
|
-# """
|
|
|
+# """Check if attribute values are mentioned in title/description"""
|
|
|
# issues = []
|
|
|
# consistency_count = 0
|
|
|
# total_checks = 0
|
|
@@ -461,17 +426,14 @@
|
|
|
# total_checks += 1
|
|
|
# attr_value = str(attributes[attr]).lower().strip()
|
|
|
|
|
|
-# # Skip very short values (likely abbreviations or numbers)
|
|
|
# if len(attr_value) < 2:
|
|
|
# consistency_count += 1
|
|
|
# continue
|
|
|
|
|
|
-# # Direct substring match
|
|
|
# if attr_value in combined_text:
|
|
|
# consistency_count += 1
|
|
|
# continue
|
|
|
|
|
|
-# # Word boundary match (for multi-word attributes)
|
|
|
# words_in_attr = set(attr_value.split())
|
|
|
# words_in_text = set(combined_text.split())
|
|
|
|
|
@@ -479,7 +441,6 @@
|
|
|
# consistency_count += 1
|
|
|
# continue
|
|
|
|
|
|
-# # Fuzzy matching as last resort
|
|
|
# text_words = combined_text.split()
|
|
|
# if text_words:
|
|
|
# match = process.extractOne(attr_value, text_words, scorer=fuzz.ratio)
|
|
@@ -488,7 +449,6 @@
|
|
|
# consistency_count += 1
|
|
|
# continue
|
|
|
|
|
|
-# # If we get here, attribute is not mentioned
|
|
|
# issues.append(
|
|
|
# f"'{attr.title()}': '{attributes[attr]}' not mentioned in title/description"
|
|
|
# )
|
|
@@ -496,36 +456,21 @@
|
|
|
# score = (consistency_count / total_checks) * 100 if total_checks > 0 else 100.0
|
|
|
# return score, issues
|
|
|
|
|
|
-# # ========== UTILITY METHODS ==========
|
|
|
+# # ========== UTILITY METHODS (unchanged) ==========
|
|
|
|
|
|
# def extract_attributes_from_text(self, text: str, category: str = '') -> Dict:
|
|
|
-# """
|
|
|
-# Extract attributes from unstructured text using NER and patterns
|
|
|
-
|
|
|
-# Args:
|
|
|
-# text: Product title or description
|
|
|
-# category: Product category (optional, for context)
|
|
|
-
|
|
|
-# Returns:
|
|
|
-# Dict of extracted attributes
|
|
|
-# """
|
|
|
+# """Extract attributes from unstructured text using NER and patterns"""
|
|
|
# extracted = {}
|
|
|
|
|
|
-# # Try spaCy NER if available
|
|
|
# if self.nlp:
|
|
|
# try:
|
|
|
# doc = self.nlp(text)
|
|
|
-
|
|
|
-# # Extract organizations as potential brands
|
|
|
# orgs = [ent.text for ent in doc.ents if ent.label_ == 'ORG']
|
|
|
# if orgs:
|
|
|
# extracted['brand'] = orgs[0]
|
|
|
-
|
|
|
-# # Extract quantities as potential sizes
|
|
|
# quantities = [ent.text for ent in doc.ents if ent.label_ == 'QUANTITY']
|
|
|
# if quantities:
|
|
|
# extracted['size'] = quantities[0]
|
|
|
-
|
|
|
# except Exception as e:
|
|
|
# logger.warning(f"spaCy extraction failed: {e}")
|
|
|
|
|
@@ -547,7 +492,6 @@
|
|
|
# ai_extracted = self.ai_service.extract_attributes_with_ai(
|
|
|
# text, '', category
|
|
|
# )
|
|
|
-# # Merge AI results (don't override existing)
|
|
|
# for key, value in ai_extracted.items():
|
|
|
# if key not in extracted and value:
|
|
|
# extracted[key] = value
|
|
@@ -557,15 +501,7 @@
|
|
|
# return extracted
|
|
|
|
|
|
# def get_score_interpretation(self, score: float) -> Dict[str, str]:
|
|
|
-# """
|
|
|
-# Get human-readable interpretation of score
|
|
|
-
|
|
|
-# Args:
|
|
|
-# score: Numeric score (0-100)
|
|
|
-
|
|
|
-# Returns:
|
|
|
-# Dict with grade, status, and recommendation
|
|
|
-# """
|
|
|
+# """Get human-readable interpretation of score"""
|
|
|
# if score >= 90:
|
|
|
# return {
|
|
|
# 'grade': 'A',
|
|
@@ -613,32 +549,35 @@
|
|
|
|
|
|
|
|
|
|
|
|
+
|
|
|
+
|
|
|
# attribute_scorer_integrated.py
|
|
|
"""
|
|
|
Enhanced AttributeQualityScorer with ProductContentRule integration
|
|
|
+Consistent breakdown and component status for unavailable scores
|
|
|
"""
|
|
|
import re
|
|
|
import time
|
|
|
import logging
|
|
|
from typing import Dict, List, Tuple
|
|
|
from rapidfuzz import fuzz, process
|
|
|
-from collections import defaultdict
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class AttributeQualityScorer:
|
|
|
"""
|
|
|
Complete Product Quality Scorer with ALL components INCLUDING ProductContentRule validation:
|
|
|
- - Mandatory Fields (20%)
|
|
|
- - Standardization (15%)
|
|
|
- - Missing Values (10%)
|
|
|
- - Consistency (5%)
|
|
|
- - SEO Discoverability (10%)
|
|
|
- - Content Rules Compliance (15%) ← NEW: Validates against ProductContentRule
|
|
|
- - Title Quality (10%)
|
|
|
- - Description Quality (15%)
|
|
|
+ - Mandatory Fields (16%)
|
|
|
+ - Standardization (12%)
|
|
|
+ - Missing Values (8%)
|
|
|
+ - Consistency (4%)
|
|
|
+ - SEO Discoverability (8%)
|
|
|
+ - Content Rules Compliance (12%) ← NEW
|
|
|
+ - Title Quality (8%)
|
|
|
+ - Description Quality (12%)
|
|
|
+ - Image Quality (20%)
|
|
|
"""
|
|
|
-
|
|
|
+
|
|
|
def __init__(self, use_ai: bool = True, use_seo: bool = True):
|
|
|
# Load spaCy model (optional, for advanced NER)
|
|
|
self.nlp = None
|
|
@@ -660,7 +599,16 @@ class AttributeQualityScorer:
|
|
|
except Exception as e:
|
|
|
logger.warning(f"Gemini service not available: {e}")
|
|
|
self.use_ai = False
|
|
|
-
|
|
|
+
|
|
|
+ # Initialize Image Scorer
|
|
|
+ self.image_scorer = None
|
|
|
+ try:
|
|
|
+ from .image_scorer import ImageQualityScorer
|
|
|
+ self.image_scorer = ImageQualityScorer()
|
|
|
+ logger.info("Image scorer initialized")
|
|
|
+ except Exception as e:
|
|
|
+ logger.warning(f"Image scorer not available: {e}")
|
|
|
+
|
|
|
# Initialize SEO scorer
|
|
|
self.use_seo = use_seo
|
|
|
self.seo_scorer = None
|
|
@@ -672,7 +620,7 @@ class AttributeQualityScorer:
|
|
|
except Exception as e:
|
|
|
logger.warning(f"SEO scorer not available: {e}")
|
|
|
self.use_seo = False
|
|
|
-
|
|
|
+
|
|
|
# Initialize Title/Description scorer
|
|
|
self.title_desc_scorer = None
|
|
|
try:
|
|
@@ -681,7 +629,7 @@ class AttributeQualityScorer:
|
|
|
logger.info("Title/Description scorer initialized")
|
|
|
except Exception as e:
|
|
|
logger.warning(f"Title/Description scorer not available: {e}")
|
|
|
-
|
|
|
+
|
|
|
# Initialize Content Rules scorer ← NEW
|
|
|
self.content_rules_scorer = None
|
|
|
try:
|
|
@@ -690,246 +638,206 @@ class AttributeQualityScorer:
|
|
|
logger.info("Content Rules scorer initialized")
|
|
|
except Exception as e:
|
|
|
logger.warning(f"Content Rules scorer not available: {e}")
|
|
|
-
|
|
|
+
|
|
|
# UPDATED WEIGHTS (Total = 100%)
|
|
|
self.weights = {
|
|
|
- 'mandatory_fields': 0.20, # 20% (reduced from 25%)
|
|
|
- 'standardization': 0.15, # 15% (reduced from 20%)
|
|
|
- 'missing_values': 0.10, # 10% (reduced from 13%)
|
|
|
- 'consistency': 0.05, # 5% (reduced from 7%)
|
|
|
- 'seo_discoverability': 0.10, # 10%
|
|
|
- 'content_rules_compliance': 0.15, # 15% ← NEW: ProductContentRule validation
|
|
|
- 'title_quality': 0.10, # 10%
|
|
|
- 'description_quality': 0.15 # 15%
|
|
|
+ 'mandatory_fields': 0.16,
|
|
|
+ 'standardization': 0.12,
|
|
|
+ 'missing_values': 0.08,
|
|
|
+ 'consistency': 0.04,
|
|
|
+ 'seo_discoverability': 0.08,
|
|
|
+ 'content_rules_compliance': 0.12,
|
|
|
+ 'title_quality': 0.08,
|
|
|
+ 'description_quality': 0.12,
|
|
|
+ 'image_quality': 0.20
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
def score_product(
|
|
|
self,
|
|
|
product: Dict,
|
|
|
category_rules: List[Dict],
|
|
|
- content_rules: List[Dict] = None, # ← NEW parameter
|
|
|
+ content_rules: List[Dict] = None,
|
|
|
generate_ai_suggestions: bool = True
|
|
|
) -> Dict:
|
|
|
"""
|
|
|
Complete product scoring with all components including ProductContentRule validation
|
|
|
-
|
|
|
- Args:
|
|
|
- product: Product dict with sku, category, title, description, attributes
|
|
|
- category_rules: List of CategoryAttributeRule dicts
|
|
|
- content_rules: List of ProductContentRule dicts (NEW)
|
|
|
- generate_ai_suggestions: Whether to call AI for suggestions
|
|
|
-
|
|
|
- Returns:
|
|
|
- Dict with final_score, breakdown, issues, suggestions, etc.
|
|
|
+ and consistent breakdown
|
|
|
"""
|
|
|
start_time = time.time()
|
|
|
attributes = product.get('attributes', {})
|
|
|
category = product.get('category', '')
|
|
|
-
|
|
|
- # Initialize scores
|
|
|
- scores = {
|
|
|
- 'mandatory_fields': 0,
|
|
|
- 'standardization': 0,
|
|
|
- 'missing_values': 0,
|
|
|
- 'consistency': 0,
|
|
|
- 'seo_discoverability': 0,
|
|
|
- 'content_rules_compliance': 0, # ← NEW
|
|
|
- 'title_quality': 0,
|
|
|
- 'description_quality': 0
|
|
|
- }
|
|
|
-
|
|
|
+
|
|
|
+ # Initialize scores and status
|
|
|
+ scores = {k: None for k in self.weights}
|
|
|
+ component_status = {k: "Not evaluated yet" for k in self.weights}
|
|
|
issues = []
|
|
|
suggestions = []
|
|
|
-
|
|
|
- # --- 1. Mandatory Fields (20%) ---
|
|
|
+
|
|
|
+ # --- 1. Mandatory Fields ---
|
|
|
try:
|
|
|
- mandatory_score, mandatory_issues, mandatory_suggestions = self._check_mandatory_fields(
|
|
|
- attributes, category_rules
|
|
|
- )
|
|
|
- scores['mandatory_fields'] = mandatory_score
|
|
|
- issues.extend(mandatory_issues)
|
|
|
- suggestions.extend(mandatory_suggestions)
|
|
|
+ score, comp_issues, comp_suggestions = self._check_mandatory_fields(attributes, category_rules)
|
|
|
+ scores['mandatory_fields'] = score
|
|
|
+ issues.extend(comp_issues)
|
|
|
+ suggestions.extend(comp_suggestions)
|
|
|
+ component_status['mandatory_fields'] = "Scored successfully"
|
|
|
except Exception as e:
|
|
|
- logger.error(f"[Mandatory Fields] Error: {e}", exc_info=True)
|
|
|
- scores['mandatory_fields'] = 0
|
|
|
-
|
|
|
- # --- 2. Standardization (15%) ---
|
|
|
+ logger.error(f"[Mandatory Fields] {e}", exc_info=True)
|
|
|
+ component_status['mandatory_fields'] = f"Error: {str(e)}"
|
|
|
+
|
|
|
+ # --- 2. Standardization ---
|
|
|
try:
|
|
|
- std_score, std_issues, std_suggestions = self._check_standardization(
|
|
|
- attributes, category_rules
|
|
|
- )
|
|
|
- scores['standardization'] = std_score
|
|
|
- issues.extend(std_issues)
|
|
|
- suggestions.extend(std_suggestions)
|
|
|
+ score, comp_issues, comp_suggestions = self._check_standardization(attributes, category_rules)
|
|
|
+ scores['standardization'] = score
|
|
|
+ issues.extend(comp_issues)
|
|
|
+ suggestions.extend(comp_suggestions)
|
|
|
+ component_status['standardization'] = "Scored successfully"
|
|
|
except Exception as e:
|
|
|
- logger.error(f"[Standardization] Error: {e}", exc_info=True)
|
|
|
- scores['standardization'] = 0
|
|
|
-
|
|
|
- # --- 3. Missing Values (10%) ---
|
|
|
+ logger.error(f"[Standardization] {e}", exc_info=True)
|
|
|
+ component_status['standardization'] = f"Error: {str(e)}"
|
|
|
+
|
|
|
+ # --- 3. Missing Values ---
|
|
|
try:
|
|
|
- missing_score, missing_issues, missing_suggestions = self._check_missing_values(
|
|
|
- attributes, category_rules
|
|
|
- )
|
|
|
- scores['missing_values'] = missing_score
|
|
|
- issues.extend(missing_issues)
|
|
|
- suggestions.extend(missing_suggestions)
|
|
|
+ score, comp_issues, comp_suggestions = self._check_missing_values(attributes, category_rules)
|
|
|
+ scores['missing_values'] = score
|
|
|
+ issues.extend(comp_issues)
|
|
|
+ suggestions.extend(comp_suggestions)
|
|
|
+ component_status['missing_values'] = "Scored successfully"
|
|
|
except Exception as e:
|
|
|
- logger.error(f"[Missing Values] Error: {e}", exc_info=True)
|
|
|
- scores['missing_values'] = 0
|
|
|
-
|
|
|
- # --- 4. Consistency (5%) ---
|
|
|
+ logger.error(f"[Missing Values] {e}", exc_info=True)
|
|
|
+ component_status['missing_values'] = f"Error: {str(e)}"
|
|
|
+
|
|
|
+ # --- 4. Consistency ---
|
|
|
try:
|
|
|
- consistency_score, consistency_issues = self._check_consistency(
|
|
|
- attributes,
|
|
|
- product.get('title', ''),
|
|
|
- product.get('description', '')
|
|
|
- )
|
|
|
- scores['consistency'] = consistency_score
|
|
|
- issues.extend(consistency_issues)
|
|
|
+ score, comp_issues = self._check_consistency(attributes, product.get('title', ''), product.get('description', ''))
|
|
|
+ scores['consistency'] = score
|
|
|
+ issues.extend(comp_issues)
|
|
|
+ component_status['consistency'] = "Scored successfully"
|
|
|
except Exception as e:
|
|
|
- logger.error(f"[Consistency] Error: {e}", exc_info=True)
|
|
|
- scores['consistency'] = 0
|
|
|
-
|
|
|
- # --- 5. SEO Discoverability (10%) ---
|
|
|
- seo_result = None
|
|
|
+ logger.error(f"[Consistency] {e}", exc_info=True)
|
|
|
+ component_status['consistency'] = f"Error: {str(e)}"
|
|
|
+
|
|
|
+ # --- 5. SEO Discoverability ---
|
|
|
if self.use_seo and self.seo_scorer:
|
|
|
try:
|
|
|
seo_result = self.seo_scorer.score_seo(product, category_rules)
|
|
|
- scores['seo_discoverability'] = seo_result['seo_score']
|
|
|
- issues.extend(seo_result['issues'])
|
|
|
- suggestions.extend(seo_result['suggestions'])
|
|
|
+ scores['seo_discoverability'] = seo_result.get('seo_score', None)
|
|
|
+ issues.extend(seo_result.get('issues', []))
|
|
|
+ suggestions.extend(seo_result.get('suggestions', []))
|
|
|
+ component_status['seo_discoverability'] = "Scored successfully"
|
|
|
except Exception as e:
|
|
|
- logger.error(f"[SEO Scoring] Error: {e}", exc_info=True)
|
|
|
- scores['seo_discoverability'] = 0
|
|
|
-
|
|
|
- # --- 6. Content Rules Compliance (15%) ← NEW ---
|
|
|
- content_rules_result = None
|
|
|
+ logger.error(f"[SEO] {e}", exc_info=True)
|
|
|
+ component_status['seo_discoverability'] = f"Error: {str(e)}"
|
|
|
+ else:
|
|
|
+ component_status['seo_discoverability'] = "SEO scorer not available"
|
|
|
+
|
|
|
+ # --- 6. Content Rules Compliance ---
|
|
|
if content_rules and self.content_rules_scorer:
|
|
|
try:
|
|
|
- content_rules_result = self.content_rules_scorer.score_content_fields(
|
|
|
- product, content_rules
|
|
|
- )
|
|
|
- scores['content_rules_compliance'] = content_rules_result['overall_content_score']
|
|
|
- issues.extend(content_rules_result['issues'])
|
|
|
- suggestions.extend(content_rules_result['suggestions'])
|
|
|
+ content_result = self.content_rules_scorer.score_content_fields(product, content_rules)
|
|
|
+ scores['content_rules_compliance'] = content_result.get('overall_content_score', None)
|
|
|
+ issues.extend(content_result.get('issues', []))
|
|
|
+ suggestions.extend(content_result.get('suggestions', []))
|
|
|
+ component_status['content_rules_compliance'] = "Scored successfully"
|
|
|
except Exception as e:
|
|
|
- logger.error(f"[Content Rules] Error: {e}", exc_info=True)
|
|
|
- scores['content_rules_compliance'] = 0
|
|
|
+ logger.error(f"[Content Rules] {e}", exc_info=True)
|
|
|
+ component_status['content_rules_compliance'] = f"Error: {str(e)}"
|
|
|
else:
|
|
|
- # If no content rules provided, give neutral score
|
|
|
- scores['content_rules_compliance'] = 100.0
|
|
|
-
|
|
|
- # --- 7. Title & Description Quality (25% combined) ---
|
|
|
- title_desc_result = None
|
|
|
+ scores['content_rules_compliance'] = None
|
|
|
+ component_status['content_rules_compliance'] = "No content rules provided"
|
|
|
+
|
|
|
+ # --- 7. Title & Description ---
|
|
|
if self.title_desc_scorer:
|
|
|
try:
|
|
|
- title_desc_result = self.title_desc_scorer.score_title_and_description(
|
|
|
- product,
|
|
|
- category_rules
|
|
|
- )
|
|
|
- scores['title_quality'] = title_desc_result['title_score']
|
|
|
- scores['description_quality'] = title_desc_result['description_score']
|
|
|
- issues.extend(title_desc_result['issues'])
|
|
|
- suggestions.extend(title_desc_result['suggestions'])
|
|
|
+ td_result = self.title_desc_scorer.score_title_and_description(product, category_rules)
|
|
|
+ scores['title_quality'] = td_result.get('title_score', None)
|
|
|
+ scores['description_quality'] = td_result.get('description_score', None)
|
|
|
+ issues.extend(td_result.get('issues', []))
|
|
|
+ suggestions.extend(td_result.get('suggestions', []))
|
|
|
+ component_status['title_quality'] = "Scored successfully"
|
|
|
+ component_status['description_quality'] = "Scored successfully"
|
|
|
except Exception as e:
|
|
|
- logger.error(f"[Title/Description Scoring] Error: {e}", exc_info=True)
|
|
|
- scores['title_quality'] = 0
|
|
|
- scores['description_quality'] = 0
|
|
|
-
|
|
|
- # --- Final Score Calculation ---
|
|
|
- final_score = sum(scores[key] * self.weights[key] for key in scores)
|
|
|
-
|
|
|
+ logger.error(f"[Title/Description] {e}", exc_info=True)
|
|
|
+ component_status['title_quality'] = f"Error: {str(e)}"
|
|
|
+ component_status['description_quality'] = f"Error: {str(e)}"
|
|
|
+ else:
|
|
|
+ component_status['title_quality'] = "Title/Description scorer not available"
|
|
|
+ component_status['description_quality'] = "Title/Description scorer not available"
|
|
|
+
|
|
|
+ # --- 8. Image Quality ---
|
|
|
+ if self.image_scorer:
|
|
|
+ try:
|
|
|
+ images = product.get('images', [])
|
|
|
+ if images:
|
|
|
+ img_result = self.image_scorer.score_images(images)
|
|
|
+ scores['image_quality'] = img_result.get("overall_image_score", None)
|
|
|
+ component_status['image_quality'] = "Scored successfully"
|
|
|
+ else:
|
|
|
+ scores['image_quality'] = None
|
|
|
+ component_status['image_quality'] = "No images provided"
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"[Image] {e}", exc_info=True)
|
|
|
+ scores['image_quality'] = None
|
|
|
+ component_status['image_quality'] = f"Error: {str(e)}"
|
|
|
+ else:
|
|
|
+ scores['image_quality'] = None
|
|
|
+ component_status['image_quality'] = "Image scorer not available"
|
|
|
+
|
|
|
+ # --- Final Score Calculation: use only numeric scores ---
|
|
|
+ numeric_scores = {k: v for k, v in scores.items() if isinstance(v, (int, float))}
|
|
|
+ applicable_weights = {k: self.weights[k] for k in numeric_scores}
|
|
|
+ total_weight = sum(applicable_weights.values())
|
|
|
+ if numeric_scores and total_weight > 0:
|
|
|
+ final_score = sum(numeric_scores[k] * (applicable_weights[k] / total_weight) for k in numeric_scores)
|
|
|
+ else:
|
|
|
+ final_score = None
|
|
|
+
|
|
|
result = {
|
|
|
- 'final_score': round(final_score, 2),
|
|
|
+ 'final_score': round(final_score, 2) if final_score is not None else None,
|
|
|
'max_score': 100.0,
|
|
|
'breakdown': scores,
|
|
|
+ 'component_status': component_status,
|
|
|
'issues': issues,
|
|
|
'suggestions': suggestions,
|
|
|
'weights': self.weights,
|
|
|
'processing_time': round(time.time() - start_time, 3)
|
|
|
}
|
|
|
-
|
|
|
- # Add component-specific details
|
|
|
- if seo_result:
|
|
|
- result['seo_details'] = {
|
|
|
- 'breakdown': seo_result['breakdown'],
|
|
|
- 'extracted_keywords': seo_result.get('extracted_keywords', []),
|
|
|
- 'missing_high_value_terms': seo_result.get('missing_high_value_terms', [])
|
|
|
- }
|
|
|
-
|
|
|
- if content_rules_result:
|
|
|
- result['content_rules_details'] = {
|
|
|
- 'field_scores': content_rules_result['field_scores'],
|
|
|
- 'rules_applied': content_rules_result['rules_applied']
|
|
|
- }
|
|
|
-
|
|
|
- if title_desc_result:
|
|
|
- result['title_description_details'] = {
|
|
|
- 'title_breakdown': title_desc_result.get('title_breakdown', {}),
|
|
|
- 'description_breakdown': title_desc_result.get('description_breakdown', {}),
|
|
|
- 'ai_improvements': title_desc_result.get('ai_improvements')
|
|
|
- }
|
|
|
-
|
|
|
- # --- AI Suggestions (Comprehensive) ---
|
|
|
+
|
|
|
+ # AI Suggestions
|
|
|
if generate_ai_suggestions and self.use_ai and self.ai_service:
|
|
|
try:
|
|
|
- logger.info(f"Generating comprehensive AI suggestions for SKU: {product.get('sku')}")
|
|
|
- ai_suggestions = self.ai_service.generate_comprehensive_suggestions(
|
|
|
- product,
|
|
|
- issues,
|
|
|
- category_rules,
|
|
|
- scores
|
|
|
- )
|
|
|
+ ai_suggestions = self.ai_service.generate_comprehensive_suggestions(product, issues, category_rules, scores)
|
|
|
result['ai_suggestions'] = ai_suggestions
|
|
|
except Exception as e:
|
|
|
- logger.error(f"[AI Suggestions] Error: {e}", exc_info=True)
|
|
|
+ logger.error(f"[AI Suggestions] {e}", exc_info=True)
|
|
|
result['ai_suggestions'] = {'error': str(e)}
|
|
|
-
|
|
|
+
|
|
|
return result
|
|
|
-
|
|
|
- # ========== ATTRIBUTE VALIDATION METHODS (unchanged) ==========
|
|
|
-
|
|
|
- def _check_mandatory_fields(
|
|
|
- self,
|
|
|
- attributes: Dict,
|
|
|
- rules: List[Dict]
|
|
|
- ) -> Tuple[float, List[str], List[str]]:
|
|
|
- """Check if all mandatory fields are present and valid"""
|
|
|
+
|
|
|
+ # ================= ATTRIBUTE CHECK METHODS (unchanged) =================
|
|
|
+ def _check_mandatory_fields(self, attributes: Dict, rules: List[Dict]) -> Tuple[float, List[str], List[str]]:
|
|
|
mandatory_rules = [r for r in rules if r.get('is_mandatory', False)]
|
|
|
-
|
|
|
if not mandatory_rules:
|
|
|
return 100.0, [], []
|
|
|
-
|
|
|
- present_count = 0
|
|
|
- issues = []
|
|
|
- suggestions = []
|
|
|
-
|
|
|
+
|
|
|
+ present_count, issues, suggestions = 0, [], []
|
|
|
for rule in mandatory_rules:
|
|
|
attr_name = rule['attribute_name']
|
|
|
-
|
|
|
if attr_name in attributes and attributes[attr_name]:
|
|
|
value = str(attributes[attr_name]).strip()
|
|
|
-
|
|
|
if not value:
|
|
|
issues.append(f"Mandatory field '{attr_name}' is empty")
|
|
|
suggestions.append(f"Provide a non-empty value for {attr_name}")
|
|
|
continue
|
|
|
-
|
|
|
- # Check length constraints
|
|
|
min_len = rule.get('min_length')
|
|
|
max_len = rule.get('max_length')
|
|
|
-
|
|
|
if min_len and len(value) < min_len:
|
|
|
issues.append(f"'{attr_name}' too short (min: {min_len} chars)")
|
|
|
suggestions.append(f"Expand {attr_name} to at least {min_len} characters")
|
|
|
continue
|
|
|
-
|
|
|
if max_len and len(value) > max_len:
|
|
|
issues.append(f"'{attr_name}' too long (max: {max_len} chars)")
|
|
|
suggestions.append(f"Shorten {attr_name} to {max_len} characters or less")
|
|
|
continue
|
|
|
-
|
|
|
- # Check regex pattern if provided
|
|
|
regex = rule.get('validation_regex')
|
|
|
if regex:
|
|
|
try:
|
|
@@ -939,268 +847,97 @@ class AttributeQualityScorer:
|
|
|
continue
|
|
|
except re.error:
|
|
|
logger.warning(f"Invalid regex pattern for {attr_name}: {regex}")
|
|
|
-
|
|
|
present_count += 1
|
|
|
else:
|
|
|
issues.append(f"Missing mandatory field: {attr_name}")
|
|
|
desc = rule.get('description', '')
|
|
|
- if desc:
|
|
|
- suggestions.append(f"Add {attr_name}: {desc}")
|
|
|
- else:
|
|
|
- suggestions.append(f"Add required attribute: {attr_name}")
|
|
|
-
|
|
|
+ suggestions.append(f"Add {attr_name}: {desc}" if desc else f"Add required attribute: {attr_name}")
|
|
|
score = (present_count / len(mandatory_rules)) * 100 if mandatory_rules else 100.0
|
|
|
return score, issues, suggestions
|
|
|
-
|
|
|
- def _check_standardization(
|
|
|
- self,
|
|
|
- attributes: Dict,
|
|
|
- rules: List[Dict]
|
|
|
- ) -> Tuple[float, List[str], List[str]]:
|
|
|
- """Check if attribute values match standardized valid values"""
|
|
|
+
|
|
|
+ def _check_standardization(self, attributes: Dict, rules: List[Dict]) -> Tuple[float, List[str], List[str]]:
|
|
|
standardized_rules = [r for r in rules if r.get('valid_values')]
|
|
|
-
|
|
|
if not standardized_rules:
|
|
|
return 100.0, [], []
|
|
|
-
|
|
|
- correct_count = 0
|
|
|
- issues = []
|
|
|
- suggestions = []
|
|
|
-
|
|
|
+ correct_count, issues, suggestions = 0, [], []
|
|
|
for rule in standardized_rules:
|
|
|
attr_name = rule['attribute_name']
|
|
|
valid_values = rule['valid_values']
|
|
|
-
|
|
|
- if not valid_values:
|
|
|
+ if not valid_values or attr_name not in attributes or not attributes[attr_name]:
|
|
|
continue
|
|
|
-
|
|
|
- if attr_name not in attributes or not attributes[attr_name]:
|
|
|
- continue
|
|
|
-
|
|
|
actual_value = str(attributes[attr_name]).strip()
|
|
|
-
|
|
|
- if not actual_value:
|
|
|
- continue
|
|
|
-
|
|
|
- # Exact match
|
|
|
if actual_value in valid_values:
|
|
|
correct_count += 1
|
|
|
continue
|
|
|
-
|
|
|
- # Case-insensitive match
|
|
|
lower_valid = {v.lower(): v for v in valid_values}
|
|
|
if actual_value.lower() in lower_valid:
|
|
|
correct_count += 1
|
|
|
correct_value = lower_valid[actual_value.lower()]
|
|
|
if actual_value != correct_value:
|
|
|
- issues.append(
|
|
|
- f"{attr_name}: Case mismatch - '{actual_value}' should be '{correct_value}'"
|
|
|
- )
|
|
|
+ issues.append(f"{attr_name}: Case mismatch - '{actual_value}' should be '{correct_value}'")
|
|
|
suggestions.append(f"Correct capitalization of {attr_name} to: {correct_value}")
|
|
|
continue
|
|
|
-
|
|
|
- # Fuzzy matching
|
|
|
best_match = process.extractOne(actual_value, valid_values, scorer=fuzz.ratio)
|
|
|
-
|
|
|
if best_match and best_match[1] >= 80:
|
|
|
correct_count += 1
|
|
|
if best_match[1] < 100:
|
|
|
- issues.append(
|
|
|
- f"{attr_name}: '{actual_value}' likely means '{best_match[0]}' "
|
|
|
- f"(confidence: {best_match[1]}%)"
|
|
|
- )
|
|
|
+ issues.append(f"{attr_name}: '{actual_value}' likely means '{best_match[0]}' (confidence: {best_match[1]}%)")
|
|
|
suggestions.append(f"Standardize {attr_name} to: {best_match[0]}")
|
|
|
else:
|
|
|
- issues.append(
|
|
|
- f"{attr_name}: '{actual_value}' not recognized. "
|
|
|
- f"Valid: {', '.join(valid_values[:3])}"
|
|
|
- )
|
|
|
+ issues.append(f"{attr_name}: '{actual_value}' not recognized. Valid: {', '.join(valid_values[:3])}")
|
|
|
suggestions.append(f"Change {attr_name} to one of: {', '.join(valid_values[:3])}")
|
|
|
-
|
|
|
score = (correct_count / len(standardized_rules)) * 100 if standardized_rules else 100.0
|
|
|
return score, issues, suggestions
|
|
|
-
|
|
|
- def _check_missing_values(
|
|
|
- self,
|
|
|
- attributes: Dict,
|
|
|
- rules: List[Dict]
|
|
|
- ) -> Tuple[float, List[str], List[str]]:
|
|
|
- """Check for placeholder values or empty attributes"""
|
|
|
- placeholder_patterns = [
|
|
|
- r'^n/?a$', r'^none$', r'^null$', r'^-+$', r'^\.+$',
|
|
|
- r'^tbd$', r'^to be determined$', r'^unknown$', r'^na$',
|
|
|
- r'^todo$', r'^pending$', r'^\?+$', r'^xxx+$', r'^placeholder$'
|
|
|
- ]
|
|
|
-
|
|
|
- total_attrs = len(rules)
|
|
|
- valid_count = 0
|
|
|
- issues = []
|
|
|
- suggestions = []
|
|
|
-
|
|
|
+
|
|
|
+ def _check_missing_values(self, attributes: Dict, rules: List[Dict]) -> Tuple[float, List[str], List[str]]:
|
|
|
+ placeholder_patterns = [r'^n/?a$', r'^none$', r'^null$', r'^-+$', r'^\.+$', r'^tbd$', r'^to be determined$', r'^unknown$', r'^na$', r'^todo$', r'^pending$', r'^\?+$', r'^xxx+$', r'^placeholder$']
|
|
|
+ total_attrs, valid_count, issues, suggestions = len(rules), 0, [], []
|
|
|
for rule in rules:
|
|
|
attr_name = rule['attribute_name']
|
|
|
-
|
|
|
if attr_name not in attributes:
|
|
|
continue
|
|
|
-
|
|
|
value = str(attributes[attr_name]).strip()
|
|
|
-
|
|
|
if not value:
|
|
|
issues.append(f"'{attr_name}' is empty")
|
|
|
suggestions.append(f"Provide a valid value for {attr_name}")
|
|
|
continue
|
|
|
-
|
|
|
value_lower = value.lower()
|
|
|
- is_placeholder = any(
|
|
|
- re.match(pattern, value_lower, re.IGNORECASE)
|
|
|
- for pattern in placeholder_patterns
|
|
|
- )
|
|
|
-
|
|
|
+ is_placeholder = any(re.match(pattern, value_lower, re.IGNORECASE) for pattern in placeholder_patterns)
|
|
|
if is_placeholder:
|
|
|
issues.append(f"'{attr_name}' contains placeholder: '{value}'")
|
|
|
suggestions.append(f"Replace placeholder in {attr_name} with actual data")
|
|
|
continue
|
|
|
-
|
|
|
- # Check for suspiciously short values on mandatory fields
|
|
|
if rule.get('is_mandatory') and len(value) < 2:
|
|
|
issues.append(f"'{attr_name}' suspiciously short: '{value}'")
|
|
|
suggestions.append(f"Provide more detailed {attr_name}")
|
|
|
continue
|
|
|
-
|
|
|
valid_count += 1
|
|
|
-
|
|
|
score = (valid_count / total_attrs) * 100 if total_attrs > 0 else 100.0
|
|
|
return score, issues, suggestions
|
|
|
-
|
|
|
- def _check_consistency(
|
|
|
- self,
|
|
|
- attributes: Dict,
|
|
|
- title: str,
|
|
|
- description: str
|
|
|
- ) -> Tuple[float, List[str]]:
|
|
|
- """Check if attribute values are mentioned in title/description"""
|
|
|
- issues = []
|
|
|
- consistency_count = 0
|
|
|
- total_checks = 0
|
|
|
-
|
|
|
+
|
|
|
+ def _check_consistency(self, attributes: Dict, title: str, description: str) -> Tuple[float, List[str]]:
|
|
|
+ issues, consistency_count, total_checks = [], 0, 0
|
|
|
check_attrs = ['brand', 'color', 'size', 'material', 'model', 'weight', 'dimensions']
|
|
|
combined_text = f"{title} {description}".lower()
|
|
|
-
|
|
|
for attr in check_attrs:
|
|
|
if attr not in attributes or not attributes[attr]:
|
|
|
continue
|
|
|
-
|
|
|
total_checks += 1
|
|
|
attr_value = str(attributes[attr]).lower().strip()
|
|
|
-
|
|
|
- if len(attr_value) < 2:
|
|
|
- consistency_count += 1
|
|
|
- continue
|
|
|
-
|
|
|
- if attr_value in combined_text:
|
|
|
+ if len(attr_value) < 2 or attr_value in combined_text:
|
|
|
consistency_count += 1
|
|
|
continue
|
|
|
-
|
|
|
words_in_attr = set(attr_value.split())
|
|
|
words_in_text = set(combined_text.split())
|
|
|
-
|
|
|
if words_in_attr.issubset(words_in_text):
|
|
|
consistency_count += 1
|
|
|
continue
|
|
|
-
|
|
|
text_words = combined_text.split()
|
|
|
if text_words:
|
|
|
match = process.extractOne(attr_value, text_words, scorer=fuzz.ratio)
|
|
|
-
|
|
|
if match and match[1] >= 80:
|
|
|
consistency_count += 1
|
|
|
continue
|
|
|
-
|
|
|
- issues.append(
|
|
|
- f"'{attr.title()}': '{attributes[attr]}' not mentioned in title/description"
|
|
|
- )
|
|
|
-
|
|
|
+ issues.append(f"'{attr.title()}': '{attributes[attr]}' not mentioned in title/description")
|
|
|
score = (consistency_count / total_checks) * 100 if total_checks > 0 else 100.0
|
|
|
return score, issues
|
|
|
-
|
|
|
- # ========== UTILITY METHODS (unchanged) ==========
|
|
|
-
|
|
|
- def extract_attributes_from_text(self, text: str, category: str = '') -> Dict:
|
|
|
- """Extract attributes from unstructured text using NER and patterns"""
|
|
|
- extracted = {}
|
|
|
-
|
|
|
- if self.nlp:
|
|
|
- try:
|
|
|
- doc = self.nlp(text)
|
|
|
- orgs = [ent.text for ent in doc.ents if ent.label_ == 'ORG']
|
|
|
- if orgs:
|
|
|
- extracted['brand'] = orgs[0]
|
|
|
- quantities = [ent.text for ent in doc.ents if ent.label_ == 'QUANTITY']
|
|
|
- if quantities:
|
|
|
- extracted['size'] = quantities[0]
|
|
|
- except Exception as e:
|
|
|
- logger.warning(f"spaCy extraction failed: {e}")
|
|
|
-
|
|
|
- # Pattern-based extraction for colors
|
|
|
- color_patterns = [
|
|
|
- 'black', 'white', 'red', 'blue', 'green', 'yellow', 'orange',
|
|
|
- 'purple', 'pink', 'brown', 'gray', 'grey', 'silver', 'gold',
|
|
|
- 'rose gold', 'space gray', 'navy', 'beige', 'tan'
|
|
|
- ]
|
|
|
- text_lower = text.lower()
|
|
|
- for color in color_patterns:
|
|
|
- if color in text_lower:
|
|
|
- extracted['color'] = color.title()
|
|
|
- break
|
|
|
-
|
|
|
- # Use AI for enhanced extraction if available
|
|
|
- if self.use_ai and self.ai_service and len(extracted) < 3:
|
|
|
- try:
|
|
|
- ai_extracted = self.ai_service.extract_attributes_with_ai(
|
|
|
- text, '', category
|
|
|
- )
|
|
|
- for key, value in ai_extracted.items():
|
|
|
- if key not in extracted and value:
|
|
|
- extracted[key] = value
|
|
|
- except Exception as e:
|
|
|
- logger.warning(f"AI extraction failed: {e}")
|
|
|
-
|
|
|
- return extracted
|
|
|
-
|
|
|
- def get_score_interpretation(self, score: float) -> Dict[str, str]:
|
|
|
- """Get human-readable interpretation of score"""
|
|
|
- if score >= 90:
|
|
|
- return {
|
|
|
- 'grade': 'A',
|
|
|
- 'status': 'Excellent',
|
|
|
- 'color': 'green',
|
|
|
- 'recommendation': 'Product listing is of high quality. Minor tweaks only.'
|
|
|
- }
|
|
|
- elif score >= 80:
|
|
|
- return {
|
|
|
- 'grade': 'B',
|
|
|
- 'status': 'Good',
|
|
|
- 'color': 'lightgreen',
|
|
|
- 'recommendation': 'Good quality. Address minor issues to reach excellence.'
|
|
|
- }
|
|
|
- elif score >= 70:
|
|
|
- return {
|
|
|
- 'grade': 'C',
|
|
|
- 'status': 'Fair',
|
|
|
- 'color': 'yellow',
|
|
|
- 'recommendation': 'Acceptable but needs improvement. Review suggestions.'
|
|
|
- }
|
|
|
- elif score >= 60:
|
|
|
- return {
|
|
|
- 'grade': 'D',
|
|
|
- 'status': 'Poor',
|
|
|
- 'color': 'orange',
|
|
|
- 'recommendation': 'Significant issues found. Requires immediate attention.'
|
|
|
- }
|
|
|
- else:
|
|
|
- return {
|
|
|
- 'grade': 'F',
|
|
|
- 'status': 'Critical',
|
|
|
- 'color': 'red',
|
|
|
- 'recommendation': 'Critical quality issues. Major revision needed.'
|
|
|
- }
|