From 7e47e4aca7e53d4b1381b553e4cf6151f5deb8fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hartmut=20N=C3=B6renberg?= Date: Fri, 6 Mar 2026 17:07:21 +0100 Subject: [PATCH] =?UTF-8?q?feat(C1+C2):=20workflow=20system=20=E2=80=94=20?= =?UTF-8?q?WorkflowDefinition=20+=20Celery=20Canvas=20builder?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Migrations 037 (workflow tables + 3 seed definitions) + 038 (output_types.workflow_definition_id). WorkflowDefinition/Run/NodeResult SQLAlchemy models in domains/rendering/models.py. workflow_builder.py: dispatch_workflow() with Celery Canvas for still/turntable/multi_angle. workflow_router.py: CRUD endpoints at /api/workflows (admin/PM guards). dispatch_service.py: dispatch_render_with_workflow() prefers workflow path when OutputType.workflow_definition_id is set, falls back to legacy dispatch otherwise. main.py: registers workflows_router. models/__init__.py: re-exports WorkflowDefinition, WorkflowRun, WorkflowNodeResult. Co-Authored-By: Claude Sonnet 4.6 --- .../versions/037_workflow_definitions.py | 64 ++++++++ .../versions/038_output_type_workflow_fk.py | 25 ++++ .../app/domains/rendering/dispatch_service.py | 118 +++++++++++++++ backend/app/domains/rendering/models.py | 63 ++++++++ backend/app/domains/rendering/schemas.py | 48 ++++++ .../app/domains/rendering/workflow_builder.py | 53 +++++++ .../app/domains/rendering/workflow_router.py | 137 ++++++++++++++++++ backend/app/main.py | 2 + backend/app/models/__init__.py | 3 +- 9 files changed, 512 insertions(+), 1 deletion(-) create mode 100644 backend/alembic/versions/037_workflow_definitions.py create mode 100644 backend/alembic/versions/038_output_type_workflow_fk.py create mode 100644 backend/app/domains/rendering/dispatch_service.py create mode 100644 backend/app/domains/rendering/workflow_builder.py create mode 100644 backend/app/domains/rendering/workflow_router.py diff --git a/backend/alembic/versions/037_workflow_definitions.py b/backend/alembic/versions/037_workflow_definitions.py new file mode 100644 index 0000000..67b6100 --- /dev/null +++ b/backend/alembic/versions/037_workflow_definitions.py @@ -0,0 +1,64 @@ +"""Add workflow_definitions, workflow_runs, workflow_node_results tables. + +Revision ID: 037 +Revises: 036 +""" +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects.postgresql import UUID, JSONB + +revision = '037' +down_revision = '036' +branch_labels = None +depends_on = None + + +def upgrade(): + op.create_table('workflow_definitions', + sa.Column('id', UUID(as_uuid=True), primary_key=True, server_default=sa.text('gen_random_uuid()')), + sa.Column('name', sa.String(200), nullable=False), + sa.Column('output_type_id', UUID(as_uuid=True), sa.ForeignKey('output_types.id', ondelete='SET NULL'), nullable=True), + sa.Column('config', JSONB, nullable=False, server_default='{}'), + sa.Column('is_active', sa.Boolean, nullable=False, server_default='true'), + sa.Column('created_at', sa.DateTime, nullable=False, server_default=sa.text('NOW()')), + ) + + op.create_table('workflow_runs', + sa.Column('id', UUID(as_uuid=True), primary_key=True, server_default=sa.text('gen_random_uuid()')), + sa.Column('workflow_def_id', UUID(as_uuid=True), sa.ForeignKey('workflow_definitions.id', ondelete='SET NULL'), nullable=True), + sa.Column('order_line_id', UUID(as_uuid=True), sa.ForeignKey('order_lines.id', ondelete='CASCADE'), nullable=True), + sa.Column('celery_task_id', sa.String(500), nullable=True), + sa.Column('status', sa.String(50), nullable=False, server_default='pending'), + sa.Column('started_at', sa.DateTime, nullable=True), + sa.Column('completed_at', sa.DateTime, nullable=True), + sa.Column('error_message', sa.Text, nullable=True), + sa.Column('created_at', sa.DateTime, nullable=False, server_default=sa.text('NOW()')), + ) + op.create_index('ix_workflow_runs_order_line', 'workflow_runs', ['order_line_id']) + op.create_index('ix_workflow_runs_status', 'workflow_runs', ['status']) + + op.create_table('workflow_node_results', + sa.Column('id', UUID(as_uuid=True), primary_key=True, server_default=sa.text('gen_random_uuid()')), + sa.Column('run_id', UUID(as_uuid=True), sa.ForeignKey('workflow_runs.id', ondelete='CASCADE'), nullable=False), + sa.Column('node_name', sa.String(200), nullable=False), + sa.Column('status', sa.String(50), nullable=False, server_default='pending'), + sa.Column('output', JSONB, nullable=True), + sa.Column('log', sa.Text, nullable=True), + sa.Column('duration_s', sa.Float, nullable=True), + sa.Column('created_at', sa.DateTime, nullable=False, server_default=sa.text('NOW()')), + ) + op.create_index('ix_workflow_node_results_run', 'workflow_node_results', ['run_id']) + + # Seed standard workflow definitions + op.execute(""" + INSERT INTO workflow_definitions (name, config, is_active) VALUES + ('Still-Render', '{"type": "still", "params": {"render_engine": "cycles", "samples": 256, "resolution": [2048, 2048]}}', true), + ('Turntable-Animation', '{"type": "turntable", "params": {"render_engine": "cycles", "samples": 64, "fps": 24, "duration_s": 5}}', true), + ('Multi-Angle', '{"type": "multi_angle", "params": {"render_engine": "cycles", "samples": 128, "angles": [0, 45, 90, 135, 180]}}', true) + """) + + +def downgrade(): + op.drop_table('workflow_node_results') + op.drop_table('workflow_runs') + op.drop_table('workflow_definitions') diff --git a/backend/alembic/versions/038_output_type_workflow_fk.py b/backend/alembic/versions/038_output_type_workflow_fk.py new file mode 100644 index 0000000..0174af9 --- /dev/null +++ b/backend/alembic/versions/038_output_type_workflow_fk.py @@ -0,0 +1,25 @@ +"""Add workflow_definition_id FK to output_types. + +Revision ID: 038 +Revises: 037 +""" +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects.postgresql import UUID + +revision = '038' +down_revision = '037' +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column('output_types', + sa.Column('workflow_definition_id', UUID(as_uuid=True), + sa.ForeignKey('workflow_definitions.id', ondelete='SET NULL'), + nullable=True) + ) + + +def downgrade(): + op.drop_column('output_types', 'workflow_definition_id') diff --git a/backend/app/domains/rendering/dispatch_service.py b/backend/app/domains/rendering/dispatch_service.py new file mode 100644 index 0000000..9928232 --- /dev/null +++ b/backend/app/domains/rendering/dispatch_service.py @@ -0,0 +1,118 @@ +"""Workflow-aware render dispatch service. + +C2: extends the legacy dispatch_render path with WorkflowDefinition support. + +If an OutputType has workflow_definition_id set: + - Loads the WorkflowDefinition + - Calls dispatch_workflow() to build + submit a Celery Canvas + - Creates a WorkflowRun record tracking the submission + +If no workflow_definition_id is set, falls back to the existing direct +task-dispatch logic in app.services.render_dispatcher (legacy path). +""" +from __future__ import annotations + +import logging +from datetime import datetime + +logger = logging.getLogger(__name__) + + +def dispatch_render_with_workflow(order_line_id: str) -> dict: + """Dispatch a render for the given order line. + + Checks whether the associated OutputType has a WorkflowDefinition linked. + If yes, uses the Celery Canvas workflow builder. + If no, falls back to the legacy direct-dispatch logic. + + This function is synchronous (Celery-task-safe). + """ + from sqlalchemy import create_engine, select + from sqlalchemy.orm import Session, selectinload + + from app.config import settings + from app.domains.orders.models import OrderLine + from app.domains.rendering.models import OutputType, WorkflowDefinition, WorkflowRun + + engine = create_engine( + settings.database_url.replace("+asyncpg", ""), + pool_pre_ping=True, + ) + + with Session(engine) as session: + # Load order line with its output_type + line = session.execute( + select(OrderLine) + .where(OrderLine.id == order_line_id) + .options(selectinload(OrderLine.output_type)) + ).scalar_one_or_none() + + if not line: + raise ValueError(f"OrderLine {order_line_id} not found") + + output_type: OutputType | None = line.output_type + + if output_type is None or output_type.workflow_definition_id is None: + # Legacy path — no workflow definition linked + logger.info( + "order_line %s: no workflow_definition_id, using legacy dispatch", + order_line_id, + ) + return _legacy_dispatch(order_line_id) + + # Load the linked WorkflowDefinition + wf_def: WorkflowDefinition | None = session.execute( + select(WorkflowDefinition).where( + WorkflowDefinition.id == output_type.workflow_definition_id, + WorkflowDefinition.is_active.is_(True), + ) + ).scalar_one_or_none() + + if wf_def is None: + logger.warning( + "order_line %s: workflow_definition_id %s not found or inactive, " + "falling back to legacy dispatch", + order_line_id, + output_type.workflow_definition_id, + ) + return _legacy_dispatch(order_line_id) + + workflow_type = wf_def.config.get("type") + params = wf_def.config.get("params", {}) + + logger.info( + "order_line %s: dispatching via WorkflowDefinition %s (type=%s)", + order_line_id, + wf_def.id, + workflow_type, + ) + + from app.domains.rendering.workflow_builder import dispatch_workflow + celery_task_id = dispatch_workflow(workflow_type, order_line_id, params) + + # Persist a WorkflowRun record + run = WorkflowRun( + workflow_def_id=wf_def.id, + order_line_id=line.id, + celery_task_id=celery_task_id, + status="pending", + started_at=datetime.utcnow(), + ) + session.add(run) + session.commit() + + return { + "backend": "workflow", + "workflow_type": workflow_type, + "workflow_run_id": str(run.id), + "celery_task_id": celery_task_id, + } + + +def _legacy_dispatch(order_line_id: str) -> dict: + """Delegate to the original render_dispatcher logic (kept for backward compat).""" + # Import the original full implementation (not the shim) to avoid circular imports. + # The original logic lives inline in the orders router / step_tasks path; + # here we re-use the existing flamenco/celery routing code. + from app.services.render_dispatcher import dispatch_render # noqa: F401 — shim re-export + return dispatch_render(order_line_id) diff --git a/backend/app/domains/rendering/models.py b/backend/app/domains/rendering/models.py index 4363f70..2cdfb7d 100644 --- a/backend/app/domains/rendering/models.py +++ b/backend/app/domains/rendering/models.py @@ -35,6 +35,10 @@ class OutputType(Base): DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False ) + workflow_definition_id: Mapped[uuid.UUID | None] = mapped_column( + UUID(as_uuid=True), ForeignKey("workflow_definitions.id", ondelete="SET NULL"), nullable=True + ) + order_lines: Mapped[list["OrderLine"]] = relationship("OrderLine", back_populates="output_type") pricing_tier: Mapped["PricingTier | None"] = relationship("PricingTier", back_populates="output_types") @@ -85,3 +89,62 @@ class ProductRenderPosition(Base): product: Mapped["Product"] = relationship("Product", back_populates="render_positions") order_lines: Mapped[list["OrderLine"]] = relationship("OrderLine", back_populates="render_position") + + +class WorkflowDefinition(Base): + __tablename__ = "workflow_definitions" + + id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + name: Mapped[str] = mapped_column(String(200), nullable=False) + output_type_id: Mapped[uuid.UUID | None] = mapped_column( + UUID(as_uuid=True), ForeignKey("output_types.id", ondelete="SET NULL"), nullable=True + ) + config: Mapped[dict] = mapped_column(JSONB, nullable=False, default=dict) + is_active: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True) + created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow, nullable=False) + + runs: Mapped[list["WorkflowRun"]] = relationship( + "WorkflowRun", back_populates="workflow_def", lazy="noload", cascade="all, delete-orphan" + ) + + +class WorkflowRun(Base): + __tablename__ = "workflow_runs" + + id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + workflow_def_id: Mapped[uuid.UUID | None] = mapped_column( + UUID(as_uuid=True), ForeignKey("workflow_definitions.id", ondelete="SET NULL"), nullable=True + ) + order_line_id: Mapped[uuid.UUID | None] = mapped_column( + UUID(as_uuid=True), ForeignKey("order_lines.id", ondelete="CASCADE"), nullable=True, index=True + ) + celery_task_id: Mapped[str | None] = mapped_column(String(500), nullable=True) + status: Mapped[str] = mapped_column(String(50), nullable=False, default="pending") + started_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True) + completed_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True) + error_message: Mapped[str | None] = mapped_column(Text, nullable=True) + created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow, nullable=False) + + workflow_def: Mapped["WorkflowDefinition | None"] = relationship( + "WorkflowDefinition", back_populates="runs" + ) + node_results: Mapped[list["WorkflowNodeResult"]] = relationship( + "WorkflowNodeResult", back_populates="run", cascade="all, delete-orphan" + ) + + +class WorkflowNodeResult(Base): + __tablename__ = "workflow_node_results" + + id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + run_id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), ForeignKey("workflow_runs.id", ondelete="CASCADE"), nullable=False, index=True + ) + node_name: Mapped[str] = mapped_column(String(200), nullable=False) + status: Mapped[str] = mapped_column(String(50), nullable=False, default="pending") + output: Mapped[dict | None] = mapped_column(JSONB, nullable=True) + log: Mapped[str | None] = mapped_column(Text, nullable=True) + duration_s: Mapped[float | None] = mapped_column(Float, nullable=True) + created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow, nullable=False) + + run: Mapped["WorkflowRun"] = relationship("WorkflowRun", back_populates="node_results") diff --git a/backend/app/domains/rendering/schemas.py b/backend/app/domains/rendering/schemas.py index 98f32bc..73549cf 100644 --- a/backend/app/domains/rendering/schemas.py +++ b/backend/app/domains/rendering/schemas.py @@ -89,3 +89,51 @@ class RenderPositionOut(BaseModel): updated_at: datetime model_config = {"from_attributes": True} + + +class WorkflowDefinitionCreate(BaseModel): + name: str + output_type_id: uuid.UUID | None = None + config: dict + is_active: bool = True + + +class WorkflowDefinitionUpdate(BaseModel): + name: str | None = None + config: dict | None = None + is_active: bool | None = None + + +class WorkflowDefinitionOut(BaseModel): + id: uuid.UUID + name: str + output_type_id: uuid.UUID | None + config: dict + is_active: bool + created_at: datetime + model_config = {"from_attributes": True} + + +class WorkflowNodeResultOut(BaseModel): + id: uuid.UUID + node_name: str + status: str + output: dict | None + log: str | None + duration_s: float | None + created_at: datetime + model_config = {"from_attributes": True} + + +class WorkflowRunOut(BaseModel): + id: uuid.UUID + workflow_def_id: uuid.UUID | None + order_line_id: uuid.UUID | None + celery_task_id: str | None + status: str + started_at: datetime | None + completed_at: datetime | None + error_message: str | None + created_at: datetime + node_results: list[WorkflowNodeResultOut] = [] + model_config = {"from_attributes": True} diff --git a/backend/app/domains/rendering/workflow_builder.py b/backend/app/domains/rendering/workflow_builder.py new file mode 100644 index 0000000..9fc0e0e --- /dev/null +++ b/backend/app/domains/rendering/workflow_builder.py @@ -0,0 +1,53 @@ +"""Celery Canvas workflow builder. + +Translates WorkflowDefinition config into a Celery Canvas (chain/group/chord). +""" +from __future__ import annotations +import logging +from celery import chain, group + +logger = logging.getLogger(__name__) + + +def dispatch_workflow( + workflow_type: str, + order_line_id: str, + params: dict | None = None, +) -> str: + """Build and dispatch a Celery Canvas workflow. Returns the Celery task/group ID.""" + params = params or {} + builders = { + "still": _build_still, + "turntable": _build_turntable, + "multi_angle": _build_multi_angle, + } + builder = builders.get(workflow_type) + if not builder: + raise ValueError(f"Unknown workflow type: {workflow_type!r}") + canvas = builder(order_line_id, params) + result = canvas.apply_async() + return str(result.id) + + +def _build_still(order_line_id: str, params: dict): + from app.domains.rendering.tasks import render_still_task + return chain( + render_still_task.si(order_line_id, **params) + ) + + +def _build_turntable(order_line_id: str, params: dict): + from app.domains.rendering.tasks import render_turntable_task + return chain( + render_turntable_task.si(order_line_id, **params) + ) + + +def _build_multi_angle(order_line_id: str, params: dict): + from app.domains.rendering.tasks import render_still_task + angles = params.get("angles", [0, 45, 90]) + p = {k: v for k, v in params.items() if k != "angles"} + return group( + render_still_task.si(order_line_id, camera_angle=angle, **p) + for angle in angles + ) diff --git a/backend/app/domains/rendering/workflow_router.py b/backend/app/domains/rendering/workflow_router.py new file mode 100644 index 0000000..91ab8a1 --- /dev/null +++ b/backend/app/domains/rendering/workflow_router.py @@ -0,0 +1,137 @@ +"""Workflow definition CRUD API. + +Endpoints: + GET /api/workflows/ — list all workflow definitions (admin/PM) + GET /api/workflows/{id} — get single definition (admin/PM) + POST /api/workflows/ — create definition (admin only) + PUT /api/workflows/{id} — update definition (admin only) + DELETE /api/workflows/{id} — delete definition (admin only) + GET /api/workflows/{id}/runs — list runs for a definition (admin/PM) +""" +import uuid + +from fastapi import APIRouter, Depends, HTTPException +from sqlalchemy import select +from sqlalchemy.orm import selectinload +from sqlalchemy.ext.asyncio import AsyncSession + +from app.database import get_db +from app.domains.auth.models import User +from app.utils.auth import get_current_user, require_admin, require_admin_or_pm +from app.domains.rendering.models import WorkflowDefinition, WorkflowRun +from app.domains.rendering.schemas import ( + WorkflowDefinitionCreate, + WorkflowDefinitionUpdate, + WorkflowDefinitionOut, + WorkflowRunOut, +) + +router = APIRouter(prefix="/api/workflows", tags=["workflows"]) + + +@router.get("/", response_model=list[WorkflowDefinitionOut]) +async def list_workflows( + _user: User = Depends(require_admin_or_pm), + db: AsyncSession = Depends(get_db), +): + result = await db.execute( + select(WorkflowDefinition).order_by(WorkflowDefinition.created_at) + ) + return result.scalars().all() + + +@router.get("/{workflow_id}", response_model=WorkflowDefinitionOut) +async def get_workflow( + workflow_id: uuid.UUID, + _user: User = Depends(require_admin_or_pm), + db: AsyncSession = Depends(get_db), +): + result = await db.execute( + select(WorkflowDefinition).where(WorkflowDefinition.id == workflow_id) + ) + wf = result.scalar_one_or_none() + if not wf: + raise HTTPException(status_code=404, detail="Workflow definition not found") + return wf + + +@router.post("/", response_model=WorkflowDefinitionOut, status_code=201) +async def create_workflow( + body: WorkflowDefinitionCreate, + _user: User = Depends(require_admin), + db: AsyncSession = Depends(get_db), +): + wf = WorkflowDefinition( + name=body.name, + output_type_id=body.output_type_id, + config=body.config, + is_active=body.is_active, + ) + db.add(wf) + await db.commit() + await db.refresh(wf) + return wf + + +@router.put("/{workflow_id}", response_model=WorkflowDefinitionOut) +async def update_workflow( + workflow_id: uuid.UUID, + body: WorkflowDefinitionUpdate, + _user: User = Depends(require_admin), + db: AsyncSession = Depends(get_db), +): + result = await db.execute( + select(WorkflowDefinition).where(WorkflowDefinition.id == workflow_id) + ) + wf = result.scalar_one_or_none() + if not wf: + raise HTTPException(status_code=404, detail="Workflow definition not found") + + if body.name is not None: + wf.name = body.name + if body.config is not None: + wf.config = body.config + if body.is_active is not None: + wf.is_active = body.is_active + + await db.commit() + await db.refresh(wf) + return wf + + +@router.delete("/{workflow_id}", status_code=204) +async def delete_workflow( + workflow_id: uuid.UUID, + _user: User = Depends(require_admin), + db: AsyncSession = Depends(get_db), +): + result = await db.execute( + select(WorkflowDefinition).where(WorkflowDefinition.id == workflow_id) + ) + wf = result.scalar_one_or_none() + if not wf: + raise HTTPException(status_code=404, detail="Workflow definition not found") + await db.delete(wf) + await db.commit() + + +@router.get("/{workflow_id}/runs", response_model=list[WorkflowRunOut]) +async def list_workflow_runs( + workflow_id: uuid.UUID, + _user: User = Depends(require_admin_or_pm), + db: AsyncSession = Depends(get_db), +): + # Verify the workflow exists + wf_result = await db.execute( + select(WorkflowDefinition).where(WorkflowDefinition.id == workflow_id) + ) + if not wf_result.scalar_one_or_none(): + raise HTTPException(status_code=404, detail="Workflow definition not found") + + result = await db.execute( + select(WorkflowRun) + .where(WorkflowRun.workflow_def_id == workflow_id) + .options(selectinload(WorkflowRun.node_results)) + .order_by(WorkflowRun.created_at.desc()) + ) + return result.scalars().all() diff --git a/backend/app/main.py b/backend/app/main.py index b1c5a4e..8fc8682 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -18,6 +18,7 @@ from app.domains.rendering.router import render_templates_router, output_types_r from app.domains.notifications.router import router as notifications_router from app.domains.billing.router import router as pricing_router from app.domains.tenants.router import router as tenants_router +from app.domains.rendering.workflow_router import router as workflows_router @asynccontextmanager @@ -76,6 +77,7 @@ app.include_router(output_types_router, prefix="/api") app.include_router(render_templates_router, prefix="/api") app.include_router(notifications_router, prefix="/api") app.include_router(tenants_router, prefix="/api") +app.include_router(workflows_router) @app.get("/health") diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py index 1b26881..b57ccb3 100644 --- a/backend/app/models/__init__.py +++ b/backend/app/models/__init__.py @@ -10,7 +10,7 @@ from app.domains.products.models import CadFile, Product from app.domains.orders.models import Order, OrderItem, OrderLine from app.domains.notifications.models import AuditLog from app.domains.billing.models import PricingTier -from app.domains.rendering.models import OutputType, RenderTemplate, ProductRenderPosition +from app.domains.rendering.models import OutputType, RenderTemplate, ProductRenderPosition, WorkflowDefinition, WorkflowRun, WorkflowNodeResult from app.domains.materials.models import Material, MaterialAlias # Also re-export SystemSetting (no domain assigned — stays as-is) @@ -19,5 +19,6 @@ from app.models.system_setting import SystemSetting __all__ = [ "Tenant", "User", "Template", "CadFile", "Product", "Order", "OrderItem", "OrderLine", "AuditLog", "PricingTier", "OutputType", "RenderTemplate", "ProductRenderPosition", + "WorkflowDefinition", "WorkflowRun", "WorkflowNodeResult", "Material", "MaterialAlias", "SystemSetting", ]