Files
HartOMat/backend/app/domains/rendering/workflow_router.py
T
Hartmut 1409be171c feat(phase7.3): workflow editor pipeline step nodes
- GET /api/workflows/pipeline-steps: returns all StepName enum values
  with category (input|processing|rendering|output) + descriptions;
  registered before /{workflow_id} to avoid path collision
- frontend/src/api/workflows.ts: getPipelineSteps(), PipelineStep
  and PipelineStepsResponse interfaces
- WorkflowEditor: PipelineStepsPanel showing steps grouped by category
  with collapsible accordion sections
- ConfigSidepanel: "Pipeline Step" select dropdown binds any node to a
  StepName; selected step description shown below dropdown
- Active workflow indicator: green dot next to is_active=true entries
- Improved empty state with descriptive copy

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-08 20:24:17 +01:00

219 lines
8.2 KiB
Python

"""Workflow definition CRUD API.
Endpoints:
GET /api/workflows/ — list all workflow definitions (admin/PM)
GET /api/workflows/pipeline-steps — list available pipeline step definitions
GET /api/workflows/{id} — get single definition (admin/PM)
POST /api/workflows/ — create definition (admin only)
PUT /api/workflows/{id} — update definition (admin only)
DELETE /api/workflows/{id} — delete definition (admin only)
GET /api/workflows/{id}/runs — list runs for a definition (admin/PM)
"""
import uuid
from typing import Literal
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.domains.auth.models import User
from app.utils.auth import get_current_user, require_admin, require_admin_or_pm
from app.domains.rendering.models import WorkflowDefinition, WorkflowRun
from app.domains.rendering.schemas import (
WorkflowDefinitionCreate,
WorkflowDefinitionUpdate,
WorkflowDefinitionOut,
WorkflowRunOut,
)
from app.core.process_steps import StepName
# ── Pipeline-step metadata helpers ──────────────────────────────────────────
# Closed set of category names a pipeline step can be reported under.
StepCategory = Literal["input", "processing", "rendering", "output"]
# Category reported for each pipeline step. get_pipeline_steps() looks steps
# up here and falls back to "processing" for any StepName member without an
# entry, so a newly added step still appears in the response.
_STEP_CATEGORIES: dict[StepName, StepCategory] = {
    StepName.RESOLVE_STEP_PATH: "input",
    StepName.OCC_OBJECT_EXTRACT: "processing",
    StepName.OCC_GLB_EXPORT: "processing",
    StepName.GLB_BBOX: "processing",
    StepName.MATERIAL_MAP_RESOLVE: "processing",
    StepName.AUTO_POPULATE_MATERIALS: "processing",
    StepName.BLENDER_RENDER: "rendering",
    StepName.THREEJS_RENDER: "rendering",
    StepName.THUMBNAIL_SAVE: "output",
    StepName.ORDER_LINE_SETUP: "processing",
    StepName.RESOLVE_TEMPLATE: "processing",
    StepName.BLENDER_STILL: "rendering",
    StepName.BLENDER_TURNTABLE: "rendering",
    StepName.OUTPUT_SAVE: "output",
    StepName.EXPORT_GLB_GEOMETRY: "output",
    StepName.EXPORT_GLB_PRODUCTION: "output",
    StepName.EXPORT_BLEND: "output",
    StepName.STL_CACHE_GENERATE: "processing",
    StepName.NOTIFY: "output",
}
# Description text reported for each pipeline step. get_pipeline_steps()
# looks steps up here and falls back to an empty string for any StepName
# member without an entry.
_STEP_DESCRIPTIONS: dict[StepName, str] = {
    StepName.RESOLVE_STEP_PATH: "Locate the STEP file on disk from the CadFile record",
    StepName.OCC_OBJECT_EXTRACT: "Extract part objects and metadata from the STEP file using cadquery/OCC",
    StepName.OCC_GLB_EXPORT: "Convert STEP geometry to glTF/GLB via cadquery",
    StepName.GLB_BBOX: "Compute bounding-box from the exported GLB for camera framing",
    StepName.MATERIAL_MAP_RESOLVE: "Resolve raw part-material names to SCHAEFFLER library materials via alias table",
    StepName.AUTO_POPULATE_MATERIALS: "Auto-create Material records for any newly discovered part names",
    StepName.BLENDER_RENDER: "Render a thumbnail PNG using Blender (Cycles or EEVEE)",
    StepName.THREEJS_RENDER: "Render a thumbnail PNG using Three.js / Playwright headless browser",
    StepName.THUMBNAIL_SAVE: "Persist the rendered thumbnail bytes to the CadFile record",
    StepName.ORDER_LINE_SETUP: "Validate and prepare an order line for rendering (check STEP path, output type)",
    StepName.RESOLVE_TEMPLATE: "Look up the matching RenderTemplate for the order line's category + output type",
    StepName.BLENDER_STILL: "Render a production still image (PNG) via Blender HTTP micro-service",
    StepName.BLENDER_TURNTABLE: "Render all turntable animation frames via Blender HTTP micro-service",
    StepName.OUTPUT_SAVE: "Upload the rendered output file to storage and create a MediaAsset record",
    StepName.EXPORT_GLB_GEOMETRY: "Export a geometry-only GLB for the 3-D viewer (no materials)",
    StepName.EXPORT_GLB_PRODUCTION: "Export a production GLB with full materials from the .blend template",
    StepName.EXPORT_BLEND: "Save the production .blend file as a downloadable MediaAsset",
    StepName.STL_CACHE_GENERATE: "Convert STEP → STL (low + high quality) and cache next to the STEP file",
    StepName.NOTIFY: "Emit a user notification via the audit-log notification channel",
}
class PipelineStepOut(BaseModel):
    # One pipeline step as returned by GET /api/workflows/pipeline-steps.
    # Documented with comments (not a docstring) so the generated schema
    # description stays exactly as it was.

    # Raw StepName value.
    name: str
    # Display form of the name: underscores replaced by spaces, title-cased.
    label: str
    # One of the StepCategory literals.
    category: StepCategory
    # Free-text explanation of the step; may be empty.
    description: str
class PipelineStepsResponse(BaseModel):
    # Envelope for the pipeline-steps endpoint: the list sits under a single
    # "steps" key instead of being returned as a bare JSON array.
    steps: list[PipelineStepOut]
# Router holding every workflow endpoint in this module; all routes declared
# on it are served under the /api/workflows prefix and tagged "workflows".
router = APIRouter(prefix="/api/workflows", tags=["workflows"])
@router.get("/pipeline-steps", response_model=PipelineStepsResponse)
async def get_pipeline_steps(
    _user: User = Depends(require_admin_or_pm),
):
    """Return all available pipeline step definitions for the workflow editor."""
    # Build one response item per StepName member, in iteration order.
    catalogue: list[PipelineStepOut] = []
    for member in StepName:
        raw_name = member.value
        catalogue.append(
            PipelineStepOut(
                name=raw_name,
                # Display label, e.g. "some_step" -> "Some Step".
                label=raw_name.replace("_", " ").title(),
                # Members missing from the lookup tables get neutral defaults
                # rather than raising, so the listing never breaks.
                category=_STEP_CATEGORIES.get(member, "processing"),
                description=_STEP_DESCRIPTIONS.get(member, ""),
            )
        )
    return PipelineStepsResponse(steps=catalogue)
@router.get("", response_model=list[WorkflowDefinitionOut])
async def list_workflows(
    _user: User = Depends(require_admin_or_pm),
    db: AsyncSession = Depends(get_db),
):
    # List every workflow definition, ordered by created_at ascending.
    # Access is gated by the require_admin_or_pm dependency.
    stmt = select(WorkflowDefinition).order_by(WorkflowDefinition.created_at)
    rows = await db.execute(stmt)
    return rows.scalars().all()
@router.get("/{workflow_id}", response_model=WorkflowDefinitionOut)
async def get_workflow(
    workflow_id: uuid.UUID,
    _user: User = Depends(require_admin_or_pm),
    db: AsyncSession = Depends(get_db),
):
    # Fetch a single workflow definition by primary key.
    # Raises HTTPException(404) when no row matches workflow_id.
    lookup = select(WorkflowDefinition).where(WorkflowDefinition.id == workflow_id)
    definition = (await db.execute(lookup)).scalar_one_or_none()
    if definition:
        return definition
    raise HTTPException(status_code=404, detail="Workflow definition not found")
@router.post("", response_model=WorkflowDefinitionOut, status_code=201)
async def create_workflow(
    body: WorkflowDefinitionCreate,
    _user: User = Depends(require_admin),
    db: AsyncSession = Depends(get_db),
):
    # Create a workflow definition from the request body (admin only) and
    # respond with the stored row.
    column_values = {
        "name": body.name,
        "output_type_id": body.output_type_id,
        "config": body.config,
        "is_active": body.is_active,
    }
    created = WorkflowDefinition(**column_values)
    db.add(created)
    await db.commit()
    # Reload so database-populated attributes are present on the returned object.
    await db.refresh(created)
    return created
@router.put("/{workflow_id}", response_model=WorkflowDefinitionOut)
async def update_workflow(
    workflow_id: uuid.UUID,
    body: WorkflowDefinitionUpdate,
    _user: User = Depends(require_admin),
    db: AsyncSession = Depends(get_db),
):
    # Partially update a workflow definition (admin only).
    # Raises HTTPException(404) when no row matches workflow_id.
    lookup = select(WorkflowDefinition).where(WorkflowDefinition.id == workflow_id)
    target = (await db.execute(lookup)).scalar_one_or_none()
    if not target:
        raise HTTPException(status_code=404, detail="Workflow definition not found")

    # Only these three fields are writable through this endpoint; a value of
    # None in the body means "leave the stored value untouched".
    # NOTE(review): output_type_id is set on create but never updated here —
    # confirm that is intentional.
    for field_name in ("name", "config", "is_active"):
        new_value = getattr(body, field_name)
        if new_value is not None:
            setattr(target, field_name, new_value)

    await db.commit()
    await db.refresh(target)
    return target
@router.delete("/{workflow_id}", status_code=204)
async def delete_workflow(
    workflow_id: uuid.UUID,
    _user: User = Depends(require_admin),
    db: AsyncSession = Depends(get_db),
):
    # Delete a workflow definition (admin only); responds 204 with no body.
    # Raises HTTPException(404) when no row matches workflow_id.
    lookup = select(WorkflowDefinition).where(WorkflowDefinition.id == workflow_id)
    doomed = (await db.execute(lookup)).scalar_one_or_none()
    if not doomed:
        raise HTTPException(status_code=404, detail="Workflow definition not found")
    await db.delete(doomed)
    await db.commit()
@router.get("/{workflow_id}/runs", response_model=list[WorkflowRunOut])
async def list_workflow_runs(
    workflow_id: uuid.UUID,
    _user: User = Depends(require_admin_or_pm),
    db: AsyncSession = Depends(get_db),
):
    # List the runs of one workflow definition, newest first.
    # Raises HTTPException(404) when the definition itself does not exist, so
    # an unknown id is not confused with a definition that has no runs.
    parent_stmt = select(WorkflowDefinition).where(
        WorkflowDefinition.id == workflow_id
    )
    parent = (await db.execute(parent_stmt)).scalar_one_or_none()
    if not parent:
        raise HTTPException(status_code=404, detail="Workflow definition not found")

    runs_stmt = (
        select(WorkflowRun)
        .where(WorkflowRun.workflow_def_id == workflow_id)
        # Load node_results together with the runs via selectinload.
        .options(selectinload(WorkflowRun.node_results))
        .order_by(WorkflowRun.created_at.desc())
    )
    runs = await db.execute(runs_stmt)
    return runs.scalars().all()