import { TRPCError } from "@trpc/server"; import { z } from "zod"; import { findUniqueOrThrow } from "../db/helpers.js"; import { createNotification } from "../lib/create-notification.js"; import { resolveRecipients } from "../lib/notification-targeting.js"; import { emitNotificationCreated, emitTaskAssigned, } from "../sse/event-bus.js"; import { type BroadcastPersistenceDb, type BroadcastRecipientNotification, CreateBroadcastInputSchema, ListBroadcastsInputSchema, type NotificationProcedureContext, NotificationIdInputSchema, requireNotificationDbUser, rethrowNotificationReferenceError, SCHEDULED_TASK_BROADCAST_UNSUPPORTED_MESSAGE, sendNotificationEmail, } from "./notification-procedure-base.js"; function isFutureScheduledBroadcast(input: z.infer): boolean { return Boolean(input.scheduledAt && input.scheduledAt > new Date()); } function hasTaskLikeBroadcastMetadata(input: z.infer): boolean { return input.category === "TASK" || input.category === "APPROVAL" || input.taskAction !== undefined || input.dueDate !== undefined; } function requireImmediateBroadcastTransaction( db: BroadcastPersistenceDb, ): NonNullable { if (typeof db.$transaction !== "function") { throw new TRPCError({ code: "INTERNAL_SERVER_ERROR", message: "Immediate broadcasts require transactional persistence support.", }); } return db.$transaction.bind(db); } function buildBroadcastCreateData( senderId: string, input: z.infer, options: { includeScheduledAt?: boolean; recipientCount?: number; } = {}, ) { return { senderId, title: input.title, ...(input.body !== undefined ? { body: input.body } : {}), ...(input.link !== undefined ? { link: input.link } : {}), category: input.category, priority: input.priority, channel: input.channel, targetType: input.targetType, ...(input.targetValue !== undefined ? { targetValue: input.targetValue } : {}), ...(options.includeScheduledAt && input.scheduledAt !== undefined ? { scheduledAt: input.scheduledAt } : {}), ...(options.recipientCount !== undefined ? { recipientCount: options.recipientCount } : {}), }; } async function resolveBroadcastRecipientIds( ctx: NotificationProcedureContext, input: z.infer, senderId: string, ): Promise { const recipientIds = await resolveRecipients( input.targetType, input.targetValue, ctx.db, senderId, ); if (recipientIds.length === 0) { throw new TRPCError({ code: "BAD_REQUEST", message: "No recipients matched the broadcast target.", }); } return recipientIds; } async function createScheduledBroadcastRecord( db: BroadcastPersistenceDb, senderId: string, input: z.infer, recipientIds: string[], ) { return db.notificationBroadcast.create({ data: buildBroadcastCreateData(senderId, input, { includeScheduledAt: true, recipientCount: recipientIds.length, }), }); } async function persistImmediateBroadcast( db: BroadcastPersistenceDb, senderId: string, input: z.infer, recipientIds: string[], ) { const isTask = input.category === "TASK" || input.category === "APPROVAL"; const sentAt = new Date(); const broadcast = await db.notificationBroadcast.create({ data: buildBroadcastCreateData(senderId, input), }); const created = await Promise.all( recipientIds.map((recipientUserId) => createNotification({ db, userId: recipientUserId, type: `BROADCAST_${input.category}`, title: input.title, body: input.body, link: input.link, category: input.category, priority: input.priority, channel: input.channel, sourceId: broadcast.id, senderId, taskStatus: isTask ? "OPEN" : undefined, taskAction: input.taskAction, dueDate: input.dueDate, emit: false, }), ), ); const notificationIds: BroadcastRecipientNotification[] = created.map((id, i) => ({ id, userId: recipientIds[i]! })); const updatedBroadcast = await db.notificationBroadcast.update({ where: { id: broadcast.id }, data: { sentAt, recipientCount: notificationIds.length, }, }); return { broadcast: updatedBroadcast, notificationIds }; } function emitImmediateBroadcastSideEffects( db: BroadcastPersistenceDb, input: z.infer, notificationIds: BroadcastRecipientNotification[], ) { const isTask = input.category === "TASK" || input.category === "APPROVAL"; for (const notification of notificationIds) { emitNotificationCreated(notification.userId, notification.id); if (isTask) { emitTaskAssigned(notification.userId, notification.id); } } if (input.channel === "email" || input.channel === "both") { for (const notification of notificationIds) { void sendNotificationEmail(db, notification.userId, input.title, input.body); } } } export async function createBroadcast( ctx: NotificationProcedureContext, input: z.infer, ) { const senderId = requireNotificationDbUser(ctx).id; const isScheduledFutureBroadcast = isFutureScheduledBroadcast(input); if (isScheduledFutureBroadcast && hasTaskLikeBroadcastMetadata(input)) { throw new TRPCError({ code: "BAD_REQUEST", message: SCHEDULED_TASK_BROADCAST_UNSUPPORTED_MESSAGE, }); } const recipientIds = await resolveBroadcastRecipientIds(ctx, input, senderId); if (isScheduledFutureBroadcast) { try { return await createScheduledBroadcastRecord(ctx.db, senderId, input, recipientIds); } catch (error) { rethrowNotificationReferenceError(error, "broadcast"); } } let persistedBroadcast: Awaited>["broadcast"]; let notificationIds: BroadcastRecipientNotification[] = []; try { const transaction = requireImmediateBroadcastTransaction(ctx.db); const transactionResult = await transaction((tx) => persistImmediateBroadcast(tx as typeof ctx.db, senderId, input, recipientIds)); persistedBroadcast = transactionResult.broadcast; notificationIds = transactionResult.notificationIds; } catch (error) { rethrowNotificationReferenceError(error, "broadcast"); } emitImmediateBroadcastSideEffects(ctx.db, input, notificationIds); return persistedBroadcast; } export async function listBroadcasts( ctx: NotificationProcedureContext, input: z.infer, ) { return ctx.db.notificationBroadcast.findMany({ orderBy: { createdAt: "desc" }, take: input.limit, include: { sender: { select: { id: true, name: true, email: true } }, }, }); } export async function getBroadcastById( ctx: NotificationProcedureContext, input: z.infer, ) { return findUniqueOrThrow( ctx.db.notificationBroadcast.findUnique({ where: { id: input.id }, include: { sender: { select: { id: true, name: true, email: true } }, }, }), "Broadcast", ); }