from __future__ import annotations

# Render-setup helpers for order lines.
#
# Three entry points are exposed:
#   * prepare_order_line_render_context   - load/validate a line, mark it as
#     processing and locate reusable geometry (USD master or cached GLB).
#   * resolve_order_line_template_context - pick the render template, material
#     library and material map for a prepared line.
#   * resolve_render_position_context     - pick rotation/camera values from
#     the product-level or global render position, else defaults.

import logging
import shutil
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Callable, Literal

from sqlalchemy import select, update as sql_update
from sqlalchemy.orm import Session, joinedload

from app.config import settings as app_settings
from app.domains.media.models import MediaAsset, MediaAssetType
from app.domains.orders.models import Order, OrderLine, OrderStatus
from app.domains.products.models import CadFile, Product
from app.domains.rendering.models import (
    GlobalRenderPosition,
    ProductRenderPosition,
    RenderTemplate,
)
from app.services.material_service import resolve_material_map
from app.services.step_processor import build_part_colors
from app.services.template_service import (
    get_material_library_path_for_session,
    resolve_template_for_session,
)

logger = logging.getLogger(__name__)

# Optional progress callback, invoked as emit(order_line_id, message[, level]).
EmitFn = Callable[..., None] | None
SetupStatus = Literal["ready", "skip", "failed", "missing"]


@dataclass(slots=True)
class OrderLineRenderSetupResult:
    """Outcome of ``prepare_order_line_render_context``."""

    # "ready" when rendering may proceed; otherwise ``reason`` says why not.
    status: SetupStatus
    order_line: OrderLine | None = None
    order: Order | None = None
    cad_file: CadFile | None = None
    # Value of product.cad_part_materials (empty list when unset).
    materials_source: list[dict[str, Any]] = field(default_factory=list)
    # Existing on-disk USD master for the CAD file, if one was found.
    usd_render_path: Path | None = None
    # "<step stem>_thumbnail.glb" next to the STEP file, if reusable.
    glb_reuse_path: Path | None = None
    part_colors: dict[str, str] = field(default_factory=dict)
    # Naive UTC timestamp written to OrderLine.render_started_at.
    render_start: datetime | None = None
    reason: str | None = None

    @property
    def is_ready(self) -> bool:
        """Return True when status is "ready" and both line and CAD file are set."""
        return (
            self.status == "ready"
            and self.order_line is not None
            and self.cad_file is not None
        )


@dataclass(slots=True)
class RenderPositionContext:
    """Rotation and camera values resolved for one order line."""

    rotation_x: float = 0.0
    rotation_y: float = 0.0
    rotation_z: float = 0.0
    focal_length_mm: float | None = None
    sensor_width_mm: float | None = None
    source_name: str | None = None
    # Which record supplied the values; "default" means none was found.
    source_kind: Literal["product", "global", "default"] = "default"


@dataclass(slots=True)
class TemplateResolutionResult:
    """Outcome of ``resolve_order_line_template_context``."""

    template: RenderTemplate | None
    material_library: str | None
    material_map: dict[str, str] | None
    use_materials: bool
    override_material: str | None
    category_key: str | None
    output_type_id: str | None


def _emit(emit: EmitFn, order_line_id: str, message: str, level: str | None = None) -> None:
    """Forward a progress message to ``emit`` if a callback was supplied.

    ``level`` is passed as a third positional argument only when given, so
    callbacks that accept just ``(order_line_id, message)`` keep working.
    """
    if emit is None:
        return
    if level is None:
        emit(order_line_id, message)
    else:
        emit(order_line_id, message, level)


def _resolve_asset_path(storage_key: str | None) -> Path | None:
    """Return the path of ``storage_key`` under the upload dir, or None if absent."""
    if not storage_key:
        return None
    candidate = Path(app_settings.upload_dir) / storage_key
    return candidate if candidate.exists() else None


def _update_line_and_commit(session: Session, line_id: Any, **values: Any) -> None:
    """Issue an UPDATE on the given order line and commit immediately."""
    session.execute(
        sql_update(OrderLine)
        .where(OrderLine.id == line_id)
        .values(**values)
    )
    session.commit()


def _latest_asset_path(
    session: Session,
    cad_file_id: Any,
    asset_type: MediaAssetType,
) -> Path | None:
    """Return the on-disk path of the newest asset of ``asset_type``, if any.

    Returns None when no such asset row exists or its file is not present.
    """
    asset = session.execute(
        select(MediaAsset)
        .where(
            MediaAsset.cad_file_id == cad_file_id,
            MediaAsset.asset_type == asset_type,
        )
        .order_by(MediaAsset.created_at.desc())
        .limit(1)
    ).scalar_one_or_none()
    return _resolve_asset_path(asset.storage_key if asset else None)


def _stage_cached_glb(glb_candidate: Path, stored_path: Any) -> Path | None:
    """Make a cached GLB available as ``<step stem>_thumbnail.glb``.

    The target lives next to the STEP file at ``stored_path``. An existing
    non-empty target is reused as-is; otherwise ``glb_candidate`` is copied
    there. Returns the target path, or None when the copy failed.
    """
    step_path = Path(stored_path)
    expected_glb = step_path.parent / f"{step_path.stem}_thumbnail.glb"

    if expected_glb.exists() and expected_glb.stat().st_size != 0:
        return expected_glb

    try:
        shutil.copy2(str(glb_candidate), str(expected_glb))
    except Exception as copy_exc:
        # Best-effort: on failure the caller falls back to tessellating.
        logger.warning("render_order_line: failed to copy GLB asset: %s", copy_exc)
        return None

    logger.info(
        "render_order_line: reused gltf_geometry asset %s -> %s",
        glb_candidate.name,
        expected_glb.name,
    )
    return expected_glb


def prepare_order_line_render_context(
    session: Session,
    order_line_id: str,
    *,
    emit: EmitFn = None,
) -> OrderLineRenderSetupResult:
    """Load and validate the order line, then prepare reusable render inputs.

    Args:
        session: SQLAlchemy session used for all reads and status updates.
        order_line_id: Primary key of the order line to render.
        emit: Optional progress callback (see ``EmitFn``).

    Returns:
        A result whose ``status`` is:
        * "missing" - no such order line;
        * "skip"    - the line is cancelled, or its order is rejected or
          completed (a pending/processing line is then marked cancelled);
        * "failed"  - the product has no CAD file (line marked failed);
        * "ready"   - the line was marked processing and render inputs
          (USD/GLB paths, part colours) are populated.
    """
    _emit(emit, order_line_id, "Loading order line from database")

    line = session.execute(
        select(OrderLine)
        .where(OrderLine.id == order_line_id)
        .options(
            joinedload(OrderLine.product).joinedload(Product.cad_file),
            joinedload(OrderLine.output_type),
        )
    ).scalar_one_or_none()

    if line is None:
        _emit(emit, order_line_id, "Order line not found in database", "error")
        logger.error("OrderLine %s not found", order_line_id)
        return OrderLineRenderSetupResult(status="missing", reason="order_line_not_found")

    if line.render_status == "cancelled":
        _emit(emit, order_line_id, "Order line already cancelled — skipping render")
        logger.info("OrderLine %s cancelled — skipping", order_line_id)
        return OrderLineRenderSetupResult(
            status="skip",
            order_line=line,
            reason="line_cancelled",
        )

    order = session.execute(
        select(Order).where(Order.id == line.order_id)
    ).scalar_one_or_none()

    if order and order.status in (OrderStatus.rejected, OrderStatus.completed):
        _emit(emit, order_line_id, f"Order {order.status.value} — skipping render")
        logger.info("OrderLine %s: order %s — skipping", order_line_id, order.status.value)
        # Only lines that have not finished are flipped to cancelled.
        if line.render_status in ("pending", "processing"):
            _update_line_and_commit(session, line.id, render_status="cancelled")
        return OrderLineRenderSetupResult(
            status="skip",
            order_line=line,
            order=order,
            reason="order_closed",
        )

    product = line.product
    if product is None or product.cad_file_id is None or product.cad_file is None:
        _emit(emit, order_line_id, "Product has no CAD file — marking as failed", "error")
        logger.warning("OrderLine %s: product has no CAD file", order_line_id)
        _update_line_and_commit(session, line.id, render_status="failed")
        return OrderLineRenderSetupResult(
            status="failed",
            order_line=line,
            order=order,
            reason="missing_cad_file",
        )

    # Naive UTC, same value datetime.utcnow() produced, without relying on the
    # deprecated call.
    render_start = datetime.now(timezone.utc).replace(tzinfo=None)
    _update_line_and_commit(
        session,
        line.id,
        render_status="processing",
        render_backend_used="celery",
        render_started_at=render_start,
    )

    # Read relationships only after the commit, as the original flow did.
    cad_file = line.product.cad_file
    materials_source = line.product.cad_part_materials or []

    usd_render_path = _latest_asset_path(session, cad_file.id, MediaAssetType.usd_master)
    if usd_render_path:
        logger.info(
            "render_order_line: using usd_master %s for cad %s",
            usd_render_path.name,
            cad_file.id,
        )

    # A cached GLB is only looked up when no USD master is available.
    glb_reuse_path = None
    if not usd_render_path:
        glb_candidate = _latest_asset_path(
            session, cad_file.id, MediaAssetType.gltf_geometry
        )
        if glb_candidate and glb_candidate.stat().st_size > 0:
            glb_reuse_path = _stage_cached_glb(glb_candidate, cad_file.stored_path)

    if usd_render_path:
        _emit(emit, order_line_id, "Using USD master for render (skipping GLB tessellation)")
    elif glb_reuse_path:
        _emit(
            emit,
            order_line_id,
            f"Reusing cached GLB geometry ({glb_reuse_path.name}) — skipping re-tessellation",
        )
    else:
        _emit(emit, order_line_id, "No USD master or cached GLB — will tessellate STEP -> GLB")

    part_colors: dict[str, str] = {}
    if cad_file.parsed_objects:
        parsed_names = cad_file.parsed_objects.get("objects", [])
        if materials_source:
            part_colors = build_part_colors(parsed_names, materials_source)

    return OrderLineRenderSetupResult(
        status="ready",
        order_line=line,
        order=order,
        cad_file=cad_file,
        materials_source=materials_source,
        usd_render_path=usd_render_path,
        glb_reuse_path=glb_reuse_path,
        part_colors=part_colors,
        render_start=render_start,
    )


def resolve_order_line_template_context(
    session: Session,
    setup: OrderLineRenderSetupResult,
    *,
    emit: EmitFn = None,
) -> TemplateResolutionResult:
    """Resolve render template, material library, and material map for a prepared order line.

    Args:
        session: SQLAlchemy session passed through to the template services.
        setup: A "ready" result from ``prepare_order_line_render_context``.
        emit: Optional progress callback (see ``EmitFn``).

    Returns:
        The resolved template (or None), material library, material map and
        the flags/keys used to pick them.

    Raises:
        ValueError: If ``setup`` is not ready.
    """
    line = setup.order_line
    cad_file = setup.cad_file
    # is_ready already implies both objects are set; the explicit None checks
    # replace bare asserts so the guard also holds under ``python -O``.
    if not setup.is_ready or line is None or cad_file is None:
        raise ValueError("resolve_order_line_template_context requires a ready setup result")

    materials_source = setup.materials_source
    category_key = line.product.category_key if line.product else None
    output_type_id = str(line.output_type_id) if line.output_type_id else None

    template = resolve_template_for_session(
        session,
        category_key=category_key,
        output_type_id=output_type_id,
    )
    material_library = get_material_library_path_for_session(session)

    material_map = None
    use_materials = bool(material_library and materials_source)
    if template and not template.material_replace_enabled:
        use_materials = False

    if use_materials:
        material_map = {
            material["part_name"]: material["material"]
            for material in materials_source
            if material.get("part_name") and material.get("material")
        }
        material_map = resolve_material_map(material_map)

    # A line-level override wins over the output type's override.
    line_override = getattr(line, "material_override", None)
    output_override = line.output_type.material_override if line.output_type else None
    override_material = line_override or output_override

    if override_material:
        # The override applies to every mapped part plus every parsed object,
        # and turns materials on even if the checks above disabled them.
        override_keys = set(material_map) if material_map else set()
        if cad_file.parsed_objects:
            override_keys.update(cad_file.parsed_objects.get("objects", []))
        material_map = {key: override_material for key in override_keys}
        use_materials = True
        _emit(
            emit,
            str(line.id),
            f"Material override active: {len(material_map)} parts → {override_material}",
        )

    if template:
        _emit(
            emit,
            str(line.id),
            "Using render template: "
            f"{template.name} (collection={template.target_collection}, "
            f"material_replace={template.material_replace_enabled}, "
            f"lighting_only={template.lighting_only})",
        )
        logger.info(
            "Render template resolved: '%s' path=%s, lighting_only=%s",
            template.name,
            template.blend_file_path,
            template.lighting_only,
        )
    else:
        _emit(emit, str(line.id), "No render template found — using factory settings (Mode A)")
        logger.info(
            "No render template for category_key=%r, output_type_id=%r",
            category_key,
            output_type_id,
        )

    return TemplateResolutionResult(
        template=template,
        material_library=material_library,
        material_map=material_map,
        use_materials=use_materials,
        override_material=override_material,
        category_key=category_key,
        output_type_id=output_type_id,
    )


def _position_context(
    position: ProductRenderPosition | GlobalRenderPosition,
    *,
    label: str,
    source_kind: Literal["product", "global"],
    emit: EmitFn,
    line_id: str,
) -> RenderPositionContext:
    """Emit a progress message for ``position`` and convert it to a context."""
    # The focal length is appended only when it is set and non-zero.
    focal_suffix = (
        f" focal_length={position.focal_length_mm}mm"
        if position.focal_length_mm
        else ""
    )
    _emit(
        emit,
        line_id,
        f"{label}: '{position.name}' "
        f"({position.rotation_x}°, {position.rotation_y}°, {position.rotation_z}°)"
        + focal_suffix,
    )
    return RenderPositionContext(
        rotation_x=position.rotation_x,
        rotation_y=position.rotation_y,
        rotation_z=position.rotation_z,
        focal_length_mm=position.focal_length_mm,
        sensor_width_mm=position.sensor_width_mm,
        source_name=position.name,
        source_kind=source_kind,
    )


def resolve_render_position_context(
    session: Session,
    line: OrderLine,
    *,
    emit: EmitFn = None,
) -> RenderPositionContext:
    """Resolve per-line render position values with product/global fallback.

    The product-level position referenced by the line is tried first, then the
    global one. If neither id is set, or the referenced row does not exist, a
    default ``RenderPositionContext`` is returned.
    """
    if line.render_position_id:
        render_position = session.get(ProductRenderPosition, line.render_position_id)
        if render_position:
            return _position_context(
                render_position,
                label="Render position",
                source_kind="product",
                emit=emit,
                line_id=str(line.id),
            )

    if line.global_render_position_id:
        global_position = session.get(GlobalRenderPosition, line.global_render_position_id)
        if global_position:
            return _position_context(
                global_position,
                label="Global render position",
                source_kind="global",
                emit=emit,
                line_id=str(line.id),
            )

    return RenderPositionContext()