fix(api): derive secure sse subscriptions

This commit is contained in:
2026-03-30 14:20:18 +02:00
parent 27b0e38b93
commit 82466a4e34
7 changed files with 281 additions and 29 deletions
+10 -19
View File
@@ -1,8 +1,8 @@
import { loadRoleDefaults } from "@capakraken/api"; import { loadRoleDefaults } from "@capakraken/api";
import { eventBus, permissionAudience, resourceAudience, roleAudience, userAudience } from "@capakraken/api/sse"; import { deriveUserSseSubscription, eventBus } from "@capakraken/api/sse";
import { startReminderScheduler } from "@capakraken/api/lib/reminder-scheduler"; import { startReminderScheduler } from "@capakraken/api/lib/reminder-scheduler";
import { prisma } from "@capakraken/db"; import { prisma } from "@capakraken/db";
import { resolvePermissions, SSE_EVENT_TYPES, SystemRole, type PermissionOverrides } from "@capakraken/shared"; import { SSE_EVENT_TYPES, SystemRole, type PermissionOverrides } from "@capakraken/shared";
import { auth } from "~/server/auth.js"; import { auth } from "~/server/auth.js";
// Start the reminder scheduler (idempotent — only starts once) // Start the reminder scheduler (idempotent — only starts once)
@@ -42,21 +42,15 @@ export async function GET() {
} }
const roleDefaults = await loadRoleDefaults(); const roleDefaults = await loadRoleDefaults();
const permissions = resolvePermissions( const subscription = deriveUserSseSubscription(
dbUser.systemRole as SystemRole, {
dbUser.permissionOverrides as PermissionOverrides | null, userId: dbUser.id,
systemRole: dbUser.systemRole as SystemRole,
permissionOverrides: dbUser.permissionOverrides as PermissionOverrides | null,
resourceId: dbUser.resource?.id ?? null,
},
roleDefaults, roleDefaults,
); );
const audiences = new Set<string>([
userAudience(dbUser.id),
roleAudience(dbUser.systemRole),
]);
if (dbUser.resource?.id) {
audiences.add(resourceAudience(dbUser.resource.id));
}
for (const permission of permissions) {
audiences.add(permissionAudience(permission));
}
const encoder = new TextEncoder(); const encoder = new TextEncoder();
@@ -76,10 +70,7 @@ export async function GET() {
// Client disconnected // Client disconnected
} }
}, },
{ subscription,
audiences,
includeUnscoped: false,
},
); );
// Heartbeat every 30 seconds // Heartbeat every 30 seconds
+45
View File
@@ -0,0 +1,45 @@
**Purpose:** Define the secure audience model for realtime SSE delivery so subscribers only receive events that match their server-derived identity and permissions.
## Core Rules
- SSE subscriptions are derived on the server from the authenticated database user.
- Clients do not provide arbitrary audience lists.
- Every scoped event declares its audience explicitly.
- Unscoped delivery is disabled for authenticated timeline subscriptions.
## Audience Model
Supported canonical audiences:
- `user:<userId>` for personal notifications and tasks
- `role:<systemRole>` for role-wide operational events
- `permission:<permissionKey>` for explicit capability audiences
- `resource:<resourceId>` for self-service or affected-resource updates
## Subscription Flow
1. The web SSE route authenticates the session and loads the database user.
2. The API derives the effective permission set from `systemRole + permissionOverrides + roleDefaults`.
3. The API builds the canonical subscription audiences from that trusted identity.
4. The event bus subscribes with `includeUnscoped: false`.
5. Only events whose audience intersects with the derived audience set are delivered.
## Event Rules
- Personal events such as notifications must target `user:<userId>`.
- Planning events may target both `permission:manageAllocations` and affected `resource:<resourceId>` audiences.
- Broad operational events must use the smallest real audience that matches the use case.
## Contract Tests
The minimum regression suite for this architecture is:
- a standard user does not receive manager planning traffic
- an elevated user does not receive another user's personal events
- multi-audience planning delivery reaches only the matching manager/resource subscribers
## Migration Guidance
- New emitters must declare their audiences explicitly instead of relying on global fan-out.
- New SSE endpoints should reuse the same server-side audience derivation instead of rebuilding ad-hoc filters in route handlers.
- If a future feature needs a new audience class, document the audience source of truth before adding the prefix.
+1 -1
View File
@@ -7,7 +7,7 @@
".": "./src/index.ts", ".": "./src/index.ts",
"./router": "./src/router/index.ts", "./router": "./src/router/index.ts",
"./trpc": "./src/trpc.ts", "./trpc": "./src/trpc.ts",
"./sse": "./src/sse/event-bus.ts", "./sse": "./src/sse/index.ts",
"./lib/audit": "./src/lib/audit.ts", "./lib/audit": "./src/lib/audit.ts",
"./lib/reminder-scheduler": "./src/lib/reminder-scheduler.ts", "./lib/reminder-scheduler": "./src/lib/reminder-scheduler.ts",
"./lib/logger": "./src/lib/logger.ts", "./lib/logger": "./src/lib/logger.ts",
@@ -0,0 +1,165 @@
import { PermissionKey, SSE_EVENT_TYPES, SystemRole } from "@capakraken/shared";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import {
cancelPendingEvents,
eventBus,
permissionAudience,
resourceAudience,
roleAudience,
type SseEvent,
userAudience,
} from "../sse/event-bus.js";
import { deriveUserSseSubscription } from "../sse/subscription-policy.js";
vi.mock("ioredis", () => {
const RedisMock = vi.fn().mockImplementation(() => ({
on: vi.fn(),
subscribe: vi.fn().mockResolvedValue(undefined),
publish: vi.fn().mockImplementation(() => {
throw new Error("Redis unavailable (test)");
}),
}));
return { Redis: RedisMock };
});
describe("sse subscription policy", () => {
let consoleWarnSpy: ReturnType<typeof vi.spyOn>;
beforeEach(() => {
vi.useFakeTimers();
consoleWarnSpy = vi.spyOn(console, "warn").mockImplementation(() => undefined);
});
afterEach(() => {
cancelPendingEvents();
consoleWarnSpy.mockRestore();
vi.useRealTimers();
});
it("derives canonical user, role, resource, and permission audiences server-side", () => {
const subscription = deriveUserSseSubscription({
userId: "user_1",
systemRole: SystemRole.USER,
resourceId: "res_1",
permissionOverrides: {
granted: [PermissionKey.VIEW_PLANNING, PermissionKey.MANAGE_ALLOCATIONS],
},
});
expect(subscription.includeUnscoped).toBe(false);
expect(subscription.permissions).toEqual(new Set([
PermissionKey.VIEW_PLANNING,
PermissionKey.MANAGE_ALLOCATIONS,
]));
expect(subscription.audiences).toEqual([
permissionAudience(PermissionKey.MANAGE_ALLOCATIONS),
permissionAudience(PermissionKey.VIEW_PLANNING),
resourceAudience("res_1"),
roleAudience(SystemRole.USER),
userAudience("user_1"),
]);
});
it("does not deliver planning events to a standard user without the matching audience", () => {
const received: SseEvent[] = [];
const unsubscribe = eventBus.subscribe(
(event) => {
received.push(event);
},
deriveUserSseSubscription({
userId: "user_1",
systemRole: SystemRole.USER,
resourceId: "res_self",
}),
);
eventBus.emit(
SSE_EVENT_TYPES.ALLOCATION_UPDATED,
{ id: "allocation_1", resourceId: "res_other" },
[permissionAudience(PermissionKey.MANAGE_ALLOCATIONS), resourceAudience("res_other")],
);
vi.advanceTimersByTime(50);
expect(received).toHaveLength(0);
unsubscribe();
});
it("does not leak personal notifications to a manager subscribed with elevated permissions", () => {
const received: SseEvent[] = [];
const unsubscribe = eventBus.subscribe(
(event) => {
received.push(event);
},
deriveUserSseSubscription({
userId: "manager_1",
systemRole: SystemRole.MANAGER,
}),
);
eventBus.emit(
SSE_EVENT_TYPES.NOTIFICATION_CREATED,
{ notificationId: "notification_1", userId: "user_2" },
[userAudience("user_2")],
);
vi.advanceTimersByTime(50);
expect(received).toHaveLength(0);
unsubscribe();
});
it("delivers a multi-audience planning event only to matching manager and affected resource subscribers", () => {
const managerReceived: SseEvent[] = [];
const affectedUserReceived: SseEvent[] = [];
const unrelatedUserReceived: SseEvent[] = [];
const unsubscribeManager = eventBus.subscribe(
(event) => {
managerReceived.push(event);
},
deriveUserSseSubscription({
userId: "manager_1",
systemRole: SystemRole.MANAGER,
}),
);
const unsubscribeAffectedUser = eventBus.subscribe(
(event) => {
affectedUserReceived.push(event);
},
deriveUserSseSubscription({
userId: "user_1",
systemRole: SystemRole.USER,
resourceId: "res_1",
}),
);
const unsubscribeUnrelatedUser = eventBus.subscribe(
(event) => {
unrelatedUserReceived.push(event);
},
deriveUserSseSubscription({
userId: "user_2",
systemRole: SystemRole.USER,
resourceId: "res_2",
}),
);
eventBus.emit(
SSE_EVENT_TYPES.ALLOCATION_UPDATED,
{ id: "allocation_1", resourceId: "res_1" },
[permissionAudience(PermissionKey.MANAGE_ALLOCATIONS), resourceAudience("res_1")],
);
vi.advanceTimersByTime(50);
expect(managerReceived).toHaveLength(1);
expect(affectedUserReceived).toHaveLength(1);
expect(unrelatedUserReceived).toHaveLength(0);
unsubscribeManager();
unsubscribeAffectedUser();
unsubscribeUnrelatedUser();
});
});
+13 -9
View File
@@ -1,7 +1,11 @@
import { Redis } from "ioredis"; import { Redis } from "ioredis";
import { PermissionKey, SSE_EVENT_TYPES, SystemRole, type SseEventType } from "@capakraken/shared"; import { PermissionKey, SSE_EVENT_TYPES, SystemRole, type SseEventType } from "@capakraken/shared";
export type SseAudience = string; export type UserSseAudience = `user:${string}`;
export type RoleSseAudience = `role:${string}`;
export type PermissionSseAudience = `permission:${string}`;
export type ResourceSseAudience = `resource:${string}`;
export type SseAudience = UserSseAudience | RoleSseAudience | PermissionSseAudience | ResourceSseAudience;
export interface SseEvent { export interface SseEvent {
type: SseEventType; type: SseEventType;
@@ -42,8 +46,8 @@ interface BufferEntry {
const debounceBuffer = new Map<string, BufferEntry>(); const debounceBuffer = new Map<string, BufferEntry>();
function normalizeAudiences(audiences?: Iterable<SseAudience>): SseAudience[] { export function canonicalizeSseAudiences(audiences?: Iterable<SseAudience>): SseAudience[] {
return [...new Set(Array.from(audiences ?? [], (audience) => audience.trim()).filter(Boolean))].sort(); return [...new Set(Array.from(audiences ?? [], (audience) => audience.trim() as SseAudience).filter(Boolean))].sort();
} }
function getBufferKey(type: SseEventType, audience: readonly SseAudience[]): string { function getBufferKey(type: SseEventType, audience: readonly SseAudience[]): string {
@@ -91,7 +95,7 @@ function extractScopedResourceIds(payload: Record<string, unknown>): string[] {
} }
function buildPlanningAudiences(payload: Record<string, unknown>): SseAudience[] { function buildPlanningAudiences(payload: Record<string, unknown>): SseAudience[] {
return normalizeAudiences([ return canonicalizeSseAudiences([
permissionAudience(PermissionKey.MANAGE_ALLOCATIONS), permissionAudience(PermissionKey.MANAGE_ALLOCATIONS),
...extractScopedResourceIds(payload).map((resourceId) => resourceAudience(resourceId)), ...extractScopedResourceIds(payload).map((resourceId) => resourceAudience(resourceId)),
]); ]);
@@ -181,7 +185,7 @@ function setupSubscriber(): void {
type: parsed.type, type: parsed.type,
payload: parsed.payload, payload: parsed.payload,
timestamp: parsed.timestamp, timestamp: parsed.timestamp,
audience: normalizeAudiences(parsed.audience), audience: canonicalizeSseAudiences(parsed.audience),
}); });
} catch { } catch {
// ignore parse errors // ignore parse errors
@@ -200,7 +204,7 @@ class EventBus {
subscribe(fn: Subscriber, options: SseSubscriptionOptions = {}): () => void { subscribe(fn: Subscriber, options: SseSubscriptionOptions = {}): () => void {
const subscription: Subscription = { const subscription: Subscription = {
fn, fn,
audiences: new Set(normalizeAudiences(options.audiences)), audiences: new Set(canonicalizeSseAudiences(options.audiences)),
includeUnscoped: options.includeUnscoped ?? true, includeUnscoped: options.includeUnscoped ?? true,
}; };
subscribers.add(subscription); subscribers.add(subscription);
@@ -210,7 +214,7 @@ class EventBus {
publish(event: SseEvent): void { publish(event: SseEvent): void {
const normalizedEvent: SseEvent = { const normalizedEvent: SseEvent = {
...event, ...event,
audience: normalizeAudiences(event.audience), audience: canonicalizeSseAudiences(event.audience),
}; };
// Broadcast via Redis (all instances receive via subscriber.on("message")) // Broadcast via Redis (all instances receive via subscriber.on("message"))
@@ -237,7 +241,7 @@ class EventBus {
type, type,
payload, payload,
timestamp: new Date().toISOString(), timestamp: new Date().toISOString(),
audience: normalizeAudiences(audience), audience: canonicalizeSseAudiences(audience),
}); });
} }
@@ -249,7 +253,7 @@ class EventBus {
// Local delivery with debounce: buffer events of the same type and audience // Local delivery with debounce: buffer events of the same type and audience
// within a 50ms window and then deliver a single (possibly aggregated) event. // within a 50ms window and then deliver a single (possibly aggregated) event.
function publishLocal(event: SseEvent): void { function publishLocal(event: SseEvent): void {
const audience = normalizeAudiences(event.audience); const audience = canonicalizeSseAudiences(event.audience);
const key = getBufferKey(event.type, audience); const key = getBufferKey(event.type, audience);
const existing = debounceBuffer.get(key); const existing = debounceBuffer.get(key);
+2
View File
@@ -0,0 +1,2 @@
export * from "./event-bus.js";
export * from "./subscription-policy.js";
@@ -0,0 +1,45 @@
import { resolvePermissions, type PermissionKey, type PermissionOverrides, SystemRole } from "@capakraken/shared";
import {
canonicalizeSseAudiences,
permissionAudience,
resourceAudience,
roleAudience,
type SseAudience,
type SseSubscriptionOptions,
userAudience,
} from "./event-bus.js";
export interface SseSubscriberIdentity {
userId: string;
systemRole: SystemRole;
permissionOverrides?: PermissionOverrides | null;
resourceId?: string | null;
}
export interface DerivedSseSubscription extends SseSubscriptionOptions {
audiences: SseAudience[];
permissions: Set<PermissionKey>;
includeUnscoped: false;
}
export function deriveUserSseSubscription(
identity: SseSubscriberIdentity,
roleDefaults?: Record<string, PermissionKey[]>,
): DerivedSseSubscription {
const permissions = resolvePermissions(
identity.systemRole,
identity.permissionOverrides ?? null,
roleDefaults,
);
return {
audiences: canonicalizeSseAudiences([
userAudience(identity.userId),
roleAudience(identity.systemRole),
...(identity.resourceId ? [resourceAudience(identity.resourceId)] : []),
...Array.from(permissions, (permission) => permissionAudience(permission)),
]),
permissions,
includeUnscoped: false,
};
}