From 60d267fa0af1ffabee5b853d05a19379121ffdb8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hartmut=20N=C3=B6renberg?= Date: Thu, 9 Apr 2026 08:35:39 +0200 Subject: [PATCH] feat(api): add SSE subscriber isolation, token pruning and E2E rate-limit guard - event-bus: wrap each subscriber.fn call in try/catch so one throwing subscriber cannot kill delivery to all others - event-bus: log Redis parse errors instead of swallowing them silently; add .catch() on Redis publish promise for async fallback to local delivery - pruning.ts: new runPruning() deletes expired invite tokens, expired password-reset tokens, and read notifications older than 90 days - settings.runPruning: expose pruning as adminProcedure mutation - trpc.ts: E2E_TEST_MODE rate-limit bypass is now a no-op in production (NODE_ENV=production); logs a startup warning if misconfigured Co-Authored-By: Claude Sonnet 4.6 --- packages/api/src/lib/pruning.ts | 36 +++++++++++++++++++++++++++++ packages/api/src/router/settings.ts | 3 +++ packages/api/src/sse/event-bus.ts | 15 ++++++++---- packages/api/src/trpc.ts | 7 +++++- 4 files changed, 56 insertions(+), 5 deletions(-) create mode 100644 packages/api/src/lib/pruning.ts diff --git a/packages/api/src/lib/pruning.ts b/packages/api/src/lib/pruning.ts new file mode 100644 index 0000000..ef6e0f8 --- /dev/null +++ b/packages/api/src/lib/pruning.ts @@ -0,0 +1,36 @@ +import type { PrismaClient } from "@capakraken/db"; + +const NOTIFICATION_RETENTION_DAYS = 90; + +/** + * Deletes expired invite tokens, expired password-reset tokens, and read + * notifications older than NOTIFICATION_RETENTION_DAYS. + * + * Designed to be called from an admin procedure or a scheduled job. + * Returns counts of deleted rows for observability. + */ +export async function runPruning(db: PrismaClient): Promise<{ + inviteTokensDeleted: number; + passwordResetTokensDeleted: number; + notificationsDeleted: number; +}> { + const now = new Date(); + const notificationCutoff = new Date(now.getTime() - NOTIFICATION_RETENTION_DAYS * 24 * 60 * 60 * 1000); + + const [inviteResult, resetResult, notificationResult] = await Promise.all([ + db.inviteToken.deleteMany({ where: { expiresAt: { lt: now } } }), + db.passwordResetToken.deleteMany({ where: { expiresAt: { lt: now } } }), + db.notification.deleteMany({ + where: { + readAt: { not: null }, + createdAt: { lt: notificationCutoff }, + }, + }), + ]); + + return { + inviteTokensDeleted: inviteResult.count, + passwordResetTokensDeleted: resetResult.count, + notificationsDeleted: notificationResult.count, + }; +} diff --git a/packages/api/src/router/settings.ts b/packages/api/src/router/settings.ts index 1e5b22f..b88213f 100644 --- a/packages/api/src/router/settings.ts +++ b/packages/api/src/router/settings.ts @@ -1,4 +1,5 @@ import { adminProcedure, createTRPCRouter } from "../trpc.js"; +import { runPruning } from "../lib/pruning.js"; import { clearStoredRuntimeSecrets, getAiConfiguredStatus, @@ -26,4 +27,6 @@ export const settingsRouter = createTRPCRouter({ testGeminiConnection: adminProcedure.mutation(({ ctx }) => testSettingsGeminiConnection(ctx)), getAiConfigured: adminProcedure.query(({ ctx }) => getAiConfiguredStatus(ctx)), + + runPruning: adminProcedure.mutation(({ ctx }) => runPruning(ctx.db)), }); diff --git a/packages/api/src/sse/event-bus.ts b/packages/api/src/sse/event-bus.ts index e88aaa8..c5efbd6 100644 --- a/packages/api/src/sse/event-bus.ts +++ b/packages/api/src/sse/event-bus.ts @@ -90,7 +90,11 @@ function matchesSubscription(event: SseEvent, subscription: Subscription): boole function deliverEvent(event: SseEvent): void { for (const subscription of subscribers) { if (matchesSubscription(event, subscription)) { - subscription.fn(event); + try { + subscription.fn(event); + } catch (err) { + logger.warn({ err, eventType: event.type }, "SSE subscriber threw during event delivery"); + } } } } @@ -216,8 +220,8 @@ function setupSubscriber(): void { timestamp: parsed.timestamp, audience: canonicalizeSseAudiences(parsed.audience), }); - } catch { - // ignore parse errors + } catch (err) { + logger.warn({ err, message }, "Failed to parse SSE Redis message"); } }); } catch (e) { @@ -257,7 +261,10 @@ class EventBus { timestamp: normalizedEvent.timestamp, audience: normalizedEvent.audience, }), - ); + ).catch((e: unknown) => { + logger.warn({ err: e, redisUrl: REDIS_URL, channel: CHANNEL }, "Redis publish promise rejected, falling back to local-only SSE delivery"); + publishLocal(normalizedEvent); + }); } catch (e) { logger.warn({ err: e, redisUrl: REDIS_URL, channel: CHANNEL }, "Redis publish failed, falling back to local-only SSE delivery"); // Deliver locally when Redis is unavailable diff --git a/packages/api/src/trpc.ts b/packages/api/src/trpc.ts index 9b48c1e..4bd0dc9 100644 --- a/packages/api/src/trpc.ts +++ b/packages/api/src/trpc.ts @@ -89,7 +89,12 @@ export const publicProcedure = t.procedure; // eslint-disable-next-line @typescript-eslint/no-explicit-any const withLogging = t.middleware(loggingMiddleware as any); -const isE2eTestMode = process.env["E2E_TEST_MODE"] === "true"; +const isE2eTestMode = + process.env["E2E_TEST_MODE"] === "true" && process.env["NODE_ENV"] !== "production"; +if (process.env["E2E_TEST_MODE"] === "true" && process.env["NODE_ENV"] === "production") { + // eslint-disable-next-line no-console + console.warn("[SECURITY] E2E_TEST_MODE is set in production — rate limiting is NOT bypassed."); +} /** * Protected procedure — requires authenticated session AND a valid DB user record.