From fac8c1c3a58c2c701972ba43f32a070798cb1b0d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hartmut=20N=C3=B6renberg?= Date: Mon, 30 Mar 2026 00:40:24 +0200 Subject: [PATCH] feat(sse): scope timeline events to affected audiences --- apps/web/src/app/api/sse/timeline/route.ts | 10 +++- apps/web/src/hooks/useTimelineSSE.ts | 3 ++ docs/route-access-matrix.md | 9 ++++ .../src/__tests__/allocation-router.test.ts | 2 +- .../src/__tests__/event-bus-debounce.test.ts | 34 ++++++++++++++ packages/api/src/router/allocation.ts | 15 ++++-- packages/api/src/router/timeline.ts | 1 + packages/api/src/sse/event-bus.ts | 46 +++++++++++++++---- 8 files changed, 106 insertions(+), 14 deletions(-) diff --git a/apps/web/src/app/api/sse/timeline/route.ts b/apps/web/src/app/api/sse/timeline/route.ts index 38ffc13..67ed344 100644 --- a/apps/web/src/app/api/sse/timeline/route.ts +++ b/apps/web/src/app/api/sse/timeline/route.ts @@ -1,5 +1,5 @@ import { loadRoleDefaults } from "@capakraken/api"; -import { eventBus, permissionAudience, roleAudience, userAudience } from "@capakraken/api/sse"; +import { eventBus, permissionAudience, resourceAudience, roleAudience, userAudience } from "@capakraken/api/sse"; import { startReminderScheduler } from "@capakraken/api/lib/reminder-scheduler"; import { prisma } from "@capakraken/db"; import { resolvePermissions, SSE_EVENT_TYPES, SystemRole, type PermissionOverrides } from "@capakraken/shared"; @@ -29,6 +29,11 @@ export async function GET() { id: true, systemRole: true, permissionOverrides: true, + resource: { + select: { + id: true, + }, + }, }, }); @@ -46,6 +51,9 @@ export async function GET() { userAudience(dbUser.id), roleAudience(dbUser.systemRole), ]); + if (dbUser.resource?.id) { + audiences.add(resourceAudience(dbUser.resource.id)); + } for (const permission of permissions) { audiences.add(permissionAudience(permission)); } diff --git a/apps/web/src/hooks/useTimelineSSE.ts b/apps/web/src/hooks/useTimelineSSE.ts index cc8dd60..7f7de54 100644 --- a/apps/web/src/hooks/useTimelineSSE.ts +++ b/apps/web/src/hooks/useTimelineSSE.ts @@ -27,6 +27,9 @@ export function useTimelineSSE() { case SSE_EVENT_TYPES.ALLOCATION_CREATED: case SSE_EVENT_TYPES.ALLOCATION_UPDATED: case SSE_EVENT_TYPES.ALLOCATION_DELETED: + case SSE_EVENT_TYPES.VACATION_CREATED: + case SSE_EVENT_TYPES.VACATION_UPDATED: + case SSE_EVENT_TYPES.VACATION_DELETED: void queryClient.invalidateQueries({ queryKey: [["timeline", "getEntries"]] }); void queryClient.invalidateQueries({ queryKey: [["timeline", "getEntriesView"]] }); void queryClient.invalidateQueries({ queryKey: [["timeline", "getMyEntriesView"]] }); diff --git a/docs/route-access-matrix.md b/docs/route-access-matrix.md index c0c24bc..38fa359 100644 --- a/docs/route-access-matrix.md +++ b/docs/route-access-matrix.md @@ -113,6 +113,15 @@ Routes in [timeline.ts](/home/hartmut/Documents/Copilot/capakraken/packages/api/ | `getShiftPreviewDetail` | `controllerProcedure` | detail variant includes project metadata plus cost/conflict preview | | `getBudgetStatus` | `controllerProcedure` | budget burn/remaining exposure is commercial data | +### Timeline SSE + +The live-update path in [event-bus.ts](/home/hartmut/Documents/Copilot/capakraken/packages/api/src/sse/event-bus.ts) and [route.ts](/home/hartmut/Documents/Copilot/capakraken/apps/web/src/app/api/sse/timeline/route.ts) now follows the same audience model as the timeline reads: + +- planning staff subscribe through role/permission audiences +- linked users additionally subscribe to their own `resource:` audience +- allocation, vacation, and project-shift events fan out to both staff planning audiences and the affected resource audiences +- self-service timeline clients invalidate both personal entries and personal holiday overlays on allocation and vacation events + ## Review Standard - Any new sensitive read route must document one of: diff --git a/packages/api/src/__tests__/allocation-router.test.ts b/packages/api/src/__tests__/allocation-router.test.ts index aa36e12..8c70bc4 100644 --- a/packages/api/src/__tests__/allocation-router.test.ts +++ b/packages/api/src/__tests__/allocation-router.test.ts @@ -999,7 +999,7 @@ describe("allocation entry resolution router", () => { expect(db.assignment.delete).toHaveBeenCalledWith({ where: { id: "assignment_explicit_1" }, }); - expect(emitAllocationDeleted).toHaveBeenCalledWith("assignment_explicit_1", "project_1"); + expect(emitAllocationDeleted).toHaveBeenCalledWith("assignment_explicit_1", "project_1", "resource_1"); }); it("updates an explicit demand row through allocation.update", async () => { diff --git a/packages/api/src/__tests__/event-bus-debounce.test.ts b/packages/api/src/__tests__/event-bus-debounce.test.ts index d53d7ff..14ba2ef 100644 --- a/packages/api/src/__tests__/event-bus-debounce.test.ts +++ b/packages/api/src/__tests__/event-bus-debounce.test.ts @@ -5,6 +5,7 @@ import { eventBus, flushPendingEvents, permissionAudience, + resourceAudience, type SseEvent, userAudience, } from "../sse/event-bus.js"; @@ -231,4 +232,37 @@ describe("event-bus debounce", () => { unsubscribeFirst(); unsubscribeSecond(); }); + + it("delivers planning events to both staff and the affected resource audience", () => { + const managerReceived: SseEvent[] = []; + const resourceReceived: SseEvent[] = []; + const unsubscribeManager = eventBus.subscribe((event) => { + managerReceived.push(event); + }, { + audiences: [permissionAudience("manageAllocations")], + includeUnscoped: false, + }); + const unsubscribeResource = eventBus.subscribe((event) => { + resourceReceived.push(event); + }, { + audiences: [resourceAudience("res_1")], + includeUnscoped: false, + }); + + eventBus.emit( + SSE_EVENT_TYPES.ALLOCATION_UPDATED, + { id: "a1", resourceId: "res_1" }, + [permissionAudience("manageAllocations"), resourceAudience("res_1")], + ); + + vi.advanceTimersByTime(50); + + expect(managerReceived).toHaveLength(1); + expect(resourceReceived).toHaveLength(1); + expect(managerReceived[0]!.type).toBe(SSE_EVENT_TYPES.ALLOCATION_UPDATED); + expect(resourceReceived[0]!.payload).toEqual({ id: "a1", resourceId: "res_1" }); + + unsubscribeManager(); + unsubscribeResource(); + }); }); diff --git a/packages/api/src/router/allocation.ts b/packages/api/src/router/allocation.ts index e21b454..809b997 100644 --- a/packages/api/src/router/allocation.ts +++ b/packages/api/src/router/allocation.ts @@ -1046,6 +1046,13 @@ export const allocationRouter = createTRPCRouter({ .input(z.object({ id: z.string(), data: UpdateAssignmentSchema })) .mutation(async ({ ctx, input }) => { requirePermission(ctx, PermissionKey.MANAGE_ALLOCATIONS); + const existing = await findUniqueOrThrow( + ctx.db.assignment.findUnique({ + where: { id: input.id }, + select: { resourceId: true }, + }), + "Assignment", + ); const updated = await ctx.db.$transaction(async (tx) => { return updateAssignment( @@ -1059,6 +1066,7 @@ export const allocationRouter = createTRPCRouter({ id: updated.id, projectId: updated.projectId, resourceId: updated.resourceId, + resourceIds: [existing.resourceId, updated.resourceId], }); dispatchAllocationWebhookInBackground(ctx.db, "allocation.updated", { id: updated.id, @@ -1192,6 +1200,7 @@ export const allocationRouter = createTRPCRouter({ id: updated.id, projectId: updated.projectId, resourceId: updated.resourceId, + resourceIds: [existing.entry.resourceId, updated.resourceId], }); invalidateDashboardCacheInBackground(); checkBudgetThresholdsInBackground(ctx.db, updated.projectId); @@ -1228,7 +1237,7 @@ export const allocationRouter = createTRPCRouter({ }); }); - emitAllocationDeleted(existing.id, existing.projectId); + emitAllocationDeleted(existing.id, existing.projectId, existing.resourceId); invalidateDashboardCacheInBackground(); checkBudgetThresholdsInBackground(ctx.db, existing.projectId); @@ -1257,7 +1266,7 @@ export const allocationRouter = createTRPCRouter({ }); }); - emitAllocationDeleted(existing.entry.id, existing.projectId); + emitAllocationDeleted(existing.entry.id, existing.projectId, existing.entry.resourceId); invalidateDashboardCacheInBackground(); checkBudgetThresholdsInBackground(ctx.db, existing.projectId); @@ -1292,7 +1301,7 @@ export const allocationRouter = createTRPCRouter({ }); for (const a of existing) { - emitAllocationDeleted(a.entry.id, a.projectId); + emitAllocationDeleted(a.entry.id, a.projectId, a.entry.resourceId); } invalidateDashboardCacheInBackground(); // Check budget thresholds for each affected project diff --git a/packages/api/src/router/timeline.ts b/packages/api/src/router/timeline.ts index a565d51..fc2b756 100644 --- a/packages/api/src/router/timeline.ts +++ b/packages/api/src/router/timeline.ts @@ -1315,6 +1315,7 @@ export const timelineRouter = createTRPCRouter({ newStartDate: newStartDate.toISOString(), newEndDate: newEndDate.toISOString(), costDeltaCents: validation.costImpact.deltaCents, + resourceIds: assignments.map((assignment) => assignment.resourceId), }); return { project: updatedProject, validation }; diff --git a/packages/api/src/sse/event-bus.ts b/packages/api/src/sse/event-bus.ts index 53b7708..b49a29a 100644 --- a/packages/api/src/sse/event-bus.ts +++ b/packages/api/src/sse/event-bus.ts @@ -68,6 +68,34 @@ function deliverEvent(event: SseEvent): void { export const userAudience = (userId: string): SseAudience => `user:${userId}`; export const roleAudience = (role: string): SseAudience => `role:${role}`; export const permissionAudience = (permission: string): SseAudience => `permission:${permission}`; +export const resourceAudience = (resourceId: string): SseAudience => `resource:${resourceId}`; + +function extractScopedResourceIds(payload: Record): string[] { + const resourceIds = new Set(); + + const directResourceId = payload["resourceId"]; + if (typeof directResourceId === "string" && directResourceId.trim().length > 0) { + resourceIds.add(directResourceId); + } + + const listedResourceIds = payload["resourceIds"]; + if (Array.isArray(listedResourceIds)) { + for (const resourceId of listedResourceIds) { + if (typeof resourceId === "string" && resourceId.trim().length > 0) { + resourceIds.add(resourceId); + } + } + } + + return [...resourceIds]; +} + +function buildPlanningAudiences(payload: Record): SseAudience[] { + return normalizeAudiences([ + permissionAudience(PermissionKey.MANAGE_ALLOCATIONS), + ...extractScopedResourceIds(payload).map((resourceId) => resourceAudience(resourceId)), + ]); +} /** Flush a single event type from the buffer and deliver to subscribers. */ function flushEventType(type: SseEventType, audience: readonly SseAudience[]): void { @@ -250,20 +278,20 @@ setupSubscriber(); // Helper emitters export const emitAllocationCreated = (allocation: Record) => - eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_CREATED, allocation, [permissionAudience(PermissionKey.MANAGE_ALLOCATIONS)]); + eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_CREATED, allocation, buildPlanningAudiences(allocation)); export const emitAllocationUpdated = (allocation: Record) => - eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_UPDATED, allocation, [permissionAudience(PermissionKey.MANAGE_ALLOCATIONS)]); + eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_UPDATED, allocation, buildPlanningAudiences(allocation)); -export const emitAllocationDeleted = (allocationId: string, projectId: string) => +export const emitAllocationDeleted = (allocationId: string, projectId: string, resourceId?: string | null) => eventBus.emit( SSE_EVENT_TYPES.ALLOCATION_DELETED, - { allocationId, projectId }, - [permissionAudience(PermissionKey.MANAGE_ALLOCATIONS)], + { allocationId, projectId, ...(resourceId ? { resourceId } : {}) }, + buildPlanningAudiences({ allocationId, projectId, ...(resourceId ? { resourceId } : {}) }), ); export const emitProjectShifted = (project: Record) => - eventBus.emit(SSE_EVENT_TYPES.PROJECT_SHIFTED, project, [permissionAudience(PermissionKey.MANAGE_ALLOCATIONS)]); + eventBus.emit(SSE_EVENT_TYPES.PROJECT_SHIFTED, project, buildPlanningAudiences(project)); export const emitBudgetWarning = (projectId: string, payload: Record) => eventBus.emit( @@ -273,16 +301,16 @@ export const emitBudgetWarning = (projectId: string, payload: Record) => - eventBus.emit(SSE_EVENT_TYPES.VACATION_CREATED, vacation, [permissionAudience(PermissionKey.MANAGE_ALLOCATIONS)]); + eventBus.emit(SSE_EVENT_TYPES.VACATION_CREATED, vacation, buildPlanningAudiences(vacation)); export const emitVacationUpdated = (vacation: Record) => - eventBus.emit(SSE_EVENT_TYPES.VACATION_UPDATED, vacation, [permissionAudience(PermissionKey.MANAGE_ALLOCATIONS)]); + eventBus.emit(SSE_EVENT_TYPES.VACATION_UPDATED, vacation, buildPlanningAudiences(vacation)); export const emitVacationDeleted = (vacationId: string, resourceId: string) => eventBus.emit( SSE_EVENT_TYPES.VACATION_DELETED, { vacationId, resourceId }, - [permissionAudience(PermissionKey.MANAGE_ALLOCATIONS)], + buildPlanningAudiences({ vacationId, resourceId }), ); export const emitRoleCreated = (role: Record) =>