"""Celery tasks for polling Flamenco job status and watchdog recovery.""" import logging from datetime import datetime, timedelta from app.tasks.celery_app import celery_app logger = logging.getLogger(__name__) # Flamenco status → our render_status mapping FLAMENCO_STATUS_MAP = { "queued": "processing", "active": "processing", "completed": "completed", "failed": "failed", "canceled": "failed", "cancel-requested": "processing", "paused": "processing", } @celery_app.task(name="app.tasks.flamenco_tasks.poll_flamenco_jobs", queue="step_processing") def poll_flamenco_jobs(): """Poll Flamenco Manager for active render jobs and update OrderLine status. Runs on a Celery Beat schedule (every 10 seconds). Uses a Redis lock (TTL=9s) to ensure at most one poll executes per 10-second window. When the queue backs up with many duplicates (e.g. all workers are busy with long STEP/render tasks), duplicates acquire the lock, find it taken, and return immediately — draining the queue without doing redundant work. """ import redis as redis_lib from app.config import settings as app_settings # Deduplicate: skip if a poll ran within the last 9 seconds try: r = redis_lib.from_url(app_settings.redis_url) acquired = r.set("flamenco_poll_lock", "1", nx=True, ex=9) if not acquired: return {"skipped": "deduplicated"} except Exception: pass # Redis unavailable — proceed anyway from sqlalchemy import create_engine, select, update as sql_update from sqlalchemy.orm import Session from app.models.order_line import OrderLine from app.models.system_setting import SystemSetting from app.services.flamenco_client import get_flamenco_client sync_url = app_settings.database_url.replace("+asyncpg", "") engine = create_engine(sync_url) # Track orders whose lines transitioned to a terminal state completed_order_ids = set() with Session(engine) as session: # Load Flamenco Manager URL row = session.execute( select(SystemSetting).where(SystemSetting.key == "flamenco_manager_url") ).scalar_one_or_none() manager_url = row.value if row else "http://flamenco-manager:8080" # Find all OrderLines dispatched to Flamenco that are still processing lines = session.execute( select(OrderLine).where( OrderLine.render_backend_used == "flamenco", OrderLine.render_status == "processing", OrderLine.flamenco_job_id.isnot(None), ) ).scalars().all() if not lines: engine.dispose() return {"polled": 0} client = get_flamenco_client(manager_url) updated = 0 for line in lines: try: job = client.get_job(line.flamenco_job_id) flamenco_status = job.get("status", "") our_status = FLAMENCO_STATUS_MAP.get(flamenco_status, "processing") if our_status == line.render_status: continue # No change updates = {"render_status": our_status} if our_status == "completed": updates["render_completed_at"] = datetime.utcnow() # Try to extract result path from job activity activity = job.get("activity", "") if activity: updates["render_log"] = { "flamenco_job_id": line.flamenco_job_id, "flamenco_status": flamenco_status, "activity": activity, } # Set result path based on job type job_type = job.get("type", "") metadata = job.get("metadata", {}) if job_type == "schaeffler-turntable": output_dir = job.get("settings", {}).get("output_dir", "") output_name = job.get("settings", {}).get("output_name", "turntable") updates["result_path"] = f"{output_dir}/{output_name}.mp4" elif job_type == "schaeffler-still": updates["result_path"] = job.get("settings", {}).get("output_path", "") elif our_status == "failed": updates["render_completed_at"] = datetime.utcnow() updates["render_log"] = { 
"flamenco_job_id": line.flamenco_job_id, "flamenco_status": flamenco_status, "error": job.get("activity", "Job failed"), } session.execute( sql_update(OrderLine) .where(OrderLine.id == line.id) .values(**updates) ) updated += 1 logger.info( f"Flamenco job {line.flamenco_job_id}: " f"{flamenco_status} → render_status={our_status}" ) # Track orders with lines that reached a terminal state if our_status in ("completed", "failed"): completed_order_ids.add(str(line.order_id)) except Exception as exc: logger.warning( f"Failed to poll Flamenco job {line.flamenco_job_id}: {exc}" ) if updated: session.commit() engine.dispose() # Auto-advance orders if all renderable lines are done if completed_order_ids: from app.services.order_status_service import check_order_completion for oid in completed_order_ids: check_order_completion(oid) return {"polled": len(lines), "updated": updated} # --------------------------------------------------------------------------- # Stalled-render watchdog # --------------------------------------------------------------------------- @celery_app.task(name="app.tasks.flamenco_tasks.check_stalled_renders", queue="step_processing") def check_stalled_renders(): """Watchdog: detect and re-dispatch render jobs stuck in 'processing'. Runs on a Celery Beat schedule (every 5 minutes). After a docker restart, Celery workers lose in-flight tasks — the DB still shows render_status='processing' indefinitely. This task: * For **Celery** lines: uses Celery inspect to check whether any worker is still actively executing the task. If not (e.g. after a restart), and the job has been stuck longer than ``render_stall_timeout_minutes`` (default: 120 min), it is reset to 'pending' and re-dispatched. * For **Flamenco** lines: queries the Flamenco Manager. If the manager reports the job as still active the line is left alone; if the job is gone or in a terminal/error state it is re-dispatched. 
""" from sqlalchemy import create_engine, select, update as sql_update from sqlalchemy.orm import Session from app.config import settings as app_settings from app.models.order_line import OrderLine from app.models.system_setting import SystemSetting sync_url = app_settings.database_url.replace("+asyncpg", "") engine = create_engine(sync_url) with Session(engine) as session: # ── Read timeout from system settings ──────────────────────────────── row = session.execute( select(SystemSetting).where(SystemSetting.key == "render_stall_timeout_minutes") ).scalar_one_or_none() try: timeout_minutes = int(row.value) if row else 120 except (ValueError, TypeError): timeout_minutes = 120 cutoff = datetime.utcnow() - timedelta(minutes=timeout_minutes) stalled_lines = session.execute( select(OrderLine).where( OrderLine.render_status == "processing", OrderLine.render_started_at.isnot(None), OrderLine.render_started_at < cutoff, ) ).scalars().all() if not stalled_lines: engine.dispose() return {"checked": 0, "restarted": 0, "timeout_minutes": timeout_minutes} logger.info( "[watchdog] Found %d stalled render(s) older than %d minutes", len(stalled_lines), timeout_minutes, ) # ── Build set of order_line_ids actively running on Celery workers ─── active_celery_line_ids: set[str] = set() inspect_ok = False try: inspect = celery_app.control.inspect(timeout=2) active_tasks = inspect.active() or {} for worker_tasks in active_tasks.values(): for task_info in (worker_tasks or []): args = task_info.get("args", []) if args: active_celery_line_ids.add(str(args[0])) inspect_ok = True except Exception as exc: logger.warning( "[watchdog] Celery inspect failed (%s) — will re-dispatch all timed-out Celery jobs", exc, ) # ── Load Flamenco Manager URL ───────────────────────────────────────── manager_url = "http://flamenco-manager:8080" try: url_row = session.execute( select(SystemSetting).where(SystemSetting.key == "flamenco_manager_url") ).scalar_one_or_none() if url_row: manager_url = url_row.value except Exception: pass # ── Decide which lines to restart ──────────────────────────────────── to_restart: list[OrderLine] = [] for line in stalled_lines: line_id = str(line.id) if line.flamenco_job_id: # Flamenco job: verify with manager before re-dispatching try: from app.services.flamenco_client import get_flamenco_client client = get_flamenco_client(manager_url) job = client.get_job(line.flamenco_job_id) flamenco_status = job.get("status", "") if flamenco_status in ( "active", "queued", "paused", "pause-requested", "cancel-requested", ): logger.info( "[watchdog] Flamenco job %s is still %s — skipping line %s", line.flamenco_job_id, flamenco_status, line_id, ) continue logger.info( "[watchdog] Flamenco job %s status=%r → re-dispatching line %s", line.flamenco_job_id, flamenco_status, line_id, ) except Exception as exc: # Manager unreachable — skip to avoid false restarts logger.warning( "[watchdog] Cannot reach Flamenco for job %s (%s) — skipping line %s", line.flamenco_job_id, exc, line_id, ) continue else: # Celery job: skip if still actively running on a worker if inspect_ok and line_id in active_celery_line_ids: logger.info( "[watchdog] Celery render for line %s still active — skipping", line_id ) continue logger.info( "[watchdog] Celery render for line %s not found in active tasks — re-dispatching", line_id, ) to_restart.append(line) if not to_restart: engine.dispose() return { "checked": len(stalled_lines), "restarted": 0, "timeout_minutes": timeout_minutes, } # ── Reset stalled lines to pending 
        # Capture the ids now: the ORM instances expire on commit, and the session
        # is closed before the re-dispatch loop below runs.
        restart_ids = [str(line.id) for line in to_restart]

        for line in to_restart:
            session.execute(
                sql_update(OrderLine)
                .where(OrderLine.id == line.id)
                .values(
                    render_status="pending",
                    render_started_at=None,
                    render_backend_used=None,
                    flamenco_job_id=None,
                    render_log={
                        "watchdog": (
                            f"Auto-restarted after {timeout_minutes} min stall "
                            f"(previous backend: {line.render_backend_used or 'unknown'})"
                        )
                    },
                )
            )

        session.commit()

    engine.dispose()

    # ── Re-dispatch outside DB session ────────────────────────────────────────
    from app.services.render_dispatcher import dispatch_render

    restarted = 0
    for line_id in restart_ids:
        try:
            dispatch_render(line_id)
            restarted += 1
            logger.info("[watchdog] Re-dispatched render for order line %s", line_id)
        except Exception as exc:
            logger.error(
                "[watchdog] Failed to re-dispatch line %s: %s — left as pending",
                line_id, exc,
            )

    return {
        "checked": len(stalled_lines),
        "restarted": restarted,
        "timeout_minutes": timeout_minutes,
    }
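

# Manual invocation (assumed: run from a Python shell inside a worker container)
# can be handy when debugging; the queue name matches the task bindings above:
#
#     from app.tasks.flamenco_tasks import poll_flamenco_jobs, check_stalled_renders
#     poll_flamenco_jobs.apply_async(queue="step_processing")       # one-off poll
#     check_stalled_renders.apply_async(queue="step_processing")    # force a watchdog pass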