# HartOMat/backend/app/services/part_key_service.py
"""Part key generation and scene manifest building for the USD pipeline.
The `resolved_material_assignments` JSONB schema written by `generate_usd_master_task`:
{part_key: {"source_name": str, "prim_path": str}}
The `manual_material_overrides` JSONB schema written by `PUT /cad/{id}/part-materials` (Priority 4):
{part_key: material_name_str}
The `source_material_assignments` JSONB schema written by the Excel importer (future):
{source_part_name: material_name_str}
No pxr imports — all data is read from JSONB columns, never from USD files directly.
"""
from __future__ import annotations
import hashlib
import re
# ── Part key generation ───────────────────────────────────────────────────────
# Trailing OCC instance suffix on exported part names, e.g. "_AF12".
_AF_RE = re.compile(r'_AF\d+$', re.IGNORECASE)
# Extended exporter variant suffix: "_AF<n>" optionally followed by "_ASM"
# and/or a trailing underscore, e.g. "_AF3_ASM_".
_AF_VARIANT_RE = re.compile(r"_AF\d+(_ASM)?_?$", re.IGNORECASE)
# Legacy material-name prefix rewritten to the current prefix by
# normalize_material_name().
_LEGACY_MATERIAL_PREFIX = "SCHAEFFLER_"
_CURRENT_MATERIAL_PREFIX = "HARTOMAT_"
def generate_part_key(
xcaf_label_path: str,
source_name: str,
existing_keys: set[str] | None = None,
) -> str:
"""Deterministic slug from source_name, max 64 chars, unique within assembly.
- Strips `_AF\\d+` OCC suffix from source_name before slugifying.
- Falls back to sha256 digest of xcaf_label_path if slug is empty.
- Deduplicates by appending _2, _3, ... if existing_keys is provided.
"""
base = _AF_RE.sub('', source_name) if source_name else ''
# Split camelCase before slugifying: "RingOuter" → "Ring_Outer"
base = re.sub(r'([a-z])([A-Z])', r'\1_\2', base)
slug = re.sub(r'[^a-z0-9]+', '_', base.lower()).strip('_')
if not slug:
slug = f"part_{hashlib.sha256(xcaf_label_path.encode()).hexdigest()[:8]}"
slug = slug[:50]
if existing_keys is None:
return slug
key = slug
n = 2
while key in existing_keys:
key = f"{slug}_{n}"
n += 1
existing_keys.add(key)
return key
def normalize_material_name(material_name: str | None) -> str | None:
"""Normalize persisted legacy material names to the current HartOMat prefix."""
if not isinstance(material_name, str):
return None
value = material_name.strip()
if not value:
return None
if value.upper().startswith(_LEGACY_MATERIAL_PREFIX):
return f"{_CURRENT_MATERIAL_PREFIX}{value[len(_LEGACY_MATERIAL_PREFIX):]}"
return value
def _normalize_semantic_source_name(raw_name: str) -> str:
    """Collapse exporter-only suffixes back to their semantic OCC source name."""
    name = (raw_name or "").strip()
    # Drop a trailing ".NNN" three-digit duplicate marker, if present.
    name = re.sub(r"\.\d{3}$", "", name)
    # Peel stacked _AF<n>[_ASM][_] variant suffixes until the name is stable.
    while True:
        peeled = _AF_VARIANT_RE.sub("", name)
        if peeled == name:
            return name
        name = peeled
def _slugify_semantic_source_name(raw_name: str) -> str:
    """Slugify the semantic (suffix-free) form of *raw_name*, capped at 50 chars."""
    semantic = _normalize_semantic_source_name(raw_name)
    # Same camelCase-split + slugify rules used when generating part keys.
    spaced = re.sub(r"([a-z])([A-Z])", r"\1_\2", semantic)
    slug = re.sub(r"[^a-z0-9]+", "_", spaced.lower())
    return slug.strip("_")[:50]
def _derive_semantic_alias_key(part_key: str, source_name: str) -> str | None:
    """Return the semantic alias for deduplicated instance keys, if any.

    An alias exists only when *part_key* equals the alias slug plus either a
    dedup suffix (`_2`, `_3`, ...) or an OCC variant suffix (`_af<n>[_asm]`),
    compared case-insensitively.
    """
    alias_key = _slugify_semantic_source_name(source_name)
    if not alias_key:
        return None
    if alias_key == part_key:
        return None
    suffixed_form = rf"{re.escape(alias_key)}(?:_[2-9]\d*|_af\d+(?:_asm)?)"
    if re.fullmatch(suffixed_form, part_key, flags=re.IGNORECASE):
        return alias_key
    return None
def _alias_priority(part_key: str, source_name: str) -> tuple[int, int, int]:
match = re.fullmatch(r".+_(\d+)$", part_key)
suffix_number = int(match.group(1)) if match else 1_000_000
return (suffix_number, len(source_name or ""), len(part_key))
def _iter_lookup_keys(part_key: str, fallback_part_keys: tuple[str, ...] = ()) -> tuple[str, ...]:
ordered_keys: list[str] = []
for key in (part_key, *fallback_part_keys):
if key and key not in ordered_keys:
ordered_keys.append(key)
return tuple(ordered_keys)
def _build_part_entry(
    *,
    part_key: str,
    source_name: str,
    prim_path: str | None,
    manual: dict,
    resolved: dict,
    source: dict,
    fallback_part_keys: tuple[str, ...] = (),
) -> dict:
    """Assemble one manifest part dict, resolving its effective material."""
    material, provenance = _resolve_material(
        part_key,
        source_name,
        manual,
        resolved,
        source,
        fallback_part_keys=fallback_part_keys,
    )
    return {
        "part_key": part_key,
        "source_name": source_name,
        "prim_path": prim_path,
        "effective_material": material,
        "assignment_provenance": provenance,
        # No material from any layer means the part is still unassigned.
        "is_unassigned": material is None,
    }
# ── Scene manifest building ───────────────────────────────────────────────────
def build_scene_manifest(cad_file, usd_asset=None) -> dict:
    """Build a scene manifest dict from CadFile ORM object.

    Source of part list (priority order):
    1. `resolved_material_assignments` — keyed by partKey (set by generate_usd_master_task)
    2. `parsed_objects["objects"]` — list of source name strings from STEP extraction
    3. Empty manifest if neither is available.

    Material assignment priority per part:
    1. `manual_material_overrides[part_key]` — provenance "manual"
    2. `resolved_material_assignments[part_key]["canonical_material"]` (or legacy
       `["material"]`) — provenance "auto"
    3. substring match in `source_material_assignments` against source_name — provenance "source"
    4. None, is_unassigned=True — provenance "default"
    """
    # NOTE: `usd_asset` is accepted but unused here — presumably kept for
    # caller compatibility; confirm before removing.
    cad_id = str(cad_file.id)
    # All three layers are JSONB columns; treat NULL as an empty mapping.
    resolved = cad_file.resolved_material_assignments or {}
    manual = cad_file.manual_material_overrides or {}
    source = cad_file.source_material_assignments or {}
    parts: list[dict] = []
    unmatched_source_rows: list[str] = []
    unassigned_parts: list[str] = []
    if resolved:
        # Build from resolved assignments (USD pipeline has run)
        # alias_key -> (priority tuple, candidate dict); only the best
        # (lowest-priority-tuple) candidate per alias is kept.
        alias_candidates: dict[str, tuple[tuple[int, int, int], dict]] = {}
        for part_key, meta in resolved.items():
            # Defensive: meta should be a dict per the module docstring, but
            # tolerate malformed rows by falling back to empty/None.
            source_name = meta.get("source_name", "") if isinstance(meta, dict) else ""
            prim_path = meta.get("prim_path") if isinstance(meta, dict) else None
            part_entry = _build_part_entry(
                part_key=part_key,
                source_name=source_name,
                prim_path=prim_path,
                manual=manual,
                resolved=resolved,
                source=source,
            )
            parts.append(part_entry)
            if part_entry["is_unassigned"]:
                unassigned_parts.append(part_key)
            # Deduplicated instance keys (e.g. "bolt_2") may also expose a
            # semantic alias ("bolt") — but never shadow a real resolved key.
            alias_key = _derive_semantic_alias_key(part_key, source_name)
            if alias_key is None or alias_key in resolved:
                continue
            candidate = {
                "part_key": alias_key,
                "source_name": source_name,
                "prim_path": prim_path,
                # Alias entries resolve materials via the concrete key too.
                "fallback_part_keys": (part_key,),
            }
            candidate_priority = _alias_priority(part_key, source_name)
            current = alias_candidates.get(alias_key)
            if current is None or candidate_priority < current[0]:
                alias_candidates[alias_key] = (candidate_priority, candidate)
        for alias_key, (_, candidate) in alias_candidates.items():
            alias_entry = _build_part_entry(
                part_key=candidate["part_key"],
                source_name=candidate["source_name"],
                prim_path=candidate["prim_path"],
                manual=manual,
                resolved=resolved,
                source=source,
                fallback_part_keys=candidate["fallback_part_keys"],
            )
            parts.append(alias_entry)
            if alias_entry["is_unassigned"]:
                unassigned_parts.append(alias_key)
    elif cad_file.parsed_objects:
        # Fall back to parsed_objects from STEP extraction
        object_names: list[str] = cad_file.parsed_objects.get("objects") or []
        seen_keys: set[str] = set()
        for source_name in object_names:
            # Fallback: USD master not yet generated. Use source_name as xcaf_path proxy.
            # Note: slugs produced here may differ from what export_step_to_usd.py will
            # produce for unnamed parts (which use sha256 of the XCAF hierarchy path).
            # Named parts will match once USD master is generated.
            part_key = generate_part_key(source_name, source_name, seen_keys)
            effective_material, provenance = _resolve_material(
                part_key, source_name, manual, resolved, source
            )
            is_unassigned = effective_material is None
            parts.append({
                "part_key": part_key,
                "source_name": source_name,
                "prim_path": None,
                "effective_material": effective_material,
                "assignment_provenance": provenance,
                "is_unassigned": is_unassigned,
            })
            if is_unassigned:
                unassigned_parts.append(part_key)
    # Find source rows not matched to any part
    # NOTE(review): an empty source_name makes `sn in src_key.lower()` True for
    # every row, so a single unnamed part suppresses all unmatched-row
    # reporting — confirm this is intended.
    matched_source_names = {p["source_name"].lower() for p in parts}
    for src_key in source:
        if not any(
            src_key.lower() in sn or sn in src_key.lower()
            for sn in matched_source_names
        ):
            unmatched_source_rows.append(src_key)
    return {
        "cad_file_id": cad_id,
        "parts": parts,
        "unmatched_source_rows": unmatched_source_rows,
        "unassigned_parts": unassigned_parts,
    }
def _resolve_material(
    part_key: str,
    source_name: str,
    manual: dict,
    resolved: dict,
    source: dict,
    fallback_part_keys: tuple[str, ...] = (),
) -> tuple[str | None, str]:
    """Return (material_name, provenance) for one part using priority order."""
    keys = _iter_lookup_keys(part_key, fallback_part_keys)
    # Priority 1: explicit manual override set via the part-materials endpoint.
    for key in keys:
        override = manual.get(key)
        if override:
            return normalize_material_name(str(override)), "manual"
    # Priority 2: auto-resolved by the USD pipeline ("material" is the legacy
    # field name kept for backward compatibility).
    for key in keys:
        meta = resolved.get(key)
        if not isinstance(meta, dict):
            continue
        canonical = normalize_material_name(
            meta.get("canonical_material") or meta.get("material")
        )
        if canonical:
            return canonical, "auto"
    # Priority 3: bidirectional substring match against imported source rows.
    lowered_name = source_name.lower()
    for row_name, row_material in source.items():
        lowered_row = row_name.lower()
        matches = lowered_row in lowered_name or lowered_name in lowered_row
        if matches and row_material:
            return normalize_material_name(str(row_material)), "source"
    # Priority 4: nothing matched — part stays unassigned.
    return None, "default"
# ── Effective assignments for render pipeline ─────────────────────────────────
def get_effective_assignments(cad_file) -> dict[str, str]:
    """Return {part_key: material_name} merged from all three layers.

    Used by the render pipeline when building the material map (Priority 5).
    """
    assignments: dict[str, str] = {}
    for part in build_scene_manifest(cad_file)["parts"]:
        material = part["effective_material"]
        if material is not None:
            assignments[part["part_key"]] = material
    return assignments