"""Billing service — pricing (price lookup + order price computation) and invoice CRUD + PDF generation. Price resolution cascade for order lines: 1. OutputType's linked pricing_tier (if active) → use its price_per_item 2. Product's category_key → look up PricingTier by category 3. "default" category tier → global fallback 4. None if nothing configured """ from __future__ import annotations import logging import os import tempfile import uuid from datetime import date, datetime from decimal import Decimal from typing import Any from sqlalchemy import func, select, update as sql_update from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from app.domains.billing.models import Invoice, InvoiceLine, PricingTier logger = logging.getLogger(__name__) async def get_price_for( db: AsyncSession, category_key: str, quality_level: str = "Normal", ) -> Decimal | None: """Return price_per_item for the given category + quality level.""" # 1. Exact match result = await db.execute( select(PricingTier).where( PricingTier.category_key == category_key, PricingTier.quality_level == quality_level, PricingTier.is_active.is_(True), ) ) tier = result.scalar_one_or_none() if tier is not None: return tier.price_per_item if category_key == "default": return None # 2. Fallback: default category result = await db.execute( select(PricingTier).where( PricingTier.category_key == "default", PricingTier.quality_level == quality_level, PricingTier.is_active.is_(True), ) ) tier = result.scalar_one_or_none() return tier.price_per_item if tier is not None else None async def resolve_line_price( db: AsyncSession, output_type_id: str | None, product_category_key: str | None, ) -> Decimal | None: """Resolve the unit price for a single order line using the cascade.""" if output_type_id is not None: from app.domains.rendering.models import OutputType result = await db.execute( select(OutputType) .options(selectinload(OutputType.pricing_tier)) .where(OutputType.id == output_type_id) ) ot = result.scalar_one_or_none() if ot and ot.pricing_tier and ot.pricing_tier.is_active: return ot.pricing_tier.price_per_item # Step 2+3: category lookup with default fallback cat = product_category_key or "default" return await get_price_for(db, cat) async def estimate_order_price( db: AsyncSession, lines: list[dict[str, Any]], ) -> dict: """Estimate price for a list of prospective order lines.""" from app.domains.products.models import Product breakdown: list[dict] = [] total = Decimal("0.00") has_unpriced = False for line in lines: product_id = line.get("product_id") output_type_id = line.get("output_type_id") # Get product category cat = None if product_id: prod_result = await db.execute( select(Product).where(Product.id == product_id) ) prod = prod_result.scalar_one_or_none() if prod: cat = prod.category_key price = await resolve_line_price(db, output_type_id, cat) breakdown.append({ "output_type_id": str(output_type_id) if output_type_id else None, "product_id": str(product_id) if product_id else None, "unit_price": float(price) if price is not None else None, }) if price is not None: total += price else: has_unpriced = True return { "total": float(total), "line_count": len(lines), "breakdown": breakdown, "has_unpriced": has_unpriced, } async def refresh_order_price(db: AsyncSession, order_id) -> Decimal | None: """Re-fetch order + lines, resolve per-line prices, snapshot to unit_price, update order total.""" from app.domains.orders.models import Order, OrderLine from app.domains.rendering.models import OutputType order_result = await db.execute(select(Order).where(Order.id == order_id)) order = order_result.scalar_one_or_none() if order is None: return None lines_result = await db.execute( select(OrderLine) .options( selectinload(OrderLine.output_type).selectinload(OutputType.pricing_tier), selectinload(OrderLine.product), ) .where( OrderLine.order_id == order_id, OrderLine.output_type_id.is_not(None), ) ) lines = lines_result.scalars().all() if not lines: await db.execute( sql_update(Order) .where(Order.id == order_id) .values(estimated_price=Decimal("0.00")) ) await db.commit() return Decimal("0.00") total = Decimal("0.00") any_priced = False for line in lines: # Cascade: 1) OT pricing tier, 2) product category, 3) default price = None if line.output_type and line.output_type.pricing_tier and line.output_type.pricing_tier.is_active: price = line.output_type.pricing_tier.price_per_item else: cat = line.product.category_key if line.product else None price = await get_price_for(db, cat or "default") # Snapshot to line await db.execute( sql_update(OrderLine) .where(OrderLine.id == line.id) .values(unit_price=price) ) if price is not None: total += price any_priced = True new_price = total if any_priced else None await db.execute( sql_update(Order) .where(Order.id == order_id) .values(estimated_price=new_price) ) await db.commit() return new_price # --------------------------------------------------------------------------- # Invoice CRUD # --------------------------------------------------------------------------- VALID_STATUSES = {"draft", "sent", "paid", "cancelled"} async def generate_invoice_number(db: AsyncSession, tenant_id: uuid.UUID | None) -> str: """Generate sequential invoice number: INV-YYYY-NNNN.""" year = datetime.utcnow().year count_result = await db.execute( select(func.count()).select_from(Invoice).where( func.extract("year", Invoice.created_at) == year ) ) seq = (count_result.scalar() or 0) + 1 return f"INV-{year}-{seq:04d}" async def create_invoice( db: AsyncSession, tenant_id: uuid.UUID | None, order_line_ids: list[uuid.UUID], notes: str | None = None, issued_at: date | None = None, due_at: date | None = None, vat_rate: Decimal = Decimal("0.19"), currency: str = "EUR", ) -> Invoice: """Create invoice with lines derived from order lines.""" from app.domains.orders.models import OrderLine invoice_number = await generate_invoice_number(db, tenant_id) invoice = Invoice( tenant_id=tenant_id, invoice_number=invoice_number, status="draft", issued_at=issued_at or date.today(), due_at=due_at, notes=notes, vat_rate=vat_rate, currency=currency, ) db.add(invoice) await db.flush() # get invoice.id total_net = Decimal("0") for ol_id in order_line_ids: result = await db.execute(select(OrderLine).where(OrderLine.id == ol_id)) ol = result.scalar_one_or_none() if not ol: continue unit_price = ol.unit_price or Decimal("0") line = InvoiceLine( invoice_id=invoice.id, order_line_id=ol.id, description=f"Render: {ol.id}", quantity=1, unit_price=unit_price, total=unit_price, ) db.add(line) total_net += unit_price invoice.total_net = total_net invoice.total_vat = (total_net * vat_rate).quantize(Decimal("0.01")) await db.commit() await db.refresh(invoice) return invoice async def get_invoices( db: AsyncSession, tenant_id: uuid.UUID | None = None, skip: int = 0, limit: int = 50, ) -> list[Invoice]: q = ( select(Invoice) .options(selectinload(Invoice.lines)) .order_by(Invoice.created_at.desc()) .offset(skip) .limit(limit) ) result = await db.execute(q) return list(result.scalars().all()) async def get_invoice(db: AsyncSession, invoice_id: uuid.UUID) -> Invoice | None: result = await db.execute( select(Invoice) .options(selectinload(Invoice.lines)) .where(Invoice.id == invoice_id) ) return result.scalar_one_or_none() async def update_invoice_status(db: AsyncSession, invoice_id: uuid.UUID, status: str) -> Invoice | None: invoice = await get_invoice(db, invoice_id) if not invoice: return None invoice.status = status invoice.updated_at = datetime.utcnow() await db.commit() await db.refresh(invoice) return invoice async def delete_invoice(db: AsyncSession, invoice_id: uuid.UUID) -> bool: invoice = await get_invoice(db, invoice_id) if not invoice or invoice.status != "draft": return False await db.delete(invoice) await db.commit() return True async def render_pdf(db: AsyncSession, invoice_id: uuid.UUID) -> str | None: """Generate PDF via WeasyPrint, upload to storage, return storage key.""" try: from weasyprint import HTML except ImportError: logger.warning("WeasyPrint not installed — PDF generation skipped") return None invoice = await get_invoice(db, invoice_id) if not invoice: return None html_content = _build_invoice_html(invoice) pdf_bytes = HTML(string=html_content).write_pdf() from app.core.storage import get_storage storage = get_storage() key = f"invoices/{invoice_id}.pdf" with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as tmp: tmp.write(pdf_bytes) tmp_path = tmp.name try: storage.upload(tmp_path, key) finally: os.unlink(tmp_path) invoice.pdf_key = key invoice.updated_at = datetime.utcnow() await db.commit() return key def _build_invoice_html(invoice: Invoice) -> str: lines_html = "".join( f"{l.description}{l.quantity}" f"{l.unit_price or 0:.2f} {invoice.currency}" f"{l.total or 0:.2f} {invoice.currency}" for l in invoice.lines ) return f"""

Invoice {invoice.invoice_number}

Status: {invoice.status} | Currency: {invoice.currency}

Issued: {invoice.issued_at} | Due: {invoice.due_at or "—"}

{lines_html}
DescriptionQtyUnit PriceTotal

Net: {invoice.total_net or 0:.2f} {invoice.currency}

VAT ({float(invoice.vat_rate) * 100:.0f}%): {invoice.total_vat or 0:.2f} {invoice.currency}

Gross: {(invoice.total_net or 0) + (invoice.total_vat or 0):.2f} {invoice.currency}

{f'

Notes: {invoice.notes}

' if invoice.notes else ''} """