import { SSE_EVENT_TYPES } from "@capakraken/shared"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { cancelPendingEvents, eventBus, flushPendingEvents, permissionAudience, resourceAudience, type SseEvent, userAudience, } from "../sse/event-bus.js"; // Mock Redis so the module loads without a real connection. // publish() throws so the event bus falls back to local-only delivery, // which is the path that exercises the debounce buffer. vi.mock("ioredis", () => { const RedisMock = vi.fn().mockImplementation(() => ({ on: vi.fn(), subscribe: vi.fn().mockResolvedValue(undefined), publish: vi.fn().mockImplementation(() => { throw new Error("Redis unavailable (test)"); }), })); return { Redis: RedisMock }; }); describe("event-bus debounce", () => { let received: SseEvent[]; let unsubscribe: () => void; let consoleWarnSpy: ReturnType; beforeEach(() => { vi.useFakeTimers(); received = []; consoleWarnSpy = vi.spyOn(console, "warn").mockImplementation(() => undefined); unsubscribe = eventBus.subscribe((event) => { received.push(event); }); }); afterEach(() => { unsubscribe(); cancelPendingEvents(); consoleWarnSpy.mockRestore(); vi.useRealTimers(); }); it("delivers a single event after the debounce window", () => { eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_CREATED, { id: "a1" }); // Not yet delivered — still in the debounce window expect(received).toHaveLength(0); vi.advanceTimersByTime(50); // Now delivered expect(received).toHaveLength(1); expect(received[0]!.type).toBe(SSE_EVENT_TYPES.ALLOCATION_CREATED); expect(received[0]!.payload).toEqual({ id: "a1" }); }); it("aggregates multiple events of the same type into a single _batch event", () => { eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_UPDATED, { id: "a1" }); eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_UPDATED, { id: "a2" }); eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_UPDATED, { id: "a3" }); expect(received).toHaveLength(0); vi.advanceTimersByTime(50); expect(received).toHaveLength(1); expect(received[0]!.type).toBe(SSE_EVENT_TYPES.ALLOCATION_UPDATED); expect(received[0]!.payload).toEqual({ _batch: [{ id: "a1" }, { id: "a2" }, { id: "a3" }], }); }); it("keeps different event types separate", () => { eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_CREATED, { id: "a1" }); eventBus.emit(SSE_EVENT_TYPES.ROLE_UPDATED, { id: "r1" }); vi.advanceTimersByTime(50); expect(received).toHaveLength(2); const types = received.map((e) => e.type); expect(types).toContain(SSE_EVENT_TYPES.ALLOCATION_CREATED); expect(types).toContain(SSE_EVENT_TYPES.ROLE_UPDATED); // Both should be single payloads (not batched) for (const event of received) { expect(event.payload).not.toHaveProperty("_batch"); } }); it("resets the debounce timer when new events arrive within the window", () => { eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_UPDATED, { id: "a1" }); // Advance 30ms (still within window) vi.advanceTimersByTime(30); expect(received).toHaveLength(0); // Emit another — this resets the timer eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_UPDATED, { id: "a2" }); // Advance another 30ms (60ms total from first, but only 30ms from second) vi.advanceTimersByTime(30); expect(received).toHaveLength(0); // Advance remaining 20ms (now 50ms from second event) vi.advanceTimersByTime(20); expect(received).toHaveLength(1); expect(received[0]!.payload).toEqual({ _batch: [{ id: "a1" }, { id: "a2" }], }); }); it("flushPendingEvents delivers immediately", () => { eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_CREATED, { id: "a1" }); eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_CREATED, { id: "a2" }); expect(received).toHaveLength(0); flushPendingEvents(); expect(received).toHaveLength(1); expect(received[0]!.payload).toEqual({ _batch: [{ id: "a1" }, { id: "a2" }], }); // Timer should not fire again vi.advanceTimersByTime(100); expect(received).toHaveLength(1); }); it("cancelPendingEvents discards events without delivering", () => { eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_CREATED, { id: "a1" }); eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_CREATED, { id: "a2" }); cancelPendingEvents(); vi.advanceTimersByTime(100); expect(received).toHaveLength(0); }); it("preserves the timestamp of the first event in a batch", () => { const before = new Date().toISOString(); eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_UPDATED, { id: "a1" }); // Advance a bit, emit another vi.advanceTimersByTime(10); eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_UPDATED, { id: "a2" }); vi.advanceTimersByTime(50); expect(received).toHaveLength(1); // The timestamp should be from the first event (not later) expect(received[0]!.timestamp).toBe(before); }); it("delivers scoped events only to matching audiences", () => { const managerReceived: SseEvent[] = []; const userReceived: SseEvent[] = []; const unsubscribeManager = eventBus.subscribe((event) => { managerReceived.push(event); }, { audiences: [permissionAudience("manageAllocations")], includeUnscoped: false, }); const unsubscribeUser = eventBus.subscribe((event) => { userReceived.push(event); }, { audiences: [userAudience("user_1")], includeUnscoped: false, }); eventBus.emit( SSE_EVENT_TYPES.ALLOCATION_CREATED, { id: "a1" }, [permissionAudience("manageAllocations")], ); eventBus.emit( SSE_EVENT_TYPES.NOTIFICATION_CREATED, { notificationId: "n1" }, [userAudience("user_1")], ); vi.advanceTimersByTime(50); expect(managerReceived).toHaveLength(1); expect(managerReceived[0]!.type).toBe(SSE_EVENT_TYPES.ALLOCATION_CREATED); expect(userReceived).toHaveLength(1); expect(userReceived[0]!.type).toBe(SSE_EVENT_TYPES.NOTIFICATION_CREATED); unsubscribeManager(); unsubscribeUser(); }); it("does not batch events from different audiences together", () => { const firstUserReceived: SseEvent[] = []; const secondUserReceived: SseEvent[] = []; const unsubscribeFirst = eventBus.subscribe((event) => { firstUserReceived.push(event); }, { audiences: [userAudience("user_1")], includeUnscoped: false, }); const unsubscribeSecond = eventBus.subscribe((event) => { secondUserReceived.push(event); }, { audiences: [userAudience("user_2")], includeUnscoped: false, }); eventBus.emit( SSE_EVENT_TYPES.NOTIFICATION_CREATED, { notificationId: "n1" }, [userAudience("user_1")], ); eventBus.emit( SSE_EVENT_TYPES.NOTIFICATION_CREATED, { notificationId: "n2" }, [userAudience("user_2")], ); vi.advanceTimersByTime(50); expect(firstUserReceived).toHaveLength(1); expect(firstUserReceived[0]!.payload).toEqual({ notificationId: "n1" }); expect(secondUserReceived).toHaveLength(1); expect(secondUserReceived[0]!.payload).toEqual({ notificationId: "n2" }); unsubscribeFirst(); unsubscribeSecond(); }); it("delivers planning events to both staff and the affected resource audience", () => { const managerReceived: SseEvent[] = []; const resourceReceived: SseEvent[] = []; const unsubscribeManager = eventBus.subscribe((event) => { managerReceived.push(event); }, { audiences: [permissionAudience("manageAllocations")], includeUnscoped: false, }); const unsubscribeResource = eventBus.subscribe((event) => { resourceReceived.push(event); }, { audiences: [resourceAudience("res_1")], includeUnscoped: false, }); eventBus.emit( SSE_EVENT_TYPES.ALLOCATION_UPDATED, { id: "a1", resourceId: "res_1" }, [permissionAudience("manageAllocations"), resourceAudience("res_1")], ); vi.advanceTimersByTime(50); expect(managerReceived).toHaveLength(1); expect(resourceReceived).toHaveLength(1); expect(managerReceived[0]!.type).toBe(SSE_EVENT_TYPES.ALLOCATION_UPDATED); expect(resourceReceived[0]!.payload).toEqual({ id: "a1", resourceId: "res_1" }); unsubscribeManager(); unsubscribeResource(); }); });