"""Workflow definition CRUD API."""
from pathlib import Path
import uuid
from datetime import datetime

from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel, ValidationError
from sqlalchemy import select
from sqlalchemy.orm import Session, selectinload
from sqlalchemy.orm.attributes import flag_modified
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import or_

from app.core.process_steps import StepName
from app.database import get_db
from app.domains.auth.models import User
from app.domains.orders.models import Order, OrderLine, OrderStatus
from app.domains.products.models import CadFile
from app.domains.rendering.models import (
    OutputType,
    WorkflowDefinition,
    WorkflowRun,
)
from app.domains.rendering.schemas import (
    WorkflowDefinitionCreate,
    WorkflowDefinitionUpdate,
    WorkflowDraftDispatchRequest,
    WorkflowDefinitionOut,
    WorkflowDraftPreflightRequest,
    WorkflowPreflightIssueOut,
    WorkflowPreflightNodeOut,
    WorkflowPreflightOut,
    WorkflowOrderLineContextGroupOut,
    WorkflowOrderLineContextOptionOut,
    WorkflowRolloutLinkedOutputTypeOut,
    WorkflowRolloutLatestRunOut,
    WorkflowRolloutSummaryOut,
    WorkflowRunComparisonOut,
    WorkflowRunOut,
)
from app.domains.rendering.workflow_comparison_service import build_workflow_run_comparison
from app.domains.rendering.workflow_config_utils import canonicalize_workflow_config, get_workflow_execution_mode
from app.domains.rendering.workflow_graph_runtime import find_unsupported_graph_nodes
from app.domains.rendering.workflow_node_registry import (
    StepCategory,
    WorkflowNodeDefinition,
    get_node_definition,
    list_node_definitions,
)
from app.domains.rendering.workflow_schema import WorkflowConfig
from app.domains.rendering.workflow_runtime_services import (
    prepare_order_line_render_context,
    resolve_cad_bbox,
    resolve_order_line_template_context,
)
from app.domains.rendering.output_type_contracts import (
    derive_supported_artifact_kinds_from_workflow_config,
    infer_workflow_family_from_config,
    validate_output_type_contract,
)
from app.utils.auth import require_global_admin, require_admin_or_pm, require_pm_or_above


# NOTE: the response models and endpoints below are documented with ``#``
# comments rather than docstrings on purpose: pydantic and FastAPI publish
# docstrings as OpenAPI descriptions, which would change the generated schema.

# One pipeline step as exposed to the workflow editor.
class PipelineStepOut(BaseModel):
    name: str
    label: str
    category: StepCategory
    description: str


# Envelope for the ``/pipeline-steps`` endpoint.
class PipelineStepsResponse(BaseModel):
    steps: list[PipelineStepOut]


# Envelope for the ``/node-definitions`` endpoint.
class NodeDefinitionsResponse(BaseModel):
    definitions: list[WorkflowNodeDefinition]


router = APIRouter(prefix="/api/workflows", tags=["workflows"])

# A graph containing any of these steps is treated as an order-line workflow
# (see ``_infer_expected_context_kind``).
_ORDER_LINE_RUNTIME_STEPS = {
    StepName.ORDER_LINE_SETUP,
    StepName.RESOLVE_TEMPLATE,
    StepName.MATERIAL_MAP_RESOLVE,
    StepName.AUTO_POPULATE_MATERIALS,
    StepName.GLB_BBOX,
    StepName.BLENDER_STILL,
    StepName.BLENDER_TURNTABLE,
    StepName.OUTPUT_SAVE,
    StepName.EXPORT_BLEND,
    StepName.NOTIFY,
}

# Steps that preflight rejects inside an order-line graph and whose STEP source
# path is checked when the context is a CAD file.
_CAD_FILE_ENTRY_STEPS = {
    StepName.RESOLVE_STEP_PATH,
    StepName.OCC_OBJECT_EXTRACT,
    StepName.OCC_GLB_EXPORT,
    StepName.STL_CACHE_GENERATE,
    StepName.BLENDER_RENDER,
    StepName.THREEJS_RENDER,
    StepName.THUMBNAIL_SAVE,
}

# Steps that must be preceded by an ``ORDER_LINE_SETUP`` node.
_ORDER_LINE_SETUP_REQUIRED_STEPS = {
    StepName.RESOLVE_TEMPLATE,
    StepName.MATERIAL_MAP_RESOLVE,
    StepName.AUTO_POPULATE_MATERIALS,
    StepName.GLB_BBOX,
    StepName.BLENDER_STILL,
    StepName.BLENDER_TURNTABLE,
    StepName.OUTPUT_SAVE,
    StepName.EXPORT_BLEND,
    StepName.NOTIFY,
}

# Steps for which a missing earlier ``RESOLVE_TEMPLATE`` node yields a warning.
_RESOLVE_TEMPLATE_RECOMMENDED_STEPS = {
    StepName.BLENDER_STILL,
    StepName.BLENDER_TURNTABLE,
    StepName.EXPORT_BLEND,
    StepName.NOTIFY,
}

_WORKFLOW_ROLLOUT_DISPLAY_ORDER = ("legacy_only", "shadow", "graph")


def _sort_rollout_modes(modes: set[str]) -> list[str]:
    """Return *modes* in display order, with unknown modes appended alphabetically."""
    ordered = [mode for mode in _WORKFLOW_ROLLOUT_DISPLAY_ORDER if mode in modes]
    extras = sorted(mode for mode in modes if mode not in _WORKFLOW_ROLLOUT_DISPLAY_ORDER)
    return [*ordered, *extras]


def _build_latest_run_out(run: WorkflowRun | None) -> WorkflowRolloutLatestRunOut | None:
    """Convert a workflow run into its rollout-summary form; ``None`` passes through."""
    if run is None:
        return None
    return WorkflowRolloutLatestRunOut(
        workflow_run_id=run.id,
        execution_mode=run.execution_mode,
        status=run.status,
        created_at=run.created_at,
        completed_at=run.completed_at,
    )


def _collect_output_type_contract_reasons(
    *,
    output_types: list[OutputType],
    workflow_family: str | None,
    supported_artifact_kinds: tuple[str, ...],
) -> list[str]:
    """Collect human-readable reasons why linked output types block a rollout.

    Each output type is first validated on its own via
    ``validate_output_type_contract``; a failure there is reported and no
    further checks run for that output type.  Otherwise the output type is
    compared against the workflow: a ``"mixed"`` workflow family is always
    rejected, a family mismatch is reported, and so is an artifact kind the
    workflow does not support.

    Returns:
        One message per problem, each prefixed with the output type name.
        Empty when nothing blocks.
    """
    reasons: list[str] = []
    for output_type in output_types:
        try:
            validate_output_type_contract(
                workflow_family=output_type.workflow_family,
                artifact_kind=output_type.artifact_kind,
                output_format=output_type.output_format,
                is_animation=output_type.is_animation,
            )
        except ValueError as exc:
            reasons.append(f"{output_type.name}: {exc}")
            continue

        if workflow_family == "mixed":
            reasons.append(f"{output_type.name}: mixed-family workflows cannot be promoted.")
            continue

        if workflow_family is not None and workflow_family != output_type.workflow_family:
            reasons.append(
                f"{output_type.name}: workflow family '{workflow_family}' does not match output type family '{output_type.workflow_family}'."
            )

        if output_type.artifact_kind not in supported_artifact_kinds:
            supported = ", ".join(supported_artifact_kinds) if supported_artifact_kinds else "none"
            reasons.append(
                f"{output_type.name}: workflow does not support artifact '{output_type.artifact_kind}' (supports {supported})."
            )
    return reasons


def _normalized_rollout_mode(output_type: OutputType) -> str:
    """Return the output type's rollout mode, lower-cased, defaulting to ``legacy_only``."""
    return (getattr(output_type, "workflow_rollout_mode", None) or "legacy_only").strip().lower()


async def _build_rollout_summary(
    db: AsyncSession,
    wf: WorkflowDefinition,
    *,
    workflow_family: str | None,
    supported_artifact_kinds: tuple[str, ...],
) -> WorkflowRolloutSummaryOut:
    """Build the rollout summary attached to a workflow definition response.

    Loads the output types linked to *wf* (either pointing at the workflow or
    referenced by ``wf.output_type_id``), the most recent run, and the most
    recent ``"shadow"`` run.  When a shadow run exists, its comparison supplies
    the rollout gate fields; otherwise those fields are ``None`` / empty.
    """
    output_type_filters = [OutputType.workflow_definition_id == wf.id]
    if wf.output_type_id is not None:
        output_type_filters.append(OutputType.id == wf.output_type_id)

    output_type_result = await db.execute(
        select(OutputType)
        .where(or_(*output_type_filters))
        .order_by(OutputType.is_active.desc(), OutputType.sort_order, OutputType.name)
    )
    linked_output_types = list(output_type_result.scalars().unique().all())

    rollout_modes = _sort_rollout_modes(
        {_normalized_rollout_mode(output_type) for output_type in linked_output_types}
    )
    blocking_reasons = _collect_output_type_contract_reasons(
        output_types=linked_output_types,
        workflow_family=workflow_family,
        supported_artifact_kinds=supported_artifact_kinds,
    )

    latest_run_result = await db.execute(
        select(WorkflowRun)
        .where(WorkflowRun.workflow_def_id == wf.id)
        .order_by(WorkflowRun.created_at.desc())
        .limit(1)
    )
    latest_run = latest_run_result.scalar_one_or_none()

    latest_shadow_run_result = await db.execute(
        select(WorkflowRun)
        .where(
            WorkflowRun.workflow_def_id == wf.id,
            WorkflowRun.execution_mode == "shadow",
        )
        .order_by(WorkflowRun.created_at.desc())
        .limit(1)
    )
    latest_shadow_run = latest_shadow_run_result.scalar_one_or_none()

    latest_comparison = (
        await build_workflow_run_comparison(db, latest_shadow_run.id)
        if latest_shadow_run is not None
        else None
    )
    has_comparison = latest_comparison is not None

    return WorkflowRolloutSummaryOut(
        linked_output_type_count=len(linked_output_types),
        active_output_type_count=sum(1 for output_type in linked_output_types if output_type.is_active),
        linked_output_type_names=[output_type.name for output_type in linked_output_types],
        linked_output_types=[
            WorkflowRolloutLinkedOutputTypeOut(
                id=output_type.id,
                name=output_type.name,
                is_active=bool(output_type.is_active),
                artifact_kind=(output_type.artifact_kind or "custom").strip().lower(),
                workflow_rollout_mode=_normalized_rollout_mode(output_type),
            )
            for output_type in linked_output_types
        ],
        rollout_modes=rollout_modes,
        has_blocking_contracts=bool(blocking_reasons),
        blocking_reasons=blocking_reasons,
        latest_run=_build_latest_run_out(latest_run),
        latest_shadow_run=_build_latest_run_out(latest_shadow_run),
        latest_rollout_gate_verdict=latest_comparison.rollout_gate_verdict if has_comparison else None,
        latest_rollout_ready=latest_comparison.workflow_rollout_ready if has_comparison else None,
        latest_rollout_status=latest_comparison.workflow_rollout_status if has_comparison else None,
        latest_rollout_reasons=latest_comparison.rollout_reasons if has_comparison else [],
    )


async def _workflow_to_out(db: AsyncSession, wf: WorkflowDefinition) -> WorkflowDefinitionOut:
    """Serialize a workflow definition, including derived family and rollout summary.

    The stored config is canonicalized first; the family and supported
    artifact kinds are derived from that canonical form.
    """
    canonical_config = canonicalize_workflow_config(wf.config)
    workflow_family = infer_workflow_family_from_config(canonical_config)
    supported_artifact_kinds = tuple(
        derive_supported_artifact_kinds_from_workflow_config(canonical_config)
    )
    return WorkflowDefinitionOut(
        id=wf.id,
        name=wf.name,
        output_type_id=wf.output_type_id,
        config=canonical_config,
        family=workflow_family,
        supported_artifact_kinds=list(supported_artifact_kinds),
        rollout_summary=await _build_rollout_summary(
            db,
            wf,
            workflow_family=workflow_family,
            supported_artifact_kinds=supported_artifact_kinds,
        ),
        is_active=wf.is_active,
        created_at=wf.created_at,
    )


def _format_order_line_context_label(order: Order, line: OrderLine) -> tuple[str, str]:
    """Return ``(label, meta)`` strings describing an order line in the context picker.

    The label combines the product name (falling back to its PIM id, then to
    ``"Unnamed product"``) with the output type name, the requested image
    number and the render position name where present.  The meta string is the
    order number plus the line's render status (``"pending"`` when unset).
    """
    product_label = "Unnamed product"
    if line.product is not None:
        product_label = (
            (getattr(line.product, "name", None) or "").strip()
            or (getattr(line.product, "pim_id", None) or "").strip()
            or product_label
        )

    output_label = (
        (getattr(line.output_type, "name", None) or "").strip()
        if line.output_type is not None
        else "Tracking only"
    )
    # An output type with a blank name is also shown as "Tracking only".
    detail_bits = [output_label or "Tracking only"]

    image_number = (line.gewuenschte_bildnummer or "").strip()
    if image_number:
        detail_bits.append(f"Image {image_number}")

    render_position_name = (
        (getattr(line.render_position, "name", None) or "").strip()
        if getattr(line, "render_position", None) is not None
        else ""
    )
    if render_position_name:
        detail_bits.append(render_position_name)

    return (
        f"{product_label} · {' · '.join(detail_bits)}",
        f"{order.order_number} · {line.render_status or 'pending'}",
    )


def _issue(
    severity: str,
    code: str,
    message: str,
    *,
    node_id: str | None = None,
    step: str | None = None,
) -> WorkflowPreflightIssueOut:
    """Build a preflight issue; *node_id* / *step* are set only for node-scoped issues."""
    return WorkflowPreflightIssueOut(
        severity=severity,
        code=code,
        message=message,
        node_id=node_id,
        step=step,
    )


def _node_status(issues: list[WorkflowPreflightIssueOut], *, supported: bool) -> str:
    """Reduce a node's issues to one status.

    Precedence: ``"unsupported"`` > ``"error"`` > ``"warning"`` > ``"ready"``.
    """
    if not supported:
        return "unsupported"
    if any(issue.severity == "error" for issue in issues):
        return "error"
    if any(issue.severity == "warning" for issue in issues):
        return "warning"
    return "ready"


def _infer_expected_context_kind(ordered_nodes: list) -> str:
    """Return ``"order_line"`` if any node is an order-line runtime step, else ``"cad_file"``."""
    if any(node.step in _ORDER_LINE_RUNTIME_STEPS for node in ordered_nodes):
        return "order_line"
    return "cad_file"


def _build_workflow_preflight(
    session: Session,
    wf: WorkflowDefinition,
    *,
    context_id: str,
) -> WorkflowPreflightOut:
    """Run preflight for a stored workflow definition using its saved config."""
    return _build_workflow_preflight_for_config(
        session,
        workflow_id=wf.id,
        workflow_config=wf.config,
        context_id=context_id,
    )


def _build_workflow_preflight_for_config(
    session: Session,
    *,
    workflow_id: uuid.UUID | None,
    workflow_config: dict,
    context_id: str,
) -> WorkflowPreflightOut:
    """Check whether a workflow config could be dispatched for *context_id*.

    The function resolves *context_id* to an order line or, failing that, a CAD
    file, collects context-level issues, and then evaluates every node of the
    prepared graph for node-level issues.  Nothing is dispatched.

    Args:
        session: Synchronous SQLAlchemy session (callers go through
            ``AsyncSession.run_sync``).
        workflow_id: Id echoed back in the result; ``None`` for unsaved drafts.
        workflow_config: Raw workflow config; canonicalized before use.
        context_id: String form of an order-line or CAD-file UUID.  A malformed
            value is reported as an issue, not raised.

    Returns:
        The preflight report.  ``graph_dispatch_allowed`` is ``False`` as soon
        as any context-level error or any node with status ``"error"`` or
        ``"unsupported"`` exists.

    Raises:
        HTTPException: 422 when the config cannot be canonicalized or prepared.
    """
    # Function-local import, matching the dispatch path in this module
    # (presumably to avoid an import cycle -- TODO confirm).
    from app.domains.rendering.workflow_executor import prepare_workflow_context

    try:
        # Canonicalization sits inside the ``try`` so an invalid config yields
        # 422 here just as it does on the dispatch and update paths.
        normalized_config = canonicalize_workflow_config(workflow_config)
        workflow_context = prepare_workflow_context(
            normalized_config,
            context_id=context_id,
            execution_mode="graph",
        )
    except ValidationError as exc:
        raise HTTPException(status_code=422, detail=f"Invalid workflow config: {exc.errors()}") from exc
    except ValueError as exc:
        raise HTTPException(status_code=422, detail=str(exc)) from exc

    execution_mode = get_workflow_execution_mode(normalized_config, default="legacy")
    unsupported_node_ids = find_unsupported_graph_nodes(workflow_context)
    ordered_nodes = workflow_context.ordered_nodes
    expected_context_kind = _infer_expected_context_kind(ordered_nodes)

    issues: list[WorkflowPreflightIssueOut] = []

    parsed_context_id: uuid.UUID | None = None
    try:
        parsed_context_id = uuid.UUID(context_id)
    except ValueError:
        issues.append(_issue("error", "invalid_context_id", "Context ID must be a valid UUID."))

    order_line: OrderLine | None = None
    cad_file: CadFile | None = None
    context_kind: str | None = None
    resolved_order_line_id: uuid.UUID | None = None
    resolved_cad_file_id: uuid.UUID | None = None
    setup = None
    template_resolution = None
    bbox_resolution = None
    # (code, message) describing a problem with the CAD file's STEP path, if any.
    cad_file_problem: tuple[str, str] | None = None

    if parsed_context_id is not None:
        # Order lines take precedence; CAD files are only tried as a fallback.
        order_line = session.get(OrderLine, parsed_context_id)
        if order_line is not None:
            context_kind = "order_line"
            resolved_order_line_id = order_line.id
        else:
            cad_file = session.get(CadFile, parsed_context_id)
            if cad_file is not None:
                context_kind = "cad_file"
                resolved_cad_file_id = cad_file.id

    if context_kind is None and parsed_context_id is not None:
        issues.append(
            _issue(
                "error",
                "context_not_found",
                "Context ID did not match an existing order line or CAD file.",
            )
        )

    if context_kind is not None and context_kind != expected_context_kind:
        issues.append(
            _issue(
                "error",
                "context_kind_mismatch",
                f"Workflow expects a {expected_context_kind} context, but the supplied ID resolves to {context_kind}.",
            )
        )

    if context_kind == "order_line" and order_line is not None:
        setup = prepare_order_line_render_context(session, str(order_line.id), persist_state=False)
        if setup.order_line is not None:
            resolved_order_line_id = setup.order_line.id
        if setup.cad_file is not None:
            resolved_cad_file_id = setup.cad_file.id

        if setup.status == "missing":
            issues.append(_issue("error", "order_line_missing", "Order line could not be loaded."))
        elif setup.status == "failed":
            issues.append(
                _issue(
                    "error",
                    "order_line_not_renderable",
                    f"Order line is not renderable: {setup.reason or 'unknown reason'}.",
                )
            )
        elif setup.status == "skip":
            issues.append(
                _issue(
                    "error",
                    "order_line_skipped",
                    f"Order line would be skipped by the legacy setup step: {setup.reason or 'skip'}.",
                )
            )

        if setup.is_ready:
            template_resolution = resolve_order_line_template_context(session, setup)
            if any(node.step == StepName.GLB_BBOX for node in ordered_nodes):
                glb_path = str(setup.glb_reuse_path) if setup.glb_reuse_path is not None else None
                if glb_path is None and setup.cad_file is not None:
                    # Fall back to the thumbnail GLB stored next to the STEP file.
                    step_file = Path(setup.cad_file.stored_path)
                    fallback_glb = step_file.parent / f"{step_file.stem}_thumbnail.glb"
                    if fallback_glb.exists():
                        glb_path = str(fallback_glb)
                bbox_resolution = resolve_cad_bbox(
                    setup.cad_file.stored_path,
                    glb_path=glb_path,
                )
    elif context_kind == "cad_file" and cad_file is not None:
        resolved_cad_file_id = cad_file.id
        # Check for an empty path BEFORE building a ``Path``: ``Path(None)``
        # raises ``TypeError`` and would turn a reportable issue into a 500.
        if not cad_file.stored_path:
            cad_file_problem = ("cad_file_missing_path", "CAD file has no stored STEP path.")
        elif not Path(cad_file.stored_path).exists():
            cad_file_problem = (
                "cad_file_step_missing",
                f"STEP source path does not exist: {cad_file.stored_path}",
            )
        if cad_file_problem is not None:
            issues.append(_issue("error", *cad_file_problem))

    node_indices = {node.id: index for index, node in enumerate(ordered_nodes)}
    # Positions of the prerequisite nodes, computed once instead of per node.
    setup_indices = [
        node_indices[candidate.id]
        for candidate in ordered_nodes
        if candidate.step == StepName.ORDER_LINE_SETUP
    ]
    template_indices = [
        node_indices[candidate.id]
        for candidate in ordered_nodes
        if candidate.step == StepName.RESOLVE_TEMPLATE
    ]

    nodes_out: list[WorkflowPreflightNodeOut] = []
    blocking_issue_found = any(issue.severity == "error" for issue in issues)

    for node in ordered_nodes:
        definition = get_node_definition(node.step)
        supported = node.id not in unsupported_node_ids
        node_index = node_indices[node.id]
        scope = {"node_id": node.id, "step": node.step.value}
        node_issues: list[WorkflowPreflightIssueOut] = []

        if not supported:
            node_issues.append(
                _issue(
                    "error",
                    "unsupported_node",
                    f"Graph runtime has no executable implementation for step '{node.step.value}'.",
                    **scope,
                )
            )

        if expected_context_kind == "order_line" and node.step in _CAD_FILE_ENTRY_STEPS:
            node_issues.append(
                _issue(
                    "error",
                    "cad_file_only_node",
                    "This node currently requires a direct cad_file entry context and cannot run inside an order-line graph.",
                    **scope,
                )
            )

        if node.step in _ORDER_LINE_SETUP_REQUIRED_STEPS:
            has_prior_setup = any(index < node_index for index in setup_indices)
            if node.step != StepName.ORDER_LINE_SETUP and not has_prior_setup:
                node_issues.append(
                    _issue(
                        "error",
                        "missing_order_line_setup",
                        "This node requires an earlier order_line_setup node in the graph.",
                        **scope,
                    )
                )
            if context_kind != "order_line":
                node_issues.append(
                    _issue(
                        "error",
                        "invalid_context_kind",
                        "This node requires an order_line context.",
                        **scope,
                    )
                )
            elif setup is None or not setup.is_ready:
                reason = setup.reason if setup is not None else "order_line_setup_not_executed"
                node_issues.append(
                    _issue(
                        "error",
                        "setup_not_ready",
                        f"Order-line setup is not ready for this node: {reason}.",
                        **scope,
                    )
                )

        if node.step in _RESOLVE_TEMPLATE_RECOMMENDED_STEPS:
            has_prior_template = any(index < node_index for index in template_indices)
            if not has_prior_template:
                node_issues.append(
                    _issue(
                        "warning",
                        "missing_resolve_template",
                        "No earlier resolve_template node found. Render defaults may drift from legacy behavior.",
                        **scope,
                    )
                )

        if (
            node.step == StepName.RESOLVE_TEMPLATE
            and template_resolution is not None
            and template_resolution.template is None
        ):
            node_issues.append(
                _issue(
                    "warning",
                    "template_missing",
                    "No render template matched this order line. The graph would fall back to factory settings.",
                    **scope,
                )
            )

        if node.step == StepName.GLB_BBOX and bbox_resolution is not None and not bbox_resolution.has_bbox:
            node_issues.append(
                _issue(
                    "warning",
                    "bbox_unresolved",
                    "Bounding box data could not be derived from the available GLB or STEP source.",
                    **scope,
                )
            )

        # ``cad_file_problem`` is only set when the context resolved to a CAD
        # file, so it doubles as the context-kind check here.
        if node.step in _CAD_FILE_ENTRY_STEPS and cad_file_problem is not None:
            node_issues.append(_issue("error", *cad_file_problem, **scope))

        status = _node_status(node_issues, supported=supported)
        if status in {"error", "unsupported"}:
            blocking_issue_found = True

        nodes_out.append(
            WorkflowPreflightNodeOut(
                node_id=node.id,
                step=node.step.value,
                label=node.ui.label if node.ui is not None else None,
                execution_kind=definition.execution_kind if definition is not None else "bridge",
                supported=supported,
                status=status,
                issues=node_issues,
            )
        )

    graph_dispatch_allowed = not blocking_issue_found
    warning_count = sum(1 for node in nodes_out if node.status == "warning") + sum(
        1 for issue in issues if issue.severity == "warning"
    )
    if graph_dispatch_allowed:
        summary = (
            "Preflight passed with warnings."
            if warning_count > 0
            else "Graph runtime is ready for this context."
        )
    else:
        summary = "Preflight found blocking issues that would prevent a safe graph dispatch."

    return WorkflowPreflightOut(
        workflow_id=workflow_id,
        context_id=context_id,
        context_kind=context_kind,
        expected_context_kind=expected_context_kind,
        execution_mode=execution_mode,
        graph_dispatch_allowed=graph_dispatch_allowed,
        summary=summary,
        resolved_order_line_id=resolved_order_line_id,
        resolved_cad_file_id=resolved_cad_file_id,
        unsupported_node_ids=unsupported_node_ids,
        issues=issues,
        nodes=nodes_out,
    )


# Preflight a stored workflow against a context entity.
# 404 when the workflow does not exist, 400 when it has no config.
@router.get("/{workflow_id}/preflight", response_model=WorkflowPreflightOut)
async def preflight_workflow(
    workflow_id: uuid.UUID,
    context_id: str = Query(
        ...,
        description=(
            "UUID of the entity to validate against the graph runtime. "
            "For order-line workflows this is an order_line_id; "
            "for STEP/thumbnail workflows this is a cad_file_id."
        ),
    ),
    _user: User = Depends(require_admin_or_pm),
    db: AsyncSession = Depends(get_db),
):
    result = await db.execute(
        select(WorkflowDefinition).where(WorkflowDefinition.id == workflow_id)
    )
    wf = result.scalar_one_or_none()
    if wf is None:
        raise HTTPException(status_code=404, detail="Workflow definition not found")
    if not wf.config:
        raise HTTPException(status_code=400, detail="Workflow has no config")

    # The preflight builder uses the synchronous Session API, hence run_sync.
    return await db.run_sync(
        lambda sync_session: _build_workflow_preflight(
            sync_session,
            wf,
            context_id=context_id,
        )
    )


# Preflight an unsaved draft config supplied in the request body.
@router.post("/preflight", response_model=WorkflowPreflightOut)
async def preflight_workflow_draft(
    body: WorkflowDraftPreflightRequest,
    _user: User = Depends(require_admin_or_pm),
    db: AsyncSession = Depends(get_db),
):
    if not body.config:
        raise HTTPException(status_code=400, detail="Workflow has no config")

    return await db.run_sync(
        lambda sync_session: _build_workflow_preflight_for_config(
            sync_session,
            workflow_id=body.workflow_id,
            workflow_config=body.config,
            context_id=body.context_id,
        )
    )


# Return every node definition known to the workflow node registry.
@router.get("/node-definitions", response_model=NodeDefinitionsResponse)
async def get_node_definitions(
    _user: User = Depends(require_admin_or_pm),
):
    return NodeDefinitionsResponse(definitions=list_node_definitions())
async def _get_workflow_or_404(db: AsyncSession, workflow_id: uuid.UUID) -> WorkflowDefinition:
    """Load a workflow definition by id or raise a 404 ``HTTPException``."""
    result = await db.execute(
        select(WorkflowDefinition).where(WorkflowDefinition.id == workflow_id)
    )
    wf = result.scalar_one_or_none()
    if wf is None:
        raise HTTPException(status_code=404, detail="Workflow definition not found")
    return wf


async def _load_run_with_node_results(db: AsyncSession, run_id: uuid.UUID) -> WorkflowRun:
    """Load a workflow run with its node results eagerly loaded."""
    result = await db.execute(
        select(WorkflowRun)
        .where(WorkflowRun.id == run_id)
        .options(selectinload(WorkflowRun.node_results))
    )
    return result.scalar_one()


# NOTE: endpoints below are documented with ``#`` comments unless they already
# carried a docstring, because FastAPI publishes docstrings as OpenAPI
# descriptions and adding new ones would change the generated schema.

@router.get("/contexts/order-lines", response_model=list[WorkflowOrderLineContextGroupOut])
async def list_workflow_order_line_contexts(
    limit: int = Query(50, ge=1, le=200),
    user: User = Depends(require_admin_or_pm),
    db: AsyncSession = Depends(get_db),
):
    """Return a lightweight order-line picker model for the workflow editor."""
    order_query = (
        select(Order)
        .where(Order.lines.any())
        .options(
            selectinload(Order.lines).selectinload(OrderLine.product),
            selectinload(Order.lines).selectinload(OrderLine.output_type),
            selectinload(Order.lines).selectinload(OrderLine.render_position),
        )
        .order_by(Order.updated_at.desc())
        .limit(limit)
    )
    # Roles outside this set only see orders they created themselves.
    if user.role.value not in {"global_admin", "tenant_admin", "admin", "project_manager"}:
        order_query = order_query.where(Order.created_by == user.id)

    result = await db.execute(order_query)
    orders = result.scalars().all()

    groups: list[WorkflowOrderLineContextGroupOut] = []
    for order in orders:
        options: list[WorkflowOrderLineContextOptionOut] = []
        for line in order.lines:
            label, meta = _format_order_line_context_label(order, line)
            options.append(
                WorkflowOrderLineContextOptionOut(
                    value=line.id,
                    label=label,
                    meta=meta,
                )
            )
        if options:
            groups.append(
                WorkflowOrderLineContextGroupOut(
                    order_id=order.id,
                    order_label=order.order_number,
                    options=options,
                )
            )
    return groups


# List the registered node definitions in the reduced "pipeline step" shape.
@router.get("/pipeline-steps", response_model=PipelineStepsResponse)
async def get_pipeline_steps(
    _user: User = Depends(require_admin_or_pm),
):
    steps = [
        PipelineStepOut(
            name=definition.step,
            label=definition.label,
            category=definition.category,
            description=definition.description,
        )
        for definition in list_node_definitions()
    ]
    return PipelineStepsResponse(steps=steps)


# List all workflow definitions, oldest first.
# NOTE(review): ``_workflow_to_out`` issues several queries per workflow, so
# this endpoint is N+1 in the number of definitions.
@router.get("", response_model=list[WorkflowDefinitionOut])
async def list_workflows(
    _user: User = Depends(require_admin_or_pm),
    db: AsyncSession = Depends(get_db),
):
    result = await db.execute(
        select(WorkflowDefinition).order_by(WorkflowDefinition.created_at)
    )
    return [await _workflow_to_out(db, wf) for wf in result.scalars().all()]


# Fetch one workflow definition; 404 when it does not exist.
@router.get("/{workflow_id}", response_model=WorkflowDefinitionOut)
async def get_workflow(
    workflow_id: uuid.UUID,
    _user: User = Depends(require_admin_or_pm),
    db: AsyncSession = Depends(get_db),
):
    wf = await _get_workflow_or_404(db, workflow_id)
    return await _workflow_to_out(db, wf)


# Create a workflow definition; 422 when the supplied config is invalid.
@router.post("", response_model=WorkflowDefinitionOut, status_code=201)
async def create_workflow(
    body: WorkflowDefinitionCreate,
    _user: User = Depends(require_global_admin),
    db: AsyncSession = Depends(get_db),
):
    try:
        # Canonicalization sits inside the ``try`` (as in ``update_workflow``)
        # so a config it rejects produces a 422 instead of a 500.
        normalized_config = canonicalize_workflow_config(body.config)
        if body.config:
            WorkflowConfig.model_validate(normalized_config)
    except (ValidationError, ValueError) as exc:
        detail = exc.errors() if isinstance(exc, ValidationError) else str(exc)
        raise HTTPException(status_code=422, detail=f"Invalid workflow config: {detail}") from exc

    wf = WorkflowDefinition(
        name=body.name,
        output_type_id=body.output_type_id,
        config=normalized_config,
        is_active=body.is_active,
    )
    db.add(wf)
    await db.commit()
    await db.refresh(wf)
    return await _workflow_to_out(db, wf)


# Partially update a workflow definition: only fields that are not ``None`` in
# the body are applied.  422 when a supplied config is invalid.
@router.put("/{workflow_id}", response_model=WorkflowDefinitionOut)
async def update_workflow(
    workflow_id: uuid.UUID,
    body: WorkflowDefinitionUpdate,
    _user: User = Depends(require_global_admin),
    db: AsyncSession = Depends(get_db),
):
    wf = await _get_workflow_or_404(db, workflow_id)

    if body.name is not None:
        wf.name = body.name

    if body.config is not None:
        try:
            normalized_config = canonicalize_workflow_config(body.config)
            WorkflowConfig.model_validate(normalized_config)
        except (ValidationError, ValueError) as exc:
            detail = exc.errors() if isinstance(exc, ValidationError) else str(exc)
            raise HTTPException(status_code=422, detail=f"Invalid workflow config: {detail}") from exc
        wf.config = normalized_config
        # Explicitly mark the column dirty so the new config is written.
        flag_modified(wf, "config")

    if body.is_active is not None:
        wf.is_active = body.is_active

    await db.commit()
    await db.refresh(wf)
    return await _workflow_to_out(db, wf)


# Delete a workflow definition; 404 when it does not exist.
@router.delete("/{workflow_id}", status_code=204)
async def delete_workflow(
    workflow_id: uuid.UUID,
    _user: User = Depends(require_global_admin),
    db: AsyncSession = Depends(get_db),
):
    wf = await _get_workflow_or_404(db, workflow_id)
    await db.delete(wf)
    await db.commit()


# List the runs of a workflow, newest first, with node results loaded.
@router.get("/{workflow_id}/runs", response_model=list[WorkflowRunOut])
async def list_workflow_runs(
    workflow_id: uuid.UUID,
    _user: User = Depends(require_admin_or_pm),
    db: AsyncSession = Depends(get_db),
):
    # Verify the workflow exists
    await _get_workflow_or_404(db, workflow_id)

    result = await db.execute(
        select(WorkflowRun)
        .where(WorkflowRun.workflow_def_id == workflow_id)
        .options(selectinload(WorkflowRun.node_results))
        .order_by(WorkflowRun.created_at.desc())
    )
    return result.scalars().all()


# Return the comparison report for a workflow run; 404 when none can be built.
@router.get("/runs/{run_id}/comparison", response_model=WorkflowRunComparisonOut)
async def get_workflow_run_comparison(
    run_id: uuid.UUID,
    _user: User = Depends(require_admin_or_pm),
    db: AsyncSession = Depends(get_db),
):
    comparison = await build_workflow_run_comparison(db, run_id)
    if comparison is None:
        raise HTTPException(status_code=404, detail="Workflow run not found")
    return comparison


# Response of both dispatch endpoints: the persisted run plus submitted task ids.
class WorkflowDispatchResponse(BaseModel):
    workflow_run: WorkflowRunOut
    context_id: str
    execution_mode: str
    dispatched: int
    task_ids: list[str]


async def _dispatch_workflow_for_config(
    db: AsyncSession,
    *,
    workflow_id: uuid.UUID | None,
    workflow_config: dict,
    context_id: str,
) -> WorkflowDispatchResponse:
    """Create a workflow run for *workflow_config* and submit its tasks.

    Steps, in order:

    1. Canonicalize the config and prepare a ``"graph"`` workflow context.
    2. For order-line workflows, validate *context_id*, make sure the order
       line exists, and move a ``submitted`` / ``completed`` order back to
       ``processing``.
    3. Persist the workflow run and commit.
    4. Execute the graph with ``dispatch_tasks=False``, commit, then submit the
       prepared tasks.  A failure in either phase marks the run as failed,
       commits that state and re-raises the original exception.

    Args:
        db: Async session used for all reads and writes.
        workflow_id: Stored definition id, or ``None`` for an unsaved draft.
        workflow_config: Raw workflow config.
        context_id: String form of the order-line or CAD-file UUID.

    Returns:
        The reloaded run together with the submitted task ids.

    Raises:
        HTTPException: 422 for an invalid config or malformed order-line
            context id; 404 when the order line does not exist.
    """
    # ``submit_prepared_workflow_tasks`` was previously used below without
    # being imported anywhere in scope, so every dispatch failed with a
    # NameError after the run had been created.
    from app.domains.rendering.workflow_executor import (
        prepare_workflow_context,
        submit_prepared_workflow_tasks,
    )
    from app.domains.rendering.workflow_graph_runtime import execute_graph_workflow
    from app.domains.rendering.workflow_run_service import (
        create_workflow_run,
        mark_workflow_run_failed,
    )

    try:
        normalized_config = canonicalize_workflow_config(workflow_config)
        workflow_context = prepare_workflow_context(
            normalized_config,
            context_id=context_id,
            execution_mode="graph",
        )
    except ValidationError as exc:
        raise HTTPException(status_code=422, detail=f"Invalid workflow config: {exc.errors()}") from exc
    except ValueError as exc:
        raise HTTPException(status_code=422, detail=str(exc)) from exc

    resolved_order_line_id: uuid.UUID | None = None
    if _infer_expected_context_kind(workflow_context.ordered_nodes) == "order_line":
        try:
            parsed_context_id = uuid.UUID(context_id)
        except ValueError as exc:
            raise HTTPException(status_code=422, detail="Context ID must be a valid UUID.") from exc

        order_line_result = await db.execute(
            select(OrderLine.id).where(OrderLine.id == parsed_context_id)
        )
        resolved_order_line_id = order_line_result.scalar_one_or_none()
        if resolved_order_line_id is None:
            raise HTTPException(status_code=404, detail="Order line context not found")

        order_result = await db.execute(
            select(Order)
            .join(OrderLine, OrderLine.order_id == Order.id)
            .where(OrderLine.id == resolved_order_line_id)
        )
        order = order_result.scalar_one_or_none()
        if order is not None and order.status in (OrderStatus.submitted, OrderStatus.completed):
            # NOTE(review): naive UTC timestamp; ``datetime.utcnow`` is
            # deprecated since Python 3.12.  Confirm the column timezone
            # handling before switching to an aware datetime.
            now = datetime.utcnow()
            order.status = OrderStatus.processing
            order.processing_started_at = now
            order.completed_at = None
            order.updated_at = now

    run_id = await db.run_sync(
        lambda sync_session: create_workflow_run(
            sync_session,
            workflow_def_id=workflow_id,
            order_line_id=resolved_order_line_id,
            workflow_context=workflow_context,
        ).id
    )
    # Commit before executing so the run row exists even if execution fails.
    await db.commit()

    async def _record_failure(error: Exception) -> None:
        """Persist the failure on the run so it is not left in a running state."""
        failed_run = await _load_run_with_node_results(db, run_id)
        mark_workflow_run_failed(failed_run, str(error))
        await db.commit()

    try:
        dispatch_result = await db.run_sync(
            lambda sync_session: execute_graph_workflow(
                sync_session,
                workflow_context,
                dispatch_tasks=False,
            )
        )
    except Exception as exc:
        await _record_failure(exc)
        raise
    await db.commit()

    # Tasks are submitted only after the commit above.
    try:
        submit_prepared_workflow_tasks(dispatch_result)
    except Exception as exc:
        await _record_failure(exc)
        raise

    refreshed_run = await _load_run_with_node_results(db, run_id)
    return WorkflowDispatchResponse(
        workflow_run=refreshed_run,
        context_id=context_id,
        execution_mode=workflow_context.execution_mode,
        dispatched=len(dispatch_result.task_ids),
        task_ids=dispatch_result.task_ids,
    )


# Dispatch an unsaved draft config.  When the request names a workflow id, that
# definition must exist (404 otherwise), but the config from the request is used.
@router.post("/dispatch", response_model=WorkflowDispatchResponse)
async def dispatch_workflow_draft(
    request: WorkflowDraftDispatchRequest,
    _user: User = Depends(require_pm_or_above),
    db: AsyncSession = Depends(get_db),
):
    workflow_id = request.workflow_id
    if workflow_id is not None:
        result = await db.execute(
            select(WorkflowDefinition.id).where(WorkflowDefinition.id == workflow_id)
        )
        if result.scalar_one_or_none() is None:
            raise HTTPException(status_code=404, detail="Workflow definition not found")

    return await _dispatch_workflow_for_config(
        db,
        workflow_id=workflow_id,
        workflow_config=request.config,
        context_id=request.context_id,
    )


@router.post("/{workflow_id}/dispatch", response_model=WorkflowDispatchResponse)
async def dispatch_workflow_endpoint(
    workflow_id: uuid.UUID,
    context_id: str = Query(
        ...,
        description=(
            "UUID of the entity to process. "
            "For STEP/thumbnail steps this is a cad_file_id; "
            "for render steps this is an order_line_id."
        ),
    ),
    _user: User = Depends(require_pm_or_above),
    db: AsyncSession = Depends(get_db),
):
    """Dispatch a workflow's steps as Celery tasks for a given context entity.

    Each node in the workflow config is dispatched as an individual Celery task
    in topological (dependency) order. Returns the list of Celery task IDs so
    the caller can track progress.
    """
    wf = await _get_workflow_or_404(db, workflow_id)
    if not wf.config:
        raise HTTPException(status_code=400, detail="Workflow has no config")

    return await _dispatch_workflow_for_config(
        db,
        workflow_id=wf.id,
        workflow_config=wf.config,
        context_id=context_id,
    )