From 6d4de85660245836c5f368be1f4a4addeb05a35f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hartmut=20N=C3=B6renberg?= Date: Tue, 31 Mar 2026 22:36:53 +0200 Subject: [PATCH] fix(api): harden reminder and webhook delivery --- .../src/__tests__/reminder-scheduler.test.ts | 179 ++++++++++++++++++ .../src/__tests__/webhook-dispatcher.test.ts | 122 ++++++++++++ packages/api/src/lib/reminder-scheduler.ts | 133 ++++++++----- packages/api/src/lib/webhook-dispatcher.ts | 15 +- 4 files changed, 396 insertions(+), 53 deletions(-) create mode 100644 packages/api/src/__tests__/reminder-scheduler.test.ts create mode 100644 packages/api/src/__tests__/webhook-dispatcher.test.ts diff --git a/packages/api/src/__tests__/reminder-scheduler.test.ts b/packages/api/src/__tests__/reminder-scheduler.test.ts new file mode 100644 index 0000000..278eb7e --- /dev/null +++ b/packages/api/src/__tests__/reminder-scheduler.test.ts @@ -0,0 +1,179 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { emitNotificationCreated, emitReminderDue } from "../sse/event-bus.js"; +import { logger } from "../lib/logger.js"; +import { startReminderScheduler, stopReminderScheduler } from "../lib/reminder-scheduler.js"; + +const { findMany, create, update } = vi.hoisted(() => ({ + findMany: vi.fn(), + create: vi.fn(), + update: vi.fn(), +})); + +vi.mock("@capakraken/db", () => ({ + prisma: { + notification: { + findMany, + create, + update, + }, + }, +})); + +vi.mock("../sse/event-bus.js", () => ({ + emitReminderDue: vi.fn(), + emitNotificationCreated: vi.fn(), +})); + +vi.mock("../lib/logger.js", () => ({ + logger: { + error: vi.fn(), + warn: vi.fn(), + info: vi.fn(), + debug: vi.fn(), + }, +})); + +describe("reminder scheduler logging", () => { + beforeEach(() => { + vi.clearAllMocks(); + vi.useRealTimers(); + findMany.mockResolvedValue([]); + create.mockResolvedValue({ id: "notification_1" }); + update.mockResolvedValue({}); + }); + + afterEach(() => { + stopReminderScheduler(); + }); + + it("logs scheduler lifecycle with structured metadata", async () => { + startReminderScheduler(); + + await vi.waitFor(() => { + expect(findMany).toHaveBeenCalled(); + }); + + expect(logger.info).toHaveBeenCalledWith( + { pollIntervalMs: 60_000 }, + "Starting reminder scheduler", + ); + + stopReminderScheduler(); + + expect(logger.info).toHaveBeenCalledWith("Stopped reminder scheduler"); + }); + + it("logs reminder processing failures with reminder context", async () => { + findMany.mockResolvedValueOnce([ + { + id: "reminder_1", + userId: "user_1", + recurrence: null, + priority: "HIGH", + title: "Follow up", + body: "Body", + entityId: null, + entityType: null, + link: null, + nextRemindAt: new Date("2026-03-30T08:00:00.000Z"), + }, + ]); + update.mockRejectedValueOnce(new Error("db write failed")); + + startReminderScheduler(); + + await vi.waitFor(() => { + expect(logger.error).toHaveBeenCalledWith( + { + err: expect.any(Error), + reminderId: "reminder_1", + userId: "user_1", + }, + "Failed to process reminder", + ); + }); + + expect(emitReminderDue).not.toHaveBeenCalled(); + expect(emitNotificationCreated).not.toHaveBeenCalled(); + }); + + it("does not start a second reminder poll while the previous batch is still running", async () => { + vi.useFakeTimers(); + + let releaseFindMany: ((value: unknown[]) => void) | null = null; + findMany.mockImplementationOnce( + () => + new Promise((resolve) => { + releaseFindMany = resolve; + }), + ); + + startReminderScheduler(); + + expect(findMany).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(60_000); + + expect(findMany).toHaveBeenCalledTimes(1); + expect(logger.warn).toHaveBeenCalledWith( + "Skipping reminder scheduler tick while previous run is still processing", + ); + + releaseFindMany?.([]); + await Promise.resolve(); + + await vi.advanceTimersByTimeAsync(60_000); + + expect(findMany).toHaveBeenCalledTimes(2); + }); + + it("advances overdue recurring reminders to the first occurrence after now", async () => { + const now = new Date(); + const staleNextRemindAt = new Date(now); + staleNextRemindAt.setDate(staleNextRemindAt.getDate() - 3); + staleNextRemindAt.setHours(8, 0, 0, 0); + + const expectedNextRemindAt = new Date(staleNextRemindAt); + while (expectedNextRemindAt.getTime() <= now.getTime()) { + expectedNextRemindAt.setDate(expectedNextRemindAt.getDate() + 1); + } + + const overdueReminder = { + id: "reminder_recurring_1", + userId: "user_1", + recurrence: "daily", + priority: "MEDIUM", + title: "Daily standup prep", + body: "Body", + entityId: null, + entityType: null, + link: null, + nextRemindAt: staleNextRemindAt, + }; + + findMany.mockResolvedValueOnce([overdueReminder]); + + startReminderScheduler(); + + await vi.waitFor(() => { + expect(create).toHaveBeenCalledWith( + expect.objectContaining({ + data: expect.objectContaining({ + userId: "user_1", + sourceId: "reminder_recurring_1", + type: "REMINDER_DUE", + }), + }), + ); + }); + + expect(update).toHaveBeenCalledWith({ + where: { id: "reminder_recurring_1" }, + data: { + nextRemindAt: expectedNextRemindAt, + }, + }); + expect(emitNotificationCreated).toHaveBeenCalledWith("user_1", "notification_1"); + expect(emitReminderDue).toHaveBeenCalledWith("user_1", "notification_1"); + }); +}); diff --git a/packages/api/src/__tests__/webhook-dispatcher.test.ts b/packages/api/src/__tests__/webhook-dispatcher.test.ts new file mode 100644 index 0000000..c2fe4b3 --- /dev/null +++ b/packages/api/src/__tests__/webhook-dispatcher.test.ts @@ -0,0 +1,122 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { logger } from "../lib/logger.js"; +import { dispatchWebhooks } from "../lib/webhook-dispatcher.js"; + +const { sendSlackNotification } = vi.hoisted(() => ({ + sendSlackNotification: vi.fn(), +})); + +vi.mock("../lib/slack-notify.js", () => ({ + sendSlackNotification, +})); + +vi.mock("../lib/logger.js", () => ({ + logger: { + error: vi.fn(), + warn: vi.fn(), + info: vi.fn(), + debug: vi.fn(), + }, +})); + +describe("webhook dispatcher logging", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + afterEach(() => { + vi.unstubAllGlobals(); + }); + + it("logs dispatcher-level failures with event context", async () => { + const db = { + webhook: { + findMany: vi.fn().mockRejectedValue(new Error("database unavailable")), + }, + }; + + dispatchWebhooks(db, "estimate.submitted", { id: "estimate_1" }); + + await vi.waitFor(() => { + expect(logger.error).toHaveBeenCalledWith( + { + err: expect.any(Error), + event: "estimate.submitted", + }, + "Failed to dispatch webhooks", + ); + }); + }); + + it("logs webhook delivery failures with webhook metadata", async () => { + sendSlackNotification.mockRejectedValueOnce(new Error("slack down")); + + const db = { + webhook: { + findMany: vi.fn().mockResolvedValue([ + { + id: "wh_1", + name: "Ops Slack", + url: "https://hooks.slack.com/services/test", + secret: null, + events: ["project.created"], + }, + ]), + }, + }; + + dispatchWebhooks(db, "project.created", { id: "project_1", name: "Project One" }); + + await vi.waitFor(() => { + expect(logger.warn).toHaveBeenCalledWith( + { + err: expect.any(Error), + event: "project.created", + webhookId: "wh_1", + webhookName: "Ops Slack", + webhookUrl: "https://hooks.slack.com/services/test", + }, + "Webhook delivery failed", + ); + }); + }); + + it("treats non-2xx HTTP webhook responses as delivery failures", async () => { + const fetchMock = vi.fn().mockResolvedValue({ + ok: false, + status: 500, + }); + vi.stubGlobal("fetch", fetchMock); + + const db = { + webhook: { + findMany: vi.fn().mockResolvedValue([ + { + id: "wh_http_1", + name: "Project Webhook", + url: "https://example.com/webhooks/project", + secret: "secret", + events: ["project.created"], + }, + ]), + }, + }; + + dispatchWebhooks(db, "project.created", { id: "project_1", name: "Project One" }); + + await vi.waitFor(() => { + expect(logger.warn).toHaveBeenCalledWith( + { + err: expect.any(Error), + event: "project.created", + webhookId: "wh_http_1", + webhookName: "Project Webhook", + webhookUrl: "https://example.com/webhooks/project", + }, + "Webhook delivery failed", + ); + }); + + expect(fetchMock).toHaveBeenCalledTimes(1); + }); +}); diff --git a/packages/api/src/lib/reminder-scheduler.ts b/packages/api/src/lib/reminder-scheduler.ts index 72c7d25..b45f947 100644 --- a/packages/api/src/lib/reminder-scheduler.ts +++ b/packages/api/src/lib/reminder-scheduler.ts @@ -1,7 +1,9 @@ import { prisma } from "@capakraken/db"; +import { logger } from "./logger.js"; import { emitReminderDue, emitNotificationCreated } from "../sse/event-bus.js"; const POLL_INTERVAL_MS = 60_000; // 60 seconds +let isProcessing = false; function computeNextRemindAt(current: Date, recurrence: string): Date { const next = new Date(current); @@ -21,61 +23,95 @@ function computeNextRemindAt(current: Date, recurrence: string): Date { return next; } +function computeNextRemindAtAfter(current: Date, recurrence: string, now: Date): Date { + let next = computeNextRemindAt(current, recurrence); + if (next.getTime() <= current.getTime()) { + return current; + } + + while (next.getTime() <= now.getTime()) { + const advanced = computeNextRemindAt(next, recurrence); + if (advanced.getTime() <= next.getTime()) { + return next; + } + next = advanced; + } + + return next; +} + async function processReminders() { + if (isProcessing) { + logger.warn("Skipping reminder scheduler tick while previous run is still processing"); + return; + } + + isProcessing = true; const now = new Date(); - // Find all due reminders - const dueReminders = await prisma.notification.findMany({ - where: { - category: "REMINDER", - nextRemindAt: { lte: now }, - }, - take: 100, // process in batches - }); + try { + // Find all due reminders + const dueReminders = await prisma.notification.findMany({ + where: { + category: "REMINDER", + nextRemindAt: { lte: now }, + }, + take: 100, // process in batches + }); - for (const reminder of dueReminders) { - try { - if (reminder.recurrence) { - // Recurring: create a new notification for this occurrence, advance nextRemindAt - const notification = await prisma.notification.create({ - data: { - userId: reminder.userId, - category: "NOTIFICATION", - type: "REMINDER_DUE", - priority: reminder.priority, - title: reminder.title, - body: reminder.body, - entityId: reminder.entityId, - entityType: reminder.entityType, - link: reminder.link, - sourceId: reminder.id, - channel: "in_app", - }, - }); + for (const reminder of dueReminders) { + try { + if (reminder.recurrence) { + // Recurring: create a new notification for this occurrence, advance nextRemindAt + const notification = await prisma.notification.create({ + data: { + userId: reminder.userId, + category: "NOTIFICATION", + type: "REMINDER_DUE", + priority: reminder.priority, + title: reminder.title, + body: reminder.body, + entityId: reminder.entityId, + entityType: reminder.entityType, + link: reminder.link, + sourceId: reminder.id, + channel: "in_app", + }, + }); - // Advance to next occurrence - await prisma.notification.update({ - where: { id: reminder.id }, - data: { - nextRemindAt: computeNextRemindAt(reminder.nextRemindAt!, reminder.recurrence), - }, - }); + // Advance to next occurrence + await prisma.notification.update({ + where: { id: reminder.id }, + data: { + nextRemindAt: computeNextRemindAtAfter( + reminder.nextRemindAt!, + reminder.recurrence, + now, + ), + }, + }); - emitNotificationCreated(reminder.userId, notification.id); - emitReminderDue(reminder.userId, notification.id); - } else { - // One-shot: mark the reminder as "fired" by clearing nextRemindAt - await prisma.notification.update({ - where: { id: reminder.id }, - data: { nextRemindAt: null }, - }); + emitNotificationCreated(reminder.userId, notification.id); + emitReminderDue(reminder.userId, notification.id); + } else { + // One-shot: mark the reminder as "fired" by clearing nextRemindAt + await prisma.notification.update({ + where: { id: reminder.id }, + data: { nextRemindAt: null }, + }); - emitReminderDue(reminder.userId, reminder.id); - emitNotificationCreated(reminder.userId, reminder.id); + emitReminderDue(reminder.userId, reminder.id); + emitNotificationCreated(reminder.userId, reminder.id); + } + } catch (err) { + logger.error( + { err, reminderId: reminder.id, userId: reminder.userId }, + "Failed to process reminder", + ); } - } catch (err) { - console.error(`[ReminderScheduler] Error processing reminder ${reminder.id}:`, err); } + } finally { + isProcessing = false; } } @@ -83,7 +119,7 @@ let intervalId: ReturnType | null = null; export function startReminderScheduler(): void { if (intervalId) return; // already running - console.log("[ReminderScheduler] Starting (poll every 60s)"); + logger.info({ pollIntervalMs: POLL_INTERVAL_MS }, "Starting reminder scheduler"); // Run immediately to catch up on overdue reminders void processReminders(); intervalId = setInterval(() => void processReminders(), POLL_INTERVAL_MS); @@ -93,6 +129,7 @@ export function stopReminderScheduler(): void { if (intervalId) { clearInterval(intervalId); intervalId = null; - console.log("[ReminderScheduler] Stopped"); + isProcessing = false; + logger.info("Stopped reminder scheduler"); } } diff --git a/packages/api/src/lib/webhook-dispatcher.ts b/packages/api/src/lib/webhook-dispatcher.ts index 337f385..b298e76 100644 --- a/packages/api/src/lib/webhook-dispatcher.ts +++ b/packages/api/src/lib/webhook-dispatcher.ts @@ -7,6 +7,7 @@ * Fire-and-forget — errors are logged, never thrown. */ import { createHmac } from "node:crypto"; +import { logger } from "./logger.js"; import { sendSlackNotification } from "./slack-notify.js"; /** Available webhook event types. */ @@ -72,7 +73,7 @@ async function _dispatch( await Promise.allSettled(promises); } catch (err) { - console.error("[webhook-dispatcher] failed to dispatch:", err); + logger.error({ err, event }, "Failed to dispatch webhooks"); } } @@ -108,19 +109,23 @@ async function _sendToWebhook( const timeout = setTimeout(() => controller.abort(), 5_000); try { - await fetch(wh.url, { + const response = await fetch(wh.url, { method: "POST", headers, body, signal: controller.signal, }); + + if (!response.ok) { + throw new Error(`Webhook responded with HTTP ${response.status}`); + } } finally { clearTimeout(timeout); } } catch (err) { - console.error( - `[webhook-dispatcher] error sending to "${wh.name}" (${wh.id}):`, - err, + logger.warn( + { err, event, webhookId: wh.id, webhookName: wh.name, webhookUrl: wh.url }, + "Webhook delivery failed", ); } }