"""Worker activity router — exposes recent background task status.""" from datetime import datetime from fastapi import APIRouter, Depends from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from sqlalchemy.orm import selectinload from pydantic import BaseModel from app.database import get_db from app.models.cad_file import CadFile, ProcessingStatus from app.models.order_item import OrderItem from app.models.order import Order from app.models.order_line import OrderLine from app.models.product import Product from app.models.user import User from app.utils.auth import get_current_user, require_admin_or_pm router = APIRouter(prefix="/worker", tags=["worker"]) class CadActivityEntry(BaseModel): cad_file_id: str original_name: str file_size: int | None processing_status: str error_message: str | None updated_at: str created_at: str order_numbers: list[str] render_log: dict | None class RenderJobEntry(BaseModel): order_line_id: str order_id: str | None order_number: str | None product_name: str | None output_type_name: str | None render_status: str render_backend_used: str | None render_started_at: str | None render_completed_at: str | None updated_at: str class WorkerActivity(BaseModel): cad_processing: list[CadActivityEntry] active_count: int # files currently in "processing" state failed_count: int # files in "failed" state (recent 50) render_jobs: list[RenderJobEntry] = [] render_active_count: int = 0 render_failed_count: int = 0 @router.get("/activity", response_model=WorkerActivity) async def get_worker_activity( user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """ Return recent CAD file processing activity. Shows the last 30 processed/failed/processing CAD files so the user can see what the worker is doing without needing Flower or Celery logs. """ # Recent CadFiles ordered by last update, with order_items to resolve order numbers result = await db.execute( select(CadFile) .order_by(CadFile.updated_at.desc()) .limit(30) ) cad_files = result.scalars().all() if not cad_files: return WorkerActivity(cad_processing=[], active_count=0, failed_count=0) # Fetch order items referencing these CAD files in one query cad_ids = [cf.id for cf in cad_files] items_result = await db.execute( select(OrderItem) .options(selectinload(OrderItem.order)) .where(OrderItem.cad_file_id.in_(cad_ids)) ) items = items_result.scalars().all() # Build cad_file_id → list[order_number] mapping from collections import defaultdict cad_to_orders: dict[str, list[str]] = defaultdict(list) for item in items: if item.order and item.order.order_number: key = str(item.cad_file_id) if item.order.order_number not in cad_to_orders[key]: cad_to_orders[key].append(item.order.order_number) entries = [] for cf in cad_files: entries.append(CadActivityEntry( cad_file_id=str(cf.id), original_name=cf.original_name or "unknown", file_size=getattr(cf, "file_size", None), processing_status=cf.processing_status.value if cf.processing_status else "unknown", error_message=getattr(cf, "error_message", None), updated_at=cf.updated_at.isoformat() if cf.updated_at else datetime.utcnow().isoformat(), created_at=cf.created_at.isoformat() if cf.created_at else datetime.utcnow().isoformat(), order_numbers=cad_to_orders.get(str(cf.id), []), render_log=getattr(cf, "render_log", None), )) active_count = sum( 1 for cf in cad_files if cf.processing_status == ProcessingStatus.processing ) failed_count = sum( 1 for cf in cad_files if cf.processing_status == ProcessingStatus.failed ) # ── Render job activity ────────────────────────────────────────────── render_result = await db.execute( select(OrderLine) .options( selectinload(OrderLine.product), selectinload(OrderLine.output_type), selectinload(OrderLine.order), ) .where(OrderLine.output_type_id.isnot(None)) .where(OrderLine.render_status != "pending") .order_by(OrderLine.updated_at.desc()) .limit(30) ) render_lines = render_result.scalars().all() render_entries = [] for rl in render_lines: render_entries.append(RenderJobEntry( order_line_id=str(rl.id), order_id=str(rl.order_id) if rl.order_id else None, order_number=rl.order.order_number if rl.order else None, product_name=rl.product.name if rl.product else None, output_type_name=rl.output_type.name if rl.output_type else None, render_status=rl.render_status, render_backend_used=rl.render_backend_used, render_started_at=rl.render_started_at.isoformat() if rl.render_started_at else None, render_completed_at=rl.render_completed_at.isoformat() if rl.render_completed_at else None, updated_at=rl.updated_at.isoformat(), )) render_active = sum(1 for rl in render_lines if rl.render_status == "processing") render_failed = sum(1 for rl in render_lines if rl.render_status == "failed") return WorkerActivity( cad_processing=entries, active_count=active_count, failed_count=failed_count, render_jobs=render_entries, render_active_count=render_active, render_failed_count=render_failed, ) @router.get("/render-log/{order_line_id}") async def get_render_log( order_line_id: str, after: int = 0, user: User = Depends(get_current_user), ): """Return render log entries for an order line (polling fallback).""" from app.services.render_log import get_entries, count entries = get_entries(order_line_id, after_index=after) total = count(order_line_id) return {"entries": entries, "total": total, "next_after": total} @router.get("/render-log/{order_line_id}/stream") async def stream_render_log( order_line_id: str, user: User = Depends(get_current_user), ): """SSE stream of render log entries for an order line.""" import asyncio import json from fastapi.responses import StreamingResponse from app.services.render_log import get_entries, count async def event_generator(): cursor = 0 idle_ticks = 0 max_idle = 120 # stop after 2 minutes of no new entries while idle_ticks < max_idle: entries = get_entries(order_line_id, after_index=cursor) if entries: idle_ticks = 0 for entry in entries: yield f"data: {json.dumps(entry)}\n\n" cursor += len(entries) else: idle_ticks += 1 await asyncio.sleep(1) yield f"data: {json.dumps({'level': 'info', 'msg': 'Stream ended (idle timeout)', 't': ''})}\n\n" return StreamingResponse( event_generator(), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}, ) from fastapi import status as http_status @router.post("/activity/{cad_file_id}/reprocess", status_code=http_status.HTTP_202_ACCEPTED) async def reprocess_cad_file( cad_file_id: str, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """Re-queue a CAD file for full processing (STEP extraction + thumbnail + glTF).""" result = await db.execute(select(CadFile).where(CadFile.id == cad_file_id)) cad_file = result.scalar_one_or_none() if not cad_file: from fastapi import HTTPException raise HTTPException(404, detail="CAD file not found") cad_file.processing_status = ProcessingStatus.pending await db.commit() from app.tasks.step_tasks import process_step_file process_step_file.delay(cad_file_id) return {"queued": cad_file_id, "task": "process_step_file"} # --------------------------------------------------------------------------- # Queue inspection + control # --------------------------------------------------------------------------- MONITORED_QUEUES = ["step_processing", "thumbnail_rendering", "ai_validation"] def _parse_redis_task(raw: str) -> dict | None: """Parse a raw Redis Celery message into a simplified dict.""" import json, base64 try: msg = json.loads(raw) headers = msg.get("headers", {}) task_name = headers.get("task", "unknown") task_id = headers.get("id", "unknown") argsrepr = headers.get("argsrepr", "") args: list = [] try: body = json.loads(base64.b64decode(msg.get("body", ""))) if isinstance(body, list) and body: args = list(body[0]) except Exception: pass return { "task_id": task_id, "task_name": task_name, "args": args, "argsrepr": argsrepr, "status": "pending", } except Exception: return None @router.get("/queue") async def get_queue_status(user: User = Depends(get_current_user)): """Return Celery queue depths, pending tasks, and active/reserved tasks.""" import asyncio import redis as redis_lib from app.config import settings as app_settings from app.tasks.celery_app import celery_app r = redis_lib.from_url(app_settings.redis_url, decode_responses=True) # Pending tasks per queue from Redis queue_depths: dict[str, int] = {} pending: list[dict] = [] for q in MONITORED_QUEUES: depth = r.llen(q) or 0 queue_depths[q] = depth if depth > 0: raw_items = r.lrange(q, 0, 99) for raw in raw_items: task = _parse_redis_task(raw) if task: task["queue"] = q pending.append(task) # Active / reserved from Celery inspect (runs in thread, 1.5 s timeout) active: list[dict] = [] reserved: list[dict] = [] def _inspect() -> tuple[dict, dict]: try: insp = celery_app.control.inspect(timeout=1.5) return (insp.active() or {}), (insp.reserved() or {}) except Exception: return {}, {} act_raw, rsv_raw = await asyncio.to_thread(_inspect) for worker, tasks in act_raw.items(): for t in (tasks or []): active.append({ "task_id": t.get("id", ""), "task_name": t.get("name", ""), "args": list(t.get("args") or []), "argsrepr": t.get("kwargs", {}).get("argsrepr", ""), "status": "active", "worker": worker, }) for worker, tasks in rsv_raw.items(): for t in (tasks or []): reserved.append({ "task_id": t.get("id", ""), "task_name": t.get("name", ""), "args": list(t.get("args") or []), "argsrepr": "", "status": "reserved", "worker": worker, }) return { "queue_depths": queue_depths, "pending_count": sum(queue_depths.values()), "active": active, "reserved": reserved, "pending": pending, } @router.post("/queue/purge", status_code=http_status.HTTP_202_ACCEPTED) async def purge_queue(user: User = Depends(require_admin_or_pm)): """Delete all pending tasks from all monitored queues.""" import redis as redis_lib from app.config import settings as app_settings r = redis_lib.from_url(app_settings.redis_url, decode_responses=True) total = 0 for q in MONITORED_QUEUES: count = r.llen(q) or 0 if count: r.delete(q) total += count return {"purged": total, "message": f"Removed {total} pending task(s) from queue"} @router.post("/queue/cancel/{task_id}", status_code=http_status.HTTP_202_ACCEPTED) async def cancel_task(task_id: str, user: User = Depends(require_admin_or_pm)): """Revoke a task by ID. Terminates it if running, skips it if still pending.""" from app.tasks.celery_app import celery_app celery_app.control.revoke(task_id, terminate=True, signal="SIGTERM") return {"revoked": task_id} # --------------------------------------------------------------------------- # Render health check # --------------------------------------------------------------------------- class RenderHealthStatus(BaseModel): status: str # "ok" | "degraded" | "down" render_worker_connected: bool blender_available: bool thumbnail_queue_depth: int thumbnail_queue_ok: bool last_render_at: str | None last_render_success: bool | None last_render_age_minutes: float | None details: dict @router.get("/health/render", response_model=RenderHealthStatus) async def render_health( user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """Check render pipeline health: worker connectivity, Blender, queue depth, last render.""" import asyncio import redis as redis_lib from app.config import settings as app_settings from app.tasks.celery_app import celery_app from app.models.order_line import OrderLine details: dict = {} # 1. Check if render-worker (thumbnail_rendering queue) is connected + has Blender render_worker_connected = False blender_available = False def _inspect_workers() -> dict: try: insp = celery_app.control.inspect(timeout=2.0) ping = insp.ping() or {} active_queues = insp.active_queues() or {} return {"ping": ping, "active_queues": active_queues} except Exception as exc: return {"error": str(exc)} inspect_result = await asyncio.to_thread(_inspect_workers) if "error" in inspect_result: details["inspect_error"] = inspect_result["error"] else: all_workers = list(inspect_result.get("ping", {}).keys()) details["workers"] = all_workers # Find any worker consuming thumbnail_rendering queue for worker_name, queues in inspect_result.get("active_queues", {}).items(): queue_names = [q.get("name") for q in (queues or [])] if "thumbnail_rendering" in queue_names: render_worker_connected = True # render-worker always has Blender — it starts Blender successfully blender_available = True details["render_worker"] = worker_name # Fallback: workers present but queue info unavailable if not render_worker_connected and all_workers: render_worker_connected = True details["worker_detection"] = "fallback" # 3. Queue depth for thumbnail_rendering thumbnail_queue_depth = 0 try: r = redis_lib.from_url(app_settings.redis_url, decode_responses=True) thumbnail_queue_depth = r.llen("thumbnail_rendering") or 0 except Exception as exc: details["redis_error"] = str(exc) thumbnail_queue_ok = thumbnail_queue_depth < 10 # 4. Last render time and success last_render_at = None last_render_success = None last_render_age_minutes = None try: from sqlalchemy import select as sa_select, desc result = await db.execute( sa_select(OrderLine.render_completed_at, OrderLine.render_status) .where(OrderLine.render_completed_at.isnot(None)) .order_by(desc(OrderLine.render_completed_at)) .limit(1) ) row = result.first() if row: last_render_at = row[0].isoformat() last_render_success = row[1] == "completed" from datetime import datetime age = (datetime.utcnow() - row[0]).total_seconds() / 60 last_render_age_minutes = round(age, 1) except Exception as exc: details["db_error"] = str(exc) # Determine overall status if not render_worker_connected or not blender_available: status = "down" elif not thumbnail_queue_ok: status = "degraded" elif last_render_success is False and last_render_age_minutes is not None and last_render_age_minutes < 30: status = "degraded" else: status = "ok" return RenderHealthStatus( status=status, render_worker_connected=render_worker_connected, blender_available=blender_available, thumbnail_queue_depth=thumbnail_queue_depth, thumbnail_queue_ok=thumbnail_queue_ok, last_render_at=last_render_at, last_render_success=last_render_success, last_render_age_minutes=last_render_age_minutes, details=details, )