fix(api): harden broadcast transactions and estimate fallbacks

This commit is contained in:
2026-03-30 12:18:10 +02:00
parent c82a146f84
commit 649c8feb22
4 changed files with 580 additions and 62 deletions
+137 -57
View File
@@ -63,26 +63,92 @@ async function sendNotificationEmail(
}
}
function rethrowNotificationReferenceError(error: unknown): never {
const candidate = error as {
function getNotificationErrorCandidates(error: unknown): Array<{
code?: unknown;
message?: unknown;
meta?: { field_name?: unknown; modelName?: unknown };
cause?: unknown;
}> {
const queue: unknown[] = [error];
const seen = new Set<unknown>();
const candidates: Array<{
code?: unknown;
message?: unknown;
meta?: { field_name?: unknown };
};
const fieldName = typeof candidate.meta?.field_name === "string"
? candidate.meta.field_name.toLowerCase()
: "";
meta?: { field_name?: unknown; modelName?: unknown };
cause?: unknown;
}> = [];
if (
typeof candidate.code === "string"
&& (candidate.code === "P2003" || candidate.code === "P2025")
&& fieldName.includes("sender")
) {
throw new TRPCError({
code: "NOT_FOUND",
message: "Sender user not found",
cause: error,
});
while (queue.length > 0) {
const current = queue.shift();
if (!current || seen.has(current) || typeof current !== "object") {
continue;
}
seen.add(current);
const candidate = current as {
code?: unknown;
message?: unknown;
meta?: { field_name?: unknown; modelName?: unknown };
cause?: unknown;
shape?: { message?: unknown; data?: { cause?: unknown } };
};
candidates.push(candidate);
if ("cause" in candidate) {
queue.push(candidate.cause);
}
if (candidate.shape?.data?.cause) {
queue.push(candidate.shape.data.cause);
}
}
return candidates;
}
function rethrowNotificationReferenceError(error: unknown): never {
for (const candidate of getNotificationErrorCandidates(error)) {
const fieldName = typeof candidate.meta?.field_name === "string"
? candidate.meta.field_name.toLowerCase()
: "";
const modelName = typeof candidate.meta?.modelName === "string"
? candidate.meta.modelName.toLowerCase()
: "";
if (
typeof candidate.code === "string"
&& (candidate.code === "P2003" || candidate.code === "P2025")
&& fieldName.includes("sender")
) {
throw new TRPCError({
code: "NOT_FOUND",
message: "Sender user not found",
cause: error,
});
}
if (
typeof candidate.code === "string"
&& (candidate.code === "P2003" || candidate.code === "P2025")
&& fieldName.includes("userid")
) {
throw new TRPCError({
code: "NOT_FOUND",
message: "Broadcast recipient user not found",
cause: error,
});
}
if (
typeof candidate.code === "string"
&& (candidate.code === "P2003" || candidate.code === "P2025")
&& (modelName.includes("notificationbroadcast") || fieldName.includes("broadcast"))
) {
throw new TRPCError({
code: "NOT_FOUND",
message: "Notification broadcast not found",
cause: error,
});
}
}
throw error;
@@ -649,9 +715,9 @@ export const notificationRouter = createTRPCRouter({
});
}
let broadcast;
try {
broadcast = await ctx.db.notificationBroadcast.create({
const isTask = input.category === "TASK" || input.category === "APPROVAL";
const runImmediateBroadcast = async (db: typeof ctx.db) => {
const broadcast = await db.notificationBroadcast.create({
data: {
senderId,
title: input.title,
@@ -665,58 +731,72 @@ export const notificationRouter = createTRPCRouter({
...(input.scheduledAt !== undefined ? { scheduledAt: input.scheduledAt } : {}),
},
});
const createdNotificationIds: Array<{ id: string; userId: string }> = [];
for (const recipientUserId of recipientIds) {
const nId = await createNotification({
db,
userId: recipientUserId,
type: `BROADCAST_${input.category}`,
title: input.title,
body: input.body,
link: input.link,
category: input.category,
priority: input.priority,
channel: input.channel,
sourceId: broadcast.id,
senderId,
taskStatus: isTask ? "OPEN" : undefined,
taskAction: input.taskAction,
dueDate: input.dueDate,
emit: false,
});
createdNotificationIds.push({ id: nId, userId: recipientUserId });
}
const updatedBroadcast = await db.notificationBroadcast.update({
where: { id: broadcast.id },
data: {
sentAt,
recipientCount: createdNotificationIds.length,
},
});
return { broadcast: updatedBroadcast, notificationIds: createdNotificationIds };
};
let persistedBroadcast: Awaited<ReturnType<typeof runImmediateBroadcast>>["broadcast"];
let notificationIds: Array<{ id: string; userId: string }> = [];
const sentAt = new Date();
try {
const transactionResult = typeof ctx.db.$transaction === "function"
? await ctx.db.$transaction((tx) => runImmediateBroadcast(tx as typeof ctx.db))
: await runImmediateBroadcast(ctx.db);
persistedBroadcast = transactionResult.broadcast;
notificationIds = transactionResult.notificationIds;
} catch (error) {
rethrowNotificationReferenceError(error);
}
// 4. Create individual notifications for each recipient
const isTask = input.category === "TASK" || input.category === "APPROVAL";
// SSE emit handled by createNotification; task events need separate emit
const notificationIds: Array<{ id: string; userId: string }> = [];
for (const recipientUserId of recipientIds) {
const nId = await createNotification({
db: ctx.db,
userId: recipientUserId,
type: `BROADCAST_${input.category}`,
title: input.title,
body: input.body,
link: input.link,
category: input.category,
priority: input.priority,
channel: input.channel,
sourceId: broadcast.id,
senderId,
taskStatus: isTask ? "OPEN" : undefined,
taskAction: input.taskAction,
dueDate: input.dueDate,
});
notificationIds.push({ id: nId, userId: recipientUserId });
// 5. Emit side effects only after the transaction commits.
for (const notification of notificationIds) {
emitNotificationCreated(notification.userId, notification.id);
if (isTask) {
emitTaskAssigned(recipientUserId, nId);
emitTaskAssigned(notification.userId, notification.id);
}
}
// 5. Update broadcast with sent info
await ctx.db.notificationBroadcast.update({
where: { id: broadcast.id },
data: {
sentAt: new Date(),
recipientCount: notificationIds.length,
},
});
emitBroadcastSent(persistedBroadcast.id, notificationIds.length);
// 6. Broadcast-level SSE event
emitBroadcastSent(broadcast.id, notificationIds.length);
// 7. Send emails if channel includes email (non-blocking)
// 6. Send emails if channel includes email (non-blocking)
if (input.channel === "email" || input.channel === "both") {
for (const n of notificationIds) {
void sendNotificationEmail(ctx.db, n.userId, input.title, input.body);
}
}
return { ...broadcast, recipientCount: notificationIds.length, sentAt: new Date() };
return persistedBroadcast;
}),
/** List broadcasts */