# NOTE(review): reconstructed from a whitespace-mangled git diff. The same diff also
# created two trivial package markers (content preserved here for reference):
#   backend/app/domains/pipeline/__init__.py:
#       """Pipeline domain — STEP processing and render orchestration tasks."""
#   backend/app/domains/pipeline/tasks/__init__.py:
#       """Pipeline Celery tasks — split by concern."""
"""GLB/GLTF export tasks.

Covers:
- generate_gltf_geometry_task — OCC STEP → geometry GLB (fast preview)
- generate_gltf_production_task — OCC STEP → production GLB (Blender PBR materials)
"""
import logging

from app.tasks.celery_app import celery_app
from app.core.task_logs import log_task_event
from app.core.pipeline_logger import PipelineLogger

logger = logging.getLogger(__name__)


@celery_app.task(bind=True, name="app.tasks.step_tasks.generate_gltf_geometry_task", queue="thumbnail_rendering", max_retries=1)
def generate_gltf_geometry_task(self, cad_file_id: str):
    """Export a geometry GLB directly from STEP via OCC (no STL intermediary).

    Pipeline:
    1. Reads STEP file directly (no STL needed)
    2. Builds color_map from product.cad_part_materials (hex colors)
    3. Runs export_step_to_gltf.py (Python/OCP): STEP → GLB with per-part colors
    4. Stores result as gltf_geometry MediaAsset (replaces any existing one)

    Output is in meters, Y-up (glTF convention).

    Args:
        cad_file_id: primary key of the CadFile row whose stored_path points at
            the STEP file on disk.

    Returns:
        ``{"glb_path": str, "asset_id": str}`` on success, or ``None`` when the
        CadFile row is missing / has no stored_path (logged, not raised).

    Raises:
        RuntimeError: when the STEP file is missing on disk.
        celery.exceptions.Retry: on OCC subprocess failure (one retry, 15 s delay).
    """
    # Heavy/optional deps are imported lazily so merely importing this module
    # stays cheap on workers that never run the task.
    import json as _json
    import os as _os
    import subprocess as _subprocess
    import sys as _sys
    from pathlib import Path as _Path
    from sqlalchemy import create_engine, select as _select
    from sqlalchemy.orm import Session
    from app.config import settings as app_settings
    from app.models.cad_file import CadFile

    from app.models.system_setting import SystemSetting as _SysSetting

    pl = PipelineLogger(task_id=self.request.id)
    pl.step_start("export_glb_geometry", {"cad_file_id": cad_file_id})

    # Celery runs outside the async event loop, so use a sync engine.
    sync_url = app_settings.database_url.replace("+asyncpg", "")
    eng = create_engine(sync_url)
    with Session(eng) as session:
        cad_file = session.get(CadFile, cad_file_id)
        if not cad_file or not cad_file.stored_path:
            logger.error("generate_gltf_geometry_task: no stored_path for %s", cad_file_id)
            return
        step_path_str = cad_file.stored_path

        # Build hex color_map from product.cad_part_materials
        from app.domains.products.models import Product
        product = session.execute(
            _select(Product).where(Product.cad_file_id == cad_file.id)
        ).scalar_one_or_none()

        color_map: dict[str, str] = {}
        product_id = str(product.id) if product else None
        if product and product.cad_part_materials:
            # Entries tolerate both key spellings ("part_name"/"name",
            # "hex_color"/"color") — presumably legacy vs. current schema; verify.
            for entry in product.cad_part_materials:
                part_name = entry.get("part_name") or entry.get("name", "")
                hex_color = entry.get("hex_color") or entry.get("color", "")
                if part_name and hex_color:
                    color_map[part_name] = hex_color

        settings_rows = session.execute(_select(_SysSetting)).scalars().all()
        sys_settings = {s.key: s.value for s in settings_rows}
    # NOTE(review): diff indentation was ambiguous here; dispose after the
    # read-only session closes, before the long-running subprocess.
    eng.dispose()

    linear_deflection = float(sys_settings.get("gltf_preview_linear_deflection", "0.1"))
    angular_deflection = float(sys_settings.get("gltf_preview_angular_deflection", "0.5"))

    step = _Path(step_path_str)

    if not step.exists():
        log_task_event(self.request.id, f"Failed: STEP file not found: {step}", "error")
        raise RuntimeError(f"STEP file not found: {step}")

    output_path = step.parent / f"{step.stem}_geometry.glb"

    log_task_event(
        self.request.id,
        f"Starting OCC GLB export: {len(color_map)} part colors",
        "info",
    )

    # Run export_step_to_gltf.py as a subprocess so OCP imports don't pollute worker state
    scripts_dir = _Path(_os.environ.get("RENDER_SCRIPTS_DIR", "/render-scripts"))
    script_path = scripts_dir / "export_step_to_gltf.py"

    python_bin = _sys.executable
    cmd = [
        python_bin, str(script_path),
        "--step_path", str(step),
        "--output_path", str(output_path),
        "--color_map", _json.dumps(color_map),
        "--linear_deflection", str(linear_deflection),
        "--angular_deflection", str(angular_deflection),
    ]
    log_task_event(
        self.request.id,
        f"OCC tessellation: linear={linear_deflection}mm, angular={angular_deflection}rad",
        "info",
    )

    try:
        # 120 s cap: preview-quality tessellation should be fast; a hang means
        # something is wrong with the STEP file or OCP install.
        result = _subprocess.run(cmd, capture_output=True, text=True, timeout=120)
        for line in result.stdout.splitlines():
            logger.info("[occ-gltf] %s", line)
        for line in result.stderr.splitlines():
            logger.warning("[occ-gltf stderr] %s", line)

        # A zero exit code with an empty/missing file still counts as failure.
        if result.returncode != 0 or not output_path.exists() or output_path.stat().st_size == 0:
            raise RuntimeError(
                f"export_step_to_gltf.py failed (exit {result.returncode}).\n"
                f"STDERR: {result.stderr[-1000:]}"
            )
    except Exception as exc:
        log_task_event(self.request.id, f"Failed: {exc}", "error")
        pl.step_error("export_glb_geometry", str(exc), exc)
        logger.error("generate_gltf_geometry_task OCC export failed: %s", exc)
        raise self.retry(exc=exc, countdown=15)

    log_task_event(self.request.id, f"OCC GLB export completed: {output_path.name}", "done")

    # --- Store MediaAsset (replace existing gltf_geometry for this cad_file) ---
    import uuid as _uuid
    from sqlalchemy import create_engine as _ce, delete as _del
    from sqlalchemy.orm import Session as _Session
    from app.domains.media.models import MediaAsset, MediaAssetType

    _sync_url = app_settings.database_url.replace("+asyncpg", "")
    _eng2 = _ce(_sync_url)
    with _Session(_eng2) as _sess:
        # Delete-then-insert keeps at most one gltf_geometry asset per CadFile.
        _sess.execute(
            _del(MediaAsset).where(
                MediaAsset.cad_file_id == _uuid.UUID(cad_file_id),
                MediaAsset.asset_type == MediaAssetType.gltf_geometry,
            )
        )
        # Store the path relative to upload_dir so the storage root can move.
        _key = str(output_path)
        _prefix = str(app_settings.upload_dir).rstrip("/") + "/"
        if _key.startswith(_prefix):
            _key = _key[len(_prefix):]
        asset = MediaAsset(
            cad_file_id=_uuid.UUID(cad_file_id),
            product_id=_uuid.UUID(product_id) if product_id else None,
            asset_type=MediaAssetType.gltf_geometry,
            storage_key=_key,
            mime_type="model/gltf-binary",
            file_size_bytes=output_path.stat().st_size if output_path.exists() else None,
        )
        _sess.add(asset)
        _sess.commit()
        asset_id = str(asset.id)
    _eng2.dispose()

    pl.step_done("export_glb_geometry", result={"glb_path": str(output_path), "asset_id": asset_id})
    logger.info("generate_gltf_geometry_task: MediaAsset %s created for cad %s", asset_id, cad_file_id)
    return {"glb_path": str(output_path), "asset_id": asset_id}


@celery_app.task(
    bind=True,
    name="app.tasks.step_tasks.generate_gltf_production_task",
    queue="thumbnail_rendering",
    max_retries=2,
)
def generate_gltf_production_task(self, cad_file_id: str, product_id: str | None = None) -> dict:
    """Generate a production GLB (Blender + PBR materials) from a geometry GLB via export_gltf.py.

    1. Ensures a gltf_geometry MediaAsset exists (runs OCC export inline if not).
    2. Resolves SCHAEFFLER material map for the CadFile's product.
    3. Runs Blender headless with export_gltf.py → production GLB.
    4. Stores result as gltf_production MediaAsset.

    Args:
        cad_file_id: primary key of the CadFile to export.
        product_id: optional Product id to disambiguate when several products
            share the CAD file; otherwise the first matching product is used.

    Returns:
        ``{"glb_path": str, "asset_id": str}``.

    Raises:
        RuntimeError: missing CadFile/STEP/scripts or Blender unavailable.
        celery.exceptions.Retry: on OCC or Blender subprocess failure.
    """
    import json as _json
    import os as _os
    import subprocess as _subprocess
    import sys as _sys
    import uuid as _uuid
    from pathlib import Path as _Path

    from sqlalchemy import create_engine as _ce, delete as _del, select as _sel
    from sqlalchemy.orm import Session as _Session

    from app.config import settings as app_settings
    from app.domains.media.models import MediaAsset, MediaAssetType
    from app.services.render_blender import find_blender, is_blender_available

    pl = PipelineLogger(task_id=self.request.id)
    pl.step_start("export_glb_production", {"cad_file_id": cad_file_id})
    log_task_event(self.request.id, f"generate_gltf_production_task started for cad {cad_file_id}", "info")

    _sync_url = app_settings.database_url.replace("+asyncpg", "")
    _eng = _ce(_sync_url)

    # --- 1. Resolve STEP file path and system settings ---
    from app.models.cad_file import CadFile as _CF
    from app.models.system_setting import SystemSetting

    with _Session(_eng) as _sess:
        _cad = _sess.execute(
            _sel(_CF).where(_CF.id == _uuid.UUID(cad_file_id))
        ).scalar_one_or_none()
        step_path_str = _cad.stored_path if _cad else None

        settings_rows = _sess.execute(_sel(SystemSetting)).scalars().all()
        sys_settings = {s.key: s.value for s in settings_rows}

    if not step_path_str:
        raise RuntimeError(f"CadFile {cad_file_id} not found in DB")
    step_path = _Path(step_path_str)
    if not step_path.exists():
        raise RuntimeError(f"STEP file not found: {step_path}")

    smooth_angle = float(sys_settings.get("blender_smooth_angle", "30"))
    # Production deflections are tighter than the preview defaults (0.1 / 0.5).
    prod_linear = float(sys_settings.get("gltf_production_linear_deflection", "0.03"))
    prod_angular = float(sys_settings.get("gltf_production_angular_deflection", "0.2"))

    scripts_dir = _Path(_os.environ.get("RENDER_SCRIPTS_DIR", "/render-scripts"))
    occ_script = scripts_dir / "export_step_to_gltf.py"
    if not occ_script.exists():
        raise RuntimeError(f"export_step_to_gltf.py not found at {occ_script}")

    prod_geom_glb = step_path.parent / f"{step_path.stem}_production_geom.glb"
    python_bin = _sys.executable
    occ_cmd = [
        python_bin, str(occ_script),
        "--step_path", str(step_path),
        "--output_path", str(prod_geom_glb),
        "--linear_deflection", str(prod_linear),
        "--angular_deflection", str(prod_angular),
    ]
    log_task_event(
        self.request.id,
        f"Re-exporting STEP at production quality (linear={prod_linear}mm, angular={prod_angular}rad)",
        "info",
    )
    try:
        occ_result = _subprocess.run(occ_cmd, capture_output=True, text=True, timeout=180)
        for line in occ_result.stdout.splitlines():
            logger.info("[occ-prod] %s", line)
        if occ_result.returncode != 0 or not prod_geom_glb.exists() or prod_geom_glb.stat().st_size == 0:
            raise RuntimeError(
                f"OCC export failed (exit {occ_result.returncode}): {occ_result.stderr[-500:]}"
            )
    except Exception as exc:
        log_task_event(self.request.id, f"OCC re-export failed: {exc}", "error")
        pl.step_error("export_glb_production", f"OCC re-export failed: {exc}", exc)
        # NOTE(review): _eng is not disposed on this path — leaks an engine per
        # failed attempt; consider a try/finally around the whole task body.
        raise self.retry(exc=exc, countdown=30)

    geom_glb_path = prod_geom_glb

    # --- 2. Resolve material map from Product.cad_part_materials (SCHAEFFLER library names) ---
    # cad_part_materials lives on Product (list[dict]), NOT on CadFile.
    # We look up the Product that owns this CadFile (prefer product_id arg if given).
    from app.services.material_service import resolve_material_map
    from app.domains.products.models import Product as _Product

    with _Session(_eng) as _sess:
        _prod_query = _sel(_Product).where(_Product.cad_file_id == _uuid.UUID(cad_file_id))
        if product_id:
            _prod_query = _prod_query.where(_Product.id == _uuid.UUID(product_id))
        _product = _sess.execute(_prod_query).scalars().first()
        raw_materials: list[dict] = _product.cad_part_materials if _product else []

    # Convert list[{"part_name": X, "material": Y}] → dict[str, str] for resolve_material_map
    raw_mat_map: dict[str, str] = {
        m["part_name"]: m["material"]
        for m in raw_materials
        if m.get("part_name") and m.get("material")
    }
    mat_map = resolve_material_map(raw_mat_map)
    logger.info(
        "generate_gltf_production_task: resolved %d material(s) for cad %s (product: %s)",
        len(mat_map), cad_file_id, _product.id if _product else "none",
    )

    # --- 3. Run Blender: apply materials + smooth shading + export production GLB ---
    # Use get_material_library_path() which checks active AssetLibrary first,
    # then falls back to the legacy material_library_path system setting.
    from app.services.template_service import get_material_library_path
    asset_library_blend = get_material_library_path() or ""
    _eng.dispose()

    output_path = step_path.parent / f"{step_path.stem}_production.glb"

    export_script = scripts_dir / "export_gltf.py"
    if not is_blender_available():
        raise RuntimeError("Blender is not available — cannot generate production GLB")
    if not export_script.exists():
        raise RuntimeError(f"export_gltf.py not found at {export_script}")

    blender_bin = find_blender()
    # Blender CLI convention: args after "--" are for the python script itself.
    cmd = [
        blender_bin, "--background",
        "--python", str(export_script),
        "--",
        "--glb_path", str(geom_glb_path),
        "--output_path", str(output_path),
        "--material_map", _json.dumps(mat_map),
        "--smooth_angle", str(smooth_angle),
    ]
    if asset_library_blend:
        cmd += ["--asset_library_blend", asset_library_blend]

    log_task_event(
        self.request.id,
        f"Running Blender export_gltf.py — {len(mat_map)} material(s), smooth={smooth_angle}°",
        "info",
    )
    try:
        result = _subprocess.run(cmd, capture_output=True, text=True, timeout=300)
        for line in result.stdout.splitlines():
            logger.info("[export-gltf] %s", line)
        if result.returncode != 0:
            raise RuntimeError(
                f"export_gltf.py exited {result.returncode}:\n{result.stderr[-500:]}"
            )
    except Exception as exc:
        log_task_event(self.request.id, f"Blender production GLB failed: {exc}", "error")
        pl.step_error("export_glb_production", f"Blender production GLB failed: {exc}", exc)
        logger.error("generate_gltf_production_task Blender failed for cad %s: %s", cad_file_id, exc)
        raise self.retry(exc=exc, countdown=30)
    finally:
        # Clean up the high-quality temp geometry GLB (not needed after Blender export)
        try:
            prod_geom_glb.unlink(missing_ok=True)
        except Exception:
            pass

    log_task_event(self.request.id, f"Production GLB exported: {output_path.name}", "done")

    # --- 4. Store MediaAsset (replace existing gltf_production for this cad_file) ---
    _eng2 = _ce(_sync_url)
    with _Session(_eng2) as _sess:
        _sess.execute(
            _del(MediaAsset).where(
                MediaAsset.cad_file_id == _uuid.UUID(cad_file_id),
                MediaAsset.asset_type == MediaAssetType.gltf_production,
            )
        )
        _key = str(output_path)
        _prefix = str(app_settings.upload_dir).rstrip("/") + "/"
        if _key.startswith(_prefix):
            _key = _key[len(_prefix):]
        asset = MediaAsset(
            cad_file_id=_uuid.UUID(cad_file_id),
            product_id=_uuid.UUID(product_id) if product_id else None,
            asset_type=MediaAssetType.gltf_production,
            storage_key=_key,
            mime_type="model/gltf-binary",
            file_size_bytes=output_path.stat().st_size if output_path.exists() else None,
        )
        _sess.add(asset)
        _sess.commit()
        asset_id = str(asset.id)
    _eng2.dispose()

    pl.step_done("export_glb_production", result={"glb_path": str(output_path), "asset_id": asset_id})
    logger.info("generate_gltf_production_task: MediaAsset %s created for cad %s", asset_id, cad_file_id)
    return {"glb_path": str(output_path), "asset_id": asset_id}
+ +Covers: +- process_step_file — extract OCC objects + queue thumbnail +- reextract_cad_metadata — re-compute bounding-box for completed files +- _auto_populate_materials_for_cad — helper: fill cad_part_materials from Excel +- _bbox_from_glb / _bbox_from_step_cadquery — bbox helpers +""" +import logging +from pathlib import Path + +from app.tasks.celery_app import celery_app +from app.core.task_logs import log_task_event +from app.core.pipeline_logger import PipelineLogger + +logger = logging.getLogger(__name__) + + +def _bbox_from_glb(glb_path: str) -> dict | None: + """Extract bounding box from a GLB file (meters → converted to mm). + + Returns {"dimensions_mm": {x,y,z}, "bbox_center_mm": {x,y,z}} or None on failure. + OCC GLB output is in meters; multiply by 1000 to get mm. + """ + try: + import trimesh + p = Path(glb_path) + if not p.exists(): + return None + scene = trimesh.load(str(p), force="scene") + bounds = getattr(scene, "bounds", None) + if bounds is None: + return None + mins, maxs = bounds + dims = maxs - mins + return { + "dimensions_mm": { + "x": round(float(dims[0]) * 1000, 2), + "y": round(float(dims[1]) * 1000, 2), + "z": round(float(dims[2]) * 1000, 2), + }, + "bbox_center_mm": { + "x": round(float((mins[0] + maxs[0]) / 2) * 1000, 2), + "y": round(float((mins[1] + maxs[1]) / 2) * 1000, 2), + "z": round(float((mins[2] + maxs[2]) / 2) * 1000, 2), + }, + } + except Exception as exc: + logger.debug(f"_bbox_from_glb failed for {glb_path}: {exc}") + return None + + +def _bbox_from_step_cadquery(step_path: str) -> dict | None: + """Fallback: extract bounding box by re-parsing STEP via cadquery.""" + try: + import cadquery as cq + bb = cq.importers.importStep(step_path).val().BoundingBox() + return { + "dimensions_mm": { + "x": round(bb.xlen, 2), + "y": round(bb.ylen, 2), + "z": round(bb.zlen, 2), + }, + "bbox_center_mm": { + "x": round((bb.xmin + bb.xmax) / 2, 2), + "y": round((bb.ymin + bb.ymax) / 2, 2), + "z": round((bb.zmin + bb.zmax) / 2, 2), + }, 
+ } + except Exception as exc: + logger.debug(f"_bbox_from_step_cadquery failed for {step_path}: {exc}") + return None + + +@celery_app.task(bind=True, name="app.tasks.step_tasks.process_step_file", queue="step_processing") +def process_step_file(self, cad_file_id: str): + """Process a STEP file: extract objects, generate thumbnail, convert to glTF. + + After processing completes, auto-populate cad_part_materials from Excel + component data for any linked products that don't yet have materials assigned. + + A per-file Redis lock (TTL = 10 min) prevents duplicate tasks from processing + the same file concurrently — e.g. when 'Process Unprocessed' is clicked while + a file is already being processed. + """ + import redis as redis_lib + from app.config import settings as app_settings + + pl = PipelineLogger(task_id=self.request.id) + pl.step_start("process_step_file", {"cad_file_id": cad_file_id}) + + lock_key = f"step_processing_lock:{cad_file_id}" + r = redis_lib.from_url(app_settings.redis_url) + acquired = r.set(lock_key, "1", nx=True, ex=600) # 10-minute TTL + if not acquired: + logger.warning(f"STEP file {cad_file_id} is already being processed — skipping duplicate task") + return + + try: + pl.info("process_step_file", f"Processing STEP file (metadata only): {cad_file_id}") + try: + from app.services.step_processor import extract_cad_metadata + extract_cad_metadata(cad_file_id) + except Exception as exc: + pl.step_error("process_step_file", f"STEP metadata extraction failed: {exc}", exc) + r.delete(lock_key) # release lock so a retry can proceed + raise self.retry(exc=exc, countdown=60, max_retries=3) + finally: + r.delete(lock_key) # always release on completion or unhandled error + + pl.step_done("process_step_file") + + # Queue thumbnail rendering on the dedicated single-concurrency worker + from app.domains.pipeline.tasks.render_thumbnail import render_step_thumbnail + render_step_thumbnail.delay(cad_file_id) + + +def 
def _auto_populate_materials_for_cad(cad_file_id: str) -> None:
    """Sync helper: auto-populate cad_part_materials from Excel for newly-processed CAD files.

    Only fills products where cad_part_materials is empty or all-blank,
    preventing overwrites of manually assigned materials.

    Args:
        cad_file_id: CadFile primary key (string; compared directly against the
            UUID column — presumably SQLAlchemy coerces it; verify).
    """
    from sqlalchemy import create_engine, select as sql_select, update as sql_update
    from sqlalchemy.orm import Session
    from app.config import settings as app_settings
    from app.models.cad_file import CadFile
    from app.models.product import Product
    from app.api.routers.products import build_materials_from_excel
    from app.services.step_processor import build_part_colors

    sync_url = app_settings.database_url.replace("+asyncpg", "")
    eng = create_engine(sync_url)
    with Session(eng) as session:
        # Load the CAD file to get parsed objects
        cad_file = session.execute(
            sql_select(CadFile).where(CadFile.id == cad_file_id)
        ).scalar_one_or_none()
        if cad_file is None:
            return

        parsed_objects = cad_file.parsed_objects or {}
        cad_parts: list[str] = parsed_objects.get("objects", [])
        if not cad_parts:
            return

        # Find products linked to this CAD file that have Excel components
        products = session.execute(
            sql_select(Product).where(
                Product.cad_file_id == cad_file.id,
                Product.is_active.is_(True),
            )
        ).scalars().all()

        # Tracks the last successfully-built color map; used to queue ONE
        # thumbnail regeneration after the loop.
        final_part_colors = None
        for product in products:
            excel_components: list[dict] = product.components or []
            if not excel_components:
                continue

            # Only auto-fill when cad_part_materials is empty or all-blank
            existing = product.cad_part_materials or []
            if existing and any(m.get("material", "").strip() for m in existing):
                continue  # has at least one real material — don't overwrite

            new_materials = build_materials_from_excel(cad_parts, excel_components)
            session.execute(
                sql_update(Product)
                .where(Product.id == product.id)
                .values(cad_part_materials=new_materials)
            )
            session.flush()

            # Compute part colors; thumbnail queued once after the loop
            try:
                final_part_colors = build_part_colors(cad_parts, new_materials)
            except Exception:
                logger.exception(f"Part colors build failed for product {product.id}")

            logger.info(
                f"Auto-populated {len(new_materials)} materials for product {product.id} "
                f"from {len(excel_components)} Excel components"
            )

        session.commit()

        # Queue exactly ONE thumbnail regeneration per CAD file regardless of how many
        # products were auto-populated. Queuing once-per-product multiplies the task
        # count needlessly and causes the Redis queue depth to grow instead of shrink.
        if final_part_colors is not None:
            try:
                from app.domains.pipeline.tasks.render_thumbnail import regenerate_thumbnail
                regenerate_thumbnail.delay(str(cad_file_id), final_part_colors)
            except Exception:
                logger.exception(f"Thumbnail regen queue failed for cad_file {cad_file_id}")

    # NOTE(review): the early `return`s above skip this dispose — the engine is
    # only GC-reclaimed on those paths; confirm whether that is acceptable.
    eng.dispose()


@celery_app.task(name="app.tasks.step_tasks.reextract_cad_metadata", queue="thumbnail_rendering")
def reextract_cad_metadata(cad_file_id: str):
    """Re-extract bounding-box dimensions for an already-completed CAD file.

    Uses cadquery (available in render-worker) to compute dimensions_mm.
    Updates mesh_attributes without changing processing_status or re-rendering.
    Safe to run on completed files.

    Args:
        cad_file_id: CadFile primary key.
    """
    from sqlalchemy import create_engine
    from sqlalchemy.orm import Session
    from app.config import settings as app_settings
    from app.models.cad_file import CadFile

    pl = PipelineLogger(task_id=None)
    pl.step_start("reextract_cad_metadata", {"cad_file_id": cad_file_id})

    sync_url = app_settings.database_url.replace("+asyncpg", "")
    eng = create_engine(sync_url)
    with Session(eng) as session:
        cad_file = session.get(CadFile, cad_file_id)
        if not cad_file or not cad_file.stored_path:
            logger.warning(f"reextract_cad_metadata: file not found {cad_file_id}")
            eng.dispose()
            return
        step_path = cad_file.stored_path

    try:
        p = Path(step_path)
        # Prefer the cheap GLB-based bbox (thumbnail sibling file); fall back
        # to re-parsing the STEP with cadquery.
        glb_path = p.parent / f"{p.stem}_thumbnail.glb"
        patch = _bbox_from_glb(str(glb_path)) or _bbox_from_step_cadquery(step_path)
        if patch:
            # Re-load in a fresh session so the merge-and-commit is atomic.
            with Session(eng) as session:
                cad_file = session.get(CadFile, cad_file_id)
                if cad_file:
                    cad_file.mesh_attributes = {**(cad_file.mesh_attributes or {}), **patch}
                    session.commit()
            dims = patch["dimensions_mm"]
            pl.step_done("reextract_cad_metadata", result={
                "dimensions_mm": f"{dims['x']}×{dims['y']}×{dims['z']} mm"
            })
            logger.info(
                f"reextract_cad_metadata: {cad_file_id} → "
                f"{dims['x']}×{dims['y']}×{dims['z']} mm"
            )
        else:
            logger.warning(f"reextract_cad_metadata: no bbox data for {cad_file_id}")
    except Exception as exc:
        pl.step_error("reextract_cad_metadata", str(exc), exc)
        logger.error(f"reextract_cad_metadata failed for {cad_file_id}: {exc}")
    finally:
        eng.dispose()
+ +Covers: +- dispatch_order_line_render — thin router that queues render_order_line_task +- render_order_line_task — full still/turntable render pipeline for one order line +""" +import logging + +from app.tasks.celery_app import celery_app +from app.core.task_logs import log_task_event +from app.core.pipeline_logger import PipelineLogger + +logger = logging.getLogger(__name__) + + +@celery_app.task(name="app.tasks.step_tasks.dispatch_order_line_render", queue="step_processing") +def dispatch_order_line_render(order_line_id: str): + """Route an order-line render to render_order_line_task.""" + logger.info(f"Dispatching render for order line: {order_line_id}") + render_order_line_task.delay(order_line_id) + + +@celery_app.task(bind=True, name="app.tasks.step_tasks.render_order_line_task", queue="thumbnail_rendering", max_retries=3) +def render_order_line_task(self, order_line_id: str): + """Render a specific output type for an order line. + + Loads OrderLine → Product → CadFile → OutputType.render_settings. + Merges with system render settings. Stores result at order_line.result_path. 
+ """ + pl = PipelineLogger(task_id=self.request.id, order_line_id=order_line_id) + pl.step_start("render_order_line_task", {"order_line_id": order_line_id}) + logger.info(f"Rendering order line: {order_line_id}") + from app.services.render_log import emit + + emit(order_line_id, "Celery render task started") + try: + from sqlalchemy import create_engine, select, update as sql_update + from sqlalchemy.orm import Session, joinedload + from app.config import settings as app_settings + + # Use sync session for Celery (no async event loop) + sync_url = app_settings.database_url.replace("+asyncpg", "") + engine = create_engine(sync_url) + + with Session(engine) as session: + from app.models.order_line import OrderLine + from app.models.product import Product + + emit(order_line_id, "Loading order line from database") + line = session.execute( + select(OrderLine) + .where(OrderLine.id == order_line_id) + .options( + joinedload(OrderLine.product).joinedload(Product.cad_file), + joinedload(OrderLine.output_type), + ) + ).scalar_one_or_none() + + if line is None: + emit(order_line_id, "Order line not found in database", "error") + logger.error(f"OrderLine {order_line_id} not found") + return + + if line.product.cad_file_id is None: + emit(order_line_id, "Product has no CAD file — marking as failed", "error") + logger.warning(f"OrderLine {order_line_id}: product has no CAD file") + session.execute( + sql_update(OrderLine) + .where(OrderLine.id == line.id) + .values(render_status="failed") + ) + session.commit() + return + + # Mark as processing with timing + from datetime import datetime + render_start = datetime.utcnow() + session.execute( + sql_update(OrderLine) + .where(OrderLine.id == line.id) + .values( + render_status="processing", + render_backend_used="celery", + render_started_at=render_start, + ) + ) + session.commit() + + cad_file = line.product.cad_file + materials_source = line.product.cad_part_materials + + part_colors = {} + if cad_file and 
cad_file.parsed_objects: + parsed_names = cad_file.parsed_objects.get("objects", []) + if materials_source: + from app.services.step_processor import build_part_colors + part_colors = build_part_colors(parsed_names, materials_source) + + # Resolve render template + material library + from app.services.template_service import resolve_template, get_material_library_path + + category_key = line.product.category_key if line.product else None + ot_id = str(line.output_type_id) if line.output_type_id else None + template = resolve_template(category_key=category_key, output_type_id=ot_id) + material_library = get_material_library_path() + + # Build material_map (part_name → material_name) for material replacement. + # Works with or without a render template — only suppressed if a + # template explicitly has material_replace_enabled=False. + material_map = None + use_materials = bool(material_library and materials_source) + if template and not template.material_replace_enabled: + use_materials = False + if use_materials: + material_map = { + m["part_name"]: m["material"] + for m in materials_source + if m.get("part_name") and m.get("material") + } + + # Resolve raw material names to SCHAEFFLER library names via aliases + from app.services.material_service import resolve_material_map + material_map = resolve_material_map(material_map) + + if template: + emit(order_line_id, f"Using render template: {template.name} (collection={template.target_collection}, material_replace={template.material_replace_enabled}, lighting_only={template.lighting_only})") + logger.info(f"Render template resolved: '{template.name}' path={template.blend_file_path}, lighting_only={template.lighting_only}") + else: + emit(order_line_id, "No render template found — using factory settings (Mode A)") + logger.info(f"No render template for category_key={category_key!r}, output_type_id={ot_id!r}") + + cad_name = cad_file.original_name if cad_file else "?" 
+ # Load render_position for rotation values + rotation_x = rotation_y = rotation_z = 0.0 + if line.render_position_id: + from app.models.render_position import ProductRenderPosition + rp = session.get(ProductRenderPosition, line.render_position_id) + if rp: + rotation_x, rotation_y, rotation_z = rp.rotation_x, rp.rotation_y, rp.rotation_z + emit(order_line_id, f"Render position: '{rp.name}' ({rotation_x}°, {rotation_y}°, {rotation_z}°)") + + emit(order_line_id, f"Starting render for {cad_name} ({len(part_colors)} coloured parts)") + + # Determine if this is an animation output type + is_animation = bool(line.output_type and getattr(line.output_type, 'is_animation', False)) + + # Determine output format/extension + out_ext = "jpg" + if line.output_type and line.output_type.output_format: + fmt = line.output_type.output_format.lower() + if fmt == "mp4": + out_ext = "mp4" + elif fmt in ("png", "jpg", "jpeg"): + out_ext = "png" if fmt == "png" else "jpg" + + # Build meaningful output filename + import re + def _sanitize(s: str) -> str: + return re.sub(r'[^\w\-.]', '_', s.strip())[:100] + + product_name = line.product.name or line.product.pim_id or "product" + ot_name = line.output_type.name if line.output_type else "render" + filename = f"{_sanitize(product_name)}_{_sanitize(ot_name)}.{out_ext}" + + # Render to per-line output directory + from pathlib import Path as _Path + render_dir = _Path(app_settings.upload_dir) / "renders" / order_line_id + render_dir.mkdir(parents=True, exist_ok=True) + output_path = str(render_dir / filename) + + # Extract per-output-type render settings + render_width = None + render_height = None + render_engine = None + render_samples = None + noise_threshold = "" + denoiser = "" + denoising_input_passes = "" + denoising_prefilter = "" + denoising_quality = "" + denoising_use_gpu = "" + frame_count = 24 + fps = 25 + bg_color = "" + turntable_axis = "world_z" + if line.output_type and line.output_type.render_settings: + rs = 
line.output_type.render_settings + if rs.get("width"): + render_width = int(rs["width"]) + if rs.get("height"): + render_height = int(rs["height"]) + if rs.get("engine"): + render_engine = rs["engine"] + if rs.get("samples"): + render_samples = int(rs["samples"]) + if rs.get("frame_count"): + frame_count = int(rs["frame_count"]) + if rs.get("fps"): + fps = int(rs["fps"]) + bg_color = rs.get("bg_color", "") + turntable_axis = rs.get("turntable_axis", "world_z") + noise_threshold = str(rs.get("noise_threshold", "")) + denoiser = str(rs.get("denoiser", "")) + denoising_input_passes = str(rs.get("denoising_input_passes", "")) + denoising_prefilter = str(rs.get("denoising_prefilter", "")) + denoising_quality = str(rs.get("denoising_quality", "")) + denoising_use_gpu = str(rs.get("denoising_use_gpu", "")) + + transparent_bg = bool(line.output_type and line.output_type.transparent_bg) + cycles_device_val = (line.output_type.cycles_device or "auto") if line.output_type else "auto" + + # Build ordered part names list for index-based Blender matching + part_names_ordered = None + if cad_file and cad_file.parsed_objects: + part_names_ordered = cad_file.parsed_objects.get("objects", []) or None + + tmpl_info = f" template={template.name}" if template else "" + + if is_animation: + # ── Turntable animation path ──────────────────────────────── + emit(order_line_id, f"Starting turntable render: {frame_count} frames @ {fps}fps, {render_width or 1920}x{render_height or 1920}{tmpl_info}") + pl.step_start("blender_turntable", {"frame_count": frame_count, "fps": fps}) + from app.services.render_blender import is_blender_available, render_turntable_to_file + if not is_blender_available(): + raise RuntimeError("Blender not available on this worker") + + from app.services.step_processor import _get_all_settings + _sys = _get_all_settings() + try: + service_data = render_turntable_to_file( + step_path=_Path(cad_file.stored_path), + output_path=_Path(output_path), + 
frame_count=frame_count, + fps=fps, + width=render_width or 1920, + height=render_height or 1920, + engine=render_engine or _sys.get("blender_engine", "cycles"), + samples=render_samples or int(_sys.get(f"blender_{render_engine or _sys.get('blender_engine','cycles')}_samples", 128)), + stl_quality=_sys.get("stl_quality", "low"), + smooth_angle=int(_sys.get("blender_smooth_angle", 30)), + cycles_device=cycles_device_val, + transparent_bg=transparent_bg, + bg_color=bg_color, + turntable_axis=turntable_axis, + part_colors=part_colors or None, + template_path=template.blend_file_path if template else None, + target_collection=template.target_collection if template else "Product", + material_library_path=material_library if use_materials else None, + material_map=material_map, + part_names_ordered=part_names_ordered, + lighting_only=bool(template.lighting_only) if template else False, + shadow_catcher=bool(template.shadow_catcher_enabled) if template else False, + rotation_x=rotation_x, + rotation_y=rotation_y, + rotation_z=rotation_z, + ) + success = True + render_log = { + "renderer": "blender", + "type": "turntable", + "format": "mp4", + "engine": render_engine or _sys.get("blender_engine", "cycles"), + "engine_used": service_data.get("engine_used", "cycles"), + "samples": render_samples, + "cycles_device": cycles_device_val, + "width": render_width or 1920, + "height": render_height or 1920, + "frame_count": service_data.get("frame_count", frame_count), + "fps": fps, + "total_duration_s": service_data.get("total_duration_s"), + "stl_duration_s": service_data.get("stl_duration_s"), + "render_duration_s": service_data.get("render_duration_s"), + "ffmpeg_duration_s": service_data.get("ffmpeg_duration_s"), + "stl_size_bytes": service_data.get("stl_size_bytes"), + "output_size_bytes": service_data.get("output_size_bytes"), + "log_lines": service_data.get("log_lines", []), + } + if template: + render_log["template"] = template.blend_file_path + 
pl.step_done("blender_turntable") + except Exception as exc: + success = False + render_log = {"renderer": "blender", "type": "turntable", "error": str(exc)[:500]} + pl.step_error("blender_turntable", str(exc), exc) + logger.error("Turntable render failed for %s: %s", order_line_id, exc) + else: + # ── Still image path ──────────────────────────────────────── + emit(order_line_id, f"Calling renderer (STEP → GLB → Blender) {render_width or 'default'}x{render_height or 'default'}{' [transparent]' if transparent_bg else ''}{f' engine={render_engine}' if render_engine else ''}{f' samples={render_samples}' if render_samples else ''}{tmpl_info}") + pl.step_start("blender_still", {"width": render_width, "height": render_height}) + from app.services.step_processor import render_to_file + + success, render_log = render_to_file( + step_path=cad_file.stored_path, + output_path=output_path, + part_colors=part_colors, + width=render_width, + height=render_height, + transparent_bg=transparent_bg, + engine=render_engine, + samples=render_samples, + template_path=template.blend_file_path if template else None, + target_collection=template.target_collection if template else "Product", + material_library_path=material_library if use_materials else None, + material_map=material_map, + part_names_ordered=part_names_ordered, + lighting_only=bool(template.lighting_only) if template else False, + shadow_catcher=bool(template.shadow_catcher_enabled) if template else False, + cycles_device=line.output_type.cycles_device if line.output_type else None, + rotation_x=rotation_x, + rotation_y=rotation_y, + rotation_z=rotation_z, + job_id=order_line_id, + order_line_id=order_line_id, + noise_threshold=noise_threshold, + denoiser=denoiser, + denoising_input_passes=denoising_input_passes, + denoising_prefilter=denoising_prefilter, + denoising_quality=denoising_quality, + denoising_use_gpu=denoising_use_gpu, + ) + if success: + pl.step_done("blender_still") + else: + pl.step_error("blender_still", 
"render_to_file returned False") + + new_status = "completed" if success else "failed" + render_end = datetime.utcnow() + elapsed = (render_end - render_start).total_seconds() + + update_values = dict( + render_status=new_status, + render_completed_at=render_end, + render_log=render_log, + ) + if success: + update_values["result_path"] = output_path + + session.execute( + sql_update(OrderLine) + .where(OrderLine.id == line.id) + .values(**update_values) + ) + session.commit() + + if success: + # Create MediaAsset so the render appears in the Media Browser + try: + from app.domains.media.models import MediaAsset, MediaAssetType as MAT + from app.config import settings as _cfg2 + _ext = str(output_path).rsplit(".", 1)[-1].lower() if "." in str(output_path) else "bin" + _mime = "video/mp4" if _ext in ("mp4", "webm") else ("image/jpeg" if _ext in ("jpg", "jpeg") else "image/png") + # Extension determines type — poster frames (.jpg/.png) from animations are still stills + _at = MAT.turntable if _ext in ("mp4", "webm") else MAT.still + _tenant_id = line.product.cad_file.tenant_id if (line.product and line.product.cad_file) else None + # Normalize storage_key to relative path + _raw_key = str(output_path) + _upload_prefix = str(_cfg2.upload_dir).rstrip("/") + "/" + _norm_key = _raw_key[len(_upload_prefix):] if _raw_key.startswith(_upload_prefix) else _raw_key + _existing = session.execute( + select(MediaAsset.id).where(MediaAsset.storage_key == _norm_key).limit(1) + ).scalar_one_or_none() + if not _existing: + _asset = MediaAsset( + tenant_id=_tenant_id, + order_line_id=line.id, + product_id=line.product_id, + asset_type=_at, + storage_key=_norm_key, + mime_type=_mime, + ) + session.add(_asset) + session.commit() + except Exception: + logger.exception("Failed to create MediaAsset for order_line %s", order_line_id) + + if success: + emit(order_line_id, f"Render completed in {elapsed:.1f}s", "success") + else: + emit(order_line_id, f"Render failed after {elapsed:.1f}s", 
"error") + + # Broadcast WebSocket event for live UI updates + try: + from app.core.websocket import publish_event_sync + _tenant_id = str(line.product.cad_file.tenant_id) if ( + line.product and line.product.cad_file and line.product.cad_file.tenant_id + ) else None + if _tenant_id: + publish_event_sync(_tenant_id, { + "type": "render_complete" if success else "render_failed", + "order_line_id": order_line_id, + "order_id": str(line.order_id), + "status": new_status, + }) + except Exception: + logger.debug("WebSocket publish skipped (non-fatal)") + + # Emit per-render activity event (channel=activity, not shown in bell dropdown) + try: + from app.models.order import Order as OrderModel + order_row = session.execute( + select(OrderModel.created_by, OrderModel.order_number) + .where(OrderModel.id == line.order_id) + ).one_or_none() + if order_row: + from app.services.notification_service import emit_notification_sync, CHANNEL_ACTIVITY + details: dict = { + "order_number": order_row[1], + "product_name": product_name, + "output_type": ot_name, + } + if not success and isinstance(render_log, dict): + err = render_log.get("error") or render_log.get("stderr", "") + if err: + details["error"] = str(err)[:300] + emit_notification_sync( + actor_user_id=None, + target_user_id=str(order_row[0]), + action="render.completed" if success else "render.failed", + entity_type="order", + entity_id=str(line.order_id), + details=details, + channel=CHANNEL_ACTIVITY, + ) + except Exception: + logger.exception("Failed to emit render activity event") + + # Check if all lines for this order are done → auto-advance + order_id_str = str(line.order_id) + + engine.dispose() + + from app.services.order_status_service import check_order_completion + check_order_completion(order_id_str) + + pl.step_done("render_order_line_task") + + except Exception as exc: + logger.error(f"render_order_line_task failed for {order_line_id}: {exc}") + # If retries exhausted, mark as failed so the line doesn't stay 
stuck + if self.request.retries >= self.max_retries: + logger.error(f"Max retries reached for {order_line_id}, marking as failed") + try: + from sqlalchemy import create_engine, update as sql_update2 + from sqlalchemy.orm import Session as SyncSession + from app.config import settings as app_settings + from app.models.order_line import OrderLine as OL2 + sync_url2 = app_settings.database_url.replace("+asyncpg", "") + eng2 = create_engine(sync_url2) + with SyncSession(eng2) as s2: + from datetime import datetime as dt2 + s2.execute( + sql_update2(OL2).where(OL2.id == order_line_id) + .values( + render_status="failed", + render_completed_at=dt2.utcnow(), + render_log={"error": str(exc)[:500]}, + ) + ) + s2.commit() + eng2.dispose() + from app.services.order_status_service import check_order_completion + # Try to get order_id from DB + eng3 = create_engine(sync_url2) + with SyncSession(eng3) as s3: + from sqlalchemy import select as sel + row = s3.execute(sel(OL2.order_id).where(OL2.id == order_line_id)).scalar_one_or_none() + if row: + check_order_completion(str(row)) + eng3.dispose() + # Notify the order creator about the failure + try: + from sqlalchemy import select as sel2 + from app.models.order import Order as OrderModel2 + eng4 = create_engine(sync_url2) + with SyncSession(eng4) as s4: + order_row2 = s4.execute( + sel2(OrderModel2.created_by, OrderModel2.order_number) + .join(OL2, OL2.order_id == OrderModel2.id) + .where(OL2.id == order_line_id) + ).one_or_none() + eng4.dispose() + if order_row2: + from app.services.notification_service import emit_notification_sync, CHANNEL_ACTIVITY + emit_notification_sync( + actor_user_id=None, + target_user_id=str(order_row2[0]), + action="render.failed", + entity_type="order", + entity_id=None, + details={ + "order_number": order_row2[1], + "product_name": "unknown", + "output_type": "unknown", + "error": str(exc)[:300], + }, + channel=CHANNEL_ACTIVITY, + ) + except Exception: + logger.exception("Failed to emit render 
"""Thumbnail rendering tasks.

Contains:
- render_step_thumbnail — first render after STEP processing, plus follow-ups
  (hash persistence, bbox extraction, material auto-fill, WS event, GLB queue)
- regenerate_thumbnail — re-render with updated per-part colours
"""
import logging
from pathlib import Path

from app.tasks.celery_app import celery_app
from app.core.task_logs import log_task_event
from app.core.pipeline_logger import PipelineLogger

logger = logging.getLogger(__name__)


@celery_app.task(bind=True, name="app.tasks.step_tasks.render_step_thumbnail", queue="thumbnail_rendering")
def render_step_thumbnail(self, cad_file_id: str):
    """Render the thumbnail for a freshly-processed STEP file.

    Runs on the dedicated thumbnail_rendering queue (concurrency=1) so the
    blender-renderer service never sees concurrent requests. After a
    successful render it also: persists the STEP hash for STL cache lookups,
    extracts bounding-box dimensions, auto-populates materials, broadcasts a
    WebSocket completion event, and queues the geometry-GLB export.
    """
    plog = PipelineLogger(task_id=self.request.id)
    plog.step_start("render_step_thumbnail", {"cad_file_id": cad_file_id})
    logger.info(f"Rendering thumbnail for CAD file: {cad_file_id}")

    # -- 1. Persist the STEP file hash (used for STL cache lookups). Best-effort.
    try:
        from sqlalchemy import create_engine
        from sqlalchemy.orm import Session
        from app.config import settings as app_settings
        from app.models.cad_file import CadFile
        from app.domains.products.cache_service import compute_step_hash

        engine = create_engine(app_settings.database_url.replace("+asyncpg", ""))
        with Session(engine) as db:
            record = db.get(CadFile, cad_file_id)
            # Only compute once; skip files without a stored path.
            if record and record.stored_path and not record.step_file_hash:
                digest = compute_step_hash(record.stored_path)
                record.step_file_hash = digest
                db.commit()
                logger.info(f"Saved step_file_hash for {cad_file_id}: {digest[:12]}…")
        engine.dispose()
    except Exception:
        logger.warning(f"step_file_hash computation failed for {cad_file_id} (non-fatal)")

    # -- 2. The actual render. A failure here retries the whole task.
    try:
        from app.services.step_processor import regenerate_cad_thumbnail
        plog.info("render_step_thumbnail", "Calling regenerate_cad_thumbnail")
        if not regenerate_cad_thumbnail(cad_file_id, part_colors={}):
            raise RuntimeError("regenerate_cad_thumbnail returned False")
    except Exception as exc:
        plog.step_error("render_step_thumbnail", f"Thumbnail render failed: {exc}", exc)
        logger.error(f"Thumbnail render failed for {cad_file_id}: {exc}")
        raise self.retry(exc=exc, countdown=30, max_retries=2)

    # -- 3. Bounding box: prefer the thumbnail GLB (trimesh, fast), falling
    #       back to a cadquery STEP re-parse when the GLB is absent.
    try:
        from sqlalchemy import create_engine
        from sqlalchemy.orm import Session
        from app.config import settings as _cfg2
        from app.models.cad_file import CadFile as _CadFile2
        from app.domains.pipeline.tasks.extract_metadata import _bbox_from_glb, _bbox_from_step_cadquery

        db_url = _cfg2.database_url.replace("+asyncpg", "")
        engine = create_engine(db_url)
        with Session(engine) as db:
            record = db.get(_CadFile2, cad_file_id)
            step_path = record.stored_path if record else None
        engine.dispose()

        # Skip when dimensions were already extracted on a previous run.
        # (record is detached here but its columns were loaded by Session.get.)
        if step_path and not (record.mesh_attributes or {}).get("dimensions_mm"):
            step_file = Path(step_path)
            glb_file = step_file.parent / f"{step_file.stem}_thumbnail.glb"
            patch = _bbox_from_glb(str(glb_file)) or _bbox_from_step_cadquery(step_path)
            if patch:
                engine = create_engine(db_url)
                with Session(engine) as db:
                    record = db.get(_CadFile2, cad_file_id)
                    if record:
                        record.mesh_attributes = {**(record.mesh_attributes or {}), **patch}
                        db.commit()
                dims = patch["dimensions_mm"]
                logger.info(
                    f"bbox for {cad_file_id}: "
                    f"{dims['x']}×{dims['y']}×{dims['z']} mm"
                )
                engine.dispose()
    except Exception:
        logger.exception(f"bbox extraction failed for {cad_file_id} (non-fatal)")

    # -- 4. Auto-populate materials now that parsed_objects are available.
    try:
        from app.domains.pipeline.tasks.extract_metadata import _auto_populate_materials_for_cad
        _auto_populate_materials_for_cad(cad_file_id)
    except Exception:
        logger.exception(
            f"Auto material population failed for cad_file {cad_file_id} (non-fatal)"
        )

    # -- 5. Broadcast a WebSocket event so the UI updates live.
    try:
        from sqlalchemy import create_engine
        from sqlalchemy.orm import Session as _Session
        from app.config import settings as _cfg
        from app.models.cad_file import CadFile as _CadFile
        engine = create_engine(_cfg.database_url.replace("+asyncpg", ""))
        with _Session(engine) as db:
            record = db.get(_CadFile, cad_file_id)
            tenant = str(record.tenant_id) if record and record.tenant_id else None
        engine.dispose()
        if tenant:
            from app.core.websocket import publish_event_sync
            publish_event_sync(tenant, {
                "type": "cad_processing_complete",
                "cad_file_id": cad_file_id,
                "status": "completed",
            })
    except Exception:
        logger.debug("WebSocket publish for CAD complete skipped (non-fatal)")

    # -- 6. Queue the geometry GLB so the 3D viewer works without manual trigger.
    try:
        from app.domains.pipeline.tasks.export_glb import generate_gltf_geometry_task
        generate_gltf_geometry_task.delay(cad_file_id)
        plog.info("render_step_thumbnail", f"Queued generate_gltf_geometry_task for {cad_file_id}")
        logger.info("render_step_thumbnail: queued generate_gltf_geometry_task for %s", cad_file_id)
    except Exception:
        logger.debug("Could not queue generate_gltf_geometry_task (non-fatal)")

    plog.step_done("render_step_thumbnail")


@celery_app.task(bind=True, name="app.tasks.step_tasks.regenerate_thumbnail", queue="thumbnail_rendering")
def regenerate_thumbnail(self, cad_file_id: str, part_colors: dict):
    """Re-render the thumbnail applying updated per-part colours."""
    plog = PipelineLogger(task_id=self.request.id)
    plog.step_start("regenerate_thumbnail", {"cad_file_id": cad_file_id})
    logger.info(f"Regenerating thumbnail for CAD file: {cad_file_id}")
    try:
        from app.services.step_processor import regenerate_cad_thumbnail
        if not regenerate_cad_thumbnail(cad_file_id, part_colors):
            raise RuntimeError("regenerate_cad_thumbnail returned False")
    except Exception as exc:
        plog.step_error("regenerate_thumbnail", f"Thumbnail regeneration failed: {exc}", exc)
        logger.error(f"Thumbnail regeneration failed for {cad_file_id}: {exc}")
        raise self.retry(exc=exc, countdown=30, max_retries=2)
    plog.step_done("regenerate_thumbnail")
b/backend/app/tasks/celery_app.py @@ -7,7 +7,11 @@ celery_app = Celery( broker=settings.redis_url, backend=settings.redis_url, include=[ - "app.tasks.step_tasks", + "app.tasks.step_tasks", # shim — re-exports from domains.pipeline.tasks.* + "app.domains.pipeline.tasks.extract_metadata", + "app.domains.pipeline.tasks.render_thumbnail", + "app.domains.pipeline.tasks.render_order_line", + "app.domains.pipeline.tasks.export_glb", "app.tasks.ai_tasks", "app.tasks.beat_tasks", "app.domains.rendering.tasks", @@ -24,10 +28,12 @@ celery_app.conf.update( timezone="UTC", enable_utc=True, task_routes={ - "app.tasks.step_tasks.*": {"queue": "step_processing"}, - "app.tasks.ai_tasks.*": {"queue": "ai_validation"}, - "app.tasks.beat_tasks.*": {"queue": "step_processing"}, + "app.domains.pipeline.tasks.*": {"queue": "step_processing"}, "app.domains.rendering.tasks.*": {"queue": "thumbnail_rendering"}, + "app.tasks.beat_tasks.*": {"queue": "step_processing"}, + "app.tasks.ai_tasks.*": {"queue": "ai_validation"}, + # Legacy task names (shim) — keep until old queued tasks drain + "app.tasks.step_tasks.*": {"queue": "step_processing"}, }, beat_schedule={ "broadcast-queue-status-every-10s": { diff --git a/backend/app/tasks/step_tasks.py b/backend/app/tasks/step_tasks.py index 72e5abd..9af1552 100644 --- a/backend/app/tasks/step_tasks.py +++ b/backend/app/tasks/step_tasks.py @@ -1,1172 +1,23 @@ -"""Celery tasks for STEP file processing and thumbnail generation.""" -import logging -from pathlib import Path -from app.tasks.celery_app import celery_app -from app.core.task_logs import log_task_event - -logger = logging.getLogger(__name__) - - -def _bbox_from_glb(glb_path: str) -> dict | None: - """Extract bounding box from a GLB file (meters → converted to mm). - - Returns {"dimensions_mm": {x,y,z}, "bbox_center_mm": {x,y,z}} or None on failure. - OCC GLB output is in meters; multiply by 1000 to get mm. 
- """ - try: - import trimesh - p = Path(glb_path) - if not p.exists(): - return None - scene = trimesh.load(str(p), force="scene") - bounds = getattr(scene, "bounds", None) - if bounds is None: - return None - mins, maxs = bounds - dims = maxs - mins - return { - "dimensions_mm": { - "x": round(float(dims[0]) * 1000, 2), - "y": round(float(dims[1]) * 1000, 2), - "z": round(float(dims[2]) * 1000, 2), - }, - "bbox_center_mm": { - "x": round(float((mins[0] + maxs[0]) / 2) * 1000, 2), - "y": round(float((mins[1] + maxs[1]) / 2) * 1000, 2), - "z": round(float((mins[2] + maxs[2]) / 2) * 1000, 2), - }, - } - except Exception as exc: - logger.debug(f"_bbox_from_glb failed for {glb_path}: {exc}") - return None - - -def _bbox_from_step_cadquery(step_path: str) -> dict | None: - """Fallback: extract bounding box by re-parsing STEP via cadquery.""" - try: - import cadquery as cq - bb = cq.importers.importStep(step_path).val().BoundingBox() - return { - "dimensions_mm": { - "x": round(bb.xlen, 2), - "y": round(bb.ylen, 2), - "z": round(bb.zlen, 2), - }, - "bbox_center_mm": { - "x": round((bb.xmin + bb.xmax) / 2, 2), - "y": round((bb.ymin + bb.ymax) / 2, 2), - "z": round((bb.zmin + bb.zmax) / 2, 2), - }, - } - except Exception as exc: - logger.debug(f"_bbox_from_step_cadquery failed for {step_path}: {exc}") - return None - - -@celery_app.task(bind=True, name="app.tasks.step_tasks.process_step_file", queue="step_processing") -def process_step_file(self, cad_file_id: str): - """Process a STEP file: extract objects, generate thumbnail, convert to glTF. - - After processing completes, auto-populate cad_part_materials from Excel - component data for any linked products that don't yet have materials assigned. - - A per-file Redis lock (TTL = 10 min) prevents duplicate tasks from processing - the same file concurrently — e.g. when 'Process Unprocessed' is clicked while - a file is already being processed. 
- """ - import redis as redis_lib - from app.config import settings as app_settings - - lock_key = f"step_processing_lock:{cad_file_id}" - r = redis_lib.from_url(app_settings.redis_url) - acquired = r.set(lock_key, "1", nx=True, ex=600) # 10-minute TTL - if not acquired: - logger.warning(f"STEP file {cad_file_id} is already being processed — skipping duplicate task") - return - - try: - logger.info(f"Processing STEP file (metadata only): {cad_file_id}") - try: - from app.services.step_processor import extract_cad_metadata - extract_cad_metadata(cad_file_id) - except Exception as exc: - logger.error(f"STEP metadata extraction failed for {cad_file_id}: {exc}") - r.delete(lock_key) # release lock so a retry can proceed - raise self.retry(exc=exc, countdown=60, max_retries=3) - finally: - r.delete(lock_key) # always release on completion or unhandled error - - # Queue thumbnail rendering on the dedicated single-concurrency worker - render_step_thumbnail.delay(cad_file_id) - - -def _auto_populate_materials_for_cad(cad_file_id: str) -> None: - """Sync helper: auto-populate cad_part_materials from Excel for newly-processed CAD files. - - Only fills products where cad_part_materials is empty or all-blank, - preventing overwrites of manually assigned materials. 
- """ - from sqlalchemy import create_engine, select as sql_select, update as sql_update - from sqlalchemy.orm import Session - from app.config import settings as app_settings - from app.models.cad_file import CadFile - from app.models.product import Product - from app.api.routers.products import build_materials_from_excel - from app.services.step_processor import build_part_colors - - sync_url = app_settings.database_url.replace("+asyncpg", "") - eng = create_engine(sync_url) - with Session(eng) as session: - # Load the CAD file to get parsed objects - cad_file = session.execute( - sql_select(CadFile).where(CadFile.id == cad_file_id) - ).scalar_one_or_none() - if cad_file is None: - return - - parsed_objects = cad_file.parsed_objects or {} - cad_parts: list[str] = parsed_objects.get("objects", []) - if not cad_parts: - return - - # Find products linked to this CAD file that have Excel components - products = session.execute( - sql_select(Product).where( - Product.cad_file_id == cad_file.id, - Product.is_active.is_(True), - ) - ).scalars().all() - - final_part_colors = None - for product in products: - excel_components: list[dict] = product.components or [] - if not excel_components: - continue - - # Only auto-fill when cad_part_materials is empty or all-blank - existing = product.cad_part_materials or [] - if existing and any(m.get("material", "").strip() for m in existing): - continue # has at least one real material — don't overwrite - - new_materials = build_materials_from_excel(cad_parts, excel_components) - session.execute( - sql_update(Product) - .where(Product.id == product.id) - .values(cad_part_materials=new_materials) - ) - session.flush() - - # Compute part colors; thumbnail queued once after the loop - try: - final_part_colors = build_part_colors(cad_parts, new_materials) - except Exception: - logger.exception(f"Part colors build failed for product {product.id}") - - logger.info( - f"Auto-populated {len(new_materials)} materials for product 
{product.id} " - f"from {len(excel_components)} Excel components" - ) - - session.commit() - - # Queue exactly ONE thumbnail regeneration per CAD file regardless of how many - # products were auto-populated. Queuing once-per-product multiplies the task - # count needlessly and causes the Redis queue depth to grow instead of shrink. - if final_part_colors is not None: - try: - regenerate_thumbnail.delay(str(cad_file_id), final_part_colors) - except Exception: - logger.exception(f"Thumbnail regen queue failed for cad_file {cad_file_id}") - - eng.dispose() - - -@celery_app.task(bind=True, name="app.tasks.step_tasks.render_step_thumbnail", queue="thumbnail_rendering") -def render_step_thumbnail(self, cad_file_id: str): - """Render the thumbnail for a freshly-processed STEP file. - - Runs on the dedicated thumbnail_rendering queue (concurrency=1) so the - blender-renderer service is never overwhelmed by concurrent requests. - On success, also auto-populates materials and marks the CadFile as completed. 
- """ - logger.info(f"Rendering thumbnail for CAD file: {cad_file_id}") - - # Compute and persist STEP file hash for STL cache lookups - try: - from sqlalchemy import create_engine - from sqlalchemy.orm import Session - from app.config import settings as app_settings - from app.models.cad_file import CadFile - from app.domains.products.cache_service import compute_step_hash - - sync_url = app_settings.database_url.replace("+asyncpg", "") - _eng = create_engine(sync_url) - with Session(_eng) as _sess: - _cad = _sess.get(CadFile, cad_file_id) - if _cad and _cad.stored_path and not _cad.step_file_hash: - _hash = compute_step_hash(_cad.stored_path) - _cad.step_file_hash = _hash - _sess.commit() - logger.info(f"Saved step_file_hash for {cad_file_id}: {_hash[:12]}…") - _eng.dispose() - except Exception: - logger.warning(f"step_file_hash computation failed for {cad_file_id} (non-fatal)") - - try: - from app.services.step_processor import regenerate_cad_thumbnail - success = regenerate_cad_thumbnail(cad_file_id, part_colors={}) - if not success: - raise RuntimeError("regenerate_cad_thumbnail returned False") - except Exception as exc: - logger.error(f"Thumbnail render failed for {cad_file_id}: {exc}") - raise self.retry(exc=exc, countdown=30, max_retries=2) - - # Extract bounding box from the thumbnail GLB generated by the renderer. - # GLB bbox via trimesh is fast and avoids re-parsing the STEP file. - # Falls back to cadquery STEP re-parse if GLB is not found. 
- try: - from sqlalchemy import create_engine - from sqlalchemy.orm import Session - from app.config import settings as _cfg2 - from app.models.cad_file import CadFile as _CadFile2 - - _sync_url2 = _cfg2.database_url.replace("+asyncpg", "") - _eng2 = create_engine(_sync_url2) - with Session(_eng2) as _sess2: - _cad2 = _sess2.get(_CadFile2, cad_file_id) - _step_path = _cad2.stored_path if _cad2 else None - _eng2.dispose() - - if _step_path and not (_cad2.mesh_attributes or {}).get("dimensions_mm"): - _step = Path(_step_path) - _glb = _step.parent / f"{_step.stem}_thumbnail.glb" - bbox_data = _bbox_from_glb(str(_glb)) or _bbox_from_step_cadquery(_step_path) - if bbox_data: - _eng2 = create_engine(_sync_url2) - with Session(_eng2) as _sess2: - _cad2 = _sess2.get(_CadFile2, cad_file_id) - if _cad2: - _cad2.mesh_attributes = {**( _cad2.mesh_attributes or {}), **bbox_data} - _sess2.commit() - dims = bbox_data["dimensions_mm"] - logger.info( - f"bbox for {cad_file_id}: " - f"{dims['x']}×{dims['y']}×{dims['z']} mm" - ) - _eng2.dispose() - except Exception: - logger.exception(f"bbox extraction failed for {cad_file_id} (non-fatal)") - - # Auto-populate materials now that parsed_objects are available - try: - _auto_populate_materials_for_cad(cad_file_id) - except Exception: - logger.exception( - f"Auto material population failed for cad_file {cad_file_id} (non-fatal)" - ) - - # Broadcast WebSocket event for live UI updates - try: - from sqlalchemy import create_engine, select as sql_select2 - from sqlalchemy.orm import Session as _Session - from app.config import settings as _cfg - from app.models.cad_file import CadFile as _CadFile - _sync_url = _cfg.database_url.replace("+asyncpg", "") - _eng = create_engine(_sync_url) - with _Session(_eng) as _s: - _cad = _s.get(_CadFile, cad_file_id) - _tid = str(_cad.tenant_id) if _cad and _cad.tenant_id else None - _eng.dispose() - if _tid: - from app.core.websocket import publish_event_sync - publish_event_sync(_tid, { - "type": 
"cad_processing_complete", - "cad_file_id": cad_file_id, - "status": "completed", - }) - except Exception: - logger.debug("WebSocket publish for CAD complete skipped (non-fatal)") - - # Auto-generate geometry GLB so the 3D viewer is ready without manual trigger - try: - generate_gltf_geometry_task.delay(cad_file_id) - logger.info("render_step_thumbnail: queued generate_gltf_geometry_task for %s", cad_file_id) - except Exception: - logger.debug("Could not queue generate_gltf_geometry_task (non-fatal)") - - -@celery_app.task(name="app.tasks.step_tasks.reextract_cad_metadata", queue="thumbnail_rendering") -def reextract_cad_metadata(cad_file_id: str): - """Re-extract bounding-box dimensions for an already-completed CAD file. - - Uses cadquery (available in render-worker) to compute dimensions_mm. - Updates mesh_attributes without changing processing_status or re-rendering. - Safe to run on completed files. - """ - from sqlalchemy import create_engine - from sqlalchemy.orm import Session - from app.config import settings as app_settings - from app.models.cad_file import CadFile - - sync_url = app_settings.database_url.replace("+asyncpg", "") - eng = create_engine(sync_url) - with Session(eng) as session: - cad_file = session.get(CadFile, cad_file_id) - if not cad_file or not cad_file.stored_path: - logger.warning(f"reextract_cad_metadata: file not found {cad_file_id}") - eng.dispose() - return - step_path = cad_file.stored_path - - try: - p = Path(step_path) - glb_path = p.parent / f"{p.stem}_thumbnail.glb" - patch = _bbox_from_glb(str(glb_path)) or _bbox_from_step_cadquery(step_path) - if patch: - with Session(eng) as session: - cad_file = session.get(CadFile, cad_file_id) - if cad_file: - cad_file.mesh_attributes = {**(cad_file.mesh_attributes or {}), **patch} - session.commit() - dims = patch["dimensions_mm"] - logger.info( - f"reextract_cad_metadata: {cad_file_id} → " - f"{dims['x']}×{dims['y']}×{dims['z']} mm" - ) - else: - logger.warning(f"reextract_cad_metadata: 
no bbox data for {cad_file_id}") - except Exception as exc: - logger.error(f"reextract_cad_metadata failed for {cad_file_id}: {exc}") - finally: - eng.dispose() - - -@celery_app.task(bind=True, name="app.tasks.step_tasks.generate_gltf_geometry_task", queue="thumbnail_rendering", max_retries=1) -def generate_gltf_geometry_task(self, cad_file_id: str): - """Export a geometry GLB directly from STEP via OCC (no STL intermediary). - - Pipeline: - 1. Reads STEP file directly (no STL needed) - 2. Builds color_map from product.cad_part_materials (hex colors) - 3. Runs export_step_to_gltf.py (Python/OCP): STEP → GLB with per-part colors - 4. Stores result as gltf_geometry MediaAsset (replaces any existing one) - - Output is in meters, Y-up (glTF convention). - """ - import json as _json - import os as _os - import subprocess as _subprocess - import sys as _sys - from pathlib import Path as _Path - from sqlalchemy import create_engine, select as _select - from sqlalchemy.orm import Session - from app.config import settings as app_settings - from app.models.cad_file import CadFile - - from app.models.system_setting import SystemSetting as _SysSetting - - sync_url = app_settings.database_url.replace("+asyncpg", "") - eng = create_engine(sync_url) - with Session(eng) as session: - cad_file = session.get(CadFile, cad_file_id) - if not cad_file or not cad_file.stored_path: - logger.error("generate_gltf_geometry_task: no stored_path for %s", cad_file_id) - return - step_path_str = cad_file.stored_path - - # Build hex color_map from product.cad_part_materials - from app.domains.products.models import Product - product = session.execute( - _select(Product).where(Product.cad_file_id == cad_file.id) - ).scalar_one_or_none() - - color_map: dict[str, str] = {} - product_id = str(product.id) if product else None - if product and product.cad_part_materials: - for entry in product.cad_part_materials: - part_name = entry.get("part_name") or entry.get("name", "") - hex_color = 
entry.get("hex_color") or entry.get("color", "") - if part_name and hex_color: - color_map[part_name] = hex_color - - settings_rows = session.execute(_select(_SysSetting)).scalars().all() - sys_settings = {s.key: s.value for s in settings_rows} - eng.dispose() - - linear_deflection = float(sys_settings.get("gltf_preview_linear_deflection", "0.1")) - angular_deflection = float(sys_settings.get("gltf_preview_angular_deflection", "0.5")) - - step = _Path(step_path_str) - if not step.exists(): - log_task_event(self.request.id, f"Failed: STEP file not found: {step}", "error") - raise RuntimeError(f"STEP file not found: {step}") - - output_path = step.parent / f"{step.stem}_geometry.glb" - - log_task_event( - self.request.id, - f"Starting OCC GLB export: {len(color_map)} part colors", - "info", - ) - - # Run export_step_to_gltf.py as a subprocess so OCP imports don't pollute worker state - scripts_dir = _Path(_os.environ.get("RENDER_SCRIPTS_DIR", "/render-scripts")) - script_path = scripts_dir / "export_step_to_gltf.py" - - python_bin = _sys.executable - cmd = [ - python_bin, str(script_path), - "--step_path", str(step), - "--output_path", str(output_path), - "--color_map", _json.dumps(color_map), - "--linear_deflection", str(linear_deflection), - "--angular_deflection", str(angular_deflection), - ] - log_task_event( - self.request.id, - f"OCC tessellation: linear={linear_deflection}mm, angular={angular_deflection}rad", - "info", - ) - - try: - result = _subprocess.run(cmd, capture_output=True, text=True, timeout=120) - for line in result.stdout.splitlines(): - logger.info("[occ-gltf] %s", line) - for line in result.stderr.splitlines(): - logger.warning("[occ-gltf stderr] %s", line) - - if result.returncode != 0 or not output_path.exists() or output_path.stat().st_size == 0: - raise RuntimeError( - f"export_step_to_gltf.py failed (exit {result.returncode}).\n" - f"STDERR: {result.stderr[-1000:]}" - ) - except Exception as exc: - log_task_event(self.request.id, f"Failed: 
{exc}", "error") - logger.error("generate_gltf_geometry_task OCC export failed: %s", exc) - raise self.retry(exc=exc, countdown=15) - - log_task_event(self.request.id, f"OCC GLB export completed: {output_path.name}", "done") - - # --- Store MediaAsset (replace existing gltf_geometry for this cad_file) --- - import uuid as _uuid - from sqlalchemy import create_engine as _ce, delete as _del - from sqlalchemy.orm import Session as _Session - from app.domains.media.models import MediaAsset, MediaAssetType - - _sync_url = app_settings.database_url.replace("+asyncpg", "") - _eng2 = _ce(_sync_url) - with _Session(_eng2) as _sess: - _sess.execute( - _del(MediaAsset).where( - MediaAsset.cad_file_id == _uuid.UUID(cad_file_id), - MediaAsset.asset_type == MediaAssetType.gltf_geometry, - ) - ) - _key = str(output_path) - _prefix = str(app_settings.upload_dir).rstrip("/") + "/" - if _key.startswith(_prefix): - _key = _key[len(_prefix):] - asset = MediaAsset( - cad_file_id=_uuid.UUID(cad_file_id), - product_id=_uuid.UUID(product_id) if product_id else None, - asset_type=MediaAssetType.gltf_geometry, - storage_key=_key, - mime_type="model/gltf-binary", - file_size_bytes=output_path.stat().st_size if output_path.exists() else None, - ) - _sess.add(asset) - _sess.commit() - asset_id = str(asset.id) - _eng2.dispose() - - logger.info("generate_gltf_geometry_task: MediaAsset %s created for cad %s", asset_id, cad_file_id) - return {"glb_path": str(output_path), "asset_id": asset_id} - - -@celery_app.task( - bind=True, - name="app.tasks.step_tasks.generate_gltf_production_task", - queue="thumbnail_rendering", - max_retries=2, +""" +Legacy import shim — tasks have moved to app.domains.pipeline.tasks.* + +This module re-exports all tasks so that existing Celery task names +(registered as "app.tasks.step_tasks.*") continue to resolve. +Celery discovers tasks by import path, so these re-exports are required. 
+""" +from app.domains.pipeline.tasks.extract_metadata import ( # noqa: F401 + process_step_file, + reextract_cad_metadata, +) +from app.domains.pipeline.tasks.render_thumbnail import ( # noqa: F401 + render_step_thumbnail, + regenerate_thumbnail, +) +from app.domains.pipeline.tasks.render_order_line import ( # noqa: F401 + dispatch_order_line_render, + render_order_line_task, +) +from app.domains.pipeline.tasks.export_glb import ( # noqa: F401 + generate_gltf_geometry_task, + generate_gltf_production_task, ) -def generate_gltf_production_task(self, cad_file_id: str, product_id: str | None = None) -> dict: - """Generate a production GLB (Blender + PBR materials) from a geometry GLB via export_gltf.py. - - 1. Ensures a gltf_geometry MediaAsset exists (runs OCC export inline if not). - 2. Resolves SCHAEFFLER material map for the CadFile's product. - 3. Runs Blender headless with export_gltf.py → production GLB. - 4. Stores result as gltf_production MediaAsset. - """ - import json as _json - import os as _os - import subprocess as _subprocess - import sys as _sys - import uuid as _uuid - from pathlib import Path as _Path - - from sqlalchemy import create_engine as _ce, delete as _del, select as _sel - from sqlalchemy.orm import Session as _Session - - from app.config import settings as app_settings - from app.domains.media.models import MediaAsset, MediaAssetType - from app.services.render_blender import find_blender, is_blender_available - - log_task_event(self.request.id, f"generate_gltf_production_task started for cad {cad_file_id}", "info") - - _sync_url = app_settings.database_url.replace("+asyncpg", "") - _eng = _ce(_sync_url) - - # --- 1. 
Resolve STEP file path and system settings --- - from app.models.cad_file import CadFile as _CF - from app.models.system_setting import SystemSetting - - with _Session(_eng) as _sess: - _cad = _sess.execute( - _sel(_CF).where(_CF.id == _uuid.UUID(cad_file_id)) - ).scalar_one_or_none() - step_path_str = _cad.stored_path if _cad else None - - settings_rows = _sess.execute(_sel(SystemSetting)).scalars().all() - sys_settings = {s.key: s.value for s in settings_rows} - - if not step_path_str: - raise RuntimeError(f"CadFile {cad_file_id} not found in DB") - step_path = _Path(step_path_str) - if not step_path.exists(): - raise RuntimeError(f"STEP file not found: {step_path}") - - smooth_angle = float(sys_settings.get("blender_smooth_angle", "30")) - prod_linear = float(sys_settings.get("gltf_production_linear_deflection", "0.03")) - prod_angular = float(sys_settings.get("gltf_production_angular_deflection", "0.2")) - - scripts_dir = _Path(_os.environ.get("RENDER_SCRIPTS_DIR", "/render-scripts")) - occ_script = scripts_dir / "export_step_to_gltf.py" - if not occ_script.exists(): - raise RuntimeError(f"export_step_to_gltf.py not found at {occ_script}") - - prod_geom_glb = step_path.parent / f"{step_path.stem}_production_geom.glb" - python_bin = _sys.executable - occ_cmd = [ - python_bin, str(occ_script), - "--step_path", str(step_path), - "--output_path", str(prod_geom_glb), - "--linear_deflection", str(prod_linear), - "--angular_deflection", str(prod_angular), - ] - log_task_event( - self.request.id, - f"Re-exporting STEP at production quality (linear={prod_linear}mm, angular={prod_angular}rad)", - "info", - ) - try: - occ_result = _subprocess.run(occ_cmd, capture_output=True, text=True, timeout=180) - for line in occ_result.stdout.splitlines(): - logger.info("[occ-prod] %s", line) - if occ_result.returncode != 0 or not prod_geom_glb.exists() or prod_geom_glb.stat().st_size == 0: - raise RuntimeError( - f"OCC export failed (exit {occ_result.returncode}): 
{occ_result.stderr[-500:]}" - ) - except Exception as exc: - log_task_event(self.request.id, f"OCC re-export failed: {exc}", "error") - raise self.retry(exc=exc, countdown=30) - - geom_glb_path = prod_geom_glb - - # --- 2. Resolve material map from Product.cad_part_materials (SCHAEFFLER library names) --- - # cad_part_materials lives on Product (list[dict]), NOT on CadFile. - # We look up the Product that owns this CadFile (prefer product_id arg if given). - from app.services.material_service import resolve_material_map - from app.domains.products.models import Product as _Product - - with _Session(_eng) as _sess: - _prod_query = _sel(_Product).where(_Product.cad_file_id == _uuid.UUID(cad_file_id)) - if product_id: - _prod_query = _prod_query.where(_Product.id == _uuid.UUID(product_id)) - _product = _sess.execute(_prod_query).scalars().first() - raw_materials: list[dict] = _product.cad_part_materials if _product else [] - - # Convert list[{"part_name": X, "material": Y}] → dict[str, str] for resolve_material_map - raw_mat_map: dict[str, str] = { - m["part_name"]: m["material"] - for m in raw_materials - if m.get("part_name") and m.get("material") - } - mat_map = resolve_material_map(raw_mat_map) - logger.info( - "generate_gltf_production_task: resolved %d material(s) for cad %s (product: %s)", - len(mat_map), cad_file_id, _product.id if _product else "none", - ) - - # --- 3. Run Blender: apply materials + smooth shading + export production GLB --- - # Use get_material_library_path() which checks active AssetLibrary first, - # then falls back to the legacy material_library_path system setting. 
- from app.services.template_service import get_material_library_path - asset_library_blend = get_material_library_path() or "" - _eng.dispose() - - output_path = step_path.parent / f"{step_path.stem}_production.glb" - - export_script = scripts_dir / "export_gltf.py" - if not is_blender_available(): - raise RuntimeError("Blender is not available — cannot generate production GLB") - if not export_script.exists(): - raise RuntimeError(f"export_gltf.py not found at {export_script}") - - blender_bin = find_blender() - cmd = [ - blender_bin, "--background", - "--python", str(export_script), - "--", - "--glb_path", str(geom_glb_path), - "--output_path", str(output_path), - "--material_map", _json.dumps(mat_map), - "--smooth_angle", str(smooth_angle), - ] - if asset_library_blend: - cmd += ["--asset_library_blend", asset_library_blend] - - log_task_event( - self.request.id, - f"Running Blender export_gltf.py — {len(mat_map)} material(s), smooth={smooth_angle}°", - "info", - ) - try: - result = _subprocess.run(cmd, capture_output=True, text=True, timeout=300) - for line in result.stdout.splitlines(): - logger.info("[export-gltf] %s", line) - if result.returncode != 0: - raise RuntimeError( - f"export_gltf.py exited {result.returncode}:\n{result.stderr[-500:]}" - ) - except Exception as exc: - log_task_event(self.request.id, f"Blender production GLB failed: {exc}", "error") - logger.error("generate_gltf_production_task Blender failed for cad %s: %s", cad_file_id, exc) - raise self.retry(exc=exc, countdown=30) - finally: - # Clean up the high-quality temp geometry GLB (not needed after Blender export) - try: - prod_geom_glb.unlink(missing_ok=True) - except Exception: - pass - - log_task_event(self.request.id, f"Production GLB exported: {output_path.name}", "done") - - # --- 4. 
Store MediaAsset (replace existing gltf_production for this cad_file) --- - _eng2 = _ce(_sync_url) - with _Session(_eng2) as _sess: - _sess.execute( - _del(MediaAsset).where( - MediaAsset.cad_file_id == _uuid.UUID(cad_file_id), - MediaAsset.asset_type == MediaAssetType.gltf_production, - ) - ) - _key = str(output_path) - _prefix = str(app_settings.upload_dir).rstrip("/") + "/" - if _key.startswith(_prefix): - _key = _key[len(_prefix):] - asset = MediaAsset( - cad_file_id=_uuid.UUID(cad_file_id), - product_id=_uuid.UUID(product_id) if product_id else None, - asset_type=MediaAssetType.gltf_production, - storage_key=_key, - mime_type="model/gltf-binary", - file_size_bytes=output_path.stat().st_size if output_path.exists() else None, - ) - _sess.add(asset) - _sess.commit() - asset_id = str(asset.id) - _eng2.dispose() - - logger.info("generate_gltf_production_task: MediaAsset %s created for cad %s", asset_id, cad_file_id) - return {"glb_path": str(output_path), "asset_id": asset_id} - - -@celery_app.task(bind=True, name="app.tasks.step_tasks.regenerate_thumbnail", queue="thumbnail_rendering") -def regenerate_thumbnail(self, cad_file_id: str, part_colors: dict): - """Regenerate thumbnail with per-part colours.""" - logger.info(f"Regenerating thumbnail for CAD file: {cad_file_id}") - try: - from app.services.step_processor import regenerate_cad_thumbnail - success = regenerate_cad_thumbnail(cad_file_id, part_colors) - if not success: - raise RuntimeError("regenerate_cad_thumbnail returned False") - except Exception as exc: - logger.error(f"Thumbnail regeneration failed for {cad_file_id}: {exc}") - raise self.retry(exc=exc, countdown=30, max_retries=2) - - -@celery_app.task(name="app.tasks.step_tasks.dispatch_order_line_render", queue="step_processing") -def dispatch_order_line_render(order_line_id: str): - """Route an order-line render to render_order_line_task.""" - logger.info(f"Dispatching render for order line: {order_line_id}") - 
render_order_line_task.delay(order_line_id) - - -@celery_app.task(bind=True, name="app.tasks.step_tasks.render_order_line_task", queue="thumbnail_rendering", max_retries=3) -def render_order_line_task(self, order_line_id: str): - """Render a specific output type for an order line. - - Loads OrderLine → Product → CadFile → OutputType.render_settings. - Merges with system render settings. Stores result at order_line.result_path. - """ - logger.info(f"Rendering order line: {order_line_id}") - from app.services.render_log import emit - - emit(order_line_id, "Celery render task started") - try: - from sqlalchemy import create_engine, select, update as sql_update - from sqlalchemy.orm import Session, joinedload - from app.config import settings as app_settings - - # Use sync session for Celery (no async event loop) - sync_url = app_settings.database_url.replace("+asyncpg", "") - engine = create_engine(sync_url) - - with Session(engine) as session: - from app.models.order_line import OrderLine - from app.models.product import Product - - emit(order_line_id, "Loading order line from database") - line = session.execute( - select(OrderLine) - .where(OrderLine.id == order_line_id) - .options( - joinedload(OrderLine.product).joinedload(Product.cad_file), - joinedload(OrderLine.output_type), - ) - ).scalar_one_or_none() - - if line is None: - emit(order_line_id, "Order line not found in database", "error") - logger.error(f"OrderLine {order_line_id} not found") - return - - if line.product.cad_file_id is None: - emit(order_line_id, "Product has no CAD file — marking as failed", "error") - logger.warning(f"OrderLine {order_line_id}: product has no CAD file") - session.execute( - sql_update(OrderLine) - .where(OrderLine.id == line.id) - .values(render_status="failed") - ) - session.commit() - return - - # Mark as processing with timing - from datetime import datetime - render_start = datetime.utcnow() - session.execute( - sql_update(OrderLine) - .where(OrderLine.id == line.id) - 
.values( - render_status="processing", - render_backend_used="celery", - render_started_at=render_start, - ) - ) - session.commit() - - cad_file = line.product.cad_file - materials_source = line.product.cad_part_materials - - part_colors = {} - if cad_file and cad_file.parsed_objects: - parsed_names = cad_file.parsed_objects.get("objects", []) - if materials_source: - from app.services.step_processor import build_part_colors - part_colors = build_part_colors(parsed_names, materials_source) - - # Resolve render template + material library - from app.services.template_service import resolve_template, get_material_library_path - - category_key = line.product.category_key if line.product else None - ot_id = str(line.output_type_id) if line.output_type_id else None - template = resolve_template(category_key=category_key, output_type_id=ot_id) - material_library = get_material_library_path() - - # Build material_map (part_name → material_name) for material replacement. - # Works with or without a render template — only suppressed if a - # template explicitly has material_replace_enabled=False. 
- material_map = None - use_materials = bool(material_library and materials_source) - if template and not template.material_replace_enabled: - use_materials = False - if use_materials: - material_map = { - m["part_name"]: m["material"] - for m in materials_source - if m.get("part_name") and m.get("material") - } - # Resolve raw material names to SCHAEFFLER library names via aliases - from app.services.material_service import resolve_material_map - material_map = resolve_material_map(material_map) - - if template: - emit(order_line_id, f"Using render template: {template.name} (collection={template.target_collection}, material_replace={template.material_replace_enabled}, lighting_only={template.lighting_only})") - logger.info(f"Render template resolved: '{template.name}' path={template.blend_file_path}, lighting_only={template.lighting_only}") - else: - emit(order_line_id, "No render template found — using factory settings (Mode A)") - logger.info(f"No render template for category_key={category_key!r}, output_type_id={ot_id!r}") - - cad_name = cad_file.original_name if cad_file else "?" 
- # Load render_position for rotation values - rotation_x = rotation_y = rotation_z = 0.0 - if line.render_position_id: - from app.models.render_position import ProductRenderPosition - rp = session.get(ProductRenderPosition, line.render_position_id) - if rp: - rotation_x, rotation_y, rotation_z = rp.rotation_x, rp.rotation_y, rp.rotation_z - emit(order_line_id, f"Render position: '{rp.name}' ({rotation_x}°, {rotation_y}°, {rotation_z}°)") - - emit(order_line_id, f"Starting render for {cad_name} ({len(part_colors)} coloured parts)") - - # Determine if this is an animation output type - is_animation = bool(line.output_type and getattr(line.output_type, 'is_animation', False)) - - # Determine output format/extension - out_ext = "jpg" - if line.output_type and line.output_type.output_format: - fmt = line.output_type.output_format.lower() - if fmt == "mp4": - out_ext = "mp4" - elif fmt in ("png", "jpg", "jpeg"): - out_ext = "png" if fmt == "png" else "jpg" - - # Build meaningful output filename - import re - def _sanitize(s: str) -> str: - return re.sub(r'[^\w\-.]', '_', s.strip())[:100] - - product_name = line.product.name or line.product.pim_id or "product" - ot_name = line.output_type.name if line.output_type else "render" - filename = f"{_sanitize(product_name)}_{_sanitize(ot_name)}.{out_ext}" - - # Render to per-line output directory - from pathlib import Path as _Path - render_dir = _Path(app_settings.upload_dir) / "renders" / order_line_id - render_dir.mkdir(parents=True, exist_ok=True) - output_path = str(render_dir / filename) - - # Extract per-output-type render settings - render_width = None - render_height = None - render_engine = None - render_samples = None - noise_threshold = "" - denoiser = "" - denoising_input_passes = "" - denoising_prefilter = "" - denoising_quality = "" - denoising_use_gpu = "" - frame_count = 24 - fps = 25 - bg_color = "" - turntable_axis = "world_z" - if line.output_type and line.output_type.render_settings: - rs = 
line.output_type.render_settings - if rs.get("width"): - render_width = int(rs["width"]) - if rs.get("height"): - render_height = int(rs["height"]) - if rs.get("engine"): - render_engine = rs["engine"] - if rs.get("samples"): - render_samples = int(rs["samples"]) - if rs.get("frame_count"): - frame_count = int(rs["frame_count"]) - if rs.get("fps"): - fps = int(rs["fps"]) - bg_color = rs.get("bg_color", "") - turntable_axis = rs.get("turntable_axis", "world_z") - noise_threshold = str(rs.get("noise_threshold", "")) - denoiser = str(rs.get("denoiser", "")) - denoising_input_passes = str(rs.get("denoising_input_passes", "")) - denoising_prefilter = str(rs.get("denoising_prefilter", "")) - denoising_quality = str(rs.get("denoising_quality", "")) - denoising_use_gpu = str(rs.get("denoising_use_gpu", "")) - - transparent_bg = bool(line.output_type and line.output_type.transparent_bg) - cycles_device_val = (line.output_type.cycles_device or "auto") if line.output_type else "auto" - - # Build ordered part names list for index-based Blender matching - part_names_ordered = None - if cad_file and cad_file.parsed_objects: - part_names_ordered = cad_file.parsed_objects.get("objects", []) or None - - tmpl_info = f" template={template.name}" if template else "" - - if is_animation: - # ── Turntable animation path ──────────────────────────────── - emit(order_line_id, f"Starting turntable render: {frame_count} frames @ {fps}fps, {render_width or 1920}x{render_height or 1920}{tmpl_info}") - from app.services.render_blender import is_blender_available, render_turntable_to_file - if not is_blender_available(): - raise RuntimeError("Blender not available on this worker") - - from app.services.step_processor import _get_all_settings - _sys = _get_all_settings() - try: - service_data = render_turntable_to_file( - step_path=_Path(cad_file.stored_path), - output_path=_Path(output_path), - frame_count=frame_count, - fps=fps, - width=render_width or 1920, - height=render_height or 1920, - 
engine=render_engine or _sys.get("blender_engine", "cycles"), - samples=render_samples or int(_sys.get(f"blender_{render_engine or _sys.get('blender_engine','cycles')}_samples", 128)), - stl_quality=_sys.get("stl_quality", "low"), - smooth_angle=int(_sys.get("blender_smooth_angle", 30)), - cycles_device=cycles_device_val, - transparent_bg=transparent_bg, - bg_color=bg_color, - turntable_axis=turntable_axis, - part_colors=part_colors or None, - template_path=template.blend_file_path if template else None, - target_collection=template.target_collection if template else "Product", - material_library_path=material_library if use_materials else None, - material_map=material_map, - part_names_ordered=part_names_ordered, - lighting_only=bool(template.lighting_only) if template else False, - shadow_catcher=bool(template.shadow_catcher_enabled) if template else False, - rotation_x=rotation_x, - rotation_y=rotation_y, - rotation_z=rotation_z, - ) - success = True - render_log = { - "renderer": "blender", - "type": "turntable", - "format": "mp4", - "engine": render_engine or _sys.get("blender_engine", "cycles"), - "engine_used": service_data.get("engine_used", "cycles"), - "samples": render_samples, - "cycles_device": cycles_device_val, - "width": render_width or 1920, - "height": render_height or 1920, - "frame_count": service_data.get("frame_count", frame_count), - "fps": fps, - "total_duration_s": service_data.get("total_duration_s"), - "stl_duration_s": service_data.get("stl_duration_s"), - "render_duration_s": service_data.get("render_duration_s"), - "ffmpeg_duration_s": service_data.get("ffmpeg_duration_s"), - "stl_size_bytes": service_data.get("stl_size_bytes"), - "output_size_bytes": service_data.get("output_size_bytes"), - "log_lines": service_data.get("log_lines", []), - } - if template: - render_log["template"] = template.blend_file_path - except Exception as exc: - success = False - render_log = {"renderer": "blender", "type": "turntable", "error": str(exc)[:500]} 
- logger.error("Turntable render failed for %s: %s", order_line_id, exc) - else: - # ── Still image path ──────────────────────────────────────── - emit(order_line_id, f"Calling renderer (STEP → GLB → Blender) {render_width or 'default'}x{render_height or 'default'}{' [transparent]' if transparent_bg else ''}{f' engine={render_engine}' if render_engine else ''}{f' samples={render_samples}' if render_samples else ''}{tmpl_info}") - from app.services.step_processor import render_to_file - - success, render_log = render_to_file( - step_path=cad_file.stored_path, - output_path=output_path, - part_colors=part_colors, - width=render_width, - height=render_height, - transparent_bg=transparent_bg, - engine=render_engine, - samples=render_samples, - template_path=template.blend_file_path if template else None, - target_collection=template.target_collection if template else "Product", - material_library_path=material_library if use_materials else None, - material_map=material_map, - part_names_ordered=part_names_ordered, - lighting_only=bool(template.lighting_only) if template else False, - shadow_catcher=bool(template.shadow_catcher_enabled) if template else False, - cycles_device=line.output_type.cycles_device if line.output_type else None, - rotation_x=rotation_x, - rotation_y=rotation_y, - rotation_z=rotation_z, - job_id=order_line_id, - order_line_id=order_line_id, - noise_threshold=noise_threshold, - denoiser=denoiser, - denoising_input_passes=denoising_input_passes, - denoising_prefilter=denoising_prefilter, - denoising_quality=denoising_quality, - denoising_use_gpu=denoising_use_gpu, - ) - - new_status = "completed" if success else "failed" - render_end = datetime.utcnow() - elapsed = (render_end - render_start).total_seconds() - - update_values = dict( - render_status=new_status, - render_completed_at=render_end, - render_log=render_log, - ) - if success: - update_values["result_path"] = output_path - - session.execute( - sql_update(OrderLine) - .where(OrderLine.id 
== line.id) - .values(**update_values) - ) - session.commit() - - if success: - # Create MediaAsset so the render appears in the Media Browser - try: - from app.domains.media.models import MediaAsset, MediaAssetType as MAT - from app.config import settings as _cfg2 - _ext = str(output_path).rsplit(".", 1)[-1].lower() if "." in str(output_path) else "bin" - _mime = "video/mp4" if _ext in ("mp4", "webm") else ("image/jpeg" if _ext in ("jpg", "jpeg") else "image/png") - # Extension determines type — poster frames (.jpg/.png) from animations are still stills - _at = MAT.turntable if _ext in ("mp4", "webm") else MAT.still - _tenant_id = line.product.cad_file.tenant_id if (line.product and line.product.cad_file) else None - # Normalize storage_key to relative path - _raw_key = str(output_path) - _upload_prefix = str(_cfg2.upload_dir).rstrip("/") + "/" - _norm_key = _raw_key[len(_upload_prefix):] if _raw_key.startswith(_upload_prefix) else _raw_key - _existing = session.execute( - select(MediaAsset.id).where(MediaAsset.storage_key == _norm_key).limit(1) - ).scalar_one_or_none() - if not _existing: - _asset = MediaAsset( - tenant_id=_tenant_id, - order_line_id=line.id, - product_id=line.product_id, - asset_type=_at, - storage_key=_norm_key, - mime_type=_mime, - ) - session.add(_asset) - session.commit() - except Exception: - logger.exception("Failed to create MediaAsset for order_line %s", order_line_id) - - if success: - emit(order_line_id, f"Render completed in {elapsed:.1f}s", "success") - else: - emit(order_line_id, f"Render failed after {elapsed:.1f}s", "error") - - # Broadcast WebSocket event for live UI updates - try: - from app.core.websocket import publish_event_sync - _tenant_id = str(line.product.cad_file.tenant_id) if ( - line.product and line.product.cad_file and line.product.cad_file.tenant_id - ) else None - if _tenant_id: - publish_event_sync(_tenant_id, { - "type": "render_complete" if success else "render_failed", - "order_line_id": order_line_id, - 
"order_id": str(line.order_id), - "status": new_status, - }) - except Exception: - logger.debug("WebSocket publish skipped (non-fatal)") - - # Emit per-render activity event (channel=activity, not shown in bell dropdown) - try: - from app.models.order import Order as OrderModel - order_row = session.execute( - select(OrderModel.created_by, OrderModel.order_number) - .where(OrderModel.id == line.order_id) - ).one_or_none() - if order_row: - from app.services.notification_service import emit_notification_sync, CHANNEL_ACTIVITY - details: dict = { - "order_number": order_row[1], - "product_name": product_name, - "output_type": ot_name, - } - if not success and isinstance(render_log, dict): - err = render_log.get("error") or render_log.get("stderr", "") - if err: - details["error"] = str(err)[:300] - emit_notification_sync( - actor_user_id=None, - target_user_id=str(order_row[0]), - action="render.completed" if success else "render.failed", - entity_type="order", - entity_id=str(line.order_id), - details=details, - channel=CHANNEL_ACTIVITY, - ) - except Exception: - logger.exception("Failed to emit render activity event") - - # Check if all lines for this order are done → auto-advance - order_id_str = str(line.order_id) - - engine.dispose() - - from app.services.order_status_service import check_order_completion - check_order_completion(order_id_str) - - except Exception as exc: - logger.error(f"render_order_line_task failed for {order_line_id}: {exc}") - # If retries exhausted, mark as failed so the line doesn't stay stuck - if self.request.retries >= self.max_retries: - logger.error(f"Max retries reached for {order_line_id}, marking as failed") - try: - from sqlalchemy import create_engine, update as sql_update2 - from sqlalchemy.orm import Session as SyncSession - from app.config import settings as app_settings - from app.models.order_line import OrderLine as OL2 - sync_url2 = app_settings.database_url.replace("+asyncpg", "") - eng2 = create_engine(sync_url2) - with 
SyncSession(eng2) as s2: - from datetime import datetime as dt2 - s2.execute( - sql_update2(OL2).where(OL2.id == order_line_id) - .values( - render_status="failed", - render_completed_at=dt2.utcnow(), - render_log={"error": str(exc)[:500]}, - ) - ) - s2.commit() - eng2.dispose() - from app.services.order_status_service import check_order_completion - # Try to get order_id from DB - eng3 = create_engine(sync_url2) - with SyncSession(eng3) as s3: - from sqlalchemy import select as sel - row = s3.execute(sel(OL2.order_id).where(OL2.id == order_line_id)).scalar_one_or_none() - if row: - check_order_completion(str(row)) - eng3.dispose() - # Notify the order creator about the failure - try: - from sqlalchemy import select as sel2 - from app.models.order import Order as OrderModel2 - eng4 = create_engine(sync_url2) - with SyncSession(eng4) as s4: - order_row2 = s4.execute( - sel2(OrderModel2.created_by, OrderModel2.order_number) - .join(OL2, OL2.order_id == OrderModel2.id) - .where(OL2.id == order_line_id) - ).one_or_none() - eng4.dispose() - if order_row2: - from app.services.notification_service import emit_notification_sync, CHANNEL_ACTIVITY - emit_notification_sync( - actor_user_id=None, - target_user_id=str(order_row2[0]), - action="render.failed", - entity_type="order", - entity_id=None, - details={ - "order_number": order_row2[1], - "product_name": "unknown", - "output_type": "unknown", - "error": str(exc)[:300], - }, - channel=CHANNEL_ACTIVITY, - ) - except Exception: - logger.exception("Failed to emit render failure activity event") - except Exception: - logger.exception(f"Failed to mark {order_line_id} as failed in DB") - raise - raise self.retry(exc=exc, countdown=60)