1321ef2bd4
The queue handles far more than thumbnails: OCC tessellation, USD master generation, GLB production, order line renders, and workflow renders. asset_pipeline better reflects its role as the render-worker's primary queue. Updated all references in: task decorators, celery_app.py, beat_tasks.py, docker-compose.yml worker command, worker.py MONITORED_QUEUES, admin.py, CLAUDE.md, LEARNINGS.md, Dockerfile, helpTexts.ts, test files, and all .claude/commands/*.md skill files. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
"""Celery Beat periodic tasks."""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
|
|
from celery import shared_task
|
|
|
|
# Eagerly import all models so SQLAlchemy can resolve string-based relationship
|
|
# references (e.g. relationship("Template"), relationship("User")) when domain
|
|
# models are imported individually inside task functions.
|
|
import app.models as _all_models # noqa: F401
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
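
# Each task below creates a short-lived synchronous SQLAlchemy engine: the app's
# configured URL targets the asyncpg driver, and stripping the "+asyncpg" suffix
# falls back to the default sync driver. Illustrative value only, not the real
# settings:
#
#   >>> "postgresql+asyncpg://app:pw@db/appdb".replace("+asyncpg", "")
#   'postgresql://app:pw@db/appdb'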
@shared_task(name="app.tasks.beat_tasks.apply_worker_concurrency", queue="step_processing")
|
|
def apply_worker_concurrency() -> None:
|
|
"""Read worker_configs from DB and broadcast pool_grow to workers.
|
|
|
|
Runs every 5 minutes via Celery Beat. Signals all workers to adjust their
|
|
pool size to match the max_concurrency setting for each enabled queue.
|
|
"""
|
|
try:
|
|
from sqlalchemy import create_engine, select
|
|
from sqlalchemy.orm import Session
|
|
from app.config import settings as app_settings
|
|
from app.models.worker_config import WorkerConfig
|
|
from app.tasks.celery_app import celery_app
|
|
|
|
sync_url = app_settings.database_url.replace("+asyncpg", "")
|
|
eng = create_engine(sync_url)
|
|
try:
|
|
with Session(eng) as session:
|
|
configs = session.execute(
|
|
select(WorkerConfig).where(WorkerConfig.enabled == True) # noqa: E712
|
|
).scalars().all()
|
|
for cfg in configs:
|
|
try:
|
|
celery_app.control.broadcast(
|
|
"pool_grow",
|
|
arguments={"n": cfg.max_concurrency},
|
|
destination=None, # all workers
|
|
reply=False,
|
|
)
|
|
logger.info(
|
|
"[WORKER_SCALE] Signalled pool_grow n=%d for queue %s",
|
|
cfg.max_concurrency, cfg.queue_name,
|
|
)
|
|
except Exception as exc:
|
|
logger.warning(
|
|
"[WORKER_SCALE] pool_grow failed for %s: %s",
|
|
cfg.queue_name, exc,
|
|
)
|
|
finally:
|
|
eng.dispose()
|
|
except Exception as exc:
|
|
logger.error("apply_worker_concurrency failed: %s", exc)
|
|
|
|
|
|
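
# A minimal sketch of how these tasks might be registered in the Beat schedule
# in celery_app.py (entry names are assumptions; the interval follows the
# docstring above). Note that Celery's built-in pool_grow control command grows
# the pool by n processes rather than pinning it to n.
#
#   celery_app.conf.beat_schedule = {
#       "apply-worker-concurrency": {
#           "task": "app.tasks.beat_tasks.apply_worker_concurrency",
#           "schedule": 300.0,  # seconds; every 5 minutes
#       },
#   }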
@shared_task(name="app.tasks.beat_tasks.broadcast_queue_status", queue="step_processing")
|
|
def broadcast_queue_status() -> None:
|
|
"""Broadcast current queue depths to all WebSocket clients every 10s.
|
|
|
|
Publishes to the Redis '__broadcast__' channel which the WebSocket
|
|
subscriber in the FastAPI process forwards to all connected clients.
|
|
"""
|
|
try:
|
|
import redis as sync_redis
|
|
from app.config import settings
|
|
|
|
r = sync_redis.from_url(settings.redis_url, decode_responses=True)
|
|
depths = {
|
|
"step_processing": r.llen("step_processing"),
|
|
"asset_pipeline": r.llen("asset_pipeline"),
|
|
}
|
|
event = {"type": "queue_update", "depths": depths}
|
|
r.publish("__broadcast__", json.dumps(event))
|
|
r.close()
|
|
logger.debug("Broadcast queue_update: %s", depths)
|
|
except Exception as exc:
|
|
logger.warning("broadcast_queue_status failed: %s", exc)
|
|
|
|
|
|
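
# Consumer side, for orientation: a hedged sketch (not the actual subscriber
# code) of how the FastAPI process could forward '__broadcast__' messages to
# connected WebSocket clients with redis-py's async pubsub; `manager` and its
# broadcast method are hypothetical names:
#
#   import redis.asyncio as aioredis
#
#   async def forward_broadcasts(manager):
#       r = aioredis.from_url(settings.redis_url, decode_responses=True)
#       pubsub = r.pubsub()
#       await pubsub.subscribe("__broadcast__")
#       async for msg in pubsub.listen():
#           if msg["type"] == "message":
#               await manager.broadcast(msg["data"])  # fan out the raw JSON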
@shared_task(name="app.tasks.beat_tasks.recover_stuck_cad_files", queue="step_processing")
|
|
def recover_stuck_cad_files() -> None:
|
|
"""Reset CAD files stuck in 'processing' for more than 10 minutes to 'failed'.
|
|
|
|
This recovers from worker crashes (container restarts, OOM kills) that leave
|
|
the processing_status committed as 'processing' with no task running to complete it.
|
|
Runs every 5 minutes via Celery Beat.
|
|
"""
|
|
try:
|
|
from sqlalchemy import create_engine, update, and_
|
|
from sqlalchemy.orm import Session
|
|
from app.config import settings
|
|
from app.models.cad_file import CadFile, ProcessingStatus
|
|
|
|
cutoff = datetime.utcnow() - timedelta(minutes=10)
|
|
sync_url = settings.database_url.replace("+asyncpg", "")
|
|
engine = create_engine(sync_url)
|
|
with Session(engine) as session:
|
|
result = session.execute(
|
|
update(CadFile)
|
|
.where(
|
|
and_(
|
|
CadFile.processing_status == ProcessingStatus.processing,
|
|
CadFile.updated_at < cutoff,
|
|
)
|
|
)
|
|
.values(
|
|
processing_status=ProcessingStatus.failed,
|
|
error_message="Processing timed out — worker may have crashed. Use 'Regenerate Thumbnail' to retry.",
|
|
)
|
|
.returning(CadFile.id, CadFile.original_name)
|
|
)
|
|
rows = result.fetchall()
|
|
session.commit()
|
|
engine.dispose()
|
|
|
|
if rows:
|
|
names = [r[1] for r in rows]
|
|
logger.warning(
|
|
"recover_stuck_cad_files: reset %d stuck file(s) to failed: %s",
|
|
len(rows), names,
|
|
)
|
|
else:
|
|
logger.debug("recover_stuck_cad_files: no stuck files found")
|
|
except Exception as exc:
|
|
logger.error("recover_stuck_cad_files failed: %s", exc)
|
|
|
|
|
|
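
# For reference, the recovery UPDATE above is roughly equivalent to this SQL
# (PostgreSQL syntax; the table and column names are assumptions inferred from
# the model attributes):
#
#   UPDATE cad_files
#      SET processing_status = 'failed',
#          error_message     = 'Processing timed out ...'
#    WHERE processing_status = 'processing'
#      AND updated_at < now() - interval '10 minutes'
#   RETURNING id, original_name;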
@shared_task(name="app.tasks.beat_tasks.recover_stalled_renders", queue="step_processing")
|
|
def recover_stalled_renders() -> None:
|
|
"""Detect order_lines stuck in render_status='processing' for longer than
|
|
RENDER_STALL_TIMEOUT_MINUTES (default 30) and mark them as failed.
|
|
|
|
Tries to revoke the Celery task if a celery_task_id is stored in render_job_doc.
|
|
Emits a single batch system alert to admins via CHANNEL_ALERT.
|
|
Runs every 5 minutes via Celery Beat.
|
|
"""
|
|
RENDER_STALL_TIMEOUT_MINUTES = 30
|
|
try:
|
|
from sqlalchemy import create_engine, select
|
|
from sqlalchemy.orm import Session
|
|
from app.config import settings
|
|
from app.domains.orders.models import OrderLine
|
|
from app.domains.notifications.service import emit_notification_sync, CHANNEL_ALERT
|
|
from app.tasks.celery_app import celery_app
|
|
|
|
cutoff = datetime.utcnow() - timedelta(minutes=RENDER_STALL_TIMEOUT_MINUTES)
|
|
sync_url = settings.database_url.replace("+asyncpg", "")
|
|
engine = create_engine(sync_url)
|
|
|
|
stalled_ids: list[str] = []
|
|
|
|
with Session(engine) as session:
|
|
rows = session.execute(
|
|
select(OrderLine).where(
|
|
OrderLine.render_status == "processing",
|
|
OrderLine.render_started_at < cutoff,
|
|
)
|
|
).scalars().all()
|
|
|
|
for line in rows:
|
|
elapsed_minutes = int(
|
|
(datetime.utcnow() - line.render_started_at).total_seconds() / 60
|
|
)
|
|
# Try to revoke the Celery task if we have its ID
|
|
job_doc = line.render_job_doc or {}
|
|
celery_task_id = job_doc.get("celery_task_id")
|
|
if celery_task_id:
|
|
try:
|
|
celery_app.control.revoke(celery_task_id, terminate=True)
|
|
logger.info(
|
|
"[WORKER_RECOVERY] Revoked Celery task %s for order_line %s",
|
|
celery_task_id, line.id,
|
|
)
|
|
except Exception as revoke_exc:
|
|
logger.warning(
|
|
"[WORKER_RECOVERY] Could not revoke task %s: %s",
|
|
celery_task_id, revoke_exc,
|
|
)
|
|
|
|
# Update render_job_doc with failure info
|
|
updated_doc = dict(job_doc)
|
|
updated_doc["state"] = "failed"
|
|
updated_doc["error"] = (
|
|
f"Stalled render terminated by health recovery after {RENDER_STALL_TIMEOUT_MINUTES}min"
|
|
)
|
|
|
|
line.render_status = "failed"
|
|
line.render_job_doc = updated_doc
|
|
|
|
stalled_ids.append(str(line.id))
|
|
logger.warning(
|
|
"[WORKER_RECOVERY] Stalled render for order_line %s terminated after %dmin",
|
|
line.id, elapsed_minutes,
|
|
)
|
|
|
|
if stalled_ids:
|
|
session.commit()
|
|
|
|
engine.dispose()
|
|
|
|
if stalled_ids:
|
|
emit_notification_sync(
|
|
action="worker.recovery",
|
|
entity_type="system",
|
|
details={
|
|
"message": (
|
|
f"Worker recovery: {len(stalled_ids)} stalled render(s) terminated "
|
|
f"after {RENDER_STALL_TIMEOUT_MINUTES}min timeout"
|
|
),
|
|
"stalled_order_line_ids": stalled_ids,
|
|
"count": len(stalled_ids),
|
|
},
|
|
channel=CHANNEL_ALERT,
|
|
)
|
|
logger.warning(
|
|
"recover_stalled_renders: terminated %d stalled render(s): %s",
|
|
len(stalled_ids), stalled_ids,
|
|
)
|
|
else:
|
|
logger.debug("recover_stalled_renders: no stalled renders found")
|
|
except Exception as exc:
|
|
logger.error("recover_stalled_renders failed: %s", exc)
|
|
|
|
|
|
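
# Producer side, for context: the revoke above only works if the code that
# enqueues a render stored the task id in render_job_doc. A hedged sketch of
# what that might look like (render_order_line is a hypothetical task name;
# the field names follow the attributes used above):
#
#   result = render_order_line.apply_async(args=[str(line.id)], queue="asset_pipeline")
#   line.render_job_doc = {**(line.render_job_doc or {}), "celery_task_id": result.id}
#   line.render_status = "processing"
#   line.render_started_at = datetime.utcnow()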
@shared_task(name="app.tasks.beat_tasks.batch_render_notifications", queue="step_processing")
|
|
def batch_render_notifications() -> None:
|
|
"""Check for orders where all lines are terminal but no batch notification emitted yet.
|
|
|
|
This acts as a safety net for batch notifications that may have been missed
|
|
if the order completion hook failed or was skipped. Runs every 60 seconds.
|
|
"""
|
|
try:
|
|
from sqlalchemy import create_engine, select
|
|
from sqlalchemy.orm import Session
|
|
from app.config import settings
|
|
from app.domains.orders.models import Order, OrderStatus
|
|
from app.domains.notifications.models import AuditLog
|
|
from app.domains.notifications.service import (
|
|
emit_batch_render_notification_sync,
|
|
CHANNEL_NOTIFICATION,
|
|
)
|
|
|
|
sync_url = settings.database_url.replace("+asyncpg", "")
|
|
engine = create_engine(sync_url)
|
|
|
|
with Session(engine) as session:
|
|
# Find all completed orders
|
|
completed_order_ids: list[str] = [
|
|
str(oid) for oid in session.execute(
|
|
select(Order.id).where(Order.status == OrderStatus.completed)
|
|
).scalars().all()
|
|
]
|
|
|
|
if not completed_order_ids:
|
|
engine.dispose()
|
|
logger.debug("batch_render_notifications: no completed orders found")
|
|
return
|
|
|
|
# Find which of those already have a batch notification
|
|
notified_ids_raw = session.execute(
|
|
select(AuditLog.entity_id).where(
|
|
AuditLog.entity_type == "order",
|
|
AuditLog.action == "order.completed",
|
|
AuditLog.channel == CHANNEL_NOTIFICATION,
|
|
AuditLog.entity_id.in_(completed_order_ids),
|
|
)
|
|
).scalars().all()
|
|
notified_ids = set(notified_ids_raw)
|
|
|
|
engine.dispose()
|
|
|
|
orders_needing_notification = [
|
|
oid for oid in completed_order_ids if oid not in notified_ids
|
|
]
|
|
|
|
if not orders_needing_notification:
|
|
logger.debug("batch_render_notifications: no orders need batch notification")
|
|
return
|
|
|
|
logger.info(
|
|
"batch_render_notifications: emitting batch notifications for %d order(s)",
|
|
len(orders_needing_notification),
|
|
)
|
|
for order_id in orders_needing_notification:
|
|
try:
|
|
emit_batch_render_notification_sync(str(order_id))
|
|
except Exception as exc:
|
|
logger.error(
|
|
"batch_render_notifications: failed for order %s: %s", order_id, exc
|
|
)
|
|
|
|
except Exception as exc:
|
|
logger.error("batch_render_notifications failed: %s", exc)
|
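
# For reference, the two queries above amount to an anti-join; a hedged
# single-query SQL equivalent (the table names and the entity_id text cast are
# assumptions about the schema, and '<CHANNEL_NOTIFICATION>' stands in for that
# constant's actual value):
#
#   SELECT o.id
#     FROM orders o
#    WHERE o.status = 'completed'
#      AND NOT EXISTS (
#            SELECT 1
#              FROM audit_logs a
#             WHERE a.entity_type = 'order'
#               AND a.action = 'order.completed'
#               AND a.channel = '<CHANNEL_NOTIFICATION>'
#               AND a.entity_id = o.id::text
#          );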