Files
HartOMat/backend/app/domains/rendering/dispatch_service.py
T
Hartmut ab3f9c734a fix: render pipeline + multi-tenancy bugs (B-Fix-1 through B-Fix-9)
- Remove worker-thumbnail (no Blender, was competing on thumbnail_rendering)
- Move render_order_line_task to thumbnail_rendering queue (render-worker)
- Restore template_service.py real implementation (fix circular import shim)
- Thread tenant_id through STEP upload, Excel import, product create
- Make system tables (output_types, materials, etc.) tenant_id nullable
- Fix tenants frontend 307-redirect: use trailing slash /tenants/
- Remove Flamenco + Three.js from Admin UI (unsupported)
- Set all output_types render_backend to celery (was flamenco)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-06 19:34:20 +01:00

117 lines
4.0 KiB
Python

"""Workflow-aware render dispatch service.
C2: extends the legacy dispatch_render path with WorkflowDefinition support.
If an OutputType has workflow_definition_id set:
- Loads the WorkflowDefinition
- Calls dispatch_workflow() to build + submit a Celery Canvas
- Creates a WorkflowRun record tracking the submission
If no workflow_definition_id is set, falls back to the existing direct
task-dispatch logic in app.services.render_dispatcher (legacy path).
"""
from __future__ import annotations
import logging
from datetime import datetime
logger = logging.getLogger(__name__)
def dispatch_render_with_workflow(order_line_id: str) -> dict:
    """Dispatch a render for the given order line.

    Checks whether the OutputType associated with the order line has an
    active WorkflowDefinition linked. If yes, the Celery Canvas workflow
    builder is used and a WorkflowRun row is recorded. If no (no output
    type, no ``workflow_definition_id``, or the definition is missing or
    inactive), the legacy direct-dispatch logic is used instead.

    This function is synchronous (Celery-task-safe).

    Args:
        order_line_id: Primary key of the OrderLine to render.

    Returns:
        On the workflow path, a dict with the keys ``backend``
        (``"workflow"``), ``workflow_type``, ``workflow_run_id`` and
        ``celery_task_id``. On the legacy path, whatever
        ``_legacy_dispatch`` returns.

    Raises:
        ValueError: If no OrderLine with ``order_line_id`` exists.
    """
    from sqlalchemy import create_engine, select
    from sqlalchemy.orm import Session, selectinload

    from app.config import settings
    from app.domains.orders.models import OrderLine
    from app.domains.rendering.models import OutputType, WorkflowDefinition, WorkflowRun

    # The "+asyncpg" driver marker is stripped from the configured URL so a
    # plain synchronous engine can be built for use inside this function.
    engine = create_engine(
        settings.database_url.replace("+asyncpg", ""),
        pool_pre_ping=True,
    )
    try:
        with Session(engine) as session:
            # Load order line with its output_type
            line = session.execute(
                select(OrderLine)
                .where(OrderLine.id == order_line_id)
                .options(selectinload(OrderLine.output_type))
            ).scalar_one_or_none()
            if not line:
                raise ValueError(f"OrderLine {order_line_id} not found")

            output_type: OutputType | None = line.output_type
            if output_type is None or output_type.workflow_definition_id is None:
                # Legacy path — no workflow definition linked
                logger.info(
                    "order_line %s: no workflow_definition_id, using legacy dispatch",
                    order_line_id,
                )
                return _legacy_dispatch(order_line_id)

            # Load the linked WorkflowDefinition (active ones only)
            wf_def: WorkflowDefinition | None = session.execute(
                select(WorkflowDefinition).where(
                    WorkflowDefinition.id == output_type.workflow_definition_id,
                    WorkflowDefinition.is_active.is_(True),
                )
            ).scalar_one_or_none()
            if wf_def is None:
                logger.warning(
                    "order_line %s: workflow_definition_id %s not found or inactive, "
                    "falling back to legacy dispatch",
                    order_line_id,
                    output_type.workflow_definition_id,
                )
                return _legacy_dispatch(order_line_id)

            workflow_type = wf_def.config.get("type")
            params = wf_def.config.get("params", {})
            logger.info(
                "order_line %s: dispatching via WorkflowDefinition %s (type=%s)",
                order_line_id,
                wf_def.id,
                workflow_type,
            )

            from app.domains.rendering.workflow_builder import dispatch_workflow

            # NOTE(review): the workflow is submitted before the WorkflowRun
            # row is committed; if the commit below fails, the submitted
            # workflow has no tracking record.
            celery_task_id = dispatch_workflow(workflow_type, order_line_id, params)

            # Persist a WorkflowRun record
            run = WorkflowRun(
                workflow_def_id=wf_def.id,
                order_line_id=line.id,
                celery_task_id=celery_task_id,
                status="pending",
                started_at=datetime.utcnow(),  # naive UTC timestamp
            )
            session.add(run)
            session.commit()

            # Build the result while the session is still open so that
            # run.id can be read after the commit.
            return {
                "backend": "workflow",
                "workflow_type": workflow_type,
                "workflow_run_id": str(run.id),
                "celery_task_id": celery_task_id,
            }
    finally:
        # A fresh engine (and connection pool) is created on every call;
        # dispose it on every exit path so pooled connections are closed
        # instead of lingering until garbage collection.
        engine.dispose()
def _legacy_dispatch(order_line_id: str) -> dict:
    """Enqueue the direct Celery render task for one order line.

    This is the fallback used when no workflow definition applies: the
    order line id is handed to ``render_order_line_task`` via ``.delay()``.

    Args:
        order_line_id: Identifier of the order line to render.

    Returns:
        A dict marking the dispatch as queued on the ``celery`` backend.
    """
    # Imported at call time rather than module level
    # (presumably to avoid an import cycle — TODO confirm).
    from app.tasks.step_tasks import render_order_line_task as legacy_render_task

    legacy_render_task.delay(order_line_id)

    outcome = {"backend": "celery", "queued": True}
    return outcome