"""Gemini-backed service that suggests product attribute and SEO fixes."""

import json
import logging
import re
from typing import Dict, List, Optional

import google.generativeai as genai
from django.conf import settings
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential

logger = logging.getLogger(__name__)


class GeminiAttributeService:
    """Service to interact with Google Gemini API for attribute and SEO suggestions."""

    def __init__(self):
        """Configure the Gemini client from Django settings.

        Raises:
            ValueError: If ``settings.GEMINI_API_KEY`` is missing or empty.
        """
        api_key = getattr(settings, 'GEMINI_API_KEY', None)
        if not api_key:
            raise ValueError("GEMINI_API_KEY not found in settings")

        genai.configure(api_key=api_key)
        self.model = genai.GenerativeModel('gemini-2.0-flash-exp')

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        retry=retry_if_exception_type(Exception),
        before_sleep=lambda retry_state: logger.info(
            "Retrying Gemini API call, attempt %s", retry_state.attempt_number
        ),
        # Surface the underlying exception after the last attempt instead of
        # tenacity's RetryError wrapper, so the 'error' strings built by the
        # callers describe the real failure.
        reraise=True,
    )
    def _call_gemini_api(self, prompt, max_tokens=8192):
        """Call ``generate_content`` on the configured model, retrying on failure.

        Args:
            prompt: Prompt text sent to the model.
            max_tokens: Value passed as ``max_output_tokens``.

        Returns:
            The response object returned by ``self.model.generate_content``.

        Raises:
            Exception: The last exception raised by the API call once all
                three attempts have failed.
        """
        harm = genai.types.HarmCategory
        block_none = genai.types.HarmBlockThreshold.BLOCK_NONE
        return self.model.generate_content(
            prompt,
            generation_config=genai.types.GenerationConfig(
                temperature=0.2,  # Lower for more consistent JSON
                top_p=0.9,
                top_k=40,
                max_output_tokens=max_tokens,
                response_mime_type="application/json",  # Force JSON output
            ),
            safety_settings={
                harm.HARM_CATEGORY_DANGEROUS_CONTENT: block_none,
                harm.HARM_CATEGORY_HARASSMENT: block_none,
                harm.HARM_CATEGORY_HATE_SPEECH: block_none,
                harm.HARM_CATEGORY_SEXUALLY_EXPLICIT: block_none,
            },
        )

    def generate_attribute_suggestions(
        self,
        product: Dict,
        issues: List[str],
        category_rules: List[Dict]
    ) -> Dict:
        """Ask Gemini for attribute and SEO fixes for one product.

        Only the first 15 issues are sent to the model. Any failure is
        reported through an ``'error'`` key together with rule-based
        ``'fallback_suggestions'`` rather than by raising.

        Args:
            product: Product data; ``sku``, ``category``, ``title``,
                ``description`` and ``attributes`` are read from it.
            issues: Detected issue descriptions.
            category_rules: Rule dicts consumed by ``_build_prompt``.

        Returns:
            The parsed model response on success, otherwise a dict with an
            ``'error'`` entry and ``'fallback_suggestions'``.
        """
        sku = product.get('sku')
        try:
            # Limit issues to prevent prompt overflow.
            limited_issues = issues[:15]
            prompt = self._build_prompt(product, limited_issues, category_rules)
            response = self._call_gemini_api(prompt, max_tokens=8192)

            if not response or not response.candidates:
                logger.error("No candidates returned for SKU: %s", sku)
                return {
                    'error': 'No candidates returned by Gemini API',
                    'fallback_suggestions': self._generate_fallback_suggestions(limited_issues)
                }

            candidate = response.candidates[0]
            finish_reason_name = candidate.finish_reason.name

            if finish_reason_name == "MAX_TOKENS":
                logger.warning(
                    "Max tokens reached for SKU: %s, attempting partial parse", sku
                )
                partial_result = None
                try:
                    partial_result = self._parse_response(response.text)
                except Exception:
                    # Best effort only: reading or parsing a truncated
                    # response may fail, in which case we fall through to
                    # the retry / fallback below.
                    logger.warning(
                        "Partial parse failed for SKU: %s", sku, exc_info=True
                    )
                if partial_result and 'error' not in partial_result:
                    return partial_result

                # A shorter issue list should yield a shorter answer. The
                # recursive call receives exactly five issues, so it cannot
                # recurse again.
                if len(issues) > 5:
                    logger.info("Retrying with fewer issues")
                    return self.generate_attribute_suggestions(
                        product, issues[:5], category_rules
                    )
                return {
                    'error': 'Response too long, using fallback',
                    'fallback_suggestions': self._generate_fallback_suggestions(limited_issues)
                }

            if finish_reason_name in ("SAFETY", "RECITATION", "OTHER"):
                logger.error(
                    "Response blocked by %s for SKU: %s", finish_reason_name, sku
                )
                return {
                    'error': f'Response blocked by {finish_reason_name} filters',
                    'safety_ratings': [
                        {'category': str(r.category), 'probability': str(r.probability)}
                        for r in candidate.safety_ratings
                    ],
                    'fallback_suggestions': self._generate_fallback_suggestions(limited_issues)
                }

            if finish_reason_name != "STOP":
                logger.warning("Unexpected finish reason: %s", finish_reason_name)
                return {
                    'error': f'Unexpected finish reason: {finish_reason_name}',
                    'fallback_suggestions': self._generate_fallback_suggestions(limited_issues)
                }

            logger.info("Successfully received response for SKU: %s", sku)
            suggestions = self._parse_response(response.text)

            if 'error' in suggestions:
                logger.warning("Parse error for SKU: %s, using fallback", sku)
                suggestions['fallback_suggestions'] = self._generate_fallback_suggestions(
                    limited_issues
                )

            return suggestions

        except Exception as e:
            logger.error("Gemini API error for SKU %s: %s", sku, e, exc_info=True)
            return {
                'error': str(e),
                'fallback_suggestions': self._generate_fallback_suggestions(issues[:10])
            }

    def _build_prompt(self, product: Dict, issues: List[str], rules: List[Dict]) -> str:
        """Build a concise, structured prompt for Gemini with SEO awareness.

        Title and description are truncated to 200 and 300 characters, each
        rule contributes at most 5 valid values, and at most 8 attribute
        issues and 5 SEO issues (those starting with ``"SEO:"``) are listed.

        Args:
            product: Product data dict.
            issues: Issue descriptions to include.
            rules: Rule dicts with ``attribute_name`` and optional
                ``is_mandatory`` / ``valid_values`` entries.

        Returns:
            The prompt text.
        """
        mandatory_attrs = [r['attribute_name'] for r in rules if r.get('is_mandatory')]
        valid_values_map = {
            r['attribute_name']: r.get('valid_values', [])[:5]  # Limit to 5 values
            for r in rules if r.get('valid_values')
        }

        # Shorten verbose validator phrasing before sending it to the model.
        cleaned_issues = [
            issue.replace("suspiciously short", "short value")
                 .replace("not recognized", "invalid")
                 .replace("likely means", "should be")
                 .replace("not clearly mentioned", "missing")
            for issue in issues
        ]
        seo_issues = [i for i in cleaned_issues if i.startswith("SEO:")][:5]
        attribute_issues = [i for i in cleaned_issues if not i.startswith("SEO:")][:8]

        # A key that is present but None must not break the slicing below.
        title = (product.get('title') or '')[:200]
        description = (product.get('description') or '')[:300]
        attribute_lines = "\n".join(f"• {i}" for i in attribute_issues)
        seo_lines = "\n".join(f"• {i}" for i in seo_issues)

        prompt = f"""Analyze this e-commerce product and provide JSON suggestions.

PRODUCT:
SKU: {product.get('sku')}
Category: {product.get('category')}
Title: {title}
Description: {description}
Attributes: {json.dumps(product.get('attributes', {}), ensure_ascii=False)}

RULES:
Mandatory: {', '.join(mandatory_attrs)}
Valid Values: {json.dumps(valid_values_map, ensure_ascii=False)}

ISSUES ({len(attribute_issues)} attribute, {len(seo_issues)} SEO):
{attribute_lines}
{seo_lines}

Return ONLY this JSON structure (no markdown, no explanation):
{{
  "corrected_attributes": {{"attr": "value"}},
  "missing_attributes": {{"attr": "value"}},
  "seo_optimizations": {{
    "optimized_title": "50-100 char title",
    "optimized_description": "50-150 word description",
    "recommended_keywords": ["kw1", "kw2", "kw3"]
  }},
  "improvements": [
    {{"issue": "...", "suggestion": "...", "confidence": "high/medium/low", "type": "attribute/seo"}}
  ],
  "quality_score_prediction": 85,
  "reasoning": "Brief explanation"
}}

IMPORTANT: Keep response under 6000 tokens. Prioritize top 3 most critical improvements."""
        return prompt

    def _parse_response(self, response_text: str) -> Dict:
        """Parse the model output as JSON, trying progressively looser strategies.

        Order: direct parse, strip markdown fences, cut to the outermost
        ``{...}``, repair trailing commas, and finally decode the first
        complete object.

        Args:
            response_text: Raw text returned by the model.

        Returns:
            The parsed JSON value, or a dict with an ``'error'`` key (plus a
            truncated ``'raw_response'``) when nothing could be parsed.
        """
        if not response_text or not response_text.strip():
            return {'error': 'Empty response from API'}

        try:
            # Strategy 1: direct parse (works with response_mime_type="application/json").
            try:
                parsed = json.loads(response_text)
                logger.info("Successfully parsed JSON directly")
                return parsed
            except json.JSONDecodeError:
                pass

            # Strategy 2: remove markdown code fences.
            cleaned = response_text.strip()
            if '```' in cleaned:
                match = re.search(r'```(?:json)?\s*(\{.*\})\s*```', cleaned, re.DOTALL)
                if match:
                    cleaned = match.group(1)
                else:
                    cleaned = re.sub(r'```(?:json)?', '', cleaned).strip()

            # Strategy 3: keep only the span from the first { to the last }.
            first_brace = cleaned.find('{')
            last_brace = cleaned.rfind('}')
            if first_brace != -1 and last_brace > first_brace:
                cleaned = cleaned[first_brace:last_brace + 1]

            # Strategy 4: parse the cleaned text.
            try:
                parsed = json.loads(cleaned)
                logger.info("Successfully parsed JSON after cleaning")
                return parsed
            except json.JSONDecodeError as e:
                logger.warning("JSON parse error at position %s: %s", e.pos, e.msg)

            # Strategy 5: repair common syntax slips, then parse again.
            cleaned = self._fix_json_syntax(cleaned)
            try:
                parsed = json.loads(cleaned)
                logger.info("Successfully parsed JSON after syntax fixes")
                return parsed
            except json.JSONDecodeError:
                pass

            # Strategy 6: accept the first complete object, ignoring what follows.
            partial_json = self._extract_partial_json(cleaned)
            if partial_json:
                logger.warning("Using partial JSON response")
                return partial_json

            logger.error(
                "All JSON parsing strategies failed. Response length: %s",
                len(response_text),
            )
            logger.error("Response preview: %s...", response_text[:500])
            return {
                'error': 'Failed to parse AI response',
                'raw_response': response_text[:1000],  # Limit size
                'parse_attempts': 6
            }

        except Exception as e:
            logger.error("Unexpected error in _parse_response: %s", e, exc_info=True)
            return {
                'error': f'Parse exception: {str(e)}',
                'raw_response': response_text[:500] if response_text else 'None'
            }

    def _fix_json_syntax(self, json_str: str) -> str:
        """Attempt to fix common JSON syntax issues.

        Currently removes trailing commas that directly precede a closing
        brace or bracket. The substitution is purely textual and does not
        track string boundaries, so it is only applied after a strict parse
        has already failed.

        Args:
            json_str: JSON-like text that failed to parse.

        Returns:
            The text with trailing commas removed.
        """
        # NOTE(review): a further "escape unescaped quotes" heuristic that
        # followed here is not reproduced; its pattern was not recoverable
        # and its own comment described it as risky.
        return re.sub(r',\s*([}\]])', r'\1', json_str)

    def _extract_partial_json(self, json_str: str) -> Optional[Dict]:
        """Decode the first complete JSON object found in ``json_str``.

        Decoding starts at the first ``{`` and stops at the end of that
        object, so trailing junk after a complete object is ignored. Using
        the JSON decoder rather than counting braces keeps ``{`` and ``}``
        inside string values from confusing the scan.

        Args:
            json_str: Text that may start with, or contain, a JSON object.

        Returns:
            The decoded dict, or ``None`` if no complete object starts at
            the first ``{``.
        """
        start_idx = json_str.find('{')
        if start_idx == -1:
            return None
        try:
            parsed, _end = json.JSONDecoder().raw_decode(json_str, start_idx)
        except json.JSONDecodeError:
            return None
        return parsed if isinstance(parsed, dict) else None

    def _fallback_for_issue(self, issue: str, category: str) -> Dict:
        """Build one rule-based suggestion entry for ``issue``."""
        lowered = issue.lower()
        suggestion = "Review and correct this issue"
        confidence = "medium"
        # An "SEO:" issue can land in the 'missing' or 'invalid' group because
        # of its wording; it is still an SEO issue.
        is_seo = category == 'seo' or issue.startswith('SEO:')
        issue_type = "seo" if is_seo else "attribute"

        if "Missing mandatory field" in issue:
            attr = issue.split(":")[-1].strip()
            suggestion = f"Add {attr} - check product details or title/description"
            confidence = "high"
        elif "not in valid values" in issue or "invalid" in lowered:
            suggestion = "Use one of the valid values from category rules"
            confidence = "high"
        elif "placeholder" in lowered:
            suggestion = "Replace with actual product data"
            confidence = "high"
        elif "too short" in lowered:
            if "title" in lowered:
                suggestion = "Expand to 50-100 characters with key attributes"
                confidence = "high"
                issue_type = "seo"
            elif "description" in lowered:
                suggestion = "Expand to 50-150 words with details"
                confidence = "high"
                issue_type = "seo"
            else:
                suggestion = "Provide more detailed information"
                confidence = "medium"
        elif "keyword" in lowered or "search term" in lowered:
            suggestion = "Add relevant keywords to improve discoverability"
            confidence = "medium"
            issue_type = "seo"

        return {
            'issue': issue,
            'suggestion': suggestion,
            'confidence': confidence,
            'type': issue_type,
            'category': category
        }

    def _generate_fallback_suggestions(self, issues: List[str]) -> List[Dict]:
        """Generate rule-based suggestions when the model output is unusable.

        Issues are grouped as ``missing``, ``invalid``, ``seo`` or ``other``
        (first matching rule wins, in that order). At most 5 issues per
        group are kept and at most 15 suggestions are returned overall.

        Args:
            issues: Issue descriptions.

        Returns:
            A list of dicts with ``issue``, ``suggestion``, ``confidence``,
            ``type`` and ``category`` keys.
        """
        issue_categories: Dict[str, List[str]] = {
            'missing': [],
            'invalid': [],
            'seo': [],
            'other': []
        }

        for issue in issues:
            lowered = issue.lower()
            if 'missing' in lowered or 'mandatory' in lowered:
                bucket = 'missing'
            elif 'invalid' in lowered or 'not in valid' in lowered:
                bucket = 'invalid'
            elif issue.startswith('SEO:'):
                bucket = 'seo'
            else:
                bucket = 'other'
            issue_categories[bucket].append(issue)

        suggestions = [
            self._fallback_for_issue(issue, category)
            for category, category_issues in issue_categories.items()
            for issue in category_issues[:5]  # Limit to 5 per category
        ]
        return suggestions[:15]  # Return top 15 suggestions

    def extract_attributes_with_ai(self, title: str, description: str, category: str) -> Dict:
        """Use Gemini to extract attributes from unstructured text.

        The title and description are truncated to 200 and 400 characters
        before being sent.

        Args:
            title: Product title; ``None`` is treated as empty.
            description: Product description; ``None`` is treated as empty.
            category: Product category, included in the prompt as context.

        Returns:
            The parsed model response, or a dict with an ``'error'`` key if
            the call or the parsing failed.
        """
        try:
            prompt = f"""Extract product attributes from this text. Return ONLY valid JSON.

Category: {category}
Title: {(title or '')[:200]}
Description: {(description or '')[:400]}

Return format:
{{
  "brand": "value or null",
  "color": "value or null",
  "size": "value or null",
  "material": "value or null",
  "model": "value or null"
}}"""

            response = self._call_gemini_api(prompt, max_tokens=1024)

            if not response or not response.candidates:
                return {'error': 'No response'}

            return self._parse_response(response.text)
        except Exception as e:
            logger.error("AI extraction error: %s", e)
            return {'error': str(e)}