Files
HartOMat/backend/app/domains/rendering/workflow_router.py
T

272 lines
10 KiB
Python

"""Workflow definition CRUD API.
Endpoints:
GET /api/workflows/ — list all workflow definitions (admin/PM)
GET /api/workflows/pipeline-steps — list available pipeline step definitions
GET /api/workflows/{id} — get single definition (admin/PM)
POST /api/workflows/ — create definition (admin only)
PUT /api/workflows/{id} — update definition (admin only)
DELETE /api/workflows/{id} — delete definition (admin only)
GET /api/workflows/{id}/runs — list runs for a definition (admin/PM)
"""
import uuid
from typing import Literal
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel, ValidationError
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.domains.auth.models import User
from app.utils.auth import get_current_user, require_global_admin, require_admin_or_pm, require_pm_or_above
from app.domains.rendering.models import WorkflowDefinition, WorkflowRun
from app.domains.rendering.schemas import (
WorkflowDefinitionCreate,
WorkflowDefinitionUpdate,
WorkflowDefinitionOut,
WorkflowRunOut,
)
from app.domains.rendering.workflow_schema import WorkflowConfig
from app.core.process_steps import StepName
# ── Pipeline-step metadata helpers ──────────────────────────────────────────
StepCategory = Literal["input", "processing", "rendering", "output"]
_STEP_CATEGORIES: dict[StepName, StepCategory] = {
StepName.RESOLVE_STEP_PATH: "input",
StepName.OCC_OBJECT_EXTRACT: "processing",
StepName.OCC_GLB_EXPORT: "processing",
StepName.GLB_BBOX: "processing",
StepName.MATERIAL_MAP_RESOLVE: "processing",
StepName.AUTO_POPULATE_MATERIALS: "processing",
StepName.BLENDER_RENDER: "rendering",
StepName.THREEJS_RENDER: "rendering",
StepName.THUMBNAIL_SAVE: "output",
StepName.ORDER_LINE_SETUP: "processing",
StepName.RESOLVE_TEMPLATE: "processing",
StepName.BLENDER_STILL: "rendering",
StepName.BLENDER_TURNTABLE: "rendering",
StepName.OUTPUT_SAVE: "output",
StepName.EXPORT_BLEND: "output",
StepName.STL_CACHE_GENERATE: "processing",
StepName.NOTIFY: "output",
}
_STEP_DESCRIPTIONS: dict[StepName, str] = {
StepName.RESOLVE_STEP_PATH: "Locate the STEP file on disk from the CadFile record",
StepName.OCC_OBJECT_EXTRACT: "Extract part objects and metadata from the STEP file using cadquery/OCC",
StepName.OCC_GLB_EXPORT: "Convert STEP geometry to glTF/GLB via cadquery",
StepName.GLB_BBOX: "Compute bounding-box from the exported GLB for camera framing",
StepName.MATERIAL_MAP_RESOLVE: "Resolve raw part-material names to HARTOMAT library materials via alias table",
StepName.AUTO_POPULATE_MATERIALS: "Auto-create Material records for any newly discovered part names",
StepName.BLENDER_RENDER: "Render a thumbnail PNG using Blender (Cycles or EEVEE)",
StepName.THREEJS_RENDER: "Render a thumbnail PNG using Three.js / Playwright headless browser",
StepName.THUMBNAIL_SAVE: "Persist the rendered thumbnail bytes to the CadFile record",
StepName.ORDER_LINE_SETUP: "Validate and prepare an order line for rendering (check STEP path, output type)",
StepName.RESOLVE_TEMPLATE: "Look up the matching RenderTemplate for the order line's category + output type",
StepName.BLENDER_STILL: "Render a production still image (PNG) via Blender HTTP micro-service",
StepName.BLENDER_TURNTABLE: "Render all turntable animation frames via Blender HTTP micro-service",
StepName.OUTPUT_SAVE: "Upload the rendered output file to storage and create a MediaAsset record",
StepName.EXPORT_BLEND: "Save the production .blend file as a downloadable MediaAsset",
StepName.STL_CACHE_GENERATE: "Convert STEP → STL (low + high quality) and cache next to the STEP file",
StepName.NOTIFY: "Emit a user notification via the audit-log notification channel",
}
class PipelineStepOut(BaseModel):
name: str
label: str
category: StepCategory
description: str
class PipelineStepsResponse(BaseModel):
steps: list[PipelineStepOut]
router = APIRouter(prefix="/api/workflows", tags=["workflows"])
@router.get("/pipeline-steps", response_model=PipelineStepsResponse)
async def get_pipeline_steps(
_user: User = Depends(require_admin_or_pm),
):
"""Return all available pipeline step definitions for the workflow editor."""
steps = [
PipelineStepOut(
name=step.value,
label=step.value.replace("_", " ").title(),
category=_STEP_CATEGORIES.get(step, "processing"),
description=_STEP_DESCRIPTIONS.get(step, ""),
)
for step in StepName
]
return PipelineStepsResponse(steps=steps)
@router.get("", response_model=list[WorkflowDefinitionOut])
async def list_workflows(
_user: User = Depends(require_admin_or_pm),
db: AsyncSession = Depends(get_db),
):
result = await db.execute(
select(WorkflowDefinition).order_by(WorkflowDefinition.created_at)
)
return result.scalars().all()
@router.get("/{workflow_id}", response_model=WorkflowDefinitionOut)
async def get_workflow(
workflow_id: uuid.UUID,
_user: User = Depends(require_admin_or_pm),
db: AsyncSession = Depends(get_db),
):
result = await db.execute(
select(WorkflowDefinition).where(WorkflowDefinition.id == workflow_id)
)
wf = result.scalar_one_or_none()
if not wf:
raise HTTPException(status_code=404, detail="Workflow definition not found")
return wf
@router.post("", response_model=WorkflowDefinitionOut, status_code=201)
async def create_workflow(
body: WorkflowDefinitionCreate,
_user: User = Depends(require_global_admin),
db: AsyncSession = Depends(get_db),
):
if body.config:
try:
WorkflowConfig.model_validate(body.config)
except ValidationError as exc:
raise HTTPException(status_code=422, detail=f"Invalid workflow config: {exc.errors()}")
wf = WorkflowDefinition(
name=body.name,
output_type_id=body.output_type_id,
config=body.config,
is_active=body.is_active,
)
db.add(wf)
await db.commit()
await db.refresh(wf)
return wf
@router.put("/{workflow_id}", response_model=WorkflowDefinitionOut)
async def update_workflow(
workflow_id: uuid.UUID,
body: WorkflowDefinitionUpdate,
_user: User = Depends(require_global_admin),
db: AsyncSession = Depends(get_db),
):
result = await db.execute(
select(WorkflowDefinition).where(WorkflowDefinition.id == workflow_id)
)
wf = result.scalar_one_or_none()
if not wf:
raise HTTPException(status_code=404, detail="Workflow definition not found")
if body.name is not None:
wf.name = body.name
if body.config is not None:
try:
WorkflowConfig.model_validate(body.config)
except ValidationError as exc:
raise HTTPException(status_code=422, detail=f"Invalid workflow config: {exc.errors()}")
wf.config = body.config
if body.is_active is not None:
wf.is_active = body.is_active
await db.commit()
await db.refresh(wf)
return wf
@router.delete("/{workflow_id}", status_code=204)
async def delete_workflow(
workflow_id: uuid.UUID,
_user: User = Depends(require_global_admin),
db: AsyncSession = Depends(get_db),
):
result = await db.execute(
select(WorkflowDefinition).where(WorkflowDefinition.id == workflow_id)
)
wf = result.scalar_one_or_none()
if not wf:
raise HTTPException(status_code=404, detail="Workflow definition not found")
await db.delete(wf)
await db.commit()
@router.get("/{workflow_id}/runs", response_model=list[WorkflowRunOut])
async def list_workflow_runs(
workflow_id: uuid.UUID,
_user: User = Depends(require_admin_or_pm),
db: AsyncSession = Depends(get_db),
):
# Verify the workflow exists
wf_result = await db.execute(
select(WorkflowDefinition).where(WorkflowDefinition.id == workflow_id)
)
if not wf_result.scalar_one_or_none():
raise HTTPException(status_code=404, detail="Workflow definition not found")
result = await db.execute(
select(WorkflowRun)
.where(WorkflowRun.workflow_def_id == workflow_id)
.options(selectinload(WorkflowRun.node_results))
.order_by(WorkflowRun.created_at.desc())
)
return result.scalars().all()
class WorkflowDispatchResponse(BaseModel):
dispatched: int
task_ids: list[str]
@router.post("/{workflow_id}/dispatch", response_model=WorkflowDispatchResponse)
async def dispatch_workflow_endpoint(
workflow_id: uuid.UUID,
context_id: str = Query(
...,
description=(
"UUID of the entity to process. "
"For STEP/thumbnail steps this is a cad_file_id; "
"for render steps this is an order_line_id."
),
),
_user: User = Depends(require_pm_or_above),
db: AsyncSession = Depends(get_db),
):
"""Dispatch a workflow's steps as Celery tasks for a given context entity.
Each node in the workflow config is dispatched as an individual Celery task
in topological (dependency) order. Returns the list of Celery task IDs so
the caller can track progress.
"""
from pydantic import ValidationError as _ValidationError
from app.domains.rendering.workflow_executor import dispatch_workflow
result = await db.execute(
select(WorkflowDefinition).where(WorkflowDefinition.id == workflow_id)
)
wf = result.scalar_one_or_none()
if not wf:
raise HTTPException(status_code=404, detail="Workflow definition not found")
if not wf.config:
raise HTTPException(status_code=400, detail="Workflow has no config")
try:
task_ids = dispatch_workflow(wf.config, context_id)
except _ValidationError as exc:
raise HTTPException(status_code=422, detail=f"Invalid workflow config: {exc.errors()}")
except ValueError as exc:
raise HTTPException(status_code=422, detail=str(exc))
return WorkflowDispatchResponse(dispatched=len(task_ids), task_ids=task_ids)