Files

506 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Excel parser for HartOMat CAD order lists.
Supports two formats:
Old format (per-category files):
Row 1-2: Instruction text (skip)
Row N: Column headers — detected as the first row containing "Ebene1"
Col 0 (A): Ebene1
Col 1 (B): Ebene2
...
Col 11+ : Component pairs alternating (part_name, material)
New format (unified file — TestScope_final layout):
Row 1: Column headers (no instruction rows)
Col 0 (A): Arbeitspaket
Col 1 (B): Ebene1
Col 2 (C): Ebene2
...
Col 12+ : Component pairs
Detection is header-driven: we find "Ebene1" in any column within the first 5 rows
and build a dynamic column_map from that header row.
"""
from __future__ import annotations
import logging
import re
from collections import Counter
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
import openpyxl
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Category detection map: substring in col0 or col2 → category_key
# Priority order matters more specific first.
# ---------------------------------------------------------------------------
CATEGORY_MAP: dict[str, str] = {
# Linear / Anschlagplatten (check Ebene1 = "Linearsysteme")
"endplatten": "Anschlagplatten",
"anschlagplatten": "Anschlagplatten",
"laufrollenführungen": "Anschlagplatten",
"linearsysteme": "Linear_schiene", # Ebene1 value
"profilschienenführungen": "Linear_schiene",
"rollenumlaufeinheit": "Linear_schiene",
"kugelumlaufeinheit": "Linear_schiene",
# Bearings most specific first
"zylinderrollenlager": "CRB",
"axial-zylinderrollenlager": "CRB",
"axial-schrägrollenlager": "CRB",
"axiallagerscheiben": "CRB",
"torb": "SRB_TORB",
"radial srb": "SRB_TORB",
"pendelrollenlager": "SRB_TORB",
"kegelrollenlager": "TRB",
"kugellager": "Kugellager",
"axial-rillenkugellager": "Kugellager",
"rillenkugellager": "Kugellager",
"schrägkugellager": "Kugellager",
"gleitlager": "Gleitlager",
"gelenklager": "Gleitlager",
"gleitbuchsen": "Gleitlager",
# Fallback for generic Rollenlager → TRB (only if nothing else matched)
"rollenlager": "TRB",
}
# ---------------------------------------------------------------------------
# Header name normalization map: normalized header text → field name
# Supports multiple alternative column header texts for each field.
# ---------------------------------------------------------------------------
HEADER_FIELD_MAP: dict[str, str] = {
"arbeitspaket": "arbeitspaket",
"ebene1": "ebene1",
"ebene2": "ebene2",
"baureihe": "baureihe",
"pim-id": "pim_id",
"pim-id (klasse)": "pim_id",
"produkt (baureihe)": "produkt_baureihe",
"produkt": "produkt_baureihe",
"gewähltes produkt": "gewaehltes_produkt",
"gewaehltes produkt": "gewaehltes_produkt",
"name cad-modell": "name_cad_modell",
"name cad modell": "name_cad_modell",
"gewünschte bildnummer": "gewuenschte_bildnummer",
"gewuenschte bildnummer": "gewuenschte_bildnummer",
"lagertyp": "lagertyp",
"medias-rendering": "medias_rendering",
"medias": "medias_rendering",
}
@dataclass
class ParsedComponent:
part_name: str | None
material: str | None
component_type: str | None
column_index: int
@dataclass
class ParsedRow:
row_index: int
ebene1: str | None = None
ebene2: str | None = None
baureihe: str | None = None
pim_id: str | None = None
produkt_baureihe: str | None = None
gewaehltes_produkt: str | None = None
name_cad_modell: str | None = None
gewuenschte_bildnummer: str | None = None
lagertyp: str | None = None
medias_rendering: bool | None = None
components: list[ParsedComponent] = field(default_factory=list)
category_key: str | None = None
arbeitspaket: str | None = None
@dataclass
class ParsedExcel:
filename: str
category_key: str | None
template_name: str | None
column_headers: list[str]
rows: list[ParsedRow]
warnings: list[str] = field(default_factory=list)
material_mappings: list[dict] = field(default_factory=list)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _clean(value: Any) -> str | None:
"""Strip whitespace, return None for empty values."""
if value is None:
return None
s = str(value).strip()
return s if s else None
def _normalize_filename(name: str | None) -> str | None:
"""Lowercase and strip trailing spaces from filenames. Returns None for empty strings."""
if name is None:
return None
stripped = name.strip()
if not stripped:
return None
return stripped.lower()
def _to_bool(value: Any) -> bool | None:
"""Convert Excel 1/0, 'ja'/'nein', True/False to Python bool."""
if value is None:
return None
if isinstance(value, bool):
return value
s = str(value).strip().lower()
if s in ("1", "true", "ja", "yes", "x"):
return True
if s in ("0", "false", "nein", "no", ""):
return False
return None
def _normalize_header(text: str) -> str:
"""Normalize a header cell value for matching."""
return text.strip().lower().replace("_", " ").replace("", "-").replace("", "-")
def _detect_row_category(ebene1: str | None, ebene2: str | None, baureihe: str | None) -> str | None:
"""Detect category for a single row from its Ebene1, Ebene2, Baureihe values."""
candidates = []
for val in (ebene1, ebene2, baureihe):
if val:
candidates.append(val.lower())
for keyword, cat in CATEGORY_MAP.items():
for cand in candidates:
if keyword in cand:
return cat
return None
def _detect_category(rows: list[list[Any]], column_map: dict[str, int]) -> str | None:
"""
Detect category by scanning Ebene1, Ebene2, and Baureihe columns
across all data rows. Priority: more specific keywords first (as ordered in map).
"""
ebene1_col = column_map.get("ebene1")
ebene2_col = column_map.get("ebene2")
baureihe_col = column_map.get("baureihe")
candidates: list[str] = []
for row in rows:
for col in (ebene1_col, ebene2_col, baureihe_col):
if col is not None and col < len(row):
val = _clean(row[col])
if val:
candidates.append(val.lower())
for keyword, cat in CATEGORY_MAP.items():
for cand in candidates:
if keyword in cand:
return cat
return None
def _build_column_map(headers: list[str]) -> dict[str, int]:
"""Build field_name → column_index mapping from header row."""
column_map: dict[str, int] = {}
for idx, raw_header in enumerate(headers):
if not raw_header:
continue
normalized = _normalize_header(raw_header)
field_name = HEADER_FIELD_MAP.get(normalized)
if field_name and field_name not in column_map:
column_map[field_name] = idx
return column_map
def _find_component_start(column_map: dict[str, int]) -> int:
"""Find the first column after medias_rendering for component pairs."""
medias_col = column_map.get("medias_rendering")
if medias_col is not None:
return medias_col + 1
# Fallback: find the highest mapped column and start after it
if column_map:
return max(column_map.values()) + 1
return 11 # Legacy default
def _get_cell(row: list[Any], col: int | None) -> Any:
"""Safely get a cell value by column index."""
if col is None or col >= len(row):
return None
return row[col]
# ---------------------------------------------------------------------------
# Material mapping sheet parser
# ---------------------------------------------------------------------------
def _parse_material_mapping(wb) -> list[dict]:
"""Parse 'materialmapping' sheet if it exists.
Expected columns: display_name (col A), render_name (col B).
Returns list of {"display_name": str, "render_name": str}.
"""
# Case-insensitive sheet name search
target_name = None
for name in wb.sheetnames:
if name.lower().replace(" ", "").replace("_", "") == "materialmapping":
target_name = name
break
if target_name is None:
return []
ws = wb[target_name]
mappings = []
rows = list(ws.iter_rows(values_only=True))
if not rows:
return []
# Detect header row — look for "display" or "anzeige" in first few rows
data_start = 0
for i, row in enumerate(rows[:3]):
if row and any(
_clean(cell) and ("display" in str(cell).lower() or "anzeige" in str(cell).lower() or "material" in str(cell).lower())
for cell in row[:3]
if cell is not None
):
data_start = i + 1
break
for row in rows[data_start:]:
if len(row) < 2:
continue
display = _clean(row[0])
render = _clean(row[1])
if display and render:
mappings.append({"display_name": display, "render_name": render})
return mappings
# ---------------------------------------------------------------------------
# Main parser
# ---------------------------------------------------------------------------
def parse_excel(file_path: str | Path) -> ParsedExcel:
"""
Parse a HartOMat order list Excel file.
Returns a ParsedExcel with all data rows extracted.
Header-driven: finds "Ebene1" in any column within first 5 rows,
then builds column map dynamically.
"""
file_path = Path(file_path)
warnings: list[str] = []
try:
wb = openpyxl.load_workbook(file_path, data_only=True)
except Exception as exc:
raise ValueError(f"Cannot open Excel file: {exc}") from exc
ws = wb.active
# Collect all rows as raw values
all_rows: list[list[Any]] = []
for row in ws.iter_rows(values_only=True):
all_rows.append(list(row))
if len(all_rows) < 2:
raise ValueError("Excel file has fewer than 2 rows cannot find header row")
# Auto-detect header row: first row (within first 5) where ANY column == "Ebene1"
header_idx: int | None = None
for i, row in enumerate(all_rows[:5]):
for col_idx, cell in enumerate(row):
val = _clean(cell)
if val and val.lower() == "ebene1":
header_idx = i
break
if header_idx is not None:
break
if header_idx is None:
# Fallback: assume row 3 (index 2) is headers
header_idx = 2
warnings.append(
"Could not auto-detect header row (expected 'Ebene1' in any column); "
"falling back to row 3 as headers"
)
if len(all_rows) <= header_idx:
raise ValueError("Excel file has no data rows after the detected header row")
headers_raw = list(all_rows[header_idx])
# Remove trailing None from headers
while headers_raw and headers_raw[-1] is None:
headers_raw.pop()
max_col = len(headers_raw)
column_headers = [_clean(h) or "" for h in headers_raw]
# Build dynamic column map from headers
column_map = _build_column_map(column_headers)
# Data rows start immediately after the header row
data_rows_raw = all_rows[header_idx + 1:]
# Detect file-level category (backward compat)
category_key = _detect_category(data_rows_raw, column_map)
template_name = _category_to_template_name(category_key)
# Determine component column start
comp_start = _find_component_start(column_map)
# Build component header info (paired columns from comp_start)
component_col_info: list[tuple[int, int, str]] = [] # (part_col, material_col, component_type)
col = comp_start
while col < max_col:
part_type = column_headers[col] if col < len(column_headers) else f"part_{col}"
mat_col = col + 1
component_col_info.append((col, mat_col, part_type))
col += 2
# Parse data rows
parsed_rows: list[ParsedRow] = []
for row_idx, raw_row in enumerate(data_rows_raw):
# Pad row to max_col
while len(raw_row) < max_col:
raw_row.append(None)
# Check if the row is completely empty (check all mapped columns)
check_end = min(comp_start, max_col)
if all(v is None or str(v).strip() == "" for v in raw_row[:check_end]):
continue
ebene1 = _clean(_get_cell(raw_row, column_map.get("ebene1")))
ebene2 = _clean(_get_cell(raw_row, column_map.get("ebene2")))
baureihe = _clean(_get_cell(raw_row, column_map.get("baureihe")))
pr = ParsedRow(
row_index=row_idx + header_idx + 2, # 1-based Excel row number
ebene1=ebene1,
ebene2=ebene2,
baureihe=baureihe,
pim_id=_clean(_get_cell(raw_row, column_map.get("pim_id"))),
produkt_baureihe=_clean(_get_cell(raw_row, column_map.get("produkt_baureihe"))),
gewaehltes_produkt=_clean(_get_cell(raw_row, column_map.get("gewaehltes_produkt"))),
name_cad_modell=_normalize_filename(
_clean(_get_cell(raw_row, column_map.get("name_cad_modell")))
),
gewuenschte_bildnummer=_clean(
_get_cell(raw_row, column_map.get("gewuenschte_bildnummer"))
),
lagertyp=_clean(_get_cell(raw_row, column_map.get("lagertyp"))),
medias_rendering=_to_bool(_get_cell(raw_row, column_map.get("medias_rendering"))),
arbeitspaket=_clean(_get_cell(raw_row, column_map.get("arbeitspaket"))),
category_key=_detect_row_category(ebene1, ebene2, baureihe),
)
# Parse component pairs
for part_col, mat_col, comp_type in component_col_info:
part_name = _normalize_filename(_clean(raw_row[part_col] if part_col < len(raw_row) else None))
material = _clean(raw_row[mat_col] if mat_col < len(raw_row) else None)
if part_name or material:
pr.components.append(
ParsedComponent(
part_name=part_name,
material=material,
component_type=comp_type,
column_index=part_col,
)
)
parsed_rows.append(pr)
if not parsed_rows:
warnings.append("No data rows found (all rows empty after header)")
# Determine file-level category from most common row category
if parsed_rows:
row_cats = [r.category_key for r in parsed_rows if r.category_key]
if row_cats:
most_common = Counter(row_cats).most_common(1)[0][0]
category_key = most_common
template_name = _category_to_template_name(category_key)
# Parse material mapping sheet if present
material_mappings = _parse_material_mapping(wb)
return ParsedExcel(
filename=file_path.name,
category_key=category_key,
template_name=template_name,
column_headers=column_headers,
rows=parsed_rows,
warnings=warnings,
material_mappings=material_mappings,
)
def _category_to_template_name(category_key: str | None) -> str | None:
names = {
"TRB": "Tapered Roller Bearings (TRB)",
"Kugellager": "Kugellager (Ball Bearings)",
"Gleitlager": "Gleitlager (Plain Bearings)",
"CRB": "Cylindrical Roller Bearings (CRB)",
"Linear_schiene": "Linear Guide Rails",
"Anschlagplatten": "End Plates (Anschlagplatten)",
"SRB_TORB": "Spherical / Toroidal Roller Bearings (SRB/TORB)",
}
return names.get(category_key) if category_key else None
# ---------------------------------------------------------------------------
# Serialisation helpers (convert dataclasses → plain dicts for API)
# ---------------------------------------------------------------------------
def parsed_row_to_dict(pr: ParsedRow) -> dict:
return {
"row_index": pr.row_index,
"ebene1": pr.ebene1,
"ebene2": pr.ebene2,
"baureihe": pr.baureihe,
"pim_id": pr.pim_id,
"produkt_baureihe": pr.produkt_baureihe,
"gewaehltes_produkt": pr.gewaehltes_produkt,
"name_cad_modell": pr.name_cad_modell,
"gewuenschte_bildnummer": pr.gewuenschte_bildnummer,
"lagertyp": pr.lagertyp,
"medias_rendering": pr.medias_rendering,
"category_key": pr.category_key,
"arbeitspaket": pr.arbeitspaket,
"components": [
{
"part_name": c.part_name,
"material": c.material,
"component_type": c.component_type,
"column_index": c.column_index,
}
for c in pr.components
],
}
def parsed_excel_to_dict(pe: ParsedExcel) -> dict:
return {
"filename": pe.filename,
"category_key": pe.category_key,
"template_name": pe.template_name,
"row_count": len(pe.rows),
"column_headers": pe.column_headers,
"rows": [parsed_row_to_dict(r) for r in pe.rows],
"warnings": pe.warnings,
"material_mappings": pe.material_mappings,
}