# tasks.py
import hashlib
import json

from celery import shared_task
from django.conf import settings
from django.core.cache import cache

from .models import Product, ProductAttribute
from .services.attribute_extractor import HybridAttributeExtractor

# Lifetime of a cached extraction result, in seconds (24 hours).
_ATTRIBUTE_CACHE_TTL_SECONDS = 86400

# Suffix that marks an entry of the extractor output as the confidence score
# of another attribute rather than an attribute in its own right.
_CONFIDENCE_SUFFIX = '_confidence'


def _serialize_attribute_value(value):
    """Return *value* as the string stored in ``ProductAttribute.attribute_value``."""
    # Containers are stored as JSON so they can be parsed back; everything
    # else is stored as its plain string form.
    if isinstance(value, (list, dict)):
        return json.dumps(value)
    return str(value)


def _save_attributes(product, attributes):
    """Upsert one ``ProductAttribute`` row per non-confidence entry of *attributes*."""
    for attr_name, attr_value in attributes.items():
        if attr_name.endswith(_CONFIDENCE_SUFFIX):
            # Confidence entries are folded into the row of the attribute
            # they describe instead of being stored as separate attributes.
            continue
        ProductAttribute.objects.update_or_create(
            product=product,
            attribute_name=attr_name,
            defaults={
                'attribute_value': _serialize_attribute_value(attr_value),
                # 0.5 is the fallback when the extractor reports no score.
                'confidence_score': attributes.get(
                    f'{attr_name}{_CONFIDENCE_SUFFIX}', 0.5
                ),
                'extraction_method': 'hybrid',
            },
        )


@shared_task(bind=True, max_retries=3)
def extract_product_attributes(self, product_id: int):
    """
    Celery task to extract attributes from a product.

    Looks the product up, returns the cached result when one exists for the
    product's current ``updated_at`` timestamp, and otherwise runs
    ``HybridAttributeExtractor`` on the product's text fields and image paths,
    stores the result as ``ProductAttribute`` rows, caches it and marks the
    product as extracted.

    Args:
        product_id: Primary key of the ``Product`` to process.

    Returns:
        The attribute mapping produced by the extractor (or the cached copy),
        or ``{'error': 'Product not found'}`` when no such product exists.

    Raises:
        celery.exceptions.Retry: On any other failure, re-queueing the task
            with a countdown of 60s, 120s, 240s for successive retries (up to
            ``max_retries``).
    """
    try:
        product = Product.objects.get(id=product_id)

        # The key embeds updated_at so that editing the product invalidates
        # the cached result without an explicit delete.
        cache_key = f"product_attrs_{product.id}_{product.updated_at.timestamp()}"
        cached_attrs = cache.get(cache_key)
        # Compare against None: an empty mapping is a valid cached result and
        # must not trigger a fresh (expensive) extraction.
        if cached_attrs is not None:
            return cached_attrs

        product_data = {
            'title': product.title,
            'description': product.description,
            'short_description': product.short_description,
            'images': [img.image.path for img in product.images.all()],
        }

        extractor = HybridAttributeExtractor(
            anthropic_api_key=settings.ANTHROPIC_API_KEY
        )
        attributes = extractor.extract_attributes(product_data)

        _save_attributes(product, attributes)

        cache.set(cache_key, attributes, _ATTRIBUTE_CACHE_TTL_SECONDS)

        # Write only the flag so edits made to other fields while the
        # extraction was running are not overwritten.
        # NOTE(review): if `updated_at` is auto-managed (auto_now), this also
        # keeps the timestamp used in cache_key unchanged - confirm against
        # the model definition.
        product.attributes_extracted = True
        product.save(update_fields=['attributes_extracted'])

        return attributes

    except Product.DoesNotExist:
        # A missing product will not appear on retry; report it instead.
        return {'error': 'Product not found'}
    except Exception as e:
        # Retry with exponential backoff: 60s, 120s, 240s.
        raise self.retry(exc=e, countdown=60 * (2 ** self.request.retries))


@shared_task
def batch_extract_attributes(product_ids: list):
    """
    Process multiple products in batch.

    Queues one ``extract_product_attributes`` task per id; it does not wait
    for any of them to finish.

    Args:
        product_ids: Primary keys of the products to process.

    Returns:
        A dict mapping each product id to the id of the task queued for it.
    """
    return {
        product_id: extract_product_attributes.delay(product_id).id
        for product_id in product_ids
    }