| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296 |
- # # background_remover/views.py
- # import io
- # from PIL import Image
- # from django.http import HttpResponse
- # from rest_framework.views import APIView
- # from rest_framework.parsers import MultiPartParser
- # from .services import BiRefNetService
- # from django.shortcuts import render
- # import zipfile
- # import time
- # class BackgroundRemovalView(APIView):
- # parser_classes = [MultiPartParser]
- # def get(self, request):
- # return render(request, "bg_remover_index.html")
- # def post(self, request, *args, **kwargs):
- # if 'image' not in request.FILES:
- # return HttpResponse("No image provided", status=400)
- # # 1. Load image from request
- # input_file = request.FILES['image']
- # input_image = Image.open(input_file)
- # # 2. Process via Service
- # service = BiRefNetService()
- # output_image = service.remove_background(input_image)
- # # 3. Prepare response
- # buffer = io.BytesIO()
- # output_image.save(buffer, format="PNG")
-
- # return HttpResponse(buffer.getvalue(), content_type="image/png")
-
- # class BulkBackgroundRemovalAPI(APIView):
- # parser_classes = [MultiPartParser]
- # def post(self, request):
- # files = request.FILES.getlist("images")
- # if not files:
- # return HttpResponse({"error": "No images provided"}, status=400)
- # service = BiRefNetService()
- # start_time = time.time()
- # zip_buffer = io.BytesIO()
- # # with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file:
- # # for file in files:
- # # try:
- # # image = Image.open(file).convert("RGB")
- # # output = service.remove_background(image)
- # # img_buffer = io.BytesIO()
- # # output.save(img_buffer, format="PNG")
- # # name = file.name.rsplit(".", 1)[0] + "_whitebg.png"
- # # zip_file.writestr(name, img_buffer.getvalue())
- # # except Exception as e:
- # # print(f"Error processing {file.name}: {e}")
- # with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as master_zip:
- # for file in files:
- # # CHECK IF FILE IS A ZIP
- # if file.name.lower().endswith('.zip'):
- # with zipfile.ZipFile(file, 'r') as user_zip:
- # for inner_file_name in user_zip.namelist():
- # # Skip directories and non-image files
- # if inner_file_name.endswith(('.png', '.jpg', '.jpeg', '.webp')):
- # with user_zip.open(inner_file_name) as inner_file:
- # img_data = inner_file.read()
- # image = Image.open(io.BytesIO(img_data)).convert("RGB")
- # output = service.remove_background(image)
-
- # img_io = io.BytesIO()
- # output.save(img_io, format="PNG")
-
- # # Create clean name for the output zip
- # clean_name = inner_file_name.rsplit(".", 1)[0].split('/')[-1] + "_no_bg.png"
- # master_zip.writestr(clean_name, img_io.getvalue())
-
- # # PROCESS INDIVIDUAL IMAGES (Existing logic)
- # else:
- # try:
- # image = Image.open(file).convert("RGB")
- # output = service.remove_background(image)
- # img_io = io.BytesIO()
- # output.save(img_io, format="PNG")
- # name = file.name.rsplit(".", 1)[0] + "_no_bg.png"
- # master_zip.writestr(name, img_io.getvalue())
- # except Exception as e:
- # print(f"Error processing {file.name}: {e}")
- # zip_buffer.seek(0)
- # total_time = round(time.time() - start_time, 2)
- # response = HttpResponse(
- # zip_buffer.getvalue(),
- # content_type="application/zip"
- # )
- # response["Content-Disposition"] = "attachment; filename=processed_images.zip"
- # response["X-Processing-Time"] = f"{total_time}s"
- # return response
- import uuid
- import threading
- import os
- import io
- import zipfile
- import csv
- from django.shortcuts import render
- from django.http import HttpResponse, JsonResponse
- from django.conf import settings
- from rest_framework.views import APIView
- from rest_framework.parsers import MultiPartParser
- from PIL import Image
- from .models import BackgroundTask
- from .services import BiRefNetService
- class BackgroundRemovalView(APIView):
- parser_classes = [MultiPartParser]
- def get(self, request):
- return render(request, "bg_remover_index.html")
- def post(self, request, *args, **kwargs):
- if 'image' not in request.FILES:
- return HttpResponse("No image provided", status=400)
- # 1. Load image from request
- input_file = request.FILES['image']
- input_image = Image.open(input_file)
- # 2. Process via Service
- service = BiRefNetService()
- output_image = service.remove_background(input_image)
- # 3. Prepare response
- buffer = io.BytesIO()
- output_image.save(buffer, format="PNG")
-
- return HttpResponse(buffer.getvalue(), content_type="image/png")
-
- class BulkBackgroundRemovalAPI(APIView):
- parser_classes = [MultiPartParser]
- def post(self, request):
- files = request.FILES.getlist("images")
- if not files:
- return JsonResponse({"error": "No images provided"}, status=400)
- # 1. Create a database record for the task
- task_id = uuid.uuid4()
- BackgroundTask.objects.create(task_id=task_id, status='PROCESSING')
- # 2. Read files into memory to pass to the thread
- file_list = []
- for f in files:
- file_list.append({
- "name": f.name,
- "content": f.read()
- })
- # 3. Start background thread
- thread = threading.Thread(
- target=self.process_images_logic,
- args=(task_id, file_list)
- )
- thread.start()
- return JsonResponse({
- "task_id": str(task_id),
- "message": "Task started",
- "status_url": f"/api/status/{task_id}/"
- }, status=202)
- def process_images_logic(self, task_id, file_list):
- """Heavy AI logic running in background"""
- service = BiRefNetService()
- log_data = []
-
- # Prepare folders
- folder_path = os.path.join(settings.MEDIA_ROOT, 'bulk_results', str(task_id))
- os.makedirs(folder_path, exist_ok=True)
- zip_filename = f"result_{task_id}.zip"
- zip_abs_path = os.path.join(folder_path, zip_filename)
- try:
- with zipfile.ZipFile(zip_abs_path, "w", zipfile.ZIP_DEFLATED) as master_zip:
- for item in file_list:
- name, content = item["name"], item["content"]
-
- try:
- # Process if it's a zip
- if name.lower().endswith('.zip'):
- with zipfile.ZipFile(io.BytesIO(content)) as user_zip:
- for inner_name in user_zip.namelist():
- if inner_name.lower().endswith(('.png', '.jpg', '.jpeg', '.webp')):
- with user_zip.open(inner_name) as inner_file:
- img = Image.open(io.BytesIO(inner_file.read())).convert("RGB")
- out = service.remove_background(img)
- buf = io.BytesIO()
- out.save(buf, format="PNG")
- master_zip.writestr(f"no_bg_{inner_name.split('/')[-1]}", buf.getvalue())
- log_data.append([inner_name, "SUCCESS", "Done"])
-
- # Process if it's an image
- else:
- img = Image.open(io.BytesIO(content)).convert("RGB")
- out = service.remove_background(img)
- buf = io.BytesIO()
- out.save(buf, format="PNG")
- clean_name = name.rsplit(".", 1)[0] + "_no_bg.png"
- master_zip.writestr(clean_name, buf.getvalue())
- log_data.append([name, "SUCCESS", "Done"])
- except Exception as e:
- log_data.append([name, "FAIL", str(e)])
- # Create CSV Log inside the Zip
- csv_buf = io.StringIO()
- writer = csv.writer(csv_buf)
- writer.writerow(["Filename", "Status", "Message"])
- writer.writerows(log_data)
- master_zip.writestr("processing_report.csv", csv_buf.getvalue())
- # 4. Update model to COMPLETED
- task = BackgroundTask.objects.get(task_id=task_id)
- task.status = 'COMPLETED'
- # Save the relative path for the FileField
- task.zip_file = f"bulk_results/{task_id}/{zip_filename}"
- task.save()
- except Exception as e:
- task = BackgroundTask.objects.get(task_id=task_id)
- task.status = 'FAILED'
- task.error_message = str(e)
- task.save()
- class TaskStatusAPI(APIView):
- def get(self, request, task_id):
- try:
- task = BackgroundTask.objects.get(task_id=task_id)
- return JsonResponse({
- "task_id": task.task_id,
- "status": task.status,
- "download_url": task.zip_file.url if task.zip_file else None,
- "error": task.error_message
- })
- except BackgroundTask.DoesNotExist:
- return JsonResponse({"error": "Task not found"}, status=404)
-
- class TaskHistoryListView(APIView):
- def get(self, request):
- # Fetch latest 20 tasks
- tasks = BackgroundTask.objects.all().order_by('-created_at')[:20]
- data = []
- for t in tasks:
- data.append({
- "task_id": str(t.task_id),
- "status": t.status,
- "created_at": t.created_at.strftime("%Y-%m-%d %H:%M"),
- "download_url": t.zip_file.url if t.zip_file else None
- })
- return HttpResponse(data)
- # Make sure this name matches what you put in urls.py
- class TaskHistoryAPI(APIView):
- def get(self, request):
- # Fetch latest 20 tasks
- tasks = BackgroundTask.objects.all().order_by('-created_at')[:20]
- data = []
- for t in tasks:
- data.append({
- "task_id": str(t.task_id),
- "status": t.status,
- # Using 'date' key to match the JS 't.date' in the history page
- "date": t.created_at.strftime("%Y-%m-%d %H:%M"),
- "url": t.zip_file.url if t.zip_file else None
- })
- # Use JsonResponse so the frontend can read it as an object
- return JsonResponse(data, safe=False)
- class TaskHistoryPageView(APIView):
- def get(self, request):
- return render(request, "bg_remover_history.html")
|