381f44bc8b
- GET /api/worker/health/render: checks render-worker (thumbnail_rendering queue), Blender availability via active_queues inspect, queue depth, last render recency — returns ok/degraded/down status - scripts/test_render_pipeline.py: integration test for full pipeline (--health, --sample, --full modes) - PLAN.md: appended Render Pipeline Fixes section with all B-Fixes - LEARNINGS.md: documented 5 new learnings (queue mismatch, circular import, 307 redirect, worker capability detection) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
472 lines
17 KiB
Python
472 lines
17 KiB
Python
"""Worker activity router — exposes recent background task status."""
|
|
from datetime import datetime
|
|
|
|
from fastapi import APIRouter, Depends
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy import select
|
|
from sqlalchemy.orm import selectinload
|
|
from pydantic import BaseModel
|
|
|
|
from app.database import get_db
|
|
from app.models.cad_file import CadFile, ProcessingStatus
|
|
from app.models.order_item import OrderItem
|
|
from app.models.order import Order
|
|
from app.models.order_line import OrderLine
|
|
from app.models.product import Product
|
|
from app.models.user import User
|
|
from app.utils.auth import get_current_user, require_admin_or_pm
|
|
|
|
router = APIRouter(prefix="/worker", tags=["worker"])
|
|
|
|
|
|
class CadActivityEntry(BaseModel):
    """One CAD file row of the worker activity feed (all values pre-serialised)."""

    # Stringified primary key of the CAD file.
    cad_file_id: str
    # Name of the uploaded file; the activity endpoint substitutes "unknown" when absent.
    original_name: str
    # File size when the model exposes one, otherwise None.
    file_size: int | None
    # Plain string value of the processing status ("unknown" when unset).
    processing_status: str
    # Error text recorded on the CAD file, if any.
    error_message: str | None
    # ISO-8601 timestamps.
    updated_at: str
    created_at: str
    # De-duplicated order numbers of the order items referencing this file.
    order_numbers: list[str]
    # Render log stored on the CAD file, if any.
    render_log: dict | None
|
|
|
|
|
|
class RenderJobEntry(BaseModel):
    """One order-line render job of the worker activity feed."""

    # Stringified primary key of the order line.
    order_line_id: str
    # Display values resolved from the related order / product / output type;
    # None when the relation is missing.
    order_number: str | None
    product_name: str | None
    output_type_name: str | None
    # Render status as stored on the order line.
    render_status: str
    # Backend recorded on the order line, if any.
    render_backend_used: str | None
    # ISO-8601 timestamps; None until the respective event has happened.
    render_started_at: str | None
    render_completed_at: str | None
    # ISO-8601 timestamp of the last modification of the order line.
    updated_at: str
|
|
|
|
|
|
class WorkerActivity(BaseModel):
    """Response body of the activity endpoint: CAD processing plus render jobs.

    All counters are computed over the rows returned in the same response,
    not over the whole table.
    """

    # Most recently updated CAD files.
    cad_processing: list[CadActivityEntry]
    # Entries of cad_processing currently in "processing" state.
    active_count: int
    # Entries of cad_processing in "failed" state.
    failed_count: int
    # Most recently updated render jobs.
    render_jobs: list[RenderJobEntry] = []
    # Entries of render_jobs whose status is "processing".
    render_active_count: int = 0
    # Entries of render_jobs whose status is "failed".
    render_failed_count: int = 0
|
|
|
|
|
|
@router.get("/activity", response_model=WorkerActivity)
async def get_worker_activity(
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """
    Return recent CAD file processing and render job activity.

    Shows the last 30 updated CAD files and the last 30 non-pending render
    jobs so the user can see what the worker is doing without needing Flower
    or Celery logs.

    Render jobs are reported even when no CAD file exists yet; previously an
    empty CAD table short-circuited the whole response.
    """
    from collections import defaultdict

    def _iso_or_now(moment) -> str:
        # Missing timestamps fall back to "now" so the response model's
        # required string fields are always populated.
        return moment.isoformat() if moment else datetime.utcnow().isoformat()

    # ── CAD file activity ────────────────────────────────────────────────
    result = await db.execute(
        select(CadFile)
        .order_by(CadFile.updated_at.desc())
        .limit(30)
    )
    cad_files = result.scalars().all()

    # Build cad_file_id → list[order_number] mapping with a single query.
    # Skipped entirely when there are no CAD files (an empty IN () is pointless).
    cad_to_orders: dict[str, list[str]] = defaultdict(list)
    if cad_files:
        items_result = await db.execute(
            select(OrderItem)
            .options(selectinload(OrderItem.order))
            .where(OrderItem.cad_file_id.in_([cf.id for cf in cad_files]))
        )
        for item in items_result.scalars().all():
            order_number = item.order.order_number if item.order else None
            if not order_number:
                continue
            known_numbers = cad_to_orders[str(item.cad_file_id)]
            if order_number not in known_numbers:
                known_numbers.append(order_number)

    entries = [
        CadActivityEntry(
            cad_file_id=str(cf.id),
            original_name=cf.original_name or "unknown",
            file_size=getattr(cf, "file_size", None),
            processing_status=cf.processing_status.value if cf.processing_status else "unknown",
            error_message=getattr(cf, "error_message", None),
            updated_at=_iso_or_now(cf.updated_at),
            created_at=_iso_or_now(cf.created_at),
            # .get() keeps the defaultdict from growing keys for files without orders
            order_numbers=cad_to_orders.get(str(cf.id), []),
            render_log=getattr(cf, "render_log", None),
        )
        for cf in cad_files
    ]

    active_count = sum(
        1 for cf in cad_files
        if cf.processing_status == ProcessingStatus.processing
    )
    failed_count = sum(
        1 for cf in cad_files
        if cf.processing_status == ProcessingStatus.failed
    )

    # ── Render job activity ──────────────────────────────────────────────
    render_result = await db.execute(
        select(OrderLine)
        .options(
            selectinload(OrderLine.product),
            selectinload(OrderLine.output_type),
            selectinload(OrderLine.order),
        )
        .where(OrderLine.output_type_id.isnot(None))
        .where(OrderLine.render_status != "pending")
        .order_by(OrderLine.updated_at.desc())
        .limit(30)
    )
    render_lines = render_result.scalars().all()

    render_entries = [
        RenderJobEntry(
            order_line_id=str(rl.id),
            order_number=rl.order.order_number if rl.order else None,
            product_name=rl.product.name if rl.product else None,
            output_type_name=rl.output_type.name if rl.output_type else None,
            render_status=rl.render_status,
            render_backend_used=rl.render_backend_used,
            render_started_at=rl.render_started_at.isoformat() if rl.render_started_at else None,
            render_completed_at=rl.render_completed_at.isoformat() if rl.render_completed_at else None,
            # Guarded like the CAD timestamps: a NULL updated_at must not 500 the feed.
            updated_at=_iso_or_now(rl.updated_at),
        )
        for rl in render_lines
    ]

    render_active = sum(1 for rl in render_lines if rl.render_status == "processing")
    render_failed = sum(1 for rl in render_lines if rl.render_status == "failed")

    return WorkerActivity(
        cad_processing=entries,
        active_count=active_count,
        failed_count=failed_count,
        render_jobs=render_entries,
        render_active_count=render_active,
        render_failed_count=render_failed,
    )
|
|
|
|
|
|
@router.get("/render-log/{order_line_id}")
async def get_render_log(
    order_line_id: str,
    after: int = 0,
    user: User = Depends(get_current_user),
):
    """Polling fallback for the render log of one order line.

    Returns the entries fetched with ``after_index=after`` together with the
    current entry count, which is also handed back as ``next_after``.
    """
    from app.services.render_log import get_entries, count

    fresh_entries = get_entries(order_line_id, after_index=after)
    entry_total = count(order_line_id)
    payload = {
        "entries": fresh_entries,
        "total": entry_total,
        "next_after": entry_total,
    }
    return payload
|
|
|
|
|
|
@router.get("/render-log/{order_line_id}/stream")
async def stream_render_log(
    order_line_id: str,
    user: User = Depends(get_current_user),
):
    """Stream the render log of one order line as server-sent events.

    The log is polled once per second; after 120 consecutive polls without
    new entries a final "idle timeout" event is sent and the stream ends.
    """
    import asyncio
    import json
    from fastapi.responses import StreamingResponse
    from app.services.render_log import get_entries, count

    async def event_generator():
        position = 0       # number of entries already forwarded
        quiet_polls = 0    # consecutive polls that returned nothing
        quiet_limit = 120  # roughly two minutes at one poll per second
        while quiet_polls < quiet_limit:
            batch = get_entries(order_line_id, after_index=position)
            if not batch:
                quiet_polls += 1
            else:
                quiet_polls = 0
                for record in batch:
                    yield f"data: {json.dumps(record)}\n\n"
                position += len(batch)
            await asyncio.sleep(1)
        closing_event = {'level': 'info', 'msg': 'Stream ended (idle timeout)', 't': ''}
        yield f"data: {json.dumps(closing_event)}\n\n"

    # Headers disable caching and (for nginx-style proxies) response buffering.
    sse_headers = {"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers=sse_headers,
    )
|
|
|
|
|
|
from fastapi import status as http_status
|
|
|
|
|
|
@router.post("/activity/{cad_file_id}/reprocess", status_code=http_status.HTTP_202_ACCEPTED)
async def reprocess_cad_file(
    cad_file_id: str,
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Reset a CAD file to ``pending`` and enqueue ``process_step_file`` for it.

    Responds with 404 when no CAD file has the given id.
    """
    lookup = await db.execute(select(CadFile).where(CadFile.id == cad_file_id))
    target = lookup.scalar_one_or_none()
    if not target:
        from fastapi import HTTPException

        raise HTTPException(404, detail="CAD file not found")

    # Persist the status reset first, then hand the file to the worker.
    target.processing_status = ProcessingStatus.pending
    await db.commit()

    from app.tasks.step_tasks import process_step_file

    process_step_file.delay(cad_file_id)
    response = {"queued": cad_file_id, "task": "process_step_file"}
    return response
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Queue inspection + control
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Celery queue names, read as Redis list keys by the queue status and purge endpoints.
MONITORED_QUEUES = ["step_processing", "thumbnail_rendering", "ai_validation"]
|
|
|
|
|
|
def _parse_redis_task(raw: str) -> dict | None:
|
|
"""Parse a raw Redis Celery message into a simplified dict."""
|
|
import json, base64
|
|
try:
|
|
msg = json.loads(raw)
|
|
headers = msg.get("headers", {})
|
|
task_name = headers.get("task", "unknown")
|
|
task_id = headers.get("id", "unknown")
|
|
argsrepr = headers.get("argsrepr", "")
|
|
args: list = []
|
|
try:
|
|
body = json.loads(base64.b64decode(msg.get("body", "")))
|
|
if isinstance(body, list) and body:
|
|
args = list(body[0])
|
|
except Exception:
|
|
pass
|
|
return {
|
|
"task_id": task_id,
|
|
"task_name": task_name,
|
|
"args": args,
|
|
"argsrepr": argsrepr,
|
|
"status": "pending",
|
|
}
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
@router.get("/queue")
async def get_queue_status(user: User = Depends(get_current_user)):
    """Return Celery queue depths, pending tasks, and active/reserved tasks.

    Response keys:
        queue_depths:  queue name → number of messages waiting in Redis
        pending_count: sum of all queue depths
        pending:       parsed summaries of up to 100 waiting messages per queue
        active:        tasks currently executing, as reported by Celery inspect
        reserved:      tasks prefetched by a worker but not yet started

    Redis errors propagate to the caller; a failing or timed-out Celery
    inspect simply yields empty ``active`` / ``reserved`` lists.
    """
    import asyncio
    import redis as redis_lib
    from app.config import settings as app_settings
    from app.tasks.celery_app import celery_app

    def _read_pending() -> tuple[dict[str, int], list[dict]]:
        # Synchronous Redis I/O; executed in a worker thread so the event
        # loop is not blocked, and the client is always closed afterwards.
        client = redis_lib.from_url(app_settings.redis_url, decode_responses=True)
        depths: dict[str, int] = {}
        waiting: list[dict] = []
        try:
            for q in MONITORED_QUEUES:
                depth = client.llen(q) or 0
                depths[q] = depth
                if depth > 0:
                    for raw in client.lrange(q, 0, 99):
                        task = _parse_redis_task(raw)
                        if task:
                            task["queue"] = q
                            waiting.append(task)
        finally:
            client.close()
        return depths, waiting

    def _inspect() -> tuple[dict, dict]:
        # Celery inspect broadcasts and waits for replies (1.5 s timeout).
        try:
            insp = celery_app.control.inspect(timeout=1.5)
            return (insp.active() or {}), (insp.reserved() or {})
        except Exception:
            return {}, {}

    def _shape(by_worker: dict, status: str, *, with_argsrepr: bool) -> list[dict]:
        # Flatten a {worker: [task, ...]} inspect reply into response rows.
        rows: list[dict] = []
        for worker, tasks in by_worker.items():
            for t in (tasks or []):
                argsrepr = ""
                if with_argsrepr:
                    # "kwargs" may be present but None (or not a mapping);
                    # chaining .get() on it directly would raise.
                    task_kwargs = t.get("kwargs")
                    if isinstance(task_kwargs, dict):
                        argsrepr = task_kwargs.get("argsrepr", "")
                rows.append({
                    "task_id": t.get("id", ""),
                    "task_name": t.get("name", ""),
                    "args": list(t.get("args") or []),
                    "argsrepr": argsrepr,
                    "status": status,
                    "worker": worker,
                })
        return rows

    queue_depths, pending = await asyncio.to_thread(_read_pending)
    act_raw, rsv_raw = await asyncio.to_thread(_inspect)

    return {
        "queue_depths": queue_depths,
        "pending_count": sum(queue_depths.values()),
        "active": _shape(act_raw, "active", with_argsrepr=True),
        "reserved": _shape(rsv_raw, "reserved", with_argsrepr=False),
        "pending": pending,
    }
|
|
|
|
|
|
@router.post("/queue/purge", status_code=http_status.HTTP_202_ACCEPTED)
async def purge_queue(user: User = Depends(require_admin_or_pm)):
    """Drop every pending task from all monitored queues and report how many were removed."""
    import redis as redis_lib
    from app.config import settings as app_settings

    client = redis_lib.from_url(app_settings.redis_url, decode_responses=True)
    removed = 0
    for queue_name in MONITORED_QUEUES:
        waiting = client.llen(queue_name) or 0
        if not waiting:
            # nothing queued here, leave the key alone
            continue
        client.delete(queue_name)
        removed += waiting
    return {"purged": removed, "message": f"Removed {removed} pending task(s) from queue"}
|
|
|
|
|
|
@router.post("/queue/cancel/{task_id}", status_code=http_status.HTTP_202_ACCEPTED)
async def cancel_task(task_id: str, user: User = Depends(require_admin_or_pm)):
    """Revoke the task with the given id via Celery's remote control.

    The revoke is sent with ``terminate=True`` and ``SIGTERM``, so a task that
    is already running is asked to terminate as well.
    """
    from app.tasks.celery_app import celery_app

    revoke_options = {"terminate": True, "signal": "SIGTERM"}
    celery_app.control.revoke(task_id, **revoke_options)
    return {"revoked": task_id}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Render health check
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class RenderHealthStatus(BaseModel):
    """Response body of the render health check."""

    # Overall verdict: "ok" | "degraded" | "down".
    status: str
    # Whether a worker serving the render queue was detected.
    render_worker_connected: bool
    # Whether Blender is considered available on that worker.
    blender_available: bool
    # Number of messages waiting in the thumbnail_rendering queue.
    thumbnail_queue_depth: int
    # True while the queue depth is below the health threshold.
    thumbnail_queue_ok: bool
    # ISO-8601 completion time of the most recent render, if any.
    last_render_at: str | None
    # Whether that render finished with status "completed"; None without renders.
    last_render_success: bool | None
    # Age of that render in minutes, rounded to one decimal.
    last_render_age_minutes: float | None
    # Diagnostic extras (worker names, error messages, detection mode).
    details: dict
|
|
|
|
|
|
@router.get("/health/render", response_model=RenderHealthStatus)
async def render_health(
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Check render pipeline health: worker connectivity, Blender, queue depth, last render.

    Status rules:
        "down"     – no render worker detected, or Blender not considered available
        "degraded" – thumbnail queue backlog of 10 or more, or the most recent
                     render failed less than 30 minutes ago
        "ok"       – otherwise

    Failures of the individual probes (Celery inspect, Redis, database) never
    raise; they are reported under ``details`` instead.
    """
    import asyncio
    from datetime import datetime
    import redis as redis_lib
    from sqlalchemy import select as sa_select, desc
    from app.config import settings as app_settings
    from app.tasks.celery_app import celery_app
    from app.models.order_line import OrderLine

    details: dict = {}

    # 1. Is a worker consuming the thumbnail_rendering queue connected?
    render_worker_connected = False
    blender_available = False

    def _inspect_workers() -> dict:
        # Blocking Celery broadcast; executed in a worker thread.
        try:
            insp = celery_app.control.inspect(timeout=2.0)
            ping = insp.ping() or {}
            active_queues = insp.active_queues() or {}
            return {"ping": ping, "active_queues": active_queues}
        except Exception as exc:
            return {"error": str(exc)}

    inspect_result = await asyncio.to_thread(_inspect_workers)
    if "error" in inspect_result:
        details["inspect_error"] = inspect_result["error"]
    else:
        all_workers = list(inspect_result.get("ping", {}).keys())
        details["workers"] = all_workers
        active_queues = inspect_result.get("active_queues", {})
        for worker_name, queues in active_queues.items():
            queue_names = [q.get("name") for q in (queues or [])]
            if "thumbnail_rendering" in queue_names:
                render_worker_connected = True
                # 2. Blender is not probed directly: a worker serving the
                # render queue is taken to have Blender available.
                blender_available = True
                details["render_worker"] = worker_name
        # Fallback only when queue info is genuinely unavailable. If queue
        # info was returned and shows no thumbnail_rendering consumer, the
        # render worker really is missing and must not be reported connected.
        queue_info_available = any(queues for queues in active_queues.values())
        if not render_worker_connected and not queue_info_available and all_workers:
            render_worker_connected = True
            details["worker_detection"] = "fallback"

    # 3. Queue depth for thumbnail_rendering
    def _thumbnail_depth() -> int:
        # Synchronous Redis I/O, kept off the event loop; client always closed.
        client = redis_lib.from_url(app_settings.redis_url, decode_responses=True)
        try:
            return client.llen("thumbnail_rendering") or 0
        finally:
            client.close()

    thumbnail_queue_depth = 0
    try:
        thumbnail_queue_depth = await asyncio.to_thread(_thumbnail_depth)
    except Exception as exc:
        details["redis_error"] = str(exc)

    thumbnail_queue_ok = thumbnail_queue_depth < 10

    # 4. Last render time and success
    last_render_at = None
    last_render_success = None
    last_render_age_minutes = None
    try:
        result = await db.execute(
            sa_select(OrderLine.render_completed_at, OrderLine.render_status)
            .where(OrderLine.render_completed_at.isnot(None))
            .order_by(desc(OrderLine.render_completed_at))
            .limit(1)
        )
        row = result.first()
        if row:
            completed_at, render_status = row[0], row[1]
            last_render_at = completed_at.isoformat()
            last_render_success = render_status == "completed"
            # Naive and aware datetimes cannot be subtracted; pick a "now"
            # that matches whatever the column returned.
            if completed_at.utcoffset() is not None:
                now = datetime.now(completed_at.tzinfo)
            else:
                now = datetime.utcnow()
            age = (now - completed_at).total_seconds() / 60
            last_render_age_minutes = round(age, 1)
    except Exception as exc:
        details["db_error"] = str(exc)

    # Determine overall status
    recently_failed = (
        last_render_success is False
        and last_render_age_minutes is not None
        and last_render_age_minutes < 30
    )
    if not render_worker_connected or not blender_available:
        status = "down"
    elif not thumbnail_queue_ok or recently_failed:
        status = "degraded"
    else:
        status = "ok"

    return RenderHealthStatus(
        status=status,
        render_worker_connected=render_worker_connected,
        blender_available=blender_available,
        thumbnail_queue_depth=thumbnail_queue_depth,
        thumbnail_queue_ok=thumbnail_queue_ok,
        last_render_at=last_render_at,
        last_render_success=last_render_success,
        last_render_age_minutes=last_render_age_minutes,
        details=details,
    )
|