feat(api): add SSE subscriber isolation, token pruning and E2E rate-limit guard
- event-bus: wrap each subscriber.fn call in try/catch so one throwing subscriber cannot kill delivery to all others - event-bus: log Redis parse errors instead of swallowing them silently; add .catch() on Redis publish promise for async fallback to local delivery - pruning.ts: new runPruning() deletes expired invite tokens, expired password-reset tokens, and read notifications older than 90 days - settings.runPruning: expose pruning as adminProcedure mutation - trpc.ts: E2E_TEST_MODE rate-limit bypass is now a no-op in production (NODE_ENV=production); logs a startup warning if misconfigured Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,36 @@
|
||||
import type { PrismaClient } from "@capakraken/db";
|
||||
|
||||
const NOTIFICATION_RETENTION_DAYS = 90;
|
||||
|
||||
/**
|
||||
* Deletes expired invite tokens, expired password-reset tokens, and read
|
||||
* notifications older than NOTIFICATION_RETENTION_DAYS.
|
||||
*
|
||||
* Designed to be called from an admin procedure or a scheduled job.
|
||||
* Returns counts of deleted rows for observability.
|
||||
*/
|
||||
export async function runPruning(db: PrismaClient): Promise<{
|
||||
inviteTokensDeleted: number;
|
||||
passwordResetTokensDeleted: number;
|
||||
notificationsDeleted: number;
|
||||
}> {
|
||||
const now = new Date();
|
||||
const notificationCutoff = new Date(now.getTime() - NOTIFICATION_RETENTION_DAYS * 24 * 60 * 60 * 1000);
|
||||
|
||||
const [inviteResult, resetResult, notificationResult] = await Promise.all([
|
||||
db.inviteToken.deleteMany({ where: { expiresAt: { lt: now } } }),
|
||||
db.passwordResetToken.deleteMany({ where: { expiresAt: { lt: now } } }),
|
||||
db.notification.deleteMany({
|
||||
where: {
|
||||
readAt: { not: null },
|
||||
createdAt: { lt: notificationCutoff },
|
||||
},
|
||||
}),
|
||||
]);
|
||||
|
||||
return {
|
||||
inviteTokensDeleted: inviteResult.count,
|
||||
passwordResetTokensDeleted: resetResult.count,
|
||||
notificationsDeleted: notificationResult.count,
|
||||
};
|
||||
}
|
||||
@@ -1,4 +1,5 @@
|
||||
import { adminProcedure, createTRPCRouter } from "../trpc.js";
|
||||
import { runPruning } from "../lib/pruning.js";
|
||||
import {
|
||||
clearStoredRuntimeSecrets,
|
||||
getAiConfiguredStatus,
|
||||
@@ -26,4 +27,6 @@ export const settingsRouter = createTRPCRouter({
|
||||
testGeminiConnection: adminProcedure.mutation(({ ctx }) => testSettingsGeminiConnection(ctx)),
|
||||
|
||||
getAiConfigured: adminProcedure.query(({ ctx }) => getAiConfiguredStatus(ctx)),
|
||||
|
||||
runPruning: adminProcedure.mutation(({ ctx }) => runPruning(ctx.db)),
|
||||
});
|
||||
|
||||
@@ -90,7 +90,11 @@ function matchesSubscription(event: SseEvent, subscription: Subscription): boole
|
||||
function deliverEvent(event: SseEvent): void {
|
||||
for (const subscription of subscribers) {
|
||||
if (matchesSubscription(event, subscription)) {
|
||||
subscription.fn(event);
|
||||
try {
|
||||
subscription.fn(event);
|
||||
} catch (err) {
|
||||
logger.warn({ err, eventType: event.type }, "SSE subscriber threw during event delivery");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -216,8 +220,8 @@ function setupSubscriber(): void {
|
||||
timestamp: parsed.timestamp,
|
||||
audience: canonicalizeSseAudiences(parsed.audience),
|
||||
});
|
||||
} catch {
|
||||
// ignore parse errors
|
||||
} catch (err) {
|
||||
logger.warn({ err, message }, "Failed to parse SSE Redis message");
|
||||
}
|
||||
});
|
||||
} catch (e) {
|
||||
@@ -257,7 +261,10 @@ class EventBus {
|
||||
timestamp: normalizedEvent.timestamp,
|
||||
audience: normalizedEvent.audience,
|
||||
}),
|
||||
);
|
||||
).catch((e: unknown) => {
|
||||
logger.warn({ err: e, redisUrl: REDIS_URL, channel: CHANNEL }, "Redis publish promise rejected, falling back to local-only SSE delivery");
|
||||
publishLocal(normalizedEvent);
|
||||
});
|
||||
} catch (e) {
|
||||
logger.warn({ err: e, redisUrl: REDIS_URL, channel: CHANNEL }, "Redis publish failed, falling back to local-only SSE delivery");
|
||||
// Deliver locally when Redis is unavailable
|
||||
|
||||
@@ -89,7 +89,12 @@ export const publicProcedure = t.procedure;
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
const withLogging = t.middleware(loggingMiddleware as any);
|
||||
const isE2eTestMode = process.env["E2E_TEST_MODE"] === "true";
|
||||
const isE2eTestMode =
|
||||
process.env["E2E_TEST_MODE"] === "true" && process.env["NODE_ENV"] !== "production";
|
||||
if (process.env["E2E_TEST_MODE"] === "true" && process.env["NODE_ENV"] === "production") {
|
||||
// eslint-disable-next-line no-console
|
||||
console.warn("[SECURITY] E2E_TEST_MODE is set in production — rate limiting is NOT bypassed.");
|
||||
}
|
||||
|
||||
/**
|
||||
* Protected procedure — requires authenticated session AND a valid DB user record.
|
||||
|
||||
Reference in New Issue
Block a user