"""Product library API router.""" import hashlib import io import json import os import re import uuid import zipfile from pathlib import Path from fastapi import APIRouter, Depends, File, HTTPException, Query, UploadFile, status from fastapi.responses import StreamingResponse from pydantic import BaseModel from sqlalchemy import select, or_, text from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload, joinedload from app.config import settings from app.database import get_db from app.models.cad_file import CadFile, ProcessingStatus from app.models.material import Material from app.models.order import Order from app.models.order_line import OrderLine from app.models.output_type import OutputType from app.models.product import Product from app.models.render_position import ProductRenderPosition from app.models.system_setting import SystemSetting from app.schemas.order import OrderOut from app.schemas.product import ProductCreate, ProductOut, ProductPatch from app.schemas.render_position import RenderPositionCreate, RenderPositionPatch, RenderPositionOut from app.utils.auth import get_current_user, require_admin_or_pm from app.models.user import User router = APIRouter(prefix="/products", tags=["products"]) def _best_render_url(product: Product, priority: list[str]) -> str | None: """Walk the priority list and return the first available render URL. Each entry in priority is tried in order: "cad_thumbnail" — stop and return None (caller shows STEP thumbnail) "latest_render" — pick newest completed render regardless of output type — pick newest render of that specific output type Returns None if nothing is found (or "cad_thumbnail" is reached first). """ for source in priority: if source == "cad_thumbnail": return None # Signal to caller to show STEP thumbnail filter_ot_id: str | None = None if source == "latest_render" else source best = None best_time = None for line in product.order_lines: if line.render_status != "completed" or not line.result_path: continue if filter_ot_id is not None and str(line.output_type_id) != filter_ot_id: continue url = _result_path_to_url(line.result_path) if url and (best_time is None or (line.render_completed_at and line.render_completed_at > best_time)): disk = _resolve_disk_path(url) if disk and disk.exists(): best = url best_time = line.render_completed_at if best: return best # Found a match for this priority entry return None # Nothing found in the entire priority list def _product_out(product: Product, priority: list[str] | None = None) -> ProductOut: out = ProductOut.model_validate(product) out.thumbnail_url = product.thumbnail_url out.processing_status = product.processing_status out.cad_parsed_objects = product.cad_parsed_objects out.cad_mesh_attributes = product.cad_file.mesh_attributes if product.cad_file else None out.render_image_url = _best_render_url(product, priority or ["latest_render", "cad_thumbnail"]) out.stl_cached = _stl_cached_qualities(product) return out def _stl_cached_qualities(product: Product) -> list[str]: """Return list of STL qualities that are cached on disk for this product.""" from pathlib import Path as _Path cad = product.cad_file if not cad or not cad.stored_path: return [] step = _Path(cad.stored_path) return [q for q in ("low", "high") if (step.parent / f"{step.stem}_{q}.stl").exists()] async def _load_thumbnail_priority(db: AsyncSession) -> list[str]: """Read product_thumbnail_priority from system_settings. Falls back to ["latest_render", "cad_thumbnail"] (legacy behaviour). Also reads the old product_thumbnail_source key for backward compatibility. """ row = await db.execute( select(SystemSetting).where(SystemSetting.key == "product_thumbnail_priority") ) setting = row.scalar_one_or_none() if setting: try: parsed = json.loads(setting.value) if isinstance(parsed, list) and parsed: return parsed except (json.JSONDecodeError, TypeError): pass # Legacy fallback: read old product_thumbnail_source key legacy_row = await db.execute( select(SystemSetting).where(SystemSetting.key == "product_thumbnail_source") ) legacy = legacy_row.scalar_one_or_none() if legacy: src = legacy.value if src == "cad_thumbnail": return ["cad_thumbnail"] elif src == "latest_render": return ["latest_render", "cad_thumbnail"] else: return [src, "latest_render", "cad_thumbnail"] return ["latest_render", "cad_thumbnail"] @router.get("", response_model=list[ProductOut]) async def list_products( q: str = Query(""), category_key: str = Query(""), has_cad: bool | None = Query(None), ready_only: bool = Query(False), materials_filter: str = Query(""), # "complete" | "incomplete" | "" skip: int = Query(0, ge=0), limit: int = Query(50, ge=1, le=200), user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): stmt = ( select(Product) .options( selectinload(Product.cad_file), selectinload(Product.order_lines), selectinload(Product.render_positions), ) .where(Product.is_active.is_(True)) ) if q: pattern = f"%{q}%" stmt = stmt.where( or_(Product.pim_id.ilike(pattern), Product.name.ilike(pattern)) ) if category_key: stmt = stmt.where(Product.category_key == category_key) if ready_only: stmt = stmt.where(Product.cad_file_id.is_not(None)) elif has_cad is True: stmt = stmt.where(Product.cad_file_id.is_not(None)) elif has_cad is False: stmt = stmt.where(Product.cad_file_id.is_(None)) if materials_filter == "incomplete": # STEP processed, but cad_part_materials is empty or has at least one blank entry. stmt = stmt.join(CadFile, CadFile.id == Product.cad_file_id).where( CadFile.processing_status == ProcessingStatus.completed, text( "(" " jsonb_array_length(products.cad_part_materials) = 0" " OR EXISTS (" " SELECT 1 FROM jsonb_array_elements(products.cad_part_materials) AS m" " WHERE coalesce(m->>'material', '') = ''" " )" ")" ), ) elif materials_filter == "complete": # STEP processed, cad_part_materials non-empty, and every entry has a material. stmt = stmt.join(CadFile, CadFile.id == Product.cad_file_id).where( CadFile.processing_status == ProcessingStatus.completed, text( "(" " jsonb_array_length(products.cad_part_materials) > 0" " AND NOT EXISTS (" " SELECT 1 FROM jsonb_array_elements(products.cad_part_materials) AS m" " WHERE coalesce(m->>'material', '') = ''" " )" ")" ), ) stmt = stmt.order_by(Product.updated_at.desc()).offset(skip).limit(limit) result = await db.execute(stmt) products = result.scalars().all() priority = await _load_thumbnail_priority(db) return [_product_out(p, priority) for p in products] @router.post("", response_model=ProductOut, status_code=status.HTTP_201_CREATED) async def create_product( body: ProductCreate, user: User = Depends(require_admin_or_pm), db: AsyncSession = Depends(get_db), ): existing = await db.execute(select(Product).where(Product.pim_id == body.pim_id)) if existing.scalar_one_or_none(): raise HTTPException(409, detail=f"Product with pim_id '{body.pim_id}' already exists") from app.services.product_service import create_default_positions product = Product(**body.model_dump(), tenant_id=getattr(user, 'tenant_id', None)) db.add(product) await db.flush() await create_default_positions(db, product.id) await db.commit() result = await db.execute( select(Product) .options( selectinload(Product.cad_file), selectinload(Product.order_lines), selectinload(Product.render_positions), ) .where(Product.id == product.id) ) return _product_out(result.scalar_one()) @router.get("/{product_id}", response_model=ProductOut) async def get_product( product_id: uuid.UUID, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): result = await db.execute( select(Product) .options( selectinload(Product.cad_file), selectinload(Product.order_lines), selectinload(Product.render_positions), ) .where(Product.id == product_id) ) product = result.scalar_one_or_none() if not product: raise HTTPException(404, detail="Product not found") priority = await _load_thumbnail_priority(db) return _product_out(product, priority) @router.patch("/{product_id}", response_model=ProductOut) async def update_product( product_id: uuid.UUID, body: ProductPatch, user: User = Depends(require_admin_or_pm), db: AsyncSession = Depends(get_db), ): result = await db.execute( select(Product).options(selectinload(Product.cad_file)).where(Product.id == product_id) ) product = result.scalar_one_or_none() if not product: raise HTTPException(404, detail="Product not found") for field_name, value in body.model_dump(exclude_unset=True).items(): setattr(product, field_name, value) await db.commit() await db.refresh(product) return _product_out(product) @router.delete("/{product_id}", status_code=status.HTTP_204_NO_CONTENT) async def delete_product( product_id: uuid.UUID, hard: bool = Query(False, description="Hard delete (permanent) instead of soft delete"), user: User = Depends(require_admin_or_pm), db: AsyncSession = Depends(get_db), ): result = await db.execute(select(Product).where(Product.id == product_id)) product = result.scalar_one_or_none() if not product: raise HTTPException(404, detail="Product not found") if hard: from sqlalchemy import delete as sql_delete # Delete order_lines referencing this product await db.execute(sql_delete(OrderLine).where(OrderLine.product_id == product_id)) await db.delete(product) else: product.is_active = False await db.commit() @router.post("/{product_id}/cad", status_code=status.HTTP_201_CREATED) async def upload_product_cad( product_id: uuid.UUID, file: UploadFile = File(...), user: User = Depends(require_admin_or_pm), db: AsyncSession = Depends(get_db), ): """Upload or replace the STEP file for a product.""" suffix = Path(file.filename or "").suffix.lower() if suffix not in {".stp", ".step"}: raise HTTPException(400, detail="Only .stp / .step files are accepted") result = await db.execute( select(Product).options(selectinload(Product.cad_file)).where(Product.id == product_id) ) product = result.scalar_one_or_none() if not product: raise HTTPException(404, detail="Product not found") content = await file.read() file_hash = hashlib.sha256(content).hexdigest() # Dedup by hash existing_cad = await db.execute(select(CadFile).where(CadFile.file_hash == file_hash)) cad_file = existing_cad.scalar_one_or_none() if cad_file is None: step_dir = Path(settings.upload_dir) / "step_files" step_dir.mkdir(parents=True, exist_ok=True) stored_name = f"{uuid.uuid4()}{suffix}" stored_path = step_dir / stored_name stored_path.write_bytes(content) cad_file = CadFile( original_name=file.filename, stored_path=str(stored_path), file_hash=file_hash, file_size=len(content), processing_status=ProcessingStatus.pending, ) db.add(cad_file) await db.commit() await db.refresh(cad_file) try: from app.tasks.step_tasks import process_step_file process_step_file.delay(str(cad_file.id)) except Exception: pass # Link to product from app.services.product_service import link_cad_to_product product = await link_cad_to_product(db, product_id, cad_file.id) return { "cad_file_id": str(cad_file.id), "original_name": cad_file.original_name, "file_hash": file_hash, "status": "uploaded" if cad_file.processing_status == ProcessingStatus.pending else "already_exists", "product_id": str(product_id), } @router.post("/{product_id}/cad-materials", response_model=ProductOut) async def save_product_cad_materials( product_id: uuid.UUID, body: dict, user: User = Depends(require_admin_or_pm), db: AsyncSession = Depends(get_db), ): """Save cad_part_materials and enqueue thumbnail regeneration.""" result = await db.execute( select(Product) .options( selectinload(Product.cad_file), selectinload(Product.order_lines), selectinload(Product.render_positions), ) .where(Product.id == product_id) ) product = result.scalar_one_or_none() if not product: raise HTTPException(404, detail="Product not found") parts = body.get("parts", []) product.cad_part_materials = parts # Auto-add new material names to the materials library material_names = {p["material"].strip() for p in parts if p.get("material", "").strip()} if material_names: existing = await db.execute( select(Material).where( or_(*[Material.name.ilike(name) for name in material_names]) ) ) existing_names = {m.name.lower() for m in existing.scalars().all()} for name in material_names: if name.lower() not in existing_names: db.add(Material(name=name, source="product_assign", created_by=user.id)) await db.commit() if product.cad_file_id: try: from app.services.step_processor import build_part_colors from app.tasks.step_tasks import regenerate_thumbnail parsed_objects = product.cad_parsed_objects or [] part_colors = build_part_colors(parsed_objects, parts) regenerate_thumbnail.delay(str(product.cad_file_id), part_colors) except Exception: pass # Re-fetch with all relationships for _product_out result2 = await db.execute( select(Product) .options( selectinload(Product.cad_file), selectinload(Product.order_lines), selectinload(Product.render_positions), ) .where(Product.id == product_id) ) return _product_out(result2.scalar_one()) @router.post("/{product_id}/regenerate", status_code=status.HTTP_202_ACCEPTED) async def regenerate_product_thumbnail( product_id: uuid.UUID, user: User = Depends(require_admin_or_pm), db: AsyncSession = Depends(get_db), ): """Re-queue thumbnail generation with current part_colors.""" result = await db.execute( select(Product).options(selectinload(Product.cad_file)).where(Product.id == product_id) ) product = result.scalar_one_or_none() if not product: raise HTTPException(404, detail="Product not found") if not product.cad_file_id: raise HTTPException(400, detail="Product has no CAD file") try: from app.services.step_processor import build_part_colors from app.tasks.step_tasks import regenerate_thumbnail parsed_objects = product.cad_parsed_objects or [] part_colors = build_part_colors(parsed_objects, product.cad_part_materials or []) task = regenerate_thumbnail.delay(str(product.cad_file_id), part_colors) return {"status": "queued", "task_id": str(task.id)} except Exception as exc: raise HTTPException(500, detail=f"Failed to enqueue: {exc}") def _normalize_part_token_name(name: str) -> str: """Lowercase, strip .prt extension, normalise separators to underscore.""" import re as _re name = name.lower().strip() if name.endswith(".prt"): name = name[:-4] # Hyphens and dots → underscores for uniform token splitting return _re.sub(r"[-.]", "_", name) def _part_tokens(name: str) -> set[str]: """Return significant tokens: length ≥ 2, not pure-numeric, contains a letter.""" return { t for t in name.split("_") if len(t) >= 2 and not t.isdigit() and any(c.isalpha() for c in t) } def _jaccard(a: set, b: set) -> float: if not a or not b: return 0.0 return len(a & b) / len(a | b) def build_materials_from_excel( cad_parts: list[str], excel_components: list[dict], similarity_threshold: float = 0.3, ) -> list[dict]: """Match CAD part names to Excel components and return cad_part_materials list. Pure function — no DB access, sync-safe, callable from Celery tasks. Matching strategy per CAD part: 1. Exact case-insensitive name match 2. Token-based Jaccard similarity on normalised filenames 3. Position-based fallback for low-confidence matches """ excel_entries: list[tuple[set[str], str, str]] = [] for c in excel_components: raw = (c.get("part_name") or "").lower().strip() norm = _normalize_part_token_name(raw) tokens = _part_tokens(norm) excel_entries.append((tokens, raw, c.get("material") or "")) new_materials: list[dict] = [] for i, cad_part in enumerate(cad_parts): cad_raw_lower = cad_part.lower() cad_norm = _normalize_part_token_name(cad_raw_lower) cad_tokens = _part_tokens(cad_norm) best_mat = "" best_score = 0.0 for tokens, raw, material in excel_entries: if raw == cad_raw_lower: best_mat = material best_score = 1.0 break score = _jaccard(tokens, cad_tokens) if score > best_score: best_score = score best_mat = material if best_score < similarity_threshold: if i < len(excel_components): best_mat = excel_components[i].get("material") or "" new_materials.append({"part_name": cad_part, "material": best_mat}) return new_materials @router.post("/{product_id}/reassign-materials-from-excel", response_model=ProductOut) async def reassign_materials_from_excel( product_id: uuid.UUID, user: User = Depends(require_admin_or_pm), db: AsyncSession = Depends(get_db), ): """Populate cad_part_materials from Excel component data stored on this product. Matching strategy (applied per CAD part in order): 1. Exact case-insensitive name match (for generic semantic names like "Außenring") 2. Token-based Jaccard similarity on normalised part filenames: Excel stores the .prt filename; OCC extracts assembly instance names derived from the same file. Stripping extensions, separators and numeric-only tokens lets them be compared reliably (e.g. "z-563681_krk_tr_jpb_dummy-90771.prt" ↔ "Z-563681_KRK_JPB_DUMMY_1_AF0_1" → Jaccard ≈ 0.6). 3. Position-based fallback for low-confidence matches. After this the Part Materials UI shows pre-filled materials that can be reviewed/adjusted before saving. Thumbnail regeneration is queued automatically. """ result = await db.execute( select(Product) .options(selectinload(Product.cad_file), selectinload(Product.order_lines)) .where(Product.id == product_id) ) product = result.scalar_one_or_none() if not product: raise HTTPException(404, detail="Product not found") cad_parts: list[str] = product.cad_parsed_objects or [] if not cad_parts: raise HTTPException( 400, detail="No parsed CAD parts found. Use 'Re-process STEP' first to extract part names.", ) excel_components: list[dict] = product.components or [] if not excel_components: raise HTTPException( 400, detail="No Excel component data found on this product. Was it imported from an Excel file?", ) new_materials = build_materials_from_excel(cad_parts, excel_components) product.cad_part_materials = new_materials await db.commit() if product.cad_file_id: try: from app.services.step_processor import build_part_colors from app.tasks.step_tasks import regenerate_thumbnail part_colors = build_part_colors(cad_parts, new_materials) regenerate_thumbnail.delay(str(product.cad_file_id), part_colors) except Exception: pass result2 = await db.execute( select(Product) .options( selectinload(Product.cad_file), selectinload(Product.order_lines), selectinload(Product.render_positions), ) .where(Product.id == product_id) ) return _product_out(result2.scalar_one()) @router.post("/{product_id}/reprocess", status_code=status.HTTP_202_ACCEPTED) async def reprocess_product_cad( product_id: uuid.UUID, user: User = Depends(require_admin_or_pm), db: AsyncSession = Depends(get_db), ): """Re-queue full STEP processing (parse objects + generate thumbnail) for a product.""" result = await db.execute( select(Product).options(selectinload(Product.cad_file)).where(Product.id == product_id) ) product = result.scalar_one_or_none() if not product: raise HTTPException(404, detail="Product not found") if not product.cad_file_id: raise HTTPException(400, detail="Product has no CAD file") try: from app.models.cad_file import ProcessingStatus as PS from sqlalchemy import update as sql_update await db.execute( sql_update(CadFile) .where(CadFile.id == product.cad_file_id) .values(processing_status=PS.pending, parsed_objects=None) ) await db.commit() from app.tasks.step_tasks import process_step_file task = process_step_file.delay(str(product.cad_file_id)) return {"status": "queued", "task_id": str(task.id)} except Exception as exc: raise HTTPException(500, detail=f"Failed to enqueue: {exc}") VIDEO_EXTENSIONS = {".mp4", ".webm", ".avi", ".mov"} def _result_path_to_url(result_path: str) -> str | None: """Convert an internal result_path to a servable static URL.""" # Flamenco / shared renders: /shared/renders/X/file.jpg → /renders/X/file.jpg if "/renders/" in result_path: idx = result_path.index("/renders/") return result_path[idx:] # Celery renders stored as thumbnails: /app/uploads/thumbnails/X.png → /thumbnails/X.png if "/thumbnails/" in result_path: idx = result_path.index("/thumbnails/") return result_path[idx:] return None def _resolve_disk_path(url: str) -> Path | None: """Given a servable URL like /renders/X/file.jpg, resolve to disk path.""" if url.startswith("/renders/"): return Path(settings.upload_dir) / "renders" / url[len("/renders/"):] if url.startswith("/thumbnails/"): return Path(settings.upload_dir) / "thumbnails" / url[len("/thumbnails/"):] return None @router.get("/{product_id}/renders") async def get_product_renders( product_id: uuid.UUID, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """List completed render outputs for a product.""" result = await db.execute( select(OrderLine) .options( joinedload(OrderLine.output_type), joinedload(OrderLine.order), ) .where( OrderLine.product_id == product_id, OrderLine.render_status == "completed", OrderLine.result_path.is_not(None), ) .order_by(OrderLine.render_completed_at.desc()) ) lines = result.unique().scalars().all() renders = [] for line in lines: url = _result_path_to_url(line.result_path) if url is None: continue disk = _resolve_disk_path(url) if disk is None or not disk.exists(): continue ext = Path(url).suffix.lower() renders.append({ "order_line_id": str(line.id), "order_number": line.order.order_number if line.order else None, "output_type_name": line.output_type.name if line.output_type else None, "render_url": url, "is_video": ext in VIDEO_EXTENSIONS, "render_backend": line.render_backend_used, "completed_at": line.render_completed_at.isoformat() if line.render_completed_at else None, }) return renders @router.delete("/{product_id}/renders/{order_line_id}", status_code=204) async def delete_product_render( product_id: uuid.UUID, order_line_id: uuid.UUID, user: User = Depends(require_admin_or_pm), db: AsyncSession = Depends(get_db), ): """Delete a render output for a product. Removes the file from disk, clears result_path, and resets render_status to 'pending' so the line can be re-dispatched if needed. """ from sqlalchemy import update as sql_update result = await db.execute( select(OrderLine).where( OrderLine.id == order_line_id, OrderLine.product_id == product_id, ) ) line = result.scalar_one_or_none() if line is None: raise HTTPException(404, detail="Render not found for this product") # Delete file from disk if line.result_path: url = _result_path_to_url(line.result_path) if url: disk = _resolve_disk_path(url) if disk and disk.exists(): try: disk.unlink() except OSError as exc: # Log but don't fail — DB cleanup still proceeds import logging logging.getLogger(__name__).warning( f"Could not delete render file {disk}: {exc}" ) await db.execute( sql_update(OrderLine) .where(OrderLine.id == order_line_id) .values(result_path=None, render_status="pending", render_completed_at=None) ) await db.commit() class DownloadRendersRequest(BaseModel): order_line_ids: list[uuid.UUID] @router.post("/{product_id}/download-renders") async def download_product_renders( product_id: uuid.UUID, body: DownloadRendersRequest, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """Stream a ZIP of selected completed render files for a product.""" prod_result = await db.execute(select(Product).where(Product.id == product_id)) product = prod_result.scalar_one_or_none() if not product: raise HTTPException(404, detail="Product not found") lines_result = await db.execute( select(OrderLine) .options( joinedload(OrderLine.output_type), joinedload(OrderLine.order), ) .where( OrderLine.id.in_(body.order_line_ids), OrderLine.product_id == product_id, OrderLine.render_status == "completed", OrderLine.result_path.is_not(None), ) ) lines = lines_result.unique().scalars().all() if not lines: raise HTTPException(404, detail="No completed renders found for the selected lines") def _resolve_path(p: str) -> str: if p.startswith("/shared/"): return settings.upload_dir + p[len("/shared"):] return p def _safe(s: str) -> str: return re.sub(r"[^\w\-.]", "_", s).strip("_") buf = io.BytesIO() name_counts: dict[str, int] = {} with zipfile.ZipFile(buf, mode="w", compression=zipfile.ZIP_DEFLATED) as zf: for line in lines: if not line.result_path: continue fs_path = _resolve_path(line.result_path) if not os.path.isfile(fs_path): continue ot_name = (line.output_type.name if line.output_type else None) or "render" order_num = (line.order.order_number if line.order else None) or "unknown" ext = os.path.splitext(line.result_path)[1] or ".png" base_name = f"{_safe(ot_name)}_{_safe(order_num)}{ext}" if base_name in name_counts: name_counts[base_name] += 1 stem, suffix = os.path.splitext(base_name) archive_name = f"{stem}_{name_counts[base_name]}{suffix}" else: name_counts[base_name] = 0 archive_name = base_name zf.write(fs_path, archive_name) if not zf.infolist(): raise HTTPException(404, detail="No render files found on disk") buf.seek(0) product_name = product.name or product.pim_id or "product" safe_name = re.sub(r"[^\w\-]", "_", product_name) filename = f"{safe_name}_renders.zip" return StreamingResponse( buf, media_type="application/zip", headers={"Content-Disposition": f'attachment; filename="{filename}"'}, ) @router.get("/{product_id}/orders", response_model=list[OrderOut]) async def get_product_orders( product_id: uuid.UUID, user: User = Depends(require_admin_or_pm), db: AsyncSession = Depends(get_db), ): """List orders that reference this product via order_lines.""" from app.models.order import Order from sqlalchemy import func result = await db.execute( select(Order) .join(OrderLine, OrderLine.order_id == Order.id) .where(OrderLine.product_id == product_id) .distinct() .order_by(Order.created_at.desc()) ) orders = result.scalars().all() out = [] for order in orders: d = OrderOut.model_validate(order) cnt = await db.execute( select(func.count(OrderLine.id)).where(OrderLine.order_id == order.id) ) d.line_count = cnt.scalar() or 0 out.append(d) return out # ── Render Positions CRUD ──────────────────────────────────────────────────── @router.get("/{product_id}/render-positions", response_model=list[RenderPositionOut]) async def list_render_positions( product_id: uuid.UUID, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): result = await db.execute( select(ProductRenderPosition) .where(ProductRenderPosition.product_id == product_id) .order_by(ProductRenderPosition.sort_order, ProductRenderPosition.name) ) return result.scalars().all() @router.post( "/{product_id}/render-positions", response_model=RenderPositionOut, status_code=status.HTTP_201_CREATED, ) async def create_render_position( product_id: uuid.UUID, body: RenderPositionCreate, user: User = Depends(require_admin_or_pm), db: AsyncSession = Depends(get_db), ): prod = await db.execute(select(Product).where(Product.id == product_id)) if not prod.scalar_one_or_none(): raise HTTPException(404, detail="Product not found") pos = ProductRenderPosition(product_id=product_id, **body.model_dump()) db.add(pos) try: await db.commit() except Exception: await db.rollback() raise HTTPException(409, detail=f"Position named '{body.name}' already exists for this product") await db.refresh(pos) return pos @router.patch("/{product_id}/render-positions/{pos_id}", response_model=RenderPositionOut) async def update_render_position( product_id: uuid.UUID, pos_id: uuid.UUID, body: RenderPositionPatch, user: User = Depends(require_admin_or_pm), db: AsyncSession = Depends(get_db), ): result = await db.execute( select(ProductRenderPosition).where( ProductRenderPosition.id == pos_id, ProductRenderPosition.product_id == product_id, ) ) pos = result.scalar_one_or_none() if not pos: raise HTTPException(404, detail="Render position not found") for field, value in body.model_dump(exclude_unset=True).items(): setattr(pos, field, value) try: await db.commit() except Exception: await db.rollback() raise HTTPException(409, detail="Name already exists for this product") await db.refresh(pos) return pos @router.delete("/{product_id}/render-positions/{pos_id}", status_code=status.HTTP_204_NO_CONTENT) async def delete_render_position( product_id: uuid.UUID, pos_id: uuid.UUID, user: User = Depends(require_admin_or_pm), db: AsyncSession = Depends(get_db), ): result = await db.execute( select(ProductRenderPosition).where( ProductRenderPosition.id == pos_id, ProductRenderPosition.product_id == product_id, ) ) pos = result.scalar_one_or_none() if not pos: raise HTTPException(404, detail="Render position not found") await db.delete(pos) await db.commit()