refactor(api): modularize assistant router workflow
This commit is contained in:
@@ -0,0 +1,284 @@
|
||||
import { AssistantApprovalStatus, Prisma, type PrismaClient } from "@capakraken/db";
|
||||
import { logger } from "../lib/logger.js";
|
||||
import { buildApprovalSummary } from "./assistant-confirmation.js";
|
||||
|
||||
const PENDING_APPROVAL_TTL_MS = 15 * 60 * 1000;
|
||||
const ASSISTANT_APPROVALS_TABLE_NAME = "public.assistant_approvals";
|
||||
let hasLoggedAssistantApprovalStorageUnavailable = false;
|
||||
|
||||
export type AssistantApprovalStore = Pick<PrismaClient, "assistantApproval">;
|
||||
|
||||
export class AssistantApprovalStorageUnavailableError extends Error {
|
||||
constructor() {
|
||||
super("Assistant approval storage is unavailable.");
|
||||
this.name = "AssistantApprovalStorageUnavailableError";
|
||||
}
|
||||
}
|
||||
|
||||
export interface PendingAssistantApproval {
|
||||
id: string;
|
||||
userId: string;
|
||||
conversationId: string;
|
||||
toolName: string;
|
||||
toolArguments: string;
|
||||
summary: string;
|
||||
createdAt: number;
|
||||
expiresAt: number;
|
||||
}
|
||||
|
||||
export interface AssistantApprovalPayload {
|
||||
id: string;
|
||||
status: "pending" | "approved" | "cancelled";
|
||||
conversationId: string;
|
||||
toolName: string;
|
||||
summary: string;
|
||||
createdAt: string;
|
||||
expiresAt: string;
|
||||
}
|
||||
|
||||
type AssistantApprovalRecord = {
|
||||
id: string;
|
||||
userId: string;
|
||||
conversationId: string;
|
||||
toolName: string;
|
||||
toolArguments: string;
|
||||
summary: string;
|
||||
createdAt: Date;
|
||||
expiresAt: Date;
|
||||
};
|
||||
|
||||
function mapPendingApproval(record: AssistantApprovalRecord): PendingAssistantApproval {
|
||||
return {
|
||||
id: record.id,
|
||||
userId: record.userId,
|
||||
conversationId: record.conversationId,
|
||||
toolName: record.toolName,
|
||||
toolArguments: record.toolArguments,
|
||||
summary: record.summary,
|
||||
createdAt: record.createdAt.getTime(),
|
||||
expiresAt: record.expiresAt.getTime(),
|
||||
};
|
||||
}
|
||||
|
||||
export function toApprovalPayload(
|
||||
approval: PendingAssistantApproval,
|
||||
status: AssistantApprovalPayload["status"],
|
||||
): AssistantApprovalPayload {
|
||||
return {
|
||||
id: approval.id,
|
||||
status,
|
||||
conversationId: approval.conversationId,
|
||||
toolName: approval.toolName,
|
||||
summary: approval.summary,
|
||||
createdAt: new Date(approval.createdAt).toISOString(),
|
||||
expiresAt: new Date(approval.expiresAt).toISOString(),
|
||||
};
|
||||
}
|
||||
|
||||
function isAssistantApprovalTableMissingError(error: unknown): boolean {
|
||||
if (error instanceof Prisma.PrismaClientKnownRequestError) {
|
||||
if (error.code !== "P2021") return false;
|
||||
const table = typeof error.meta?.table === "string" ? error.meta.table : "";
|
||||
return table.includes("assistant_approvals") || error.message.includes("assistant_approvals");
|
||||
}
|
||||
|
||||
if (typeof error !== "object" || error === null || !("code" in error)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const candidate = error as {
|
||||
code?: unknown;
|
||||
message?: unknown;
|
||||
meta?: {
|
||||
table?: unknown;
|
||||
};
|
||||
};
|
||||
const code = typeof candidate.code === "string" ? candidate.code : "";
|
||||
if (code !== "P2021") return false;
|
||||
|
||||
const message = typeof candidate.message === "string"
|
||||
? candidate.message
|
||||
: "";
|
||||
const metaTable = typeof candidate.meta?.table === "string"
|
||||
? candidate.meta.table
|
||||
: "";
|
||||
|
||||
return metaTable.includes("assistant_approvals") || message.includes("assistant_approvals");
|
||||
}
|
||||
|
||||
function logAssistantApprovalStorageUnavailable(error: unknown) {
|
||||
if (hasLoggedAssistantApprovalStorageUnavailable) {
|
||||
return;
|
||||
}
|
||||
hasLoggedAssistantApprovalStorageUnavailable = true;
|
||||
logger.warn(
|
||||
{
|
||||
err: error,
|
||||
table: ASSISTANT_APPROVALS_TABLE_NAME,
|
||||
},
|
||||
"Assistant approval storage is unavailable",
|
||||
);
|
||||
}
|
||||
|
||||
async function withAssistantApprovalFallback<T>(
|
||||
operation: () => Promise<T>,
|
||||
fallback: () => T,
|
||||
): Promise<T> {
|
||||
try {
|
||||
return await operation();
|
||||
} catch (error) {
|
||||
if (!isAssistantApprovalTableMissingError(error)) throw error;
|
||||
logAssistantApprovalStorageUnavailable(error);
|
||||
return fallback();
|
||||
}
|
||||
}
|
||||
|
||||
export function resetAssistantApprovalStorageWarningStateForTests(): void {
|
||||
hasLoggedAssistantApprovalStorageUnavailable = false;
|
||||
}
|
||||
|
||||
export async function listPendingAssistantApprovals(
|
||||
db: AssistantApprovalStore,
|
||||
userId: string,
|
||||
): Promise<PendingAssistantApproval[]> {
|
||||
return withAssistantApprovalFallback(async () => {
|
||||
await db.assistantApproval.updateMany({
|
||||
where: {
|
||||
userId,
|
||||
status: AssistantApprovalStatus.PENDING,
|
||||
expiresAt: { lte: new Date() },
|
||||
},
|
||||
data: {
|
||||
status: AssistantApprovalStatus.EXPIRED,
|
||||
},
|
||||
});
|
||||
|
||||
const approvals = await db.assistantApproval.findMany({
|
||||
where: {
|
||||
userId,
|
||||
status: AssistantApprovalStatus.PENDING,
|
||||
expiresAt: { gt: new Date() },
|
||||
},
|
||||
orderBy: { createdAt: "desc" },
|
||||
});
|
||||
|
||||
return approvals.map(mapPendingApproval);
|
||||
}, () => []);
|
||||
}
|
||||
|
||||
export async function clearPendingAssistantApproval(
|
||||
db: AssistantApprovalStore,
|
||||
userId: string,
|
||||
conversationId: string,
|
||||
): Promise<void> {
|
||||
await withAssistantApprovalFallback(async () => {
|
||||
await db.assistantApproval.updateMany({
|
||||
where: {
|
||||
userId,
|
||||
conversationId,
|
||||
status: AssistantApprovalStatus.PENDING,
|
||||
},
|
||||
data: {
|
||||
status: AssistantApprovalStatus.CANCELLED,
|
||||
cancelledAt: new Date(),
|
||||
},
|
||||
});
|
||||
}, () => undefined);
|
||||
}
|
||||
|
||||
export async function peekPendingAssistantApproval(
|
||||
db: AssistantApprovalStore,
|
||||
userId: string,
|
||||
conversationId: string,
|
||||
): Promise<PendingAssistantApproval | null> {
|
||||
return withAssistantApprovalFallback(async () => {
|
||||
await db.assistantApproval.updateMany({
|
||||
where: {
|
||||
userId,
|
||||
conversationId,
|
||||
status: AssistantApprovalStatus.PENDING,
|
||||
expiresAt: { lte: new Date() },
|
||||
},
|
||||
data: {
|
||||
status: AssistantApprovalStatus.EXPIRED,
|
||||
},
|
||||
});
|
||||
|
||||
const pending = await db.assistantApproval.findFirst({
|
||||
where: {
|
||||
userId,
|
||||
conversationId,
|
||||
status: AssistantApprovalStatus.PENDING,
|
||||
},
|
||||
orderBy: { createdAt: "desc" },
|
||||
});
|
||||
if (!pending) return null;
|
||||
return mapPendingApproval(pending);
|
||||
}, () => null);
|
||||
}
|
||||
|
||||
export async function consumePendingAssistantApproval(
|
||||
db: AssistantApprovalStore,
|
||||
userId: string,
|
||||
conversationId: string,
|
||||
): Promise<PendingAssistantApproval | null> {
|
||||
const pending = await peekPendingAssistantApproval(db, userId, conversationId);
|
||||
if (!pending) return null;
|
||||
const approvedAt = new Date();
|
||||
const updateResult = await db.assistantApproval.updateMany({
|
||||
where: {
|
||||
id: pending.id,
|
||||
userId,
|
||||
conversationId,
|
||||
status: AssistantApprovalStatus.PENDING,
|
||||
expiresAt: { gt: approvedAt },
|
||||
},
|
||||
data: {
|
||||
status: AssistantApprovalStatus.APPROVED,
|
||||
approvedAt,
|
||||
},
|
||||
});
|
||||
if (updateResult.count === 0) return null;
|
||||
|
||||
const approved = await db.assistantApproval.findFirst({
|
||||
where: {
|
||||
id: pending.id,
|
||||
userId,
|
||||
conversationId,
|
||||
},
|
||||
});
|
||||
if (!approved) return null;
|
||||
return mapPendingApproval(approved);
|
||||
}
|
||||
|
||||
export async function createPendingAssistantApproval(
|
||||
db: AssistantApprovalStore,
|
||||
userId: string,
|
||||
conversationId: string,
|
||||
toolName: string,
|
||||
toolArguments: string,
|
||||
options?: { summary?: string; ttlMs?: number },
|
||||
): Promise<PendingAssistantApproval> {
|
||||
const now = new Date();
|
||||
const expiresAt = new Date(now.getTime() + (options?.ttlMs ?? PENDING_APPROVAL_TTL_MS));
|
||||
const summary = options?.summary ?? buildApprovalSummary(toolName, toolArguments);
|
||||
try {
|
||||
await clearPendingAssistantApproval(db, userId, conversationId);
|
||||
const pendingApproval = await db.assistantApproval.create({
|
||||
data: {
|
||||
userId,
|
||||
conversationId,
|
||||
toolName,
|
||||
toolArguments,
|
||||
summary,
|
||||
createdAt: now,
|
||||
expiresAt,
|
||||
},
|
||||
});
|
||||
return mapPendingApproval(pendingApproval);
|
||||
} catch (error) {
|
||||
if (!isAssistantApprovalTableMissingError(error)) throw error;
|
||||
logAssistantApprovalStorageUnavailable(error);
|
||||
throw new AssistantApprovalStorageUnavailableError();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user