
all fixed and optimized with load balancer

Harshit Pathak, 3 months ago
parent
commit
fd7773bf97

+ 662 - 0
attr_extraction/llm_load_balancer.py

@@ -0,0 +1,662 @@
+"""
+Enhanced Multi-API-Key Load Balancer with Smart Rate Limiting
+Optimized for Groq's free tier (30 RPM per key, 14K daily limit)
+"""
+
+import time
+import threading
+import requests
+import logging
+from typing import List, Dict, Optional
+from dataclasses import dataclass, field
+from datetime import datetime
+from django.conf import settings
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class APIKeyState:
+    """Tracks state and health of a single API key"""
+    key: str
+    name: str
+    requests_made: int = 0
+    last_request_time: float = 0
+    is_available: bool = True
+    rate_limit_reset_time: Optional[float] = None
+    consecutive_failures: int = 0
+    total_requests: int = 0
+    total_failures: int = 0
+    request_times: list = field(default_factory=list)
+    requests_per_minute: int = 25  # Conservative: 25 instead of 30
+    min_request_interval: float = 2.5  # Minimum 2.5s between requests per key
+    
+    def can_make_request(self) -> bool:
+        """Check if key can make a request (rate limit + spacing)"""
+        now = time.time()
+        
+        # Check minimum interval between requests
+        if self.last_request_time and (now - self.last_request_time) < self.min_request_interval:
+            return False
+        
+        # Remove requests older than 1 minute
+        self.request_times = [t for t in self.request_times if now - t < 60]
+        return len(self.request_times) < self.requests_per_minute
+    
+    def mark_success(self):
+        now = time.time()
+        self.requests_made += 1
+        self.total_requests += 1
+        self.last_request_time = now
+        self.request_times.append(now)
+        self.consecutive_failures = 0
+        self.is_available = True
+        self.rate_limit_reset_time = None
+        
+        # Keep only last 60 seconds
+        self.request_times = [t for t in self.request_times if now - t < 60]
+    
+    def mark_failure(self, is_rate_limit: bool = False, retry_after: Optional[int] = None):
+        self.consecutive_failures += 1
+        self.total_failures += 1
+        
+        if is_rate_limit:
+            self.is_available = False
+            reset_time = time.time() + (retry_after or 65)  # 65s default
+            self.rate_limit_reset_time = reset_time
+            logger.warning(f"🚫 {self.name} rate limited until {datetime.fromtimestamp(reset_time).strftime('%H:%M:%S')}")
+        
+        # Disable after 5 consecutive failures (increased from 3)
+        if self.consecutive_failures >= 5:
+            self.is_available = False
+            self.rate_limit_reset_time = time.time() + 120  # 2 min cooldown
+            logger.error(f"❌ {self.name} disabled (cooldown 2min)")
+    
+    def check_availability(self) -> bool:
+        """Check if key is available"""
+        # Check rate limit reset
+        if self.rate_limit_reset_time and time.time() >= self.rate_limit_reset_time:
+            self.is_available = True
+            self.rate_limit_reset_time = None
+            self.consecutive_failures = 0
+            logger.info(f"✅ {self.name} recovered")
+            return True
+        
+        if not self.is_available:
+            return False
+        
+        return self.can_make_request()
+    
+    def get_stats(self) -> Dict:
+        success_count = self.total_requests - self.total_failures
+        success_rate = (success_count / max(self.total_requests, 1)) * 100
+        
+        return {
+            "name": self.name,
+            "total_requests": self.total_requests,
+            "total_failures": self.total_failures,
+            "success_rate": round(success_rate, 2),
+            "is_available": self.check_availability(),
+            "consecutive_failures": self.consecutive_failures,
+            "current_rpm": len(self.request_times),
+            "max_rpm": self.requests_per_minute,
+            "time_since_last_request": round(time.time() - self.last_request_time, 1) if self.last_request_time else None
+        }
+
+
+class MultiKeyLLMLoadBalancer:
+    """Enhanced load balancer with smart rate limiting"""
+    
+    def __init__(self, api_keys: List[Dict[str, str]], strategy: str = "round_robin"):
+        if not api_keys:
+            raise ValueError("At least one API key required")
+        
+        self.keys = [APIKeyState(key=k['key'], name=k['name']) for k in api_keys]
+        self.strategy = strategy
+        self.current_index = 0
+        self.lock = threading.Lock()
+        self.total_requests = 0
+        self.total_failures = 0
+        self.global_last_request = 0
+        self.min_global_interval = 0.5  # 500ms between ANY requests
+        
+        logger.info(f"🔑 Load balancer initialized: {len(self.keys)} keys, '{strategy}' strategy")
+    
+    def get_next_key(self) -> Optional[APIKeyState]:
+        """Get next available key with global rate limiting"""
+        with self.lock:
+            # Enforce minimum global interval
+            now = time.time()
+            time_since_last = now - self.global_last_request
+            if time_since_last < self.min_global_interval:
+                wait_time = self.min_global_interval - time_since_last
+                time.sleep(wait_time)
+            
+            if self.strategy == "least_loaded":
+                return self._least_loaded_select()
+            else:
+                return self._round_robin_select()
+    
+    def _round_robin_select(self) -> Optional[APIKeyState]:
+        """Round-robin with availability check"""
+        attempts = 0
+        total_keys = len(self.keys)
+        
+        while attempts < total_keys:
+            key = self.keys[self.current_index]
+            self.current_index = (self.current_index + 1) % total_keys
+            
+            if key.check_availability():
+                return key
+            
+            attempts += 1
+        
+        return self._wait_for_available_key()
+    
+    def _least_loaded_select(self) -> Optional[APIKeyState]:
+        """Select least loaded key"""
+        available = [k for k in self.keys if k.check_availability()]
+        
+        if not available:
+            return self._wait_for_available_key()
+        
+        available.sort(key=lambda k: (len(k.request_times), k.last_request_time))
+        return available[0]
+    
+    def _wait_for_available_key(self, max_wait: float = 5.0) -> Optional[APIKeyState]:
+        """Wait for next available key (with timeout)"""
+        keys_with_reset = [k for k in self.keys if k.rate_limit_reset_time]
+        
+        if not keys_with_reset:
+            # Check if any key just needs spacing
+            now = time.time()
+            for key in self.keys:
+                if key.is_available:
+                    wait = key.min_request_interval - (now - key.last_request_time)
+                    if 0 < wait < max_wait:
+                        logger.info(f"⏳ Waiting {wait:.1f}s for {key.name}...")
+                        time.sleep(wait + 0.1)
+                        return key if key.check_availability() else None
+            return None
+        
+        keys_with_reset.sort(key=lambda k: k.rate_limit_reset_time)
+        next_key = keys_with_reset[0]
+        wait = max(0, next_key.rate_limit_reset_time - time.time())
+        
+        if 0 < wait < max_wait:
+            logger.info(f"⏳ Waiting {wait:.1f}s for {next_key.name}...")
+            time.sleep(wait + 0.5)
+            return next_key if next_key.check_availability() else None
+        
+        return None
+    
+    def mark_success(self, key: APIKeyState):
+        with self.lock:
+            key.mark_success()
+            self.total_requests += 1
+            self.global_last_request = time.time()
+    
+    def mark_failure(self, key: APIKeyState, is_rate_limit: bool = False, retry_after: Optional[int] = None):
+        with self.lock:
+            key.mark_failure(is_rate_limit, retry_after)
+            self.total_failures += 1
+    
+    def get_stats(self) -> Dict:
+        with self.lock:
+            available_count = sum(1 for k in self.keys if k.check_availability())
+            success_rate = ((self.total_requests - self.total_failures) / max(self.total_requests, 1)) * 100
+            
+            return {
+                "total_keys": len(self.keys),
+                "available_keys": available_count,
+                "strategy": self.strategy,
+                "total_requests": self.total_requests,
+                "total_failures": self.total_failures,
+                "success_rate": round(success_rate, 2),
+                "keys": [k.get_stats() for k in self.keys]
+            }
+    
+    def call_llm(self, payload: dict, api_url: str, max_retries: int = None) -> str:
+        """Make LLM call with smart retry and failover"""
+        if max_retries is None:
+            max_retries = len(self.keys) * 3
+        
+        attempt = 0
+        last_error = None
+        keys_tried = set()
+        
+        while attempt < max_retries:
+            key_state = self.get_next_key()
+            
+            if not key_state:
+                if len(keys_tried) >= len(self.keys):
+                    # All keys tried, wait longer
+                    logger.warning(f"⏳ All keys exhausted. Waiting 3s...")
+                    time.sleep(3)
+                    keys_tried.clear()
+                
+                attempt += 1
+                continue
+            
+            keys_tried.add(key_state.name)
+            
+            try:
+                headers = {
+                    "Authorization": f"Bearer {key_state.key}",
+                    "Content-Type": "application/json"
+                }
+                
+                logger.debug(f"🔑 {key_state.name} (attempt {attempt + 1}/{max_retries})")
+                
+                response = requests.post(
+                    api_url,
+                    headers=headers,
+                    json=payload,
+                    timeout=30
+                )
+                
+                if response.status_code == 429:
+                    retry_after = int(response.headers.get('Retry-After', 65))
+                    self.mark_failure(key_state, is_rate_limit=True, retry_after=retry_after)
+                    attempt += 1
+                    time.sleep(1)  # Brief pause before next key
+                    continue
+                
+                response.raise_for_status()
+                
+                # Success
+                self.mark_success(key_state)
+                content = response.json()["choices"][0]["message"]["content"]
+                logger.debug(f"✅ Success via {key_state.name}")
+                return content
+                
+            except requests.exceptions.HTTPError as e:
+                if e.response and e.response.status_code == 429:
+                    retry_after = int(e.response.headers.get('Retry-After', 65))
+                    self.mark_failure(key_state, is_rate_limit=True, retry_after=retry_after)
+                else:
+                    self.mark_failure(key_state)
+                    logger.error(f"❌ HTTP error {key_state.name}: {e}")
+                last_error = e
+                attempt += 1
+                time.sleep(0.5)
+                
+            except Exception as e:
+                self.mark_failure(key_state)
+                logger.error(f"❌ Error {key_state.name}: {e}")
+                last_error = e
+                attempt += 1
+                time.sleep(0.5)
+        
+        stats = self.get_stats()
+        error_msg = (
+            f"LLM failed after {max_retries} attempts. "
+            f"Available: {stats['available_keys']}/{stats['total_keys']}. "
+            f"Error: {last_error}"
+        )
+        logger.error(f"💥 {error_msg}")
+        raise RuntimeError(error_msg)
+
+
+# Global instance
+_load_balancer: Optional[MultiKeyLLMLoadBalancer] = None
+_load_balancer_lock = threading.Lock()
+
+
+def get_llm_load_balancer() -> MultiKeyLLMLoadBalancer:
+    """Get singleton load balancer"""
+    global _load_balancer
+    
+    if _load_balancer is None:
+        with _load_balancer_lock:
+            if _load_balancer is None:
+                api_keys = getattr(settings, 'GROQ_API_KEYS', None)
+                
+                if not api_keys:
+                    single_key = getattr(settings, 'GROQ_API_KEY', None)
+                    if single_key:
+                        api_keys = [{'key': single_key, 'name': 'groq_key_1'}]
+                
+                if not api_keys:
+                    raise ValueError("No GROQ API keys configured")
+                
+                strategy = getattr(settings, 'LLM_LOAD_BALANCER_STRATEGY', 'round_robin')
+                _load_balancer = MultiKeyLLMLoadBalancer(api_keys, strategy=strategy)
+    
+    return _load_balancer
+
+
+def reset_load_balancer():
+    """Reset load balancer"""
+    global _load_balancer
+    with _load_balancer_lock:
+        _load_balancer = None
+
+
+def call_llm_with_load_balancer(payload: dict) -> str:
+    """Drop-in replacement for _call_llm"""
+    balancer = get_llm_load_balancer()
+    api_url = getattr(settings, 'GROQ_API_URL')
+    return balancer.call_llm(payload, api_url)
+
+
+def get_load_balancer_stats() -> Dict:
+    """Get stats"""
+    try:
+        return get_llm_load_balancer().get_stats()
+    except Exception as e:
+        return {"error": str(e), "total_keys": 0, "available_keys": 0}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+# """
+# Ultra-Safe Sequential Load Balancer with Adaptive Rate Limiting
+# Guaranteed to work with strict API rate limits
+# """
+
+# import time
+# import threading
+# import requests
+# import logging
+# from typing import List, Dict, Optional
+# from dataclasses import dataclass
+# from datetime import datetime
+# from django.conf import settings
+
+# logger = logging.getLogger(__name__)
+
+
+# @dataclass
+# class APIKeyState:
+#     """Simple key state tracker"""
+#     key: str
+#     name: str
+#     last_used: float = 0
+#     total_requests: int = 0
+#     total_failures: int = 0
+#     consecutive_failures: int = 0
+#     disabled_until: float = 0
+    
+#     def is_available(self) -> bool:
+#         """Check if key is available RIGHT NOW"""
+#         now = time.time()
+        
+#         # Check if disabled
+#         if self.disabled_until > now:
+#             return False
+        
+#         # Require 5 seconds between requests on SAME key
+#         if self.last_used > 0:
+#             elapsed = now - self.last_used
+#             if elapsed < 5.0:
+#                 return False
+        
+#         return True
+    
+#     def get_wait_time(self) -> float:
+#         """How long until this key is available?"""
+#         now = time.time()
+        
+#         if self.disabled_until > now:
+#             return self.disabled_until - now
+        
+#         if self.last_used > 0:
+#             elapsed = now - self.last_used
+#             if elapsed < 5.0:
+#                 return 5.0 - elapsed
+        
+#         return 0
+    
+#     def mark_success(self):
+#         self.last_used = time.time()
+#         self.total_requests += 1
+#         self.consecutive_failures = 0
+#         self.disabled_until = 0
+#         logger.info(f"✅ {self.name} success (total: {self.total_requests})")
+    
+#     def mark_failure(self, is_rate_limit: bool = False):
+#         self.last_used = time.time()
+#         self.total_requests += 1
+#         self.total_failures += 1
+#         self.consecutive_failures += 1
+        
+#         if is_rate_limit:
+#             # Rate limit: wait 90 seconds
+#             self.disabled_until = time.time() + 90
+#             logger.error(f"🚫 {self.name} RATE LIMITED → disabled for 90s")
+#         elif self.consecutive_failures >= 2:
+#             # 2 failures: wait 60 seconds
+#             self.disabled_until = time.time() + 60
+#             logger.error(f"❌ {self.name} FAILED {self.consecutive_failures}x → disabled for 60s")
+
+
+# class UltraSafeLoadBalancer:
+#     """
+#     Ultra-conservative load balancer
+#     - Minimum 5 seconds between requests on same key
+#     - Minimum 1 second between ANY requests (global)
+#     - Automatic waiting for key availability
+#     - No parallel requests
+#     """
+    
+#     def __init__(self, api_keys: List[Dict[str, str]]):
+#         if not api_keys:
+#             raise ValueError("At least one API key required")
+        
+#         self.keys = [APIKeyState(key=k['key'], name=k['name']) for k in api_keys]
+#         self.current_index = 0
+#         self.lock = threading.Lock()
+#         self.last_global_request = 0
+#         self.min_global_interval = 1.0  # 1 second between ANY requests
+        
+#         logger.info(f"🔑 Ultra-safe balancer: {len(self.keys)} keys, 5s per-key interval, 1s global interval")
+    
+#     def _enforce_global_rate_limit(self):
+#         """Ensure minimum time between ANY requests"""
+#         with self.lock:
+#             if self.last_global_request > 0:
+#                 elapsed = time.time() - self.last_global_request
+#                 if elapsed < self.min_global_interval:
+#                     wait = self.min_global_interval - elapsed
+#                     logger.debug(f"⏱️ Global rate limit: waiting {wait:.1f}s")
+#                     time.sleep(wait)
+#             self.last_global_request = time.time()
+    
+#     def get_next_key(self, max_wait: float = 30.0) -> Optional[APIKeyState]:
+#         """Get next available key, waiting if necessary"""
+#         start_time = time.time()
+        
+#         while (time.time() - start_time) < max_wait:
+#             with self.lock:
+#                 # Try round-robin
+#                 for _ in range(len(self.keys)):
+#                     key = self.keys[self.current_index]
+#                     self.current_index = (self.current_index + 1) % len(self.keys)
+                    
+#                     if key.is_available():
+#                         return key
+                
+#                 # No keys available - find the one that will be ready soonest
+#                 wait_times = [(k.get_wait_time(), k) for k in self.keys]
+#                 wait_times.sort()
+                
+#                 if wait_times:
+#                     min_wait, next_key = wait_times[0]
+                    
+#                     if min_wait > 0 and min_wait < 15:
+#                         logger.info(f"⏳ All keys busy. Waiting {min_wait:.1f}s for {next_key.name}...")
+#                         time.sleep(min_wait + 0.2)
+#                         continue
+            
+#             time.sleep(0.5)
+        
+#         # Timeout
+#         logger.error(f"❌ No keys available after {max_wait}s wait")
+#         return None
+    
+#     def call_llm(self, payload: dict, api_url: str, retry_count: int = 0) -> str:
+#         """
+#         Make LLM call with ONE key
+#         Retries with SAME key after waiting if it fails
+#         """
+#         # Enforce global rate limit FIRST
+#         self._enforce_global_rate_limit()
+        
+#         # Get available key
+#         key = self.get_next_key(max_wait=30.0)
+        
+#         if not key:
+#             raise RuntimeError("No API keys available after 30s wait")
+        
+#         try:
+#             headers = {
+#                 "Authorization": f"Bearer {key.key}",
+#                 "Content-Type": "application/json"
+#             }
+            
+#             logger.info(f"🔑 Request via {key.name} (retry: {retry_count})")
+            
+#             response = requests.post(
+#                 api_url,
+#                 headers=headers,
+#                 json=payload,
+#                 timeout=45
+#             )
+            
+#             # Check rate limit
+#             if response.status_code == 429:
+#                 key.mark_failure(is_rate_limit=True)
+                
+#                 if retry_count < 2:
+#                     logger.warning(f"⚠️ Rate limit hit, retrying with different key...")
+#                     time.sleep(2)
+#                     return self.call_llm(payload, api_url, retry_count + 1)
+#                 else:
+#                     raise RuntimeError(f"Rate limit on {key.name} after {retry_count} retries")
+            
+#             # Check for other errors
+#             response.raise_for_status()
+            
+#             # Success!
+#             key.mark_success()
+#             content = response.json()["choices"][0]["message"]["content"]
+#             return content
+            
+#         except requests.exceptions.HTTPError as e:
+#             if e.response and e.response.status_code == 429:
+#                 key.mark_failure(is_rate_limit=True)
+#             else:
+#                 key.mark_failure(is_rate_limit=False)
+            
+#             if retry_count < 2:
+#                 logger.warning(f"⚠️ HTTP error, retrying... ({e})")
+#                 time.sleep(2)
+#                 return self.call_llm(payload, api_url, retry_count + 1)
+#             else:
+#                 raise RuntimeError(f"HTTP error after {retry_count} retries: {e}")
+        
+#         except requests.exceptions.Timeout as e:
+#             key.mark_failure(is_rate_limit=False)
+            
+#             if retry_count < 1:
+#                 logger.warning(f"⏱️ Timeout, retrying...")
+#                 time.sleep(3)
+#                 return self.call_llm(payload, api_url, retry_count + 1)
+#             else:
+#                 raise RuntimeError(f"Timeout after {retry_count} retries: {e}")
+        
+#         except Exception as e:
+#             key.mark_failure(is_rate_limit=False)
+#             raise RuntimeError(f"Unexpected error with {key.name}: {e}")
+    
+#     def get_stats(self) -> Dict:
+#         with self.lock:
+#             available = sum(1 for k in self.keys if k.is_available())
+#             total_reqs = sum(k.total_requests for k in self.keys)
+#             total_fails = sum(k.total_failures for k in self.keys)
+#             success_rate = ((total_reqs - total_fails) / max(total_reqs, 1)) * 100
+            
+#             return {
+#                 "total_keys": len(self.keys),
+#                 "available_keys": available,
+#                 "total_requests": total_reqs,
+#                 "total_failures": total_fails,
+#                 "success_rate": round(success_rate, 2),
+#                 "keys": [
+#                     {
+#                         "name": k.name,
+#                         "total_requests": k.total_requests,
+#                         "total_failures": k.total_failures,
+#                         "consecutive_failures": k.consecutive_failures,
+#                         "is_available": k.is_available(),
+#                         "wait_time": round(k.get_wait_time(), 1)
+#                     }
+#                     for k in self.keys
+#                 ]
+#             }
+
+
+# # Global singleton
+# _balancer: Optional[UltraSafeLoadBalancer] = None
+# _balancer_lock = threading.Lock()
+
+
+# def get_llm_load_balancer() -> UltraSafeLoadBalancer:
+#     """Get singleton balancer"""
+#     global _balancer
+    
+#     if _balancer is None:
+#         with _balancer_lock:
+#             if _balancer is None:
+#                 api_keys = getattr(settings, 'GROQ_API_KEYS', None)
+                
+#                 if not api_keys:
+#                     single_key = getattr(settings, 'GROQ_API_KEY', None)
+#                     if single_key:
+#                         api_keys = [{'key': single_key, 'name': 'groq_key_1'}]
+                
+#                 if not api_keys:
+#                     raise ValueError("No GROQ_API_KEYS configured in settings")
+                
+#                 _balancer = UltraSafeLoadBalancer(api_keys)
+    
+#     return _balancer
+
+
+# def reset_load_balancer():
+#     """Reset balancer (for testing)"""
+#     global _balancer
+#     with _balancer_lock:
+#         _balancer = None
+#         logger.info("🔄 Balancer reset")
+
+
+# def call_llm_with_load_balancer(payload: dict) -> str:
+#     """
+#     Call LLM with ultra-safe rate limiting
+#     This is the drop-in replacement for services.py
+#     """
+#     balancer = get_llm_load_balancer()
+#     api_url = getattr(settings, 'GROQ_API_URL')
+#     return balancer.call_llm(payload, api_url)
+
+
+# def get_load_balancer_stats() -> Dict:
+#     """Get balancer stats"""
+#     try:
+#         return get_llm_load_balancer().get_stats()
+#     except Exception as e:
+#         return {"error": str(e)}

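The new llm_load_balancer.py module is self-contained: callers only need call_llm_with_load_balancer() and, for monitoring, get_load_balancer_stats(). A minimal usage sketch, assuming Django settings are configured as in this commit (GROQ_API_KEYS, GROQ_API_URL, LLM_LOAD_BALANCER_STRATEGY) and that the attr_extraction app is importable; the model id in the payload is a placeholder, not taken from this repository:

# Illustrative sketch only; requires Django settings to be loaded.
from attr_extraction.llm_load_balancer import (
    call_llm_with_load_balancer,
    get_load_balancer_stats,
)

payload = {
    "model": "llama-3.1-8b-instant",  # hypothetical model id
    "messages": [
        {"role": "system", "content": "You are a JSON-only extractor."},
        {"role": "user", "content": "Extract attributes for: blue cotton t-shirt"},
    ],
    "temperature": 0.3,
    "max_tokens": 500,
}

# Picks the next available key (round-robin by default), enforces the
# 2.5 s per-key spacing and the 25-requests-per-minute window, fails over
# to other keys on HTTP 429, and returns the message content string.
content = call_llm_with_load_balancer(payload)
print(content)

# Per-key health and success-rate statistics for monitoring.
print(get_load_balancer_stats())
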
+ 399 - 36
attr_extraction/services.py

@@ -1,6 +1,357 @@
 
 
-# ==================== services.py (WITH USER VALUE REASONING) ====================
+# # ==================== services.py (WITH USER VALUE REASONING) ====================
+# import json
+# import hashlib
+# import logging
+# import time
+# from functools import wraps
+# from typing import Dict, List, Optional, Tuple
+# import requests
+# from django.conf import settings
+
+# from .llm_load_balancer import call_llm_with_load_balancer
+
+# from .cache_config import (
+#     is_caching_enabled,
+#     ENABLE_ATTRIBUTE_EXTRACTION_CACHE,
+#     ATTRIBUTE_CACHE_MAX_SIZE,
+# )
+
+# logger = logging.getLogger(__name__)
+
+# # --------------------------------------------------------------------------- #
+# # CACHES
+# # --------------------------------------------------------------------------- #
+# class SimpleCache:
+#     _cache = {}
+#     _max_size = ATTRIBUTE_CACHE_MAX_SIZE
+
+#     @classmethod
+#     def get(cls, key: str) -> Optional[Dict]:
+#         if not ENABLE_ATTRIBUTE_EXTRACTION_CACHE: return None
+#         return cls._cache.get(key)
+
+#     @classmethod
+#     def set(cls, key: str, value: Dict):
+#         if not ENABLE_ATTRIBUTE_EXTRACTION_CACHE: return
+#         if len(cls._cache) >= cls._max_size:
+#             items = list(cls._cache.items())
+#             cls._cache = dict(items[int(cls._max_size * 0.2):])
+#         cls._cache[key] = value
+
+#     @classmethod
+#     def clear(cls): cls._cache.clear()
+
+#     @classmethod
+#     def get_stats(cls) -> Dict:
+#         return {
+#             "enabled": ENABLE_ATTRIBUTE_EXTRACTION_CACHE,
+#             "size": len(cls._cache),
+#             "max_size": cls._max_size,
+#             "usage_percent": round(len(cls._cache)/cls._max_size*100, 2) if cls._max_size else 0
+#         }
+
+# # --------------------------------------------------------------------------- #
+# # RETRY DECORATOR
+# # --------------------------------------------------------------------------- #
+# def retry(max_attempts=3, delay=1.0):
+#     def decorator(f):
+#         @wraps(f)
+#         def wrapper(*args, **kwargs):
+#             last_exc = None
+#             for i in range(max_attempts):
+#                 try:
+#                     return f(*args, **kwargs)
+#                 except Exception as e:
+#                     last_exc = e
+#                     if i < max_attempts - 1:
+#                         wait = delay * (2 ** i)
+#                         logger.warning(f"Retry {i+1}/{max_attempts} after {wait}s: {e}")
+#                         time.sleep(wait)
+#             raise last_exc or RuntimeError("Retry failed")
+#         return wrapper
+#     return decorator
+
+# # --------------------------------------------------------------------------- #
+# # MAIN SERVICE
+# # --------------------------------------------------------------------------- #
+# class ProductAttributeService:
+#     @staticmethod
+#     def combine_product_text(title=None, short_desc=None, long_desc=None, ocr_text=None) -> Tuple[str, Dict[str, str]]:
+#         parts = []
+#         source_map = {}
+#         if title:
+#             t = str(title).strip()
+#             parts.append(f"Title: {t}")
+#             source_map["title"] = t
+#         if short_desc:
+#             s = str(short_desc).strip()
+#             parts.append(f"Description: {s}")
+#             source_map["short_desc"] = s
+#         if long_desc:
+#             l = str(long_desc).strip()
+#             parts.append(f"Details: {l}")
+#             source_map["long_desc"] = l
+#         if ocr_text:
+#             parts.append(f"OCR Text: {ocr_text}")
+#             source_map["ocr_text"] = ocr_text
+#         combined = "\n".join(parts).strip()
+#         return (combined or "No product information", source_map)
+
+#     @staticmethod
+#     def _cache_key(product_text: str, mandatory_attrs: Dict, extract_additional: bool, multiple: List[str], user_values: Dict = None) -> str:
+#         payload = {
+#             "text": product_text, 
+#             "attrs": mandatory_attrs, 
+#             "extra": extract_additional, 
+#             "multiple": sorted(multiple),
+#             "user_values": user_values or {}
+#         }
+#         return f"attr_{hashlib.md5(json.dumps(payload, sort_keys=True).encode()).hexdigest()}"
+
+#     @staticmethod
+#     def _clean_json(text: str) -> str:
+#         start = text.find("{")
+#         end = text.rfind("}") + 1
+#         if start != -1 and end > start:
+#             text = text[start:end]
+#         if "```json" in text:
+#             text = text.split("```json", 1)[1].split("```", 1)[0]
+#         elif "```" in text:
+#             text = text.split("```", 1)[1].split("```", 1)[0]
+#             if text.lstrip().startswith("json"): text = text[4:]
+#         return text.strip()
+
+#     @staticmethod
+#     def format_visual_attributes(visual_attributes: Dict) -> Dict:
+#         formatted = {}
+#         for key, value in visual_attributes.items():
+#             if isinstance(value, list):
+#                 formatted[key] = [{"value": str(item), "source": "image"} for item in value]
+#             elif isinstance(value, dict):
+#                 nested = {}
+#                 for sub_key, sub_val in value.items():
+#                     if isinstance(sub_val, list):
+#                         nested[sub_key] = [{"value": str(v), "source": "image"} for v in sub_val]
+#                     else:
+#                         nested[sub_key] = [{"value": str(sub_val), "source": "image"}]
+#                 formatted[key] = nested
+#             else:
+#                 formatted[key] = [{"value": str(value), "source": "image"}]
+#         return formatted
+
+#     # @staticmethod
+#     # @retry(max_attempts=3, delay=1.0)
+#     # def _call_llm(payload: dict) -> str:
+#     #     headers = {"Authorization": f"Bearer {settings.GROQ_API_KEY}", "Content-Type": "application/json"}
+#     #     resp = requests.post(settings.GROQ_API_URL, headers=headers, json=payload, timeout=30)
+#     #     resp.raise_for_status()
+#     #     return resp.json()["choices"][0]["message"]["content"]
+
+
+#     # At the top of services.py, add this import
+#     # from . import call_llm_with_load_balancer, get_load_balancer_stats
+
+#     # Replace the existing _call_llm method with this:
+#     @staticmethod
+#     @retry(max_attempts=3, delay=3.0)
+#     def _call_llm(payload: dict) -> str:
+#         """
+#         Call LLM using load balancer with multiple API keys
+#         Automatically handles rate limiting and failover
+#         """
+#         return call_llm_with_load_balancer(payload)
+
+
+
+#     @staticmethod
+#     def extract_attributes(
+#         product_text: str,
+#         mandatory_attrs: Dict[str, List[str]],
+#         source_map: Dict[str, str] = None,
+#         model: str = None,
+#         extract_additional: bool = True,
+#         multiple: Optional[List[str]] = None,
+#         use_cache: Optional[bool] = None,
+#         user_entered_values: Optional[Dict[str, str]] = None,  # NEW PARAMETER
+#     ) -> dict:
+#         if model is None: model = settings.SUPPORTED_MODELS[0]
+#         if multiple is None: multiple = []
+#         if source_map is None: source_map = {}
+#         if user_entered_values is None: user_entered_values = {}
+
+#         if use_cache is None: use_cache = ENABLE_ATTRIBUTE_EXTRACTION_CACHE
+#         if not is_caching_enabled(): use_cache = False
+
+#         cache_key = None
+#         if use_cache:
+#             cache_key = ProductAttributeService._cache_key(
+#                 product_text, mandatory_attrs, extract_additional, multiple, user_entered_values
+#             )
+#             cached = SimpleCache.get(cache_key)
+#             if cached:
+#                 logger.info(f"CACHE HIT {cache_key[:16]}...")
+#                 return cached
+
+#         # --------------------------- BUILD USER VALUES SECTION ---------------------------
+#         user_values_section = ""
+#         if user_entered_values:
+#             user_lines = []
+#             for attr, value in user_entered_values.items():
+#                 user_lines.append(f"  - {attr}: {value}")
+#             user_values_section = f"""
+# USER MANUALLY ENTERED VALUES:
+# {chr(10).join(user_lines)}
+
+# IMPORTANT INSTRUCTIONS FOR USER VALUES:
+# 1. Compare the user-entered value with what you find in the product text
+# 2. Evaluate if the user value is correct, partially correct, or incorrect for this product
+# 3. Choose the BEST value (could be user's value, or from allowed list, or inferred)
+# 4. Always provide a "reason" field explaining your decision
+# 5. DO NOT hallucinate - be honest if user's value seems wrong based on product evidence
+# 6. If the user's value is not in the allowed list but seems correct, choose the nearest value from the allowed list with proper reasoning.
+# """
+
+#         # --------------------------- PROMPT ---------------------------
+#         allowed_lines = [f"{attr}: {', '.join(vals)}" for attr, vals in mandatory_attrs.items()]
+#         allowed_text = "\n".join(allowed_lines)
+#         allowed_sources = list(source_map.keys()) + ["title", "description", "inferred"]
+#         source_hint = "|".join(allowed_sources)
+#         multiple_text = f"\nMULTIPLE ALLOWED FOR: {', '.join(multiple)}" if multiple else ""
+        
+#         print("Multiple text for attr: ")
+#         print(multiple_text)
+
+#         additional_instructions = """
+#         For the 'additional' section, identify any other important product attributes and their values (e.g., 'Color', 'Material', 'Weight' etc) that are present in the PRODUCT TEXT but not in the Mandatory Attribute list.
+#         For each additional attribute, use the best available value from the PRODUCT TEXT and specify the 'source'.
+#         """ if extract_additional else ""
+
+#         prompt = f"""
+# You are a product-attribute classifier and validator.
+# Understand the product text very deeply. If the same product is available somewhere online, use that knowledge to predict accurate attribute values.
+# Do not depend only on word-by-word matching from the product text - interpret the meaning and suggest attributes intelligently.
+# Pick the *closest meaning* value from the allowed list, even if not an exact word match.
+# I want values for all mandatory attributes.
+# If a value is not found anywhere, the source should be "inferred".
+# Note: Source means from where you have concluded the result. Choose one of these value <{source_hint}>
+
+# ALLOWED VALUES (MANDATORY):
+# {allowed_text}
+
+# Note: "Strictly" return multiple values for these attributes: {multiple_text}. These values must be most possible values from the list and should be max 2 values. 
+
+# {user_values_section}
+
+# {additional_instructions}
+
+# PRODUCT TEXT:
+# {product_text}
+
+# OUTPUT (strict JSON only):
+# {{
+#   "mandatory": {{
+#     "<attr>": [{{
+#       "value": "<chosen_value>", 
+#       "source": "<{source_hint}>",
+#       "reason": "Explanation of why this value was chosen. If user provided a value, explain why you agreed/disagreed with it.",
+#       "original_value": "<user_entered_value_if_provided>",
+#       "decision": "accepted|rejected"
+#     }}]
+#   }},
+#   "additional": {{
+#     "Additional_Attr_1": [{{
+#       "value": "Value 1", 
+#       "source": "<{source_hint}>",
+#       "reason": "Why this attribute and value were identified"
+#     }}]
+#   }}
+# }}
+
+# RULES:
+# - For each mandatory attribute with a user-entered value, include "original_value" and "decision" fields
+# - "decision" values: "accepted" (used user's value), "rejected" (used different value), "not_provided" (no user value given)
+# - "reason" must explain your choice, especially when rejecting user input
+# - For 'additional' attributes: Strictly Extract other key attributes other than mandatory attributes from the text. 
+# - For 'multiple' attributes, always give multiple values for those attributes; choose wisely, with max 2 values per attribute that are very close.
+# - Source must be one of: {source_hint}
+# - Be honest and specific in your reasoning.
+# - Return ONLY valid JSON
+# """
+
+#         payload = {
+#             "model": model,
+#             "messages": [
+#                 {"role": "system", "content": "You are a JSON-only extractor and validator. Always provide clear reasoning for your decisions."},
+#                 {"role": "user", "content": prompt},
+#             ],
+#             "temperature": 0.3,
+#             "max_tokens": 2000,  # Increased for reasoning
+#         }
+
+#         try:
+#             raw = ProductAttributeService._call_llm(payload)
+#             logger.info("Raw LLM response received")
+#             print(raw)
+#             cleaned = ProductAttributeService._clean_json(raw)
+#             parsed = json.loads(cleaned)
+#         except Exception as exc:
+#             logger.error(f"LLM failed: {exc}")
+#             return {
+#                 "mandatory": {
+#                     a: [{
+#                         "value": "Not Specified", 
+#                         "source": "llm_error",
+#                         "reason": f"LLM processing failed: {str(exc)}"
+#                     }] for a in mandatory_attrs
+#                 },
+#                 "additional": {} if not extract_additional else {},
+#                 "error": str(exc)
+#             }
+
+#         if use_cache and cache_key:
+#             SimpleCache.set(cache_key, parsed)
+#             logger.info(f"CACHE SET {cache_key[:16]}...")
+
+#         return parsed
+
+#     @staticmethod
+#     def get_cache_stats() -> Dict:
+#         return {
+#             "global_enabled": is_caching_enabled(),
+#             "result_cache": SimpleCache.get_stats(),
+#         }
+
+#     @staticmethod
+#     def clear_all_caches():
+#         SimpleCache.clear()
+#         logger.info("All caches cleared")
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
 import json
 import hashlib
 import logging
@@ -10,6 +361,8 @@ from typing import Dict, List, Optional, Tuple
 import requests
 from django.conf import settings
 
+from .llm_load_balancer import call_llm_with_load_balancer
+
 from .cache_config import (
     is_caching_enabled,
     ENABLE_ATTRIBUTE_EXTRACTION_CACHE,
@@ -139,13 +492,29 @@ class ProductAttributeService:
                 formatted[key] = [{"value": str(value), "source": "image"}]
         return formatted
 
+    # @staticmethod
+    # @retry(max_attempts=3, delay=1.0)
+    # def _call_llm(payload: dict) -> str:
+    #     headers = {"Authorization": f"Bearer {settings.GROQ_API_KEY}", "Content-Type": "application/json"}
+    #     resp = requests.post(settings.GROQ_API_URL, headers=headers, json=payload, timeout=30)
+    #     resp.raise_for_status()
+    #     return resp.json()["choices"][0]["message"]["content"]
+
+
+    # At the top of services.py, add this import
+    # from . import call_llm_with_load_balancer, get_load_balancer_stats
+
+    # Replace the existing _call_llm method with this:
     @staticmethod
-    @retry(max_attempts=3, delay=1.0)
+    @retry(max_attempts=3, delay=3.0)
     def _call_llm(payload: dict) -> str:
-        headers = {"Authorization": f"Bearer {settings.GROQ_API_KEY}", "Content-Type": "application/json"}
-        resp = requests.post(settings.GROQ_API_URL, headers=headers, json=payload, timeout=30)
-        resp.raise_for_status()
-        return resp.json()["choices"][0]["message"]["content"]
+        """
+        Call LLM using load balancer with multiple API keys
+        Automatically handles rate limiting and failover
+        """
+        return call_llm_with_load_balancer(payload)
+
+
 
     @staticmethod
     def extract_attributes(
@@ -201,14 +570,27 @@ IMPORTANT INSTRUCTIONS FOR USER VALUES:
         allowed_sources = list(source_map.keys()) + ["title", "description", "inferred"]
         source_hint = "|".join(allowed_sources)
         multiple_text = f"\nMULTIPLE ALLOWED FOR: {', '.join(multiple)}" if multiple else ""
-        
-        print("Multiple text for attr: ")
-        print(multiple_text)
 
-        additional_instructions = """
-        For the 'additional' section, identify any other important product attributes and their values (e.g., 'Color', 'Material', 'Weight' etc) that are present in the PRODUCT TEXT but not in the Mandatory Attribute list.
-        For each additional attribute, use the best available value from the PRODUCT TEXT and specify the 'source'.
-        """ if extract_additional else ""
+        if extract_additional:
+            additional_instructions = """
+For the 'additional' section, identify any other important product attributes and their values (e.g., 'Color', 'Material', 'Weight', etc., according to the product text) that are present in the PRODUCT TEXT but not in the Mandatory Attribute list.
+For each additional attribute, use the best available value from the PRODUCT TEXT and specify the 'source'.
+Strictly extract key attributes other than the mandatory attributes from the text.
+"""
+            output_example_additional = f"""
+  "additional": {{
+    "Additional_Attr_1": [{{
+      "value": "Value 1",
+      "source": "<{source_hint}>",
+      "reason": "Why this attribute and value were identified"
+    }}]
+  }}
+"""
+        else:
+            additional_instructions = """
+Do not identify or include any additional attributes. The 'additional' section must be an empty object {}.
+"""
+            output_example_additional = '  "additional": {}'
 
         prompt = f"""
 You are a product-attribute classifier and validator.
@@ -222,7 +604,7 @@ Note: Source means from where you have concluded the result. Choose one of these
 ALLOWED VALUES (MANDATORY):
 {allowed_text}
 
-Note: Always return multiple values for these attributes: {multiple_text}. These values must be most possible values from the list and should be max 2 values. 
+Note: "Strictly" return multiple values for these attributes: {multiple_text}. These values must be most possible values from the list and should be max 2 values. 
 
 {user_values_section}
 
@@ -239,24 +621,17 @@ OUTPUT (strict JSON only):
       "source": "<{source_hint}>",
       "reason": "Explanation of why this value was chosen. If user provided a value, explain why you agreed/disagreed with it.",
       "original_value": "<user_entered_value_if_provided>",
-      "decision": "accepted|rejected"
+      "decision": "accepted|rejected|not_provided"
     }}]
   }},
-  "additional": {{
-    "Additional_Attr_1": [{{
-      "value": "Value 1", 
-      "source": "<{source_hint}>",
-      "reason": "Why this attribute and value were identified"
-    }}]
-  }}
+{output_example_additional}
 }}
 
 RULES:
 - For each mandatory attribute with a user-entered value, include "original_value" and "decision" fields
 - "decision" values: "accepted" (used user's value), "rejected" (used different value), "not_provided" (no user value given)
 - "reason" must explain your choice, especially when rejecting user input
-- For 'additional' attributes: Strictly Extract other key attributes other than mandatory attributes from the text. 
-- For 'multiple' attributes, always give multiple value for those attribues, choose wisely and max 2 multiple attribute that are very close. 
+- For 'multiple' attributes, always give multiple values for those attributes, choose wisely and max 2 values per attribute that are very close. 
 - Source must be one of: {source_hint}
 - Be honest and specific in your reasoning.
 - Return ONLY valid JSON
@@ -275,7 +650,6 @@ RULES:
         try:
             raw = ProductAttributeService._call_llm(payload)
             logger.info("Raw LLM response received")
-            print(raw)
             cleaned = ProductAttributeService._clean_json(raw)
             parsed = json.loads(cleaned)
         except Exception as exc:
@@ -316,14 +690,3 @@ RULES:
 
 
 
-
-
-
-
-
-
-
-
-
-
-

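With this change, _call_llm keeps its @retry(max_attempts=3, delay=3.0) wrapper on top of the balancer's own per-key failover, so a transient outage is retried both across keys inside the balancer and, with exponential backoff, by the decorator. A sketch of how the updated service might be called end to end, assuming SUPPORTED_MODELS and the Groq settings are configured; the product data and attribute lists below are made-up example values:

# Illustrative call into the updated service; names come from this diff,
# the example data does not.
from attr_extraction.services import ProductAttributeService

product_text, source_map = ProductAttributeService.combine_product_text(
    title="Men's Slim Fit Cotton Shirt",
    short_desc="Full-sleeve casual shirt in navy blue",
    long_desc="100% cotton, machine washable, slim fit",
)

result = ProductAttributeService.extract_attributes(
    product_text=product_text,
    mandatory_attrs={"Color": ["Blue", "Black", "White"], "Fit": ["Slim", "Regular"]},
    source_map=source_map,
    extract_additional=True,
    multiple=["Color"],
    user_entered_values={"Fit": "Regular"},
)

# Each mandatory entry is expected to carry value, source, reason,
# original_value and a decision of "accepted", "rejected" or "not_provided".
print(result["mandatory"]["Fit"][0])
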
+ 289 - 14
attr_extraction/views.py

@@ -597,10 +597,10 @@ class BatchExtractProductAttributesView(APIView):
                 }, False
 
         # ==================== OPTIMIZATION 4: Parallel Execution ====================
-        max_workers = min(3, len(product_list))  # ← CHANGED FROM 10
-
-        logger.info(f"Using {max_workers} parallel workers")
-
+        max_workers = min(1, len(product_list))  # force a single worker for now, regardless of process_image
+        
+        logger.info(f"Using {max_workers} parallel workers")
+        
         with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
             future_to_product = {
                 executor.submit(process_single_product, product): product
@@ -660,6 +660,281 @@ class BatchExtractProductAttributesView(APIView):
 
 
 
+
+
+
+
+
+# # ==================== views.py (OPTIMIZED FOR 4 API KEYS) ====================
+# import concurrent.futures
+# import logging
+# import time
+# from rest_framework.views import APIView
+# from rest_framework.response import Response
+# from rest_framework import status
+
+# from .models import Product, ProductAttributeValue
+# from .serializers import BatchProductRequestSerializer, BatchProductResponseSerializer
+# from .services import ProductAttributeService
+
+# logger = logging.getLogger(__name__)
+
+
+# class BatchExtractProductAttributesView(APIView):
+#     """
+#     ⚡ PERFORMANCE OPTIMIZED: Batch extraction with 4-key load balancer
+#     Expected performance with 4 keys: 10 products in 20-35 seconds (with image processing)
+#     NOW WITH USER VALUE REASONING + MULTI-KEY SUPPORT
+#     """
+
+#     def post(self, request):
+#         start_time = time.time()
+
+#         serializer = BatchProductRequestSerializer(data=request.data)
+#         if not serializer.is_valid():
+#             return Response({"error": serializer.errors}, status=status.HTTP_400_BAD_REQUEST)
+
+#         validated_data = serializer.validated_data
+#         product_list = validated_data.get("products", [])
+        
+#         logger.info(f"🚀 Starting batch processing for {len(product_list)} products")
+        
+#         # ==================== OPTIMIZATION 1: Bulk DB Query ====================
+#         item_ids = [p['item_id'] for p in product_list]
+#         products_queryset = Product.objects.filter(
+#             item_id__in=item_ids
+#         ).prefetch_related('attribute_values')
+        
+#         product_map = {product.item_id: product for product in products_queryset}
+        
+#         # Prefetch ALL original attribute values in ONE query
+#         original_values_qs = ProductAttributeValue.objects.filter(
+#             product__item_id__in=item_ids
+#         ).select_related('product')
+        
+#         original_values_map = {}
+#         for attr_val in original_values_qs:
+#             item_id = attr_val.product.item_id
+#             if item_id not in original_values_map:
+#                 original_values_map[item_id] = {}
+#             original_values_map[item_id][attr_val.attribute_name] = attr_val.original_value
+        
+#         logger.info(f"✓ Loaded {len(product_map)} products from database")
+#         logger.info(f"✓ Loaded user values for {len(original_values_map)} products")
+        
+#         # Extract settings
+#         model = validated_data.get("model")
+#         extract_additional = validated_data.get("extract_additional", True)
+#         process_image = validated_data.get("process_image", True)
+#         multiple = validated_data.get("multiple", [])
+#         threshold_abs = validated_data.get("threshold_abs", 0.65)
+#         margin = validated_data.get("margin", 0.15)
+#         use_dynamic_thresholds = validated_data.get("use_dynamic_thresholds", False)
+#         use_adaptive_margin = validated_data.get("use_adaptive_margin", False)
+#         use_semantic_clustering = validated_data.get("use_semantic_clustering", False)
+        
+#         results = []
+#         successful = 0
+#         failed = 0
+        
+#         # ==================== OPTIMIZATION 2: Conditional Service Init ====================
+#         ocr_service = None
+#         visual_service = None
+        
+#         if process_image:
+#             try:
+#                 from .ocr_service import OCRService
+#                 from .visual_processing_service import VisualProcessingService
+#                 ocr_service = OCRService()
+#                 visual_service = VisualProcessingService()
+#                 logger.info("✓ Image processing services initialized")
+#             except ImportError as e:
+#                 logger.warning(f"⚠️ Image processing services not available: {e}")
+#                 process_image = False
+
+#         # ==================== OPTIMIZATION 3: Smart Parallelization with 4 Keys ====================
+#         def process_single_product(product_entry):
+#             """Process a single product (runs in parallel)"""
+#             product_start = time.time()
+            
+#             item_id = product_entry['item_id']
+#             mandatory_attrs = product_entry['mandatory_attrs']
+
+#             if item_id not in product_map:
+#                 return {
+#                     "product_id": item_id,
+#                     "error": "Product not found in database"
+#                 }, False
+
+#             product = product_map[item_id]
+            
+#             try:
+#                 title = product.product_name
+#                 short_desc = product.product_short_description
+#                 long_desc = product.product_long_description
+#                 image_url = product.image_path
+                
+#                 ocr_results = None
+#                 ocr_text = None
+#                 visual_results = None
+
+#                 # ⚡ IMAGE PROCESSING (if enabled)
+#                 if process_image and image_url:
+#                     if ocr_service:
+#                         try:
+#                             ocr_results = ocr_service.process_image(image_url)
+                            
+#                             if ocr_results and ocr_results.get("detected_text"):
+#                                 ocr_attrs = ProductAttributeService.extract_attributes_from_ocr(
+#                                     ocr_results, model
+#                                 )
+#                                 ocr_results["extracted_attributes"] = ocr_attrs
+#                                 ocr_text = "\n".join([
+#                                     f"{item['text']} (confidence: {item['confidence']:.2f})"
+#                                     for item in ocr_results["detected_text"]
+#                                 ])
+#                         except Exception as e:
+#                             logger.warning(f"OCR failed for {item_id}: {e}")
+                    
+#                     if visual_service:
+#                         try:
+#                             product_type_hint = product.product_type if hasattr(product, 'product_type') else None
+#                             visual_results = visual_service.process_image(image_url, product_type_hint)
+                            
+#                             if visual_results and visual_results.get('visual_attributes'):
+#                                 visual_results['visual_attributes'] = ProductAttributeService.format_visual_attributes(
+#                                     visual_results['visual_attributes']
+#                                 )
+#                         except Exception as e:
+#                             logger.warning(f"Visual processing failed for {item_id}: {e}")
+
+#                 # Combine product text with source tracking
+#                 product_text, source_map = ProductAttributeService.combine_product_text(
+#                     title=title,
+#                     short_desc=short_desc,
+#                     long_desc=long_desc,
+#                     ocr_text=ocr_text
+#                 )
+
+#                 # 🆕 GET USER-ENTERED VALUES FOR THIS PRODUCT
+#                 user_entered_values = original_values_map.get(item_id, {})
+#                 if user_entered_values:
+#                     logger.debug(f"Processing {item_id} with {len(user_entered_values)} user-entered values")
+
+#                 # ⚡ EXTRACT ATTRIBUTES WITH MULTI-KEY LOAD BALANCER
+#                 extracted = ProductAttributeService.extract_attributes(
+#                     product_text=product_text,
+#                     mandatory_attrs=mandatory_attrs,
+#                     source_map=source_map,
+#                     model=model,
+#                     extract_additional=extract_additional,
+#                     multiple=multiple,
+#                     use_cache=True,
+#                     user_entered_values=user_entered_values
+#                 )
+                
+#                 result = {
+#                     "product_id": product.item_id,
+#                     "mandatory": extracted.get("mandatory", {}),
+#                     "additional": extracted.get("additional", {}),
+#                 }
+
+#                 if ocr_results:
+#                     result["ocr_results"] = ocr_results
+                
+#                 if visual_results:
+#                     result["visual_results"] = visual_results
+                
+#                 processing_time = time.time() - product_start
+#                 logger.info(f"✓ Processed {item_id} in {processing_time:.2f}s")
+
+#                 return result, True
+
+#             except Exception as e:
+#                 logger.error(f"❌ Error processing {item_id}: {str(e)}", exc_info=True)
+#                 return {
+#                     "product_id": item_id,
+#                     "error": str(e)
+#                 }, False
+
+#         # ==================== OPTIMIZATION 4: Parallel Execution with 4 Keys ====================
+#         # With 4 API keys, we can safely run more workers
+#         # Image processing: 4-6 workers (I/O bound)
+#         # No image processing: 8-12 workers (CPU bound)
+#         if process_image:
+#             max_workers = min(3, len(product_list))
+#         else:
+#             max_workers = min(5, len(product_list))
+        
+#         logger.info(f"⚡ Using {max_workers} parallel workers with 4-key load balancer")
+        
+#         with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
+#             future_to_product = {
+#                 executor.submit(process_single_product, product): product
+#                 for product in product_list
+#             }
+            
+#             for future in concurrent.futures.as_completed(future_to_product):
+#                 try:
+#                     result, success = future.result()
+#                     results.append(result)
+#                     if success:
+#                         successful += 1
+#                     else:
+#                         failed += 1
+#                 except Exception as e:
+#                     failed += 1
+#                     logger.error(f"❌ Future execution error: {str(e)}")
+#                     results.append({
+#                         "product_id": "unknown",
+#                         "error": str(e)
+#                     })
+
+#         total_time = time.time() - start_time
+        
+#         # Get comprehensive statistics
+#         cache_stats = ProductAttributeService.get_cache_stats()
+        
+#         logger.info(f"""
+# ╔═══════════════════════════════════════════════════════════╗
+# ║           🎉 BATCH PROCESSING COMPLETE                    ║
+# ╠═══════════════════════════════════════════════════════════╣
+# ║  Total products:        {len(product_list):<3}                            ║
+# ║  Successful:            {successful:<3}                            ║
+# ║  Failed:                {failed:<3}                            ║
+# ║  Total time:            {total_time:.2f}s                        ║
+# ║  Avg time/product:      {total_time/len(product_list):.2f}s                        ║
+# ║  Workers used:          {max_workers:<2}                             ║
+# ║  API keys available:    {cache_stats['load_balancer'].get('available_keys', 0)}/{cache_stats['load_balancer'].get('total_keys', 0)}                        ║
+# ╚═══════════════════════════════════════════════════════════╝
+#         """)
+
+#         batch_result = {
+#             "results": results,
+#             "total_products": len(product_list),
+#             "successful": successful,
+#             "failed": failed,
+#             "performance": {
+#                 "total_time_seconds": round(total_time, 2),
+#                 "avg_time_per_product": round(total_time / len(product_list), 2),
+#                 "workers_used": max_workers,
+#                 "throughput_products_per_second": round(len(product_list) / total_time, 2)
+#             },
+#             "cache_stats": cache_stats
+#         }
+
+#         response_serializer = BatchProductResponseSerializer(data=batch_result)
+#         if response_serializer.is_valid():
+#             return Response(response_serializer.data, status=status.HTTP_200_OK)
+
+#         return Response(batch_result, status=status.HTTP_200_OK)
+
+
+
+
+
+
+
 class ProductListView(APIView):
     """
     GET API to list all products with details
@@ -1166,17 +1441,17 @@ class ProductUploadExcelView(APIView):
 
             # Start background thread for Excel generation if upload was successful
             if products_failed == 0 and attributes_failed == 0:
-                logger.info("API call successful. Triggering background Excel generation thread.")
-                threading.Thread(target=generate_product_excel_background, daemon=True).start()
+                logger.info("API call successful. Triggering background Excel generation thread is commented for now !!!!.")
+                # threading.Thread(target=generate_product_excel_background, daemon=True).start()
                 
-                # FIX: Update monitoring URLs to point to the new generated_outputs subfolder
-                response_data['generated_excel_status'] = 'Background Excel generation triggered successfully.'
-                response_data['monitoring'] = {
-                     'excel_file': os.path.join(OUTPUT_URL, EXCEL_FILE_NAME),
-                     'status_file': os.path.join(OUTPUT_URL, STATUS_FILE_NAME),
-                     'log_file': os.path.join(OUTPUT_URL, LOG_FILE_NAME),
-                     'note': 'These files will be available once the background process completes.'
-                }
+                ## FIX: Update monitoring URLs to point to the new generated_outputs subfolder
+                # response_data['generated_excel_status'] = 'Background Excel generation triggered successfully.'
+                # response_data['monitoring'] = {
+                #      'excel_file': os.path.join(OUTPUT_URL, EXCEL_FILE_NAME),
+                #      'status_file': os.path.join(OUTPUT_URL, STATUS_FILE_NAME),
+                #      'log_file': os.path.join(OUTPUT_URL, LOG_FILE_NAME),
+                #      'note': 'These files will be available once the background process completes.'
+                # }
             else:
                  logger.warning(f"API call finished with errors ({products_failed} products, {attributes_failed} attributes). Not triggering background excel generation.")
                  response_data['generated_excel_status'] = 'Background Excel generation was NOT triggered due to upload errors. Fix upload errors and re-upload.'

BIN
content_quality_tool/__pycache__/settings.cpython-313.pyc


+ 112 - 0
content_quality_tool/settings.py

@@ -164,3 +164,115 @@ LOGGING = {
         },
     },
 }
+
+
+
+
+GROQ_API_KEY2 = "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"
+
+GROQ_API_KEY3 = "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"
+
+GROQ_API_KEY4 = "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"
+
+
+
+GROQ_API_KEYS = [
+    {
+        'key': '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',
+        'name': 'groq_key_1'
+    },
+    {
+        'key': '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',
+        'name': 'groq_key_2'
+    },
+    {
+        'key': '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',
+        'name': 'groq_key_3'
+    },
+    {
+        'key': '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',
+        'name': 'groq_key_4'
+    },
+]
+
+
+
+LLM_LOAD_BALANCER_STRATEGY = 'round_robin'
+
+# ==================== PERFORMANCE TUNING ====================
+
+# Per-key rate limiting (requests per minute)
+# Groq free tier typically allows 30 RPM per key
+# Adjust based on your plan: Free=30, Pay-as-you-go=60+
+GROQ_PER_KEY_RPM = 30
+
+# Thread pool configuration
+# With 4 keys and image processing
+MAX_WORKERS_WITH_IMAGES = 6
+# With 4 keys and no image processing
+MAX_WORKERS_NO_IMAGES = 12
+
+
+
+
+
+# ==================== MONITORING & DEBUGGING ====================
+
+# Enable detailed debugging for load balancer
+DEBUG_LOAD_BALANCER = False  # Set to True for troubleshooting
+
+# Track API usage statistics
+ENABLE_API_USAGE_TRACKING = True
+
+# Alert thresholds
+ALERT_FAILURE_RATE_THRESHOLD = 0.1  # Alert if >10% requests fail
+ALERT_KEY_UNAVAILABLE_THRESHOLD = 2  # Alert if >2 keys unavailable
+
+
+
+# # ==================== LOGGING CONFIGURATION ====================
+
+# LOGGING = {
+#     'version': 1,
+#     'disable_existing_loggers': False,
+#     'formatters': {
+#         'verbose': {
+#             'format': '{levelname} {asctime} {module} {message}',
+#             'style': '{',
+#         },
+#         'simple': {
+#             'format': '{levelname} {message}',
+#             'style': '{',
+#         },
+#     },
+#     'handlers': {
+#         'console': {
+#             'class': 'logging.StreamHandler',
+#             'formatter': 'verbose',
+#         },
+#         'file': {
+#             'class': 'logging.handlers.RotatingFileHandler',
+#             'filename': 'logs/product_attributes.log',
+#             'maxBytes': 1024 * 1024 * 10,  # 10MB
+#             'backupCount': 5,
+#             'formatter': 'verbose',
+#         },
+#     },
+#     'loggers': {
+#         'attr_extraction.services': {
+#             'handlers': ['console', 'file'],
+#             'level': 'INFO',
+#             'propagate': False,
+#         },
+#         'attr_extraction.llm_load_balancer': {
+#             'handlers': ['console', 'file'],
+#             'level': 'INFO',
+#             'propagate': False,
+#         },
+#         'attr_extraction.views': {
+#             'handlers': ['console', 'file'],
+#             'level': 'INFO',
+#             'propagate': False,
+#         },
+#     },
+# }

BIN
db.sqlite3
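
The ALERT_FAILURE_RATE_THRESHOLD and ALERT_KEY_UNAVAILABLE_THRESHOLD settings added above are not consumed anywhere in this diff. A sketch of a periodic health check that could read them together with get_load_balancer_stats(); only the setting and function names come from the commit, the check itself is hypothetical:

# Hypothetical health check; nothing in this commit runs it.
import logging

from django.conf import settings
from attr_extraction.llm_load_balancer import get_load_balancer_stats

logger = logging.getLogger(__name__)

def check_llm_health() -> None:
    stats = get_load_balancer_stats()
    total = max(stats.get("total_requests", 0), 1)
    failure_rate = stats.get("total_failures", 0) / total
    unavailable = stats.get("total_keys", 0) - stats.get("available_keys", 0)

    if failure_rate > getattr(settings, "ALERT_FAILURE_RATE_THRESHOLD", 0.1):
        logger.warning("LLM failure rate %.1f%% exceeds threshold", failure_rate * 100)
    if unavailable > getattr(settings, "ALERT_KEY_UNAVAILABLE_THRESHOLD", 2):
        logger.warning("%d Groq API keys currently unavailable", unavailable)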