Files
HartOMat/backend/app/domains/billing/service.py
T
Hartmut c0ea60d984 fix: resolve open risks — invoice race condition, SMTP config, workflow seeds
- billing/service.py: pg_advisory_xact_lock on invoice_number_seq per year
  → prevents duplicate INV-YYYY-NNNN under concurrent requests
- admin.py: SMTP settings in system_settings (smtp_host/port/user/password/
  from_address/enabled) with GET+PUT support; seed-workflows endpoint creates
  4 standard workflow definitions (still-cycles, still-eevee, turntable,
  multi-angle) idempotently
- notifications/service.py: send_email_notification_stub now sends real
  SMTP email via smtplib when smtp_enabled=true in system_settings
- Admin.tsx: SMTP settings panel (host/port/user/password/from + enable
  toggle, save button); Seed Standard Workflows maintenance button
- Upload.tsx: fix TS error — title→aria-label on Lucide icons
- Admin.tsx Settings type: add render_backend/flamenco_* fields (TS fix)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-06 18:15:45 +01:00

379 lines
12 KiB
Python

"""Billing service — pricing (price lookup + order price computation) and invoice CRUD + PDF generation.
Price resolution cascade for order lines:
1. OutputType's linked pricing_tier (if active) → use its price_per_item
2. Product's category_key → look up PricingTier by category
3. "default" category tier → global fallback
4. None if nothing configured
"""
from __future__ import annotations
import logging
import os
import tempfile
import uuid
from datetime import date, datetime
from decimal import Decimal
from typing import Any
from sqlalchemy import func, select, update as sql_update
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.domains.billing.models import Invoice, InvoiceLine, PricingTier
logger = logging.getLogger(__name__)
async def get_price_for(
    db: AsyncSession,
    category_key: str,
    quality_level: str = "Normal",
) -> Decimal | None:
    """Look up the active per-item price for a category and quality level.

    The requested category is tried first. If it has no active tier and the
    requested category is not already ``"default"``, the ``"default"``
    category is tried with the same quality level.

    Args:
        db: Session used to run the lookups.
        category_key: Pricing category to look up.
        quality_level: Quality level the tier must match.

    Returns:
        The matching tier's ``price_per_item``, or ``None`` when no active
        tier is found by either lookup.
    """
    # Asking for "default" directly must not issue the same query twice.
    if category_key == "default":
        candidates = ("default",)
    else:
        candidates = (category_key, "default")

    for candidate in candidates:
        stmt = select(PricingTier).where(
            PricingTier.category_key == candidate,
            PricingTier.quality_level == quality_level,
            PricingTier.is_active.is_(True),
        )
        matched = (await db.execute(stmt)).scalar_one_or_none()
        if matched is not None:
            return matched.price_per_item
    return None
async def resolve_line_price(
    db: AsyncSession,
    output_type_id: str | None,
    product_category_key: str | None,
) -> Decimal | None:
    """Work out the unit price of one order line.

    Resolution order:
    1. The pricing tier linked to the output type, if that tier is active.
    2. The tier for the product's category (via ``get_price_for``).
    3. The ``"default"`` category tier (also via ``get_price_for``).

    Returns:
        The resolved price, or ``None`` when no step yields one.
    """
    fallback_category = product_category_key or "default"

    # Without an output type there is no linked tier to consult.
    if output_type_id is None:
        return await get_price_for(db, fallback_category)

    from app.domains.rendering.models import OutputType

    stmt = (
        select(OutputType)
        .options(selectinload(OutputType.pricing_tier))
        .where(OutputType.id == output_type_id)
    )
    output_type = (await db.execute(stmt)).scalar_one_or_none()
    linked_tier = output_type.pricing_tier if output_type else None
    if linked_tier and linked_tier.is_active:
        return linked_tier.price_per_item

    # Steps 2 and 3: category lookup with the default fallback.
    return await get_price_for(db, fallback_category)
async def estimate_order_price(
    db: AsyncSession,
    lines: list[dict[str, Any]],
) -> dict:
    """Price a list of prospective order lines without persisting anything.

    Each entry in ``lines`` is read via ``.get("product_id")`` and
    ``.get("output_type_id")``. The product, when found, supplies the
    category that is passed to ``resolve_line_price``.

    Returns:
        A dict with ``total`` (float sum of all resolved prices),
        ``line_count``, ``breakdown`` (one entry per input line with the ids
        as strings and ``unit_price`` as float or ``None``) and
        ``has_unpriced`` (``True`` if any line resolved to no price).
    """
    from app.domains.products.models import Product

    running_total = Decimal("0.00")
    missing_price = False
    entries: list[dict] = []

    for spec in lines:
        product_id = spec.get("product_id")
        output_type_id = spec.get("output_type_id")

        # The category comes from the product row, if one is referenced and exists.
        category = None
        if product_id:
            product_stmt = select(Product).where(Product.id == product_id)
            product = (await db.execute(product_stmt)).scalar_one_or_none()
            if product:
                category = product.category_key

        unit_price = await resolve_line_price(db, output_type_id, category)
        if unit_price is None:
            missing_price = True
            shown_price = None
        else:
            running_total += unit_price
            shown_price = float(unit_price)

        entries.append({
            "output_type_id": str(output_type_id) if output_type_id else None,
            "product_id": str(product_id) if product_id else None,
            "unit_price": shown_price,
        })

    return {
        "total": float(running_total),
        "line_count": len(lines),
        "breakdown": entries,
        "has_unpriced": missing_price,
    }
async def refresh_order_price(db: AsyncSession, order_id) -> Decimal | None:
    """Recompute and persist the prices of an order and its lines.

    Only lines that have an output type are considered. Each line's resolved
    price is written to ``OrderLine.unit_price`` and the sum is written to
    ``Order.estimated_price``; the session is then committed.

    Returns:
        ``None`` if the order does not exist (nothing is written),
        ``Decimal("0.00")`` if the order has no qualifying lines,
        ``None`` if there are lines but none could be priced,
        otherwise the sum of the priced lines.
    """
    from app.domains.orders.models import Order, OrderLine
    from app.domains.rendering.models import OutputType

    order = (
        await db.execute(select(Order).where(Order.id == order_id))
    ).scalar_one_or_none()
    if order is None:
        return None

    line_stmt = (
        select(OrderLine)
        .options(
            selectinload(OrderLine.output_type).selectinload(OutputType.pricing_tier),
            selectinload(OrderLine.product),
        )
        .where(
            OrderLine.order_id == order_id,
            OrderLine.output_type_id.is_not(None),
        )
    )
    order_lines = (await db.execute(line_stmt)).scalars().all()

    if order_lines:
        running_total = Decimal("0.00")
        priced_count = 0
        for order_line in order_lines:
            # Cascade: 1) output-type tier, 2) product category, 3) default.
            tier = order_line.output_type.pricing_tier if order_line.output_type else None
            if tier and tier.is_active:
                unit_price = tier.price_per_item
            else:
                category = order_line.product.category_key if order_line.product else None
                unit_price = await get_price_for(db, category or "default")

            # Snapshot the resolved price (possibly None) onto the line.
            await db.execute(
                sql_update(OrderLine)
                .where(OrderLine.id == order_line.id)
                .values(unit_price=unit_price)
            )
            if unit_price is not None:
                running_total += unit_price
                priced_count += 1
        new_price = running_total if priced_count else None
    else:
        # An order without qualifying lines is stored as zero, not as unknown.
        new_price = Decimal("0.00")

    await db.execute(
        sql_update(Order)
        .where(Order.id == order_id)
        .values(estimated_price=new_price)
    )
    await db.commit()
    return new_price
# ---------------------------------------------------------------------------
# Invoice CRUD
# ---------------------------------------------------------------------------
# Status strings for invoices. "draft" is the value create_invoice assigns and
# the only one delete_invoice accepts.
# NOTE(review): nothing in the visible part of this module validates against
# this set — presumably callers do; confirm.
VALID_STATUSES = {"draft", "sent", "paid", "cancelled"}
async def generate_invoice_number(db: AsyncSession, tenant_id: uuid.UUID | None) -> str:
    """Generate the next free invoice number in the form ``INV-YYYY-NNNN``.

    A PostgreSQL transaction-level advisory lock keyed on the year serialises
    concurrent callers; it is released when the surrounding transaction
    commits or rolls back.

    The starting sequence is the count of invoices created in the current
    year plus one. Because draft invoices can be deleted, that count can fall
    behind the highest number already issued, so the candidate is advanced
    until no existing invoice carries it. This prevents handing out a number
    that is still in use.

    Args:
        db: Session whose transaction holds the advisory lock.
        tenant_id: Not used by the numbering; the sequence is shared by all
            tenants.

    Returns:
        An invoice number that no existing invoice row uses.
    """
    from sqlalchemy import text

    year = datetime.utcnow().year
    # Advisory lock keyed on year — serialises concurrent invoice creation.
    await db.execute(
        text("SELECT pg_advisory_xact_lock(hashtext(:key))"),
        {"key": f"invoice_number_seq_{year}"},
    )
    count_result = await db.execute(
        select(func.count()).select_from(Invoice).where(
            func.extract("year", Invoice.created_at) == year
        )
    )
    seq = (count_result.scalar() or 0) + 1

    # Skip numbers that are already taken (happens after a draft was deleted).
    # The loop ends at the first unused number; we still hold the lock, so no
    # other caller can claim it in between.
    while True:
        candidate = f"INV-{year}-{seq:04d}"
        taken = await db.execute(
            select(Invoice.id).where(Invoice.invoice_number == candidate).limit(1)
        )
        if taken.first() is None:
            return candidate
        seq += 1
async def create_invoice(
    db: AsyncSession,
    tenant_id: uuid.UUID | None,
    order_line_ids: list[uuid.UUID],
    notes: str | None = None,
    issued_at: date | None = None,
    due_at: date | None = None,
    vat_rate: Decimal = Decimal("0.19"),
    currency: str = "EUR",
) -> Invoice:
    """Create a draft invoice with one invoice line per given order line.

    Each invoice line has quantity 1 and takes its price from the order
    line's ``unit_price`` (treated as 0 when unset). Order-line ids that do
    not exist are skipped; they are reported in a single warning log entry
    instead of being dropped silently.

    Args:
        db: Session used for all reads and writes; committed before returning.
        tenant_id: Tenant stored on the invoice.
        order_line_ids: Order lines to bill.
        notes: Optional free text stored on the invoice.
        issued_at: Issue date; defaults to today.
        due_at: Optional due date.
        vat_rate: VAT as a fraction of the net total (``0.19`` by default).
        currency: Currency code stored on the invoice.

    Returns:
        The persisted and refreshed invoice.
    """
    from app.domains.orders.models import OrderLine

    invoice_number = await generate_invoice_number(db, tenant_id)
    invoice = Invoice(
        tenant_id=tenant_id,
        invoice_number=invoice_number,
        status="draft",
        issued_at=issued_at or date.today(),
        due_at=due_at,
        notes=notes,
        vat_rate=vat_rate,
        currency=currency,
    )
    db.add(invoice)
    await db.flush()  # assigns invoice.id so the lines can reference it

    total_net = Decimal("0")
    missing_ids: list[uuid.UUID] = []
    for ol_id in order_line_ids:
        result = await db.execute(select(OrderLine).where(OrderLine.id == ol_id))
        ol = result.scalar_one_or_none()
        if not ol:
            # Best effort: bill what exists, but remember what was skipped.
            missing_ids.append(ol_id)
            continue
        unit_price = ol.unit_price or Decimal("0")
        db.add(
            InvoiceLine(
                invoice_id=invoice.id,
                order_line_id=ol.id,
                description=f"Render: {ol.id}",
                quantity=1,
                unit_price=unit_price,
                total=unit_price,
            )
        )
        total_net += unit_price

    if missing_ids:
        logger.warning(
            "Invoice %s: skipped %d unknown order line(s): %s",
            invoice_number,
            len(missing_ids),
            ", ".join(str(missing) for missing in missing_ids),
        )

    invoice.total_net = total_net
    # VAT is rounded to two decimal places.
    invoice.total_vat = (total_net * vat_rate).quantize(Decimal("0.01"))
    await db.commit()
    await db.refresh(invoice)
    return invoice
async def get_invoices(
    db: AsyncSession,
    tenant_id: uuid.UUID | None = None,
    skip: int = 0,
    limit: int = 50,
) -> list[Invoice]:
    """Return one page of invoices, newest first, with their lines loaded.

    Args:
        db: Session used to run the query.
        tenant_id: When given, only invoices of this tenant are returned.
            ``None`` returns invoices of all tenants.
        skip: Number of rows to skip.
        limit: Maximum number of rows to return.

    Returns:
        The invoices ordered by ``created_at`` descending.
    """
    q = select(Invoice).options(selectinload(Invoice.lines))
    # The tenant filter was previously accepted but never applied, which
    # returned every tenant's invoices regardless of the argument.
    if tenant_id is not None:
        q = q.where(Invoice.tenant_id == tenant_id)
    q = q.order_by(Invoice.created_at.desc()).offset(skip).limit(limit)
    result = await db.execute(q)
    return list(result.scalars().all())
async def get_invoice(db: AsyncSession, invoice_id: uuid.UUID) -> Invoice | None:
    """Fetch a single invoice by id with its lines eagerly loaded.

    Returns:
        The invoice, or ``None`` if no row has that id.
    """
    stmt = select(Invoice).options(selectinload(Invoice.lines))
    stmt = stmt.where(Invoice.id == invoice_id)
    found = await db.execute(stmt)
    return found.scalar_one_or_none()
async def update_invoice_status(db: AsyncSession, invoice_id: uuid.UUID, status: str) -> Invoice | None:
    """Set an invoice's status, stamp ``updated_at`` and commit.

    The status is stored exactly as given; this function performs no check
    against ``VALID_STATUSES``.

    Returns:
        The refreshed invoice, or ``None`` if it does not exist.
    """
    invoice = await get_invoice(db, invoice_id)
    if invoice:
        invoice.status = status
        invoice.updated_at = datetime.utcnow()
        await db.commit()
        await db.refresh(invoice)
        return invoice
    return None
async def delete_invoice(db: AsyncSession, invoice_id: uuid.UUID) -> bool:
    """Delete an invoice, but only while it is still a draft.

    Returns:
        ``True`` if the invoice was deleted and the change committed,
        ``False`` if it does not exist or its status is not ``"draft"``.
    """
    invoice = await get_invoice(db, invoice_id)
    if invoice and invoice.status == "draft":
        await db.delete(invoice)
        await db.commit()
        return True
    return False
async def render_pdf(db: AsyncSession, invoice_id: uuid.UUID) -> str | None:
    """Render an invoice to PDF with WeasyPrint and upload it to storage.

    The PDF is written to a temporary file, passed to the storage backend's
    ``upload(path, key)``, and the temporary file is removed afterwards. On
    success the key is stored in ``invoice.pdf_key`` and committed.

    Returns:
        The storage key ``invoices/<invoice_id>.pdf``, or ``None`` when
        WeasyPrint is not installed or the invoice does not exist.
    """
    try:
        from weasyprint import HTML
    except ImportError:
        logger.warning("WeasyPrint not installed — PDF generation skipped")
        return None

    invoice = await get_invoice(db, invoice_id)
    if not invoice:
        return None

    html_content = _build_invoice_html(invoice)
    # NOTE(review): write_pdf() and storage.upload() are called synchronously
    # inside this coroutine — confirm that is acceptable for the event loop.
    pdf_bytes = HTML(string=html_content).write_pdf()

    from app.core.storage import get_storage

    storage = get_storage()
    key = f"invoices/{invoice_id}.pdf"

    # delete=False because the storage backend is handed a path, not a handle.
    tmp = tempfile.NamedTemporaryFile(suffix=".pdf", delete=False)
    tmp_path = tmp.name
    try:
        # The write sits inside the try so a failed write cannot leave the
        # temporary file behind.
        with tmp:
            tmp.write(pdf_bytes)
        storage.upload(tmp_path, key)
    finally:
        os.unlink(tmp_path)

    invoice.pdf_key = key
    invoice.updated_at = datetime.utcnow()
    await db.commit()
    return key
def _build_invoice_html(invoice: Invoice) -> str:
    """Build the HTML document that is rendered into the invoice PDF.

    All text taken from the invoice (number, status, currency, dates, line
    descriptions, notes) is HTML-escaped, so markup in those fields shows up
    as literal text instead of being interpreted by the PDF renderer.
    Missing amounts are printed as ``0.00`` and a missing VAT rate as ``0%``.

    Args:
        invoice: Invoice with its ``lines`` already loaded.

    Returns:
        A complete HTML document as a string.
    """
    from html import escape

    currency = escape(str(invoice.currency))
    lines_html = "".join(
        f"<tr><td>{escape(str(line.description))}</td><td>{escape(str(line.quantity))}</td>"
        f"<td>{line.unit_price or 0:.2f} {currency}</td>"
        f"<td>{line.total or 0:.2f} {currency}</td></tr>"
        for line in invoice.lines
    )
    number = escape(str(invoice.invoice_number))
    status = escape(str(invoice.status))
    issued = escape(str(invoice.issued_at))
    due = escape(str(invoice.due_at or ""))
    vat_percent = float(invoice.vat_rate or 0) * 100
    gross = (invoice.total_net or 0) + (invoice.total_vat or 0)
    # Notes are free text and therefore the most likely carrier of markup.
    notes_html = (
        f"<p><em>Notes: {escape(str(invoice.notes))}</em></p>" if invoice.notes else ""
    )
    return f"""<!DOCTYPE html><html><head><meta charset="utf-8">
<style>
body {{ font-family: Arial, sans-serif; margin: 40px; color: #333; }}
h1 {{ color: #1a56db; }} table {{ width: 100%; border-collapse: collapse; margin-top: 20px; }}
th, td {{ padding: 8px 12px; border-bottom: 1px solid #e5e7eb; text-align: left; }}
th {{ background: #f9fafb; font-weight: 600; }}
.totals {{ text-align: right; margin-top: 16px; }}
</style></head><body>
<h1>Invoice {number}</h1>
<p>Status: <strong>{status}</strong> | Currency: {currency}</p>
<p>Issued: {issued} | Due: {due}</p>
<table><thead><tr><th>Description</th><th>Qty</th><th>Unit Price</th><th>Total</th></tr></thead>
<tbody>{lines_html}</tbody></table>
<div class="totals">
<p>Net: <strong>{invoice.total_net or 0:.2f} {currency}</strong></p>
<p>VAT ({vat_percent:.0f}%): {invoice.total_vat or 0:.2f} {currency}</p>
<p>Gross: <strong>{gross:.2f} {currency}</strong></p>
</div>
{notes_html}
</body></html>"""