"""Workflow-aware render dispatch service. C2: extends the legacy dispatch_render path with WorkflowDefinition support. If an OutputType has workflow_definition_id set: - Loads the WorkflowDefinition - Calls dispatch_workflow() to build + submit a Celery Canvas - Creates a WorkflowRun record tracking the submission If no workflow_definition_id is set, falls back to the existing direct task-dispatch logic in app.services.render_dispatcher (legacy path). """ from __future__ import annotations import logging from datetime import datetime logger = logging.getLogger(__name__) def dispatch_render_with_workflow(order_line_id: str) -> dict: """Dispatch a render for the given order line. Checks whether the associated OutputType has a WorkflowDefinition linked. If yes, uses the Celery Canvas workflow builder. If no, falls back to the legacy direct-dispatch logic. This function is synchronous (Celery-task-safe). """ from sqlalchemy import create_engine, select from sqlalchemy.orm import Session, selectinload from app.config import settings from app.domains.orders.models import OrderLine from app.domains.rendering.models import OutputType, WorkflowDefinition, WorkflowRun engine = create_engine( settings.database_url.replace("+asyncpg", ""), pool_pre_ping=True, ) with Session(engine) as session: # Load order line with its output_type line = session.execute( select(OrderLine) .where(OrderLine.id == order_line_id) .options(selectinload(OrderLine.output_type)) ).scalar_one_or_none() if not line: raise ValueError(f"OrderLine {order_line_id} not found") output_type: OutputType | None = line.output_type if output_type is None or output_type.workflow_definition_id is None: # Legacy path — no workflow definition linked logger.info( "order_line %s: no workflow_definition_id, using legacy dispatch", order_line_id, ) return _legacy_dispatch(order_line_id) # Load the linked WorkflowDefinition wf_def: WorkflowDefinition | None = session.execute( select(WorkflowDefinition).where( WorkflowDefinition.id == output_type.workflow_definition_id, WorkflowDefinition.is_active.is_(True), ) ).scalar_one_or_none() if wf_def is None: logger.warning( "order_line %s: workflow_definition_id %s not found or inactive, " "falling back to legacy dispatch", order_line_id, output_type.workflow_definition_id, ) return _legacy_dispatch(order_line_id) workflow_type = wf_def.config.get("type") params = wf_def.config.get("params", {}) logger.info( "order_line %s: dispatching via WorkflowDefinition %s (type=%s)", order_line_id, wf_def.id, workflow_type, ) # For turntable workflows: resolve step_path + output_dir from the order line at runtime if workflow_type == "turntable" and ("step_path" not in params or "output_dir" not in params): from app.domains.products.models import CadFile as _CadFile from pathlib import Path as _Path from app.config import settings as _cfg _product = line.product if hasattr(line, "product") else None if _product is None: from sqlalchemy.orm import selectinload as _si from app.domains.orders.models import OrderLine as _OL _line_full = session.execute( select(_OL).where(_OL.id == line.id).options(_si(_OL.product)) ).scalar_one_or_none() _product = _line_full.product if _line_full else None if _product and _product.cad_file_id: _cad = session.execute( select(_CadFile).where(_CadFile.id == _product.cad_file_id) ).scalar_one_or_none() if _cad and _cad.stored_path: params.setdefault("step_path", _cad.stored_path) params.setdefault( "output_dir", str(_Path(_cfg.upload_dir) / "renders" / str(line.id)), ) from app.domains.rendering.workflow_builder import dispatch_workflow celery_task_id = dispatch_workflow(workflow_type, order_line_id, params) # Persist a WorkflowRun record run = WorkflowRun( workflow_def_id=wf_def.id, order_line_id=line.id, celery_task_id=celery_task_id, status="pending", started_at=datetime.utcnow(), ) session.add(run) session.commit() return { "backend": "workflow", "workflow_type": workflow_type, "workflow_run_id": str(run.id), "celery_task_id": celery_task_id, } def _legacy_dispatch(order_line_id: str) -> dict: """Queue render_order_line_task (the working Celery render implementation).""" from app.tasks.step_tasks import render_order_line_task render_order_line_task.delay(order_line_id) return {"backend": "celery", "queued": True}