Files
HartOMat/backend/app/domains/rendering/workflow_schema.py
T
Hartmut b41e70cdad feat(phase7.3-ext): workflow executor + config validation
- workflow_schema.py: WorkflowConfig Pydantic model validates all node
  step values against StepName enum, edges reference declared node IDs,
  unique node IDs enforced; WorkflowEdge uses "from"/"to" aliases
- workflow_executor.py: dispatch_workflow() validates config, topological-
  sorts nodes (Kahn's algorithm, raises on cycles), maps StepName →
  Celery task name via STEP_TASK_MAP (all 15 StepName values covered),
  dispatches via celery_app.send_task()
- workflow_router.py: config validation on POST/PUT (422 on invalid);
  POST /{id}/dispatch?context_id= endpoint (PM+) dispatches workflow
  steps as Celery tasks for a given entity

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-08 20:41:26 +01:00

76 lines
2.2 KiB
Python

"""Pydantic schema for validated WorkflowDefinition.config JSONB.

A workflow config is a versioned DAG description stored as JSONB. Before
being dispatched (or saved), the raw dict must pass this schema.

Example config::

    {
        "version": 1,
        "nodes": [
            {"id": "n1", "step": "resolve_step_path", "params": {}},
            {"id": "n2", "step": "blender_still", "params": {"engine": "cycles"}}
        ],
        "edges": [
            {"from": "n1", "to": "n2"}
        ]
    }
"""
from graphlib import CycleError, TopologicalSorter

from pydantic import BaseModel, Field, field_validator, model_validator

from app.core.process_steps import StepName
class WorkflowNode(BaseModel):
    """A single step in a workflow graph.

    ``step`` is validated against the ``StepName`` StrEnum, so a config that
    names an unknown step fails validation.
    """

    # Node identifier; edges refer to nodes by this value.
    id: str
    step: StepName  # validated against the StepName StrEnum
    # Step-specific parameters; their contents are not validated here.
    # default_factory makes the per-instance empty dict explicit instead of
    # relying on Pydantic copying a mutable literal default.
    params: dict = Field(default_factory=dict)
class WorkflowEdge(BaseModel):
    """Directed edge between two workflow nodes.

    In config JSON an edge is written as ``{"from": <node id>, "to": <node id>}``.
    ``from`` is a reserved word in Python, so the values are stored on the
    attributes ``from_node`` / ``to_node`` via field aliases. With
    ``populate_by_name`` enabled, input may use either the alias or the
    attribute name.
    """

    model_config = {"populate_by_name": True}

    from_node: str = Field(..., alias="from")
    to_node: str = Field(..., alias="to")
class WorkflowConfig(BaseModel):
    """Validated workflow definition: a versioned graph of step nodes.

    ``nodes`` lists the steps and ``edges`` declares directed ``from`` -> ``to``
    links between node ids. Each rule below raises ``ValueError``, which
    Pydantic reports as a ``ValidationError``:

    * ``nodes`` must not be empty;
    * every edge endpoint must be a declared node id;
    * node ids must be unique;
    * the edges must not contain a cycle (a self-loop counts), so any config
      that passes is a DAG and can be topologically sorted.
    """

    # Schema version of the config; stored as-is, not checked here.
    version: int = 1
    nodes: list[WorkflowNode]
    # default_factory makes the per-instance "no edges" default explicit.
    edges: list[WorkflowEdge] = Field(default_factory=list)

    @field_validator("nodes")
    @classmethod
    def nodes_not_empty(cls, v: list[WorkflowNode]) -> list[WorkflowNode]:
        """Reject a workflow that declares no nodes."""
        if not v:
            raise ValueError("workflow must have at least one node")
        return v

    @field_validator("edges")
    @classmethod
    def edges_reference_valid_nodes(
        cls, edges: list[WorkflowEdge], info
    ) -> list[WorkflowEdge]:
        """Reject edges whose ``from`` or ``to`` is not a declared node id."""
        # info.data only contains fields that validated successfully. If
        # ``nodes`` failed, its error is already reported, so skip the
        # cross-check rather than emit a misleading second error.
        if "nodes" in info.data:
            node_ids = {n.id for n in info.data["nodes"]}
            for edge in edges:
                # Check ``from`` before ``to`` so the first bad endpoint is
                # the one reported.
                for endpoint in (edge.from_node, edge.to_node):
                    if endpoint not in node_ids:
                        raise ValueError(
                            f"edge references unknown node id: {endpoint!r}"
                        )
        return edges

    @model_validator(mode="after")
    def node_ids_are_unique(self) -> "WorkflowConfig":
        """Reject configs in which two nodes share the same id."""
        seen: set[str] = set()
        for node in self.nodes:
            if node.id in seen:
                raise ValueError(f"duplicate node id: {node.id!r}")
            seen.add(node.id)
        return self

    @model_validator(mode="after")
    def edges_form_a_dag(self) -> "WorkflowConfig":
        """Reject configs whose edges contain a cycle, including self-loops.

        An "after" model validator only runs once all field validation has
        passed, so every edge endpoint here is a declared node id.
        """
        sorter = TopologicalSorter()
        for edge in self.edges:
            # add(node, *predecessors): ``to_node`` depends on ``from_node``.
            sorter.add(edge.to_node, edge.from_node)
        try:
            sorter.prepare()
        except CycleError as exc:
            # args[1] lists the node ids on the cycle, following edge
            # direction, with the first node repeated at the end.
            cycle = exc.args[1]
            raise ValueError(
                "workflow edges contain a cycle: " + " -> ".join(cycle)
            ) from exc
        return self