import { Redis } from "ioredis"; import { PermissionKey, SSE_EVENT_TYPES, SystemRole, type SseEventType } from "@capakraken/shared"; export type SseAudience = string; export interface SseEvent { type: SseEventType; payload: Record; timestamp: string; audience: SseAudience[]; } type Subscriber = (event: SseEvent) => void; interface Subscription { fn: Subscriber; audiences: Set; includeUnscoped: boolean; } export interface SseSubscriptionOptions { audiences?: Iterable; includeUnscoped?: boolean; } // Module-level subscriber registry (shared between EventBus and publishLocal) const subscribers = new Set(); // --------------------------------------------------------------------------- // Debounce buffer: aggregates rapid events of the same type and audience within // a 50ms window and delivers a single event per scope to subscribers. // --------------------------------------------------------------------------- const DEBOUNCE_MS = 50; interface BufferEntry { payloads: Record[]; timer: ReturnType; firstTimestamp: string; audience: SseAudience[]; } const debounceBuffer = new Map(); function normalizeAudiences(audiences?: Iterable): SseAudience[] { return [...new Set(Array.from(audiences ?? [], (audience) => audience.trim()).filter(Boolean))].sort(); } function getBufferKey(type: SseEventType, audience: readonly SseAudience[]): string { return `${type}::${audience.length > 0 ? audience.join("|") : "__unscoped__"}`; } function matchesSubscription(event: SseEvent, subscription: Subscription): boolean { if (event.audience.length === 0) { return subscription.includeUnscoped; } return event.audience.some((audience) => subscription.audiences.has(audience)); } function deliverEvent(event: SseEvent): void { for (const subscription of subscribers) { if (matchesSubscription(event, subscription)) { subscription.fn(event); } } } export const userAudience = (userId: string): SseAudience => `user:${userId}`; export const roleAudience = (role: string): SseAudience => `role:${role}`; export const permissionAudience = (permission: string): SseAudience => `permission:${permission}`; /** Flush a single event type from the buffer and deliver to subscribers. */ function flushEventType(type: SseEventType, audience: readonly SseAudience[]): void { const key = getBufferKey(type, audience); const entry = debounceBuffer.get(key); if (!entry) return; debounceBuffer.delete(key); const event: SseEvent = entry.payloads.length === 1 ? { type, payload: entry.payloads[0]!, timestamp: entry.firstTimestamp, audience: entry.audience } : { type, payload: { _batch: entry.payloads }, timestamp: entry.firstTimestamp, audience: entry.audience, }; deliverEvent(event); } /** Flush all pending debounce timers immediately (for cleanup / tests). */ export function flushPendingEvents(): void { for (const [key, entry] of debounceBuffer) { clearTimeout(entry.timer); debounceBuffer.delete(key); const [type] = key.split("::") as [SseEventType]; const event: SseEvent = entry.payloads.length === 1 ? { type, payload: entry.payloads[0]!, timestamp: entry.firstTimestamp, audience: entry.audience } : { type, payload: { _batch: entry.payloads }, timestamp: entry.firstTimestamp, audience: entry.audience, }; deliverEvent(event); } } /** Cancel all pending debounce timers without delivering (for shutdown). */ export function cancelPendingEvents(): void { for (const [, entry] of debounceBuffer) { clearTimeout(entry.timer); } debounceBuffer.clear(); } // Redis connection — use env var REDIS_URL or fallback to default dev URL const REDIS_URL = process.env["REDIS_URL"] ?? "redis://localhost:6380"; const CHANNEL = "capakraken:sse"; let publisher: Redis | null = null; let subscriber: Redis | null = null; function getPublisher(): Redis { if (!publisher) { publisher = new Redis(REDIS_URL, { lazyConnect: false, enableReadyCheck: false }); publisher.on("error", (e: unknown) => console.error("[Redis publisher]", e)); } return publisher; } function setupSubscriber(): void { if (subscriber) return; try { subscriber = new Redis(REDIS_URL, { lazyConnect: false, enableReadyCheck: false }); subscriber.on("error", (e: unknown) => console.error("[Redis subscriber]", e)); void subscriber.subscribe(CHANNEL).catch((err: unknown) => { console.error("[Redis subscribe]", err); }); subscriber.on("message", (_channel: string, message: string) => { try { const parsed = JSON.parse(message) as { type: SseEventType; payload: Record; timestamp: string; audience?: SseAudience[]; }; publishLocal({ type: parsed.type, payload: parsed.payload, timestamp: parsed.timestamp, audience: normalizeAudiences(parsed.audience), }); } catch { // ignore parse errors } }); } catch (e) { console.warn("[Redis setupSubscriber] Redis unavailable, SSE will be local-only:", e); } } /** * SSE Event Bus with Redis Pub/Sub for multi-instance support. * Gracefully degrades to in-memory delivery when Redis is unavailable. */ class EventBus { subscribe(fn: Subscriber, options: SseSubscriptionOptions = {}): () => void { const subscription: Subscription = { fn, audiences: new Set(normalizeAudiences(options.audiences)), includeUnscoped: options.includeUnscoped ?? true, }; subscribers.add(subscription); return () => subscribers.delete(subscription); } publish(event: SseEvent): void { const normalizedEvent: SseEvent = { ...event, audience: normalizeAudiences(event.audience), }; // Broadcast via Redis (all instances receive via subscriber.on("message")) try { const pub = getPublisher(); void pub.publish( CHANNEL, JSON.stringify({ type: normalizedEvent.type, payload: normalizedEvent.payload, timestamp: normalizedEvent.timestamp, audience: normalizedEvent.audience, }), ); } catch (e) { console.warn("[Redis emit] fallback to local-only:", e); // Deliver locally when Redis is unavailable publishLocal(normalizedEvent); } } emit(type: SseEventType, payload: Record, audience: Iterable = []): void { this.publish({ type, payload, timestamp: new Date().toISOString(), audience: normalizeAudiences(audience), }); } get subscriberCount(): number { return subscribers.size; } } // Local delivery with debounce: buffer events of the same type and audience // within a 50ms window and then deliver a single (possibly aggregated) event. function publishLocal(event: SseEvent): void { const audience = normalizeAudiences(event.audience); const key = getBufferKey(event.type, audience); const existing = debounceBuffer.get(key); if (existing) { // Another event of the same type is already buffered — append payload and // reset the timer so the window starts fresh from the latest arrival. existing.payloads.push(event.payload); clearTimeout(existing.timer); existing.timer = setTimeout(() => flushEventType(event.type, audience), DEBOUNCE_MS); } else { // First event of this type and audience — start a new debounce window. debounceBuffer.set(key, { payloads: [event.payload], timer: setTimeout(() => flushEventType(event.type, audience), DEBOUNCE_MS), firstTimestamp: event.timestamp, audience, }); } } // Singleton event bus export const eventBus = new EventBus(); // Start Redis subscriber once at module init (best-effort) setupSubscriber(); // Helper emitters export const emitAllocationCreated = (allocation: Record) => eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_CREATED, allocation, [permissionAudience(PermissionKey.MANAGE_ALLOCATIONS)]); export const emitAllocationUpdated = (allocation: Record) => eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_UPDATED, allocation, [permissionAudience(PermissionKey.MANAGE_ALLOCATIONS)]); export const emitAllocationDeleted = (allocationId: string, projectId: string) => eventBus.emit( SSE_EVENT_TYPES.ALLOCATION_DELETED, { allocationId, projectId }, [permissionAudience(PermissionKey.MANAGE_ALLOCATIONS)], ); export const emitProjectShifted = (project: Record) => eventBus.emit(SSE_EVENT_TYPES.PROJECT_SHIFTED, project, [permissionAudience(PermissionKey.MANAGE_ALLOCATIONS)]); export const emitBudgetWarning = (projectId: string, payload: Record) => eventBus.emit( SSE_EVENT_TYPES.BUDGET_WARNING, { projectId, ...payload }, [permissionAudience(PermissionKey.MANAGE_ALLOCATIONS)], ); export const emitVacationCreated = (vacation: Record) => eventBus.emit(SSE_EVENT_TYPES.VACATION_CREATED, vacation, [permissionAudience(PermissionKey.MANAGE_ALLOCATIONS)]); export const emitVacationUpdated = (vacation: Record) => eventBus.emit(SSE_EVENT_TYPES.VACATION_UPDATED, vacation, [permissionAudience(PermissionKey.MANAGE_ALLOCATIONS)]); export const emitVacationDeleted = (vacationId: string, resourceId: string) => eventBus.emit( SSE_EVENT_TYPES.VACATION_DELETED, { vacationId, resourceId }, [permissionAudience(PermissionKey.MANAGE_ALLOCATIONS)], ); export const emitRoleCreated = (role: Record) => eventBus.emit(SSE_EVENT_TYPES.ROLE_CREATED, role, [permissionAudience(PermissionKey.MANAGE_ROLES)]); export const emitRoleUpdated = (role: Record) => eventBus.emit(SSE_EVENT_TYPES.ROLE_UPDATED, role, [permissionAudience(PermissionKey.MANAGE_ROLES)]); export const emitRoleDeleted = (roleId: string) => eventBus.emit(SSE_EVENT_TYPES.ROLE_DELETED, { roleId }, [permissionAudience(PermissionKey.MANAGE_ROLES)]); export function emitNotificationCreated(userId: string, notificationId: string): void { eventBus.emit(SSE_EVENT_TYPES.NOTIFICATION_CREATED, { userId, notificationId }, [userAudience(userId)]); } export function emitTaskAssigned(userId: string, notificationId: string): void { eventBus.emit(SSE_EVENT_TYPES.TASK_ASSIGNED, { userId, notificationId }, [userAudience(userId)]); } export function emitTaskCompleted(userId: string, notificationId: string): void { eventBus.emit(SSE_EVENT_TYPES.TASK_COMPLETED, { userId, notificationId }, [userAudience(userId)]); } export function emitTaskStatusChanged(userId: string, notificationId: string): void { eventBus.emit(SSE_EVENT_TYPES.TASK_STATUS_CHANGED, { userId, notificationId }, [userAudience(userId)]); } export function emitReminderDue(userId: string, notificationId: string): void { eventBus.emit(SSE_EVENT_TYPES.REMINDER_DUE, { userId, notificationId }, [userAudience(userId)]); } export function emitBroadcastSent(broadcastId: string, recipientCount: number): void { eventBus.emit(SSE_EVENT_TYPES.BROADCAST_SENT, { broadcastId, recipientCount }, [ roleAudience(SystemRole.ADMIN), roleAudience(SystemRole.MANAGER), ]); }