feat: stabilize workflow phase 1 foundation

This commit is contained in:
2026-04-07 08:48:48 +02:00
parent bc9ab5f864
commit 63e35ce807
8 changed files with 742 additions and 128 deletions
+17 -17
View File
@@ -680,35 +680,36 @@ async def seed_workflows(
):
"""Create the standard workflow definitions if they do not already exist."""
from app.domains.rendering.models import WorkflowDefinition
from app.domains.rendering.workflow_config_utils import build_preset_workflow_config
STANDARD_WORKFLOWS = [
{
"name": "Still Image — Cycles",
"config": {
"type": "still",
"params": {"render_engine": "cycles", "samples": 256, "resolution": [1920, 1080]},
},
"config": build_preset_workflow_config(
"still",
{"render_engine": "cycles", "samples": 256, "resolution": [1920, 1080]},
),
},
{
"name": "Still Image — EEVEE",
"config": {
"type": "still",
"params": {"render_engine": "eevee", "samples": 64, "resolution": [1920, 1080]},
},
"config": build_preset_workflow_config(
"still",
{"render_engine": "eevee", "samples": 64, "resolution": [1920, 1080]},
),
},
{
"name": "Turntable Animation",
"config": {
"type": "turntable",
"params": {"render_engine": "cycles", "samples": 64, "fps": 24, "duration_s": 5},
},
"config": build_preset_workflow_config(
"turntable",
{"render_engine": "cycles", "samples": 64, "fps": 24, "duration_s": 5},
),
},
{
"name": "Multi-Angle (0° / 45° / 90°)",
"config": {
"type": "multi_angle",
"params": {"render_engine": "cycles", "samples": 128, "angles": [0, 45, 90]},
},
"config": build_preset_workflow_config(
"multi_angle",
{"render_engine": "cycles", "samples": 128, "angles": [0, 45, 90]},
),
},
]
@@ -1093,4 +1094,3 @@ async def get_dashboard_stats(
product_stats=product_stats,
order_status=order_status,
)
@@ -77,8 +77,17 @@ def dispatch_render_with_workflow(order_line_id: str) -> dict:
)
return _legacy_dispatch(order_line_id)
workflow_type = wf_def.config.get("type")
params = wf_def.config.get("params", {})
from app.domains.rendering.workflow_config_utils import extract_runtime_workflow
workflow_type, params = extract_runtime_workflow(wf_def.config)
if workflow_type is None or workflow_type == "custom":
logger.warning(
"order_line %s: workflow_definition_id %s has no supported preset runtime, "
"falling back to legacy dispatch",
order_line_id,
wf_def.id,
)
return _legacy_dispatch(order_line_id)
logger.info(
"order_line %s: dispatching via WorkflowDefinition %s (type=%s)",
@@ -0,0 +1,286 @@
from __future__ import annotations
from copy import deepcopy
from typing import Any
from app.core.process_steps import StepName
WorkflowPresetType = str
_PRESET_TYPES = {
"still",
"turntable",
"multi_angle",
"still_with_exports",
"custom",
}
_STEP_TO_NODE_TYPE: dict[str, str] = {
StepName.RESOLVE_STEP_PATH.value: "inputNode",
StepName.STL_CACHE_GENERATE.value: "convertNode",
StepName.BLENDER_STILL.value: "renderNode",
StepName.BLENDER_TURNTABLE.value: "renderFramesNode",
StepName.OUTPUT_SAVE.value: "outputNode",
StepName.EXPORT_BLEND.value: "outputNode",
}
_NODE_TYPE_TO_STEP: dict[str, str] = {
"inputNode": StepName.RESOLVE_STEP_PATH.value,
"convertNode": StepName.STL_CACHE_GENERATE.value,
"renderNode": StepName.BLENDER_STILL.value,
"renderFramesNode": StepName.BLENDER_TURNTABLE.value,
"ffmpegNode": StepName.OUTPUT_SAVE.value,
"outputNode": StepName.OUTPUT_SAVE.value,
}
def _make_node(
node_id: str,
step: StepName,
x: int,
y: int,
*,
params: dict[str, Any] | None = None,
node_type: str | None = None,
label: str | None = None,
) -> dict[str, Any]:
return {
"id": node_id,
"step": step.value,
"params": deepcopy(params or {}),
"ui": {
"type": node_type or _STEP_TO_NODE_TYPE.get(step.value),
"position": {"x": x, "y": y},
"label": label,
},
}
def _resolution_to_dimensions(params: dict[str, Any]) -> dict[str, Any]:
merged = deepcopy(params)
resolution = merged.pop("resolution", None)
if isinstance(resolution, (list, tuple)) and len(resolution) == 2:
merged.setdefault("width", int(resolution[0]))
merged.setdefault("height", int(resolution[1]))
return merged
def build_preset_workflow_config(
preset_type: WorkflowPresetType,
params: dict[str, Any] | None = None,
) -> dict[str, Any]:
if preset_type not in _PRESET_TYPES:
raise ValueError(f"Unknown workflow preset type: {preset_type!r}")
params = deepcopy(params or {})
render_params = _resolution_to_dimensions(params)
if preset_type == "still":
nodes = [
_make_node("setup", StepName.ORDER_LINE_SETUP, 0, 100, label="Order Line Setup"),
_make_node("template", StepName.RESOLVE_TEMPLATE, 220, 100, label="Resolve Template"),
_make_node(
"render",
StepName.BLENDER_STILL,
440,
100,
params=render_params,
node_type="renderNode",
label="Still Render",
),
_make_node("output", StepName.OUTPUT_SAVE, 660, 100, label="Save Output"),
]
edges = [
{"from": "setup", "to": "template"},
{"from": "template", "to": "render"},
{"from": "render", "to": "output"},
]
elif preset_type == "turntable":
nodes = [
_make_node("setup", StepName.ORDER_LINE_SETUP, 0, 100, label="Order Line Setup"),
_make_node("template", StepName.RESOLVE_TEMPLATE, 220, 100, label="Resolve Template"),
_make_node(
"turntable",
StepName.BLENDER_TURNTABLE,
440,
100,
params=render_params,
node_type="renderFramesNode",
label="Turntable Render",
),
_make_node("output", StepName.OUTPUT_SAVE, 660, 100, label="Save Output"),
]
edges = [
{"from": "setup", "to": "template"},
{"from": "template", "to": "turntable"},
{"from": "turntable", "to": "output"},
]
elif preset_type == "multi_angle":
angles = params.get("angles") or [0, 45, 90]
shared = deepcopy(render_params)
shared.pop("angles", None)
nodes = [
_make_node("setup", StepName.ORDER_LINE_SETUP, 0, 195, label="Order Line Setup"),
_make_node("template", StepName.RESOLVE_TEMPLATE, 220, 195, label="Resolve Template"),
]
edges = [
{"from": "setup", "to": "template"},
]
for index, angle in enumerate(angles):
node_id = f"render_{index}"
node_params = {**shared, "rotation_z": float(angle)}
nodes.append(
_make_node(
node_id,
StepName.BLENDER_STILL,
440,
index * 130,
params=node_params,
node_type="renderNode",
label=f"Render {angle}°",
)
)
edges.append({"from": "template", "to": node_id})
nodes.append(_make_node("output", StepName.OUTPUT_SAVE, 700, 195, label="Save Output"))
edges.extend({"from": f"render_{index}", "to": "output"} for index, _ in enumerate(angles))
elif preset_type == "still_with_exports":
nodes = [
_make_node("setup", StepName.ORDER_LINE_SETUP, 0, 100, label="Order Line Setup"),
_make_node("template", StepName.RESOLVE_TEMPLATE, 220, 100, label="Resolve Template"),
_make_node(
"render",
StepName.BLENDER_STILL,
440,
100,
params=render_params,
node_type="renderNode",
label="Still Render",
),
_make_node("output", StepName.OUTPUT_SAVE, 660, 70, label="Save Output"),
_make_node("blend", StepName.EXPORT_BLEND, 660, 160, label="Export Blend"),
]
edges = [
{"from": "setup", "to": "template"},
{"from": "template", "to": "render"},
{"from": "render", "to": "output"},
{"from": "render", "to": "blend"},
]
else:
nodes = [
_make_node("setup", StepName.ORDER_LINE_SETUP, 120, 140, label="Order Line Setup"),
]
edges = []
return {
"version": 1,
"nodes": nodes,
"edges": edges,
"ui": {"preset": preset_type},
}
def _canonicalize_legacy_custom_config(raw: dict[str, Any]) -> dict[str, Any]:
legacy_nodes = raw.get("nodes") or []
legacy_edges = raw.get("edges") or []
nodes: list[dict[str, Any]] = []
for legacy_node in legacy_nodes:
data = legacy_node.get("data") or {}
node_type = legacy_node.get("type")
step_name = data.get("pipeline_step") or _NODE_TYPE_TO_STEP.get(node_type) or StepName.BLENDER_STILL.value
nodes.append(
{
"id": legacy_node["id"],
"step": step_name,
"params": deepcopy(data.get("params") or {}),
"ui": {
"type": node_type,
"position": deepcopy(legacy_node.get("position") or {"x": 0, "y": 0}),
"label": data.get("label"),
},
}
)
edges: list[dict[str, Any]] = []
for index, legacy_edge in enumerate(legacy_edges):
source = legacy_edge.get("source") or legacy_edge.get("from")
target = legacy_edge.get("target") or legacy_edge.get("to")
if not source or not target:
continue
edges.append(
{
"id": legacy_edge.get("id") or f"edge_{index}",
"from": source,
"to": target,
}
)
return {
"version": 1,
"nodes": nodes,
"edges": edges,
"ui": {"preset": "custom"},
}
def canonicalize_workflow_config(raw: dict[str, Any]) -> dict[str, Any]:
if not isinstance(raw, dict):
raise ValueError("Workflow config must be a JSON object")
if "version" in raw and "nodes" in raw:
normalized = deepcopy(raw)
normalized.setdefault("edges", [])
return normalized
workflow_type = raw.get("type")
if workflow_type in _PRESET_TYPES:
if workflow_type == "custom":
return _canonicalize_legacy_custom_config(raw)
return build_preset_workflow_config(workflow_type, raw.get("params") or {})
raise ValueError("Unsupported workflow config format")
def get_workflow_preset_type(config: dict[str, Any]) -> str | None:
canonical = canonicalize_workflow_config(config)
ui = canonical.get("ui") or {}
preset = ui.get("preset")
if preset in _PRESET_TYPES:
return preset
return None
def extract_runtime_workflow(config: dict[str, Any]) -> tuple[str | None, dict[str, Any]]:
canonical = canonicalize_workflow_config(config)
preset = get_workflow_preset_type(canonical)
if preset is None or preset == "custom":
return preset, {}
nodes = canonical.get("nodes") or []
if preset in {"still", "still_with_exports"}:
for node in nodes:
if node.get("step") == StepName.BLENDER_STILL.value:
return preset, _resolution_to_dimensions(node.get("params") or {})
return preset, {}
if preset == "turntable":
for node in nodes:
if node.get("step") == StepName.BLENDER_TURNTABLE.value:
return preset, _resolution_to_dimensions(node.get("params") or {})
return preset, {}
if preset == "multi_angle":
render_nodes = [node for node in nodes if node.get("step") == StepName.BLENDER_STILL.value]
if not render_nodes:
return preset, {}
first_params = _resolution_to_dimensions(render_nodes[0].get("params") or {})
angles = [
float((node.get("params") or {}).get("rotation_z", 0))
for node in render_nodes
]
first_params["angles"] = angles
first_params.pop("rotation_z", None)
return preset, first_params
return preset, {}
@@ -28,6 +28,7 @@ from app.domains.rendering.schemas import (
WorkflowDefinitionOut,
WorkflowRunOut,
)
from app.domains.rendering.workflow_config_utils import canonicalize_workflow_config
from app.domains.rendering.workflow_schema import WorkflowConfig
from app.core.process_steps import StepName
@@ -90,6 +91,17 @@ class PipelineStepsResponse(BaseModel):
router = APIRouter(prefix="/api/workflows", tags=["workflows"])
def _workflow_to_out(wf: WorkflowDefinition) -> WorkflowDefinitionOut:
return WorkflowDefinitionOut(
id=wf.id,
name=wf.name,
output_type_id=wf.output_type_id,
config=canonicalize_workflow_config(wf.config),
is_active=wf.is_active,
created_at=wf.created_at,
)
@router.get("/pipeline-steps", response_model=PipelineStepsResponse)
async def get_pipeline_steps(
_user: User = Depends(require_admin_or_pm),
@@ -115,7 +127,7 @@ async def list_workflows(
result = await db.execute(
select(WorkflowDefinition).order_by(WorkflowDefinition.created_at)
)
return result.scalars().all()
return [_workflow_to_out(wf) for wf in result.scalars().all()]
@router.get("/{workflow_id}", response_model=WorkflowDefinitionOut)
@@ -130,7 +142,7 @@ async def get_workflow(
wf = result.scalar_one_or_none()
if not wf:
raise HTTPException(status_code=404, detail="Workflow definition not found")
return wf
return _workflow_to_out(wf)
@router.post("", response_model=WorkflowDefinitionOut, status_code=201)
@@ -139,21 +151,23 @@ async def create_workflow(
_user: User = Depends(require_global_admin),
db: AsyncSession = Depends(get_db),
):
normalized_config = canonicalize_workflow_config(body.config)
if body.config:
try:
WorkflowConfig.model_validate(body.config)
except ValidationError as exc:
raise HTTPException(status_code=422, detail=f"Invalid workflow config: {exc.errors()}")
WorkflowConfig.model_validate(normalized_config)
except (ValidationError, ValueError) as exc:
detail = exc.errors() if isinstance(exc, ValidationError) else str(exc)
raise HTTPException(status_code=422, detail=f"Invalid workflow config: {detail}")
wf = WorkflowDefinition(
name=body.name,
output_type_id=body.output_type_id,
config=body.config,
config=normalized_config,
is_active=body.is_active,
)
db.add(wf)
await db.commit()
await db.refresh(wf)
return wf
return _workflow_to_out(wf)
@router.put("/{workflow_id}", response_model=WorkflowDefinitionOut)
@@ -174,16 +188,18 @@ async def update_workflow(
wf.name = body.name
if body.config is not None:
try:
WorkflowConfig.model_validate(body.config)
except ValidationError as exc:
raise HTTPException(status_code=422, detail=f"Invalid workflow config: {exc.errors()}")
wf.config = body.config
normalized_config = canonicalize_workflow_config(body.config)
WorkflowConfig.model_validate(normalized_config)
except (ValidationError, ValueError) as exc:
detail = exc.errors() if isinstance(exc, ValidationError) else str(exc)
raise HTTPException(status_code=422, detail=f"Invalid workflow config: {detail}")
wf.config = normalized_config
if body.is_active is not None:
wf.is_active = body.is_active
await db.commit()
await db.refresh(wf)
return wf
return _workflow_to_out(wf)
@router.delete("/{workflow_id}", status_code=204)
@@ -21,10 +21,22 @@ from pydantic import BaseModel, Field, field_validator, model_validator
from app.core.process_steps import StepName
class WorkflowPosition(BaseModel):
x: float
y: float
class WorkflowNodeUI(BaseModel):
type: str | None = None
position: WorkflowPosition | None = None
label: str | None = None
class WorkflowNode(BaseModel):
id: str
step: StepName # validated against the StepName StrEnum
params: dict = {}
ui: WorkflowNodeUI | None = None
class WorkflowEdge(BaseModel):
@@ -35,10 +47,15 @@ class WorkflowEdge(BaseModel):
model_config = {"populate_by_name": True}
class WorkflowUI(BaseModel):
preset: str | None = None
class WorkflowConfig(BaseModel):
version: int = 1
nodes: list[WorkflowNode]
edges: list[WorkflowEdge] = []
ui: WorkflowUI | None = None
@field_validator("nodes")
@classmethod
@@ -0,0 +1,93 @@
from app.domains.rendering.workflow_config_utils import (
build_preset_workflow_config,
canonicalize_workflow_config,
extract_runtime_workflow,
)
def test_build_preset_workflow_config_creates_canonical_dag():
config = build_preset_workflow_config(
"still",
{"render_engine": "cycles", "samples": 256, "resolution": [1920, 1080]},
)
assert config["version"] == 1
assert config["ui"]["preset"] == "still"
assert [node["step"] for node in config["nodes"]] == [
"order_line_setup",
"resolve_template",
"blender_still",
"output_save",
]
render_node = next(node for node in config["nodes"] if node["step"] == "blender_still")
assert render_node["params"]["width"] == 1920
assert render_node["params"]["height"] == 1080
def test_canonicalize_workflow_config_migrates_legacy_preset():
legacy = {
"type": "turntable",
"params": {"render_engine": "cycles", "samples": 64, "fps": 24},
}
canonical = canonicalize_workflow_config(legacy)
assert canonical["version"] == 1
assert canonical["ui"]["preset"] == "turntable"
assert any(node["step"] == "blender_turntable" for node in canonical["nodes"])
def test_extract_runtime_workflow_uses_canonical_render_node_params():
config = build_preset_workflow_config(
"multi_angle",
{"render_engine": "cycles", "samples": 128, "angles": [0, 45, 90]},
)
preset, params = extract_runtime_workflow(config)
assert preset == "multi_angle"
assert params["render_engine"] == "cycles"
assert params["samples"] == 128
assert params["angles"] == [0.0, 45.0, 90.0]
def test_canonicalize_legacy_custom_config_preserves_edges():
legacy = {
"type": "custom",
"nodes": [
{
"id": "input",
"type": "inputNode",
"position": {"x": 0, "y": 0},
"data": {"label": "Input", "pipeline_step": "resolve_step_path", "params": {}},
},
{
"id": "render",
"type": "renderNode",
"position": {"x": 200, "y": 0},
"data": {"label": "Render", "pipeline_step": "blender_still", "params": {"resolution": [1024, 1024]}},
},
],
"edges": [
{"id": "e1", "source": "input", "target": "render"},
],
}
canonical = canonicalize_workflow_config(legacy)
assert canonical["ui"]["preset"] == "custom"
assert canonical["edges"] == [{"id": "e1", "from": "input", "to": "render"}]
def test_extract_runtime_workflow_converts_resolution_to_dimensions():
config = build_preset_workflow_config(
"turntable",
{"resolution": [1920, 1080], "fps": 24},
)
preset, params = extract_runtime_workflow(config)
assert preset == "turntable"
assert params["width"] == 1920
assert params["height"] == 1080
assert "resolution" not in params