"""Celery tasks for STEP file processing and thumbnail generation.""" import logging from app.tasks.celery_app import celery_app logger = logging.getLogger(__name__) @celery_app.task(bind=True, name="app.tasks.step_tasks.process_step_file", queue="step_processing") def process_step_file(self, cad_file_id: str): """Process a STEP file: extract objects, generate thumbnail, convert to glTF. After processing completes, auto-populate cad_part_materials from Excel component data for any linked products that don't yet have materials assigned. A per-file Redis lock (TTL = 10 min) prevents duplicate tasks from processing the same file concurrently — e.g. when 'Process Unprocessed' is clicked while a file is already being processed. """ import redis as redis_lib from app.config import settings as app_settings lock_key = f"step_processing_lock:{cad_file_id}" r = redis_lib.from_url(app_settings.redis_url) acquired = r.set(lock_key, "1", nx=True, ex=600) # 10-minute TTL if not acquired: logger.warning(f"STEP file {cad_file_id} is already being processed — skipping duplicate task") return try: logger.info(f"Processing STEP file (metadata only): {cad_file_id}") try: from app.services.step_processor import extract_cad_metadata extract_cad_metadata(cad_file_id) except Exception as exc: logger.error(f"STEP metadata extraction failed for {cad_file_id}: {exc}") r.delete(lock_key) # release lock so a retry can proceed raise self.retry(exc=exc, countdown=60, max_retries=3) finally: r.delete(lock_key) # always release on completion or unhandled error # Queue thumbnail rendering on the dedicated single-concurrency worker render_step_thumbnail.delay(cad_file_id) def _auto_populate_materials_for_cad(cad_file_id: str) -> None: """Sync helper: auto-populate cad_part_materials from Excel for newly-processed CAD files. Only fills products where cad_part_materials is empty or all-blank, preventing overwrites of manually assigned materials. """ from sqlalchemy import create_engine, select as sql_select, update as sql_update from sqlalchemy.orm import Session from app.config import settings as app_settings from app.models.cad_file import CadFile from app.models.product import Product from app.api.routers.products import build_materials_from_excel from app.services.step_processor import build_part_colors sync_url = app_settings.database_url.replace("+asyncpg", "") eng = create_engine(sync_url) with Session(eng) as session: # Load the CAD file to get parsed objects cad_file = session.execute( sql_select(CadFile).where(CadFile.id == cad_file_id) ).scalar_one_or_none() if cad_file is None: return parsed_objects = cad_file.parsed_objects or {} cad_parts: list[str] = parsed_objects.get("objects", []) if not cad_parts: return # Find products linked to this CAD file that have Excel components products = session.execute( sql_select(Product).where( Product.cad_file_id == cad_file.id, Product.is_active.is_(True), ) ).scalars().all() final_part_colors = None for product in products: excel_components: list[dict] = product.components or [] if not excel_components: continue # Only auto-fill when cad_part_materials is empty or all-blank existing = product.cad_part_materials or [] if existing and any(m.get("material", "").strip() for m in existing): continue # has at least one real material — don't overwrite new_materials = build_materials_from_excel(cad_parts, excel_components) session.execute( sql_update(Product) .where(Product.id == product.id) .values(cad_part_materials=new_materials) ) session.flush() # Compute part colors; thumbnail queued once after the loop try: final_part_colors = build_part_colors(cad_parts, new_materials) except Exception: logger.exception(f"Part colors build failed for product {product.id}") logger.info( f"Auto-populated {len(new_materials)} materials for product {product.id} " f"from {len(excel_components)} Excel components" ) session.commit() # Queue exactly ONE thumbnail regeneration per CAD file regardless of how many # products were auto-populated. Queuing once-per-product multiplies the task # count needlessly and causes the Redis queue depth to grow instead of shrink. if final_part_colors is not None: try: regenerate_thumbnail.delay(str(cad_file_id), final_part_colors) except Exception: logger.exception(f"Thumbnail regen queue failed for cad_file {cad_file_id}") eng.dispose() @celery_app.task(bind=True, name="app.tasks.step_tasks.render_step_thumbnail", queue="thumbnail_rendering") def render_step_thumbnail(self, cad_file_id: str): """Render the thumbnail for a freshly-processed STEP file. Runs on the dedicated thumbnail_rendering queue (concurrency=1) so the blender-renderer service is never overwhelmed by concurrent requests. On success, also auto-populates materials and marks the CadFile as completed. """ logger.info(f"Rendering thumbnail for CAD file: {cad_file_id}") # Compute and persist STEP file hash for STL cache lookups try: from sqlalchemy import create_engine from sqlalchemy.orm import Session from app.config import settings as app_settings from app.models.cad_file import CadFile from app.domains.products.cache_service import compute_step_hash sync_url = app_settings.database_url.replace("+asyncpg", "") _eng = create_engine(sync_url) with Session(_eng) as _sess: _cad = _sess.get(CadFile, cad_file_id) if _cad and _cad.stored_path and not _cad.step_file_hash: _hash = compute_step_hash(_cad.stored_path) _cad.step_file_hash = _hash _sess.commit() logger.info(f"Saved step_file_hash for {cad_file_id}: {_hash[:12]}…") _eng.dispose() except Exception: logger.warning(f"step_file_hash computation failed for {cad_file_id} (non-fatal)") try: from app.services.step_processor import regenerate_cad_thumbnail success = regenerate_cad_thumbnail(cad_file_id, part_colors={}) if not success: raise RuntimeError("regenerate_cad_thumbnail returned False") except Exception as exc: logger.error(f"Thumbnail render failed for {cad_file_id}: {exc}") raise self.retry(exc=exc, countdown=30, max_retries=2) # Auto-populate materials now that parsed_objects are available try: _auto_populate_materials_for_cad(cad_file_id) except Exception: logger.exception( f"Auto material population failed for cad_file {cad_file_id} (non-fatal)" ) # Broadcast WebSocket event for live UI updates try: from sqlalchemy import create_engine, select as sql_select2 from sqlalchemy.orm import Session as _Session from app.config import settings as _cfg from app.models.cad_file import CadFile as _CadFile _sync_url = _cfg.database_url.replace("+asyncpg", "") _eng = create_engine(_sync_url) with _Session(_eng) as _s: _cad = _s.get(_CadFile, cad_file_id) _tid = str(_cad.tenant_id) if _cad and _cad.tenant_id else None _eng.dispose() if _tid: from app.core.websocket import publish_event_sync publish_event_sync(_tid, { "type": "cad_processing_complete", "cad_file_id": cad_file_id, "status": "completed", }) except Exception: logger.debug("WebSocket publish for CAD complete skipped (non-fatal)") @celery_app.task(bind=True, name="app.tasks.step_tasks.generate_stl_cache", queue="thumbnail_rendering") def generate_stl_cache(self, cad_file_id: str, quality: str): """Generate and cache STL for a CAD file without triggering a full render.""" from sqlalchemy import create_engine from sqlalchemy.orm import Session from app.config import settings as app_settings from app.models.cad_file import CadFile logger.info(f"Generating {quality}-quality STL for CAD file: {cad_file_id}") sync_url = app_settings.database_url.replace("+asyncpg", "") eng = create_engine(sync_url) with Session(eng) as session: cad_file = session.get(CadFile, cad_file_id) if not cad_file or not cad_file.stored_path: logger.error(f"CAD file not found or no stored_path: {cad_file_id}") return step_path = cad_file.stored_path eng.dispose() try: from app.services.render_blender import convert_step_to_stl, export_per_part_stls from app.domains.products.cache_service import compute_step_hash, check_stl_cache, store_stl_cache from pathlib import Path as _Path step = _Path(step_path) stl_out = step.parent / f"{step.stem}_{quality}.stl" parts_dir = step.parent / f"{step.stem}_{quality}_parts" if not stl_out.exists() or stl_out.stat().st_size == 0: # Check MinIO cache before running cadquery conversion step_hash = compute_step_hash(step_path) cached_bytes = check_stl_cache(step_hash, quality) if cached_bytes: stl_out.write_bytes(cached_bytes) logger.info(f"STL cache hit for {cad_file_id} ({quality}), skipped conversion") else: convert_step_to_stl(step, stl_out, quality) # Store result in MinIO for future workers if stl_out.exists() and stl_out.stat().st_size > 0: store_stl_cache(step_hash, quality, str(stl_out)) if not (parts_dir / "manifest.json").exists(): try: export_per_part_stls(step, parts_dir, quality) except Exception as pe: logger.warning(f"Per-part STL export non-fatal: {pe}") logger.info(f"STL cached: {stl_out} ({stl_out.stat().st_size // 1024} KB)") except Exception as exc: logger.error(f"STL generation failed for {cad_file_id} quality={quality}: {exc}") raise self.retry(exc=exc, countdown=30, max_retries=2) @celery_app.task(bind=True, name="app.tasks.step_tasks.regenerate_thumbnail", queue="thumbnail_rendering") def regenerate_thumbnail(self, cad_file_id: str, part_colors: dict): """Regenerate thumbnail with per-part colours.""" logger.info(f"Regenerating thumbnail for CAD file: {cad_file_id}") try: from app.services.step_processor import regenerate_cad_thumbnail success = regenerate_cad_thumbnail(cad_file_id, part_colors) if not success: raise RuntimeError("regenerate_cad_thumbnail returned False") except Exception as exc: logger.error(f"Thumbnail regeneration failed for {cad_file_id}: {exc}") raise self.retry(exc=exc, countdown=30, max_retries=2) @celery_app.task(name="app.tasks.step_tasks.dispatch_order_line_render", queue="step_processing") def dispatch_order_line_render(order_line_id: str): """Route an order-line render to render_order_line_task.""" logger.info(f"Dispatching render for order line: {order_line_id}") render_order_line_task.delay(order_line_id) @celery_app.task(bind=True, name="app.tasks.step_tasks.render_order_line_task", queue="thumbnail_rendering", max_retries=3) def render_order_line_task(self, order_line_id: str): """Render a specific output type for an order line. Loads OrderLine → Product → CadFile → OutputType.render_settings. Merges with system render settings. Stores result at order_line.result_path. """ logger.info(f"Rendering order line: {order_line_id}") from app.services.render_log import emit emit(order_line_id, "Celery render task started") try: from sqlalchemy import create_engine, select, update as sql_update from sqlalchemy.orm import Session, joinedload from app.config import settings as app_settings # Use sync session for Celery (no async event loop) sync_url = app_settings.database_url.replace("+asyncpg", "") engine = create_engine(sync_url) with Session(engine) as session: from app.models.order_line import OrderLine from app.models.product import Product emit(order_line_id, "Loading order line from database") line = session.execute( select(OrderLine) .where(OrderLine.id == order_line_id) .options( joinedload(OrderLine.product).joinedload(Product.cad_file), joinedload(OrderLine.output_type), ) ).scalar_one_or_none() if line is None: emit(order_line_id, "Order line not found in database", "error") logger.error(f"OrderLine {order_line_id} not found") return if line.product.cad_file_id is None: emit(order_line_id, "Product has no CAD file — marking as failed", "error") logger.warning(f"OrderLine {order_line_id}: product has no CAD file") session.execute( sql_update(OrderLine) .where(OrderLine.id == line.id) .values(render_status="failed") ) session.commit() return # Mark as processing with timing from datetime import datetime render_start = datetime.utcnow() session.execute( sql_update(OrderLine) .where(OrderLine.id == line.id) .values( render_status="processing", render_backend_used="celery", render_started_at=render_start, ) ) session.commit() cad_file = line.product.cad_file materials_source = line.product.cad_part_materials part_colors = {} if cad_file and cad_file.parsed_objects: parsed_names = cad_file.parsed_objects.get("objects", []) if materials_source: from app.services.step_processor import build_part_colors part_colors = build_part_colors(parsed_names, materials_source) # Resolve render template + material library from app.services.template_service import resolve_template, get_material_library_path category_key = line.product.category_key if line.product else None ot_id = str(line.output_type_id) if line.output_type_id else None template = resolve_template(category_key=category_key, output_type_id=ot_id) material_library = get_material_library_path() # Build material_map (part_name → material_name) for material replacement. # Works with or without a render template — only suppressed if a # template explicitly has material_replace_enabled=False. material_map = None use_materials = bool(material_library and materials_source) if template and not template.material_replace_enabled: use_materials = False if use_materials: material_map = { m["part_name"]: m["material"] for m in materials_source if m.get("part_name") and m.get("material") } # Resolve raw material names to SCHAEFFLER library names via aliases from app.services.material_service import resolve_material_map material_map = resolve_material_map(material_map) if template: emit(order_line_id, f"Using render template: {template.name} (collection={template.target_collection}, material_replace={template.material_replace_enabled}, lighting_only={template.lighting_only})") logger.info(f"Render template resolved: '{template.name}' path={template.blend_file_path}, lighting_only={template.lighting_only}") else: emit(order_line_id, "No render template found — using factory settings (Mode A)") logger.info(f"No render template for category_key={category_key!r}, output_type_id={ot_id!r}") cad_name = cad_file.original_name if cad_file else "?" # Load render_position for rotation values rotation_x = rotation_y = rotation_z = 0.0 if line.render_position_id: from app.models.render_position import ProductRenderPosition rp = session.get(ProductRenderPosition, line.render_position_id) if rp: rotation_x, rotation_y, rotation_z = rp.rotation_x, rp.rotation_y, rp.rotation_z emit(order_line_id, f"Render position: '{rp.name}' ({rotation_x}°, {rotation_y}°, {rotation_z}°)") emit(order_line_id, f"Starting render for {cad_name} ({len(part_colors)} coloured parts)") # Determine if this is an animation output type is_animation = bool(line.output_type and getattr(line.output_type, 'is_animation', False)) # Determine output format/extension out_ext = "jpg" if line.output_type and line.output_type.output_format: fmt = line.output_type.output_format.lower() if fmt == "mp4": out_ext = "mp4" elif fmt in ("png", "jpg", "jpeg"): out_ext = "png" if fmt == "png" else "jpg" # Build meaningful output filename import re def _sanitize(s: str) -> str: return re.sub(r'[^\w\-.]', '_', s.strip())[:100] product_name = line.product.name or line.product.pim_id or "product" ot_name = line.output_type.name if line.output_type else "render" filename = f"{_sanitize(product_name)}_{_sanitize(ot_name)}.{out_ext}" # Render to per-line output directory from pathlib import Path as _Path render_dir = _Path(app_settings.upload_dir) / "renders" / order_line_id render_dir.mkdir(parents=True, exist_ok=True) output_path = str(render_dir / filename) # Extract per-output-type render settings render_width = None render_height = None render_engine = None render_samples = None noise_threshold = "" denoiser = "" denoising_input_passes = "" denoising_prefilter = "" denoising_quality = "" denoising_use_gpu = "" frame_count = 24 fps = 25 bg_color = "" turntable_axis = "world_z" if line.output_type and line.output_type.render_settings: rs = line.output_type.render_settings if rs.get("width"): render_width = int(rs["width"]) if rs.get("height"): render_height = int(rs["height"]) if rs.get("engine"): render_engine = rs["engine"] if rs.get("samples"): render_samples = int(rs["samples"]) if rs.get("frame_count"): frame_count = int(rs["frame_count"]) if rs.get("fps"): fps = int(rs["fps"]) bg_color = rs.get("bg_color", "") turntable_axis = rs.get("turntable_axis", "world_z") noise_threshold = str(rs.get("noise_threshold", "")) denoiser = str(rs.get("denoiser", "")) denoising_input_passes = str(rs.get("denoising_input_passes", "")) denoising_prefilter = str(rs.get("denoising_prefilter", "")) denoising_quality = str(rs.get("denoising_quality", "")) denoising_use_gpu = str(rs.get("denoising_use_gpu", "")) transparent_bg = bool(line.output_type and line.output_type.transparent_bg) cycles_device_val = (line.output_type.cycles_device or "auto") if line.output_type else "auto" # Build ordered part names list for index-based Blender matching part_names_ordered = None if cad_file and cad_file.parsed_objects: part_names_ordered = cad_file.parsed_objects.get("objects", []) or None tmpl_info = f" template={template.name}" if template else "" if is_animation: # ── Turntable animation path ──────────────────────────────── emit(order_line_id, f"Starting turntable render: {frame_count} frames @ {fps}fps, {render_width or 1920}x{render_height or 1920}{tmpl_info}") from app.services.render_blender import is_blender_available, render_turntable_to_file if not is_blender_available(): raise RuntimeError("Blender not available on this worker") from app.services.step_processor import _get_all_settings _sys = _get_all_settings() try: service_data = render_turntable_to_file( step_path=_Path(cad_file.stored_path), output_path=_Path(output_path), frame_count=frame_count, fps=fps, width=render_width or 1920, height=render_height or 1920, engine=render_engine or _sys.get("blender_engine", "cycles"), samples=render_samples or int(_sys.get(f"blender_{render_engine or _sys.get('blender_engine','cycles')}_samples", 128)), stl_quality=_sys.get("stl_quality", "low"), smooth_angle=int(_sys.get("blender_smooth_angle", 30)), cycles_device=cycles_device_val, transparent_bg=transparent_bg, bg_color=bg_color, turntable_axis=turntable_axis, part_colors=part_colors or None, template_path=template.blend_file_path if template else None, target_collection=template.target_collection if template else "Product", material_library_path=material_library if use_materials else None, material_map=material_map, part_names_ordered=part_names_ordered, lighting_only=bool(template.lighting_only) if template else False, shadow_catcher=bool(template.shadow_catcher_enabled) if template else False, rotation_x=rotation_x, rotation_y=rotation_y, rotation_z=rotation_z, ) success = True render_log = { "renderer": "blender", "type": "turntable", "format": "mp4", "engine": render_engine or _sys.get("blender_engine", "cycles"), "engine_used": service_data.get("engine_used", "cycles"), "samples": render_samples, "cycles_device": cycles_device_val, "width": render_width or 1920, "height": render_height or 1920, "frame_count": service_data.get("frame_count", frame_count), "fps": fps, "total_duration_s": service_data.get("total_duration_s"), "stl_duration_s": service_data.get("stl_duration_s"), "render_duration_s": service_data.get("render_duration_s"), "ffmpeg_duration_s": service_data.get("ffmpeg_duration_s"), "stl_size_bytes": service_data.get("stl_size_bytes"), "output_size_bytes": service_data.get("output_size_bytes"), "log_lines": service_data.get("log_lines", []), } if template: render_log["template"] = template.blend_file_path except Exception as exc: success = False render_log = {"renderer": "blender", "type": "turntable", "error": str(exc)[:500]} logger.error("Turntable render failed for %s: %s", order_line_id, exc) else: # ── Still image path ──────────────────────────────────────── emit(order_line_id, f"Calling renderer (STEP → STL → still) {render_width or 'default'}x{render_height or 'default'}{' [transparent]' if transparent_bg else ''}{f' engine={render_engine}' if render_engine else ''}{f' samples={render_samples}' if render_samples else ''}{tmpl_info}") from app.services.step_processor import render_to_file success, render_log = render_to_file( step_path=cad_file.stored_path, output_path=output_path, part_colors=part_colors, width=render_width, height=render_height, transparent_bg=transparent_bg, engine=render_engine, samples=render_samples, template_path=template.blend_file_path if template else None, target_collection=template.target_collection if template else "Product", material_library_path=material_library if use_materials else None, material_map=material_map, part_names_ordered=part_names_ordered, lighting_only=bool(template.lighting_only) if template else False, shadow_catcher=bool(template.shadow_catcher_enabled) if template else False, cycles_device=line.output_type.cycles_device if line.output_type else None, rotation_x=rotation_x, rotation_y=rotation_y, rotation_z=rotation_z, job_id=order_line_id, noise_threshold=noise_threshold, denoiser=denoiser, denoising_input_passes=denoising_input_passes, denoising_prefilter=denoising_prefilter, denoising_quality=denoising_quality, denoising_use_gpu=denoising_use_gpu, ) new_status = "completed" if success else "failed" render_end = datetime.utcnow() elapsed = (render_end - render_start).total_seconds() update_values = dict( render_status=new_status, render_completed_at=render_end, render_log=render_log, ) if success: update_values["result_path"] = output_path session.execute( sql_update(OrderLine) .where(OrderLine.id == line.id) .values(**update_values) ) session.commit() if success: emit(order_line_id, f"Render completed in {elapsed:.1f}s", "success") else: emit(order_line_id, f"Render failed after {elapsed:.1f}s", "error") # Broadcast WebSocket event for live UI updates try: from app.core.websocket import publish_event_sync _tenant_id = str(line.product.cad_file.tenant_id) if ( line.product and line.product.cad_file and line.product.cad_file.tenant_id ) else None if _tenant_id: publish_event_sync(_tenant_id, { "type": "render_complete" if success else "render_failed", "order_line_id": order_line_id, "order_id": str(line.order_id), "status": new_status, }) except Exception: logger.debug("WebSocket publish skipped (non-fatal)") # Notify order creator about render result try: from app.models.order import Order as OrderModel order_row = session.execute( select(OrderModel.created_by, OrderModel.order_number) .where(OrderModel.id == line.order_id) ).one_or_none() if order_row: from app.services.notification_service import emit_notification_sync details: dict = { "order_number": order_row[1], "product_name": product_name, "output_type": ot_name, } if not success and isinstance(render_log, dict): err = render_log.get("error") or render_log.get("stderr", "") if err: details["error"] = str(err)[:300] emit_notification_sync( actor_user_id=None, target_user_id=str(order_row[0]), action="render.completed" if success else "render.failed", entity_type="order", entity_id=str(line.order_id), details=details, ) except Exception: logger.exception("Failed to emit render notification") # Check if all lines for this order are done → auto-advance order_id_str = str(line.order_id) engine.dispose() from app.services.order_status_service import check_order_completion check_order_completion(order_id_str) except Exception as exc: logger.error(f"render_order_line_task failed for {order_line_id}: {exc}") # If retries exhausted, mark as failed so the line doesn't stay stuck if self.request.retries >= self.max_retries: logger.error(f"Max retries reached for {order_line_id}, marking as failed") try: from sqlalchemy import create_engine, update as sql_update2 from sqlalchemy.orm import Session as SyncSession from app.config import settings as app_settings from app.models.order_line import OrderLine as OL2 sync_url2 = app_settings.database_url.replace("+asyncpg", "") eng2 = create_engine(sync_url2) with SyncSession(eng2) as s2: from datetime import datetime as dt2 s2.execute( sql_update2(OL2).where(OL2.id == order_line_id) .values( render_status="failed", render_completed_at=dt2.utcnow(), render_log={"error": str(exc)[:500]}, ) ) s2.commit() eng2.dispose() from app.services.order_status_service import check_order_completion # Try to get order_id from DB eng3 = create_engine(sync_url2) with SyncSession(eng3) as s3: from sqlalchemy import select as sel row = s3.execute(sel(OL2.order_id).where(OL2.id == order_line_id)).scalar_one_or_none() if row: check_order_completion(str(row)) eng3.dispose() # Notify the order creator about the failure try: from sqlalchemy import select as sel2 from app.models.order import Order as OrderModel2 eng4 = create_engine(sync_url2) with SyncSession(eng4) as s4: order_row2 = s4.execute( sel2(OrderModel2.created_by, OrderModel2.order_number) .join(OL2, OL2.order_id == OrderModel2.id) .where(OL2.id == order_line_id) ).one_or_none() eng4.dispose() if order_row2: from app.services.notification_service import emit_notification_sync emit_notification_sync( actor_user_id=None, target_user_id=str(order_row2[0]), action="render.failed", entity_type="order", entity_id=None, details={ "order_number": order_row2[1], "product_name": "unknown", "output_type": "unknown", "error": str(exc)[:300], }, ) except Exception: logger.exception("Failed to emit render failure notification") except Exception: logger.exception(f"Failed to mark {order_line_id} as failed in DB") raise raise self.retry(exc=exc, countdown=60)