Files
HartOMat/backend/app/services/render_dispatcher.py
T
Hartmut 1d6864fb64 refactor(A1): remove Flamenco, simplify render pipeline to Celery-only
- Remove flamenco-manager and flamenco-worker from docker-compose.yml
- Delete flamenco_client.py, flamenco_tasks.py, docker_scaler.py
- Simplify render_dispatcher.py to Celery-only (removes ~300 lines)
- Remove Flamenco beat schedule from celery_app.py
- Clean admin.py: remove flamenco settings, endpoints, threejs validation
- Clean orders.py cancel-render: Celery revoke only
- Clean worker.py: remove flamenco_job_id from activity response
- Migration 032: cancel lingering flamenco jobs, remove flamenco settings
- PLAN.md: mark all decisions confirmed, status IN UMSETZUNG

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-06 15:38:37 +01:00

97 lines
3.4 KiB
Python

"""Render dispatcher — routes render jobs to Celery.
All renders run via Celery workers (Flamenco removed in v2 refactor).
"""
import logging
from datetime import datetime
from sqlalchemy import select, update as sql_update
from sqlalchemy.orm import Session, joinedload
from app.models.order_line import OrderLine
from app.models.product import Product
from app.models.system_setting import SystemSetting
logger = logging.getLogger(__name__)
def _load_setting(session: Session, key: str, default: str = "") -> str:
    """Return the value of one system setting, looked up synchronously.

    Args:
        session: Open sync SQLAlchemy session used to run the query.
        key: Value matched against ``SystemSetting.key``.
        default: Returned when no matching row is found.

    Returns:
        The ``value`` attribute of the matching row, or ``default``.
    """
    query = select(SystemSetting).where(SystemSetting.key == key)
    setting = session.execute(query).scalar_one_or_none()
    if not setting:
        return default
    return setting.value
def dispatch_render(order_line_id: str) -> dict:
    """Dispatch a render job for one order line to Celery.

    Must be called from a sync context (Celery task or sync wrapper).

    Resets the render log for the order line, loads the line together with
    its product, the product's CAD file and its output type, marks the line
    as ``processing`` and then hands it to the Celery render task.

    Args:
        order_line_id: Identifier matched against ``OrderLine.id``.

    Returns:
        ``{"backend": "celery", "job_ref": <task id>}`` once the task has been
        queued. When no order line matches, returns
        ``{"backend": "none", "job_ref": "", "error": "not_found"}``. When the
        product has no CAD file, the line's ``render_status`` is set to
        ``"failed"`` and the same shape is returned with
        ``"error": "no_cad_file"``.
    """
    from app.config import settings as app_settings
    from app.services.render_log import emit, clear

    clear(order_line_id)
    emit(order_line_id, "Dispatch started — loading order line data")

    # Drop the "+asyncpg" driver marker so the URL works with a sync engine.
    sync_url = app_settings.database_url.replace("+asyncpg", "")
    from sqlalchemy import create_engine

    engine_db = create_engine(sync_url)
    try:
        with Session(engine_db) as session:
            line = session.execute(
                select(OrderLine)
                .where(OrderLine.id == order_line_id)
                .options(
                    joinedload(OrderLine.product).joinedload(Product.cad_file),
                    joinedload(OrderLine.output_type),
                )
            ).scalar_one_or_none()

            if line is None:
                emit(order_line_id, "Order line not found", "error")
                logger.error("OrderLine %s not found", order_line_id)
                return {"backend": "none", "job_ref": "", "error": "not_found"}

            product_name = line.product.name or line.product.pim_id or "unknown"
            output_name = line.output_type.name if line.output_type else "default"
            emit(order_line_id, f"Product: {product_name} | Output: {output_name}")

            if line.product.cad_file_id is None:
                emit(
                    order_line_id,
                    "Product has no CAD file — marking as failed",
                    "error",
                )
                logger.warning(
                    "OrderLine %s: product has no CAD file", order_line_id
                )
                session.execute(
                    sql_update(OrderLine)
                    .where(OrderLine.id == line.id)
                    .values(render_status="failed")
                )
                session.commit()
                return {"backend": "none", "job_ref": "", "error": "no_cad_file"}

            cad_name = (
                line.product.cad_file.original_name if line.product.cad_file else "?"
            )
            emit(order_line_id, f"CAD file: {cad_name}")
            emit(order_line_id, "Dispatching to Celery render worker")

            # Naive UTC timestamp, as before.
            # NOTE(review): datetime.utcnow() is deprecated since Python 3.12;
            # confirm the column type before switching to an aware datetime.
            now = datetime.utcnow()
            session.execute(
                sql_update(OrderLine)
                .where(OrderLine.id == line.id)
                .values(
                    render_status="processing",
                    render_backend_used="celery",
                    render_started_at=now,
                )
            )
            session.commit()
    finally:
        # The engine is created per call, so release its connection pool on
        # every exit path (early return or exception), not only on success.
        engine_db.dispose()

    # The status update is committed before the task is queued.
    return _dispatch_celery(order_line_id)
def _dispatch_celery(order_line_id: str) -> dict:
    """Queue the Celery render task for the given order line.

    Args:
        order_line_id: Identifier passed through to the render task.

    Returns:
        ``{"backend": "celery", "job_ref": <id of the queued task result>}``.
    """
    # Imported inside the function, as in the original.
    from app.tasks.step_tasks import render_order_line_task

    async_result = render_order_line_task.delay(order_line_id)
    job_ref = async_result.id
    return {"backend": "celery", "job_ref": job_ref}