feat(C1+C2): workflow system — WorkflowDefinition + Celery Canvas builder

Migrations 037 (workflow tables + 3 seed definitions) + 038 (output_types.workflow_definition_id).
WorkflowDefinition/Run/NodeResult SQLAlchemy models in domains/rendering/models.py.
workflow_builder.py: dispatch_workflow() with Celery Canvas for still/turntable/multi_angle.
workflow_router.py: CRUD endpoints at /api/workflows (admin/PM guards).
dispatch_service.py: dispatch_render_with_workflow() prefers workflow path when
  OutputType.workflow_definition_id is set, falls back to legacy dispatch otherwise.
main.py: registers workflows_router.
models/__init__.py: re-exports WorkflowDefinition, WorkflowRun, WorkflowNodeResult.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-06 17:07:21 +01:00
parent 217555025f
commit 7e47e4aca7
9 changed files with 512 additions and 1 deletions
@@ -0,0 +1,118 @@
"""Workflow-aware render dispatch service.
C2: extends the legacy dispatch_render path with WorkflowDefinition support.
If an OutputType has workflow_definition_id set:
- Loads the WorkflowDefinition
- Calls dispatch_workflow() to build + submit a Celery Canvas
- Creates a WorkflowRun record tracking the submission
If no workflow_definition_id is set, falls back to the existing direct
task-dispatch logic in app.services.render_dispatcher (legacy path).
"""
from __future__ import annotations
import logging
from datetime import datetime
logger = logging.getLogger(__name__)
def dispatch_render_with_workflow(order_line_id: str) -> dict:
    """Dispatch a render for the given order line.

    Checks whether the associated OutputType has a WorkflowDefinition linked.
    If yes, uses the Celery Canvas workflow builder and records a WorkflowRun.
    If not — or the linked definition is missing/inactive — falls back to the
    legacy direct-dispatch logic.

    This function is synchronous (Celery-task-safe).

    Args:
        order_line_id: Primary key of the OrderLine to dispatch.

    Returns:
        Workflow path: ``{"backend": "workflow", "workflow_type": ...,
        "workflow_run_id": ..., "celery_task_id": ...}``.
        Legacy path: whatever ``dispatch_render()`` returns.

    Raises:
        ValueError: if the OrderLine does not exist, or (propagated from
            dispatch_workflow) if the config "type" is unknown.
    """
    from sqlalchemy import create_engine, select
    from sqlalchemy.orm import Session, selectinload
    from app.config import settings
    from app.domains.orders.models import OrderLine
    from app.domains.rendering.models import OutputType, WorkflowDefinition, WorkflowRun

    # Fresh synchronous engine per call (async driver suffix stripped).
    # Disposed in the finally block below — without dispose() every call
    # would leak a connection pool.
    engine = create_engine(
        settings.database_url.replace("+asyncpg", ""),
        pool_pre_ping=True,
    )
    try:
        with Session(engine) as session:
            # Load order line with its output_type eagerly loaded.
            line = session.execute(
                select(OrderLine)
                .where(OrderLine.id == order_line_id)
                .options(selectinload(OrderLine.output_type))
            ).scalar_one_or_none()
            if not line:
                raise ValueError(f"OrderLine {order_line_id} not found")

            output_type: OutputType | None = line.output_type
            if output_type is None or output_type.workflow_definition_id is None:
                # Legacy path — no workflow definition linked
                logger.info(
                    "order_line %s: no workflow_definition_id, using legacy dispatch",
                    order_line_id,
                )
                return _legacy_dispatch(order_line_id)

            # Load the linked WorkflowDefinition; inactive definitions are
            # treated the same as missing ones (legacy fallback).
            wf_def: WorkflowDefinition | None = session.execute(
                select(WorkflowDefinition).where(
                    WorkflowDefinition.id == output_type.workflow_definition_id,
                    WorkflowDefinition.is_active.is_(True),
                )
            ).scalar_one_or_none()
            if wf_def is None:
                logger.warning(
                    "order_line %s: workflow_definition_id %s not found or inactive, "
                    "falling back to legacy dispatch",
                    order_line_id,
                    output_type.workflow_definition_id,
                )
                return _legacy_dispatch(order_line_id)

            workflow_type = wf_def.config.get("type")
            params = wf_def.config.get("params", {})
            logger.info(
                "order_line %s: dispatching via WorkflowDefinition %s (type=%s)",
                order_line_id,
                wf_def.id,
                workflow_type,
            )

            # NOTE(review): the Canvas is submitted before the WorkflowRun row
            # is committed; a task that starts immediately may not find its run
            # record yet — confirm the task side tolerates this window.
            from app.domains.rendering.workflow_builder import dispatch_workflow

            celery_task_id = dispatch_workflow(workflow_type, order_line_id, params)

            # Persist a WorkflowRun record tracking the submission.
            run = WorkflowRun(
                workflow_def_id=wf_def.id,
                order_line_id=line.id,
                celery_task_id=celery_task_id,
                status="pending",
                started_at=datetime.utcnow(),
            )
            session.add(run)
            session.flush()
            # Capture the id before commit: Session expires instances on
            # commit, so reading run.id afterwards would force a refresh query.
            run_id = str(run.id)
            session.commit()
            return {
                "backend": "workflow",
                "workflow_type": workflow_type,
                "workflow_run_id": run_id,
                "celery_task_id": celery_task_id,
            }
    finally:
        engine.dispose()
def _legacy_dispatch(order_line_id: str) -> dict:
    """Fallback: route through the original render_dispatcher (legacy path)."""
    # Lazy import via the shim re-export — the full implementation lives in
    # the orders router / step_tasks path, and importing it at module load
    # time would create a circular import.
    from app.services.render_dispatcher import dispatch_render

    result = dispatch_render(order_line_id)
    return result
+63
View File
@@ -35,6 +35,10 @@ class OutputType(Base):
DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False
)
workflow_definition_id: Mapped[uuid.UUID | None] = mapped_column(
UUID(as_uuid=True), ForeignKey("workflow_definitions.id", ondelete="SET NULL"), nullable=True
)
order_lines: Mapped[list["OrderLine"]] = relationship("OrderLine", back_populates="output_type")
pricing_tier: Mapped["PricingTier | None"] = relationship("PricingTier", back_populates="output_types")
@@ -85,3 +89,62 @@ class ProductRenderPosition(Base):
product: Mapped["Product"] = relationship("Product", back_populates="render_positions")
order_lines: Mapped[list["OrderLine"]] = relationship("OrderLine", back_populates="render_position")
class WorkflowDefinition(Base):
    """Reusable render-workflow template.

    ``config`` is a JSONB document consumed by the Celery Canvas builder: it
    carries a "type" key (e.g. "still", "turntable", "multi_angle") and an
    optional "params" dict. Definitions with is_active=False are skipped at
    dispatch time, which then falls back to legacy dispatch.
    """

    __tablename__ = "workflow_definitions"

    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    # Human-readable label shown in the workflow CRUD API.
    name: Mapped[str] = mapped_column(String(200), nullable=False)
    # Optional link to the OutputType this definition renders for; nulled
    # (not cascaded) if the output type row is deleted.
    output_type_id: Mapped[uuid.UUID | None] = mapped_column(
        UUID(as_uuid=True), ForeignKey("output_types.id", ondelete="SET NULL"), nullable=True
    )
    # Workflow graph/config consumed by workflow_builder.dispatch_workflow.
    config: Mapped[dict] = mapped_column(JSONB, nullable=False, default=dict)
    # Inactive definitions are ignored by the dispatcher.
    is_active: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
    created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow, nullable=False)
    # lazy="noload" keeps definition list queries cheap; load runs explicitly.
    # NOTE(review): ORM cascade is delete-orphan but the runs FK uses
    # ondelete="SET NULL" — DB-level deletes won't remove runs; confirm intended.
    runs: Mapped[list["WorkflowRun"]] = relationship(
        "WorkflowRun", back_populates="workflow_def", lazy="noload", cascade="all, delete-orphan"
    )
class WorkflowRun(Base):
    """One dispatched execution of a WorkflowDefinition for an order line."""

    __tablename__ = "workflow_runs"

    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    # Definition that produced this run; preserved (SET NULL) if the
    # definition row is deleted at the DB level.
    workflow_def_id: Mapped[uuid.UUID | None] = mapped_column(
        UUID(as_uuid=True), ForeignKey("workflow_definitions.id", ondelete="SET NULL"), nullable=True
    )
    # Order line being rendered; runs are removed with their order line.
    order_line_id: Mapped[uuid.UUID | None] = mapped_column(
        UUID(as_uuid=True), ForeignKey("order_lines.id", ondelete="CASCADE"), nullable=True, index=True
    )
    # Celery task/group id returned by dispatch_workflow().
    celery_task_id: Mapped[str | None] = mapped_column(String(500), nullable=True)
    # Lifecycle state; the dispatcher creates runs as "pending".
    status: Mapped[str] = mapped_column(String(50), nullable=False, default="pending")
    started_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
    completed_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
    error_message: Mapped[str | None] = mapped_column(Text, nullable=True)
    created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow, nullable=False)
    workflow_def: Mapped["WorkflowDefinition | None"] = relationship(
        "WorkflowDefinition", back_populates="runs"
    )
    # Per-node outcomes; deleted together with the run.
    node_results: Mapped[list["WorkflowNodeResult"]] = relationship(
        "WorkflowNodeResult", back_populates="run", cascade="all, delete-orphan"
    )
class WorkflowNodeResult(Base):
    """Outcome of a single node (task) within a WorkflowRun."""

    __tablename__ = "workflow_node_results"

    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    # Owning run; node results are removed with their run.
    run_id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True), ForeignKey("workflow_runs.id", ondelete="CASCADE"), nullable=False, index=True
    )
    # Logical node name within the workflow definition.
    node_name: Mapped[str] = mapped_column(String(200), nullable=False)
    status: Mapped[str] = mapped_column(String(50), nullable=False, default="pending")
    # Arbitrary JSON payload produced by the node (e.g. output artifacts).
    output: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
    log: Mapped[str | None] = mapped_column(Text, nullable=True)
    # Node wall-clock duration in seconds.
    duration_s: Mapped[float | None] = mapped_column(Float, nullable=True)
    created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow, nullable=False)
    run: Mapped["WorkflowRun"] = relationship("WorkflowRun", back_populates="node_results")
+48
View File
@@ -89,3 +89,51 @@ class RenderPositionOut(BaseModel):
updated_at: datetime
model_config = {"from_attributes": True}
class WorkflowDefinitionCreate(BaseModel):
    """Payload for POST /api/workflows/ (admin only)."""

    name: str
    output_type_id: uuid.UUID | None = None
    # Workflow config consumed by the Canvas builder ("type" + "params").
    config: dict
    is_active: bool = True
class WorkflowDefinitionUpdate(BaseModel):
    """Partial-update payload for PUT /api/workflows/{id}; None means unchanged."""

    name: str | None = None
    config: dict | None = None
    is_active: bool | None = None
class WorkflowDefinitionOut(BaseModel):
    """Serialized WorkflowDefinition returned by the workflow CRUD endpoints."""

    id: uuid.UUID
    name: str
    output_type_id: uuid.UUID | None
    config: dict
    is_active: bool
    created_at: datetime
    model_config = {"from_attributes": True}
class WorkflowNodeResultOut(BaseModel):
    """Serialized per-node result, nested inside WorkflowRunOut."""

    id: uuid.UUID
    node_name: str
    status: str
    output: dict | None
    log: str | None
    duration_s: float | None
    created_at: datetime
    model_config = {"from_attributes": True}
class WorkflowRunOut(BaseModel):
    """Serialized WorkflowRun, including eagerly loaded node results."""

    id: uuid.UUID
    workflow_def_id: uuid.UUID | None
    order_line_id: uuid.UUID | None
    celery_task_id: str | None
    status: str
    started_at: datetime | None
    completed_at: datetime | None
    error_message: str | None
    created_at: datetime
    # Pydantic v2 deep-copies mutable defaults per instance, so `= []` is safe here.
    node_results: list[WorkflowNodeResultOut] = []
    model_config = {"from_attributes": True}
@@ -0,0 +1,53 @@
"""Celery Canvas workflow builder.
Translates WorkflowDefinition config into a Celery Canvas (chain/group/chord).
"""
from __future__ import annotations
import logging
from celery import chain, group
logger = logging.getLogger(__name__)
def dispatch_workflow(
    workflow_type: str,
    order_line_id: str,
    params: dict | None = None,
) -> str:
    """Build and dispatch a Celery Canvas workflow. Returns the Celery task/group ID."""
    effective_params = params or {}
    # Dispatch table: workflow type -> Canvas builder.
    try:
        build = {
            "still": _build_still,
            "turntable": _build_turntable,
            "multi_angle": _build_multi_angle,
        }[workflow_type]
    except KeyError:
        raise ValueError(f"Unknown workflow type: {workflow_type!r}") from None
    async_result = build(order_line_id, effective_params).apply_async()
    return str(async_result.id)
def _build_still(order_line_id: str, params: dict):
    """Single-node chain: one still render for the order line."""
    from app.domains.rendering.tasks import render_still_task

    signature = render_still_task.si(order_line_id, **params)
    return chain(signature)
def _build_turntable(order_line_id: str, params: dict):
    """Single-node chain: one turntable render for the order line."""
    from app.domains.rendering.tasks import render_turntable_task

    signature = render_turntable_task.si(order_line_id, **params)
    return chain(signature)
def _build_multi_angle(order_line_id: str, params: dict):
    """Parallel group: one still render per camera angle.

    ``params["angles"]`` (default ``[0, 45, 90]``) selects the camera angles;
    every remaining key in ``params`` is forwarded to each render signature.

    Raises:
        ValueError: if ``params["angles"]`` is an empty list — an empty Celery
            group produces no meaningful result id to track.
    """
    from app.domains.rendering.tasks import render_still_task

    angles = params.get("angles", [0, 45, 90])
    if not angles:
        raise ValueError("multi_angle workflow requires at least one angle")
    # Strip "angles" so it is not passed as a task kwarg alongside camera_angle.
    shared = {k: v for k, v in params.items() if k != "angles"}
    return group(
        render_still_task.si(order_line_id, camera_angle=angle, **shared)
        for angle in angles
    )
@@ -0,0 +1,137 @@
"""Workflow definition CRUD API.
Endpoints:
GET /api/workflows/ — list all workflow definitions (admin/PM)
GET /api/workflows/{id} — get single definition (admin/PM)
POST /api/workflows/ — create definition (admin only)
PUT /api/workflows/{id} — update definition (admin only)
DELETE /api/workflows/{id} — delete definition (admin only)
GET /api/workflows/{id}/runs — list runs for a definition (admin/PM)
"""
import uuid
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.domains.auth.models import User
from app.utils.auth import get_current_user, require_admin, require_admin_or_pm
from app.domains.rendering.models import WorkflowDefinition, WorkflowRun
from app.domains.rendering.schemas import (
WorkflowDefinitionCreate,
WorkflowDefinitionUpdate,
WorkflowDefinitionOut,
WorkflowRunOut,
)
router = APIRouter(prefix="/api/workflows", tags=["workflows"])
@router.get("/", response_model=list[WorkflowDefinitionOut])
async def list_workflows(
    _user: User = Depends(require_admin_or_pm),
    db: AsyncSession = Depends(get_db),
):
    """Return every workflow definition, oldest first (admin/PM only)."""
    stmt = select(WorkflowDefinition).order_by(WorkflowDefinition.created_at)
    rows = await db.execute(stmt)
    return rows.scalars().all()
@router.get("/{workflow_id}", response_model=WorkflowDefinitionOut)
async def get_workflow(
    workflow_id: uuid.UUID,
    _user: User = Depends(require_admin_or_pm),
    db: AsyncSession = Depends(get_db),
):
    """Fetch one workflow definition by id; 404 if missing (admin/PM only)."""
    stmt = select(WorkflowDefinition).where(WorkflowDefinition.id == workflow_id)
    definition = (await db.execute(stmt)).scalar_one_or_none()
    if definition is None:
        raise HTTPException(status_code=404, detail="Workflow definition not found")
    return definition
@router.post("/", response_model=WorkflowDefinitionOut, status_code=201)
async def create_workflow(
    body: WorkflowDefinitionCreate,
    _user: User = Depends(require_admin),
    db: AsyncSession = Depends(get_db),
):
    """Create a new workflow definition (admin only)."""
    # WorkflowDefinitionCreate's fields map 1:1 onto the model's columns.
    definition = WorkflowDefinition(**body.model_dump())
    db.add(definition)
    await db.commit()
    await db.refresh(definition)
    return definition
@router.put("/{workflow_id}", response_model=WorkflowDefinitionOut)
async def update_workflow(
    workflow_id: uuid.UUID,
    body: WorkflowDefinitionUpdate,
    _user: User = Depends(require_admin),
    db: AsyncSession = Depends(get_db),
):
    """Partially update a workflow definition (admin only); 404 if missing.

    Only fields supplied with a non-null value (name, config, is_active)
    are applied; null/omitted fields are left unchanged.
    """
    stmt = select(WorkflowDefinition).where(WorkflowDefinition.id == workflow_id)
    definition = (await db.execute(stmt)).scalar_one_or_none()
    if definition is None:
        raise HTTPException(status_code=404, detail="Workflow definition not found")
    for attr in ("name", "config", "is_active"):
        new_value = getattr(body, attr)
        if new_value is not None:
            setattr(definition, attr, new_value)
    await db.commit()
    await db.refresh(definition)
    return definition
@router.delete("/{workflow_id}", status_code=204)
async def delete_workflow(
    workflow_id: uuid.UUID,
    _user: User = Depends(require_admin),
    db: AsyncSession = Depends(get_db),
):
    """Delete a workflow definition (admin only); 404 if missing."""
    stmt = select(WorkflowDefinition).where(WorkflowDefinition.id == workflow_id)
    definition = (await db.execute(stmt)).scalar_one_or_none()
    if definition is None:
        raise HTTPException(status_code=404, detail="Workflow definition not found")
    await db.delete(definition)
    await db.commit()
@router.get("/{workflow_id}/runs", response_model=list[WorkflowRunOut])
async def list_workflow_runs(
    workflow_id: uuid.UUID,
    _user: User = Depends(require_admin_or_pm),
    db: AsyncSession = Depends(get_db),
):
    """List runs for one definition, newest first, node results eager-loaded."""
    # 404 when the definition itself does not exist.
    existing = await db.execute(
        select(WorkflowDefinition).where(WorkflowDefinition.id == workflow_id)
    )
    if existing.scalar_one_or_none() is None:
        raise HTTPException(status_code=404, detail="Workflow definition not found")
    runs_stmt = (
        select(WorkflowRun)
        .where(WorkflowRun.workflow_def_id == workflow_id)
        .options(selectinload(WorkflowRun.node_results))
        .order_by(WorkflowRun.created_at.desc())
    )
    return (await db.execute(runs_stmt)).scalars().all()