import { SSE_EVENT_TYPES } from "@planarchy/shared"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { cancelPendingEvents, eventBus, flushPendingEvents, type SseEvent, } from "../sse/event-bus.js"; // Mock Redis so the module loads without a real connection. // publish() throws so the event bus falls back to local-only delivery, // which is the path that exercises the debounce buffer. vi.mock("ioredis", () => { const RedisMock = vi.fn().mockImplementation(() => ({ on: vi.fn(), subscribe: vi.fn().mockResolvedValue(undefined), publish: vi.fn().mockImplementation(() => { throw new Error("Redis unavailable (test)"); }), })); return { Redis: RedisMock }; }); describe("event-bus debounce", () => { let received: SseEvent[]; let unsubscribe: () => void; beforeEach(() => { vi.useFakeTimers(); received = []; unsubscribe = eventBus.subscribe((event) => { received.push(event); }); }); afterEach(() => { unsubscribe(); cancelPendingEvents(); vi.useRealTimers(); }); it("delivers a single event after the debounce window", () => { eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_CREATED, { id: "a1" }); // Not yet delivered — still in the debounce window expect(received).toHaveLength(0); vi.advanceTimersByTime(50); // Now delivered expect(received).toHaveLength(1); expect(received[0]!.type).toBe(SSE_EVENT_TYPES.ALLOCATION_CREATED); expect(received[0]!.payload).toEqual({ id: "a1" }); }); it("aggregates multiple events of the same type into a single _batch event", () => { eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_UPDATED, { id: "a1" }); eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_UPDATED, { id: "a2" }); eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_UPDATED, { id: "a3" }); expect(received).toHaveLength(0); vi.advanceTimersByTime(50); expect(received).toHaveLength(1); expect(received[0]!.type).toBe(SSE_EVENT_TYPES.ALLOCATION_UPDATED); expect(received[0]!.payload).toEqual({ _batch: [{ id: "a1" }, { id: "a2" }, { id: "a3" }], }); }); it("keeps different event types separate", () => { eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_CREATED, { id: "a1" }); eventBus.emit(SSE_EVENT_TYPES.ROLE_UPDATED, { id: "r1" }); vi.advanceTimersByTime(50); expect(received).toHaveLength(2); const types = received.map((e) => e.type); expect(types).toContain(SSE_EVENT_TYPES.ALLOCATION_CREATED); expect(types).toContain(SSE_EVENT_TYPES.ROLE_UPDATED); // Both should be single payloads (not batched) for (const event of received) { expect(event.payload).not.toHaveProperty("_batch"); } }); it("resets the debounce timer when new events arrive within the window", () => { eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_UPDATED, { id: "a1" }); // Advance 30ms (still within window) vi.advanceTimersByTime(30); expect(received).toHaveLength(0); // Emit another — this resets the timer eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_UPDATED, { id: "a2" }); // Advance another 30ms (60ms total from first, but only 30ms from second) vi.advanceTimersByTime(30); expect(received).toHaveLength(0); // Advance remaining 20ms (now 50ms from second event) vi.advanceTimersByTime(20); expect(received).toHaveLength(1); expect(received[0]!.payload).toEqual({ _batch: [{ id: "a1" }, { id: "a2" }], }); }); it("flushPendingEvents delivers immediately", () => { eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_CREATED, { id: "a1" }); eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_CREATED, { id: "a2" }); expect(received).toHaveLength(0); flushPendingEvents(); expect(received).toHaveLength(1); expect(received[0]!.payload).toEqual({ _batch: [{ id: "a1" }, { id: "a2" }], }); // Timer should not fire again vi.advanceTimersByTime(100); expect(received).toHaveLength(1); }); it("cancelPendingEvents discards events without delivering", () => { eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_CREATED, { id: "a1" }); eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_CREATED, { id: "a2" }); cancelPendingEvents(); vi.advanceTimersByTime(100); expect(received).toHaveLength(0); }); it("preserves the timestamp of the first event in a batch", () => { const before = new Date().toISOString(); eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_UPDATED, { id: "a1" }); // Advance a bit, emit another vi.advanceTimersByTime(10); eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_UPDATED, { id: "a2" }); vi.advanceTimersByTime(50); expect(received).toHaveLength(1); // The timestamp should be from the first event (not later) expect(received[0]!.timestamp).toBe(before); }); });