from rest_framework.views import APIView from rest_framework.response import Response from rest_framework import status from rest_framework.parsers import MultiPartParser, FormParser from django.db import transaction import pandas as pd from .models import Product, ProductType, ProductAttribute, AttributePossibleValue from .serializers import ( SingleProductRequestSerializer, BatchProductRequestSerializer, ProductAttributeResultSerializer, BatchProductResponseSerializer, ProductSerializer, ProductTypeSerializer, ProductAttributeSerializer, AttributePossibleValueSerializer ) from .services import ProductAttributeService from .ocr_service import OCRService # Sample test images (publicly available) SAMPLE_IMAGES = { "tshirt": "https://images.unsplash.com/photo-1521572163474-6864f9cf17ab", "dress": "https://images.unsplash.com/photo-1595777457583-95e059d581b8", "jeans": "https://images.unsplash.com/photo-1542272604-787c3835535d" } # ==================== Updated views.py ==================== from rest_framework.views import APIView from rest_framework.response import Response from rest_framework import status from .models import Product from .services import ProductAttributeService from .ocr_service import OCRService from .visual_processing_service import VisualProcessingService class ExtractProductAttributesView(APIView): """ API endpoint to extract product attributes for a single product by item_id. Fetches product details from database with source tracking. Returns attributes in array format: [{"value": "...", "source": "..."}] Includes OCR and Visual Processing results. """ def post(self, request): serializer = SingleProductRequestSerializer(data=request.data) if not serializer.is_valid(): return Response({"error": serializer.errors}, status=status.HTTP_400_BAD_REQUEST) validated_data = serializer.validated_data item_id = validated_data.get("item_id") # Fetch product from DB try: product = Product.objects.get(item_id=item_id) except Product.DoesNotExist: return Response( {"error": f"Product with item_id '{item_id}' not found."}, status=status.HTTP_404_NOT_FOUND ) # Extract product details title = product.product_name short_desc = product.product_short_description long_desc = product.product_long_description image_url = product.image_path # Process image for OCR if required ocr_results = None ocr_text = None visual_results = None if validated_data.get("process_image", True) and image_url: # OCR Processing ocr_service = OCRService() ocr_results = ocr_service.process_image(image_url) if ocr_results and ocr_results.get("detected_text"): ocr_attrs = ProductAttributeService.extract_attributes_from_ocr( ocr_results, validated_data.get("model") ) ocr_results["extracted_attributes"] = ocr_attrs ocr_text = "\n".join([ f"{item['text']} (confidence: {item['confidence']:.2f})" for item in ocr_results["detected_text"] ]) # Visual Processing visual_service = VisualProcessingService() product_type_hint = product.product_type if hasattr(product, 'product_type') else None visual_results = visual_service.process_image(image_url, product_type_hint) # Combine all product text with source tracking product_text, source_map = ProductAttributeService.combine_product_text( title=title, short_desc=short_desc, long_desc=long_desc, ocr_text=ocr_text ) # Extract attributes with enhanced features and source tracking result = ProductAttributeService.extract_attributes( product_text=product_text, mandatory_attrs=validated_data["mandatory_attrs"], source_map=source_map, model=validated_data.get("model"), extract_additional=validated_data.get("extract_additional", True), multiple=validated_data.get("multiple", []), threshold_abs=validated_data.get("threshold_abs", 0.65), margin=validated_data.get("margin", 0.15), use_dynamic_thresholds=validated_data.get("use_dynamic_thresholds", True), use_adaptive_margin=validated_data.get("use_adaptive_margin", True), use_semantic_clustering=validated_data.get("use_semantic_clustering", True) ) # Attach OCR results if available if ocr_results: result["ocr_results"] = ocr_results # Attach Visual Processing results if available if visual_results: result["visual_results"] = visual_results response_serializer = ProductAttributeResultSerializer(data=result) if response_serializer.is_valid(): return Response(response_serializer.data, status=status.HTTP_200_OK) return Response(result, status=status.HTTP_200_OK) # class BatchExtractProductAttributesView(APIView): # """ # API endpoint to extract product attributes for multiple products in batch. # Uses item-specific mandatory_attrs with source tracking. # Returns attributes in array format: [{"value": "...", "source": "..."}] # Includes OCR and Visual Processing results. # """ # def post(self, request): # serializer = BatchProductRequestSerializer(data=request.data) # if not serializer.is_valid(): # return Response({"error": serializer.errors}, status=status.HTTP_400_BAD_REQUEST) # validated_data = serializer.validated_data # # DEBUG: Print what we received # print("\n" + "="*80) # print("BATCH REQUEST - RECEIVED DATA") # print("="*80) # print(f"Raw request data keys: {request.data.keys()}") # print(f"Multiple field in request: {request.data.get('multiple')}") # print(f"Validated multiple field: {validated_data.get('multiple')}") # print("="*80 + "\n") # # Get batch-level settings # product_list = validated_data.get("products", []) # model = validated_data.get("model") # extract_additional = validated_data.get("extract_additional", True) # process_image = validated_data.get("process_image", True) # multiple = validated_data.get("multiple", []) # threshold_abs = validated_data.get("threshold_abs", 0.65) # margin = validated_data.get("margin", 0.15) # use_dynamic_thresholds = validated_data.get("use_dynamic_thresholds", True) # use_adaptive_margin = validated_data.get("use_adaptive_margin", True) # use_semantic_clustering = validated_data.get("use_semantic_clustering", True) # # DEBUG: Print extracted settings # print(f"Extracted multiple parameter: {multiple}") # print(f"Type: {type(multiple)}") # # Extract all item_ids to query the database efficiently # item_ids = [p['item_id'] for p in product_list] # # Fetch all products in one query # products_queryset = Product.objects.filter(item_id__in=item_ids) # # Create a dictionary for easy lookup: item_id -> Product object # product_map = {product.item_id: product for product in products_queryset} # found_ids = set(product_map.keys()) # results = [] # successful = 0 # failed = 0 # for product_entry in product_list: # item_id = product_entry['item_id'] # # Get item-specific mandatory attributes # mandatory_attrs = product_entry['mandatory_attrs'] # if item_id not in found_ids: # failed += 1 # results.append({ # "product_id": item_id, # "error": "Product not found in database" # }) # continue # product = product_map[item_id] # try: # title = product.product_name # short_desc = product.product_short_description # long_desc = product.product_long_description # image_url = product.image_path # # image_url = "https://images.unsplash.com/photo-1595777457583-95e059d581b8" # ocr_results = None # ocr_text = None # visual_results = None # # Image Processing Logic # if process_image and image_url: # # OCR Processing # ocr_service = OCRService() # ocr_results = ocr_service.process_image(image_url) # print(f"OCR results for {item_id}: {ocr_results}") # if ocr_results and ocr_results.get("detected_text"): # ocr_attrs = ProductAttributeService.extract_attributes_from_ocr( # ocr_results, model # ) # ocr_results["extracted_attributes"] = ocr_attrs # ocr_text = "\n".join([ # f"{item['text']} (confidence: {item['confidence']:.2f})" # for item in ocr_results["detected_text"] # ]) # # Visual Processing # visual_service = VisualProcessingService() # product_type_hint = product.product_type if hasattr(product, 'product_type') else None # visual_results = visual_service.process_image(image_url, product_type_hint) # print(f"Visual results for {item_id}: {visual_results.get('visual_attributes', {})}") # # Format visual attributes to array format with source tracking # if visual_results and visual_results.get('visual_attributes'): # visual_results['visual_attributes'] = ProductAttributeService.format_visual_attributes( # visual_results['visual_attributes'] # ) # # Combine product text with source tracking # product_text, source_map = ProductAttributeService.combine_product_text( # title=title, # short_desc=short_desc, # long_desc=long_desc, # ocr_text=ocr_text # ) # # DEBUG: Print before extraction # print(f"\n>>> Extracting for product {item_id}") # print(f" Passing multiple: {multiple}") # # Attribute Extraction with source tracking (returns array format) # extracted = ProductAttributeService.extract_attributes( # product_text=product_text, # mandatory_attrs=mandatory_attrs, # source_map=source_map, # model=model, # extract_additional=extract_additional, # multiple=multiple, # threshold_abs=threshold_abs, # margin=margin, # use_dynamic_thresholds=use_dynamic_thresholds, # use_adaptive_margin=use_adaptive_margin, # use_semantic_clustering=use_semantic_clustering # ) # result = { # "product_id": product.item_id, # "mandatory": extracted.get("mandatory", {}), # "additional": extracted.get("additional", {}), # } # # Attach OCR results if available # if ocr_results: # result["ocr_results"] = ocr_results # # Attach Visual Processing results if available # if visual_results: # result["visual_results"] = visual_results # results.append(result) # successful += 1 # except Exception as e: # failed += 1 # results.append({ # "product_id": item_id, # "error": str(e) # }) # batch_result = { # "results": results, # "total_products": len(product_list), # "successful": successful, # "failed": failed # } # response_serializer = BatchProductResponseSerializer(data=batch_result) # if response_serializer.is_valid(): # return Response(response_serializer.data, status=status.HTTP_200_OK) # return Response(batch_result, status=status.HTTP_200_OK) # Replace the BatchExtractProductAttributesView in your views.py with this updated version class BatchExtractProductAttributesView(APIView): """ API endpoint to extract product attributes for multiple products in batch. Uses item-specific mandatory_attrs with source tracking. Returns attributes in array format with original_value field. Includes OCR and Visual Processing results. """ def post(self, request): serializer = BatchProductRequestSerializer(data=request.data) if not serializer.is_valid(): return Response({"error": serializer.errors}, status=status.HTTP_400_BAD_REQUEST) validated_data = serializer.validated_data # Get batch-level settings product_list = validated_data.get("products", []) model = validated_data.get("model") extract_additional = validated_data.get("extract_additional", True) process_image = validated_data.get("process_image", True) multiple = validated_data.get("multiple", []) threshold_abs = validated_data.get("threshold_abs", 0.65) margin = validated_data.get("margin", 0.15) use_dynamic_thresholds = validated_data.get("use_dynamic_thresholds", True) use_adaptive_margin = validated_data.get("use_adaptive_margin", True) use_semantic_clustering = validated_data.get("use_semantic_clustering", True) # Extract all item_ids to query the database efficiently item_ids = [p['item_id'] for p in product_list] # Fetch all products in one query products_queryset = Product.objects.filter(item_id__in=item_ids) product_map = {product.item_id: product for product in products_queryset} found_ids = set(product_map.keys()) # Fetch all original attribute values for these products in one query original_values_qs = ProductAttributeValue.objects.filter( product__item_id__in=item_ids ).select_related('product') # Create a nested dictionary: {item_id: {attribute_name: original_value}} original_values_map = {} for attr_val in original_values_qs: item_id = attr_val.product.item_id if item_id not in original_values_map: original_values_map[item_id] = {} original_values_map[item_id][attr_val.attribute_name] = attr_val.original_value results = [] successful = 0 failed = 0 for product_entry in product_list: item_id = product_entry['item_id'] mandatory_attrs = product_entry['mandatory_attrs'] if item_id not in found_ids: failed += 1 results.append({ "product_id": item_id, "error": "Product not found in database" }) continue product = product_map[item_id] try: title = product.product_name short_desc = product.product_short_description long_desc = product.product_long_description image_url = product.image_path ocr_results = None ocr_text = None visual_results = None # Image Processing Logic if process_image and image_url: # OCR Processing ocr_service = OCRService() ocr_results = ocr_service.process_image(image_url) if ocr_results and ocr_results.get("detected_text"): ocr_attrs = ProductAttributeService.extract_attributes_from_ocr( ocr_results, model ) ocr_results["extracted_attributes"] = ocr_attrs ocr_text = "\n".join([ f"{item['text']} (confidence: {item['confidence']:.2f})" for item in ocr_results["detected_text"] ]) # Visual Processing visual_service = VisualProcessingService() product_type_hint = product.product_type if hasattr(product, 'product_type') else None visual_results = visual_service.process_image(image_url, product_type_hint) # Format visual attributes to array format with source tracking if visual_results and visual_results.get('visual_attributes'): visual_results['visual_attributes'] = ProductAttributeService.format_visual_attributes( visual_results['visual_attributes'] ) # Combine product text with source tracking product_text, source_map = ProductAttributeService.combine_product_text( title=title, short_desc=short_desc, long_desc=long_desc, ocr_text=ocr_text ) # Attribute Extraction with source tracking (returns array format) extracted = ProductAttributeService.extract_attributes( product_text=product_text, mandatory_attrs=mandatory_attrs, source_map=source_map, model=model, extract_additional=extract_additional, multiple=multiple, threshold_abs=threshold_abs, margin=margin, use_dynamic_thresholds=use_dynamic_thresholds, use_adaptive_margin=use_adaptive_margin, use_semantic_clustering=use_semantic_clustering ) # Add original_value to each extracted attribute original_attrs = original_values_map.get(item_id, {}) # Process mandatory attributes for attr_name, attr_values in extracted.get("mandatory", {}).items(): if isinstance(attr_values, list): for attr_obj in attr_values: if isinstance(attr_obj, dict): # Add original_value if it exists attr_obj["original_value"] = original_attrs.get(attr_name, "") # Process additional attributes for attr_name, attr_values in extracted.get("additional", {}).items(): if isinstance(attr_values, list): for attr_obj in attr_values: if isinstance(attr_obj, dict): # Add original_value if it exists attr_obj["original_value"] = original_attrs.get(attr_name, "") result = { "product_id": product.item_id, "mandatory": extracted.get("mandatory", {}), "additional": extracted.get("additional", {}), } # Attach OCR results if available if ocr_results: result["ocr_results"] = ocr_results # Attach Visual Processing results if available if visual_results: result["visual_results"] = visual_results results.append(result) successful += 1 except Exception as e: failed += 1 results.append({ "product_id": item_id, "error": str(e) }) batch_result = { "results": results, "total_products": len(product_list), "successful": successful, "failed": failed } response_serializer = BatchProductResponseSerializer(data=batch_result) if response_serializer.is_valid(): return Response(response_serializer.data, status=status.HTTP_200_OK) return Response(batch_result, status=status.HTTP_200_OK) class ProductListView(APIView): """ GET API to list all products with details """ def get(self, request): products = Product.objects.all() serializer = ProductSerializer(products, many=True) return Response(serializer.data, status=status.HTTP_200_OK) from rest_framework.views import APIView from rest_framework.response import Response from rest_framework import status from rest_framework.parsers import MultiPartParser, FormParser import pandas as pd from .models import Product # class ProductUploadExcelView(APIView): # """ # POST API to upload an Excel file and add/update data in Product model. # - Creates new records if they don't exist. # - Updates existing ones (e.g., when image_path or other fields change). # """ # parser_classes = (MultiPartParser, FormParser) # def post(self, request, *args, **kwargs): # file_obj = request.FILES.get('file') # if not file_obj: # return Response({'error': 'No file provided'}, status=status.HTTP_400_BAD_REQUEST) # try: # # Read Excel into DataFrame # df = pd.read_excel(file_obj) # df.columns = [c.strip().lower().replace(' ', '_') for c in df.columns] # expected_cols = { # 'item_id', # 'product_name', # 'product_long_description', # 'product_short_description', # 'product_type', # 'image_path' # } # # Check required columns # if not expected_cols.issubset(df.columns): # return Response({ # 'error': 'Missing required columns', # 'required_columns': list(expected_cols) # }, status=status.HTTP_400_BAD_REQUEST) # created_count = 0 # updated_count = 0 # # Loop through rows and update or create # for _, row in df.iterrows(): # item_id = str(row.get('item_id', '')).strip() # if not item_id: # continue # Skip rows without an item_id # defaults = { # 'product_name': row.get('product_name', ''), # 'product_long_description': row.get('product_long_description', ''), # 'product_short_description': row.get('product_short_description', ''), # 'product_type': row.get('product_type', ''), # 'image_path': row.get('image_path', ''), # } # obj, created = Product.objects.update_or_create( # item_id=item_id, # defaults=defaults # ) # if created: # created_count += 1 # else: # updated_count += 1 # return Response({ # 'message': f'Upload successful.', # 'created': f'{created_count} new records added.', # 'updated': f'{updated_count} existing records updated.' # }, status=status.HTTP_201_CREATED) # except Exception as e: # return Response({'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) # Replace the ProductUploadExcelView in your views.py with this updated version from rest_framework.views import APIView from rest_framework.response import Response from rest_framework import status from rest_framework.parsers import MultiPartParser, FormParser from django.db import transaction import pandas as pd from .models import Product, ProductAttributeValue # class ProductUploadExcelView(APIView): # """ # POST API to upload an Excel file with two sheets: # 1. 'Products' sheet - Product details # 2. 'Attribute_values' sheet - Original attribute values # Creates/updates both products and their attribute values in a single transaction. # """ # parser_classes = (MultiPartParser, FormParser) # def post(self, request, *args, **kwargs): # file_obj = request.FILES.get('file') # if not file_obj: # return Response({'error': 'No file provided'}, status=status.HTTP_400_BAD_REQUEST) # try: # # Read all sheets from Excel file # excel_file = pd.ExcelFile(file_obj) # # Check if required sheets exist # if 'Products' not in excel_file.sheet_names: # return Response({ # 'error': "Missing 'Products' sheet", # 'available_sheets': excel_file.sheet_names # }, status=status.HTTP_400_BAD_REQUEST) # # Read Products sheet # df_products = pd.read_excel(excel_file, sheet_name='Products') # df_products.columns = [c.strip().lower().replace(' ', '_') for c in df_products.columns] # # Check required columns for Products # expected_product_cols = { # 'item_id', # 'product_name', # 'product_long_description', # 'product_short_description', # 'product_type', # 'image_path' # } # if not expected_product_cols.issubset(df_products.columns): # return Response({ # 'error': 'Missing required columns in Products sheet', # 'required_columns': list(expected_product_cols), # 'found_columns': list(df_products.columns) # }, status=status.HTTP_400_BAD_REQUEST) # # Read Attribute_values sheet if it exists # df_attributes = None # has_attributes_sheet = 'Attribute_values' in excel_file.sheet_names # if has_attributes_sheet: # df_attributes = pd.read_excel(excel_file, sheet_name='Attribute_values') # df_attributes.columns = [c.strip().lower().replace(' ', '_') for c in df_attributes.columns] # # Check required columns for Attribute_values # expected_attr_cols = {'item_id', 'attribute_name', 'original_value'} # if not expected_attr_cols.issubset(df_attributes.columns): # return Response({ # 'error': 'Missing required columns in Attribute_values sheet', # 'required_columns': list(expected_attr_cols), # 'found_columns': list(df_attributes.columns) # }, status=status.HTTP_400_BAD_REQUEST) # # Initialize counters # products_created = 0 # products_updated = 0 # attributes_created = 0 # attributes_updated = 0 # products_failed = 0 # attributes_failed = 0 # errors = [] # # Use transaction to ensure atomicity # with transaction.atomic(): # # Process Products sheet # for idx, row in df_products.iterrows(): # item_id = str(row.get('item_id', '')).strip() # if not item_id: # products_failed += 1 # errors.append(f"Products Row {idx + 2}: Missing item_id") # continue # try: # defaults = { # 'product_name': str(row.get('product_name', '')), # 'product_long_description': str(row.get('product_long_description', '')), # 'product_short_description': str(row.get('product_short_description', '')), # 'product_type': str(row.get('product_type', '')), # 'image_path': str(row.get('image_path', '')), # } # obj, created = Product.objects.update_or_create( # item_id=item_id, # defaults=defaults # ) # if created: # products_created += 1 # else: # products_updated += 1 # except Exception as e: # products_failed += 1 # errors.append(f"Products Row {idx + 2} (item_id: {item_id}): {str(e)}") # # Process Attribute_values sheet if it exists # if has_attributes_sheet and df_attributes is not None: # # Group by item_id to optimize lookups # item_ids_in_attrs = df_attributes['item_id'].unique() # # Fetch all products at once # existing_products = { # p.item_id: p # for p in Product.objects.filter(item_id__in=item_ids_in_attrs) # } # for idx, row in df_attributes.iterrows(): # item_id = str(row.get('item_id', '')).strip() # attribute_name = str(row.get('attribute_name', '')).strip() # original_value = str(row.get('original_value', '')).strip() # if not item_id or not attribute_name: # attributes_failed += 1 # errors.append( # f"Attribute_values Row {idx + 2}: Missing item_id or attribute_name" # ) # continue # # Check if product exists # product = existing_products.get(item_id) # if not product: # attributes_failed += 1 # errors.append( # f"Attribute_values Row {idx + 2}: Product with item_id '{item_id}' not found. " # "Make sure it exists in Products sheet." # ) # continue # try: # attr_obj, created = ProductAttributeValue.objects.update_or_create( # product=product, # attribute_name=attribute_name, # defaults={'original_value': original_value} # ) # if created: # attributes_created += 1 # else: # attributes_updated += 1 # except Exception as e: # attributes_failed += 1 # errors.append( # f"Attribute_values Row {idx + 2} " # f"(item_id: {item_id}, attribute: {attribute_name}): {str(e)}" # ) # # Prepare response # response_data = { # 'message': 'Upload completed successfully', # 'products': { # 'created': products_created, # 'updated': products_updated, # 'failed': products_failed, # 'total_processed': products_created + products_updated + products_failed # } # } # if has_attributes_sheet: # response_data['attribute_values'] = { # 'created': attributes_created, # 'updated': attributes_updated, # 'failed': attributes_failed, # 'total_processed': attributes_created + attributes_updated + attributes_failed # } # else: # response_data['attribute_values'] = { # 'message': 'Attribute_values sheet not found in Excel file' # } # if errors: # response_data['errors'] = errors[:50] # Limit to first 50 errors # if len(errors) > 50: # response_data['errors'].append(f"... and {len(errors) - 50} more errors") # # Determine status code # if products_failed > 0 or attributes_failed > 0: # status_code = status.HTTP_207_MULTI_STATUS # else: # status_code = status.HTTP_201_CREATED # return Response(response_data, status=status_code) # except pd.errors.EmptyDataError: # return Response({ # 'error': 'The uploaded Excel file is empty or invalid' # }, status=status.HTTP_400_BAD_REQUEST) # except Exception as e: # return Response({ # 'error': f'An error occurred while processing the file: {str(e)}' # }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) # import logging # import json # from rest_framework.views import APIView # from rest_framework.response import Response # from rest_framework import status # from rest_framework.parsers import MultiPartParser, FormParser # from django.db import transaction # from django.db.models import Prefetch, F # import pandas as pd # # Import ALL your models # from .models import Product, ProductAttributeValue, ProductType, ProductAttribute, AttributePossibleValue # from .services import ProductAttributeService # from .ocr_service import OCRService # from .visual_processing_service import VisualProcessingService # from openpyxl import Workbook # from openpyxl.styles import Font, PatternFill, Alignment # from django.conf import settings # import os # import threading # from datetime import datetime # # --- Logging Setup --- # # Define log and status file paths in MEDIA_ROOT # LOG_FILE_PATH = os.path.join(settings.MEDIA_ROOT, 'excel_generation.log') # STATUS_FILE_PATH = os.path.join(settings.MEDIA_ROOT, 'excel_generation_status.json') # # Ensure the MEDIA_ROOT exists for files to be saved # if not os.path.exists(settings.MEDIA_ROOT): # os.makedirs(settings.MEDIA_ROOT) # # Configure basic logging # logging.basicConfig( # filename=LOG_FILE_PATH, # level=logging.INFO, # format='%(asctime)s - %(levelname)s - %(message)s' # ) # logger = logging.getLogger(__name__) # # ------------------------------------------------------------------------------------------------- # def generate_product_excel_background(): # """ # Function to perform batch attribute extraction for all products and generate an Excel file. # Runs in a background thread to avoid blocking the API response. # Logs success/failure and saves a status file for external monitoring. # """ # logger.info(f"[{datetime.now().isoformat()}] Starting background product Excel generation and attribute extraction.") # successful = 0 # failed = 0 # results = [] # To store detailed extraction results for Excel sheet 2 # # Function to write status file (SUCCESS/FAILED) # def write_status(status_type, error_msg=None): # status_data = { # "status": status_type, # "timestamp": datetime.now().isoformat(), # "products_processed": successful + failed, # "products_successful": successful, # "products_failed": failed, # "excel_path": os.path.join(settings.MEDIA_URL, 'generated_products.xlsx') if status_type == "SUCCESS" else None, # "log_path": os.path.join(settings.MEDIA_URL, 'excel_generation.log'), # "error_message": error_msg # } # try: # with open(STATUS_FILE_PATH, 'w') as f: # json.dump(status_data, f, indent=4) # except Exception as e: # logger.exception(f"CRITICAL ERROR: Failed to write status file at {STATUS_FILE_PATH}: {e}") # try: # # 1. PREFETCH all necessary related data to minimize database queries # # Prefetch possible values for mandatory attributes # possible_values_prefetch = Prefetch( # 'attributes', # queryset=ProductAttribute.objects.filter(is_mandatory=True).prefetch_related('possible_values') # ) # # Fetch all ProductTypes with their mandatory attributes and possible values # all_product_types = ProductType.objects.prefetch_related(possible_values_prefetch) # product_type_map = { # pt.name: pt for pt in all_product_types # } # # Prepare product_list for batch extraction # all_products = Product.objects.all() # product_list = [] # for p in all_products: # # mandatory_attrs will be the dictionary required by the service # mandatory_attrs_dict = {} # product_type_name = p.product_type.strip() if p.product_type else None # if product_type_name and product_type_name in product_type_map: # pt = product_type_map[product_type_name] # # Build the mandatory_attrs dictionary: { "Attribute Name": ["Value 1", "Value 2"], ... } # for attr in pt.attributes.all(): # .all() here works because we used Prefetch for 'attributes' # # attr.possible_values.all() works because we used prefetch_related('possible_values') # mandatory_attrs_dict[attr.name] = [ # pv.value for pv in attr.possible_values.all() # ] # product_list.append({ # "item_id": p.item_id, # "product_type_name": product_type_name, # "mandatory_attrs": mandatory_attrs_dict # <-- FIX: Pass the dictionary here # }) # # Batch settings (using defaults) # model = "llama-3.1-8b-instant" # extract_additional = True # process_image = False # multiple = [] # threshold_abs = 0.65 # margin = 0.15 # use_dynamic_thresholds = True # use_adaptive_margin = True # use_semantic_clustering = True # # Batch extraction logic # item_ids = [p['item_id'] for p in product_list] # products_queryset = Product.objects.filter(item_id__in=item_ids) # product_map = {product.item_id: product for product in products_queryset} # found_ids = set(product_map.keys()) # for product_entry in product_list: # item_id = product_entry['item_id'] # # FIX: mandatory_attrs is now correctly a dictionary (or an empty dictionary) # mandatory_attrs = product_entry['mandatory_attrs'] # if item_id not in found_ids: # failed += 1 # results.append({ # "product_id": item_id, # "error": "Product not found in database" # }) # logger.warning(f"Product {item_id} not found in database. Skipping extraction.") # continue # product = product_map[item_id] # try: # title = product.product_name # short_desc = product.product_short_description # long_desc = product.product_long_description # image_url = product.image_path # ocr_results = None # ocr_text = None # visual_results = None # if process_image and image_url: # logger.info(f"Processing image for product {item_id}...") # # OCR Processing # ocr_service = OCRService() # ocr_results = ocr_service.process_image(image_url) # if ocr_results and ocr_results.get("detected_text"): # # NOTE: Assuming ProductAttributeService.extract_attributes_from_ocr exists # ocr_attrs = ProductAttributeService.extract_attributes_from_ocr( # ocr_results, model # ) # ocr_results["extracted_attributes"] = ocr_attrs # ocr_text = "\n".join([ # f"{item['text']} (confidence: {item['confidence']:.2f})" # for item in ocr_results["detected_text"] # ]) # # Visual Processing # visual_service = VisualProcessingService() # product_type_hint = product.product_type if product.product_type else None # visual_results = visual_service.process_image(image_url, product_type_hint) # if visual_results and visual_results.get('visual_attributes'): # visual_results['visual_attributes'] = ProductAttributeService.format_visual_attributes( # visual_results['visual_attributes'] # ) # logger.info(f"Image processing done for product {item_id}.") # # Combine product text with source tracking # product_text, source_map = ProductAttributeService.combine_product_text( # title=title, # short_desc=short_desc, # long_desc=long_desc, # ocr_text=ocr_text # ) # # Attribute Extraction with source tracking # extracted = ProductAttributeService.extract_attributes( # product_text=product_text, # mandatory_attrs=mandatory_attrs, # <-- This is now the dictionary with possible values # source_map=source_map, # model=model, # extract_additional=extract_additional, # multiple=multiple, # threshold_abs=threshold_abs, # margin=margin, # use_dynamic_thresholds=use_dynamic_thresholds, # use_adaptive_margin=use_adaptive_margin, # use_semantic_clustering=use_semantic_clustering # ) # result = { # "product_id": item_id, # "mandatory": extracted.get("mandatory", {}), # "additional": extracted.get("additional", {}), # } # if ocr_results: # result["ocr_results"] = ocr_results # if visual_results: # result["visual_results"] = visual_results # results.append(result) # successful += 1 # logger.info(f"Attribute extraction successful for product {item_id}.") # except Exception as e: # failed += 1 # results.append({ # "product_id": item_id, # "error": str(e) # }) # # Original Error: AttributeError: 'list' object has no attribute 'items' # # This should now be fixed, but we keep the robust exception handling. # logger.exception(f"Error during attribute extraction for product {item_id}.") # logger.info(f"Batch extraction phase complete. Successful: {successful}, Failed: {failed}") # # -------------------------------------------------------------------------------- # # Generate and save the Excel file (Unchanged) # # -------------------------------------------------------------------------------- # wb = Workbook() # # Sheet 1: Products (from DB) # ws_products = wb.active # ws_products.title = "Products" # products_headers = ['ITEM ID', 'PRODUCT NAME', 'PRODUCT TYPE', 'Product Short Description', 'Product Long Description', 'image_path'] # header_fill = PatternFill(start_color="366092", end_color="366092", fill_type="solid") # header_font = Font(bold=True, color="FFFFFF") # for col_num, header in enumerate(products_headers, 1): # cell = ws_products.cell(row=1, column=col_num) # cell.value = header # cell.fill = header_fill # cell.font = header_font # cell.alignment = Alignment(horizontal="center", vertical="center") # all_products_db = Product.objects.all() # for row_num, product in enumerate(all_products_db, 2): # ws_products.cell(row=row_num, column=1, value=product.item_id) # ws_products.cell(row=row_num, column=2, value=product.product_name) # ws_products.cell(row=row_num, column=3, value=product.product_type) # ws_products.cell(row=row_num, column=4, value=product.product_short_description) # ws_products.cell(row=row_num, column=5, value=product.product_long_description) # ws_products.cell(row=row_num, column=6, value=product.image_path) # # Adjust column widths # for col_dim, width in zip(['A', 'B', 'C', 'D', 'E', 'F'], [15, 25, 15, 35, 50, 45]): # ws_products.column_dimensions[col_dim].width = width # # Sheet 2: Attribute_values (item_id, attribute_name, original_value, generated_value) # ws_attributes = wb.create_sheet("Attribute_values") # attributes_headers = ['item_id', 'attribute_name', 'original_value', 'generated_value'] # for col_num, header in enumerate(attributes_headers, 1): # cell = ws_attributes.cell(row=1, column=col_num) # cell.value = header # cell.fill = header_fill # cell.font = header_font # cell.alignment = Alignment(horizontal="center", vertical="center") # # Fetch all original attributes # row_num = 2 # all_original_attrs = ProductAttributeValue.objects.all() # # Create a lookup for original attributes by item_id and attribute_name # original_attrs_lookup = { # (attr.product.item_id, attr.attribute_name): attr.original_value # for attr in all_original_attrs # } # # Add attributes (original and generated) # processed_original_keys = set() # for res in results: # item_id = res["product_id"] # if "error" in res: # # Add existing original attributes for failed products to the sheet # for (orig_item_id, orig_attr_name), orig_value in original_attrs_lookup.items(): # if orig_item_id == item_id: # ws_attributes.cell(row=row_num, column=1, value=orig_item_id) # ws_attributes.cell(row=row_num, column=2, value=orig_attr_name) # ws_attributes.cell(row=row_num, column=3, value=orig_value) # ws_attributes.cell(row=row_num, column=4, value=f"Extraction Failed: {res['error']}") # processed_original_keys.add((orig_item_id, orig_attr_name)) # row_num += 1 # continue # # Combine all generated attributes (mandatory, additional, OCR, visual) # generated_attrs = {} # for cat in ["mandatory", "additional"]: # attrs = res.get(cat, {}) # for attr_name, values in attrs.items(): # for val in values: # key = (item_id, attr_name) # if key not in generated_attrs: # generated_attrs[key] = [] # generated_attrs[key].append(f"{val['value']} (source: {val['source']})") # # OCR extracted # ocr = res.get("ocr_results") # if ocr and "extracted_attributes" in ocr and isinstance(ocr["extracted_attributes"], dict): # for attr_name, values in ocr["extracted_attributes"].items(): # for val in values: # key = (item_id, attr_name) # if key not in generated_attrs: # generated_attrs[key] = [] # generated_attrs[key].append(f"{val['value']} (source: {val['source']})") # # Visual extracted # visual = res.get("visual_results") # if visual and "visual_attributes" in visual: # vis_attrs = visual["visual_attributes"] # if isinstance(vis_attrs, dict): # # Handle dict format where value might be a list of dicts or a single value # for attr_name, values in vis_attrs.items(): # if not isinstance(values, list): # values = [{"value": values, "source": "visual"}] # for val in values: # key = (item_id, attr_name) # if key not in generated_attrs: # generated_attrs[key] = [] # generated_attrs[key].append(f"{val['value']} (source: {val.get('source', 'visual')})") # elif isinstance(vis_attrs, list): # # Handle list of dicts format # for item in vis_attrs: # attr_name = item.get("attribute_name") or item.get("name") # if not attr_name: continue # value = item.get("value", "") # source = item.get("source", "visual") # key = (item_id, attr_name) # if key not in generated_attrs: # generated_attrs[key] = [] # generated_attrs[key].append(f"{value} (source: {source})") # # Write attributes to Excel # for (attr_item_id, attr_name), gen_values in generated_attrs.items(): # # Get original value from lookup (if it exists) # original_value = original_attrs_lookup.get((attr_item_id, attr_name), "") # # Combine multiple generated values into a single string # generated_value = "; ".join(gen_values) if gen_values else "" # # Write row # ws_attributes.cell(row=row_num, column=1, value=attr_item_id) # ws_attributes.cell(row=row_num, column=2, value=attr_name) # ws_attributes.cell(row=row_num, column=3, value=original_value) # ws_attributes.cell(row=row_num, column=4, value=generated_value) # processed_original_keys.add((attr_item_id, attr_name)) # row_num += 1 # # Add original attributes that have no generated values for this item_id # for (orig_item_id, orig_attr_name), orig_value in original_attrs_lookup.items(): # if orig_item_id == item_id and (orig_item_id, orig_attr_name) not in processed_original_keys: # ws_attributes.cell(row=row_num, column=1, value=orig_item_id) # ws_attributes.cell(row=row_num, column=2, value=orig_attr_name) # ws_attributes.cell(row=row_num, column=3, value=orig_value) # ws_attributes.cell(row=row_num, column=4, value="") # No generated value # processed_original_keys.add((orig_item_id, orig_attr_name)) # row_num += 1 # # Add original attributes for products not included in the 'results' (e.g. if they didn't exist in product_list) # # We assume all products are in product_list, so this step might be redundant, but safe for completeness. # for (orig_item_id, orig_attr_name), orig_value in original_attrs_lookup.items(): # if (orig_item_id, orig_attr_name) not in processed_original_keys: # ws_attributes.cell(row=row_num, column=1, value=orig_item_id) # ws_attributes.cell(row=row_num, column=2, value=orig_attr_name) # ws_attributes.cell(row=row_num, column=3, value=orig_value) # ws_attributes.cell(row=row_num, column=4, value="Original value only (Product not processed in batch)") # row_num += 1 # # Adjust column widths for attributes # for col_dim, width in zip(['A', 'B', 'C', 'D'], [15, 35, 50, 50]): # ws_attributes.column_dimensions[col_dim].width = width # # Save the generated Excel (replace existing) # save_path = os.path.join(settings.MEDIA_ROOT, 'generated_products.xlsx') # wb.save(save_path) # logger.info(f"Excel file successfully saved to {save_path}") # # Write SUCCESS status # write_status("SUCCESS") # logger.info("Background task finished successfully.") # except Exception as e: # # Log the critical error and write FAILED status # logger.exception("CRITICAL ERROR during background Excel generation process.") # write_status("FAILED", error_msg=str(e)) # # ------------------------------------------------------------------------------------------------- # class ProductUploadExcelView(APIView): # """ # POST API to upload an Excel file. (Unchanged) # """ # parser_classes = (MultiPartParser, FormParser) # def post(self, request, *args, **kwargs): # file_obj = request.FILES.get('file') # if not file_obj: # return Response({'error': 'No file provided'}, status=status.HTTP_400_BAD_REQUEST) # try: # # Read all sheets from Excel file # excel_file = pd.ExcelFile(file_obj) # # Check if required sheets exist # if 'Products' not in excel_file.sheet_names: # logger.error(f"Upload failed: Missing 'Products' sheet in file.") # return Response({ # 'error': "Missing 'Products' sheet", # 'available_sheets': excel_file.sheet_names # }, status=status.HTTP_400_BAD_REQUEST) # # Read Products sheet # df_products = pd.read_excel(excel_file, sheet_name='Products') # df_products.columns = [c.strip().lower().replace(' ', '_') for c in df_products.columns] # # Check required columns for Products # expected_product_cols = { # 'item_id', 'product_name', 'product_long_description', # 'product_short_description', 'product_type', 'image_path' # } # if not expected_product_cols.issubset(df_products.columns): # logger.error(f"Upload failed: Missing required columns in Products sheet.") # return Response({ # 'error': 'Missing required columns in Products sheet', # 'required_columns': list(expected_product_cols), # 'found_columns': list(df_products.columns) # }, status=status.HTTP_400_BAD_REQUEST) # # Read Attribute_values sheet if it exists # df_attributes = None # has_attributes_sheet = 'Attribute_values' in excel_file.sheet_names # if has_attributes_sheet: # df_attributes = pd.read_excel(excel_file, sheet_name='Attribute_values') # df_attributes.columns = [c.strip().lower().replace(' ', '_') for c in df_attributes.columns] # # Check required columns for Attribute_values # expected_attr_cols = {'item_id', 'attribute_name', 'original_value'} # if not expected_attr_cols.issubset(df_attributes.columns): # logger.error(f"Upload failed: Missing required columns in Attribute_values sheet.") # return Response({ # 'error': 'Missing required columns in Attribute_values sheet', # 'required_columns': list(expected_attr_cols), # 'found_columns': list(df_attributes.columns) # }, status=status.HTTP_400_BAD_REQUEST) # # Initialize counters # products_created = 0 # products_updated = 0 # attributes_created = 0 # attributes_updated = 0 # products_failed = 0 # attributes_failed = 0 # errors = [] # # Use transaction to ensure atomicity # with transaction.atomic(): # # Process Products sheet # for idx, row in df_products.iterrows(): # item_id = str(row.get('item_id', '')).strip() # product_type = str(row.get('product_type', '')).strip() # if not item_id: # products_failed += 1 # errors.append(f"Products Row {idx + 2}: Missing item_id") # continue # try: # # Auto-create ProductType if provided and doesn't exist # if product_type: # ProductType.objects.get_or_create(name=product_type) # defaults = { # 'product_name': str(row.get('product_name', '')), # 'product_long_description': str(row.get('product_long_description', '')), # 'product_short_description': str(row.get('product_short_description', '')), # 'product_type': product_type, # 'image_path': str(row.get('image_path', '')), # } # obj, created = Product.objects.update_or_create( # item_id=item_id, # defaults=defaults # ) # if created: # products_created += 1 # else: # products_updated += 1 # except Exception as e: # products_failed += 1 # errors.append(f"Products Row {idx + 2} (item_id: {item_id}): {str(e)}") # logger.error(f"Error processing product {item_id} in Products sheet: {e}") # # Process Attribute_values sheet if it exists # if has_attributes_sheet and df_attributes is not None: # # Group by item_id to optimize lookups # item_ids_in_attrs = df_attributes['item_id'].astype(str).unique() # # Fetch all products at once # existing_products = { # p.item_id: p # for p in Product.objects.filter(item_id__in=item_ids_in_attrs) # } # for idx, row in df_attributes.iterrows(): # item_id = str(row.get('item_id', '')).strip() # attribute_name = str(row.get('attribute_name', '')).strip() # original_value = str(row.get('original_value', '')).strip() # if not item_id or not attribute_name: # attributes_failed += 1 # errors.append( # f"Attribute_values Row {idx + 2}: Missing item_id or attribute_name" # ) # continue # # Check if product exists # product = existing_products.get(item_id) # if not product: # attributes_failed += 1 # errors.append( # f"Attribute_values Row {idx + 2}: Product with item_id '{item_id}' not found. " # "Make sure it exists in Products sheet." # ) # continue # try: # attr_obj, created = ProductAttributeValue.objects.update_or_create( # product=product, # attribute_name=attribute_name, # defaults={'original_value': original_value} # ) # if created: # attributes_created += 1 # else: # attributes_updated += 1 # except Exception as e: # attributes_failed += 1 # errors.append( # f"Attribute_values Row {idx + 2} " # f"(item_id: {item_id}, attribute: {attribute_name}): {str(e)}" # ) # logger.error(f"Error processing attribute {attribute_name} for product {item_id}: {e}") # # Prepare response data # response_data = { # 'message': 'Upload completed', # 'products': { # 'created': products_created, # 'updated': products_updated, # 'failed': products_failed, # 'total_processed': products_created + products_updated + products_failed # }, # 'attribute_values': { # 'created': attributes_created, # 'updated': attributes_updated, # 'failed': attributes_failed, # 'total_processed': attributes_created + attributes_updated + attributes_failed # } if has_attributes_sheet else {'message': 'Attribute_values sheet not found in Excel file'}, # 'generated_excel_status': 'Excel generation started in the background.' # } # if errors: # response_data['errors'] = errors[:50] # if len(errors) > 50: # response_data['errors'].append(f"... and {len(errors) - 50} more errors") # # Determine status code for upload # upload_status = status.HTTP_201_CREATED if products_failed == 0 and attributes_failed == 0 else status.HTTP_207_MULTI_STATUS # # Start background thread for Excel generation if upload was successful # if products_failed == 0 and attributes_failed == 0: # logger.info("API call successful. Triggering background Excel generation thread.") # threading.Thread(target=generate_product_excel_background, daemon=True).start() # # Update response to provide monitoring paths # response_data['generated_excel_status'] = 'Background Excel generation triggered successfully.' # response_data['monitoring'] = { # 'excel_file': os.path.join(settings.MEDIA_URL, 'generated_products.xlsx'), # 'status_file': os.path.join(settings.MEDIA_URL, 'excel_generation_status.json'), # 'log_file': os.path.join(settings.MEDIA_URL, 'excel_generation.log'), # 'note': 'These files will be available once the background process completes.' # } # else: # logger.warning(f"API call finished with errors ({products_failed} products, {attributes_failed} attributes). Not triggering background excel generation.") # response_data['generated_excel_status'] = 'Background Excel generation was NOT triggered due to upload errors. Fix upload errors and re-upload.' # return Response(response_data, status=upload_status) # except pd.errors.EmptyDataError: # logger.error('The uploaded Excel file is empty or invalid.') # return Response({ # 'error': 'The uploaded Excel file is empty or invalid' # }, status=status.HTTP_400_BAD_REQUEST) # except Exception as e: # logger.exception(f'An unexpected error occurred while processing the file.') # return Response({ # 'error': f'An unexpected error occurred while processing the file: {str(e)}' # }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) import logging import json from rest_framework.views import APIView from rest_framework.response import Response from rest_framework import status from rest_framework.parsers import MultiPartParser, FormParser from django.db import transaction from django.db.models import Prefetch import pandas as pd # Import ALL your models from .models import Product, ProductAttributeValue, ProductType, ProductAttribute, AttributePossibleValue from .services import ProductAttributeService from .ocr_service import OCRService from .visual_processing_service import VisualProcessingService from openpyxl import Workbook from openpyxl.styles import Font, PatternFill, Alignment from django.conf import settings import os import threading from datetime import datetime # --- Configuration for Generated Outputs Folder --- OUTPUT_FOLDER_NAME = 'generated_outputs' OUTPUT_ROOT = os.path.join(settings.MEDIA_ROOT, OUTPUT_FOLDER_NAME) OUTPUT_URL = os.path.join(settings.MEDIA_URL, OUTPUT_FOLDER_NAME).replace('\\', '/') # Use forward slashes for URL # Define log and status file paths within the new subfolder LOG_FILE_NAME = 'excel_generation.log' STATUS_FILE_NAME = 'excel_generation_status.json' EXCEL_FILE_NAME = 'generated_products.xlsx' LOG_FILE_PATH = os.path.join(OUTPUT_ROOT, LOG_FILE_NAME) STATUS_FILE_PATH = os.path.join(OUTPUT_ROOT, STATUS_FILE_NAME) EXCEL_FILE_PATH = os.path.join(OUTPUT_ROOT, EXCEL_FILE_NAME) # Ensure the OUTPUT_ROOT exists for files to be saved if not os.path.exists(OUTPUT_ROOT): os.makedirs(OUTPUT_ROOT) # Configure basic logging to the new path logging.basicConfig( filename=LOG_FILE_PATH, level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # ------------------------------------------------------------------------------------------------- def generate_product_excel_background(): """ Function to perform batch attribute extraction for all products and generate an Excel file. Runs in a background thread to avoid blocking the API response. Logs success/failure and saves a status file for external monitoring. """ logger.info(f"[{datetime.now().isoformat()}] Starting background product Excel generation and attribute extraction.") successful = 0 failed = 0 results = [] # To store detailed extraction results for Excel sheet 2 # Function to write status file (SUCCESS/FAILED) def write_status(status_type, error_msg=None): status_data = { "status": status_type, "timestamp": datetime.now().isoformat(), "products_processed": successful + failed, "products_successful": successful, "products_failed": failed, # FIX: Use the updated EXCEL_FILE_NAME and OUTPUT_URL "excel_path": os.path.join(OUTPUT_URL, EXCEL_FILE_NAME) if status_type == "SUCCESS" else None, "log_path": os.path.join(OUTPUT_URL, LOG_FILE_NAME), "error_message": error_msg } try: # FIX: STATUS_FILE_PATH is now inside generated_outputs with open(STATUS_FILE_PATH, 'w') as f: json.dump(status_data, f, indent=4) except Exception as e: logger.exception(f"CRITICAL ERROR: Failed to write status file at {STATUS_FILE_PATH}: {e}") try: # 1. PREFETCH all necessary related data to minimize database queries # Prefetch possible values for mandatory attributes possible_values_prefetch = Prefetch( 'attributes', queryset=ProductAttribute.objects.filter(is_mandatory=True).prefetch_related('possible_values') ) # Fetch all ProductTypes with their mandatory attributes and possible values all_product_types = ProductType.objects.prefetch_related(possible_values_prefetch) product_type_map = { pt.name: pt for pt in all_product_types } # Prepare product_list for batch extraction all_products = Product.objects.all() product_list = [] for p in all_products: mandatory_attrs_dict = {} product_type_name = p.product_type.strip() if p.product_type else None if product_type_name and product_type_name in product_type_map: pt = product_type_map[product_type_name] # Build the mandatory_attrs dictionary: { "Attribute Name": ["Value 1", "Value 2"], ... } for attr in pt.attributes.all(): mandatory_attrs_dict[attr.name] = [ pv.value for pv in attr.possible_values.all() ] product_list.append({ "item_id": p.item_id, "product_type_name": product_type_name, "mandatory_attrs": mandatory_attrs_dict }) # Batch settings (using defaults) model = "llama-3.1-8b-instant" extract_additional = True process_image = False multiple = [] threshold_abs = 0.65 margin = 0.15 use_dynamic_thresholds = True use_adaptive_margin = True use_semantic_clustering = True # Batch extraction logic item_ids = [p['item_id'] for p in product_list] products_queryset = Product.objects.filter(item_id__in=item_ids) product_map = {product.item_id: product for product in products_queryset} found_ids = set(product_map.keys()) for product_entry in product_list: item_id = product_entry['item_id'] mandatory_attrs = product_entry['mandatory_attrs'] if item_id not in found_ids: failed += 1 results.append({ "product_id": item_id, "error": "Product not found in database" }) logger.warning(f"Product {item_id} not found in database. Skipping extraction.") continue product = product_map[item_id] try: title = product.product_name short_desc = product.product_short_description long_desc = product.product_long_description image_url = product.image_path ocr_results = None ocr_text = None visual_results = None if process_image and image_url: logger.info(f"Processing image for product {item_id}...") # OCR Processing ocr_service = OCRService() ocr_results = ocr_service.process_image(image_url) if ocr_results and ocr_results.get("detected_text"): ocr_attrs = ProductAttributeService.extract_attributes_from_ocr( ocr_results, model ) ocr_results["extracted_attributes"] = ocr_attrs ocr_text = "\n".join([ f"{item['text']} (confidence: {item['confidence']:.2f})" for item in ocr_results["detected_text"] ]) # Visual Processing visual_service = VisualProcessingService() product_type_hint = product.product_type if product.product_type else None visual_results = visual_service.process_image(image_url, product_type_hint) if visual_results and visual_results.get('visual_attributes'): visual_results['visual_attributes'] = ProductAttributeService.format_visual_attributes( visual_results['visual_attributes'] ) logger.info(f"Image processing done for product {item_id}.") # Combine product text with source tracking product_text, source_map = ProductAttributeService.combine_product_text( title=title, short_desc=short_desc, long_desc=long_desc, ocr_text=ocr_text ) # Attribute Extraction with source tracking extracted = ProductAttributeService.extract_attributes( product_text=product_text, mandatory_attrs=mandatory_attrs, source_map=source_map, model=model, extract_additional=extract_additional, multiple=multiple, threshold_abs=threshold_abs, margin=margin, use_dynamic_thresholds=use_dynamic_thresholds, use_adaptive_margin=use_adaptive_margin, use_semantic_clustering=use_semantic_clustering ) result = { "product_id": item_id, "mandatory": extracted.get("mandatory", {}), "additional": extracted.get("additional", {}), } if ocr_results: result["ocr_results"] = ocr_results if visual_results: result["visual_results"] = visual_results results.append(result) successful += 1 logger.info(f"Attribute extraction successful for product {item_id}.") except Exception as e: failed += 1 results.append({ "product_id": item_id, "error": str(e) }) logger.exception(f"Error during attribute extraction for product {item_id}.") logger.info(f"Batch extraction phase complete. Successful: {successful}, Failed: {failed}") # -------------------------------------------------------------------------------- # Generate and save the Excel file # -------------------------------------------------------------------------------- wb = Workbook() # Sheet 1: Products (from DB) (Logic is the same, skipped for brevity) ws_products = wb.active ws_products.title = "Products" products_headers = ['ITEM ID', 'PRODUCT NAME', 'PRODUCT TYPE', 'Product Short Description', 'Product Long Description', 'image_path'] header_fill = PatternFill(start_color="366092", end_color="366092", fill_type="solid") header_font = Font(bold=True, color="FFFFFF") for col_num, header in enumerate(products_headers, 1): cell = ws_products.cell(row=1, column=col_num) cell.value = header cell.fill = header_fill cell.font = header_font cell.alignment = Alignment(horizontal="center", vertical="center") all_products_db = Product.objects.all() for row_num, product in enumerate(all_products_db, 2): ws_products.cell(row=row_num, column=1, value=product.item_id) ws_products.cell(row=row_num, column=2, value=product.product_name) ws_products.cell(row=row_num, column=3, value=product.product_type) ws_products.cell(row=row_num, column=4, value=product.product_short_description) ws_products.cell(row=row_num, column=5, value=product.product_long_description) ws_products.cell(row=row_num, column=6, value=product.image_path) for col_dim, width in zip(['A', 'B', 'C', 'D', 'E', 'F'], [15, 25, 15, 35, 50, 45]): ws_products.column_dimensions[col_dim].width = width # Sheet 2: Attribute_values (Logic is the same, skipped for brevity) ws_attributes = wb.create_sheet("Attribute_values") attributes_headers = ['item_id', 'attribute_name', 'original_value', 'generated_value'] for col_num, header in enumerate(attributes_headers, 1): cell = ws_attributes.cell(row=1, column=col_num) cell.value = header cell.fill = header_fill cell.font = header_font cell.alignment = Alignment(horizontal="center", vertical="center") row_num = 2 all_original_attrs = ProductAttributeValue.objects.all() original_attrs_lookup = { (attr.product.item_id, attr.attribute_name): attr.original_value for attr in all_original_attrs } processed_original_keys = set() for res in results: # ... (Excel writing logic for generated/original attributes remains unchanged) item_id = res["product_id"] if "error" in res: for (orig_item_id, orig_attr_name), orig_value in original_attrs_lookup.items(): if orig_item_id == item_id: ws_attributes.cell(row=row_num, column=1, value=orig_item_id) ws_attributes.cell(row=row_num, column=2, value=orig_attr_name) ws_attributes.cell(row=row_num, column=3, value=orig_value) ws_attributes.cell(row=row_num, column=4, value=f"Extraction Failed: {res['error']}") processed_original_keys.add((orig_item_id, orig_attr_name)) row_num += 1 continue generated_attrs = {} for cat in ["mandatory", "additional"]: attrs = res.get(cat, {}) for attr_name, values in attrs.items(): for val in values: key = (item_id, attr_name) if key not in generated_attrs: generated_attrs[key] = [] generated_attrs[key].append(f"{val['value']} (source: {val['source']})") ocr = res.get("ocr_results") if ocr and "extracted_attributes" in ocr and isinstance(ocr["extracted_attributes"], dict): for attr_name, values in ocr["extracted_attributes"].items(): for val in values: key = (item_id, attr_name) if key not in generated_attrs: generated_attrs[key] = [] generated_attrs[key].append(f"{val['value']} (source: {val['source']})") visual = res.get("visual_results") if visual and "visual_attributes" in visual: vis_attrs = visual["visual_attributes"] if isinstance(vis_attrs, dict): for attr_name, values in vis_attrs.items(): if not isinstance(values, list): values = [{"value": values, "source": "visual"}] for val in values: key = (item_id, attr_name) if key not in generated_attrs: generated_attrs[key] = [] generated_attrs[key].append(f"{val['value']} (source: {val.get('source', 'visual')})") elif isinstance(vis_attrs, list): for item in vis_attrs: attr_name = item.get("attribute_name") or item.get("name") if not attr_name: continue value = item.get("value", "") source = item.get("source", "visual") key = (item_id, attr_name) if key not in generated_attrs: generated_attrs[key] = [] generated_attrs[key].append(f"{value} (source: {source})") for (attr_item_id, attr_name), gen_values in generated_attrs.items(): original_value = original_attrs_lookup.get((attr_item_id, attr_name), "") generated_value = "; ".join(gen_values) if gen_values else "" ws_attributes.cell(row=row_num, column=1, value=attr_item_id) ws_attributes.cell(row=row_num, column=2, value=attr_name) ws_attributes.cell(row=row_num, column=3, value=original_value) ws_attributes.cell(row=row_num, column=4, value=generated_value) processed_original_keys.add((attr_item_id, attr_name)) row_num += 1 for (orig_item_id, orig_attr_name), orig_value in original_attrs_lookup.items(): if orig_item_id == item_id and (orig_item_id, orig_attr_name) not in processed_original_keys: ws_attributes.cell(row=row_num, column=1, value=orig_item_id) ws_attributes.cell(row=row_num, column=2, value=orig_attr_name) ws_attributes.cell(row=row_num, column=3, value=orig_value) ws_attributes.cell(row=row_num, column=4, value="") processed_original_keys.add((orig_item_id, orig_attr_name)) row_num += 1 for (orig_item_id, orig_attr_name), orig_value in original_attrs_lookup.items(): if (orig_item_id, orig_attr_name) not in processed_original_keys: ws_attributes.cell(row=row_num, column=1, value=orig_item_id) ws_attributes.cell(row=row_num, column=2, value=orig_attr_name) ws_attributes.cell(row=row_num, column=3, value=orig_value) ws_attributes.cell(row=row_num, column=4, value="Original value only (Product not processed in batch)") row_num += 1 for col_dim, width in zip(['A', 'B', 'C', 'D'], [15, 35, 50, 50]): ws_attributes.column_dimensions[col_dim].width = width # FIX: Save to the new EXCEL_FILE_PATH wb.save(EXCEL_FILE_PATH) logger.info(f"Excel file successfully saved to {EXCEL_FILE_PATH}") # Write SUCCESS status write_status("SUCCESS") logger.info("Background task finished successfully.") except Exception as e: # Log the critical error and write FAILED status logger.exception("CRITICAL ERROR during background Excel generation process.") write_status("FAILED", error_msg=str(e)) # ------------------------------------------------------------------------------------------------- class ProductUploadExcelView(APIView): """ POST API to upload an Excel file. """ parser_classes = (MultiPartParser, FormParser) def post(self, request, *args, **kwargs): file_obj = request.FILES.get('file') if not file_obj: return Response({'error': 'No file provided'}, status=status.HTTP_400_BAD_REQUEST) try: # ... (Upload and DB processing logic remains unchanged) # Read all sheets from Excel file excel_file = pd.ExcelFile(file_obj) # Check if required sheets exist if 'Products' not in excel_file.sheet_names: logger.error(f"Upload failed: Missing 'Products' sheet in file.") return Response({ 'error': "Missing 'Products' sheet", 'available_sheets': excel_file.sheet_names }, status=status.HTTP_400_BAD_REQUEST) df_products = pd.read_excel(excel_file, sheet_name='Products') df_products.columns = [c.strip().lower().replace(' ', '_') for c in df_products.columns] expected_product_cols = { 'item_id', 'product_name', 'product_long_description', 'product_short_description', 'product_type', 'image_path' } if not expected_product_cols.issubset(df_products.columns): logger.error(f"Upload failed: Missing required columns in Products sheet.") return Response({ 'error': 'Missing required columns in Products sheet', 'required_columns': list(expected_product_cols), 'found_columns': list(df_products.columns) }, status=status.HTTP_400_BAD_REQUEST) df_attributes = None has_attributes_sheet = 'Attribute_values' in excel_file.sheet_names if has_attributes_sheet: df_attributes = pd.read_excel(excel_file, sheet_name='Attribute_values') df_attributes.columns = [c.strip().lower().replace(' ', '_') for c in df_attributes.columns] expected_attr_cols = {'item_id', 'attribute_name', 'original_value'} if not expected_attr_cols.issubset(df_attributes.columns): logger.error(f"Upload failed: Missing required columns in Attribute_values sheet.") return Response({ 'error': 'Missing required columns in Attribute_values sheet', 'required_columns': list(expected_attr_cols), 'found_columns': list(df_attributes.columns) }, status=status.HTTP_400_BAD_REQUEST) products_created = 0 products_updated = 0 attributes_created = 0 attributes_updated = 0 products_failed = 0 attributes_failed = 0 errors = [] with transaction.atomic(): for idx, row in df_products.iterrows(): item_id = str(row.get('item_id', '')).strip() product_type = str(row.get('product_type', '')).strip() if not item_id: products_failed += 1 errors.append(f"Products Row {idx + 2}: Missing item_id") continue try: if product_type: ProductType.objects.get_or_create(name=product_type) defaults = { 'product_name': str(row.get('product_name', '')), 'product_long_description': str(row.get('product_long_description', '')), 'product_short_description': str(row.get('product_short_description', '')), 'product_type': product_type, 'image_path': str(row.get('image_path', '')), } obj, created = Product.objects.update_or_create(item_id=item_id, defaults=defaults) if created: products_created += 1 else: products_updated += 1 except Exception as e: products_failed += 1 errors.append(f"Products Row {idx + 2} (item_id: {item_id}): {str(e)}") logger.error(f"Error processing product {item_id} in Products sheet: {e}") if has_attributes_sheet and df_attributes is not None: item_ids_in_attrs = df_attributes['item_id'].astype(str).unique() existing_products = {p.item_id: p for p in Product.objects.filter(item_id__in=item_ids_in_attrs)} for idx, row in df_attributes.iterrows(): item_id = str(row.get('item_id', '')).strip() attribute_name = str(row.get('attribute_name', '')).strip() original_value = str(row.get('original_value', '')).strip() if not item_id or not attribute_name: attributes_failed += 1 errors.append(f"Attribute_values Row {idx + 2}: Missing item_id or attribute_name") continue product = existing_products.get(item_id) if not product: attributes_failed += 1 errors.append(f"Attribute_values Row {idx + 2}: Product with item_id '{item_id}' not found. Make sure it exists in Products sheet.") continue try: attr_obj, created = ProductAttributeValue.objects.update_or_create( product=product, attribute_name=attribute_name, defaults={'original_value': original_value} ) if created: attributes_created += 1 else: attributes_updated += 1 except Exception as e: attributes_failed += 1 errors.append(f"Attribute_values Row {idx + 2} (item_id: {item_id}, attribute: {attribute_name}): {str(e)}") logger.error(f"Error processing attribute {attribute_name} for product {item_id}: {e}") # Prepare response data response_data = { 'message': 'Upload completed', 'products': { 'created': products_created, 'updated': products_updated, 'failed': products_failed, 'total_processed': products_created + products_updated + products_failed }, 'attribute_values': { 'created': attributes_created, 'updated': attributes_updated, 'failed': attributes_failed, 'total_processed': attributes_created + attributes_updated + attributes_failed } if has_attributes_sheet else {'message': 'Attribute_values sheet not found in Excel file'}, 'generated_excel_status': 'Excel generation started in the background.' } if errors: response_data['errors'] = errors[:50] if len(errors) > 50: response_data['errors'].append(f"... and {len(errors) - 50} more errors") upload_status = status.HTTP_201_CREATED if products_failed == 0 and attributes_failed == 0 else status.HTTP_207_MULTI_STATUS # Start background thread for Excel generation if upload was successful if products_failed == 0 and attributes_failed == 0: logger.info("API call successful. Triggering background Excel generation thread.") threading.Thread(target=generate_product_excel_background, daemon=True).start() # FIX: Update monitoring URLs to point to the new generated_outputs subfolder response_data['generated_excel_status'] = 'Background Excel generation triggered successfully.' response_data['monitoring'] = { 'excel_file': os.path.join(OUTPUT_URL, EXCEL_FILE_NAME), 'status_file': os.path.join(OUTPUT_URL, STATUS_FILE_NAME), 'log_file': os.path.join(OUTPUT_URL, LOG_FILE_NAME), 'note': 'These files will be available once the background process completes.' } else: logger.warning(f"API call finished with errors ({products_failed} products, {attributes_failed} attributes). Not triggering background excel generation.") response_data['generated_excel_status'] = 'Background Excel generation was NOT triggered due to upload errors. Fix upload errors and re-upload.' return Response(response_data, status=upload_status) except pd.errors.EmptyDataError: logger.error('The uploaded Excel file is empty or invalid.') return Response({'error': 'The uploaded Excel file is empty or invalid'}, status=status.HTTP_400_BAD_REQUEST) except Exception as e: logger.exception(f'An unexpected error occurred while processing the file.') return Response({'error': f'An unexpected error occurred while processing the file: {str(e)}'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) # Add this view to your views.py for downloading a template from django.http import HttpResponse from openpyxl import Workbook from openpyxl.styles import Font, PatternFill, Alignment from rest_framework.views import APIView import io class DownloadExcelTemplateView(APIView): """ GET API to download an Excel template with two sheets: 1. Products sheet with sample data 2. Attribute_values sheet with sample data """ def get(self, request): # Create a new workbook wb = Workbook() # Remove default sheet if 'Sheet' in wb.sheetnames: wb.remove(wb['Sheet']) # ===== Create Products Sheet ===== ws_products = wb.create_sheet("Products", 0) # Define headers for Products products_headers = [ 'ITEM ID', 'PRODUCT NAME', 'PRODUCT TYPE', 'Product Short Description', 'Product Long Description', 'image_path' ] # Style for headers header_fill = PatternFill(start_color="366092", end_color="366092", fill_type="solid") header_font = Font(bold=True, color="FFFFFF") # Add headers to Products sheet for col_num, header in enumerate(products_headers, 1): cell = ws_products.cell(row=1, column=col_num) cell.value = header cell.fill = header_fill cell.font = header_font cell.alignment = Alignment(horizontal="center", vertical="center") # Add sample data to Products sheet sample_products = [ [ '3217373735', 'Blue V-Neck T-Shirt', 'Clothing', 'Stylish blue t-shirt with v-neck design', 'Premium quality cotton t-shirt featuring a classic v-neck design. Perfect for casual wear. Available in vibrant blue color.', 'https://images.unsplash.com/photo-1521572163474-6864f9cf17ab' ], [ '1234567890', 'Red Cotton Dress', 'Clothing', 'Beautiful red dress for special occasions', 'Elegant red dress made from 100% cotton fabric. Features a flowing design perfect for summer events and parties.', 'https://images.unsplash.com/photo-1595777457583-95e059d581b8' ], [ '9876543210', 'Steel Screws Pack', 'Hardware', 'Pack of zinc plated steel screws', 'Professional grade steel screws with zinc plating for corrosion resistance. Pack contains 50 pieces, 2 inch length, M6 thread size.', 'https://images.unsplash.com/photo-1542272604-787c3835535d' ] ] for row_num, row_data in enumerate(sample_products, 2): for col_num, value in enumerate(row_data, 1): ws_products.cell(row=row_num, column=col_num, value=value) # Adjust column widths for Products sheet ws_products.column_dimensions['A'].width = 15 # ITEM ID ws_products.column_dimensions['B'].width = 25 # PRODUCT NAME ws_products.column_dimensions['C'].width = 15 # PRODUCT TYPE ws_products.column_dimensions['D'].width = 35 # Short Description ws_products.column_dimensions['E'].width = 50 # Long Description ws_products.column_dimensions['F'].width = 45 # image_path # ===== Create Attribute_values Sheet ===== ws_attributes = wb.create_sheet("Attribute_values", 1) # Define headers for Attribute_values attributes_headers = ['item_id', 'attribute_name', 'original_value'] # Add headers to Attribute_values sheet for col_num, header in enumerate(attributes_headers, 1): cell = ws_attributes.cell(row=1, column=col_num) cell.value = header cell.fill = header_fill cell.font = header_font cell.alignment = Alignment(horizontal="center", vertical="center") # Add sample data to Attribute_values sheet sample_attributes = [ ['3217373735', 'Clothing Neck Style', 'V-Neck Square'], ['3217373735', 'Condition', 'New with tags'], ['3217373735', 'Material', '100% Cotton'], ['3217373735', 'Color', 'Sky Blue'], ['3217373735', 'Size', 'Medium'], ['1234567890', 'Sleeve Length', 'Sleeveless'], ['1234567890', 'Condition', 'Brand New'], ['1234567890', 'Pattern', 'Solid'], ['1234567890', 'Material', 'Cotton Blend'], ['1234567890', 'Color', 'Crimson Red'], ['9876543210', 'Material', 'Stainless Steel'], ['9876543210', 'Thread Size', 'M6'], ['9876543210', 'Length', '2 inches'], ['9876543210', 'Coating', 'Zinc Plated'], ['9876543210', 'Package Quantity', '50 pieces'], ] for row_num, row_data in enumerate(sample_attributes, 2): for col_num, value in enumerate(row_data, 1): ws_attributes.cell(row=row_num, column=col_num, value=value) # Adjust column widths for Attribute_values sheet ws_attributes.column_dimensions['A'].width = 15 # item_id ws_attributes.column_dimensions['B'].width = 25 # attribute_name ws_attributes.column_dimensions['C'].width = 30 # original_value # Add instructions sheet ws_instructions = wb.create_sheet("Instructions", 2) instructions_text = [ ['Excel Upload Instructions', ''], ['', ''], ['Sheet 1: Products', ''], ['- Contains product basic information', ''], ['- All columns are required', ''], ['- ITEM ID must be unique', ''], ['', ''], ['Sheet 2: Attribute_values', ''], ['- Contains original/manual attribute values', ''], ['- item_id must match an ITEM ID from Products sheet', ''], ['- Multiple rows can have the same item_id (for different attributes)', ''], ['- Each attribute per product should be on a separate row', ''], ['', ''], ['Upload Process:', ''], ['1. Fill in your product data in the Products sheet', ''], ['2. Fill in attribute values in the Attribute_values sheet', ''], ['3. Ensure item_id values match between both sheets', ''], ['4. Save the file and upload via API', ''], ['', ''], ['Notes:', ''], ['- Do not change sheet names (must be "Products" and "Attribute_values")', ''], ['- Do not change column header names', ''], ['- You can delete the sample data rows', ''], ['- You can delete this Instructions sheet before uploading', ''], ] for row_num, row_data in enumerate(instructions_text, 1): ws_instructions.cell(row=row_num, column=1, value=row_data[0]) if row_num == 1: cell = ws_instructions.cell(row=row_num, column=1) cell.font = Font(bold=True, size=14) ws_instructions.column_dimensions['A'].width = 60 # Save to BytesIO output = io.BytesIO() wb.save(output) output.seek(0) # Create response response = HttpResponse( output.getvalue(), content_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet' ) response['Content-Disposition'] = 'attachment; filename=product_upload_template.xlsx' return response class DownloadProductsWithAttributesExcelView(APIView): """ GET API to download existing products with their attribute values as Excel. Useful for users to update existing data. """ def get(self, request): from .models import Product, ProductAttributeValue # Create workbook wb = Workbook() if 'Sheet' in wb.sheetnames: wb.remove(wb['Sheet']) # ===== Products Sheet ===== ws_products = wb.create_sheet("Products", 0) # Headers products_headers = [ 'ITEM ID', 'PRODUCT NAME', 'PRODUCT TYPE', 'Product Short Description', 'Product Long Description', 'image_path' ] header_fill = PatternFill(start_color="366092", end_color="366092", fill_type="solid") header_font = Font(bold=True, color="FFFFFF") for col_num, header in enumerate(products_headers, 1): cell = ws_products.cell(row=1, column=col_num) cell.value = header cell.fill = header_fill cell.font = header_font cell.alignment = Alignment(horizontal="center", vertical="center") # Fetch and add product data products = Product.objects.all() for row_num, product in enumerate(products, 2): ws_products.cell(row=row_num, column=1, value=product.item_id) ws_products.cell(row=row_num, column=2, value=product.product_name) ws_products.cell(row=row_num, column=3, value=product.product_type) ws_products.cell(row=row_num, column=4, value=product.product_short_description) ws_products.cell(row=row_num, column=5, value=product.product_long_description) ws_products.cell(row=row_num, column=6, value=product.image_path) # Adjust widths ws_products.column_dimensions['A'].width = 15 ws_products.column_dimensions['B'].width = 25 ws_products.column_dimensions['C'].width = 15 ws_products.column_dimensions['D'].width = 35 ws_products.column_dimensions['E'].width = 50 ws_products.column_dimensions['F'].width = 45 # ===== Attribute_values Sheet ===== ws_attributes = wb.create_sheet("Attribute_values", 1) attributes_headers = ['item_id', 'attribute_name', 'original_value'] for col_num, header in enumerate(attributes_headers, 1): cell = ws_attributes.cell(row=1, column=col_num) cell.value = header cell.fill = header_fill cell.font = header_font cell.alignment = Alignment(horizontal="center", vertical="center") # Fetch and add attribute values attributes = ProductAttributeValue.objects.select_related('product').all() for row_num, attr in enumerate(attributes, 2): ws_attributes.cell(row=row_num, column=1, value=attr.product.item_id) ws_attributes.cell(row=row_num, column=2, value=attr.attribute_name) ws_attributes.cell(row=row_num, column=3, value=attr.original_value) ws_attributes.column_dimensions['A'].width = 15 ws_attributes.column_dimensions['B'].width = 25 ws_attributes.column_dimensions['C'].width = 30 # Save to BytesIO output = io.BytesIO() wb.save(output) output.seek(0) response = HttpResponse( output.getvalue(), content_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet' ) response['Content-Disposition'] = 'attachment; filename=products_export.xlsx' return response class ProductAttributesUploadView(APIView): """ POST API to upload an Excel file and add mandatory/additional attributes for product types with possible values. """ parser_classes = (MultiPartParser, FormParser) def post(self, request): file_obj = request.FILES.get('file') if not file_obj: return Response({"error": "No file provided."}, status=status.HTTP_400_BAD_REQUEST) try: df = pd.read_excel(file_obj) required_columns = {'product_type', 'attribute_name', 'is_mandatory', 'possible_values'} if not required_columns.issubset(df.columns): return Response({ "error": f"Missing required columns. Found: {list(df.columns)}" }, status=status.HTTP_400_BAD_REQUEST) for _, row in df.iterrows(): product_type_name = str(row['product_type']).strip() attr_name = str(row['attribute_name']).strip() is_mandatory = str(row['is_mandatory']).strip().lower() in ['yes', 'true', '1'] possible_values = str(row.get('possible_values', '')).strip() # Get or create product type product_type, _ = ProductType.objects.get_or_create(name=product_type_name) # Get or create attribute attribute, _ = ProductAttribute.objects.get_or_create( product_type=product_type, name=attr_name, defaults={'is_mandatory': is_mandatory} ) attribute.is_mandatory = is_mandatory attribute.save() # Handle possible values AttributePossibleValue.objects.filter(attribute=attribute).delete() if possible_values: for val in [v.strip() for v in possible_values.split(',') if v.strip()]: AttributePossibleValue.objects.create(attribute=attribute, value=val) return Response({"message": "Attributes uploaded successfully."}, status=status.HTTP_201_CREATED) except Exception as e: return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) class ProductTypeAttributesView(APIView): """ API to view, create, update, and delete product type attributes and their possible values. Also supports dynamic product type creation. """ def get(self, request): """ Retrieve all product types with their attributes and possible values. """ product_types = ProductType.objects.all() serializer = ProductTypeSerializer(product_types, many=True) # Transform the serialized data into the requested format result = [] for pt in serializer.data: for attr in pt['attributes']: result.append({ 'product_type': pt['name'], 'attribute_name': attr['name'], 'is_mandatory': 'Yes' if attr['is_mandatory'] else 'No', 'possible_values': ', '.join([pv['value'] for pv in attr['possible_values']]) }) return Response(result, status=status.HTTP_200_OK) def post(self, request): """ Create a new product type or attribute with possible values. Expected payload example: { "product_type": "Hardware Screws", "attribute_name": "Material", "is_mandatory": "Yes", "possible_values": "Steel, Zinc Plated, Stainless Steel" } """ try: product_type_name = request.data.get('product_type') attribute_name = request.data.get('attribute_name', '') is_mandatory = request.data.get('is_mandatory', '').lower() in ['yes', 'true', '1'] possible_values = request.data.get('possible_values', '') if not product_type_name: return Response({ "error": "product_type is required" }, status=status.HTTP_400_BAD_REQUEST) with transaction.atomic(): # Get or create product type product_type, created = ProductType.objects.get_or_create(name=product_type_name) if created and not attribute_name: return Response({ "message": f"Product type '{product_type_name}' created successfully", "data": {"product_type": product_type_name} }, status=status.HTTP_201_CREATED) if attribute_name: # Create attribute attribute, attr_created = ProductAttribute.objects.get_or_create( product_type=product_type, name=attribute_name, defaults={'is_mandatory': is_mandatory} ) if not attr_created: return Response({ "error": f"Attribute '{attribute_name}' already exists for product type '{product_type_name}'" }, status=status.HTTP_400_BAD_REQUEST) # Handle possible values if possible_values: for val in [v.strip() for v in possible_values.split(',') if v.strip()]: AttributePossibleValue.objects.create(attribute=attribute, value=val) return Response({ "message": "Attribute created successfully", "data": { "product_type": product_type_name, "attribute_name": attribute_name, "is_mandatory": "Yes" if is_mandatory else "No", "possible_values": possible_values } }, status=status.HTTP_201_CREATED) return Response({ "message": f"Product type '{product_type_name}' already exists", "data": {"product_type": product_type_name} }, status=status.HTTP_200_OK) except Exception as e: return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) def put(self, request): """ Update an existing product type attribute and its possible values. Expected payload example: { "product_type": "Hardware Screws", "attribute_name": "Material", "is_mandatory": "Yes", "possible_values": "Steel, Zinc Plated, Stainless Steel, Brass" } """ try: product_type_name = request.data.get('product_type') attribute_name = request.data.get('attribute_name') is_mandatory = request.data.get('is_mandatory', '').lower() in ['yes', 'true', '1'] possible_values = request.data.get('possible_values', '') if not all([product_type_name, attribute_name]): return Response({ "error": "product_type and attribute_name are required" }, status=status.HTTP_400_BAD_REQUEST) with transaction.atomic(): try: product_type = ProductType.objects.get(name=product_type_name) attribute = ProductAttribute.objects.get( product_type=product_type, name=attribute_name ) except ProductType.DoesNotExist: return Response({ "error": f"Product type '{product_type_name}' not found" }, status=status.HTTP_404_NOT_FOUND) except ProductAttribute.DoesNotExist: return Response({ "error": f"Attribute '{attribute_name}' not found for product type '{product_type_name}'" }, status=status.HTTP_404_NOT_FOUND) # Update attribute attribute.is_mandatory = is_mandatory attribute.save() # Update possible values AttributePossibleValue.objects.filter(attribute=attribute).delete() if possible_values: for val in [v.strip() for v in possible_values.split(',') if v.strip()]: AttributePossibleValue.objects.create(attribute=attribute, value=val) return Response({ "message": "Attribute updated successfully", "data": { "product_type": product_type_name, "attribute_name": attribute_name, "is_mandatory": "Yes" if is_mandatory else "No", "possible_values": possible_values } }, status=status.HTTP_200_OK) except Exception as e: return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) def delete(self, request): """ Delete a product type or a specific attribute. Expected payload example: { "product_type": "Hardware Screws", "attribute_name": "Material" } """ try: product_type_name = request.data.get('product_type') attribute_name = request.data.get('attribute_name', '') if not product_type_name: return Response({ "error": "product_type is required" }, status=status.HTTP_400_BAD_REQUEST) with transaction.atomic(): try: product_type = ProductType.objects.get(name=product_type_name) except ProductType.DoesNotExist: return Response({ "error": f"Product type '{product_type_name}' not found" }, status=status.HTTP_404_NOT_FOUND) if attribute_name: # Delete specific attribute try: attribute = ProductAttribute.objects.get( product_type=product_type, name=attribute_name ) attribute.delete() return Response({ "message": f"Attribute '{attribute_name}' deleted successfully from product type '{product_type_name}'" }, status=status.HTTP_200_OK) except ProductAttribute.DoesNotExist: return Response({ "error": f"Attribute '{attribute_name}' not found for product type '{product_type_name}'" }, status=status.HTTP_404_NOT_FOUND) else: # Delete entire product type product_type.delete() return Response({ "message": f"Product type '{product_type_name}' and all its attributes deleted successfully" }, status=status.HTTP_200_OK) except Exception as e: return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) class ProductTypeListView(APIView): """ GET API to list all product types (only names). """ def get(self, request): product_types = ProductType.objects.values_list('name', flat=True) return Response({"product_types": list(product_types)}, status=status.HTTP_200_OK) # Add these views to your views.py from rest_framework.views import APIView from rest_framework.response import Response from rest_framework import status from rest_framework.parsers import MultiPartParser, FormParser from django.db import transaction import pandas as pd from .models import Product, ProductAttributeValue from .serializers import ( ProductAttributeValueSerializer, ProductAttributeValueInputSerializer, BulkProductAttributeValueSerializer, ProductWithAttributesSerializer ) class ProductAttributeValueView(APIView): """ API to manage manually entered original attribute values. GET: Retrieve all attribute values for a product POST: Create or update attribute values for a product DELETE: Delete attribute values """ def get(self, request): """ Get original attribute values for a specific product or all products. Query params: item_id (optional) """ item_id = request.query_params.get('item_id') if item_id: try: product = Product.objects.get(item_id=item_id) values = ProductAttributeValue.objects.filter(product=product) serializer = ProductAttributeValueSerializer(values, many=True) return Response({ "item_id": item_id, "attributes": serializer.data }, status=status.HTTP_200_OK) except Product.DoesNotExist: return Response({ "error": f"Product with item_id '{item_id}' not found" }, status=status.HTTP_404_NOT_FOUND) else: # Return all attribute values grouped by product values = ProductAttributeValue.objects.all().select_related('product') serializer = ProductAttributeValueSerializer(values, many=True) return Response(serializer.data, status=status.HTTP_200_OK) def post(self, request): """ Create or update original attribute value for a product. Expected payload: { "item_id": "3217373735", "attribute_name": "Clothing Neck Style", "original_value": "V-Neck Square" } """ serializer = ProductAttributeValueInputSerializer(data=request.data) if not serializer.is_valid(): return Response({"error": serializer.errors}, status=status.HTTP_400_BAD_REQUEST) validated_data = serializer.validated_data item_id = validated_data['item_id'] attribute_name = validated_data['attribute_name'] original_value = validated_data['original_value'] try: product = Product.objects.get(item_id=item_id) except Product.DoesNotExist: return Response({ "error": f"Product with item_id '{item_id}' not found" }, status=status.HTTP_404_NOT_FOUND) # Create or update the attribute value attr_value, created = ProductAttributeValue.objects.update_or_create( product=product, attribute_name=attribute_name, defaults={'original_value': original_value} ) response_serializer = ProductAttributeValueSerializer(attr_value) return Response({ "message": "Attribute value created" if created else "Attribute value updated", "data": response_serializer.data }, status=status.HTTP_201_CREATED if created else status.HTTP_200_OK) def delete(self, request): """ Delete original attribute value(s). Expected payload: { "item_id": "3217373735", "attribute_name": "Clothing Neck Style" # Optional, if not provided deletes all for product } """ item_id = request.data.get('item_id') attribute_name = request.data.get('attribute_name') if not item_id: return Response({ "error": "item_id is required" }, status=status.HTTP_400_BAD_REQUEST) try: product = Product.objects.get(item_id=item_id) except Product.DoesNotExist: return Response({ "error": f"Product with item_id '{item_id}' not found" }, status=status.HTTP_404_NOT_FOUND) if attribute_name: # Delete specific attribute deleted_count, _ = ProductAttributeValue.objects.filter( product=product, attribute_name=attribute_name ).delete() if deleted_count == 0: return Response({ "error": f"Attribute '{attribute_name}' not found for product '{item_id}'" }, status=status.HTTP_404_NOT_FOUND) return Response({ "message": f"Attribute '{attribute_name}' deleted successfully" }, status=status.HTTP_200_OK) else: # Delete all attributes for product deleted_count, _ = ProductAttributeValue.objects.filter(product=product).delete() return Response({ "message": f"Deleted {deleted_count} attribute(s) for product '{item_id}'" }, status=status.HTTP_200_OK) class BulkProductAttributeValueView(APIView): """ API for bulk operations on original attribute values. POST: Create/update multiple attribute values at once """ def post(self, request): """ Bulk create or update attribute values for multiple products. Expected payload: { "products": [ { "item_id": "3217373735", "attributes": { "Clothing Neck Style": "V-Neck Square", "Condition": "New with tags" } }, { "item_id": "1234567890", "attributes": { "Material": "Cotton", "Size": "L" } } ] } """ products_data = request.data.get('products', []) if not products_data: return Response({ "error": "products list is required" }, status=status.HTTP_400_BAD_REQUEST) results = [] successful = 0 failed = 0 with transaction.atomic(): for product_data in products_data: serializer = BulkProductAttributeValueSerializer(data=product_data) if not serializer.is_valid(): failed += 1 results.append({ "item_id": product_data.get('item_id'), "status": "failed", "error": serializer.errors }) continue validated_data = serializer.validated_data item_id = validated_data['item_id'] attributes = validated_data['attributes'] try: product = Product.objects.get(item_id=item_id) created_count = 0 updated_count = 0 for attr_name, original_value in attributes.items(): _, created = ProductAttributeValue.objects.update_or_create( product=product, attribute_name=attr_name, defaults={'original_value': original_value} ) if created: created_count += 1 else: updated_count += 1 successful += 1 results.append({ "item_id": item_id, "status": "success", "created": created_count, "updated": updated_count }) except Product.DoesNotExist: failed += 1 results.append({ "item_id": item_id, "status": "failed", "error": f"Product not found" }) return Response({ "results": results, "total_products": len(products_data), "successful": successful, "failed": failed }, status=status.HTTP_200_OK) class ProductListWithAttributesView(APIView): """ GET API to list all products with their original attribute values. """ def get(self, request): item_id = request.query_params.get('item_id') if item_id: try: product = Product.objects.get(item_id=item_id) serializer = ProductWithAttributesSerializer(product) return Response(serializer.data, status=status.HTTP_200_OK) except Product.DoesNotExist: return Response({ "error": f"Product with item_id '{item_id}' not found" }, status=status.HTTP_404_NOT_FOUND) else: products = Product.objects.all() serializer = ProductWithAttributesSerializer(products, many=True) return Response(serializer.data, status=status.HTTP_200_OK)