import { loadRoleDefaults } from "@capakraken/api";
import { deriveUserSseSubscription, eventBus } from "@capakraken/api/sse";
import { startReminderScheduler } from "@capakraken/api/lib/reminder-scheduler";
import { prisma } from "@capakraken/db";
import type { SystemRole } from "@capakraken/shared";
import { SSE_EVENT_TYPES, type PermissionOverrides } from "@capakraken/shared";
import { auth } from "~/server/auth.js";

export const dynamic = "force-dynamic";
export const runtime = "nodejs";

// Bounded connection tracking: a single user opening 100 tabs should not be
// able to pin 100 persistent subscriptions on this node.
const MAX_SSE_CONNECTIONS_PER_USER = 8;

// Number of currently open SSE streams per user id, tracked in this module's
// memory. Entries are deleted when the count drops to zero.
const sseConnectionsByUser = new Map<string, number>();

// Interval between keep-alive PING frames, in milliseconds.
const HEARTBEAT_INTERVAL_MS = 30000;

/**
 * Opens a Server-Sent Events stream for the authenticated user.
 *
 * Responds with 401 when there is no session, the session has no user id, or
 * the user row cannot be found, and with 429 (plus `Retry-After: 30`) when the
 * user already holds `MAX_SSE_CONNECTIONS_PER_USER` open streams. Otherwise it
 * returns a `text/event-stream` response that emits an initial PING frame,
 * every event the event bus delivers for the derived subscription, and a PING
 * heartbeat every 30 seconds.
 *
 * @returns The SSE response, or a plain-text 401/429 response.
 * @throws Re-throws any error raised while loading role defaults, deriving the
 *   subscription, or constructing the stream; the reserved connection slot is
 *   released before the error propagates.
 */
export async function GET(): Promise<Response> {
  // Start lazily on the first real SSE request so builds/import-time evaluation
  // never attempt reminder processing against a live database.
  startReminderScheduler();

  const session = await auth();
  if (!session?.user) {
    return new Response("Unauthorized", { status: 401 });
  }

  const sessionUser = session.user as typeof session.user & { id?: string };
  if (!sessionUser.id) {
    return new Response("Unauthorized", { status: 401 });
  }

  const dbUser = await prisma.user.findUnique({
    where: { id: sessionUser.id },
    select: {
      id: true,
      systemRole: true,
      permissionOverrides: true,
      resource: {
        select: {
          id: true,
        },
      },
    },
  });
  if (!dbUser) {
    return new Response("Unauthorized", { status: 401 });
  }

  const userId = dbUser.id;
  const currentCount = sseConnectionsByUser.get(userId) ?? 0;
  if (currentCount >= MAX_SSE_CONNECTIONS_PER_USER) {
    return new Response("Too many SSE connections", {
      status: 429,
      headers: { "Retry-After": "30" },
    });
  }
  sseConnectionsByUser.set(userId, currentCount + 1);

  // Guard so that a slot is given back exactly once per connection, no matter
  // how many teardown paths fire (cancel, heartbeat failure, setup failure).
  // Releasing twice would undercount and let the user exceed the limit.
  let slotReleased = false;
  const releaseSlot = () => {
    if (slotReleased) {
      return;
    }
    slotReleased = true;
    const next = (sseConnectionsByUser.get(userId) ?? 1) - 1;
    if (next <= 0) {
      sseConnectionsByUser.delete(userId);
    } else {
      sseConnectionsByUser.set(userId, next);
    }
  };

  let heartbeat: ReturnType<typeof setInterval> | undefined;
  let unsubscribe: ReturnType<typeof eventBus.subscribe> | undefined;

  // Single teardown routine: stops the heartbeat, detaches from the event bus,
  // and frees the connection slot. Safe to call more than once.
  const cleanup = () => {
    if (heartbeat !== undefined) {
      clearInterval(heartbeat);
      heartbeat = undefined;
    }
    if (unsubscribe !== undefined) {
      const stop = unsubscribe;
      unsubscribe = undefined;
      stop();
    }
    releaseSlot();
  };

  try {
    const roleDefaults = await loadRoleDefaults();
    const subscription = deriveUserSseSubscription(
      {
        userId,
        systemRole: dbUser.systemRole as SystemRole,
        permissionOverrides: dbUser.permissionOverrides as PermissionOverrides | null,
        resourceId: dbUser.resource?.id ?? null,
      },
      roleDefaults,
    );

    const encoder = new TextEncoder();
    // Serializes a payload as one SSE `data:` frame.
    const encodeFrame = (payload: unknown) =>
      encoder.encode(`data: ${JSON.stringify(payload)}\n\n`);
    const pingFrame = () =>
      encodeFrame({ type: SSE_EVENT_TYPES.PING, timestamp: new Date().toISOString() });

    const stream = new ReadableStream({
      start(controller) {
        // Send initial connection confirmation
        controller.enqueue(pingFrame());

        // Subscribe to event bus
        unsubscribe = eventBus.subscribe((event) => {
          try {
            controller.enqueue(encodeFrame(event));
          } catch {
            // Client disconnected; teardown is driven by cancel() or by the
            // next failed heartbeat.
          }
        }, subscription);

        // Heartbeat every 30 seconds
        heartbeat = setInterval(() => {
          try {
            controller.enqueue(pingFrame());
          } catch {
            // The stream can no longer accept data: tear everything down.
            cleanup();
          }
        }, HEARTBEAT_INTERVAL_MS);

        // NOTE: a function returned from start() is NOT invoked by the Streams
        // API, so all teardown must go through cancel() / cleanup().
      },
      cancel() {
        // Called when the consumer cancels the stream (e.g. client disconnect).
        cleanup();
      },
    });

    return new Response(stream, {
      headers: {
        "Content-Type": "text/event-stream",
        "Cache-Control": "no-cache, no-transform",
        Connection: "keep-alive",
        "X-Accel-Buffering": "no",
      },
    });
  } catch (error) {
    // Setup failed after the slot was reserved: give it back before rethrowing
    // so the user is not permanently charged for a stream that never opened.
    cleanup();
    throw error;
  }
}