From ac29ce356761def558f457f2aeacc775fb6ef956 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hartmut=20N=C3=B6renberg?= Date: Tue, 31 Mar 2026 22:56:12 +0200 Subject: [PATCH] refactor(sse): narrow canonical audience scopes --- docs/sse-audience-architecture.md | 6 +- .../src/__tests__/event-bus-debounce.test.ts | 20 +++++++ .../__tests__/sse-subscription-policy.test.ts | 4 +- packages/api/src/sse/event-bus.ts | 56 +++++++++++++------ packages/api/src/sse/subscription-policy.ts | 2 - 5 files changed, 64 insertions(+), 24 deletions(-) diff --git a/docs/sse-audience-architecture.md b/docs/sse-audience-architecture.md index bc0e524..0f90939 100644 --- a/docs/sse-audience-architecture.md +++ b/docs/sse-audience-architecture.md @@ -5,6 +5,7 @@ - SSE subscriptions are derived on the server from the authenticated database user. - Clients do not provide arbitrary audience lists. - Every scoped event declares its audience explicitly. +- Invalid audience strings fail fast in the event bus. - Unscoped delivery is disabled for authenticated timeline subscriptions. ## Audience Model @@ -12,10 +13,11 @@ Supported canonical audiences: - `user:` for personal notifications and tasks -- `role:` for role-wide operational events - `permission:` for explicit capability audiences - `resource:` for self-service or affected-resource updates +Any other prefix is invalid and must be rejected instead of being accepted silently. + ## Subscription Flow 1. The web SSE route authenticates the session and loads the database user. @@ -28,7 +30,7 @@ Supported canonical audiences: - Personal events such as notifications must target `user:`. - Planning events may target both `permission:manageAllocations` and affected `resource:` audiences. -- Broad operational events must use the smallest real audience that matches the use case. +- Broad operational events must use the smallest real audience that matches the use case, typically `permission:*` rather than role fan-out. ## Contract Tests diff --git a/packages/api/src/__tests__/event-bus-debounce.test.ts b/packages/api/src/__tests__/event-bus-debounce.test.ts index 14ba2ef..d3798e2 100644 --- a/packages/api/src/__tests__/event-bus-debounce.test.ts +++ b/packages/api/src/__tests__/event-bus-debounce.test.ts @@ -265,4 +265,24 @@ describe("event-bus debounce", () => { unsubscribeManager(); unsubscribeResource(); }); + + it("rejects invalid subscription audiences", () => { + expect(() => + eventBus.subscribe( + () => undefined, + { + audiences: ["chapter:capex" as never], + includeUnscoped: false, + }, + )).toThrowError("Invalid SSE audience: chapter:capex"); + }); + + it("rejects invalid emitted audiences", () => { + expect(() => + eventBus.emit( + SSE_EVENT_TYPES.NOTIFICATION_CREATED, + { notificationId: "n1" }, + ["user:" as never], + )).toThrowError("Invalid SSE audience: user:"); + }); }); diff --git a/packages/api/src/__tests__/sse-subscription-policy.test.ts b/packages/api/src/__tests__/sse-subscription-policy.test.ts index 9cf1ce6..8f60aca 100644 --- a/packages/api/src/__tests__/sse-subscription-policy.test.ts +++ b/packages/api/src/__tests__/sse-subscription-policy.test.ts @@ -5,7 +5,6 @@ import { eventBus, permissionAudience, resourceAudience, - roleAudience, type SseEvent, userAudience, } from "../sse/event-bus.js"; @@ -36,7 +35,7 @@ describe("sse subscription policy", () => { vi.useRealTimers(); }); - it("derives canonical user, role, resource, and permission audiences server-side", () => { + it("derives canonical user, resource, and permission audiences server-side", () => { const subscription = deriveUserSseSubscription({ userId: "user_1", systemRole: SystemRole.USER, @@ -55,7 +54,6 @@ describe("sse subscription policy", () => { permissionAudience(PermissionKey.MANAGE_ALLOCATIONS), permissionAudience(PermissionKey.VIEW_PLANNING), resourceAudience("res_1"), - roleAudience(SystemRole.USER), userAudience("user_1"), ]); }); diff --git a/packages/api/src/sse/event-bus.ts b/packages/api/src/sse/event-bus.ts index 81026aa..e88aaa8 100644 --- a/packages/api/src/sse/event-bus.ts +++ b/packages/api/src/sse/event-bus.ts @@ -1,11 +1,13 @@ import { Redis } from "ioredis"; -import { PermissionKey, SSE_EVENT_TYPES, SystemRole, type SseEventType } from "@capakraken/shared"; +import { PermissionKey, SSE_EVENT_TYPES, type SseEventType } from "@capakraken/shared"; +import { logger } from "../lib/logger.js"; export type UserSseAudience = `user:${string}`; -export type RoleSseAudience = `role:${string}`; export type PermissionSseAudience = `permission:${string}`; export type ResourceSseAudience = `resource:${string}`; -export type SseAudience = UserSseAudience | RoleSseAudience | PermissionSseAudience | ResourceSseAudience; +export type SseAudience = UserSseAudience | PermissionSseAudience | ResourceSseAudience; +const SSE_AUDIENCE_PREFIXES = ["user", "permission", "resource"] as const; +type SseAudiencePrefix = (typeof SSE_AUDIENCE_PREFIXES)[number]; export interface SseEvent { type: SseEventType; @@ -46,8 +48,32 @@ interface BufferEntry { const debounceBuffer = new Map(); +function isSseAudiencePrefix(value: string): value is SseAudiencePrefix { + return SSE_AUDIENCE_PREFIXES.includes(value as SseAudiencePrefix); +} + +function assertValidSseAudience(audience: string): SseAudience { + const normalized = audience.trim(); + if (!normalized) { + throw new Error("Invalid SSE audience: empty value"); + } + + const separatorIndex = normalized.indexOf(":"); + if (separatorIndex <= 0 || separatorIndex === normalized.length - 1) { + throw new Error(`Invalid SSE audience: ${normalized}`); + } + + const prefix = normalized.slice(0, separatorIndex); + const scope = normalized.slice(separatorIndex + 1).trim(); + if (!isSseAudiencePrefix(prefix) || !scope) { + throw new Error(`Invalid SSE audience: ${normalized}`); + } + + return `${prefix}:${scope}` as SseAudience; +} + export function canonicalizeSseAudiences(audiences?: Iterable): SseAudience[] { - return [...new Set(Array.from(audiences ?? [], (audience) => audience.trim() as SseAudience).filter(Boolean))].sort(); + return [...new Set(Array.from(audiences ?? [], (audience) => assertValidSseAudience(audience)).filter(Boolean))].sort(); } function getBufferKey(type: SseEventType, audience: readonly SseAudience[]): string { @@ -70,7 +96,6 @@ function deliverEvent(event: SseEvent): void { } export const userAudience = (userId: string): SseAudience => `user:${userId}`; -export const roleAudience = (role: string): SseAudience => `role:${role}`; export const permissionAudience = (permission: string): SseAudience => `permission:${permission}`; export const resourceAudience = (resourceId: string): SseAudience => `resource:${resourceId}`; @@ -160,7 +185,9 @@ let subscriber: Redis | null = null; function getPublisher(): Redis { if (!publisher) { publisher = new Redis(REDIS_URL, { lazyConnect: false, enableReadyCheck: false }); - publisher.on("error", (e: unknown) => console.error("[Redis publisher]", e)); + publisher.on("error", (e: unknown) => { + logger.warn({ err: e, redisUrl: REDIS_URL, channel: CHANNEL }, "Redis publisher emitted an error"); + }); } return publisher; } @@ -169,9 +196,11 @@ function setupSubscriber(): void { if (subscriber) return; try { subscriber = new Redis(REDIS_URL, { lazyConnect: false, enableReadyCheck: false }); - subscriber.on("error", (e: unknown) => console.error("[Redis subscriber]", e)); + subscriber.on("error", (e: unknown) => { + logger.warn({ err: e, redisUrl: REDIS_URL, channel: CHANNEL }, "Redis subscriber emitted an error"); + }); void subscriber.subscribe(CHANNEL).catch((err: unknown) => { - console.error("[Redis subscribe]", err); + logger.warn({ err, redisUrl: REDIS_URL, channel: CHANNEL }, "Redis subscribe failed for SSE event bus"); }); subscriber.on("message", (_channel: string, message: string) => { try { @@ -192,7 +221,7 @@ function setupSubscriber(): void { } }); } catch (e) { - console.warn("[Redis setupSubscriber] Redis unavailable, SSE will be local-only:", e); + logger.warn({ err: e, redisUrl: REDIS_URL, channel: CHANNEL }, "Redis unavailable, SSE event bus will run in local-only mode"); } } @@ -230,7 +259,7 @@ class EventBus { }), ); } catch (e) { - console.warn("[Redis emit] fallback to local-only:", e); + logger.warn({ err: e, redisUrl: REDIS_URL, channel: CHANNEL }, "Redis publish failed, falling back to local-only SSE delivery"); // Deliver locally when Redis is unavailable publishLocal(normalizedEvent); } @@ -345,10 +374,3 @@ export function emitTaskStatusChanged(userId: string, notificationId: string): v export function emitReminderDue(userId: string, notificationId: string): void { eventBus.emit(SSE_EVENT_TYPES.REMINDER_DUE, { userId, notificationId }, [userAudience(userId)]); } - -export function emitBroadcastSent(broadcastId: string, recipientCount: number): void { - eventBus.emit(SSE_EVENT_TYPES.BROADCAST_SENT, { broadcastId, recipientCount }, [ - roleAudience(SystemRole.ADMIN), - roleAudience(SystemRole.MANAGER), - ]); -} diff --git a/packages/api/src/sse/subscription-policy.ts b/packages/api/src/sse/subscription-policy.ts index aba065a..b8efb92 100644 --- a/packages/api/src/sse/subscription-policy.ts +++ b/packages/api/src/sse/subscription-policy.ts @@ -3,7 +3,6 @@ import { canonicalizeSseAudiences, permissionAudience, resourceAudience, - roleAudience, type SseAudience, type SseSubscriptionOptions, userAudience, @@ -35,7 +34,6 @@ export function deriveUserSseSubscription( return { audiences: canonicalizeSseAudiences([ userAudience(identity.userId), - roleAudience(identity.systemRole), ...(identity.resourceId ? [resourceAudience(identity.resourceId)] : []), ...Array.from(permissions, (permission) => permissionAudience(permission)), ]),