Files
HartOMat/backend/app/domains/rendering/workflow_schema.py
T

290 lines
10 KiB
Python

"""Pydantic schema for validated WorkflowDefinition.config JSONB.
A workflow config is a versioned DAG description stored as JSONB. Before
being dispatched (or saved), the raw dict must pass this schema.
Example config::
{
"version": 1,
"nodes": [
{"id": "n1", "step": "resolve_step_path", "params": {}},
{"id": "n2", "step": "blender_still", "params": {"engine": "cycles"}}
],
"edges": [
{"from": "n1", "to": "n2"}
]
}
"""
from collections import deque
from typing import Any, Literal
from pydantic import BaseModel, Field, field_validator, model_validator
from app.core.process_steps import StepName
from app.domains.rendering.workflow_node_registry import (
WorkflowNodeDefinition,
WorkflowNodeFieldDefinition,
get_node_definition,
)
def _context_seed_artifacts(definition: WorkflowNodeDefinition) -> set[str]:
if definition.family == "order_line":
return {"order_line_record"}
if definition.family == "cad_file":
return {"cad_file_record"}
return set()
def _coerce_node_label(node: "WorkflowNode") -> str:
return f"{node.id!r} ({node.step.value})"
def _validate_param_value(
*,
node: "WorkflowNode",
field_definition: WorkflowNodeFieldDefinition,
value: Any,
) -> None:
if value is None:
return
field_label = f"node {_coerce_node_label(node)} param {field_definition.key!r}"
if field_definition.type == "number":
if isinstance(value, bool) or not isinstance(value, (int, float)):
raise ValueError(f"{field_label} must be a number")
numeric_value = float(value)
if field_definition.min is not None and numeric_value < field_definition.min:
raise ValueError(f"{field_label} must be >= {field_definition.min:g}")
if field_definition.max is not None and numeric_value > field_definition.max:
raise ValueError(f"{field_label} must be <= {field_definition.max:g}")
return
if field_definition.type == "boolean":
if not isinstance(value, bool):
raise ValueError(f"{field_label} must be a boolean")
return
if field_definition.type == "select":
valid_values = {option.value for option in field_definition.options}
if value not in valid_values:
allowed_values = ", ".join(repr(option) for option in sorted(valid_values, key=repr))
raise ValueError(f"{field_label} must be one of: {allowed_values}")
class WorkflowPosition(BaseModel):
x: float
y: float
class WorkflowNodeUI(BaseModel):
type: str | None = None
position: WorkflowPosition | None = None
label: str | None = None
class WorkflowNode(BaseModel):
id: str
step: StepName # validated against the StepName StrEnum
params: dict[str, Any] = Field(default_factory=dict)
ui: WorkflowNodeUI | None = None
class WorkflowEdge(BaseModel):
# "from" is a Python keyword, so we alias it.
from_node: str = Field(alias="from")
to_node: str = Field(alias="to")
model_config = {"populate_by_name": True}
class WorkflowUI(BaseModel):
preset: str | None = None
execution_mode: Literal["legacy", "graph", "shadow"] | None = None
family: Literal["cad_file", "order_line", "mixed"] | None = None
class WorkflowConfig(BaseModel):
version: int = 1
nodes: list[WorkflowNode]
edges: list[WorkflowEdge] = Field(default_factory=list)
ui: WorkflowUI | None = None
@field_validator("nodes")
@classmethod
def nodes_not_empty(cls, v: list[WorkflowNode]) -> list[WorkflowNode]:
if not v:
raise ValueError("workflow must have at least one node")
return v
@field_validator("edges")
@classmethod
def edges_reference_valid_nodes(
cls, edges: list[WorkflowEdge], info
) -> list[WorkflowEdge]:
if "nodes" in info.data:
node_ids = {n.id for n in info.data["nodes"]}
for edge in edges:
if edge.from_node not in node_ids:
raise ValueError(
f"edge references unknown node id: {edge.from_node!r}"
)
if edge.to_node not in node_ids:
raise ValueError(
f"edge references unknown node id: {edge.to_node!r}"
)
return edges
@model_validator(mode="after")
def node_ids_are_unique(self) -> "WorkflowConfig":
seen: set[str] = set()
for node in self.nodes:
if node.id in seen:
raise ValueError(f"duplicate node id: {node.id!r}")
seen.add(node.id)
return self
@model_validator(mode="after")
def node_params_match_registry(self) -> "WorkflowConfig":
for node in self.nodes:
definition = get_node_definition(node.step)
if definition is None:
continue
field_definitions = {field.key: field for field in definition.fields}
allowed_keys = {field.key for field in definition.fields}
unknown_keys = sorted(key for key in node.params if key not in allowed_keys)
if unknown_keys:
joined = ", ".join(repr(key) for key in unknown_keys)
raise ValueError(
f"node {node.id!r} ({node.step.value}) uses unknown param key(s): {joined}"
)
for key, value in node.params.items():
field_definition = field_definitions.get(key)
if field_definition is None:
continue
_validate_param_value(
node=node,
field_definition=field_definition,
value=value,
)
return self
@model_validator(mode="after")
def ui_family_matches_node_families(self) -> "WorkflowConfig":
families = {
definition.family
for node in self.nodes
if (definition := get_node_definition(node.step)) is not None
}
if not families:
return self
inferred_family = "mixed" if len(families) > 1 else next(iter(families))
execution_mode = self.ui.execution_mode if self.ui is not None else "legacy"
if execution_mode in {"graph", "shadow"} and inferred_family == "mixed":
raise ValueError(
"workflow ui.execution_mode must stay single-family for graph/shadow execution"
)
if self.ui is None or self.ui.family is None:
return self
if self.ui.family != inferred_family:
ordered_families = ", ".join(sorted(families))
raise ValueError(
f"workflow ui.family={self.ui.family!r} does not match node families: {ordered_families}"
)
return self
@model_validator(mode="after")
def node_contracts_are_connected(self) -> "WorkflowConfig":
execution_mode = self.ui.execution_mode if self.ui is not None else "legacy"
if execution_mode not in {"graph", "shadow"}:
return self
node_by_id = {node.id: node for node in self.nodes}
adjacency: dict[str, list[str]] = {node.id: [] for node in self.nodes}
in_degree: dict[str, int] = {node.id: 0 for node in self.nodes}
available_artifacts: dict[str, set[str]] = {node.id: set() for node in self.nodes}
for edge in self.edges:
adjacency[edge.from_node].append(edge.to_node)
in_degree[edge.to_node] += 1
queue: deque[str] = deque(
node_id for node_id, degree in in_degree.items() if degree == 0
)
processed = 0
while queue:
node_id = queue.popleft()
processed += 1
node = node_by_id[node_id]
definition = get_node_definition(node.step)
if definition is None:
continue
node_inputs = available_artifacts[node_id] | _context_seed_artifacts(definition)
required = set(definition.input_contract.get("requires", []))
missing_required = sorted(required - node_inputs)
if missing_required:
joined = ", ".join(repr(value) for value in missing_required)
raise ValueError(
f"node {_coerce_node_label(node)} is missing required input artifact(s): {joined}"
)
required_any = set(definition.input_contract.get("requires_any", []))
if required_any and not node_inputs.intersection(required_any):
joined = ", ".join(repr(value) for value in sorted(required_any))
raise ValueError(
f"node {_coerce_node_label(node)} requires at least one upstream artifact from: {joined}"
)
node_outputs = node_inputs | set(definition.output_contract.get("provides", []))
for downstream_id in adjacency[node_id]:
available_artifacts[downstream_id].update(node_outputs)
in_degree[downstream_id] -= 1
if in_degree[downstream_id] == 0:
queue.append(downstream_id)
if processed != len(self.nodes):
return self
return self
@model_validator(mode="after")
def edges_are_unique_and_acyclic(self) -> "WorkflowConfig":
edge_pairs: set[tuple[str, str]] = set()
adjacency: dict[str, list[str]] = {node.id: [] for node in self.nodes}
in_degree: dict[str, int] = {node.id: 0 for node in self.nodes}
for edge in self.edges:
edge_pair = (edge.from_node, edge.to_node)
if edge.from_node == edge.to_node:
raise ValueError(f"self-referential edge is not allowed: {edge.from_node!r}")
if edge_pair in edge_pairs:
raise ValueError(
f"duplicate edge is not allowed: {edge.from_node!r} -> {edge.to_node!r}"
)
edge_pairs.add(edge_pair)
adjacency[edge.from_node].append(edge.to_node)
in_degree[edge.to_node] += 1
queue = [node_id for node_id, degree in in_degree.items() if degree == 0]
processed = 0
while queue:
node_id = queue.pop(0)
processed += 1
for neighbor in adjacency[node_id]:
in_degree[neighbor] -= 1
if in_degree[neighbor] == 0:
queue.append(neighbor)
if processed != len(self.nodes):
raise ValueError(
"workflow graph must be acyclic"
)
return self