diff --git a/backend/app/tasks/beat_tasks.py b/backend/app/tasks/beat_tasks.py
index 9c38022..dbad854 100644
--- a/backend/app/tasks/beat_tasks.py
+++ b/backend/app/tasks/beat_tasks.py
@@ -82,6 +82,103 @@ def recover_stuck_cad_files() -> None:
         logger.error("recover_stuck_cad_files failed: %s", exc)
 
 
+@shared_task(name="app.tasks.beat_tasks.recover_stalled_renders", queue="step_processing")
+def recover_stalled_renders() -> None:
+    """Detect order_lines stuck in render_status='processing' for longer than
+    RENDER_STALL_TIMEOUT_MINUTES (default 30) and mark them as failed.
+
+    Tries to revoke the Celery task if a celery_task_id is stored in render_job_doc.
+    Emits a single batch system alert to admins via CHANNEL_ALERT.
+    Runs every 5 minutes via Celery Beat.
+    """
+    RENDER_STALL_TIMEOUT_MINUTES = 30
+    try:
+        from sqlalchemy import create_engine, select
+        from sqlalchemy.orm import Session
+        from app.config import settings
+        from app.domains.orders.models import OrderLine
+        from app.domains.notifications.service import emit_notification_sync, CHANNEL_ALERT
+        from app.tasks.celery_app import celery_app
+
+        cutoff = datetime.utcnow() - timedelta(minutes=RENDER_STALL_TIMEOUT_MINUTES)
+        sync_url = settings.database_url.replace("+asyncpg", "")
+        engine = create_engine(sync_url)
+
+        stalled_ids: list[str] = []
+
+        with Session(engine) as session:
+            rows = session.execute(
+                select(OrderLine).where(
+                    OrderLine.render_status == "processing",
+                    OrderLine.render_started_at < cutoff,
+                )
+            ).scalars().all()
+
+            for line in rows:
+                elapsed_minutes = int(
+                    (datetime.utcnow() - line.render_started_at).total_seconds() / 60
+                )
+                # Try to revoke the Celery task if we have its ID
+                job_doc = line.render_job_doc or {}
+                celery_task_id = job_doc.get("celery_task_id")
+                if celery_task_id:
+                    try:
+                        celery_app.control.revoke(celery_task_id, terminate=True)
+                        logger.info(
+                            "[WORKER_RECOVERY] Revoked Celery task %s for order_line %s",
+                            celery_task_id, line.id,
+                        )
+                    except Exception as revoke_exc:
+                        logger.warning(
+                            "[WORKER_RECOVERY] Could not revoke task %s: %s",
+                            celery_task_id, revoke_exc,
+                        )
+
+                # Update render_job_doc with failure info
+                updated_doc = dict(job_doc)
+                updated_doc["state"] = "failed"
+                updated_doc["error"] = (
+                    f"Stalled render terminated by health recovery after {RENDER_STALL_TIMEOUT_MINUTES}min"
+                )
+
+                line.render_status = "failed"
+                line.render_job_doc = updated_doc
+
+                stalled_ids.append(str(line.id))
+                logger.warning(
+                    "[WORKER_RECOVERY] Stalled render for order_line %s terminated after %dmin",
+                    line.id, elapsed_minutes,
+                )
+
+            if stalled_ids:
+                session.commit()
+
+        engine.dispose()
+
+        if stalled_ids:
+            emit_notification_sync(
+                action="worker.recovery",
+                entity_type="system",
+                details={
+                    "message": (
+                        f"Worker recovery: {len(stalled_ids)} stalled render(s) terminated "
+                        f"after {RENDER_STALL_TIMEOUT_MINUTES}min timeout"
+                    ),
+                    "stalled_order_line_ids": stalled_ids,
+                    "count": len(stalled_ids),
+                },
+                channel=CHANNEL_ALERT,
+            )
+            logger.warning(
+                "recover_stalled_renders: terminated %d stalled render(s): %s",
+                len(stalled_ids), stalled_ids,
+            )
+        else:
+            logger.debug("recover_stalled_renders: no stalled renders found")
+    except Exception as exc:
+        logger.error("recover_stalled_renders failed: %s", exc)
+
+
 @shared_task(name="app.tasks.beat_tasks.batch_render_notifications", queue="step_processing")
 def batch_render_notifications() -> None:
     """Check for orders where all lines are terminal but no batch notification emitted yet.
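Note: the revoke path above only does anything if the render task stored its own Celery task ID under render_job_doc["celery_task_id"] when it started. That producer side is not part of this diff; the following is a minimal sketch of what it could look like, assuming a hypothetical render task (the task name, module, and PK handling are placeholders, only the render_job_doc contract matters):

# Hypothetical producer-side sketch, NOT part of this changeset.
from datetime import datetime

from celery import shared_task
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from app.config import settings
from app.domains.orders.models import OrderLine


@shared_task(bind=True, name="app.tasks.render_tasks.render_order_line", queue="step_processing")
def render_order_line(self, order_line_id: str) -> None:
    # Same sync-engine pattern as the beat task: strip the asyncpg driver suffix.
    engine = create_engine(settings.database_url.replace("+asyncpg", ""))
    try:
        with Session(engine) as session:
            line = session.get(OrderLine, order_line_id)
            if line is None:
                return
            line.render_status = "processing"
            line.render_started_at = datetime.utcnow()
            doc = dict(line.render_job_doc or {})
            # self.request.id is this run's Celery task ID; recover_stalled_renders
            # reads render_job_doc["celery_task_id"] back to revoke a stalled run.
            doc["celery_task_id"] = self.request.id
            line.render_job_doc = doc
            session.commit()
            # ... actual rendering work would follow here ...
    finally:
        engine.dispose()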
diff --git a/backend/app/tasks/celery_app.py b/backend/app/tasks/celery_app.py
index 6ef0e9d..15a1c55 100644
--- a/backend/app/tasks/celery_app.py
+++ b/backend/app/tasks/celery_app.py
@@ -42,5 +42,9 @@ celery_app.conf.update(
             "task": "app.tasks.beat_tasks.batch_render_notifications",
             "schedule": 60.0,  # every 60 seconds
         },
+        "recover-stalled-renders-every-5m": {
+            "task": "app.tasks.beat_tasks.recover_stalled_renders",
+            "schedule": 300.0,  # every 5 minutes
+        },
     },
 )
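A quick local sanity check for the wiring (a sketch, not part of the patch): read the beat entry back from the app config and confirm the task is registered under the scheduled name.

# Verification sketch: run in a shell or drop into a small test.
from app.tasks.celery_app import celery_app
import app.tasks.beat_tasks  # noqa: F401  # ensures the task module is imported/registered

entry = celery_app.conf.beat_schedule["recover-stalled-renders-every-5m"]
assert entry["task"] == "app.tasks.beat_tasks.recover_stalled_renders"
assert entry["schedule"] == 300.0
assert "app.tasks.beat_tasks.recover_stalled_renders" in celery_app.tasks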