fix(timeline): resync after sse reconnect
This commit is contained in:
@@ -1,6 +1,10 @@
|
||||
import { SSE_EVENT_TYPES } from "@capakraken/shared";
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { getTimelineSseInvalidationKeys, parseTimelineSseEvent } from "./timelineSsePolicy.js";
|
||||
import {
|
||||
getTimelineSseInvalidationKeys,
|
||||
getTimelineSseResyncKeys,
|
||||
parseTimelineSseEvent,
|
||||
} from "./timelineSsePolicy.js";
|
||||
|
||||
describe("timelineSsePolicy", () => {
|
||||
it("returns null for malformed event payloads", () => {
|
||||
@@ -33,4 +37,23 @@ describe("timelineSsePolicy", () => {
|
||||
[["notification", "unreadCount"]],
|
||||
]);
|
||||
});
|
||||
|
||||
it("returns the full resync invalidation set for reconnect catch-up", () => {
|
||||
expect(getTimelineSseResyncKeys()).toEqual([
|
||||
[["timeline", "getEntries"]],
|
||||
[["timeline", "getEntriesView"]],
|
||||
[["timeline", "getMyEntriesView"]],
|
||||
[["timeline", "getHolidayOverlays"]],
|
||||
[["timeline", "getMyHolidayOverlays"]],
|
||||
[["vacation", "list"]],
|
||||
[["allocation", "list"]],
|
||||
[["project", "list"]],
|
||||
[["timeline", "getBudgetStatus"]],
|
||||
[["notification", "listTasks"]],
|
||||
[["notification", "taskCounts"]],
|
||||
[["notification", "list"]],
|
||||
[["notification", "unreadCount"]],
|
||||
[["notification", "listReminders"]],
|
||||
]);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -15,6 +15,18 @@ const NOTIFICATION_KEYS: TimelineQueryKey[] = [
|
||||
[["notification", "unreadCount"]],
|
||||
];
|
||||
|
||||
const RESYNC_INVALIDATION_KEYS: TimelineQueryKey[] = [
|
||||
...TIMELINE_ENTRY_KEYS,
|
||||
[["vacation", "list"]],
|
||||
[["allocation", "list"]],
|
||||
[["project", "list"]],
|
||||
[["timeline", "getBudgetStatus"]],
|
||||
[["notification", "listTasks"]],
|
||||
[["notification", "taskCounts"]],
|
||||
...NOTIFICATION_KEYS,
|
||||
[["notification", "listReminders"]],
|
||||
];
|
||||
|
||||
export function parseTimelineSseEvent(rawData: string): string | null {
|
||||
try {
|
||||
const data = JSON.parse(rawData) as { type?: unknown };
|
||||
@@ -69,3 +81,7 @@ export function getTimelineSseInvalidationKeys(eventType: string): TimelineQuery
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
export function getTimelineSseResyncKeys(): TimelineQueryKey[] {
|
||||
return RESYNC_INVALIDATION_KEYS;
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import { SSE_EVENT_TYPES } from "@capakraken/shared";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { getTimelineSseResyncKeys } from "./timelineSsePolicy.js";
|
||||
|
||||
const invalidateQueries = vi.fn();
|
||||
const effectCleanups: Array<() => void> = [];
|
||||
@@ -83,11 +84,23 @@ describe("useTimelineSSE", () => {
|
||||
|
||||
expect(MockEventSource.instances).toHaveLength(1);
|
||||
|
||||
MockEventSource.instances[0]?.emitOpen();
|
||||
|
||||
MockEventSource.instances[0]?.emitMessage(JSON.stringify({ type: SSE_EVENT_TYPES.PING }));
|
||||
|
||||
expect(invalidateQueries).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("does not resync on the initial successful SSE connection", () => {
|
||||
useTimelineSSE();
|
||||
|
||||
expect(MockEventSource.instances).toHaveLength(1);
|
||||
|
||||
MockEventSource.instances[0]?.emitOpen();
|
||||
|
||||
expect(invalidateQueries).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("resets reconnect backoff after ping and reconnects only once per pending timer", () => {
|
||||
useTimelineSSE();
|
||||
|
||||
@@ -117,6 +130,31 @@ describe("useTimelineSSE", () => {
|
||||
expect(invalidateQueries).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("resyncs timeline queries exactly once after a successful reconnect", () => {
|
||||
useTimelineSSE();
|
||||
|
||||
const firstConnection = MockEventSource.instances[0];
|
||||
expect(firstConnection).toBeDefined();
|
||||
|
||||
firstConnection?.emitOpen();
|
||||
firstConnection?.emitError();
|
||||
|
||||
vi.advanceTimersByTime(2_000);
|
||||
|
||||
expect(MockEventSource.instances).toHaveLength(2);
|
||||
|
||||
const secondConnection = MockEventSource.instances[1];
|
||||
expect(secondConnection).toBeDefined();
|
||||
|
||||
secondConnection?.emitOpen();
|
||||
secondConnection?.emitOpen();
|
||||
|
||||
expect(invalidateQueries).toHaveBeenCalledTimes(getTimelineSseResyncKeys().length);
|
||||
expect(invalidateQueries.mock.calls).toEqual(
|
||||
getTimelineSseResyncKeys().map((queryKey) => [{ queryKey }]),
|
||||
);
|
||||
});
|
||||
|
||||
it("clears a pending reconnect when the hook is disposed", () => {
|
||||
useTimelineSSE();
|
||||
|
||||
|
||||
@@ -3,7 +3,11 @@
|
||||
import { SSE_EVENT_TYPES } from "@capakraken/shared";
|
||||
import { useQueryClient } from "@tanstack/react-query";
|
||||
import { useEffect, useRef } from "react";
|
||||
import { getTimelineSseInvalidationKeys, parseTimelineSseEvent } from "./timelineSsePolicy.js";
|
||||
import {
|
||||
getTimelineSseInvalidationKeys,
|
||||
getTimelineSseResyncKeys,
|
||||
parseTimelineSseEvent,
|
||||
} from "./timelineSsePolicy.js";
|
||||
|
||||
/**
|
||||
* Connects to the SSE timeline endpoint and invalidates React Query caches
|
||||
@@ -17,6 +21,7 @@ export function useTimelineSSE() {
|
||||
let es: EventSource | null = null;
|
||||
let reconnectAttempts = 0;
|
||||
let isDisposed = false;
|
||||
let shouldResyncOnOpen = false;
|
||||
|
||||
function scheduleReconnect() {
|
||||
if (isDisposed || reconnectTimeout.current) return;
|
||||
@@ -35,6 +40,14 @@ export function useTimelineSSE() {
|
||||
|
||||
es.onopen = () => {
|
||||
reconnectAttempts = 0;
|
||||
if (!shouldResyncOnOpen) {
|
||||
return;
|
||||
}
|
||||
|
||||
shouldResyncOnOpen = false;
|
||||
for (const queryKey of getTimelineSseResyncKeys()) {
|
||||
void queryClient.invalidateQueries({ queryKey });
|
||||
}
|
||||
};
|
||||
|
||||
es.onmessage = (event) => {
|
||||
@@ -54,6 +67,7 @@ export function useTimelineSSE() {
|
||||
};
|
||||
|
||||
es.onerror = () => {
|
||||
shouldResyncOnOpen = true;
|
||||
es?.close();
|
||||
scheduleReconnect();
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user