# #gemini_service.py # import google.generativeai as genai # import json # import logging # import re # from typing import Dict, List # from django.conf import settings # from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type # logger = logging.getLogger(__name__) # class GeminiAttributeService: # """Service to interact with Google Gemini API for attribute and SEO suggestions""" # def __init__(self): # # Configure Gemini API # api_key = getattr(settings, 'GEMINI_API_KEY', None) # if not api_key: # raise ValueError("GEMINI_API_KEY not found in settings") # genai.configure(api_key=api_key) # self.model = genai.GenerativeModel('gemini-2.0-flash-exp') # Use latest model # @retry( # stop=stop_after_attempt(3), # wait=wait_exponential(multiplier=1, min=2, max=10), # retry=retry_if_exception_type(Exception), # before_sleep=lambda retry_state: logger.info(f"Retrying Gemini API call, attempt {retry_state.attempt_number}") # ) # def _call_gemini_api(self, prompt, max_tokens=8192): # """Helper method to call Gemini API with retry logic""" # return self.model.generate_content( # prompt, # generation_config=genai.types.GenerationConfig( # temperature=0.2, # Lower for more consistent JSON # top_p=0.9, # top_k=40, # max_output_tokens=max_tokens, # Increased default # response_mime_type="application/json" # Force JSON output # ), # safety_settings={ # genai.types.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: genai.types.HarmBlockThreshold.BLOCK_NONE, # genai.types.HarmCategory.HARM_CATEGORY_HARASSMENT: genai.types.HarmBlockThreshold.BLOCK_NONE, # genai.types.HarmCategory.HARM_CATEGORY_HATE_SPEECH: genai.types.HarmBlockThreshold.BLOCK_NONE, # genai.types.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: genai.types.HarmBlockThreshold.BLOCK_NONE # } # ) # def generate_attribute_suggestions( # self, # product: Dict, # issues: List[str], # category_rules: List[Dict] # ) -> Dict: # """ # Use Gemini to generate intelligent suggestions for fixing attribute 
issues # Includes SEO-aware recommendations with robust error handling # """ # try: # # Limit issues to prevent prompt overflow # limited_issues = issues[:15] if len(issues) > 15 else issues # prompt = self._build_prompt(product, limited_issues, category_rules) # response = self._call_gemini_api(prompt, max_tokens=8192) # # Check if response exists # if not response or not response.candidates: # logger.error(f"No candidates returned for SKU: {product.get('sku')}") # return { # 'error': 'No candidates returned by Gemini API', # 'fallback_suggestions': self._generate_fallback_suggestions(limited_issues) # } # candidate = response.candidates[0] # finish_reason_name = candidate.finish_reason.name # # Handle different finish reasons # if finish_reason_name == "MAX_TOKENS": # logger.warning(f"Max tokens reached for SKU: {product.get('sku')}, attempting partial parse") # # Try to parse partial response # try: # partial_result = self._parse_response(response.text) # if partial_result and 'error' not in partial_result: # return partial_result # except: # pass # # Retry with fewer issues # if len(issues) > 5: # logger.info("Retrying with fewer issues") # return self.generate_attribute_suggestions(product, issues[:5], category_rules) # else: # return { # 'error': 'Response too long, using fallback', # 'fallback_suggestions': self._generate_fallback_suggestions(limited_issues) # } # elif finish_reason_name in ("SAFETY", "RECITATION", "OTHER"): # logger.error(f"Response blocked by {finish_reason_name} for SKU: {product.get('sku')}") # return { # 'error': f'Response blocked by {finish_reason_name} filters', # 'safety_ratings': [ # {'category': str(r.category), 'probability': str(r.probability)} # for r in candidate.safety_ratings # ], # 'fallback_suggestions': self._generate_fallback_suggestions(limited_issues) # } # elif finish_reason_name != "STOP": # logger.warning(f"Unexpected finish reason: {finish_reason_name}") # return { # 'error': f'Unexpected finish reason: 
{finish_reason_name}', # 'fallback_suggestions': self._generate_fallback_suggestions(limited_issues) # } # # Parse successful response # logger.info(f"Successfully received response for SKU: {product.get('sku')}") # suggestions = self._parse_response(response.text) # if 'error' in suggestions: # logger.warning(f"Parse error for SKU: {product.get('sku')}, using fallback") # suggestions['fallback_suggestions'] = self._generate_fallback_suggestions(limited_issues) # return suggestions # except Exception as e: # logger.error(f"Gemini API error for SKU {product.get('sku')}: {str(e)}", exc_info=True) # return { # 'error': str(e), # 'fallback_suggestions': self._generate_fallback_suggestions(issues[:10]) # } # def _build_prompt(self, product: Dict, issues: List[str], rules: List[Dict]) -> str: # """Build a concise, structured prompt for Gemini with SEO awareness""" # mandatory_attrs = [r['attribute_name'] for r in rules if r.get('is_mandatory')] # valid_values_map = { # r['attribute_name']: r.get('valid_values', [])[:5] # Limit to 5 values # for r in rules if r.get('valid_values') # } # # Sanitize and categorize issues # cleaned_issues = [ # issue.replace("suspiciously short", "short value") # .replace("not recognized", "invalid") # .replace("likely means", "should be") # .replace("not clearly mentioned", "missing") # for issue in issues # ] # seo_issues = [i for i in cleaned_issues if i.startswith("SEO:")][:5] # attribute_issues = [i for i in cleaned_issues if not i.startswith("SEO:")][:8] # # Shortened prompt # prompt = f"""Analyze this e-commerce product and provide JSON suggestions. 
# PRODUCT: # SKU: {product.get('sku')} # Category: {product.get('category')} # Title: {product.get('title', '')[:200]} # Description: {product.get('description', '')[:300]} # Attributes: {json.dumps(product.get('attributes', {}), ensure_ascii=False)} # RULES: # Mandatory: {', '.join(mandatory_attrs)} # Valid Values: {json.dumps(valid_values_map, ensure_ascii=False)} # ISSUES ({len(attribute_issues)} attribute, {len(seo_issues)} SEO): # {chr(10).join(f"• {i}" for i in attribute_issues[:8])} # {chr(10).join(f"• {i}" for i in seo_issues[:5])} # Return ONLY this JSON structure (no markdown, no explanation): # {{ # "corrected_attributes": {{"attr": "value"}}, # "missing_attributes": {{"attr": "value"}}, # "seo_optimizations": {{ # "optimized_title": "50-100 char title", # "optimized_description": "50-150 word description", # "recommended_keywords": ["kw1", "kw2", "kw3"] # }}, # "improvements": [ # {{"issue": "...", "suggestion": "...", "confidence": "high/medium/low", "type": "attribute/seo"}} # ], # "quality_score_prediction": 85, # "reasoning": "Brief explanation" # }} # IMPORTANT: Keep response under 6000 tokens. 
Prioritize top 3 most critical improvements.""" # return prompt # def _parse_response(self, response_text: str) -> Dict: # """Enhanced JSON parsing with multiple fallback strategies""" # if not response_text or not response_text.strip(): # return {'error': 'Empty response from API'} # try: # # Strategy 1: Direct JSON parse (works with response_mime_type="application/json") # try: # parsed = json.loads(response_text) # logger.info("Successfully parsed JSON directly") # return parsed # except json.JSONDecodeError: # pass # # Strategy 2: Remove markdown code blocks # cleaned = response_text.strip() # if '```' in cleaned: # # Extract content between code blocks # match = re.search(r'```(?:json)?\s*(\{.*\})\s*```', cleaned, re.DOTALL) # if match: # cleaned = match.group(1) # else: # # Remove all code block markers # cleaned = re.sub(r'```(?:json)?', '', cleaned).strip() # # Strategy 3: Find first { and last } # first_brace = cleaned.find('{') # last_brace = cleaned.rfind('}') # if first_brace != -1 and last_brace != -1 and last_brace > first_brace: # cleaned = cleaned[first_brace:last_brace + 1] # # Strategy 4: Try parsing cleaned JSON # try: # parsed = json.loads(cleaned) # logger.info("Successfully parsed JSON after cleaning") # return parsed # except json.JSONDecodeError as e: # logger.warning(f"JSON parse error at position {e.pos}: {e.msg}") # # Strategy 5: Attempt to fix common JSON issues # cleaned = self._fix_json_syntax(cleaned) # try: # parsed = json.loads(cleaned) # logger.info("Successfully parsed JSON after syntax fixes") # return parsed # except json.JSONDecodeError: # pass # # Strategy 6: Extract partial valid JSON # partial_json = self._extract_partial_json(cleaned) # if partial_json: # logger.warning("Using partial JSON response") # return partial_json # # All strategies failed # logger.error(f"All JSON parsing strategies failed. 
Response length: {len(response_text)}") # logger.error(f"Response preview: {response_text[:500]}...") # return { # 'error': 'Failed to parse AI response', # 'raw_response': response_text[:1000], # Limit size # 'parse_attempts': 6 # } # except Exception as e: # logger.error(f"Unexpected error in _parse_response: {e}", exc_info=True) # return { # 'error': f'Parse exception: {str(e)}', # 'raw_response': response_text[:500] if response_text else 'None' # } # def _fix_json_syntax(self, json_str: str) -> str: # """Attempt to fix common JSON syntax issues""" # try: # # Remove trailing commas before closing braces/brackets # json_str = re.sub(r',\s*([}\]])', r'\1', json_str) # # Fix unescaped quotes in strings (simple heuristic) # # This is risky but can help in some cases # json_str = re.sub(r'(? Dict: # """Extract valid partial JSON by finding complete objects""" # try: # # Try to find complete nested structures # depth = 0 # start_idx = json_str.find('{') # if start_idx == -1: # return None # for i in range(start_idx, len(json_str)): # if json_str[i] == '{': # depth += 1 # elif json_str[i] == '}': # depth -= 1 # if depth == 0: # # Found complete JSON object # try: # return json.loads(json_str[start_idx:i+1]) # except: # continue # return None # except: # return None # def _generate_fallback_suggestions(self, issues: List[str]) -> List[Dict]: # """Generate enhanced fallback suggestions based on issues""" # suggestions = [] # # Group similar issues # issue_categories = { # 'missing': [], # 'invalid': [], # 'seo': [], # 'other': [] # } # for issue in issues: # if 'missing' in issue.lower() or 'mandatory' in issue.lower(): # issue_categories['missing'].append(issue) # elif 'invalid' in issue.lower() or 'not in valid' in issue.lower(): # issue_categories['invalid'].append(issue) # elif issue.startswith('SEO:'): # issue_categories['seo'].append(issue) # else: # issue_categories['other'].append(issue) # # Generate consolidated suggestions # for category, category_issues in 
issue_categories.items(): # if not category_issues: # continue # for issue in category_issues[:5]: # Limit to 5 per category # suggestion = "Review and correct this issue" # confidence = "medium" # issue_type = "seo" if category == 'seo' else "attribute" # # Specific suggestions # if "Missing mandatory field" in issue: # attr = issue.split(":")[-1].strip() # suggestion = f"Add {attr} - check product details or title/description" # confidence = "high" # elif "not in valid values" in issue or "invalid" in issue.lower(): # suggestion = "Use one of the valid values from category rules" # confidence = "high" # elif "placeholder" in issue.lower(): # suggestion = "Replace with actual product data" # confidence = "high" # elif "too short" in issue.lower(): # if "title" in issue.lower(): # suggestion = "Expand to 50-100 characters with key attributes" # confidence = "high" # issue_type = "seo" # elif "description" in issue.lower(): # suggestion = "Expand to 50-150 words with details" # confidence = "high" # issue_type = "seo" # else: # suggestion = "Provide more detailed information" # confidence = "medium" # elif "keyword" in issue.lower() or "search term" in issue.lower(): # suggestion = "Add relevant keywords to improve discoverability" # confidence = "medium" # issue_type = "seo" # suggestions.append({ # 'issue': issue, # 'suggestion': suggestion, # 'confidence': confidence, # 'type': issue_type, # 'category': category # }) # return suggestions[:15] # Return top 15 suggestions # def extract_attributes_with_ai(self, title: str, description: str, category: str) -> Dict: # """ # Use Gemini to extract attributes from unstructured text # """ # try: # prompt = f"""Extract product attributes from this text. Return ONLY valid JSON. 
# Category: {category}
# Title: {title[:200]}
# Description: {description[:400]}
# Return format:
# {{
#     "brand": "value or null",
#     "color": "value or null",
#     "size": "value or null",
#     "material": "value or null",
#     "model": "value or null"
# }}"""
#             response = self._call_gemini_api(prompt, max_tokens=1024)
#             if not response or not response.candidates:
#                 return {'error': 'No response'}
#             return self._parse_response(response.text)
#         except Exception as e:
#             logger.error(f"AI extraction error: {str(e)}")
#             return {'error': str(e)}


# gemini_service_enhanced.py
"""
Enhanced Gemini service with comprehensive suggestions for all components
"""
import json
import logging
import re
from typing import Dict, List, Optional

import google.generativeai as genai
from django.conf import settings
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

logger = logging.getLogger(__name__)


class GeminiAttributeService:
    """Enhanced service with comprehensive AI suggestions.

    Wraps a Gemini generative model and turns a product record plus a list of
    detected quality issues into a JSON suggestion payload. Every public entry
    point returns a dict; on any failure the dict carries an ``'error'`` key
    and rule-based ``'fallback_suggestions'``.
    """

    # Issue prefixes that route an issue to a non-attribute component.
    _COMPONENT_PREFIXES = ('Title:', 'Description:', 'SEO:')

    def __init__(self):
        """Configure the Gemini client from ``settings.GEMINI_API_KEY``.

        Raises:
            ValueError: if ``GEMINI_API_KEY`` is missing or empty in settings.
        """
        api_key = getattr(settings, 'GEMINI_API_KEY', None)
        if not api_key:
            raise ValueError("GEMINI_API_KEY not found in settings")

        genai.configure(api_key=api_key)
        self.model = genai.GenerativeModel('gemini-2.0-flash-exp')

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        retry=retry_if_exception_type(Exception)
    )
    def _call_gemini_api(self, prompt, max_tokens=8192):
        """Helper method to call Gemini API with retry logic.

        Args:
            prompt: Prompt text passed to ``generate_content``.
            max_tokens: Value used for ``max_output_tokens``.

        Returns:
            The response object returned by ``self.model.generate_content``.
        """
        return self.model.generate_content(
            prompt,
            generation_config=genai.types.GenerationConfig(
                temperature=0.2,
                top_p=0.9,
                top_k=40,
                max_output_tokens=max_tokens,
                response_mime_type="application/json"
            ),
            safety_settings={
                genai.types.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: genai.types.HarmBlockThreshold.BLOCK_NONE,
                genai.types.HarmCategory.HARM_CATEGORY_HARASSMENT: genai.types.HarmBlockThreshold.BLOCK_NONE,
                genai.types.HarmCategory.HARM_CATEGORY_HATE_SPEECH: genai.types.HarmBlockThreshold.BLOCK_NONE,
                genai.types.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: genai.types.HarmBlockThreshold.BLOCK_NONE
            }
        )

    def generate_comprehensive_suggestions(
        self,
        product: Dict,
        issues: List[str],
        category_rules: List[Dict],
        scores: Dict
    ) -> Dict:
        """
        Generate comprehensive AI suggestions covering ALL quality aspects

        Args:
            product: Product record; ``sku``, ``category``, ``title``,
                ``description`` and ``attributes`` keys are read.
            issues: Detected issue strings; at most the first 20 are sent.
            category_rules: Rule dicts read for ``attribute_name``,
                ``is_mandatory`` and ``valid_values``.
            scores: Per-dimension quality scores keyed by dimension name.

        Returns:
            The parsed suggestion dict, or a dict with ``'error'`` and
            ``'fallback_suggestions'`` when the call or the parsing fails.
        """
        try:
            # Slicing already handles lists shorter than the limit.
            limited_issues = issues[:20]
            prompt = self._build_comprehensive_prompt(product, limited_issues, category_rules, scores)

            response = self._call_gemini_api(prompt, max_tokens=8192)

            if not response or not response.candidates:
                logger.error(f"No candidates returned for SKU: {product.get('sku')}")
                return {
                    'error': 'No response from AI',
                    'fallback_suggestions': self._generate_fallback_suggestions(limited_issues)
                }

            candidate = response.candidates[0]
            finish_reason = candidate.finish_reason.name

            if finish_reason != "STOP":
                logger.warning(f"Non-STOP finish reason: {finish_reason}")
                # One retry with a shorter issue list; the retry cannot recurse
                # again because it is called with exactly 10 issues.
                if finish_reason == "MAX_TOKENS" and len(issues) > 10:
                    return self.generate_comprehensive_suggestions(
                        product, issues[:10], category_rules, scores
                    )
                return {
                    'error': f'Response blocked: {finish_reason}',
                    'fallback_suggestions': self._generate_fallback_suggestions(limited_issues)
                }

            logger.info(f"Successfully received comprehensive suggestions for SKU: {product.get('sku')}")
            suggestions = self._parse_response(response.text)

            if 'error' in suggestions:
                suggestions['fallback_suggestions'] = self._generate_fallback_suggestions(limited_issues)

            return suggestions

        except Exception as e:
            # Top-level boundary: callers always get a dict, never an exception.
            logger.error(f"Gemini API error: {str(e)}", exc_info=True)
            return {
                'error': str(e),
                # ``issues or []`` keeps the handler itself from raising when
                # the failure was caused by ``issues`` being None.
                'fallback_suggestions': self._generate_fallback_suggestions((issues or [])[:15])
            }

    def _build_comprehensive_prompt(
        self,
        product: Dict,
        issues: List[str],
        rules: List[Dict],
        scores: Dict
    ) -> str:
        """Build comprehensive prompt for all quality aspects.

        Args:
            product: Product record (see ``generate_comprehensive_suggestions``).
            issues: Issue strings, bucketed by their ``Title:``,
                ``Description:`` or ``SEO:`` prefix; everything else counts
                as an attribute issue.
            rules: Category rule dicts.
            scores: Quality scores; missing or ``None`` values render as 0.0.

        Returns:
            The prompt text.
        """
        mandatory_attrs = [r['attribute_name'] for r in rules if r.get('is_mandatory')]
        valid_values_map = {
            r['attribute_name']: r.get('valid_values', [])[:5]
            for r in rules if r.get('valid_values')
        }

        # Categorize issues. Every bucket uses the same startswith test so an
        # issue that merely mentions "Title:" mid-string is still reported as
        # an attribute issue instead of being dropped from the prompt.
        attribute_issues = [i for i in issues if not i.startswith(self._COMPONENT_PREFIXES)]
        title_issues = [i for i in issues if i.startswith('Title:')]
        desc_issues = [i for i in issues if i.startswith('Description:')]
        seo_issues = [i for i in issues if i.startswith('SEO:')]

        def bullets(items: List[str], limit: int) -> str:
            return "\n".join(f" • {i}" for i in items[:limit])

        scores = scores or {}

        def score(key: str):
            # A key stored with a None value would break the ':.1f' format.
            return scores.get(key) or 0

        # ``or ''`` covers keys that exist but hold None.
        title = product.get('title') or ''
        description = product.get('description') or ''
        attributes_json = json.dumps(product.get('attributes', {}), ensure_ascii=False)
        valid_values_json = json.dumps(valid_values_map, ensure_ascii=False)

        prompt = f"""Analyze this e-commerce product and provide comprehensive quality improvements.

PRODUCT DATA:
SKU: {product.get('sku')}
Category: {product.get('category')}
Title: {title[:250]}
Description: {description[:400]}
Attributes: {attributes_json}

QUALITY SCORES (out of 100):
- Mandatory Fields: {score('mandatory_fields'):.1f}
- Standardization: {score('standardization'):.1f}
- Missing Values: {score('missing_values'):.1f}
- Consistency: {score('consistency'):.1f}
- SEO: {score('seo_discoverability'):.1f}
- Title Quality: {score('title_quality'):.1f}
- Description Quality: {score('description_quality'):.1f}

CATEGORY RULES:
Mandatory Attributes: {', '.join(mandatory_attrs)}
Valid Values: {valid_values_json}

ISSUES FOUND:
Attributes ({len(attribute_issues)}):
{bullets(attribute_issues, 8)}

Title ({len(title_issues)}):
{bullets(title_issues, 5)}

Description ({len(desc_issues)}):
{bullets(desc_issues, 5)}

SEO ({len(seo_issues)}):
{bullets(seo_issues, 5)}

Return ONLY this JSON structure:
{{
  "corrected_attributes": {{
    "attr_name": "corrected_value"
  }},
  "missing_attributes": {{
    "attr_name": "suggested_value"
  }},
  "improved_title": "optimized title (50-100 chars, includes brand, model, key features)",
  "improved_description": "enhanced description (50-150 words, features, benefits, specs, use cases)",
  "seo_keywords": ["keyword1", "keyword2", "keyword3"],
  "improvements": [
    {{
      "component": "attributes/title/description/seo",
      "issue": "specific issue",
      "suggestion": "how to fix",
      "priority": "high/medium/low",
      "confidence": "high/medium/low"
    }}
  ],
  "quality_score_prediction": 85,
  "summary": "Brief 2-3 sentence summary of key improvements needed"
}}

CRITICAL: Keep response under 7000 tokens. Focus on top 5 most impactful improvements."""
        return prompt

    @staticmethod
    def _loads_object(text: str) -> Optional[Dict]:
        """Return ``json.loads(text)`` if it is a JSON object, else ``None``."""
        try:
            parsed = json.loads(text)
        except json.JSONDecodeError:
            return None
        return parsed if isinstance(parsed, dict) else None

    def _parse_response(self, response_text: str) -> Dict:
        """Enhanced JSON parsing with fallback strategies.

        Tries, in order: a direct parse, stripping markdown fences and
        trimming to the outermost braces, light syntax repair, and extracting
        the first complete object. Only JSON objects are accepted as a
        result; any other top-level value falls through to the next strategy.

        Args:
            response_text: Raw text returned by the model.

        Returns:
            The parsed dict, or a dict with ``'error'`` (and usually
            ``'raw_response'``) when nothing could be parsed.
        """
        if not response_text or not response_text.strip():
            return {'error': 'Empty response from API'}

        try:
            # Direct JSON parse
            parsed = self._loads_object(response_text)
            if parsed is not None:
                logger.info("Successfully parsed JSON directly")
                return parsed

            # Remove markdown code blocks
            cleaned = response_text.strip()
            if '```' in cleaned:
                match = re.search(r'```(?:json)?\s*(\{.*\})\s*```', cleaned, re.DOTALL)
                if match:
                    cleaned = match.group(1)
                else:
                    cleaned = re.sub(r'```(?:json)?', '', cleaned).strip()

            # Find first { and last }
            first_brace = cleaned.find('{')
            last_brace = cleaned.rfind('}')
            if first_brace != -1 and last_brace > first_brace:
                cleaned = cleaned[first_brace:last_brace + 1]

            # Try parsing cleaned JSON
            try:
                parsed = json.loads(cleaned)
            except json.JSONDecodeError as e:
                logger.warning(f"JSON parse error: {e}")
            else:
                if isinstance(parsed, dict):
                    logger.info("Successfully parsed JSON after cleaning")
                    return parsed

            # Fix common JSON issues
            cleaned = self._fix_json_syntax(cleaned)
            parsed = self._loads_object(cleaned)
            if parsed is not None:
                logger.info("Successfully parsed JSON after syntax fixes")
                return parsed

            # Extract partial valid JSON
            partial_json = self._extract_partial_json(cleaned)
            if partial_json:
                logger.warning("Using partial JSON response")
                return partial_json

            logger.error(f"All JSON parsing failed. Response length: {len(response_text)}")
            return {
                'error': 'Failed to parse AI response',
                'raw_response': response_text[:500]
            }

        except Exception as e:
            # Boundary handler: parsing must never raise into the caller.
            logger.error(f"Parse exception: {e}", exc_info=True)
            return {
                'error': f'Parse exception: {str(e)}',
                'raw_response': response_text[:500] if response_text else 'None'
            }

    def _fix_json_syntax(self, json_str: str) -> str:
        """Fix common JSON syntax issues.

        Removes trailing commas before ``}`` / ``]`` and drops anything after
        the final ``}``. Non-string input is returned unchanged.
        """
        if not isinstance(json_str, str):
            return json_str

        # Remove trailing commas
        json_str = re.sub(r',\s*([}\]])', r'\1', json_str)

        # Remove trailing content after final }
        last_brace = json_str.rfind('}')
        if last_brace != -1:
            json_str = json_str[:last_brace + 1]

        return json_str

    def _extract_partial_json(self, json_str: str) -> Optional[Dict]:
        """Extract valid partial JSON.

        Decodes the JSON object that starts at the first ``{`` and ignores
        whatever follows it. Using the JSON decoder rather than counting
        braces keeps braces inside string values from confusing the scan.

        Returns:
            The decoded dict, or ``None`` if no complete object starts at the
            first ``{``.
        """
        if not isinstance(json_str, str):
            return None

        start_idx = json_str.find('{')
        if start_idx == -1:
            return None

        try:
            parsed, _end = json.JSONDecoder().raw_decode(json_str[start_idx:])
        except json.JSONDecodeError:
            return None
        return parsed if isinstance(parsed, dict) else None

    def _generate_fallback_suggestions(self, issues: List[str]) -> List[Dict]:
        """Generate fallback suggestions based on issues.

        Args:
            issues: Issue strings; only the first 15 are considered.

        Returns:
            One dict per issue with ``component``, ``issue``, ``suggestion``,
            ``priority`` and ``confidence`` keys.
        """
        suggestions = []

        for issue in (issues or [])[:15]:
            suggestion_text = "Review and correct this issue"
            confidence = "medium"
            component = "attribute"
            priority = "medium"

            issue_lower = issue.lower()

            # Determine component
            if issue.startswith('Title:'):
                component = "title"
            elif issue.startswith('Description:'):
                component = "description"
            elif issue.startswith('SEO:'):
                component = "seo"

            # Specific suggestions
            if "missing mandatory" in issue_lower:
                attr = issue.split(":")[-1].strip()
                suggestion_text = f"Add required {attr} - check product packaging or manufacturer details"
                priority = "high"
                confidence = "high"
            elif "too short" in issue_lower:
                if "title" in issue_lower:
                    suggestion_text = "Expand title to 50-100 characters including brand, model, and key features"
                    component = "title"
                    priority = "high"
                elif "description" in issue_lower:
                    suggestion_text = "Write comprehensive 50-150 word description with features, benefits, and specifications"
                    component = "description"
                    priority = "high"
                else:
                    suggestion_text = "Provide more detailed information"
            elif "placeholder" in issue_lower:
                suggestion_text = "Replace with actual product data from manufacturer or packaging"
                priority = "high"
            elif "grammar" in issue_lower or "spelling" in issue_lower:
                suggestion_text = "Run spell-check and grammar review, ensure professional language"
                # Default to description only when the prefix did not already
                # identify the component (e.g. a "Title:" spelling issue).
                if component == "attribute":
                    component = "description"
                priority = "medium"
            elif "keyword" in issue_lower or "seo" in issue_lower:
                suggestion_text = "Add relevant search keywords and product attributes"
                component = "seo"
                priority = "medium"
            elif "duplicate" in issue_lower or "repetit" in issue_lower:
                suggestion_text = "Remove duplicate content, provide varied information with unique details"
                # Same rule as above: keep a prefix-derived component.
                if component == "attribute":
                    component = "description"
                priority = "medium"
            elif "not recognized" in issue_lower or "invalid" in issue_lower:
                suggestion_text = "Use standardized values from category rules"
                priority = "high"
                confidence = "high"

            suggestions.append({
                'component': component,
                'issue': issue,
                'suggestion': suggestion_text,
                'priority': priority,
                'confidence': confidence
            })

        return suggestions