"""Workflow definition CRUD API. Endpoints: GET /api/workflows/ — list all workflow definitions (admin/PM) GET /api/workflows/{id} — get single definition (admin/PM) POST /api/workflows/ — create definition (admin only) PUT /api/workflows/{id} — update definition (admin only) DELETE /api/workflows/{id} — delete definition (admin only) GET /api/workflows/{id}/runs — list runs for a definition (admin/PM) """ import uuid from fastapi import APIRouter, Depends, HTTPException from sqlalchemy import select from sqlalchemy.orm import selectinload from sqlalchemy.ext.asyncio import AsyncSession from app.database import get_db from app.domains.auth.models import User from app.utils.auth import get_current_user, require_admin, require_admin_or_pm from app.domains.rendering.models import WorkflowDefinition, WorkflowRun from app.domains.rendering.schemas import ( WorkflowDefinitionCreate, WorkflowDefinitionUpdate, WorkflowDefinitionOut, WorkflowRunOut, ) router = APIRouter(prefix="/api/workflows", tags=["workflows"]) @router.get("/", response_model=list[WorkflowDefinitionOut]) async def list_workflows( _user: User = Depends(require_admin_or_pm), db: AsyncSession = Depends(get_db), ): result = await db.execute( select(WorkflowDefinition).order_by(WorkflowDefinition.created_at) ) return result.scalars().all() @router.get("/{workflow_id}", response_model=WorkflowDefinitionOut) async def get_workflow( workflow_id: uuid.UUID, _user: User = Depends(require_admin_or_pm), db: AsyncSession = Depends(get_db), ): result = await db.execute( select(WorkflowDefinition).where(WorkflowDefinition.id == workflow_id) ) wf = result.scalar_one_or_none() if not wf: raise HTTPException(status_code=404, detail="Workflow definition not found") return wf @router.post("/", response_model=WorkflowDefinitionOut, status_code=201) async def create_workflow( body: WorkflowDefinitionCreate, _user: User = Depends(require_admin), db: AsyncSession = Depends(get_db), ): wf = WorkflowDefinition( name=body.name, output_type_id=body.output_type_id, config=body.config, is_active=body.is_active, ) db.add(wf) await db.commit() await db.refresh(wf) return wf @router.put("/{workflow_id}", response_model=WorkflowDefinitionOut) async def update_workflow( workflow_id: uuid.UUID, body: WorkflowDefinitionUpdate, _user: User = Depends(require_admin), db: AsyncSession = Depends(get_db), ): result = await db.execute( select(WorkflowDefinition).where(WorkflowDefinition.id == workflow_id) ) wf = result.scalar_one_or_none() if not wf: raise HTTPException(status_code=404, detail="Workflow definition not found") if body.name is not None: wf.name = body.name if body.config is not None: wf.config = body.config if body.is_active is not None: wf.is_active = body.is_active await db.commit() await db.refresh(wf) return wf @router.delete("/{workflow_id}", status_code=204) async def delete_workflow( workflow_id: uuid.UUID, _user: User = Depends(require_admin), db: AsyncSession = Depends(get_db), ): result = await db.execute( select(WorkflowDefinition).where(WorkflowDefinition.id == workflow_id) ) wf = result.scalar_one_or_none() if not wf: raise HTTPException(status_code=404, detail="Workflow definition not found") await db.delete(wf) await db.commit() @router.get("/{workflow_id}/runs", response_model=list[WorkflowRunOut]) async def list_workflow_runs( workflow_id: uuid.UUID, _user: User = Depends(require_admin_or_pm), db: AsyncSession = Depends(get_db), ): # Verify the workflow exists wf_result = await db.execute( select(WorkflowDefinition).where(WorkflowDefinition.id == workflow_id) ) if not wf_result.scalar_one_or_none(): raise HTTPException(status_code=404, detail="Workflow definition not found") result = await db.execute( select(WorkflowRun) .where(WorkflowRun.workflow_def_id == workflow_id) .options(selectinload(WorkflowRun.node_results)) .order_by(WorkflowRun.created_at.desc()) ) return result.scalars().all()