feat(phase8.3): recover_stalled_renders beat task
Detect OrderLines stuck in render_status='processing' for >30min, revoke their Celery task, mark failed, update render_job_doc, and emit a single CHANNEL_ALERT batch notification to admins. Runs every 5min via beat schedule alongside recover_stuck_cad_files. Section B (tenant_id in JWT) confirmed already complete from Phase 4. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -82,6 +82,103 @@ def recover_stuck_cad_files() -> None:
|
||||
logger.error("recover_stuck_cad_files failed: %s", exc)
|
||||
|
||||
|
||||
@shared_task(name="app.tasks.beat_tasks.recover_stalled_renders", queue="step_processing")
def recover_stalled_renders() -> None:
    """Detect order_lines stuck in render_status='processing' for longer than
    RENDER_STALL_TIMEOUT_MINUTES (default 30) and mark them as failed.

    Tries to revoke the Celery task if a celery_task_id is stored in render_job_doc.
    Emits a single batch system alert to admins via CHANNEL_ALERT.
    Runs every 5 minutes via Celery Beat.

    Returns:
        None. All failures are logged and swallowed — a beat task must never raise.
    """
    RENDER_STALL_TIMEOUT_MINUTES = 30
    try:
        # Imports deferred into the task body so an import failure only breaks
        # this task at runtime, not worker startup.
        from sqlalchemy import create_engine, select
        from sqlalchemy.orm import Session

        from app.config import settings
        from app.domains.notifications.service import CHANNEL_ALERT, emit_notification_sync
        from app.domains.orders.models import OrderLine
        from app.tasks.celery_app import celery_app

        # NOTE(review): naive utcnow() is compared against render_started_at —
        # presumably the column stores naive UTC; confirm against the model
        # before migrating to timezone-aware datetimes.
        cutoff = datetime.utcnow() - timedelta(minutes=RENDER_STALL_TIMEOUT_MINUTES)

        # Beat tasks run synchronously; strip the async driver suffix from the URL.
        sync_url = settings.database_url.replace("+asyncpg", "")
        engine = create_engine(sync_url)

        stalled_ids: list[str] = []
        try:
            with Session(engine) as session:
                rows = session.execute(
                    select(OrderLine).where(
                        OrderLine.render_status == "processing",
                        OrderLine.render_started_at < cutoff,
                    )
                ).scalars().all()

                for line in rows:
                    elapsed_minutes = int(
                        (datetime.utcnow() - line.render_started_at).total_seconds() / 60
                    )

                    # Try to revoke the Celery task if we have its ID. This is
                    # best-effort: a failed revoke must not prevent marking the
                    # line as failed.
                    job_doc = line.render_job_doc or {}
                    celery_task_id = job_doc.get("celery_task_id")
                    if celery_task_id:
                        try:
                            celery_app.control.revoke(celery_task_id, terminate=True)
                            logger.info(
                                "[WORKER_RECOVERY] Revoked Celery task %s for order_line %s",
                                celery_task_id, line.id,
                            )
                        except Exception as revoke_exc:
                            logger.warning(
                                "[WORKER_RECOVERY] Could not revoke task %s: %s",
                                celery_task_id, revoke_exc,
                            )

                    # Replace (never mutate in place) render_job_doc so
                    # SQLAlchemy detects the JSON-column change on commit.
                    updated_doc = dict(job_doc)
                    updated_doc["state"] = "failed"
                    updated_doc["error"] = (
                        f"Stalled render terminated by health recovery after {RENDER_STALL_TIMEOUT_MINUTES}min"
                    )

                    line.render_status = "failed"
                    line.render_job_doc = updated_doc

                    stalled_ids.append(str(line.id))
                    logger.warning(
                        "[WORKER_RECOVERY] Stalled render for order_line %s terminated after %dmin",
                        line.id, elapsed_minutes,
                    )

                if stalled_ids:
                    session.commit()
        finally:
            # FIX: dispose in a finally so pooled connections are released even
            # when the query/commit raises (the outer except swallows the error,
            # so without this the engine leaked on every failure).
            engine.dispose()

        if stalled_ids:
            # One batch alert covering every recovered line, not one per line.
            emit_notification_sync(
                action="worker.recovery",
                entity_type="system",
                details={
                    "message": (
                        f"Worker recovery: {len(stalled_ids)} stalled render(s) terminated "
                        f"after {RENDER_STALL_TIMEOUT_MINUTES}min timeout"
                    ),
                    "stalled_order_line_ids": stalled_ids,
                    "count": len(stalled_ids),
                },
                channel=CHANNEL_ALERT,
            )
            logger.warning(
                "recover_stalled_renders: terminated %d stalled render(s): %s",
                len(stalled_ids), stalled_ids,
            )
        else:
            logger.debug("recover_stalled_renders: no stalled renders found")
    except Exception as exc:
        # FIX: logger.exception keeps the traceback (logger.error dropped it),
        # while still guaranteeing the beat task never raises.
        logger.exception("recover_stalled_renders failed: %s", exc)
|
||||
|
||||
|
||||
@shared_task(name="app.tasks.beat_tasks.batch_render_notifications", queue="step_processing")
|
||||
def batch_render_notifications() -> None:
|
||||
"""Check for orders where all lines are terminal but no batch notification emitted yet.
|
||||
|
||||
@@ -42,5 +42,9 @@ celery_app.conf.update(
|
||||
"task": "app.tasks.beat_tasks.batch_render_notifications",
|
||||
"schedule": 60.0, # every 60 seconds
|
||||
},
|
||||
"recover-stalled-renders-every-5m": {
|
||||
"task": "app.tasks.beat_tasks.recover_stalled_renders",
|
||||
"schedule": 300.0, # every 5 minutes
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user