"""Workflow definition CRUD API. Endpoints: GET /api/workflows/ — list all workflow definitions (admin/PM) GET /api/workflows/pipeline-steps — list available pipeline step definitions GET /api/workflows/{id} — get single definition (admin/PM) POST /api/workflows/ — create definition (admin only) PUT /api/workflows/{id} — update definition (admin only) DELETE /api/workflows/{id} — delete definition (admin only) GET /api/workflows/{id}/runs — list runs for a definition (admin/PM) """ import uuid from typing import Literal from fastapi import APIRouter, Depends, HTTPException, Query from pydantic import BaseModel, ValidationError from sqlalchemy import select from sqlalchemy.orm import selectinload from sqlalchemy.ext.asyncio import AsyncSession from app.database import get_db from app.domains.auth.models import User from app.utils.auth import get_current_user, require_global_admin, require_admin_or_pm, require_pm_or_above from app.domains.rendering.models import WorkflowDefinition, WorkflowRun from app.domains.rendering.schemas import ( WorkflowDefinitionCreate, WorkflowDefinitionUpdate, WorkflowDefinitionOut, WorkflowRunOut, ) from app.domains.rendering.workflow_schema import WorkflowConfig from app.core.process_steps import StepName # ── Pipeline-step metadata helpers ────────────────────────────────────────── StepCategory = Literal["input", "processing", "rendering", "output"] _STEP_CATEGORIES: dict[StepName, StepCategory] = { StepName.RESOLVE_STEP_PATH: "input", StepName.OCC_OBJECT_EXTRACT: "processing", StepName.OCC_GLB_EXPORT: "processing", StepName.GLB_BBOX: "processing", StepName.MATERIAL_MAP_RESOLVE: "processing", StepName.AUTO_POPULATE_MATERIALS: "processing", StepName.BLENDER_RENDER: "rendering", StepName.THREEJS_RENDER: "rendering", StepName.THUMBNAIL_SAVE: "output", StepName.ORDER_LINE_SETUP: "processing", StepName.RESOLVE_TEMPLATE: "processing", StepName.BLENDER_STILL: "rendering", StepName.BLENDER_TURNTABLE: "rendering", StepName.OUTPUT_SAVE: "output", 
StepName.EXPORT_BLEND: "output", StepName.STL_CACHE_GENERATE: "processing", StepName.NOTIFY: "output", } _STEP_DESCRIPTIONS: dict[StepName, str] = { StepName.RESOLVE_STEP_PATH: "Locate the STEP file on disk from the CadFile record", StepName.OCC_OBJECT_EXTRACT: "Extract part objects and metadata from the STEP file using cadquery/OCC", StepName.OCC_GLB_EXPORT: "Convert STEP geometry to glTF/GLB via cadquery", StepName.GLB_BBOX: "Compute bounding-box from the exported GLB for camera framing", StepName.MATERIAL_MAP_RESOLVE: "Resolve raw part-material names to HARTOMAT library materials via alias table", StepName.AUTO_POPULATE_MATERIALS: "Auto-create Material records for any newly discovered part names", StepName.BLENDER_RENDER: "Render a thumbnail PNG using Blender (Cycles or EEVEE)", StepName.THREEJS_RENDER: "Render a thumbnail PNG using Three.js / Playwright headless browser", StepName.THUMBNAIL_SAVE: "Persist the rendered thumbnail bytes to the CadFile record", StepName.ORDER_LINE_SETUP: "Validate and prepare an order line for rendering (check STEP path, output type)", StepName.RESOLVE_TEMPLATE: "Look up the matching RenderTemplate for the order line's category + output type", StepName.BLENDER_STILL: "Render a production still image (PNG) via Blender HTTP micro-service", StepName.BLENDER_TURNTABLE: "Render all turntable animation frames via Blender HTTP micro-service", StepName.OUTPUT_SAVE: "Upload the rendered output file to storage and create a MediaAsset record", StepName.EXPORT_BLEND: "Save the production .blend file as a downloadable MediaAsset", StepName.STL_CACHE_GENERATE: "Convert STEP → STL (low + high quality) and cache next to the STEP file", StepName.NOTIFY: "Emit a user notification via the audit-log notification channel", } class PipelineStepOut(BaseModel): name: str label: str category: StepCategory description: str class PipelineStepsResponse(BaseModel): steps: list[PipelineStepOut] router = APIRouter(prefix="/api/workflows", tags=["workflows"]) 
async def _get_workflow_or_404(
    db: AsyncSession, workflow_id: uuid.UUID
) -> WorkflowDefinition:
    """Load one workflow definition by primary key.

    Args:
        db: Session used to run the lookup query.
        workflow_id: Primary key of the definition to load.

    Returns:
        The matching ``WorkflowDefinition`` row.

    Raises:
        HTTPException: 404 when no definition has that id.
    """
    result = await db.execute(
        select(WorkflowDefinition).where(WorkflowDefinition.id == workflow_id)
    )
    wf = result.scalar_one_or_none()
    if wf is None:
        raise HTTPException(status_code=404, detail="Workflow definition not found")
    return wf


def _validate_workflow_config(config: object) -> None:
    """Check ``config`` against ``WorkflowConfig``.

    Args:
        config: Raw config payload taken from the request body.

    Raises:
        HTTPException: 422 carrying the pydantic error list when validation fails.
    """
    try:
        WorkflowConfig.model_validate(config)
    except ValidationError as exc:
        # Chain the cause so the original validation traceback stays visible in logs.
        raise HTTPException(
            status_code=422, detail=f"Invalid workflow config: {exc.errors()}"
        ) from exc


# NOTE: this route is registered before "/{workflow_id}" on purpose — routes are
# matched in declaration order, and "pipeline-steps" would otherwise be parsed
# as a (malformed) workflow id.
@router.get("/pipeline-steps", response_model=PipelineStepsResponse)
async def get_pipeline_steps(
    _user: User = Depends(require_admin_or_pm),
):
    """Return all available pipeline step definitions for the workflow editor."""
    steps = [
        PipelineStepOut(
            name=step.value,
            # "blender_render"-style value -> "Blender Render"
            label=step.value.replace("_", " ").title(),
            # Steps missing from the metadata tables fall back to neutral defaults.
            category=_STEP_CATEGORIES.get(step, "processing"),
            description=_STEP_DESCRIPTIONS.get(step, ""),
        )
        for step in StepName
    ]
    return PipelineStepsResponse(steps=steps)


@router.get("", response_model=list[WorkflowDefinitionOut])
async def list_workflows(
    _user: User = Depends(require_admin_or_pm),
    db: AsyncSession = Depends(get_db),
):
    """Return every workflow definition, ordered by creation time (ascending)."""
    result = await db.execute(
        select(WorkflowDefinition).order_by(WorkflowDefinition.created_at)
    )
    return result.scalars().all()


@router.get("/{workflow_id}", response_model=WorkflowDefinitionOut)
async def get_workflow(
    workflow_id: uuid.UUID,
    _user: User = Depends(require_admin_or_pm),
    db: AsyncSession = Depends(get_db),
):
    """Return a single workflow definition, or 404 if it does not exist."""
    return await _get_workflow_or_404(db, workflow_id)


@router.post("", response_model=WorkflowDefinitionOut, status_code=201)
async def create_workflow(
    body: WorkflowDefinitionCreate,
    _user: User = Depends(require_global_admin),
    db: AsyncSession = Depends(get_db),
):
    """Create a workflow definition; a non-empty config is validated first (422 on failure)."""
    # NOTE(review): a falsy config (None or empty) skips validation here, whereas
    # update_workflow validates any non-None config — confirm this is intended.
    if body.config:
        _validate_workflow_config(body.config)

    wf = WorkflowDefinition(
        name=body.name,
        output_type_id=body.output_type_id,
        config=body.config,
        is_active=body.is_active,
    )
    db.add(wf)
    await db.commit()
    # Reload the row after the commit before handing it to the response model.
    await db.refresh(wf)
    return wf


@router.put("/{workflow_id}", response_model=WorkflowDefinitionOut)
async def update_workflow(
    workflow_id: uuid.UUID,
    body: WorkflowDefinitionUpdate,
    _user: User = Depends(require_global_admin),
    db: AsyncSession = Depends(get_db),
):
    """Partially update a workflow definition; fields left as null are not changed."""
    wf = await _get_workflow_or_404(db, workflow_id)

    # Validate before touching the ORM object so a rejected request never
    # leaves a half-modified instance in the session.
    if body.config is not None:
        _validate_workflow_config(body.config)

    if body.name is not None:
        wf.name = body.name
    if body.config is not None:
        wf.config = body.config
    if body.is_active is not None:
        wf.is_active = body.is_active

    await db.commit()
    await db.refresh(wf)
    return wf


@router.delete("/{workflow_id}", status_code=204)
async def delete_workflow(
    workflow_id: uuid.UUID,
    _user: User = Depends(require_global_admin),
    db: AsyncSession = Depends(get_db),
):
    """Delete a workflow definition, or return 404 if it does not exist."""
    wf = await _get_workflow_or_404(db, workflow_id)
    await db.delete(wf)
    await db.commit()


@router.get("/{workflow_id}/runs", response_model=list[WorkflowRunOut])
async def list_workflow_runs(
    workflow_id: uuid.UUID,
    _user: User = Depends(require_admin_or_pm),
    db: AsyncSession = Depends(get_db),
):
    """Return the runs of a workflow definition, newest first, including node results."""
    # Verify the workflow exists so an unknown id yields 404 rather than an empty list.
    await _get_workflow_or_404(db, workflow_id)

    result = await db.execute(
        select(WorkflowRun)
        .where(WorkflowRun.workflow_def_id == workflow_id)
        # Load node_results eagerly alongside the runs.
        .options(selectinload(WorkflowRun.node_results))
        .order_by(WorkflowRun.created_at.desc())
    )
    return result.scalars().all()


# Response body of the dispatch endpoint: how many tasks were queued and their ids.
# (Documented with comments rather than a class docstring so the generated
# JSON schema for the model stays exactly as it was.)
class WorkflowDispatchResponse(BaseModel):
    dispatched: int
    task_ids: list[str]


@router.post("/{workflow_id}/dispatch", response_model=WorkflowDispatchResponse)
async def dispatch_workflow_endpoint(
    workflow_id: uuid.UUID,
    context_id: str = Query(
        ...,
        description=(
            "UUID of the entity to process. "
            "For STEP/thumbnail steps this is a cad_file_id; "
            "for render steps this is an order_line_id."
        ),
    ),
    _user: User = Depends(require_pm_or_above),
    db: AsyncSession = Depends(get_db),
):
    """Dispatch a workflow's steps as Celery tasks for a given context entity.

    Each node in the workflow config is dispatched as an individual Celery task
    in topological (dependency) order. Returns the list of Celery task IDs so
    the caller can track progress.
    """
    # Deferred import kept from the original code.
    # NOTE(review): presumably avoids an import cycle or loading the executor at
    # module import time — confirm before hoisting to the top of the file.
    from app.domains.rendering.workflow_executor import dispatch_workflow

    wf = await _get_workflow_or_404(db, workflow_id)
    if not wf.config:
        raise HTTPException(status_code=400, detail="Workflow has no config")

    try:
        task_ids = dispatch_workflow(wf.config, context_id)
    except ValidationError as exc:
        # Must precede the ValueError handler: pydantic's ValidationError is a
        # ValueError subclass and would otherwise be reported without the
        # structured error list.
        raise HTTPException(
            status_code=422, detail=f"Invalid workflow config: {exc.errors()}"
        ) from exc
    except ValueError as exc:
        raise HTTPException(status_code=422, detail=str(exc)) from exc

    return WorkflowDispatchResponse(dispatched=len(task_ids), task_ids=task_ids)