"""
Excel parser for Schaeffler CAD order lists.

Supports two formats:

Old format (per-category files):
  Row 1-2: Instruction text (skip)
  Row N:   Column headers — detected as the first row containing "Ebene1"
  Col 0 (A): Ebene1
  Col 1 (B): Ebene2
  ...
  Col 11+ : Component pairs – alternating (part_name, material)

New format (unified file — TestScope_final layout):
  Row 1: Column headers (no instruction rows)
  Col 0 (A): Arbeitspaket
  Col 1 (B): Ebene1
  Col 2 (C): Ebene2
  ...
  Col 12+ : Component pairs

Detection is header-driven: we find "Ebene1" in any column within the first
5 rows and build a dynamic column_map from that header row.
"""
from __future__ import annotations

import logging
import re
from collections import Counter
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any

import openpyxl

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Category detection map: lowercase substring of an Ebene1 / Ebene2 / Baureihe
# value → category_key
# Priority order matters – more specific first.
# ---------------------------------------------------------------------------
# Built from ordered (category, keywords) groups. The resulting dict keeps the
# insertion order of the groups, and that order is the match priority.
CATEGORY_MAP: dict[str, str] = {
    keyword: category
    for category, keywords in (
        # Linear / Anschlagplatten (check Ebene1 = "Linearsysteme")
        (
            "Anschlagplatten",
            ("endplatten", "anschlagplatten", "laufrollenführungen"),
        ),
        (
            "Linear_schiene",
            (
                "linearsysteme",  # Ebene1 value
                "profilschienenführungen",
                "rollenumlaufeinheit",
                "kugelumlaufeinheit",
            ),
        ),
        # Bearings – most specific first
        (
            "CRB",
            (
                "zylinderrollenlager",
                "axial-zylinderrollenlager",
                "axial-schrägrollenlager",
                "axiallagerscheiben",
            ),
        ),
        ("SRB_TORB", ("torb", "radial srb", "pendelrollenlager")),
        ("TRB", ("kegelrollenlager",)),
        (
            "Kugellager",
            (
                "kugellager",
                "axial-rillenkugellager",
                "rillenkugellager",
                "schrägkugellager",
            ),
        ),
        ("Gleitlager", ("gleitlager", "gelenklager", "gleitbuchsen")),
        # Fallback for generic Rollenlager → TRB (only if nothing else matched)
        ("TRB", ("rollenlager",)),
    )
    for keyword in keywords
}


# ---------------------------------------------------------------------------
# Header name normalization map: normalized header text → field name
# Supports multiple alternative column header texts for each field.
# --------------------------------------------------------------------------- HEADER_FIELD_MAP: dict[str, str] = { "arbeitspaket": "arbeitspaket", "ebene1": "ebene1", "ebene2": "ebene2", "baureihe": "baureihe", "pim-id": "pim_id", "pim-id (klasse)": "pim_id", "produkt (baureihe)": "produkt_baureihe", "produkt": "produkt_baureihe", "gewähltes produkt": "gewaehltes_produkt", "gewaehltes produkt": "gewaehltes_produkt", "name cad-modell": "name_cad_modell", "name cad modell": "name_cad_modell", "gewünschte bildnummer": "gewuenschte_bildnummer", "gewuenschte bildnummer": "gewuenschte_bildnummer", "lagertyp": "lagertyp", "medias-rendering": "medias_rendering", "medias": "medias_rendering", } @dataclass class ParsedComponent: part_name: str | None material: str | None component_type: str | None column_index: int @dataclass class ParsedRow: row_index: int ebene1: str | None = None ebene2: str | None = None baureihe: str | None = None pim_id: str | None = None produkt_baureihe: str | None = None gewaehltes_produkt: str | None = None name_cad_modell: str | None = None gewuenschte_bildnummer: str | None = None lagertyp: str | None = None medias_rendering: bool | None = None components: list[ParsedComponent] = field(default_factory=list) category_key: str | None = None arbeitspaket: str | None = None @dataclass class ParsedExcel: filename: str category_key: str | None template_name: str | None column_headers: list[str] rows: list[ParsedRow] warnings: list[str] = field(default_factory=list) material_mappings: list[dict] = field(default_factory=list) # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _clean(value: Any) -> str | None: """Strip whitespace, return None for empty values.""" if value is None: return None s = str(value).strip() return s if s else None def _normalize_filename(name: str | None) -> str | None: """Lowercase and strip trailing spaces from 
filenames. Returns None for empty strings.""" if name is None: return None stripped = name.strip() if not stripped: return None return stripped.lower() def _to_bool(value: Any) -> bool | None: """Convert Excel 1/0, 'ja'/'nein', True/False to Python bool.""" if value is None: return None if isinstance(value, bool): return value s = str(value).strip().lower() if s in ("1", "true", "ja", "yes", "x"): return True if s in ("0", "false", "nein", "no", ""): return False return None def _normalize_header(text: str) -> str: """Normalize a header cell value for matching.""" return text.strip().lower().replace("_", " ").replace("–", "-").replace("—", "-") def _detect_row_category(ebene1: str | None, ebene2: str | None, baureihe: str | None) -> str | None: """Detect category for a single row from its Ebene1, Ebene2, Baureihe values.""" candidates = [] for val in (ebene1, ebene2, baureihe): if val: candidates.append(val.lower()) for keyword, cat in CATEGORY_MAP.items(): for cand in candidates: if keyword in cand: return cat return None def _detect_category(rows: list[list[Any]], column_map: dict[str, int]) -> str | None: """ Detect category by scanning Ebene1, Ebene2, and Baureihe columns across all data rows. Priority: more specific keywords first (as ordered in map). 
""" ebene1_col = column_map.get("ebene1") ebene2_col = column_map.get("ebene2") baureihe_col = column_map.get("baureihe") candidates: list[str] = [] for row in rows: for col in (ebene1_col, ebene2_col, baureihe_col): if col is not None and col < len(row): val = _clean(row[col]) if val: candidates.append(val.lower()) for keyword, cat in CATEGORY_MAP.items(): for cand in candidates: if keyword in cand: return cat return None def _build_column_map(headers: list[str]) -> dict[str, int]: """Build field_name → column_index mapping from header row.""" column_map: dict[str, int] = {} for idx, raw_header in enumerate(headers): if not raw_header: continue normalized = _normalize_header(raw_header) field_name = HEADER_FIELD_MAP.get(normalized) if field_name and field_name not in column_map: column_map[field_name] = idx return column_map def _find_component_start(column_map: dict[str, int]) -> int: """Find the first column after medias_rendering for component pairs.""" medias_col = column_map.get("medias_rendering") if medias_col is not None: return medias_col + 1 # Fallback: find the highest mapped column and start after it if column_map: return max(column_map.values()) + 1 return 11 # Legacy default def _get_cell(row: list[Any], col: int | None) -> Any: """Safely get a cell value by column index.""" if col is None or col >= len(row): return None return row[col] # --------------------------------------------------------------------------- # Material mapping sheet parser # --------------------------------------------------------------------------- def _parse_material_mapping(wb) -> list[dict]: """Parse 'materialmapping' sheet if it exists. Expected columns: display_name (col A), render_name (col B). Returns list of {"display_name": str, "render_name": str}. 
""" # Case-insensitive sheet name search target_name = None for name in wb.sheetnames: if name.lower().replace(" ", "").replace("_", "") == "materialmapping": target_name = name break if target_name is None: return [] ws = wb[target_name] mappings = [] rows = list(ws.iter_rows(values_only=True)) if not rows: return [] # Detect header row — look for "display" or "anzeige" in first few rows data_start = 0 for i, row in enumerate(rows[:3]): if row and any( _clean(cell) and ("display" in str(cell).lower() or "anzeige" in str(cell).lower() or "material" in str(cell).lower()) for cell in row[:3] if cell is not None ): data_start = i + 1 break for row in rows[data_start:]: if len(row) < 2: continue display = _clean(row[0]) render = _clean(row[1]) if display and render: mappings.append({"display_name": display, "render_name": render}) return mappings # --------------------------------------------------------------------------- # Main parser # --------------------------------------------------------------------------- def parse_excel(file_path: str | Path) -> ParsedExcel: """ Parse a Schaeffler order list Excel file. Returns a ParsedExcel with all data rows extracted. Header-driven: finds "Ebene1" in any column within first 5 rows, then builds column map dynamically. 
""" file_path = Path(file_path) warnings: list[str] = [] try: wb = openpyxl.load_workbook(file_path, data_only=True) except Exception as exc: raise ValueError(f"Cannot open Excel file: {exc}") from exc ws = wb.active # Collect all rows as raw values all_rows: list[list[Any]] = [] for row in ws.iter_rows(values_only=True): all_rows.append(list(row)) if len(all_rows) < 2: raise ValueError("Excel file has fewer than 2 rows – cannot find header row") # Auto-detect header row: first row (within first 5) where ANY column == "Ebene1" header_idx: int | None = None for i, row in enumerate(all_rows[:5]): for col_idx, cell in enumerate(row): val = _clean(cell) if val and val.lower() == "ebene1": header_idx = i break if header_idx is not None: break if header_idx is None: # Fallback: assume row 3 (index 2) is headers header_idx = 2 warnings.append( "Could not auto-detect header row (expected 'Ebene1' in any column); " "falling back to row 3 as headers" ) if len(all_rows) <= header_idx: raise ValueError("Excel file has no data rows after the detected header row") headers_raw = list(all_rows[header_idx]) # Remove trailing None from headers while headers_raw and headers_raw[-1] is None: headers_raw.pop() max_col = len(headers_raw) column_headers = [_clean(h) or "" for h in headers_raw] # Build dynamic column map from headers column_map = _build_column_map(column_headers) # Data rows start immediately after the header row data_rows_raw = all_rows[header_idx + 1:] # Detect file-level category (backward compat) category_key = _detect_category(data_rows_raw, column_map) template_name = _category_to_template_name(category_key) # Determine component column start comp_start = _find_component_start(column_map) # Build component header info (paired columns from comp_start) component_col_info: list[tuple[int, int, str]] = [] # (part_col, material_col, component_type) col = comp_start while col < max_col: part_type = column_headers[col] if col < len(column_headers) else f"part_{col}" 
mat_col = col + 1 component_col_info.append((col, mat_col, part_type)) col += 2 # Parse data rows parsed_rows: list[ParsedRow] = [] for row_idx, raw_row in enumerate(data_rows_raw): # Pad row to max_col while len(raw_row) < max_col: raw_row.append(None) # Check if the row is completely empty (check all mapped columns) check_end = min(comp_start, max_col) if all(v is None or str(v).strip() == "" for v in raw_row[:check_end]): continue ebene1 = _clean(_get_cell(raw_row, column_map.get("ebene1"))) ebene2 = _clean(_get_cell(raw_row, column_map.get("ebene2"))) baureihe = _clean(_get_cell(raw_row, column_map.get("baureihe"))) pr = ParsedRow( row_index=row_idx + header_idx + 2, # 1-based Excel row number ebene1=ebene1, ebene2=ebene2, baureihe=baureihe, pim_id=_clean(_get_cell(raw_row, column_map.get("pim_id"))), produkt_baureihe=_clean(_get_cell(raw_row, column_map.get("produkt_baureihe"))), gewaehltes_produkt=_clean(_get_cell(raw_row, column_map.get("gewaehltes_produkt"))), name_cad_modell=_normalize_filename( _clean(_get_cell(raw_row, column_map.get("name_cad_modell"))) ), gewuenschte_bildnummer=_clean( _get_cell(raw_row, column_map.get("gewuenschte_bildnummer")) ), lagertyp=_clean(_get_cell(raw_row, column_map.get("lagertyp"))), medias_rendering=_to_bool(_get_cell(raw_row, column_map.get("medias_rendering"))), arbeitspaket=_clean(_get_cell(raw_row, column_map.get("arbeitspaket"))), category_key=_detect_row_category(ebene1, ebene2, baureihe), ) # Parse component pairs for part_col, mat_col, comp_type in component_col_info: part_name = _normalize_filename(_clean(raw_row[part_col] if part_col < len(raw_row) else None)) material = _clean(raw_row[mat_col] if mat_col < len(raw_row) else None) if part_name or material: pr.components.append( ParsedComponent( part_name=part_name, material=material, component_type=comp_type, column_index=part_col, ) ) parsed_rows.append(pr) if not parsed_rows: warnings.append("No data rows found (all rows empty after header)") # Determine 
file-level category from most common row category if parsed_rows: row_cats = [r.category_key for r in parsed_rows if r.category_key] if row_cats: most_common = Counter(row_cats).most_common(1)[0][0] category_key = most_common template_name = _category_to_template_name(category_key) # Parse material mapping sheet if present material_mappings = _parse_material_mapping(wb) return ParsedExcel( filename=file_path.name, category_key=category_key, template_name=template_name, column_headers=column_headers, rows=parsed_rows, warnings=warnings, material_mappings=material_mappings, ) def _category_to_template_name(category_key: str | None) -> str | None: names = { "TRB": "Tapered Roller Bearings (TRB)", "Kugellager": "Kugellager (Ball Bearings)", "Gleitlager": "Gleitlager (Plain Bearings)", "CRB": "Cylindrical Roller Bearings (CRB)", "Linear_schiene": "Linear Guide Rails", "Anschlagplatten": "End Plates (Anschlagplatten)", "SRB_TORB": "Spherical / Toroidal Roller Bearings (SRB/TORB)", } return names.get(category_key) if category_key else None # --------------------------------------------------------------------------- # Serialisation helpers (convert dataclasses → plain dicts for API) # --------------------------------------------------------------------------- def parsed_row_to_dict(pr: ParsedRow) -> dict: return { "row_index": pr.row_index, "ebene1": pr.ebene1, "ebene2": pr.ebene2, "baureihe": pr.baureihe, "pim_id": pr.pim_id, "produkt_baureihe": pr.produkt_baureihe, "gewaehltes_produkt": pr.gewaehltes_produkt, "name_cad_modell": pr.name_cad_modell, "gewuenschte_bildnummer": pr.gewuenschte_bildnummer, "lagertyp": pr.lagertyp, "medias_rendering": pr.medias_rendering, "category_key": pr.category_key, "arbeitspaket": pr.arbeitspaket, "components": [ { "part_name": c.part_name, "material": c.material, "component_type": c.component_type, "column_index": c.column_index, } for c in pr.components ], } def parsed_excel_to_dict(pe: ParsedExcel) -> dict: return { "filename": 
pe.filename, "category_key": pe.category_key, "template_name": pe.template_name, "row_count": len(pe.rows), "column_headers": pe.column_headers, "rows": [parsed_row_to_dict(r) for r in pe.rows], "warnings": pe.warnings, "material_mappings": pe.material_mappings, }