12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800 |
- # #gemini_service.py
- # import google.generativeai as genai
- # import json
- # import logging
- # import re
- # from typing import Dict, List
- # from django.conf import settings
- # from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
- # logger = logging.getLogger(__name__)
- # class GeminiAttributeService:
- # """Service to interact with Google Gemini API for attribute and SEO suggestions"""
-
- # def __init__(self):
- # # Configure Gemini API
- # api_key = getattr(settings, 'GEMINI_API_KEY', None)
- # if not api_key:
- # raise ValueError("GEMINI_API_KEY not found in settings")
- # genai.configure(api_key=api_key)
- # self.model = genai.GenerativeModel('gemini-2.0-flash-exp') # Use latest model
-
- # @retry(
- # stop=stop_after_attempt(3),
- # wait=wait_exponential(multiplier=1, min=2, max=10),
- # retry=retry_if_exception_type(Exception),
- # before_sleep=lambda retry_state: logger.info(f"Retrying Gemini API call, attempt {retry_state.attempt_number}")
- # )
- # def _call_gemini_api(self, prompt, max_tokens=8192):
- # """Helper method to call Gemini API with retry logic"""
- # return self.model.generate_content(
- # prompt,
- # generation_config=genai.types.GenerationConfig(
- # temperature=0.2, # Lower for more consistent JSON
- # top_p=0.9,
- # top_k=40,
- # max_output_tokens=max_tokens, # Increased default
- # response_mime_type="application/json" # Force JSON output
- # ),
- # safety_settings={
- # genai.types.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: genai.types.HarmBlockThreshold.BLOCK_NONE,
- # genai.types.HarmCategory.HARM_CATEGORY_HARASSMENT: genai.types.HarmBlockThreshold.BLOCK_NONE,
- # genai.types.HarmCategory.HARM_CATEGORY_HATE_SPEECH: genai.types.HarmBlockThreshold.BLOCK_NONE,
- # genai.types.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: genai.types.HarmBlockThreshold.BLOCK_NONE
- # }
- # )
- # def generate_attribute_suggestions(
- # self,
- # product: Dict,
- # issues: List[str],
- # category_rules: List[Dict]
- # ) -> Dict:
- # """
- # Use Gemini to generate intelligent suggestions for fixing attribute issues
- # Includes SEO-aware recommendations with robust error handling
- # """
- # try:
- # # Limit issues to prevent prompt overflow
- # limited_issues = issues[:15] if len(issues) > 15 else issues
-
- # prompt = self._build_prompt(product, limited_issues, category_rules)
- # response = self._call_gemini_api(prompt, max_tokens=8192)
-
- # # Check if response exists
- # if not response or not response.candidates:
- # logger.error(f"No candidates returned for SKU: {product.get('sku')}")
- # return {
- # 'error': 'No candidates returned by Gemini API',
- # 'fallback_suggestions': self._generate_fallback_suggestions(limited_issues)
- # }
-
- # candidate = response.candidates[0]
- # finish_reason_name = candidate.finish_reason.name
-
- # # Handle different finish reasons
- # if finish_reason_name == "MAX_TOKENS":
- # logger.warning(f"Max tokens reached for SKU: {product.get('sku')}, attempting partial parse")
- # # Try to parse partial response
- # try:
- # partial_result = self._parse_response(response.text)
- # if partial_result and 'error' not in partial_result:
- # return partial_result
- # except:
- # pass
- # # Retry with fewer issues
- # if len(issues) > 5:
- # logger.info("Retrying with fewer issues")
- # return self.generate_attribute_suggestions(product, issues[:5], category_rules)
- # else:
- # return {
- # 'error': 'Response too long, using fallback',
- # 'fallback_suggestions': self._generate_fallback_suggestions(limited_issues)
- # }
-
- # elif finish_reason_name in ("SAFETY", "RECITATION", "OTHER"):
- # logger.error(f"Response blocked by {finish_reason_name} for SKU: {product.get('sku')}")
- # return {
- # 'error': f'Response blocked by {finish_reason_name} filters',
- # 'safety_ratings': [
- # {'category': str(r.category), 'probability': str(r.probability)}
- # for r in candidate.safety_ratings
- # ],
- # 'fallback_suggestions': self._generate_fallback_suggestions(limited_issues)
- # }
-
- # elif finish_reason_name != "STOP":
- # logger.warning(f"Unexpected finish reason: {finish_reason_name}")
- # return {
- # 'error': f'Unexpected finish reason: {finish_reason_name}',
- # 'fallback_suggestions': self._generate_fallback_suggestions(limited_issues)
- # }
-
- # # Parse successful response
- # logger.info(f"Successfully received response for SKU: {product.get('sku')}")
- # suggestions = self._parse_response(response.text)
-
- # if 'error' in suggestions:
- # logger.warning(f"Parse error for SKU: {product.get('sku')}, using fallback")
- # suggestions['fallback_suggestions'] = self._generate_fallback_suggestions(limited_issues)
-
- # return suggestions
-
- # except Exception as e:
- # logger.error(f"Gemini API error for SKU {product.get('sku')}: {str(e)}", exc_info=True)
- # return {
- # 'error': str(e),
- # 'fallback_suggestions': self._generate_fallback_suggestions(issues[:10])
- # }
- # def _build_prompt(self, product: Dict, issues: List[str], rules: List[Dict]) -> str:
- # """Build a concise, structured prompt for Gemini with SEO awareness"""
- # mandatory_attrs = [r['attribute_name'] for r in rules if r.get('is_mandatory')]
- # valid_values_map = {
- # r['attribute_name']: r.get('valid_values', [])[:5] # Limit to 5 values
- # for r in rules if r.get('valid_values')
- # }
-
- # # Sanitize and categorize issues
- # cleaned_issues = [
- # issue.replace("suspiciously short", "short value")
- # .replace("not recognized", "invalid")
- # .replace("likely means", "should be")
- # .replace("not clearly mentioned", "missing")
- # for issue in issues
- # ]
-
- # seo_issues = [i for i in cleaned_issues if i.startswith("SEO:")][:5]
- # attribute_issues = [i for i in cleaned_issues if not i.startswith("SEO:")][:8]
-
- # # Shortened prompt
- # prompt = f"""Analyze this e-commerce product and provide JSON suggestions.
- # PRODUCT:
- # SKU: {product.get('sku')}
- # Category: {product.get('category')}
- # Title: {product.get('title', '')[:200]}
- # Description: {product.get('description', '')[:300]}
- # Attributes: {json.dumps(product.get('attributes', {}), ensure_ascii=False)}
- # RULES:
- # Mandatory: {', '.join(mandatory_attrs)}
- # Valid Values: {json.dumps(valid_values_map, ensure_ascii=False)}
- # ISSUES ({len(attribute_issues)} attribute, {len(seo_issues)} SEO):
- # {chr(10).join(f"• {i}" for i in attribute_issues[:8])}
- # {chr(10).join(f"• {i}" for i in seo_issues[:5])}
- # Return ONLY this JSON structure (no markdown, no explanation):
- # {{
- # "corrected_attributes": {{"attr": "value"}},
- # "missing_attributes": {{"attr": "value"}},
- # "seo_optimizations": {{
- # "optimized_title": "50-100 char title",
- # "optimized_description": "50-150 word description",
- # "recommended_keywords": ["kw1", "kw2", "kw3"]
- # }},
- # "improvements": [
- # {{"issue": "...", "suggestion": "...", "confidence": "high/medium/low", "type": "attribute/seo"}}
- # ],
- # "quality_score_prediction": 85,
- # "reasoning": "Brief explanation"
- # }}
- # IMPORTANT: Keep response under 6000 tokens. Prioritize top 3 most critical improvements."""
- # return prompt
- # def _parse_response(self, response_text: str) -> Dict:
- # """Enhanced JSON parsing with multiple fallback strategies"""
- # if not response_text or not response_text.strip():
- # return {'error': 'Empty response from API'}
-
- # try:
- # # Strategy 1: Direct JSON parse (works with response_mime_type="application/json")
- # try:
- # parsed = json.loads(response_text)
- # logger.info("Successfully parsed JSON directly")
- # return parsed
- # except json.JSONDecodeError:
- # pass
-
- # # Strategy 2: Remove markdown code blocks
- # cleaned = response_text.strip()
- # if '```' in cleaned:
- # # Extract content between code blocks
- # match = re.search(r'```(?:json)?\s*(\{.*\})\s*```', cleaned, re.DOTALL)
- # if match:
- # cleaned = match.group(1)
- # else:
- # # Remove all code block markers
- # cleaned = re.sub(r'```(?:json)?', '', cleaned).strip()
-
- # # Strategy 3: Find first { and last }
- # first_brace = cleaned.find('{')
- # last_brace = cleaned.rfind('}')
-
- # if first_brace != -1 and last_brace != -1 and last_brace > first_brace:
- # cleaned = cleaned[first_brace:last_brace + 1]
-
- # # Strategy 4: Try parsing cleaned JSON
- # try:
- # parsed = json.loads(cleaned)
- # logger.info("Successfully parsed JSON after cleaning")
- # return parsed
- # except json.JSONDecodeError as e:
- # logger.warning(f"JSON parse error at position {e.pos}: {e.msg}")
-
- # # Strategy 5: Attempt to fix common JSON issues
- # cleaned = self._fix_json_syntax(cleaned)
- # try:
- # parsed = json.loads(cleaned)
- # logger.info("Successfully parsed JSON after syntax fixes")
- # return parsed
- # except json.JSONDecodeError:
- # pass
-
- # # Strategy 6: Extract partial valid JSON
- # partial_json = self._extract_partial_json(cleaned)
- # if partial_json:
- # logger.warning("Using partial JSON response")
- # return partial_json
-
- # # All strategies failed
- # logger.error(f"All JSON parsing strategies failed. Response length: {len(response_text)}")
- # logger.error(f"Response preview: {response_text[:500]}...")
-
- # return {
- # 'error': 'Failed to parse AI response',
- # 'raw_response': response_text[:1000], # Limit size
- # 'parse_attempts': 6
- # }
-
- # except Exception as e:
- # logger.error(f"Unexpected error in _parse_response: {e}", exc_info=True)
- # return {
- # 'error': f'Parse exception: {str(e)}',
- # 'raw_response': response_text[:500] if response_text else 'None'
- # }
-
- # def _fix_json_syntax(self, json_str: str) -> str:
- # """Attempt to fix common JSON syntax issues"""
- # try:
- # # Remove trailing commas before closing braces/brackets
- # json_str = re.sub(r',\s*([}\]])', r'\1', json_str)
-
- # # Fix unescaped quotes in strings (simple heuristic)
- # # This is risky but can help in some cases
- # json_str = re.sub(r'(?<!\\)"(?=[^,:}\]]*[,:}\]])', '\\"', json_str)
-
- # # Remove any trailing content after final }
- # last_brace = json_str.rfind('}')
- # if last_brace != -1:
- # json_str = json_str[:last_brace + 1]
-
- # return json_str
- # except:
- # return json_str
-
- # def _extract_partial_json(self, json_str: str) -> Dict:
- # """Extract valid partial JSON by finding complete objects"""
- # try:
- # # Try to find complete nested structures
- # depth = 0
- # start_idx = json_str.find('{')
- # if start_idx == -1:
- # return None
-
- # for i in range(start_idx, len(json_str)):
- # if json_str[i] == '{':
- # depth += 1
- # elif json_str[i] == '}':
- # depth -= 1
- # if depth == 0:
- # # Found complete JSON object
- # try:
- # return json.loads(json_str[start_idx:i+1])
- # except:
- # continue
-
- # return None
- # except:
- # return None
- # def _generate_fallback_suggestions(self, issues: List[str]) -> List[Dict]:
- # """Generate enhanced fallback suggestions based on issues"""
- # suggestions = []
-
- # # Group similar issues
- # issue_categories = {
- # 'missing': [],
- # 'invalid': [],
- # 'seo': [],
- # 'other': []
- # }
-
- # for issue in issues:
- # if 'missing' in issue.lower() or 'mandatory' in issue.lower():
- # issue_categories['missing'].append(issue)
- # elif 'invalid' in issue.lower() or 'not in valid' in issue.lower():
- # issue_categories['invalid'].append(issue)
- # elif issue.startswith('SEO:'):
- # issue_categories['seo'].append(issue)
- # else:
- # issue_categories['other'].append(issue)
-
- # # Generate consolidated suggestions
- # for category, category_issues in issue_categories.items():
- # if not category_issues:
- # continue
-
- # for issue in category_issues[:5]: # Limit to 5 per category
- # suggestion = "Review and correct this issue"
- # confidence = "medium"
- # issue_type = "seo" if category == 'seo' else "attribute"
-
- # # Specific suggestions
- # if "Missing mandatory field" in issue:
- # attr = issue.split(":")[-1].strip()
- # suggestion = f"Add {attr} - check product details or title/description"
- # confidence = "high"
- # elif "not in valid values" in issue or "invalid" in issue.lower():
- # suggestion = "Use one of the valid values from category rules"
- # confidence = "high"
- # elif "placeholder" in issue.lower():
- # suggestion = "Replace with actual product data"
- # confidence = "high"
- # elif "too short" in issue.lower():
- # if "title" in issue.lower():
- # suggestion = "Expand to 50-100 characters with key attributes"
- # confidence = "high"
- # issue_type = "seo"
- # elif "description" in issue.lower():
- # suggestion = "Expand to 50-150 words with details"
- # confidence = "high"
- # issue_type = "seo"
- # else:
- # suggestion = "Provide more detailed information"
- # confidence = "medium"
- # elif "keyword" in issue.lower() or "search term" in issue.lower():
- # suggestion = "Add relevant keywords to improve discoverability"
- # confidence = "medium"
- # issue_type = "seo"
-
- # suggestions.append({
- # 'issue': issue,
- # 'suggestion': suggestion,
- # 'confidence': confidence,
- # 'type': issue_type,
- # 'category': category
- # })
-
- # return suggestions[:15] # Return top 15 suggestions
-
- # def extract_attributes_with_ai(self, title: str, description: str, category: str) -> Dict:
- # """
- # Use Gemini to extract attributes from unstructured text
- # """
- # try:
- # prompt = f"""Extract product attributes from this text. Return ONLY valid JSON.
- # Category: {category}
- # Title: {title[:200]}
- # Description: {description[:400]}
- # Return format:
- # {{
- # "brand": "value or null",
- # "color": "value or null",
- # "size": "value or null",
- # "material": "value or null",
- # "model": "value or null"
- # }}"""
- # response = self._call_gemini_api(prompt, max_tokens=1024)
-
- # if not response or not response.candidates:
- # return {'error': 'No response'}
-
- # return self._parse_response(response.text)
-
- # except Exception as e:
- # logger.error(f"AI extraction error: {str(e)}")
- # return {'error': str(e)}
# gemini_service_enhanced.py
"""
Enhanced Gemini service with comprehensive suggestions for all components.
"""
import json
import logging
import re
from typing import Dict, List, Optional

import google.generativeai as genai
from django.conf import settings
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential
# Module-level logger named after this module; used by GeminiAttributeService
# for all diagnostics.
logger = logging.getLogger(__name__)
class GeminiAttributeService:
    """Enhanced service with comprehensive AI suggestions.

    Builds a prompt describing a product, its quality scores and the issues
    found, sends it to a Gemini model configured for JSON output, and parses
    the reply into a dict. Whenever the model cannot be used (no candidates,
    non-STOP finish reason, unparseable reply, exception) a rule-based list
    of fallback suggestions is returned under ``'fallback_suggestions'``.
    """

    # Model used when the caller does not pass ``model_name``.
    DEFAULT_MODEL_NAME = 'gemini-2.0-flash-exp'

    # Issue-string prefixes and the component each one maps to.
    _PREFIX_COMPONENTS = (
        ('Title:', 'title'),
        ('Description:', 'description'),
        ('SEO:', 'seo'),
    )

    # (label shown in the prompt, key looked up in the ``scores`` dict).
    _SCORE_ROWS = (
        ('Mandatory Fields', 'mandatory_fields'),
        ('Standardization', 'standardization'),
        ('Missing Values', 'missing_values'),
        ('Consistency', 'consistency'),
        ('SEO', 'seo_discoverability'),
        ('Title Quality', 'title_quality'),
        ('Description Quality', 'description_quality'),
    )

    # JSON skeleton the model is asked to fill in.
    _RESPONSE_SCHEMA = "\n".join((
        '{',
        '  "corrected_attributes": {',
        '    "attr_name": "corrected_value"',
        '  },',
        '  "missing_attributes": {',
        '    "attr_name": "suggested_value"',
        '  },',
        '  "improved_title": "optimized title (50-100 chars, includes brand, model, key features)",',
        '  "improved_description": "enhanced description (50-150 words, features, benefits, specs, use cases)",',
        '  "seo_keywords": ["keyword1", "keyword2", "keyword3"],',
        '  "improvements": [',
        '    {',
        '      "component": "attributes/title/description/seo",',
        '      "issue": "specific issue",',
        '      "suggestion": "how to fix",',
        '      "priority": "high/medium/low",',
        '      "confidence": "high/medium/low"',
        '    }',
        '  ],',
        '  "quality_score_prediction": 85,',
        '  "summary": "Brief 2-3 sentence summary of key improvements needed"',
        '}',
    ))

    def __init__(self, model_name: Optional[str] = None):
        """Configure the Gemini client from Django settings.

        Args:
            model_name: Gemini model identifier; defaults to
                ``DEFAULT_MODEL_NAME`` when omitted.

        Raises:
            ValueError: if ``settings.GEMINI_API_KEY`` is missing or empty.
        """
        api_key = getattr(settings, 'GEMINI_API_KEY', None)
        if not api_key:
            raise ValueError("GEMINI_API_KEY not found in settings")
        genai.configure(api_key=api_key)
        self.model = genai.GenerativeModel(model_name or self.DEFAULT_MODEL_NAME)

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        retry=retry_if_exception_type(Exception),
        # Surface the last real exception instead of tenacity's RetryError so
        # callers can report a meaningful error message.
        reraise=True,
    )
    def _call_gemini_api(self, prompt, max_tokens=8192):
        """Helper method to call Gemini API with retry logic.

        Up to three attempts are made with exponential back-off; any
        exception triggers a retry.

        Args:
            prompt: Prompt text sent to the model.
            max_tokens: Upper bound passed as ``max_output_tokens``.

        Returns:
            Whatever ``self.model.generate_content`` returns.
        """
        harm = genai.types.HarmCategory
        block_none = genai.types.HarmBlockThreshold.BLOCK_NONE
        return self.model.generate_content(
            prompt,
            generation_config=genai.types.GenerationConfig(
                temperature=0.2,
                top_p=0.9,
                top_k=40,
                max_output_tokens=max_tokens,
                response_mime_type="application/json",
            ),
            safety_settings={
                harm.HARM_CATEGORY_DANGEROUS_CONTENT: block_none,
                harm.HARM_CATEGORY_HARASSMENT: block_none,
                harm.HARM_CATEGORY_HATE_SPEECH: block_none,
                harm.HARM_CATEGORY_SEXUALLY_EXPLICIT: block_none,
            },
        )

    def generate_comprehensive_suggestions(
        self,
        product: Dict,
        issues: List[str],
        category_rules: List[Dict],
        scores: Dict
    ) -> Dict:
        """Generate comprehensive AI suggestions covering ALL quality aspects.

        Args:
            product: Product data; ``sku``, ``category``, ``title``,
                ``description`` and ``attributes`` are read from it.
            issues: Issue strings found for the product (``None`` is treated
                as empty). At most the first 20 are considered.
            category_rules: Rule dicts for the product's category.
            scores: Quality scores keyed by score name.

        Returns:
            The parsed model reply as a dict. On any failure the dict holds
            an ``'error'`` message and ``'fallback_suggestions'``.
        """
        # Normalise up front so the except-handler below can never fail on a
        # None/non-list ``issues`` argument.
        issues = list(issues or [])
        limited_issues = issues[:20]

        try:
            sku = product.get('sku') if product else None
            prompt = self._build_comprehensive_prompt(
                product, limited_issues, category_rules, scores
            )
            response = self._call_gemini_api(prompt, max_tokens=8192)

            if not response or not response.candidates:
                logger.error("No candidates returned for SKU: %s", sku)
                return {
                    'error': 'No response from AI',
                    'fallback_suggestions': self._generate_fallback_suggestions(limited_issues)
                }

            finish_reason = response.candidates[0].finish_reason.name
            if finish_reason != "STOP":
                logger.warning("Non-STOP finish reason: %s", finish_reason)
                # A truncated reply is retried once with a shorter issue list;
                # the retry itself cannot recurse again (len == 10).
                if finish_reason == "MAX_TOKENS" and len(issues) > 10:
                    return self.generate_comprehensive_suggestions(
                        product, issues[:10], category_rules, scores
                    )
                return {
                    'error': f'Response blocked: {finish_reason}',
                    'fallback_suggestions': self._generate_fallback_suggestions(limited_issues)
                }

            logger.info("Successfully received comprehensive suggestions for SKU: %s", sku)
            suggestions = self._parse_response(response.text)
            if 'error' in suggestions:
                suggestions['fallback_suggestions'] = self._generate_fallback_suggestions(limited_issues)
            return suggestions

        except Exception as exc:
            # Top-level boundary: this method always answers with a dict.
            logger.error("Gemini API error: %s", exc, exc_info=True)
            return {
                'error': str(exc),
                'fallback_suggestions': self._generate_fallback_suggestions(issues[:15])
            }

    def _build_comprehensive_prompt(
        self,
        product: Dict,
        issues: List[str],
        rules: List[Dict],
        scores: Dict
    ) -> str:
        """Build comprehensive prompt for all quality aspects.

        Issues are bucketed by their ``Title:`` / ``Description:`` / ``SEO:``
        prefix; everything else counts as an attribute issue. Missing or
        ``None`` product fields and scores are rendered as empty / ``0.0``
        instead of raising.

        Returns:
            The prompt text, one section per line.
        """
        product = product or {}
        scores = scores or {}
        rules = rules or []
        issues = [str(item) for item in (issues or [])]

        mandatory_attrs = [r['attribute_name'] for r in rules if r.get('is_mandatory')]
        valid_values_map = {
            r['attribute_name']: r.get('valid_values', [])[:5]
            for r in rules if r.get('valid_values')
        }

        # Bucket strictly by prefix so every issue lands in exactly one list.
        prefixes = tuple(prefix for prefix, _ in self._PREFIX_COMPONENTS)
        attribute_issues = [i for i in issues if not i.startswith(prefixes)]
        title_issues = [i for i in issues if i.startswith('Title:')]
        desc_issues = [i for i in issues if i.startswith('Description:')]
        seo_issues = [i for i in issues if i.startswith('SEO:')]

        def bullets(items: List[str], limit: int) -> str:
            return "\n".join(f" • {item}" for item in items[:limit])

        def score(key: str) -> str:
            try:
                return f"{float(scores.get(key, 0)):.1f}"
            except (TypeError, ValueError):
                return "0.0"

        title = str(product.get('title') or '')
        description = str(product.get('description') or '')
        attributes = product.get('attributes') or {}

        lines = [
            "Analyze this e-commerce product and provide comprehensive quality improvements.",
            "PRODUCT DATA:",
            f"SKU: {product.get('sku')}",
            f"Category: {product.get('category')}",
            f"Title: {title[:250]}",
            f"Description: {description[:400]}",
            f"Attributes: {json.dumps(attributes, ensure_ascii=False)}",
            "QUALITY SCORES (out of 100):",
        ]
        lines.extend(f"- {label}: {score(key)}" for label, key in self._SCORE_ROWS)
        lines.extend([
            "CATEGORY RULES:",
            f"Mandatory Attributes: {', '.join(map(str, mandatory_attrs))}",
            f"Valid Values: {json.dumps(valid_values_map, ensure_ascii=False)}",
            "ISSUES FOUND:",
            f"Attributes ({len(attribute_issues)}):",
            bullets(attribute_issues, 8),
            f"Title ({len(title_issues)}):",
            bullets(title_issues, 5),
            f"Description ({len(desc_issues)}):",
            bullets(desc_issues, 5),
            f"SEO ({len(seo_issues)}):",
            bullets(seo_issues, 5),
            "Return ONLY this JSON structure:",
            self._RESPONSE_SCHEMA,
            "CRITICAL: Keep response under 7000 tokens. Focus on top 5 most impactful improvements.",
        ])
        return "\n".join(lines)

    @staticmethod
    def _loads_object(text: str) -> Optional[Dict]:
        """Return ``text`` decoded as a JSON object, or ``None`` if it is not one."""
        try:
            value = json.loads(text)
        except ValueError as exc:  # json.JSONDecodeError subclasses ValueError
            logger.debug("JSON decode failed: %s", exc)
            return None
        return value if isinstance(value, dict) else None

    def _parse_response(self, response_text: str) -> Dict:
        """Enhanced JSON parsing with fallback strategies.

        Tries, in order: the raw text; the text with markdown fences removed
        and trimmed to the outermost braces; that text after syntax repair;
        and finally the first complete object found inside it. Only JSON
        objects are accepted, so the result is always a dict.

        Returns:
            The parsed object, or a dict with ``'error'`` and
            ``'raw_response'`` (first 500 characters) when nothing parses.
        """
        if not response_text or not response_text.strip():
            return {'error': 'Empty response from API'}

        try:
            parsed = self._loads_object(response_text)
            if parsed is not None:
                logger.info("Successfully parsed JSON directly")
                return parsed

            # Strip markdown code fences, if any.
            cleaned = response_text.strip()
            if '```' in cleaned:
                fenced = re.search(r'```(?:json)?\s*(\{.*\})\s*```', cleaned, re.DOTALL)
                if fenced:
                    cleaned = fenced.group(1)
                else:
                    cleaned = re.sub(r'```(?:json)?', '', cleaned).strip()

            # Trim to the outermost braces.
            first_brace = cleaned.find('{')
            last_brace = cleaned.rfind('}')
            if first_brace != -1 and last_brace > first_brace:
                cleaned = cleaned[first_brace:last_brace + 1]

            parsed = self._loads_object(cleaned)
            if parsed is not None:
                logger.info("Successfully parsed JSON after cleaning")
                return parsed
            logger.warning("JSON parse error after cleaning; attempting syntax fixes")

            cleaned = self._fix_json_syntax(cleaned)
            parsed = self._loads_object(cleaned)
            if parsed is not None:
                logger.info("Successfully parsed JSON after syntax fixes")
                return parsed

            partial_json = self._extract_partial_json(cleaned)
            if partial_json:
                logger.warning("Using partial JSON response")
                return partial_json

            logger.error("All JSON parsing failed. Response length: %d", len(response_text))
            return {
                'error': 'Failed to parse AI response',
                'raw_response': response_text[:500]
            }

        except Exception as exc:
            # Parsing is best-effort; never let it take the caller down.
            logger.error("Parse exception: %s", exc, exc_info=True)
            return {
                'error': f'Parse exception: {str(exc)}',
                'raw_response': response_text[:500] if response_text else 'None'
            }

    def _fix_json_syntax(self, json_str: str) -> str:
        """Fix common JSON syntax issues.

        Removes commas that directly precede ``}`` or ``]`` and drops
        anything after the final ``}``. Non-string input is returned
        unchanged.
        """
        try:
            repaired = re.sub(r',\s*([}\]])', r'\1', json_str)
        except TypeError:
            return json_str

        last_brace = repaired.rfind('}')
        if last_brace != -1:
            repaired = repaired[:last_brace + 1]
        return repaired

    @staticmethod
    def _find_object_end(text: str, start: int) -> int:
        """Return the index of the ``}`` closing the ``{`` at ``start``, or -1.

        Braces inside double-quoted strings (with backslash escapes) are
        ignored so they cannot unbalance the count.
        """
        depth = 0
        in_string = False
        escaped = False
        for idx, ch in enumerate(text[start:], start):
            if in_string:
                if escaped:
                    escaped = False
                elif ch == '\\':
                    escaped = True
                elif ch == '"':
                    in_string = False
            elif ch == '"':
                in_string = True
            elif ch == '{':
                depth += 1
            elif ch == '}':
                depth -= 1
                if depth == 0:
                    return idx
        return -1

    def _extract_partial_json(self, json_str: str) -> Optional[Dict]:
        """Extract valid partial JSON.

        Scans for complete top-level ``{...}`` spans and returns the first
        one that decodes to a JSON object. A span that fails to decode is
        skipped and scanning resumes after it.

        Returns:
            The decoded object, or ``None`` when no span decodes (including
            when the remaining text is unbalanced, e.g. truncated).
        """
        if not isinstance(json_str, str):
            return None

        start = json_str.find('{')
        while start != -1:
            end = self._find_object_end(json_str, start)
            if end == -1:
                return None
            candidate = self._loads_object(json_str[start:end + 1])
            if candidate is not None:
                return candidate
            start = json_str.find('{', end + 1)
        return None

    def _generate_fallback_suggestions(self, issues: List[str]) -> List[Dict]:
        """Generate fallback suggestions based on issues.

        Applies keyword rules to at most the first 15 issues; the first
        matching rule wins. Each result has ``component``, ``issue``,
        ``suggestion``, ``priority`` and ``confidence`` keys.
        """
        suggestions = []

        for issue in (issues or [])[:15]:
            text = issue if isinstance(issue, str) else str(issue)
            issue_lower = text.lower()

            suggestion_text = "Review and correct this issue"
            confidence = "medium"
            component = "attribute"
            priority = "medium"

            # Component implied by the issue's prefix; rules below may override it.
            for prefix, name in self._PREFIX_COMPONENTS:
                if text.startswith(prefix):
                    component = name
                    break

            if "missing mandatory" in issue_lower:
                attr = text.split(":")[-1].strip()
                suggestion_text = f"Add required {attr} - check product packaging or manufacturer details"
                priority = "high"
                confidence = "high"
            elif "too short" in issue_lower:
                if "title" in issue_lower:
                    suggestion_text = "Expand title to 50-100 characters including brand, model, and key features"
                    component = "title"
                    priority = "high"
                elif "description" in issue_lower:
                    suggestion_text = "Write comprehensive 50-150 word description with features, benefits, and specifications"
                    component = "description"
                    priority = "high"
                else:
                    suggestion_text = "Provide more detailed information"
            elif "placeholder" in issue_lower:
                suggestion_text = "Replace with actual product data from manufacturer or packaging"
                priority = "high"
            elif "grammar" in issue_lower or "spelling" in issue_lower:
                suggestion_text = "Run spell-check and grammar review, ensure professional language"
                component = "description"
                priority = "medium"
            elif "keyword" in issue_lower or "seo" in issue_lower:
                suggestion_text = "Add relevant search keywords and product attributes"
                component = "seo"
                priority = "medium"
            elif "duplicate" in issue_lower or "repetit" in issue_lower:
                suggestion_text = "Remove duplicate content, provide varied information with unique details"
                component = "description"
                priority = "medium"
            elif "not recognized" in issue_lower or "invalid" in issue_lower:
                suggestion_text = "Use standardized values from category rules"
                priority = "high"
                confidence = "high"

            suggestions.append({
                'component': component,
                'issue': issue,
                'suggestion': suggestion_text,
                'priority': priority,
                'confidence': confidence
            })

        return suggestions
|