"""Notification emission helpers.

Provides async (for routers) and sync (for Celery tasks) entry points
to create notification rows in the audit_log table.
"""
import logging
import uuid
from datetime import datetime, timezone

from sqlalchemy import create_engine, select
from sqlalchemy.orm import Session
from sqlalchemy.ext.asyncio import AsyncSession

from app.domains.notifications.models import AuditLog

logger = logging.getLogger(__name__)

# ── Channel constants ────────────────────────────────────────────────────────
CHANNEL_ACTIVITY = "activity"          # per-action events, not shown in bell dropdown
CHANNEL_NOTIFICATION = "notification"  # batch summaries, shown in bell
CHANNEL_ALERT = "alert"                # admin-only infrastructure issues

# Default on/off state per delivery channel when a user has no explicit
# NotificationConfig row. Shared by the "is enabled" check and the reset helper
# so the two cannot drift apart. Unknown channels default to disabled.
_DELIVERY_CHANNEL_DEFAULTS = {"in_app": True, "email": False}

# Keys read from SystemSetting to configure outgoing mail.
_SMTP_SETTING_KEYS = (
    "smtp_enabled",
    "smtp_host",
    "smtp_port",
    "smtp_user",
    "smtp_password",
    "smtp_from_address",
)
_DEFAULT_SMTP_PORT = 587
_DEFAULT_FROM_ADDRESS = "noreply@schaeffler.com"
# Upper bound on SMTP socket operations so a dead mail server cannot block
# the calling worker indefinitely.
_SMTP_TIMEOUT_SECONDS = 30

# Action recorded for every batch render summary, whatever the outcome mix.
_ORDER_COMPLETED_ACTION = "order.completed"

_engine = None


def _get_engine():
    """Return the module-wide sync engine, creating it on first use.

    The settings import is deferred until the first call, and the engine is
    built from ``settings.database_url_sync``.
    """
    global _engine
    if _engine is None:
        from app.config import settings as app_settings

        _engine = create_engine(app_settings.database_url_sync)
    return _engine


def _utcnow() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    Produces the same kind of value as the deprecated ``datetime.utcnow()``
    (UTC wall-clock time, ``tzinfo`` of ``None``) without calling it.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _build_audit_entry(
    *,
    actor_user_id: str | uuid.UUID | None,
    target_user_id: str | uuid.UUID | None,
    action: str,
    entity_type: str | None,
    entity_id: str | None,
    details: dict | None,
    channel: str,
) -> AuditLog:
    """Build (but do not persist) an ``AuditLog`` row flagged as a notification.

    Identifiers are stringified; falsy identifiers are stored as ``None``.
    """
    return AuditLog(
        user_id=str(actor_user_id) if actor_user_id else None,
        target_user_id=str(target_user_id) if target_user_id else None,
        action=action,
        entity_type=entity_type,
        entity_id=str(entity_id) if entity_id else None,
        details=details,
        notification=True,
        channel=channel,
        timestamp=_utcnow(),
    )


async def emit_notification(
    db: AsyncSession,
    *,
    actor_user_id: str | uuid.UUID | None = None,
    target_user_id: str | uuid.UUID | None = None,
    action: str,
    entity_type: str | None = None,
    entity_id: str | None = None,
    details: dict | None = None,
    channel: str = CHANNEL_NOTIFICATION,
) -> None:
    """Create a notification (async — for use inside FastAPI routers).

    Best-effort: any failure is logged and the session is rolled back; no
    exception is propagated to the caller. Note that this commits ``db``.
    """
    try:
        db.add(
            _build_audit_entry(
                actor_user_id=actor_user_id,
                target_user_id=target_user_id,
                action=action,
                entity_type=entity_type,
                entity_id=entity_id,
                details=details,
                channel=channel,
            )
        )
        await db.commit()
    except Exception:
        logger.exception("Failed to emit notification (async)")
        # The rollback can fail too (e.g. the connection is gone); keep the
        # best-effort contract instead of raising from the error handler.
        try:
            await db.rollback()
        except Exception:
            logger.exception("Failed to roll back after notification error (async)")


def emit_notification_sync(
    *,
    actor_user_id: str | uuid.UUID | None = None,
    target_user_id: str | uuid.UUID | None = None,
    action: str,
    entity_type: str | None = None,
    entity_id: str | None = None,
    details: dict | None = None,
    channel: str = CHANNEL_NOTIFICATION,
) -> None:
    """Create a notification (sync — for use inside Celery tasks).

    Opens its own short-lived session on the shared sync engine. Failures
    while writing are logged and swallowed.
    """
    engine = _get_engine()
    try:
        with Session(engine) as session:
            session.add(
                _build_audit_entry(
                    actor_user_id=actor_user_id,
                    target_user_id=target_user_id,
                    action=action,
                    entity_type=entity_type,
                    entity_id=entity_id,
                    details=details,
                    channel=channel,
                )
            )
            session.commit()
    except Exception:
        logger.exception("Failed to emit notification (sync)")


def _summarise_render_results(total: int, completed: int, failed: int, cancelled: int) -> str:
    """Return the human-readable summary line for a batch of render results."""
    if completed == total:
        return f"All {total} render(s) completed successfully"
    if failed > 0 and (completed + failed + cancelled) == total:
        message = f"Rendering complete: {completed}/{total} succeeded, {failed} failed"
        if cancelled:
            message += f", {cancelled} cancelled"
        return message
    return f"Order rendering complete: {completed}/{total} succeeded"


def emit_batch_render_notification_sync(order_id: str) -> None:
    """Emit a single batch notification summarising all render results for an order.

    Queries all order_lines for the order, counts terminal statuses, and emits
    a single channel=notification row targeting the order creator.
    Should be called after all lines reach a terminal state.

    Nothing is emitted when the order does not exist or has no lines. Failures
    are logged and swallowed.
    """
    engine = _get_engine()
    try:
        from app.domains.orders.models import Order, OrderLine

        with Session(engine) as session:
            order = session.get(Order, order_id)
            if order is None:
                logger.warning("emit_batch_render_notification_sync: order %s not found", order_id)
                return

            lines = session.execute(
                select(OrderLine).where(OrderLine.order_id == order_id)
            ).scalars().all()
            if not lines:
                return

            total = len(lines)
            # Compare with ``==`` rather than tallying by hash so the counts
            # stay correct whatever type ``render_status`` is.
            completed = sum(1 for line in lines if line.render_status == "completed")
            failed = sum(1 for line in lines if line.render_status == "failed")
            cancelled = sum(1 for line in lines if line.render_status == "cancelled")
            message = _summarise_render_results(total, completed, failed, cancelled)

            session.add(
                _build_audit_entry(
                    actor_user_id=None,
                    # A missing creator is stored as NULL, not the string "None".
                    target_user_id=order.created_by,
                    action=_ORDER_COMPLETED_ACTION,
                    entity_type="order",
                    entity_id=order.id,
                    details={
                        "order_number": order.order_number,
                        "total": total,
                        "completed": completed,
                        "failed": failed,
                        "cancelled": cancelled,
                        "message": message,
                    },
                    channel=CHANNEL_NOTIFICATION,
                )
            )
            session.commit()
            logger.info(
                "emit_batch_render_notification_sync: emitted batch notification for order %s (%s)",
                order_id,
                message,
            )
    except Exception:
        logger.exception("emit_batch_render_notification_sync failed for order %s", order_id)


# ── Notification config helpers ─────────────────────────────────────────────

def _is_channel_enabled_sync(user_id: str | None, event_type: str, channel: str) -> bool:
    """Check if a notification channel is enabled for a user (sync, for Celery).

    Without a user, or without a stored config row for the
    (user, event_type, channel) triple, the default applies: in_app on,
    everything else off.
    """
    default_enabled = _DELIVERY_CHANNEL_DEFAULTS.get(channel, False)
    if not user_id:
        return default_enabled

    engine = _get_engine()
    from app.domains.notifications.models import NotificationConfig

    with Session(engine) as session:
        cfg = session.execute(
            select(NotificationConfig).where(
                NotificationConfig.user_id == user_id,
                NotificationConfig.event_type == event_type,
                NotificationConfig.channel == channel,
            )
        ).scalar_one_or_none()
        if cfg is None:
            return default_enabled
        return cfg.enabled


def _load_smtp_settings(engine) -> dict:
    """Read the SMTP-related SystemSetting rows into a ``{key: value}`` dict.

    Returns an empty dict (which the caller treats as "SMTP disabled") if the
    settings cannot be loaded; the failure is logged rather than hidden.
    """
    try:
        from app.models.system_setting import SystemSetting

        with Session(engine) as session:
            rows = session.execute(
                select(SystemSetting).where(SystemSetting.key.in_(list(_SMTP_SETTING_KEYS)))
            ).scalars().all()
            return {row.key: row.value for row in rows}
    except Exception:
        logger.warning("[EMAIL] Could not load SMTP settings", exc_info=True)
        return {}


def _resolve_user_email(engine, user_id: str | None) -> str | None:
    """Look up the e-mail address of ``user_id``; ``None`` if unknown or on error."""
    if not user_id:
        return None
    try:
        from app.models.user import User

        with Session(engine) as session:
            user = session.get(User, user_id)
            return user.email if user else None
    except Exception:
        logger.warning("[EMAIL] Lookup failed for user=%s", user_id, exc_info=True)
        return None


def send_email_notification_stub(
    *,
    to_user_id: str | None,
    event_type: str,
    subject: str,
    body: str,
) -> None:
    """Send email notification via SMTP if configured and enabled, else log only.

    SMTP settings come from SystemSetting rows and the recipient address from
    the User record of ``to_user_id``. Best-effort: settings, lookup and
    delivery failures are logged and never raised.
    """
    engine = _get_engine()
    cfg = _load_smtp_settings(engine)

    # Stored values may be NULL or non-string; coerce before comparing so a
    # bad row disables mail instead of raising.
    smtp_enabled = str(cfg.get("smtp_enabled") or "false").lower() == "true"
    smtp_host = cfg.get("smtp_host") or ""
    if not smtp_enabled or not smtp_host:
        logger.info(
            "[EMAIL STUB] Would send email to user=%s event=%s subject=%s (smtp disabled)",
            to_user_id, event_type, subject
        )
        return

    # Resolve to_address from user record
    to_address = _resolve_user_email(engine, to_user_id)
    if not to_address:
        logger.warning("[EMAIL] Could not resolve email for user=%s", to_user_id)
        return

    try:
        import smtplib
        from email.mime.text import MIMEText

        # Chain with ``or`` so an empty/NULL smtp_user still falls through to
        # the default sender address.
        sender = (
            cfg.get("smtp_from_address")
            or cfg.get("smtp_user")
            or _DEFAULT_FROM_ADDRESS
        )
        msg = MIMEText(body, "plain", "utf-8")
        msg["Subject"] = subject
        msg["From"] = sender
        msg["To"] = to_address

        port = int(cfg.get("smtp_port") or _DEFAULT_SMTP_PORT)
        with smtplib.SMTP(smtp_host, port, timeout=_SMTP_TIMEOUT_SECONDS) as smtp:
            smtp.ehlo()
            smtp.starttls()
            if cfg.get("smtp_user") and cfg.get("smtp_password"):
                smtp.login(cfg["smtp_user"], cfg["smtp_password"])
            smtp.sendmail(sender, [to_address], msg.as_string())
        logger.info("[EMAIL] Sent to %s event=%s", to_address, event_type)
    except Exception as exc:
        logger.error("[EMAIL] Failed to send to %s: %s", to_address, exc, exc_info=True)


async def get_notification_configs(db: AsyncSession, user_id: uuid.UUID) -> list:
    """Return all NotificationConfig rows of ``user_id``, ordered by event type then channel."""
    from app.domains.notifications.models import NotificationConfig

    result = await db.execute(
        select(NotificationConfig)
        .where(NotificationConfig.user_id == user_id)
        .order_by(NotificationConfig.event_type, NotificationConfig.channel)
    )
    return list(result.scalars().all())


async def upsert_notification_config(
    db: AsyncSession,
    user_id: uuid.UUID,
    event_type: str,
    channel: str,
    enabled: bool,
    frequency: str | None = None,
) -> object:
    """Create or update the config row for (user_id, event_type, channel).

    A new row gets ``frequency`` or ``"immediate"`` when none is given; an
    existing row keeps its frequency unless one is passed. Commits ``db`` and
    returns the refreshed row. On commit failure the session is rolled back
    and the exception re-raised.
    """
    from app.domains.notifications.models import NotificationConfig

    result = await db.execute(
        select(NotificationConfig).where(
            NotificationConfig.user_id == user_id,
            NotificationConfig.event_type == event_type,
            NotificationConfig.channel == channel,
        )
    )
    cfg = result.scalar_one_or_none()
    if cfg is None:
        cfg = NotificationConfig(
            user_id=user_id,
            event_type=event_type,
            channel=channel,
            enabled=enabled,
            frequency=frequency or "immediate",
        )
        db.add(cfg)
    else:
        cfg.enabled = enabled
        if frequency is not None:
            cfg.frequency = frequency

    try:
        await db.commit()
    except Exception:
        # Leave the caller's session usable, then surface the error.
        await db.rollback()
        raise
    await db.refresh(cfg)
    return cfg


async def reset_notification_configs(db: AsyncSession, user_id: uuid.UUID) -> list:
    """Replace all config rows of ``user_id`` with the defaults.

    Deletes the user's existing rows and inserts one row per
    (event in ``NotificationEvent.ALL``, delivery channel) with in_app enabled
    and email disabled. Commits ``db`` and returns the new rows. On failure
    the session is rolled back and the exception re-raised.
    """
    from app.domains.notifications.models import NotificationConfig, NotificationEvent
    from sqlalchemy import delete as sa_delete

    configs = [
        NotificationConfig(
            user_id=user_id, event_type=event, channel=channel, enabled=default_enabled
        )
        for event in NotificationEvent.ALL
        for channel, default_enabled in _DELIVERY_CHANNEL_DEFAULTS.items()
    ]

    try:
        await db.execute(
            sa_delete(NotificationConfig).where(NotificationConfig.user_id == user_id)
        )
        db.add_all(configs)
        await db.commit()
    except Exception:
        await db.rollback()
        raise

    # Reload after commit, as upsert_notification_config does, so callers do
    # not receive expired instances whose attributes would need a lazy load.
    for cfg in configs:
        await db.refresh(cfg)
    return configs