# views.py (FULLY FIXED with comprehensive logging) from django.shortcuts import render, get_object_or_404 from django.http import HttpResponse, JsonResponse from django.views import View from django.core.cache import cache from django.db.models import Q from django.views.decorators.csrf import csrf_exempt from django.utils.decorators import method_decorator from django.conf import settings import json import logging # logging.basicConfig( # level=logging.INFO, # or DEBUG if you want more detail # format="%(asctime)s [%(levelname)s] %(name)s: %(message)s" # ) import os from core.services.image_scorer import ImageQualityScorer from core.models import AttributeScore, CategoryAttributeRule, ProductContentRule, Product from core.services.attribute_scorer import AttributeQualityScorer logger = logging.getLogger(__name__) def categorize_issues_and_suggestions(issues, suggestions): """Categorize issues and suggestions by component""" categorized = { 'attributes': {'issues': [], 'suggestions': []}, 'title': {'issues': [], 'suggestions': []}, 'description': {'issues': [], 'suggestions': []}, 'seo': {'issues': [], 'suggestions': []}, 'content_rules': {'issues': [], 'suggestions': []}, 'images': {'issues': [], 'suggestions': []}, 'general': {'issues': [], 'suggestions': []} } for issue in issues: issue_lower = issue.lower() if issue.startswith('Title:'): categorized['title']['issues'].append(issue.replace('Title:', '').strip()) elif issue.startswith('Description:'): categorized['description']['issues'].append(issue.replace('Description:', '').strip()) elif issue.startswith('SEO:'): categorized['seo']['issues'].append(issue.replace('SEO:', '').strip()) elif issue.startswith('Image:'): categorized['images']['issues'].append(issue.replace('Image:', '').strip()) elif any(keyword in issue_lower for keyword in ['image', 'photo', 'picture', 'resolution', 'blur', 'background']): categorized['images']['issues'].append(issue) elif any(field in issue_lower for field in ['seo_title', 'seo title', 'seo_description', 'seo description']): categorized['seo']['issues'].append(issue) elif 'content rule' in issue_lower: categorized['content_rules']['issues'].append(issue) elif any(keyword in issue_lower for keyword in ['mandatory field', 'attribute', 'valid values', 'placeholder', 'standardiz']): categorized['attributes']['issues'].append(issue) else: categorized['general']['issues'].append(issue) for suggestion in suggestions: suggestion_lower = suggestion.lower() if any(prefix in suggestion_lower for prefix in ['title:', 'expand title', 'shorten title', 'add brand name']): categorized['title']['suggestions'].append(suggestion) elif any(prefix in suggestion_lower for prefix in ['description:', 'expand description', 'write comprehensive', 'add features']): categorized['description']['suggestions'].append(suggestion) elif any(prefix in suggestion_lower for prefix in ['seo:', 'improve seo', 'add keywords', 'search', 'discoverability']): categorized['seo']['suggestions'].append(suggestion) elif any(keyword in suggestion_lower for keyword in ['image', 'photo', 'resolution', 'compress', 'background', 'white background']): categorized['images']['suggestions'].append(suggestion) elif any(field in suggestion_lower for field in ['seo_title', 'seo title', 'seo_description', 'seo description']): categorized['seo']['suggestions'].append(suggestion) elif 'content rule' in suggestion_lower: categorized['content_rules']['suggestions'].append(suggestion) elif any(keyword in suggestion_lower for keyword in ['add required', 'add mandatory', 'provide a', 'attribute', 'standardize']): categorized['attributes']['suggestions'].append(suggestion) else: categorized['general']['suggestions'].append(suggestion) return {k: v for k, v in categorized.items() if v['issues'] or v['suggestions']} def resolve_image_path(image_rel_path): """ Resolve relative image path to full absolute path with comprehensive logging """ logger.info(f"[IMAGE PATH] Starting resolution for: {image_rel_path}") if not image_rel_path: logger.warning("[IMAGE PATH] No image path provided") return None # Log settings logger.info(f"[IMAGE PATH] MEDIA_ROOT: {settings.MEDIA_ROOT}") logger.info(f"[IMAGE PATH] MEDIA_URL: {settings.MEDIA_URL}") # Remove MEDIA_URL prefix if present cleaned_path = image_rel_path.replace(settings.MEDIA_URL, "").lstrip("/\\") logger.info(f"[IMAGE PATH] Cleaned path: {cleaned_path}") # Build full path full_path = os.path.join(settings.MEDIA_ROOT, cleaned_path) logger.info(f"[IMAGE PATH] Full path before normalization: {full_path}") # Normalize path (resolve .., ., etc.) full_path = os.path.normpath(full_path) logger.info(f"[IMAGE PATH] Normalized path: {full_path}") # Security check media_root_normalized = os.path.normpath(settings.MEDIA_ROOT) logger.info(f"[IMAGE PATH] Normalized MEDIA_ROOT: {media_root_normalized}") if not full_path.startswith(media_root_normalized): logger.error(f"[IMAGE PATH] SECURITY: Path outside MEDIA_ROOT! Path: {full_path}, MEDIA_ROOT: {media_root_normalized}") return None # Check if file exists if not os.path.exists(full_path): logger.error(f"[IMAGE PATH] FILE NOT FOUND: {full_path}") # List directory contents for debugging dir_path = os.path.dirname(full_path) if os.path.exists(dir_path): files = os.listdir(dir_path) logger.info(f"[IMAGE PATH] Files in {dir_path}: {files}") else: logger.error(f"[IMAGE PATH] Directory does not exist: {dir_path}") return None logger.info(f"[IMAGE PATH] ✓ Image file found and validated: {full_path}") return full_path @method_decorator(csrf_exempt, name='dispatch') class AttributeScoreView(View): """Enhanced API view with Image scoring support""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.scorer = AttributeQualityScorer(use_ai=True) self.image_scorer = ImageQualityScorer(use_ai=True) logger.info("[INIT] AttributeScoreView initialized with image scorer") def post(self, request, *args, **kwargs): """Score a single product with image scoring""" logger.info("[POST] ========== NEW SCORING REQUEST ==========") try: data = json.loads(request.body) logger.info(f"[POST] Request data keys: {data.keys()}") product_data = data.get('product', {}) logger.info(f"[POST] Product data keys: {product_data.keys()}") sku = product_data.get('sku') use_ai = data.get('use_ai', True) if not sku: logger.error("[POST] SKU is missing") return JsonResponse({'error': 'SKU is required'}, status=400) logger.info(f"[POST] Processing SKU: {sku}") category = product_data.get('category', '') if not category: logger.error(f"[POST] Category missing for SKU: {sku}") return JsonResponse({'error': 'Category is required'}, status=400) logger.info(f"[POST] Category: {category}") # Get or create product product, created = Product.objects.get_or_create( sku=sku, defaults={ 'title': product_data.get('title', ''), 'description': product_data.get('description', ''), 'short_description': product_data.get('short_description', ''), 'seo_title': product_data.get('seo_title', ''), 'seo_description': product_data.get('seo_description', ''), 'category': category, 'attributes': product_data.get('attributes', {}) } ) logger.info(f"[POST] Product {'created' if created else 'retrieved'}: {sku}") if not created: product.title = product_data.get('title', product.title) product.description = product_data.get('description', product.description) product.short_description = product_data.get('short_description', product.short_description) product.seo_title = product_data.get('seo_title', product.seo_title) product.seo_description = product_data.get('seo_description', product.seo_description) product.attributes = product_data.get('attributes', product.attributes) product.save() logger.info(f"[POST] Product updated: {sku}") # Get rules cache_key = f"category_rules_{category}" category_rules = cache.get(cache_key) if category_rules is None: category_rules = list(CategoryAttributeRule.objects.filter(category=category).values()) cache.set(cache_key, category_rules, 3600) logger.info(f"[POST] Loaded {len(category_rules)} category rules for {category}") if not category_rules: logger.error(f"[POST] No attribute rules for category: {category}") return JsonResponse({'error': f'No attribute rules defined for {category}'}, status=400) # Get content rules content_cache_key = f"content_rules_{category}" content_rules = cache.get(content_cache_key) if content_rules is None: content_rules = list( ProductContentRule.objects.filter( Q(category__isnull=True) | Q(category=category) ).values() ) cache.set(content_cache_key, content_rules, 3600) logger.info(f"[POST] Loaded {len(content_rules)} content rules for {category}") # Build product dict product_dict = { 'sku': product.sku, 'category': product.category, 'title': product.title, 'description': product.description, 'short_description': product.short_description, 'seo_title': product.seo_title, 'seo_description': product.seo_description, 'attributes': product.attributes } logger.info("[POST] Starting attribute and content scoring...") # Score product attributes and content score_result = self.scorer.score_product( product_dict, category_rules, content_rules=content_rules, generate_ai_suggestions=use_ai ) logger.info(f"[POST] Attribute scoring complete. Score: {score_result.get('final_score')}") # ========== IMAGE SCORING ========== logger.info("[POST] ========== STARTING IMAGE SCORING ==========") image_score_result = None # Check for image_path in product_data image_rel_path = product_data.get('image_path') logger.info(f"[POST] Image path from payload: {image_rel_path}") if image_rel_path: image_path = resolve_image_path(image_rel_path) if image_path: try: logger.info(f"[POST] Calling image scorer for: {image_path}") image_score_result = self.image_scorer.score_image( product_dict, image_path=image_path ) logger.info(f"[POST] ✓ Image scoring complete. Score: {image_score_result.get('image_score')}") logger.info(f"[POST] Image issues: {len(image_score_result.get('issues', []))}") logger.info(f"[POST] Image suggestions: {len(image_score_result.get('suggestions', []))}") except Exception as img_err: logger.error(f"[POST] ✗ Image scoring FAILED: {img_err}", exc_info=True) image_score_result = { 'image_score': None, 'breakdown': {}, 'issues': [f'Image scoring error: {str(img_err)}'], 'suggestions': ['Check image file format and accessibility'], 'image_metadata': {} } else: logger.warning(f"[POST] Image path resolution failed for: {image_rel_path}") image_score_result = { 'image_score': None, 'breakdown': {}, 'issues': ['Image file not found or inaccessible'], 'suggestions': [f'Verify image exists at path: {image_rel_path}'], 'image_metadata': {} } else: logger.warning("[POST] No image_path provided in payload") image_score_result = { 'image_score': None, 'breakdown': {}, 'issues': ['No image provided'], 'suggestions': ['Upload a product image to improve quality score'], 'image_metadata': {} } logger.info("[POST] ========== IMAGE SCORING COMPLETE ==========") # Combine issues and suggestions all_issues = score_result['issues'] + (image_score_result.get('issues', []) if image_score_result else []) all_suggestions = score_result['suggestions'] + (image_score_result.get('suggestions', []) if image_score_result else []) logger.info(f"[POST] Total issues: {len(all_issues)} (Attributes: {len(score_result['issues'])}, Images: {len(image_score_result.get('issues', []))})") logger.info(f"[POST] Total suggestions: {len(all_suggestions)}") # Categorize feedback categorized = categorize_issues_and_suggestions(all_issues, all_suggestions) logger.info(f"[POST] Categorized feedback into {len(categorized)} categories") # Save score AttributeScore.objects.create( product=product, score=score_result['final_score'], max_score=score_result['max_score'], details={ **score_result['breakdown'], 'image_score': image_score_result.get('image_score') if image_score_result else None, 'image_breakdown': image_score_result.get('breakdown', {}) if image_score_result else {} }, issues=all_issues, suggestions=all_suggestions, ai_suggestions=score_result.get('ai_suggestions', {}), processing_time=score_result.get('processing_time', 0) ) logger.info(f"[POST] Score saved to database for SKU: {sku}") # Enhanced response response_data = { 'success': True, 'product_sku': sku, 'created': created, 'score_result': { 'final_score': score_result['final_score'], 'max_score': score_result['max_score'], 'breakdown': score_result['breakdown'], 'image_score': image_score_result.get('image_score') if image_score_result else None, 'image_breakdown': image_score_result.get('breakdown', {}) if image_score_result else {}, 'image_metadata': image_score_result.get('image_metadata', {}) if image_score_result else {}, 'categorized_feedback': categorized, 'ai_suggestions': { 'content': score_result.get('ai_suggestions', {}), 'image': image_score_result.get('ai_improvements') if image_score_result else None }, 'processing_time': score_result.get('processing_time', 0), 'issues': all_issues, 'suggestions': all_suggestions } } logger.info(f"[POST] ========== REQUEST COMPLETE: {sku} ==========") return JsonResponse(response_data) except json.JSONDecodeError as e: logger.error(f"[POST] JSON decode error: {e}", exc_info=True) return JsonResponse({'error': 'Invalid JSON'}, status=400) except Exception as e: logger.error(f"[POST] Unexpected error: {str(e)}", exc_info=True) return JsonResponse({'error': str(e)}, status=500) @method_decorator(csrf_exempt, name='dispatch') class BatchScoreView(View): """Batch scoring with image support""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.attribute_scorer = AttributeQualityScorer(use_ai=True) self.image_scorer = ImageQualityScorer(use_ai=True) logger.info("[INIT] BatchScoreView initialized") def get(self, request): """ Automatically score all products from the database (no payload needed). """ logger.info("[BATCH][GET] ========== AUTO BATCH SCORING STARTED ==========") try: # Fetch all products products = list(Product.objects.all().values()) logger.info(f"[BATCH][GET] Found {len(products)} products in DB") if not products: return JsonResponse({'error': 'No products found in the database.'}, status=404) results = [] errors = [] for idx, product_data in enumerate(products, 1): sku = product_data.get('sku') category = product_data.get('category') logger.info(f"[BATCH][GET] [{idx}/{len(products)}] Processing: {sku}") if not sku or not category: errors.append({'sku': sku, 'error': 'Missing SKU or category'}) continue try: # Load rules category_rules = list(CategoryAttributeRule.objects.filter(category=category).values()) if not category_rules: errors.append({'sku': sku, 'error': f'No attribute rules for category {category}'}) continue content_rules = list( ProductContentRule.objects.filter( Q(category__isnull=True) | Q(category=category) ).values() ) # Score product attributes/content score_result = self.attribute_scorer.score_product( product_data, category_rules, content_rules=content_rules, generate_ai_suggestions=True ) # Score product image image_path = resolve_image_path(product_data.get('image_path')) image_score_result = None if image_path: try: logger.info(f"[BATCH][GET] Scoring image for {sku}") image_score_result = self.image_scorer.score_image( product_data, image_path=image_path ) except Exception as img_err: logger.warning(f"[BATCH][GET] Image scoring failed for {sku}: {img_err}") image_score_result = { 'image_score': None, 'breakdown': {}, 'issues': ['Image scoring failed'], 'suggestions': ['Check image file'], 'image_metadata': {} } else: image_score_result = { 'image_score': None, 'breakdown': {}, 'issues': ['No image provided'], 'suggestions': ['Upload product image'], 'image_metadata': {} } # Combine issues and suggestions all_issues = score_result['issues'] + image_score_result.get('issues', []) all_suggestions = score_result['suggestions'] + image_score_result.get('suggestions', []) categorized = categorize_issues_and_suggestions(all_issues, all_suggestions) score_result['breakdown']['image_score'] = image_score_result.get('image_score') score_result['breakdown']['attributes'] = score_result['breakdown']['mandatory_fields'] results.append({ 'sku': sku, 'title': product_data['title'], 'description': product_data['description'], 'image_path': product_data['image_path'], 'created_at': product_data['created_at'], 'final_score': score_result['final_score'], 'max_score': score_result['max_score'], 'breakdown': score_result['breakdown'], 'image_score': image_score_result.get('image_score'), 'image_breakdown': image_score_result.get('breakdown', {}), 'image_metadata': image_score_result.get('image_metadata', {}), 'ai_suggestions': { 'content': score_result.get('ai_suggestions', {}), 'image': image_score_result.get('ai_improvements', {}) }, 'categorized_feedback': categorized, 'processing_time': score_result.get('processing_time', 0), 'issues': all_issues, 'suggestions': all_suggestions }) except Exception as e: logger.error(f"[BATCH][GET] Error scoring {sku}: {str(e)}", exc_info=True) errors.append({'sku': sku, 'error': str(e)}) logger.info(f"[BATCH][GET] ===== COMPLETE: {len(results)} success, {len(errors)} errors =====") return JsonResponse({ 'success': True, 'processed': len(results), 'results': results, 'errors': errors }) except Exception as e: logger.error(f"[BATCH][GET] Batch scoring error: {str(e)}", exc_info=True) return JsonResponse({'error': str(e)}, status=500) def post(self, request): logger.info("[BATCH] ========== NEW BATCH REQUEST ==========") try: data = json.loads(request.body) products = data.get('products', []) logger.info(f"[BATCH] Processing {len(products)} products") print(products) if not products: return JsonResponse({'error': 'No products provided'}, status=400) results = [] errors = [] for idx, product_data in enumerate(products[:100], 1): sku = product_data.get('sku') category = product_data.get('category') logger.info(f"[BATCH] [{idx}/{len(products)}] Processing: {sku}") if not sku or not category: errors.append({'sku': sku, 'error': 'Missing SKU or category'}) continue try: # Get rules category_rules = list(CategoryAttributeRule.objects.filter(category=category).values()) if not category_rules: errors.append({'sku': sku, 'error': f'No attribute rules for category {category}'}) continue content_rules = list( ProductContentRule.objects.filter( Q(category__isnull=True) | Q(category=category) ).values() ) # Score attributes score_result = self.attribute_scorer.score_product( product_data, category_rules, content_rules=content_rules, generate_ai_suggestions=True ) # Score image image_path = resolve_image_path(product_data.get('image_path')) image_score_result = None if image_path: try: logger.info(f"[BATCH] Scoring image for {sku}") image_score_result = self.image_scorer.score_image( product_data, image_path=image_path ) logger.info(f"[BATCH] Image score for {sku}: {image_score_result.get('image_score')}") except Exception as img_err: logger.warning(f"[BATCH] Image scoring failed for {sku}: {img_err}") image_score_result = { 'image_score': None, 'breakdown': {}, 'issues': ['Image scoring failed'], 'suggestions': ['Check image file'], 'image_metadata': {} } else: logger.info(f"[BATCH] No valid image for {sku}") image_score_result = { 'image_score': None, 'breakdown': {}, 'issues': ['No image provided'], 'suggestions': ['Upload product image'], 'image_metadata': {} } all_issues = score_result['issues'] + image_score_result.get('issues', []) all_suggestions = score_result['suggestions'] + image_score_result.get('suggestions', []) categorized = categorize_issues_and_suggestions(all_issues, all_suggestions) results.append({ 'sku': sku, 'title': product_data['title'], 'description': product_data['description'], 'image_path': image_path, 'final_score': score_result['final_score'], 'max_score': score_result['max_score'], 'breakdown': score_result['breakdown'], 'image_score': image_score_result.get('image_score'), 'image_breakdown': image_score_result.get('breakdown', {}), 'image_metadata': image_score_result.get('image_metadata', {}), 'ai_suggestions': { 'content': score_result.get('ai_suggestions', {}), 'image': image_score_result.get('ai_improvements', {}) }, 'categorized_feedback': categorized, 'processing_time': score_result.get('processing_time', 0), 'issues': all_issues, 'suggestions': all_suggestions }) except Exception as e: logger.error(f"[BATCH] Error scoring {sku}: {str(e)}", exc_info=True) errors.append({'sku': sku, 'error': str(e)}) logger.info(f"[BATCH] ========== BATCH COMPLETE: {len(results)} success, {len(errors)} errors ==========") return JsonResponse({ 'success': True, 'processed': len(results), 'results': results, 'errors': errors }) except Exception as e: logger.error(f"[BATCH] Batch scoring error: {str(e)}", exc_info=True) return JsonResponse({'error': str(e)}, status=500) import json import logging import concurrent.futures from django.http import JsonResponse from django.utils.decorators import method_decorator from django.views import View from django.views.decorators.csrf import csrf_exempt from django.db.models import Q from textblob import download_corpora from concurrent.futures import ThreadPoolExecutor, as_completed import time from core.models import Product, CategoryAttributeRule, ProductContentRule # from .scorers.attribute_scorer import AttributeQualityScorer # from .scorers.image_scorer import ImageQualityScorer # from .utils import resolve_image_path, categorize_issues_and_suggestions logger = logging.getLogger(__name__) # Try to ensure corpora are downloaded once at import try: download_corpora.download_all() logger.info("[INIT] TextBlob corpora verified.") except Exception as e: logger.warning(f"[INIT] Failed to auto-download TextBlob corpora: {e}") @method_decorator(csrf_exempt, name='dispatch') class BatchScoreViewV2(View): """Optimized Batch scoring with image support and parallel processing""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.attribute_scorer = AttributeQualityScorer(use_ai=True) self.image_scorer = ImageQualityScorer(use_ai=True) logger.info("[INIT] BatchScoreView initialized with AI scorers preloaded") def _process_single_product(self, product_data, idx, total): """Process a single product safely (run inside thread pool)""" start = time.time() sku = product_data.get('sku') category = product_data.get('category') img_raw_path = product_data.get('image_path') if not sku or not category: return {'sku': sku, 'error': 'Missing SKU or category'} try: logger.info(f"[BATCH][{idx}/{total}] Processing SKU: {sku}") # Fetch rules category_rules = list(CategoryAttributeRule.objects.filter(category=category).values()) if not category_rules: return {'sku': sku, 'error': f'No attribute rules for category {category}'} content_rules = list( ProductContentRule.objects.filter( Q(category__isnull=True) | Q(category=category) ).values() ) # Score attributes score_result = self.attribute_scorer.score_product( product_data, category_rules, content_rules=content_rules, generate_ai_suggestions=True ) # Score image image_path = resolve_image_path(product_data.get('image_path')) image_score_result = None if image_path: try: image_score_result = self.image_scorer.score_image( product_data, image_path=image_path ) except Exception as img_err: logger.warning(f"[IMG] Scoring failed for {sku}: {img_err}") image_score_result = { 'image_score': None, 'breakdown': {}, 'issues': ['Image scoring failed'], 'suggestions': ['Check image file'], 'image_metadata': {} } else: image_score_result = { 'image_score': None, 'breakdown': {}, 'issues': ['No image provided'], 'suggestions': ['Upload product image'], 'image_metadata': {} } # Combine results all_issues = score_result['issues'] + image_score_result.get('issues', []) all_suggestions = score_result['suggestions'] + image_score_result.get('suggestions', []) categorized = categorize_issues_and_suggestions(all_issues, all_suggestions) total_time = round(time.time() - start, 2) score_result['breakdown']['attributes'] = score_result['breakdown']['mandatory_fields'] return { 'sku': sku, 'title': product_data.get('title'), 'description': product_data.get('description'), 'image_path': img_raw_path, 'final_score': score_result['final_score'], 'max_score': score_result['max_score'], 'breakdown': {**score_result['breakdown'], 'image_score': image_score_result.get('image_score')}, 'image_score': image_score_result.get('image_score'), 'image_breakdown': image_score_result.get('breakdown', {}), 'image_metadata': image_score_result.get('image_metadata', {}), 'ai_suggestions': { 'content': score_result.get('ai_suggestions', {}), 'image': image_score_result.get('ai_improvements', {}) }, 'categorized_feedback': categorized, 'issues': all_issues, 'suggestions': all_suggestions, 'processing_time': total_time } except Exception as e: logger.error(f"[BATCH][{sku}] Error: {e}", exc_info=True) return {'sku': sku, 'error': str(e)} def _run_batch(self, products): """Run products in parallel using thread pool""" results, errors = [], [] total = len(products) start_time = time.time() # Use up to 8 threads (tune based on your CPU) with ThreadPoolExecutor(max_workers=8) as executor: future_to_product = { executor.submit(self._process_single_product, product, idx + 1, total): product for idx, product in enumerate(products) } for future in as_completed(future_to_product): res = future.result() if "error" in res: errors.append(res) else: results.append(res) elapsed = round(time.time() - start_time, 2) logger.info(f"[BATCH] Completed {len(results)} products in {elapsed}s ({len(errors)} errors)") return results, errors, elapsed def get(self, request): """Score all products from DB automatically""" try: products = list(Product.objects.all().values()) if not products: return JsonResponse({'error': 'No products found in database'}, status=404) results, errors, elapsed = self._run_batch(products) return JsonResponse({ 'success': True, 'processed': len(results), 'results': results, 'errors': errors, 'elapsed_seconds': elapsed }) except Exception as e: logger.error(f"[BATCH][GET] Error: {e}", exc_info=True) return JsonResponse({'error': str(e)}, status=500) def post(self, request): """Batch score via payload""" try: data = json.loads(request.body) products = data.get('products', []) if not products: return JsonResponse({'error': 'No products provided'}, status=400) results, errors, elapsed = self._run_batch(products) return JsonResponse({ 'success': True, 'processed': len(results), 'results': results, 'errors': errors, 'elapsed_seconds': elapsed }) except Exception as e: logger.error(f"[BATCH][POST] Error: {e}", exc_info=True) return JsonResponse({'error': str(e)}, status=500) @method_decorator(csrf_exempt, name='dispatch') class ContentRulesView(View): """API to manage ProductContentRules""" def get(self, request): try: category = request.GET.get('category') if category: rules = ProductContentRule.objects.filter( Q(category__isnull=True) | Q(category=category) ) else: rules = ProductContentRule.objects.all() rules_data = list(rules.values()) return JsonResponse({ 'success': True, 'count': len(rules_data), 'rules': rules_data }) except Exception as e: logger.error(f"Error fetching content rules: {e}", exc_info=True) return JsonResponse({'error': str(e)}, status=500) def post(self, request): try: data = json.loads(request.body) if 'field_name' not in data: return JsonResponse({'error': 'field_name is required'}, status=400) rule = ProductContentRule.objects.create( category=data.get('category'), field_name=data['field_name'], is_mandatory=data.get('is_mandatory', True), min_length=data.get('min_length'), max_length=data.get('max_length'), min_word_count=data.get('min_word_count'), max_word_count=data.get('max_word_count'), must_contain_keywords=data.get('must_contain_keywords', []), validation_regex=data.get('validation_regex', ''), description=data.get('description', '') ) if data.get('category'): cache.delete(f"content_rules_{data['category']}") return JsonResponse({ 'success': True, 'rule_id': rule.id, 'message': 'Content rule created successfully' }) except Exception as e: logger.error(f"Error creating content rule: {e}", exc_info=True) return JsonResponse({'error': str(e)}, status=500) # views.py from rest_framework.views import APIView from rest_framework.response import Response from rest_framework import status import tempfile import os # Import os to remove the temporary file # from core.models import Product, CategoryAttributeRule, ProductContentRule # Not strictly needed here from .management.commands.load_excel_data import Command as ExcelLoader class ExcelUploadView(APIView): def post(self, request): excel_file = request.FILES.get('file') if not excel_file: return Response({'error': 'No file uploaded'}, status=status.HTTP_400_BAD_REQUEST) tmp_path = None try: # 1. Save uploaded file to a temporary location with tempfile.NamedTemporaryFile(delete=False, suffix=".xlsx") as tmp: for chunk in excel_file.chunks(): tmp.write(chunk) tmp_path = tmp.name # 2. Execute the modified loader command loader = ExcelLoader() # The handle method now returns a dictionary with 'success' and 'message' # We pass the path as a keyword argument to match the command structure result = loader.handle(excel_path=tmp_path) # 3. Return the response based on the loader result if result.get('success'): # Send the detailed message back in the API response return Response({'success': True, 'message': result['message']}, status=status.HTTP_200_OK) else: # Handle error case returned by the loader (e.g., file read failure) return Response({'success': False, 'error': result.get('error', 'An unknown error occurred during loading.')}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) except Exception as e: # Catch unexpected exceptions during the process return Response({'success': False, 'error': f'Upload failed: {str(e)}'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) finally: import time import gc if tmp_path and os.path.exists(tmp_path): try: # Force garbage collection to ensure pandas releases the file handle gc.collect() time.sleep(0.3) # brief pause for Windows file handle release os.remove(tmp_path) except PermissionError: # Retry once after short delay (Windows-specific fix) time.sleep(0.5) try: os.remove(tmp_path) except Exception: pass # give up silently; OS will clean temp files later import os import urllib.parse from django.http import HttpResponse from django.conf import settings def open_outlook_mail(request): try: body_path = os.path.join(settings.BASE_DIR, 'core', 'scripts', 'mailbody.txt') with open(body_path, 'r', encoding='utf-8') as f: body = f.read().strip() subject = "ETS Competitor Intelligence - October 28, 2025" recipient = "manager@example.com" encoded_subject = urllib.parse.quote(subject) encoded_body = urllib.parse.quote(body) mailto_url = f"mailto:{recipient}?subject={encoded_subject}&body={encoded_body}" html = f""" Opening Outlook...

📧 Opening Outlook...

If Outlook does not open automatically, click here.

""" return HttpResponse(html) except Exception as e: return HttpResponse(f"

Error:

{str(e)}

", status=500) # views.py from rest_framework.views import APIView from rest_framework.response import Response from rest_framework import status from django.db.models import Avg, Count from .models import Product, AttributeScore # adjust the import path as needed from rest_framework.views import APIView from rest_framework.response import Response from rest_framework import status from django.db.models import Avg, Count class ProductTypeQualityMetricsView(APIView): """ API endpoint to fetch product type quality metrics. Returns metrics for all products, ignoring query parameters. """ def get(self, request): print("****************** get called") # Fetch all products, no filtering queryset = Product.objects.all() print("queryset", queryset) scored = ( AttributeScore.objects.filter(product__in=queryset) .annotate( title_quality=Avg('details__title_quality'), description_quality=Avg('details__description_quality'), image_quality=Avg('details__image_score'), attributes_quality=Avg('details__attributes'), ) .values('product__product_type') .annotate( product_count=Count('product', distinct=True), scored_product_count=Count('id'), avg_overall_score=Avg('score'), avg_title_quality=Avg('details__title_quality'), avg_description_quality=Avg('details__description_quality'), avg_image_quality=Avg('details__image_score'), avg_attributes_quality=Avg('details__attributes'), ) ) results = [ { "product_type": item['product__product_type'], "product_count": item['product_count'], "scored_product_count": item['scored_product_count'], "avg_overall_score": round(item['avg_overall_score'] or 0, 2), "avg_title_quality_percent": round(item['avg_title_quality'] or 0, 2), "avg_description_quality_percent": round(item['avg_description_quality'] or 0, 2), "avg_image_quality_percent": round(item['avg_image_quality'] or 0, 2), "avg_attributes_quality_percent": round(item['avg_attributes_quality'] or 0, 2), } for item in scored ] return Response(results, status=status.HTTP_200_OK) # class ProductTypeQualityMetricsView(APIView): """ API endpoint to fetch product type quality metrics. Supports optional ?product_type= query param. """ def get(self, request): product_type_filter = request.query_params.get('product_type', None) queryset = Product.objects.all() print("queryset",queryset) if product_type_filter: queryset = queryset.filter(product_type=product_type_filter) scored = ( AttributeScore.objects.filter(product__in=queryset) .annotate( title_quality=Avg('details__title_quality'), description_quality=Avg('details__description_quality'), image_quality=Avg('details__image_score'), attributes_quality=Avg('details__attributes'), ) .values('product__category') .annotate( product_count=Count('product', distinct=True), scored_product_count=Count('id'), avg_overall_score=Avg('score'), avg_title_quality=Avg('details__title_quality'), avg_description_quality=Avg('details__description_quality'), avg_image_quality=Avg('details__image_score'), avg_attributes_quality=Avg('details__attributes'), ) ) results = [ { "product_type": item['product__product_type'], "product_count": item['product_count'], "scored_product_count": item['scored_product_count'], "avg_overall_score": round(item['avg_overall_score'] or 0, 2), "avg_title_quality_percent": round(item['avg_title_quality'] or 0, 2), "avg_description_quality_percent": round(item['avg_description_quality'] or 0, 2), "avg_image_quality_percent": round(item['avg_image_quality'] or 0, 2), "avg_attributes_quality_percent": round(item['avg_attributes_quality'] or 0, 2), } for item in scored ] return Response(results, status=status.HTTP_200_OK)