# Source: HartOMat/backend/app/domains/rendering/workflow_runtime_services.py
# (623 lines, 21 KiB, Python)

from __future__ import annotations
import logging
import shutil
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Literal
from sqlalchemy import select, update as sql_update
from sqlalchemy.orm import Session, joinedload
from app.config import settings as app_settings
from app.domains.media.models import MediaAsset, MediaAssetType
from app.domains.orders.models import Order, OrderLine, OrderStatus
from app.domains.products.models import CadFile, Product
from app.domains.rendering.models import GlobalRenderPosition, ProductRenderPosition, RenderTemplate
from app.services.material_service import resolve_material_map
from app.services.step_processor import build_part_colors
from app.services.template_service import (
get_material_library_path_for_session,
resolve_template_for_session,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Optional progress callback. Within this module it is invoked positionally as
# emit(order_line_id, message) or emit(order_line_id, message, level).
EmitFn = Callable[..., None] | None
# Outcome states reported by order-line render setup.
SetupStatus = Literal["ready", "skip", "failed", "missing"]
# Optional hook used to queue a thumbnail regeneration; it is called with a
# CAD file id (as a string) and a dict[str, str] of part colors.
QueueThumbnailFn = Callable[[str, dict[str, str]], None] | None
@dataclass(slots=True)
class OrderLineRenderSetupResult:
status: SetupStatus
order_line: OrderLine | None = None
order: Order | None = None
cad_file: CadFile | None = None
materials_source: list[dict[str, Any]] = field(default_factory=list)
usd_render_path: Path | None = None
glb_reuse_path: Path | None = None
part_colors: dict[str, str] = field(default_factory=dict)
render_start: datetime | None = None
reason: str | None = None
@property
def is_ready(self) -> bool:
return self.status == "ready" and self.order_line is not None and self.cad_file is not None
@dataclass(slots=True)
class RenderPositionContext:
rotation_x: float = 0.0
rotation_y: float = 0.0
rotation_z: float = 0.0
focal_length_mm: float | None = None
sensor_width_mm: float | None = None
source_name: str | None = None
source_kind: Literal["product", "global", "default"] = "default"
@dataclass(slots=True)
class TemplateResolutionResult:
template: RenderTemplate | None
material_library: str | None
material_map: dict[str, str] | None
use_materials: bool
override_material: str | None
category_key: str | None
output_type_id: str | None
@dataclass(slots=True)
class MaterialResolutionResult:
material_map: dict[str, str] | None
use_materials: bool
override_material: str | None
source_material_count: int = 0
resolved_material_count: int = 0
@dataclass(slots=True)
class AutoPopulateMaterialsResult:
cad_file_id: str
updated_product_ids: list[str] = field(default_factory=list)
queued_thumbnail_regeneration: bool = False
part_colors: dict[str, str] | None = None
cad_parts: list[str] = field(default_factory=list)
@dataclass(slots=True)
class BBoxResolutionResult:
bbox_data: dict[str, dict[str, float]] | None
source_kind: Literal["glb", "step", "none"]
step_path: str
glb_path: str | None = None
@property
def has_bbox(self) -> bool:
return self.bbox_data is not None
def _emit(emit: EmitFn, order_line_id: str, message: str, level: str | None = None) -> None:
    """Forward a progress message to ``emit`` when a callback was supplied.

    The callback receives ``(order_line_id, message)``; ``level`` is appended
    as a third positional argument only when it is not ``None``. A missing
    callback makes this a no-op.
    """
    if emit is None:
        return
    payload = (order_line_id, message) if level is None else (order_line_id, message, level)
    emit(*payload)
def _resolve_asset_path(storage_key: str | None) -> Path | None:
    """Map a storage key to a path under the upload directory.

    Returns ``None`` for an empty/missing key or when the resulting path does
    not exist on disk.
    """
    if not storage_key:
        return None
    resolved = Path(app_settings.upload_dir) / storage_key
    return resolved if resolved.exists() else None
def extract_bbox_from_glb(glb_path: str) -> dict[str, dict[str, float]] | None:
    """Read a GLB file and return its bounding box scaled by 1000.

    The result has two entries, ``"dimensions_mm"`` and ``"bbox_center_mm"``,
    each an ``x``/``y``/``z`` mapping rounded to two decimals. The x1000
    factor assumes the GLB is authored in meters — confirm against the
    exporter. Returns ``None`` when the file is missing, the loaded scene has
    no bounds, or anything raises (including a missing ``trimesh`` install);
    failures are logged at debug level only.
    """
    try:
        import trimesh

        glb_file = Path(glb_path)
        if not glb_file.exists():
            return None

        loaded = trimesh.load(str(glb_file), force="scene")
        bounds = getattr(loaded, "bounds", None)
        if bounds is None:
            return None

        lower, upper = bounds
        extent = upper - lower
        axes = ("x", "y", "z")
        return {
            "dimensions_mm": {
                axis: round(float(extent[idx]) * 1000, 2)
                for idx, axis in enumerate(axes)
            },
            "bbox_center_mm": {
                axis: round(float((lower[idx] + upper[idx]) / 2) * 1000, 2)
                for idx, axis in enumerate(axes)
            },
        }
    except Exception as exc:
        # Best-effort extraction: callers treat None as "try another source".
        logger.debug("extract_bbox_from_glb failed for %s: %s", glb_path, exc)
        return None
def extract_bbox_from_step_cadquery(step_path: str) -> dict[str, dict[str, float]] | None:
    """Fallback extractor: import the STEP file with cadquery and measure it.

    Returns ``"dimensions_mm"`` and ``"bbox_center_mm"`` mappings with
    ``x``/``y``/``z`` values rounded to two decimals, taken directly from the
    cadquery bounding box without scaling. Any exception (including a missing
    ``cadquery`` install) is logged at debug level and yields ``None``.
    """
    try:
        import cadquery as cq

        shape = cq.importers.importStep(step_path).val()
        box = shape.BoundingBox()

        dimensions = {
            "x": round(box.xlen, 2),
            "y": round(box.ylen, 2),
            "z": round(box.zlen, 2),
        }
        center = {
            "x": round((box.xmin + box.xmax) / 2, 2),
            "y": round((box.ymin + box.ymax) / 2, 2),
            "z": round((box.zmin + box.zmax) / 2, 2),
        }
        return {"dimensions_mm": dimensions, "bbox_center_mm": center}
    except Exception as exc:
        # Best-effort extraction: a failure here simply means "no bbox".
        logger.debug("extract_bbox_from_step_cadquery failed for %s: %s", step_path, exc)
        return None
def resolve_cad_bbox(
    step_path: str,
    *,
    glb_path: str | None = None,
) -> BBoxResolutionResult:
    """Resolve bounding-box data, trying the GLB first and the STEP file second.

    The STEP extractor runs only when the GLB attempt produced ``None`` (or no
    GLB path was given). ``source_kind`` records which extractor delivered the
    data and stays ``"none"`` when both came back empty.
    """
    resolved = extract_bbox_from_glb(glb_path) if glb_path else None
    origin: Literal["glb", "step", "none"] = "glb" if resolved else "none"

    if resolved is None:
        resolved = extract_bbox_from_step_cadquery(step_path)
        if resolved:
            origin = "step"

    return BBoxResolutionResult(
        bbox_data=resolved,
        source_kind=origin,
        step_path=step_path,
        glb_path=glb_path,
    )
def _latest_media_asset(
    session: Session,
    cad_file_id: Any,
    asset_type: MediaAssetType,
) -> MediaAsset | None:
    """Return the most recently created asset of ``asset_type`` for a CAD file, if any."""
    return session.execute(
        select(MediaAsset)
        .where(
            MediaAsset.cad_file_id == cad_file_id,
            MediaAsset.asset_type == asset_type,
        )
        .order_by(MediaAsset.created_at.desc())
        .limit(1)
    ).scalar_one_or_none()


def _update_order_line_and_commit(session: Session, line_id: Any, **values: Any) -> None:
    """Write ``values`` onto one order line row and commit immediately."""
    session.execute(
        sql_update(OrderLine)
        .where(OrderLine.id == line_id)
        .values(**values)
    )
    session.commit()


def _stage_cached_glb(session: Session, cad_file: CadFile) -> Path | None:
    """Make a cached GLB available as ``<step stem>_thumbnail.glb`` beside the STEP file.

    Uses the newest non-empty ``gltf_geometry`` asset. The asset is copied into
    place only when the expected file is missing or empty. Returns the staged
    path, or ``None`` when there is no usable asset or the copy fails.
    """
    glb_asset = _latest_media_asset(session, cad_file.id, MediaAssetType.gltf_geometry)
    glb_candidate = _resolve_asset_path(glb_asset.storage_key if glb_asset else None)
    if not glb_candidate or glb_candidate.stat().st_size <= 0:
        return None

    step_path = Path(cad_file.stored_path)
    expected_glb = step_path.parent / f"{step_path.stem}_thumbnail.glb"
    if expected_glb.exists() and expected_glb.stat().st_size > 0:
        # A usable copy is already in place.
        return expected_glb

    try:
        shutil.copy2(str(glb_candidate), str(expected_glb))
    except Exception as copy_exc:
        # Best effort: without the copy the caller falls back to tessellation.
        logger.warning("render_order_line: failed to copy GLB asset: %s", copy_exc)
        return None
    logger.info(
        "render_order_line: reused gltf_geometry asset %s -> %s",
        glb_candidate.name,
        expected_glb.name,
    )
    return expected_glb


def prepare_order_line_render_context(
    session: Session,
    order_line_id: str,
    *,
    emit: EmitFn = None,
) -> OrderLineRenderSetupResult:
    """Load and validate the order line, then prepare reusable render inputs.

    Args:
        session: Database session used for all reads and status updates.
        order_line_id: Id of the order line to prepare.
        emit: Optional progress callback; see ``_emit`` for how it is called.

    Returns:
        An ``OrderLineRenderSetupResult`` whose ``status`` is:

        * ``"missing"`` - the order line does not exist.
        * ``"skip"`` - the line is already cancelled, or its order is rejected
          or completed (a pending/processing line is then set to "cancelled").
        * ``"failed"`` - the product has no CAD file; the line is set to "failed".
        * ``"ready"`` - the line was set to "processing" and the result carries
          the CAD file, material source, optional USD/GLB paths and part colors.

    Status changes are committed on the given session as they happen.
    """
    # Local import keeps the block self-contained; the file's top-level import
    # only brings in ``datetime`` itself.
    from datetime import timezone

    _emit(emit, order_line_id, "Loading order line from database")
    line = session.execute(
        select(OrderLine)
        .where(OrderLine.id == order_line_id)
        .options(
            joinedload(OrderLine.product).joinedload(Product.cad_file),
            joinedload(OrderLine.output_type),
        )
    ).scalar_one_or_none()

    if line is None:
        _emit(emit, order_line_id, "Order line not found in database", "error")
        logger.error("OrderLine %s not found", order_line_id)
        return OrderLineRenderSetupResult(status="missing", reason="order_line_not_found")

    if line.render_status == "cancelled":
        _emit(emit, order_line_id, "Order line already cancelled — skipping render")
        logger.info("OrderLine %s cancelled — skipping", order_line_id)
        return OrderLineRenderSetupResult(
            status="skip",
            order_line=line,
            reason="line_cancelled",
        )

    order = session.execute(
        select(Order).where(Order.id == line.order_id)
    ).scalar_one_or_none()
    if order and order.status in (OrderStatus.rejected, OrderStatus.completed):
        _emit(emit, order_line_id, f"Order {order.status.value} — skipping render")
        logger.info("OrderLine %s: order %s — skipping", order_line_id, order.status.value)
        # Only lines that have not finished yet are flipped to cancelled.
        if line.render_status in ("pending", "processing"):
            _update_order_line_and_commit(session, line.id, render_status="cancelled")
        return OrderLineRenderSetupResult(
            status="skip",
            order_line=line,
            order=order,
            reason="order_closed",
        )

    if line.product is None or line.product.cad_file_id is None or line.product.cad_file is None:
        _emit(emit, order_line_id, "Product has no CAD file — marking as failed", "error")
        logger.warning("OrderLine %s: product has no CAD file", order_line_id)
        _update_order_line_and_commit(session, line.id, render_status="failed")
        return OrderLineRenderSetupResult(
            status="failed",
            order_line=line,
            order=order,
            reason="missing_cad_file",
        )

    # Naive UTC timestamp: the same value datetime.utcnow() produced, without
    # relying on that call (deprecated since Python 3.12).
    render_start = datetime.now(timezone.utc).replace(tzinfo=None)
    _update_order_line_and_commit(
        session,
        line.id,
        render_status="processing",
        render_backend_used="celery",
        render_started_at=render_start,
    )

    cad_file = line.product.cad_file
    materials_source = line.product.cad_part_materials or []

    # Preferred input: an existing USD master for this CAD file.
    usd_asset = _latest_media_asset(session, cad_file.id, MediaAssetType.usd_master)
    usd_render_path = _resolve_asset_path(usd_asset.storage_key) if usd_asset else None
    if usd_render_path:
        logger.info(
            "render_order_line: using usd_master %s for cad %s",
            usd_render_path.name,
            cad_file.id,
        )

    # Second choice: a cached GLB; only looked up when no USD master is usable.
    glb_reuse_path = None if usd_render_path else _stage_cached_glb(session, cad_file)

    if usd_render_path:
        _emit(emit, order_line_id, "Using USD master for render (skipping GLB tessellation)")
    elif glb_reuse_path:
        _emit(
            emit,
            order_line_id,
            f"Reusing cached GLB geometry ({glb_reuse_path.name}) — skipping re-tessellation",
        )
    else:
        _emit(emit, order_line_id, "No USD master or cached GLB — will tessellate STEP -> GLB")

    # Part colors need both parsed object names and a material source.
    part_colors: dict[str, str] = {}
    if cad_file.parsed_objects:
        parsed_names = cad_file.parsed_objects.get("objects", [])
        if materials_source:
            part_colors = build_part_colors(parsed_names, materials_source)

    return OrderLineRenderSetupResult(
        status="ready",
        order_line=line,
        order=order,
        cad_file=cad_file,
        materials_source=materials_source,
        usd_render_path=usd_render_path,
        glb_reuse_path=glb_reuse_path,
        part_colors=part_colors,
        render_start=render_start,
    )
def resolve_order_line_template_context(
    session: Session,
    setup: OrderLineRenderSetupResult,
    *,
    emit: EmitFn = None,
) -> TemplateResolutionResult:
    """Resolve the render template, material library and material map for a prepared line.

    Raises:
        ValueError: if ``setup`` is not a ready setup result.
    """
    if not setup.is_ready:
        raise ValueError("resolve_order_line_template_context requires a ready setup result")

    line, cad_file = setup.order_line, setup.cad_file
    assert line is not None
    assert cad_file is not None

    # Keys used to look up the template.
    product = line.product
    category_key = product.category_key if product else None
    raw_output_type_id = line.output_type_id
    output_type_id = str(raw_output_type_id) if raw_output_type_id else None

    template = resolve_template_for_session(
        session,
        category_key=category_key,
        output_type_id=output_type_id,
    )
    material_library = get_material_library_path_for_session(session)

    # Materials are resolved before the template is announced, so any
    # override message is emitted ahead of the template message.
    materials = resolve_order_line_material_map(
        line,
        cad_file,
        setup.materials_source,
        material_library=material_library,
        template=template,
        emit=emit,
    )

    if template:
        _emit(
            emit,
            str(line.id),
            "Using render template: "
            f"{template.name} (collection={template.target_collection}, "
            f"material_replace={template.material_replace_enabled}, "
            f"lighting_only={template.lighting_only})",
        )
        logger.info(
            "Render template resolved: '%s' path=%s, lighting_only=%s",
            template.name,
            template.blend_file_path,
            template.lighting_only,
        )
    else:
        _emit(emit, str(line.id), "No render template found — using factory settings (Mode A)")
        logger.info(
            "No render template for category_key=%r, output_type_id=%r",
            category_key,
            output_type_id,
        )

    return TemplateResolutionResult(
        template=template,
        material_library=material_library,
        material_map=materials.material_map,
        use_materials=materials.use_materials,
        override_material=materials.override_material,
        category_key=category_key,
        output_type_id=output_type_id,
    )
def resolve_order_line_material_map(
    line: OrderLine,
    cad_file: CadFile | None,
    materials_source: list[dict[str, Any]],
    *,
    material_library: str | None,
    template: RenderTemplate | None,
    emit: EmitFn = None,
) -> MaterialResolutionResult:
    """Resolve the effective material map for an order line.

    Precedence, as implemented here:

    1. Materials are used only when both a material library and a material
       source exist, and the template (if any) has material replacement enabled.
    2. When used, the source entries with both a part name and a material are
       collected and passed through ``resolve_material_map``.
    3. A material override (from the line, else from the output type) replaces
       the whole map: every known part is mapped to the override and
       materials are forced on.
    """
    wants_materials = bool(material_library and materials_source)
    if template and not template.material_replace_enabled:
        wants_materials = False

    resolved_map = None
    source_count = 0
    if wants_materials:
        collected: dict[str, Any] = {}
        for entry in materials_source:
            if entry.get("part_name") and entry.get("material"):
                collected[entry["part_name"]] = entry["material"]
        source_count = len(collected)
        resolved_map = resolve_material_map(collected)

    # The line-level override wins over the output type's override.
    line_override = getattr(line, "material_override", None)
    output_override = line.output_type.material_override if line.output_type else None
    override_material = line_override or output_override

    if override_material:
        # Targets are the already-mapped parts plus every parsed CAD object.
        targets = set(resolved_map.keys()) if resolved_map else set()
        if cad_file and cad_file.parsed_objects:
            for object_name in cad_file.parsed_objects.get("objects", []):
                targets.add(object_name)
        resolved_map = {name: override_material for name in targets}
        wants_materials = True
        _emit(
            emit,
            str(line.id),
            f"Material override active: {len(resolved_map)} parts → {override_material}",
        )

    return MaterialResolutionResult(
        material_map=resolved_map,
        use_materials=wants_materials,
        override_material=override_material,
        source_material_count=source_count,
        resolved_material_count=len(resolved_map or {}),
    )
def auto_populate_materials_for_cad(
    session: Session,
    cad_file_id: str,
    *,
    enqueue_thumbnail: QueueThumbnailFn = None,
) -> AutoPopulateMaterialsResult:
    """Auto-fill empty CAD material mappings from Excel component data.

    Legacy rules preserved here:

    - only active products with empty/all-blank `cad_part_materials` and with
      Excel components are updated
    - thumbnail regeneration is queued at most once per CAD file
    - the queued part-color map comes from the last product whose colors
      could be built

    When ``enqueue_thumbnail`` is not given, the ``regenerate_thumbnail`` task
    is queued via ``.delay``. An unknown CAD file or one without parsed parts
    yields an empty result and touches nothing.
    """
    from app.api.routers.products import build_materials_from_excel

    cad_id_text = str(cad_file_id)

    cad_file = session.execute(
        select(CadFile).where(CadFile.id == cad_file_id)
    ).scalar_one_or_none()
    if cad_file is None:
        return AutoPopulateMaterialsResult(cad_file_id=cad_id_text)

    cad_parts: list[str] = (cad_file.parsed_objects or {}).get("objects", [])
    if not cad_parts:
        return AutoPopulateMaterialsResult(cad_file_id=cad_id_text, cad_parts=[])

    candidates = session.execute(
        select(Product).where(
            Product.cad_file_id == cad_file.id,
            Product.is_active.is_(True),
        )
    ).scalars().all()

    touched_ids: list[str] = []
    latest_colors: dict[str, str] | None = None
    for product in candidates:
        excel_components: list[dict[str, Any]] = product.components or []
        if not excel_components:
            continue

        # Leave products alone once any entry carries a non-blank material.
        current = product.cad_part_materials or []
        if any((item.get("material") or "").strip() for item in current):
            continue

        generated = build_materials_from_excel(cad_parts, excel_components)
        session.execute(
            sql_update(Product)
            .where(Product.id == product.id)
            .values(cad_part_materials=generated)
        )
        session.flush()
        touched_ids.append(str(product.id))

        try:
            latest_colors = build_part_colors(cad_parts, generated)
        except Exception:
            # A failed color build keeps whatever an earlier product produced.
            logger.exception("Part colors build failed for product %s", product.id)

        logger.info(
            "Auto-populated %d materials for product %s from %d Excel components",
            len(generated),
            product.id,
            len(excel_components),
        )

    # Commit before queueing so the update is persisted first.
    session.commit()

    queued = latest_colors is not None
    if queued:
        if enqueue_thumbnail is not None:
            enqueue_thumbnail(cad_id_text, latest_colors)
        else:
            from app.domains.pipeline.tasks.render_thumbnail import regenerate_thumbnail

            regenerate_thumbnail.delay(cad_id_text, latest_colors)

    return AutoPopulateMaterialsResult(
        cad_file_id=cad_id_text,
        updated_product_ids=touched_ids,
        queued_thumbnail_regeneration=queued,
        part_colors=latest_colors,
        cad_parts=cad_parts,
    )
def resolve_render_position_context(
    session: Session,
    line: OrderLine,
    *,
    emit: EmitFn = None,
) -> RenderPositionContext:
    """Resolve the render position for an order line.

    Lookup order: the line's product render position, then its global render
    position. The first record that exists wins and is announced via ``emit``.
    If neither id is set, or the referenced record is gone, a default
    ``RenderPositionContext`` is returned.
    """

    def _describe(label: str, position: Any) -> str:
        # Progress text: name, rotation triple, and focal length when truthy.
        head = (
            f"{label}: '{position.name}' "
            f"({position.rotation_x}°, {position.rotation_y}°, {position.rotation_z}°)"
        )
        tail = (
            f" focal_length={position.focal_length_mm}mm"
            if position.focal_length_mm
            else ""
        )
        return head + tail

    def _to_context(position: Any, kind: Literal["product", "global"]) -> RenderPositionContext:
        # Copy the record's values into the plain context object.
        return RenderPositionContext(
            rotation_x=position.rotation_x,
            rotation_y=position.rotation_y,
            rotation_z=position.rotation_z,
            focal_length_mm=position.focal_length_mm,
            sensor_width_mm=position.sensor_width_mm,
            source_name=position.name,
            source_kind=kind,
        )

    if line.render_position_id:
        product_position = session.get(ProductRenderPosition, line.render_position_id)
        if product_position:
            _emit(emit, str(line.id), _describe("Render position", product_position))
            return _to_context(product_position, "product")

    if line.global_render_position_id:
        shared_position = session.get(GlobalRenderPosition, line.global_render_position_id)
        if shared_position:
            _emit(emit, str(line.id), _describe("Global render position", shared_position))
            return _to_context(shared_position, "global")

    return RenderPositionContext()