"""Celery Beat periodic tasks.""" from __future__ import annotations import json import logging from datetime import datetime, timedelta from celery import shared_task logger = logging.getLogger(__name__) @shared_task(name="app.tasks.beat_tasks.apply_worker_concurrency", queue="step_processing") def apply_worker_concurrency() -> None: """Read worker_configs from DB and broadcast pool_grow to workers. Runs every 5 minutes via Celery Beat. Signals all workers to adjust their pool size to match the max_concurrency setting for each enabled queue. """ try: from sqlalchemy import create_engine, select from sqlalchemy.orm import Session from app.config import settings as app_settings from app.models.worker_config import WorkerConfig from app.tasks.celery_app import celery_app sync_url = app_settings.database_url.replace("+asyncpg", "") eng = create_engine(sync_url) try: with Session(eng) as session: configs = session.execute( select(WorkerConfig).where(WorkerConfig.enabled == True) # noqa: E712 ).scalars().all() for cfg in configs: try: celery_app.control.broadcast( "pool_grow", arguments={"n": cfg.max_concurrency}, destination=None, # all workers reply=False, ) logger.info( "[WORKER_SCALE] Signalled pool_grow n=%d for queue %s", cfg.max_concurrency, cfg.queue_name, ) except Exception as exc: logger.warning( "[WORKER_SCALE] pool_grow failed for %s: %s", cfg.queue_name, exc, ) finally: eng.dispose() except Exception as exc: logger.error("apply_worker_concurrency failed: %s", exc) @shared_task(name="app.tasks.beat_tasks.broadcast_queue_status", queue="step_processing") def broadcast_queue_status() -> None: """Broadcast current queue depths to all WebSocket clients every 10s. Publishes to the Redis '__broadcast__' channel which the WebSocket subscriber in the FastAPI process forwards to all connected clients. """ try: import redis as sync_redis from app.config import settings r = sync_redis.from_url(settings.redis_url, decode_responses=True) depths = { "step_processing": r.llen("step_processing"), "thumbnail_rendering": r.llen("thumbnail_rendering"), } event = {"type": "queue_update", "depths": depths} r.publish("__broadcast__", json.dumps(event)) r.close() logger.debug("Broadcast queue_update: %s", depths) except Exception as exc: logger.warning("broadcast_queue_status failed: %s", exc) @shared_task(name="app.tasks.beat_tasks.recover_stuck_cad_files", queue="step_processing") def recover_stuck_cad_files() -> None: """Reset CAD files stuck in 'processing' for more than 10 minutes to 'failed'. This recovers from worker crashes (container restarts, OOM kills) that leave the processing_status committed as 'processing' with no task running to complete it. Runs every 5 minutes via Celery Beat. """ try: from sqlalchemy import create_engine, update, and_ from sqlalchemy.orm import Session from app.config import settings from app.models.cad_file import CadFile, ProcessingStatus cutoff = datetime.utcnow() - timedelta(minutes=10) sync_url = settings.database_url.replace("+asyncpg", "") engine = create_engine(sync_url) with Session(engine) as session: result = session.execute( update(CadFile) .where( and_( CadFile.processing_status == ProcessingStatus.processing, CadFile.updated_at < cutoff, ) ) .values( processing_status=ProcessingStatus.failed, error_message="Processing timed out — worker may have crashed. 
@shared_task(name="app.tasks.beat_tasks.recover_stuck_cad_files", queue="step_processing")
def recover_stuck_cad_files() -> None:
    """Reset CAD files stuck in 'processing' for more than 10 minutes to 'failed'.

    This recovers from worker crashes (container restarts, OOM kills) that
    leave the processing_status committed as 'processing' with no task
    running to complete it. Runs every 5 minutes via Celery Beat.
    """
    try:
        from sqlalchemy import and_, create_engine, update
        from sqlalchemy.orm import Session

        from app.config import settings
        from app.models.cad_file import CadFile, ProcessingStatus

        cutoff = datetime.utcnow() - timedelta(minutes=10)
        sync_url = settings.database_url.replace("+asyncpg", "")
        engine = create_engine(sync_url)
        with Session(engine) as session:
            result = session.execute(
                update(CadFile)
                .where(
                    and_(
                        CadFile.processing_status == ProcessingStatus.processing,
                        CadFile.updated_at < cutoff,
                    )
                )
                .values(
                    processing_status=ProcessingStatus.failed,
                    error_message=(
                        "Processing timed out — worker may have crashed. "
                        "Use 'Regenerate Thumbnail' to retry."
                    ),
                )
                .returning(CadFile.id, CadFile.original_name)
            )
            rows = result.fetchall()
            session.commit()
        engine.dispose()

        if rows:
            names = [r[1] for r in rows]
            logger.warning(
                "recover_stuck_cad_files: reset %d stuck file(s) to failed: %s",
                len(rows),
                names,
            )
        else:
            logger.debug("recover_stuck_cad_files: no stuck files found")
    except Exception as exc:
        logger.error("recover_stuck_cad_files failed: %s", exc)


@shared_task(name="app.tasks.beat_tasks.recover_stalled_renders", queue="step_processing")
def recover_stalled_renders() -> None:
    """Detect order_lines stuck in render_status='processing' and mark them failed.

    Lines processing for longer than RENDER_STALL_TIMEOUT_MINUTES (default
    30) are marked as failed. Tries to revoke the Celery task if a
    celery_task_id is stored in render_job_doc. Emits a single batch system
    alert to admins via CHANNEL_ALERT. Runs every 5 minutes via Celery Beat.
    """
    RENDER_STALL_TIMEOUT_MINUTES = 30
    try:
        from sqlalchemy import create_engine, select
        from sqlalchemy.orm import Session

        from app.config import settings
        from app.domains.notifications.service import CHANNEL_ALERT, emit_notification_sync
        from app.domains.orders.models import OrderLine
        from app.tasks.celery_app import celery_app

        cutoff = datetime.utcnow() - timedelta(minutes=RENDER_STALL_TIMEOUT_MINUTES)
        sync_url = settings.database_url.replace("+asyncpg", "")
        engine = create_engine(sync_url)

        stalled_ids: list[str] = []
        with Session(engine) as session:
            rows = session.execute(
                select(OrderLine).where(
                    OrderLine.render_status == "processing",
                    OrderLine.render_started_at < cutoff,
                )
            ).scalars().all()

            for line in rows:
                elapsed_minutes = int(
                    (datetime.utcnow() - line.render_started_at).total_seconds() / 60
                )

                # Try to revoke the Celery task if we have its ID.
                job_doc = line.render_job_doc or {}
                celery_task_id = job_doc.get("celery_task_id")
                if celery_task_id:
                    try:
                        celery_app.control.revoke(celery_task_id, terminate=True)
                        logger.info(
                            "[WORKER_RECOVERY] Revoked Celery task %s for order_line %s",
                            celery_task_id,
                            line.id,
                        )
                    except Exception as revoke_exc:
                        logger.warning(
                            "[WORKER_RECOVERY] Could not revoke task %s: %s",
                            celery_task_id,
                            revoke_exc,
                        )

                # Update render_job_doc with failure info.
                updated_doc = dict(job_doc)
                updated_doc["state"] = "failed"
                updated_doc["error"] = (
                    f"Stalled render terminated by health recovery after "
                    f"{RENDER_STALL_TIMEOUT_MINUTES}min"
                )
                line.render_status = "failed"
                line.render_job_doc = updated_doc
                stalled_ids.append(str(line.id))
                logger.warning(
                    "[WORKER_RECOVERY] Stalled render for order_line %s terminated after %dmin",
                    line.id,
                    elapsed_minutes,
                )

            if stalled_ids:
                session.commit()
        engine.dispose()

        if stalled_ids:
            emit_notification_sync(
                action="worker.recovery",
                entity_type="system",
                details={
                    "message": (
                        f"Worker recovery: {len(stalled_ids)} stalled render(s) terminated "
                        f"after {RENDER_STALL_TIMEOUT_MINUTES}min timeout"
                    ),
                    "stalled_order_line_ids": stalled_ids,
                    "count": len(stalled_ids),
                },
                channel=CHANNEL_ALERT,
            )
            logger.warning(
                "recover_stalled_renders: terminated %d stalled render(s): %s",
                len(stalled_ids),
                stalled_ids,
            )
        else:
            logger.debug("recover_stalled_renders: no stalled renders found")
    except Exception as exc:
        logger.error("recover_stalled_renders failed: %s", exc)
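
# Revocation in recover_stalled_renders only works if the render task records
# its own Celery task id into render_job_doc when it starts. A sketch of that
# assumed contract, using a bound task (the task name `render_order_line` is
# illustrative and not confirmed by this module; `self.request.id` is standard
# Celery API for bound tasks):
#
#     @shared_task(bind=True, queue="thumbnail_rendering")
#     def render_order_line(self, order_line_id: str) -> None:
#         ...
#         line.render_job_doc = {
#             **(line.render_job_doc or {}),
#             "celery_task_id": self.request.id,  # lets Beat recovery revoke us
#             "state": "processing",
#         }
#         ...
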
@shared_task(name="app.tasks.beat_tasks.batch_render_notifications", queue="step_processing")
def batch_render_notifications() -> None:
    """Emit batch notifications for completed orders that were missed.

    Checks for orders where all lines are terminal but no batch notification
    has been emitted yet. This acts as a safety net for batch notifications
    that may have been missed if the order completion hook failed or was
    skipped. Runs every 60 seconds via Celery Beat.
    """
    try:
        from sqlalchemy import create_engine, select
        from sqlalchemy.orm import Session

        from app.config import settings
        from app.domains.notifications.models import AuditLog
        from app.domains.notifications.service import (
            CHANNEL_NOTIFICATION,
            emit_batch_render_notification_sync,
        )
        from app.domains.orders.models import Order, OrderStatus

        sync_url = settings.database_url.replace("+asyncpg", "")
        engine = create_engine(sync_url)
        with Session(engine) as session:
            # Find all completed orders.
            completed_order_ids: list[str] = [
                str(oid)
                for oid in session.execute(
                    select(Order.id).where(Order.status == OrderStatus.completed)
                ).scalars().all()
            ]
            if not completed_order_ids:
                engine.dispose()
                logger.debug("batch_render_notifications: no completed orders found")
                return

            # Find which of those already have a batch notification.
            notified_ids_raw = session.execute(
                select(AuditLog.entity_id).where(
                    AuditLog.entity_type == "order",
                    AuditLog.action == "order.completed",
                    AuditLog.channel == CHANNEL_NOTIFICATION,
                    AuditLog.entity_id.in_(completed_order_ids),
                )
            ).scalars().all()
            notified_ids = set(notified_ids_raw)
        engine.dispose()

        orders_needing_notification = [
            oid for oid in completed_order_ids if oid not in notified_ids
        ]
        if not orders_needing_notification:
            logger.debug("batch_render_notifications: no orders need batch notification")
            return

        logger.info(
            "batch_render_notifications: emitting batch notifications for %d order(s)",
            len(orders_needing_notification),
        )
        for order_id in orders_needing_notification:
            try:
                emit_batch_render_notification_sync(str(order_id))
            except Exception as exc:
                logger.error(
                    "batch_render_notifications: failed for order %s: %s", order_id, exc
                )
    except Exception as exc:
        logger.error("batch_render_notifications failed: %s", exc)
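
# For reference, these tasks are expected to be registered in the Celery Beat
# schedule, presumably in app/tasks/celery_app.py. A sketch of that wiring;
# the intervals come from the docstrings above, while the schedule keys and
# the location of the config are assumptions:
#
#     celery_app.conf.beat_schedule = {
#         "apply-worker-concurrency": {
#             "task": "app.tasks.beat_tasks.apply_worker_concurrency",
#             "schedule": 300.0,  # every 5 minutes
#         },
#         "broadcast-queue-status": {
#             "task": "app.tasks.beat_tasks.broadcast_queue_status",
#             "schedule": 10.0,  # every 10 seconds
#         },
#         "recover-stuck-cad-files": {
#             "task": "app.tasks.beat_tasks.recover_stuck_cad_files",
#             "schedule": 300.0,  # every 5 minutes
#         },
#         "recover-stalled-renders": {
#             "task": "app.tasks.beat_tasks.recover_stalled_renders",
#             "schedule": 300.0,  # every 5 minutes
#         },
#         "batch-render-notifications": {
#             "task": "app.tasks.beat_tasks.batch_render_notifications",
#             "schedule": 60.0,  # every 60 seconds
#         },
#     }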