Jelajahi Sumber

Thread added

Student Yadav 3 bulan lalu
induk
melakukan
4d1209166a
1 mengubah file dengan 114 tambahan dan 8 penghapusan
  1. 114 8
      core/services/gemini_service.py

+ 114 - 8
core/services/gemini_service.py

@@ -1863,30 +1863,62 @@
 
 
 
-
 
 
 # gemini_service_enhanced.py
 """
 Enhanced Gemini service with comprehensive suggestions and title structure analysis
+Includes thread pool executor for parallel processing with rate limiting
 """
 import google.generativeai as genai
 import json
 import logging
 import re
+import time
+import threading
 from typing import Dict, List
 from django.conf import settings
 from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
+from concurrent.futures import ThreadPoolExecutor, as_completed
 import traceback
-import time
 
 # Configure logging
 logger = logging.getLogger(__name__)
 
+# Global rate limiter
+class RateLimiter:
+    """Thread-safe rate limiter for API calls"""
+    def __init__(self, max_calls_per_minute=10):
+        self.max_calls = max_calls_per_minute
+        self.calls = []
+        self.lock = threading.Lock()
+    
+    def wait_if_needed(self):
+        """Wait if rate limit would be exceeded"""
+        with self.lock:
+            now = time.time()
+            # Remove calls older than 60 seconds
+            self.calls = [call_time for call_time in self.calls if now - call_time < 60]
+            
+            if len(self.calls) >= self.max_calls:
+                # Calculate wait time
+                oldest_call = min(self.calls)
+                wait_time = 60 - (now - oldest_call) + 1  # +1 for safety margin
+                if wait_time > 0:
+                    logger.info(f"Rate limit reached. Waiting {wait_time:.2f} seconds...")
+                    time.sleep(wait_time)
+                    # Clean up old calls again after waiting
+                    now = time.time()
+                    self.calls = [call_time for call_time in self.calls if now - call_time < 60]
+            
+            # Record this call
+            self.calls.append(time.time())
+            logger.debug(f"Rate limiter: {len(self.calls)} calls in last 60 seconds")
+
 class GeminiAttributeService:
     """Enhanced service with comprehensive AI suggestions and title structure analysis"""
     
-    def __init__(self):
+    def __init__(self, max_workers=3, max_calls_per_minute=10):
         api_key = getattr(settings, 'GEMINI_API_KEY', None)
         if not api_key:
             logger.error("GEMINI_API_KEY not found in settings")
@@ -1894,15 +1926,20 @@ class GeminiAttributeService:
         
         genai.configure(api_key=api_key)
         self.model = genai.GenerativeModel('gemini-2.5-flash')
-        logger.info("GeminiAttributeService initialized successfully")
+        self.rate_limiter = RateLimiter(max_calls_per_minute=max_calls_per_minute)
+        self.max_workers = max_workers
+        logger.info(f"GeminiAttributeService initialized with {max_workers} workers, {max_calls_per_minute} calls/min")
     
     @retry(
         stop=stop_after_attempt(3),
-        wait=wait_exponential(multiplier=1, min=2, max=10),
+        wait=wait_exponential(multiplier=2, min=4, max=30),
         retry=retry_if_exception_type((Exception,))
     )
     def _call_gemini_api(self, prompt, max_tokens=8192, attempt=1):
-        """Helper method to call Gemini API with retry logic"""
+        """Helper method to call Gemini API with retry logic and rate limiting"""
+        # Wait if rate limit would be exceeded
+        self.rate_limiter.wait_if_needed()
+        
         logger.info(f"Calling Gemini API (attempt {attempt}, max_tokens={max_tokens})")
         logger.debug(f"Prompt length: {len(prompt)} characters")
         
@@ -1950,8 +1987,75 @@ class GeminiAttributeService:
         except Exception as e:
             logger.error(f"Gemini API call failed (attempt {attempt}): {type(e).__name__} - {str(e)}")
             logger.debug(f"Full exception traceback: {traceback.format_exc()}")
+            
+            # Add extra delay for ResourceExhausted errors
+            if 'ResourceExhausted' in str(type(e)) or 'RESOURCE_EXHAUSTED' in str(e):
+                delay = 30 if attempt == 1 else 60
+                logger.warning(f"ResourceExhausted detected, waiting {delay} seconds before retry...")
+                time.sleep(delay)
+            
             raise
         
+    def generate_comprehensive_suggestions_batch(
+        self,
+        products: List[Dict],
+        issues_list: List[List[str]],
+        category_rules_list: List[List[Dict]],
+        scores_list: List[Dict]
+    ) -> List[Dict]:
+        """
+        Generate comprehensive AI suggestions for multiple products in parallel
+        
+        Args:
+            products: List of product dictionaries
+            issues_list: List of issues for each product
+            category_rules_list: List of category rules for each product
+            scores_list: List of scores for each product
+            
+        Returns:
+            List of suggestion dictionaries in the same order as input
+        """
+        total_products = len(products)
+        logger.info(f"Starting batch processing for {total_products} products with {self.max_workers} workers")
+        
+        results = [None] * total_products  # Preserve order
+        
+        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
+            # Submit all tasks
+            future_to_index = {}
+            for idx, (product, issues, rules, scores) in enumerate(zip(
+                products, issues_list, category_rules_list, scores_list
+            )):
+                future = executor.submit(
+                    self.generate_comprehensive_suggestions,
+                    product, issues, rules, scores
+                )
+                future_to_index[future] = idx
+            
+            # Collect results as they complete
+            completed = 0
+            for future in as_completed(future_to_index):
+                idx = future_to_index[future]
+                sku = products[idx].get('sku', 'UNKNOWN')
+                
+                try:
+                    result = future.result()
+                    results[idx] = result
+                    completed += 1
+                    logger.info(f"Completed {completed}/{total_products}: SKU {sku}")
+                except Exception as e:
+                    logger.error(f"Failed to process SKU {sku}: {type(e).__name__} - {str(e)}")
+                    results[idx] = {
+                        'error': f'{type(e).__name__}: {str(e)}',
+                        'fallback_suggestions': self._generate_fallback_suggestions(
+                            issues_list[idx][:15] if idx < len(issues_list) else []
+                        )
+                    }
+                    completed += 1
+        
+        logger.info(f"Batch processing complete: {completed}/{total_products} products processed")
+        return results
+    
     def generate_comprehensive_suggestions(
         self,
         product: Dict,
@@ -2052,7 +2156,6 @@ class GeminiAttributeService:
                 'error': f'{type(e).__name__}: {str(e)}',
                 'fallback_suggestions': self._generate_fallback_suggestions(issues[:15])
             }
-            time.sleep(6)
             return result
     
     def _build_comprehensive_prompt(
@@ -2414,4 +2517,7 @@ CRITICAL: Keep response under 7000 tokens. Focus on top 5 most impactful improve
             })
         
         logger.info(f"Generated {len(suggestions)} fallback suggestions")
-        return suggestions
+        return suggestions
+    
+
+