import { Redis } from "ioredis";
import { SSE_EVENT_TYPES, type SseEventType } from "@planarchy/shared";

/** A single server-sent event as handed to local subscribers. */
export interface SseEvent {
  type: SseEventType;
  payload: Record<string, unknown>;
  timestamp: string;
}

type Subscriber = (event: SseEvent) => void;

// Module-level subscriber registry (shared between EventBus and publishLocal)
const subscribers = new Set<Subscriber>();

// ---------------------------------------------------------------------------
// Debounce buffer: aggregates rapid events of the same type and delivers a
// single event per type to subscribers once no further event of that type has
// arrived for DEBOUNCE_MS.
// ---------------------------------------------------------------------------

const DEBOUNCE_MS = 50;

interface BufferEntry {
  /** Payloads collected since the entry was created, in arrival order. */
  payloads: Record<string, unknown>[];
  /** Pending flush timer for this event type. */
  timer: ReturnType<typeof setTimeout>;
  /** Timestamp of the first buffered event; reused for the delivered event. */
  firstTimestamp: string;
}

const debounceBuffer = new Map<SseEventType, BufferEntry>();

/**
 * Build the event delivered for a buffer entry: the payload itself when only
 * one event was buffered, otherwise `{ _batch: [...] }` with every payload.
 */
function buildEvent(type: SseEventType, entry: BufferEntry): SseEvent {
  const [first] = entry.payloads;
  const payload =
    entry.payloads.length === 1 && first !== undefined ? first : { _batch: entry.payloads };
  return { type, payload, timestamp: entry.firstTimestamp };
}

/**
 * Hand an event to every registered subscriber.
 *
 * Each callback is isolated: an exception thrown by one subscriber is logged
 * and does not stop delivery to the others (and does not escape from the
 * debounce timer callback).
 */
function deliver(event: SseEvent): void {
  for (const fn of subscribers) {
    try {
      fn(event);
    } catch (err: unknown) {
      console.error("[EventBus subscriber]", err);
    }
  }
}

/** Flush a single event type from the buffer and deliver to subscribers. */
function flushEventType(type: SseEventType): void {
  const entry = debounceBuffer.get(type);
  if (!entry) return;
  // Remove the entry before delivering so a re-entrant publishLocal() call
  // starts a fresh debounce window instead of appending to a flushed entry.
  debounceBuffer.delete(type);
  deliver(buildEvent(type, entry));
}

/** Flush all pending debounce timers immediately (for cleanup / tests). */
export function flushPendingEvents(): void {
  for (const [type, entry] of debounceBuffer) {
    clearTimeout(entry.timer);
    flushEventType(type);
  }
}

/** Cancel all pending debounce timers without delivering (for shutdown). */
export function cancelPendingEvents(): void {
  for (const entry of debounceBuffer.values()) {
    clearTimeout(entry.timer);
  }
  debounceBuffer.clear();
}

// Redis connection — use env var REDIS_URL or fallback to default dev URL
const REDIS_URL = process.env["REDIS_URL"] ?? "redis://localhost:6380";
const CHANNEL = "planarchy:sse";

let publisher: Redis | null = null;
let subscriber: Redis | null = null;

/** Return the shared publisher connection, creating it on first use. */
function getPublisher(): Redis {
  if (!publisher) {
    publisher = new Redis(REDIS_URL, { lazyConnect: false, enableReadyCheck: false });
    publisher.on("error", (e: unknown) => console.error("[Redis publisher]", e));
  }
  return publisher;
}

/**
 * Structural check for a message received from the Redis channel.
 *
 * Verifies that `type` is present, `payload` is a non-null object and
 * `timestamp` is a string. It does NOT verify that `type` is one of the known
 * SSE event types.
 */
function isSseEvent(value: unknown): value is SseEvent {
  if (typeof value !== "object" || value === null) return false;
  const candidate = value as Partial<Record<keyof SseEvent, unknown>>;
  return (
    candidate.type != null &&
    typeof candidate.payload === "object" &&
    candidate.payload !== null &&
    typeof candidate.timestamp === "string"
  );
}

/**
 * Create the subscriber connection (at most once), subscribe to CHANNEL and
 * forward every well-formed message to publishLocal().
 */
function setupSubscriber(): void {
  if (subscriber) return;
  try {
    subscriber = new Redis(REDIS_URL, { lazyConnect: false, enableReadyCheck: false });
    subscriber.on("error", (e: unknown) => console.error("[Redis subscriber]", e));
    void subscriber.subscribe(CHANNEL).catch((err: unknown) => {
      console.error("[Redis subscribe]", err);
    });
    subscriber.on("message", (_channel: string, message: string) => {
      try {
        const parsed: unknown = JSON.parse(message);
        if (!isSseEvent(parsed)) {
          console.warn("[Redis message] dropping malformed SSE event");
          return;
        }
        publishLocal({
          type: parsed.type,
          payload: parsed.payload,
          timestamp: parsed.timestamp,
        });
      } catch (err: unknown) {
        // Best-effort: a bad message must never take the subscriber down.
        console.warn("[Redis message] dropping unparseable SSE event:", err);
      }
    });
  } catch (e) {
    console.warn("[Redis setupSubscriber] Redis unavailable, SSE will be local-only:", e);
  }
}

/**
 * SSE Event Bus with Redis Pub/Sub for multi-instance support.
 * Gracefully degrades to in-memory delivery when Redis is unavailable.
 */
class EventBus {
  /**
   * Register a subscriber.
   *
   * @returns a function that removes the subscriber again.
   */
  subscribe(fn: Subscriber): () => void {
    subscribers.add(fn);
    return () => subscribers.delete(fn);
  }

  /**
   * Publish an event on the Redis channel. Local subscribers receive it when
   * the subscriber connection's "message" handler passes it to publishLocal().
   *
   * If publishing fails — synchronously (e.g. serialisation throws) or
   * asynchronously (the publish promise rejects) — the event is delivered to
   * this instance's subscribers only.
   */
  publish(event: SseEvent): void {
    const fallbackToLocal = (err: unknown): void => {
      console.warn("[Redis emit] fallback to local-only:", err);
      // Deliver locally when Redis is unavailable
      publishLocal(event);
    };

    // Broadcast via Redis (all instances receive via subscriber.on("message"))
    try {
      const pub = getPublisher();
      const message = JSON.stringify({
        type: event.type,
        payload: event.payload,
        timestamp: event.timestamp,
      });
      // The rejection must be handled on the promise itself: the surrounding
      // try/catch only sees synchronous throws.
      pub.publish(CHANNEL, message).catch(fallbackToLocal);
    } catch (e) {
      fallbackToLocal(e);
    }
  }

  /** Publish an event of `type` with `payload`, timestamped with the current time (ISO 8601). */
  emit(type: SseEventType, payload: Record<string, unknown>): void {
    this.publish({
      type,
      payload,
      timestamp: new Date().toISOString(),
    });
  }

  /** Number of currently registered local subscribers. */
  get subscriberCount(): number {
    return subscribers.size;
  }
}

/**
 * Local delivery with debounce: buffer events of the same type and deliver a
 * single (possibly aggregated) event to subscribers.
 *
 * NOTE: the timer is restarted on every arrival, so delivery happens
 * DEBOUNCE_MS after the *last* event of a burst; a sustained stream of
 * same-type events keeps postponing the flush.
 */
function publishLocal(event: SseEvent): void {
  const scheduleFlush = (): ReturnType<typeof setTimeout> =>
    setTimeout(() => flushEventType(event.type), DEBOUNCE_MS);

  const existing = debounceBuffer.get(event.type);
  if (existing) {
    // Another event of the same type is already buffered — append payload and
    // reset the timer so the window starts fresh from the latest arrival.
    existing.payloads.push(event.payload);
    clearTimeout(existing.timer);
    existing.timer = scheduleFlush();
    return;
  }

  // First event of this type — start a new debounce window.
  debounceBuffer.set(event.type, {
    payloads: [event.payload],
    timer: scheduleFlush(),
    firstTimestamp: event.timestamp,
  });
}

// Singleton event bus
export const eventBus = new EventBus();

// Start Redis subscriber once at module init (best-effort)
setupSubscriber();

// Helper emitters
export const emitAllocationCreated = (allocation: Record<string, unknown>) =>
  eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_CREATED, allocation);

export const emitAllocationUpdated = (allocation: Record<string, unknown>) =>
  eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_UPDATED, allocation);

export const emitAllocationDeleted = (allocationId: string, projectId: string) =>
  eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_DELETED, { allocationId, projectId });

export const emitProjectShifted = (project: Record<string, unknown>) =>
  eventBus.emit(SSE_EVENT_TYPES.PROJECT_SHIFTED, project);

export const emitBudgetWarning = (projectId: string, payload: Record<string, unknown>) =>
  eventBus.emit(SSE_EVENT_TYPES.BUDGET_WARNING, { projectId, ...payload });

export const emitVacationCreated = (vacation: Record<string, unknown>) =>
  eventBus.emit(SSE_EVENT_TYPES.VACATION_CREATED, vacation);

export const emitVacationUpdated = (vacation: Record<string, unknown>) =>
  eventBus.emit(SSE_EVENT_TYPES.VACATION_UPDATED, vacation);

export const emitVacationDeleted = (vacationId: string, resourceId: string) =>
  eventBus.emit(SSE_EVENT_TYPES.VACATION_DELETED, { vacationId, resourceId });

export const emitRoleCreated = (role: Record<string, unknown>) =>
  eventBus.emit(SSE_EVENT_TYPES.ROLE_CREATED, role);

export const emitRoleUpdated = (role: Record<string, unknown>) =>
  eventBus.emit(SSE_EVENT_TYPES.ROLE_UPDATED, role);

export const emitRoleDeleted = (roleId: string) =>
  eventBus.emit(SSE_EVENT_TYPES.ROLE_DELETED, { roleId });

export function emitNotificationCreated(userId: string, notificationId: string): void {
  eventBus.emit(SSE_EVENT_TYPES.NOTIFICATION_CREATED, { userId, notificationId });
}

export function emitTaskAssigned(userId: string, notificationId: string): void {
  eventBus.emit(SSE_EVENT_TYPES.TASK_ASSIGNED, { userId, notificationId });
}

export function emitTaskCompleted(userId: string, notificationId: string): void {
  eventBus.emit(SSE_EVENT_TYPES.TASK_COMPLETED, { userId, notificationId });
}

export function emitTaskStatusChanged(userId: string, notificationId: string): void {
  eventBus.emit(SSE_EVENT_TYPES.TASK_STATUS_CHANGED, { userId, notificationId });
}

export function emitReminderDue(userId: string, notificationId: string): void {
  eventBus.emit(SSE_EVENT_TYPES.REMINDER_DUE, { userId, notificationId });
}

export function emitBroadcastSent(broadcastId: string, recipientCount: number): void {
  eventBus.emit(SSE_EVENT_TYPES.BROADCAST_SENT, { broadcastId, recipientCount });
}