"""Workflow definition CRUD API.""" import uuid from fastapi import APIRouter, Depends, HTTPException, Query from pydantic import BaseModel, ValidationError from sqlalchemy import select from sqlalchemy.orm import selectinload from sqlalchemy.ext.asyncio import AsyncSession from app.database import get_db from app.domains.auth.models import User from app.utils.auth import get_current_user, require_global_admin, require_admin_or_pm, require_pm_or_above from app.domains.rendering.models import WorkflowDefinition, WorkflowRun from app.domains.rendering.schemas import ( WorkflowDefinitionCreate, WorkflowDefinitionUpdate, WorkflowDefinitionOut, WorkflowRunOut, ) from app.domains.rendering.workflow_config_utils import canonicalize_workflow_config from app.domains.rendering.workflow_node_registry import ( StepCategory, WorkflowNodeDefinition, list_node_definitions, ) from app.domains.rendering.workflow_schema import WorkflowConfig class PipelineStepOut(BaseModel): name: str label: str category: StepCategory description: str class PipelineStepsResponse(BaseModel): steps: list[PipelineStepOut] class NodeDefinitionsResponse(BaseModel): definitions: list[WorkflowNodeDefinition] router = APIRouter(prefix="/api/workflows", tags=["workflows"]) def _workflow_to_out(wf: WorkflowDefinition) -> WorkflowDefinitionOut: return WorkflowDefinitionOut( id=wf.id, name=wf.name, output_type_id=wf.output_type_id, config=canonicalize_workflow_config(wf.config), is_active=wf.is_active, created_at=wf.created_at, ) @router.get("/node-definitions", response_model=NodeDefinitionsResponse) async def get_node_definitions( _user: User = Depends(require_admin_or_pm), ): return NodeDefinitionsResponse(definitions=list_node_definitions()) @router.get("/pipeline-steps", response_model=PipelineStepsResponse) async def get_pipeline_steps( _user: User = Depends(require_admin_or_pm), ): steps = [ PipelineStepOut( name=definition.step, label=definition.label, category=definition.category, description=definition.description, ) for definition in list_node_definitions() ] return PipelineStepsResponse(steps=steps) @router.get("", response_model=list[WorkflowDefinitionOut]) async def list_workflows( _user: User = Depends(require_admin_or_pm), db: AsyncSession = Depends(get_db), ): result = await db.execute( select(WorkflowDefinition).order_by(WorkflowDefinition.created_at) ) return [_workflow_to_out(wf) for wf in result.scalars().all()] @router.get("/{workflow_id}", response_model=WorkflowDefinitionOut) async def get_workflow( workflow_id: uuid.UUID, _user: User = Depends(require_admin_or_pm), db: AsyncSession = Depends(get_db), ): result = await db.execute( select(WorkflowDefinition).where(WorkflowDefinition.id == workflow_id) ) wf = result.scalar_one_or_none() if not wf: raise HTTPException(status_code=404, detail="Workflow definition not found") return _workflow_to_out(wf) @router.post("", response_model=WorkflowDefinitionOut, status_code=201) async def create_workflow( body: WorkflowDefinitionCreate, _user: User = Depends(require_global_admin), db: AsyncSession = Depends(get_db), ): normalized_config = canonicalize_workflow_config(body.config) if body.config: try: WorkflowConfig.model_validate(normalized_config) except (ValidationError, ValueError) as exc: detail = exc.errors() if isinstance(exc, ValidationError) else str(exc) raise HTTPException(status_code=422, detail=f"Invalid workflow config: {detail}") wf = WorkflowDefinition( name=body.name, output_type_id=body.output_type_id, config=normalized_config, is_active=body.is_active, ) db.add(wf) await db.commit() await db.refresh(wf) return _workflow_to_out(wf) @router.put("/{workflow_id}", response_model=WorkflowDefinitionOut) async def update_workflow( workflow_id: uuid.UUID, body: WorkflowDefinitionUpdate, _user: User = Depends(require_global_admin), db: AsyncSession = Depends(get_db), ): result = await db.execute( select(WorkflowDefinition).where(WorkflowDefinition.id == workflow_id) ) wf = result.scalar_one_or_none() if not wf: raise HTTPException(status_code=404, detail="Workflow definition not found") if body.name is not None: wf.name = body.name if body.config is not None: try: normalized_config = canonicalize_workflow_config(body.config) WorkflowConfig.model_validate(normalized_config) except (ValidationError, ValueError) as exc: detail = exc.errors() if isinstance(exc, ValidationError) else str(exc) raise HTTPException(status_code=422, detail=f"Invalid workflow config: {detail}") wf.config = normalized_config if body.is_active is not None: wf.is_active = body.is_active await db.commit() await db.refresh(wf) return _workflow_to_out(wf) @router.delete("/{workflow_id}", status_code=204) async def delete_workflow( workflow_id: uuid.UUID, _user: User = Depends(require_global_admin), db: AsyncSession = Depends(get_db), ): result = await db.execute( select(WorkflowDefinition).where(WorkflowDefinition.id == workflow_id) ) wf = result.scalar_one_or_none() if not wf: raise HTTPException(status_code=404, detail="Workflow definition not found") await db.delete(wf) await db.commit() @router.get("/{workflow_id}/runs", response_model=list[WorkflowRunOut]) async def list_workflow_runs( workflow_id: uuid.UUID, _user: User = Depends(require_admin_or_pm), db: AsyncSession = Depends(get_db), ): # Verify the workflow exists wf_result = await db.execute( select(WorkflowDefinition).where(WorkflowDefinition.id == workflow_id) ) if not wf_result.scalar_one_or_none(): raise HTTPException(status_code=404, detail="Workflow definition not found") result = await db.execute( select(WorkflowRun) .where(WorkflowRun.workflow_def_id == workflow_id) .options(selectinload(WorkflowRun.node_results)) .order_by(WorkflowRun.created_at.desc()) ) return result.scalars().all() class WorkflowDispatchResponse(BaseModel): workflow_run: WorkflowRunOut context_id: str execution_mode: str dispatched: int task_ids: list[str] @router.post("/{workflow_id}/dispatch", response_model=WorkflowDispatchResponse) async def dispatch_workflow_endpoint( workflow_id: uuid.UUID, context_id: str = Query( ..., description=( "UUID of the entity to process. " "For STEP/thumbnail steps this is a cad_file_id; " "for render steps this is an order_line_id." ), ), _user: User = Depends(require_pm_or_above), db: AsyncSession = Depends(get_db), ): """Dispatch a workflow's steps as Celery tasks for a given context entity. Each node in the workflow config is dispatched as an individual Celery task in topological (dependency) order. Returns the list of Celery task IDs so the caller can track progress. """ from pydantic import ValidationError as _ValidationError from app.domains.rendering.workflow_executor import prepare_workflow_context from app.domains.rendering.workflow_graph_runtime import execute_graph_workflow from app.domains.rendering.workflow_run_service import ( create_workflow_run, mark_workflow_run_failed, ) result = await db.execute( select(WorkflowDefinition).where(WorkflowDefinition.id == workflow_id) ) wf = result.scalar_one_or_none() if not wf: raise HTTPException(status_code=404, detail="Workflow definition not found") if not wf.config: raise HTTPException(status_code=400, detail="Workflow has no config") try: workflow_context = prepare_workflow_context( wf.config, context_id=context_id, execution_mode="graph", ) except _ValidationError as exc: raise HTTPException(status_code=422, detail=f"Invalid workflow config: {exc.errors()}") except ValueError as exc: raise HTTPException(status_code=422, detail=str(exc)) run_id = await db.run_sync( lambda sync_session: create_workflow_run( sync_session, workflow_def_id=wf.id, order_line_id=None, workflow_context=workflow_context, ).id ) await db.commit() try: dispatch_result = await db.run_sync( lambda sync_session: execute_graph_workflow(sync_session, workflow_context) ) except Exception as exc: failed_result = await db.execute( select(WorkflowRun) .where(WorkflowRun.id == run_id) .options(selectinload(WorkflowRun.node_results)) ) failed_run = failed_result.scalar_one() mark_workflow_run_failed(failed_run, str(exc)) await db.commit() raise await db.commit() refreshed_result = await db.execute( select(WorkflowRun) .where(WorkflowRun.id == run_id) .options(selectinload(WorkflowRun.node_results)) ) refreshed_run = refreshed_result.scalar_one() return WorkflowDispatchResponse( workflow_run=refreshed_run, context_id=context_id, execution_mode=workflow_context.execution_mode, dispatched=len(dispatch_result.task_ids), task_ids=dispatch_result.task_ids, )