refactor(sse): narrow canonical audience scopes

This commit is contained in:
2026-03-31 22:56:12 +02:00
parent a76b173f4b
commit ac29ce3567
5 changed files with 64 additions and 24 deletions
+4 -2
View File
@@ -5,6 +5,7 @@
- SSE subscriptions are derived on the server from the authenticated database user.
- Clients do not provide arbitrary audience lists.
- Every scoped event declares its audience explicitly.
- Invalid audience strings fail fast in the event bus.
- Unscoped delivery is disabled for authenticated timeline subscriptions.
## Audience Model
@@ -12,10 +13,11 @@
Supported canonical audiences:
- `user:<userId>` for personal notifications and tasks
- `role:<systemRole>` for role-wide operational events
- `permission:<permissionKey>` for explicit capability audiences
- `resource:<resourceId>` for self-service or affected-resource updates
Any other prefix is invalid and must be rejected instead of being accepted silently.
## Subscription Flow
1. The web SSE route authenticates the session and loads the database user.
@@ -28,7 +30,7 @@ Supported canonical audiences:
- Personal events such as notifications must target `user:<userId>`.
- Planning events may target both `permission:manageAllocations` and affected `resource:<resourceId>` audiences.
- Broad operational events must use the smallest real audience that matches the use case.
- Broad operational events must use the smallest real audience that matches the use case, typically `permission:*` rather than role fan-out.
## Contract Tests
@@ -265,4 +265,24 @@ describe("event-bus debounce", () => {
unsubscribeManager();
unsubscribeResource();
});
it("rejects invalid subscription audiences", () => {
expect(() =>
eventBus.subscribe(
() => undefined,
{
audiences: ["chapter:capex" as never],
includeUnscoped: false,
},
)).toThrowError("Invalid SSE audience: chapter:capex");
});
it("rejects invalid emitted audiences", () => {
expect(() =>
eventBus.emit(
SSE_EVENT_TYPES.NOTIFICATION_CREATED,
{ notificationId: "n1" },
["user:" as never],
)).toThrowError("Invalid SSE audience: user:");
});
});
@@ -5,7 +5,6 @@ import {
eventBus,
permissionAudience,
resourceAudience,
roleAudience,
type SseEvent,
userAudience,
} from "../sse/event-bus.js";
@@ -36,7 +35,7 @@ describe("sse subscription policy", () => {
vi.useRealTimers();
});
it("derives canonical user, role, resource, and permission audiences server-side", () => {
it("derives canonical user, resource, and permission audiences server-side", () => {
const subscription = deriveUserSseSubscription({
userId: "user_1",
systemRole: SystemRole.USER,
@@ -55,7 +54,6 @@ describe("sse subscription policy", () => {
permissionAudience(PermissionKey.MANAGE_ALLOCATIONS),
permissionAudience(PermissionKey.VIEW_PLANNING),
resourceAudience("res_1"),
roleAudience(SystemRole.USER),
userAudience("user_1"),
]);
});
+39 -17
View File
@@ -1,11 +1,13 @@
import { Redis } from "ioredis";
import { PermissionKey, SSE_EVENT_TYPES, SystemRole, type SseEventType } from "@capakraken/shared";
import { PermissionKey, SSE_EVENT_TYPES, type SseEventType } from "@capakraken/shared";
import { logger } from "../lib/logger.js";
export type UserSseAudience = `user:${string}`;
export type RoleSseAudience = `role:${string}`;
export type PermissionSseAudience = `permission:${string}`;
export type ResourceSseAudience = `resource:${string}`;
export type SseAudience = UserSseAudience | RoleSseAudience | PermissionSseAudience | ResourceSseAudience;
export type SseAudience = UserSseAudience | PermissionSseAudience | ResourceSseAudience;
const SSE_AUDIENCE_PREFIXES = ["user", "permission", "resource"] as const;
type SseAudiencePrefix = (typeof SSE_AUDIENCE_PREFIXES)[number];
export interface SseEvent {
type: SseEventType;
@@ -46,8 +48,32 @@ interface BufferEntry {
const debounceBuffer = new Map<string, BufferEntry>();
function isSseAudiencePrefix(value: string): value is SseAudiencePrefix {
return SSE_AUDIENCE_PREFIXES.includes(value as SseAudiencePrefix);
}
function assertValidSseAudience(audience: string): SseAudience {
const normalized = audience.trim();
if (!normalized) {
throw new Error("Invalid SSE audience: empty value");
}
const separatorIndex = normalized.indexOf(":");
if (separatorIndex <= 0 || separatorIndex === normalized.length - 1) {
throw new Error(`Invalid SSE audience: ${normalized}`);
}
const prefix = normalized.slice(0, separatorIndex);
const scope = normalized.slice(separatorIndex + 1).trim();
if (!isSseAudiencePrefix(prefix) || !scope) {
throw new Error(`Invalid SSE audience: ${normalized}`);
}
return `${prefix}:${scope}` as SseAudience;
}
export function canonicalizeSseAudiences(audiences?: Iterable<SseAudience>): SseAudience[] {
return [...new Set(Array.from(audiences ?? [], (audience) => audience.trim() as SseAudience).filter(Boolean))].sort();
return [...new Set(Array.from(audiences ?? [], (audience) => assertValidSseAudience(audience)).filter(Boolean))].sort();
}
function getBufferKey(type: SseEventType, audience: readonly SseAudience[]): string {
@@ -70,7 +96,6 @@ function deliverEvent(event: SseEvent): void {
}
export const userAudience = (userId: string): SseAudience => `user:${userId}`;
export const roleAudience = (role: string): SseAudience => `role:${role}`;
export const permissionAudience = (permission: string): SseAudience => `permission:${permission}`;
export const resourceAudience = (resourceId: string): SseAudience => `resource:${resourceId}`;
@@ -160,7 +185,9 @@ let subscriber: Redis | null = null;
function getPublisher(): Redis {
if (!publisher) {
publisher = new Redis(REDIS_URL, { lazyConnect: false, enableReadyCheck: false });
publisher.on("error", (e: unknown) => console.error("[Redis publisher]", e));
publisher.on("error", (e: unknown) => {
logger.warn({ err: e, redisUrl: REDIS_URL, channel: CHANNEL }, "Redis publisher emitted an error");
});
}
return publisher;
}
@@ -169,9 +196,11 @@ function setupSubscriber(): void {
if (subscriber) return;
try {
subscriber = new Redis(REDIS_URL, { lazyConnect: false, enableReadyCheck: false });
subscriber.on("error", (e: unknown) => console.error("[Redis subscriber]", e));
subscriber.on("error", (e: unknown) => {
logger.warn({ err: e, redisUrl: REDIS_URL, channel: CHANNEL }, "Redis subscriber emitted an error");
});
void subscriber.subscribe(CHANNEL).catch((err: unknown) => {
console.error("[Redis subscribe]", err);
logger.warn({ err, redisUrl: REDIS_URL, channel: CHANNEL }, "Redis subscribe failed for SSE event bus");
});
subscriber.on("message", (_channel: string, message: string) => {
try {
@@ -192,7 +221,7 @@ function setupSubscriber(): void {
}
});
} catch (e) {
console.warn("[Redis setupSubscriber] Redis unavailable, SSE will be local-only:", e);
logger.warn({ err: e, redisUrl: REDIS_URL, channel: CHANNEL }, "Redis unavailable, SSE event bus will run in local-only mode");
}
}
@@ -230,7 +259,7 @@ class EventBus {
}),
);
} catch (e) {
console.warn("[Redis emit] fallback to local-only:", e);
logger.warn({ err: e, redisUrl: REDIS_URL, channel: CHANNEL }, "Redis publish failed, falling back to local-only SSE delivery");
// Deliver locally when Redis is unavailable
publishLocal(normalizedEvent);
}
@@ -345,10 +374,3 @@ export function emitTaskStatusChanged(userId: string, notificationId: string): v
export function emitReminderDue(userId: string, notificationId: string): void {
eventBus.emit(SSE_EVENT_TYPES.REMINDER_DUE, { userId, notificationId }, [userAudience(userId)]);
}
export function emitBroadcastSent(broadcastId: string, recipientCount: number): void {
eventBus.emit(SSE_EVENT_TYPES.BROADCAST_SENT, { broadcastId, recipientCount }, [
roleAudience(SystemRole.ADMIN),
roleAudience(SystemRole.MANAGER),
]);
}
@@ -3,7 +3,6 @@ import {
canonicalizeSseAudiences,
permissionAudience,
resourceAudience,
roleAudience,
type SseAudience,
type SseSubscriptionOptions,
userAudience,
@@ -35,7 +34,6 @@ export function deriveUserSseSubscription(
return {
audiences: canonicalizeSseAudiences([
userAudience(identity.userId),
roleAudience(identity.systemRole),
...(identity.resourceId ? [resourceAudience(identity.resourceId)] : []),
...Array.from(permissions, (permission) => permissionAudience(permission)),
]),