136 lines
3.8 KiB
TypeScript
136 lines
3.8 KiB
TypeScript
import { prisma } from "@capakraken/db";
|
|
import { logger } from "./logger.js";
|
|
import { emitReminderDue, emitNotificationCreated } from "../sse/event-bus.js";
|
|
|
|
const POLL_INTERVAL_MS = 60_000; // 60 seconds
|
|
let isProcessing = false;
|
|
|
|
function computeNextRemindAt(current: Date, recurrence: string): Date {
|
|
const next = new Date(current);
|
|
switch (recurrence) {
|
|
case "daily":
|
|
next.setDate(next.getDate() + 1);
|
|
break;
|
|
case "weekly":
|
|
next.setDate(next.getDate() + 7);
|
|
break;
|
|
case "monthly":
|
|
next.setMonth(next.getMonth() + 1);
|
|
break;
|
|
default:
|
|
return current;
|
|
}
|
|
return next;
|
|
}
|
|
|
|
function computeNextRemindAtAfter(current: Date, recurrence: string, now: Date): Date {
|
|
let next = computeNextRemindAt(current, recurrence);
|
|
if (next.getTime() <= current.getTime()) {
|
|
return current;
|
|
}
|
|
|
|
while (next.getTime() <= now.getTime()) {
|
|
const advanced = computeNextRemindAt(next, recurrence);
|
|
if (advanced.getTime() <= next.getTime()) {
|
|
return next;
|
|
}
|
|
next = advanced;
|
|
}
|
|
|
|
return next;
|
|
}
|
|
|
|
async function processReminders() {
|
|
if (isProcessing) {
|
|
logger.warn("Skipping reminder scheduler tick while previous run is still processing");
|
|
return;
|
|
}
|
|
|
|
isProcessing = true;
|
|
const now = new Date();
|
|
|
|
try {
|
|
// Find all due reminders
|
|
const dueReminders = await prisma.notification.findMany({
|
|
where: {
|
|
category: "REMINDER",
|
|
nextRemindAt: { lte: now },
|
|
},
|
|
take: 100, // process in batches
|
|
});
|
|
|
|
for (const reminder of dueReminders) {
|
|
try {
|
|
if (reminder.recurrence) {
|
|
// Recurring: create a new notification for this occurrence, advance nextRemindAt
|
|
const notification = await prisma.notification.create({
|
|
data: {
|
|
userId: reminder.userId,
|
|
category: "NOTIFICATION",
|
|
type: "REMINDER_DUE",
|
|
priority: reminder.priority,
|
|
title: reminder.title,
|
|
body: reminder.body,
|
|
entityId: reminder.entityId,
|
|
entityType: reminder.entityType,
|
|
link: reminder.link,
|
|
sourceId: reminder.id,
|
|
channel: "in_app",
|
|
},
|
|
});
|
|
|
|
// Advance to next occurrence
|
|
await prisma.notification.update({
|
|
where: { id: reminder.id },
|
|
data: {
|
|
nextRemindAt: computeNextRemindAtAfter(
|
|
reminder.nextRemindAt!,
|
|
reminder.recurrence,
|
|
now,
|
|
),
|
|
},
|
|
});
|
|
|
|
emitNotificationCreated(reminder.userId, notification.id);
|
|
emitReminderDue(reminder.userId, notification.id);
|
|
} else {
|
|
// One-shot: mark the reminder as "fired" by clearing nextRemindAt
|
|
await prisma.notification.update({
|
|
where: { id: reminder.id },
|
|
data: { nextRemindAt: null },
|
|
});
|
|
|
|
emitReminderDue(reminder.userId, reminder.id);
|
|
emitNotificationCreated(reminder.userId, reminder.id);
|
|
}
|
|
} catch (err) {
|
|
logger.error(
|
|
{ err, reminderId: reminder.id, userId: reminder.userId },
|
|
"Failed to process reminder",
|
|
);
|
|
}
|
|
}
|
|
} finally {
|
|
isProcessing = false;
|
|
}
|
|
}
|
|
|
|
let intervalId: ReturnType<typeof setInterval> | null = null;
|
|
|
|
export function startReminderScheduler(): void {
|
|
if (intervalId) return; // already running
|
|
logger.info({ pollIntervalMs: POLL_INTERVAL_MS }, "Starting reminder scheduler");
|
|
// Run immediately to catch up on overdue reminders
|
|
void processReminders();
|
|
intervalId = setInterval(() => void processReminders(), POLL_INTERVAL_MS);
|
|
}
|
|
|
|
export function stopReminderScheduler(): void {
|
|
if (intervalId) {
|
|
clearInterval(intervalId);
|
|
intervalId = null;
|
|
isProcessing = false;
|
|
logger.info("Stopped reminder scheduler");
|
|
}
|
|
}
|