refactor(A1): remove Flamenco, simplify render pipeline to Celery-only

- Remove flamenco-manager and flamenco-worker from docker-compose.yml
- Delete flamenco_client.py, flamenco_tasks.py, docker_scaler.py
- Simplify render_dispatcher.py to Celery-only (removes ~300 lines)
- Remove Flamenco beat schedule from celery_app.py
- Clean admin.py: remove flamenco settings, endpoints, threejs validation
- Clean orders.py cancel-render: Celery revoke only
- Clean worker.py: remove flamenco_job_id from activity response
- Migration 032: cancel lingering flamenco jobs, remove flamenco settings
- PLAN.md: mark all decisions confirmed, status IN UMSETZUNG (in progress)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
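With Flamenco gone, cancellation is plain Celery revocation. A minimal sketch of what "Celery revoke only" amounts to, assuming the order line keeps its Celery task id under a hypothetical field name render_task_id (celery_app.control.revoke with terminate=True is standard Celery API, not code quoted from orders.py):

    from app.tasks.celery_app import celery_app

    def cancel_render(line):
        # Revoke the queued render task; terminate=True also stops a task
        # that is already executing on a worker.
        if line.render_task_id:
            celery_app.control.revoke(line.render_task_id, terminate=True)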
@@ -5,7 +5,7 @@ celery_app = Celery(
     "schaefflerautomat",
     broker=settings.redis_url,
     backend=settings.redis_url,
-    include=["app.tasks.step_tasks", "app.tasks.ai_tasks", "app.tasks.flamenco_tasks"],
+    include=["app.tasks.step_tasks", "app.tasks.ai_tasks"],
 )
 
 celery_app.conf.update(
@@ -17,20 +17,6 @@ celery_app.conf.update(
     task_routes={
         "app.tasks.step_tasks.*": {"queue": "step_processing"},
         "app.tasks.ai_tasks.*": {"queue": "ai_validation"},
-        "app.tasks.flamenco_tasks.*": {"queue": "step_processing"},
     },
-    beat_schedule={
-        "poll-flamenco-jobs": {
-            "task": "app.tasks.flamenco_tasks.poll_flamenco_jobs",
-            "schedule": 10.0,  # every 10 seconds
-            # Discard if not consumed before the next run; prevents queue build-up
-            # when workers are busy with long-running STEP/render tasks.
-            "options": {"expires": 9},
-        },
-        "check-stalled-renders": {
-            "task": "app.tasks.flamenco_tasks.check_stalled_renders",
-            "schedule": 300.0,  # every 5 minutes
-            "options": {"expires": 290},
-        },
-    },
+    beat_schedule={},
 )
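With the include list and task routes trimmed, the app should no longer register any Flamenco tasks. A quick sanity check, sketched for an interactive shell with the app package importable (nothing here is repo-specific beyond the import path shown in the diff):

    from app.tasks.celery_app import celery_app

    # celery_app.tasks is Celery's registry of task names for this app.
    leftovers = [name for name in celery_app.tasks
                 if name.startswith("app.tasks.flamenco_tasks")]
    assert not leftovers, f"stale Flamenco tasks registered: {leftovers}"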
@@ -1,335 +0,0 @@
-"""Celery tasks for polling Flamenco job status and watchdog recovery."""
-import logging
-from datetime import datetime, timedelta
-
-from app.tasks.celery_app import celery_app
-
-logger = logging.getLogger(__name__)
-
-# Flamenco status → our render_status mapping
-FLAMENCO_STATUS_MAP = {
-    "queued": "processing",
-    "active": "processing",
-    "completed": "completed",
-    "failed": "failed",
-    "canceled": "failed",
-    "cancel-requested": "processing",
-    "paused": "processing",
-}
-
-
-@celery_app.task(name="app.tasks.flamenco_tasks.poll_flamenco_jobs", queue="step_processing")
-def poll_flamenco_jobs():
-    """Poll Flamenco Manager for active render jobs and update OrderLine status.
-
-    Runs on a Celery Beat schedule (every 10 seconds).
-
-    Uses a Redis lock (TTL=9s) to ensure at most one poll executes per 10-second
-    window. When the queue backs up with many duplicates (e.g. all workers are
-    busy with long STEP/render tasks), duplicates try to acquire the lock, find
-    it taken, and return immediately — draining the queue without doing
-    redundant work.
-    """
-    import redis as redis_lib
-    from app.config import settings as app_settings
-
-    # Deduplicate: skip if a poll ran within the last 9 seconds
-    try:
-        r = redis_lib.from_url(app_settings.redis_url)
-        acquired = r.set("flamenco_poll_lock", "1", nx=True, ex=9)
-        if not acquired:
-            return {"skipped": "deduplicated"}
-    except Exception:
-        pass  # Redis unavailable — proceed anyway
-
-    from sqlalchemy import create_engine, select, update as sql_update
-    from sqlalchemy.orm import Session
-    from app.models.order_line import OrderLine
-    from app.models.system_setting import SystemSetting
-    from app.services.flamenco_client import get_flamenco_client
-
-    sync_url = app_settings.database_url.replace("+asyncpg", "")
-    engine = create_engine(sync_url)
-
-    # Track orders whose lines transitioned to a terminal state
-    completed_order_ids = set()
-
-    with Session(engine) as session:
-        # Load Flamenco Manager URL
-        row = session.execute(
-            select(SystemSetting).where(SystemSetting.key == "flamenco_manager_url")
-        ).scalar_one_or_none()
-        manager_url = row.value if row else "http://flamenco-manager:8080"
-
-        # Find all OrderLines dispatched to Flamenco that are still processing
-        lines = session.execute(
-            select(OrderLine).where(
-                OrderLine.render_backend_used == "flamenco",
-                OrderLine.render_status == "processing",
-                OrderLine.flamenco_job_id.isnot(None),
-            )
-        ).scalars().all()
-
-        if not lines:
-            engine.dispose()
-            return {"polled": 0}
-
-        client = get_flamenco_client(manager_url)
-        updated = 0
-
-        for line in lines:
-            try:
-                job = client.get_job(line.flamenco_job_id)
-                flamenco_status = job.get("status", "")
-                our_status = FLAMENCO_STATUS_MAP.get(flamenco_status, "processing")
-
-                if our_status == line.render_status:
-                    continue  # No change
-
-                updates = {"render_status": our_status}
-
-                if our_status == "completed":
-                    updates["render_completed_at"] = datetime.utcnow()
-                    # Try to extract result path from job activity
-                    activity = job.get("activity", "")
-                    if activity:
-                        updates["render_log"] = {
-                            "flamenco_job_id": line.flamenco_job_id,
-                            "flamenco_status": flamenco_status,
-                            "activity": activity,
-                        }
-                    # Set result path based on job type
-                    job_type = job.get("type", "")
-                    metadata = job.get("metadata", {})
-                    if job_type == "schaeffler-turntable":
-                        output_dir = job.get("settings", {}).get("output_dir", "")
-                        output_name = job.get("settings", {}).get("output_name", "turntable")
-                        updates["result_path"] = f"{output_dir}/{output_name}.mp4"
-                    elif job_type == "schaeffler-still":
-                        updates["result_path"] = job.get("settings", {}).get("output_path", "")
-
-                elif our_status == "failed":
-                    updates["render_completed_at"] = datetime.utcnow()
-                    updates["render_log"] = {
-                        "flamenco_job_id": line.flamenco_job_id,
-                        "flamenco_status": flamenco_status,
-                        "error": job.get("activity", "Job failed"),
-                    }
-
-                session.execute(
-                    sql_update(OrderLine)
-                    .where(OrderLine.id == line.id)
-                    .values(**updates)
-                )
-                updated += 1
-                logger.info(
-                    f"Flamenco job {line.flamenco_job_id}: "
-                    f"{flamenco_status} → render_status={our_status}"
-                )
-
-                # Track orders with lines that reached a terminal state
-                if our_status in ("completed", "failed"):
-                    completed_order_ids.add(str(line.order_id))
-
-            except Exception as exc:
-                logger.warning(
-                    f"Failed to poll Flamenco job {line.flamenco_job_id}: {exc}"
-                )
-
-        if updated:
-            session.commit()
-
-    engine.dispose()
-
-    # Auto-advance orders if all renderable lines are done
-    if completed_order_ids:
-        from app.services.order_status_service import check_order_completion
-        for oid in completed_order_ids:
-            check_order_completion(oid)
-
-    return {"polled": len(lines), "updated": updated}
-
-
-# ---------------------------------------------------------------------------
-# Stalled-render watchdog
-# ---------------------------------------------------------------------------
-
-@celery_app.task(name="app.tasks.flamenco_tasks.check_stalled_renders", queue="step_processing")
-def check_stalled_renders():
-    """Watchdog: detect and re-dispatch render jobs stuck in 'processing'.
-
-    Runs on a Celery Beat schedule (every 5 minutes).
-
-    After a docker restart, Celery workers lose in-flight tasks — the DB still
-    shows render_status='processing' indefinitely. This task:
-
-    * For **Celery** lines: uses Celery inspect to check whether any worker is
-      still actively executing the task. If not (e.g. after a restart), and
-      the job has been stuck longer than ``render_stall_timeout_minutes``
-      (default: 120 min), it is reset to 'pending' and re-dispatched.
-
-    * For **Flamenco** lines: queries the Flamenco Manager. If the manager
-      reports the job as still active the line is left alone; if the job is
-      gone or in a terminal/error state it is re-dispatched.
-    """
-    from sqlalchemy import create_engine, select, update as sql_update
-    from sqlalchemy.orm import Session
-    from app.config import settings as app_settings
-    from app.models.order_line import OrderLine
-    from app.models.system_setting import SystemSetting
-
-    sync_url = app_settings.database_url.replace("+asyncpg", "")
-    engine = create_engine(sync_url)
-
-    with Session(engine) as session:
-        # ── Read timeout from system settings ────────────────────────────────
-        row = session.execute(
-            select(SystemSetting).where(SystemSetting.key == "render_stall_timeout_minutes")
-        ).scalar_one_or_none()
-        try:
-            timeout_minutes = int(row.value) if row else 120
-        except (ValueError, TypeError):
-            timeout_minutes = 120
-
-        cutoff = datetime.utcnow() - timedelta(minutes=timeout_minutes)
-
-        stalled_lines = session.execute(
-            select(OrderLine).where(
-                OrderLine.render_status == "processing",
-                OrderLine.render_started_at.isnot(None),
-                OrderLine.render_started_at < cutoff,
-            )
-        ).scalars().all()
-
-        if not stalled_lines:
-            engine.dispose()
-            return {"checked": 0, "restarted": 0, "timeout_minutes": timeout_minutes}
-
-        logger.info(
-            "[watchdog] Found %d stalled render(s) older than %d minutes",
-            len(stalled_lines), timeout_minutes,
-        )
-
-        # ── Build set of order_line_ids actively running on Celery workers ───
-        active_celery_line_ids: set[str] = set()
-        inspect_ok = False
-        try:
-            inspect = celery_app.control.inspect(timeout=2)
-            active_tasks = inspect.active() or {}
-            for worker_tasks in active_tasks.values():
-                for task_info in (worker_tasks or []):
-                    args = task_info.get("args", [])
-                    if args:
-                        active_celery_line_ids.add(str(args[0]))
-            inspect_ok = True
-        except Exception as exc:
-            logger.warning(
-                "[watchdog] Celery inspect failed (%s) — will re-dispatch all timed-out Celery jobs",
-                exc,
-            )
-
-        # ── Load Flamenco Manager URL ─────────────────────────────────────────
-        manager_url = "http://flamenco-manager:8080"
-        try:
-            url_row = session.execute(
-                select(SystemSetting).where(SystemSetting.key == "flamenco_manager_url")
-            ).scalar_one_or_none()
-            if url_row:
-                manager_url = url_row.value
-        except Exception:
-            pass
-
-        # ── Decide which lines to restart ────────────────────────────────────
-        to_restart: list[OrderLine] = []
-
-        for line in stalled_lines:
-            line_id = str(line.id)
-
-            if line.flamenco_job_id:
-                # Flamenco job: verify with manager before re-dispatching
-                try:
-                    from app.services.flamenco_client import get_flamenco_client
-                    client = get_flamenco_client(manager_url)
-                    job = client.get_job(line.flamenco_job_id)
-                    flamenco_status = job.get("status", "")
-                    if flamenco_status in (
-                        "active", "queued", "paused",
-                        "pause-requested", "cancel-requested",
-                    ):
-                        logger.info(
-                            "[watchdog] Flamenco job %s is still %s — skipping line %s",
-                            line.flamenco_job_id, flamenco_status, line_id,
-                        )
-                        continue
-                    logger.info(
-                        "[watchdog] Flamenco job %s status=%r → re-dispatching line %s",
-                        line.flamenco_job_id, flamenco_status, line_id,
-                    )
-                except Exception as exc:
-                    # Manager unreachable — skip to avoid false restarts
-                    logger.warning(
-                        "[watchdog] Cannot reach Flamenco for job %s (%s) — skipping line %s",
-                        line.flamenco_job_id, exc, line_id,
-                    )
-                    continue
-            else:
-                # Celery job: skip if still actively running on a worker
-                if inspect_ok and line_id in active_celery_line_ids:
-                    logger.info(
-                        "[watchdog] Celery render for line %s still active — skipping", line_id
-                    )
-                    continue
-                logger.info(
-                    "[watchdog] Celery render for line %s not found in active tasks — re-dispatching",
-                    line_id,
-                )
-
-            to_restart.append(line)
-
-        if not to_restart:
-            engine.dispose()
-            return {
-                "checked": len(stalled_lines),
-                "restarted": 0,
-                "timeout_minutes": timeout_minutes,
-            }
-
-        # ── Reset stalled lines to pending ───────────────────────────────────
-        for line in to_restart:
-            session.execute(
-                sql_update(OrderLine)
-                .where(OrderLine.id == line.id)
-                .values(
-                    render_status="pending",
-                    render_started_at=None,
-                    render_backend_used=None,
-                    flamenco_job_id=None,
-                    render_log={
-                        "watchdog": (
-                            f"Auto-restarted after {timeout_minutes} min stall "
-                            f"(previous backend: {line.render_backend_used or 'unknown'})"
-                        )
-                    },
-                )
-            )
-        session.commit()
-
-    engine.dispose()
-
-    # ── Re-dispatch outside DB session ───────────────────────────────────────
-    from app.services.render_dispatcher import dispatch_render
-    restarted = 0
-    for line in to_restart:
-        try:
-            dispatch_render(str(line.id))
-            restarted += 1
-            logger.info("[watchdog] Re-dispatched render for order line %s", line.id)
-        except Exception as exc:
-            logger.error(
-                "[watchdog] Failed to re-dispatch line %s: %s — left as pending", line.id, exc
-            )
-
-    return {
-        "checked": len(stalled_lines),
-        "restarted": restarted,
-        "timeout_minutes": timeout_minutes,
-    }
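One pattern from the deleted module that stays useful for any beat-scheduled task is the atomic Redis SET NX EX lock the poller used to drain duplicate ticks. Extracted as a standalone sketch (illustrative key name, not code from this repo; redis-py's set(nx=True, ex=ttl) is the real API):

    import redis

    def won_tick_lock(redis_url: str, key: str = "beat_tick_lock", ttl: int = 9) -> bool:
        """Return True if this invocation won the current ttl-second window."""
        r = redis.from_url(redis_url)
        # SET key 1 NX EX ttl is atomic: exactly one caller per window gets
        # True; duplicates see the existing key and can return immediately.
        # The TTL guarantees a crashed holder cannot wedge the lock.
        return bool(r.set(key, "1", nx=True, ex=ttl))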