"""Celery Beat periodic tasks.""" from __future__ import annotations import json import logging from datetime import datetime, timedelta from celery import shared_task logger = logging.getLogger(__name__) @shared_task(name="app.tasks.beat_tasks.broadcast_queue_status", queue="step_processing") def broadcast_queue_status() -> None: """Broadcast current queue depths to all WebSocket clients every 10s. Publishes to the Redis '__broadcast__' channel which the WebSocket subscriber in the FastAPI process forwards to all connected clients. """ try: import redis as sync_redis from app.config import settings r = sync_redis.from_url(settings.redis_url, decode_responses=True) depths = { "step_processing": r.llen("step_processing"), "thumbnail_rendering": r.llen("thumbnail_rendering"), } event = {"type": "queue_update", "depths": depths} r.publish("__broadcast__", json.dumps(event)) r.close() logger.debug("Broadcast queue_update: %s", depths) except Exception as exc: logger.warning("broadcast_queue_status failed: %s", exc) @shared_task(name="app.tasks.beat_tasks.recover_stuck_cad_files", queue="step_processing") def recover_stuck_cad_files() -> None: """Reset CAD files stuck in 'processing' for more than 10 minutes to 'failed'. This recovers from worker crashes (container restarts, OOM kills) that leave the processing_status committed as 'processing' with no task running to complete it. Runs every 5 minutes via Celery Beat. """ try: from sqlalchemy import create_engine, update, and_ from sqlalchemy.orm import Session from app.config import settings from app.models.cad_file import CadFile, ProcessingStatus cutoff = datetime.utcnow() - timedelta(minutes=10) sync_url = settings.database_url.replace("+asyncpg", "") engine = create_engine(sync_url) with Session(engine) as session: result = session.execute( update(CadFile) .where( and_( CadFile.processing_status == ProcessingStatus.processing, CadFile.updated_at < cutoff, ) ) .values( processing_status=ProcessingStatus.failed, error_message="Processing timed out — worker may have crashed. Use 'Regenerate Thumbnail' to retry.", ) .returning(CadFile.id, CadFile.original_name) ) rows = result.fetchall() session.commit() engine.dispose() if rows: names = [r[1] for r in rows] logger.warning( "recover_stuck_cad_files: reset %d stuck file(s) to failed: %s", len(rows), names, ) else: logger.debug("recover_stuck_cad_files: no stuck files found") except Exception as exc: logger.error("recover_stuck_cad_files failed: %s", exc)