| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425 |
- import os
- import json
- import time
- import requests
- import uuid
- import threading
- import pandas as pd
- from bs4 import BeautifulSoup
- from django.shortcuts import get_object_or_404, redirect, render
- from django.core.files.storage import FileSystemStorage
- from django.http import JsonResponse
- from .models import TitleMapping, AttributeMaster,ProcessingTask # <--- THIS FIXES THE ERROR
- from django.conf import settings
- import cloudscraper
- from django.contrib import messages
- from django.contrib.auth import authenticate, login, logout
- # from django.contrib.auth.decorators import login_required
- from .decorators import login_required
- from django.contrib.auth.hashers import make_password
- # To login
def login_view(request):
    """Authenticate a user with the submitted email/password.

    On success, stores the user's email in the session and redirects to
    the title-creator home page; on failure, flashes an error message and
    redirects back to the login page. GET requests render the login form.
    """
    if request.method == "POST":
        email = request.POST.get("username")
        password = request.POST.get("password")

        # SECURITY FIX: the original printed the raw email/password to
        # stdout -- never log credentials; they end up in server logs.
        user = authenticate(request, username=email, password=password)

        if user is not None:
            login(request, user)
            request.session['user_email'] = user.email
            return redirect('title_creator_home')
        else:
            messages.error(request, "Invalid email or password.")
            return redirect('login')

    return render(request, 'login.html')
- # To logout
@login_required
def logout_view(request):
    """End the current session and send the user back to the login page."""
    logout(request)
    messages.success(request, "You have been logged out successfully.")
    return redirect('login')
@login_required
def master_config_view(request):
    """Admin screen for managing attribute masters and title mappings.

    POST dispatches on the ``action`` field:
      * ``add_attribute``                    -- create an AttributeMaster
      * ``add_mapping`` / ``update_mapping`` -- create or edit a TitleMapping
      * ``delete_mapping`` / ``delete_attribute`` -- remove a row by id
    Every POST redirects back to the master page (post/redirect/get).
    GET renders the page with all attributes and mappings.
    """
    if request.method == 'POST':
        action = request.POST.get('action')

        # Part 1: Add New Attribute
        if action == 'add_attribute':
            name = request.POST.get('attr_name')
            is_m = request.POST.get('is_mandatory') == 'on'
            if name:
                AttributeMaster.objects.get_or_create(
                    name=name.strip(), defaults={'is_mandatory': is_m}
                )

        # Part 2: Title-mapping create & update
        elif action in ['add_mapping', 'update_mapping']:
            pt = request.POST.get('pt_name')
            seq = request.POST.get('sequence')
            edit_id = request.POST.get('edit_id')
            # BUGFIX: require a non-empty product-type name before calling
            # .strip() -- the original raised AttributeError when 'pt_name'
            # was absent from an update POST.
            if action == 'update_mapping' and edit_id and pt:
                mapping = get_object_or_404(TitleMapping, id=edit_id)
                mapping.product_type = pt.strip()
                mapping.format_sequence = seq
                mapping.save()
            elif pt:
                # get_or_create prevents exact duplicate product types.
                TitleMapping.objects.get_or_create(
                    product_type=pt.strip(),
                    defaults={'format_sequence': seq}
                )

        # Mapping delete
        elif action == 'delete_mapping':
            TitleMapping.objects.filter(id=request.POST.get('id')).delete()

        # Part 3: Attribute delete
        elif action == 'delete_attribute':
            AttributeMaster.objects.filter(id=request.POST.get('id')).delete()

        return redirect('title_creator_master')

    # GET: load all data for the page
    context = {
        'attributes': AttributeMaster.objects.all().order_by('name'),
        'mappings': TitleMapping.objects.all().order_by('product_type'),
    }
    return render(request, 'title_creator_master.html', context)
def save_config_api(request):
    """JSON API: bulk-save mandatory flags and title sequences.

    Expects a POST body like::

        {"mandatory_ids": [1, 3, 5],
         "mappings": [{"id": 1, "sequence": "Brand,Color"}]}

    Returns ``{"success": true}`` on success, or
    ``{"success": false, "error": "..."}`` on failure.
    """
    if request.method == 'POST':
        try:
            data = json.loads(request.body)

            # Reset-then-set keeps the mandatory flags in exact sync with
            # the submitted id list.
            AttributeMaster.objects.all().update(is_mandatory=False)
            AttributeMaster.objects.filter(
                id__in=data.get('mandatory_ids', [])
            ).update(is_mandatory=True)

            # Update each title sequence by mapping id.
            for m in data.get('mappings', []):
                TitleMapping.objects.filter(id=m['id']).update(
                    format_sequence=m['sequence']
                )

            return JsonResponse({'success': True})
        except Exception as e:
            return JsonResponse({'success': False, 'error': str(e)})

    # BUGFIX: the original returned None for non-POST requests, causing
    # Django's "view didn't return an HttpResponse" error.
    return JsonResponse({'success': False, 'error': 'POST required'}, status=405)
def extract_title_or_error(product, selected_pt):
    """Build a product title for *selected_pt* from a scraped product dict.

    Returns the assembled title string, or a human-readable error string
    when no mapping exists for the product type or a mandatory attribute
    has no value on the page.
    """
    pt_name = selected_pt

    # 1. Fetch the configured attribute ordering for this product type.
    try:
        sequence = TitleMapping.objects.get(product_type=pt_name).get_sequence_list()
    except TitleMapping.DoesNotExist:
        return f"No Title Configuration found for Product Type: {pt_name}"

    # 2. Attribute names the DB marks as required in the final title.
    required = list(
        AttributeMaster.objects.filter(is_mandatory=True).values_list('name', flat=True)
    )

    # 3. Pull the attributes of interest out of the nested JSON payload.
    values = {"Brand": product.get("brand"), "Product Type": pt_name}
    dims = {}
    for grp in product.get("attributeGroups", []):
        for item in grp.get("attributes", []):
            label = item.get("attributeDesc")
            val = item.get("attributeValue")
            if label in ["Door Type", "Capacity", "Color"]:
                values[label] = val
            elif label in ["Width", "Depth", "Height"]:
                dims[label] = val

    # Only emit a combined dimension string when all three axes are present.
    if {"Width", "Depth", "Height"}.issubset(dims):
        values["Dimensions"] = f'{dims["Width"]} x {dims["Depth"]} x {dims["Height"]}'

    # 4. Assemble the title in configured order, tracking missing mandatories.
    parts, missing = [], []
    for name in sequence:
        candidate = values.get(name)
        if not candidate or str(candidate).strip() == "":
            if name in required:
                missing.append(name)
            continue
        parts.append(str(candidate))

    # 5. Result: error if any mandatory field was absent, else the title.
    if missing:
        return f"Could not found {', '.join(missing)} on Product Details page"

    return " ".join(parts)
def construct_dynamic_title(raw_data, selected_pt):
    """Unwrap a Next.js ``__NEXT_DATA__`` payload and build a title.

    Walks props -> pageProps -> product and delegates to
    extract_title_or_error; any failure yields a generic error string.
    """
    try:
        props = raw_data.get("props", {})
        page_props = props.get("pageProps", {})
        product = page_props.get("product", {})
        if not product:
            return "Product data not found"
        title = extract_title_or_error(product, selected_pt)
        return title.strip()
    except Exception:
        return "Could not found attribute name on product details page"
@login_required
def title_creator_view(request):
    """Synchronous title generator.

    POST (with 'file'): reads an Excel sheet of product URLs, scrapes each
    page's ``__NEXT_DATA__`` JSON, fills the 'New_Generated_Title' column,
    saves a processed copy, and returns per-row results as JSON.
    GET: renders the upload form with the known product types.
    """
    if request.method == 'POST' and request.FILES.get('file'):
        excel_file = request.FILES['file']
        selected_pt = request.POST.get('product_type')
        fs = FileSystemStorage()
        filename = fs.save(excel_file.name, excel_file)
        file_path = fs.path(filename)
        try:
            # 1. Read the uploaded sheet.
            df = pd.read_excel(file_path)

            # 2. Ensure the output column exists.
            if 'New_Generated_Title' not in df.columns:
                df['New_Generated_Title'] = ""

            headers = {"User-Agent": "Mozilla/5.0"}
            results_for_ui = []

            # 3. Process each row (assumes the sheet has a 'URL' column).
            for index, row in df.iterrows():
                url = row.get('URL')
                new_title = ""
                if pd.notna(url):
                    # BUGFIX: the original used a bare `except:`, which also
                    # swallows KeyboardInterrupt/SystemExit; catch Exception.
                    try:
                        resp = requests.get(url, headers=headers, timeout=10)
                        soup = BeautifulSoup(resp.content, 'html.parser')
                        script_tag = soup.find('script', id='__NEXT_DATA__')

                        if script_tag:
                            raw_data = json.loads(script_tag.string)
                            new_title = construct_dynamic_title(raw_data, selected_pt)
                        else:
                            new_title = "Could not found attribute name on product details page"
                    except Exception:
                        new_title = "Could not found attribute name on product details page"
                else:
                    new_title = "URL Missing"

                # Record the result in both the DataFrame and the UI payload.
                df.at[index, 'New_Generated_Title'] = new_title
                results_for_ui.append({
                    "id": index + 1,
                    "url": url,
                    "new_title": new_title,
                    "status": True
                })
                time.sleep(1)  # politeness delay between requests

            # 4. Save the modified Excel under a new name.
            output_filename = f"processed_{excel_file.name}"
            output_path = os.path.join(fs.location, output_filename)
            df.to_excel(output_path, index=False)
            return JsonResponse({
                'success': True,
                'results': results_for_ui,
                'download_url': fs.url(output_filename)
            })
        finally:
            # Always remove the temporary upload, even if processing failed.
            if os.path.exists(file_path):
                os.remove(file_path)

    # GET: fetch all product types for the dropdown.
    product_types = TitleMapping.objects.all().values_list('product_type', flat=True)
    return render(request, 'title_creator_index.html', {'product_types': product_types})
- # def process_excel_task(file_path, selected_pt, task_id):
- # task = ProcessingTask.objects.get(task_id=task_id)
- # try:
- # df = pd.read_excel(file_path)
- # if 'New_Generated_Title' not in df.columns:
- # df['New_Generated_Title'] = ""
- # headers = {"User-Agent": "Mozilla/5.0"}
- # for index, row in df.iterrows():
- # url = row.get('URL')
- # # ... [Insert your existing BeautifulSoup/Scraping Logic Here] ...
- # # Example:
- # # new_title = construct_dynamic_title(raw_data, selected_pt)
- # # df.at[index, 'New_Generated_Title'] = new_title
- # time.sleep(1)
- # # Save Final File
- # output_filename = f"completed_{task.original_filename}"
- # from django.conf import settings
- # output_path = os.path.join(settings.MEDIA_ROOT, output_filename)
- # df.to_excel(output_path, index=False)
- # # Update Task Status
- # task.status = 'COMPLETED'
- # task.download_url = f"{settings.MEDIA_URL}{output_filename}"
- # task.save()
- # except Exception as e:
- # task.status = 'FAILED'
- # task.save()
- # print(f"Error: {e}")
- # finally:
- # if os.path.exists(file_path):
- # os.remove(file_path)
def process_excel_task(file_path, selected_pt, task_id):
    """Background worker: generate titles for every URL in an uploaded Excel.

    Reads the sheet at *file_path*, fills the 'New_Generated_Title' column
    row by row, writes the completed file to MEDIA_ROOT, and records the
    outcome on the ProcessingTask identified by *task_id* ('COMPLETED' with
    a download URL, or 'FAILED'). The temporary upload is always removed.
    """
    task = ProcessingTask.objects.get(task_id=task_id)

    try:
        # 1. Read the uploaded sheet.
        df = pd.read_excel(file_path)

        # 2. Ensure the output column exists.
        if 'New_Generated_Title' not in df.columns:
            df['New_Generated_Title'] = ""

        headers = {"User-Agent": "Mozilla/5.0"}

        # 3. Process each row.
        for index, row in df.iterrows():
            url = row.get('URL')
            new_title = ""
            if pd.notna(url):
                try:
                    # DEAD-CODE FIX: the original built a cloudscraper
                    # instance per call but never used it; if the target
                    # site starts blocking plain requests, switch to
                    # cloudscraper.create_scraper().get(url, timeout=15).
                    resp = requests.get(url, headers=headers, timeout=10)
                    if resp.status_code == 200:
                        soup = BeautifulSoup(resp.content, 'html.parser')
                        script_tag = soup.find('script', id='__NEXT_DATA__')

                        if script_tag:
                            try:
                                raw_data = json.loads(script_tag.string)
                                new_title = construct_dynamic_title(raw_data, selected_pt)
                            except Exception:
                                new_title = "Data Parsing Error"
                        else:
                            new_title = "Could not found attribute name on product details page"
                    else:
                        new_title = f"HTTP Error: {resp.status_code}"
                except Exception:
                    new_title = "Request Failed (Timeout/Connection)"
            else:
                new_title = "URL Missing"

            df.at[index, 'New_Generated_Title'] = new_title
            time.sleep(1)  # politeness delay so the server doesn't block us

        # 4. Save the modified Excel to the MEDIA folder.
        output_filename = f"completed_{task_id}_{task.original_filename}"

        # exist_ok=True is race-free, unlike the exists()+makedirs() pair
        # the original used (another thread could create the directory
        # between the two calls and trigger FileExistsError).
        os.makedirs(settings.MEDIA_ROOT, exist_ok=True)

        output_path = os.path.join(settings.MEDIA_ROOT, output_filename)
        df.to_excel(output_path, index=False)

        # 5. Final status update for the polling frontend.
        task.status = 'COMPLETED'
        task.download_url = f"{settings.MEDIA_URL}{output_filename}"
        task.save()
    except Exception as e:
        # Runs in a background thread: record the failure instead of raising.
        print(f"Critical Task Failure: {e}")
        task.status = 'FAILED'
        task.save()
    finally:
        # 6. Cleanup the temporary uploaded file.
        if os.path.exists(file_path):
            os.remove(file_path)
@login_required
def title_creator_async_view(request):
    """Kick off background Excel processing and return a task id.

    Saves the upload under a unique temp name, records a PENDING
    ProcessingTask, and hands the work to a daemon-less worker thread
    running process_excel_task. The frontend polls check_status with the
    returned task id.
    """
    if request.method == 'POST' and request.FILES.get('file'):
        upload = request.FILES['file']
        product_type = request.POST.get('product_type')

        # 1. Persist the upload under a collision-proof temporary name.
        storage = FileSystemStorage()
        stored_name = storage.save(f"temp_{uuid.uuid4().hex}_{upload.name}", upload)
        stored_path = storage.path(stored_name)

        # 2. Create the task record the frontend will poll.
        job_id = str(uuid.uuid4())
        ProcessingTask.objects.create(
            task_id=job_id,
            original_filename=upload.name,
            status='PENDING'
        )

        # 3. Run the scrape/generate loop off the request thread.
        worker = threading.Thread(
            target=process_excel_task,
            args=(stored_path, product_type, job_id)
        )
        worker.start()

        return JsonResponse({
            'status': 'started',
            'task_id': job_id,
            'message': 'File is processing in the background.'
        })

    return JsonResponse({'error': 'Invalid request'}, status=400)
- # 2. This view is called repeatedly by pollStatus() in your JS
def check_status(request, task_id):
    """Polled by the frontend's pollStatus(): report a task's current state."""
    task = get_object_or_404(ProcessingTask, task_id=task_id)
    payload = {
        'status': task.status,              # 'PENDING', 'COMPLETED', or 'FAILED'
        'file_name': task.original_filename,
        'download_url': task.download_url,  # null until status is COMPLETED
    }
    return JsonResponse(payload)
|