"""Worker activity router — exposes recent background task status.""" from datetime import datetime from typing import Optional from fastapi import APIRouter, Depends, HTTPException from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from sqlalchemy.orm import selectinload from pydantic import BaseModel from app.database import get_db from app.models.cad_file import CadFile, ProcessingStatus from app.models.order_item import OrderItem from app.models.order import Order from app.models.order_line import OrderLine from app.models.product import Product from app.models.user import User from app.models.worker_config import WorkerConfig from app.models.system_setting import SystemSetting from app.utils.auth import get_current_user, require_admin_or_pm, require_admin router = APIRouter(prefix="/worker", tags=["worker"]) class CadActivityEntry(BaseModel): cad_file_id: str original_name: str file_size: int | None processing_status: str error_message: str | None updated_at: str created_at: str order_numbers: list[str] render_log: dict | None class RenderJobEntry(BaseModel): order_line_id: str order_id: str | None order_number: str | None product_name: str | None output_type_name: str | None render_status: str render_backend_used: str | None render_started_at: str | None render_completed_at: str | None updated_at: str class WorkerActivity(BaseModel): cad_processing: list[CadActivityEntry] active_count: int # files currently in "processing" state failed_count: int # files in "failed" state (recent 50) render_jobs: list[RenderJobEntry] = [] render_active_count: int = 0 render_failed_count: int = 0 @router.get("/activity", response_model=WorkerActivity) async def get_worker_activity( user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """ Return recent CAD file processing activity. Shows the last 30 processed/failed/processing CAD files so the user can see what the worker is doing without needing Flower or Celery logs. """ # Recent CadFiles ordered by last update, with order_items to resolve order numbers result = await db.execute( select(CadFile) .order_by(CadFile.updated_at.desc()) .limit(30) ) cad_files = result.scalars().all() if not cad_files: return WorkerActivity(cad_processing=[], active_count=0, failed_count=0) # Fetch order items referencing these CAD files in one query cad_ids = [cf.id for cf in cad_files] items_result = await db.execute( select(OrderItem) .options(selectinload(OrderItem.order)) .where(OrderItem.cad_file_id.in_(cad_ids)) ) items = items_result.scalars().all() # Build cad_file_id → list[order_number] mapping from collections import defaultdict cad_to_orders: dict[str, list[str]] = defaultdict(list) for item in items: if item.order and item.order.order_number: key = str(item.cad_file_id) if item.order.order_number not in cad_to_orders[key]: cad_to_orders[key].append(item.order.order_number) entries = [] for cf in cad_files: entries.append(CadActivityEntry( cad_file_id=str(cf.id), original_name=cf.original_name or "unknown", file_size=getattr(cf, "file_size", None), processing_status=cf.processing_status.value if cf.processing_status else "unknown", error_message=getattr(cf, "error_message", None), updated_at=cf.updated_at.isoformat() if cf.updated_at else datetime.utcnow().isoformat(), created_at=cf.created_at.isoformat() if cf.created_at else datetime.utcnow().isoformat(), order_numbers=cad_to_orders.get(str(cf.id), []), render_log=getattr(cf, "render_log", None), )) active_count = sum( 1 for cf in cad_files if cf.processing_status == ProcessingStatus.processing ) failed_count = sum( 1 for cf in cad_files if cf.processing_status == ProcessingStatus.failed ) # ── Render job activity ────────────────────────────────────────────── render_result = await db.execute( select(OrderLine) .options( selectinload(OrderLine.product), selectinload(OrderLine.output_type), selectinload(OrderLine.order), ) .where(OrderLine.output_type_id.isnot(None)) .where(OrderLine.render_status != "pending") .order_by(OrderLine.updated_at.desc()) .limit(30) ) render_lines = render_result.scalars().all() render_entries = [] for rl in render_lines: render_entries.append(RenderJobEntry( order_line_id=str(rl.id), order_id=str(rl.order_id) if rl.order_id else None, order_number=rl.order.order_number if rl.order else None, product_name=rl.product.name if rl.product else None, output_type_name=rl.output_type.name if rl.output_type else None, render_status=rl.render_status, render_backend_used=rl.render_backend_used, render_started_at=rl.render_started_at.isoformat() if rl.render_started_at else None, render_completed_at=rl.render_completed_at.isoformat() if rl.render_completed_at else None, updated_at=rl.updated_at.isoformat(), )) render_active = sum(1 for rl in render_lines if rl.render_status == "processing") render_failed = sum(1 for rl in render_lines if rl.render_status == "failed") return WorkerActivity( cad_processing=entries, active_count=active_count, failed_count=failed_count, render_jobs=render_entries, render_active_count=render_active, render_failed_count=render_failed, ) @router.get("/render-log/{order_line_id}") async def get_render_log( order_line_id: str, after: int = 0, user: User = Depends(get_current_user), ): """Return render log entries for an order line (polling fallback).""" from app.services.render_log import get_entries, count entries = get_entries(order_line_id, after_index=after) total = count(order_line_id) return {"entries": entries, "total": total, "next_after": total} @router.get("/render-log/{order_line_id}/stream") async def stream_render_log( order_line_id: str, user: User = Depends(get_current_user), ): """SSE stream of render log entries for an order line.""" import asyncio import json from fastapi.responses import StreamingResponse from app.services.render_log import get_entries, count async def event_generator(): cursor = 0 idle_ticks = 0 max_idle = 120 # stop after 2 minutes of no new entries while idle_ticks < max_idle: entries = get_entries(order_line_id, after_index=cursor) if entries: idle_ticks = 0 for entry in entries: yield f"data: {json.dumps(entry)}\n\n" cursor += len(entries) else: idle_ticks += 1 await asyncio.sleep(1) yield f"data: {json.dumps({'level': 'info', 'msg': 'Stream ended (idle timeout)', 't': ''})}\n\n" return StreamingResponse( event_generator(), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}, ) from fastapi import status as http_status @router.post("/activity/{cad_file_id}/reprocess", status_code=http_status.HTTP_202_ACCEPTED) async def reprocess_cad_file( cad_file_id: str, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """Re-queue a CAD file for full processing (STEP extraction + thumbnail + glTF).""" result = await db.execute(select(CadFile).where(CadFile.id == cad_file_id)) cad_file = result.scalar_one_or_none() if not cad_file: from fastapi import HTTPException raise HTTPException(404, detail="CAD file not found") cad_file.processing_status = ProcessingStatus.pending await db.commit() from app.tasks.step_tasks import process_step_file process_step_file.delay(cad_file_id) return {"queued": cad_file_id, "task": "process_step_file"} # --------------------------------------------------------------------------- # Queue inspection + control # --------------------------------------------------------------------------- MONITORED_QUEUES = ["step_processing", "thumbnail_rendering", "ai_validation"] def _parse_redis_task(raw: str) -> dict | None: """Parse a raw Redis Celery message into a simplified dict.""" import json, base64 try: msg = json.loads(raw) headers = msg.get("headers", {}) task_name = headers.get("task", "unknown") task_id = headers.get("id", "unknown") argsrepr = headers.get("argsrepr", "") args: list = [] try: body = json.loads(base64.b64decode(msg.get("body", ""))) if isinstance(body, list) and body: args = list(body[0]) except Exception: pass return { "task_id": task_id, "task_name": task_name, "args": args, "argsrepr": argsrepr, "status": "pending", } except Exception: return None @router.get("/queue") async def get_queue_status(user: User = Depends(get_current_user)): """Return Celery queue depths, pending tasks, and active/reserved tasks.""" import asyncio import redis as redis_lib from app.config import settings as app_settings from app.tasks.celery_app import celery_app r = redis_lib.from_url(app_settings.redis_url, decode_responses=True) # Pending tasks per queue from Redis queue_depths: dict[str, int] = {} pending: list[dict] = [] for q in MONITORED_QUEUES: depth = r.llen(q) or 0 queue_depths[q] = depth if depth > 0: raw_items = r.lrange(q, 0, 99) for raw in raw_items: task = _parse_redis_task(raw) if task: task["queue"] = q pending.append(task) # Active / reserved from Celery inspect (runs in thread, 1.5 s timeout) active: list[dict] = [] reserved: list[dict] = [] def _inspect() -> tuple[dict, dict]: try: insp = celery_app.control.inspect(timeout=1.5) return (insp.active() or {}), (insp.reserved() or {}) except Exception: return {}, {} act_raw, rsv_raw = await asyncio.to_thread(_inspect) for worker, tasks in act_raw.items(): for t in (tasks or []): active.append({ "task_id": t.get("id", ""), "task_name": t.get("name", ""), "args": list(t.get("args") or []), "argsrepr": t.get("kwargs", {}).get("argsrepr", ""), "status": "active", "worker": worker, }) for worker, tasks in rsv_raw.items(): for t in (tasks or []): reserved.append({ "task_id": t.get("id", ""), "task_name": t.get("name", ""), "args": list(t.get("args") or []), "argsrepr": "", "status": "reserved", "worker": worker, }) return { "queue_depths": queue_depths, "pending_count": sum(queue_depths.values()), "active": active, "reserved": reserved, "pending": pending, } @router.post("/queue/purge", status_code=http_status.HTTP_202_ACCEPTED) async def purge_queue(user: User = Depends(require_admin_or_pm)): """Delete all pending tasks from all monitored queues.""" import redis as redis_lib from app.config import settings as app_settings r = redis_lib.from_url(app_settings.redis_url, decode_responses=True) total = 0 for q in MONITORED_QUEUES: count = r.llen(q) or 0 if count: r.delete(q) total += count return {"purged": total, "message": f"Removed {total} pending task(s) from queue"} @router.post("/queue/cancel/{task_id}", status_code=http_status.HTTP_202_ACCEPTED) async def cancel_task(task_id: str, user: User = Depends(require_admin_or_pm)): """Revoke a task by ID. Terminates it if running, skips it if still pending.""" from app.tasks.celery_app import celery_app celery_app.control.revoke(task_id, terminate=True, signal="SIGTERM") return {"revoked": task_id} # --------------------------------------------------------------------------- # Worker management — list workers + scale # --------------------------------------------------------------------------- class ScaleRequest(BaseModel): service: str # "render-worker" | "worker" | "worker-thumbnail" count: int # 0–20 @router.get("/celery-workers") async def get_celery_workers(user: User = Depends(require_admin_or_pm)): """List active Celery workers with their queues and active task counts.""" import asyncio from app.tasks.celery_app import celery_app def _inspect() -> dict: try: insp = celery_app.control.inspect(timeout=2.0) return { "active_queues": insp.active_queues() or {}, "active": insp.active() or {}, "stats": insp.stats() or {}, } except Exception as exc: return {"error": str(exc)} data = await asyncio.to_thread(_inspect) if "error" in data: return {"workers": [], "error": data["error"]} workers = [] for worker_name, queues in data.get("active_queues", {}).items(): queue_names = [q.get("name") for q in (queues or [])] active_tasks = data.get("active", {}).get(worker_name, []) stats = data.get("stats", {}).get(worker_name, {}) workers.append({ "name": worker_name, "queues": queue_names, "active_task_count": len(active_tasks), "active_tasks": [ {"name": t.get("name"), "id": t.get("id")} for t in active_tasks ], "total_tasks_processed": stats.get("total", {}), }) return {"workers": workers} @router.post("/scale", status_code=http_status.HTTP_202_ACCEPTED) async def scale_workers( body: ScaleRequest, user: User = Depends(require_admin_or_pm), ): """Scale a Compose service (render-worker, worker, worker-thumbnail) up or down. Requires the docker socket and compose file to be accessible inside the container (see docker-compose.yml COMPOSE_PROJECT_DIR env var). """ import asyncio import os import subprocess from fastapi import HTTPException ALLOWED_SERVICES = {"render-worker", "worker", "worker-thumbnail"} if body.service not in ALLOWED_SERVICES: raise HTTPException(400, detail=f"service must be one of {ALLOWED_SERVICES}") if not (0 <= body.count <= 20): raise HTTPException(400, detail="count must be between 0 and 20") compose_dir = os.environ.get("COMPOSE_PROJECT_DIR", "/compose") compose_file = os.path.join(compose_dir, "docker-compose.yml") def _scale() -> subprocess.CompletedProcess: return subprocess.run( [ "docker", "compose", "-f", compose_file, "up", "--scale", f"{body.service}={body.count}", "--no-recreate", "-d", ], capture_output=True, text=True, timeout=120, ) try: result = await asyncio.to_thread(_scale) except subprocess.TimeoutExpired: raise HTTPException(504, detail="Scale operation timed out") if result.returncode != 0: raise HTTPException( 500, detail=f"docker compose scale failed: {result.stderr[-500:]}", ) return {"service": body.service, "count": body.count, "status": "scaling"} # --------------------------------------------------------------------------- # GPU probe # --------------------------------------------------------------------------- @router.post("/probe/gpu", status_code=http_status.HTTP_202_ACCEPTED) async def trigger_gpu_probe(current_user: User = Depends(require_admin)): """Queue a GPU probe task on the render-worker.""" from app.tasks.gpu_tasks import probe_gpu result = probe_gpu.delay() return {"task_id": str(result.id), "queued": True} @router.get("/probe/gpu/result") async def get_gpu_probe_result( current_user: User = Depends(require_admin), db: AsyncSession = Depends(get_db), ): """Return the last GPU probe result from system_settings.""" import json row = await db.execute( select(SystemSetting).where(SystemSetting.key == "gpu_probe_last_result") ) setting = row.scalar_one_or_none() if not setting: return {"status": "unknown", "message": "No probe run yet. Click Run GPU Check."} return json.loads(setting.value) # --------------------------------------------------------------------------- # Render health check # --------------------------------------------------------------------------- class RenderHealthStatus(BaseModel): status: str # "ok" | "degraded" | "down" render_worker_connected: bool blender_available: bool thumbnail_queue_depth: int thumbnail_queue_ok: bool last_render_at: str | None last_render_success: bool | None last_render_age_minutes: float | None details: dict @router.get("/health/render", response_model=RenderHealthStatus) async def render_health( user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """Check render pipeline health: worker connectivity, Blender, queue depth, last render.""" import asyncio import redis as redis_lib from app.config import settings as app_settings from app.tasks.celery_app import celery_app from app.models.order_line import OrderLine details: dict = {} # 1. Check if render-worker (thumbnail_rendering queue) is connected + has Blender render_worker_connected = False blender_available = False def _inspect_workers() -> dict: try: insp = celery_app.control.inspect(timeout=2.0) ping = insp.ping() or {} active_queues = insp.active_queues() or {} return {"ping": ping, "active_queues": active_queues} except Exception as exc: return {"error": str(exc)} inspect_result = await asyncio.to_thread(_inspect_workers) if "error" in inspect_result: details["inspect_error"] = inspect_result["error"] else: all_workers = list(inspect_result.get("ping", {}).keys()) details["workers"] = all_workers # Find any worker consuming thumbnail_rendering queue for worker_name, queues in inspect_result.get("active_queues", {}).items(): queue_names = [q.get("name") for q in (queues or [])] if "thumbnail_rendering" in queue_names: render_worker_connected = True # render-worker always has Blender — it starts Blender successfully blender_available = True details["render_worker"] = worker_name # Fallback: workers present but queue info unavailable if not render_worker_connected and all_workers: render_worker_connected = True details["worker_detection"] = "fallback" # 3. Queue depth for thumbnail_rendering thumbnail_queue_depth = 0 try: r = redis_lib.from_url(app_settings.redis_url, decode_responses=True) thumbnail_queue_depth = r.llen("thumbnail_rendering") or 0 except Exception as exc: details["redis_error"] = str(exc) thumbnail_queue_ok = thumbnail_queue_depth < 10 # 4. Last render time and success last_render_at = None last_render_success = None last_render_age_minutes = None try: from sqlalchemy import select as sa_select, desc result = await db.execute( sa_select(OrderLine.render_completed_at, OrderLine.render_status) .where(OrderLine.render_completed_at.isnot(None)) .order_by(desc(OrderLine.render_completed_at)) .limit(1) ) row = result.first() if row: last_render_at = row[0].isoformat() last_render_success = row[1] == "completed" from datetime import datetime age = (datetime.utcnow() - row[0]).total_seconds() / 60 last_render_age_minutes = round(age, 1) except Exception as exc: details["db_error"] = str(exc) # Determine overall status if not render_worker_connected or not blender_available: status = "down" elif not thumbnail_queue_ok: status = "degraded" elif last_render_success is False and last_render_age_minutes is not None and last_render_age_minutes < 30: status = "degraded" else: status = "ok" return RenderHealthStatus( status=status, render_worker_connected=render_worker_connected, blender_available=blender_available, thumbnail_queue_depth=thumbnail_queue_depth, thumbnail_queue_ok=thumbnail_queue_ok, last_render_at=last_render_at, last_render_success=last_render_success, last_render_age_minutes=last_render_age_minutes, details=details, ) # --------------------------------------------------------------------------- # Worker concurrency configuration # --------------------------------------------------------------------------- class WorkerConfigOut(BaseModel): queue_name: str max_concurrency: int min_concurrency: int enabled: bool updated_at: str class WorkerConfigUpdate(BaseModel): max_concurrency: Optional[int] = None min_concurrency: Optional[int] = None enabled: Optional[bool] = None @router.get("/configs", response_model=list[WorkerConfigOut]) async def list_worker_configs( user: User = Depends(require_admin), db: AsyncSession = Depends(get_db), ): """List all worker concurrency configurations (admin only).""" result = await db.execute(select(WorkerConfig).order_by(WorkerConfig.queue_name)) configs = result.scalars().all() return [ WorkerConfigOut( queue_name=cfg.queue_name, max_concurrency=cfg.max_concurrency, min_concurrency=cfg.min_concurrency, enabled=cfg.enabled, updated_at=cfg.updated_at.isoformat() if cfg.updated_at else datetime.utcnow().isoformat(), ) for cfg in configs ] @router.put("/configs/{queue_name}", response_model=WorkerConfigOut) async def update_worker_config( queue_name: str, body: WorkerConfigUpdate, user: User = Depends(require_admin), db: AsyncSession = Depends(get_db), ): """Update concurrency settings for a specific queue (admin only).""" result = await db.execute( select(WorkerConfig).where(WorkerConfig.queue_name == queue_name) ) cfg = result.scalar_one_or_none() if not cfg: raise HTTPException(404, detail=f"No worker config found for queue '{queue_name}'") if body.max_concurrency is not None: if body.max_concurrency < 1: raise HTTPException(400, detail="max_concurrency must be >= 1") cfg.max_concurrency = body.max_concurrency if body.min_concurrency is not None: if body.min_concurrency < 1: raise HTTPException(400, detail="min_concurrency must be >= 1") cfg.min_concurrency = body.min_concurrency if body.enabled is not None: cfg.enabled = body.enabled # Validate min <= max after updates if cfg.min_concurrency > cfg.max_concurrency: raise HTTPException(400, detail="min_concurrency cannot exceed max_concurrency") cfg.updated_at = datetime.utcnow() await db.commit() await db.refresh(cfg) return WorkerConfigOut( queue_name=cfg.queue_name, max_concurrency=cfg.max_concurrency, min_concurrency=cfg.min_concurrency, enabled=cfg.enabled, updated_at=cfg.updated_at.isoformat(), )