"""Celery Canvas workflow builder.

Translates WorkflowDefinition config into a Celery Canvas (chain/group/chord).
"""
from __future__ import annotations

import logging

from celery import chain, group

logger = logging.getLogger(__name__)

# Angles rendered by the "multi_angle" workflow when the caller supplies none.
_DEFAULT_ANGLES = (0, 45, 90)

# Keys the multi-angle builder consumes itself; they must not be forwarded via
# ``**kwargs`` or they would collide with the per-angle ``camera_angle``.
_MULTI_ANGLE_RESERVED_KEYS = frozenset({"angles", "camera_angle"})


def dispatch_workflow(
    workflow_type: str,
    order_line_id: str,
    params: dict | None = None,
) -> str:
    """Build and dispatch a Celery Canvas workflow.

    Args:
        workflow_type: Name of the workflow to run. Supported values are
            ``"still"``, ``"turntable"`` and ``"multi_angle"``.
        order_line_id: Passed as the first positional argument to every
            render task in the workflow.
        params: Extra keyword arguments forwarded to the render task(s).
            ``None`` (or an empty dict) means no extra arguments.

    Returns:
        The ``id`` of the result returned by ``apply_async()``, as a string.

    Raises:
        ValueError: If ``workflow_type`` is not one of the supported values.
    """
    params = params or {}

    # Built per call so the lookup always sees the current module-level
    # builder functions.
    builders = {
        "still": _build_still,
        "turntable": _build_turntable,
        "multi_angle": _build_multi_angle,
    }
    builder = builders.get(workflow_type)
    if builder is None:
        raise ValueError(f"Unknown workflow type: {workflow_type!r}")

    canvas = builder(order_line_id, params)
    result = canvas.apply_async()
    workflow_id = str(result.id)
    logger.info(
        "Dispatched %s workflow %s for order line %s",
        workflow_type,
        workflow_id,
        order_line_id,
    )
    return workflow_id


def _build_still(order_line_id: str, params: dict):
    """Return a chain containing a single immutable ``render_still_task``.

    ``params`` is forwarded to the task as keyword arguments.
    """
    # NOTE(review): imported lazily, presumably to avoid a circular import
    # with the tasks module -- confirm before hoisting to module level.
    from app.domains.rendering.tasks import render_still_task

    return chain(
        render_still_task.si(order_line_id, **params),
    )


def _build_turntable(order_line_id: str, params: dict):
    """Return a chain containing a single immutable ``render_turntable_task``.

    ``params`` is forwarded to the task as keyword arguments.
    """
    # NOTE(review): imported lazily, presumably to avoid a circular import
    # with the tasks module -- confirm before hoisting to module level.
    from app.domains.rendering.tasks import render_turntable_task

    return chain(
        render_turntable_task.si(order_line_id, **params),
    )


def _build_multi_angle(order_line_id: str, params: dict):
    """Return a group with one immutable ``render_still_task`` per angle.

    Angles come from ``params["angles"]``; when that key is missing or
    ``None`` the defaults ``(0, 45, 90)`` are used. Every other entry of
    ``params`` is forwarded to each task, except ``camera_angle``, which is
    set per task from the angle list.
    """
    # NOTE(review): imported lazily, presumably to avoid a circular import
    # with the tasks module -- confirm before hoisting to module level.
    from app.domains.rendering.tasks import render_still_task

    angles = params.get("angles")
    if angles is None:
        # An explicit ``"angles": None`` means "not specified", same as absent.
        angles = _DEFAULT_ANGLES

    # Drop the keys handled here so ``camera_angle=angle`` below cannot clash
    # with a caller-supplied ``camera_angle`` (which would raise TypeError).
    shared_kwargs = {
        key: value
        for key, value in params.items()
        if key not in _MULTI_ANGLE_RESERVED_KEYS
    }
    return group(
        render_still_task.si(order_line_id, camera_angle=angle, **shared_kwargs)
        for angle in angles
    )