Files
HartOMat/backend/app/services/product_service.py
T
2026-03-05 22:12:38 +01:00

144 lines
5.5 KiB
Python

"""Product service — lookup/create products, link CAD files."""
import uuid
from sqlalchemy import select, func, update as sql_update
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.product import Product
# Render positions that every newly created product receives.  Each entry
# holds the keyword arguments used to build one ProductRenderPosition row
# (see create_default_positions); exactly one entry is flagged as default.
DEFAULT_RENDER_POSITIONS = [
    {
        "name": "3/4 Front",
        "rotation_x": -15.0,
        "rotation_y": 45.0,
        "rotation_z": 0.0,
        "is_default": True,
        "sort_order": 0,
    },
    {
        "name": "3/4 Rear",
        "rotation_x": -15.0,
        "rotation_y": -135.0,
        "rotation_z": 0.0,
        "is_default": False,
        "sort_order": 1,
    },
    {
        "name": "Default",
        "rotation_x": 0.0,
        "rotation_y": 0.0,
        "rotation_z": 0.0,
        "is_default": False,
        "sort_order": 2,
    },
]
async def create_default_positions(db: AsyncSession, product_id: uuid.UUID) -> None:
    """Add one render-position row per default preset to the given product.

    Every entry of ``DEFAULT_RENDER_POSITIONS`` is turned into a
    ``ProductRenderPosition`` bound to ``product_id`` and added to the
    session, which is then flushed once.  No commit is issued here.

    Args:
        db: Session the new rows are added to.
        product_id: Primary key of the product that owns the positions.
    """
    # Kept as a function-level import, exactly like the original.
    # NOTE(review): presumably this avoids a circular import — confirm.
    from app.models.render_position import ProductRenderPosition

    db.add_all(
        ProductRenderPosition(product_id=product_id, **preset)
        for preset in DEFAULT_RENDER_POSITIONS
    )
    await db.flush()
def _fill_missing_fields(product: Product, pim_id: str | None, fields: dict) -> None:
"""Fill in null/empty fields on an existing product without overwriting manual edits."""
if pim_id and not product.pim_id:
product.pim_id = pim_id
for attr in (
"name", "category_key", "ebene1", "ebene2", "baureihe",
"lagertyp", "name_cad_modell", "arbeitspaket",
):
if fields.get(attr) and not getattr(product, attr, None):
setattr(product, attr, fields[attr])
# Update medias_rendering if not set
if fields.get("medias_rendering") is not None and product.medias_rendering is None:
product.medias_rendering = fields["medias_rendering"]
# Always update components from the latest Excel import (needed for auto-reassign)
if fields.get("components"):
product.components = fields["components"]
async def lookup_product(
    db: AsyncSession, pim_id: str | None, produkt_baureihe: str | None
) -> Product | None:
    """Find an active product without creating or modifying anything.

    The key used for the search is chosen as follows:

    1. ``produkt_baureihe`` when given — compared case-insensitively.  A miss
       here is final; ``pim_id`` is *not* tried afterwards.
    2. ``pim_id`` when no ``produkt_baureihe`` was given.
    3. Neither given: no query is run.

    This is the same cascade that lookup_or_create_product follows.

    Returns:
        The matching active product, or ``None`` if nothing matched.
    """
    if produkt_baureihe:
        criterion = func.lower(Product.produkt_baureihe) == produkt_baureihe.lower()
    elif pim_id:
        criterion = Product.pim_id == pim_id
    else:
        return None

    # Only active products are eligible, whichever key is used.
    stmt = select(Product).where(criterion, Product.is_active.is_(True))
    rows = await db.execute(stmt)
    return rows.scalar_one_or_none()
async def lookup_or_create_product(
    db: AsyncSession, pim_id: str | None, fields: dict
) -> tuple[Product, bool]:
    """Return the product for an imported row, creating it when none exists.

    The search is delegated to ``lookup_product`` so both entry points share
    a single lookup cascade instead of two hand-maintained copies:

    * by ``fields["produkt_baureihe"]`` (case-insensitive) when present —
      each baureihe is a distinct product, so a miss does not fall back to
      ``pim_id`` and a new product is created instead;
    * by ``pim_id`` only when no ``produkt_baureihe`` is present, e.g. old
      per-category Excel files that don't have a Baureihe column.

    A product that is found is passed through ``_fill_missing_fields``, which
    decides what is backfilled and what is left untouched; a product that is
    not found is created and given the default render positions.  The
    session is flushed but never committed here.

    Args:
        db: Session used for the lookup and for any insert.
        pim_id: External identifier; when falsy, a new product receives a
            generated ``auto-<8 hex chars>`` id.
        fields: Imported values keyed by attribute name.

    Returns:
        ``(product, was_created)``.
    """
    produkt_baureihe = fields.get("produkt_baureihe")

    product = await lookup_product(db, pim_id, produkt_baureihe)
    if product is not None:
        _fill_missing_fields(product, pim_id, fields)
        await db.flush()
        return product, False

    # NOTE(review): medias_rendering is backfilled by _fill_missing_fields on
    # later imports but is not passed here at creation — confirm whether that
    # is intentional.
    product = Product(
        pim_id=pim_id or f"auto-{uuid.uuid4().hex[:8]}",
        name=fields.get("name"),
        category_key=fields.get("category_key"),
        ebene1=fields.get("ebene1"),
        ebene2=fields.get("ebene2"),
        baureihe=fields.get("baureihe"),
        produkt_baureihe=produkt_baureihe,
        lagertyp=fields.get("lagertyp"),
        name_cad_modell=fields.get("name_cad_modell"),
        arbeitspaket=fields.get("arbeitspaket"),
        components=fields.get("components", []),
        cad_part_materials=fields.get("cad_part_materials", []),
        source_excel=fields.get("source_excel"),
    )
    db.add(product)
    # Flush before creating positions: product.id is read on the next line.
    await db.flush()
    await create_default_positions(db, product.id)
    return product, True
async def link_cad_to_product(
    db: AsyncSession, product_id: uuid.UUID, cad_file_id: uuid.UUID
) -> Product:
    """Point a product at a CAD file and return the reloaded product.

    The link is written with a direct SQL UPDATE on ``cad_file_id`` and the
    session is committed, after which the product is selected again by id.

    Args:
        db: Session used for the update, the commit and the re-read.
        product_id: Primary key of the product to update.
        cad_file_id: Value stored in the product's ``cad_file_id`` column.

    Returns:
        The product as read back after the commit.  The re-read uses
        ``scalar_one()``, so it expects exactly one row for ``product_id``.
    """
    matches_product = Product.id == product_id

    link_stmt = sql_update(Product).where(matches_product).values(cad_file_id=cad_file_id)
    await db.execute(link_stmt)
    await db.commit()

    reloaded = await db.execute(select(Product).where(matches_product))
    return reloaded.scalar_one()