Files
HartOMat/backend/app/tasks/step_tasks.py
T
Hartmut f5ca91ee02 feat: layout hamburger, media browser filters+previews, billing fixes
- Layout: mobile hamburger menu + overlay backdrop + close button; content area always full-width
- Media browser: filter chips (default still+turntable); advanced toggle for GLB/STL; thumbnail_url previews for non-image types; video hover-play for turntable
- Backend: asset_types multi-filter, thumbnail_url in MediaAssetOut, download proxy endpoint for MinIO/local files
- Admin: "Import Existing Media" button → POST /api/admin/import-media-assets
- Billing: fix invoice create 500 (MissingGreenlet — use selectinload after commit); PDF download uses axios blob instead of bare <a href> (auth header missing); fix storage.upload() accepting str|Path
- SSE task logs: task_logs.py core + router, LiveRenderLog component
- CadPreview: fix infinite loop when no gltf_geometry assets; loading screen before ThreeDViewer render
- render-worker: add trimesh layer to Dockerfile

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-07 00:09:27 +01:00

771 lines
37 KiB
Python

"""Celery tasks for STEP file processing and thumbnail generation."""
import logging
from app.tasks.celery_app import celery_app
from app.core.task_logs import log_task_event
logger = logging.getLogger(__name__)
@celery_app.task(bind=True, name="app.tasks.step_tasks.process_step_file", queue="step_processing")
def process_step_file(self, cad_file_id: str):
"""Process a STEP file: extract objects, generate thumbnail, convert to glTF.
After processing completes, auto-populate cad_part_materials from Excel
component data for any linked products that don't yet have materials assigned.
A per-file Redis lock (TTL = 10 min) prevents duplicate tasks from processing
the same file concurrently — e.g. when 'Process Unprocessed' is clicked while
a file is already being processed.
"""
import redis as redis_lib
from app.config import settings as app_settings
lock_key = f"step_processing_lock:{cad_file_id}"
r = redis_lib.from_url(app_settings.redis_url)
acquired = r.set(lock_key, "1", nx=True, ex=600) # 10-minute TTL
if not acquired:
logger.warning(f"STEP file {cad_file_id} is already being processed — skipping duplicate task")
return
try:
logger.info(f"Processing STEP file (metadata only): {cad_file_id}")
try:
from app.services.step_processor import extract_cad_metadata
extract_cad_metadata(cad_file_id)
except Exception as exc:
logger.error(f"STEP metadata extraction failed for {cad_file_id}: {exc}")
r.delete(lock_key) # release lock so a retry can proceed
raise self.retry(exc=exc, countdown=60, max_retries=3)
finally:
r.delete(lock_key) # always release on completion or unhandled error
# Queue thumbnail rendering on the dedicated single-concurrency worker
render_step_thumbnail.delay(cad_file_id)
def _auto_populate_materials_for_cad(cad_file_id: str) -> None:
"""Sync helper: auto-populate cad_part_materials from Excel for newly-processed CAD files.
Only fills products where cad_part_materials is empty or all-blank,
preventing overwrites of manually assigned materials.
"""
from sqlalchemy import create_engine, select as sql_select, update as sql_update
from sqlalchemy.orm import Session
from app.config import settings as app_settings
from app.models.cad_file import CadFile
from app.models.product import Product
from app.api.routers.products import build_materials_from_excel
from app.services.step_processor import build_part_colors
sync_url = app_settings.database_url.replace("+asyncpg", "")
eng = create_engine(sync_url)
with Session(eng) as session:
# Load the CAD file to get parsed objects
cad_file = session.execute(
sql_select(CadFile).where(CadFile.id == cad_file_id)
).scalar_one_or_none()
if cad_file is None:
return
parsed_objects = cad_file.parsed_objects or {}
cad_parts: list[str] = parsed_objects.get("objects", [])
if not cad_parts:
return
# Find products linked to this CAD file that have Excel components
products = session.execute(
sql_select(Product).where(
Product.cad_file_id == cad_file.id,
Product.is_active.is_(True),
)
).scalars().all()
final_part_colors = None
for product in products:
excel_components: list[dict] = product.components or []
if not excel_components:
continue
# Only auto-fill when cad_part_materials is empty or all-blank
existing = product.cad_part_materials or []
if existing and any(m.get("material", "").strip() for m in existing):
continue # has at least one real material — don't overwrite
new_materials = build_materials_from_excel(cad_parts, excel_components)
session.execute(
sql_update(Product)
.where(Product.id == product.id)
.values(cad_part_materials=new_materials)
)
session.flush()
# Compute part colors; thumbnail queued once after the loop
try:
final_part_colors = build_part_colors(cad_parts, new_materials)
except Exception:
logger.exception(f"Part colors build failed for product {product.id}")
logger.info(
f"Auto-populated {len(new_materials)} materials for product {product.id} "
f"from {len(excel_components)} Excel components"
)
session.commit()
# Queue exactly ONE thumbnail regeneration per CAD file regardless of how many
# products were auto-populated. Queuing once-per-product multiplies the task
# count needlessly and causes the Redis queue depth to grow instead of shrink.
if final_part_colors is not None:
try:
regenerate_thumbnail.delay(str(cad_file_id), final_part_colors)
except Exception:
logger.exception(f"Thumbnail regen queue failed for cad_file {cad_file_id}")
eng.dispose()
@celery_app.task(bind=True, name="app.tasks.step_tasks.render_step_thumbnail", queue="thumbnail_rendering")
def render_step_thumbnail(self, cad_file_id: str):
"""Render the thumbnail for a freshly-processed STEP file.
Runs on the dedicated thumbnail_rendering queue (concurrency=1) so the
blender-renderer service is never overwhelmed by concurrent requests.
On success, also auto-populates materials and marks the CadFile as completed.
"""
logger.info(f"Rendering thumbnail for CAD file: {cad_file_id}")
# Compute and persist STEP file hash for STL cache lookups
try:
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from app.config import settings as app_settings
from app.models.cad_file import CadFile
from app.domains.products.cache_service import compute_step_hash
sync_url = app_settings.database_url.replace("+asyncpg", "")
_eng = create_engine(sync_url)
with Session(_eng) as _sess:
_cad = _sess.get(CadFile, cad_file_id)
if _cad and _cad.stored_path and not _cad.step_file_hash:
_hash = compute_step_hash(_cad.stored_path)
_cad.step_file_hash = _hash
_sess.commit()
logger.info(f"Saved step_file_hash for {cad_file_id}: {_hash[:12]}")
_eng.dispose()
except Exception:
logger.warning(f"step_file_hash computation failed for {cad_file_id} (non-fatal)")
try:
from app.services.step_processor import regenerate_cad_thumbnail
success = regenerate_cad_thumbnail(cad_file_id, part_colors={})
if not success:
raise RuntimeError("regenerate_cad_thumbnail returned False")
except Exception as exc:
logger.error(f"Thumbnail render failed for {cad_file_id}: {exc}")
raise self.retry(exc=exc, countdown=30, max_retries=2)
# Auto-populate materials now that parsed_objects are available
try:
_auto_populate_materials_for_cad(cad_file_id)
except Exception:
logger.exception(
f"Auto material population failed for cad_file {cad_file_id} (non-fatal)"
)
# Broadcast WebSocket event for live UI updates
try:
from sqlalchemy import create_engine, select as sql_select2
from sqlalchemy.orm import Session as _Session
from app.config import settings as _cfg
from app.models.cad_file import CadFile as _CadFile
_sync_url = _cfg.database_url.replace("+asyncpg", "")
_eng = create_engine(_sync_url)
with _Session(_eng) as _s:
_cad = _s.get(_CadFile, cad_file_id)
_tid = str(_cad.tenant_id) if _cad and _cad.tenant_id else None
_eng.dispose()
if _tid:
from app.core.websocket import publish_event_sync
publish_event_sync(_tid, {
"type": "cad_processing_complete",
"cad_file_id": cad_file_id,
"status": "completed",
})
except Exception:
logger.debug("WebSocket publish for CAD complete skipped (non-fatal)")
@celery_app.task(bind=True, name="app.tasks.step_tasks.generate_stl_cache", queue="thumbnail_rendering")
def generate_stl_cache(self, cad_file_id: str, quality: str):
"""Generate and cache STL for a CAD file without triggering a full render."""
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from app.config import settings as app_settings
from app.models.cad_file import CadFile
logger.info(f"Generating {quality}-quality STL for CAD file: {cad_file_id}")
sync_url = app_settings.database_url.replace("+asyncpg", "")
eng = create_engine(sync_url)
with Session(eng) as session:
cad_file = session.get(CadFile, cad_file_id)
if not cad_file or not cad_file.stored_path:
logger.error(f"CAD file not found or no stored_path: {cad_file_id}")
return
step_path = cad_file.stored_path
eng.dispose()
try:
from app.services.render_blender import convert_step_to_stl, export_per_part_stls
from app.domains.products.cache_service import compute_step_hash, check_stl_cache, store_stl_cache
from pathlib import Path as _Path
step = _Path(step_path)
stl_out = step.parent / f"{step.stem}_{quality}.stl"
parts_dir = step.parent / f"{step.stem}_{quality}_parts"
if not stl_out.exists() or stl_out.stat().st_size == 0:
# Check MinIO cache before running cadquery conversion
step_hash = compute_step_hash(step_path)
cached_bytes = check_stl_cache(step_hash, quality)
if cached_bytes:
stl_out.write_bytes(cached_bytes)
logger.info(f"STL cache hit for {cad_file_id} ({quality}), skipped conversion")
else:
convert_step_to_stl(step, stl_out, quality)
# Store result in MinIO for future workers
if stl_out.exists() and stl_out.stat().st_size > 0:
store_stl_cache(step_hash, quality, str(stl_out))
if not (parts_dir / "manifest.json").exists():
try:
export_per_part_stls(step, parts_dir, quality)
except Exception as pe:
logger.warning(f"Per-part STL export non-fatal: {pe}")
logger.info(f"STL cached: {stl_out} ({stl_out.stat().st_size // 1024} KB)")
except Exception as exc:
logger.error(f"STL generation failed for {cad_file_id} quality={quality}: {exc}")
raise self.retry(exc=exc, countdown=30, max_retries=2)
@celery_app.task(bind=True, name="app.tasks.step_tasks.generate_gltf_geometry_task", queue="thumbnail_rendering", max_retries=1)
def generate_gltf_geometry_task(self, cad_file_id: str):
"""Export a geometry-only GLB from the STL low-quality cache using trimesh.
Creates a MediaAsset with asset_type='gltf_geometry' and cad_file_id set.
No Blender required — trimesh handles the STL→GLB conversion.
"""
from pathlib import Path as _Path
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from app.config import settings as app_settings
from app.models.cad_file import CadFile
sync_url = app_settings.database_url.replace("+asyncpg", "")
eng = create_engine(sync_url)
with Session(eng) as session:
cad_file = session.get(CadFile, cad_file_id)
if not cad_file or not cad_file.stored_path:
logger.error("generate_gltf_geometry_task: no stored_path for %s", cad_file_id)
return
step_path_str = cad_file.stored_path
eng.dispose()
log_task_event(self.request.id, f"Starting generate_gltf_geometry_task: cad_file={cad_file_id}", "info")
step = _Path(step_path_str)
stl_path = step.parent / f"{step.stem}_low.stl"
if not stl_path.exists():
log_task_event(self.request.id, f"Failed: STL cache not found: {stl_path}", "error")
logger.error("generate_gltf_geometry_task: STL not found %s", stl_path)
raise RuntimeError(f"STL cache not found: {stl_path}")
output_path = step.parent / f"{step.stem}_geometry.glb"
try:
import trimesh
mesh = trimesh.load(str(stl_path))
mesh.export(str(output_path))
log_task_event(self.request.id, f"Completed successfully: {output_path.name}", "done")
logger.info("generate_gltf_geometry_task: exported %s", output_path.name)
except Exception as exc:
log_task_event(self.request.id, f"Failed: {exc}", "error")
logger.error("generate_gltf_geometry_task failed for %s: %s", cad_file_id, exc)
raise self.retry(exc=exc, countdown=15)
# Create MediaAsset record
import asyncio
async def _store():
from app.database import AsyncSessionLocal
from app.domains.media.models import MediaAsset, MediaAssetType
async with AsyncSessionLocal() as db:
import uuid
asset = MediaAsset(
cad_file_id=uuid.UUID(cad_file_id),
asset_type=MediaAssetType.gltf_geometry,
storage_key=str(output_path),
mime_type="model/gltf-binary",
file_size_bytes=output_path.stat().st_size if output_path.exists() else None,
)
db.add(asset)
await db.commit()
return str(asset.id)
asset_id = asyncio.get_event_loop().run_until_complete(_store())
logger.info("generate_gltf_geometry_task: MediaAsset %s created for cad %s", asset_id, cad_file_id)
return {"glb_path": str(output_path), "asset_id": asset_id}
@celery_app.task(bind=True, name="app.tasks.step_tasks.regenerate_thumbnail", queue="thumbnail_rendering")
def regenerate_thumbnail(self, cad_file_id: str, part_colors: dict):
"""Regenerate thumbnail with per-part colours."""
logger.info(f"Regenerating thumbnail for CAD file: {cad_file_id}")
try:
from app.services.step_processor import regenerate_cad_thumbnail
success = regenerate_cad_thumbnail(cad_file_id, part_colors)
if not success:
raise RuntimeError("regenerate_cad_thumbnail returned False")
except Exception as exc:
logger.error(f"Thumbnail regeneration failed for {cad_file_id}: {exc}")
raise self.retry(exc=exc, countdown=30, max_retries=2)
@celery_app.task(name="app.tasks.step_tasks.dispatch_order_line_render", queue="step_processing")
def dispatch_order_line_render(order_line_id: str):
"""Route an order-line render to render_order_line_task."""
logger.info(f"Dispatching render for order line: {order_line_id}")
render_order_line_task.delay(order_line_id)
@celery_app.task(bind=True, name="app.tasks.step_tasks.render_order_line_task", queue="thumbnail_rendering", max_retries=3)
def render_order_line_task(self, order_line_id: str):
"""Render a specific output type for an order line.
Loads OrderLine → Product → CadFile → OutputType.render_settings.
Merges with system render settings. Stores result at order_line.result_path.
"""
logger.info(f"Rendering order line: {order_line_id}")
from app.services.render_log import emit
emit(order_line_id, "Celery render task started")
try:
from sqlalchemy import create_engine, select, update as sql_update
from sqlalchemy.orm import Session, joinedload
from app.config import settings as app_settings
# Use sync session for Celery (no async event loop)
sync_url = app_settings.database_url.replace("+asyncpg", "")
engine = create_engine(sync_url)
with Session(engine) as session:
from app.models.order_line import OrderLine
from app.models.product import Product
emit(order_line_id, "Loading order line from database")
line = session.execute(
select(OrderLine)
.where(OrderLine.id == order_line_id)
.options(
joinedload(OrderLine.product).joinedload(Product.cad_file),
joinedload(OrderLine.output_type),
)
).scalar_one_or_none()
if line is None:
emit(order_line_id, "Order line not found in database", "error")
logger.error(f"OrderLine {order_line_id} not found")
return
if line.product.cad_file_id is None:
emit(order_line_id, "Product has no CAD file — marking as failed", "error")
logger.warning(f"OrderLine {order_line_id}: product has no CAD file")
session.execute(
sql_update(OrderLine)
.where(OrderLine.id == line.id)
.values(render_status="failed")
)
session.commit()
return
# Mark as processing with timing
from datetime import datetime
render_start = datetime.utcnow()
session.execute(
sql_update(OrderLine)
.where(OrderLine.id == line.id)
.values(
render_status="processing",
render_backend_used="celery",
render_started_at=render_start,
)
)
session.commit()
cad_file = line.product.cad_file
materials_source = line.product.cad_part_materials
part_colors = {}
if cad_file and cad_file.parsed_objects:
parsed_names = cad_file.parsed_objects.get("objects", [])
if materials_source:
from app.services.step_processor import build_part_colors
part_colors = build_part_colors(parsed_names, materials_source)
# Resolve render template + material library
from app.services.template_service import resolve_template, get_material_library_path
category_key = line.product.category_key if line.product else None
ot_id = str(line.output_type_id) if line.output_type_id else None
template = resolve_template(category_key=category_key, output_type_id=ot_id)
material_library = get_material_library_path()
# Build material_map (part_name → material_name) for material replacement.
# Works with or without a render template — only suppressed if a
# template explicitly has material_replace_enabled=False.
material_map = None
use_materials = bool(material_library and materials_source)
if template and not template.material_replace_enabled:
use_materials = False
if use_materials:
material_map = {
m["part_name"]: m["material"]
for m in materials_source
if m.get("part_name") and m.get("material")
}
# Resolve raw material names to SCHAEFFLER library names via aliases
from app.services.material_service import resolve_material_map
material_map = resolve_material_map(material_map)
if template:
emit(order_line_id, f"Using render template: {template.name} (collection={template.target_collection}, material_replace={template.material_replace_enabled}, lighting_only={template.lighting_only})")
logger.info(f"Render template resolved: '{template.name}' path={template.blend_file_path}, lighting_only={template.lighting_only}")
else:
emit(order_line_id, "No render template found — using factory settings (Mode A)")
logger.info(f"No render template for category_key={category_key!r}, output_type_id={ot_id!r}")
cad_name = cad_file.original_name if cad_file else "?"
# Load render_position for rotation values
rotation_x = rotation_y = rotation_z = 0.0
if line.render_position_id:
from app.models.render_position import ProductRenderPosition
rp = session.get(ProductRenderPosition, line.render_position_id)
if rp:
rotation_x, rotation_y, rotation_z = rp.rotation_x, rp.rotation_y, rp.rotation_z
emit(order_line_id, f"Render position: '{rp.name}' ({rotation_x}°, {rotation_y}°, {rotation_z}°)")
emit(order_line_id, f"Starting render for {cad_name} ({len(part_colors)} coloured parts)")
# Determine if this is an animation output type
is_animation = bool(line.output_type and getattr(line.output_type, 'is_animation', False))
# Determine output format/extension
out_ext = "jpg"
if line.output_type and line.output_type.output_format:
fmt = line.output_type.output_format.lower()
if fmt == "mp4":
out_ext = "mp4"
elif fmt in ("png", "jpg", "jpeg"):
out_ext = "png" if fmt == "png" else "jpg"
# Build meaningful output filename
import re
def _sanitize(s: str) -> str:
return re.sub(r'[^\w\-.]', '_', s.strip())[:100]
product_name = line.product.name or line.product.pim_id or "product"
ot_name = line.output_type.name if line.output_type else "render"
filename = f"{_sanitize(product_name)}_{_sanitize(ot_name)}.{out_ext}"
# Render to per-line output directory
from pathlib import Path as _Path
render_dir = _Path(app_settings.upload_dir) / "renders" / order_line_id
render_dir.mkdir(parents=True, exist_ok=True)
output_path = str(render_dir / filename)
# Extract per-output-type render settings
render_width = None
render_height = None
render_engine = None
render_samples = None
noise_threshold = ""
denoiser = ""
denoising_input_passes = ""
denoising_prefilter = ""
denoising_quality = ""
denoising_use_gpu = ""
frame_count = 24
fps = 25
bg_color = ""
turntable_axis = "world_z"
if line.output_type and line.output_type.render_settings:
rs = line.output_type.render_settings
if rs.get("width"):
render_width = int(rs["width"])
if rs.get("height"):
render_height = int(rs["height"])
if rs.get("engine"):
render_engine = rs["engine"]
if rs.get("samples"):
render_samples = int(rs["samples"])
if rs.get("frame_count"):
frame_count = int(rs["frame_count"])
if rs.get("fps"):
fps = int(rs["fps"])
bg_color = rs.get("bg_color", "")
turntable_axis = rs.get("turntable_axis", "world_z")
noise_threshold = str(rs.get("noise_threshold", ""))
denoiser = str(rs.get("denoiser", ""))
denoising_input_passes = str(rs.get("denoising_input_passes", ""))
denoising_prefilter = str(rs.get("denoising_prefilter", ""))
denoising_quality = str(rs.get("denoising_quality", ""))
denoising_use_gpu = str(rs.get("denoising_use_gpu", ""))
transparent_bg = bool(line.output_type and line.output_type.transparent_bg)
cycles_device_val = (line.output_type.cycles_device or "auto") if line.output_type else "auto"
# Build ordered part names list for index-based Blender matching
part_names_ordered = None
if cad_file and cad_file.parsed_objects:
part_names_ordered = cad_file.parsed_objects.get("objects", []) or None
tmpl_info = f" template={template.name}" if template else ""
if is_animation:
# ── Turntable animation path ────────────────────────────────
emit(order_line_id, f"Starting turntable render: {frame_count} frames @ {fps}fps, {render_width or 1920}x{render_height or 1920}{tmpl_info}")
from app.services.render_blender import is_blender_available, render_turntable_to_file
if not is_blender_available():
raise RuntimeError("Blender not available on this worker")
from app.services.step_processor import _get_all_settings
_sys = _get_all_settings()
try:
service_data = render_turntable_to_file(
step_path=_Path(cad_file.stored_path),
output_path=_Path(output_path),
frame_count=frame_count,
fps=fps,
width=render_width or 1920,
height=render_height or 1920,
engine=render_engine or _sys.get("blender_engine", "cycles"),
samples=render_samples or int(_sys.get(f"blender_{render_engine or _sys.get('blender_engine','cycles')}_samples", 128)),
stl_quality=_sys.get("stl_quality", "low"),
smooth_angle=int(_sys.get("blender_smooth_angle", 30)),
cycles_device=cycles_device_val,
transparent_bg=transparent_bg,
bg_color=bg_color,
turntable_axis=turntable_axis,
part_colors=part_colors or None,
template_path=template.blend_file_path if template else None,
target_collection=template.target_collection if template else "Product",
material_library_path=material_library if use_materials else None,
material_map=material_map,
part_names_ordered=part_names_ordered,
lighting_only=bool(template.lighting_only) if template else False,
shadow_catcher=bool(template.shadow_catcher_enabled) if template else False,
rotation_x=rotation_x,
rotation_y=rotation_y,
rotation_z=rotation_z,
)
success = True
render_log = {
"renderer": "blender",
"type": "turntable",
"format": "mp4",
"engine": render_engine or _sys.get("blender_engine", "cycles"),
"engine_used": service_data.get("engine_used", "cycles"),
"samples": render_samples,
"cycles_device": cycles_device_val,
"width": render_width or 1920,
"height": render_height or 1920,
"frame_count": service_data.get("frame_count", frame_count),
"fps": fps,
"total_duration_s": service_data.get("total_duration_s"),
"stl_duration_s": service_data.get("stl_duration_s"),
"render_duration_s": service_data.get("render_duration_s"),
"ffmpeg_duration_s": service_data.get("ffmpeg_duration_s"),
"stl_size_bytes": service_data.get("stl_size_bytes"),
"output_size_bytes": service_data.get("output_size_bytes"),
"log_lines": service_data.get("log_lines", []),
}
if template:
render_log["template"] = template.blend_file_path
except Exception as exc:
success = False
render_log = {"renderer": "blender", "type": "turntable", "error": str(exc)[:500]}
logger.error("Turntable render failed for %s: %s", order_line_id, exc)
else:
# ── Still image path ────────────────────────────────────────
emit(order_line_id, f"Calling renderer (STEP → STL → still) {render_width or 'default'}x{render_height or 'default'}{' [transparent]' if transparent_bg else ''}{f' engine={render_engine}' if render_engine else ''}{f' samples={render_samples}' if render_samples else ''}{tmpl_info}")
from app.services.step_processor import render_to_file
success, render_log = render_to_file(
step_path=cad_file.stored_path,
output_path=output_path,
part_colors=part_colors,
width=render_width,
height=render_height,
transparent_bg=transparent_bg,
engine=render_engine,
samples=render_samples,
template_path=template.blend_file_path if template else None,
target_collection=template.target_collection if template else "Product",
material_library_path=material_library if use_materials else None,
material_map=material_map,
part_names_ordered=part_names_ordered,
lighting_only=bool(template.lighting_only) if template else False,
shadow_catcher=bool(template.shadow_catcher_enabled) if template else False,
cycles_device=line.output_type.cycles_device if line.output_type else None,
rotation_x=rotation_x,
rotation_y=rotation_y,
rotation_z=rotation_z,
job_id=order_line_id,
noise_threshold=noise_threshold,
denoiser=denoiser,
denoising_input_passes=denoising_input_passes,
denoising_prefilter=denoising_prefilter,
denoising_quality=denoising_quality,
denoising_use_gpu=denoising_use_gpu,
)
new_status = "completed" if success else "failed"
render_end = datetime.utcnow()
elapsed = (render_end - render_start).total_seconds()
update_values = dict(
render_status=new_status,
render_completed_at=render_end,
render_log=render_log,
)
if success:
update_values["result_path"] = output_path
session.execute(
sql_update(OrderLine)
.where(OrderLine.id == line.id)
.values(**update_values)
)
session.commit()
if success:
emit(order_line_id, f"Render completed in {elapsed:.1f}s", "success")
else:
emit(order_line_id, f"Render failed after {elapsed:.1f}s", "error")
# Broadcast WebSocket event for live UI updates
try:
from app.core.websocket import publish_event_sync
_tenant_id = str(line.product.cad_file.tenant_id) if (
line.product and line.product.cad_file and line.product.cad_file.tenant_id
) else None
if _tenant_id:
publish_event_sync(_tenant_id, {
"type": "render_complete" if success else "render_failed",
"order_line_id": order_line_id,
"order_id": str(line.order_id),
"status": new_status,
})
except Exception:
logger.debug("WebSocket publish skipped (non-fatal)")
# Notify order creator about render result
try:
from app.models.order import Order as OrderModel
order_row = session.execute(
select(OrderModel.created_by, OrderModel.order_number)
.where(OrderModel.id == line.order_id)
).one_or_none()
if order_row:
from app.services.notification_service import emit_notification_sync
details: dict = {
"order_number": order_row[1],
"product_name": product_name,
"output_type": ot_name,
}
if not success and isinstance(render_log, dict):
err = render_log.get("error") or render_log.get("stderr", "")
if err:
details["error"] = str(err)[:300]
emit_notification_sync(
actor_user_id=None,
target_user_id=str(order_row[0]),
action="render.completed" if success else "render.failed",
entity_type="order",
entity_id=str(line.order_id),
details=details,
)
except Exception:
logger.exception("Failed to emit render notification")
# Check if all lines for this order are done → auto-advance
order_id_str = str(line.order_id)
engine.dispose()
from app.services.order_status_service import check_order_completion
check_order_completion(order_id_str)
except Exception as exc:
logger.error(f"render_order_line_task failed for {order_line_id}: {exc}")
# If retries exhausted, mark as failed so the line doesn't stay stuck
if self.request.retries >= self.max_retries:
logger.error(f"Max retries reached for {order_line_id}, marking as failed")
try:
from sqlalchemy import create_engine, update as sql_update2
from sqlalchemy.orm import Session as SyncSession
from app.config import settings as app_settings
from app.models.order_line import OrderLine as OL2
sync_url2 = app_settings.database_url.replace("+asyncpg", "")
eng2 = create_engine(sync_url2)
with SyncSession(eng2) as s2:
from datetime import datetime as dt2
s2.execute(
sql_update2(OL2).where(OL2.id == order_line_id)
.values(
render_status="failed",
render_completed_at=dt2.utcnow(),
render_log={"error": str(exc)[:500]},
)
)
s2.commit()
eng2.dispose()
from app.services.order_status_service import check_order_completion
# Try to get order_id from DB
eng3 = create_engine(sync_url2)
with SyncSession(eng3) as s3:
from sqlalchemy import select as sel
row = s3.execute(sel(OL2.order_id).where(OL2.id == order_line_id)).scalar_one_or_none()
if row:
check_order_completion(str(row))
eng3.dispose()
# Notify the order creator about the failure
try:
from sqlalchemy import select as sel2
from app.models.order import Order as OrderModel2
eng4 = create_engine(sync_url2)
with SyncSession(eng4) as s4:
order_row2 = s4.execute(
sel2(OrderModel2.created_by, OrderModel2.order_number)
.join(OL2, OL2.order_id == OrderModel2.id)
.where(OL2.id == order_line_id)
).one_or_none()
eng4.dispose()
if order_row2:
from app.services.notification_service import emit_notification_sync
emit_notification_sync(
actor_user_id=None,
target_user_id=str(order_row2[0]),
action="render.failed",
entity_type="order",
entity_id=None,
details={
"order_number": order_row2[1],
"product_name": "unknown",
"output_type": "unknown",
"error": str(exc)[:300],
},
)
except Exception:
logger.exception("Failed to emit render failure notification")
except Exception:
logger.exception(f"Failed to mark {order_line_id} as failed in DB")
raise
raise self.retry(exc=exc, countdown=60)