import os
import json
import time
import requests
import uuid
import threading
import random

import pandas as pd
from bs4 import BeautifulSoup
import cloudscraper

from django.shortcuts import get_object_or_404, redirect, render
from django.core.files.storage import FileSystemStorage
from django.http import JsonResponse
from django.conf import settings
from django.contrib import messages
from django.contrib.auth import authenticate, login, logout

from .models import TitleMapping, AttributeMaster, ProcessingTask
from .decorators import login_required  # project-specific decorator, not django.contrib.auth's


# To login
def login_view(request):
    if request.method == "POST":
        email = request.POST.get("username")
        password = request.POST.get("password")

        # Authenticate the user
        user = authenticate(request, username=email, password=password)
        if user is not None:
            login(request, user)
            request.session['user_email'] = user.email
            # Additional session fields can be stored here if needed, e.g.:
            # request.session['full_name'] = f"{user.firstName} {user.lastName or ''}".strip()
            # request.session['role'] = user.get_role_display()  # 'Super Admin', 'Admin', 'RTA'
            # request.session['role_code'] = user.role           # '0', '1', '2'
            # request.session['joining_date'] = user.createdDate.strftime("%b, %Y")
            # request.session['userId'] = user.userId
            # request.session['client_id'] = user.client.clientId if user.client else None
            return redirect('title_creator_home')
        else:
            messages.error(request, "Invalid email or password.")
            return redirect('login')

    return render(request, 'login.html')


# To logout
@login_required
def logout_view(request):
    logout(request)
    messages.success(request, "You have been logged out successfully.")
    return redirect('login')


@login_required
def master_config_view(request):
    if request.method == 'POST':
        action = request.POST.get('action')

        # Part 1: Add New Attribute
        if action == 'add_attribute':
            name = request.POST.get('attr_name')
            is_m = request.POST.get('is_mandatory') == 'on'
            if name:
                AttributeMaster.objects.get_or_create(name=name.strip(), defaults={'is_mandatory': is_m})

        # Part 2: Title Mapping (Product Type) actions -- create & update
        elif action in ['add_mapping', 'update_mapping']:
            pt = request.POST.get('pt_name')
            seq = request.POST.get('sequence')
            edit_id = request.POST.get('edit_id')

            if action == 'update_mapping' and edit_id:
                # Update existing
                mapping = get_object_or_404(TitleMapping, id=edit_id)
                mapping.product_type = pt.strip()
                mapping.format_sequence = seq
                mapping.save()
            elif pt:
                # Create new (get_or_create prevents exact duplicates)
                TitleMapping.objects.get_or_create(
                    product_type=pt.strip(),
                    defaults={'format_sequence': seq}
                )

        # Part 3: Delete actions
        elif action == 'delete_mapping':
            TitleMapping.objects.filter(id=request.POST.get('id')).delete()
        elif action == 'delete_attribute':
            AttributeMaster.objects.filter(id=request.POST.get('id')).delete()

        return redirect('title_creator_master')

    # GET: Load all data
    context = {
        'attributes': AttributeMaster.objects.all().order_by('name'),
        'mappings': TitleMapping.objects.all().order_by('product_type'),
    }
    return render(request, 'title_creator_master.html', context)
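
# The `login_required` imported above comes from this app's own `.decorators`
# module rather than django.contrib.auth, so its exact behavior is not visible
# in this file. The commented sketch below is a minimal, hypothetical version,
# assuming it simply redirects anonymous users to the 'login' URL name:
#
# from functools import wraps
# from django.shortcuts import redirect
#
# def login_required(view_func):
#     @wraps(view_func)
#     def _wrapped(request, *args, **kwargs):
#         if not request.user.is_authenticated:
#             return redirect('login')
#         return view_func(request, *args, **kwargs)
#     return _wrapped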

def save_config_api(request):
    if request.method == 'POST':
        try:
            data = json.loads(request.body)

            # Update mandatory attributes.
            # Expected payload: { "mandatory_ids": [1, 3, 5] }
            AttributeMaster.objects.all().update(is_mandatory=False)
            AttributeMaster.objects.filter(id__in=data.get('mandatory_ids', [])).update(is_mandatory=True)

            # Update title sequences.
            # Expected payload: { "mappings": [{"id": 1, "sequence": "Brand,Color"}] }
            for m in data.get('mappings', []):
                TitleMapping.objects.filter(id=m['id']).update(format_sequence=m['sequence'])

            return JsonResponse({'success': True})
        except Exception as e:
            return JsonResponse({'success': False, 'error': str(e)})

    return JsonResponse({'error': 'Invalid request'}, status=400)
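
# The views in this module lean on two model details defined in .models, not
# shown here: TitleMapping.get_sequence_list() and the is_mandatory flag on
# AttributeMaster. The commented sketch below is an assumption of the minimal
# shape those models need for this code to work; the real definitions may differ.
#
# class AttributeMaster(models.Model):
#     name = models.CharField(max_length=100, unique=True)
#     is_mandatory = models.BooleanField(default=False)
#
# class TitleMapping(models.Model):
#     product_type = models.CharField(max_length=100)
#     format_sequence = models.TextField()  # e.g. "Brand,Product Type,Capacity,Color"
#
#     def get_sequence_list(self):
#         return [part.strip() for part in self.format_sequence.split(',') if part.strip()]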

def extract_title_or_error(product, selected_pt):
    # 1. Identify Product Type
    pt_name = selected_pt
    try:
        mapping = TitleMapping.objects.get(product_type=pt_name)
        config_sequence = mapping.get_sequence_list()
    except TitleMapping.DoesNotExist:
        return f"No Title Configuration found for Product Type: {pt_name}"

    mandatory_fields = list(
        AttributeMaster.objects.filter(is_mandatory=True).values_list('name', flat=True)
    )

    # 2. Data Extraction (the copyright mark is appended to the brand name)
    extracted_data = {
        "Brand": (product.get("brand") or "") + "©",
        "Product Type": pt_name
    }
    dimensions = {}
    for group in product.get("attributeGroups", []):
        for attr in group.get("attributes", []):
            desc = attr.get("attributeDesc")
            val = attr.get("attributeValue")
            if desc == "Capacity":
                extracted_data[desc] = f"Capacity {val}"
            elif desc in ["Door Type", "Color"]:
                extracted_data[desc] = val
            elif desc in ["Width", "Depth", "Height"]:
                dimensions[desc] = val

    if {"Width", "Depth", "Height"}.issubset(dimensions):
        # Strip the existing " in" unit before adding the inch symbol
        w = str(dimensions["Width"]).replace(" in", "").strip()
        d = str(dimensions["Depth"]).replace(" in", "").strip()
        h = str(dimensions["Height"]).replace(" in", "").strip()
        extracted_data["Dimensions"] = f'{w}"W x {d}"D x {h}"H'

    # 3. Build Title Parts, tracking any missing mandatory attributes
    final_title_parts = []
    missing_mandatory = []
    for attr_name in config_sequence:
        val = extracted_data.get(attr_name)
        if not val or str(val).strip() == "":
            if attr_name in mandatory_fields:
                missing_mandatory.append(attr_name)
            continue
        final_title_parts.append(str(val))

    if missing_mandatory:
        return f"Could not find {', '.join(missing_mandatory)} on Product Details page"

    # Helper to join parts as: "Brand ProductType, Param1, Param2"
    def construct_string(parts):
        if len(parts) <= 2:
            return " ".join(parts)
        return f"{parts[0]} {parts[1]}, {', '.join(parts[2:])}"

    current_title = construct_string(final_title_parts)

    # 4. Length Reduction Logic, applied step by step while the title exceeds 100 chars

    # Step 1: Shorten "Capacity" -> "Cap."
    if len(current_title) > 100:
        for i, part in enumerate(final_title_parts):
            if "Capacity" in part:
                final_title_parts[i] = part.replace("Capacity", "Cap.")
        current_title = construct_string(final_title_parts)

    # Step 2: Dynamic Product Type acronym (e.g. "Stainless Steel" -> "SS")
    if len(current_title) > 100 and len(final_title_parts) > 1:
        words = final_title_parts[1].split()
        if len(words) > 1:
            # Take the first letter of every word in the Product Type
            final_title_parts[1] = "".join(w[0].upper() for w in words)
        current_title = construct_string(final_title_parts)

    # Step 3: Remove spaces from attributes, working from the back.
    # Brand (index 0) and Product Type (index 1) are skipped.
    if len(current_title) > 100:
        for i in range(len(final_title_parts) - 1, 1, -1):
            if len(current_title) <= 100:
                break
            final_title_parts[i] = final_title_parts[i].replace(" ", "")
            current_title = construct_string(final_title_parts)

    return current_title


def construct_dynamic_title(raw_data, selected_pt):
    try:
        product = raw_data.get("props", {}).get("pageProps", {}).get("product", {})
        if not product:
            return "Product data not found"
        return extract_title_or_error(product, selected_pt).strip()
    except Exception:
        return "Could not find attribute name on product details page"
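
# Illustrative only: a hand-built payload mirroring the __NEXT_DATA__ shape the
# two functions above expect. The brand, attribute values, and product type here
# are made up; real payloads come from the product page's
# <script id="__NEXT_DATA__"> tag, and the exact output depends on the
# TitleMapping.format_sequence configured in the DB.
#
# sample_next_data = {
#     "props": {"pageProps": {"product": {
#         "brand": "Global Industrial",
#         "attributeGroups": [{"attributes": [
#             {"attributeDesc": "Capacity", "attributeValue": "500 lb"},
#             {"attributeDesc": "Color", "attributeValue": "Gray"},
#             {"attributeDesc": "Width", "attributeValue": "36 in"},
#             {"attributeDesc": "Depth", "attributeValue": "24 in"},
#             {"attributeDesc": "Height", "attributeValue": "72 in"},
#         ]}],
#     }}}
# }
# construct_dynamic_title(sample_next_data, "Storage Cabinet")
# e.g. -> 'Global Industrial© Storage Cabinet, Capacity 500 lb, Gray, 36"W x 24"D x 72"H'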

@login_required
def title_creator_view(request):
    if request.method == 'POST' and request.FILES.get('file'):
        excel_file = request.FILES['file']
        selected_pt = request.POST.get('product_type')

        fs = FileSystemStorage()
        filename = fs.save(excel_file.name, excel_file)
        file_path = fs.path(filename)

        try:
            # 1. Read Excel
            df = pd.read_excel(file_path)

            # 2. Add the new column if it doesn't exist
            if 'New_Generated_Title' not in df.columns:
                df['New_Generated_Title'] = ""

            headers = {"User-Agent": "Mozilla/5.0"}
            results_for_ui = []

            # Specific headers for the Item# autosuggest API
            api_headers = {
                "accept": "application/json, text/plain, */*",
                "authorization": "Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzUxMiJ9.eyJzdWIiOiJERVYifQ.uOFB7h7_Aw6jbA1HSqVJ44tKMO7E1ljz1kV_JddeKL64YCOH57-l1ZX2Lly-Jnhdnxk3xMAeW5FawAgymEaMKA",
                "client_id": "GEC",
                "referer": "https://www.globalindustrial.com/"
            }

            # 3. Process each row
            for index, row in df.iterrows():
                url = row.get('URL')            # Assumes the Excel has a 'URL' column
                item_number = row.get('Item#')
                new_title = ""
                final_url = None

                # Step 1: Resolve the URL
                if pd.notna(url) and str(url).startswith('http'):
                    final_url = url
                elif pd.notna(item_number):
                    # Call the autosuggest API to resolve a URL from the Item#
                    api_url = (
                        "https://www.globalindustrial.com/catalogApis/catalog/autosuggest"
                        f"?key={item_number}&features=true"
                    )
                    try:
                        api_resp = requests.get(api_url, headers=api_headers, timeout=10)
                        if api_resp.status_code == 200:
                            data = api_resp.json()
                            final_url = data.get('exactMatch', {}).get('canonicalLink')
                    except Exception:
                        new_title = f"API Error for Item# {item_number}"

                # Step 2: Scrape the product page and build the title
                if pd.notna(final_url):
                    try:
                        resp = requests.get(final_url, headers=headers, timeout=10)
                        soup = BeautifulSoup(resp.content, 'html.parser')
                        script_tag = soup.find('script', id='__NEXT_DATA__')
                        if script_tag:
                            raw_data = json.loads(script_tag.string)
                            new_title = construct_dynamic_title(raw_data, selected_pt)
                        else:
                            new_title = "Could not find attribute name on product details page"
                    except Exception:
                        new_title = "Could not find attribute name on product details page"
                elif not new_title:
                    # Only report a missing URL if no API error was recorded above
                    new_title = "URL Missing"

                # Update the DataFrame column for this row
                df.at[index, 'New_Generated_Title'] = new_title
                results_for_ui.append({
                    "id": index + 1,
                    "url": final_url,
                    "new_title": new_title,
                    "status": True
                })

                # Safety delay: random float between 3.0 and 7.0 seconds
                time.sleep(random.uniform(3, 7))

            # 4. Save the modified Excel to a new path
            output_filename = f"processed_{excel_file.name}"
            output_path = os.path.join(fs.location, output_filename)
            df.to_excel(output_path, index=False)

            return JsonResponse({
                'success': True,
                'results': results_for_ui,
                'download_url': fs.url(output_filename)
            })
        finally:
            if os.path.exists(file_path):
                os.remove(file_path)

    # GET request: fetch all product types for the dropdown
    product_types = TitleMapping.objects.all().values_list('product_type', flat=True)
    return render(request, 'title_creator_index.html', {'product_types': product_types})
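
# Illustrative only: the minimal input workbook the view above expects. The
# column names 'URL' and 'Item#' are the ones the row loop actually reads;
# any other columns pass through untouched. The values below are made up,
# purely for a quick local test.
#
# pd.DataFrame({
#     "Item#": ["WB123456", None],
#     "URL": [None, "https://www.globalindustrial.com/p/some-product"],
# }).to_excel("sample_input.xlsx", index=False)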

def get_fresh_token(scraper):
    """Hit the homepage once and try to extract the latest Bearer token.

    Note: this is a plain synchronous function -- it is only ever called
    synchronously, and nothing inside it awaits.
    """
    base_url = "https://www.globalindustrial.com"
    try:
        # One-time hit to the base URL
        response = scraper.get(base_url, timeout=15)

        # 1. Check cookies for 'Authorization'
        token = scraper.cookies.get('Authorization')
        if token:
            return token.replace('Bearer ', '').strip()

        # 2. Check the __NEXT_DATA__ script in the HTML
        soup = BeautifulSoup(response.content, 'html.parser')
        script_tag = soup.find('script', id='__NEXT_DATA__')
        if script_tag:
            data = json.loads(script_tag.string)
            # Standard Next.js path for auth tokens
            token = data.get('props', {}).get('pageProps', {}).get('token')
            if token:
                return token
    except Exception as e:
        print(f"Token retrieval failed: {e}")

    # Fallback to the hardcoded token if extraction fails
    return "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzUxMiJ9.eyJzdWIiOiJERVYifQ.uOFB7h7_Aw6jbA1HSqVJ44tKMO7E1ljz1kV_JddeKL64YCOH57-l1ZX2Lly-Jnhdnxk3xMAeW5FawAgymEaMKA"


def process_excel_task(file_path, selected_pt, task_id):
    """Background worker run in a plain thread.

    This must be a regular (synchronous) function: threading.Thread calls its
    target directly, so an `async def` here would only create a coroutine
    object that never actually runs.
    """
    scraper = cloudscraper.create_scraper()  # drop-in replacement for requests.get
    # Retrieve the task record from the database
    task = ProcessingTask.objects.get(task_id=task_id)
    try:
        # 1. Read Excel
        df = pd.read_excel(file_path)

        # 2. Add the new column if it doesn't exist
        if 'New_Generated_Title' not in df.columns:
            df['New_Generated_Title'] = ""

        headers = {"User-Agent": "Mozilla/5.0"}
        # dynamic_token = get_fresh_token(scraper)

        # Specific headers for the Item# autosuggest API
        api_headers = {
            "accept": "application/json, text/plain, */*",
            "authorization": "Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzUxMiJ9.eyJzdWIiOiJERVYifQ.uOFB7h7_Aw6jbA1HSqVJ44tKMO7E1ljz1kV_JddeKL64YCOH57-l1ZX2Lly-Jnhdnxk3xMAeW5FawAgymEaMKA",  # or f"Bearer {dynamic_token}"
            "client_id": "GEC",
            "referer": "https://www.globalindustrial.com/"
        }

        # 3. Process each row
        for index, row in df.iterrows():
            url = row.get('URL')
            item_number = row.get('Item#')
            new_title = ""
            final_url = None

            # Step 1: Resolve the URL
            if pd.notna(url) and str(url).startswith('http'):
                final_url = url
            elif pd.notna(item_number):
                # Call the autosuggest API to resolve a URL from the Item#
                api_url = (
                    "https://www.globalindustrial.com/catalogApis/catalog/autosuggest"
                    f"?key={item_number}&features=true"
                )
                try:
                    api_resp = requests.get(api_url, headers=api_headers, timeout=10)
                    if api_resp.status_code == 200:
                        data = api_resp.json()
                        final_url = data.get('exactMatch', {}).get('canonicalLink')
                except Exception:
                    new_title = f"API Error for Item# {item_number}"

            # Step 2: Scrape the product page and build the title
            if pd.notna(final_url):
                try:
                    # Scraping logic (scraper.get(final_url, timeout=15) also works here)
                    resp = requests.get(final_url, headers=headers, timeout=10)
                    if resp.status_code == 200:
                        soup = BeautifulSoup(resp.content, 'html.parser')
                        script_tag = soup.find('script', id='__NEXT_DATA__')
                        if script_tag:
                            try:
                                raw_data = json.loads(script_tag.string)
                                new_title = construct_dynamic_title(raw_data, selected_pt)
                            except Exception:
                                new_title = "Data Parsing Error"
                        else:
                            new_title = "Could not find attribute name on product details page"
                    else:
                        new_title = f"HTTP Error: {resp.status_code}"
                except Exception:
                    new_title = "Request Failed (Timeout/Connection)"
            elif not new_title:
                # Only report a missing URL if no API error was recorded above
                new_title = "URL Missing"

            # Update the DataFrame
            df.at[index, 'New_Generated_Title'] = new_title

            # Safety delay to avoid getting blocked: random 3.0-7.0 seconds
            time.sleep(random.uniform(3, 7))

        # 4. Save the modified Excel to the MEDIA folder
        output_filename = f"completed_{task_id}_{task.original_filename}"
        os.makedirs(settings.MEDIA_ROOT, exist_ok=True)
        output_path = os.path.join(settings.MEDIA_ROOT, output_filename)
        df.to_excel(output_path, index=False)

        # 5. Final Status Update
        task.status = 'COMPLETED'
        # Construct the URL the frontend uses to download the result
        task.download_url = f"{settings.MEDIA_URL}{output_filename}"
        task.save()
    except Exception as e:
        print(f"Critical Task Failure: {e}")
        task.status = 'FAILED'
        task.save()
    finally:
        # 6. Clean up the temporary uploaded file
        if os.path.exists(file_path):
            os.remove(file_path)
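
# If this pipeline ever becomes genuinely asynchronous (e.g. aiohttp-based),
# a plain thread can still drive it by giving the thread its own event loop.
# Minimal sketch with a hypothetical coroutine name, not part of the current code:
#
# import asyncio
#
# def run_async_task(file_path, selected_pt, task_id):
#     # asyncio.run creates and closes a fresh event loop inside this thread
#     asyncio.run(process_excel_task_async(file_path, selected_pt, task_id))
#
# threading.Thread(target=run_async_task, args=(file_path, selected_pt, task_id)).start()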

@login_required
def title_creator_async_view(request):
    if request.method == 'POST' and request.FILES.get('file'):
        excel_file = request.FILES['file']
        selected_pt = request.POST.get('product_type')

        # 1. Save the file temporarily
        fs = FileSystemStorage()
        filename = fs.save(f"temp_{uuid.uuid4().hex}_{excel_file.name}", excel_file)
        file_path = fs.path(filename)

        # 2. Create the task record
        task_id = str(uuid.uuid4())
        ProcessingTask.objects.create(
            task_id=task_id,
            original_filename=excel_file.name,
            status='PENDING'
        )

        # 3. Start the background thread
        thread = threading.Thread(
            target=process_excel_task,
            args=(file_path, selected_pt, task_id)
        )
        thread.start()

        return JsonResponse({
            'status': 'started',
            'task_id': task_id,
            'message': 'File is processing in the background.'
        })

    return JsonResponse({'error': 'Invalid request'}, status=400)


# Called repeatedly by pollStatus() in the frontend JS
def check_status(request, task_id):
    # Look up the task in the database
    task = get_object_or_404(ProcessingTask, task_id=task_id)
    return JsonResponse({
        'status': task.status,               # 'PENDING', 'COMPLETED', or 'FAILED'
        'file_name': task.original_filename,
        'download_url': task.download_url    # null until status is COMPLETED
    })


@login_required
def title_creator_history_page(request):
    # Renders the history HTML page
    return render(request, 'title_creator_history.html')


@login_required
def get_title_creator_tasks_json(request):
    # Returns the latest 50 tasks as JSON for the history table
    tasks = ProcessingTask.objects.all().order_by('-created_at')[:50]
    data = [{
        'task_id': t.task_id,
        'filename': t.original_filename or "Unknown File",
        'status': t.status,
        'url': t.download_url,
        'date': t.created_at.strftime("%d %b %Y, %I:%M %p")
    } for t in tasks]
    return JsonResponse(data, safe=False)
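
# Illustrative only: a hypothetical urls.py wiring for the views above. The
# route paths are made up; only the URL *names* ('login', 'title_creator_home',
# 'title_creator_master') are grounded, since redirect() calls in this file
# rely on them.
#
# from django.urls import path
# from . import views
#
# urlpatterns = [
#     path('login/', views.login_view, name='login'),
#     path('logout/', views.logout_view, name='logout'),
#     path('', views.title_creator_view, name='title_creator_home'),
#     path('master/', views.master_config_view, name='title_creator_master'),
#     path('save-config/', views.save_config_api, name='save_config_api'),
#     path('process-async/', views.title_creator_async_view, name='title_creator_async'),
#     path('status/<str:task_id>/', views.check_status, name='check_status'),
#     path('history/', views.title_creator_history_page, name='title_creator_history'),
#     path('history/data/', views.get_title_creator_tasks_json, name='title_creator_tasks_json'),
# ]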