ec667dd56a
- Remove export_gltf.py (Blender-based GLB export replaced by OCC direct) - Remove unused export_gltf_for_order_line_task - Add Ultra tessellation preset to Admin settings - Improve tessellation preset descriptions and styling - Minor cleanup across media, rendering, and workflow modules Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
153 lines
6.6 KiB
Python
153 lines
6.6 KiB
Python
"""Execute a validated WorkflowDefinition config as individual Celery tasks.
|
|
|
|
Usage::
|
|
|
|
from app.domains.rendering.workflow_executor import dispatch_workflow
|
|
|
|
task_ids = dispatch_workflow(workflow_definition.config, context_id=str(order_line_id))
|
|
|
|
The function:
|
|
1. Validates the raw config dict via WorkflowConfig.
|
|
2. Sorts nodes into dependency order (topological sort, Kahn's algorithm).
|
|
3. Maps each StepName to the corresponding Celery task name string.
|
|
4. Sends each task independently (not chained) so that every node gets its own
|
|
Celery task ID and can be tracked individually.
|
|
|
|
Nodes whose StepName has no mapping in STEP_TASK_MAP are skipped with a
|
|
warning — this lets you add new StepName values to the enum without breaking
|
|
existing dispatchers.
|
|
"""
|
|
import logging
|
|
from collections import deque
|
|
|
|
from app.domains.rendering.workflow_schema import WorkflowConfig, WorkflowNode
|
|
from app.core.process_steps import StepName
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Step → Celery task name mapping
#
# Values are the full dotted task name strings passed to celery_app.send_task().
# Keep this in sync with the @celery_app.task(name=...) decorators in the
# various tasks modules.
#
# Several StepName values intentionally map to the same task: that task
# performs multiple steps internally, so dispatching any of them runs the
# whole pipeline stage.
# ---------------------------------------------------------------------------
STEP_TASK_MAP: dict[StepName, str] = {
    # ── STEP file processing ─────────────────────────────────────────────
    StepName.RESOLVE_STEP_PATH: "app.tasks.step_tasks.process_step_file",
    StepName.OCC_OBJECT_EXTRACT: "app.tasks.step_tasks.process_step_file",
    StepName.OCC_GLB_EXPORT: "app.tasks.step_tasks.generate_gltf_geometry_task",
    StepName.STL_CACHE_GENERATE: "app.tasks.step_tasks.process_step_file",
    # ── Thumbnail generation ─────────────────────────────────────────────
    StepName.BLENDER_RENDER: "app.tasks.step_tasks.render_step_thumbnail",
    StepName.THUMBNAIL_SAVE: "app.tasks.step_tasks.render_step_thumbnail",
    # ── Order line stills & turntables ──────────────────────────────────
    StepName.BLENDER_STILL: "app.domains.rendering.tasks.render_order_line_still_task",
    StepName.BLENDER_TURNTABLE: "app.domains.rendering.tasks.render_turntable_task",
    # ── Asset export ─────────────────────────────────────────────────────
    StepName.EXPORT_BLEND: "app.domains.rendering.tasks.export_blend_for_order_line_task",
    # ── Steps without a dedicated standalone task (no mapping) ───────────
    # Unmapped steps are skipped with a warning by dispatch_workflow.
    # StepName.GLB_BBOX — computed inline inside process_step_file
    # StepName.MATERIAL_MAP_RESOLVE — computed inline inside render tasks
    # StepName.AUTO_POPULATE_MATERIALS — computed inline inside process_step_file
    # StepName.THREEJS_RENDER — no standalone task exists yet
    # StepName.ORDER_LINE_SETUP — computed inline inside render_order_line_task
    # StepName.RESOLVE_TEMPLATE — computed inline inside render_order_line_task
    # StepName.OUTPUT_SAVE — handled via publish_asset after render tasks
    # StepName.NOTIFY — emitted inline via notification_service
}
|
|
|
|
|
|
def dispatch_workflow(workflow_config: dict, context_id: str) -> list[str]:
    """Dispatch every mapped workflow node as its own Celery task.

    The raw config is first validated against the WorkflowConfig schema,
    then the node graph is linearised with a topological sort so that
    dependency nodes are sent before their dependents. Each node is
    dispatched independently (no Celery chain), giving every node its own
    task ID for individual tracking.

    Args:
        workflow_config: Raw dict from WorkflowDefinition.config JSONB.
        context_id: UUID string of the entity being processed. Depending on
            the StepName, this is interpreted as a ``cad_file_id`` (for STEP
            processing / thumbnail tasks) or an ``order_line_id`` (for render
            tasks).

    Returns:
        List of dispatched Celery task ID strings (one per mapped node).

    Raises:
        pydantic.ValidationError: if *workflow_config* does not conform to
            WorkflowConfig schema.
        ValueError: if the node graph contains a cycle.
    """
    # Imported lazily to avoid a circular import at module load time.
    from app.tasks.celery_app import celery_app

    parsed = WorkflowConfig.model_validate(workflow_config)

    dispatched: list[str] = []
    # _topological_sort raises ValueError on cyclic graphs.
    for node in _topological_sort(parsed):
        celery_task_name = STEP_TASK_MAP.get(node.step)
        if celery_task_name is not None:
            async_result = celery_app.send_task(
                celery_task_name, args=[context_id], kwargs=node.params
            )
            dispatched.append(async_result.id)
            logger.info(
                "[WORKFLOW] Dispatched node %r (step=%s) → Celery task %s",
                node.id,
                node.step,
                async_result.id,
            )
        else:
            # Unmapped StepName values are tolerated so new enum members can
            # be added without breaking existing dispatchers.
            logger.warning(
                "[WORKFLOW] No Celery task mapping for step %r — skipping node %r",
                node.step,
                node.id,
            )

    logger.info(
        "[WORKFLOW] dispatch_workflow complete: %d task(s) dispatched for context %s",
        len(dispatched),
        context_id,
    )
    return dispatched
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Internal helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _topological_sort(config: WorkflowConfig) -> list[WorkflowNode]:
    """Return nodes in dependency order using Kahn's algorithm.

    Nodes with no incoming edges are processed first; insertion order keeps
    the result deterministic for a given config.

    Args:
        config: Validated workflow graph (``nodes`` + ``edges``).

    Returns:
        The workflow nodes sorted so that every node appears after all of
        its dependencies.

    Raises:
        ValueError: if an edge references a node id that does not exist, or
            the graph contains a cycle (the caller should treat either as a
            bad workflow config).
    """
    node_map: dict[str, WorkflowNode] = {n.id: n for n in config.nodes}
    in_degree: dict[str, int] = {n.id: 0 for n in config.nodes}
    adjacency: dict[str, list[str]] = {n.id: [] for n in config.nodes}

    for edge in config.edges:
        # Fail fast with a descriptive error instead of a bare KeyError when
        # the config wires an edge to a node id that was never declared.
        if edge.from_node not in node_map or edge.to_node not in node_map:
            raise ValueError(
                f"Workflow edge {edge.from_node!r} → {edge.to_node!r} "
                "references an unknown node id."
            )
        adjacency[edge.from_node].append(edge.to_node)
        in_degree[edge.to_node] += 1

    # Seed the queue with all root nodes (no incoming edges).
    queue: deque[str] = deque(nid for nid, deg in in_degree.items() if deg == 0)
    result: list[WorkflowNode] = []

    while queue:
        nid = queue.popleft()
        result.append(node_map[nid])
        for neighbor in adjacency[nid]:
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                queue.append(neighbor)

    # Any node never reaching in-degree 0 means the graph has a cycle.
    if len(result) != len(config.nodes):
        raise ValueError(
            "Workflow config contains a cycle — topological sort failed. "
            f"Processed {len(result)}/{len(config.nodes)} nodes."
        )

    return result
|