feat(sse): scope timeline events to affected audiences

This commit is contained in:
2026-03-30 00:40:24 +02:00
parent 819345acfa
commit fac8c1c3a5
8 changed files with 106 additions and 14 deletions
@@ -999,7 +999,7 @@ describe("allocation entry resolution router", () => {
expect(db.assignment.delete).toHaveBeenCalledWith({
where: { id: "assignment_explicit_1" },
});
expect(emitAllocationDeleted).toHaveBeenCalledWith("assignment_explicit_1", "project_1");
expect(emitAllocationDeleted).toHaveBeenCalledWith("assignment_explicit_1", "project_1", "resource_1");
});
it("updates an explicit demand row through allocation.update", async () => {
@@ -5,6 +5,7 @@ import {
eventBus,
flushPendingEvents,
permissionAudience,
resourceAudience,
type SseEvent,
userAudience,
} from "../sse/event-bus.js";
@@ -231,4 +232,37 @@ describe("event-bus debounce", () => {
unsubscribeFirst();
unsubscribeSecond();
});
it("delivers planning events to both staff and the affected resource audience", () => {
const managerReceived: SseEvent[] = [];
const resourceReceived: SseEvent[] = [];
const unsubscribeManager = eventBus.subscribe((event) => {
managerReceived.push(event);
}, {
audiences: [permissionAudience("manageAllocations")],
includeUnscoped: false,
});
const unsubscribeResource = eventBus.subscribe((event) => {
resourceReceived.push(event);
}, {
audiences: [resourceAudience("res_1")],
includeUnscoped: false,
});
eventBus.emit(
SSE_EVENT_TYPES.ALLOCATION_UPDATED,
{ id: "a1", resourceId: "res_1" },
[permissionAudience("manageAllocations"), resourceAudience("res_1")],
);
vi.advanceTimersByTime(50);
expect(managerReceived).toHaveLength(1);
expect(resourceReceived).toHaveLength(1);
expect(managerReceived[0]!.type).toBe(SSE_EVENT_TYPES.ALLOCATION_UPDATED);
expect(resourceReceived[0]!.payload).toEqual({ id: "a1", resourceId: "res_1" });
unsubscribeManager();
unsubscribeResource();
});
});
+12 -3
View File
@@ -1046,6 +1046,13 @@ export const allocationRouter = createTRPCRouter({
.input(z.object({ id: z.string(), data: UpdateAssignmentSchema }))
.mutation(async ({ ctx, input }) => {
requirePermission(ctx, PermissionKey.MANAGE_ALLOCATIONS);
const existing = await findUniqueOrThrow(
ctx.db.assignment.findUnique({
where: { id: input.id },
select: { resourceId: true },
}),
"Assignment",
);
const updated = await ctx.db.$transaction(async (tx) => {
return updateAssignment(
@@ -1059,6 +1066,7 @@ export const allocationRouter = createTRPCRouter({
id: updated.id,
projectId: updated.projectId,
resourceId: updated.resourceId,
resourceIds: [existing.resourceId, updated.resourceId],
});
dispatchAllocationWebhookInBackground(ctx.db, "allocation.updated", {
id: updated.id,
@@ -1192,6 +1200,7 @@ export const allocationRouter = createTRPCRouter({
id: updated.id,
projectId: updated.projectId,
resourceId: updated.resourceId,
resourceIds: [existing.entry.resourceId, updated.resourceId],
});
invalidateDashboardCacheInBackground();
checkBudgetThresholdsInBackground(ctx.db, updated.projectId);
@@ -1228,7 +1237,7 @@ export const allocationRouter = createTRPCRouter({
});
});
emitAllocationDeleted(existing.id, existing.projectId);
emitAllocationDeleted(existing.id, existing.projectId, existing.resourceId);
invalidateDashboardCacheInBackground();
checkBudgetThresholdsInBackground(ctx.db, existing.projectId);
@@ -1257,7 +1266,7 @@ export const allocationRouter = createTRPCRouter({
});
});
emitAllocationDeleted(existing.entry.id, existing.projectId);
emitAllocationDeleted(existing.entry.id, existing.projectId, existing.entry.resourceId);
invalidateDashboardCacheInBackground();
checkBudgetThresholdsInBackground(ctx.db, existing.projectId);
@@ -1292,7 +1301,7 @@ export const allocationRouter = createTRPCRouter({
});
for (const a of existing) {
emitAllocationDeleted(a.entry.id, a.projectId);
emitAllocationDeleted(a.entry.id, a.projectId, a.entry.resourceId);
}
invalidateDashboardCacheInBackground();
// Check budget thresholds for each affected project
+1
View File
@@ -1315,6 +1315,7 @@ export const timelineRouter = createTRPCRouter({
newStartDate: newStartDate.toISOString(),
newEndDate: newEndDate.toISOString(),
costDeltaCents: validation.costImpact.deltaCents,
resourceIds: assignments.map((assignment) => assignment.resourceId),
});
return { project: updatedProject, validation };
+37 -9
View File
@@ -68,6 +68,34 @@ function deliverEvent(event: SseEvent): void {
export const userAudience = (userId: string): SseAudience => `user:${userId}`;
export const roleAudience = (role: string): SseAudience => `role:${role}`;
export const permissionAudience = (permission: string): SseAudience => `permission:${permission}`;
export const resourceAudience = (resourceId: string): SseAudience => `resource:${resourceId}`;
function extractScopedResourceIds(payload: Record<string, unknown>): string[] {
const resourceIds = new Set<string>();
const directResourceId = payload["resourceId"];
if (typeof directResourceId === "string" && directResourceId.trim().length > 0) {
resourceIds.add(directResourceId);
}
const listedResourceIds = payload["resourceIds"];
if (Array.isArray(listedResourceIds)) {
for (const resourceId of listedResourceIds) {
if (typeof resourceId === "string" && resourceId.trim().length > 0) {
resourceIds.add(resourceId);
}
}
}
return [...resourceIds];
}
function buildPlanningAudiences(payload: Record<string, unknown>): SseAudience[] {
return normalizeAudiences([
permissionAudience(PermissionKey.MANAGE_ALLOCATIONS),
...extractScopedResourceIds(payload).map((resourceId) => resourceAudience(resourceId)),
]);
}
/** Flush a single event type from the buffer and deliver to subscribers. */
function flushEventType(type: SseEventType, audience: readonly SseAudience[]): void {
@@ -250,20 +278,20 @@ setupSubscriber();
// Helper emitters
export const emitAllocationCreated = (allocation: Record<string, unknown>) =>
eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_CREATED, allocation, [permissionAudience(PermissionKey.MANAGE_ALLOCATIONS)]);
eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_CREATED, allocation, buildPlanningAudiences(allocation));
export const emitAllocationUpdated = (allocation: Record<string, unknown>) =>
eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_UPDATED, allocation, [permissionAudience(PermissionKey.MANAGE_ALLOCATIONS)]);
eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_UPDATED, allocation, buildPlanningAudiences(allocation));
export const emitAllocationDeleted = (allocationId: string, projectId: string) =>
export const emitAllocationDeleted = (allocationId: string, projectId: string, resourceId?: string | null) =>
eventBus.emit(
SSE_EVENT_TYPES.ALLOCATION_DELETED,
{ allocationId, projectId },
[permissionAudience(PermissionKey.MANAGE_ALLOCATIONS)],
{ allocationId, projectId, ...(resourceId ? { resourceId } : {}) },
buildPlanningAudiences({ allocationId, projectId, ...(resourceId ? { resourceId } : {}) }),
);
export const emitProjectShifted = (project: Record<string, unknown>) =>
eventBus.emit(SSE_EVENT_TYPES.PROJECT_SHIFTED, project, [permissionAudience(PermissionKey.MANAGE_ALLOCATIONS)]);
eventBus.emit(SSE_EVENT_TYPES.PROJECT_SHIFTED, project, buildPlanningAudiences(project));
export const emitBudgetWarning = (projectId: string, payload: Record<string, unknown>) =>
eventBus.emit(
@@ -273,16 +301,16 @@ export const emitBudgetWarning = (projectId: string, payload: Record<string, unk
);
export const emitVacationCreated = (vacation: Record<string, unknown>) =>
eventBus.emit(SSE_EVENT_TYPES.VACATION_CREATED, vacation, [permissionAudience(PermissionKey.MANAGE_ALLOCATIONS)]);
eventBus.emit(SSE_EVENT_TYPES.VACATION_CREATED, vacation, buildPlanningAudiences(vacation));
export const emitVacationUpdated = (vacation: Record<string, unknown>) =>
eventBus.emit(SSE_EVENT_TYPES.VACATION_UPDATED, vacation, [permissionAudience(PermissionKey.MANAGE_ALLOCATIONS)]);
eventBus.emit(SSE_EVENT_TYPES.VACATION_UPDATED, vacation, buildPlanningAudiences(vacation));
export const emitVacationDeleted = (vacationId: string, resourceId: string) =>
eventBus.emit(
SSE_EVENT_TYPES.VACATION_DELETED,
{ vacationId, resourceId },
[permissionAudience(PermissionKey.MANAGE_ALLOCATIONS)],
buildPlanningAudiences({ vacationId, resourceId }),
);
export const emitRoleCreated = (role: Record<string, unknown>) =>