refactor(api): split notification procedure support
This commit is contained in:
@@ -0,0 +1,222 @@
|
||||
import { TRPCError } from "@trpc/server";
|
||||
import { z } from "zod";
|
||||
import { findUniqueOrThrow } from "../db/helpers.js";
|
||||
import { createNotification } from "../lib/create-notification.js";
|
||||
import { resolveRecipients } from "../lib/notification-targeting.js";
|
||||
import {
|
||||
emitNotificationCreated,
|
||||
emitTaskAssigned,
|
||||
} from "../sse/event-bus.js";
|
||||
import {
|
||||
type BroadcastPersistenceDb,
|
||||
type BroadcastRecipientNotification,
|
||||
CreateBroadcastInputSchema,
|
||||
ListBroadcastsInputSchema,
|
||||
type NotificationProcedureContext,
|
||||
NotificationIdInputSchema,
|
||||
requireNotificationDbUser,
|
||||
rethrowNotificationReferenceError,
|
||||
SCHEDULED_TASK_BROADCAST_UNSUPPORTED_MESSAGE,
|
||||
sendNotificationEmail,
|
||||
} from "./notification-procedure-base.js";
|
||||
|
||||
function isFutureScheduledBroadcast(input: z.infer<typeof CreateBroadcastInputSchema>): boolean {
|
||||
return Boolean(input.scheduledAt && input.scheduledAt > new Date());
|
||||
}
|
||||
|
||||
function hasTaskLikeBroadcastMetadata(input: z.infer<typeof CreateBroadcastInputSchema>): boolean {
|
||||
return input.category === "TASK"
|
||||
|| input.category === "APPROVAL"
|
||||
|| input.taskAction !== undefined
|
||||
|| input.dueDate !== undefined;
|
||||
}
|
||||
|
||||
function buildBroadcastCreateData(
|
||||
senderId: string,
|
||||
input: z.infer<typeof CreateBroadcastInputSchema>,
|
||||
recipientCount?: number,
|
||||
) {
|
||||
return {
|
||||
senderId,
|
||||
title: input.title,
|
||||
...(input.body !== undefined ? { body: input.body } : {}),
|
||||
...(input.link !== undefined ? { link: input.link } : {}),
|
||||
category: input.category,
|
||||
priority: input.priority,
|
||||
channel: input.channel,
|
||||
targetType: input.targetType,
|
||||
...(input.targetValue !== undefined ? { targetValue: input.targetValue } : {}),
|
||||
...(input.scheduledAt !== undefined ? { scheduledAt: input.scheduledAt } : {}),
|
||||
...(recipientCount !== undefined ? { recipientCount } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
async function resolveBroadcastRecipientIds(
|
||||
ctx: NotificationProcedureContext,
|
||||
input: z.infer<typeof CreateBroadcastInputSchema>,
|
||||
senderId: string,
|
||||
): Promise<string[]> {
|
||||
const recipientIds = await resolveRecipients(
|
||||
input.targetType,
|
||||
input.targetValue,
|
||||
ctx.db,
|
||||
senderId,
|
||||
);
|
||||
|
||||
if (recipientIds.length === 0) {
|
||||
throw new TRPCError({
|
||||
code: "BAD_REQUEST",
|
||||
message: "No recipients matched the broadcast target.",
|
||||
});
|
||||
}
|
||||
|
||||
return recipientIds;
|
||||
}
|
||||
|
||||
async function createScheduledBroadcastRecord(
|
||||
db: BroadcastPersistenceDb,
|
||||
senderId: string,
|
||||
input: z.infer<typeof CreateBroadcastInputSchema>,
|
||||
recipientIds: string[],
|
||||
) {
|
||||
return db.notificationBroadcast.create({
|
||||
data: buildBroadcastCreateData(senderId, input, recipientIds.length),
|
||||
});
|
||||
}
|
||||
|
||||
async function persistImmediateBroadcast(
|
||||
db: BroadcastPersistenceDb,
|
||||
senderId: string,
|
||||
input: z.infer<typeof CreateBroadcastInputSchema>,
|
||||
recipientIds: string[],
|
||||
) {
|
||||
const isTask = input.category === "TASK" || input.category === "APPROVAL";
|
||||
const sentAt = new Date();
|
||||
const broadcast = await db.notificationBroadcast.create({
|
||||
data: buildBroadcastCreateData(senderId, input),
|
||||
});
|
||||
|
||||
const notificationIds: BroadcastRecipientNotification[] = [];
|
||||
for (const recipientUserId of recipientIds) {
|
||||
const notificationId = await createNotification({
|
||||
db,
|
||||
userId: recipientUserId,
|
||||
type: `BROADCAST_${input.category}`,
|
||||
title: input.title,
|
||||
body: input.body,
|
||||
link: input.link,
|
||||
category: input.category,
|
||||
priority: input.priority,
|
||||
channel: input.channel,
|
||||
sourceId: broadcast.id,
|
||||
senderId,
|
||||
taskStatus: isTask ? "OPEN" : undefined,
|
||||
taskAction: input.taskAction,
|
||||
dueDate: input.dueDate,
|
||||
emit: false,
|
||||
});
|
||||
notificationIds.push({ id: notificationId, userId: recipientUserId });
|
||||
}
|
||||
|
||||
const updatedBroadcast = await db.notificationBroadcast.update({
|
||||
where: { id: broadcast.id },
|
||||
data: {
|
||||
sentAt,
|
||||
recipientCount: notificationIds.length,
|
||||
},
|
||||
});
|
||||
|
||||
return { broadcast: updatedBroadcast, notificationIds };
|
||||
}
|
||||
|
||||
function emitImmediateBroadcastSideEffects(
|
||||
db: BroadcastPersistenceDb,
|
||||
input: z.infer<typeof CreateBroadcastInputSchema>,
|
||||
notificationIds: BroadcastRecipientNotification[],
|
||||
) {
|
||||
const isTask = input.category === "TASK" || input.category === "APPROVAL";
|
||||
|
||||
for (const notification of notificationIds) {
|
||||
emitNotificationCreated(notification.userId, notification.id);
|
||||
if (isTask) {
|
||||
emitTaskAssigned(notification.userId, notification.id);
|
||||
}
|
||||
}
|
||||
|
||||
if (input.channel === "email" || input.channel === "both") {
|
||||
for (const notification of notificationIds) {
|
||||
void sendNotificationEmail(db, notification.userId, input.title, input.body);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export async function createBroadcast(
|
||||
ctx: NotificationProcedureContext,
|
||||
input: z.infer<typeof CreateBroadcastInputSchema>,
|
||||
) {
|
||||
const senderId = requireNotificationDbUser(ctx).id;
|
||||
const isScheduledFutureBroadcast = isFutureScheduledBroadcast(input);
|
||||
|
||||
if (isScheduledFutureBroadcast && hasTaskLikeBroadcastMetadata(input)) {
|
||||
throw new TRPCError({
|
||||
code: "BAD_REQUEST",
|
||||
message: SCHEDULED_TASK_BROADCAST_UNSUPPORTED_MESSAGE,
|
||||
});
|
||||
}
|
||||
|
||||
const recipientIds = await resolveBroadcastRecipientIds(ctx, input, senderId);
|
||||
|
||||
if (isScheduledFutureBroadcast) {
|
||||
try {
|
||||
return await createScheduledBroadcastRecord(ctx.db, senderId, input, recipientIds);
|
||||
} catch (error) {
|
||||
rethrowNotificationReferenceError(error);
|
||||
}
|
||||
}
|
||||
|
||||
let persistedBroadcast: Awaited<ReturnType<typeof persistImmediateBroadcast>>["broadcast"];
|
||||
let notificationIds: BroadcastRecipientNotification[] = [];
|
||||
|
||||
try {
|
||||
const transactionResult = typeof ctx.db.$transaction === "function"
|
||||
? await ctx.db.$transaction((tx) =>
|
||||
persistImmediateBroadcast(tx as typeof ctx.db, senderId, input, recipientIds))
|
||||
: await persistImmediateBroadcast(ctx.db, senderId, input, recipientIds);
|
||||
|
||||
persistedBroadcast = transactionResult.broadcast;
|
||||
notificationIds = transactionResult.notificationIds;
|
||||
} catch (error) {
|
||||
rethrowNotificationReferenceError(error);
|
||||
}
|
||||
|
||||
emitImmediateBroadcastSideEffects(ctx.db, input, notificationIds);
|
||||
return persistedBroadcast;
|
||||
}
|
||||
|
||||
export async function listBroadcasts(
|
||||
ctx: NotificationProcedureContext,
|
||||
input: z.infer<typeof ListBroadcastsInputSchema>,
|
||||
) {
|
||||
return ctx.db.notificationBroadcast.findMany({
|
||||
orderBy: { createdAt: "desc" },
|
||||
take: input.limit,
|
||||
include: {
|
||||
sender: { select: { id: true, name: true, email: true } },
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
export async function getBroadcastById(
|
||||
ctx: NotificationProcedureContext,
|
||||
input: z.infer<typeof NotificationIdInputSchema>,
|
||||
) {
|
||||
return findUniqueOrThrow(
|
||||
ctx.db.notificationBroadcast.findUnique({
|
||||
where: { id: input.id },
|
||||
include: {
|
||||
sender: { select: { id: true, name: true, email: true } },
|
||||
},
|
||||
}),
|
||||
"Broadcast",
|
||||
);
|
||||
}
|
||||
Reference in New Issue
Block a user