fix(api): harden reminder and webhook delivery
This commit is contained in:
@@ -1,7 +1,9 @@
|
||||
import { prisma } from "@capakraken/db";
|
||||
import { logger } from "./logger.js";
|
||||
import { emitReminderDue, emitNotificationCreated } from "../sse/event-bus.js";
|
||||
|
||||
const POLL_INTERVAL_MS = 60_000; // 60 seconds
|
||||
let isProcessing = false;
|
||||
|
||||
function computeNextRemindAt(current: Date, recurrence: string): Date {
|
||||
const next = new Date(current);
|
||||
@@ -21,61 +23,95 @@ function computeNextRemindAt(current: Date, recurrence: string): Date {
|
||||
return next;
|
||||
}
|
||||
|
||||
function computeNextRemindAtAfter(current: Date, recurrence: string, now: Date): Date {
|
||||
let next = computeNextRemindAt(current, recurrence);
|
||||
if (next.getTime() <= current.getTime()) {
|
||||
return current;
|
||||
}
|
||||
|
||||
while (next.getTime() <= now.getTime()) {
|
||||
const advanced = computeNextRemindAt(next, recurrence);
|
||||
if (advanced.getTime() <= next.getTime()) {
|
||||
return next;
|
||||
}
|
||||
next = advanced;
|
||||
}
|
||||
|
||||
return next;
|
||||
}
|
||||
|
||||
async function processReminders() {
|
||||
if (isProcessing) {
|
||||
logger.warn("Skipping reminder scheduler tick while previous run is still processing");
|
||||
return;
|
||||
}
|
||||
|
||||
isProcessing = true;
|
||||
const now = new Date();
|
||||
|
||||
// Find all due reminders
|
||||
const dueReminders = await prisma.notification.findMany({
|
||||
where: {
|
||||
category: "REMINDER",
|
||||
nextRemindAt: { lte: now },
|
||||
},
|
||||
take: 100, // process in batches
|
||||
});
|
||||
try {
|
||||
// Find all due reminders
|
||||
const dueReminders = await prisma.notification.findMany({
|
||||
where: {
|
||||
category: "REMINDER",
|
||||
nextRemindAt: { lte: now },
|
||||
},
|
||||
take: 100, // process in batches
|
||||
});
|
||||
|
||||
for (const reminder of dueReminders) {
|
||||
try {
|
||||
if (reminder.recurrence) {
|
||||
// Recurring: create a new notification for this occurrence, advance nextRemindAt
|
||||
const notification = await prisma.notification.create({
|
||||
data: {
|
||||
userId: reminder.userId,
|
||||
category: "NOTIFICATION",
|
||||
type: "REMINDER_DUE",
|
||||
priority: reminder.priority,
|
||||
title: reminder.title,
|
||||
body: reminder.body,
|
||||
entityId: reminder.entityId,
|
||||
entityType: reminder.entityType,
|
||||
link: reminder.link,
|
||||
sourceId: reminder.id,
|
||||
channel: "in_app",
|
||||
},
|
||||
});
|
||||
for (const reminder of dueReminders) {
|
||||
try {
|
||||
if (reminder.recurrence) {
|
||||
// Recurring: create a new notification for this occurrence, advance nextRemindAt
|
||||
const notification = await prisma.notification.create({
|
||||
data: {
|
||||
userId: reminder.userId,
|
||||
category: "NOTIFICATION",
|
||||
type: "REMINDER_DUE",
|
||||
priority: reminder.priority,
|
||||
title: reminder.title,
|
||||
body: reminder.body,
|
||||
entityId: reminder.entityId,
|
||||
entityType: reminder.entityType,
|
||||
link: reminder.link,
|
||||
sourceId: reminder.id,
|
||||
channel: "in_app",
|
||||
},
|
||||
});
|
||||
|
||||
// Advance to next occurrence
|
||||
await prisma.notification.update({
|
||||
where: { id: reminder.id },
|
||||
data: {
|
||||
nextRemindAt: computeNextRemindAt(reminder.nextRemindAt!, reminder.recurrence),
|
||||
},
|
||||
});
|
||||
// Advance to next occurrence
|
||||
await prisma.notification.update({
|
||||
where: { id: reminder.id },
|
||||
data: {
|
||||
nextRemindAt: computeNextRemindAtAfter(
|
||||
reminder.nextRemindAt!,
|
||||
reminder.recurrence,
|
||||
now,
|
||||
),
|
||||
},
|
||||
});
|
||||
|
||||
emitNotificationCreated(reminder.userId, notification.id);
|
||||
emitReminderDue(reminder.userId, notification.id);
|
||||
} else {
|
||||
// One-shot: mark the reminder as "fired" by clearing nextRemindAt
|
||||
await prisma.notification.update({
|
||||
where: { id: reminder.id },
|
||||
data: { nextRemindAt: null },
|
||||
});
|
||||
emitNotificationCreated(reminder.userId, notification.id);
|
||||
emitReminderDue(reminder.userId, notification.id);
|
||||
} else {
|
||||
// One-shot: mark the reminder as "fired" by clearing nextRemindAt
|
||||
await prisma.notification.update({
|
||||
where: { id: reminder.id },
|
||||
data: { nextRemindAt: null },
|
||||
});
|
||||
|
||||
emitReminderDue(reminder.userId, reminder.id);
|
||||
emitNotificationCreated(reminder.userId, reminder.id);
|
||||
emitReminderDue(reminder.userId, reminder.id);
|
||||
emitNotificationCreated(reminder.userId, reminder.id);
|
||||
}
|
||||
} catch (err) {
|
||||
logger.error(
|
||||
{ err, reminderId: reminder.id, userId: reminder.userId },
|
||||
"Failed to process reminder",
|
||||
);
|
||||
}
|
||||
} catch (err) {
|
||||
console.error(`[ReminderScheduler] Error processing reminder ${reminder.id}:`, err);
|
||||
}
|
||||
} finally {
|
||||
isProcessing = false;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -83,7 +119,7 @@ let intervalId: ReturnType<typeof setInterval> | null = null;
|
||||
|
||||
export function startReminderScheduler(): void {
|
||||
if (intervalId) return; // already running
|
||||
console.log("[ReminderScheduler] Starting (poll every 60s)");
|
||||
logger.info({ pollIntervalMs: POLL_INTERVAL_MS }, "Starting reminder scheduler");
|
||||
// Run immediately to catch up on overdue reminders
|
||||
void processReminders();
|
||||
intervalId = setInterval(() => void processReminders(), POLL_INTERVAL_MS);
|
||||
@@ -93,6 +129,7 @@ export function stopReminderScheduler(): void {
|
||||
if (intervalId) {
|
||||
clearInterval(intervalId);
|
||||
intervalId = null;
|
||||
console.log("[ReminderScheduler] Stopped");
|
||||
isProcessing = false;
|
||||
logger.info("Stopped reminder scheduler");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@
|
||||
* Fire-and-forget — errors are logged, never thrown.
|
||||
*/
|
||||
import { createHmac } from "node:crypto";
|
||||
import { logger } from "./logger.js";
|
||||
import { sendSlackNotification } from "./slack-notify.js";
|
||||
|
||||
/** Available webhook event types. */
|
||||
@@ -72,7 +73,7 @@ async function _dispatch(
|
||||
|
||||
await Promise.allSettled(promises);
|
||||
} catch (err) {
|
||||
console.error("[webhook-dispatcher] failed to dispatch:", err);
|
||||
logger.error({ err, event }, "Failed to dispatch webhooks");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -108,19 +109,23 @@ async function _sendToWebhook(
|
||||
const timeout = setTimeout(() => controller.abort(), 5_000);
|
||||
|
||||
try {
|
||||
await fetch(wh.url, {
|
||||
const response = await fetch(wh.url, {
|
||||
method: "POST",
|
||||
headers,
|
||||
body,
|
||||
signal: controller.signal,
|
||||
});
|
||||
|
||||
if (!response.ok) {
|
||||
throw new Error(`Webhook responded with HTTP ${response.status}`);
|
||||
}
|
||||
} finally {
|
||||
clearTimeout(timeout);
|
||||
}
|
||||
} catch (err) {
|
||||
console.error(
|
||||
`[webhook-dispatcher] error sending to "${wh.name}" (${wh.id}):`,
|
||||
err,
|
||||
logger.warn(
|
||||
{ err, event, webhookId: wh.id, webhookName: wh.name, webhookUrl: wh.url },
|
||||
"Webhook delivery failed",
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user