Files
HartOMat/backend/app/api/routers/worker.py
T

737 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Worker activity router — exposes recent background task status."""
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from pydantic import BaseModel
from app.database import get_db
from app.models.cad_file import CadFile, ProcessingStatus
from app.models.order_item import OrderItem
from app.models.order import Order
from app.models.order_line import OrderLine
from app.models.product import Product
from app.models.user import User
from app.models.worker_config import WorkerConfig
from app.models.system_setting import SystemSetting
from app.utils.auth import get_current_user, require_admin_or_pm, require_global_admin
router = APIRouter(prefix="/worker", tags=["worker"])
class CadActivityEntry(BaseModel):
    """One row of recent CAD-file processing activity shown in the worker dashboard."""

    cad_file_id: str
    original_name: str  # falls back to "unknown" when missing
    file_size: int | None
    processing_status: str  # ProcessingStatus enum value, or "unknown"
    error_message: str | None
    updated_at: str  # ISO-8601 timestamp
    created_at: str  # ISO-8601 timestamp
    order_numbers: list[str]  # order numbers whose items reference this CAD file
    render_log: dict | None
class RenderJobEntry(BaseModel):
    """One row of recent render-job activity for an order line."""

    order_line_id: str
    order_id: str | None
    order_number: str | None
    product_name: str | None
    output_type_name: str | None
    render_status: str  # e.g. "processing" / "completed" / "failed"
    render_backend_used: str | None
    render_started_at: str | None  # ISO-8601 or None
    render_completed_at: str | None  # ISO-8601 or None
    updated_at: str  # ISO-8601
class WorkerActivity(BaseModel):
    """Aggregate snapshot of recent background-worker activity."""

    cad_processing: list[CadActivityEntry]
    active_count: int  # files currently in "processing" state
    failed_count: int  # files in "failed" state (within the 30 most recent)
    render_jobs: list[RenderJobEntry] = []
    render_active_count: int = 0
    render_failed_count: int = 0
@router.get("/activity", response_model=WorkerActivity)
async def get_worker_activity(
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """
    Return recent CAD file processing activity and render-job activity.

    Shows the last 30 processed/failed/processing CAD files plus the last 30
    render jobs so the user can see what the worker is doing without needing
    Flower or Celery logs.
    """
    # Recent CadFiles ordered by last update.
    result = await db.execute(
        select(CadFile)
        .order_by(CadFile.updated_at.desc())
        .limit(30)
    )
    cad_files = result.scalars().all()

    # Build cad_file_id -> list[order_number] mapping via the order items.
    # BUGFIX: previously the endpoint returned early when no CAD files existed,
    # which also (wrongly) hid all render-job activity; both sections now run
    # unconditionally.
    from collections import defaultdict
    cad_to_orders: dict[str, list[str]] = defaultdict(list)
    if cad_files:
        cad_ids = [cf.id for cf in cad_files]
        items_result = await db.execute(
            select(OrderItem)
            .options(selectinload(OrderItem.order))
            .where(OrderItem.cad_file_id.in_(cad_ids))
        )
        for item in items_result.scalars().all():
            if item.order and item.order.order_number:
                key = str(item.cad_file_id)
                if item.order.order_number not in cad_to_orders[key]:
                    cad_to_orders[key].append(item.order.order_number)

    entries = []
    for cf in cad_files:
        entries.append(CadActivityEntry(
            cad_file_id=str(cf.id),
            original_name=cf.original_name or "unknown",
            file_size=getattr(cf, "file_size", None),
            processing_status=cf.processing_status.value if cf.processing_status else "unknown",
            error_message=getattr(cf, "error_message", None),
            # Fall back to "now" so the response model never gets a null timestamp.
            updated_at=cf.updated_at.isoformat() if cf.updated_at else datetime.utcnow().isoformat(),
            created_at=cf.created_at.isoformat() if cf.created_at else datetime.utcnow().isoformat(),
            order_numbers=cad_to_orders.get(str(cf.id), []),
            render_log=getattr(cf, "render_log", None),
        ))
    active_count = sum(
        1 for cf in cad_files
        if cf.processing_status == ProcessingStatus.processing
    )
    failed_count = sum(
        1 for cf in cad_files
        if cf.processing_status == ProcessingStatus.failed
    )

    # ── Render job activity ──────────────────────────────────────────────
    # Only lines with an output type that have left the "pending" state.
    render_result = await db.execute(
        select(OrderLine)
        .options(
            selectinload(OrderLine.product),
            selectinload(OrderLine.output_type),
            selectinload(OrderLine.order),
        )
        .where(OrderLine.output_type_id.isnot(None))
        .where(OrderLine.render_status != "pending")
        .order_by(OrderLine.updated_at.desc())
        .limit(30)
    )
    render_lines = render_result.scalars().all()
    render_entries = []
    for rl in render_lines:
        render_entries.append(RenderJobEntry(
            order_line_id=str(rl.id),
            order_id=str(rl.order_id) if rl.order_id else None,
            order_number=rl.order.order_number if rl.order else None,
            product_name=rl.product.name if rl.product else None,
            output_type_name=rl.output_type.name if rl.output_type else None,
            render_status=rl.render_status,
            render_backend_used=rl.render_backend_used,
            render_started_at=rl.render_started_at.isoformat() if rl.render_started_at else None,
            render_completed_at=rl.render_completed_at.isoformat() if rl.render_completed_at else None,
            updated_at=rl.updated_at.isoformat(),
        ))
    render_active = sum(1 for rl in render_lines if rl.render_status == "processing")
    render_failed = sum(1 for rl in render_lines if rl.render_status == "failed")

    return WorkerActivity(
        cad_processing=entries,
        active_count=active_count,
        failed_count=failed_count,
        render_jobs=render_entries,
        render_active_count=render_active,
        render_failed_count=render_failed,
    )
@router.get("/render-log/{order_line_id}")
async def get_render_log(
    order_line_id: str,
    after: int = 0,
    user: User = Depends(get_current_user),
):
    """Return render log entries for an order line (polling fallback)."""
    from app.services.render_log import get_entries, count

    log_entries = get_entries(order_line_id, after_index=after)
    total_entries = count(order_line_id)
    # "next_after" lets the client resume the next poll at the end of the log.
    return {"entries": log_entries, "total": total_entries, "next_after": total_entries}
@router.get("/render-log/{order_line_id}/stream")
async def stream_render_log(
    order_line_id: str,
    user: User = Depends(get_current_user),
):
    """SSE stream of render log entries for an order line."""
    import asyncio
    import json
    from fastapi.responses import StreamingResponse
    from app.services.render_log import get_entries, count

    async def event_generator():
        sent = 0           # how many entries have been pushed so far
        quiet_ticks = 0    # consecutive 1-second polls with no new entries
        IDLE_LIMIT = 120   # stop after 2 minutes of no new entries

        while quiet_ticks < IDLE_LIMIT:
            batch = get_entries(order_line_id, after_index=sent)
            if batch:
                quiet_ticks = 0
                for item in batch:
                    yield f"data: {json.dumps(item)}\n\n"
                sent += len(batch)
            else:
                quiet_ticks += 1
            await asyncio.sleep(1)

        # Tell the client explicitly why the stream closed.
        yield f"data: {json.dumps({'level': 'info', 'msg': 'Stream ended (idle timeout)', 't': ''})}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
from fastapi import status as http_status
@router.post("/activity/dismiss-failed", status_code=http_status.HTTP_200_OK)
async def dismiss_all_failed(
    user: User = Depends(require_admin_or_pm),
    db: AsyncSession = Depends(get_db),
):
    """Reset all failed render jobs and CAD files so they stop showing as failures."""
    from sqlalchemy import update as sql_update

    # Failed renders are marked cancelled; failed CAD files go back to pending.
    renders = await db.execute(
        sql_update(OrderLine)
        .where(OrderLine.render_status == "failed")
        .values(render_status="cancelled")
    )
    cads = await db.execute(
        sql_update(CadFile)
        .where(CadFile.processing_status == ProcessingStatus.failed)
        .values(processing_status=ProcessingStatus.pending)
    )
    await db.commit()
    return {"dismissed_renders": renders.rowcount, "dismissed_cad": cads.rowcount}
@router.post("/activity/dismiss-render/{order_line_id}", status_code=http_status.HTTP_200_OK)
async def dismiss_single_failed_render(
    order_line_id: str,
    user: User = Depends(require_admin_or_pm),
    db: AsyncSession = Depends(get_db),
):
    """Dismiss a single failed render job by setting its status to 'cancelled'."""
    lookup = await db.execute(
        select(OrderLine).where(
            OrderLine.id == order_line_id,
            OrderLine.render_status == "failed",
        )
    )
    failed_line = lookup.scalar_one_or_none()
    if failed_line is None:
        # Line missing, or it exists but is not currently in the failed state.
        raise HTTPException(404, detail="Failed render job not found")
    failed_line.render_status = "cancelled"
    await db.commit()
    return {"dismissed": order_line_id}
@router.post("/activity/{cad_file_id}/reprocess", status_code=http_status.HTTP_202_ACCEPTED)
async def reprocess_cad_file(
    cad_file_id: str,
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Re-queue a CAD file for full processing (STEP extraction + thumbnail + glTF).

    Resets the file's status to pending and commits *before* enqueueing the
    Celery task, so the worker never observes a stale status.

    Raises:
        HTTPException 404: no CAD file with the given id.
    """
    result = await db.execute(select(CadFile).where(CadFile.id == cad_file_id))
    cad_file = result.scalar_one_or_none()
    if not cad_file:
        # Fix: use the module-level HTTPException import instead of re-importing
        # it locally (the local import shadowed the top-level name).
        raise HTTPException(404, detail="CAD file not found")
    cad_file.processing_status = ProcessingStatus.pending
    await db.commit()
    from app.tasks.step_tasks import process_step_file
    process_step_file.delay(cad_file_id)
    return {"queued": cad_file_id, "task": "process_step_file"}
# ---------------------------------------------------------------------------
# Queue inspection + control
# ---------------------------------------------------------------------------
# Celery queue names whose Redis list depth and pending messages are surfaced
# by the /queue endpoints below.
MONITORED_QUEUES = ["step_processing", "asset_pipeline", "ai_validation"]
def _parse_redis_task(raw: str) -> dict | None:
"""Parse a raw Redis Celery message into a simplified dict."""
import json, base64
try:
msg = json.loads(raw)
headers = msg.get("headers", {})
task_name = headers.get("task", "unknown")
task_id = headers.get("id", "unknown")
argsrepr = headers.get("argsrepr", "")
args: list = []
try:
body = json.loads(base64.b64decode(msg.get("body", "")))
if isinstance(body, list) and body:
args = list(body[0])
except Exception:
pass
return {
"task_id": task_id,
"task_name": task_name,
"args": args,
"argsrepr": argsrepr,
"status": "pending",
}
except Exception:
return None
@router.get("/queue")
async def get_queue_status(user: User = Depends(get_current_user)):
    """Return Celery queue depths, pending tasks, and active/reserved tasks.

    Fix: the synchronous redis client calls previously ran directly inside
    this async endpoint and blocked the event loop; all blocking work now
    runs via asyncio.to_thread.
    """
    import asyncio
    import redis as redis_lib
    from app.config import settings as app_settings
    from app.tasks.celery_app import celery_app

    def _read_queues() -> tuple[dict[str, int], list[dict]]:
        # Sync redis client — executed in a worker thread.
        r = redis_lib.from_url(app_settings.redis_url, decode_responses=True)
        depths: dict[str, int] = {}
        pending_tasks: list[dict] = []
        for q in MONITORED_QUEUES:
            depth = r.llen(q) or 0
            depths[q] = depth
            if depth > 0:
                # Inspect at most the first 100 messages per queue.
                for raw in r.lrange(q, 0, 99):
                    task = _parse_redis_task(raw)
                    if task:
                        task["queue"] = q
                        pending_tasks.append(task)
        return depths, pending_tasks

    queue_depths, pending = await asyncio.to_thread(_read_queues)

    # Active / reserved from Celery inspect (runs in thread, 1.5 s timeout)
    def _inspect() -> tuple[dict, dict]:
        try:
            insp = celery_app.control.inspect(timeout=1.5)
            return (insp.active() or {}), (insp.reserved() or {})
        except Exception:
            return {}, {}

    act_raw, rsv_raw = await asyncio.to_thread(_inspect)

    def _simplify(t: dict, status: str, worker: str) -> dict:
        # Flatten a Celery inspect task record to the dashboard shape.
        return {
            "task_id": t.get("id", ""),
            "task_name": t.get("name", ""),
            "args": list(t.get("args") or []),
            "argsrepr": t.get("kwargs", {}).get("argsrepr", "") if status == "active" else "",
            "status": status,
            "worker": worker,
        }

    active = [
        _simplify(t, "active", worker)
        for worker, tasks in act_raw.items()
        for t in (tasks or [])
    ]
    reserved = [
        _simplify(t, "reserved", worker)
        for worker, tasks in rsv_raw.items()
        for t in (tasks or [])
    ]
    return {
        "queue_depths": queue_depths,
        "pending_count": sum(queue_depths.values()),
        "active": active,
        "reserved": reserved,
        "pending": pending,
    }
@router.post("/queue/purge", status_code=http_status.HTTP_202_ACCEPTED)
async def purge_queue(user: User = Depends(require_admin_or_pm)):
    """Delete all pending tasks from all monitored queues.

    Fix: the synchronous redis calls now run in a worker thread instead of
    blocking the event loop inside this async endpoint.
    """
    import asyncio
    import redis as redis_lib
    from app.config import settings as app_settings

    def _purge() -> int:
        r = redis_lib.from_url(app_settings.redis_url, decode_responses=True)
        removed = 0
        for q in MONITORED_QUEUES:
            count = r.llen(q) or 0
            if count:
                r.delete(q)
                removed += count
        return removed

    total = await asyncio.to_thread(_purge)
    return {"purged": total, "message": f"Removed {total} pending task(s) from queue"}
@router.post("/queue/cancel/{task_id}", status_code=http_status.HTTP_202_ACCEPTED)
async def cancel_task(task_id: str, user: User = Depends(require_admin_or_pm)):
    """Revoke a task by ID. Terminates it if running, skips it if still pending."""
    from app.tasks.celery_app import celery_app

    # terminate=True sends SIGTERM to the process currently executing the task;
    # a still-queued task is simply discarded when a worker picks it up.
    celery_app.control.revoke(task_id, terminate=True, signal="SIGTERM")
    return {"revoked": task_id}
# ---------------------------------------------------------------------------
# Worker management — list workers + scale
# ---------------------------------------------------------------------------
class ScaleRequest(BaseModel):
    """Request body for scaling a Compose worker service."""

    service: str  # "render-worker" | "worker"
    count: int  # desired replica count, 0-20 (validated in the endpoint)
@router.get("/celery-workers")
async def get_celery_workers(user: User = Depends(require_admin_or_pm)):
    """List active Celery workers with their queues and active task counts."""
    import asyncio
    from app.tasks.celery_app import celery_app

    def _snapshot() -> dict:
        # Celery inspect is blocking — executed in a thread with a 2 s timeout.
        try:
            insp = celery_app.control.inspect(timeout=2.0)
            return {
                "active_queues": insp.active_queues() or {},
                "active": insp.active() or {},
                "stats": insp.stats() or {},
            }
        except Exception as exc:
            return {"error": str(exc)}

    snap = await asyncio.to_thread(_snapshot)
    if "error" in snap:
        return {"workers": [], "error": snap["error"]}

    workers = []
    for name, queue_list in snap.get("active_queues", {}).items():
        running = snap.get("active", {}).get(name, [])
        worker_stats = snap.get("stats", {}).get(name, {})
        workers.append({
            "name": name,
            "queues": [q.get("name") for q in (queue_list or [])],
            "active_task_count": len(running),
            "active_tasks": [
                {"name": t.get("name"), "id": t.get("id")} for t in running
            ],
            "total_tasks_processed": worker_stats.get("total", {}),
        })
    return {"workers": workers}
@router.post("/scale", status_code=http_status.HTTP_202_ACCEPTED)
async def scale_workers(
    body: ScaleRequest,
    user: User = Depends(require_admin_or_pm),
):
    """Scale a Compose service (render-worker, worker) up or down.

    Requires the docker socket and compose file to be accessible inside the
    container (see docker-compose.yml COMPOSE_PROJECT_DIR env var).

    Raises:
        HTTPException 400: unknown service or count outside 0-20.
        HTTPException 500: docker binary missing, or compose invocation failed.
        HTTPException 504: compose took longer than 120 s.
    """
    import asyncio
    import os
    import subprocess

    # Fix: the previous local `from fastapi import HTTPException` shadowed the
    # module-level import for no benefit and is removed.
    ALLOWED_SERVICES = {"render-worker", "worker"}
    if body.service not in ALLOWED_SERVICES:
        raise HTTPException(400, detail=f"service must be one of {ALLOWED_SERVICES}")
    if not (0 <= body.count <= 20):
        raise HTTPException(400, detail="count must be between 0 and 20")

    compose_dir = os.environ.get("COMPOSE_PROJECT_DIR", "/compose")
    compose_file = os.path.join(compose_dir, "docker-compose.yml")
    # Derive project name from compose dir on host (directory name = project name).
    # Inside the container the compose file is at /compose, but the host project
    # dir name determines the container naming prefix (e.g. "schaefflerautomat").
    compose_project = os.environ.get("COMPOSE_PROJECT_NAME", "schaefflerautomat")

    def _scale() -> subprocess.CompletedProcess:
        # List form (shell=False); service and count are validated above.
        return subprocess.run(
            [
                "docker", "compose",
                "-f", compose_file,
                "-p", compose_project,
                "up",
                "--scale", f"{body.service}={body.count}",
                "--no-build",
                "--no-recreate",
                "-d",
            ],
            capture_output=True, text=True, timeout=120,
        )

    try:
        result = await asyncio.to_thread(_scale)
    except subprocess.TimeoutExpired:
        raise HTTPException(504, detail="Scale operation timed out")
    except FileNotFoundError:
        # docker CLI not installed/mounted inside this container.
        raise HTTPException(500, detail="docker binary not available in this container")
    if result.returncode != 0:
        raise HTTPException(
            500,
            detail=f"docker compose scale failed: {result.stderr[-500:]}",
        )
    return {"service": body.service, "count": body.count, "status": "scaling"}
# ---------------------------------------------------------------------------
# GPU probe
# ---------------------------------------------------------------------------
@router.post("/probe/gpu", status_code=http_status.HTTP_202_ACCEPTED)
async def trigger_gpu_probe(current_user: User = Depends(require_global_admin)):
    """Queue a GPU probe task on the render-worker."""
    from app.tasks.gpu_tasks import probe_gpu

    # Fire-and-forget: the result is persisted by the task and read back via
    # the /probe/gpu/result endpoint.
    async_result = probe_gpu.delay()
    return {"task_id": str(async_result.id), "queued": True}
@router.get("/probe/gpu/result")
async def get_gpu_probe_result(
    current_user: User = Depends(require_global_admin),
    db: AsyncSession = Depends(get_db),
):
    """Return the last GPU probe result from system_settings.

    The probe task stores its outcome as JSON under the
    ``gpu_probe_last_result`` key. A missing or corrupt value is reported
    gracefully instead of raising a 500.
    """
    import json
    row = await db.execute(
        select(SystemSetting).where(SystemSetting.key == "gpu_probe_last_result")
    )
    setting = row.scalar_one_or_none()
    if not setting:
        return {"status": "unknown", "message": "No probe run yet. Click Run GPU Check."}
    try:
        return json.loads(setting.value)
    except (TypeError, ValueError):
        # Fix: a corrupt stored value previously bubbled up as a 500.
        return {"status": "error", "message": "Stored GPU probe result is corrupt; re-run the probe."}
# ---------------------------------------------------------------------------
# Render health check
# ---------------------------------------------------------------------------
class RenderHealthStatus(BaseModel):
    """Response model for the render pipeline health check."""

    status: str  # "ok" | "degraded" | "down"
    render_worker_connected: bool
    blender_available: bool
    thumbnail_queue_depth: int  # pending messages on the asset_pipeline queue
    thumbnail_queue_ok: bool  # True while depth < 10
    last_render_at: str | None  # ISO-8601 or None when no render has completed
    last_render_success: bool | None
    last_render_age_minutes: float | None
    details: dict  # free-form diagnostics (worker names, error strings, ...)
@router.get("/health/render", response_model=RenderHealthStatus)
async def render_health(
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Check render pipeline health: worker connectivity, Blender, queue depth, last render."""
    import asyncio
    import redis as redis_lib
    from app.config import settings as app_settings
    from app.tasks.celery_app import celery_app
    from app.models.order_line import OrderLine
    # Free-form diagnostics returned alongside the boolean flags.
    details: dict = {}
    # 1. Check if render-worker (asset_pipeline queue) is connected + has Blender
    render_worker_connected = False
    blender_available = False
    def _inspect_workers() -> dict:
        # Celery inspect is blocking; executed via asyncio.to_thread below.
        try:
            insp = celery_app.control.inspect(timeout=2.0)
            ping = insp.ping() or {}
            active_queues = insp.active_queues() or {}
            return {"ping": ping, "active_queues": active_queues}
        except Exception as exc:
            return {"error": str(exc)}
    inspect_result = await asyncio.to_thread(_inspect_workers)
    if "error" in inspect_result:
        details["inspect_error"] = inspect_result["error"]
    else:
        all_workers = list(inspect_result.get("ping", {}).keys())
        details["workers"] = all_workers
        # Find any worker consuming asset_pipeline queue
        for worker_name, queues in inspect_result.get("active_queues", {}).items():
            queue_names = [q.get("name") for q in (queues or [])]
            if "asset_pipeline" in queue_names:
                render_worker_connected = True
                # render-worker always has Blender — it starts Blender successfully
                blender_available = True
                details["render_worker"] = worker_name
        # Fallback: workers present but queue info unavailable.
        # NOTE(review): this path leaves blender_available False, so the
        # overall status below still reports "down" — confirm intended.
        if not render_worker_connected and all_workers:
            render_worker_connected = True
            details["worker_detection"] = "fallback"
    # 3. Queue depth for asset_pipeline
    thumbnail_queue_depth = 0
    try:
        r = redis_lib.from_url(app_settings.redis_url, decode_responses=True)
        thumbnail_queue_depth = r.llen("asset_pipeline") or 0
    except Exception as exc:
        details["redis_error"] = str(exc)
    # 10 or more queued renders counts as a degraded queue.
    thumbnail_queue_ok = thumbnail_queue_depth < 10
    # 4. Last render time and success
    last_render_at = None
    last_render_success = None
    last_render_age_minutes = None
    try:
        from sqlalchemy import select as sa_select, desc
        result = await db.execute(
            sa_select(OrderLine.render_completed_at, OrderLine.render_status)
            .where(OrderLine.render_completed_at.isnot(None))
            .order_by(desc(OrderLine.render_completed_at))
            .limit(1)
        )
        row = result.first()
        if row:
            last_render_at = row[0].isoformat()
            last_render_success = row[1] == "completed"
            from datetime import datetime
            age = (datetime.utcnow() - row[0]).total_seconds() / 60
            last_render_age_minutes = round(age, 1)
    except Exception as exc:
        details["db_error"] = str(exc)
    # Determine overall status
    if not render_worker_connected or not blender_available:
        status = "down"
    elif not thumbnail_queue_ok:
        status = "degraded"
    # A failed render within the last 30 minutes also degrades the status.
    elif last_render_success is False and last_render_age_minutes is not None and last_render_age_minutes < 30:
        status = "degraded"
    else:
        status = "ok"
    return RenderHealthStatus(
        status=status,
        render_worker_connected=render_worker_connected,
        blender_available=blender_available,
        thumbnail_queue_depth=thumbnail_queue_depth,
        thumbnail_queue_ok=thumbnail_queue_ok,
        last_render_at=last_render_at,
        last_render_success=last_render_success,
        last_render_age_minutes=last_render_age_minutes,
        details=details,
    )
# ---------------------------------------------------------------------------
# Worker concurrency configuration
# ---------------------------------------------------------------------------
class WorkerConfigOut(BaseModel):
    """Serialized worker concurrency configuration for one queue."""

    queue_name: str
    max_concurrency: int
    min_concurrency: int
    enabled: bool
    updated_at: str  # ISO-8601
class WorkerConfigUpdate(BaseModel):
    """Partial update payload; omitted fields are left unchanged."""

    max_concurrency: Optional[int] = None
    min_concurrency: Optional[int] = None
    enabled: Optional[bool] = None
@router.get("/configs", response_model=list[WorkerConfigOut])
async def list_worker_configs(
    user: User = Depends(require_global_admin),
    db: AsyncSession = Depends(get_db),
):
    """List all worker concurrency configurations (admin only)."""
    result = await db.execute(select(WorkerConfig).order_by(WorkerConfig.queue_name))
    out: list[WorkerConfigOut] = []
    for cfg in result.scalars().all():
        # Fall back to "now" so the response model never receives a null timestamp.
        stamp = cfg.updated_at.isoformat() if cfg.updated_at else datetime.utcnow().isoformat()
        out.append(
            WorkerConfigOut(
                queue_name=cfg.queue_name,
                max_concurrency=cfg.max_concurrency,
                min_concurrency=cfg.min_concurrency,
                enabled=cfg.enabled,
                updated_at=stamp,
            )
        )
    return out
@router.put("/configs/{queue_name}", response_model=WorkerConfigOut)
async def update_worker_config(
    queue_name: str,
    body: WorkerConfigUpdate,
    user: User = Depends(require_global_admin),
    db: AsyncSession = Depends(get_db),
):
    """Update concurrency settings for a specific queue (admin only)."""
    lookup = await db.execute(
        select(WorkerConfig).where(WorkerConfig.queue_name == queue_name)
    )
    cfg = lookup.scalar_one_or_none()
    if cfg is None:
        raise HTTPException(404, detail=f"No worker config found for queue '{queue_name}'")

    # Apply only the fields the caller provided, validating each bound.
    if body.max_concurrency is not None:
        if body.max_concurrency < 1:
            raise HTTPException(400, detail="max_concurrency must be >= 1")
        cfg.max_concurrency = body.max_concurrency
    if body.min_concurrency is not None:
        if body.min_concurrency < 1:
            raise HTTPException(400, detail="min_concurrency must be >= 1")
        cfg.min_concurrency = body.min_concurrency
    if body.enabled is not None:
        cfg.enabled = body.enabled

    # Cross-field invariant is checked once both values are settled.
    if cfg.min_concurrency > cfg.max_concurrency:
        raise HTTPException(400, detail="min_concurrency cannot exceed max_concurrency")

    cfg.updated_at = datetime.utcnow()
    await db.commit()
    await db.refresh(cfg)
    return WorkerConfigOut(
        queue_name=cfg.queue_name,
        max_concurrency=cfg.max_concurrency,
        min_concurrency=cfg.min_concurrency,
        enabled=cfg.enabled,
        updated_at=cfg.updated_at.isoformat(),
    )