"""REST API views for product attribute extraction and product-type management.

Contains the single/batch attribute-extraction endpoints, the product listing
and Excel upload endpoints, and CRUD endpoints for product types, their
attributes, and each attribute's possible values.
"""

import logging

import pandas as pd
from django.db import transaction
from rest_framework import status
from rest_framework.parsers import FormParser, MultiPartParser
from rest_framework.response import Response
from rest_framework.views import APIView

from .models import AttributePossibleValue, Product, ProductAttribute, ProductType
from .ocr_service import OCRService
from .serializers import (
    AttributePossibleValueSerializer,
    BatchProductRequestSerializer,
    BatchProductResponseSerializer,
    ProductAttributeResultSerializer,
    ProductAttributeSerializer,
    ProductSerializer,
    ProductTypeSerializer,
    SingleProductRequestSerializer,
)
from .services import ProductAttributeService

logger = logging.getLogger(__name__)

# String spellings accepted as "true" for the is_mandatory flag.
_TRUTHY_STRINGS = frozenset({'yes', 'true', '1'})


def _parse_flag(value):
    """Interpret *value* as a boolean flag.

    Strings are true when they spell ``yes``/``true``/``1`` (case-insensitive,
    surrounding whitespace ignored). Non-string values (JSON booleans, numbers,
    Excel cells read as ``1.0``) are true when numerically equal to 1.
    Anything else, including ``None`` and NaN, is false.
    """
    if isinstance(value, str):
        return value.strip().lower() in _TRUTHY_STRINGS
    try:
        return float(value) == 1
    except (TypeError, ValueError):
        return False


def _split_values(raw):
    """Return the stripped, non-empty entries of a comma-separated value list.

    Accepts a comma-separated string, a list/tuple of values, or an empty
    marker (``None`` / NaN from an empty spreadsheet cell), which yields ``[]``.
    """
    if isinstance(raw, (list, tuple)):
        parts = [str(item) for item in raw]
    elif raw is None or pd.isna(raw):
        # An empty Excel cell is NaN; str(NaN) would otherwise store "nan".
        parts = []
    else:
        parts = str(raw).split(',')
    return [part.strip() for part in parts if part.strip()]


def _add_possible_values(attribute, raw_values):
    """Create one AttributePossibleValue per entry parsed from *raw_values*."""
    for value in _split_values(raw_values):
        AttributePossibleValue.objects.create(attribute=attribute, value=value)


def _replace_possible_values(attribute, raw_values):
    """Delete the attribute's existing possible values and store the new ones."""
    AttributePossibleValue.objects.filter(attribute=attribute).delete()
    _add_possible_values(attribute, raw_values)


def _cell(row, column):
    """Return ``row[column]``, mapping a missing or NaN cell to ``''``."""
    value = row.get(column, '')
    return '' if pd.isna(value) else value


def _run_ocr(image_url, model):
    """Run OCR on *image_url* and summarise the detected text.

    Returns:
        ``(ocr_results, ocr_text)``. ``ocr_results`` is whatever
        ``OCRService.process_image`` returned; when it contains detected text
        it is augmented with an ``"extracted_attributes"`` entry. ``ocr_text``
        is one line per detected item with its confidence, or ``None`` when
        no text was detected.
    """
    ocr_results = OCRService().process_image(image_url)
    ocr_text = None
    if ocr_results and ocr_results.get("detected_text"):
        ocr_results["extracted_attributes"] = (
            ProductAttributeService.extract_attributes_from_ocr(ocr_results, model)
        )
        ocr_text = "\n".join(
            f"{item['text']} (confidence: {item['confidence']:.2f})"
            for item in ocr_results["detected_text"]
        )
    return ocr_results, ocr_text


def _extract_product_attributes(product, mandatory_attrs, model,
                                extract_additional, process_image):
    """Run the OCR (optional) and attribute-extraction pipeline for a product.

    Returns:
        ``(extracted, ocr_results)`` where ``extracted`` is the value returned
        by ``ProductAttributeService.extract_attributes`` and ``ocr_results``
        is ``None`` when OCR was skipped (disabled or no image path).
    """
    ocr_results = None
    ocr_text = None
    if process_image and product.image_path:
        ocr_results, ocr_text = _run_ocr(product.image_path, model)

    product_text = ProductAttributeService.combine_product_text(
        title=product.product_name,
        short_desc=product.product_short_description,
        long_desc=product.product_long_description,
        ocr_text=ocr_text,
    )
    extracted = ProductAttributeService.extract_attributes(
        product_text=product_text,
        mandatory_attrs=mandatory_attrs,
        model=model,
        extract_additional=extract_additional,
    )
    return extracted, ocr_results


def _validated_or_raw_response(serializer_class, payload):
    """Return a 200 response with serializer output, or the raw payload.

    The raw *payload* is returned unchanged when the response serializer
    rejects it, so a serializer mismatch never hides a computed result.
    """
    serializer = serializer_class(data=payload)
    if serializer.is_valid():
        return Response(serializer.data, status=status.HTTP_200_OK)
    return Response(payload, status=status.HTTP_200_OK)


def _server_error(exc):
    """Log the active exception with traceback and return a 500 response."""
    logger.exception("Unhandled error while processing request")
    return Response({"error": str(exc)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)


class ExtractProductAttributesView(APIView):
    """
    API endpoint to extract product attributes for a single product by item_id.
    Fetches product details from database.
    """

    def post(self, request):
        """Validate the request, load the product, and return its attributes.

        Responds 400 on invalid input, 404 when no product has the given
        item_id, and 200 with the extraction result otherwise.
        """
        serializer = SingleProductRequestSerializer(data=request.data)
        if not serializer.is_valid():
            return Response({"error": serializer.errors}, status=status.HTTP_400_BAD_REQUEST)

        validated_data = serializer.validated_data
        item_id = validated_data.get("item_id")

        try:
            product = Product.objects.get(item_id=item_id)
        except Product.DoesNotExist:
            return Response(
                {"error": f"Product with item_id '{item_id}' not found."},
                status=status.HTTP_404_NOT_FOUND
            )

        result, ocr_results = _extract_product_attributes(
            product,
            mandatory_attrs=validated_data["mandatory_attrs"],
            model=validated_data.get("model"),
            extract_additional=validated_data.get("extract_additional", True),
            process_image=validated_data.get("process_image", True),
        )

        # Attach OCR results if available
        if ocr_results:
            result["ocr_results"] = ocr_results

        return _validated_or_raw_response(ProductAttributeResultSerializer, result)


class BatchExtractProductAttributesView(APIView):
    """
    API endpoint to extract product attributes for multiple products in batch.
    Uses item-specific mandatory_attrs.
    """

    def post(self, request):
        """Extract attributes for every entry in ``products``.

        Each entry supplies its own ``item_id`` and ``mandatory_attrs``.
        A missing product or a failure while processing one product is
        recorded as an error entry for that item and does not abort the batch.
        """
        serializer = BatchProductRequestSerializer(data=request.data)
        if not serializer.is_valid():
            return Response({"error": serializer.errors}, status=status.HTTP_400_BAD_REQUEST)

        validated_data = serializer.validated_data
        product_list = validated_data.get("products", [])
        model = validated_data.get("model")
        extract_additional = validated_data.get("extract_additional", True)
        process_image = validated_data.get("process_image", True)

        # Fetch all requested products in one query and index them by item_id.
        item_ids = [entry['item_id'] for entry in product_list]
        product_map = {
            product.item_id: product
            for product in Product.objects.filter(item_id__in=item_ids)
        }

        results = []
        successful = 0
        failed = 0

        for entry in product_list:
            item_id = entry['item_id']
            mandatory_attrs = entry['mandatory_attrs']

            product = product_map.get(item_id)
            if product is None:
                failed += 1
                results.append({
                    "product_id": item_id,
                    "error": "Product not found in database"
                })
                continue

            try:
                extracted, ocr_results = _extract_product_attributes(
                    product,
                    mandatory_attrs=mandatory_attrs,
                    model=model,
                    extract_additional=extract_additional,
                    process_image=process_image,
                )
                result = {
                    "product_id": product.item_id,
                    "mandatory": extracted.get("mandatory", {}),
                    "additional": extracted.get("additional", {}),
                }
                if ocr_results:
                    result["ocr_results"] = ocr_results
            except Exception as e:
                # Per-item boundary: one bad product must not fail the batch,
                # but keep the traceback in the logs.
                logger.exception("Attribute extraction failed for item_id %r", item_id)
                failed += 1
                results.append({
                    "product_id": item_id,
                    "error": str(e)
                })
            else:
                results.append(result)
                successful += 1

        batch_result = {
            "results": results,
            "total_products": len(product_list),
            "successful": successful,
            "failed": failed
        }
        return _validated_or_raw_response(BatchProductResponseSerializer, batch_result)


class ProductListView(APIView):
    """
    GET API to list all products with details
    """

    def get(self, request):
        """Return every Product serialized with ProductSerializer."""
        serializer = ProductSerializer(Product.objects.all(), many=True)
        return Response(serializer.data, status=status.HTTP_200_OK)


class ProductUploadExcelView(APIView):
    """
    POST API to upload an Excel file and add data to Product model (skip duplicates)
    """
    parser_classes = (MultiPartParser, FormParser)

    def post(self, request, *args, **kwargs):
        """Create a Product for each new row of the uploaded Excel file.

        Column headers are normalised (trimmed, lower-cased, spaces to
        underscores). Rows whose item_id already exists are skipped as
        duplicates; rows with a blank item_id are skipped as invalid.
        Empty cells are stored as empty strings.
        """
        file_obj = request.FILES.get('file')
        if not file_obj:
            return Response({'error': 'No file provided'}, status=status.HTTP_400_BAD_REQUEST)

        try:
            df = pd.read_excel(file_obj)
            # str() guards against non-string headers (e.g. numeric columns).
            df.columns = [str(c).strip().lower().replace(' ', '_') for c in df.columns]

            expected_cols = {
                'item_id',
                'product_name',
                'product_long_description',
                'product_short_description',
                'product_type',
                'image_path'
            }
            if not expected_cols.issubset(df.columns):
                return Response({
                    'error': 'Missing required columns',
                    'required_columns': sorted(expected_cols)
                }, status=status.HTTP_400_BAD_REQUEST)

            created_count = 0
            skipped_count = 0
            invalid_count = 0

            for _, row in df.iterrows():
                item_id = _cell(row, 'item_id')
                if isinstance(item_id, str) and not item_id.strip():
                    # A row without an identifier cannot be a valid product.
                    invalid_count += 1
                    continue

                # Checked per row so duplicates inside the file are caught too.
                if Product.objects.filter(item_id=item_id).exists():
                    skipped_count += 1
                    continue

                Product.objects.create(
                    item_id=item_id,
                    product_name=_cell(row, 'product_name'),
                    product_long_description=_cell(row, 'product_long_description'),
                    product_short_description=_cell(row, 'product_short_description'),
                    product_type=_cell(row, 'product_type'),
                    image_path=_cell(row, 'image_path'),
                )
                created_count += 1

            payload = {
                'message': f'Successfully uploaded {created_count} products.',
                'skipped': f'Skipped {skipped_count} duplicates.'
            }
            if invalid_count:
                payload['invalid'] = f'Skipped {invalid_count} rows without item_id.'
            return Response(payload, status=status.HTTP_201_CREATED)

        except Exception as e:
            return _server_error(e)


class ProductAttributesUploadView(APIView):
    """
    POST API to upload an Excel file and add mandatory/additional attributes
    for product types with possible values.
    """
    parser_classes = (MultiPartParser, FormParser)

    def post(self, request):
        """Create or update product types and attributes from an Excel file.

        For each row the product type is created if needed, the attribute is
        created or has its mandatory flag updated, and the attribute's
        possible values are replaced by the row's comma-separated list.
        The whole file is applied in one transaction, so a failure leaves
        previously stored data untouched. Rows with a blank product_type or
        attribute_name are skipped.
        """
        file_obj = request.FILES.get('file')
        if not file_obj:
            return Response({"error": "No file provided."}, status=status.HTTP_400_BAD_REQUEST)

        try:
            df = pd.read_excel(file_obj)

            required_columns = {'product_type', 'attribute_name', 'is_mandatory', 'possible_values'}
            if not required_columns.issubset(df.columns):
                return Response({
                    "error": f"Missing required columns. Found: {list(df.columns)}"
                }, status=status.HTTP_400_BAD_REQUEST)

            with transaction.atomic():
                for _, row in df.iterrows():
                    if pd.isna(row['product_type']) or pd.isna(row['attribute_name']):
                        # str(NaN) would create a product type / attribute named "nan".
                        continue

                    product_type_name = str(row['product_type']).strip()
                    attr_name = str(row['attribute_name']).strip()
                    is_mandatory = _parse_flag(row['is_mandatory'])

                    product_type, _ = ProductType.objects.get_or_create(name=product_type_name)
                    attribute, _ = ProductAttribute.objects.update_or_create(
                        product_type=product_type,
                        name=attr_name,
                        defaults={'is_mandatory': is_mandatory}
                    )
                    _replace_possible_values(attribute, row.get('possible_values', ''))

            return Response({"message": "Attributes uploaded successfully."}, status=status.HTTP_201_CREATED)

        except Exception as e:
            return _server_error(e)


class ProductTypeAttributesView(APIView):
    """
    API to view, create, update, and delete product type attributes and their possible values.
    Also supports dynamic product type creation.
    """

    def get(self, request):
        """
        Retrieve all product types with their attributes and possible values.

        The response is a flat list with one entry per (product type,
        attribute) pair; possible values are joined into one ", "-separated
        string.
        """
        serializer = ProductTypeSerializer(ProductType.objects.all(), many=True)

        result = [
            {
                'product_type': pt['name'],
                'attribute_name': attr['name'],
                'is_mandatory': 'Yes' if attr['is_mandatory'] else 'No',
                'possible_values': ', '.join(pv['value'] for pv in attr['possible_values'])
            }
            for pt in serializer.data
            for attr in pt['attributes']
        ]
        return Response(result, status=status.HTTP_200_OK)

    def post(self, request):
        """
        Create a new product type or attribute with possible values.
        Expected payload example:
        {
            "product_type": "Hardware Screws",
            "attribute_name": "Material",  // Optional if only creating product type
            "is_mandatory": "Yes",  // Optional if only creating product type
            "possible_values": "Steel, Zinc Plated, Stainless Steel"  // Optional
        }

        ``is_mandatory`` may be a yes/true/1 string, a JSON boolean, or a
        number; ``possible_values`` may be a comma-separated string or a list.
        """
        try:
            product_type_name = request.data.get('product_type')
            attribute_name = request.data.get('attribute_name', '')
            # _parse_flag tolerates JSON booleans/null, which have no .lower().
            is_mandatory = _parse_flag(request.data.get('is_mandatory', ''))
            possible_values = request.data.get('possible_values', '')

            if not product_type_name:
                return Response({
                    "error": "product_type is required"
                }, status=status.HTTP_400_BAD_REQUEST)

            with transaction.atomic():
                product_type, created = ProductType.objects.get_or_create(name=product_type_name)

                if created and not attribute_name:
                    # Only the product type was requested and it is new.
                    return Response({
                        "message": f"Product type '{product_type_name}' created successfully",
                        "data": {"product_type": product_type_name}
                    }, status=status.HTTP_201_CREATED)

                if attribute_name:
                    attribute, attr_created = ProductAttribute.objects.get_or_create(
                        product_type=product_type,
                        name=attribute_name,
                        defaults={'is_mandatory': is_mandatory}
                    )

                    if not attr_created:
                        return Response({
                            "error": f"Attribute '{attribute_name}' already exists for product type '{product_type_name}'"
                        }, status=status.HTTP_400_BAD_REQUEST)

                    _add_possible_values(attribute, possible_values)

                    return Response({
                        "message": "Attribute created successfully",
                        "data": {
                            "product_type": product_type_name,
                            "attribute_name": attribute_name,
                            "is_mandatory": "Yes" if is_mandatory else "No",
                            "possible_values": possible_values
                        }
                    }, status=status.HTTP_201_CREATED)

                return Response({
                    "message": f"Product type '{product_type_name}' already exists",
                    "data": {"product_type": product_type_name}
                }, status=status.HTTP_200_OK)

        except Exception as e:
            return _server_error(e)

    def put(self, request):
        """
        Update an existing product type attribute and its possible values.
        Expected payload example:
        {
            "product_type": "Hardware Screws",
            "attribute_name": "Material",
            "is_mandatory": "Yes",
            "possible_values": "Steel, Zinc Plated, Stainless Steel, Brass"
        }

        The attribute's possible values are replaced, not merged.
        """
        try:
            product_type_name = request.data.get('product_type')
            attribute_name = request.data.get('attribute_name')
            # _parse_flag tolerates JSON booleans/null, which have no .lower().
            is_mandatory = _parse_flag(request.data.get('is_mandatory', ''))
            possible_values = request.data.get('possible_values', '')

            if not all([product_type_name, attribute_name]):
                return Response({
                    "error": "product_type and attribute_name are required"
                }, status=status.HTTP_400_BAD_REQUEST)

            with transaction.atomic():
                try:
                    product_type = ProductType.objects.get(name=product_type_name)
                    attribute = ProductAttribute.objects.get(
                        product_type=product_type,
                        name=attribute_name
                    )
                except ProductType.DoesNotExist:
                    return Response({
                        "error": f"Product type '{product_type_name}' not found"
                    }, status=status.HTTP_404_NOT_FOUND)
                except ProductAttribute.DoesNotExist:
                    return Response({
                        "error": f"Attribute '{attribute_name}' not found for product type '{product_type_name}'"
                    }, status=status.HTTP_404_NOT_FOUND)

                attribute.is_mandatory = is_mandatory
                attribute.save()

                _replace_possible_values(attribute, possible_values)

                return Response({
                    "message": "Attribute updated successfully",
                    "data": {
                        "product_type": product_type_name,
                        "attribute_name": attribute_name,
                        "is_mandatory": "Yes" if is_mandatory else "No",
                        "possible_values": possible_values
                    }
                }, status=status.HTTP_200_OK)

        except Exception as e:
            return _server_error(e)

    def delete(self, request):
        """
        Delete a product type or a specific attribute.
        Expected payload example:
        {
            "product_type": "Hardware Screws",
            "attribute_name": "Material"  // Optional: if omitted, deletes entire product type
        }
        """
        try:
            product_type_name = request.data.get('product_type')
            attribute_name = request.data.get('attribute_name', '')

            if not product_type_name:
                return Response({
                    "error": "product_type is required"
                }, status=status.HTTP_400_BAD_REQUEST)

            with transaction.atomic():
                try:
                    product_type = ProductType.objects.get(name=product_type_name)
                except ProductType.DoesNotExist:
                    return Response({
                        "error": f"Product type '{product_type_name}' not found"
                    }, status=status.HTTP_404_NOT_FOUND)

                if not attribute_name:
                    # No attribute given: remove the whole product type.
                    product_type.delete()
                    return Response({
                        "message": f"Product type '{product_type_name}' and all its attributes deleted successfully"
                    }, status=status.HTTP_200_OK)

                try:
                    attribute = ProductAttribute.objects.get(
                        product_type=product_type,
                        name=attribute_name
                    )
                except ProductAttribute.DoesNotExist:
                    return Response({
                        "error": f"Attribute '{attribute_name}' not found for product type '{product_type_name}'"
                    }, status=status.HTTP_404_NOT_FOUND)

                attribute.delete()
                return Response({
                    "message": f"Attribute '{attribute_name}' deleted successfully from product type '{product_type_name}'"
                }, status=status.HTTP_200_OK)

        except Exception as e:
            return _server_error(e)


class ProductTypeListView(APIView):
    """
    GET API to list all product types (only names).
    """

    def get(self, request):
        """Return ``{"product_types": [...]}`` with every product type name."""
        names = ProductType.objects.values_list('name', flat=True)
        return Response({"product_types": list(names)}, status=status.HTTP_200_OK)