HartOMat/backend/app/tasks/beat_tasks.py
Hartmut 07e3d1e026 feat(phase8.1-8.2): dynamic worker concurrency via worker_configs
- Migration 054: worker_configs table (queue_name PK, max/min_concurrency,
  enabled, updated_at); seeds step_processing(8/2), thumbnail_rendering(1/1),
  ai_validation(4/1)
- WorkerConfig SQLAlchemy model
- apply_worker_concurrency beat task: reads enabled configs, broadcasts
  pool_grow to all Celery workers every 5min
- GET/PUT /api/worker/configs (admin): list + update per-queue concurrency
- docker-compose.yml: worker uses --autoscale=${MAX_CONCURRENCY:-8},${MIN_CONCURRENCY:-2};
  render-worker uses --autoscale=1,1 --concurrency=1
- WorkerManagement.tsx: "Concurrency Settings" section with +/- steppers
  and Save button per queue

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-08 20:41:57 +01:00
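
The worker_configs table, Migration 054, and the WorkerConfig model referenced in the commit message live outside this file. As orientation only, here is a minimal sketch of what such a model could look like in SQLAlchemy 2.0 declarative style; the column names and seeded queues come from the commit message, while column types, lengths, and defaults are assumptions.

# Sketch only: the real model is app/models/worker_config.py (not shown here).
# Column names mirror the Migration 054 summary; types, lengths, and defaults are guesses.
from datetime import datetime

from sqlalchemy import Boolean, DateTime, Integer, String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    pass


class WorkerConfig(Base):
    __tablename__ = "worker_configs"

    queue_name: Mapped[str] = mapped_column(String(64), primary_key=True)
    max_concurrency: Mapped[int] = mapped_column(Integer, nullable=False)
    min_concurrency: Mapped[int] = mapped_column(Integer, nullable=False)
    enabled: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), default=datetime.utcnow, onupdate=datetime.utcnow
    )

Migration 054 would then seed step_processing (8/2), thumbnail_rendering (1/1), and ai_validation (4/1) into this table, and the apply_worker_concurrency beat task below reads the enabled rows every five minutes.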

295 lines
12 KiB
Python

"""Celery Beat periodic tasks."""
from __future__ import annotations
import json
import logging
from datetime import datetime, timedelta
from celery import shared_task
logger = logging.getLogger(__name__)
@shared_task(name="app.tasks.beat_tasks.apply_worker_concurrency", queue="step_processing")
def apply_worker_concurrency() -> None:
"""Read worker_configs from DB and broadcast pool_grow to workers.
Runs every 5 minutes via Celery Beat. Signals all workers to adjust their
pool size to match the max_concurrency setting for each enabled queue.
"""
try:
from sqlalchemy import create_engine, select
from sqlalchemy.orm import Session
from app.config import settings as app_settings
from app.models.worker_config import WorkerConfig
from app.tasks.celery_app import celery_app
sync_url = app_settings.database_url.replace("+asyncpg", "")
eng = create_engine(sync_url)
try:
with Session(eng) as session:
configs = session.execute(
select(WorkerConfig).where(WorkerConfig.enabled == True) # noqa: E712
).scalars().all()
for cfg in configs:
try:
celery_app.control.broadcast(
"pool_grow",
arguments={"n": cfg.max_concurrency},
destination=None, # all workers
reply=False,
)
logger.info(
"[WORKER_SCALE] Signalled pool_grow n=%d for queue %s",
cfg.max_concurrency, cfg.queue_name,
)
except Exception as exc:
logger.warning(
"[WORKER_SCALE] pool_grow failed for %s: %s",
cfg.queue_name, exc,
)
finally:
eng.dispose()
except Exception as exc:
logger.error("apply_worker_concurrency failed: %s", exc)
@shared_task(name="app.tasks.beat_tasks.broadcast_queue_status", queue="step_processing")
def broadcast_queue_status() -> None:
"""Broadcast current queue depths to all WebSocket clients every 10s.
Publishes to the Redis '__broadcast__' channel which the WebSocket
subscriber in the FastAPI process forwards to all connected clients.
"""
try:
import redis as sync_redis
from app.config import settings
r = sync_redis.from_url(settings.redis_url, decode_responses=True)
depths = {
"step_processing": r.llen("step_processing"),
"thumbnail_rendering": r.llen("thumbnail_rendering"),
}
event = {"type": "queue_update", "depths": depths}
r.publish("__broadcast__", json.dumps(event))
r.close()
logger.debug("Broadcast queue_update: %s", depths)
except Exception as exc:
logger.warning("broadcast_queue_status failed: %s", exc)
@shared_task(name="app.tasks.beat_tasks.recover_stuck_cad_files", queue="step_processing")
def recover_stuck_cad_files() -> None:
"""Reset CAD files stuck in 'processing' for more than 10 minutes to 'failed'.
This recovers from worker crashes (container restarts, OOM kills) that leave
the processing_status committed as 'processing' with no task running to complete it.
Runs every 5 minutes via Celery Beat.
"""
try:
from sqlalchemy import create_engine, update, and_
from sqlalchemy.orm import Session
from app.config import settings
from app.models.cad_file import CadFile, ProcessingStatus
cutoff = datetime.utcnow() - timedelta(minutes=10)
sync_url = settings.database_url.replace("+asyncpg", "")
engine = create_engine(sync_url)
with Session(engine) as session:
result = session.execute(
update(CadFile)
.where(
and_(
CadFile.processing_status == ProcessingStatus.processing,
CadFile.updated_at < cutoff,
)
)
.values(
processing_status=ProcessingStatus.failed,
error_message="Processing timed out — worker may have crashed. Use 'Regenerate Thumbnail' to retry.",
)
.returning(CadFile.id, CadFile.original_name)
)
rows = result.fetchall()
session.commit()
engine.dispose()
if rows:
names = [r[1] for r in rows]
logger.warning(
"recover_stuck_cad_files: reset %d stuck file(s) to failed: %s",
len(rows), names,
)
else:
logger.debug("recover_stuck_cad_files: no stuck files found")
except Exception as exc:
logger.error("recover_stuck_cad_files failed: %s", exc)
@shared_task(name="app.tasks.beat_tasks.recover_stalled_renders", queue="step_processing")
def recover_stalled_renders() -> None:
"""Detect order_lines stuck in render_status='processing' for longer than
RENDER_STALL_TIMEOUT_MINUTES (default 30) and mark them as failed.
Tries to revoke the Celery task if a celery_task_id is stored in render_job_doc.
Emits a single batch system alert to admins via CHANNEL_ALERT.
Runs every 5 minutes via Celery Beat.
"""
RENDER_STALL_TIMEOUT_MINUTES = 30
try:
from sqlalchemy import create_engine, select
from sqlalchemy.orm import Session
from app.config import settings
from app.domains.orders.models import OrderLine
from app.domains.notifications.service import emit_notification_sync, CHANNEL_ALERT
from app.tasks.celery_app import celery_app
cutoff = datetime.utcnow() - timedelta(minutes=RENDER_STALL_TIMEOUT_MINUTES)
sync_url = settings.database_url.replace("+asyncpg", "")
engine = create_engine(sync_url)
stalled_ids: list[str] = []
with Session(engine) as session:
rows = session.execute(
select(OrderLine).where(
OrderLine.render_status == "processing",
OrderLine.render_started_at < cutoff,
)
).scalars().all()
for line in rows:
elapsed_minutes = int(
(datetime.utcnow() - line.render_started_at).total_seconds() / 60
)
# Try to revoke the Celery task if we have its ID
job_doc = line.render_job_doc or {}
celery_task_id = job_doc.get("celery_task_id")
if celery_task_id:
try:
celery_app.control.revoke(celery_task_id, terminate=True)
logger.info(
"[WORKER_RECOVERY] Revoked Celery task %s for order_line %s",
celery_task_id, line.id,
)
except Exception as revoke_exc:
logger.warning(
"[WORKER_RECOVERY] Could not revoke task %s: %s",
celery_task_id, revoke_exc,
)
# Update render_job_doc with failure info
updated_doc = dict(job_doc)
updated_doc["state"] = "failed"
updated_doc["error"] = (
f"Stalled render terminated by health recovery after {RENDER_STALL_TIMEOUT_MINUTES}min"
)
line.render_status = "failed"
line.render_job_doc = updated_doc
stalled_ids.append(str(line.id))
logger.warning(
"[WORKER_RECOVERY] Stalled render for order_line %s terminated after %dmin",
line.id, elapsed_minutes,
)
if stalled_ids:
session.commit()
engine.dispose()
if stalled_ids:
emit_notification_sync(
action="worker.recovery",
entity_type="system",
details={
"message": (
f"Worker recovery: {len(stalled_ids)} stalled render(s) terminated "
f"after {RENDER_STALL_TIMEOUT_MINUTES}min timeout"
),
"stalled_order_line_ids": stalled_ids,
"count": len(stalled_ids),
},
channel=CHANNEL_ALERT,
)
logger.warning(
"recover_stalled_renders: terminated %d stalled render(s): %s",
len(stalled_ids), stalled_ids,
)
else:
logger.debug("recover_stalled_renders: no stalled renders found")
except Exception as exc:
logger.error("recover_stalled_renders failed: %s", exc)
@shared_task(name="app.tasks.beat_tasks.batch_render_notifications", queue="step_processing")
def batch_render_notifications() -> None:
"""Check for orders where all lines are terminal but no batch notification emitted yet.
This acts as a safety net for batch notifications that may have been missed
if the order completion hook failed or was skipped. Runs every 60 seconds.
"""
try:
from sqlalchemy import create_engine, select
from sqlalchemy.orm import Session
from app.config import settings
from app.domains.orders.models import Order, OrderStatus
from app.domains.notifications.models import AuditLog
from app.domains.notifications.service import (
emit_batch_render_notification_sync,
CHANNEL_NOTIFICATION,
)
sync_url = settings.database_url.replace("+asyncpg", "")
engine = create_engine(sync_url)
with Session(engine) as session:
# Find all completed orders
completed_order_ids: list[str] = [
str(oid) for oid in session.execute(
select(Order.id).where(Order.status == OrderStatus.completed)
).scalars().all()
]
if not completed_order_ids:
engine.dispose()
logger.debug("batch_render_notifications: no completed orders found")
return
# Find which of those already have a batch notification
notified_ids_raw = session.execute(
select(AuditLog.entity_id).where(
AuditLog.entity_type == "order",
AuditLog.action == "order.completed",
AuditLog.channel == CHANNEL_NOTIFICATION,
AuditLog.entity_id.in_(completed_order_ids),
)
).scalars().all()
notified_ids = set(notified_ids_raw)
engine.dispose()
orders_needing_notification = [
oid for oid in completed_order_ids if oid not in notified_ids
]
if not orders_needing_notification:
logger.debug("batch_render_notifications: no orders need batch notification")
return
logger.info(
"batch_render_notifications: emitting batch notifications for %d order(s)",
len(orders_needing_notification),
)
for order_id in orders_needing_notification:
try:
emit_batch_render_notification_sync(str(order_id))
except Exception as exc:
logger.error(
"batch_render_notifications: failed for order %s: %s", order_id, exc
)
except Exception as exc:
logger.error("batch_render_notifications failed: %s", exc)