"""Execute a validated WorkflowDefinition config as individual Celery tasks. Usage:: from app.domains.rendering.workflow_executor import dispatch_workflow task_ids = dispatch_workflow(workflow_definition.config, context_id=str(order_line_id)) The function: 1. Validates the raw config dict via WorkflowConfig. 2. Sorts nodes into dependency order (topological sort, Kahn's algorithm). 3. Maps each StepName to the corresponding Celery task name string. 4. Sends each task independently (not chained) so that every node gets its own Celery task ID and can be tracked individually. Nodes whose StepName has no mapping in STEP_TASK_MAP are skipped with a warning — this lets you add new StepName values to the enum without breaking existing dispatchers. """ import logging from collections import deque from app.domains.rendering.workflow_schema import WorkflowConfig, WorkflowNode from app.core.process_steps import StepName logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Step → Celery task name mapping # # Values are the full dotted task name strings passed to celery_app.send_task(). # Keep this in sync with the @celery_app.task(name=...) decorators in the # various tasks modules. 
# ---------------------------------------------------------------------------
STEP_TASK_MAP: dict[StepName, str] = {
    # ── STEP file processing ─────────────────────────────────────────────
    StepName.RESOLVE_STEP_PATH: "app.tasks.step_tasks.process_step_file",
    StepName.OCC_OBJECT_EXTRACT: "app.tasks.step_tasks.process_step_file",
    StepName.OCC_GLB_EXPORT: "app.tasks.step_tasks.generate_gltf_geometry_task",
    StepName.STL_CACHE_GENERATE: "app.tasks.step_tasks.process_step_file",
    # ── Thumbnail generation ─────────────────────────────────────────────
    StepName.BLENDER_RENDER: "app.tasks.step_tasks.render_step_thumbnail",
    StepName.THUMBNAIL_SAVE: "app.tasks.step_tasks.render_step_thumbnail",
    # ── Order line stills & turntables ──────────────────────────────────
    StepName.BLENDER_STILL: "app.domains.rendering.tasks.render_order_line_still_task",
    StepName.BLENDER_TURNTABLE: "app.domains.rendering.tasks.render_turntable_task",
    # ── GLB / asset export ───────────────────────────────────────────────
    StepName.EXPORT_GLB_GEOMETRY: "app.domains.rendering.tasks.export_gltf_for_order_line_task",
    StepName.EXPORT_BLEND: "app.domains.rendering.tasks.export_blend_for_order_line_task",
    # ── Steps without a dedicated standalone task (no mapping) ───────────
    # StepName.GLB_BBOX — computed inline inside process_step_file
    # StepName.MATERIAL_MAP_RESOLVE — computed inline inside render tasks
    # StepName.AUTO_POPULATE_MATERIALS — computed inline inside process_step_file
    # StepName.THREEJS_RENDER — no standalone task exists yet
    # StepName.ORDER_LINE_SETUP — computed inline inside render_order_line_task
    # StepName.RESOLVE_TEMPLATE — computed inline inside render_order_line_task
    # StepName.OUTPUT_SAVE — handled via publish_asset after render tasks
    # StepName.NOTIFY — emitted inline via notification_service
}


def dispatch_workflow(workflow_config: dict, context_id: str) -> list[str]:
    """Execute workflow nodes in topological order as individual Celery tasks.

    Args:
        workflow_config: Raw dict from WorkflowDefinition.config JSONB.
        context_id: UUID string of the entity being processed. Depending on
            the StepName, this is interpreted as a ``cad_file_id`` (for STEP
            processing / thumbnail tasks) or an ``order_line_id`` (for render
            tasks).

    Returns:
        List of dispatched Celery task ID strings (one per mapped node).
        Nodes whose step has no entry in STEP_TASK_MAP are skipped with a
        warning and contribute no task ID.

    Raises:
        pydantic.ValidationError: if *workflow_config* does not conform to
            WorkflowConfig schema.
        ValueError: if the node graph contains a cycle or an edge references
            an unknown node id.
    """
    # Imported lazily to avoid a circular import at module load time.
    from app.tasks.celery_app import celery_app

    # Validate + parse config
    config = WorkflowConfig.model_validate(workflow_config)

    # Topological sort so that dependency nodes dispatch first. Note the
    # tasks are sent independently (not chained) — ordering here only
    # controls dispatch order, not execution completion order.
    ordered_nodes = _topological_sort(config)

    task_ids: list[str] = []
    for node in ordered_nodes:
        task_name = STEP_TASK_MAP.get(node.step)
        if task_name is None:
            # Unmapped steps are tolerated so new StepName values can be
            # added to the enum without breaking existing dispatchers.
            logger.warning(
                "[WORKFLOW] No Celery task mapping for step %r — skipping node %r",
                node.step,
                node.id,
            )
            continue

        result = celery_app.send_task(task_name, args=[context_id], kwargs=node.params)
        task_ids.append(result.id)
        logger.info(
            "[WORKFLOW] Dispatched node %r (step=%s) → Celery task %s",
            node.id,
            node.step,
            result.id,
        )

    logger.info(
        "[WORKFLOW] dispatch_workflow complete: %d task(s) dispatched for context %s",
        len(task_ids),
        context_id,
    )
    return task_ids


# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------


def _topological_sort(config: WorkflowConfig) -> list[WorkflowNode]:
    """Return nodes in dependency order using Kahn's algorithm.

    Nodes with no incoming edges are processed first. If the graph contains
    a cycle, or an edge references a node id not present in ``config.nodes``,
    a ``ValueError`` is raised (the caller should treat this as a bad
    workflow config).
    """
    node_map: dict[str, WorkflowNode] = {n.id: n for n in config.nodes}
    in_degree: dict[str, int] = {n.id: 0 for n in config.nodes}
    adjacency: dict[str, list[str]] = {n.id: [] for n in config.nodes}

    for edge in config.edges:
        # Fix: a dangling edge used to surface as an opaque KeyError on the
        # dict lookups below; raise the documented ValueError instead so
        # callers see a single, consistent bad-config failure mode.
        if edge.from_node not in node_map or edge.to_node not in node_map:
            raise ValueError(
                "Workflow config edge references unknown node id: "
                f"{edge.from_node!r} -> {edge.to_node!r}."
            )
        adjacency[edge.from_node].append(edge.to_node)
        in_degree[edge.to_node] += 1

    # Seed the queue with all roots (no incoming edges).
    queue: deque[str] = deque(nid for nid, deg in in_degree.items() if deg == 0)
    result: list[WorkflowNode] = []

    while queue:
        nid = queue.popleft()
        result.append(node_map[nid])
        for neighbor in adjacency[nid]:
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                queue.append(neighbor)

    # Any node never reaching in-degree 0 implies a cycle.
    if len(result) != len(config.nodes):
        raise ValueError(
            "Workflow config contains a cycle — topological sort failed. "
            f"Processed {len(result)}/{len(config.nodes)} nodes."
        )

    return result