From 649c8feb22bc816ae8e9ab5347e07ecd4e9c7fb5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hartmut=20N=C3=B6renberg?= Date: Mon, 30 Mar 2026 12:18:10 +0200 Subject: [PATCH] fix(api): harden broadcast transactions and estimate fallbacks --- .../assistant-tools-import-export.test.ts | 269 ++++++++++++++++++ .../src/__tests__/notification-router.test.ts | 125 ++++++++ packages/api/src/router/assistant-tools.ts | 54 +++- packages/api/src/router/notification.ts | 194 +++++++++---- 4 files changed, 580 insertions(+), 62 deletions(-) diff --git a/packages/api/src/__tests__/assistant-tools-import-export.test.ts b/packages/api/src/__tests__/assistant-tools-import-export.test.ts index f152ba9..0118d85 100644 --- a/packages/api/src/__tests__/assistant-tools-import-export.test.ts +++ b/packages/api/src/__tests__/assistant-tools-import-export.test.ts @@ -2231,6 +2231,59 @@ describe("assistant import/export and dispo tools", () => { }); }); + it("returns a stable assistant error when broadcast finalization loses the broadcast row", async () => { + const txCreateBroadcast = vi.fn().mockResolvedValue({ + id: "broadcast_missing_after_create", + title: "Office update", + targetType: "all", + }); + const txCreateNotification = vi.fn().mockResolvedValue({ id: "notification_2", userId: "user_2" }); + const txUpdateBroadcast = vi.fn().mockRejectedValue( + Object.assign(new Error("Record to update not found"), { + code: "P2025", + meta: { modelName: "NotificationBroadcast" }, + }), + ); + const tx = { + notificationBroadcast: { + create: txCreateBroadcast, + update: txUpdateBroadcast, + }, + notification: { + create: txCreateNotification, + }, + }; + const ctx = createToolContext( + { + user: { + findMany: vi.fn().mockResolvedValue([{ id: "user_2" }]), + }, + $transaction: vi.fn(async (callback: (db: typeof tx) => Promise) => callback(tx)), + notificationBroadcast: { + create: vi.fn(), + update: vi.fn(), + }, + notification: { + create: vi.fn(), + }, + }, + { userRole: SystemRole.MANAGER }, + ); + + const result = await executeTool( + "send_broadcast", + JSON.stringify({ + title: "Office update", + targetType: "all", + }), + ctx, + ); + + expect(JSON.parse(result.content)).toEqual({ + error: "Broadcast not found with the given criteria.", + }); + }); + it("reads broadcast details through the real notification router and rejects plain users", async () => { const db = { notificationBroadcast: { @@ -2830,6 +2883,84 @@ describe("assistant import/export and dispo tools", () => { ), expected: "Estimate scope item not found with the given criteria.", }, + { + name: "update_estimate_draft missing project reference", + toolName: "update_estimate_draft", + payload: { + id: "est_project_missing", + baseCurrency: "EUR", + assumptions: [], + scopeItems: [], + demandLines: [], + resourceSnapshots: [], + metrics: [], + }, + permission: PermissionKey.MANAGE_PROJECTS, + db: { + estimate: { + findUnique: vi.fn().mockResolvedValue({ projectId: null }), + }, + }, + setup: () => vi.mocked(updateEstimateDraft).mockRejectedValueOnce( + Object.assign(new Error("Foreign key constraint failed"), { + code: "P2003", + meta: { field_name: "Estimate_projectId_fkey" }, + }), + ), + expected: "Project not found with the given criteria.", + }, + { + name: "update_estimate_draft missing role reference", + toolName: "update_estimate_draft", + payload: { + id: "est_role_missing", + baseCurrency: "EUR", + assumptions: [], + scopeItems: [], + demandLines: [], + resourceSnapshots: [], + metrics: [], + }, + permission: PermissionKey.MANAGE_PROJECTS, + db: { + estimate: { + findUnique: vi.fn().mockResolvedValue({ projectId: null }), + }, + }, + setup: () => vi.mocked(updateEstimateDraft).mockRejectedValueOnce( + Object.assign(new Error("Foreign key constraint failed"), { + code: "P2003", + meta: { field_name: "EstimateDemandLine_roleId_fkey" }, + }), + ), + expected: "Role not found with the given criteria.", + }, + { + name: "update_estimate_draft missing resource reference", + toolName: "update_estimate_draft", + payload: { + id: "est_resource_missing", + baseCurrency: "EUR", + assumptions: [], + scopeItems: [], + demandLines: [], + resourceSnapshots: [], + metrics: [], + }, + permission: PermissionKey.MANAGE_PROJECTS, + db: { + estimate: { + findUnique: vi.fn().mockResolvedValue({ projectId: null }), + }, + }, + setup: () => vi.mocked(updateEstimateDraft).mockRejectedValueOnce( + Object.assign(new Error("Foreign key constraint failed"), { + code: "P2003", + meta: { field_name: "EstimateDemandLine_resourceId_fkey" }, + }), + ), + expected: "Resource not found with the given criteria.", + }, { name: "update_estimate_draft generic missing estimate reference", toolName: "update_estimate_draft", @@ -2864,6 +2995,19 @@ describe("assistant import/export and dispo tools", () => { setup: () => vi.mocked(submitEstimateVersion).mockRejectedValueOnce(new Error("Estimate version not found")), expected: "Estimate version not found with the given criteria.", }, + { + name: "submit_estimate_version deleted version race", + toolName: "submit_estimate_version", + payload: { estimateId: "est_1", versionId: "ver_deleted" }, + permission: PermissionKey.MANAGE_PROJECTS, + setup: () => vi.mocked(submitEstimateVersion).mockRejectedValueOnce( + Object.assign(new Error("Record to update not found"), { + code: "P2025", + meta: { modelName: "EstimateVersion" }, + }), + ), + expected: "Estimate version not found with the given criteria.", + }, { name: "submit_estimate_version without working version", toolName: "submit_estimate_version", @@ -2888,6 +3032,19 @@ describe("assistant import/export and dispo tools", () => { setup: () => vi.mocked(approveEstimateVersion).mockRejectedValueOnce(new Error("Estimate version not found")), expected: "Estimate version not found with the given criteria.", }, + { + name: "approve_estimate_version deleted version race", + toolName: "approve_estimate_version", + payload: { estimateId: "est_1", versionId: "ver_deleted" }, + permission: PermissionKey.MANAGE_PROJECTS, + setup: () => vi.mocked(approveEstimateVersion).mockRejectedValueOnce( + Object.assign(new Error("Record to update not found"), { + code: "P2025", + meta: { modelName: "EstimateVersion" }, + }), + ), + expected: "Estimate version not found with the given criteria.", + }, { name: "approve_estimate_version without submitted version", toolName: "approve_estimate_version", @@ -2912,6 +3069,19 @@ describe("assistant import/export and dispo tools", () => { setup: () => vi.mocked(createEstimateRevision).mockRejectedValueOnce(new Error("Estimate already has a working version")), expected: "Estimate already has a working version.", }, + { + name: "create_estimate_revision deleted source version race", + toolName: "create_estimate_revision", + payload: { estimateId: "est_1", sourceVersionId: "ver_deleted" }, + permission: PermissionKey.MANAGE_PROJECTS, + setup: () => vi.mocked(createEstimateRevision).mockRejectedValueOnce( + Object.assign(new Error("Record to update not found"), { + code: "P2025", + meta: { modelName: "EstimateVersion" }, + }), + ), + expected: "Estimate version not found with the given criteria.", + }, { name: "create_estimate_revision with unlocked source version", toolName: "create_estimate_revision", @@ -2936,6 +3106,19 @@ describe("assistant import/export and dispo tools", () => { setup: () => vi.mocked(createEstimateExport).mockRejectedValueOnce(new Error("Estimate version not found")), expected: "Estimate version not found with the given criteria.", }, + { + name: "create_estimate_export deleted version race", + toolName: "create_estimate_export", + payload: { estimateId: "est_1", versionId: "ver_deleted", format: "XLSX" }, + permission: PermissionKey.MANAGE_PROJECTS, + setup: () => vi.mocked(createEstimateExport).mockRejectedValueOnce( + Object.assign(new Error("Record to update not found"), { + code: "P2025", + meta: { modelName: "EstimateVersion" }, + }), + ), + expected: "Estimate version not found with the given criteria.", + }, { name: "create_estimate_planning_handoff with missing linked project", toolName: "create_estimate_planning_handoff", @@ -3151,6 +3334,92 @@ describe("assistant import/export and dispo tools", () => { }); }); + it("returns a stable assistant error when commercial term persistence loses the working version", async () => { + const ctx = createToolContext( + { + estimate: { + findUnique: vi.fn().mockResolvedValue({ + id: "est_1", + versions: [{ id: "ver_working", status: "WORKING" }], + }), + }, + estimateVersion: { + update: vi.fn().mockRejectedValue({ + code: "P2025", + message: "Record to update not found", + meta: { modelName: "EstimateVersion" }, + }), + }, + }, + { + userRole: SystemRole.MANAGER, + permissions: [PermissionKey.MANAGE_PROJECTS], + }, + ); + + const result = await executeTool( + "update_estimate_commercial_terms", + JSON.stringify({ + estimateId: "est_1", + terms: {}, + }), + ctx, + ); + + expect(JSON.parse(result.content)).toEqual({ + error: "Estimate version not found with the given criteria.", + }); + }); + + it("returns stable assistant errors for estimate weekly phasing and commercial term reads", async () => { + vi.mocked(getEstimateById).mockResolvedValueOnce(null as never); + + const missingEstimateCtx = createToolContext({}, { + userRole: SystemRole.CONTROLLER, + }); + const missingEstimateResult = await executeTool( + "get_estimate_weekly_phasing", + JSON.stringify({ estimateId: "est_missing" }), + missingEstimateCtx, + ); + expect(JSON.parse(missingEstimateResult.content)).toEqual({ + error: "Estimate not found with the given criteria.", + }); + + vi.mocked(getEstimateById).mockResolvedValueOnce({ + id: "est_empty", + name: "Estimate Empty", + versions: [], + } as Awaited>); + const noVersionResult = await executeTool( + "get_estimate_weekly_phasing", + JSON.stringify({ estimateId: "est_empty" }), + missingEstimateCtx, + ); + expect(JSON.parse(noVersionResult.content)).toEqual({ + error: "Estimate version not found with the given criteria.", + }); + + const missingCommercialTermsCtx = createToolContext( + { + estimate: { + findUnique: vi.fn().mockResolvedValue(null), + }, + }, + { + userRole: SystemRole.CONTROLLER, + }, + ); + const missingCommercialTermsResult = await executeTool( + "get_estimate_commercial_terms", + JSON.stringify({ estimateId: "est_missing" }), + missingCommercialTermsCtx, + ); + expect(JSON.parse(missingCommercialTermsResult.content)).toEqual({ + error: "Estimate version not found with the given criteria.", + }); + }); + it("reads countries through the real country router identifier path", async () => { const db = { country: { diff --git a/packages/api/src/__tests__/notification-router.test.ts b/packages/api/src/__tests__/notification-router.test.ts index d1e7220..4d07c57 100644 --- a/packages/api/src/__tests__/notification-router.test.ts +++ b/packages/api/src/__tests__/notification-router.test.ts @@ -1,6 +1,11 @@ import { SystemRole } from "@capakraken/shared"; import { beforeEach, describe, expect, it, vi } from "vitest"; import { notificationRouter } from "../router/notification.js"; +import { + emitBroadcastSent, + emitNotificationCreated, + emitTaskAssigned, +} from "../sse/event-bus.js"; import { createCallerFactory } from "../trpc.js"; const { resolveRecipientsMock } = vi.hoisted(() => ({ @@ -10,6 +15,10 @@ const { resolveRecipientsMock } = vi.hoisted(() => ({ // Mock the SSE event bus — we don't test real event emission here vi.mock("../sse/event-bus.js", () => ({ emitNotificationCreated: vi.fn(), + emitTaskAssigned: vi.fn(), + emitTaskCompleted: vi.fn(), + emitTaskStatusChanged: vi.fn(), + emitBroadcastSent: vi.fn(), })); vi.mock("../lib/notification-targeting.js", () => ({ @@ -19,6 +28,7 @@ vi.mock("../lib/notification-targeting.js", () => ({ const createCaller = createCallerFactory(notificationRouter); beforeEach(() => { + vi.clearAllMocks(); resolveRecipientsMock.mockReset(); }); @@ -315,4 +325,119 @@ describe("notification.createBroadcast", () => { "user_mgr", ); }); + + it("does not partially persist an immediate broadcast when recipient fan-out fails", async () => { + resolveRecipientsMock.mockResolvedValue(["user_a", "user_b"]); + + const txCreateBroadcast = vi.fn().mockResolvedValue({ + id: "broadcast_tx_1", + title: "Ops update", + createdAt: new Date("2026-03-30T10:00:00Z"), + }); + const txUpdateBroadcast = vi.fn(); + const txCreateNotification = vi.fn() + .mockResolvedValueOnce({ id: "notif_a", userId: "user_a" }) + .mockRejectedValueOnce(new Error("fan-out failed")); + const tx = { + notificationBroadcast: { + create: txCreateBroadcast, + update: txUpdateBroadcast, + }, + notification: { + create: txCreateNotification, + }, + }; + const outerCreateBroadcast = vi.fn(); + const outerUpdateBroadcast = vi.fn(); + const outerCreateNotification = vi.fn(); + const transaction = vi.fn(async (callback: (db: typeof tx) => Promise) => callback(tx)); + const db = { + $transaction: transaction, + notificationBroadcast: { + create: outerCreateBroadcast, + update: outerUpdateBroadcast, + }, + notification: { + create: outerCreateNotification, + }, + }; + + const caller = createManagerCaller(db); + + await expect(caller.createBroadcast({ + title: "Ops update", + targetType: "all", + })).rejects.toThrow("fan-out failed"); + + expect(transaction).toHaveBeenCalledTimes(1); + expect(txCreateBroadcast).toHaveBeenCalledTimes(1); + expect(txCreateNotification).toHaveBeenCalledTimes(2); + expect(txUpdateBroadcast).not.toHaveBeenCalled(); + expect(outerCreateBroadcast).not.toHaveBeenCalled(); + expect(outerUpdateBroadcast).not.toHaveBeenCalled(); + expect(outerCreateNotification).not.toHaveBeenCalled(); + }); + + it("rolls back an immediate broadcast when the final broadcast update fails", async () => { + resolveRecipientsMock.mockResolvedValue(["user_a", "user_b"]); + + const txCreateBroadcast = vi.fn().mockResolvedValue({ + id: "broadcast_tx_2", + title: "Ops update", + createdAt: new Date("2026-03-30T10:00:00Z"), + }); + const txCreateNotification = vi.fn() + .mockResolvedValueOnce({ id: "notif_a", userId: "user_a" }) + .mockResolvedValueOnce({ id: "notif_b", userId: "user_b" }); + const txUpdateBroadcast = vi.fn().mockRejectedValue( + Object.assign(new Error("Record to update not found"), { + code: "P2025", + meta: { modelName: "NotificationBroadcast" }, + }), + ); + const tx = { + notificationBroadcast: { + create: txCreateBroadcast, + update: txUpdateBroadcast, + }, + notification: { + create: txCreateNotification, + }, + }; + const outerCreateBroadcast = vi.fn(); + const outerUpdateBroadcast = vi.fn(); + const outerCreateNotification = vi.fn(); + const transaction = vi.fn(async (callback: (db: typeof tx) => Promise) => callback(tx)); + const db = { + $transaction: transaction, + notificationBroadcast: { + create: outerCreateBroadcast, + update: outerUpdateBroadcast, + }, + notification: { + create: outerCreateNotification, + }, + }; + + const caller = createManagerCaller(db); + + await expect(caller.createBroadcast({ + title: "Ops update", + targetType: "all", + })).rejects.toMatchObject({ + code: "NOT_FOUND", + message: "Notification broadcast not found", + }); + + expect(transaction).toHaveBeenCalledTimes(1); + expect(txCreateBroadcast).toHaveBeenCalledTimes(1); + expect(txCreateNotification).toHaveBeenCalledTimes(2); + expect(txUpdateBroadcast).toHaveBeenCalledTimes(1); + expect(outerCreateBroadcast).not.toHaveBeenCalled(); + expect(outerUpdateBroadcast).not.toHaveBeenCalled(); + expect(outerCreateNotification).not.toHaveBeenCalled(); + expect(emitNotificationCreated).not.toHaveBeenCalled(); + expect(emitTaskAssigned).not.toHaveBeenCalled(); + expect(emitBroadcastSent).not.toHaveBeenCalled(); + }); }); diff --git a/packages/api/src/router/assistant-tools.ts b/packages/api/src/router/assistant-tools.ts index d19c306..67a0e65 100644 --- a/packages/api/src/router/assistant-tools.ts +++ b/packages/api/src/router/assistant-tools.ts @@ -622,6 +622,27 @@ function toAssistantEstimateNotFoundError( return null; } +function toAssistantEstimateReadError( + error: unknown, + context: "weeklyPhasing" | "commercialTerms", +): AssistantToolErrorResult | null { + const notFound = toAssistantEstimateNotFoundError(error); + if (notFound) { + return notFound; + } + + if ( + context === "weeklyPhasing" + && error instanceof TRPCError + && error.code === "PRECONDITION_FAILED" + && error.message === "Estimate has no versions" + ) { + return { error: "Estimate version not found with the given criteria." }; + } + + return null; +} + function toAssistantHolidayCalendarNotFoundError( error: unknown, identifier?: string, @@ -1708,6 +1729,9 @@ function toAssistantNotificationCreationError( } if (trpcError?.code === "NOT_FOUND") { + if (trpcError.message.includes("broadcast")) { + return { error: "Broadcast not found with the given criteria." }; + } if (trpcError.message.includes("Sender user not found")) { return { error: "Sender user not found with the given criteria." }; } @@ -1742,6 +1766,10 @@ function toAssistantNotificationCreationError( return { error: "Sender user not found with the given criteria." }; } + if (context === "broadcast" && (errorText.includes("notificationbroadcast") || errorText.includes("broadcast"))) { + return { error: "Broadcast not found with the given criteria." }; + } + if (context === "task") { return { error: "Task recipient user not found with the given criteria." }; } @@ -7400,7 +7428,15 @@ const executors = { estimateId: string; }, ctx: ToolContext) { const caller = createEstimateCaller(createScopedCallerContext(ctx)); - return caller.getWeeklyPhasing({ estimateId: params.estimateId }); + try { + return await caller.getWeeklyPhasing({ estimateId: params.estimateId }); + } catch (error) { + const mapped = toAssistantEstimateReadError(error, "weeklyPhasing"); + if (mapped) { + return mapped; + } + throw error; + } }, async get_estimate_commercial_terms(params: { @@ -7408,10 +7444,18 @@ const executors = { versionId?: string; }, ctx: ToolContext) { const caller = createEstimateCaller(createScopedCallerContext(ctx)); - return caller.getCommercialTerms({ - estimateId: params.estimateId, - ...(params.versionId !== undefined ? { versionId: params.versionId } : {}), - }); + try { + return await caller.getCommercialTerms({ + estimateId: params.estimateId, + ...(params.versionId !== undefined ? { versionId: params.versionId } : {}), + }); + } catch (error) { + const mapped = toAssistantEstimateReadError(error, "commercialTerms"); + if (mapped) { + return mapped; + } + throw error; + } }, async update_estimate_commercial_terms(params: { diff --git a/packages/api/src/router/notification.ts b/packages/api/src/router/notification.ts index b4cd957..3ff5378 100644 --- a/packages/api/src/router/notification.ts +++ b/packages/api/src/router/notification.ts @@ -63,26 +63,92 @@ async function sendNotificationEmail( } } -function rethrowNotificationReferenceError(error: unknown): never { - const candidate = error as { +function getNotificationErrorCandidates(error: unknown): Array<{ + code?: unknown; + message?: unknown; + meta?: { field_name?: unknown; modelName?: unknown }; + cause?: unknown; +}> { + const queue: unknown[] = [error]; + const seen = new Set(); + const candidates: Array<{ code?: unknown; message?: unknown; - meta?: { field_name?: unknown }; - }; - const fieldName = typeof candidate.meta?.field_name === "string" - ? candidate.meta.field_name.toLowerCase() - : ""; + meta?: { field_name?: unknown; modelName?: unknown }; + cause?: unknown; + }> = []; - if ( - typeof candidate.code === "string" - && (candidate.code === "P2003" || candidate.code === "P2025") - && fieldName.includes("sender") - ) { - throw new TRPCError({ - code: "NOT_FOUND", - message: "Sender user not found", - cause: error, - }); + while (queue.length > 0) { + const current = queue.shift(); + if (!current || seen.has(current) || typeof current !== "object") { + continue; + } + seen.add(current); + + const candidate = current as { + code?: unknown; + message?: unknown; + meta?: { field_name?: unknown; modelName?: unknown }; + cause?: unknown; + shape?: { message?: unknown; data?: { cause?: unknown } }; + }; + candidates.push(candidate); + + if ("cause" in candidate) { + queue.push(candidate.cause); + } + if (candidate.shape?.data?.cause) { + queue.push(candidate.shape.data.cause); + } + } + + return candidates; +} + +function rethrowNotificationReferenceError(error: unknown): never { + for (const candidate of getNotificationErrorCandidates(error)) { + const fieldName = typeof candidate.meta?.field_name === "string" + ? candidate.meta.field_name.toLowerCase() + : ""; + const modelName = typeof candidate.meta?.modelName === "string" + ? candidate.meta.modelName.toLowerCase() + : ""; + + if ( + typeof candidate.code === "string" + && (candidate.code === "P2003" || candidate.code === "P2025") + && fieldName.includes("sender") + ) { + throw new TRPCError({ + code: "NOT_FOUND", + message: "Sender user not found", + cause: error, + }); + } + + if ( + typeof candidate.code === "string" + && (candidate.code === "P2003" || candidate.code === "P2025") + && fieldName.includes("userid") + ) { + throw new TRPCError({ + code: "NOT_FOUND", + message: "Broadcast recipient user not found", + cause: error, + }); + } + + if ( + typeof candidate.code === "string" + && (candidate.code === "P2003" || candidate.code === "P2025") + && (modelName.includes("notificationbroadcast") || fieldName.includes("broadcast")) + ) { + throw new TRPCError({ + code: "NOT_FOUND", + message: "Notification broadcast not found", + cause: error, + }); + } } throw error; @@ -649,9 +715,9 @@ export const notificationRouter = createTRPCRouter({ }); } - let broadcast; - try { - broadcast = await ctx.db.notificationBroadcast.create({ + const isTask = input.category === "TASK" || input.category === "APPROVAL"; + const runImmediateBroadcast = async (db: typeof ctx.db) => { + const broadcast = await db.notificationBroadcast.create({ data: { senderId, title: input.title, @@ -665,58 +731,72 @@ export const notificationRouter = createTRPCRouter({ ...(input.scheduledAt !== undefined ? { scheduledAt: input.scheduledAt } : {}), }, }); + + const createdNotificationIds: Array<{ id: string; userId: string }> = []; + for (const recipientUserId of recipientIds) { + const nId = await createNotification({ + db, + userId: recipientUserId, + type: `BROADCAST_${input.category}`, + title: input.title, + body: input.body, + link: input.link, + category: input.category, + priority: input.priority, + channel: input.channel, + sourceId: broadcast.id, + senderId, + taskStatus: isTask ? "OPEN" : undefined, + taskAction: input.taskAction, + dueDate: input.dueDate, + emit: false, + }); + createdNotificationIds.push({ id: nId, userId: recipientUserId }); + } + + const updatedBroadcast = await db.notificationBroadcast.update({ + where: { id: broadcast.id }, + data: { + sentAt, + recipientCount: createdNotificationIds.length, + }, + }); + + return { broadcast: updatedBroadcast, notificationIds: createdNotificationIds }; + }; + + let persistedBroadcast: Awaited>["broadcast"]; + let notificationIds: Array<{ id: string; userId: string }> = []; + const sentAt = new Date(); + try { + const transactionResult = typeof ctx.db.$transaction === "function" + ? await ctx.db.$transaction((tx) => runImmediateBroadcast(tx as typeof ctx.db)) + : await runImmediateBroadcast(ctx.db); + + persistedBroadcast = transactionResult.broadcast; + notificationIds = transactionResult.notificationIds; } catch (error) { rethrowNotificationReferenceError(error); } - // 4. Create individual notifications for each recipient - const isTask = input.category === "TASK" || input.category === "APPROVAL"; - - // SSE emit handled by createNotification; task events need separate emit - const notificationIds: Array<{ id: string; userId: string }> = []; - for (const recipientUserId of recipientIds) { - const nId = await createNotification({ - db: ctx.db, - userId: recipientUserId, - type: `BROADCAST_${input.category}`, - title: input.title, - body: input.body, - link: input.link, - category: input.category, - priority: input.priority, - channel: input.channel, - sourceId: broadcast.id, - senderId, - taskStatus: isTask ? "OPEN" : undefined, - taskAction: input.taskAction, - dueDate: input.dueDate, - }); - notificationIds.push({ id: nId, userId: recipientUserId }); + // 5. Emit side effects only after the transaction commits. + for (const notification of notificationIds) { + emitNotificationCreated(notification.userId, notification.id); if (isTask) { - emitTaskAssigned(recipientUserId, nId); + emitTaskAssigned(notification.userId, notification.id); } } - // 5. Update broadcast with sent info - await ctx.db.notificationBroadcast.update({ - where: { id: broadcast.id }, - data: { - sentAt: new Date(), - recipientCount: notificationIds.length, - }, - }); + emitBroadcastSent(persistedBroadcast.id, notificationIds.length); - // 6. Broadcast-level SSE event - emitBroadcastSent(broadcast.id, notificationIds.length); - - // 7. Send emails if channel includes email (non-blocking) + // 6. Send emails if channel includes email (non-blocking) if (input.channel === "email" || input.channel === "both") { for (const n of notificationIds) { void sendNotificationEmail(ctx.db, n.userId, input.title, input.body); } } - return { ...broadcast, recipientCount: notificationIds.length, sentAt: new Date() }; + return persistedBroadcast; }), /** List broadcasts */