| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081 |
- # background_remover/tasks.py
- import os
- import io
- import zipfile
- import csv
- from celery import shared_task
- from django.conf import settings
- from PIL import Image
- from .services import BiRefNetService
# File extensions (lower-case) treated as images when scanning an uploaded ZIP.
_IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.webp')


def _unique_arcname(name, used_names):
    """Return *name*, or a numbered variant if *name* is already in *used_names*.

    The chosen name is added to *used_names* before returning, so successive
    calls never hand out the same archive name twice.
    """
    stem, ext = os.path.splitext(name)
    candidate = name
    counter = 1
    while candidate in used_names:
        candidate = f"{stem}_{counter}{ext}"
        counter += 1
    used_names.add(candidate)
    return candidate


def _remove_background_to_zip(service, source, output_stem, master_zip, used_names):
    """Remove the background of one image and store the PNG in *master_zip*.

    *source* is anything ``Image.open`` accepts (a filesystem path or a binary
    file-like object). The result is written as ``<output_stem>_no_bg.png``,
    de-duplicated against *used_names*. Any exception propagates to the caller.
    """
    # Close the underlying file handle as soon as the pixels are converted, so
    # the caller can safely delete the source file afterwards.
    with Image.open(source) as opened:
        img = opened.convert("RGB")

    # The service's result is only required to support ``.save(fp, format=...)``.
    output = service.remove_background(img)

    img_io = io.BytesIO()
    output.save(img_io, format="PNG")

    arcname = _unique_arcname(f"{output_stem}_no_bg.png", used_names)
    master_zip.writestr(arcname, img_io.getvalue())


@shared_task
def process_bulk_images_task(file_paths, task_id):
    """Remove backgrounds from a batch of uploads and bundle the results.

    Each entry of *file_paths* is joined onto ``settings.MEDIA_ROOT``. An entry
    whose name ends in ``.zip`` is opened and every contained
    ``.png``/``.jpg``/``.jpeg``/``.webp`` member is processed; any other entry
    is processed as a single image. Results are written as
    ``<name>_no_bg.png`` into
    ``MEDIA_ROOT/bulk_results/<task_id>/processed_<task_id>.zip`` together
    with a ``report.csv`` listing one row per processed item
    (``Filename``, ``Status``, ``Error Message``). The report is also kept as
    a standalone file next to the archive.

    Failures are recorded per item as ``FAILED`` rows instead of aborting the
    batch. Every uploaded source file is deleted once it has been handled.

    NOTE(review): *file_paths* entries are joined to ``MEDIA_ROOT`` and then
    removed from disk, so callers must only pass trusted, upload-relative
    paths.

    Returns:
        A dict with ``zip_url`` and ``log_url``, both built from
        ``settings.MEDIA_URL``.
    """
    service = BiRefNetService()

    # Define paths
    results_dir = os.path.join(settings.MEDIA_ROOT, 'bulk_results', task_id)
    os.makedirs(results_dir, exist_ok=True)

    zip_output_path = os.path.join(results_dir, f"processed_{task_id}.zip")
    log_file_path = os.path.join(results_dir, "report.csv")

    log_data = []  # Rows: [Filename, Status, Error]
    # Reserve the report's name so no image output can collide with it.
    used_names = {"report.csv"}

    with zipfile.ZipFile(zip_output_path, "w", zipfile.ZIP_DEFLATED) as master_zip:
        for file_path in file_paths:
            full_path = os.path.join(settings.MEDIA_ROOT, file_path)
            original_name = os.path.basename(file_path)
            try:
                if original_name.lower().endswith('.zip'):
                    # CASE 1: INPUT IS A ZIP
                    with zipfile.ZipFile(full_path, 'r') as user_zip:
                        for inner_name in user_zip.namelist():
                            if not inner_name.lower().endswith(_IMAGE_EXTENSIONS):
                                continue
                            try:
                                with user_zip.open(inner_name) as inner_file:
                                    payload = io.BytesIO(inner_file.read())
                                stem = inner_name.split('/')[-1].rsplit('.', 1)[0]
                                _remove_background_to_zip(
                                    service, payload, stem, master_zip, used_names
                                )
                                log_data.append([inner_name, "SUCCESS", ""])
                            except Exception as e:
                                # One bad member must not stop the rest of the ZIP.
                                log_data.append([inner_name, "FAILED", str(e)])
                else:
                    # CASE 2: INPUT IS AN IMAGE
                    stem = original_name.rsplit(".", 1)[0]
                    _remove_background_to_zip(
                        service, full_path, stem, master_zip, used_names
                    )
                    log_data.append([original_name, "SUCCESS", ""])
            except Exception as e:
                # Covers a failed single image as well as an upload that cannot
                # be opened as a ZIP at all (corrupt archive, missing file).
                log_data.append([original_name, "FAILED", str(e)])
            finally:
                # Cleanup temporary uploaded files
                try:
                    os.remove(full_path)
                except FileNotFoundError:
                    pass

        # Create the CSV log inside the task folder. The explicit encoding keeps
        # non-ASCII filenames from failing on non-UTF-8 locales.
        with open(log_file_path, mode='w', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(["Filename", "Status", "Error Message"])
            writer.writerows(log_data)

        # Add the log file into the zip too
        master_zip.write(log_file_path, "report.csv")

    return {
        "zip_url": f"{settings.MEDIA_URL}bulk_results/{task_id}/processed_{task_id}.zip",
        "log_url": f"{settings.MEDIA_URL}bulk_results/{task_id}/report.csv"
    }
|