refactor(B1): migrate to domain-driven project structure

Move all models/schemas/services/routers into app/domains/.
Keep backward-compat shims in old locations for imports.
Preserves domains/rendering/tasks.py from Phase A.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-06 16:24:11 +01:00
parent 82bf46725b
commit b87df4a3e5
69 changed files with 1729 additions and 1831 deletions
+3 -143
View File
@@ -1,143 +1,3 @@
"""Material alias resolution service.
Used from Celery tasks (sync context) to resolve raw material names
(from Excel / user input) to SCHAEFFLER library material names via aliases.
Resolution chain:
1. Exact Material.name match (case-insensitive) → use it
2. MaterialAlias lookup (case-insensitive) → use alias.material.name
3. Pass through unchanged → Blender will show FailedMaterial magenta
"""
import logging
from sqlalchemy import create_engine, select, func
from sqlalchemy.orm import Session, selectinload
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.material import Material
from app.models.material_alias import MaterialAlias
logger = logging.getLogger(__name__)
_engine = None
def _get_engine():
global _engine
if _engine is None:
from app.config import settings as app_settings
_engine = create_engine(app_settings.database_url_sync)
return _engine
def resolve_material_map(raw_map: dict[str, str]) -> dict[str, str]:
"""Resolve raw material names to SCHAEFFLER library names via aliases.
For each value in raw_map:
1. If it already matches a Material.name (case-insensitive) → keep as-is (use canonical name)
2. Else look up MaterialAlias.alias (case-insensitive) → return alias.material.name
3. Else keep original (Blender will use FailedMaterial fallback)
Returns a new dict with the same keys but resolved material names.
"""
if not raw_map:
return raw_map
engine = _get_engine()
with Session(engine) as session:
# Load all materials
materials = session.execute(
select(Material).options(selectinload(Material.aliases))
).scalars().all()
# Build lookup dicts (case-insensitive)
# material name (lower) → canonical Material.name
name_lookup: dict[str, str] = {}
# alias (lower) → Material.name
alias_lookup: dict[str, str] = {}
for mat in materials:
name_lookup[mat.name.lower()] = mat.name
for a in mat.aliases:
alias_lookup[a.alias.lower()] = mat.name
resolved = {}
for part_name, raw_material in raw_map.items():
raw_lower = raw_material.lower()
# 1. Alias lookup first — aliases explicitly map intermediate/display names
# to the canonical SCHAEFFLER library names (e.g. "Steel--Stahl" →
# "SCHAEFFLER_010101_Steel-Bare"). This must take priority over the
# direct name match so that intermediate names are properly redirected.
if raw_lower in alias_lookup:
target = alias_lookup[raw_lower]
logger.info("resolved '%s''%s' (alias match)", raw_material, target)
resolved[part_name] = target
continue
# 2. Exact material name match (canonical name used as-is)
if raw_lower in name_lookup:
canonical = name_lookup[raw_lower]
if canonical != raw_material:
logger.info("resolved '%s''%s' (exact name match)", raw_material, canonical)
resolved[part_name] = canonical
continue
# 3. Pass through unchanged
logger.warning("no material match for '%s' — will use FailedMaterial fallback", raw_material)
resolved[part_name] = raw_material
return resolved
async def seed_material_aliases_from_mappings(
db: AsyncSession, mappings: list[dict]
) -> dict:
"""Seed material aliases from Excel materialmapping sheet.
For each {display_name, render_name}:
- Find or create Material by render_name
- Add display_name as alias if not already present
Returns {"created": N, "skipped": N}.
"""
created = 0
skipped = 0
for mapping in mappings:
display_name = mapping.get("display_name", "").strip()
render_name = mapping.get("render_name", "").strip()
if not display_name or not render_name:
skipped += 1
continue
# Find or create Material by render_name
result = await db.execute(
select(Material).where(func.lower(Material.name) == render_name.lower())
)
material = result.scalar_one_or_none()
if material is None:
material = Material(name=render_name, source="excel_mapping")
db.add(material)
await db.flush()
# Check if alias already exists
alias_result = await db.execute(
select(MaterialAlias).where(
func.lower(MaterialAlias.alias) == display_name.lower()
)
)
existing_alias = alias_result.scalar_one_or_none()
if existing_alias:
skipped += 1
continue
# Create alias
alias = MaterialAlias(material_id=material.id, alias=display_name)
db.add(alias)
created += 1
if created > 0:
await db.flush()
return {"created": created, "skipped": skipped}
# Compat shim — use app.domains.materials.service instead
from app.domains.materials.service import resolve_material_map, seed_material_aliases_from_mappings
__all__ = ["resolve_material_map", "seed_material_aliases_from_mappings"]
+3 -84
View File
@@ -1,84 +1,3 @@
"""Notification emission helpers.
Provides async (for routers) and sync (for Celery tasks) entry points
to create notification rows in the audit_log table.
"""
import logging
import uuid
from datetime import datetime
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.audit_log import AuditLog
logger = logging.getLogger(__name__)
_engine = None
def _get_engine():
global _engine
if _engine is None:
from app.config import settings as app_settings
_engine = create_engine(app_settings.database_url_sync)
return _engine
async def emit_notification(
db: AsyncSession,
*,
actor_user_id: str | uuid.UUID | None = None,
target_user_id: str | uuid.UUID | None = None,
action: str,
entity_type: str | None = None,
entity_id: str | None = None,
details: dict | None = None,
) -> None:
"""Create a notification (async — for use inside FastAPI routers)."""
try:
entry = AuditLog(
user_id=str(actor_user_id) if actor_user_id else None,
target_user_id=str(target_user_id) if target_user_id else None,
action=action,
entity_type=entity_type,
entity_id=str(entity_id) if entity_id else None,
details=details,
notification=True,
timestamp=datetime.utcnow(),
)
db.add(entry)
await db.commit()
except Exception:
logger.exception("Failed to emit notification (async)")
await db.rollback()
def emit_notification_sync(
*,
actor_user_id: str | uuid.UUID | None = None,
target_user_id: str | uuid.UUID | None = None,
action: str,
entity_type: str | None = None,
entity_id: str | None = None,
details: dict | None = None,
) -> None:
"""Create a notification (sync — for use inside Celery tasks)."""
engine = _get_engine()
try:
with Session(engine) as session:
entry = AuditLog(
user_id=str(actor_user_id) if actor_user_id else None,
target_user_id=str(target_user_id) if target_user_id else None,
action=action,
entity_type=entity_type,
entity_id=str(entity_id) if entity_id else None,
details=details,
notification=True,
timestamp=datetime.utcnow(),
)
session.add(entry)
session.commit()
except Exception:
logger.exception("Failed to emit notification (sync)")
# Compat shim — use app.domains.notifications.service instead
from app.domains.notifications.service import emit_notification, emit_notification_sync
__all__ = ["emit_notification", "emit_notification_sync"]
+3 -22
View File
@@ -1,22 +1,3 @@
"""Order number generation and business logic."""
from datetime import datetime
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from app.models.order import Order
async def generate_order_number(db: AsyncSession) -> str:
"""Generate next sequential order number: SA-2026-XXXXX."""
year = datetime.utcnow().year
prefix = f"SA-{year}-"
# Use MAX to find the highest existing sequence number this year.
# COUNT-based approach breaks when orders are deleted (produces duplicates).
result = await db.execute(
select(func.max(Order.order_number)).where(Order.order_number.like(f"{prefix}%"))
)
max_num = result.scalar()
if max_num:
last_seq = int(max_num.split("-")[-1])
return f"{prefix}{last_seq + 1:05d}"
return f"{prefix}00001"
# Compat shim — use app.domains.orders.service instead
from app.domains.orders.service import generate_order_number, check_order_completion
__all__ = ["generate_order_number", "check_order_completion"]
+3 -86
View File
@@ -1,86 +1,3 @@
"""Service to auto-advance order status when all renders complete."""
import logging
from datetime import datetime
from sqlalchemy import create_engine, select, update as sql_update
from sqlalchemy.orm import Session
from app.models.order import Order, OrderStatus
from app.models.order_line import OrderLine
logger = logging.getLogger(__name__)
def check_order_completion(order_id: str) -> bool:
"""If all renderable lines are done, auto-advance order to completed.
Called from Celery tasks (sync context).
Returns True if the order was advanced to completed.
"""
from app.config import settings as app_settings
sync_url = app_settings.database_url.replace("+asyncpg", "")
engine = create_engine(sync_url)
try:
with Session(engine) as session:
# Get all lines that have an output type (i.e. renderable)
lines = session.execute(
select(OrderLine).where(
OrderLine.order_id == order_id,
OrderLine.output_type_id.isnot(None),
)
).scalars().all()
if not lines:
return False
# Check if all renderable lines are in a terminal state
all_terminal = all(
line.render_status in ("completed", "failed", "cancelled")
for line in lines
)
if not all_terminal:
return False
# Check order is still in processing state
order = session.execute(
select(Order).where(Order.id == order_id)
).scalar_one_or_none()
if order is None or order.status != OrderStatus.processing:
return False
# Auto-advance to completed
now = datetime.utcnow()
session.execute(
sql_update(Order)
.where(Order.id == order_id)
.values(
status=OrderStatus.completed,
completed_at=now,
updated_at=now,
)
)
session.commit()
logger.info(f"Order {order_id} auto-advanced to completed (all {len(lines)} lines done)")
# Notify order creator
try:
from app.services.notification_service import emit_notification_sync
emit_notification_sync(
actor_user_id=None,
target_user_id=str(order.created_by),
action="order.completed",
entity_type="order",
entity_id=str(order_id),
details={"order_number": order.order_number},
)
except Exception:
logger.exception("Failed to emit order.completed notification")
return True
finally:
engine.dispose()
# Compat shim — use app.domains.orders.service instead
from app.domains.orders.service import check_order_completion
__all__ = ["check_order_completion"]
+8 -232
View File
@@ -1,232 +1,8 @@
"""Pricing service — price lookup and order price computation.
Price resolution cascade for order lines:
1. OutputType's linked pricing_tier (if active) → use its price_per_item
2. Product's category_key → look up PricingTier by category
3. "default" category tier → global fallback
4. None if nothing configured
"""
from decimal import Decimal
from typing import Any
from sqlalchemy import select, update as sql_update
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.models.pricing_tier import PricingTier
async def get_price_for(
db: AsyncSession,
category_key: str,
quality_level: str = "Normal",
) -> Decimal | None:
"""Return price_per_item for the given category + quality level.
Falls back to category_key='default' if no exact match is found.
Returns None if nothing is configured.
"""
# 1. Exact match
result = await db.execute(
select(PricingTier).where(
PricingTier.category_key == category_key,
PricingTier.quality_level == quality_level,
PricingTier.is_active.is_(True),
)
)
tier = result.scalar_one_or_none()
if tier is not None:
return tier.price_per_item
if category_key == "default":
return None
# 2. Fallback: default category
result = await db.execute(
select(PricingTier).where(
PricingTier.category_key == "default",
PricingTier.quality_level == quality_level,
PricingTier.is_active.is_(True),
)
)
tier = result.scalar_one_or_none()
return tier.price_per_item if tier is not None else None
async def resolve_line_price(
db: AsyncSession,
output_type_id: str | None,
product_category_key: str | None,
) -> Decimal | None:
"""Resolve the unit price for a single order line using the cascade.
1. OutputType's linked pricing_tier (if active)
2. Product's category_key → PricingTier by category
3. "default" category tier → global fallback
4. None
"""
if output_type_id is not None:
from app.models.output_type import OutputType
result = await db.execute(
select(OutputType)
.options(selectinload(OutputType.pricing_tier))
.where(OutputType.id == output_type_id)
)
ot = result.scalar_one_or_none()
if ot and ot.pricing_tier and ot.pricing_tier.is_active:
return ot.pricing_tier.price_per_item
# Step 2+3: category lookup with default fallback
cat = product_category_key or "default"
return await get_price_for(db, cat)
async def estimate_order_price(
db: AsyncSession,
lines: list[dict[str, Any]],
) -> dict:
"""Estimate price for a list of prospective order lines.
Each line dict should have: product_id, output_type_id.
Returns {total, line_count, breakdown: [{output_type_id, product_id, unit_price}], has_unpriced}.
"""
from app.models.product import Product
breakdown: list[dict] = []
total = Decimal("0.00")
has_unpriced = False
for line in lines:
product_id = line.get("product_id")
output_type_id = line.get("output_type_id")
# Get product category
cat = None
if product_id:
prod_result = await db.execute(
select(Product).where(Product.id == product_id)
)
prod = prod_result.scalar_one_or_none()
if prod:
cat = prod.category_key
price = await resolve_line_price(db, output_type_id, cat)
breakdown.append({
"output_type_id": str(output_type_id) if output_type_id else None,
"product_id": str(product_id) if product_id else None,
"unit_price": float(price) if price is not None else None,
})
if price is not None:
total += price
else:
has_unpriced = True
return {
"total": float(total),
"line_count": len(lines),
"breakdown": breakdown,
"has_unpriced": has_unpriced,
}
async def compute_order_estimated_price(
db: AsyncSession,
order,
items,
quality_level: str = "Normal",
) -> Decimal | None:
"""Compute estimated price for an order based on rendering items.
Returns None if no pricing is configured, or Decimal('0.00') if there
are no rendering items.
"""
rendering_count = sum(1 for i in items if i.medias_rendering)
if rendering_count == 0:
return Decimal("0.00")
# Resolve category from template
category_key = "default"
if order.template_id is not None:
from app.models.template import Template
tmpl_result = await db.execute(
select(Template).where(Template.id == order.template_id)
)
tmpl = tmpl_result.scalar_one_or_none()
if tmpl and tmpl.category_key:
category_key = tmpl.category_key
unit_price = await get_price_for(db, category_key, quality_level)
if unit_price is None:
return None
return unit_price * rendering_count
async def refresh_order_price(db: AsyncSession, order_id) -> Decimal | None:
"""Re-fetch order + lines, resolve per-line prices, snapshot to unit_price, update order total."""
from app.models.order import Order
from app.models.order_line import OrderLine
from app.models.output_type import OutputType
from app.models.product import Product
order_result = await db.execute(select(Order).where(Order.id == order_id))
order = order_result.scalar_one_or_none()
if order is None:
return None
lines_result = await db.execute(
select(OrderLine)
.options(
selectinload(OrderLine.output_type).selectinload(OutputType.pricing_tier),
selectinload(OrderLine.product),
)
.where(
OrderLine.order_id == order_id,
OrderLine.output_type_id.is_not(None),
)
)
lines = lines_result.scalars().all()
if not lines:
await db.execute(
sql_update(Order)
.where(Order.id == order_id)
.values(estimated_price=Decimal("0.00"))
)
await db.commit()
return Decimal("0.00")
total = Decimal("0.00")
any_priced = False
for line in lines:
# Cascade: 1) OT pricing tier, 2) product category, 3) default
price = None
if line.output_type and line.output_type.pricing_tier and line.output_type.pricing_tier.is_active:
price = line.output_type.pricing_tier.price_per_item
else:
cat = line.product.category_key if line.product else None
price = await get_price_for(db, cat or "default")
# Snapshot to line
await db.execute(
sql_update(OrderLine)
.where(OrderLine.id == line.id)
.values(unit_price=price)
)
if price is not None:
total += price
any_priced = True
new_price = total if any_priced else None
await db.execute(
sql_update(Order)
.where(Order.id == order_id)
.values(estimated_price=new_price)
)
await db.commit()
return new_price
# Compat shim — use app.domains.billing.service instead
from app.domains.billing.service import (
get_price_for,
resolve_line_price,
estimate_order_price,
refresh_order_price,
)
__all__ = ["get_price_for", "resolve_line_price", "estimate_order_price", "refresh_order_price"]
+14 -142
View File
@@ -1,143 +1,15 @@
"""Product service — lookup/create products, link CAD files."""
import uuid
from sqlalchemy import select, func, update as sql_update
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.product import Product
# Default render positions added to every newly created product.
DEFAULT_RENDER_POSITIONS = [
{"name": "3/4 Front", "rotation_x": -15.0, "rotation_y": 45.0, "rotation_z": 0.0, "is_default": True, "sort_order": 0},
{"name": "3/4 Rear", "rotation_x": -15.0, "rotation_y": -135.0, "rotation_z": 0.0, "is_default": False, "sort_order": 1},
{"name": "Default", "rotation_x": 0.0, "rotation_y": 0.0, "rotation_z": 0.0, "is_default": False, "sort_order": 2},
# Compat shim — use app.domains.products.service instead
from app.domains.products.service import (
create_default_positions,
lookup_product,
lookup_or_create_product,
link_cad_to_product,
DEFAULT_RENDER_POSITIONS,
)
__all__ = [
"create_default_positions",
"lookup_product",
"lookup_or_create_product",
"link_cad_to_product",
"DEFAULT_RENDER_POSITIONS",
]
async def create_default_positions(db: AsyncSession, product_id: uuid.UUID) -> None:
"""Insert the default render positions for a newly created product."""
from app.models.render_position import ProductRenderPosition
for pos_data in DEFAULT_RENDER_POSITIONS:
db.add(ProductRenderPosition(product_id=product_id, **pos_data))
await db.flush()
def _fill_missing_fields(product: Product, pim_id: str | None, fields: dict) -> None:
"""Fill in null/empty fields on an existing product without overwriting manual edits."""
if pim_id and not product.pim_id:
product.pim_id = pim_id
for attr in (
"name", "category_key", "ebene1", "ebene2", "baureihe",
"lagertyp", "name_cad_modell", "arbeitspaket",
):
if fields.get(attr) and not getattr(product, attr, None):
setattr(product, attr, fields[attr])
# Update medias_rendering if not set
if fields.get("medias_rendering") is not None and product.medias_rendering is None:
product.medias_rendering = fields["medias_rendering"]
# Always update components from the latest Excel import (needed for auto-reassign)
if fields.get("components"):
product.components = fields["components"]
async def lookup_product(
db: AsyncSession, pim_id: str | None, produkt_baureihe: str | None
) -> Product | None:
"""Read-only lookup: produkt_baureihe (primary), then pim_id (fallback).
Same cascade as lookup_or_create_product but never creates or mutates.
"""
if produkt_baureihe:
result = await db.execute(
select(Product).where(
func.lower(Product.produkt_baureihe) == produkt_baureihe.lower(),
Product.is_active.is_(True),
)
)
product = result.scalar_one_or_none()
if product is not None:
return product
# baureihe provided but not found — skip pim_id fallback (same logic)
return None
if pim_id:
result = await db.execute(
select(Product).where(Product.pim_id == pim_id, Product.is_active.is_(True))
)
return result.scalar_one_or_none()
return None
async def lookup_or_create_product(
db: AsyncSession, pim_id: str | None, fields: dict
) -> tuple[Product, bool]:
"""Look up by produkt_baureihe (primary), then pim_id (fallback). Create if not found.
Returns (product, was_created).
Does NOT overwrite existing fields — preserves manual edits.
"""
produkt_baureihe = fields.get("produkt_baureihe")
# Primary lookup: by produkt_baureihe (case-insensitive)
if produkt_baureihe:
result = await db.execute(
select(Product).where(
func.lower(Product.produkt_baureihe) == produkt_baureihe.lower(),
Product.is_active.is_(True),
)
)
product = result.scalar_one_or_none()
if product is not None:
_fill_missing_fields(product, pim_id, fields)
await db.flush()
return product, False
# produkt_baureihe was provided but not found — each baureihe is a
# distinct product, so skip the pim_id fallback and create a new one.
# Fallback lookup: by pim_id (only when produkt_baureihe is absent,
# e.g. old per-category Excel files that don't have a Baureihe column).
if not produkt_baureihe and pim_id:
result = await db.execute(
select(Product).where(Product.pim_id == pim_id, Product.is_active.is_(True))
)
product = result.scalar_one_or_none()
if product is not None:
_fill_missing_fields(product, pim_id, fields)
await db.flush()
return product, False
product = Product(
pim_id=pim_id or f"auto-{uuid.uuid4().hex[:8]}",
name=fields.get("name"),
category_key=fields.get("category_key"),
ebene1=fields.get("ebene1"),
ebene2=fields.get("ebene2"),
baureihe=fields.get("baureihe"),
produkt_baureihe=produkt_baureihe,
lagertyp=fields.get("lagertyp"),
name_cad_modell=fields.get("name_cad_modell"),
arbeitspaket=fields.get("arbeitspaket"),
components=fields.get("components", []),
cad_part_materials=fields.get("cad_part_materials", []),
source_excel=fields.get("source_excel"),
)
db.add(product)
await db.flush()
await create_default_positions(db, product.id)
return product, True
async def link_cad_to_product(
db: AsyncSession, product_id: uuid.UUID, cad_file_id: uuid.UUID
) -> Product:
"""Set product.cad_file_id via direct SQL UPDATE."""
await db.execute(
sql_update(Product)
.where(Product.id == product_id)
.values(cad_file_id=cad_file_id)
)
await db.commit()
result = await db.execute(select(Product).where(Product.id == product_id))
return result.scalar_one()
+3 -96
View File
@@ -1,96 +1,3 @@
"""Render dispatcher — routes render jobs to Celery.
All renders run via Celery workers (Flamenco removed in v2 refactor).
"""
import logging
from datetime import datetime
from sqlalchemy import select, update as sql_update
from sqlalchemy.orm import Session, joinedload
from app.models.order_line import OrderLine
from app.models.product import Product
from app.models.system_setting import SystemSetting
logger = logging.getLogger(__name__)
def _load_setting(session: Session, key: str, default: str = "") -> str:
"""Load a single system setting (sync)."""
row = session.execute(
select(SystemSetting).where(SystemSetting.key == key)
).scalar_one_or_none()
return row.value if row else default
def dispatch_render(order_line_id: str) -> dict:
"""Dispatch a render job to Celery.
Must be called from a sync context (Celery task or sync wrapper).
Returns {"backend": "celery", "job_ref": str}.
"""
from app.config import settings as app_settings
from app.services.render_log import emit, clear
clear(order_line_id)
emit(order_line_id, "Dispatch started — loading order line data")
sync_url = app_settings.database_url.replace("+asyncpg", "")
from sqlalchemy import create_engine
engine_db = create_engine(sync_url)
with Session(engine_db) as session:
line = session.execute(
select(OrderLine)
.where(OrderLine.id == order_line_id)
.options(
joinedload(OrderLine.product).joinedload(Product.cad_file),
joinedload(OrderLine.output_type),
)
).scalar_one_or_none()
if line is None:
emit(order_line_id, "Order line not found", "error")
logger.error(f"OrderLine {order_line_id} not found")
return {"backend": "none", "job_ref": "", "error": "not_found"}
product_name = line.product.name or line.product.pim_id or "unknown"
output_name = line.output_type.name if line.output_type else "default"
emit(order_line_id, f"Product: {product_name} | Output: {output_name}")
if line.product.cad_file_id is None:
emit(order_line_id, "Product has no CAD file — marking as failed", "error")
logger.warning(f"OrderLine {order_line_id}: product has no CAD file")
session.execute(
sql_update(OrderLine)
.where(OrderLine.id == line.id)
.values(render_status="failed")
)
session.commit()
return {"backend": "none", "job_ref": "", "error": "no_cad_file"}
cad_name = line.product.cad_file.original_name if line.product.cad_file else "?"
emit(order_line_id, f"CAD file: {cad_name}")
emit(order_line_id, "Dispatching to Celery render worker")
now = datetime.utcnow()
session.execute(
sql_update(OrderLine)
.where(OrderLine.id == line.id)
.values(
render_status="processing",
render_backend_used="celery",
render_started_at=now,
)
)
session.commit()
engine_db.dispose()
return _dispatch_celery(order_line_id)
def _dispatch_celery(order_line_id: str) -> dict:
"""Dispatch to the Celery render task."""
from app.tasks.step_tasks import render_order_line_task
result = render_order_line_task.delay(order_line_id)
return {"backend": "celery", "job_ref": result.id}
# Compat shim — use app.domains.rendering.service instead
from app.domains.rendering.service import dispatch_render
__all__ = ["dispatch_render"]
+3 -102
View File
@@ -1,102 +1,3 @@
"""Render template resolution service.
Used from Celery tasks (sync context) to find the best matching .blend template
for a given category + output type combination.
Cascade priority (first active match wins):
1. Exact: category_key + output_type_id
2. Category only: category_key + output_type_id IS NULL
3. OT only: category_key IS NULL + output_type_id
4. Global: both NULL
5. No template → caller falls back to factory-settings behavior
"""
import logging
from sqlalchemy import create_engine, select, and_
from sqlalchemy.orm import Session
from app.models.render_template import RenderTemplate
from app.models.system_setting import SystemSetting
logger = logging.getLogger(__name__)
_engine = None
def _get_engine():
global _engine
if _engine is None:
from app.config import settings as app_settings
_engine = create_engine(app_settings.database_url_sync)
return _engine
def resolve_template(
category_key: str | None = None,
output_type_id: str | None = None,
) -> RenderTemplate | None:
"""Find the best matching active render template.
Uses sync SQLAlchemy — safe for Celery tasks.
"""
engine = _get_engine()
with Session(engine) as session:
active = RenderTemplate.is_active == True # noqa: E712
# 1. Exact match
if category_key and output_type_id:
row = session.execute(
select(RenderTemplate).where(and_(
active,
RenderTemplate.category_key == category_key,
RenderTemplate.output_type_id == output_type_id,
))
).scalar_one_or_none()
if row:
return row
# 2. Category only
if category_key:
row = session.execute(
select(RenderTemplate).where(and_(
active,
RenderTemplate.category_key == category_key,
RenderTemplate.output_type_id.is_(None),
))
).scalar_one_or_none()
if row:
return row
# 3. OT only
if output_type_id:
row = session.execute(
select(RenderTemplate).where(and_(
active,
RenderTemplate.category_key.is_(None),
RenderTemplate.output_type_id == output_type_id,
))
).scalar_one_or_none()
if row:
return row
# 4. Global fallback (both NULL)
row = session.execute(
select(RenderTemplate).where(and_(
active,
RenderTemplate.category_key.is_(None),
RenderTemplate.output_type_id.is_(None),
))
).scalar_one_or_none()
return row
def get_material_library_path() -> str | None:
"""Read material_library_path from system_settings. Returns None if empty."""
engine = _get_engine()
with Session(engine) as session:
row = session.execute(
select(SystemSetting).where(SystemSetting.key == "material_library_path")
).scalar_one_or_none()
if row and row.value and row.value.strip():
return row.value.strip()
return None
# Compat shim — use app.domains.rendering.service instead
from app.domains.rendering.service import resolve_template, get_material_library_path
__all__ = ["resolve_template", "get_material_library_path"]