269 lines
8.3 KiB
TypeScript
269 lines
8.3 KiB
TypeScript
import { SSE_EVENT_TYPES } from "@capakraken/shared";
|
|
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
|
import {
|
|
cancelPendingEvents,
|
|
eventBus,
|
|
flushPendingEvents,
|
|
permissionAudience,
|
|
resourceAudience,
|
|
type SseEvent,
|
|
userAudience,
|
|
} from "../sse/event-bus.js";
|
|
|
|
// Stub out ioredis so that importing the event bus never opens a real connection.
// The stubbed publish() always throws, which makes the event bus fall back to
// local-only delivery — the code path that goes through the debounce buffer.
// Everything lives inside the factory because vi.mock() calls are hoisted.
vi.mock("ioredis", () => {
  // Builds one fake client; each constructed instance gets its own fresh stubs.
  const createFakeClient = () => {
    const publish = vi.fn().mockImplementation(() => {
      throw new Error("Redis unavailable (test)");
    });

    return {
      on: vi.fn(),
      subscribe: vi.fn().mockResolvedValue(undefined),
      publish,
    };
  };

  return { Redis: vi.fn().mockImplementation(createFakeClient) };
});
|
|
|
|
describe("event-bus debounce", () => {
|
|
let received: SseEvent[];
|
|
let unsubscribe: () => void;
|
|
let consoleWarnSpy: ReturnType<typeof vi.spyOn>;
|
|
|
|
beforeEach(() => {
  // Fake timers let each test step through the debounce window deterministically.
  vi.useFakeTimers();

  // Replace console.warn with a no-op for the duration of the test.
  consoleWarnSpy = vi.spyOn(console, "warn").mockImplementation(() => undefined);

  // Start from an empty capture list and record every event the bus delivers
  // to this default (unscoped) subscriber.
  received = [];
  unsubscribe = eventBus.subscribe((incoming) => {
    received.push(incoming);
  });
});
|
|
|
|
afterEach(() => {
  // Detach the default subscriber registered in beforeEach.
  unsubscribe();

  // Drop anything still sitting in the debounce buffer so it cannot reach the next test.
  cancelPendingEvents();

  // Put the real console.warn back, then return to real timers.
  consoleWarnSpy.mockRestore();
  vi.useRealTimers();
});
|
|
|
|
it("delivers a single event after the debounce window", () => {
  const eventType = SSE_EVENT_TYPES.ALLOCATION_CREATED;
  eventBus.emit(eventType, { id: "a1" });

  // Nothing reaches the subscriber while the event is still buffered.
  expect(received).toHaveLength(0);

  vi.advanceTimersByTime(50);

  // After 50 ms exactly one event arrives, carrying the original type and payload unwrapped.
  expect(received).toHaveLength(1);
  const delivered = received[0]!;
  expect(delivered.type).toBe(eventType);
  expect(delivered.payload).toEqual({ id: "a1" });
});
|
|
|
|
it("aggregates multiple events of the same type into a single _batch event", () => {
  // Three back-to-back emits of the same type, all inside one debounce window.
  for (const id of ["a1", "a2", "a3"]) {
    eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_UPDATED, { id });
  }

  expect(received).toHaveLength(0);

  vi.advanceTimersByTime(50);

  // They arrive as one event whose payload wraps the originals, in emit order, under `_batch`.
  expect(received).toHaveLength(1);
  const batched = received[0]!;
  expect(batched.type).toBe(SSE_EVENT_TYPES.ALLOCATION_UPDATED);
  expect(batched.payload).toEqual({
    _batch: [{ id: "a1" }, { id: "a2" }, { id: "a3" }],
  });
});
|
|
|
|
it("keeps different event types separate", () => {
  eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_CREATED, { id: "a1" });
  eventBus.emit(SSE_EVENT_TYPES.ROLE_UPDATED, { id: "r1" });

  vi.advanceTimersByTime(50);

  // One delivery per type; the order between them is not asserted.
  expect(received).toHaveLength(2);
  const deliveredTypes = received.map(({ type }) => type);
  expect(deliveredTypes).toContain(SSE_EVENT_TYPES.ALLOCATION_CREATED);
  expect(deliveredTypes).toContain(SSE_EVENT_TYPES.ROLE_UPDATED);

  // Each type saw only one emit, so neither payload may be wrapped in `_batch`.
  received.forEach((delivered) => {
    expect(delivered.payload).not.toHaveProperty("_batch");
  });
});
|
|
|
|
it("resets the debounce timer when new events arrive within the window", () => {
  const eventType = SSE_EVENT_TYPES.ALLOCATION_UPDATED;
  eventBus.emit(eventType, { id: "a1" });

  // 30 ms in: the first window has not elapsed yet.
  vi.advanceTimersByTime(30);
  expect(received).toHaveLength(0);

  // A second emit inside the window restarts the countdown.
  eventBus.emit(eventType, { id: "a2" });

  // 60 ms after the first emit but only 30 ms after the second: still nothing delivered.
  vi.advanceTimersByTime(30);
  expect(received).toHaveLength(0);

  // 20 ms more makes it 50 ms since the second emit, so the batch is flushed now.
  vi.advanceTimersByTime(20);
  expect(received).toHaveLength(1);
  const batched = received[0]!;
  expect(batched.payload).toEqual({
    _batch: [{ id: "a1" }, { id: "a2" }],
  });
});
|
|
|
|
it("flushPendingEvents delivers immediately", () => {
  for (const id of ["a1", "a2"]) {
    eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_CREATED, { id });
  }

  // Both emits are buffered; no timer has been advanced.
  expect(received).toHaveLength(0);

  flushPendingEvents();

  // The flush delivers the buffered batch synchronously, without waiting for the timer.
  expect(received).toHaveLength(1);
  const flushed = received[0]!;
  expect(flushed.payload).toEqual({
    _batch: [{ id: "a1" }, { id: "a2" }],
  });

  // Advancing well past the window afterwards must not produce a second delivery.
  vi.advanceTimersByTime(100);
  expect(received).toHaveLength(1);
});
|
|
|
|
it("cancelPendingEvents discards events without delivering", () => {
  for (const id of ["a1", "a2"]) {
    eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_CREATED, { id });
  }

  cancelPendingEvents();

  // Even after advancing well past the debounce window, the cancelled events never arrive.
  vi.advanceTimersByTime(100);
  expect(received).toHaveLength(0);
});
|
|
|
|
it("preserves the timestamp of the first event in a batch", () => {
  // Captured under fake timers (enabled in beforeEach) right before the first emit.
  const firstEmitTime = new Date().toISOString();
  eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_UPDATED, { id: "a1" });

  // The second emit happens 10 ms later on the fake clock, still inside the window.
  vi.advanceTimersByTime(10);
  eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_UPDATED, { id: "a2" });

  vi.advanceTimersByTime(50);

  expect(received).toHaveLength(1);
  // The batch must carry the first event's timestamp, not the later one's.
  const batched = received[0]!;
  expect(batched.timestamp).toBe(firstEmitTime);
});
|
|
|
|
// Two scoped subscribers (a permission audience and a user audience) each
// receive only the event addressed to their own audience.
it("delivers scoped events only to matching audiences", () => {
  const managerReceived: SseEvent[] = [];
  const userReceived: SseEvent[] = [];

  const unsubscribeManager = eventBus.subscribe(
    (event) => {
      managerReceived.push(event);
    },
    {
      audiences: [permissionAudience("manageAllocations")],
      includeUnscoped: false,
    },
  );
  const unsubscribeUser = eventBus.subscribe(
    (event) => {
      userReceived.push(event);
    },
    {
      audiences: [userAudience("user_1")],
      includeUnscoped: false,
    },
  );

  try {
    eventBus.emit(
      SSE_EVENT_TYPES.ALLOCATION_CREATED,
      { id: "a1" },
      [permissionAudience("manageAllocations")],
    );
    eventBus.emit(
      SSE_EVENT_TYPES.NOTIFICATION_CREATED,
      { notificationId: "n1" },
      [userAudience("user_1")],
    );

    vi.advanceTimersByTime(50);

    // Each subscriber sees exactly one event: the one scoped to its audience.
    expect(managerReceived).toHaveLength(1);
    expect(managerReceived[0]!.type).toBe(SSE_EVENT_TYPES.ALLOCATION_CREATED);
    expect(userReceived).toHaveLength(1);
    expect(userReceived[0]!.type).toBe(SSE_EVENT_TYPES.NOTIFICATION_CREATED);
  } finally {
    // Detach the extra subscribers even when an assertion above throws, so they
    // do not stay registered on the shared eventBus for the remaining tests.
    unsubscribeManager();
    unsubscribeUser();
  }
});
|
|
|
|
// Two events of the same type, addressed to different user audiences inside one
// debounce window, must stay separate instead of being merged into a `_batch`.
it("does not batch events from different audiences together", () => {
  const firstUserReceived: SseEvent[] = [];
  const secondUserReceived: SseEvent[] = [];

  const unsubscribeFirst = eventBus.subscribe(
    (event) => {
      firstUserReceived.push(event);
    },
    {
      audiences: [userAudience("user_1")],
      includeUnscoped: false,
    },
  );
  const unsubscribeSecond = eventBus.subscribe(
    (event) => {
      secondUserReceived.push(event);
    },
    {
      audiences: [userAudience("user_2")],
      includeUnscoped: false,
    },
  );

  try {
    eventBus.emit(
      SSE_EVENT_TYPES.NOTIFICATION_CREATED,
      { notificationId: "n1" },
      [userAudience("user_1")],
    );
    eventBus.emit(
      SSE_EVENT_TYPES.NOTIFICATION_CREATED,
      { notificationId: "n2" },
      [userAudience("user_2")],
    );

    vi.advanceTimersByTime(50);

    // Each user gets only their own notification, with the payload left unwrapped.
    expect(firstUserReceived).toHaveLength(1);
    expect(firstUserReceived[0]!.payload).toEqual({ notificationId: "n1" });
    expect(secondUserReceived).toHaveLength(1);
    expect(secondUserReceived[0]!.payload).toEqual({ notificationId: "n2" });
  } finally {
    // Detach the extra subscribers even when an assertion above throws, so they
    // do not stay registered on the shared eventBus for the remaining tests.
    unsubscribeFirst();
    unsubscribeSecond();
  }
});
|
|
|
|
// A single event addressed to two audiences at once (a permission audience and a
// resource audience) must reach a subscriber of each.
it("delivers planning events to both staff and the affected resource audience", () => {
  const managerReceived: SseEvent[] = [];
  const resourceReceived: SseEvent[] = [];

  const unsubscribeManager = eventBus.subscribe(
    (event) => {
      managerReceived.push(event);
    },
    {
      audiences: [permissionAudience("manageAllocations")],
      includeUnscoped: false,
    },
  );
  const unsubscribeResource = eventBus.subscribe(
    (event) => {
      resourceReceived.push(event);
    },
    {
      audiences: [resourceAudience("res_1")],
      includeUnscoped: false,
    },
  );

  try {
    eventBus.emit(
      SSE_EVENT_TYPES.ALLOCATION_UPDATED,
      { id: "a1", resourceId: "res_1" },
      [permissionAudience("manageAllocations"), resourceAudience("res_1")],
    );

    vi.advanceTimersByTime(50);

    // Both subscribers receive the one event, with type and payload intact.
    expect(managerReceived).toHaveLength(1);
    expect(resourceReceived).toHaveLength(1);
    expect(managerReceived[0]!.type).toBe(SSE_EVENT_TYPES.ALLOCATION_UPDATED);
    expect(resourceReceived[0]!.payload).toEqual({ id: "a1", resourceId: "res_1" });
  } finally {
    // Detach the extra subscribers even when an assertion above throws, so they
    // do not stay registered on the shared eventBus for the remaining tests.
    unsubscribeManager();
    unsubscribeResource();
  }
});
|
|
});
|