179 lines
6.3 KiB
Python
179 lines
6.3 KiB
Python
"""Excel import service — maps parsed rows to Product library."""
|
|
from dataclasses import dataclass, field
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.services.product_service import (
|
|
lookup_or_create_product,
|
|
lookup_product,
|
|
)
|
|
|
|
|
|
@dataclass
class PreviewResult:
    """Read-only preview: annotates rows without creating anything."""

    # Parsed row dicts, annotated in place by preview_excel_rows.
    rows: list[dict] = field(default_factory=list)
    # Unique products that already exist in the DB.
    existing_product_count: int = 0
    # Unique products that would be created on import.
    new_product_count: int = 0
    # Rows skipped because they carry neither pim_id nor produkt_baureihe.
    no_pim_id_count: int = 0
    # Existing products that already have a STEP (CAD) file linked.
    has_step_count: int = 0
    # Products (existing or new) without a linked STEP file.
    no_step_count: int = 0
    # Rows whose produkt_baureihe repeats an earlier row in the same batch.
    duplicate_count: int = 0
    # Human-readable warnings for the caller/UI.
    warnings: list[str] = field(default_factory=list)
|
|
|
|
|
|
@dataclass
class ImportResult:
    """Outcome of import_excel_to_products: annotated rows plus counters."""

    # Parsed row dicts, annotated in place (product_id, product_created, ...).
    rows: list[dict] = field(default_factory=list)
    # Rows matched to an already-existing Product.
    matched_count: int = 0
    # Rows for which a new Product was created.
    created_count: int = 0
    # Rows skipped because they carry neither pim_id nor produkt_baureihe.
    no_pim_id_count: int = 0
    # Rows whose produkt_baureihe repeats an earlier row in the same batch.
    duplicate_baureihe_count: int = 0
    # Human-readable warnings for the caller/UI.
    warnings: list[str] = field(default_factory=list)
|
|
|
|
|
|
async def import_excel_to_products(
    db: AsyncSession,
    parsed_rows: list[dict],
    source_excel: str,
    category_key: str | None = None,
) -> ImportResult:
    """For each row, look up or create a Product.

    Grouping strategy:
        1. Primary key: produkt_baureihe (lowercased)
        2. Fallback: pim_id (backward compat)

    Each row dict is annotated in place with:
        product_id           str | None — id of the matched/created product
        product_created      bool — True when a new Product was created
        product_cad_file_id  str | None — STEP file already linked, if any

    Args:
        db: async session; the caller owns the transaction (no commit here).
        parsed_rows: row dicts from the Excel parser; mutated in place.
        source_excel: source filename recorded on created products.
        category_key: fallback category for rows that carry none.

    Returns:
        ImportResult holding the annotated rows and per-outcome counters.
    """
    result = ImportResult()

    # Lowercased produkt_baureihe values already seen in this batch.
    # Only membership is ever tested, so a set suffices (the previous
    # dict stored product ids that were never read).
    seen_baureihe: set[str] = set()

    for row in parsed_rows:
        pim_id = row.get("pim_id")
        produkt_baureihe = row.get("produkt_baureihe")
        row_category = row.get("category_key") or category_key

        # Need at least one identifier to resolve a product.
        if not pim_id and not produkt_baureihe:
            row["product_id"] = None
            row["product_created"] = False
            result.no_pim_id_count += 1
            continue

        fields = {
            "name": produkt_baureihe or row.get("gewaehltes_produkt"),
            "category_key": row_category,
            "ebene1": row.get("ebene1"),
            "ebene2": row.get("ebene2"),
            "baureihe": row.get("baureihe"),
            "produkt_baureihe": produkt_baureihe,
            "lagertyp": row.get("lagertyp"),
            "name_cad_modell": row.get("name_cad_modell"),
            "gewuenschte_bildnummer": row.get("gewuenschte_bildnummer"),
            "medias_rendering": row.get("medias_rendering"),
            "components": row.get("components", []),
            "arbeitspaket": row.get("arbeitspaket"),
            "source_excel": source_excel,
        }

        product, was_created = await lookup_or_create_product(db, pim_id, fields)
        row["product_id"] = str(product.id)
        row["product_created"] = was_created
        # Carry forward any STEP file already linked to this product.
        row["product_cad_file_id"] = str(product.cad_file_id) if product.cad_file_id else None

        if was_created:
            result.created_count += 1
        else:
            result.matched_count += 1

        # Track duplicate baureihe. Duplicates are still imported/matched
        # above — this counter only reports them to the caller.
        if produkt_baureihe:
            bkey = produkt_baureihe.lower()
            if bkey in seen_baureihe:
                result.duplicate_baureihe_count += 1
            else:
                seen_baureihe.add(bkey)

    result.rows = parsed_rows
    # NOTE: caller is responsible for db.commit() — keeps the transaction
    # composable with order + line creation in the finalize endpoint.
    return result
|
|
|
|
|
|
async def preview_excel_rows(
    db: AsyncSession,
    parsed_rows: list[dict],
    category_key: str | None = None,
) -> PreviewResult:
    """Read-only preview: annotates rows with product_exists / product_id / duplicate flags.

    Uses lookup_product (read-only) to check what already exists in the DB.
    New-vs-existing is determined per unique produkt_baureihe (or pim_id fallback).
    Duplicate rows (same produkt_baureihe seen more than once in this batch) are
    annotated with is_duplicate=True and duplicate_of_row=<first_row_index>.
    """
    preview = PreviewResult()

    # Cache of identifiers already resolved within this batch:
    # lower(baureihe) or pim_id → (exists, product_id or None, has_step, first row_index)
    resolved: dict[str, tuple[bool, str | None, bool, int]] = {}

    for row in parsed_rows:
        pim_id = row.get("pim_id")
        baureihe = row.get("produkt_baureihe")
        idx = row.get("row_index", 0)
        row["category_key"] = row.get("category_key") or category_key

        # Without any identifier the row cannot be matched to a product.
        if not (pim_id or baureihe):
            row.update(
                product_exists=False,
                product_id=None,
                has_step=False,
                is_duplicate=False,
            )
            preview.no_pim_id_count += 1
            continue

        # Cache key: lowercased baureihe when present, else pim_id.
        key = (baureihe or "").lower() or pim_id or ""

        cached = resolved.get(key)
        if cached is not None:
            # Repeated identifier in this batch — copy the first resolution.
            exists, pid, step, first_idx = cached
            row.update(
                product_exists=exists,
                product_id=pid,
                has_step=step,
                is_duplicate=True,
                duplicate_of_row=first_idx,
            )
            preview.duplicate_count += 1
            continue

        found = await lookup_product(db, pim_id, baureihe)
        row["is_duplicate"] = False
        if found is None:
            row.update(product_exists=False, product_id=None, has_step=False)
            resolved[key] = (False, None, False, idx)
            preview.new_product_count += 1
            preview.no_step_count += 1
        else:
            step = found.cad_file_id is not None
            pid = str(found.id)
            row.update(product_exists=True, product_id=pid, has_step=step)
            resolved[key] = (True, pid, step, idx)
            preview.existing_product_count += 1
            if step:
                preview.has_step_count += 1
            else:
                preview.no_step_count += 1

    preview.rows = parsed_rows
    if preview.duplicate_count > 0:
        preview.warnings.append(
            f"{preview.duplicate_count} duplicate Produkt-Baureihe row(s) detected — "
            "these are pre-unchecked. Only one row per product will be imported."
        )
    return preview
|