feat: add duplicate-safe workflow shadow dispatch
This commit is contained in:
@@ -15,30 +15,86 @@ from app.core.task_logs import log_task_event
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _update_workflow_run_status(order_line_id: str, status: str, error: str | None = None) -> None:
|
||||
"""Update the most recent WorkflowRun for an order_line after task completion."""
|
||||
def _update_workflow_run_status(
|
||||
order_line_id: str,
|
||||
status: str,
|
||||
error: str | None = None,
|
||||
*,
|
||||
workflow_run_id: str | None = None,
|
||||
workflow_node_id: str | None = None,
|
||||
) -> None:
|
||||
"""Update WorkflowRun / WorkflowNodeResult state after task completion."""
|
||||
try:
|
||||
import asyncio
|
||||
import uuid
|
||||
from datetime import datetime as _dt
|
||||
|
||||
async def _run():
|
||||
from app.database import AsyncSessionLocal
|
||||
from app.domains.rendering.models import WorkflowRun
|
||||
from app.domains.rendering.models import WorkflowNodeResult, WorkflowRun
|
||||
from sqlalchemy import select as _sel
|
||||
|
||||
async with AsyncSessionLocal() as db:
|
||||
res = await db.execute(
|
||||
_sel(WorkflowRun)
|
||||
.where(WorkflowRun.order_line_id == order_line_id)
|
||||
.order_by(WorkflowRun.created_at.desc())
|
||||
.limit(1)
|
||||
run = None
|
||||
if workflow_run_id:
|
||||
try:
|
||||
resolved_run_id = uuid.UUID(str(workflow_run_id))
|
||||
except (TypeError, ValueError):
|
||||
resolved_run_id = workflow_run_id
|
||||
run_res = await db.execute(
|
||||
_sel(WorkflowRun).where(WorkflowRun.id == resolved_run_id)
|
||||
)
|
||||
run = run_res.scalar_one_or_none()
|
||||
else:
|
||||
res = await db.execute(
|
||||
_sel(WorkflowRun)
|
||||
.where(WorkflowRun.order_line_id == order_line_id)
|
||||
.order_by(WorkflowRun.created_at.desc())
|
||||
.limit(1)
|
||||
)
|
||||
run = res.scalar_one_or_none()
|
||||
|
||||
if run is None:
|
||||
return
|
||||
|
||||
if workflow_node_id:
|
||||
node_res = await db.execute(
|
||||
_sel(WorkflowNodeResult).where(
|
||||
WorkflowNodeResult.run_id == run.id,
|
||||
WorkflowNodeResult.node_name == workflow_node_id,
|
||||
)
|
||||
)
|
||||
node_result = node_res.scalar_one_or_none()
|
||||
if node_result is not None:
|
||||
metadata = dict(node_result.output or {})
|
||||
if error:
|
||||
metadata["last_error"] = error[:2000]
|
||||
node_result.status = status
|
||||
node_result.log = error[:2000] if error else None
|
||||
node_result.output = metadata
|
||||
|
||||
node_results_res = await db.execute(
|
||||
_sel(WorkflowNodeResult).where(WorkflowNodeResult.run_id == run.id)
|
||||
)
|
||||
run = res.scalar_one_or_none()
|
||||
if run and run.status == "pending":
|
||||
run.status = status
|
||||
node_results = list(node_results_res.scalars().all())
|
||||
|
||||
if any(node.status == "failed" for node in node_results):
|
||||
run.status = "failed"
|
||||
run.completed_at = _dt.utcnow()
|
||||
if error:
|
||||
run.error_message = error[:2000]
|
||||
await db.commit()
|
||||
elif any(node.status in {"pending", "queued", "running", "retrying"} for node in node_results):
|
||||
run.status = "pending"
|
||||
run.completed_at = None
|
||||
if status != "failed":
|
||||
run.error_message = None
|
||||
else:
|
||||
run.status = status
|
||||
run.completed_at = _dt.utcnow()
|
||||
if status != "failed":
|
||||
run.error_message = None
|
||||
|
||||
await db.commit()
|
||||
|
||||
asyncio.get_event_loop().run_until_complete(_run())
|
||||
except Exception as _exc:
|
||||
@@ -315,6 +371,7 @@ def publish_asset(
|
||||
asset_type: str,
|
||||
storage_key: str,
|
||||
render_config: dict | None = None,
|
||||
workflow_run_id: str | None = None,
|
||||
) -> str | None:
|
||||
"""Create a MediaAsset record after a successful render."""
|
||||
import asyncio
|
||||
@@ -345,6 +402,7 @@ def publish_asset(
|
||||
order_line_id=line.id,
|
||||
product_id=line.product_id,
|
||||
cad_file_id=cad_file_id,
|
||||
workflow_run_id=workflow_run_id,
|
||||
asset_type=MediaAssetType(asset_type),
|
||||
storage_key=storage_key,
|
||||
render_config=render_config,
|
||||
@@ -404,6 +462,13 @@ def render_order_line_still_task(self, order_line_id: str, **params) -> dict:
|
||||
from app.domains.rendering.job_document import RenderJobDocument, JobState
|
||||
from app.core.process_steps import StepName
|
||||
|
||||
workflow_run_id = params.pop("workflow_run_id", None)
|
||||
workflow_node_id = params.pop("workflow_node_id", None)
|
||||
publish_asset_enabled = bool(params.pop("publish_asset_enabled", True))
|
||||
emit_events = bool(params.pop("emit_events", True))
|
||||
job_document_enabled = bool(params.pop("job_document_enabled", True))
|
||||
output_name_suffix = params.pop("output_name_suffix", None)
|
||||
|
||||
log_task_event(self.request.id, f"Starting render_order_line_still_task: order_line={order_line_id}", "info")
|
||||
|
||||
# Initialise job document and store real Celery task ID
|
||||
@@ -422,6 +487,8 @@ def render_order_line_still_task(self, order_line_id: str, **params) -> dict:
|
||||
.values(render_job_doc=job_doc.to_dict())
|
||||
)
|
||||
await db.commit()
|
||||
if not job_document_enabled:
|
||||
return
|
||||
try:
|
||||
asyncio.get_event_loop().run_until_complete(_run())
|
||||
except Exception as _exc:
|
||||
@@ -445,7 +512,10 @@ def render_order_line_still_task(self, order_line_id: str, **params) -> dict:
|
||||
step = Path(step_path_str)
|
||||
output_dir = step.parent / "renders"
|
||||
output_dir.mkdir(parents=True, exist_ok=True)
|
||||
output_path = output_dir / f"line_{order_line_id}.png"
|
||||
output_filename = f"line_{order_line_id}.png"
|
||||
if output_name_suffix:
|
||||
output_filename = f"line_{order_line_id}_{output_name_suffix}.png"
|
||||
output_path = output_dir / output_filename
|
||||
|
||||
try:
|
||||
job_doc.begin_step(StepName.BLENDER_STILL)
|
||||
@@ -466,12 +536,14 @@ def render_order_line_still_task(self, order_line_id: str, **params) -> dict:
|
||||
})
|
||||
_save_job_doc()
|
||||
|
||||
publish_asset.delay(
|
||||
order_line_id,
|
||||
"still",
|
||||
str(output_path),
|
||||
render_config=result,
|
||||
)
|
||||
if publish_asset_enabled:
|
||||
publish_asset.delay(
|
||||
order_line_id,
|
||||
"still",
|
||||
str(output_path),
|
||||
render_config=result,
|
||||
workflow_run_id=workflow_run_id,
|
||||
)
|
||||
log_task_event(self.request.id, f"Completed successfully in {result.get('total_duration_s', 0):.1f}s", "done")
|
||||
logger.info(
|
||||
"render_order_line_still_task completed for line %s in %.1fs",
|
||||
@@ -479,13 +551,19 @@ def render_order_line_still_task(self, order_line_id: str, **params) -> dict:
|
||||
)
|
||||
try:
|
||||
from app.core.websocket import publish_event_sync
|
||||
publish_event_sync(None, {
|
||||
"type": "render.order_line.completed",
|
||||
"order_line_id": order_line_id,
|
||||
})
|
||||
if emit_events:
|
||||
publish_event_sync(None, {
|
||||
"type": "render.order_line.completed",
|
||||
"order_line_id": order_line_id,
|
||||
})
|
||||
except Exception:
|
||||
pass
|
||||
_update_workflow_run_status(order_line_id, "completed")
|
||||
_update_workflow_run_status(
|
||||
order_line_id,
|
||||
"completed",
|
||||
workflow_run_id=workflow_run_id,
|
||||
workflow_node_id=workflow_node_id,
|
||||
)
|
||||
return result
|
||||
except Exception as exc:
|
||||
job_doc.fail_step(StepName.BLENDER_STILL, str(exc))
|
||||
@@ -495,14 +573,21 @@ def render_order_line_still_task(self, order_line_id: str, **params) -> dict:
|
||||
logger.error("render_order_line_still_task failed for %s: %s", order_line_id, exc)
|
||||
try:
|
||||
from app.core.websocket import publish_event_sync
|
||||
publish_event_sync(None, {
|
||||
"type": "render.order_line.failed",
|
||||
"order_line_id": order_line_id,
|
||||
"error": str(exc),
|
||||
})
|
||||
if emit_events:
|
||||
publish_event_sync(None, {
|
||||
"type": "render.order_line.failed",
|
||||
"order_line_id": order_line_id,
|
||||
"error": str(exc),
|
||||
})
|
||||
except Exception:
|
||||
pass
|
||||
_update_workflow_run_status(order_line_id, "failed", str(exc))
|
||||
_update_workflow_run_status(
|
||||
order_line_id,
|
||||
"failed",
|
||||
str(exc),
|
||||
workflow_run_id=workflow_run_id,
|
||||
workflow_node_id=workflow_node_id,
|
||||
)
|
||||
raise self.retry(exc=exc, countdown=30)
|
||||
|
||||
|
||||
@@ -512,7 +597,15 @@ def render_order_line_still_task(self, order_line_id: str, **params) -> dict:
|
||||
queue="asset_pipeline",
|
||||
max_retries=1,
|
||||
)
|
||||
def export_blend_for_order_line_task(self, order_line_id: str) -> dict:
|
||||
def export_blend_for_order_line_task(
|
||||
self,
|
||||
order_line_id: str,
|
||||
workflow_run_id: str | None = None,
|
||||
workflow_node_id: str | None = None,
|
||||
publish_asset_enabled: bool = True,
|
||||
output_name_suffix: str | None = None,
|
||||
**_kwargs,
|
||||
) -> dict:
|
||||
"""Export a production .blend file via Blender + asset library (export_blend.py).
|
||||
|
||||
Publishes a MediaAsset with asset_type='blend_production'.
|
||||
@@ -542,7 +635,10 @@ def export_blend_for_order_line_task(self, order_line_id: str) -> dict:
|
||||
if occ_res.returncode != 0:
|
||||
raise RuntimeError(f"GLB generation failed:\n{occ_res.stderr[-500:]}")
|
||||
|
||||
output_path = step.parent / f"{step.stem}_production.blend"
|
||||
output_name = f"{step.stem}_production.blend"
|
||||
if output_name_suffix:
|
||||
output_name = f"{step.stem}_production_{output_name_suffix}.blend"
|
||||
output_path = step.parent / output_name
|
||||
scripts_dir = Path(os.environ.get("RENDER_SCRIPTS_DIR", "/render-scripts"))
|
||||
export_script = scripts_dir / "export_blend.py"
|
||||
|
||||
@@ -589,11 +685,30 @@ def export_blend_for_order_line_task(self, order_line_id: str) -> dict:
|
||||
raise RuntimeError(
|
||||
f"export_blend.py exited {result.returncode}:\n{result.stderr[-500:]}"
|
||||
)
|
||||
publish_asset.delay(order_line_id, "blend_production", str(output_path))
|
||||
if publish_asset_enabled:
|
||||
publish_asset.delay(
|
||||
order_line_id,
|
||||
"blend_production",
|
||||
str(output_path),
|
||||
workflow_run_id=workflow_run_id,
|
||||
)
|
||||
logger.info("export_blend_for_order_line_task completed: %s", output_path.name)
|
||||
_update_workflow_run_status(
|
||||
order_line_id,
|
||||
"completed",
|
||||
workflow_run_id=workflow_run_id,
|
||||
workflow_node_id=workflow_node_id,
|
||||
)
|
||||
return {"blend_path": str(output_path)}
|
||||
except Exception as exc:
|
||||
logger.error("export_blend_for_order_line_task failed for %s: %s", order_line_id, exc)
|
||||
_update_workflow_run_status(
|
||||
order_line_id,
|
||||
"failed",
|
||||
str(exc),
|
||||
workflow_run_id=workflow_run_id,
|
||||
workflow_node_id=workflow_node_id,
|
||||
)
|
||||
raise self.retry(exc=exc, countdown=30)
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user