tasks.py 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. # tasks.py
import hashlib
import json

from celery import shared_task
from django.conf import settings
from django.core.cache import cache

from .models import Product, ProductAttribute
from .services.attribute_extractor import HybridAttributeExtractor
  8. @shared_task(bind=True, max_retries=3)
  9. def extract_product_attributes(self, product_id: int):
  10. """
  11. Celery task to extract attributes from a product
  12. """
  13. try:
  14. product = Product.objects.get(id=product_id)
  15. # Check cache first
  16. cache_key = f"product_attrs_{product.id}_{product.updated_at.timestamp()}"
  17. cached_attrs = cache.get(cache_key)
  18. if cached_attrs:
  19. return cached_attrs
  20. # Prepare product data
  21. product_data = {
  22. 'title': product.title,
  23. 'description': product.description,
  24. 'short_description': product.short_description,
  25. 'images': [img.image.path for img in product.images.all()]
  26. }
  27. # Extract attributes
  28. extractor = HybridAttributeExtractor(
  29. anthropic_api_key=settings.ANTHROPIC_API_KEY
  30. )
  31. attributes = extractor.extract_attributes(product_data)
  32. # Save to database
  33. for attr_name, attr_value in attributes.items():
  34. if not attr_name.endswith('_confidence'):
  35. confidence = attributes.get(f'{attr_name}_confidence', 0.5)
  36. ProductAttribute.objects.update_or_create(
  37. product=product,
  38. attribute_name=attr_name,
  39. defaults={
  40. 'attribute_value': json.dumps(attr_value) if isinstance(attr_value, (list, dict)) else str(attr_value),
  41. 'confidence_score': confidence,
  42. 'extraction_method': 'hybrid'
  43. }
  44. )
  45. # Cache for 24 hours
  46. cache.set(cache_key, attributes, 86400)
  47. # Update product status
  48. product.attributes_extracted = True
  49. product.save()
  50. return attributes
  51. except Product.DoesNotExist:
  52. return {'error': 'Product not found'}
  53. except Exception as e:
  54. # Retry with exponential backoff
  55. raise self.retry(exc=e, countdown=60 * (2 ** self.request.retries))
  56. @shared_task
  57. def batch_extract_attributes(product_ids: list):
  58. """
  59. Process multiple products in batch
  60. """
  61. results = {}
  62. for product_id in product_ids:
  63. result = extract_product_attributes.delay(product_id)
  64. results[product_id] = result.id
  65. return results