# ==================== Cleaned & Optimized Imports ==================== import os import io import json import logging import threading from datetime import datetime import pandas as pd import concurrent.futures from django.conf import settings from django.http import HttpResponse from django.db import transaction from django.db.models import Prefetch from rest_framework.views import APIView from rest_framework.response import Response from rest_framework import status from rest_framework.parsers import MultiPartParser, FormParser from openpyxl import Workbook from openpyxl.styles import Font, PatternFill, Alignment from rest_framework.views import APIView from . import cache_config # --- Local imports --- from .models import ( Product, ProductType, ProductAttribute, ProductAttributeValue, AttributePossibleValue, ) from .serializers import ( ProductSerializer, ProductTypeSerializer, ProductAttributeSerializer, AttributePossibleValueSerializer, SingleProductRequestSerializer, BatchProductRequestSerializer, ProductAttributeResultSerializer, BatchProductResponseSerializer, ProductAttributeValueSerializer, ProductAttributeValueInputSerializer, BulkProductAttributeValueSerializer, ProductWithAttributesSerializer, ) from .services import ProductAttributeService from .ocr_service import OCRService from .visual_processing_service import VisualProcessingService # --- Configuration for Generated Outputs Folder --- OUTPUT_FOLDER_NAME = 'generated_outputs' OUTPUT_ROOT = os.path.join(settings.MEDIA_ROOT, OUTPUT_FOLDER_NAME) OUTPUT_URL = os.path.join(settings.MEDIA_URL, OUTPUT_FOLDER_NAME).replace('\\', '/') # Ensure forward slashes LOG_FILE_NAME = 'excel_generation.log' STATUS_FILE_NAME = 'excel_generation_status.json' EXCEL_FILE_NAME = 'generated_products.xlsx' LOG_FILE_PATH = os.path.join(OUTPUT_ROOT, LOG_FILE_NAME) STATUS_FILE_PATH = os.path.join(OUTPUT_ROOT, STATUS_FILE_NAME) EXCEL_FILE_PATH = os.path.join(OUTPUT_ROOT, EXCEL_FILE_NAME) # Ensure the output folder exists if not os.path.exists(OUTPUT_ROOT): os.makedirs(OUTPUT_ROOT) # Configure logging logging.basicConfig( filename=LOG_FILE_PATH, level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) class ExtractProductAttributesView(APIView): """ API endpoint to extract product attributes for a single product by item_id. Fetches product details from database with source tracking. Returns attributes in array format: [{"value": "...", "source": "..."}] Includes OCR and Visual Processing results. """ def post(self, request): serializer = SingleProductRequestSerializer(data=request.data) if not serializer.is_valid(): return Response({"error": serializer.errors}, status=status.HTTP_400_BAD_REQUEST) validated_data = serializer.validated_data item_id = validated_data.get("item_id") # Fetch product from DB try: product = Product.objects.get(item_id=item_id) except Product.DoesNotExist: return Response( {"error": f"Product with item_id '{item_id}' not found."}, status=status.HTTP_404_NOT_FOUND ) # Extract product details title = product.product_name short_desc = product.product_short_description long_desc = product.product_long_description image_url = product.image_path # Process image for OCR if required ocr_results = None ocr_text = None visual_results = None if validated_data.get("process_image", True) and image_url: # OCR Processing ocr_service = OCRService() ocr_results = ocr_service.process_image(image_url) if ocr_results and ocr_results.get("detected_text"): ocr_attrs = ProductAttributeService.extract_attributes_from_ocr( ocr_results, validated_data.get("model") ) ocr_results["extracted_attributes"] = ocr_attrs ocr_text = "\n".join([ f"{item['text']} (confidence: {item['confidence']:.2f})" for item in ocr_results["detected_text"] ]) # Visual Processing visual_service = VisualProcessingService() product_type_hint = product.product_type if hasattr(product, 'product_type') else None visual_results = visual_service.process_image(image_url, product_type_hint) # Combine all product text with source tracking product_text, source_map = ProductAttributeService.combine_product_text( title=title, short_desc=short_desc, long_desc=long_desc, ocr_text=ocr_text ) # Extract attributes with enhanced features and source tracking result = ProductAttributeService.extract_attributes( product_text=product_text, mandatory_attrs=validated_data["mandatory_attrs"], source_map=source_map, model=validated_data.get("model"), extract_additional=validated_data.get("extract_additional", True), multiple=validated_data.get("multiple", []), threshold_abs=validated_data.get("threshold_abs", 0.65), margin=validated_data.get("margin", 0.15), use_dynamic_thresholds=validated_data.get("use_dynamic_thresholds", True), use_adaptive_margin=validated_data.get("use_adaptive_margin", True), use_semantic_clustering=validated_data.get("use_semantic_clustering", True) ) # Attach OCR results if available if ocr_results: result["ocr_results"] = ocr_results # Attach Visual Processing results if available if visual_results: result["visual_results"] = visual_results response_serializer = ProductAttributeResultSerializer(data=result) if response_serializer.is_valid(): return Response(response_serializer.data, status=status.HTTP_200_OK) return Response(result, status=status.HTTP_200_OK) # ==================== OPTIMIZED BATCH VIEW ==================== # class BatchExtractProductAttributesView(APIView): # """ # โšก PERFORMANCE OPTIMIZED: Batch extraction with intelligent parallelization # Expected performance: 10 products in 30-60 seconds (with image processing) # """ # def post(self, request): # import time # start_time = time.time() # serializer = BatchProductRequestSerializer(data=request.data) # if not serializer.is_valid(): # return Response({"error": serializer.errors}, status=status.HTTP_400_BAD_REQUEST) # validated_data = serializer.validated_data # product_list = validated_data.get("products", []) # logger.info(f"๐Ÿš€ Starting batch processing for {len(product_list)} products") # # ==================== OPTIMIZATION 1: Bulk DB Query ==================== # item_ids = [p['item_id'] for p in product_list] # products_queryset = Product.objects.filter( # item_id__in=item_ids # ).prefetch_related('attribute_values') # product_map = {product.item_id: product for product in products_queryset} # # Prefetch ALL original attribute values in ONE query # original_values_qs = ProductAttributeValue.objects.filter( # product__item_id__in=item_ids # ).select_related('product') # original_values_map = {} # for attr_val in original_values_qs: # item_id = attr_val.product.item_id # if item_id not in original_values_map: # original_values_map[item_id] = {} # original_values_map[item_id][attr_val.attribute_name] = attr_val.original_value # logger.info(f"โœ“ Loaded {len(product_map)} products from database") # # Extract settings # model = validated_data.get("model") # extract_additional = validated_data.get("extract_additional", True) # process_image = validated_data.get("process_image", True) # multiple = validated_data.get("multiple", []) # threshold_abs = validated_data.get("threshold_abs", 0.65) # margin = validated_data.get("margin", 0.15) # use_dynamic_thresholds = validated_data.get("use_dynamic_thresholds", False) # use_adaptive_margin = validated_data.get("use_adaptive_margin", False) # use_semantic_clustering = validated_data.get("use_semantic_clustering", False) # results = [] # successful = 0 # failed = 0 # # ==================== OPTIMIZATION 2: Conditional Service Init ==================== # # Only initialize if processing images # ocr_service = None # visual_service = None # if process_image: # from .ocr_service import OCRService # from .visual_processing_service import VisualProcessingService # ocr_service = OCRService() # visual_service = VisualProcessingService() # logger.info("โœ“ Image processing services initialized") # # ==================== OPTIMIZATION 3: Smart Parallelization ==================== # def process_single_product(product_entry): # """Process a single product (runs in parallel)""" # import time # product_start = time.time() # item_id = product_entry['item_id'] # mandatory_attrs = product_entry['mandatory_attrs'] # if item_id not in product_map: # return { # "product_id": item_id, # "error": "Product not found in database" # }, False # product = product_map[item_id] # try: # title = product.product_name # short_desc = product.product_short_description # long_desc = product.product_long_description # image_url = product.image_path # ocr_results = None # ocr_text = None # visual_results = None # # โšก SKIP IMAGE PROCESSING IF DISABLED (HUGE TIME SAVER) # if process_image and image_url: # if ocr_service: # ocr_results = ocr_service.process_image(image_url) # if ocr_results and ocr_results.get("detected_text"): # ocr_attrs = ProductAttributeService.extract_attributes_from_ocr( # ocr_results, model # ) # ocr_results["extracted_attributes"] = ocr_attrs # ocr_text = "\n".join([ # f"{item['text']} (confidence: {item['confidence']:.2f})" # for item in ocr_results["detected_text"] # ]) # if visual_service: # product_type_hint = product.product_type if hasattr(product, 'product_type') else None # visual_results = visual_service.process_image(image_url, product_type_hint) # if visual_results and visual_results.get('visual_attributes'): # visual_results['visual_attributes'] = ProductAttributeService.format_visual_attributes( # visual_results['visual_attributes'] # ) # # Combine product text with source tracking # product_text, source_map = ProductAttributeService.combine_product_text( # title=title, # short_desc=short_desc, # long_desc=long_desc, # ocr_text=ocr_text # ) # # โšก EXTRACT ATTRIBUTES WITH CACHING ENABLED # extracted = ProductAttributeService.extract_attributes( # product_text=product_text, # mandatory_attrs=mandatory_attrs, # source_map=source_map, # model=model, # extract_additional=extract_additional, # multiple=multiple, # # threshold_abs=threshold_abs, # # margin=margin, # # use_dynamic_thresholds=use_dynamic_thresholds, # # use_adaptive_margin=use_adaptive_margin, # # use_semantic_clustering=use_semantic_clustering, # use_cache=True # โšก CRITICAL: Enable caching # ) # # Add original values # original_attrs = original_values_map.get(item_id, {}) # for attr_name, attr_values in extracted.get("mandatory", {}).items(): # if isinstance(attr_values, list): # for attr_obj in attr_values: # if isinstance(attr_obj, dict): # attr_obj["original_value"] = original_attrs.get(attr_name, "") # for attr_name, attr_values in extracted.get("additional", {}).items(): # if isinstance(attr_values, list): # for attr_obj in attr_values: # if isinstance(attr_obj, dict): # attr_obj["original_value"] = original_attrs.get(attr_name, "") # result = { # "product_id": product.item_id, # "mandatory": extracted.get("mandatory", {}), # "additional": extracted.get("additional", {}), # } # if ocr_results: # result["ocr_results"] = ocr_results # if visual_results: # result["visual_results"] = visual_results # processing_time = time.time() - product_start # logger.info(f"โœ“ Processed {item_id} in {processing_time:.2f}s") # return result, True # except Exception as e: # logger.error(f"โŒ Error processing {item_id}: {str(e)}") # return { # "product_id": item_id, # "error": str(e) # }, False # # ==================== OPTIMIZATION 4: Parallel Execution ==================== # # Adjust workers based on whether image processing is enabled # max_workers = min(3 if process_image else 10, len(product_list)) # logger.info(f"โšก Using {max_workers} parallel workers") # with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: # # Submit all tasks # future_to_product = { # executor.submit(process_single_product, product): product # for product in product_list # } # # Collect results as they complete # for future in concurrent.futures.as_completed(future_to_product): # try: # result, success = future.result() # results.append(result) # if success: # successful += 1 # else: # failed += 1 # except Exception as e: # failed += 1 # logger.error(f"โŒ Future execution error: {str(e)}") # results.append({ # "product_id": "unknown", # "error": str(e) # }) # total_time = time.time() - start_time # # Get cache statistics # cache_stats = ProductAttributeService.get_cache_stats() # logger.info(f""" # ๐ŸŽ‰ BATCH PROCESSING COMPLETE # โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ” # Total products: {len(product_list)} # Successful: {successful} # Failed: {failed} # Total time: {total_time:.2f}s # Avg time/product: {total_time/len(product_list):.2f}s # โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ” # """) # batch_result = { # "results": results, # "total_products": len(product_list), # "successful": successful, # "failed": failed, # "performance": { # "total_time_seconds": round(total_time, 2), # "avg_time_per_product": round(total_time / len(product_list), 2), # "workers_used": max_workers # }, # "cache_stats": cache_stats # } # response_serializer = BatchProductResponseSerializer(data=batch_result) # if response_serializer.is_valid(): # return Response(response_serializer.data, status=status.HTTP_200_OK) # return Response(batch_result, status=status.HTTP_200_OK) # VERSION WITH PARALLELIZATION class BatchExtractProductAttributesView(APIView): """ โšก PERFORMANCE OPTIMIZED: Batch extraction with intelligent parallelization Expected performance: 10 products in 30-60 seconds (with image processing) NOW WITH USER VALUE REASONING """ def post(self, request): import time start_time = time.time() serializer = BatchProductRequestSerializer(data=request.data) if not serializer.is_valid(): return Response({"error": serializer.errors}, status=status.HTTP_400_BAD_REQUEST) validated_data = serializer.validated_data product_list = validated_data.get("products", []) logger.info(f"๐Ÿš€ Starting batch processing for {len(product_list)} products") # ==================== OPTIMIZATION 1: Bulk DB Query ==================== item_ids = [p['item_id'] for p in product_list] products_queryset = Product.objects.filter( item_id__in=item_ids ).prefetch_related('attribute_values') product_map = {product.item_id: product for product in products_queryset} # Prefetch ALL original attribute values in ONE query original_values_qs = ProductAttributeValue.objects.filter( product__item_id__in=item_ids ).select_related('product') original_values_map = {} for attr_val in original_values_qs: item_id = attr_val.product.item_id if item_id not in original_values_map: original_values_map[item_id] = {} original_values_map[item_id][attr_val.attribute_name] = attr_val.original_value logger.info(f"โœ“ Loaded {len(product_map)} products from database") logger.info(f"โœ“ Loaded user values for {len(original_values_map)} products") # Extract settings model = validated_data.get("model") extract_additional = validated_data.get("extract_additional", True) process_image = validated_data.get("process_image", True) multiple = validated_data.get("multiple", []) threshold_abs = validated_data.get("threshold_abs", 0.65) margin = validated_data.get("margin", 0.15) use_dynamic_thresholds = validated_data.get("use_dynamic_thresholds", False) use_adaptive_margin = validated_data.get("use_adaptive_margin", False) use_semantic_clustering = validated_data.get("use_semantic_clustering", False) results = [] successful = 0 failed = 0 # ==================== OPTIMIZATION 2: Conditional Service Init ==================== ocr_service = None visual_service = None if process_image: from .ocr_service import OCRService from .visual_processing_service import VisualProcessingService ocr_service = OCRService() visual_service = VisualProcessingService() logger.info("โœ“ Image processing services initialized") # ==================== OPTIMIZATION 3: Smart Parallelization ==================== def process_single_product(product_entry): """Process a single product (runs in parallel)""" import time product_start = time.time() item_id = product_entry['item_id'] mandatory_attrs = product_entry['mandatory_attrs'] if item_id not in product_map: return { "product_id": item_id, "error": "Product not found in database" }, False product = product_map[item_id] try: title = product.product_name short_desc = product.product_short_description long_desc = product.product_long_description image_url = product.image_path ocr_results = None ocr_text = None visual_results = None # โšก SKIP IMAGE PROCESSING IF DISABLED if process_image and image_url: if ocr_service: ocr_results = ocr_service.process_image(image_url) if ocr_results and ocr_results.get("detected_text"): ocr_attrs = ProductAttributeService.extract_attributes_from_ocr( ocr_results, model ) ocr_results["extracted_attributes"] = ocr_attrs ocr_text = "\n".join([ f"{item['text']} (confidence: {item['confidence']:.2f})" for item in ocr_results["detected_text"] ]) if visual_service: product_type_hint = product.product_type if hasattr(product, 'product_type') else None visual_results = visual_service.process_image(image_url, product_type_hint) if visual_results and visual_results.get('visual_attributes'): visual_results['visual_attributes'] = ProductAttributeService.format_visual_attributes( visual_results['visual_attributes'] ) # Combine product text with source tracking product_text, source_map = ProductAttributeService.combine_product_text( title=title, short_desc=short_desc, long_desc=long_desc, ocr_text=ocr_text ) # ๐Ÿ†• GET USER-ENTERED VALUES FOR THIS PRODUCT user_entered_values = original_values_map.get(item_id, {}) print("user entered values are ") print(user_entered_values) logger.info(f"Processing {item_id} with {len(user_entered_values)} user-entered values") # โšก EXTRACT ATTRIBUTES WITH USER VALUES AND REASONING extracted = ProductAttributeService.extract_attributes( product_text=product_text, mandatory_attrs=mandatory_attrs, source_map=source_map, model=model, extract_additional=extract_additional, multiple=multiple, use_cache=True, user_entered_values=user_entered_values # ๐Ÿ†• PASS USER VALUES ) # NOTE: Original values are now part of LLM response with reasoning # No need to add them separately - they're already in the "user_value" field result = { "product_id": product.item_id, "mandatory": extracted.get("mandatory", {}), "additional": extracted.get("additional", {}), } if ocr_results: result["ocr_results"] = ocr_results if visual_results: result["visual_results"] = visual_results processing_time = time.time() - product_start logger.info(f"โœ“ Processed {item_id} in {processing_time:.2f}s") return result, True except Exception as e: logger.error(f"โŒ Error processing {item_id}: {str(e)}") return { "product_id": item_id, "error": str(e) }, False # ==================== OPTIMIZATION 4: Parallel Execution ==================== max_workers = min(1 if process_image else 1, len(product_list)) logger.info(f"โšก Using {max_workers} parallel workers") with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: future_to_product = { executor.submit(process_single_product, product): product for product in product_list } for future in concurrent.futures.as_completed(future_to_product): try: result, success = future.result() results.append(result) if success: successful += 1 else: failed += 1 except Exception as e: failed += 1 logger.error(f"โŒ Future execution error: {str(e)}") results.append({ "product_id": "unknown", "error": str(e) }) total_time = time.time() - start_time # Get cache statistics cache_stats = ProductAttributeService.get_cache_stats() logger.info(f""" ๐ŸŽ‰ BATCH PROCESSING COMPLETE โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ” Total products: {len(product_list)} Successful: {successful} Failed: {failed} Total time: {total_time:.2f}s Avg time/product: {total_time/len(product_list):.2f}s โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ” """) batch_result = { "results": results, "total_products": len(product_list), "successful": successful, "failed": failed, "performance": { "total_time_seconds": round(total_time, 2), "avg_time_per_product": round(total_time / len(product_list), 2), "workers_used": max_workers }, "cache_stats": cache_stats } response_serializer = BatchProductResponseSerializer(data=batch_result) if response_serializer.is_valid(): return Response(response_serializer.data, status=status.HTTP_200_OK) return Response(batch_result, status=status.HTTP_200_OK) # # ==================== views.py (OPTIMIZED FOR 4 API KEYS) ==================== # import concurrent.futures # import logging # import time # from rest_framework.views import APIView # from rest_framework.response import Response # from rest_framework import status # from .models import Product, ProductAttributeValue # from .serializers import BatchProductRequestSerializer, BatchProductResponseSerializer # from .services import ProductAttributeService # logger = logging.getLogger(__name__) # class BatchExtractProductAttributesView(APIView): # """ # โšก PERFORMANCE OPTIMIZED: Batch extraction with 4-key load balancer # Expected performance with 4 keys: 10 products in 20-35 seconds (with image processing) # NOW WITH USER VALUE REASONING + MULTI-KEY SUPPORT # """ # def post(self, request): # start_time = time.time() # serializer = BatchProductRequestSerializer(data=request.data) # if not serializer.is_valid(): # return Response({"error": serializer.errors}, status=status.HTTP_400_BAD_REQUEST) # validated_data = serializer.validated_data # product_list = validated_data.get("products", []) # logger.info(f"๐Ÿš€ Starting batch processing for {len(product_list)} products") # # ==================== OPTIMIZATION 1: Bulk DB Query ==================== # item_ids = [p['item_id'] for p in product_list] # products_queryset = Product.objects.filter( # item_id__in=item_ids # ).prefetch_related('attribute_values') # product_map = {product.item_id: product for product in products_queryset} # # Prefetch ALL original attribute values in ONE query # original_values_qs = ProductAttributeValue.objects.filter( # product__item_id__in=item_ids # ).select_related('product') # original_values_map = {} # for attr_val in original_values_qs: # item_id = attr_val.product.item_id # if item_id not in original_values_map: # original_values_map[item_id] = {} # original_values_map[item_id][attr_val.attribute_name] = attr_val.original_value # logger.info(f"โœ“ Loaded {len(product_map)} products from database") # logger.info(f"โœ“ Loaded user values for {len(original_values_map)} products") # # Extract settings # model = validated_data.get("model") # extract_additional = validated_data.get("extract_additional", True) # process_image = validated_data.get("process_image", True) # multiple = validated_data.get("multiple", []) # threshold_abs = validated_data.get("threshold_abs", 0.65) # margin = validated_data.get("margin", 0.15) # use_dynamic_thresholds = validated_data.get("use_dynamic_thresholds", False) # use_adaptive_margin = validated_data.get("use_adaptive_margin", False) # use_semantic_clustering = validated_data.get("use_semantic_clustering", False) # results = [] # successful = 0 # failed = 0 # # ==================== OPTIMIZATION 2: Conditional Service Init ==================== # ocr_service = None # visual_service = None # if process_image: # try: # from .ocr_service import OCRService # from .visual_processing_service import VisualProcessingService # ocr_service = OCRService() # visual_service = VisualProcessingService() # logger.info("โœ“ Image processing services initialized") # except ImportError as e: # logger.warning(f"โš ๏ธ Image processing services not available: {e}") # process_image = False # # ==================== OPTIMIZATION 3: Smart Parallelization with 4 Keys ==================== # def process_single_product(product_entry): # """Process a single product (runs in parallel)""" # product_start = time.time() # item_id = product_entry['item_id'] # mandatory_attrs = product_entry['mandatory_attrs'] # if item_id not in product_map: # return { # "product_id": item_id, # "error": "Product not found in database" # }, False # product = product_map[item_id] # try: # title = product.product_name # short_desc = product.product_short_description # long_desc = product.product_long_description # image_url = product.image_path # ocr_results = None # ocr_text = None # visual_results = None # # โšก IMAGE PROCESSING (if enabled) # if process_image and image_url: # if ocr_service: # try: # ocr_results = ocr_service.process_image(image_url) # if ocr_results and ocr_results.get("detected_text"): # ocr_attrs = ProductAttributeService.extract_attributes_from_ocr( # ocr_results, model # ) # ocr_results["extracted_attributes"] = ocr_attrs # ocr_text = "\n".join([ # f"{item['text']} (confidence: {item['confidence']:.2f})" # for item in ocr_results["detected_text"] # ]) # except Exception as e: # logger.warning(f"OCR failed for {item_id}: {e}") # if visual_service: # try: # product_type_hint = product.product_type if hasattr(product, 'product_type') else None # visual_results = visual_service.process_image(image_url, product_type_hint) # if visual_results and visual_results.get('visual_attributes'): # visual_results['visual_attributes'] = ProductAttributeService.format_visual_attributes( # visual_results['visual_attributes'] # ) # except Exception as e: # logger.warning(f"Visual processing failed for {item_id}: {e}") # # Combine product text with source tracking # product_text, source_map = ProductAttributeService.combine_product_text( # title=title, # short_desc=short_desc, # long_desc=long_desc, # ocr_text=ocr_text # ) # # ๐Ÿ†• GET USER-ENTERED VALUES FOR THIS PRODUCT # user_entered_values = original_values_map.get(item_id, {}) # if user_entered_values: # logger.debug(f"Processing {item_id} with {len(user_entered_values)} user-entered values") # # โšก EXTRACT ATTRIBUTES WITH MULTI-KEY LOAD BALANCER # extracted = ProductAttributeService.extract_attributes( # product_text=product_text, # mandatory_attrs=mandatory_attrs, # source_map=source_map, # model=model, # extract_additional=extract_additional, # multiple=multiple, # use_cache=True, # user_entered_values=user_entered_values # ) # result = { # "product_id": product.item_id, # "mandatory": extracted.get("mandatory", {}), # "additional": extracted.get("additional", {}), # } # if ocr_results: # result["ocr_results"] = ocr_results # if visual_results: # result["visual_results"] = visual_results # processing_time = time.time() - product_start # logger.info(f"โœ“ Processed {item_id} in {processing_time:.2f}s") # return result, True # except Exception as e: # logger.error(f"โŒ Error processing {item_id}: {str(e)}", exc_info=True) # return { # "product_id": item_id, # "error": str(e) # }, False # # ==================== OPTIMIZATION 4: Parallel Execution with 4 Keys ==================== # # With 4 API keys, we can safely run more workers # # Image processing: 4-6 workers (I/O bound) # # No image processing: 8-12 workers (CPU bound) # if process_image: # max_workers = min(3, len(product_list)) # else: # max_workers = min(5, len(product_list)) # logger.info(f"โšก Using {max_workers} parallel workers with 4-key load balancer") # with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: # future_to_product = { # executor.submit(process_single_product, product): product # for product in product_list # } # for future in concurrent.futures.as_completed(future_to_product): # try: # result, success = future.result() # results.append(result) # if success: # successful += 1 # else: # failed += 1 # except Exception as e: # failed += 1 # logger.error(f"โŒ Future execution error: {str(e)}") # results.append({ # "product_id": "unknown", # "error": str(e) # }) # total_time = time.time() - start_time # # Get comprehensive statistics # cache_stats = ProductAttributeService.get_cache_stats() # logger.info(f""" # โ•”โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•— # โ•‘ ๐ŸŽ‰ BATCH PROCESSING COMPLETE โ•‘ # โ• โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•ฃ # โ•‘ Total products: {len(product_list):<3} โ•‘ # โ•‘ Successful: {successful:<3} โ•‘ # โ•‘ Failed: {failed:<3} โ•‘ # โ•‘ Total time: {total_time:.2f}s โ•‘ # โ•‘ Avg time/product: {total_time/len(product_list):.2f}s โ•‘ # โ•‘ Workers used: {max_workers:<2} โ•‘ # โ•‘ API keys available: {cache_stats['load_balancer'].get('available_keys', 0)}/{cache_stats['load_balancer'].get('total_keys', 0)} โ•‘ # โ•šโ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ• # """) # batch_result = { # "results": results, # "total_products": len(product_list), # "successful": successful, # "failed": failed, # "performance": { # "total_time_seconds": round(total_time, 2), # "avg_time_per_product": round(total_time / len(product_list), 2), # "workers_used": max_workers, # "throughput_products_per_second": round(len(product_list) / total_time, 2) # }, # "cache_stats": cache_stats # } # response_serializer = BatchProductResponseSerializer(data=batch_result) # if response_serializer.is_valid(): # return Response(response_serializer.data, status=status.HTTP_200_OK) # return Response(batch_result, status=status.HTTP_200_OK) class ProductListView(APIView): """ GET API to list all products with details """ def get(self, request): products = Product.objects.all() serializer = ProductSerializer(products, many=True) return Response(serializer.data, status=status.HTTP_200_OK) # ------------------------------------------------------------------------------------------------- def generate_product_excel_background(): """ Function to perform batch attribute extraction for all products and generate an Excel file. Runs in a background thread to avoid blocking the API response. Logs success/failure and saves a status file for external monitoring. """ logger.info(f"[{datetime.now().isoformat()}] Starting background product Excel generation and attribute extraction.") successful = 0 failed = 0 results = [] # To store detailed extraction results for Excel sheet 2 # Function to write status file (SUCCESS/FAILED) def write_status(status_type, error_msg=None): status_data = { "status": status_type, "timestamp": datetime.now().isoformat(), "products_processed": successful + failed, "products_successful": successful, "products_failed": failed, # FIX: Use the updated EXCEL_FILE_NAME and OUTPUT_URL "excel_path": os.path.join(OUTPUT_URL, EXCEL_FILE_NAME) if status_type == "SUCCESS" else None, "log_path": os.path.join(OUTPUT_URL, LOG_FILE_NAME), "error_message": error_msg } try: # FIX: STATUS_FILE_PATH is now inside generated_outputs with open(STATUS_FILE_PATH, 'w') as f: json.dump(status_data, f, indent=4) except Exception as e: logger.exception(f"CRITICAL ERROR: Failed to write status file at {STATUS_FILE_PATH}: {e}") try: # 1. PREFETCH all necessary related data to minimize database queries # Prefetch possible values for mandatory attributes possible_values_prefetch = Prefetch( 'attributes', queryset=ProductAttribute.objects.filter(is_mandatory=True).prefetch_related('possible_values') ) # Fetch all ProductTypes with their mandatory attributes and possible values all_product_types = ProductType.objects.prefetch_related(possible_values_prefetch) product_type_map = { pt.name: pt for pt in all_product_types } # Prepare product_list for batch extraction all_products = Product.objects.all() product_list = [] for p in all_products: mandatory_attrs_dict = {} product_type_name = p.product_type.strip() if p.product_type else None if product_type_name and product_type_name in product_type_map: pt = product_type_map[product_type_name] # Build the mandatory_attrs dictionary: { "Attribute Name": ["Value 1", "Value 2"], ... } for attr in pt.attributes.all(): mandatory_attrs_dict[attr.name] = [ pv.value for pv in attr.possible_values.all() ] product_list.append({ "item_id": p.item_id, "product_type_name": product_type_name, "mandatory_attrs": mandatory_attrs_dict }) # Batch settings (using defaults) model = "llama-3.1-8b-instant" extract_additional = True process_image = False multiple = [] threshold_abs = 0.65 margin = 0.15 use_dynamic_thresholds = True use_adaptive_margin = True use_semantic_clustering = True # Batch extraction logic item_ids = [p['item_id'] for p in product_list] products_queryset = Product.objects.filter(item_id__in=item_ids) product_map = {product.item_id: product for product in products_queryset} found_ids = set(product_map.keys()) for product_entry in product_list: item_id = product_entry['item_id'] mandatory_attrs = product_entry['mandatory_attrs'] if item_id not in found_ids: failed += 1 results.append({ "product_id": item_id, "error": "Product not found in database" }) logger.warning(f"Product {item_id} not found in database. Skipping extraction.") continue product = product_map[item_id] try: title = product.product_name short_desc = product.product_short_description long_desc = product.product_long_description image_url = product.image_path ocr_results = None ocr_text = None visual_results = None if process_image and image_url: logger.info(f"Processing image for product {item_id}...") # OCR Processing ocr_service = OCRService() ocr_results = ocr_service.process_image(image_url) if ocr_results and ocr_results.get("detected_text"): ocr_attrs = ProductAttributeService.extract_attributes_from_ocr( ocr_results, model ) ocr_results["extracted_attributes"] = ocr_attrs ocr_text = "\n".join([ f"{item['text']} (confidence: {item['confidence']:.2f})" for item in ocr_results["detected_text"] ]) # Visual Processing visual_service = VisualProcessingService() product_type_hint = product.product_type if product.product_type else None visual_results = visual_service.process_image(image_url, product_type_hint) if visual_results and visual_results.get('visual_attributes'): visual_results['visual_attributes'] = ProductAttributeService.format_visual_attributes( visual_results['visual_attributes'] ) logger.info(f"Image processing done for product {item_id}.") # Combine product text with source tracking product_text, source_map = ProductAttributeService.combine_product_text( title=title, short_desc=short_desc, long_desc=long_desc, ocr_text=ocr_text ) # Attribute Extraction with source tracking extracted = ProductAttributeService.extract_attributes( product_text=product_text, mandatory_attrs=mandatory_attrs, source_map=source_map, model=model, extract_additional=extract_additional, multiple=multiple, threshold_abs=threshold_abs, margin=margin, use_dynamic_thresholds=use_dynamic_thresholds, use_adaptive_margin=use_adaptive_margin, use_semantic_clustering=use_semantic_clustering ) result = { "product_id": item_id, "mandatory": extracted.get("mandatory", {}), "additional": extracted.get("additional", {}), } if ocr_results: result["ocr_results"] = ocr_results if visual_results: result["visual_results"] = visual_results results.append(result) successful += 1 logger.info(f"Attribute extraction successful for product {item_id}.") except Exception as e: failed += 1 results.append({ "product_id": item_id, "error": str(e) }) logger.exception(f"Error during attribute extraction for product {item_id}.") logger.info(f"Batch extraction phase complete. Successful: {successful}, Failed: {failed}") # -------------------------------------------------------------------------------- # Generate and save the Excel file # -------------------------------------------------------------------------------- wb = Workbook() # Sheet 1: Products (from DB) (Logic is the same, skipped for brevity) ws_products = wb.active ws_products.title = "Products" products_headers = ['ITEM ID', 'PRODUCT NAME', 'PRODUCT TYPE', 'Product Short Description', 'Product Long Description', 'image_path'] header_fill = PatternFill(start_color="366092", end_color="366092", fill_type="solid") header_font = Font(bold=True, color="FFFFFF") for col_num, header in enumerate(products_headers, 1): cell = ws_products.cell(row=1, column=col_num) cell.value = header cell.fill = header_fill cell.font = header_font cell.alignment = Alignment(horizontal="center", vertical="center") all_products_db = Product.objects.all() for row_num, product in enumerate(all_products_db, 2): ws_products.cell(row=row_num, column=1, value=product.item_id) ws_products.cell(row=row_num, column=2, value=product.product_name) ws_products.cell(row=row_num, column=3, value=product.product_type) ws_products.cell(row=row_num, column=4, value=product.product_short_description) ws_products.cell(row=row_num, column=5, value=product.product_long_description) ws_products.cell(row=row_num, column=6, value=product.image_path) for col_dim, width in zip(['A', 'B', 'C', 'D', 'E', 'F'], [15, 25, 15, 35, 50, 45]): ws_products.column_dimensions[col_dim].width = width # Sheet 2: Attribute_values (Logic is the same, skipped for brevity) ws_attributes = wb.create_sheet("Attribute_values") attributes_headers = ['item_id', 'attribute_name', 'original_value', 'generated_value'] for col_num, header in enumerate(attributes_headers, 1): cell = ws_attributes.cell(row=1, column=col_num) cell.value = header cell.fill = header_fill cell.font = header_font cell.alignment = Alignment(horizontal="center", vertical="center") row_num = 2 all_original_attrs = ProductAttributeValue.objects.all() original_attrs_lookup = { (attr.product.item_id, attr.attribute_name): attr.original_value for attr in all_original_attrs } processed_original_keys = set() for res in results: # ... (Excel writing logic for generated/original attributes remains unchanged) item_id = res["product_id"] if "error" in res: for (orig_item_id, orig_attr_name), orig_value in original_attrs_lookup.items(): if orig_item_id == item_id: ws_attributes.cell(row=row_num, column=1, value=orig_item_id) ws_attributes.cell(row=row_num, column=2, value=orig_attr_name) ws_attributes.cell(row=row_num, column=3, value=orig_value) ws_attributes.cell(row=row_num, column=4, value=f"Extraction Failed: {res['error']}") processed_original_keys.add((orig_item_id, orig_attr_name)) row_num += 1 continue generated_attrs = {} for cat in ["mandatory", "additional"]: attrs = res.get(cat, {}) for attr_name, values in attrs.items(): for val in values: key = (item_id, attr_name) if key not in generated_attrs: generated_attrs[key] = [] generated_attrs[key].append(f"{val['value']} (source: {val['source']})") ocr = res.get("ocr_results") if ocr and "extracted_attributes" in ocr and isinstance(ocr["extracted_attributes"], dict): for attr_name, values in ocr["extracted_attributes"].items(): for val in values: key = (item_id, attr_name) if key not in generated_attrs: generated_attrs[key] = [] generated_attrs[key].append(f"{val['value']} (source: {val['source']})") visual = res.get("visual_results") if visual and "visual_attributes" in visual: vis_attrs = visual["visual_attributes"] if isinstance(vis_attrs, dict): for attr_name, values in vis_attrs.items(): if not isinstance(values, list): values = [{"value": values, "source": "visual"}] for val in values: key = (item_id, attr_name) if key not in generated_attrs: generated_attrs[key] = [] generated_attrs[key].append(f"{val['value']} (source: {val.get('source', 'visual')})") elif isinstance(vis_attrs, list): for item in vis_attrs: attr_name = item.get("attribute_name") or item.get("name") if not attr_name: continue value = item.get("value", "") source = item.get("source", "visual") key = (item_id, attr_name) if key not in generated_attrs: generated_attrs[key] = [] generated_attrs[key].append(f"{value} (source: {source})") for (attr_item_id, attr_name), gen_values in generated_attrs.items(): original_value = original_attrs_lookup.get((attr_item_id, attr_name), "") generated_value = "; ".join(gen_values) if gen_values else "" ws_attributes.cell(row=row_num, column=1, value=attr_item_id) ws_attributes.cell(row=row_num, column=2, value=attr_name) ws_attributes.cell(row=row_num, column=3, value=original_value) ws_attributes.cell(row=row_num, column=4, value=generated_value) processed_original_keys.add((attr_item_id, attr_name)) row_num += 1 for (orig_item_id, orig_attr_name), orig_value in original_attrs_lookup.items(): if orig_item_id == item_id and (orig_item_id, orig_attr_name) not in processed_original_keys: ws_attributes.cell(row=row_num, column=1, value=orig_item_id) ws_attributes.cell(row=row_num, column=2, value=orig_attr_name) ws_attributes.cell(row=row_num, column=3, value=orig_value) ws_attributes.cell(row=row_num, column=4, value="") processed_original_keys.add((orig_item_id, orig_attr_name)) row_num += 1 for (orig_item_id, orig_attr_name), orig_value in original_attrs_lookup.items(): if (orig_item_id, orig_attr_name) not in processed_original_keys: ws_attributes.cell(row=row_num, column=1, value=orig_item_id) ws_attributes.cell(row=row_num, column=2, value=orig_attr_name) ws_attributes.cell(row=row_num, column=3, value=orig_value) ws_attributes.cell(row=row_num, column=4, value="Original value only (Product not processed in batch)") row_num += 1 for col_dim, width in zip(['A', 'B', 'C', 'D'], [15, 35, 50, 50]): ws_attributes.column_dimensions[col_dim].width = width # FIX: Save to the new EXCEL_FILE_PATH wb.save(EXCEL_FILE_PATH) logger.info(f"Excel file successfully saved to {EXCEL_FILE_PATH}") # Write SUCCESS status write_status("SUCCESS") logger.info("Background task finished successfully.") except Exception as e: # Log the critical error and write FAILED status logger.exception("CRITICAL ERROR during background Excel generation process.") write_status("FAILED", error_msg=str(e)) # ------------------------------------------------------------------------------------------------- class ProductUploadExcelView(APIView): """ POST API to upload an Excel file. """ parser_classes = (MultiPartParser, FormParser) def post(self, request, *args, **kwargs): file_obj = request.FILES.get('file') if not file_obj: return Response({'error': 'No file provided'}, status=status.HTTP_400_BAD_REQUEST) try: # ... (Upload and DB processing logic remains unchanged) # Read all sheets from Excel file excel_file = pd.ExcelFile(file_obj) # Check if required sheets exist if 'Products' not in excel_file.sheet_names: logger.error(f"Upload failed: Missing 'Products' sheet in file.") return Response({ 'error': "Missing 'Products' sheet", 'available_sheets': excel_file.sheet_names }, status=status.HTTP_400_BAD_REQUEST) df_products = pd.read_excel(excel_file, sheet_name='Products') df_products.columns = [c.strip().lower().replace(' ', '_') for c in df_products.columns] expected_product_cols = { 'item_id', 'product_name', 'product_long_description', 'product_short_description', 'product_type', 'image_path' } if not expected_product_cols.issubset(df_products.columns): logger.error(f"Upload failed: Missing required columns in Products sheet.") return Response({ 'error': 'Missing required columns in Products sheet', 'required_columns': list(expected_product_cols), 'found_columns': list(df_products.columns) }, status=status.HTTP_400_BAD_REQUEST) df_attributes = None has_attributes_sheet = 'Attribute_values' in excel_file.sheet_names if has_attributes_sheet: df_attributes = pd.read_excel(excel_file, sheet_name='Attribute_values') df_attributes.columns = [c.strip().lower().replace(' ', '_') for c in df_attributes.columns] expected_attr_cols = {'item_id', 'attribute_name', 'original_value'} if not expected_attr_cols.issubset(df_attributes.columns): logger.error(f"Upload failed: Missing required columns in Attribute_values sheet.") return Response({ 'error': 'Missing required columns in Attribute_values sheet', 'required_columns': list(expected_attr_cols), 'found_columns': list(df_attributes.columns) }, status=status.HTTP_400_BAD_REQUEST) products_created = 0 products_updated = 0 attributes_created = 0 attributes_updated = 0 products_failed = 0 attributes_failed = 0 errors = [] with transaction.atomic(): for idx, row in df_products.iterrows(): item_id = str(row.get('item_id', '')).strip() product_type = str(row.get('product_type', '')).strip() if not item_id: products_failed += 1 errors.append(f"Products Row {idx + 2}: Missing item_id") continue try: if product_type: ProductType.objects.get_or_create(name=product_type) defaults = { 'product_name': str(row.get('product_name', '')), 'product_long_description': str(row.get('product_long_description', '')), 'product_short_description': str(row.get('product_short_description', '')), 'product_type': product_type, 'image_path': str(row.get('image_path', '')), } obj, created = Product.objects.update_or_create(item_id=item_id, defaults=defaults) if created: products_created += 1 else: products_updated += 1 except Exception as e: products_failed += 1 errors.append(f"Products Row {idx + 2} (item_id: {item_id}): {str(e)}") logger.error(f"Error processing product {item_id} in Products sheet: {e}") if has_attributes_sheet and df_attributes is not None: item_ids_in_attrs = df_attributes['item_id'].astype(str).unique() existing_products = {p.item_id: p for p in Product.objects.filter(item_id__in=item_ids_in_attrs)} for idx, row in df_attributes.iterrows(): item_id = str(row.get('item_id', '')).strip() attribute_name = str(row.get('attribute_name', '')).strip() original_value = str(row.get('original_value', '')).strip() if not item_id or not attribute_name: attributes_failed += 1 errors.append(f"Attribute_values Row {idx + 2}: Missing item_id or attribute_name") continue product = existing_products.get(item_id) if not product: attributes_failed += 1 errors.append(f"Attribute_values Row {idx + 2}: Product with item_id '{item_id}' not found. Make sure it exists in Products sheet.") continue try: attr_obj, created = ProductAttributeValue.objects.update_or_create( product=product, attribute_name=attribute_name, defaults={'original_value': original_value} ) if created: attributes_created += 1 else: attributes_updated += 1 except Exception as e: attributes_failed += 1 errors.append(f"Attribute_values Row {idx + 2} (item_id: {item_id}, attribute: {attribute_name}): {str(e)}") logger.error(f"Error processing attribute {attribute_name} for product {item_id}: {e}") # Prepare response data response_data = { 'message': 'Upload completed', 'products': { 'created': products_created, 'updated': products_updated, 'failed': products_failed, 'total_processed': products_created + products_updated + products_failed }, 'attribute_values': { 'created': attributes_created, 'updated': attributes_updated, 'failed': attributes_failed, 'total_processed': attributes_created + attributes_updated + attributes_failed } if has_attributes_sheet else {'message': 'Attribute_values sheet not found in Excel file'}, 'generated_excel_status': 'Excel generation started in the background.' } if errors: response_data['errors'] = errors[:50] if len(errors) > 50: response_data['errors'].append(f"... and {len(errors) - 50} more errors") upload_status = status.HTTP_201_CREATED if products_failed == 0 and attributes_failed == 0 else status.HTTP_207_MULTI_STATUS # Start background thread for Excel generation if upload was successful if products_failed == 0 and attributes_failed == 0: logger.info("API call successful. Triggering background Excel generation thread is commented for now !!!!.") # threading.Thread(target=generate_product_excel_background, daemon=True).start() ## FIX: Update monitoring URLs to point to the new generated_outputs subfolder # response_data['generated_excel_status'] = 'Background Excel generation triggered successfully.' # response_data['monitoring'] = { # 'excel_file': os.path.join(OUTPUT_URL, EXCEL_FILE_NAME), # 'status_file': os.path.join(OUTPUT_URL, STATUS_FILE_NAME), # 'log_file': os.path.join(OUTPUT_URL, LOG_FILE_NAME), # 'note': 'These files will be available once the background process completes.' # } else: logger.warning(f"API call finished with errors ({products_failed} products, {attributes_failed} attributes). Not triggering background excel generation.") response_data['generated_excel_status'] = 'Background Excel generation was NOT triggered due to upload errors. Fix upload errors and re-upload.' return Response(response_data, status=upload_status) except pd.errors.EmptyDataError: logger.error('The uploaded Excel file is empty or invalid.') return Response({'error': 'The uploaded Excel file is empty or invalid'}, status=status.HTTP_400_BAD_REQUEST) except Exception as e: logger.exception(f'An unexpected error occurred while processing the file.') return Response({'error': f'An unexpected error occurred while processing the file: {str(e)}'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) class DownloadExcelTemplateView(APIView): """ GET API to download an Excel template with two sheets: 1. Products sheet with sample data 2. Attribute_values sheet with sample data """ def get(self, request): # Create a new workbook wb = Workbook() # Remove default sheet if 'Sheet' in wb.sheetnames: wb.remove(wb['Sheet']) # ===== Create Products Sheet ===== ws_products = wb.create_sheet("Products", 0) # Define headers for Products products_headers = [ 'ITEM ID', 'PRODUCT NAME', 'PRODUCT TYPE', 'Product Short Description', 'Product Long Description', 'image_path' ] # Style for headers header_fill = PatternFill(start_color="366092", end_color="366092", fill_type="solid") header_font = Font(bold=True, color="FFFFFF") # Add headers to Products sheet for col_num, header in enumerate(products_headers, 1): cell = ws_products.cell(row=1, column=col_num) cell.value = header cell.fill = header_fill cell.font = header_font cell.alignment = Alignment(horizontal="center", vertical="center") # Add sample data to Products sheet sample_products = [ [ '3217373735', 'Blue V-Neck T-Shirt', 'Clothing', 'Stylish blue t-shirt with v-neck design', 'Premium quality cotton t-shirt featuring a classic v-neck design. Perfect for casual wear. Available in vibrant blue color.', 'https://images.unsplash.com/photo-1521572163474-6864f9cf17ab' ], [ '1234567890', 'Red Cotton Dress', 'Clothing', 'Beautiful red dress for special occasions', 'Elegant red dress made from 100% cotton fabric. Features a flowing design perfect for summer events and parties.', 'https://images.unsplash.com/photo-1595777457583-95e059d581b8' ], [ '9876543210', 'Steel Screws Pack', 'Hardware', 'Pack of zinc plated steel screws', 'Professional grade steel screws with zinc plating for corrosion resistance. Pack contains 50 pieces, 2 inch length, M6 thread size.', 'https://images.unsplash.com/photo-1542272604-787c3835535d' ] ] for row_num, row_data in enumerate(sample_products, 2): for col_num, value in enumerate(row_data, 1): ws_products.cell(row=row_num, column=col_num, value=value) # Adjust column widths for Products sheet ws_products.column_dimensions['A'].width = 15 # ITEM ID ws_products.column_dimensions['B'].width = 25 # PRODUCT NAME ws_products.column_dimensions['C'].width = 15 # PRODUCT TYPE ws_products.column_dimensions['D'].width = 35 # Short Description ws_products.column_dimensions['E'].width = 50 # Long Description ws_products.column_dimensions['F'].width = 45 # image_path # ===== Create Attribute_values Sheet ===== ws_attributes = wb.create_sheet("Attribute_values", 1) # Define headers for Attribute_values attributes_headers = ['item_id', 'attribute_name', 'original_value'] # Add headers to Attribute_values sheet for col_num, header in enumerate(attributes_headers, 1): cell = ws_attributes.cell(row=1, column=col_num) cell.value = header cell.fill = header_fill cell.font = header_font cell.alignment = Alignment(horizontal="center", vertical="center") # Add sample data to Attribute_values sheet sample_attributes = [ ['3217373735', 'Clothing Neck Style', 'V-Neck Square'], ['3217373735', 'Condition', 'New with tags'], ['3217373735', 'Material', '100% Cotton'], ['3217373735', 'Color', 'Sky Blue'], ['3217373735', 'Size', 'Medium'], ['1234567890', 'Sleeve Length', 'Sleeveless'], ['1234567890', 'Condition', 'Brand New'], ['1234567890', 'Pattern', 'Solid'], ['1234567890', 'Material', 'Cotton Blend'], ['1234567890', 'Color', 'Crimson Red'], ['9876543210', 'Material', 'Stainless Steel'], ['9876543210', 'Thread Size', 'M6'], ['9876543210', 'Length', '2 inches'], ['9876543210', 'Coating', 'Zinc Plated'], ['9876543210', 'Package Quantity', '50 pieces'], ] for row_num, row_data in enumerate(sample_attributes, 2): for col_num, value in enumerate(row_data, 1): ws_attributes.cell(row=row_num, column=col_num, value=value) # Adjust column widths for Attribute_values sheet ws_attributes.column_dimensions['A'].width = 15 # item_id ws_attributes.column_dimensions['B'].width = 25 # attribute_name ws_attributes.column_dimensions['C'].width = 30 # original_value # Add instructions sheet ws_instructions = wb.create_sheet("Instructions", 2) instructions_text = [ ['Excel Upload Instructions', ''], ['', ''], ['Sheet 1: Products', ''], ['- Contains product basic information', ''], ['- All columns are required', ''], ['- ITEM ID must be unique', ''], ['', ''], ['Sheet 2: Attribute_values', ''], ['- Contains original/manual attribute values', ''], ['- item_id must match an ITEM ID from Products sheet', ''], ['- Multiple rows can have the same item_id (for different attributes)', ''], ['- Each attribute per product should be on a separate row', ''], ['', ''], ['Upload Process:', ''], ['1. Fill in your product data in the Products sheet', ''], ['2. Fill in attribute values in the Attribute_values sheet', ''], ['3. Ensure item_id values match between both sheets', ''], ['4. Save the file and upload via API', ''], ['', ''], ['Notes:', ''], ['- Do not change sheet names (must be "Products" and "Attribute_values")', ''], ['- Do not change column header names', ''], ['- You can delete the sample data rows', ''], ['- You can delete this Instructions sheet before uploading', ''], ] for row_num, row_data in enumerate(instructions_text, 1): ws_instructions.cell(row=row_num, column=1, value=row_data[0]) if row_num == 1: cell = ws_instructions.cell(row=row_num, column=1) cell.font = Font(bold=True, size=14) ws_instructions.column_dimensions['A'].width = 60 # Save to BytesIO output = io.BytesIO() wb.save(output) output.seek(0) # Create response response = HttpResponse( output.getvalue(), content_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet' ) response['Content-Disposition'] = 'attachment; filename=product_upload_template.xlsx' return response class DownloadProductsWithAttributesExcelView(APIView): """ GET API to download existing products with their attribute values as Excel. Useful for users to update existing data. """ def get(self, request): from .models import Product, ProductAttributeValue # Create workbook wb = Workbook() if 'Sheet' in wb.sheetnames: wb.remove(wb['Sheet']) # ===== Products Sheet ===== ws_products = wb.create_sheet("Products", 0) # Headers products_headers = [ 'ITEM ID', 'PRODUCT NAME', 'PRODUCT TYPE', 'Product Short Description', 'Product Long Description', 'image_path' ] header_fill = PatternFill(start_color="366092", end_color="366092", fill_type="solid") header_font = Font(bold=True, color="FFFFFF") for col_num, header in enumerate(products_headers, 1): cell = ws_products.cell(row=1, column=col_num) cell.value = header cell.fill = header_fill cell.font = header_font cell.alignment = Alignment(horizontal="center", vertical="center") # Fetch and add product data products = Product.objects.all() for row_num, product in enumerate(products, 2): ws_products.cell(row=row_num, column=1, value=product.item_id) ws_products.cell(row=row_num, column=2, value=product.product_name) ws_products.cell(row=row_num, column=3, value=product.product_type) ws_products.cell(row=row_num, column=4, value=product.product_short_description) ws_products.cell(row=row_num, column=5, value=product.product_long_description) ws_products.cell(row=row_num, column=6, value=product.image_path) # Adjust widths ws_products.column_dimensions['A'].width = 15 ws_products.column_dimensions['B'].width = 25 ws_products.column_dimensions['C'].width = 15 ws_products.column_dimensions['D'].width = 35 ws_products.column_dimensions['E'].width = 50 ws_products.column_dimensions['F'].width = 45 # ===== Attribute_values Sheet ===== ws_attributes = wb.create_sheet("Attribute_values", 1) attributes_headers = ['item_id', 'attribute_name', 'original_value'] for col_num, header in enumerate(attributes_headers, 1): cell = ws_attributes.cell(row=1, column=col_num) cell.value = header cell.fill = header_fill cell.font = header_font cell.alignment = Alignment(horizontal="center", vertical="center") # Fetch and add attribute values attributes = ProductAttributeValue.objects.select_related('product').all() for row_num, attr in enumerate(attributes, 2): ws_attributes.cell(row=row_num, column=1, value=attr.product.item_id) ws_attributes.cell(row=row_num, column=2, value=attr.attribute_name) ws_attributes.cell(row=row_num, column=3, value=attr.original_value) ws_attributes.column_dimensions['A'].width = 15 ws_attributes.column_dimensions['B'].width = 25 ws_attributes.column_dimensions['C'].width = 30 # Save to BytesIO output = io.BytesIO() wb.save(output) output.seek(0) response = HttpResponse( output.getvalue(), content_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet' ) response['Content-Disposition'] = 'attachment; filename=products_export.xlsx' return response # class ProductAttributesUploadView(APIView): # """ # POST API to upload an Excel file and add mandatory/additional attributes # for product types with possible values. # """ # parser_classes = (MultiPartParser, FormParser) # def post(self, request): # file_obj = request.FILES.get('file') # if not file_obj: # return Response({"error": "No file provided."}, status=status.HTTP_400_BAD_REQUEST) # try: # df = pd.read_excel(file_obj) # required_columns = {'product_type', 'attribute_name', 'is_mandatory', 'possible_values'} # if not required_columns.issubset(df.columns): # return Response({ # "error": f"Missing required columns. Found: {list(df.columns)}" # }, status=status.HTTP_400_BAD_REQUEST) # for _, row in df.iterrows(): # product_type_name = str(row['product_type']).strip() # attr_name = str(row['attribute_name']).strip() # is_mandatory = str(row['is_mandatory']).strip().lower() in ['yes', 'true', '1'] # possible_values = str(row.get('possible_values', '')).strip() # # Get or create product type # product_type, _ = ProductType.objects.get_or_create(name=product_type_name) # # Get or create attribute # attribute, _ = ProductAttribute.objects.get_or_create( # product_type=product_type, # name=attr_name, # defaults={'is_mandatory': is_mandatory} # ) # attribute.is_mandatory = is_mandatory # attribute.save() # # Handle possible values # AttributePossibleValue.objects.filter(attribute=attribute).delete() # if possible_values: # for val in [v.strip() for v in possible_values.split(',') if v.strip()]: # AttributePossibleValue.objects.create(attribute=attribute, value=val) # return Response({"message": "Attributes uploaded successfully."}, status=status.HTTP_201_CREATED) # except Exception as e: # return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) from rest_framework.views import APIView from rest_framework.response import Response from rest_framework import status from rest_framework.parsers import MultiPartParser, FormParser import pandas as pd from .models import ProductType, ProductAttribute, AttributePossibleValue class ProductAttributesUploadView(APIView): """ POST API to upload an Excel file and synchronize attributes for each product type. Adds, updates, and deletes attributes and possible values. """ parser_classes = (MultiPartParser, FormParser) def post(self, request): file_obj = request.FILES.get('file') if not file_obj: return Response({"error": "No file provided."}, status=status.HTTP_400_BAD_REQUEST) try: df = pd.read_excel(file_obj) required_columns = {'product_type', 'attribute_name', 'is_mandatory', 'possible_values'} if not required_columns.issubset(df.columns): return Response({ "error": f"Missing required columns. Found: {list(df.columns)}" }, status=status.HTTP_400_BAD_REQUEST) # Normalize data df['product_type'] = df['product_type'].astype(str).str.strip() df['attribute_name'] = df['attribute_name'].astype(str).str.strip() # Track processed attributes per product type processed_attrs = {} # Group by product type for deletion logic for product_type_name, group_df in df.groupby('product_type'): product_type, _ = ProductType.objects.get_or_create(name=product_type_name) uploaded_attrs = set(group_df['attribute_name'].tolist()) # --- ๐Ÿงน Delete attributes that are NOT in the uploaded file --- existing_attrs = set( ProductAttribute.objects.filter(product_type=product_type) .values_list('name', flat=True) ) to_delete = existing_attrs - uploaded_attrs if to_delete: ProductAttribute.objects.filter( product_type=product_type, name__in=to_delete ).delete() # --- ๐Ÿ†• Create / Update remaining attributes --- for _, row in group_df.iterrows(): attr_name = str(row['attribute_name']).strip() is_mandatory = str(row['is_mandatory']).strip().lower() in ['yes', 'true', '1'] possible_values = str(row.get('possible_values', '')).strip() attribute, _ = ProductAttribute.objects.get_or_create( product_type=product_type, name=attr_name, defaults={'is_mandatory': is_mandatory} ) # Update mandatory flag attribute.is_mandatory = is_mandatory attribute.save() # Replace possible values AttributePossibleValue.objects.filter(attribute=attribute).delete() if possible_values: for val in [v.strip() for v in possible_values.split(',') if v.strip()]: AttributePossibleValue.objects.create(attribute=attribute, value=val) processed_attrs[product_type_name] = { "uploaded": list(uploaded_attrs), "deleted": list(to_delete) } return Response({ "message": "Attributes synchronized successfully.", "summary": processed_attrs }, status=status.HTTP_201_CREATED) except Exception as e: return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) class ProductTypeAttributesView(APIView): """ API to view, create, update, and delete product type attributes and their possible values. Also supports dynamic product type creation. """ def get(self, request): """ Retrieve all product types with their attributes and possible values. """ product_types = ProductType.objects.all() serializer = ProductTypeSerializer(product_types, many=True) # Transform the serialized data into the requested format result = [] for pt in serializer.data: for attr in pt['attributes']: result.append({ 'product_type': pt['name'], 'attribute_name': attr['name'], 'is_mandatory': 'Yes' if attr['is_mandatory'] else 'No', 'possible_values': ', '.join([pv['value'] for pv in attr['possible_values']]) }) return Response(result, status=status.HTTP_200_OK) def post(self, request): """ Create a new product type or attribute with possible values. Expected payload example: { "product_type": "Hardware Screws", "attribute_name": "Material", "is_mandatory": "Yes", "possible_values": "Steel, Zinc Plated, Stainless Steel" } """ try: product_type_name = request.data.get('product_type') attribute_name = request.data.get('attribute_name', '') is_mandatory = request.data.get('is_mandatory', '').lower() in ['yes', 'true', '1'] possible_values = request.data.get('possible_values', '') if not product_type_name: return Response({ "error": "product_type is required" }, status=status.HTTP_400_BAD_REQUEST) with transaction.atomic(): # Get or create product type product_type, created = ProductType.objects.get_or_create(name=product_type_name) if created and not attribute_name: return Response({ "message": f"Product type '{product_type_name}' created successfully", "data": {"product_type": product_type_name} }, status=status.HTTP_201_CREATED) if attribute_name: # Create attribute attribute, attr_created = ProductAttribute.objects.get_or_create( product_type=product_type, name=attribute_name, defaults={'is_mandatory': is_mandatory} ) if not attr_created: return Response({ "error": f"Attribute '{attribute_name}' already exists for product type '{product_type_name}'" }, status=status.HTTP_400_BAD_REQUEST) # Handle possible values if possible_values: for val in [v.strip() for v in possible_values.split(',') if v.strip()]: AttributePossibleValue.objects.create(attribute=attribute, value=val) return Response({ "message": "Attribute created successfully", "data": { "product_type": product_type_name, "attribute_name": attribute_name, "is_mandatory": "Yes" if is_mandatory else "No", "possible_values": possible_values } }, status=status.HTTP_201_CREATED) return Response({ "message": f"Product type '{product_type_name}' already exists", "data": {"product_type": product_type_name} }, status=status.HTTP_200_OK) except Exception as e: return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) def put(self, request): """ Update an existing product type attribute and its possible values. Expected payload example: { "product_type": "Hardware Screws", "attribute_name": "Material", "is_mandatory": "Yes", "possible_values": "Steel, Zinc Plated, Stainless Steel, Brass" } """ try: product_type_name = request.data.get('product_type') attribute_name = request.data.get('attribute_name') is_mandatory = request.data.get('is_mandatory', '').lower() in ['yes', 'true', '1'] possible_values = request.data.get('possible_values', '') if not all([product_type_name, attribute_name]): return Response({ "error": "product_type and attribute_name are required" }, status=status.HTTP_400_BAD_REQUEST) with transaction.atomic(): try: product_type = ProductType.objects.get(name=product_type_name) attribute = ProductAttribute.objects.get( product_type=product_type, name=attribute_name ) except ProductType.DoesNotExist: return Response({ "error": f"Product type '{product_type_name}' not found" }, status=status.HTTP_404_NOT_FOUND) except ProductAttribute.DoesNotExist: return Response({ "error": f"Attribute '{attribute_name}' not found for product type '{product_type_name}'" }, status=status.HTTP_404_NOT_FOUND) # Update attribute attribute.is_mandatory = is_mandatory attribute.save() # Update possible values AttributePossibleValue.objects.filter(attribute=attribute).delete() if possible_values: for val in [v.strip() for v in possible_values.split(',') if v.strip()]: AttributePossibleValue.objects.create(attribute=attribute, value=val) return Response({ "message": "Attribute updated successfully", "data": { "product_type": product_type_name, "attribute_name": attribute_name, "is_mandatory": "Yes" if is_mandatory else "No", "possible_values": possible_values } }, status=status.HTTP_200_OK) except Exception as e: return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) def delete(self, request): """ Delete a product type or a specific attribute. Expected payload example: { "product_type": "Hardware Screws", "attribute_name": "Material" } """ try: product_type_name = request.data.get('product_type') attribute_name = request.data.get('attribute_name', '') if not product_type_name: return Response({ "error": "product_type is required" }, status=status.HTTP_400_BAD_REQUEST) with transaction.atomic(): try: product_type = ProductType.objects.get(name=product_type_name) except ProductType.DoesNotExist: return Response({ "error": f"Product type '{product_type_name}' not found" }, status=status.HTTP_404_NOT_FOUND) if attribute_name: # Delete specific attribute try: attribute = ProductAttribute.objects.get( product_type=product_type, name=attribute_name ) attribute.delete() return Response({ "message": f"Attribute '{attribute_name}' deleted successfully from product type '{product_type_name}'" }, status=status.HTTP_200_OK) except ProductAttribute.DoesNotExist: return Response({ "error": f"Attribute '{attribute_name}' not found for product type '{product_type_name}'" }, status=status.HTTP_404_NOT_FOUND) else: # Delete entire product type product_type.delete() return Response({ "message": f"Product type '{product_type_name}' and all its attributes deleted successfully" }, status=status.HTTP_200_OK) except Exception as e: return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) class ProductTypeListView(APIView): """ GET API to list all product types (only names). """ def get(self, request): product_types = ProductType.objects.values_list('name', flat=True) return Response({"product_types": list(product_types)}, status=status.HTTP_200_OK) class ProductAttributeValueView(APIView): """ API to manage manually entered original attribute values. GET: Retrieve all attribute values for a product POST: Create or update attribute values for a product DELETE: Delete attribute values """ def get(self, request): """ Get original attribute values for a specific product or all products. Query params: item_id (optional) """ item_id = request.query_params.get('item_id') if item_id: try: product = Product.objects.get(item_id=item_id) values = ProductAttributeValue.objects.filter(product=product) serializer = ProductAttributeValueSerializer(values, many=True) return Response({ "item_id": item_id, "attributes": serializer.data }, status=status.HTTP_200_OK) except Product.DoesNotExist: return Response({ "error": f"Product with item_id '{item_id}' not found" }, status=status.HTTP_404_NOT_FOUND) else: # Return all attribute values grouped by product values = ProductAttributeValue.objects.all().select_related('product') serializer = ProductAttributeValueSerializer(values, many=True) return Response(serializer.data, status=status.HTTP_200_OK) def post(self, request): """ Create or update original attribute value for a product. Expected payload: { "item_id": "3217373735", "attribute_name": "Clothing Neck Style", "original_value": "V-Neck Square" } """ serializer = ProductAttributeValueInputSerializer(data=request.data) if not serializer.is_valid(): return Response({"error": serializer.errors}, status=status.HTTP_400_BAD_REQUEST) validated_data = serializer.validated_data item_id = validated_data['item_id'] attribute_name = validated_data['attribute_name'] original_value = validated_data['original_value'] try: product = Product.objects.get(item_id=item_id) except Product.DoesNotExist: return Response({ "error": f"Product with item_id '{item_id}' not found" }, status=status.HTTP_404_NOT_FOUND) # Create or update the attribute value attr_value, created = ProductAttributeValue.objects.update_or_create( product=product, attribute_name=attribute_name, defaults={'original_value': original_value} ) response_serializer = ProductAttributeValueSerializer(attr_value) return Response({ "message": "Attribute value created" if created else "Attribute value updated", "data": response_serializer.data }, status=status.HTTP_201_CREATED if created else status.HTTP_200_OK) def delete(self, request): """ Delete original attribute value(s). Expected payload: { "item_id": "3217373735", "attribute_name": "Clothing Neck Style" # Optional, if not provided deletes all for product } """ item_id = request.data.get('item_id') attribute_name = request.data.get('attribute_name') if not item_id: return Response({ "error": "item_id is required" }, status=status.HTTP_400_BAD_REQUEST) try: product = Product.objects.get(item_id=item_id) except Product.DoesNotExist: return Response({ "error": f"Product with item_id '{item_id}' not found" }, status=status.HTTP_404_NOT_FOUND) if attribute_name: # Delete specific attribute deleted_count, _ = ProductAttributeValue.objects.filter( product=product, attribute_name=attribute_name ).delete() if deleted_count == 0: return Response({ "error": f"Attribute '{attribute_name}' not found for product '{item_id}'" }, status=status.HTTP_404_NOT_FOUND) return Response({ "message": f"Attribute '{attribute_name}' deleted successfully" }, status=status.HTTP_200_OK) else: # Delete all attributes for product deleted_count, _ = ProductAttributeValue.objects.filter(product=product).delete() return Response({ "message": f"Deleted {deleted_count} attribute(s) for product '{item_id}'" }, status=status.HTTP_200_OK) class BulkProductAttributeValueView(APIView): """ API for bulk operations on original attribute values. POST: Create/update multiple attribute values at once """ def post(self, request): """ Bulk create or update attribute values for multiple products. Expected payload: { "products": [ { "item_id": "3217373735", "attributes": { "Clothing Neck Style": "V-Neck Square", "Condition": "New with tags" } }, { "item_id": "1234567890", "attributes": { "Material": "Cotton", "Size": "L" } } ] } """ products_data = request.data.get('products', []) if not products_data: return Response({ "error": "products list is required" }, status=status.HTTP_400_BAD_REQUEST) results = [] successful = 0 failed = 0 with transaction.atomic(): for product_data in products_data: serializer = BulkProductAttributeValueSerializer(data=product_data) if not serializer.is_valid(): failed += 1 results.append({ "item_id": product_data.get('item_id'), "status": "failed", "error": serializer.errors }) continue validated_data = serializer.validated_data item_id = validated_data['item_id'] attributes = validated_data['attributes'] try: product = Product.objects.get(item_id=item_id) created_count = 0 updated_count = 0 for attr_name, original_value in attributes.items(): _, created = ProductAttributeValue.objects.update_or_create( product=product, attribute_name=attr_name, defaults={'original_value': original_value} ) if created: created_count += 1 else: updated_count += 1 successful += 1 results.append({ "item_id": item_id, "status": "success", "created": created_count, "updated": updated_count }) except Product.DoesNotExist: failed += 1 results.append({ "item_id": item_id, "status": "failed", "error": f"Product not found" }) return Response({ "results": results, "total_products": len(products_data), "successful": successful, "failed": failed }, status=status.HTTP_200_OK) class ProductListWithAttributesView(APIView): """ GET API to list all products with their original attribute values. """ def get(self, request): item_id = request.query_params.get('item_id') if item_id: try: product = Product.objects.get(item_id=item_id) serializer = ProductWithAttributesSerializer(product) return Response(serializer.data, status=status.HTTP_200_OK) except Product.DoesNotExist: return Response({ "error": f"Product with item_id '{item_id}' not found" }, status=status.HTTP_404_NOT_FOUND) else: products = Product.objects.all() serializer = ProductWithAttributesSerializer(products, many=True) return Response(serializer.data, status=status.HTTP_200_OK) class CacheManagementView(APIView): """ API endpoint to manage caching system. GET: Get current cache statistics and configuration POST: Enable/disable caching or clear caches """ def get(self, request): """ Get current cache configuration and statistics. """ config = cache_config.get_cache_config() stats = ProductAttributeService.get_cache_stats() return Response({ "configuration": config, "statistics": stats, "message": "Cache status retrieved successfully" }, status=status.HTTP_200_OK) def post(self, request): """ Manage cache settings. Expected payload examples: 1. Enable/disable caching: { "action": "toggle", "enable": true // or false } 2. Clear all caches: { "action": "clear" } 3. Clear specific cache: { "action": "clear", "cache_type": "embedding" // or "attribute" or "clip" } 4. Get statistics: { "action": "stats" } """ action = request.data.get('action') if not action: return Response({ "error": "action is required", "valid_actions": ["toggle", "clear", "stats"] }, status=status.HTTP_400_BAD_REQUEST) # Toggle caching on/off if action == "toggle": enable = request.data.get('enable') if enable is None: return Response({ "error": "enable parameter is required (true/false)" }, status=status.HTTP_400_BAD_REQUEST) # Update the cache configuration cache_config.ENABLE_CACHING = bool(enable) cache_config.ENABLE_ATTRIBUTE_EXTRACTION_CACHE = bool(enable) cache_config.ENABLE_EMBEDDING_CACHE = bool(enable) cache_config.ENABLE_CLIP_MODEL_CACHE = bool(enable) status_msg = "enabled" if enable else "disabled" return Response({ "message": f"Caching has been {status_msg}", "configuration": cache_config.get_cache_config() }, status=status.HTTP_200_OK) # Clear caches elif action == "clear": cache_type = request.data.get('cache_type', 'all') if cache_type == 'all': ProductAttributeService.clear_all_caches() VisualProcessingService.clear_clip_cache() message = "All caches cleared successfully" elif cache_type == 'embedding': from .services import EmbeddingCache EmbeddingCache.clear() message = "Embedding cache cleared successfully" elif cache_type == 'attribute': from .services import SimpleCache SimpleCache.clear() message = "Attribute extraction cache cleared successfully" elif cache_type == 'clip': VisualProcessingService.clear_clip_cache() message = "CLIP model cache cleared successfully" else: return Response({ "error": f"Invalid cache_type: {cache_type}", "valid_types": ["all", "embedding", "attribute", "clip"] }, status=status.HTTP_400_BAD_REQUEST) return Response({ "message": message, "statistics": ProductAttributeService.get_cache_stats() }, status=status.HTTP_200_OK) # Get statistics elif action == "stats": stats = ProductAttributeService.get_cache_stats() config = cache_config.get_cache_config() return Response({ "configuration": config, "statistics": stats }, status=status.HTTP_200_OK) else: return Response({ "error": f"Invalid action: {action}", "valid_actions": ["toggle", "clear", "stats"] }, status=status.HTTP_400_BAD_REQUEST) class CacheStatsView(APIView): """ Simple GET endpoint to retrieve cache statistics. """ def get(self, request): """Get current cache statistics.""" stats = ProductAttributeService.get_cache_stats() config = cache_config.get_cache_config() return Response({ "cache_enabled": config["master_cache_enabled"], "statistics": stats, "timestamp": datetime.now().isoformat() }, status=status.HTTP_200_OK)