Files
HartOMat/backend/app/api/routers/products.py
T
2026-03-05 22:12:38 +01:00

932 lines
33 KiB
Python

"""Product library API router."""
import hashlib
import io
import json
import os
import re
import uuid
import zipfile
from pathlib import Path
from fastapi import APIRouter, Depends, File, HTTPException, Query, UploadFile, status
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from sqlalchemy import select, or_, text
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload, joinedload
from app.config import settings
from app.database import get_db
from app.models.cad_file import CadFile, ProcessingStatus
from app.models.material import Material
from app.models.order import Order
from app.models.order_line import OrderLine
from app.models.output_type import OutputType
from app.models.product import Product
from app.models.render_position import ProductRenderPosition
from app.models.system_setting import SystemSetting
from app.schemas.order import OrderOut
from app.schemas.product import ProductCreate, ProductOut, ProductPatch
from app.schemas.render_position import RenderPositionCreate, RenderPositionPatch, RenderPositionOut
from app.utils.auth import get_current_user, require_admin_or_pm
from app.models.user import User
# Router collecting every endpoint in this module; all routes are served under
# the "/products" prefix and grouped under the "products" tag.
router = APIRouter(prefix="/products", tags=["products"])
def _best_render_url(product: Product, priority: list[str]) -> str | None:
    """Return the render URL selected by the first satisfiable priority entry.

    Entries are evaluated in list order:
      * ``"cad_thumbnail"`` ends the search immediately with ``None`` so the
        caller falls back to the STEP thumbnail.
      * ``"latest_render"`` considers completed renders of any output type.
      * any other value is compared against ``str(line.output_type_id)`` and
        only lines of that output type are considered.

    Within one entry the newest render (by ``render_completed_at``) whose
    file exists on disk wins. ``None`` is returned when no entry produces a
    result.
    """
    for entry in priority:
        if entry == "cad_thumbnail":
            # Explicit request for the STEP thumbnail: stop searching.
            return None

        wanted_output_type: str | None = None if entry == "latest_render" else entry
        chosen_url = None
        chosen_at = None

        for order_line in product.order_lines:
            if order_line.render_status != "completed" or not order_line.result_path:
                continue
            if wanted_output_type is not None and str(order_line.output_type_id) != wanted_output_type:
                continue

            candidate = _result_path_to_url(order_line.result_path)
            if not candidate:
                continue

            completed_at = order_line.render_completed_at
            is_newer = chosen_at is None or (completed_at and completed_at > chosen_at)
            if not is_newer:
                continue

            # Only accept renders whose file is still present on disk.
            on_disk = _resolve_disk_path(candidate)
            if on_disk and on_disk.exists():
                chosen_url = candidate
                chosen_at = completed_at

        if chosen_url:
            return chosen_url

    return None
def _product_out(product: Product, priority: list[str] | None = None) -> ProductOut:
    """Build the API representation of ``product`` including derived fields.

    ``priority`` is the thumbnail source priority list; when it is ``None``
    or empty, ``["latest_render", "cad_thumbnail"]`` is used.
    """
    effective_priority = priority if priority else ["latest_render", "cad_thumbnail"]

    payload = ProductOut.model_validate(product)
    # These values are copied explicitly from the ORM object onto the schema.
    payload.thumbnail_url = product.thumbnail_url
    payload.processing_status = product.processing_status
    payload.cad_parsed_objects = product.cad_parsed_objects
    payload.render_image_url = _best_render_url(product, effective_priority)
    payload.stl_cached = _stl_cached_qualities(product)
    return payload
def _stl_cached_qualities(product: Product) -> list[str]:
    """List the STL qualities ("low", "high") already present on disk.

    A quality counts as cached when ``<stem>_<quality>.stl`` exists next to
    the product's stored STEP file. Returns an empty list when the product
    has no CAD file or the CAD file has no stored path.
    """
    cad_file = product.cad_file
    stored = cad_file.stored_path if cad_file else None
    if not stored:
        return []

    step_path = Path(stored)
    cached: list[str] = []
    for quality in ("low", "high"):
        stl_path = step_path.parent / f"{step_path.stem}_{quality}.stl"
        if stl_path.exists():
            cached.append(quality)
    return cached
async def _load_thumbnail_priority(db: AsyncSession) -> list[str]:
    """Load the thumbnail source priority list from system settings.

    Lookup order:
      1. ``product_thumbnail_priority`` — used when its value is JSON that
         decodes to a non-empty list.
      2. ``product_thumbnail_source`` (legacy single-value key), translated
         into an equivalent priority list.
      3. ``["latest_render", "cad_thumbnail"]`` when neither key applies.
    """
    default_priority = ["latest_render", "cad_thumbnail"]

    priority_setting = (
        await db.execute(
            select(SystemSetting).where(SystemSetting.key == "product_thumbnail_priority")
        )
    ).scalar_one_or_none()
    if priority_setting:
        try:
            decoded = json.loads(priority_setting.value)
        except (json.JSONDecodeError, TypeError):
            # Unparseable value: fall through to the legacy key.
            decoded = None
        if isinstance(decoded, list) and decoded:
            return decoded

    # Legacy fallback: read old product_thumbnail_source key
    legacy_setting = (
        await db.execute(
            select(SystemSetting).where(SystemSetting.key == "product_thumbnail_source")
        )
    ).scalar_one_or_none()
    if not legacy_setting:
        return default_priority

    legacy_source = legacy_setting.value
    if legacy_source == "cad_thumbnail":
        return ["cad_thumbnail"]
    if legacy_source == "latest_render":
        return default_priority
    # Any other value is used as an output-type id, tried before the defaults.
    return [legacy_source, "latest_render", "cad_thumbnail"]
@router.get("", response_model=list[ProductOut])
async def list_products(
    q: str = Query(""),
    category_key: str = Query(""),
    has_cad: bool | None = Query(None),
    ready_only: bool = Query(False),
    materials_filter: str = Query(""),  # "complete" | "incomplete" | ""
    skip: int = Query(0, ge=0),
    limit: int = Query(50, ge=1, le=200),
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """List active products, newest update first, with optional filters.

    Filters:
      * ``q`` — case-insensitive substring match on ``pim_id`` or ``name``.
      * ``category_key`` — exact category match.
      * ``ready_only`` / ``has_cad`` — require (or, for ``has_cad=false``,
        exclude) a linked CAD file; ``ready_only`` takes precedence.
      * ``materials_filter`` — ``"complete"`` or ``"incomplete"``; both
        restrict to products whose CAD file finished processing.
    """
    query = (
        select(Product)
        .options(
            selectinload(Product.cad_file),
            selectinload(Product.order_lines),
            selectinload(Product.render_positions),
        )
        .where(Product.is_active.is_(True))
    )

    if q:
        like = f"%{q}%"
        query = query.where(or_(Product.pim_id.ilike(like), Product.name.ilike(like)))

    if category_key:
        query = query.where(Product.category_key == category_key)

    if ready_only or has_cad is True:
        query = query.where(Product.cad_file_id.is_not(None))
    elif has_cad is False:
        query = query.where(Product.cad_file_id.is_(None))

    if materials_filter in ("incomplete", "complete"):
        if materials_filter == "incomplete":
            # STEP processed, but cad_part_materials is empty or has at least one blank entry.
            materials_clause = text(
                "("
                "  jsonb_array_length(products.cad_part_materials) = 0"
                "  OR EXISTS ("
                "    SELECT 1 FROM jsonb_array_elements(products.cad_part_materials) AS m"
                "    WHERE coalesce(m->>'material', '') = ''"
                "  )"
                ")"
            )
        else:
            # STEP processed, cad_part_materials non-empty, and every entry has a material.
            materials_clause = text(
                "("
                "  jsonb_array_length(products.cad_part_materials) > 0"
                "  AND NOT EXISTS ("
                "    SELECT 1 FROM jsonb_array_elements(products.cad_part_materials) AS m"
                "    WHERE coalesce(m->>'material', '') = ''"
                "  )"
                ")"
            )
        query = query.join(CadFile, CadFile.id == Product.cad_file_id).where(
            CadFile.processing_status == ProcessingStatus.completed,
            materials_clause,
        )

    query = query.order_by(Product.updated_at.desc()).offset(skip).limit(limit)
    rows = (await db.execute(query)).scalars().all()
    priority = await _load_thumbnail_priority(db)
    return [_product_out(row, priority) for row in rows]
@router.post("", response_model=ProductOut, status_code=status.HTTP_201_CREATED)
async def create_product(
    body: ProductCreate,
    user: User = Depends(require_admin_or_pm),
    db: AsyncSession = Depends(get_db),
):
    """Create a product together with its default render positions.

    Raises:
        HTTPException 409: a product with the same ``pim_id`` already exists.
    """
    duplicate = (
        await db.execute(select(Product).where(Product.pim_id == body.pim_id))
    ).scalar_one_or_none()
    if duplicate:
        raise HTTPException(409, detail=f"Product with pim_id '{body.pim_id}' already exists")

    from app.services.product_service import create_default_positions

    new_product = Product(**body.model_dump())
    db.add(new_product)
    # Flush so the product id is available before creating its positions.
    await db.flush()
    await create_default_positions(db, new_product.id)
    await db.commit()

    # Re-load with the relationships needed to build the response.
    reload_stmt = (
        select(Product)
        .options(
            selectinload(Product.cad_file),
            selectinload(Product.order_lines),
            selectinload(Product.render_positions),
        )
        .where(Product.id == new_product.id)
    )
    reloaded = await db.execute(reload_stmt)
    return _product_out(reloaded.scalar_one())
@router.get("/{product_id}", response_model=ProductOut)
async def get_product(
    product_id: uuid.UUID,
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Return a single product by id.

    Raises:
        HTTPException 404: no product with this id exists.
    """
    lookup = (
        select(Product)
        .options(
            selectinload(Product.cad_file),
            selectinload(Product.order_lines),
            selectinload(Product.render_positions),
        )
        .where(Product.id == product_id)
    )
    found = (await db.execute(lookup)).scalar_one_or_none()
    if not found:
        raise HTTPException(404, detail="Product not found")

    thumbnail_priority = await _load_thumbnail_priority(db)
    return _product_out(found, thumbnail_priority)
@router.patch("/{product_id}", response_model=ProductOut)
async def update_product(
    product_id: uuid.UUID,
    body: ProductPatch,
    user: User = Depends(require_admin_or_pm),
    db: AsyncSession = Depends(get_db),
):
    """Apply a partial update to a product and return the updated product.

    Only fields explicitly present in the request body are written
    (``exclude_unset=True``).

    Raises:
        HTTPException 404: no product with this id exists.
    """
    result = await db.execute(
        select(Product).options(selectinload(Product.cad_file)).where(Product.id == product_id)
    )
    product = result.scalar_one_or_none()
    if not product:
        raise HTTPException(404, detail="Product not found")

    for field_name, value in body.model_dump(exclude_unset=True).items():
        setattr(product, field_name, value)
    await db.commit()
    await db.refresh(product)

    # Re-fetch with every relationship the response builder reads
    # (cad_file, order_lines, render_positions). Without this, building the
    # response can trigger an implicit lazy load, which an AsyncSession does
    # not support. The other mutating endpoints in this module re-fetch the
    # same way before calling _product_out.
    reloaded = await db.execute(
        select(Product)
        .options(
            selectinload(Product.cad_file),
            selectinload(Product.order_lines),
            selectinload(Product.render_positions),
        )
        .where(Product.id == product_id)
    )
    return _product_out(reloaded.scalar_one())
@router.delete("/{product_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_product(
    product_id: uuid.UUID,
    hard: bool = Query(False, description="Hard delete (permanent) instead of soft delete"),
    user: User = Depends(require_admin_or_pm),
    db: AsyncSession = Depends(get_db),
):
    """Delete a product.

    By default the product is only deactivated (``is_active = False``).
    With ``hard=true`` the order lines referencing the product are deleted
    first and then the product row itself is removed.

    Raises:
        HTTPException 404: no product with this id exists.
    """
    target = (
        await db.execute(select(Product).where(Product.id == product_id))
    ).scalar_one_or_none()
    if not target:
        raise HTTPException(404, detail="Product not found")

    if not hard:
        target.is_active = False
    else:
        from sqlalchemy import delete as sql_delete

        # Delete order_lines referencing this product
        await db.execute(sql_delete(OrderLine).where(OrderLine.product_id == product_id))
        await db.delete(target)

    await db.commit()
@router.post("/{product_id}/cad", status_code=status.HTTP_201_CREATED)
async def upload_product_cad(
    product_id: uuid.UUID,
    file: UploadFile = File(...),
    user: User = Depends(require_admin_or_pm),
    db: AsyncSession = Depends(get_db),
):
    """Upload or replace the STEP file for a product.

    The upload is deduplicated by SHA-256 hash: when a CadFile with the same
    hash already exists it is reused, otherwise the file is stored under
    ``<upload_dir>/step_files`` and STEP processing is enqueued. In both
    cases the CAD file is then linked to the product.

    Returns:
        A dict with ``cad_file_id``, ``original_name``, ``file_hash``,
        ``status`` (``"uploaded"`` for a newly stored file,
        ``"already_exists"`` for a deduplicated one) and ``product_id``.

    Raises:
        HTTPException 400: the file extension is not ``.stp`` / ``.step``.
        HTTPException 404: no product with this id exists.
    """
    import logging

    suffix = Path(file.filename or "").suffix.lower()
    if suffix not in {".stp", ".step"}:
        raise HTTPException(400, detail="Only .stp / .step files are accepted")

    result = await db.execute(
        select(Product).options(selectinload(Product.cad_file)).where(Product.id == product_id)
    )
    product = result.scalar_one_or_none()
    if not product:
        raise HTTPException(404, detail="Product not found")

    content = await file.read()
    file_hash = hashlib.sha256(content).hexdigest()

    # Dedup by hash
    existing_cad = await db.execute(select(CadFile).where(CadFile.file_hash == file_hash))
    cad_file = existing_cad.scalar_one_or_none()

    # Remember whether this request stored the file. Deriving the response
    # status from processing_status would misreport a reused file that is
    # still pending as a fresh upload.
    is_new_upload = cad_file is None
    if is_new_upload:
        step_dir = Path(settings.upload_dir) / "step_files"
        step_dir.mkdir(parents=True, exist_ok=True)
        stored_name = f"{uuid.uuid4()}{suffix}"
        stored_path = step_dir / stored_name
        stored_path.write_bytes(content)
        cad_file = CadFile(
            original_name=file.filename,
            stored_path=str(stored_path),
            file_hash=file_hash,
            file_size=len(content),
            processing_status=ProcessingStatus.pending,
        )
        db.add(cad_file)
        await db.commit()
        await db.refresh(cad_file)
        try:
            from app.tasks.step_tasks import process_step_file

            process_step_file.delay(str(cad_file.id))
        except Exception as exc:
            # Best effort: the upload itself succeeded, so do not fail the
            # request — but leave a trace instead of swallowing the error.
            logging.getLogger(__name__).warning(
                "Could not enqueue STEP processing for CAD file %s: %s", cad_file.id, exc
            )

    # Link to product
    from app.services.product_service import link_cad_to_product

    product = await link_cad_to_product(db, product_id, cad_file.id)
    return {
        "cad_file_id": str(cad_file.id),
        "original_name": cad_file.original_name,
        "file_hash": file_hash,
        "status": "uploaded" if is_new_upload else "already_exists",
        "product_id": str(product_id),
    }
@router.post("/{product_id}/cad-materials", response_model=ProductOut)
async def save_product_cad_materials(
    product_id: uuid.UUID,
    body: dict,
    user: User = Depends(require_admin_or_pm),
    db: AsyncSession = Depends(get_db),
):
    """Save cad_part_materials and enqueue thumbnail regeneration.

    ``body["parts"]`` is stored as the product's ``cad_part_materials``.
    Every non-blank material name not yet present in the materials library
    (compared case-insensitively) is added to it. When the product has a
    CAD file, thumbnail regeneration is enqueued on a best-effort basis.

    Raises:
        HTTPException 400: ``parts`` is not a list of objects.
        HTTPException 404: no product with this id exists.
    """
    import logging

    result = await db.execute(
        select(Product)
        .options(
            selectinload(Product.cad_file),
            selectinload(Product.order_lines),
            selectinload(Product.render_positions),
        )
        .where(Product.id == product_id)
    )
    product = result.scalar_one_or_none()
    if not product:
        raise HTTPException(404, detail="Product not found")

    parts = body.get("parts", [])
    # The body is an unvalidated dict; reject shapes that would otherwise
    # fail further down with an unhandled error.
    if not isinstance(parts, list) or not all(isinstance(p, dict) for p in parts):
        raise HTTPException(400, detail="'parts' must be a list of objects")
    product.cad_part_materials = parts

    # Auto-add new material names to the materials library
    material_names: set[str] = set()
    for part in parts:
        material = part.get("material")
        # A missing, null or non-string material is treated as "not assigned".
        if isinstance(material, str) and material.strip():
            material_names.add(material.strip())

    if material_names:
        existing = await db.execute(
            select(Material).where(
                or_(*[Material.name.ilike(name) for name in material_names])
            )
        )
        known_names = {m.name.lower() for m in existing.scalars().all()}
        # Sorted for a deterministic choice when two submitted names differ
        # only by case; the first one is added, the others are skipped.
        for name in sorted(material_names):
            key = name.lower()
            if key not in known_names:
                db.add(Material(name=name, source="product_assign", created_by=user.id))
                known_names.add(key)

    await db.commit()

    if product.cad_file_id:
        try:
            from app.services.step_processor import build_part_colors
            from app.tasks.step_tasks import regenerate_thumbnail

            parsed_objects = product.cad_parsed_objects or []
            part_colors = build_part_colors(parsed_objects, parts)
            regenerate_thumbnail.delay(str(product.cad_file_id), part_colors)
        except Exception as exc:
            # Best effort: the materials are already saved, so only log.
            logging.getLogger(__name__).warning(
                "Could not enqueue thumbnail regeneration for product %s: %s", product_id, exc
            )

    # Re-fetch with all relationships for _product_out
    result2 = await db.execute(
        select(Product)
        .options(
            selectinload(Product.cad_file),
            selectinload(Product.order_lines),
            selectinload(Product.render_positions),
        )
        .where(Product.id == product_id)
    )
    return _product_out(result2.scalar_one())
@router.post("/{product_id}/regenerate", status_code=status.HTTP_202_ACCEPTED)
async def regenerate_product_thumbnail(
    product_id: uuid.UUID,
    user: User = Depends(require_admin_or_pm),
    db: AsyncSession = Depends(get_db),
):
    """Queue thumbnail regeneration using the product's saved part materials.

    Returns:
        ``{"status": "queued", "task_id": ...}`` once the task is enqueued.

    Raises:
        HTTPException 404: no product with this id exists.
        HTTPException 400: the product has no CAD file.
        HTTPException 500: building the colors or enqueueing the task failed.
    """
    lookup = select(Product).options(selectinload(Product.cad_file)).where(Product.id == product_id)
    product = (await db.execute(lookup)).scalar_one_or_none()
    if not product:
        raise HTTPException(404, detail="Product not found")
    if not product.cad_file_id:
        raise HTTPException(400, detail="Product has no CAD file")

    try:
        from app.services.step_processor import build_part_colors
        from app.tasks.step_tasks import regenerate_thumbnail

        colors = build_part_colors(
            product.cad_parsed_objects or [],
            product.cad_part_materials or [],
        )
        queued = regenerate_thumbnail.delay(str(product.cad_file_id), colors)
        return {"status": "queued", "task_id": str(queued.id)}
    except Exception as exc:
        raise HTTPException(500, detail=f"Failed to enqueue: {exc}")
def _normalize_part_token_name(name: str) -> str:
"""Lowercase, strip .prt extension, normalise separators to underscore."""
import re as _re
name = name.lower().strip()
if name.endswith(".prt"):
name = name[:-4]
# Hyphens and dots → underscores for uniform token splitting
return _re.sub(r"[-.]", "_", name)
def _part_tokens(name: str) -> set[str]:
"""Return significant tokens: length ≥ 2, not pure-numeric, contains a letter."""
return {
t for t in name.split("_")
if len(t) >= 2 and not t.isdigit() and any(c.isalpha() for c in t)
}
def _jaccard(a: set, b: set) -> float:
if not a or not b:
return 0.0
return len(a & b) / len(a | b)
def build_materials_from_excel(
    cad_parts: list[str],
    excel_components: list[dict],
    similarity_threshold: float = 0.3,
) -> list[dict]:
    """Assign a material to every CAD part using the Excel component list.

    Pure function: no database access, so it can be called synchronously.

    For each CAD part the Excel components are scanned in order:
      1. An exact match on the lower-cased name wins immediately.
      2. Otherwise the component with the highest Jaccard similarity between
         the normalised name tokens is kept.
      3. If the best score is below ``similarity_threshold`` and an Excel
         component exists at the same list position, its material is used
         instead.

    Returns:
        One ``{"part_name": ..., "material": ...}`` dict per CAD part, in
        input order; ``material`` is ``""`` when nothing was found.
    """
    # Pre-compute (tokens, lower-cased name, material) for each Excel row.
    candidates: list[tuple[set[str], str, str]] = []
    for component in excel_components:
        excel_name = (component.get("part_name") or "").lower().strip()
        excel_tokens = _part_tokens(_normalize_part_token_name(excel_name))
        candidates.append((excel_tokens, excel_name, component.get("material") or ""))

    assignments: list[dict] = []
    for position, part_name in enumerate(cad_parts):
        lowered = part_name.lower()
        part_tokens = _part_tokens(_normalize_part_token_name(lowered))

        chosen_material = ""
        top_score = 0.0
        for excel_tokens, excel_name, material in candidates:
            if excel_name == lowered:
                # Exact name match: stop scanning.
                chosen_material, top_score = material, 1.0
                break
            similarity = _jaccard(excel_tokens, part_tokens)
            if similarity > top_score:
                top_score, chosen_material = similarity, material

        # Low confidence: prefer the Excel row at the same position, if any.
        if top_score < similarity_threshold and position < len(excel_components):
            chosen_material = excel_components[position].get("material") or ""

        assignments.append({"part_name": part_name, "material": chosen_material})
    return assignments
@router.post("/{product_id}/reassign-materials-from-excel", response_model=ProductOut)
async def reassign_materials_from_excel(
    product_id: uuid.UUID,
    user: User = Depends(require_admin_or_pm),
    db: AsyncSession = Depends(get_db),
):
    """Populate cad_part_materials from Excel component data stored on this product.

    Matching strategy (applied per CAD part in order):
    1. Exact case-insensitive name match (for generic semantic names like "Außenring")
    2. Token-based Jaccard similarity on normalised part filenames:
       Excel stores the .prt filename; OCC extracts assembly instance names derived
       from the same file. Stripping extensions, separators and numeric-only tokens
       lets them be compared reliably (e.g. "z-563681_krk_tr_jpb_dummy-90771.prt"
       vs "Z-563681_KRK_JPB_DUMMY_1_AF0_1" → Jaccard ≈ 0.6).
    3. Position-based fallback for low-confidence matches.

    After this the Part Materials UI shows pre-filled materials that can be
    reviewed/adjusted before saving. Thumbnail regeneration is queued automatically
    on a best-effort basis; a failure to enqueue is logged, not raised.

    Raises:
        HTTPException 404: no product with this id exists.
        HTTPException 400: the product has no parsed CAD parts or no Excel
            component data.
    """
    import logging

    result = await db.execute(
        select(Product)
        .options(selectinload(Product.cad_file), selectinload(Product.order_lines))
        .where(Product.id == product_id)
    )
    product = result.scalar_one_or_none()
    if not product:
        raise HTTPException(404, detail="Product not found")

    cad_parts: list[str] = product.cad_parsed_objects or []
    if not cad_parts:
        raise HTTPException(
            400,
            detail="No parsed CAD parts found. Use 'Re-process STEP' first to extract part names.",
        )

    excel_components: list[dict] = product.components or []
    if not excel_components:
        raise HTTPException(
            400,
            detail="No Excel component data found on this product. Was it imported from an Excel file?",
        )

    new_materials = build_materials_from_excel(cad_parts, excel_components)
    product.cad_part_materials = new_materials
    await db.commit()

    if product.cad_file_id:
        try:
            from app.services.step_processor import build_part_colors
            from app.tasks.step_tasks import regenerate_thumbnail

            part_colors = build_part_colors(cad_parts, new_materials)
            regenerate_thumbnail.delay(str(product.cad_file_id), part_colors)
        except Exception as exc:
            # Best effort: the materials are already saved, so only log.
            logging.getLogger(__name__).warning(
                "Could not enqueue thumbnail regeneration for product %s: %s", product_id, exc
            )

    # Re-fetch with all relationships needed to build the response.
    result2 = await db.execute(
        select(Product)
        .options(
            selectinload(Product.cad_file),
            selectinload(Product.order_lines),
            selectinload(Product.render_positions),
        )
        .where(Product.id == product_id)
    )
    return _product_out(result2.scalar_one())
@router.post("/{product_id}/reprocess", status_code=status.HTTP_202_ACCEPTED)
async def reprocess_product_cad(
    product_id: uuid.UUID,
    user: User = Depends(require_admin_or_pm),
    db: AsyncSession = Depends(get_db),
):
    """Reset the product's CAD file to pending and queue STEP processing again.

    The CAD file's ``processing_status`` is set to pending and its
    ``parsed_objects`` cleared before the processing task is enqueued.

    Returns:
        ``{"status": "queued", "task_id": ...}`` once the task is enqueued.

    Raises:
        HTTPException 404: no product with this id exists.
        HTTPException 400: the product has no CAD file.
        HTTPException 500: resetting the CAD file or enqueueing failed.
    """
    lookup = select(Product).options(selectinload(Product.cad_file)).where(Product.id == product_id)
    product = (await db.execute(lookup)).scalar_one_or_none()
    if not product:
        raise HTTPException(404, detail="Product not found")
    if not product.cad_file_id:
        raise HTTPException(400, detail="Product has no CAD file")

    try:
        from app.models.cad_file import ProcessingStatus as PS
        from sqlalchemy import update as sql_update

        reset_stmt = (
            sql_update(CadFile)
            .where(CadFile.id == product.cad_file_id)
            .values(processing_status=PS.pending, parsed_objects=None)
        )
        await db.execute(reset_stmt)
        await db.commit()

        from app.tasks.step_tasks import process_step_file

        queued = process_step_file.delay(str(product.cad_file_id))
        return {"status": "queued", "task_id": str(queued.id)}
    except Exception as exc:
        raise HTTPException(500, detail=f"Failed to enqueue: {exc}")
# Lower-case file suffixes that mark a render result as a video; used to set
# the "is_video" flag when listing a product's renders.
VIDEO_EXTENSIONS = {".mp4", ".webm", ".avi", ".mov"}
def _result_path_to_url(result_path: str) -> str | None:
"""Convert an internal result_path to a servable static URL."""
# Flamenco / shared renders: /shared/renders/X/file.jpg → /renders/X/file.jpg
if "/renders/" in result_path:
idx = result_path.index("/renders/")
return result_path[idx:]
# Celery renders stored as thumbnails: /app/uploads/thumbnails/X.png → /thumbnails/X.png
if "/thumbnails/" in result_path:
idx = result_path.index("/thumbnails/")
return result_path[idx:]
return None
def _resolve_disk_path(url: str) -> Path | None:
    """Translate a servable URL into the file path below the upload directory.

    ``/renders/<rest>`` maps to ``<upload_dir>/renders/<rest>`` and
    ``/thumbnails/<rest>`` to ``<upload_dir>/thumbnails/<rest>``. Any other
    URL yields ``None``.
    """
    for prefix, subdir in (("/renders/", "renders"), ("/thumbnails/", "thumbnails")):
        if url.startswith(prefix):
            return Path(settings.upload_dir) / subdir / url[len(prefix):]
    return None
@router.get("/{product_id}/renders")
async def get_product_renders(
    product_id: uuid.UUID,
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Return the product's completed renders, newest first.

    Order lines whose result path cannot be mapped to a servable URL, or
    whose file is missing on disk, are left out of the list.
    """
    query = (
        select(OrderLine)
        .options(
            joinedload(OrderLine.output_type),
            joinedload(OrderLine.order),
        )
        .where(
            OrderLine.product_id == product_id,
            OrderLine.render_status == "completed",
            OrderLine.result_path.is_not(None),
        )
        .order_by(OrderLine.render_completed_at.desc())
    )
    completed_lines = (await db.execute(query)).unique().scalars().all()

    entries = []
    for order_line in completed_lines:
        render_url = _result_path_to_url(order_line.result_path)
        if render_url is None:
            continue
        file_on_disk = _resolve_disk_path(render_url)
        if file_on_disk is None or not file_on_disk.exists():
            continue

        order = order_line.order
        output_type = order_line.output_type
        finished_at = order_line.render_completed_at
        entries.append(
            {
                "order_line_id": str(order_line.id),
                "order_number": order.order_number if order else None,
                "output_type_name": output_type.name if output_type else None,
                "render_url": render_url,
                "is_video": Path(render_url).suffix.lower() in VIDEO_EXTENSIONS,
                "render_backend": order_line.render_backend_used,
                "completed_at": finished_at.isoformat() if finished_at else None,
            }
        )
    return entries
@router.delete("/{product_id}/renders/{order_line_id}", status_code=204)
async def delete_product_render(
    product_id: uuid.UUID,
    order_line_id: uuid.UUID,
    user: User = Depends(require_admin_or_pm),
    db: AsyncSession = Depends(get_db),
):
    """Remove a render output belonging to a product.

    The render file is deleted from disk when it can be located, then the
    order line's ``result_path`` and ``render_completed_at`` are cleared and
    ``render_status`` is set back to 'pending' so the line can be
    re-dispatched if needed.

    Raises:
        HTTPException 404: the order line does not exist for this product.
    """
    import logging

    from sqlalchemy import update as sql_update

    lookup = select(OrderLine).where(
        OrderLine.id == order_line_id,
        OrderLine.product_id == product_id,
    )
    line = (await db.execute(lookup)).scalar_one_or_none()
    if line is None:
        raise HTTPException(404, detail="Render not found for this product")

    # Delete file from disk
    url = _result_path_to_url(line.result_path) if line.result_path else None
    disk = _resolve_disk_path(url) if url else None
    if disk and disk.exists():
        try:
            disk.unlink()
        except OSError as exc:
            # Log but don't fail — DB cleanup still proceeds
            logging.getLogger(__name__).warning(
                f"Could not delete render file {disk}: {exc}"
            )

    reset_stmt = (
        sql_update(OrderLine)
        .where(OrderLine.id == order_line_id)
        .values(result_path=None, render_status="pending", render_completed_at=None)
    )
    await db.execute(reset_stmt)
    await db.commit()
# Request body for the download-renders endpoint.
class DownloadRendersRequest(BaseModel):
    # Ids of the order lines whose render files should be put into the ZIP.
    order_line_ids: list[uuid.UUID]
@router.post("/{product_id}/download-renders")
async def download_product_renders(
    product_id: uuid.UUID,
    body: DownloadRendersRequest,
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Stream a ZIP of selected completed render files for a product.

    Only order lines that belong to the product, are completed and have a
    result path are considered; files missing on disk are skipped. Archive
    entries are named ``<output type>_<order number><ext>``, with a numeric
    suffix added when a name repeats.

    Raises:
        HTTPException 404: the product does not exist, none of the selected
            lines has a completed render, or no render file is on disk.
    """
    from urllib.parse import quote

    prod_result = await db.execute(select(Product).where(Product.id == product_id))
    product = prod_result.scalar_one_or_none()
    if not product:
        raise HTTPException(404, detail="Product not found")

    lines_result = await db.execute(
        select(OrderLine)
        .options(
            joinedload(OrderLine.output_type),
            joinedload(OrderLine.order),
        )
        .where(
            OrderLine.id.in_(body.order_line_ids),
            OrderLine.product_id == product_id,
            OrderLine.render_status == "completed",
            OrderLine.result_path.is_not(None),
        )
    )
    lines = lines_result.unique().scalars().all()
    if not lines:
        raise HTTPException(404, detail="No completed renders found for the selected lines")

    def _resolve_path(p: str) -> str:
        # Paths under /shared/ are looked up below the upload directory.
        if p.startswith("/shared/"):
            return settings.upload_dir + p[len("/shared"):]
        return p

    def _safe(s: str) -> str:
        # Keep word characters, '-' and '.'; everything else becomes '_'.
        return re.sub(r"[^\w\-.]", "_", s).strip("_")

    buf = io.BytesIO()
    # Number of times each base name has already been used in the archive.
    name_counts: dict[str, int] = {}
    with zipfile.ZipFile(buf, mode="w", compression=zipfile.ZIP_DEFLATED) as zf:
        for line in lines:
            if not line.result_path:
                continue
            fs_path = _resolve_path(line.result_path)
            if not os.path.isfile(fs_path):
                continue
            ot_name = (line.output_type.name if line.output_type else None) or "render"
            order_num = (line.order.order_number if line.order else None) or "unknown"
            ext = os.path.splitext(line.result_path)[1] or ".png"
            base_name = f"{_safe(ot_name)}_{_safe(order_num)}{ext}"
            if base_name in name_counts:
                name_counts[base_name] += 1
                stem, suffix = os.path.splitext(base_name)
                archive_name = f"{stem}_{name_counts[base_name]}{suffix}"
            else:
                name_counts[base_name] = 0
                archive_name = base_name
            zf.write(fs_path, archive_name)
        if not zf.infolist():
            raise HTTPException(404, detail="No render files found on disk")
    buf.seek(0)

    product_name = product.name or product.pim_id or "product"
    safe_name = re.sub(r"[^\w\-]", "_", product_name)
    filename = f"{safe_name}_renders.zip"

    # "\w" also matches non-ASCII letters, which may not be encodable as an
    # HTTP header value. Send an ASCII-only fallback in "filename" and, when
    # the real name differs, add the RFC 5987 "filename*" form carrying it.
    ascii_filename = re.sub(r"[^\w\-.]", "_", filename, flags=re.ASCII)
    content_disposition = f'attachment; filename="{ascii_filename}"'
    if ascii_filename != filename:
        content_disposition += f"; filename*=UTF-8''{quote(filename, safe='')}"

    return StreamingResponse(
        buf,
        media_type="application/zip",
        headers={"Content-Disposition": content_disposition},
    )
@router.get("/{product_id}/orders", response_model=list[OrderOut])
async def get_product_orders(
    product_id: uuid.UUID,
    user: User = Depends(require_admin_or_pm),
    db: AsyncSession = Depends(get_db),
):
    """List orders that reference this product via order_lines.

    Orders are returned newest first. Each entry's ``line_count`` is the
    total number of lines in that order (not only the lines for this
    product).
    """
    from sqlalchemy import func

    result = await db.execute(
        select(Order)
        .join(OrderLine, OrderLine.order_id == Order.id)
        .where(OrderLine.product_id == product_id)
        .distinct()
        .order_by(Order.created_at.desc())
    )
    orders = result.scalars().all()
    if not orders:
        return []

    # Count the lines of all affected orders in one grouped query instead of
    # issuing one COUNT query per order.
    affected_order_ids = select(OrderLine.order_id).where(OrderLine.product_id == product_id)
    counts_result = await db.execute(
        select(OrderLine.order_id, func.count(OrderLine.id))
        .where(OrderLine.order_id.in_(affected_order_ids))
        .group_by(OrderLine.order_id)
    )
    line_counts = {order_id: count for order_id, count in counts_result.all()}

    out = []
    for order in orders:
        d = OrderOut.model_validate(order)
        d.line_count = line_counts.get(order.id) or 0
        out.append(d)
    return out
# ── Render Positions CRUD ────────────────────────────────────────────────────
@router.get("/{product_id}/render-positions", response_model=list[RenderPositionOut])
async def list_render_positions(
    product_id: uuid.UUID,
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Return the product's render positions ordered by sort_order, then name."""
    query = (
        select(ProductRenderPosition)
        .where(ProductRenderPosition.product_id == product_id)
        .order_by(ProductRenderPosition.sort_order, ProductRenderPosition.name)
    )
    positions = await db.execute(query)
    return positions.scalars().all()
@router.post(
    "/{product_id}/render-positions",
    response_model=RenderPositionOut,
    status_code=status.HTTP_201_CREATED,
)
async def create_render_position(
    product_id: uuid.UUID,
    body: RenderPositionCreate,
    user: User = Depends(require_admin_or_pm),
    db: AsyncSession = Depends(get_db),
):
    """Create a render position for a product.

    Raises:
        HTTPException 404: no product with this id exists.
        HTTPException 409: the commit failed with an integrity error,
            reported as a duplicate position name.
    """
    from sqlalchemy.exc import IntegrityError

    prod = await db.execute(select(Product).where(Product.id == product_id))
    if not prod.scalar_one_or_none():
        raise HTTPException(404, detail="Product not found")

    pos = ProductRenderPosition(product_id=product_id, **body.model_dump())
    db.add(pos)
    try:
        await db.commit()
    except IntegrityError as exc:
        await db.rollback()
        raise HTTPException(
            409, detail=f"Position named '{body.name}' already exists for this product"
        ) from exc
    except Exception:
        # Anything else (e.g. a connection failure) is not a naming conflict:
        # roll back and let it surface instead of reporting a misleading 409.
        await db.rollback()
        raise
    await db.refresh(pos)
    return pos
@router.patch("/{product_id}/render-positions/{pos_id}", response_model=RenderPositionOut)
async def update_render_position(
    product_id: uuid.UUID,
    pos_id: uuid.UUID,
    body: RenderPositionPatch,
    user: User = Depends(require_admin_or_pm),
    db: AsyncSession = Depends(get_db),
):
    """Apply a partial update to one of a product's render positions.

    Only fields explicitly present in the request body are written
    (``exclude_unset=True``).

    Raises:
        HTTPException 404: the position does not exist for this product.
        HTTPException 409: the commit failed with an integrity error,
            reported as a duplicate position name.
    """
    from sqlalchemy.exc import IntegrityError

    result = await db.execute(
        select(ProductRenderPosition).where(
            ProductRenderPosition.id == pos_id,
            ProductRenderPosition.product_id == product_id,
        )
    )
    pos = result.scalar_one_or_none()
    if not pos:
        raise HTTPException(404, detail="Render position not found")

    for field, value in body.model_dump(exclude_unset=True).items():
        setattr(pos, field, value)
    try:
        await db.commit()
    except IntegrityError as exc:
        await db.rollback()
        raise HTTPException(409, detail="Name already exists for this product") from exc
    except Exception:
        # Anything else (e.g. a connection failure) is not a naming conflict:
        # roll back and let it surface instead of reporting a misleading 409.
        await db.rollback()
        raise
    await db.refresh(pos)
    return pos
@router.delete("/{product_id}/render-positions/{pos_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_render_position(
    product_id: uuid.UUID,
    pos_id: uuid.UUID,
    user: User = Depends(require_admin_or_pm),
    db: AsyncSession = Depends(get_db),
):
    """Delete one of a product's render positions.

    Raises:
        HTTPException 404: the position does not exist for this product.
    """
    lookup = select(ProductRenderPosition).where(
        ProductRenderPosition.id == pos_id,
        ProductRenderPosition.product_id == product_id,
    )
    position = (await db.execute(lookup)).scalar_one_or_none()
    if not position:
        raise HTTPException(404, detail="Render position not found")

    await db.delete(position)
    await db.commit()