"""
Excel parser for Schaeffler CAD order lists.

Supports two formats:

Old format (per-category files):
    Row 1-2: Instruction text (skip)
    Row N: Column headers — detected as the first row containing "Ebene1"
    Col 0 (A): Ebene1
    Col 1 (B): Ebene2
    ...
    Col 11+ : Component pairs – alternating (part_name, material)

New format (unified file — TestScope_final layout):
    Row 1: Column headers (no instruction rows)
    Col 0 (A): Arbeitspaket
    Col 1 (B): Ebene1
    Col 2 (C): Ebene2
    ...
    Col 12+ : Component pairs

Detection is header-driven: we find "Ebene1" in any column within the first 5 rows
and build a dynamic column_map from that header row.
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import logging
|
||
import re
|
||
from collections import Counter
|
||
from dataclasses import dataclass, field
|
||
from pathlib import Path
|
||
from typing import Any
|
||
|
||
import openpyxl
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# ---------------------------------------------------------------------------
# Category detection map: substring in col0 or col2 → category_key
# Priority order matters – more specific first.
# ---------------------------------------------------------------------------
# NOTE: detection iterates this dict in insertion order and returns the first
# keyword that is a substring of any candidate cell, so specific keywords
# (e.g. "zylinderrollenlager") must stay above generic ones ("rollenlager").
CATEGORY_MAP: dict[str, str] = {
    # Linear / Anschlagplatten (check Ebene1 = "Linearsysteme")
    "endplatten": "Anschlagplatten",
    "anschlagplatten": "Anschlagplatten",
    "laufrollenführungen": "Anschlagplatten",
    "linearsysteme": "Linear_schiene",  # Ebene1 value
    "profilschienenführungen": "Linear_schiene",
    "rollenumlaufeinheit": "Linear_schiene",
    "kugelumlaufeinheit": "Linear_schiene",
    # Bearings – most specific first
    "zylinderrollenlager": "CRB",
    "axial-zylinderrollenlager": "CRB",
    "axial-schrägrollenlager": "CRB",
    "axiallagerscheiben": "CRB",
    "torb": "SRB_TORB",
    "radial srb": "SRB_TORB",
    "pendelrollenlager": "SRB_TORB",
    "kegelrollenlager": "TRB",
    "kugellager": "Kugellager",
    "axial-rillenkugellager": "Kugellager",
    "rillenkugellager": "Kugellager",
    "schrägkugellager": "Kugellager",
    "gleitlager": "Gleitlager",
    "gelenklager": "Gleitlager",
    "gleitbuchsen": "Gleitlager",
    # Fallback for generic Rollenlager → TRB (only if nothing else matched)
    "rollenlager": "TRB",
}
|
||
|
||
# ---------------------------------------------------------------------------
# Header name normalization map: normalized header text → field name
# Supports multiple alternative column header texts for each field.
# ---------------------------------------------------------------------------
# Keys must already be in _normalize_header() form (lowercased, "_" replaced
# by space, en/em dashes replaced by "-"); lookups are done on normalized
# header cell texts in _build_column_map().
HEADER_FIELD_MAP: dict[str, str] = {
    "arbeitspaket": "arbeitspaket",
    "ebene1": "ebene1",
    "ebene2": "ebene2",
    "baureihe": "baureihe",
    "pim-id": "pim_id",
    "pim-id (klasse)": "pim_id",
    "produkt (baureihe)": "produkt_baureihe",
    "produkt": "produkt_baureihe",
    "gewähltes produkt": "gewaehltes_produkt",
    "gewaehltes produkt": "gewaehltes_produkt",
    "name cad-modell": "name_cad_modell",
    "name cad modell": "name_cad_modell",
    "gewünschte bildnummer": "gewuenschte_bildnummer",
    "gewuenschte bildnummer": "gewuenschte_bildnummer",
    "lagertyp": "lagertyp",
    "medias-rendering": "medias_rendering",
    "medias": "medias_rendering",
}
|
||
|
||
|
||
@dataclass
class ParsedComponent:
    """One (part name, material) pair read from an alternating component column pair."""

    # Normalized (lowercased, stripped) part filename; None when the cell was empty.
    part_name: str | None
    # Material text as entered in the sheet (stripped); None when empty.
    material: str | None
    # Header text of the part-name column, used as the component's type label.
    component_type: str | None
    # 0-based column index of the part-name cell in the source row.
    column_index: int
@dataclass
class ParsedRow:
    """One parsed data row of the order list: metadata fields plus component pairs."""

    # 1-based Excel row number this record came from.
    row_index: int
    # Hierarchy / product metadata columns (None when the cell was empty).
    ebene1: str | None = None
    ebene2: str | None = None
    baureihe: str | None = None
    pim_id: str | None = None
    produkt_baureihe: str | None = None
    gewaehltes_produkt: str | None = None
    # Normalized (lowercased, stripped) CAD model filename.
    name_cad_modell: str | None = None
    gewuenschte_bildnummer: str | None = None
    lagertyp: str | None = None
    # Parsed from the Medias-Rendering column; None when the value is unrecognized.
    medias_rendering: bool | None = None
    # Component (part_name, material) pairs found in the trailing columns.
    components: list[ParsedComponent] = field(default_factory=list)
    # Category detected from this row's Ebene1/Ebene2/Baureihe values.
    category_key: str | None = None
    # Work-package column (present in the new unified format only).
    arbeitspaket: str | None = None
@dataclass
class ParsedExcel:
    """Full parse result for one order-list workbook."""

    # Basename of the parsed file.
    filename: str
    # File-level category key (majority vote of row-level detections).
    category_key: str | None
    # Human-readable template name derived from category_key.
    template_name: str | None
    # Cleaned header texts of the detected header row ("" for blank cells).
    column_headers: list[str]
    rows: list[ParsedRow]
    # Non-fatal parser diagnostics collected during parsing.
    warnings: list[str] = field(default_factory=list)
    # Entries from the optional 'materialmapping' sheet.
    material_mappings: list[dict] = field(default_factory=list)
||
# ---------------------------------------------------------------------------
|
||
# Helpers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _clean(value: Any) -> str | None:
|
||
"""Strip whitespace, return None for empty values."""
|
||
if value is None:
|
||
return None
|
||
s = str(value).strip()
|
||
return s if s else None
|
||
|
||
|
||
def _normalize_filename(name: str | None) -> str | None:
|
||
"""Lowercase and strip trailing spaces from filenames. Returns None for empty strings."""
|
||
if name is None:
|
||
return None
|
||
stripped = name.strip()
|
||
if not stripped:
|
||
return None
|
||
return stripped.lower()
|
||
|
||
|
||
def _to_bool(value: Any) -> bool | None:
|
||
"""Convert Excel 1/0, 'ja'/'nein', True/False to Python bool."""
|
||
if value is None:
|
||
return None
|
||
if isinstance(value, bool):
|
||
return value
|
||
s = str(value).strip().lower()
|
||
if s in ("1", "true", "ja", "yes", "x"):
|
||
return True
|
||
if s in ("0", "false", "nein", "no", ""):
|
||
return False
|
||
return None
|
||
|
||
|
||
def _normalize_header(text: str) -> str:
|
||
"""Normalize a header cell value for matching."""
|
||
return text.strip().lower().replace("_", " ").replace("–", "-").replace("—", "-")
|
||
|
||
|
||
def _detect_row_category(ebene1: str | None, ebene2: str | None, baureihe: str | None) -> str | None:
    """Detect the category for one row from its Ebene1, Ebene2, Baureihe values.

    CATEGORY_MAP order encodes priority: the first keyword (most specific
    first) that appears as a substring of any value wins.
    """
    haystacks = [value.lower() for value in (ebene1, ebene2, baureihe) if value]
    for keyword, category in CATEGORY_MAP.items():
        if any(keyword in hay for hay in haystacks):
            return category
    return None
def _detect_category(rows: list[list[Any]], column_map: dict[str, int]) -> str | None:
    """
    Detect a file-level category by scanning the Ebene1, Ebene2, and Baureihe
    columns across all data rows. CATEGORY_MAP insertion order encodes
    priority (most specific keywords first).
    """
    source_cols = [column_map.get(name) for name in ("ebene1", "ebene2", "baureihe")]

    # Gather every non-empty candidate value (lowercased) before matching, so
    # keyword priority applies across the whole file rather than per row.
    haystacks: list[str] = []
    for row in rows:
        for col in source_cols:
            if col is not None and col < len(row):
                text = _clean(row[col])
                if text:
                    haystacks.append(text.lower())

    for keyword, category in CATEGORY_MAP.items():
        if any(keyword in hay for hay in haystacks):
            return category
    return None
def _build_column_map(headers: list[str]) -> dict[str, int]:
    """Build a field_name → column_index mapping from the header row.

    The first header matching a field wins; later duplicates are ignored.
    """
    mapping: dict[str, int] = {}
    for idx, header in enumerate(headers):
        if not header:
            continue
        field_name = HEADER_FIELD_MAP.get(_normalize_header(header))
        if field_name and field_name not in mapping:
            mapping[field_name] = idx
    return mapping
def _find_component_start(column_map: dict[str, int]) -> int:
|
||
"""Find the first column after medias_rendering for component pairs."""
|
||
medias_col = column_map.get("medias_rendering")
|
||
if medias_col is not None:
|
||
return medias_col + 1
|
||
# Fallback: find the highest mapped column and start after it
|
||
if column_map:
|
||
return max(column_map.values()) + 1
|
||
return 11 # Legacy default
|
||
|
||
|
||
def _get_cell(row: list[Any], col: int | None) -> Any:
|
||
"""Safely get a cell value by column index."""
|
||
if col is None or col >= len(row):
|
||
return None
|
||
return row[col]
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Material mapping sheet parser
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _parse_material_mapping(wb) -> list[dict]:
    """Parse the 'materialmapping' sheet if the workbook has one.

    Expected columns: display_name (col A), render_name (col B).
    Returns a list of {"display_name": str, "render_name": str} entries;
    an empty list when the sheet is absent or empty.
    """
    # Locate the sheet ignoring case, spaces, and underscores in its name.
    sheet_name = next(
        (
            name
            for name in wb.sheetnames
            if name.lower().replace(" ", "").replace("_", "") == "materialmapping"
        ),
        None,
    )
    if sheet_name is None:
        return []

    rows = list(wb[sheet_name].iter_rows(values_only=True))
    if not rows:
        return []

    # Detect an optional header row within the first three rows: any of the
    # first three cells mentioning "display", "anzeige", or "material".
    header_keywords = ("display", "anzeige", "material")
    data_start = 0
    for i, row in enumerate(rows[:3]):
        if not row:
            continue
        looks_like_header = any(
            _clean(cell) and any(kw in str(cell).lower() for kw in header_keywords)
            for cell in row[:3]
            if cell is not None
        )
        if looks_like_header:
            data_start = i + 1
            break

    mappings: list[dict] = []
    for row in rows[data_start:]:
        if len(row) < 2:
            continue
        display = _clean(row[0])
        render = _clean(row[1])
        # Only complete pairs are kept; half-filled rows are skipped.
        if display and render:
            mappings.append({"display_name": display, "render_name": render})
    return mappings
# ---------------------------------------------------------------------------
|
||
# Main parser
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _find_header_row(rows: list[list[Any]]) -> int | None:
    """Return the index of the first row (within the first 5) that contains an
    'Ebene1' cell in any column, or None when no such row exists."""
    for i, row in enumerate(rows[:5]):
        for cell in row:
            val = _clean(cell)
            if val and val.lower() == "ebene1":
                return i
    return None


def parse_excel(file_path: str | Path) -> ParsedExcel:
    """
    Parse a Schaeffler order list Excel file.

    Header-driven: finds "Ebene1" in any column within the first 5 rows and
    builds the column map dynamically from that header row, so both the old
    per-category layout and the new unified layout are supported.

    Args:
        file_path: Path to the .xlsx order list.

    Returns:
        ParsedExcel with all data rows, the detected category and template
        name, collected warnings, and any material mappings.

    Raises:
        ValueError: if the file cannot be opened, has fewer than 2 rows, or
            contains no rows after the detected header row.
    """
    file_path = Path(file_path)
    warnings: list[str] = []

    try:
        wb = openpyxl.load_workbook(file_path, data_only=True)
    except Exception as exc:
        raise ValueError(f"Cannot open Excel file: {exc}") from exc

    # BUGFIX: the workbook handle was never closed; release it on all paths,
    # including the ValueError raises below.
    try:
        ws = wb.active

        # Collect all rows as raw cell values.
        all_rows: list[list[Any]] = [
            list(row) for row in ws.iter_rows(values_only=True)
        ]

        if len(all_rows) < 2:
            raise ValueError("Excel file has fewer than 2 rows – cannot find header row")

        header_idx = _find_header_row(all_rows)
        if header_idx is None:
            # Fallback: assume row 3 (index 2) is headers
            header_idx = 2
            warnings.append(
                "Could not auto-detect header row (expected 'Ebene1' in any column); "
                "falling back to row 3 as headers"
            )

        if len(all_rows) <= header_idx:
            raise ValueError("Excel file has no data rows after the detected header row")

        headers_raw = list(all_rows[header_idx])
        # Drop trailing empty header cells so max_col reflects real columns.
        while headers_raw and headers_raw[-1] is None:
            headers_raw.pop()

        max_col = len(headers_raw)
        column_headers = [_clean(h) or "" for h in headers_raw]

        # Build the dynamic field → column-index map from the header texts.
        column_map = _build_column_map(column_headers)

        # Data rows start immediately after the header row.
        data_rows_raw = all_rows[header_idx + 1:]

        # Preliminary file-level category (backward compat); refined below by
        # a majority vote of per-row detections once rows are parsed.
        category_key = _detect_category(data_rows_raw, column_map)
        template_name = _category_to_template_name(category_key)

        # Component pairs start right after the last mapped metadata column.
        comp_start = _find_component_start(column_map)

        # (part_col, material_col, component_type) for each alternating pair.
        component_col_info: list[tuple[int, int, str]] = []
        col = comp_start
        while col < max_col:
            part_type = column_headers[col] if col < len(column_headers) else f"part_{col}"
            component_col_info.append((col, col + 1, part_type))
            col += 2

        parsed_rows: list[ParsedRow] = []
        for row_idx, raw_row in enumerate(data_rows_raw):
            # Pad the row in place so metadata indexing below is safe.
            while len(raw_row) < max_col:
                raw_row.append(None)

            # Skip rows whose mapped (non-component) columns are all empty.
            check_end = min(comp_start, max_col)
            if all(v is None or str(v).strip() == "" for v in raw_row[:check_end]):
                continue

            ebene1 = _clean(_get_cell(raw_row, column_map.get("ebene1")))
            ebene2 = _clean(_get_cell(raw_row, column_map.get("ebene2")))
            baureihe = _clean(_get_cell(raw_row, column_map.get("baureihe")))

            pr = ParsedRow(
                row_index=row_idx + header_idx + 2,  # 1-based Excel row number
                ebene1=ebene1,
                ebene2=ebene2,
                baureihe=baureihe,
                pim_id=_clean(_get_cell(raw_row, column_map.get("pim_id"))),
                produkt_baureihe=_clean(_get_cell(raw_row, column_map.get("produkt_baureihe"))),
                gewaehltes_produkt=_clean(_get_cell(raw_row, column_map.get("gewaehltes_produkt"))),
                name_cad_modell=_normalize_filename(
                    _clean(_get_cell(raw_row, column_map.get("name_cad_modell")))
                ),
                gewuenschte_bildnummer=_clean(
                    _get_cell(raw_row, column_map.get("gewuenschte_bildnummer"))
                ),
                lagertyp=_clean(_get_cell(raw_row, column_map.get("lagertyp"))),
                medias_rendering=_to_bool(_get_cell(raw_row, column_map.get("medias_rendering"))),
                arbeitspaket=_clean(_get_cell(raw_row, column_map.get("arbeitspaket"))),
                category_key=_detect_row_category(ebene1, ebene2, baureihe),
            )

            # Collect non-empty (part, material) component pairs.
            for part_col, mat_col, comp_type in component_col_info:
                part_name = _normalize_filename(
                    _clean(raw_row[part_col] if part_col < len(raw_row) else None)
                )
                material = _clean(raw_row[mat_col] if mat_col < len(raw_row) else None)

                if part_name or material:
                    pr.components.append(
                        ParsedComponent(
                            part_name=part_name,
                            material=material,
                            component_type=comp_type,
                            column_index=part_col,
                        )
                    )

            parsed_rows.append(pr)

        if not parsed_rows:
            warnings.append("No data rows found (all rows empty after header)")

        # Refine the file-level category: majority vote of per-row categories.
        if parsed_rows:
            row_cats = [r.category_key for r in parsed_rows if r.category_key]
            if row_cats:
                category_key = Counter(row_cats).most_common(1)[0][0]
                template_name = _category_to_template_name(category_key)

        # Optional 'materialmapping' sheet.
        material_mappings = _parse_material_mapping(wb)

        return ParsedExcel(
            filename=file_path.name,
            category_key=category_key,
            template_name=template_name,
            column_headers=column_headers,
            rows=parsed_rows,
            warnings=warnings,
            material_mappings=material_mappings,
        )
    finally:
        wb.close()
def _category_to_template_name(category_key: str | None) -> str | None:
|
||
names = {
|
||
"TRB": "Tapered Roller Bearings (TRB)",
|
||
"Kugellager": "Kugellager (Ball Bearings)",
|
||
"Gleitlager": "Gleitlager (Plain Bearings)",
|
||
"CRB": "Cylindrical Roller Bearings (CRB)",
|
||
"Linear_schiene": "Linear Guide Rails",
|
||
"Anschlagplatten": "End Plates (Anschlagplatten)",
|
||
"SRB_TORB": "Spherical / Toroidal Roller Bearings (SRB/TORB)",
|
||
}
|
||
return names.get(category_key) if category_key else None
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Serialisation helpers (convert dataclasses → plain dicts for API)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def parsed_row_to_dict(pr: ParsedRow) -> dict:
    """Serialize a ParsedRow into a plain dict for the API layer."""
    scalar_fields = (
        "row_index",
        "ebene1",
        "ebene2",
        "baureihe",
        "pim_id",
        "produkt_baureihe",
        "gewaehltes_produkt",
        "name_cad_modell",
        "gewuenschte_bildnummer",
        "lagertyp",
        "medias_rendering",
        "category_key",
        "arbeitspaket",
    )
    payload = {name: getattr(pr, name) for name in scalar_fields}
    payload["components"] = [
        {
            "part_name": comp.part_name,
            "material": comp.material,
            "component_type": comp.component_type,
            "column_index": comp.column_index,
        }
        for comp in pr.components
    ]
    return payload
def parsed_excel_to_dict(pe: ParsedExcel) -> dict:
    """Serialize a ParsedExcel into a plain dict for the API layer."""
    serialized_rows = [parsed_row_to_dict(row) for row in pe.rows]
    return {
        "filename": pe.filename,
        "category_key": pe.category_key,
        "template_name": pe.template_name,
        "row_count": len(serialized_rows),
        "column_headers": pe.column_headers,
        "rows": serialized_rows,
        "warnings": pe.warnings,
        "material_mappings": pe.material_mappings,
    }