import { z } from "zod"; import { TRPCError } from "@trpc/server"; import { PermissionKey, parseTaskAction, resolvePermissions } from "@capakraken/shared"; import { findUniqueOrThrow } from "../db/helpers.js"; import { createTRPCRouter, managerProcedure, protectedProcedure } from "../trpc.js"; import { emitNotificationCreated, emitTaskAssigned, emitTaskCompleted, emitTaskStatusChanged, emitBroadcastSent, } from "../sse/event-bus.js"; import { createNotification } from "../lib/create-notification.js"; import { resolveRecipients } from "../lib/notification-targeting.js"; import { sendEmail } from "../lib/email.js"; import { getTaskAction } from "../lib/task-actions.js"; // ─── Helpers ────────────────────────────────────────────────────────────────── /** Resolve the DB user id from the session email. Throws UNAUTHORIZED if not found. */ async function resolveUserId(ctx: { db: { user: { findUnique: (args: { where: { email: string }; select: { id: true }; }) => Promise<{ id: string } | null>; }; }; session: { user?: { email?: string | null } | null }; }): Promise { const email = ctx.session.user?.email; if (!email) throw new TRPCError({ code: "UNAUTHORIZED" }); const user = await ctx.db.user.findUnique({ where: { email }, select: { id: true }, }); if (!user) throw new TRPCError({ code: "UNAUTHORIZED" }); return user.id; } /** Send email notification (non-blocking). */ async function sendNotificationEmail( db: { user: { findUnique: (args: { where: { id: string }; select: { email: true; name: true } }) => Promise<{ email: string; name: string | null } | null> } }, userId: string, title: string, body?: string | null, ): Promise { try { const user = await db.user.findUnique({ where: { id: userId }, select: { email: true, name: true }, }); if (!user) return; void sendEmail({ to: user.email, subject: title, text: body ?? title, ...(body !== undefined && body !== null ? { html: `

${body}

` } : {}), }); } catch { // Non-blocking — swallow errors } } function getNotificationErrorCandidates(error: unknown): Array<{ code?: unknown; message?: unknown; meta?: { field_name?: unknown; modelName?: unknown }; cause?: unknown; }> { const queue: unknown[] = [error]; const seen = new Set(); const candidates: Array<{ code?: unknown; message?: unknown; meta?: { field_name?: unknown; modelName?: unknown }; cause?: unknown; }> = []; while (queue.length > 0) { const current = queue.shift(); if (!current || seen.has(current) || typeof current !== "object") { continue; } seen.add(current); const candidate = current as { code?: unknown; message?: unknown; meta?: { field_name?: unknown; modelName?: unknown }; cause?: unknown; shape?: { message?: unknown; data?: { cause?: unknown } }; }; candidates.push(candidate); if ("cause" in candidate) { queue.push(candidate.cause); } if (candidate.shape?.data?.cause) { queue.push(candidate.shape.data.cause); } } return candidates; } function rethrowNotificationReferenceError(error: unknown): never { for (const candidate of getNotificationErrorCandidates(error)) { const fieldName = typeof candidate.meta?.field_name === "string" ? candidate.meta.field_name.toLowerCase() : ""; const modelName = typeof candidate.meta?.modelName === "string" ? candidate.meta.modelName.toLowerCase() : ""; if ( typeof candidate.code === "string" && (candidate.code === "P2003" || candidate.code === "P2025") && fieldName.includes("sender") ) { throw new TRPCError({ code: "NOT_FOUND", message: "Sender user not found", cause: error, }); } if ( typeof candidate.code === "string" && (candidate.code === "P2003" || candidate.code === "P2025") && fieldName.includes("userid") ) { throw new TRPCError({ code: "NOT_FOUND", message: "Broadcast recipient user not found", cause: error, }); } if ( typeof candidate.code === "string" && (candidate.code === "P2003" || candidate.code === "P2025") && (modelName.includes("notificationbroadcast") || fieldName.includes("broadcast")) ) { throw new TRPCError({ code: "NOT_FOUND", message: "Notification broadcast not found", cause: error, }); } } throw error; } // ─── Zod Enums ──────────────────────────────────────────────────────────────── const categoryEnum = z.enum(["NOTIFICATION", "REMINDER", "TASK", "APPROVAL"]); const priorityEnum = z.enum(["LOW", "NORMAL", "HIGH", "URGENT"]); const taskStatusEnum = z.enum(["OPEN", "IN_PROGRESS", "DONE", "DISMISSED"]); const channelEnum = z.enum(["in_app", "email", "both"]); const recurrenceEnum = z.enum(["daily", "weekly", "monthly"]); const targetTypeEnum = z.enum(["user", "role", "project", "orgUnit", "all"]); // ─── Router ─────────────────────────────────────────────────────────────────── export const notificationRouter = createTRPCRouter({ // ═══════════════════════════════════════════════════════════════════════════ // EXISTING (enhanced) // ═══════════════════════════════════════════════════════════════════════════ /** List notifications for the current user with optional filters */ list: protectedProcedure .input( z.object({ unreadOnly: z.boolean().optional(), category: categoryEnum.optional(), taskStatus: taskStatusEnum.optional(), priority: priorityEnum.optional(), limit: z.number().min(1).max(100).default(50), }), ) .query(async ({ ctx, input }) => { const userId = await resolveUserId(ctx); return ctx.db.notification.findMany({ where: { userId, ...(input.unreadOnly ? { readAt: null } : {}), ...(input.category !== undefined ? { category: input.category } : {}), ...(input.taskStatus !== undefined ? { taskStatus: input.taskStatus } : {}), ...(input.priority !== undefined ? { priority: input.priority } : {}), }, orderBy: { createdAt: "desc" }, take: input.limit, }); }), /** Count unread notifications */ unreadCount: protectedProcedure.query(async ({ ctx }) => { const userId = await resolveUserId(ctx); return ctx.db.notification.count({ where: { userId, readAt: null }, }); }), /** Mark one or all as read */ markRead: protectedProcedure .input(z.object({ id: z.string().optional() })) .mutation(async ({ ctx, input }) => { const userId = await resolveUserId(ctx); const now = new Date(); if (input.id) { await ctx.db.notification.update({ where: { id: input.id, userId }, data: { readAt: now }, }); } else { await ctx.db.notification.updateMany({ where: { userId, readAt: null }, data: { readAt: now }, }); } }), /** Create a notification (enhanced with all new fields) */ create: managerProcedure .input( z.object({ userId: z.string(), type: z.string(), title: z.string(), body: z.string().optional(), entityId: z.string().optional(), entityType: z.string().optional(), // New fields category: categoryEnum.optional(), priority: priorityEnum.optional(), link: z.string().optional(), taskStatus: taskStatusEnum.optional(), taskAction: z.string().optional(), assigneeId: z.string().optional(), dueDate: z.date().optional(), channel: channelEnum.optional(), senderId: z.string().optional(), }), ) .mutation(async ({ ctx, input }) => { const currentUserId = ctx.dbUser.id; const notificationId = await createNotification({ db: ctx.db, userId: input.userId, type: input.type, title: input.title, body: input.body, entityId: input.entityId, entityType: input.entityType, category: input.category, priority: input.priority, link: input.link, taskStatus: input.taskStatus, taskAction: input.taskAction, assigneeId: input.assigneeId, dueDate: input.dueDate, channel: input.channel, senderId: input.senderId ?? currentUserId, }); // Emit task-specific events if (input.category === "TASK" || input.category === "APPROVAL") { emitTaskAssigned(input.userId, notificationId); } // Email if channel includes email const channel = input.channel ?? "in_app"; if (channel === "email" || channel === "both") { void sendNotificationEmail(ctx.db, input.userId, input.title, input.body); } // Re-fetch for return value (to maintain API contract) const n = await ctx.db.notification.findUnique({ where: { id: notificationId } }); return n; }), // ═══════════════════════════════════════════════════════════════════════════ // TASK MANAGEMENT // ═══════════════════════════════════════════════════════════════════════════ /** List tasks for the current user (as owner or assignee) */ listTasks: protectedProcedure .input( z.object({ status: taskStatusEnum.optional(), includeAssigned: z.boolean().default(true), limit: z.number().min(1).max(100).default(20), }), ) .query(async ({ ctx, input }) => { const userId = await resolveUserId(ctx); const userFilter = input.includeAssigned ? { OR: [{ userId }, { assigneeId: userId }] } : { userId }; return ctx.db.notification.findMany({ where: { ...userFilter, category: { in: ["TASK", "APPROVAL"] }, ...(input.status !== undefined ? { taskStatus: input.status } : {}), }, orderBy: [{ priority: "desc" }, { dueDate: "asc" }, { createdAt: "desc" }], take: input.limit, }); }), /** Get task counts for the current user — single groupBy instead of 5 counts */ taskCounts: protectedProcedure.query(async ({ ctx }) => { const userId = await resolveUserId(ctx); const now = new Date(); const where = { OR: [{ userId }, { assigneeId: userId }], category: { in: ["TASK" as const, "APPROVAL" as const] }, }; const [grouped, overdue] = await Promise.all([ ctx.db.notification.groupBy({ by: ["taskStatus"], where, _count: true, }), ctx.db.notification.count({ where: { ...where, taskStatus: { in: ["OPEN", "IN_PROGRESS"] }, dueDate: { lt: now }, }, }), ]); const counts: Record = {}; for (const g of grouped) { if (g.taskStatus) counts[g.taskStatus] = g._count; } return { open: counts["OPEN"] ?? 0, inProgress: counts["IN_PROGRESS"] ?? 0, done: counts["DONE"] ?? 0, dismissed: counts["DISMISSED"] ?? 0, overdue, }; }), /** Get one task/approval visible to the current user */ getTaskDetail: protectedProcedure .input(z.object({ id: z.string() })) .query(async ({ ctx, input }) => { const userId = await resolveUserId(ctx); const task = await ctx.db.notification.findFirst({ where: { id: input.id, OR: [{ userId }, { assigneeId: userId }], category: { in: ["TASK", "APPROVAL"] }, }, select: { id: true, title: true, body: true, type: true, priority: true, category: true, taskStatus: true, taskAction: true, dueDate: true, entityId: true, entityType: true, completedAt: true, completedBy: true, createdAt: true, userId: true, assigneeId: true, sender: { select: { id: true, name: true, email: true } }, }, }); if (!task) { throw new TRPCError({ code: "NOT_FOUND", message: "Task not found or you do not have permission", }); } return task; }), /** Update task status */ updateTaskStatus: protectedProcedure .input( z.object({ id: z.string(), status: taskStatusEnum, }), ) .mutation(async ({ ctx, input }) => { const userId = await resolveUserId(ctx); // Only allow if userId or assigneeId matches const existing = await ctx.db.notification.findFirst({ where: { id: input.id, OR: [{ userId }, { assigneeId: userId }], }, }); if (!existing) { throw new TRPCError({ code: "NOT_FOUND", message: "Task not found or you do not have permission", }); } const isCompleting = input.status === "DONE"; const updated = await ctx.db.notification.update({ where: { id: input.id }, data: { taskStatus: input.status, ...(isCompleting ? { completedAt: new Date(), completedBy: userId } : {}), }, }); if (isCompleting) { emitTaskCompleted(existing.userId, updated.id); // Also notify assignee if different if (existing.assigneeId && existing.assigneeId !== existing.userId) { emitTaskCompleted(existing.assigneeId, updated.id); } } else { emitTaskStatusChanged(existing.userId, updated.id); if (existing.assigneeId && existing.assigneeId !== existing.userId) { emitTaskStatusChanged(existing.assigneeId, updated.id); } } return updated; }), /** Execute the machine-readable action associated with a task */ executeTaskAction: protectedProcedure .input(z.object({ id: z.string() })) .mutation(async ({ ctx, input }) => { const userId = await resolveUserId(ctx); const task = await ctx.db.notification.findFirst({ where: { id: input.id, OR: [{ userId }, { assigneeId: userId }], category: { in: ["TASK", "APPROVAL"] }, }, select: { id: true, userId: true, assigneeId: true, taskAction: true, taskStatus: true, }, }); if (!task) { throw new TRPCError({ code: "NOT_FOUND", message: "Task not found or you do not have permission", }); } if (!task.taskAction) { throw new TRPCError({ code: "PRECONDITION_FAILED", message: "This task has no executable action", }); } if (task.taskStatus === "DONE") { throw new TRPCError({ code: "PRECONDITION_FAILED", message: "This task is already completed", }); } const parsed = parseTaskAction(task.taskAction); if (!parsed) { throw new TRPCError({ code: "BAD_REQUEST", message: `Invalid taskAction format: ${task.taskAction}`, }); } const handler = getTaskAction(parsed.action); if (!handler) { throw new TRPCError({ code: "BAD_REQUEST", message: `Unknown action: ${parsed.action}`, }); } const permissions = resolvePermissions( ctx.dbUser.systemRole as import("@capakraken/shared").SystemRole, ctx.dbUser.permissionOverrides as import("@capakraken/shared").PermissionOverrides | null, ctx.roleDefaults ?? undefined, ); if (handler.permission && !permissions.has(handler.permission as PermissionKey)) { throw new TRPCError({ code: "FORBIDDEN", message: `Permission denied: you need "${handler.permission}" to perform this action`, }); } const actionResult = await handler.execute(parsed.entityId, ctx.db, userId); if (!actionResult.success) { throw new TRPCError({ code: "BAD_REQUEST", message: actionResult.message, }); } const completedTask = await ctx.db.notification.update({ where: { id: input.id }, data: { taskStatus: "DONE", completedAt: new Date(), completedBy: userId, }, }); emitTaskCompleted(task.userId, task.id); if (task.assigneeId && task.assigneeId !== task.userId) { emitTaskCompleted(task.assigneeId, task.id); } return { task: completedTask, actionResult, }; }), // ═══════════════════════════════════════════════════════════════════════════ // REMINDERS // ═══════════════════════════════════════════════════════════════════════════ /** Create a personal reminder */ createReminder: protectedProcedure .input( z.object({ title: z.string().min(1).max(200), body: z.string().max(2000).optional(), remindAt: z.date(), recurrence: recurrenceEnum.optional(), entityId: z.string().optional(), entityType: z.string().optional(), link: z.string().optional(), }), ) .mutation(async ({ ctx, input }) => { const userId = await resolveUserId(ctx); // Reminders have extra fields (remindAt, nextRemindAt, recurrence) not covered // by the generic helper, so we keep the direct create here but still use // the exactOptionalPropertyTypes spread pattern. return ctx.db.notification.create({ data: { userId, type: "REMINDER", category: "REMINDER", title: input.title, ...(input.body !== undefined ? { body: input.body } : {}), remindAt: input.remindAt, nextRemindAt: input.remindAt, ...(input.recurrence !== undefined ? { recurrence: input.recurrence } : {}), ...(input.entityId !== undefined ? { entityId: input.entityId } : {}), ...(input.entityType !== undefined ? { entityType: input.entityType } : {}), ...(input.link !== undefined ? { link: input.link } : {}), channel: "in_app", }, }); }), /** Update a personal reminder */ updateReminder: protectedProcedure .input( z.object({ id: z.string(), title: z.string().min(1).max(200).optional(), body: z.string().max(2000).optional(), remindAt: z.date().optional(), recurrence: recurrenceEnum.nullish(), }), ) .mutation(async ({ ctx, input }) => { const userId = await resolveUserId(ctx); // Verify ownership const existing = await ctx.db.notification.findFirst({ where: { id: input.id, userId, category: "REMINDER" }, }); if (!existing) { throw new TRPCError({ code: "NOT_FOUND", message: "Reminder not found or you do not have permission", }); } return ctx.db.notification.update({ where: { id: input.id }, data: { ...(input.title !== undefined ? { title: input.title } : {}), ...(input.body !== undefined ? { body: input.body } : {}), ...(input.remindAt !== undefined ? { remindAt: input.remindAt, nextRemindAt: input.remindAt } : {}), // recurrence can be set to null (clear it) or a new value ...(input.recurrence !== undefined ? { recurrence: input.recurrence } : {}), }, }); }), /** Delete a personal reminder */ deleteReminder: protectedProcedure .input(z.object({ id: z.string() })) .mutation(async ({ ctx, input }) => { const userId = await resolveUserId(ctx); const existing = await ctx.db.notification.findFirst({ where: { id: input.id, userId, category: "REMINDER" }, }); if (!existing) { throw new TRPCError({ code: "NOT_FOUND", message: "Reminder not found or you do not have permission", }); } await ctx.db.notification.delete({ where: { id: input.id } }); }), /** List personal reminders */ listReminders: protectedProcedure .input(z.object({ limit: z.number().min(1).max(100).default(20) })) .query(async ({ ctx, input }) => { const userId = await resolveUserId(ctx); return ctx.db.notification.findMany({ where: { userId, category: "REMINDER" }, orderBy: { nextRemindAt: "asc" }, take: input.limit, }); }), // ═══════════════════════════════════════════════════════════════════════════ // BROADCASTS (Manager+) // ═══════════════════════════════════════════════════════════════════════════ /** Create and send a broadcast notification */ createBroadcast: managerProcedure .input( z.object({ title: z.string().min(1).max(200), body: z.string().max(2000).optional(), link: z.string().optional(), category: categoryEnum.default("NOTIFICATION"), priority: priorityEnum.default("NORMAL"), channel: channelEnum.default("in_app"), targetType: targetTypeEnum, targetValue: z.string().optional(), scheduledAt: z.date().optional(), taskAction: z.string().optional(), dueDate: z.date().optional(), }), ) .mutation(async ({ ctx, input }) => { const senderId = ctx.dbUser.id; // Scheduled broadcasts can be stored immediately because fan-out is deferred. if (input.scheduledAt && input.scheduledAt > new Date()) { return ctx.db.notificationBroadcast.create({ data: { senderId, title: input.title, ...(input.body !== undefined ? { body: input.body } : {}), ...(input.link !== undefined ? { link: input.link } : {}), category: input.category, priority: input.priority, channel: input.channel, targetType: input.targetType, ...(input.targetValue !== undefined ? { targetValue: input.targetValue } : {}), ...(input.scheduledAt !== undefined ? { scheduledAt: input.scheduledAt } : {}), }, }); } // Resolve recipients before persisting immediate broadcasts so empty targets // do not leave orphaned broadcast rows behind. const recipientIds = await resolveRecipients( input.targetType, input.targetValue, ctx.db, senderId, ); if (recipientIds.length === 0) { throw new TRPCError({ code: "BAD_REQUEST", message: "No recipients matched the broadcast target.", }); } const isTask = input.category === "TASK" || input.category === "APPROVAL"; const runImmediateBroadcast = async (db: typeof ctx.db) => { const broadcast = await db.notificationBroadcast.create({ data: { senderId, title: input.title, ...(input.body !== undefined ? { body: input.body } : {}), ...(input.link !== undefined ? { link: input.link } : {}), category: input.category, priority: input.priority, channel: input.channel, targetType: input.targetType, ...(input.targetValue !== undefined ? { targetValue: input.targetValue } : {}), ...(input.scheduledAt !== undefined ? { scheduledAt: input.scheduledAt } : {}), }, }); const createdNotificationIds: Array<{ id: string; userId: string }> = []; for (const recipientUserId of recipientIds) { const nId = await createNotification({ db, userId: recipientUserId, type: `BROADCAST_${input.category}`, title: input.title, body: input.body, link: input.link, category: input.category, priority: input.priority, channel: input.channel, sourceId: broadcast.id, senderId, taskStatus: isTask ? "OPEN" : undefined, taskAction: input.taskAction, dueDate: input.dueDate, emit: false, }); createdNotificationIds.push({ id: nId, userId: recipientUserId }); } const updatedBroadcast = await db.notificationBroadcast.update({ where: { id: broadcast.id }, data: { sentAt, recipientCount: createdNotificationIds.length, }, }); return { broadcast: updatedBroadcast, notificationIds: createdNotificationIds }; }; let persistedBroadcast: Awaited>["broadcast"]; let notificationIds: Array<{ id: string; userId: string }> = []; const sentAt = new Date(); try { const transactionResult = typeof ctx.db.$transaction === "function" ? await ctx.db.$transaction((tx) => runImmediateBroadcast(tx as typeof ctx.db)) : await runImmediateBroadcast(ctx.db); persistedBroadcast = transactionResult.broadcast; notificationIds = transactionResult.notificationIds; } catch (error) { rethrowNotificationReferenceError(error); } // 5. Emit side effects only after the transaction commits. for (const notification of notificationIds) { emitNotificationCreated(notification.userId, notification.id); if (isTask) { emitTaskAssigned(notification.userId, notification.id); } } emitBroadcastSent(persistedBroadcast.id, notificationIds.length); // 6. Send emails if channel includes email (non-blocking) if (input.channel === "email" || input.channel === "both") { for (const n of notificationIds) { void sendNotificationEmail(ctx.db, n.userId, input.title, input.body); } } return persistedBroadcast; }), /** List broadcasts */ listBroadcasts: managerProcedure .input(z.object({ limit: z.number().min(1).max(50).default(20) })) .query(async ({ ctx, input }) => { return ctx.db.notificationBroadcast.findMany({ orderBy: { createdAt: "desc" }, take: input.limit, include: { sender: { select: { id: true, name: true, email: true } }, }, }); }), /** Get one broadcast with sender context */ getBroadcastById: managerProcedure .input(z.object({ id: z.string() })) .query(async ({ ctx, input }) => { return findUniqueOrThrow( ctx.db.notificationBroadcast.findUnique({ where: { id: input.id }, include: { sender: { select: { id: true, name: true, email: true } }, }, }), "Broadcast", ); }), // ═══════════════════════════════════════════════════════════════════════════ // TASK CREATION (Manager+) // ═══════════════════════════════════════════════════════════════════════════ /** Create a task for a specific user */ createTask: managerProcedure .input( z.object({ userId: z.string(), title: z.string().min(1).max(200), body: z.string().max(2000).optional(), priority: priorityEnum.default("NORMAL"), dueDate: z.date().optional(), taskAction: z.string().optional(), entityId: z.string().optional(), entityType: z.string().optional(), link: z.string().optional(), channel: channelEnum.default("in_app"), }), ) .mutation(async ({ ctx, input }) => { const senderId = ctx.dbUser.id; const notificationId = await createNotification({ db: ctx.db, userId: input.userId, type: "TASK_CREATED", category: "TASK", taskStatus: "OPEN", title: input.title, priority: input.priority, senderId, channel: input.channel, body: input.body, dueDate: input.dueDate, taskAction: input.taskAction, entityId: input.entityId, entityType: input.entityType, link: input.link, }); emitTaskAssigned(input.userId, notificationId); // Send email if channel includes email if (input.channel === "email" || input.channel === "both") { void sendNotificationEmail(ctx.db, input.userId, input.title, input.body); } // Re-fetch for return value const n = await ctx.db.notification.findUnique({ where: { id: notificationId } }); return n; }), /** Reassign a task to another user */ assignTask: managerProcedure .input(z.object({ id: z.string(), assigneeId: z.string() })) .mutation(async ({ ctx, input }) => { const existing = await findUniqueOrThrow( ctx.db.notification.findUnique({ where: { id: input.id } }), "Task", ); if (existing.category !== "TASK" && existing.category !== "APPROVAL") { throw new TRPCError({ code: "BAD_REQUEST", message: "Only tasks and approvals can be assigned", }); } const updated = await ctx.db.notification.update({ where: { id: input.id }, data: { assigneeId: input.assigneeId }, }); emitTaskAssigned(input.assigneeId, updated.id); return updated; }), // ═══════════════════════════════════════════════════════════════════════════ // DELETE // ═══════════════════════════════════════════════════════════════════════════ /** Delete own notification */ delete: protectedProcedure .input(z.object({ id: z.string() })) .mutation(async ({ ctx, input }) => { const userId = await resolveUserId(ctx); const existing = await ctx.db.notification.findFirst({ where: { id: input.id, userId }, }); if (!existing) { throw new TRPCError({ code: "NOT_FOUND", message: "Notification not found", }); } // Cannot delete tasks created by others (senderId differs) if ( (existing.category === "TASK" || existing.category === "APPROVAL") && existing.senderId && existing.senderId !== userId ) { throw new TRPCError({ code: "FORBIDDEN", message: "Cannot delete tasks created by others", }); } await ctx.db.notification.delete({ where: { id: input.id } }); }), });