Files
Nexus/packages/api/src/__tests__/event-bus-debounce.test.ts
T
Hartmut cd78f72f33 chore: full technical rename planarchy → capakraken
Complete rename of all technical identifiers across the codebase:

Package names (11 packages):
- @planarchy/* → @capakraken/* in all package.json, tsconfig, imports

Import statements: 277 files, 548 occurrences replaced

Database & Docker:
- PostgreSQL user/db: planarchy → capakraken
- Docker volumes: planarchy_pgdata → capakraken_pgdata
- Connection strings updated in docker-compose, .env, CI

CI/CD:
- GitHub Actions workflow: all filter commands updated
- Test database credentials updated

Infrastructure:
- Redis channel: planarchy:sse → capakraken:sse
- Logger service name: planarchy-api → capakraken-api
- Anonymization seed updated
- Start/stop/restart scripts updated

Test data:
- Seed emails: @planarchy.dev → @capakraken.dev
- E2E test credentials: all 11 spec files updated
- Email defaults: @planarchy.app → @capakraken.app
- localStorage keys: planarchy_* → capakraken_*

Documentation: 30+ .md files updated

Verification:
- pnpm install: workspace resolution works
- TypeScript: only pre-existing TS2589 (no new errors)
- Engine: 310/310 tests pass
- Staffing: 37/37 tests pass

Co-Authored-By: claude-flow <ruv@ruv.net>
2026-03-27 13:18:09 +01:00

154 lines
4.7 KiB
TypeScript

import { SSE_EVENT_TYPES } from "@capakraken/shared";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import {
cancelPendingEvents,
eventBus,
flushPendingEvents,
type SseEvent,
} from "../sse/event-bus.js";
// Mock Redis so the module loads without a real connection.
// publish() throws so the event bus falls back to local-only delivery,
// which is the path that exercises the debounce buffer.
vi.mock("ioredis", () => {
const RedisMock = vi.fn().mockImplementation(() => ({
on: vi.fn(),
subscribe: vi.fn().mockResolvedValue(undefined),
publish: vi.fn().mockImplementation(() => {
throw new Error("Redis unavailable (test)");
}),
}));
return { Redis: RedisMock };
});
describe("event-bus debounce", () => {
let received: SseEvent[];
let unsubscribe: () => void;
beforeEach(() => {
vi.useFakeTimers();
received = [];
unsubscribe = eventBus.subscribe((event) => {
received.push(event);
});
});
afterEach(() => {
unsubscribe();
cancelPendingEvents();
vi.useRealTimers();
});
it("delivers a single event after the debounce window", () => {
eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_CREATED, { id: "a1" });
// Not yet delivered — still in the debounce window
expect(received).toHaveLength(0);
vi.advanceTimersByTime(50);
// Now delivered
expect(received).toHaveLength(1);
expect(received[0]!.type).toBe(SSE_EVENT_TYPES.ALLOCATION_CREATED);
expect(received[0]!.payload).toEqual({ id: "a1" });
});
it("aggregates multiple events of the same type into a single _batch event", () => {
eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_UPDATED, { id: "a1" });
eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_UPDATED, { id: "a2" });
eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_UPDATED, { id: "a3" });
expect(received).toHaveLength(0);
vi.advanceTimersByTime(50);
expect(received).toHaveLength(1);
expect(received[0]!.type).toBe(SSE_EVENT_TYPES.ALLOCATION_UPDATED);
expect(received[0]!.payload).toEqual({
_batch: [{ id: "a1" }, { id: "a2" }, { id: "a3" }],
});
});
it("keeps different event types separate", () => {
eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_CREATED, { id: "a1" });
eventBus.emit(SSE_EVENT_TYPES.ROLE_UPDATED, { id: "r1" });
vi.advanceTimersByTime(50);
expect(received).toHaveLength(2);
const types = received.map((e) => e.type);
expect(types).toContain(SSE_EVENT_TYPES.ALLOCATION_CREATED);
expect(types).toContain(SSE_EVENT_TYPES.ROLE_UPDATED);
// Both should be single payloads (not batched)
for (const event of received) {
expect(event.payload).not.toHaveProperty("_batch");
}
});
it("resets the debounce timer when new events arrive within the window", () => {
eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_UPDATED, { id: "a1" });
// Advance 30ms (still within window)
vi.advanceTimersByTime(30);
expect(received).toHaveLength(0);
// Emit another — this resets the timer
eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_UPDATED, { id: "a2" });
// Advance another 30ms (60ms total from first, but only 30ms from second)
vi.advanceTimersByTime(30);
expect(received).toHaveLength(0);
// Advance remaining 20ms (now 50ms from second event)
vi.advanceTimersByTime(20);
expect(received).toHaveLength(1);
expect(received[0]!.payload).toEqual({
_batch: [{ id: "a1" }, { id: "a2" }],
});
});
it("flushPendingEvents delivers immediately", () => {
eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_CREATED, { id: "a1" });
eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_CREATED, { id: "a2" });
expect(received).toHaveLength(0);
flushPendingEvents();
expect(received).toHaveLength(1);
expect(received[0]!.payload).toEqual({
_batch: [{ id: "a1" }, { id: "a2" }],
});
// Timer should not fire again
vi.advanceTimersByTime(100);
expect(received).toHaveLength(1);
});
it("cancelPendingEvents discards events without delivering", () => {
eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_CREATED, { id: "a1" });
eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_CREATED, { id: "a2" });
cancelPendingEvents();
vi.advanceTimersByTime(100);
expect(received).toHaveLength(0);
});
it("preserves the timestamp of the first event in a batch", () => {
const before = new Date().toISOString();
eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_UPDATED, { id: "a1" });
// Advance a bit, emit another
vi.advanceTimersByTime(10);
eventBus.emit(SSE_EVENT_TYPES.ALLOCATION_UPDATED, { id: "a2" });
vi.advanceTimersByTime(50);
expect(received).toHaveLength(1);
// The timestamp should be from the first event (not later)
expect(received[0]!.timestamp).toBe(before);
});
});