From 82466a4e34a1b754daa876e83ccbd1e74f06d170 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hartmut=20N=C3=B6renberg?= Date: Mon, 30 Mar 2026 14:20:18 +0200 Subject: [PATCH] fix(api): derive secure sse subscriptions --- apps/web/src/app/api/sse/timeline/route.ts | 29 ++- docs/sse-audience-architecture.md | 45 +++++ packages/api/package.json | 2 +- .../__tests__/sse-subscription-policy.test.ts | 165 ++++++++++++++++++ packages/api/src/sse/event-bus.ts | 22 ++- packages/api/src/sse/index.ts | 2 + packages/api/src/sse/subscription-policy.ts | 45 +++++ 7 files changed, 281 insertions(+), 29 deletions(-) create mode 100644 docs/sse-audience-architecture.md create mode 100644 packages/api/src/__tests__/sse-subscription-policy.test.ts create mode 100644 packages/api/src/sse/index.ts create mode 100644 packages/api/src/sse/subscription-policy.ts diff --git a/apps/web/src/app/api/sse/timeline/route.ts b/apps/web/src/app/api/sse/timeline/route.ts index 67ed344..2b4e085 100644 --- a/apps/web/src/app/api/sse/timeline/route.ts +++ b/apps/web/src/app/api/sse/timeline/route.ts @@ -1,8 +1,8 @@ import { loadRoleDefaults } from "@capakraken/api"; -import { eventBus, permissionAudience, resourceAudience, roleAudience, userAudience } from "@capakraken/api/sse"; +import { deriveUserSseSubscription, eventBus } from "@capakraken/api/sse"; import { startReminderScheduler } from "@capakraken/api/lib/reminder-scheduler"; import { prisma } from "@capakraken/db"; -import { resolvePermissions, SSE_EVENT_TYPES, SystemRole, type PermissionOverrides } from "@capakraken/shared"; +import { SSE_EVENT_TYPES, SystemRole, type PermissionOverrides } from "@capakraken/shared"; import { auth } from "~/server/auth.js"; // Start the reminder scheduler (idempotent — only starts once) @@ -42,21 +42,15 @@ export async function GET() { } const roleDefaults = await loadRoleDefaults(); - const permissions = resolvePermissions( - dbUser.systemRole as SystemRole, - dbUser.permissionOverrides as PermissionOverrides | null, + const subscription = deriveUserSseSubscription( + { + userId: dbUser.id, + systemRole: dbUser.systemRole as SystemRole, + permissionOverrides: dbUser.permissionOverrides as PermissionOverrides | null, + resourceId: dbUser.resource?.id ?? null, + }, roleDefaults, ); - const audiences = new Set([ - userAudience(dbUser.id), - roleAudience(dbUser.systemRole), - ]); - if (dbUser.resource?.id) { - audiences.add(resourceAudience(dbUser.resource.id)); - } - for (const permission of permissions) { - audiences.add(permissionAudience(permission)); - } const encoder = new TextEncoder(); @@ -76,10 +70,7 @@ export async function GET() { // Client disconnected } }, - { - audiences, - includeUnscoped: false, - }, + subscription, ); // Heartbeat every 30 seconds diff --git a/docs/sse-audience-architecture.md b/docs/sse-audience-architecture.md new file mode 100644 index 0000000..bc0e524 --- /dev/null +++ b/docs/sse-audience-architecture.md @@ -0,0 +1,45 @@ +**Purpose:** Define the secure audience model for realtime SSE delivery so subscribers only receive events that match their server-derived identity and permissions. + +## Core Rules + +- SSE subscriptions are derived on the server from the authenticated database user. +- Clients do not provide arbitrary audience lists. +- Every scoped event declares its audience explicitly. +- Unscoped delivery is disabled for authenticated timeline subscriptions. + +## Audience Model + +Supported canonical audiences: + +- `user:` for personal notifications and tasks +- `role:` for role-wide operational events +- `permission:` for explicit capability audiences +- `resource:` for self-service or affected-resource updates + +## Subscription Flow + +1. The web SSE route authenticates the session and loads the database user. +2. The API derives the effective permission set from `systemRole + permissionOverrides + roleDefaults`. +3. The API builds the canonical subscription audiences from that trusted identity. +4. The event bus subscribes with `includeUnscoped: false`. +5. Only events whose audience intersects with the derived audience set are delivered. + +## Event Rules + +- Personal events such as notifications must target `user:`. +- Planning events may target both `permission:manageAllocations` and affected `resource:` audiences. +- Broad operational events must use the smallest real audience that matches the use case. + +## Contract Tests + +The minimum regression suite for this architecture is: + +- a standard user does not receive manager planning traffic +- an elevated user does not receive another user's personal events +- multi-audience planning delivery reaches only the matching manager/resource subscribers + +## Migration Guidance + +- New emitters must declare their audiences explicitly instead of relying on global fan-out. +- New SSE endpoints should reuse the same server-side audience derivation instead of rebuilding ad-hoc filters in route handlers. +- If a future feature needs a new audience class, document the audience source of truth before adding the prefix. diff --git a/packages/api/package.json b/packages/api/package.json index 2efd570..7c5d533 100644 --- a/packages/api/package.json +++ b/packages/api/package.json @@ -7,7 +7,7 @@ ".": "./src/index.ts", "./router": "./src/router/index.ts", "./trpc": "./src/trpc.ts", - "./sse": "./src/sse/event-bus.ts", + "./sse": "./src/sse/index.ts", "./lib/audit": "./src/lib/audit.ts", "./lib/reminder-scheduler": "./src/lib/reminder-scheduler.ts", "./lib/logger": "./src/lib/logger.ts", diff --git a/packages/api/src/__tests__/sse-subscription-policy.test.ts b/packages/api/src/__tests__/sse-subscription-policy.test.ts new file mode 100644 index 0000000..9cf1ce6 --- /dev/null +++ b/packages/api/src/__tests__/sse-subscription-policy.test.ts @@ -0,0 +1,165 @@ +import { PermissionKey, SSE_EVENT_TYPES, SystemRole } from "@capakraken/shared"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { + cancelPendingEvents, + eventBus, + permissionAudience, + resourceAudience, + roleAudience, + type SseEvent, + userAudience, +} from "../sse/event-bus.js"; +import { deriveUserSseSubscription } from "../sse/subscription-policy.js"; + +vi.mock("ioredis", () => { + const RedisMock = vi.fn().mockImplementation(() => ({ + on: vi.fn(), + subscribe: vi.fn().mockResolvedValue(undefined), + publish: vi.fn().mockImplementation(() => { + throw new Error("Redis unavailable (test)"); + }), + })); + return { Redis: RedisMock }; +}); + +describe("sse subscription policy", () => { + let consoleWarnSpy: ReturnType; + + beforeEach(() => { + vi.useFakeTimers(); + consoleWarnSpy = vi.spyOn(console, "warn").mockImplementation(() => undefined); + }); + + afterEach(() => { + cancelPendingEvents(); + consoleWarnSpy.mockRestore(); + vi.useRealTimers(); + }); + + it("derives canonical user, role, resource, and permission audiences server-side", () => { + const subscription = deriveUserSseSubscription({ + userId: "user_1", + systemRole: SystemRole.USER, + resourceId: "res_1", + permissionOverrides: { + granted: [PermissionKey.VIEW_PLANNING, PermissionKey.MANAGE_ALLOCATIONS], + }, + }); + + expect(subscription.includeUnscoped).toBe(false); + expect(subscription.permissions).toEqual(new Set([ + PermissionKey.VIEW_PLANNING, + PermissionKey.MANAGE_ALLOCATIONS, + ])); + expect(subscription.audiences).toEqual([ + permissionAudience(PermissionKey.MANAGE_ALLOCATIONS), + permissionAudience(PermissionKey.VIEW_PLANNING), + resourceAudience("res_1"), + roleAudience(SystemRole.USER), + userAudience("user_1"), + ]); + }); + + it("does not deliver planning events to a standard user without the matching audience", () => { + const received: SseEvent[] = []; + const unsubscribe = eventBus.subscribe( + (event) => { + received.push(event); + }, + deriveUserSseSubscription({ + userId: "user_1", + systemRole: SystemRole.USER, + resourceId: "res_self", + }), + ); + + eventBus.emit( + SSE_EVENT_TYPES.ALLOCATION_UPDATED, + { id: "allocation_1", resourceId: "res_other" }, + [permissionAudience(PermissionKey.MANAGE_ALLOCATIONS), resourceAudience("res_other")], + ); + + vi.advanceTimersByTime(50); + + expect(received).toHaveLength(0); + unsubscribe(); + }); + + it("does not leak personal notifications to a manager subscribed with elevated permissions", () => { + const received: SseEvent[] = []; + const unsubscribe = eventBus.subscribe( + (event) => { + received.push(event); + }, + deriveUserSseSubscription({ + userId: "manager_1", + systemRole: SystemRole.MANAGER, + }), + ); + + eventBus.emit( + SSE_EVENT_TYPES.NOTIFICATION_CREATED, + { notificationId: "notification_1", userId: "user_2" }, + [userAudience("user_2")], + ); + + vi.advanceTimersByTime(50); + + expect(received).toHaveLength(0); + unsubscribe(); + }); + + it("delivers a multi-audience planning event only to matching manager and affected resource subscribers", () => { + const managerReceived: SseEvent[] = []; + const affectedUserReceived: SseEvent[] = []; + const unrelatedUserReceived: SseEvent[] = []; + + const unsubscribeManager = eventBus.subscribe( + (event) => { + managerReceived.push(event); + }, + deriveUserSseSubscription({ + userId: "manager_1", + systemRole: SystemRole.MANAGER, + }), + ); + + const unsubscribeAffectedUser = eventBus.subscribe( + (event) => { + affectedUserReceived.push(event); + }, + deriveUserSseSubscription({ + userId: "user_1", + systemRole: SystemRole.USER, + resourceId: "res_1", + }), + ); + + const unsubscribeUnrelatedUser = eventBus.subscribe( + (event) => { + unrelatedUserReceived.push(event); + }, + deriveUserSseSubscription({ + userId: "user_2", + systemRole: SystemRole.USER, + resourceId: "res_2", + }), + ); + + eventBus.emit( + SSE_EVENT_TYPES.ALLOCATION_UPDATED, + { id: "allocation_1", resourceId: "res_1" }, + [permissionAudience(PermissionKey.MANAGE_ALLOCATIONS), resourceAudience("res_1")], + ); + + vi.advanceTimersByTime(50); + + expect(managerReceived).toHaveLength(1); + expect(affectedUserReceived).toHaveLength(1); + expect(unrelatedUserReceived).toHaveLength(0); + + unsubscribeManager(); + unsubscribeAffectedUser(); + unsubscribeUnrelatedUser(); + }); +}); diff --git a/packages/api/src/sse/event-bus.ts b/packages/api/src/sse/event-bus.ts index b49a29a..81026aa 100644 --- a/packages/api/src/sse/event-bus.ts +++ b/packages/api/src/sse/event-bus.ts @@ -1,7 +1,11 @@ import { Redis } from "ioredis"; import { PermissionKey, SSE_EVENT_TYPES, SystemRole, type SseEventType } from "@capakraken/shared"; -export type SseAudience = string; +export type UserSseAudience = `user:${string}`; +export type RoleSseAudience = `role:${string}`; +export type PermissionSseAudience = `permission:${string}`; +export type ResourceSseAudience = `resource:${string}`; +export type SseAudience = UserSseAudience | RoleSseAudience | PermissionSseAudience | ResourceSseAudience; export interface SseEvent { type: SseEventType; @@ -42,8 +46,8 @@ interface BufferEntry { const debounceBuffer = new Map(); -function normalizeAudiences(audiences?: Iterable): SseAudience[] { - return [...new Set(Array.from(audiences ?? [], (audience) => audience.trim()).filter(Boolean))].sort(); +export function canonicalizeSseAudiences(audiences?: Iterable): SseAudience[] { + return [...new Set(Array.from(audiences ?? [], (audience) => audience.trim() as SseAudience).filter(Boolean))].sort(); } function getBufferKey(type: SseEventType, audience: readonly SseAudience[]): string { @@ -91,7 +95,7 @@ function extractScopedResourceIds(payload: Record): string[] { } function buildPlanningAudiences(payload: Record): SseAudience[] { - return normalizeAudiences([ + return canonicalizeSseAudiences([ permissionAudience(PermissionKey.MANAGE_ALLOCATIONS), ...extractScopedResourceIds(payload).map((resourceId) => resourceAudience(resourceId)), ]); @@ -181,7 +185,7 @@ function setupSubscriber(): void { type: parsed.type, payload: parsed.payload, timestamp: parsed.timestamp, - audience: normalizeAudiences(parsed.audience), + audience: canonicalizeSseAudiences(parsed.audience), }); } catch { // ignore parse errors @@ -200,7 +204,7 @@ class EventBus { subscribe(fn: Subscriber, options: SseSubscriptionOptions = {}): () => void { const subscription: Subscription = { fn, - audiences: new Set(normalizeAudiences(options.audiences)), + audiences: new Set(canonicalizeSseAudiences(options.audiences)), includeUnscoped: options.includeUnscoped ?? true, }; subscribers.add(subscription); @@ -210,7 +214,7 @@ class EventBus { publish(event: SseEvent): void { const normalizedEvent: SseEvent = { ...event, - audience: normalizeAudiences(event.audience), + audience: canonicalizeSseAudiences(event.audience), }; // Broadcast via Redis (all instances receive via subscriber.on("message")) @@ -237,7 +241,7 @@ class EventBus { type, payload, timestamp: new Date().toISOString(), - audience: normalizeAudiences(audience), + audience: canonicalizeSseAudiences(audience), }); } @@ -249,7 +253,7 @@ class EventBus { // Local delivery with debounce: buffer events of the same type and audience // within a 50ms window and then deliver a single (possibly aggregated) event. function publishLocal(event: SseEvent): void { - const audience = normalizeAudiences(event.audience); + const audience = canonicalizeSseAudiences(event.audience); const key = getBufferKey(event.type, audience); const existing = debounceBuffer.get(key); diff --git a/packages/api/src/sse/index.ts b/packages/api/src/sse/index.ts new file mode 100644 index 0000000..35e2f31 --- /dev/null +++ b/packages/api/src/sse/index.ts @@ -0,0 +1,2 @@ +export * from "./event-bus.js"; +export * from "./subscription-policy.js"; diff --git a/packages/api/src/sse/subscription-policy.ts b/packages/api/src/sse/subscription-policy.ts new file mode 100644 index 0000000..aba065a --- /dev/null +++ b/packages/api/src/sse/subscription-policy.ts @@ -0,0 +1,45 @@ +import { resolvePermissions, type PermissionKey, type PermissionOverrides, SystemRole } from "@capakraken/shared"; +import { + canonicalizeSseAudiences, + permissionAudience, + resourceAudience, + roleAudience, + type SseAudience, + type SseSubscriptionOptions, + userAudience, +} from "./event-bus.js"; + +export interface SseSubscriberIdentity { + userId: string; + systemRole: SystemRole; + permissionOverrides?: PermissionOverrides | null; + resourceId?: string | null; +} + +export interface DerivedSseSubscription extends SseSubscriptionOptions { + audiences: SseAudience[]; + permissions: Set; + includeUnscoped: false; +} + +export function deriveUserSseSubscription( + identity: SseSubscriberIdentity, + roleDefaults?: Record, +): DerivedSseSubscription { + const permissions = resolvePermissions( + identity.systemRole, + identity.permissionOverrides ?? null, + roleDefaults, + ); + + return { + audiences: canonicalizeSseAudiences([ + userAudience(identity.userId), + roleAudience(identity.systemRole), + ...(identity.resourceId ? [resourceAudience(identity.resourceId)] : []), + ...Array.from(permissions, (permission) => permissionAudience(permission)), + ]), + permissions, + includeUnscoped: false, + }; +}