views.py 18 KB


  1. # # background_remover/views.py
  2. # import io
  3. # from PIL import Image
  4. # from django.http import HttpResponse
  5. # from rest_framework.views import APIView
  6. # from rest_framework.parsers import MultiPartParser
  7. # from .services import BiRefNetService
  8. # from django.shortcuts import render
  9. # import zipfile
  10. # import time
  11. # class BackgroundRemovalView(APIView):
  12. # parser_classes = [MultiPartParser]
  13. # def get(self, request):
  14. # return render(request, "bg_remover_index.html")
  15. # def post(self, request, *args, **kwargs):
  16. # if 'image' not in request.FILES:
  17. # return HttpResponse("No image provided", status=400)
  18. # # 1. Load image from request
  19. # input_file = request.FILES['image']
  20. # input_image = Image.open(input_file)
  21. # # 2. Process via Service
  22. # service = BiRefNetService()
  23. # output_image = service.remove_background(input_image)
  24. # # 3. Prepare response
  25. # buffer = io.BytesIO()
  26. # output_image.save(buffer, format="PNG")
  27. # return HttpResponse(buffer.getvalue(), content_type="image/png")
  28. # class BulkBackgroundRemovalAPI(APIView):
  29. # parser_classes = [MultiPartParser]
  30. # def post(self, request):
  31. # files = request.FILES.getlist("images")
  32. # if not files:
  33. # return HttpResponse({"error": "No images provided"}, status=400)
  34. # service = BiRefNetService()
  35. # start_time = time.time()
  36. # zip_buffer = io.BytesIO()
  37. # # with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file:
  38. # # for file in files:
  39. # # try:
  40. # # image = Image.open(file).convert("RGB")
  41. # # output = service.remove_background(image)
  42. # # img_buffer = io.BytesIO()
  43. # # output.save(img_buffer, format="PNG")
  44. # # name = file.name.rsplit(".", 1)[0] + "_whitebg.png"
  45. # # zip_file.writestr(name, img_buffer.getvalue())
  46. # # except Exception as e:
  47. # # print(f"Error processing {file.name}: {e}")
  48. # with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as master_zip:
  49. # for file in files:
  50. # # CHECK IF FILE IS A ZIP
  51. # if file.name.lower().endswith('.zip'):
  52. # with zipfile.ZipFile(file, 'r') as user_zip:
  53. # for inner_file_name in user_zip.namelist():
  54. # # Skip directories and non-image files
  55. # if inner_file_name.endswith(('.png', '.jpg', '.jpeg', '.webp')):
  56. # with user_zip.open(inner_file_name) as inner_file:
  57. # img_data = inner_file.read()
  58. # image = Image.open(io.BytesIO(img_data)).convert("RGB")
  59. # output = service.remove_background(image)
  60. # img_io = io.BytesIO()
  61. # output.save(img_io, format="PNG")
  62. # # Create clean name for the output zip
  63. # clean_name = inner_file_name.rsplit(".", 1)[0].split('/')[-1] + "_no_bg.png"
  64. # master_zip.writestr(clean_name, img_io.getvalue())
  65. # # PROCESS INDIVIDUAL IMAGES (Existing logic)
  66. # else:
  67. # try:
  68. # image = Image.open(file).convert("RGB")
  69. # output = service.remove_background(image)
  70. # img_io = io.BytesIO()
  71. # output.save(img_io, format="PNG")
  72. # name = file.name.rsplit(".", 1)[0] + "_no_bg.png"
  73. # master_zip.writestr(name, img_io.getvalue())
  74. # except Exception as e:
  75. # print(f"Error processing {file.name}: {e}")
  76. # zip_buffer.seek(0)
  77. # total_time = round(time.time() - start_time, 2)
  78. # response = HttpResponse(
  79. # zip_buffer.getvalue(),
  80. # content_type="application/zip"
  81. # )
  82. # response["Content-Disposition"] = "attachment; filename=processed_images.zip"
  83. # response["X-Processing-Time"] = f"{total_time}s"
  84. # return response
  85. import uuid
  86. import threading
  87. import os
  88. import io
  89. import zipfile
  90. import csv
  91. from django.shortcuts import render
  92. from django.http import HttpResponse, JsonResponse
  93. from django.conf import settings
  94. from rest_framework.views import APIView
  95. from rest_framework.parsers import MultiPartParser
  96. from PIL import Image
  97. from .models import BackgroundTask
  98. from .services import BiRefNetService
  99. # from .blip_service import BLIPService
  class BackgroundRemovalView(APIView):
      """Single-image background removal endpoint.

      GET renders the upload page. POST accepts one multipart upload under
      the ``image`` field and responds with the processed picture as PNG.
      """

      parser_classes = [MultiPartParser]

      def get(self, request):
          """Render the background-remover landing page."""
          return render(request, "bg_remover_index.html")

      def post(self, request, *args, **kwargs):
          """Remove the background from the uploaded image.

          Returns:
              A 400 plain-text response when the ``image`` field is missing
              or the upload cannot be opened as an image; otherwise a
              response carrying ``image/png`` bytes.
          """
          if 'image' not in request.FILES:
              return HttpResponse("No image provided", status=400)

          # 1. Load image from request. Pillow reports an unrecognised or
          #    unreadable file with an OSError subclass; answer that with a
          #    client error instead of letting it surface as a 500.
          try:
              input_image = Image.open(request.FILES['image'])
          except OSError:
              return HttpResponse("Invalid image file", status=400)

          # 2. Process via Service
          output_image = BiRefNetService().remove_background(input_image)

          # 3. Prepare response
          buffer = io.BytesIO()
          output_image.save(buffer, format="PNG")
          return HttpResponse(buffer.getvalue(), content_type="image/png")
  class BulkBackgroundRemovalAPI(APIView):
      """Bulk background removal that runs in a background thread.

      POST accepts any number of uploads under the ``images`` field. Each
      upload is either an image or a ``.zip`` archive of images. The request
      returns immediately with a task id; the worker thread writes a result
      archive under ``MEDIA_ROOT/bulk_results/<task_id>/`` and records the
      outcome on the matching ``BackgroundTask`` row.
      """

      parser_classes = [MultiPartParser]

      # Members of an uploaded archive are processed only if their name ends
      # with one of these suffixes (compared case-insensitively).
      _IMAGE_SUFFIXES = ('.png', '.jpg', '.jpeg', '.webp')

      def post(self, request):
          """Register a task, hand the uploads to a worker thread, reply 202.

          Returns:
              400 JSON error when no file was sent under ``images``;
              otherwise 202 JSON with ``task_id``, ``message`` and
              ``status_url``.
          """
          files = request.FILES.getlist("images")
          if not files:
              return JsonResponse({"error": "No images provided"}, status=400)

          # 1. Create a database record for the task
          task_id = uuid.uuid4()
          BackgroundTask.objects.create(task_id=task_id, task_type="BG_REMOVE", status='PROCESSING')

          # 2. Read files into memory to pass to the thread
          file_list = [{"name": f.name, "content": f.read()} for f in files]

          # 3. Start background thread
          thread = threading.Thread(
              target=self.process_images_logic,
              args=(task_id, file_list),
          )
          thread.start()

          return JsonResponse({
              "task_id": str(task_id),
              "message": "Task started",
              "status_url": f"/api/status/{task_id}/"
          }, status=202)

      def process_images_logic(self, task_id, file_list):
          """Process every upload and store the result archive for the task.

          Args:
              task_id: Identifier of the ``BackgroundTask`` row to update.
              file_list: Dicts with ``name`` (upload filename) and
                  ``content`` (raw bytes), as built by ``post``.

          Per-file failures are recorded in ``processing_report.csv`` inside
          the archive and do not stop the run. Any failure outside the
          per-file handling (service construction, folder creation, writing
          the archive) marks the task ``FAILED`` with the error message.
          """
          log_data = []
          zip_filename = f"result_{task_id}.zip"

          try:
              # Service construction and folder creation live inside the try
              # so that a failure here flips the task to FAILED instead of
              # leaving it in PROCESSING forever.
              service = BiRefNetService()
              folder_path = os.path.join(settings.MEDIA_ROOT, 'bulk_results', str(task_id))
              os.makedirs(folder_path, exist_ok=True)
              zip_abs_path = os.path.join(folder_path, zip_filename)

              with zipfile.ZipFile(zip_abs_path, "w", zipfile.ZIP_DEFLATED) as master_zip:
                  for item in file_list:
                      name, content = item["name"], item["content"]
                      if name.lower().endswith('.zip'):
                          self._process_archive(service, master_zip, name, content, log_data)
                      else:
                          output_name = name.rsplit(".", 1)[0] + "_no_bg.png"
                          self._process_image(
                              service, master_zip, name, output_name, content, log_data
                          )

                  # Create CSV Log inside the Zip
                  csv_buf = io.StringIO()
                  writer = csv.writer(csv_buf)
                  writer.writerow(["Filename", "Status", "Message"])
                  writer.writerows(log_data)
                  master_zip.writestr("processing_report.csv", csv_buf.getvalue())

              # 4. Update model to COMPLETED
              task = BackgroundTask.objects.get(task_id=task_id)
              task.status = 'COMPLETED'
              # Save the relative path for the FileField
              task.zip_file = f"bulk_results/{task_id}/{zip_filename}"
              task.save()
          except Exception as e:
              task = BackgroundTask.objects.get(task_id=task_id)
              task.status = 'FAILED'
              task.error_message = str(e)
              task.save()

      def _process_archive(self, service, master_zip, archive_name, archive_bytes, log_data):
          """Process each image member of an uploaded zip, one at a time."""
          try:
              with zipfile.ZipFile(io.BytesIO(archive_bytes)) as user_zip:
                  for inner_name in user_zip.namelist():
                      if not inner_name.lower().endswith(self._IMAGE_SUFFIXES):
                          continue
                      # The output is PNG data, so name it by the member's
                      # base name without its original extension.
                      base_name = inner_name.split('/')[-1]
                      output_name = base_name.rsplit(".", 1)[0] + "_no_bg.png"
                      try:
                          raw = user_zip.read(inner_name)
                      except Exception as e:
                          log_data.append([inner_name, "FAIL", str(e)])
                          continue
                      self._process_image(
                          service, master_zip, inner_name, output_name, raw, log_data
                      )
          except Exception as e:
              # The archive itself could not be opened or listed.
              log_data.append([archive_name, "FAIL", str(e)])

      def _process_image(self, service, master_zip, source_name, output_name, raw, log_data):
          """Convert one image to a PNG entry in the result zip and log it."""
          try:
              img = Image.open(io.BytesIO(raw)).convert("RGB")
              out = service.remove_background(img)
              buf = io.BytesIO()
              out.save(buf, format="PNG")
              master_zip.writestr(output_name, buf.getvalue())
          except Exception as e:
              # Best-effort: one bad image must not stop the batch.
              log_data.append([source_name, "FAIL", str(e)])
          else:
              log_data.append([source_name, "SUCCESS", "Done"])
  class TaskStatusAPI(APIView):
      """Report the state of a single background task as JSON."""

      def get(self, request, task_id):
          """Look up the task by ``task_id`` and describe it.

          Returns:
              404 JSON error when no ``BackgroundTask`` matches; otherwise
              JSON with ``task_id``, ``status``, ``download_url`` (``None``
              while no archive is attached) and ``error``.
          """
          try:
              record = BackgroundTask.objects.get(task_id=task_id)
          except BackgroundTask.DoesNotExist:
              return JsonResponse({"error": "Task not found"}, status=404)

          archive = record.zip_file
          payload = {
              "task_id": record.task_id,
              "status": record.status,
              "download_url": archive.url if archive else None,
              "error": record.error_message,
          }
          return JsonResponse(payload)
  class TaskHistoryListView(APIView):
      """List the most recent background tasks as JSON."""

      def get(self, request):
          """Return up to 20 tasks, newest first.

          Each entry carries ``task_id``, ``status``, ``created_at``
          (formatted ``YYYY-MM-DD HH:MM``) and ``download_url`` (``None``
          when the task has no archive attached).
          """
          # Fetch latest 20 tasks
          tasks = BackgroundTask.objects.all().order_by('-created_at')[:20]
          data = [
              {
                  "task_id": str(t.task_id),
                  "status": t.status,
                  "created_at": t.created_at.strftime("%Y-%m-%d %H:%M"),
                  "download_url": t.zip_file.url if t.zip_file else None,
              }
              for t in tasks
          ]
          # A plain HttpResponse would iterate the list and emit the Python
          # repr of each dict back to back; serialise it as JSON instead.
          # safe=False is required because the top-level value is a list.
          return JsonResponse(data, safe=False)
  223. # Make sure this name matches what you put in urls.py
  class TaskHistoryAPI(APIView):
      """JSON feed of recent tasks for the history page."""

      def get(self, request):
          """Return up to 20 tasks, newest first, as a JSON array."""
          recent = BackgroundTask.objects.all().order_by('-created_at')[:20]

          def describe(entry):
              """Build the JSON-ready summary of one task row."""
              archive = entry.zip_file
              return {
                  "task_id": str(entry.task_id),
                  "status": entry.status,
                  "task_type": entry.task_type,
                  # The key is 'date' because the history page's JS reads
                  # 't.date'.
                  "date": entry.created_at.strftime("%Y-%m-%d %H:%M"),
                  "url": archive.url if archive else None,
              }

          summaries = [describe(entry) for entry in recent]
          # safe=False lets JsonResponse serialise a top-level list.
          return JsonResponse(summaries, safe=False)
  class TaskHistoryPageView(APIView):
      """Serve the HTML page that shows the task history."""

      def get(self, request):
          """Render the history template for the current request."""
          template_name = "bg_remover_history.html"
          return render(request, template_name)
  243. # from django.http import JsonResponse
  244. # def caption_image(request):
  245. # image_path = "media/test.jpg" # example image
  246. # caption = generate_caption(image_path)
  247. # return JsonResponse({"caption": caption})
  248. from .blip_service import BLIPServiceHF
  249. from .models import BackgroundTask
  class ImageCaptionView(APIView):
      """Single-image captioning endpoint backed by ``BLIPServiceHF``.

      GET renders the caption page. POST accepts one multipart upload under
      the ``image`` field and returns the generated caption as JSON.
      """

      parser_classes = [MultiPartParser]

      def get(self, request):
          """Render the captioning page."""
          return render(request, "caption_index.html")

      def post(self, request):
          """Generate a caption for the uploaded image.

          Returns:
              A 400 plain-text response when the ``image`` field is missing
              or the upload cannot be read as an image; otherwise JSON of
              the form ``{"caption": ...}``.
          """
          if "image" not in request.FILES:
              return HttpResponse("No image provided", status=400)

          # Pillow reports unrecognised or unreadable image data with
          # OSError (or a subclass); treat that as a bad request instead of
          # letting it surface as a 500.
          try:
              image = Image.open(request.FILES["image"]).convert("RGB")
          except OSError:
              return HttpResponse("Invalid image file", status=400)

          service = BLIPServiceHF()  # Use HF version
          caption = service.generate_caption(image)
          return JsonResponse({
              "caption": caption
          })
  class BulkImageCaptionAPI(APIView):
      """Bulk image captioning that runs in a background thread.

      POST accepts any number of uploads under the ``images`` field and
      returns a task id immediately. The worker thread writes one ``.txt``
      caption per image plus ``captions.csv`` into an archive under
      ``MEDIA_ROOT/caption_results/<task_id>/`` and records the outcome on
      the matching ``BackgroundTask`` row.
      """

      parser_classes = [MultiPartParser]

      def post(self, request):
          """Register a caption task, start the worker thread, reply 202.

          Returns:
              400 JSON error when no file was sent under ``images``;
              otherwise 202 JSON with ``task_id``, ``message`` and
              ``status_url``.
          """
          files = request.FILES.getlist("images")
          if not files:
              return JsonResponse({"error": "No images provided"}, status=400)

          task_id = uuid.uuid4()
          BackgroundTask.objects.create(
              task_id=task_id,
              task_type="CAPTION",
              status="PROCESSING"
          )

          # Read the uploads into memory so the thread can use them after
          # this method has returned.
          file_list = [
              {"name": f.name, "content": f.read()}
              for f in files
          ]

          thread = threading.Thread(
              target=self.process_caption_logic,
              args=(task_id, file_list)
          )
          thread.start()

          return JsonResponse({
              "task_id": str(task_id),
              "message": "Caption task started",
              "status_url": f"/api/status/{task_id}/"
          }, status=202)

      def process_caption_logic(self, task_id, file_list):
          """Caption every upload and store the result archive for the task.

          Args:
              task_id: Identifier of the ``BackgroundTask`` row to update.
              file_list: Dicts with ``name`` (upload filename) and
                  ``content`` (raw bytes), as built by ``post``.

          Per-file failures are logged in ``captions.csv`` and do not stop
          the run. Any failure outside the per-file handling (service
          construction, folder creation, writing the archive) marks the
          task ``FAILED`` with the error message.
          """
          log_data = []
          zip_name = f"captions_{task_id}.zip"

          try:
              # Service construction and folder creation live inside the try
              # so that a failure here flips the task to FAILED instead of
              # leaving it in PROCESSING forever.
              service = BLIPServiceHF()  # Use HF version
              folder_path = os.path.join(
                  settings.MEDIA_ROOT, "caption_results", str(task_id)
              )
              os.makedirs(folder_path, exist_ok=True)
              zip_path = os.path.join(folder_path, zip_name)

              with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
                  for item in file_list:
                      name, content = item["name"], item["content"]
                      try:
                          img = Image.open(io.BytesIO(content)).convert("RGB")
                          caption = service.generate_caption(img)
                          txt_name = name.rsplit(".", 1)[0] + ".txt"
                          zipf.writestr(txt_name, caption)
                          log_data.append([name, "SUCCESS", caption])
                      except Exception as e:
                          # Best-effort: one bad image must not stop the batch.
                          log_data.append([name, "FAIL", str(e)])

                  # Write CSV log
                  csv_buf = io.StringIO()
                  writer = csv.writer(csv_buf)
                  writer.writerow(["Filename", "Status", "Caption"])
                  writer.writerows(log_data)
                  zipf.writestr("captions.csv", csv_buf.getvalue())

              task = BackgroundTask.objects.get(task_id=task_id)
              task.status = "COMPLETED"
              task.zip_file = f"caption_results/{task_id}/{zip_name}"
              task.save()
          except Exception as e:
              task = BackgroundTask.objects.get(task_id=task_id)
              task.status = "FAILED"
              task.error_message = str(e)
              task.save()
  325. from .blip_service_itm import BLIPDecoderService
  class ImageCaptionITMView(APIView):
      """Single-image captioning endpoint backed by ``BLIPDecoderService``.

      GET renders the caption page. POST accepts one multipart upload under
      the ``image`` field and returns the generated caption as JSON.
      """

      parser_classes = [MultiPartParser]

      def get(self, request):
          """Render the captioning page."""
          return render(request, "caption_index.html")

      def post(self, request):
          """Generate a caption for the uploaded image.

          Returns:
              A 400 JSON error when the ``image`` field is missing or the
              upload cannot be read as an image; otherwise JSON of the form
              ``{"caption": ...}``.
          """
          if "image" not in request.FILES:
              return JsonResponse({"error": "No image provided"}, status=400)

          # Pillow reports unrecognised or unreadable image data with
          # OSError (or a subclass); treat that as a bad request instead of
          # letting it surface as a 500.
          try:
              image = Image.open(request.FILES["image"]).convert("RGB")
          except OSError:
              return JsonResponse({"error": "Invalid image file"}, status=400)

          # Using the new Decoder Service
          service = BLIPDecoderService()
          caption = service.generate_caption(image)
          return JsonResponse({"caption": caption})
  class BulkImageCaptionITMAPI(APIView):
      """Bulk image captioning with ``BLIPDecoderService`` in a thread.

      POST accepts any number of uploads under the ``images`` field and
      returns a task id immediately. The worker thread writes one ``.txt``
      caption per image plus ``captions_report.csv`` into an archive under
      ``MEDIA_ROOT/caption_results/<task_id>/`` and records the outcome on
      the matching ``BackgroundTask`` row.
      """

      parser_classes = [MultiPartParser]

      def post(self, request):
          """Register a caption task, start the worker thread, reply 202.

          Returns:
              400 JSON error when no file was sent under ``images``;
              otherwise 202 JSON with ``task_id`` and ``status_url``.
          """
          files = request.FILES.getlist("images")
          if not files:
              return JsonResponse({"error": "No images provided"}, status=400)

          task_id = uuid.uuid4()
          BackgroundTask.objects.create(
              task_id=task_id,
              task_type="CAPTION",
              status="PROCESSING"
          )

          # Read the uploads into memory so the thread can use them after
          # this method has returned.
          file_list = [{"name": f.name, "content": f.read()} for f in files]

          thread = threading.Thread(
              target=self.process_caption_logic,
              args=(task_id, file_list)
          )
          thread.start()

          return JsonResponse({
              "task_id": str(task_id),
              "status_url": f"/api/status/{task_id}/"
          }, status=202)

      def process_caption_logic(self, task_id, file_list):
          """Caption every upload and store the result archive for the task.

          Args:
              task_id: Identifier of the ``BackgroundTask`` row to update.
              file_list: Dicts with ``name`` (upload filename) and
                  ``content`` (raw bytes), as built by ``post``.

          Per-file failures are logged in ``captions_report.csv`` and do not
          stop the run. Any failure outside the per-file handling (service
          construction, folder creation, writing the archive) marks the
          task ``FAILED`` with the error message.
          """
          log_data = []
          zip_name = f"captions_{task_id}.zip"

          try:
              # Service construction and folder creation live inside the try
              # so that a failure here flips the task to FAILED instead of
              # leaving it in PROCESSING forever.
              # NOTE(review): the original comment says the service is a
              # singleton that avoids reloading the model - confirm in
              # blip_service_itm.
              service = BLIPDecoderService()
              folder_path = os.path.join(settings.MEDIA_ROOT, "caption_results", str(task_id))
              os.makedirs(folder_path, exist_ok=True)
              zip_path = os.path.join(folder_path, zip_name)

              with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
                  for item in file_list:
                      try:
                          img = Image.open(io.BytesIO(item["content"])).convert("RGB")
                          caption = service.generate_caption(img)
                          txt_name = item["name"].rsplit(".", 1)[0] + ".txt"
                          zipf.writestr(txt_name, caption)
                          log_data.append([item["name"], "SUCCESS", caption])
                      except Exception as e:
                          # Best-effort: one bad image must not stop the batch.
                          log_data.append([item["name"], "FAIL", str(e)])

                  # CSV Report
                  csv_buf = io.StringIO()
                  writer = csv.writer(csv_buf)
                  writer.writerow(["Filename", "Status", "Caption"])
                  writer.writerows(log_data)
                  zipf.writestr("captions_report.csv", csv_buf.getvalue())

              task = BackgroundTask.objects.get(task_id=task_id)
              task.status = "COMPLETED"
              task.zip_file = f"caption_results/{task_id}/{zip_name}"
              task.save()
          except Exception as e:
              task = BackgroundTask.objects.get(task_id=task_id)
              task.status = "FAILED"
              task.error_message = str(e)
              task.save()