Files
HartOMat/backend/app/services/excel_import.py
T
Hartmut b6bac080bb feat: duplicate product detection — STEP conflict warnings on Excel import and CAD upload
- Excel preview detects when a product already has a different STEP file linked
- Excel preview detects intra-Excel conflicts (same product, different CAD model names)
- Product STEP upload warns when replacing an existing file and shows render count
- All warnings are non-blocking (amber badges, toast warnings)
- LEARNINGS.md: all open items resolved

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-14 13:05:40 +01:00

227 lines
8.5 KiB
Python

"""Excel import service — maps parsed rows to Product library."""
from dataclasses import dataclass, field
from pathlib import PurePosixPath
from sqlalchemy.ext.asyncio import AsyncSession
from app.services.product_service import (
lookup_or_create_product,
lookup_product,
)
def _stem_lower(name: str | None) -> str:
"""Return the lowercased stem (no extension) of a filename."""
if not name:
return ""
return PurePosixPath(name).stem.lower()
@dataclass
class PreviewResult:
    """Read-only preview: annotates rows without creating anything.

    Aggregates the per-row annotations produced by ``preview_excel_rows``
    into batch-level counters plus human-readable warnings.
    """

    # The annotated row dicts (the same list that was passed in).
    rows: list[dict] = field(default_factory=list)
    # Unique product keys that were found in the database.
    existing_product_count: int = 0
    # Unique product keys that were not found in the database.
    new_product_count: int = 0
    # Rows carrying neither a pim_id nor a produkt_baureihe.
    no_pim_id_count: int = 0
    # Existing products that already have a CAD file linked.
    has_step_count: int = 0
    # Existing products without a CAD file, plus all new products.
    no_step_count: int = 0
    # Rows whose product key was already seen earlier in the same batch.
    duplicate_count: int = 0
    # Rows whose CAD model name differs from the product's linked STEP file.
    step_conflict_count: int = 0
    # Duplicate rows whose CAD model name differs from the first row's.
    cad_name_conflict_count: int = 0
    # Summary messages derived from the counters above.
    warnings: list[str] = field(default_factory=list)
@dataclass
class ImportResult:
    """Outcome of ``import_excel_to_products``: annotated rows plus counters."""

    # The annotated row dicts (the same list that was passed in).
    rows: list[dict] = field(default_factory=list)
    # Rows for which an already existing product was returned.
    matched_count: int = 0
    # Rows for which a new product was created.
    created_count: int = 0
    # Rows carrying neither a pim_id nor a produkt_baureihe.
    no_pim_id_count: int = 0
    # Rows whose lowercased produkt_baureihe already appeared earlier in the batch.
    duplicate_baureihe_count: int = 0
    # Human-readable warning messages.
    warnings: list[str] = field(default_factory=list)
async def import_excel_to_products(
    db: AsyncSession,
    parsed_rows: list[dict],
    source_excel: str,
    category_key: str | None = None,
    tenant_id=None,
) -> ImportResult:
    """For each row, look up or create a Product.

    Grouping strategy (as stated for ``lookup_or_create_product``):
      1. Primary key: produkt_baureihe (lowercased)
      2. Fallback: pim_id (backward compat)

    Every row dict is annotated in place with ``product_id``,
    ``product_created`` and ``product_cad_file_id``. Rows that carry neither
    a ``pim_id`` nor a ``produkt_baureihe`` get ``None`` / ``False`` / ``None``
    for these keys, so all rows share the same shape.

    Args:
        db: Session handed through to ``lookup_or_create_product``.
        parsed_rows: Row dicts from the Excel parser; mutated in place.
        source_excel: Stored on each product as ``source_excel``.
        category_key: Fallback category for rows without their own
            ``category_key``.
        tenant_id: Passed through to ``lookup_or_create_product``.

    Returns:
        An ``ImportResult`` whose ``rows`` is ``parsed_rows`` itself.
    """
    result = ImportResult()

    # Lowercased produkt_baureihe values already seen in this batch. Repeats
    # are only counted, not skipped: every row is still resolved to a product.
    seen_baureihe: set[str] = set()

    for row in parsed_rows:
        pim_id = row.get("pim_id")
        produkt_baureihe = row.get("produkt_baureihe")
        row_category = row.get("category_key") or category_key

        # Need at least one identifier
        if not pim_id and not produkt_baureihe:
            row["product_id"] = None
            row["product_created"] = False
            # Keep the row shape identical to resolved rows.
            row["product_cad_file_id"] = None
            result.no_pim_id_count += 1
            continue

        fields = {
            "name": produkt_baureihe or row.get("gewaehltes_produkt"),
            "category_key": row_category,
            "ebene1": row.get("ebene1"),
            "ebene2": row.get("ebene2"),
            "baureihe": row.get("baureihe"),
            "produkt_baureihe": produkt_baureihe,
            "lagertyp": row.get("lagertyp"),
            "name_cad_modell": row.get("name_cad_modell"),
            "gewuenschte_bildnummer": row.get("gewuenschte_bildnummer"),
            "medias_rendering": row.get("medias_rendering"),
            "components": row.get("components", []),
            "arbeitspaket": row.get("arbeitspaket"),
            "source_excel": source_excel,
        }

        product, was_created = await lookup_or_create_product(
            db, pim_id, fields, tenant_id=tenant_id
        )
        row["product_id"] = str(product.id)
        row["product_created"] = was_created
        # Carry forward any STEP file already linked to this product
        row["product_cad_file_id"] = (
            str(product.cad_file_id) if product.cad_file_id else None
        )

        if was_created:
            result.created_count += 1
        else:
            result.matched_count += 1

        # Track duplicate baureihe
        if produkt_baureihe:
            bkey = produkt_baureihe.lower()
            if bkey in seen_baureihe:
                result.duplicate_baureihe_count += 1
            else:
                seen_baureihe.add(bkey)

    result.rows = parsed_rows
    # NOTE: caller is responsible for db.commit() — keeps the transaction
    # composable with order + line creation in the finalize endpoint.
    return result
async def preview_excel_rows(
    db: AsyncSession,
    parsed_rows: list[dict],
    category_key: str | None = None,
) -> PreviewResult:
    """Read-only preview: annotates rows with product_exists / product_id / duplicate flags.

    Uses lookup_product (read-only) to check what already exists in the DB.
    New-vs-existing is determined per unique produkt_baureihe (or pim_id fallback).

    Every row is annotated in place with the same set of keys:
    ``category_key``, ``product_exists``, ``product_id``, ``has_step``,
    ``is_duplicate``, ``duplicate_of_row``, the ``step_conflict*`` fields and
    the ``cad_name_conflict*`` fields. Duplicate rows (same product key seen
    more than once in this batch) get ``is_duplicate=True`` and
    ``duplicate_of_row=<first_row_index>``; all other rows get
    ``is_duplicate=False`` and ``duplicate_of_row=None``.

    Args:
        db: Session handed through to ``lookup_product``.
        parsed_rows: Row dicts from the Excel parser; mutated in place.
        category_key: Fallback category for rows without their own
            ``category_key``.

    Returns:
        A ``PreviewResult`` whose ``rows`` is ``parsed_rows`` itself, with
        counters and summary warnings filled in.
    """
    result = PreviewResult()

    # Unique identifiers already resolved in this batch:
    # key → (product_exists, product_id_str | None, has_step,
    #        first_row_index, name_cad_modell_stem of the first row)
    # Keys are namespaced tuples so that a raw pim_id can never collide with
    # another row's lowercased produkt_baureihe.
    seen: dict[tuple[str, object], tuple[bool, str | None, bool, int, str]] = {}

    for row in parsed_rows:
        pim_id = row.get("pim_id")
        produkt_baureihe = row.get("produkt_baureihe")
        row_index = row.get("row_index", 0)
        row["category_key"] = row.get("category_key") or category_key

        # Default conflict / duplicate fields, reset on every call so no row
        # keeps a stale value or misses a key.
        row["step_conflict"] = False
        row["step_conflict_existing_name"] = None
        row["step_conflict_excel_name"] = None
        row["cad_name_conflict"] = False
        row["cad_name_conflict_other_name"] = None
        row["cad_name_conflict_row"] = None
        row["duplicate_of_row"] = None

        # Must have at least one identifier
        if not pim_id and not produkt_baureihe:
            row["product_exists"] = False
            row["product_id"] = None
            row["has_step"] = False
            row["is_duplicate"] = False
            result.no_pim_id_count += 1
            continue

        # Build a cache key: produkt_baureihe (case-insensitive) wins,
        # pim_id is the fallback.
        if produkt_baureihe:
            cache_key = ("produkt_baureihe", produkt_baureihe.lower())
        else:
            cache_key = ("pim_id", pim_id)

        excel_cad_name = row.get("name_cad_modell")
        excel_cad_stem = _stem_lower(excel_cad_name)

        if cache_key in seen:
            exists, pid, has_step, first_row, first_cad_stem = seen[cache_key]
            row["product_exists"] = exists
            row["product_id"] = pid
            row["has_step"] = has_step
            row["is_duplicate"] = True
            row["duplicate_of_row"] = first_row

            # Intra-Excel conflict: same product key, different name_cad_modell.
            # Only compared against the first row that introduced the key.
            if excel_cad_stem and first_cad_stem and excel_cad_stem != first_cad_stem:
                row["cad_name_conflict"] = True
                row["cad_name_conflict_other_name"] = first_cad_stem
                row["cad_name_conflict_row"] = first_row
                result.cad_name_conflict_count += 1

            result.duplicate_count += 1
            continue

        product = await lookup_product(db, pim_id, produkt_baureihe)
        row["is_duplicate"] = False

        if product is not None:
            has_step = product.cad_file_id is not None
            row["product_exists"] = True
            row["product_id"] = str(product.id)
            row["has_step"] = has_step
            seen[cache_key] = (True, str(product.id), has_step, row_index, excel_cad_stem)
            result.existing_product_count += 1
            if has_step:
                result.has_step_count += 1
            else:
                result.no_step_count += 1

            # STEP conflict: product already has a different STEP file.
            # NOTE(review): presumably lookup_product loads ``cad_file``
            # eagerly — confirm, since this attribute access happens outside
            # an awaited call.
            if has_step and excel_cad_stem and product.cad_file:
                existing_stem = _stem_lower(product.cad_file.original_name)
                if existing_stem and existing_stem != excel_cad_stem:
                    row["step_conflict"] = True
                    row["step_conflict_existing_name"] = existing_stem
                    row["step_conflict_excel_name"] = excel_cad_stem
                    result.step_conflict_count += 1
        else:
            row["product_exists"] = False
            row["product_id"] = None
            row["has_step"] = False
            seen[cache_key] = (False, None, False, row_index, excel_cad_stem)
            result.new_product_count += 1
            result.no_step_count += 1

    result.rows = parsed_rows

    if result.duplicate_count > 0:
        result.warnings.append(
            f"{result.duplicate_count} duplicate Produkt-Baureihe row(s) detected — "
            "these are pre-unchecked. Only one row per product will be imported."
        )
    if result.step_conflict_count > 0:
        result.warnings.append(
            f"{result.step_conflict_count} product(s) already have a different STEP file linked — "
            "importing will not replace the existing STEP file automatically."
        )
    if result.cad_name_conflict_count > 0:
        result.warnings.append(
            f"{result.cad_name_conflict_count} row(s) reference the same product with a different CAD model name."
        )
    return result