# Source: HartOMat/backend/app/domains/rendering/workflow_router.py
# (298 lines, 9.6 KiB, Python)

"""Workflow definition CRUD API."""
import uuid
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel, ValidationError
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.domains.auth.models import User
from app.utils.auth import get_current_user, require_global_admin, require_admin_or_pm, require_pm_or_above
from app.domains.rendering.models import WorkflowDefinition, WorkflowRun
from app.domains.rendering.schemas import (
WorkflowDefinitionCreate,
WorkflowDefinitionUpdate,
WorkflowDefinitionOut,
WorkflowRunOut,
)
from app.domains.rendering.workflow_config_utils import canonicalize_workflow_config
from app.domains.rendering.workflow_node_registry import (
StepCategory,
WorkflowNodeDefinition,
list_node_definitions,
)
from app.domains.rendering.workflow_schema import WorkflowConfig
class PipelineStepOut(BaseModel):
    """Summary of one pipeline step as returned by ``/pipeline-steps``."""

    # Step identifier; filled from the node definition's ``step`` attribute.
    name: str
    # Display label taken from the node definition.
    label: str
    # Category of the step, as declared by the node definition.
    category: StepCategory
    # Free-text description taken from the node definition.
    description: str
class PipelineStepsResponse(BaseModel):
    """Response envelope for ``/pipeline-steps``: a list of step summaries."""

    # One entry per node definition returned by ``list_node_definitions()``.
    steps: list[PipelineStepOut]
class NodeDefinitionsResponse(BaseModel):
    """Response envelope for ``/node-definitions``."""

    # Full node definitions exactly as produced by ``list_node_definitions()``.
    definitions: list[WorkflowNodeDefinition]
# All routes in this module are mounted under /api/workflows.
router = APIRouter(
    prefix="/api/workflows",
    tags=["workflows"],
)
def _workflow_to_out(wf: WorkflowDefinition) -> WorkflowDefinitionOut:
    """Build the API output schema for a ``WorkflowDefinition`` row.

    The stored config is passed through ``canonicalize_workflow_config`` so
    the response carries the canonicalized form instead of the raw value.
    """
    canonical_config = canonicalize_workflow_config(wf.config)
    return WorkflowDefinitionOut(
        id=wf.id,
        name=wf.name,
        output_type_id=wf.output_type_id,
        config=canonical_config,
        is_active=wf.is_active,
        created_at=wf.created_at,
    )
@router.get("/node-definitions", response_model=NodeDefinitionsResponse)
async def get_node_definitions(
    _user: User = Depends(require_admin_or_pm),
):
    """Return every registered workflow node definition.

    Access is gated by the ``require_admin_or_pm`` dependency; the resolved
    user itself is not used.
    """
    node_definitions = list_node_definitions()
    return NodeDefinitionsResponse(definitions=node_definitions)
@router.get("/pipeline-steps", response_model=PipelineStepsResponse)
async def get_pipeline_steps(
    _user: User = Depends(require_admin_or_pm),
):
    """Return a condensed view (name/label/category/description) of each node.

    Built from the same ``list_node_definitions()`` data as
    ``/node-definitions``, but projected onto ``PipelineStepOut``.
    """
    node_defs = list_node_definitions()
    return PipelineStepsResponse(
        steps=[
            PipelineStepOut(
                name=node_def.step,
                label=node_def.label,
                category=node_def.category,
                description=node_def.description,
            )
            for node_def in node_defs
        ]
    )
@router.get("", response_model=list[WorkflowDefinitionOut])
async def list_workflows(
    _user: User = Depends(require_admin_or_pm),
    db: AsyncSession = Depends(get_db),
):
    """List all workflow definitions ordered by ``created_at`` (ascending)."""
    stmt = select(WorkflowDefinition).order_by(WorkflowDefinition.created_at)
    query_result = await db.execute(stmt)
    workflows = query_result.scalars().all()
    return [_workflow_to_out(workflow) for workflow in workflows]
@router.get("/{workflow_id}", response_model=WorkflowDefinitionOut)
async def get_workflow(
    workflow_id: uuid.UUID,
    _user: User = Depends(require_admin_or_pm),
    db: AsyncSession = Depends(get_db),
):
    """Fetch a single workflow definition by id.

    Raises:
        HTTPException: 404 when no definition with ``workflow_id`` exists.
    """
    stmt = select(WorkflowDefinition).where(WorkflowDefinition.id == workflow_id)
    workflow = (await db.execute(stmt)).scalar_one_or_none()
    if not workflow:
        raise HTTPException(status_code=404, detail="Workflow definition not found")
    return _workflow_to_out(workflow)
@router.post("", response_model=WorkflowDefinitionOut, status_code=201)
async def create_workflow(
    body: WorkflowDefinitionCreate,
    _user: User = Depends(require_global_admin),
    db: AsyncSession = Depends(get_db),
):
    """Create a workflow definition from the request body.

    The submitted config is canonicalized before being stored. When the
    submitted config is non-empty, the canonical form is additionally
    validated against ``WorkflowConfig``.

    Access is gated by the ``require_global_admin`` dependency.

    Raises:
        HTTPException: 422 when canonicalization or validation of the config
            raises ``ValidationError`` or ``ValueError``.
    """
    try:
        # Canonicalization lives inside the ``try`` so that a ValueError it
        # raises is reported as 422, matching ``update_workflow``.
        normalized_config = canonicalize_workflow_config(body.config)
        if body.config:
            WorkflowConfig.model_validate(normalized_config)
    except (ValidationError, ValueError) as exc:
        detail = exc.errors() if isinstance(exc, ValidationError) else str(exc)
        raise HTTPException(
            status_code=422,
            detail=f"Invalid workflow config: {detail}",
        ) from exc

    wf = WorkflowDefinition(
        name=body.name,
        output_type_id=body.output_type_id,
        config=normalized_config,
        is_active=body.is_active,
    )
    db.add(wf)
    await db.commit()
    # Reload the row after commit so the attributes read below are populated.
    await db.refresh(wf)
    return _workflow_to_out(wf)
@router.put("/{workflow_id}", response_model=WorkflowDefinitionOut)
async def update_workflow(
    workflow_id: uuid.UUID,
    body: WorkflowDefinitionUpdate,
    _user: User = Depends(require_global_admin),
    db: AsyncSession = Depends(get_db),
):
    """Partially update a workflow definition.

    Only fields that are not ``None`` in the body (``name``, ``config``,
    ``is_active``) are applied. A supplied config is canonicalized and
    validated against ``WorkflowConfig`` before it is stored.

    Raises:
        HTTPException: 404 when the definition does not exist; 422 when the
            supplied config fails canonicalization or validation.
    """
    stmt = select(WorkflowDefinition).where(WorkflowDefinition.id == workflow_id)
    workflow = (await db.execute(stmt)).scalar_one_or_none()
    if not workflow:
        raise HTTPException(status_code=404, detail="Workflow definition not found")

    if body.name is not None:
        workflow.name = body.name

    if body.config is not None:
        try:
            new_config = canonicalize_workflow_config(body.config)
            WorkflowConfig.model_validate(new_config)
        except (ValidationError, ValueError) as exc:
            if isinstance(exc, ValidationError):
                detail = exc.errors()
            else:
                detail = str(exc)
            raise HTTPException(status_code=422, detail=f"Invalid workflow config: {detail}")
        workflow.config = new_config

    if body.is_active is not None:
        workflow.is_active = body.is_active

    await db.commit()
    await db.refresh(workflow)
    return _workflow_to_out(workflow)
@router.delete("/{workflow_id}", status_code=204)
async def delete_workflow(
    workflow_id: uuid.UUID,
    _user: User = Depends(require_global_admin),
    db: AsyncSession = Depends(get_db),
):
    """Delete a workflow definition by id and commit.

    Raises:
        HTTPException: 404 when no definition with ``workflow_id`` exists.
    """
    stmt = select(WorkflowDefinition).where(WorkflowDefinition.id == workflow_id)
    workflow = (await db.execute(stmt)).scalar_one_or_none()
    if not workflow:
        raise HTTPException(status_code=404, detail="Workflow definition not found")

    await db.delete(workflow)
    await db.commit()
@router.get("/{workflow_id}/runs", response_model=list[WorkflowRunOut])
async def list_workflow_runs(
    workflow_id: uuid.UUID,
    _user: User = Depends(require_admin_or_pm),
    db: AsyncSession = Depends(get_db),
):
    """List the runs of one workflow definition, newest first.

    Each run is returned with its ``node_results`` eagerly loaded.

    Raises:
        HTTPException: 404 when no definition with ``workflow_id`` exists.
    """
    # Check the parent definition first so an unknown id yields 404 instead
    # of an empty list.
    exists_stmt = select(WorkflowDefinition).where(
        WorkflowDefinition.id == workflow_id
    )
    existing = (await db.execute(exists_stmt)).scalar_one_or_none()
    if not existing:
        raise HTTPException(status_code=404, detail="Workflow definition not found")

    runs_stmt = (
        select(WorkflowRun)
        .where(WorkflowRun.workflow_def_id == workflow_id)
        .options(selectinload(WorkflowRun.node_results))
        .order_by(WorkflowRun.created_at.desc())
    )
    runs_result = await db.execute(runs_stmt)
    return runs_result.scalars().all()
class WorkflowDispatchResponse(BaseModel):
    """Payload returned by the ``/{workflow_id}/dispatch`` endpoint."""

    # The workflow run record created for this dispatch.
    workflow_run: WorkflowRunOut
    # The ``context_id`` query parameter, echoed back to the caller.
    context_id: str
    # Execution mode reported by the prepared workflow context.
    execution_mode: str
    # Number of task ids in ``task_ids``.
    dispatched: int
    # Task ids reported by the dispatch result.
    task_ids: list[str]
@router.post("/{workflow_id}/dispatch", response_model=WorkflowDispatchResponse)
async def dispatch_workflow_endpoint(
    workflow_id: uuid.UUID,
    context_id: str = Query(
        ...,
        description=(
            "UUID of the entity to process. "
            "For STEP/thumbnail steps this is a cad_file_id; "
            "for render steps this is an order_line_id."
        ),
    ),
    _user: User = Depends(require_pm_or_above),
    db: AsyncSession = Depends(get_db),
):
    """Dispatch a workflow's steps as Celery tasks for a given context entity.

    Each node in the workflow config is dispatched as an individual Celery
    task in topological (dependency) order. The response carries the Celery
    task IDs so the caller can track progress.

    Flow:
      1. Load the workflow definition (404 if missing, 400 if it has no config).
      2. Prepare a workflow context in ``"graph"`` execution mode (422 on
         validation problems).
      3. Create a workflow run record and commit it.
      4. Execute the graph workflow. On any exception the run is marked as
         failed, committed, and the exception is re-raised.
      5. Commit, reload the run with its node results, and return the summary.
    """
    from pydantic import ValidationError as _ValidationError
    from app.domains.rendering.workflow_executor import prepare_workflow_context
    from app.domains.rendering.workflow_graph_runtime import execute_graph_workflow
    from app.domains.rendering.workflow_run_service import (
        create_workflow_run,
        mark_workflow_run_failed,
    )

    definition_stmt = select(WorkflowDefinition).where(
        WorkflowDefinition.id == workflow_id
    )
    wf = (await db.execute(definition_stmt)).scalar_one_or_none()
    if not wf:
        raise HTTPException(status_code=404, detail="Workflow definition not found")
    if not wf.config:
        raise HTTPException(status_code=400, detail="Workflow has no config")

    try:
        workflow_context = prepare_workflow_context(
            wf.config,
            context_id=context_id,
            execution_mode="graph",
        )
    except _ValidationError as exc:
        raise HTTPException(status_code=422, detail=f"Invalid workflow config: {exc.errors()}")
    except ValueError as exc:
        raise HTTPException(status_code=422, detail=str(exc))

    def _create_run_id(sync_session):
        # Runs on the synchronous session via ``run_sync``; only the new
        # run's id is handed back to the async side.
        return create_workflow_run(
            sync_session,
            workflow_def_id=wf.id,
            order_line_id=None,
            workflow_context=workflow_context,
        ).id

    def _execute(sync_session):
        return execute_graph_workflow(sync_session, workflow_context)

    run_id = await db.run_sync(_create_run_id)
    # Commit the run record before execution starts.
    await db.commit()

    async def _load_run():
        # Fetch the run together with its node results.
        run_stmt = (
            select(WorkflowRun)
            .where(WorkflowRun.id == run_id)
            .options(selectinload(WorkflowRun.node_results))
        )
        return (await db.execute(run_stmt)).scalar_one()

    try:
        dispatch_result = await db.run_sync(_execute)
    except Exception as exc:
        failed_run = await _load_run()
        mark_workflow_run_failed(failed_run, str(exc))
        await db.commit()
        raise

    await db.commit()
    refreshed_run = await _load_run()
    dispatched_task_ids = dispatch_result.task_ids
    return WorkflowDispatchResponse(
        workflow_run=refreshed_run,
        context_id=context_id,
        execution_mode=workflow_context.execution_mode,
        dispatched=len(dispatched_task_ids),
        task_ids=dispatched_task_ids,
    )