"""Celery Beat periodic tasks."""
from __future__ import annotations

import json
import logging
from datetime import datetime, timedelta, timezone

from celery import shared_task

logger = logging.getLogger(__name__)


def _create_sync_engine():
    """Build a synchronous SQLAlchemy engine from the configured database URL.

    The configured URL may name the ``+asyncpg`` driver; that suffix is
    stripped so the URL can be handed to the synchronous ``create_engine``.
    Imports are kept local, matching the lazy-import style of the tasks below.
    The caller owns the engine and must ``dispose()`` it.
    """
    from sqlalchemy import create_engine

    from app.config import settings

    sync_url = settings.database_url.replace("+asyncpg", "")
    return create_engine(sync_url)


@shared_task(name="app.tasks.beat_tasks.broadcast_queue_status", queue="step_processing")
def broadcast_queue_status() -> None:
    """Broadcast current queue depths to all WebSocket clients every 10s.

    Publishes to the Redis '__broadcast__' channel which the WebSocket
    subscriber in the FastAPI process forwards to all connected clients.

    Best-effort: any failure is logged as a warning and swallowed so that a
    Redis hiccup never fails the periodic task.
    """
    try:
        import redis as sync_redis

        from app.config import settings

        r = sync_redis.from_url(settings.redis_url, decode_responses=True)
        try:
            depths = {
                "step_processing": r.llen("step_processing"),
                "thumbnail_rendering": r.llen("thumbnail_rendering"),
            }
            event = {"type": "queue_update", "depths": depths}
            r.publish("__broadcast__", json.dumps(event))
        finally:
            # Always release the connection, even when llen/publish raised;
            # otherwise every failing tick would leak a client.
            r.close()
        logger.debug("Broadcast queue_update: %s", depths)
    except Exception as exc:
        # Deliberately no traceback: this fires every tick while Redis is
        # unreachable and would flood the log.
        logger.warning("broadcast_queue_status failed: %s", exc)


@shared_task(name="app.tasks.beat_tasks.recover_stuck_cad_files", queue="step_processing")
def recover_stuck_cad_files() -> None:
    """Reset CAD files stuck in 'processing' for more than 10 minutes to 'failed'.

    This recovers from worker crashes (container restarts, OOM kills) that
    leave the processing_status committed as 'processing' with no task running
    to complete it. Runs every 5 minutes via Celery Beat.

    A file counts as stuck when its ``updated_at`` is older than the cutoff.
    Failures are logged (with traceback) and swallowed.
    """
    try:
        from sqlalchemy import and_, update
        from sqlalchemy.orm import Session

        from app.models.cad_file import CadFile, ProcessingStatus

        # Naive UTC timestamp, identical in value to the deprecated
        # datetime.utcnow(), so the comparison against updated_at is unchanged.
        cutoff = datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(minutes=10)

        engine = _create_sync_engine()
        try:
            with Session(engine) as session:
                result = session.execute(
                    update(CadFile)
                    .where(
                        and_(
                            CadFile.processing_status == ProcessingStatus.processing,
                            CadFile.updated_at < cutoff,
                        )
                    )
                    .values(
                        processing_status=ProcessingStatus.failed,
                        error_message="Processing timed out — worker may have crashed. Use 'Regenerate Thumbnail' to retry.",
                    )
                    .returning(CadFile.id, CadFile.original_name)
                )
                rows = result.fetchall()
                session.commit()
        finally:
            # Release pooled connections even if the UPDATE or commit failed.
            engine.dispose()

        if rows:
            names = [row[1] for row in rows]
            logger.warning(
                "recover_stuck_cad_files: reset %d stuck file(s) to failed: %s",
                len(rows),
                names,
            )
        else:
            logger.debug("recover_stuck_cad_files: no stuck files found")
    except Exception as exc:
        logger.exception("recover_stuck_cad_files failed: %s", exc)


@shared_task(name="app.tasks.beat_tasks.batch_render_notifications", queue="step_processing")
def batch_render_notifications() -> None:
    """Check for orders where all lines are terminal but no batch notification emitted yet.

    This acts as a safety net for batch notifications that may have been
    missed if the order completion hook failed or was skipped. Runs every
    60 seconds.

    Orders with status ``completed`` that have no matching ``order.completed``
    audit-log row on the notification channel get a notification emitted.
    A failure for one order is logged and does not stop the remaining orders.
    """
    try:
        from sqlalchemy import select
        from sqlalchemy.orm import Session

        from app.domains.notifications.models import AuditLog
        from app.domains.notifications.service import (
            CHANNEL_NOTIFICATION,
            emit_batch_render_notification_sync,
        )
        from app.domains.orders.models import Order, OrderStatus

        engine = _create_sync_engine()
        try:
            with Session(engine) as session:
                # Find all completed orders.
                completed_order_ids: list[str] = [
                    str(oid)
                    for oid in session.execute(
                        select(Order.id).where(Order.status == OrderStatus.completed)
                    ).scalars().all()
                ]

                notified_ids: set[str] = set()
                if completed_order_ids:
                    # Find which of those already have a batch notification.
                    notified_ids_raw = session.execute(
                        select(AuditLog.entity_id).where(
                            AuditLog.entity_type == "order",
                            AuditLog.action == "order.completed",
                            AuditLog.channel == CHANNEL_NOTIFICATION,
                            AuditLog.entity_id.in_(completed_order_ids),
                        )
                    ).scalars().all()
                    # Normalise to str: the candidate ids are strings, so a
                    # column that yields UUID objects would otherwise never
                    # match and every order would be re-notified each run.
                    notified_ids = {str(entity_id) for entity_id in notified_ids_raw}
        finally:
            # Dispose after the session has closed, and also on query failure.
            engine.dispose()

        if not completed_order_ids:
            logger.debug("batch_render_notifications: no completed orders found")
            return

        orders_needing_notification = [
            oid for oid in completed_order_ids if oid not in notified_ids
        ]

        if not orders_needing_notification:
            logger.debug("batch_render_notifications: no orders need batch notification")
            return

        logger.info(
            "batch_render_notifications: emitting batch notifications for %d order(s)",
            len(orders_needing_notification),
        )

        for order_id in orders_needing_notification:
            try:
                emit_batch_render_notification_sync(order_id)
            except Exception as exc:
                logger.exception(
                    "batch_render_notifications: failed for order %s: %s", order_id, exc
                )
    except Exception as exc:
        logger.exception("batch_render_notifications failed: %s", exc)