- import os
- import json
- import time
- import requests
- import uuid
- import threading
- import pandas as pd
- from bs4 import BeautifulSoup
- from django.shortcuts import get_object_or_404, redirect, render
- from django.core.files.storage import FileSystemStorage
- from django.http import JsonResponse
- from .models import TitleMapping, AttributeMaster, ProcessingTask
- from django.conf import settings
- import cloudscraper
- from django.contrib import messages
- from django.contrib.auth import authenticate, login, logout
- # from django.contrib.auth.decorators import login_required
- from .decorators import login_required
- from django.contrib.auth.hashers import make_password
- import random
- from rest_framework import status
- from rest_framework.views import APIView
- from django.utils import timezone
- import logging
- from zoneinfo import ZoneInfo
- logger = logging.getLogger(__name__)
- # To login
- def login_view(request):
- if request.method == "POST":
- email = request.POST.get("username")
- password = request.POST.get("password")
- print("Email: ", email)
- print("Password: ", password)
- # Authenticate the user
- user = authenticate(request, username=email, password=password)
- print("user",user)
-
- if user is not None:
- print("User authenticated successfully.")
- login(request, user)
- request.session['user_email'] = user.email
- # request.session = user
- # request.session['full_name'] = f"{user.firstName} {user.lastName or ''}".strip()
- # # Store both human-readable role and code
- # request.session['role'] = user.get_role_display() # 'Super Admin', 'Admin', 'RTA'
- # request.session['role_code'] = user.role # '0', '1', '2'
- # request.session['joining_date'] = user.createdDate.strftime("%b, %Y")
- # request.session['userId'] = user.userId
- # 📌 Store client_id if user has a client associated
- # request.session['client_id'] = user.client.clientId if user.client else None
- return redirect('title_creator_home')
- else:
- print("Invalid credentials.")
- messages.error(request, "Invalid email or password.")
- return redirect('login')
- print("Rendering login page.")
- return render(request, 'login.html')
- # To logout
- @login_required
- def logout_view(request):
- logout(request)
- messages.success(request, "You have been logged out successfully.")
- return redirect('login')
- @login_required
- def master_config_view(request):
- if request.method == 'POST':
- action = request.POST.get('action')
-
- # Part 1: Add New Attribute
- if action == 'add_attribute':
- name = request.POST.get('attr_name')
- is_m = request.POST.get('is_mandatory') == 'on'
- if name:
- AttributeMaster.objects.get_or_create(name=name.strip(), defaults={'is_mandatory': is_m})
-
- # Part 2: Add New Title Mapping (Product Type)
- # --- MAPPING ACTIONS (CREATE & UPDATE) ---
- elif action in ['add_mapping', 'update_mapping']:
- pt = request.POST.get('pt_name')
- seq = request.POST.get('sequence')
- edit_id = request.POST.get('edit_id')
- if action == 'update_mapping' and edit_id:
- # Update existing
- mapping = get_object_or_404(TitleMapping, id=edit_id)
- mapping.product_type = pt.strip()
- mapping.format_sequence = seq
- mapping.save()
- else:
- # Create new (using get_or_create to prevent exact duplicates)
- if pt:
- TitleMapping.objects.get_or_create(
- product_type=pt.strip(),
- defaults={'format_sequence': seq}
- )
- # --- MAPPING DELETE ---
- elif action == 'delete_mapping':
- mapping_id = request.POST.get('id')
- TitleMapping.objects.filter(id=mapping_id).delete()
-
- # Part 3: Delete functionality
- elif action == 'delete_attribute':
- AttributeMaster.objects.filter(id=request.POST.get('id')).delete()
-
- return redirect('title_creator_master')
- # GET: Load all data
- context = {
- 'attributes': AttributeMaster.objects.all().order_by('name'),
- 'mappings': TitleMapping.objects.all().order_by('product_type'),
- }
- return render(request, 'title_creator_master.html', context)
- def save_config_api(request):
- if request.method == 'POST':
- try:
- data = json.loads(request.body)
-
- # Update Mandatory Attributes
- # Expected data: { "mandatory_ids": [1, 3, 5] }
- AttributeMaster.objects.all().update(is_mandatory=False)
- AttributeMaster.objects.filter(id__in=data.get('mandatory_ids', [])).update(is_mandatory=True)
-
- # Update Title Sequences
- # Expected data: { "mappings": [{"id": 1, "sequence": "Brand,Color"}] }
- for m in data.get('mappings', []):
- TitleMapping.objects.filter(id=m['id']).update(format_sequence=m['sequence'])
-
- return JsonResponse({'success': True})
- except Exception as e:
- return JsonResponse({'success': False, 'error': str(e)})
- # def extract_title_or_error(product,selected_pt):
- # # 1. Identify Product Type from JSON to fetch the correct Mapping
- # pt_name = selected_pt
- # try:
- # mapping = TitleMapping.objects.get(product_type=pt_name)
- # config_sequence = mapping.get_sequence_list()
- # except TitleMapping.DoesNotExist:
- # return f"No Title Configuration found for Product Type: {pt_name}"
- # # 2. Get Mandatory list from DB
- # mandatory_fields = list(AttributeMaster.objects.filter(is_mandatory=True).values_list('name', flat=True))
- # # 3. Data Extraction (Your logic)
- # extracted_data = {
- # "Brand": product.get("brand"),
- # "Product Type": pt_name
- # }
- # dimensions = {}
- # for group in product.get("attributeGroups", []):
- # for attr in group.get("attributes", []):
- # desc = attr.get("attributeDesc")
- # value = attr.get("attributeValue")
-
- # if desc == "Capacity":
- # extracted_data[desc] = f"Capacity {value}"
- # if desc in ["Door Type", "Capacity", "Color"]:
- # extracted_data[desc] = value
- # elif desc in ["Width", "Depth", "Height"]:
- # dimensions[desc] = value
- # if {"Width", "Depth", "Height"}.issubset(dimensions):
- # # extracted_data["Dimensions"] = f'{dimensions["Width"]} x {dimensions["Depth"]} x {dimensions["Height"]}'
- # w, d, h = dimensions["Width"], dimensions["Depth"], dimensions["Height"]
- # extracted_data["Dimensions"] = f'{w}"w x {d}"d x {h}"h'
- # # 4. Build Title and Check Mandatory Rules from DB
- # final_title_parts = []
- # missing_mandatory = []
- # for attr_name in config_sequence:
- # val = extracted_data.get(attr_name)
-
- # if not val or str(val).strip() == "":
- # # If DB says it's mandatory, track the error
- # if attr_name in mandatory_fields:
- # missing_mandatory.append(attr_name)
- # continue
-
- # final_title_parts.append(str(val))
- # # 5. Result
- # if missing_mandatory:
- # return f"Could not found {', '.join(missing_mandatory)} on Product Details page"
-
- # return " ".join(final_title_parts)
- def extract_title_or_error(product, selected_pt):
- # 1. Identify Product Type
- pt_name = selected_pt
- logger.info(f"IN extract_title_or_error")
- try:
- mapping = TitleMapping.objects.get(product_type=pt_name)
- config_sequence = mapping.get_sequence_list()
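- # Assumption (illustrative): format_sequence is a comma-separated string such as
- # "Brand,Product Type,Capacity,Dimensions", so get_sequence_list() yields
- # ["Brand", "Product Type", "Capacity", "Dimensions"].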
- except TitleMapping.DoesNotExist:
- return None,f"No Title Configuration found for Product Type: {pt_name}"
- mandatory_fields = list(AttributeMaster.objects.filter(is_mandatory=True).values_list('name', flat=True))
- # Loop through each group (e.g., Weights & Dimensions, Product Details)
- product_type = None
- # Find the "Type" attribute to use as the Product Type
- for group in product.get("attributeGroups", []):
- # Loop through each attribute in that group
- for attr in group.get("attributes", []):
- if attr.get("attributeDesc") == "Type":
- product_type = attr.get("attributeValue")
- break # Found the Type in this group; stop scanning its remaining attributes
- # 2. Data Extraction
- extracted_data = {
- "Brand": product.get("brand"),
- "Product Type": product_type
- }
- dimensions = {}
- for group in product.get("attributeGroups", []):
- for attr in group.get("attributes", []):
- desc = attr.get("attributeDesc")
- val = attr.get("attributeValue")
-
- if desc == "Capacity":
- extracted_data[desc] = f"{val} Capacity"
- elif desc in ["Door Type", "Color"]:
- extracted_data[desc] = val
- elif desc in ["Width", "Depth", "Height"]:
- dimensions[desc] = val
- if {"Width", "Depth", "Height"}.issubset(dimensions):
- w, d, h = dimensions["Width"], dimensions["Depth"], dimensions["Height"]
- # We use .replace(" in", "") to remove the existing unit before adding the " symbol
- w = dimensions["Width"].replace(" in", "").strip()
- d = dimensions["Depth"].replace(" in", "").strip()
- h = dimensions["Height"].replace(" in", "").strip()
- extracted_data["Dimensions"] = f'{w}"W x {d}"D x {h}"H'
- # 3. Build Title Parts
- final_title_parts = []
- missing_mandatory = []
- for attr_name in config_sequence:
- val = extracted_data.get(attr_name)
- if not val or str(val).strip() == "":
- if attr_name in mandatory_fields:
- missing_mandatory.append(attr_name)
- continue
- final_title_parts.append(str(val))
- comment = None
- if missing_mandatory:
- comment = f"Could not found {', '.join(missing_mandatory)} on Product Details page"
- # return f"Could not found {', '.join(missing_mandatory)} on Product Details page"
- # Helper function to join parts: Brand PT, Param1, Param2
- def construct_string(parts):
- if len(parts) <= 2:
- return " ".join(parts)
- return f"{parts[0]} {parts[1]}, {', '.join(parts[2:])}"
- current_title = construct_string(final_title_parts)
- # 4. Length Reduction Logic (Step-by-Step)
- print("Current Title 1 ######## ",current_title,len(current_title))
- logger.info(f"Current Title 1 Initial ########,{current_title},{len(current_title)}")
- # Step 1: Change "Capacity" -> "Cap."
- if len(current_title) > 100:
- for i, part in enumerate(final_title_parts):
- if "Capacity" in part:
- final_title_parts[i] = part.replace("Capacity", "Cap.")
- current_title = construct_string(final_title_parts)
- print("Current Title 2 ########",current_title,len(current_title))
- logger.info(f"Current Title 2 shorting capacity ########,{current_title},{len(current_title)}")
- # Step 2: Shorten Product Type (e.g., Stainless Steel -> SS)
- # Step B: Dynamic Product Type Acronym
- # if len(current_title) > 100:
- # pt_part = final_title_parts[1]
- # words = pt_part.split()
- # if len(words) > 1:
- # # Takes first letter of every word in the Product Type
- # final_title_parts[1] = "".join([w[0].upper() for w in words])
- # current_title = construct_string(final_title_parts)
- # print("Current Title 3 ########",current_title,len(current_title))
- # logger.info(f"Current Title 3 change the title ########,{current_title},{len(current_title)}")
- # Step 3: Remove spaces from attributes starting from the back
- # Brand (0) and Product Type (1) are skipped
- if len(current_title) > 100:
- for i in range(len(final_title_parts) - 1, 1, -1):
- if len(current_title) <= 100:
- break
- # Remove white spaces from the current attribute part
- final_title_parts[i] = final_title_parts[i].replace(" ", "")
- current_title = construct_string(final_title_parts)
- print("Current Title 4 ########",current_title,len(current_title))
- logger.info(f"Current Title 4 Removing space ########,{current_title},{len(current_title)}")
- return current_title,comment
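- # Illustrative return values (example data assumed):
- # success: ("<Brand> <Product Type>, <attr 1>, <attr 2>, ...", None)
- # missing mandatory attribute: (title built without it, "Could not find Color on the Product Details page")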
- def construct_dynamic_title(raw_data,selected_pt):
- try:
- product = raw_data.get("props", {}).get("pageProps", {}).get("product", {})
- if not product: return None, "Product data not found"
- logger.info(f"IN construct_dynamic_title")
- return extract_title_or_error(product,selected_pt)
- except Exception:
- return None,"Could not found attribute name on product details page"
- @login_required
- def title_creator_view(request):
- if request.method == 'POST' and request.FILES.get('file'):
- fresh_token = get_fresh_token()
- logger.info(f"fresh_token Value: {fresh_token}")
- if not fresh_token:
- fresh_token = "Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzUxMiJ9.eyJzdWIiOiJERVYifQ.uOFB7h7_Aw6jbA1HSqVJ44tKMO7E1ljz1kV_JddeKL64YCOH57-l1ZX2Lly-Jnhdnxk3xMAeW5FawAgymEaMKA"
- excel_file = request.FILES['file']
- selected_pt = request.POST.get('product_type')
- fs = FileSystemStorage()
- filename = fs.save(excel_file.name, excel_file)
- file_path = fs.path(filename)
- try:
- # 1. Read Excel
- df = pd.read_excel(file_path)
-
- # 2. Add the NEW COLUMN if it doesn't exist
- if 'New_Generated_Title' not in df.columns:
- df['New_Generated_Title'] = ""
- if 'Comment' not in df.columns:
- df['Comment'] = ""
- headers = {"User-Agent": "Mozilla/5.0"}
- results_for_ui = []
- # Specific Headers for the Item# API
- api_headers = {
- "accept": "application/json, text/plain, */*",
- "authorization": fresh_token,#"Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzUxMiJ9.eyJzdWIiOiJERVYifQ.uOFB7h7_Aw6jbA1HSqVJ44tKMO7E1ljz1kV_JddeKL64YCOH57-l1ZX2Lly-Jnhdnxk3xMAeW5FawAgymEaMKA",
- "client_id": "GEC",
- "referer": "https://www.globalindustrial.com/"
- }
- # 3. Process each row
- for index, row in df.iterrows():
- url = row.get('URL') # Assumes your excel has a 'URL' column
- item_number = row.get('Item#')
- new_title = ""
- final_url = None
- comment = ""
- # Step 1: Resolve the URL
- if pd.notna(url) and str(url).startswith('http'):
- final_url = url
- elif pd.notna(item_number):
- # Call API to get URL from Item#
- api_url = f"https://www.globalindustrial.com/catalogApis/catalog/autosuggest?key={item_number}&features=true"
- try:
- api_resp = requests.get(api_url, headers=api_headers, timeout=10)
- if api_resp.status_code == 200:
- data = api_resp.json()
- final_url = data.get('exactMatch', {}).get('canonicalLink')
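- # Assumed autosuggest response shape, based only on the fields read above (illustrative):
- # {"exactMatch": {"canonicalLink": "https://www.globalindustrial.com/p/..."}}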
- except Exception as e:
- new_title = f"API Error for Item# {item_number}"
- if pd.notna(final_url):
- try:
- resp = requests.get(final_url, headers=headers, timeout=10)
- soup = BeautifulSoup(resp.content, 'html.parser')
- script_tag = soup.find('script', id='__NEXT_DATA__')
-
- if script_tag:
- raw_data = json.loads(script_tag.string)
- new_title,comment = construct_dynamic_title(raw_data,selected_pt)
- else:
- new_title,comment = "Could not found attribute name on product details page",None
- except:
- new_title,comment = "Could not found attribute name on product details page",None
- else:
- new_title,comment = "URL Missing",None
- # Update the DataFrame column for this row
- df.at[index, 'New_Generated_Title'] = new_title
- df.at[index, 'Comment'] = comment
-
- results_for_ui.append({
- "id" : index + 1,
- "url": final_url,
- "new_title": new_title,
- "comment": comment,
- "status": True
- })
- # Sleep for a random 3-7 seconds between requests to avoid being rate-limited
- time.sleep(random.uniform(3, 7))
- # 4. Save the modified Excel to a new path
- output_filename = f"processed_{excel_file.name}"
- output_path = os.path.join(fs.location, output_filename)
- df.to_excel(output_path, index=False)
- return JsonResponse({
- 'success': True,
- 'results': results_for_ui,
- 'download_url': fs.url(output_filename)
- })
- finally:
- if os.path.exists(file_path): os.remove(file_path)
- # GET request: Fetch all product types for the dropdown
- product_types = TitleMapping.objects.all().values_list('product_type', flat=True)
- return render(request, 'title_creator_index.html', {'product_types': product_types})
- def get_fresh_token():
- """Hits the homepage to extract the latest Bearer token."""
- base_url = "https://www.globalindustrial.com"
- try:
- # Use a session to persist cookies
- session = requests.Session()
- response = session.get(base_url, timeout=15, headers={"User-Agent": "Mozilla/5.0"})
-
- # 1. Try Cookies
- token = session.cookies.get('Authorization')
- if token:
- return token if "Bearer" in token else f"Bearer {token}"
- # 2. Try NEXT_DATA
- soup = BeautifulSoup(response.content, 'html.parser')
- script_tag = soup.find('script', id='__NEXT_DATA__')
- if script_tag:
- data = json.loads(script_tag.string)
- token = data.get('props', {}).get('pageProps', {}).get('token')
- if token:
- return f"Bearer {token}"
- except Exception as e:
- print(f"Token retrieval failed: {e}")
- return None
- def process_excel_task(file_path, selected_pt, task_id):
- print("process excel task started.")
- # Retrieve the task record from the database
- # scraper = cloudscraper.create_scraper() # This replaces requests.get
- task = ProcessingTask.objects.get(task_id=task_id)
- fresh_token = get_fresh_token()
- logger.info(f"fresh_token Value: {fresh_token}")
- if not fresh_token:
- fresh_token = "Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzUxMiJ9.eyJzdWIiOiJERVYifQ.uOFB7h7_Aw6jbA1HSqVJ44tKMO7E1ljz1kV_JddeKL64YCOH57-l1ZX2Lly-Jnhdnxk3xMAeW5FawAgymEaMKA"
-
- try:
- # 1. Read Excel
- df = pd.read_excel(file_path)
-
- # 2. Add the NEW COLUMN if it doesn't exist
- if 'New_Generated_Title' not in df.columns:
- df['New_Generated_Title'] = ""
- if 'Comment' not in df.columns:
- df['Comment'] = ""
- headers = {"User-Agent": "Mozilla/5.0"}
- # dynamic_token = await get_fresh_token(scraper)
- # Specific Headers for the Item# API
- api_headers = {
- "accept": "application/json, text/plain, */*",
- "authorization": fresh_token, #"Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzUxMiJ9.eyJzdWIiOiJERVYifQ.uOFB7h7_Aw6jbA1HSqVJ44tKMO7E1ljz1kV_JddeKL64YCOH57-l1ZX2Lly-Jnhdnxk3xMAeW5FawAgymEaMKA", #f"Bearer {dynamic_token}",
- "client_id": "GEC",
- "referer": "https://www.globalindustrial.com/"
- }
- # 3. Process each row
- for index, row in df.iterrows():
- logger.info(f"STARTED: {index}")
- url = row.get('URL')
- new_title = ""
- comment = ""
- item_number = row.get('Item#')
- final_url = None
- # Step 1: Resolve the URL
- if pd.notna(url) and str(url).startswith('http'):
- final_url = url
- elif pd.notna(item_number):
- # Call API to get URL from Item#
- api_url = f"https://www.globalindustrial.com/catalogApis/catalog/autosuggest?key={item_number}&features=true"
- try:
- api_resp = requests.get(api_url, headers=api_headers, timeout=10)
- if api_resp.status_code == 200:
- data = api_resp.json()
- final_url = data.get('exactMatch', {}).get('canonicalLink')
- except Exception as e:
- new_title,comment = f"API Error for Item# {item_number}"
- if pd.notna(final_url):
- try:
- # Scraping logic
- # resp = scraper.get(url, timeout=15)
- resp = requests.get(final_url, headers=headers, timeout=10)
- if resp.status_code == 200:
- soup = BeautifulSoup(resp.content, 'html.parser')
- script_tag = soup.find('script', id='__NEXT_DATA__')
-
- if script_tag:
- try:
- raw_data = json.loads(script_tag.string)
- # Calling your dynamic title helper
- new_title,comment = construct_dynamic_title(raw_data, selected_pt)
- except Exception:
- new_title,comment = "Data Parsing Error",None
- else:
- new_title,comment = "Could not found attribute name on product details page",None
- else:
- new_title,comment = f"HTTP Error: {resp.status_code}",None
- except Exception:
- new_title,comment = "Request Failed (Timeout/Connection)",None
- else:
- new_title,comment = "URL Missing",None
- # Update the DataFrame
- df.at[index, 'New_Generated_Title'] = new_title
- df.at[index, 'Comment'] = comment
-
- # Sleep for a random 3-7 seconds between requests to avoid being blocked by the server
- time.sleep(random.uniform(3, 7))
- logger.info(f"ENDED: {index}")
- # 4. Save the modified Excel to the MEDIA folder
- output_filename = f"completed_{task_id}_{task.original_filename}"
-
- # Ensure media directory exists
- if not os.path.exists(settings.MEDIA_ROOT):
- os.makedirs(settings.MEDIA_ROOT)
-
- output_path = os.path.join(settings.MEDIA_ROOT, output_filename)
- df.to_excel(output_path, index=False)
- # 5. Final Status Update
- task.status = 'COMPLETED'
- # Construct the URL for the frontend to download
- task.download_url = f"{settings.MEDIA_URL}{output_filename}"
- task.completed_at = timezone.now() # Timezone-aware completion time (shown in IST by the history view)
- task.save()
- print("process excel task ended.")
- except Exception as e:
- print(f"Critical Task Failure: {e}")
- task.status = 'FAILED'
- task.save()
-
- finally:
- # 6. Cleanup the temporary uploaded file
- if os.path.exists(file_path):
- os.remove(file_path)
- @login_required
- def title_creator_async_view(request):
- if request.method == 'POST' and request.FILES.get('file'):
-
- excel_file = request.FILES['file']
- selected_pt = request.POST.get('product_type')
-
- # 1. Save file temporarily
- fs = FileSystemStorage()
- filename = fs.save(f"temp_{uuid.uuid4().hex}_{excel_file.name}", excel_file)
- file_path = fs.path(filename)
- # 2. Create Task Record
- task_id = str(uuid.uuid4())
- ProcessingTask.objects.create(
- task_id=task_id,
- original_filename=excel_file.name,
- status='PENDING'
- )
- # 3. Start Background Thread
- thread = threading.Thread(
- target=process_excel_task,
- args=(file_path, selected_pt, task_id)
- )
- thread.start()
- return JsonResponse({
- 'status': 'started',
- 'task_id': task_id,
- 'message': 'File is processing in the background.'
- })
-
- return JsonResponse({'error': 'Invalid request'}, status=400)
- # Called repeatedly by pollStatus() in the frontend JS to check task progress
- def check_status(request, task_id):
- # Look up the task in the database
- task = get_object_or_404(ProcessingTask, task_id=task_id)
-
- return JsonResponse({
- 'status': task.status, # 'PENDING', 'COMPLETED', or 'FAILED'
- 'file_name': task.original_filename,
- 'download_url': task.download_url # This will be null until status is COMPLETED
- })
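- # Illustrative poll responses (values assumed; MEDIA_URL assumed to be "/media/"):
- # in progress: {"status": "PENDING", "file_name": "items.xlsx", "download_url": null}
- # finished:    {"status": "COMPLETED", "file_name": "items.xlsx", "download_url": "/media/completed_<task_id>_items.xlsx"}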
- @login_required
- def title_creator_history_page(request):
- # Renders the HTML page
- return render(request, 'title_creator_history.html')
- @login_required
- def get_title_creator_tasks_json(request):
- # Returns the list of tasks as JSON for the history table
- tasks = ProcessingTask.objects.all().order_by('-created_at')[:50] # Latest 50 tasks
- data = []
- for t in tasks:
- data.append({
- 'task_id': t.task_id,
- 'filename': t.original_filename or "Unknown File",
- 'status': t.status,
- 'url': t.download_url,
- 'date': t.created_at.astimezone(ZoneInfo("Asia/Kolkata")).strftime("%d %b %Y, %I:%M %p"),
- # completed_at is null until the task finishes, so guard before formatting
- 'completed_at': t.completed_at.astimezone(ZoneInfo("Asia/Kolkata")).strftime("%d %b %Y, %I:%M %p") if t.completed_at else ""
- })
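- # Illustrative history row (values assumed):
- # {"task_id": "…", "filename": "items.xlsx", "status": "COMPLETED", "url": "/media/completed_…_items.xlsx",
- #  "date": "01 Jan 2025, 10:30 AM", "completed_at": "01 Jan 2025, 10:42 AM"}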
- print("data",data)
- return JsonResponse(data, safe=False)
- class TokenFetcherAPI(APIView):
- def get(self, request):
- token = fetch_global_industrial_token()
-
- if token:
- return JsonResponse({
- "status": "success",
- "token": token
- }, status=status.HTTP_200_OK)
-
- return JsonResponse({
- "status": "error",
- "message": "Could not retrieve token"
- }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
-
- def fetch_global_industrial_token():
- """Logic to scrape the token."""
- base_url = "https://www.globalindustrial.com"
- # Using cloudscraper to handle potential bot detection
- scraper = cloudscraper.create_scraper()
-
- try:
- response = scraper.get(base_url, timeout=15)
-
- # 1. Check Cookies
- token = scraper.cookies.get('Authorization')
- if token:
- return token.replace('Bearer ', '').strip()
- # 2. Check __NEXT_DATA__
- soup = BeautifulSoup(response.content, 'html.parser')
- script_tag = soup.find('script', id='__NEXT_DATA__')
- if script_tag:
- data = json.loads(script_tag.string)
- token = data.get('props', {}).get('pageProps', {}).get('token')
- if token:
- return token
-
- except Exception as e:
- return None
- return None
|