||
- # views.py (FULLY FIXED with comprehensive logging)
- from django.shortcuts import render, get_object_or_404
- from django.http import HttpResponse, JsonResponse
- from django.views import View
- from django.core.cache import cache
- from django.db.models import Q
- from django.views.decorators.csrf import csrf_exempt
- from django.utils.decorators import method_decorator
- from django.conf import settings
- import json
- import logging
- # logging.basicConfig(
- # level=logging.INFO, # or DEBUG if you want more detail
- # format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
- # )
- import os
- from core.services.image_scorer import ImageQualityScorer
- from core.models import AttributeScore, CategoryAttributeRule, ProductContentRule, Product
- from core.services.attribute_scorer import AttributeQualityScorer
- logger = logging.getLogger(__name__)
- def categorize_issues_and_suggestions(issues, suggestions):
- """Categorize issues and suggestions by component"""
- categorized = {
- 'attributes': {'issues': [], 'suggestions': []},
- 'title': {'issues': [], 'suggestions': []},
- 'description': {'issues': [], 'suggestions': []},
- 'seo': {'issues': [], 'suggestions': []},
- 'content_rules': {'issues': [], 'suggestions': []},
- 'images': {'issues': [], 'suggestions': []},
- 'general': {'issues': [], 'suggestions': []}
- }
-
- for issue in issues:
- issue_lower = issue.lower()
-
- if issue.startswith('Title:'):
- categorized['title']['issues'].append(issue.replace('Title:', '').strip())
- elif issue.startswith('Description:'):
- categorized['description']['issues'].append(issue.replace('Description:', '').strip())
- elif issue.startswith('SEO:'):
- categorized['seo']['issues'].append(issue.replace('SEO:', '').strip())
- elif issue.startswith('Image:'):
- categorized['images']['issues'].append(issue.replace('Image:', '').strip())
- elif any(keyword in issue_lower for keyword in ['image', 'photo', 'picture', 'resolution', 'blur', 'background']):
- categorized['images']['issues'].append(issue)
- elif any(field in issue_lower for field in ['seo_title', 'seo title', 'seo_description', 'seo description']):
- categorized['seo']['issues'].append(issue)
- elif 'content rule' in issue_lower:
- categorized['content_rules']['issues'].append(issue)
- elif any(keyword in issue_lower for keyword in ['mandatory field', 'attribute', 'valid values', 'placeholder', 'standardiz']):
- categorized['attributes']['issues'].append(issue)
- else:
- categorized['general']['issues'].append(issue)
-
- for suggestion in suggestions:
- suggestion_lower = suggestion.lower()
-
- if any(prefix in suggestion_lower for prefix in ['title:', 'expand title', 'shorten title', 'add brand name']):
- categorized['title']['suggestions'].append(suggestion)
- elif any(prefix in suggestion_lower for prefix in ['description:', 'expand description', 'write comprehensive', 'add features']):
- categorized['description']['suggestions'].append(suggestion)
- elif any(prefix in suggestion_lower for prefix in ['seo:', 'improve seo', 'add keywords', 'search', 'discoverability']):
- categorized['seo']['suggestions'].append(suggestion)
- elif any(keyword in suggestion_lower for keyword in ['image', 'photo', 'resolution', 'compress', 'background', 'white background']):
- categorized['images']['suggestions'].append(suggestion)
- elif any(field in suggestion_lower for field in ['seo_title', 'seo title', 'seo_description', 'seo description']):
- categorized['seo']['suggestions'].append(suggestion)
- elif 'content rule' in suggestion_lower:
- categorized['content_rules']['suggestions'].append(suggestion)
- elif any(keyword in suggestion_lower for keyword in ['add required', 'add mandatory', 'provide a', 'attribute', 'standardize']):
- categorized['attributes']['suggestions'].append(suggestion)
- else:
- categorized['general']['suggestions'].append(suggestion)
-
- return {k: v for k, v in categorized.items() if v['issues'] or v['suggestions']}
- def resolve_image_path(image_rel_path):
- """
- Resolve relative image path to full absolute path with comprehensive logging
- """
- logger.info(f"[IMAGE PATH] Starting resolution for: {image_rel_path}")
-
- if not image_rel_path:
- logger.warning("[IMAGE PATH] No image path provided")
- return None
-
- # Log settings
- logger.info(f"[IMAGE PATH] MEDIA_ROOT: {settings.MEDIA_ROOT}")
- logger.info(f"[IMAGE PATH] MEDIA_URL: {settings.MEDIA_URL}")
-
- # Remove MEDIA_URL prefix if present
- cleaned_path = image_rel_path.replace(settings.MEDIA_URL, "").lstrip("/\\")
- logger.info(f"[IMAGE PATH] Cleaned path: {cleaned_path}")
-
- # Build full path
- full_path = os.path.join(settings.MEDIA_ROOT, cleaned_path)
- logger.info(f"[IMAGE PATH] Full path before normalization: {full_path}")
-
- # Normalize path (resolve .., ., etc.)
- full_path = os.path.normpath(full_path)
- logger.info(f"[IMAGE PATH] Normalized path: {full_path}")
-
- # Security check
- media_root_normalized = os.path.normpath(settings.MEDIA_ROOT)
- logger.info(f"[IMAGE PATH] Normalized MEDIA_ROOT: {media_root_normalized}")
-
- if not full_path.startswith(media_root_normalized):
- logger.error(f"[IMAGE PATH] SECURITY: Path outside MEDIA_ROOT! Path: {full_path}, MEDIA_ROOT: {media_root_normalized}")
- return None
-
- # Check if file exists
- if not os.path.exists(full_path):
- logger.error(f"[IMAGE PATH] FILE NOT FOUND: {full_path}")
- # List directory contents for debugging
- dir_path = os.path.dirname(full_path)
- if os.path.exists(dir_path):
- files = os.listdir(dir_path)
- logger.info(f"[IMAGE PATH] Files in {dir_path}: {files}")
- else:
- logger.error(f"[IMAGE PATH] Directory does not exist: {dir_path}")
- return None
-
- logger.info(f"[IMAGE PATH] ✓ Image file found and validated: {full_path}")
- return full_path
- @method_decorator(csrf_exempt, name='dispatch')
- class AttributeScoreView(View):
- """Enhanced API view with Image scoring support"""
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.scorer = AttributeQualityScorer(use_ai=True)
- self.image_scorer = ImageQualityScorer(use_ai=True)
- logger.info("[INIT] AttributeScoreView initialized with image scorer")
- def post(self, request, *args, **kwargs):
- """Score a single product with image scoring"""
- logger.info("[POST] ========== NEW SCORING REQUEST ==========")
-
- try:
- data = json.loads(request.body)
- logger.info(f"[POST] Request data keys: {data.keys()}")
-
- product_data = data.get('product', {})
- logger.info(f"[POST] Product data keys: {product_data.keys()}")
-
- sku = product_data.get('sku')
- use_ai = data.get('use_ai', True)
- if not sku:
- logger.error("[POST] SKU is missing")
- return JsonResponse({'error': 'SKU is required'}, status=400)
- logger.info(f"[POST] Processing SKU: {sku}")
- category = product_data.get('category', '')
- if not category:
- logger.error(f"[POST] Category missing for SKU: {sku}")
- return JsonResponse({'error': 'Category is required'}, status=400)
- logger.info(f"[POST] Category: {category}")
- # Get or create product
- product, created = Product.objects.get_or_create(
- sku=sku,
- defaults={
- 'title': product_data.get('title', ''),
- 'description': product_data.get('description', ''),
- 'short_description': product_data.get('short_description', ''),
- 'seo_title': product_data.get('seo_title', ''),
- 'seo_description': product_data.get('seo_description', ''),
- 'category': category,
- 'attributes': product_data.get('attributes', {})
- }
- )
- logger.info(f"[POST] Product {'created' if created else 'retrieved'}: {sku}")
- if not created:
- product.title = product_data.get('title', product.title)
- product.description = product_data.get('description', product.description)
- product.short_description = product_data.get('short_description', product.short_description)
- product.seo_title = product_data.get('seo_title', product.seo_title)
- product.seo_description = product_data.get('seo_description', product.seo_description)
- product.attributes = product_data.get('attributes', product.attributes)
- product.save()
- logger.info(f"[POST] Product updated: {sku}")
- # Get rules
- cache_key = f"category_rules_{category}"
- category_rules = cache.get(cache_key)
- if category_rules is None:
- category_rules = list(CategoryAttributeRule.objects.filter(category=category).values())
- cache.set(cache_key, category_rules, 3600)
- logger.info(f"[POST] Loaded {len(category_rules)} category rules for {category}")
-
- if not category_rules:
- logger.error(f"[POST] No attribute rules for category: {category}")
- return JsonResponse({'error': f'No attribute rules defined for {category}'}, status=400)
- # Get content rules
- content_cache_key = f"content_rules_{category}"
- content_rules = cache.get(content_cache_key)
- if content_rules is None:
- content_rules = list(
- ProductContentRule.objects.filter(
- Q(category__isnull=True) | Q(category=category)
- ).values()
- )
- cache.set(content_cache_key, content_rules, 3600)
- logger.info(f"[POST] Loaded {len(content_rules)} content rules for {category}")
- # Build product dict
- product_dict = {
- 'sku': product.sku,
- 'category': product.category,
- 'title': product.title,
- 'description': product.description,
- 'short_description': product.short_description,
- 'seo_title': product.seo_title,
- 'seo_description': product.seo_description,
- 'attributes': product.attributes
- }
- logger.info("[POST] Starting attribute and content scoring...")
- # Score product attributes and content
- score_result = self.scorer.score_product(
- product_dict,
- category_rules,
- content_rules=content_rules,
- generate_ai_suggestions=use_ai
- )
- logger.info(f"[POST] Attribute scoring complete. Score: {score_result.get('final_score')}")
- # ========== IMAGE SCORING ==========
- logger.info("[POST] ========== STARTING IMAGE SCORING ==========")
- image_score_result = None
-
- # Check for image_path in product_data
- image_rel_path = product_data.get('image_path')
- logger.info(f"[POST] Image path from payload: {image_rel_path}")
-
- if image_rel_path:
- image_path = resolve_image_path(image_rel_path)
-
- if image_path:
- try:
- logger.info(f"[POST] Calling image scorer for: {image_path}")
- image_score_result = self.image_scorer.score_image(
- product_dict,
- image_path=image_path
- )
- logger.info(f"[POST] ✓ Image scoring complete. Score: {image_score_result.get('image_score')}")
- logger.info(f"[POST] Image issues: {len(image_score_result.get('issues', []))}")
- logger.info(f"[POST] Image suggestions: {len(image_score_result.get('suggestions', []))}")
- except Exception as img_err:
- logger.error(f"[POST] ✗ Image scoring FAILED: {img_err}", exc_info=True)
- image_score_result = {
- 'image_score': None,
- 'breakdown': {},
- 'issues': [f'Image scoring error: {str(img_err)}'],
- 'suggestions': ['Check image file format and accessibility'],
- 'image_metadata': {}
- }
- else:
- logger.warning(f"[POST] Image path resolution failed for: {image_rel_path}")
- image_score_result = {
- 'image_score': None,
- 'breakdown': {},
- 'issues': ['Image file not found or inaccessible'],
- 'suggestions': [f'Verify image exists at path: {image_rel_path}'],
- 'image_metadata': {}
- }
- else:
- logger.warning("[POST] No image_path provided in payload")
- image_score_result = {
- 'image_score': None,
- 'breakdown': {},
- 'issues': ['No image provided'],
- 'suggestions': ['Upload a product image to improve quality score'],
- 'image_metadata': {}
- }
- logger.info("[POST] ========== IMAGE SCORING COMPLETE ==========")
- # Combine issues and suggestions
- all_issues = score_result['issues'] + (image_score_result.get('issues', []) if image_score_result else [])
- all_suggestions = score_result['suggestions'] + (image_score_result.get('suggestions', []) if image_score_result else [])
- logger.info(f"[POST] Total issues: {len(all_issues)} (Attributes: {len(score_result['issues'])}, Images: {len(image_score_result.get('issues', []))})")
- logger.info(f"[POST] Total suggestions: {len(all_suggestions)}")
- # Categorize feedback
- categorized = categorize_issues_and_suggestions(all_issues, all_suggestions)
- logger.info(f"[POST] Categorized feedback into {len(categorized)} categories")
- # Save score
- AttributeScore.objects.create(
- product=product,
- score=score_result['final_score'],
- max_score=score_result['max_score'],
- details={
- **score_result['breakdown'],
- 'image_score': image_score_result.get('image_score') if image_score_result else None,
- 'image_breakdown': image_score_result.get('breakdown', {}) if image_score_result else {}
- },
- issues=all_issues,
- suggestions=all_suggestions,
- ai_suggestions=score_result.get('ai_suggestions', {}),
- processing_time=score_result.get('processing_time', 0)
- )
- logger.info(f"[POST] Score saved to database for SKU: {sku}")
- # Enhanced response
- response_data = {
- 'success': True,
- 'product_sku': sku,
- 'created': created,
- 'score_result': {
- 'final_score': score_result['final_score'],
- 'max_score': score_result['max_score'],
- 'breakdown': score_result['breakdown'],
- 'image_score': image_score_result.get('image_score') if image_score_result else None,
- 'image_breakdown': image_score_result.get('breakdown', {}) if image_score_result else {},
- 'image_metadata': image_score_result.get('image_metadata', {}) if image_score_result else {},
- 'categorized_feedback': categorized,
- 'ai_suggestions': {
- 'content': score_result.get('ai_suggestions', {}),
- 'image': image_score_result.get('ai_improvements') if image_score_result else None
- },
- 'processing_time': score_result.get('processing_time', 0),
- 'issues': all_issues,
- 'suggestions': all_suggestions
- }
- }
-
- logger.info(f"[POST] ========== REQUEST COMPLETE: {sku} ==========")
- return JsonResponse(response_data)
- except json.JSONDecodeError as e:
- logger.error(f"[POST] JSON decode error: {e}", exc_info=True)
- return JsonResponse({'error': 'Invalid JSON'}, status=400)
- except Exception as e:
- logger.error(f"[POST] Unexpected error: {str(e)}", exc_info=True)
- return JsonResponse({'error': str(e)}, status=500)
- @method_decorator(csrf_exempt, name='dispatch')
- class BatchScoreView(View):
- """Batch scoring with image support"""
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.attribute_scorer = AttributeQualityScorer(use_ai=True)
- self.image_scorer = ImageQualityScorer(use_ai=True)
- logger.info("[INIT] BatchScoreView initialized")
- def get(self, request):
- """
- Automatically score all products from the database (no payload needed).
- """
- logger.info("[BATCH][GET] ========== AUTO BATCH SCORING STARTED ==========")
- try:
- # Fetch all products
- products = list(Product.objects.all().values())
- logger.info(f"[BATCH][GET] Found {len(products)} products in DB")
- if not products:
- return JsonResponse({'error': 'No products found in the database.'}, status=404)
- results = []
- errors = []
- for idx, product_data in enumerate(products, 1):
- sku = product_data.get('sku')
- category = product_data.get('category')
- logger.info(f"[BATCH][GET] [{idx}/{len(products)}] Processing: {sku}")
- if not sku or not category:
- errors.append({'sku': sku, 'error': 'Missing SKU or category'})
- continue
- try:
- # Load rules
- category_rules = list(CategoryAttributeRule.objects.filter(category=category).values())
- if not category_rules:
- errors.append({'sku': sku, 'error': f'No attribute rules for category {category}'})
- continue
- content_rules = list(
- ProductContentRule.objects.filter(
- Q(category__isnull=True) | Q(category=category)
- ).values()
- )
- # Score product attributes/content
- score_result = self.attribute_scorer.score_product(
- product_data,
- category_rules,
- content_rules=content_rules,
- generate_ai_suggestions=True
- )
- # Score product image
- image_path = resolve_image_path(product_data.get('image_path'))
- image_score_result = None
- if image_path:
- try:
- logger.info(f"[BATCH][GET] Scoring image for {sku}")
- image_score_result = self.image_scorer.score_image(
- product_data,
- image_path=image_path
- )
- except Exception as img_err:
- logger.warning(f"[BATCH][GET] Image scoring failed for {sku}: {img_err}")
- image_score_result = {
- 'image_score': None,
- 'breakdown': {},
- 'issues': ['Image scoring failed'],
- 'suggestions': ['Check image file'],
- 'image_metadata': {}
- }
- else:
- image_score_result = {
- 'image_score': None,
- 'breakdown': {},
- 'issues': ['No image provided'],
- 'suggestions': ['Upload product image'],
- 'image_metadata': {}
- }
- # Combine issues and suggestions
- all_issues = score_result['issues'] + image_score_result.get('issues', [])
- all_suggestions = score_result['suggestions'] + image_score_result.get('suggestions', [])
- categorized = categorize_issues_and_suggestions(all_issues, all_suggestions)
-
- score_result['breakdown']['image_score'] = image_score_result.get('image_score')
- score_result['breakdown']['attributes'] = score_result['breakdown']['mandatory_fields']
-
- results.append({
- 'sku': sku,
- 'title': product_data['title'],
- 'description': product_data['description'],
- 'image_path': product_data['image_path'],
- 'created_at': product_data['created_at'],
- 'final_score': score_result['final_score'],
- 'max_score': score_result['max_score'],
- 'breakdown': score_result['breakdown'],
- 'image_score': image_score_result.get('image_score'),
- 'image_breakdown': image_score_result.get('breakdown', {}),
- 'image_metadata': image_score_result.get('image_metadata', {}),
- 'ai_suggestions': {
- 'content': score_result.get('ai_suggestions', {}),
- 'image': image_score_result.get('ai_improvements', {})
- },
- 'categorized_feedback': categorized,
- 'processing_time': score_result.get('processing_time', 0),
- 'issues': all_issues,
- 'suggestions': all_suggestions
- })
- except Exception as e:
- logger.error(f"[BATCH][GET] Error scoring {sku}: {str(e)}", exc_info=True)
- errors.append({'sku': sku, 'error': str(e)})
- logger.info(f"[BATCH][GET] ===== COMPLETE: {len(results)} success, {len(errors)} errors =====")
- return JsonResponse({
- 'success': True,
- 'processed': len(results),
- 'results': results,
- 'errors': errors
- })
- except Exception as e:
- logger.error(f"[BATCH][GET] Batch scoring error: {str(e)}", exc_info=True)
- return JsonResponse({'error': str(e)}, status=500)
- def post(self, request):
- logger.info("[BATCH] ========== NEW BATCH REQUEST ==========")
-
- try:
- data = json.loads(request.body)
- products = data.get('products', [])
-
- logger.info(f"[BATCH] Processing {len(products)} products")
- print(products)
- if not products:
- return JsonResponse({'error': 'No products provided'}, status=400)
- results = []
- errors = []
- for idx, product_data in enumerate(products[:100], 1):
- sku = product_data.get('sku')
-
- category = product_data.get('category')
-
- logger.info(f"[BATCH] [{idx}/{len(products)}] Processing: {sku}")
- if not sku or not category:
- errors.append({'sku': sku, 'error': 'Missing SKU or category'})
- continue
- try:
- # Get rules
- category_rules = list(CategoryAttributeRule.objects.filter(category=category).values())
- if not category_rules:
- errors.append({'sku': sku, 'error': f'No attribute rules for category {category}'})
- continue
- content_rules = list(
- ProductContentRule.objects.filter(
- Q(category__isnull=True) | Q(category=category)
- ).values()
- )
- # Score attributes
- score_result = self.attribute_scorer.score_product(
- product_data,
- category_rules,
- content_rules=content_rules,
- generate_ai_suggestions=True
- )
- # Score image
- image_path = resolve_image_path(product_data.get('image_path'))
- image_score_result = None
- if image_path:
- try:
- logger.info(f"[BATCH] Scoring image for {sku}")
- image_score_result = self.image_scorer.score_image(
- product_data,
- image_path=image_path
- )
- logger.info(f"[BATCH] Image score for {sku}: {image_score_result.get('image_score')}")
- except Exception as img_err:
- logger.warning(f"[BATCH] Image scoring failed for {sku}: {img_err}")
- image_score_result = {
- 'image_score': None,
- 'breakdown': {},
- 'issues': ['Image scoring failed'],
- 'suggestions': ['Check image file'],
- 'image_metadata': {}
- }
- else:
- logger.info(f"[BATCH] No valid image for {sku}")
- image_score_result = {
- 'image_score': None,
- 'breakdown': {},
- 'issues': ['No image provided'],
- 'suggestions': ['Upload product image'],
- 'image_metadata': {}
- }
- all_issues = score_result['issues'] + image_score_result.get('issues', [])
- all_suggestions = score_result['suggestions'] + image_score_result.get('suggestions', [])
- categorized = categorize_issues_and_suggestions(all_issues, all_suggestions)
- results.append({
- 'sku': sku,
- 'title': product_data['title'],
- 'description': product_data['description'],
- 'image_path': image_path,
- 'final_score': score_result['final_score'],
- 'max_score': score_result['max_score'],
- 'breakdown': score_result['breakdown'],
- 'image_score': image_score_result.get('image_score'),
- 'image_breakdown': image_score_result.get('breakdown', {}),
- 'image_metadata': image_score_result.get('image_metadata', {}),
- 'ai_suggestions': {
- 'content': score_result.get('ai_suggestions', {}),
- 'image': image_score_result.get('ai_improvements', {})
- },
- 'categorized_feedback': categorized,
- 'processing_time': score_result.get('processing_time', 0),
- 'issues': all_issues,
- 'suggestions': all_suggestions
- })
- except Exception as e:
- logger.error(f"[BATCH] Error scoring {sku}: {str(e)}", exc_info=True)
- errors.append({'sku': sku, 'error': str(e)})
- logger.info(f"[BATCH] ========== BATCH COMPLETE: {len(results)} success, {len(errors)} errors ==========")
-
- return JsonResponse({
- 'success': True,
- 'processed': len(results),
- 'results': results,
- 'errors': errors
- })
- except Exception as e:
- logger.error(f"[BATCH] Batch scoring error: {str(e)}", exc_info=True)
- return JsonResponse({'error': str(e)}, status=500)
- import json
- import logging
- import concurrent.futures
- from django.http import JsonResponse
- from django.utils.decorators import method_decorator
- from django.views import View
- from django.views.decorators.csrf import csrf_exempt
- from django.db.models import Q
- from textblob import download_corpora
- from concurrent.futures import ThreadPoolExecutor, as_completed
- import time
- from core.models import Product, CategoryAttributeRule, ProductContentRule
- # from .scorers.attribute_scorer import AttributeQualityScorer
- # from .scorers.image_scorer import ImageQualityScorer
- # from .utils import resolve_image_path, categorize_issues_and_suggestions
- logger = logging.getLogger(__name__)
- # Try to ensure corpora are downloaded once at import
- try:
- download_corpora.download_all()
- logger.info("[INIT] TextBlob corpora verified.")
- except Exception as e:
- logger.warning(f"[INIT] Failed to auto-download TextBlob corpora: {e}")
- @method_decorator(csrf_exempt, name='dispatch')
- class BatchScoreViewV2(View):
- """Optimized Batch scoring with image support and parallel processing"""
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.attribute_scorer = AttributeQualityScorer(use_ai=True)
- self.image_scorer = ImageQualityScorer(use_ai=True)
- logger.info("[INIT] BatchScoreView initialized with AI scorers preloaded")
- def _process_single_product(self, product_data, idx, total):
- """Process a single product safely (run inside thread pool)"""
- start = time.time()
- sku = product_data.get('sku')
- category = product_data.get('category')
- img_raw_path = product_data.get('image_path')
- if not sku or not category:
- return {'sku': sku, 'error': 'Missing SKU or category'}
- try:
- logger.info(f"[BATCH][{idx}/{total}] Processing SKU: {sku}")
- # Fetch rules
- category_rules = list(CategoryAttributeRule.objects.filter(category=category).values())
- if not category_rules:
- return {'sku': sku, 'error': f'No attribute rules for category {category}'}
- content_rules = list(
- ProductContentRule.objects.filter(
- Q(category__isnull=True) | Q(category=category)
- ).values()
- )
- # Score attributes
- score_result = self.attribute_scorer.score_product(
- product_data,
- category_rules,
- content_rules=content_rules,
- generate_ai_suggestions=True
- )
- # Score image
- image_path = resolve_image_path(product_data.get('image_path'))
- image_score_result = None
- if image_path:
- try:
- image_score_result = self.image_scorer.score_image(
- product_data,
- image_path=image_path
- )
- except Exception as img_err:
- logger.warning(f"[IMG] Scoring failed for {sku}: {img_err}")
- image_score_result = {
- 'image_score': None,
- 'breakdown': {},
- 'issues': ['Image scoring failed'],
- 'suggestions': ['Check image file'],
- 'image_metadata': {}
- }
- else:
- image_score_result = {
- 'image_score': None,
- 'breakdown': {},
- 'issues': ['No image provided'],
- 'suggestions': ['Upload product image'],
- 'image_metadata': {}
- }
- # Combine results
- all_issues = score_result['issues'] + image_score_result.get('issues', [])
- all_suggestions = score_result['suggestions'] + image_score_result.get('suggestions', [])
- categorized = categorize_issues_and_suggestions(all_issues, all_suggestions)
- total_time = round(time.time() - start, 2)
- score_result['breakdown']['attributes'] = score_result['breakdown']['mandatory_fields']
- return {
- 'sku': sku,
- 'title': product_data.get('title'),
- 'description': product_data.get('description'),
- 'image_path': img_raw_path,
- 'final_score': score_result['final_score'],
- 'max_score': score_result['max_score'],
- 'breakdown': {**score_result['breakdown'], 'image_score': image_score_result.get('image_score')},
- 'image_score': image_score_result.get('image_score'),
- 'image_breakdown': image_score_result.get('breakdown', {}),
- 'image_metadata': image_score_result.get('image_metadata', {}),
- 'ai_suggestions': {
- 'content': score_result.get('ai_suggestions', {}),
- 'image': image_score_result.get('ai_improvements', {})
- },
- 'categorized_feedback': categorized,
- 'issues': all_issues,
- 'suggestions': all_suggestions,
- 'processing_time': total_time,
- 'product_type': category
- }
- except Exception as e:
- logger.error(f"[BATCH][{sku}] Error: {e}", exc_info=True)
- return {'sku': sku, 'error': str(e)}
- def _run_batch(self, products):
- """Run products in parallel using thread pool"""
- results, errors = [], []
- total = len(products)
- start_time = time.time()
- # Use up to 8 threads (tune based on your CPU)
- with ThreadPoolExecutor(max_workers=8) as executor:
- future_to_product = {
- executor.submit(self._process_single_product, product, idx + 1, total): product
- for idx, product in enumerate(products)
- }
- for future in as_completed(future_to_product):
- res = future.result()
- if "error" in res:
- errors.append(res)
- else:
- results.append(res)
- elapsed = round(time.time() - start_time, 2)
- logger.info(f"[BATCH] Completed {len(results)} products in {elapsed}s ({len(errors)} errors)")
- return results, errors, elapsed
- def get(self, request):
- """Score all products from DB automatically"""
- try:
- products = list(Product.objects.all().values())
- if not products:
- return JsonResponse({'error': 'No products found in database'}, status=404)
- results, errors, elapsed = self._run_batch(products)
- return JsonResponse({
- 'success': True,
- 'processed': len(results),
- 'results': results,
- 'errors': errors,
- 'elapsed_seconds': elapsed
- })
- except Exception as e:
- logger.error(f"[BATCH][GET] Error: {e}", exc_info=True)
- return JsonResponse({'error': str(e)}, status=500)
- def post(self, request):
- """Batch score via payload"""
- try:
- data = json.loads(request.body)
- products = data.get('products', [])
- if not products:
- return JsonResponse({'error': 'No products provided'}, status=400)
- results, errors, elapsed = self._run_batch(products)
- return JsonResponse({
- 'success': True,
- 'processed': len(results),
- 'results': results,
- 'errors': errors,
- 'elapsed_seconds': elapsed
- })
- except Exception as e:
- logger.error(f"[BATCH][POST] Error: {e}", exc_info=True)
- return JsonResponse({'error': str(e)}, status=500)
- @method_decorator(csrf_exempt, name='dispatch')
- class ContentRulesView(View):
- """API to manage ProductContentRules"""
- def get(self, request):
- try:
- category = request.GET.get('category')
-
- if category:
- rules = ProductContentRule.objects.filter(
- Q(category__isnull=True) | Q(category=category)
- )
- else:
- rules = ProductContentRule.objects.all()
-
- rules_data = list(rules.values())
-
- return JsonResponse({
- 'success': True,
- 'count': len(rules_data),
- 'rules': rules_data
- })
-
- except Exception as e:
- logger.error(f"Error fetching content rules: {e}", exc_info=True)
- return JsonResponse({'error': str(e)}, status=500)
- def post(self, request):
- try:
- data = json.loads(request.body)
-
- if 'field_name' not in data:
- return JsonResponse({'error': 'field_name is required'}, status=400)
-
- rule = ProductContentRule.objects.create(
- category=data.get('category'),
- field_name=data['field_name'],
- is_mandatory=data.get('is_mandatory', True),
- min_length=data.get('min_length'),
- max_length=data.get('max_length'),
- min_word_count=data.get('min_word_count'),
- max_word_count=data.get('max_word_count'),
- must_contain_keywords=data.get('must_contain_keywords', []),
- validation_regex=data.get('validation_regex', ''),
- description=data.get('description', '')
- )
-
- if data.get('category'):
- cache.delete(f"content_rules_{data['category']}")
-
- return JsonResponse({
- 'success': True,
- 'rule_id': rule.id,
- 'message': 'Content rule created successfully'
- })
-
- except Exception as e:
- logger.error(f"Error creating content rule: {e}", exc_info=True)
- return JsonResponse({'error': str(e)}, status=500)
-
- # views.py
- from rest_framework.views import APIView
- from rest_framework.response import Response
- from rest_framework import status
- import tempfile
- import os # Import os to remove the temporary file
- # from core.models import Product, CategoryAttributeRule, ProductContentRule # Not strictly needed here
- from .management.commands.load_excel_data import Command as ExcelLoader
- class ExcelUploadView(APIView):
- def post(self, request):
- excel_file = request.FILES.get('file')
- if not excel_file:
- return Response({'error': 'No file uploaded'}, status=status.HTTP_400_BAD_REQUEST)
- tmp_path = None
- try:
- # 1. Save uploaded file to a temporary location
- with tempfile.NamedTemporaryFile(delete=False, suffix=".xlsx") as tmp:
- for chunk in excel_file.chunks():
- tmp.write(chunk)
- tmp_path = tmp.name
- # 2. Execute the modified loader command
- loader = ExcelLoader()
- # The handle method now returns a dictionary with 'success' and 'message'
- # We pass the path as a keyword argument to match the command structure
- result = loader.handle(excel_path=tmp_path)
- # 3. Return the response based on the loader result
- if result.get('success'):
- # Send the detailed message back in the API response
- return Response({'success': True, 'message': result['message']}, status=status.HTTP_200_OK)
- else:
- # Handle error case returned by the loader (e.g., file read failure)
- return Response({'success': False, 'error': result.get('error', 'An unknown error occurred during loading.')}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
- except Exception as e:
- # Catch unexpected exceptions during the process
- return Response({'success': False, 'error': f'Upload failed: {str(e)}'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
- finally:
- import time
- import gc
- if tmp_path and os.path.exists(tmp_path):
- try:
- # Force garbage collection to ensure pandas releases the file handle
- gc.collect()
- time.sleep(0.3) # brief pause for Windows file handle release
- os.remove(tmp_path)
- except PermissionError:
- # Retry once after short delay (Windows-specific fix)
- time.sleep(0.5)
- try:
- os.remove(tmp_path)
- except Exception:
- pass # give up silently; OS will clean temp files later
- import os
- import urllib.parse
- from django.http import HttpResponse
- from django.conf import settings
- def open_outlook_mail(request):
- try:
- body_path = os.path.join(settings.BASE_DIR, 'core', 'scripts', 'mailbody.txt')
- with open(body_path, 'r', encoding='utf-8') as f:
- body = f.read().strip()
- subject = "ETS Competitor Intelligence - October 28, 2025"
- recipient = "manager@example.com"
- encoded_subject = urllib.parse.quote(subject)
- encoded_body = urllib.parse.quote(body)
- mailto_url = f"mailto:{recipient}?subject={encoded_subject}&body={encoded_body}"
- html = f"""
- <html>
- <head><title>Opening Outlook...</title></head>
- <body style="font-family: Arial, sans-serif; text-align: center; margin-top: 80px;">
- <script>
- window.onload = function() {{
- window.location.href = "{mailto_url}";
- }};
- </script>
- <h2>📧 Opening Outlook...</h2>
- <p>If Outlook does not open automatically, <a href="{mailto_url}">click here</a>.</p>
- </body>
- </html>
- """
- return HttpResponse(html)
- except Exception as e:
- return HttpResponse(f"<h3>Error:</h3><p>{str(e)}</p>", status=500)
- # views.py
- from rest_framework.views import APIView
- from rest_framework.response import Response
- from rest_framework import status
- from django.db.models import Avg, Count
- from .models import Product, AttributeScore # adjust the import path as needed
- class ProductTypeQualityMetricsView(APIView):
- """
- API endpoint to fetch product type quality metrics.
- Supports optional ?product_type=<type> query param.
- """
- def get(self, request):
- product_type_filter = request.query_params.get('product_type', None)
- queryset = Product.objects.all()
- if product_type_filter:
- queryset = queryset.filter(product_type=product_type_filter)
- scored = (
- AttributeScore.objects.filter(product__in=queryset)
- .annotate(
- title_quality=Avg('details__title_quality'),
- description_quality=Avg('details__description_quality'),
- image_quality=Avg('details__image_score'),
- attributes_quality=Avg('details__attributes'),
- )
- .values('product__product_type')
- .annotate(
- product_count=Count('product', distinct=True),
- scored_product_count=Count('id'),
- avg_overall_score=Avg('score'),
- avg_title_quality=Avg('details__title_quality'),
- avg_description_quality=Avg('details__description_quality'),
- avg_image_quality=Avg('details__image_score'),
- avg_attributes_quality=Avg('details__attributes'),
- )
- )
- results = [
- {
- "product_type": item['product__product_type'],
- "product_count": item['product_count'],
- "scored_product_count": item['scored_product_count'],
- "avg_overall_score": round(item['avg_overall_score'] or 0, 2),
- "avg_title_quality_percent": round(item['avg_title_quality'] or 0, 2),
- "avg_description_quality_percent": round(item['avg_description_quality'] or 0, 2),
- "avg_image_quality_percent": round(item['avg_image_quality'] or 0, 2),
- "avg_attributes_quality_percent": round(item['avg_attributes_quality'] or 0, 2),
- }
- for item in scored
- ]
- return Response(results, status=status.HTTP_200_OK)
|