"""Import services — Excel parsing and product import."""
from __future__ import annotations

import difflib
import logging
import uuid
from datetime import datetime, timezone

# Re-export from original service files for backward compatibility.
from app.services.excel_parser import parse_excel, parsed_excel_to_dict
from app.services.excel_import import import_excel_to_products, preview_excel_rows

logger = logging.getLogger(__name__)

__all__ = [
    "parse_excel",
    "parsed_excel_to_dict",
    "import_excel_to_products",
    "preview_excel_rows",
    "run_sanity_check",
]

# Issue types that make a row an "error" resp. a "warning".
_ERROR_ISSUE_TYPES = frozenset({"missing_material"})
_WARNING_ISSUE_TYPES = frozenset({"duplicate", "no_step", "material_suggestion"})

# Minimum difflib similarity ratio for a material name to be suggested.
_MATERIAL_MATCH_CUTOFF = 0.8


def _utcnow_naive() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    Yields the same value as the deprecated ``datetime.utcnow()``; the
    tzinfo is stripped so the value written to ``completed_at`` keeps its
    previous (naive) form.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _material_issues(
    components,
    known_lookup: set[str],
    known_names: list[str],
) -> tuple[list[dict], int]:
    """Check the materials of a row's components against the known names.

    Args:
        components: Iterable of component objects; the material is read from
            their ``material`` attribute (missing/empty values are skipped).
        known_lookup: Lower-cased known material names and aliases, used for
            the exact-match test.
        known_names: The same names as a list, used for fuzzy matching.

    Returns:
        ``(issues, missing_count)`` where ``issues`` holds one
        ``material_suggestion`` or ``missing_material`` entry per unmatched
        component and ``missing_count`` is the number of ``missing_material``
        entries.
    """
    issues: list[dict] = []
    missing = 0
    for comp in components:
        mat_name = getattr(comp, "material", None) or ""
        if not mat_name:
            continue
        mat_lower = mat_name.lower()
        if mat_lower in known_lookup:
            continue  # exact match
        matches = difflib.get_close_matches(
            mat_lower, known_names, n=1, cutoff=_MATERIAL_MATCH_CUTOFF
        )
        if matches:
            issues.append({
                "type": "material_suggestion",
                "field": "material",
                "value": mat_name,
                "suggestion": matches[0],
                "message": f"Material '{mat_name}' not found; closest: '{matches[0]}'",
            })
        else:
            issues.append({
                "type": "missing_material",
                "field": "material",
                "value": mat_name,
                "suggestion": None,
                "message": f"Material '{mat_name}' not found in library",
            })
            missing += 1
    return issues, missing


def _row_status(issues: list[dict]) -> str:
    """Map a row's issues to ``"error"``, ``"warning"`` or ``"ok"``."""
    issue_types = {issue["type"] for issue in issues}
    if issue_types & _ERROR_ISSUE_TYPES:
        return "error"
    if issue_types & _WARNING_ISSUE_TYPES:
        return "warning"
    return "ok"


def _execute_sanity_check(db, val, excel_path: str) -> dict:
    """Run all row checks and store the outcome on ``val``.

    Args:
        db: Open synchronous SQLAlchemy session.
        val: The ``ImportValidation`` record to update.
        excel_path: Path of the Excel file handed to ``parse_excel``.

    Returns:
        ``{"summary": ..., "rows": ...}`` on success, or ``{}`` when the
        Excel file cannot be parsed (the record is then marked ``failed``).
    """
    from sqlalchemy import select
    from sqlalchemy.orm import selectinload

    from app.domains.materials.models import Material
    from app.domains.products.models import Product

    # Load all known material names + aliases for fuzzy matching.
    materials = db.execute(
        select(Material).options(selectinload(Material.aliases))
    ).scalars().all()
    collected: list[str] = []
    for material in materials:
        collected.append(material.name.lower())
        collected.extend(alias.alias.lower() for alias in material.aliases)
    # Set for O(1) exact-match tests; de-duplicated list for difflib.
    known_lookup = set(collected)
    known_names = list(dict.fromkeys(collected))

    # Parse Excel
    try:
        parsed = parse_excel(excel_path)
    except Exception as exc:
        # logger.exception keeps the traceback alongside the original message.
        logger.exception("Failed to parse excel %s: %s", excel_path, exc)
        val.status = "failed"
        val.completed_at = _utcnow_naive()
        db.commit()
        return {}

    rows_out: list[dict] = []
    seen_keys: dict[str, int] = {}  # duplicate key -> row_index of first use
    counts = {
        "ok": 0,
        "warnings": 0,
        "errors": 0,
        "missing_materials": 0,
        "no_step": 0,
        "duplicates": 0,
    }

    for row in parsed:
        issues: list[dict] = []
        pim_id = getattr(row, "pim_id", None) or ""
        produkt_baureihe = getattr(row, "produkt_baureihe", None) or ""
        components = getattr(row, "components", []) or []

        # Duplicate check: pim_id wins, produkt_baureihe is the fallback key.
        key = pim_id or produkt_baureihe
        if key:
            if key in seen_keys:
                issues.append({
                    "type": "duplicate",
                    "field": "pim_id",
                    "value": key,
                    "suggestion": None,
                    "message": f"Duplicate of row {seen_keys[key]}",
                })
                counts["duplicates"] += 1
            else:
                seen_keys[key] = row.row_index

        # STEP availability check
        product_id = None
        if key:
            query = select(Product)
            if pim_id:
                query = query.where(Product.pim_id == pim_id)
            else:
                query = query.where(Product.produkt_baureihe == produkt_baureihe)
            # first() instead of scalar_one_or_none(): several products may
            # share the same key, which must not abort the whole check.
            # No ordering is applied, so which match is used is unspecified.
            product = db.execute(query.limit(1)).scalars().first()
            if product:
                product_id = str(product.id)
                # Only flag when the model exposes cad_file_id and it is empty.
                if hasattr(product, "cad_file_id") and not product.cad_file_id:
                    issues.append({
                        "type": "no_step",
                        "field": "cad_file",
                        "value": None,
                        "suggestion": None,
                        "message": "No STEP file linked to this product",
                    })
                    counts["no_step"] += 1

        # Material check
        material_issues, missing = _material_issues(
            components, known_lookup, known_names
        )
        issues.extend(material_issues)
        counts["missing_materials"] += missing

        # Row status
        row_status = _row_status(issues)
        if row_status == "error":
            counts["errors"] += 1
        elif row_status == "warning":
            counts["warnings"] += 1
        else:
            counts["ok"] += 1

        rows_out.append({
            "row_index": row.row_index,
            "product_id": product_id,
            "pim_id": pim_id or None,
            "produkt_baureihe": produkt_baureihe or None,
            "issues": issues,
            "status": row_status,
        })

    summary = {
        "total": len(rows_out),
        **counts,
    }

    val.status = "completed"
    val.summary = summary
    val.rows = rows_out
    val.completed_at = _utcnow_naive()
    db.commit()

    return {"summary": summary, "rows": rows_out}


def run_sanity_check(validation_id: str, excel_path: str, tenant_id: str | None) -> dict:
    """Run sanity check on an imported Excel file.

    Each parsed row is checked for duplicate keys, for a product without a
    linked STEP/CAD file and for materials that are not in the material
    library. The result is stored in the ``ImportValidation`` record.
    Uses sync DB access (Celery context).

    Args:
        validation_id: Primary key of the ``ImportValidation`` record.
        excel_path: Path of the Excel file to check.
        tenant_id: Accepted for interface compatibility; not used by the
            check itself.

    Returns:
        Result dict with ``summary`` + ``rows``. An empty dict is returned
        when the validation record does not exist or the Excel file cannot
        be parsed.

    Raises:
        Exception: Any unexpected error raised while checking is re-raised
            after the record has been marked ``failed``.
    """
    from sqlalchemy import create_engine
    from sqlalchemy.orm import Session

    from app.config import settings as app_settings
    from app.domains.imports.models import ImportValidation

    sync_url = app_settings.database_url.replace("+asyncpg", "")
    engine = create_engine(sync_url)
    try:
        with Session(engine) as db:
            # Update status to running
            val = db.get(ImportValidation, validation_id)
            if not val:
                logger.warning("ImportValidation %s not found", validation_id)
                return {}
            val.status = "running"
            db.commit()

            try:
                return _execute_sanity_check(db, val, excel_path)
            except Exception:
                # Do not leave the record stuck in "running" on failure.
                logger.exception("Sanity check %s failed unexpectedly", validation_id)
                db.rollback()
                val.status = "failed"
                val.completed_at = _utcnow_naive()
                db.commit()
                raise
    finally:
        # The engine is created per call; release its connection pool.
        engine.dispose()