Files
Nexus/packages/api/src/sse/event-bus.ts
T
Hartmut 19aeb2ba04
CI / Lint (push) Successful in 3m4s
CI / Typecheck (push) Successful in 3m6s
CI / Architecture Guardrails (push) Successful in 3m8s
CI / Assistant Split Regression (push) Successful in 3m48s
CI / Build (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled
CI / Fresh-Linux Docker Deploy (push) Has been cancelled
CI / Release Images (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
rename(phase 3): compose/DB/infra + stray code refs capakraken → nexus (#62)
rename(phase 3): compose/DB/infra + stray code refs capakraken → nexus (#62)

Co-authored-by: Hartmut Nörenberg <hn@hartmut-noerenberg.com>
Co-committed-by: Hartmut Nörenberg <hn@hartmut-noerenberg.com>
2026-05-21 20:07:18 +02:00

440 lines
14 KiB
TypeScript

import { Redis } from "ioredis";
import { PermissionKey, SSE_EVENT_TYPES, type SseEventType } from "@nexus/shared";
import { logger } from "../lib/logger.js";
/** Audience string scoped to a user, shaped `user:<scope>`. */
export type UserSseAudience = `user:${string}`;
/** Audience string scoped to a permission, shaped `permission:<scope>`. */
export type PermissionSseAudience = `permission:${string}`;
/** Audience string scoped to a resource, shaped `resource:<scope>`. */
export type ResourceSseAudience = `resource:${string}`;
/** Any audience string accepted by the event bus. */
export type SseAudience = UserSseAudience | PermissionSseAudience | ResourceSseAudience;
// Runtime list of the accepted audience prefixes; mirrors the template types above.
const SSE_AUDIENCE_PREFIXES = ["user", "permission", "resource"] as const;
// Union of the literal prefixes, derived from the runtime list.
type SseAudiencePrefix = (typeof SSE_AUDIENCE_PREFIXES)[number];
/** An event as it travels through the bus and is handed to subscribers. */
export interface SseEvent {
// Event type identifier (one of the shared SSE event types).
type: SseEventType;
// Event data. When several buffered events are flushed together the payload
// is `{ _batch: [...] }` holding the individual payloads.
payload: Record<string, unknown>;
// Timestamp string; `EventBus.emit` fills it with `new Date().toISOString()`.
timestamp: string;
// Audiences this event is addressed to. An empty list means "unscoped".
audience: SseAudience[];
}
/** Callback invoked with each event delivered to a subscription. */
type Subscriber = (event: SseEvent) => void;
/** Internal record kept in the subscriber registry for one subscription. */
interface Subscription {
// Callback to invoke on delivery.
fn: Subscriber;
// Audiences this subscription listens to (canonicalized on subscribe).
audiences: Set<SseAudience>;
// Whether events with an empty audience list are delivered to this subscription.
includeUnscoped: boolean;
}
/** Options accepted by `EventBus.subscribe`. */
export interface SseSubscriptionOptions {
// Audiences to listen to; omitted means none.
audiences?: Iterable<SseAudience>;
// Receive unscoped events; `EventBus.subscribe` treats an omitted value as `true`.
includeUnscoped?: boolean;
}
// Module-level subscriber registry: EventBus adds/removes entries and
// deliverEvent iterates it when handing out events.
const subscribers = new Set<Subscription>();
// ---------------------------------------------------------------------------
// Debounce buffer: aggregates rapid events of the same type and audience and
// delivers a single event per scope to subscribers. The timer is restarted on
// every arrival, so delivery happens DEBOUNCE_MS after the *last* event of a
// burst (see publishLocal), not DEBOUNCE_MS after the first.
// ---------------------------------------------------------------------------
const DEBOUNCE_MS = 50;
// One pending bucket in the debounce buffer.
interface BufferEntry {
// Payloads collected so far, in arrival order.
payloads: Record<string, unknown>[];
// Handle of the timer that will flush this bucket.
timer: ReturnType<typeof setTimeout>;
// Timestamp of the first event that opened this bucket.
firstTimestamp: string;
// Canonicalized audience list shared by every event in this bucket.
audience: SseAudience[];
}
// Pending buckets keyed by getBufferKey(type, audience).
const debounceBuffer = new Map<string, BufferEntry>();
/**
 * Type guard: reports whether `value` is one of the accepted audience
 * prefixes listed in SSE_AUDIENCE_PREFIXES.
 */
function isSseAudiencePrefix(value: string): value is SseAudiencePrefix {
  // Widen the tuple to plain strings so the lookup needs no cast on `value`.
  const knownPrefixes: readonly string[] = SSE_AUDIENCE_PREFIXES;
  return knownPrefixes.includes(value);
}
/**
 * Validate and normalize one audience string.
 *
 * The input is trimmed, must contain a `:` that is neither the first nor the
 * last character, must start with an accepted prefix, and must have a
 * non-blank scope after the colon. The scope is trimmed in the result.
 *
 * @param audience Raw audience string.
 * @returns The normalized `<prefix>:<scope>` string.
 * @throws Error when the value is empty or not a well-formed audience.
 */
function assertValidSseAudience(audience: string): SseAudience {
  const trimmed = audience.trim();
  if (trimmed.length === 0) {
    throw new Error("Invalid SSE audience: empty value");
  }

  const invalid = (): Error => new Error(`Invalid SSE audience: ${trimmed}`);

  const colonAt = trimmed.indexOf(":");
  const hasPrefixText = colonAt > 0;
  const colonIsLast = colonAt === trimmed.length - 1;
  if (!hasPrefixText || colonIsLast) {
    throw invalid();
  }

  const prefix = trimmed.slice(0, colonAt);
  const scope = trimmed.slice(colonAt + 1).trim();
  if (scope.length === 0 || !isSseAudiencePrefix(prefix)) {
    throw invalid();
  }

  return `${prefix}:${scope}` as SseAudience;
}
/**
 * Produce the canonical form of an audience list: every entry validated and
 * normalized, duplicates removed, result sorted.
 *
 * @param audiences Audiences to canonicalize; omitted/nullish yields `[]`.
 * @returns A new sorted array of unique, normalized audiences.
 * @throws Error when any entry is not a valid audience.
 */
export function canonicalizeSseAudiences(audiences?: Iterable<SseAudience>): SseAudience[] {
  // Array.from is kept (rather than for..of) so array-like inputs are handled
  // exactly as before.
  const normalized = Array.from(audiences ?? [], (candidate) => assertValidSseAudience(candidate));

  const unique = new Set<SseAudience>();
  for (const entry of normalized) {
    if (entry) {
      unique.add(entry);
    }
  }

  const canonical = Array.from(unique);
  canonical.sort();
  return canonical;
}
/**
 * Build the debounce-buffer key for an event type and audience list:
 * `<type>::<audiences joined by "|">`, or `<type>::__unscoped__` when the
 * audience list is empty.
 */
function getBufferKey(type: SseEventType, audience: readonly SseAudience[]): string {
  const scopePart = audience.length === 0 ? "__unscoped__" : audience.join("|");
  return `${type}::${scopePart}`;
}
/**
 * Decide whether an event should be delivered to a subscription.
 *
 * Unscoped events (empty audience list) go to subscriptions that opted in via
 * `includeUnscoped`; scoped events match when at least one of their audiences
 * is in the subscription's audience set.
 */
function matchesSubscription(event: SseEvent, subscription: Subscription): boolean {
  const { audience } = event;
  const isUnscoped = audience.length === 0;
  return isUnscoped
    ? subscription.includeUnscoped
    : audience.some((scope) => subscription.audiences.has(scope));
}
/**
 * Hand an event to every registered subscription that matches it.
 * A subscriber that throws is logged and skipped so the remaining
 * subscribers still receive the event.
 */
function deliverEvent(event: SseEvent): void {
  for (const subscription of subscribers) {
    if (!matchesSubscription(event, subscription)) {
      continue;
    }
    try {
      subscription.fn(event);
    } catch (err) {
      logger.warn({ err, eventType: event.type }, "SSE subscriber threw during event delivery");
    }
  }
}
/** Build the audience string `user:<userId>`. */
export const userAudience = (userId: string): SseAudience => {
  return `user:${userId}`;
};
/** Build the audience string `permission:<permission>`. */
export const permissionAudience = (permission: string): SseAudience => {
  return `permission:${permission}`;
};
/** Build the audience string `resource:<resourceId>`. */
export const resourceAudience = (resourceId: string): SseAudience => {
  return `resource:${resourceId}`;
};
/**
 * Collect the resource ids mentioned by a payload.
 *
 * Reads `payload.resourceId` (single value) followed by `payload.resourceIds`
 * (array); keeps only string entries that are not blank, and removes
 * duplicates while preserving first-seen order. Ids are returned untrimmed.
 */
function extractScopedResourceIds(payload: Record<string, unknown>): string[] {
  const isUsableId = (value: unknown): value is string =>
    typeof value === "string" && value.trim().length > 0;

  const listed: unknown = payload["resourceIds"];
  const candidates: unknown[] = [payload["resourceId"], ...(Array.isArray(listed) ? listed : [])];

  return [...new Set(candidates.filter(isUsableId))];
}
/**
 * Audiences for planning-related events: the MANAGE_ALLOCATIONS permission
 * audience plus one resource audience per resource id found in the payload,
 * returned in canonical (validated, de-duplicated, sorted) form.
 */
function buildPlanningAudiences(payload: Record<string, unknown>): SseAudience[] {
  const managers = permissionAudience(PermissionKey.MANAGE_ALLOCATIONS);
  const perResource = extractScopedResourceIds(payload).map((id) => resourceAudience(id));
  return canonicalizeSseAudiences([managers, ...perResource]);
}
/**
 * Build the event delivered to subscribers for one drained buffer entry.
 *
 * A bucket holding a single payload is delivered with that payload unchanged;
 * a bucket holding several is delivered as `{ _batch: [...payloads] }`. The
 * timestamp is the one recorded for the first buffered event.
 *
 * Shared by both flush paths so the single-vs-batch shape cannot drift.
 */
function buildBufferedEvent(type: SseEventType, entry: BufferEntry): SseEvent {
  const payload =
    entry.payloads.length === 1
      ? // Length was just checked, so index 0 is populated.
        entry.payloads[0]!
      : { _batch: entry.payloads };
  return {
    type,
    payload,
    timestamp: entry.firstTimestamp,
    audience: entry.audience,
  };
}

/**
 * Flush the bucket for one event type and audience list and deliver it to
 * subscribers. Does nothing when no such bucket is pending. The bucket's
 * timer is not cleared here; this is the function the timer itself invokes.
 */
function flushEventType(type: SseEventType, audience: readonly SseAudience[]): void {
  const key = getBufferKey(type, audience);
  const entry = debounceBuffer.get(key);
  if (!entry) return;
  // Remove before delivering so the bucket is gone by the time subscribers run.
  debounceBuffer.delete(key);
  deliverEvent(buildBufferedEvent(type, entry));
}

/** Flush all pending debounce timers immediately (for cleanup / tests). */
export function flushPendingEvents(): void {
  for (const [key, entry] of debounceBuffer) {
    clearTimeout(entry.timer);
    debounceBuffer.delete(key);
    // Keys are built as `<type>::<audiences>`, so the event type is the text
    // before the first "::".
    const [type] = key.split("::") as [SseEventType];
    deliverEvent(buildBufferedEvent(type, entry));
  }
}
/**
 * Drop every pending bucket without delivering it (for shutdown): each
 * bucket's timer is cleared and the buffer is emptied.
 */
export function cancelPendingEvents(): void {
  debounceBuffer.forEach((pending) => {
    clearTimeout(pending.timer);
  });
  debounceBuffer.clear();
}
// Redis connection URL, resolved once at module load:
//   - REDIS_URL from the environment when set;
//   - otherwise, outside production, a localhost dev URL (with a console warning);
//   - otherwise (production without REDIS_URL) module load throws.
const REDIS_URL: string = (() => {
  const configured = process.env["REDIS_URL"];
  if (configured != null) {
    return configured;
  }
  if (process.env["NODE_ENV"] === "production") {
    throw new Error("REDIS_URL required in production");
  }
  console.warn("[env] REDIS_URL not set, using localhost fallback");
  return "redis://localhost:6380";
})();
// Pub/Sub channel carrying serialized SSE events between instances.
const CHANNEL = "nexus:sse";
// Connections are created on first use (publisher) / at module init (subscriber).
let publisher: Redis | null = null;
let subscriber: Redis | null = null;
/**
 * Return the shared Redis publisher connection, creating it on first call.
 * Connection errors are logged as warnings rather than thrown.
 *
 * NOTE(review): REDIS_URL is included in the log context; if the URL embeds
 * credentials they may end up in logs — confirm the logger redacts it.
 */
function getPublisher(): Redis {
  if (publisher) {
    return publisher;
  }

  const client = new Redis(REDIS_URL, { lazyConnect: false, enableReadyCheck: false });
  publisher = client;
  client.on("error", (e: unknown) => {
    logger.warn(
      { err: e, redisUrl: REDIS_URL, channel: CHANNEL },
      "Redis publisher emitted an error",
    );
  });
  return client;
}
/**
 * Decode one raw Redis message and check that it has the shape of an SSE event.
 *
 * Accepted shape: a JSON object with a string `type`, a plain-object
 * `payload`, a string `timestamp`, and an `audience` that is either absent /
 * null or an array of strings. The returned audience is NOT yet canonicalized.
 *
 * @param raw Message text received on the channel.
 * @returns The decoded event, or `null` when the JSON is valid but the shape is not.
 * @throws SyntaxError when `raw` is not valid JSON.
 */
function parseSseRedisMessage(raw: string): SseEvent | null {
  const decoded: unknown = JSON.parse(raw);
  if (typeof decoded !== "object" || decoded === null || Array.isArray(decoded)) {
    return null;
  }

  const { type, payload, timestamp, audience } = decoded as Record<string, unknown>;
  if (typeof type !== "string" || typeof timestamp !== "string") {
    return null;
  }
  if (typeof payload !== "object" || payload === null || Array.isArray(payload)) {
    return null;
  }
  // A present-but-malformed audience must not be silently widened to "unscoped":
  // that would deliver a scoped event to every unscoped subscriber.
  const audienceIsValid =
    audience == null ||
    (Array.isArray(audience) && audience.every((entry) => typeof entry === "string"));
  if (!audienceIsValid) {
    return null;
  }

  return {
    type: type as SseEventType,
    payload: payload as Record<string, unknown>,
    timestamp,
    audience: (audience ?? []) as SseAudience[],
  };
}

/**
 * Create the Redis subscriber connection (at most once) and route every
 * message received on CHANNEL into local, debounced delivery.
 *
 * Failures are best-effort: connection errors, a failed subscribe, and
 * unparsable or malformed messages are logged as warnings and never thrown to
 * the caller. If constructing the connection throws, the failure is logged
 * and no messages will be received from Redis.
 */
function setupSubscriber(): void {
  if (subscriber) return;
  try {
    subscriber = new Redis(REDIS_URL, { lazyConnect: false, enableReadyCheck: false });
    subscriber.on("error", (e: unknown) => {
      logger.warn(
        { err: e, redisUrl: REDIS_URL, channel: CHANNEL },
        "Redis subscriber emitted an error",
      );
    });
    void subscriber.subscribe(CHANNEL).catch((err: unknown) => {
      logger.warn(
        { err, redisUrl: REDIS_URL, channel: CHANNEL },
        "Redis subscribe failed for SSE event bus",
      );
    });
    subscriber.on("message", (_channel: string, message: string) => {
      try {
        const parsed = parseSseRedisMessage(message);
        if (!parsed) {
          logger.warn({ message }, "Ignoring malformed SSE Redis message");
          return;
        }
        publishLocal({
          type: parsed.type,
          payload: parsed.payload,
          timestamp: parsed.timestamp,
          // Throws on an audience with an unknown prefix or blank scope; handled below.
          audience: canonicalizeSseAudiences(parsed.audience),
        });
      } catch (err) {
        logger.warn({ err, message }, "Failed to parse SSE Redis message");
      }
    });
  } catch (e) {
    logger.warn(
      { err: e, redisUrl: REDIS_URL, channel: CHANNEL },
      "Redis unavailable, SSE event bus will run in local-only mode",
    );
  }
}
/**
 * SSE event bus backed by Redis Pub/Sub so that every instance sees every event.
 *
 * Publishing goes to Redis; delivery to local subscribers happens when the
 * message comes back through the Redis subscriber. When publishing throws or
 * its promise rejects, the event is delivered to local subscribers only.
 */
class EventBus {
  /**
   * Register a subscriber.
   *
   * @param fn Callback invoked for each matching event.
   * @param options Audiences to listen to, and whether unscoped events are
   *   wanted (defaults to `true`).
   * @returns A function that removes the subscription again.
   * @throws Error when `options.audiences` contains an invalid audience.
   */
  subscribe(fn: Subscriber, options: SseSubscriptionOptions = {}): () => void {
    const audiences = new Set(canonicalizeSseAudiences(options.audiences));
    const includeUnscoped = options.includeUnscoped ?? true;
    const entry: Subscription = { fn, audiences, includeUnscoped };

    subscribers.add(entry);
    return () => subscribers.delete(entry);
  }

  /**
   * Publish an event to all instances via Redis, falling back to local-only
   * delivery when Redis publishing fails.
   *
   * @throws Error when `event.audience` contains an invalid audience.
   */
  publish(event: SseEvent): void {
    const outgoing: SseEvent = {
      ...event,
      audience: canonicalizeSseAudiences(event.audience),
    };

    // Shared fallback: log why Redis was bypassed, then deliver locally.
    const deliverLocally = (err: unknown, reason: string): void => {
      logger.warn({ err, redisUrl: REDIS_URL, channel: CHANNEL }, reason);
      publishLocal(outgoing);
    };

    // Broadcast via Redis (all instances receive via subscriber.on("message"))
    try {
      const redis = getPublisher();
      const { type, payload, timestamp, audience } = outgoing;
      const wireMessage = JSON.stringify({ type, payload, timestamp, audience });
      void redis.publish(CHANNEL, wireMessage).catch((e: unknown) => {
        deliverLocally(
          e,
          "Redis publish promise rejected, falling back to local-only SSE delivery",
        );
      });
    } catch (e) {
      // Synchronous failure (connection setup or serialization).
      deliverLocally(e, "Redis publish failed, falling back to local-only SSE delivery");
    }
  }

  /**
   * Convenience wrapper around `publish` that stamps the event with the
   * current time as an ISO string.
   *
   * @param audience Audiences to address; defaults to none (unscoped).
   */
  emit(
    type: SseEventType,
    payload: Record<string, unknown>,
    audience: Iterable<SseAudience> = [],
  ): void {
    const timestamp = new Date().toISOString();
    const scoped = canonicalizeSseAudiences(audience);
    this.publish({ type, payload, timestamp, audience: scoped });
  }

  /** Number of subscriptions currently registered. */
  get subscriberCount(): number {
    return subscribers.size;
  }
}
// Local delivery with debounce: events sharing a type and audience are
// collected in one bucket and delivered as a single (possibly aggregated)
// event DEBOUNCE_MS after the most recent arrival.
function publishLocal(event: SseEvent): void {
  const audience = canonicalizeSseAudiences(event.audience);
  const key = getBufferKey(event.type, audience);

  // Arms a fresh flush timer for this bucket.
  const armTimer = (): ReturnType<typeof setTimeout> =>
    setTimeout(() => flushEventType(event.type, audience), DEBOUNCE_MS);

  const pending = debounceBuffer.get(key);
  if (pending === undefined) {
    // Nothing buffered for this type and audience yet: open a new bucket.
    debounceBuffer.set(key, {
      payloads: [event.payload],
      timer: armTimer(),
      firstTimestamp: event.timestamp,
      audience,
    });
    return;
  }

  // Bucket already open: add the payload and restart the countdown so the
  // flush happens DEBOUNCE_MS after this latest arrival.
  pending.payloads.push(event.payload);
  clearTimeout(pending.timer);
  pending.timer = armTimer();
}
// Singleton event bus shared by every importer of this module.
export const eventBus = new EventBus();
// Start the Redis subscriber once at module init (best-effort: setup failures
// are logged inside setupSubscriber and do not throw here).
setupSubscriber();
// Helper emitters
// Planning events are addressed to the MANAGE_ALLOCATIONS permission audience
// plus the resource audiences derived from the payload's resourceId/resourceIds.

/** Emit ALLOCATION_CREATED with the allocation as payload. */
export const emitAllocationCreated = (allocation: Record<string, unknown>): void => {
  const recipients = buildPlanningAudiences(allocation);
  eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_CREATED, allocation, recipients);
};

/** Emit ALLOCATION_UPDATED with the allocation as payload. */
export const emitAllocationUpdated = (allocation: Record<string, unknown>): void => {
  const recipients = buildPlanningAudiences(allocation);
  eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_UPDATED, allocation, recipients);
};

/**
 * Emit ALLOCATION_DELETED. `resourceId` is included in the payload (and in the
 * audience) only when it is a non-empty string.
 */
export const emitAllocationDeleted = (
  allocationId: string,
  projectId: string,
  resourceId?: string | null,
): void => {
  const payload: Record<string, unknown> = { allocationId, projectId };
  if (resourceId) {
    payload["resourceId"] = resourceId;
  }
  eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_DELETED, payload, buildPlanningAudiences(payload));
};

/** Emit PROJECT_SHIFTED with the project as payload. */
export const emitProjectShifted = (project: Record<string, unknown>): void => {
  const recipients = buildPlanningAudiences(project);
  eventBus.emit(SSE_EVENT_TYPES.PROJECT_SHIFTED, project, recipients);
};

/**
 * Emit BUDGET_WARNING to the MANAGE_ALLOCATIONS permission audience only.
 * Keys in `payload` override `projectId` if they collide.
 */
export const emitBudgetWarning = (projectId: string, payload: Record<string, unknown>): void => {
  const recipients = [permissionAudience(PermissionKey.MANAGE_ALLOCATIONS)];
  eventBus.emit(SSE_EVENT_TYPES.BUDGET_WARNING, { projectId, ...payload }, recipients);
};

/** Emit VACATION_CREATED with the vacation as payload. */
export const emitVacationCreated = (vacation: Record<string, unknown>): void => {
  const recipients = buildPlanningAudiences(vacation);
  eventBus.emit(SSE_EVENT_TYPES.VACATION_CREATED, vacation, recipients);
};

/** Emit VACATION_UPDATED with the vacation as payload. */
export const emitVacationUpdated = (vacation: Record<string, unknown>): void => {
  const recipients = buildPlanningAudiences(vacation);
  eventBus.emit(SSE_EVENT_TYPES.VACATION_UPDATED, vacation, recipients);
};

/** Emit VACATION_DELETED for the given vacation and resource. */
export const emitVacationDeleted = (vacationId: string, resourceId: string): void => {
  const payload = { vacationId, resourceId };
  eventBus.emit(SSE_EVENT_TYPES.VACATION_DELETED, payload, buildPlanningAudiences(payload));
};
// Role events are addressed to the MANAGE_ROLES permission audience.

/** Emit ROLE_CREATED with the role as payload. */
export const emitRoleCreated = (role: Record<string, unknown>): void => {
  const recipients = [permissionAudience(PermissionKey.MANAGE_ROLES)];
  eventBus.emit(SSE_EVENT_TYPES.ROLE_CREATED, role, recipients);
};

/** Emit ROLE_UPDATED with the role as payload. */
export const emitRoleUpdated = (role: Record<string, unknown>): void => {
  const recipients = [permissionAudience(PermissionKey.MANAGE_ROLES)];
  eventBus.emit(SSE_EVENT_TYPES.ROLE_UPDATED, role, recipients);
};

/** Emit ROLE_DELETED carrying only the role id. */
export const emitRoleDeleted = (roleId: string): void => {
  const recipients = [permissionAudience(PermissionKey.MANAGE_ROLES)];
  eventBus.emit(SSE_EVENT_TYPES.ROLE_DELETED, { roleId }, recipients);
};
// Per-user events: each is addressed to the single `user:<userId>` audience
// and carries `{ userId, notificationId }` as payload.

/** Emit NOTIFICATION_CREATED to the given user. */
export function emitNotificationCreated(userId: string, notificationId: string): void {
  const recipients = [userAudience(userId)];
  eventBus.emit(SSE_EVENT_TYPES.NOTIFICATION_CREATED, { userId, notificationId }, recipients);
}

/** Emit TASK_ASSIGNED to the given user. */
export function emitTaskAssigned(userId: string, notificationId: string): void {
  const recipients = [userAudience(userId)];
  eventBus.emit(SSE_EVENT_TYPES.TASK_ASSIGNED, { userId, notificationId }, recipients);
}

/** Emit TASK_COMPLETED to the given user. */
export function emitTaskCompleted(userId: string, notificationId: string): void {
  const recipients = [userAudience(userId)];
  eventBus.emit(SSE_EVENT_TYPES.TASK_COMPLETED, { userId, notificationId }, recipients);
}

/** Emit TASK_STATUS_CHANGED to the given user. */
export function emitTaskStatusChanged(userId: string, notificationId: string): void {
  const recipients = [userAudience(userId)];
  eventBus.emit(SSE_EVENT_TYPES.TASK_STATUS_CHANGED, { userId, notificationId }, recipients);
}

/** Emit REMINDER_DUE to the given user. */
export function emitReminderDue(userId: string, notificationId: string): void {
  const recipients = [userAudience(userId)];
  eventBus.emit(SSE_EVENT_TYPES.REMINDER_DUE, { userId, notificationId }, recipients);
}