import { router, publicProcedure, createCallerFactory } from "./server.js";
import { generateObject, generateText, jsonSchema, streamText } from "ai";
import type {
  OtherParameters,
  CommittedMessage,
  DraftMessage,
} from "../../types.js";
// import { client } from "../../database/milvus";
// import {
//   ConsistencyLevelEnum,
//   type NumberArrayId,
// } from "@zilliz/milvus2-sdk-node";
import { conversations } from "./conversations.js";
import { messages } from "./messages.js";
import { facts, createCaller as createCallerFacts } from "./facts.js";
import { createCaller as createCallerMessages } from "./messages.js";
import { createCaller as createCallerFactTriggers } from "./fact-triggers.js";
import { factTriggers } from "./fact-triggers.js";
import { MODEL_NAME } from "../provider.js";
import type { FactTrigger } from "@database/common";

/**
 * Builds the system prompt used once a running summary exists: the caller's
 * system prompt followed by the first-person conversation summary.
 *
 * @param systemPrompt - The base system prompt supplied with the request.
 * @param previousRunningSummary - The most recent running summary.
 * @returns The combined system prompt text.
 */
const mainSystemPrompt = ({
  systemPrompt,
  previousRunningSummary,
}: {
  systemPrompt: string;
  previousRunningSummary: string;
}) =>
  `${systemPrompt} This is a summary of the conversation so far, from your point-of-view (so "I" and "me" refer to you): ${previousRunningSummary} `;

/**
 * Chat router. Mounts the `conversations`, `messages`, `facts` and
 * `factTriggers` sub-routers and exposes two subscriptions:
 *
 * - `streamMessage`: streams the model's reply to a single user message.
 * - `sendMessage`: runs the full turn pipeline (persist the user message,
 *   generate a reply, extract facts, summarise, generate fact triggers),
 *   yielding a `{ status, message }` progress update before each step and a
 *   final `completed` update carrying the inserted rows.
 */
export const chat = router({
  conversations,
  messages,
  facts,
  factTriggers,

  /** Streams the model's text response to `input.message`, chunk by chunk. */
  streamMessage: publicProcedure
    .input(
      // NOTE: this is a type assertion only; the input is not validated.
      (x) =>
        x as {
          message: string;
        }
    )
    .subscription(async function* ({ input, signal, ctx: { openrouter } }) {
      const result = streamText({
        model: openrouter(MODEL_NAME),
        messages: [{ role: "user" as const, content: input.message }],
        abortSignal: signal,
      });
      for await (const chunk of result.textStream) {
        yield chunk;
      }
    }),

  /**
   * Processes one conversation turn. `messages` is the conversation so far,
   * with the new user message as its last element.
   *
   * Yields an `error` update and stops when there is no JWT in the context
   * or when `messages` is empty. Database and model failures are not caught
   * here and propagate to the caller.
   */
  sendMessage: publicProcedure
    .input(
      // NOTE: this is a type assertion only; the input is not validated.
      // NOTE(review): the element type of `messages` is reconstructed from
      // the imported message types and how elements are used below — confirm.
      (x) =>
        x as {
          conversationId: string;
          messages: Array<CommittedMessage | DraftMessage>;
          systemPrompt: string;
          parameters: OtherParameters;
        }
    )
    .subscription(async function* ({
      input: { conversationId, messages, systemPrompt, parameters },
      ctx,
    }) {
      const { dbClient, openrouter, jwt } = ctx;
      if (!jwt) {
        yield { status: "error" as const, message: "Unauthorized" };
        return;
      }
      // `jwt` is known to be set from here on, so read the id once.
      const userId = jwt.id as string;

      // Without a latest message there is nothing to save or respond to;
      // bail out instead of inserting a row with index -1 and no content.
      const latestMessage = Array.isArray(messages)
        ? messages[messages.length - 1]
        : undefined;
      if (!latestMessage) {
        yield { status: "error" as const, message: "No messages provided" };
        return;
      }

      const factsCaller = createCallerFacts(ctx);
      const messagesCaller = createCallerMessages(ctx);
      const factTriggerCaller = createCallerFactTriggers(ctx);

      /** TODO: Save all unsaved messages (i.e. those without an `id`) to the
       * database. Is this dangerous? Can an attacker just send a bunch of
       * messages, omitting the ids, causing me to save a bunch of them to the
       * database? I guess it's no worse than starting new conversations, which
       * anyone can freely do. */

      // Find the most recent message that actually carries a summary. Checking
      // for a string (rather than "not undefined") means a `null` summary is
      // not mistaken for a real one.
      const previousRunningSummaryIndex = messages.findLastIndex(
        (message) =>
          typeof (message as CommittedMessage).runningSummary === "string"
      );
      const previousRunningSummary =
        previousRunningSummaryIndex >= 0
          ? ((messages[previousRunningSummaryIndex] as CommittedMessage)
              .runningSummary as string)
          : "";
      // When no summary exists the index is -1, so this is the whole array.
      const messagesSincePreviousRunningSummary = messages.slice(
        previousRunningSummaryIndex + 1
      );

      // Emit status update
      yield {
        status: "saving_user_message",
        message: "Saving user message...",
      } as const;

      /** Save the incoming message to the database. */
      const userMessageRowToInsert = {
        conversationId,
        // content: messages[messages.length - 1].content,
        // role: "user" as const,
        ...latestMessage,
        index: messages.length - 1,
        createdAt: new Date().toISOString(),
      };
      const insertedUserMessageRows = await dbClient
        .insertInto("messages")
        .values({
          ...userMessageRowToInsert,
          // `parts` is stored as a JSON string.
          parts: JSON.stringify(userMessageRowToInsert.parts),
        })
        .returningAll()
        .execute();
      const insertedUserMessage =
        insertedUserMessageRows[0] as CommittedMessage;

      // Emit status update
      yield {
        status: "generating_response",
        message: "Generating AI response...",
      } as const;

      /** Generate a new message from the model, but hold-off on adding it to
       * the database until we produce the associated running-summary, below.
       * The model should be given the conversation summary thus far, and of
       * course the user's latest message, unmodified. Invite the model to
       * create any tools it needs. The tool needs to be implemented in a
       * language which this system can execute; usually an interpreted
       * language like Python or JavaScript. */
      const mainResponse = await generateText({
        model: openrouter(MODEL_NAME),
        messages: [
          // Only mention a summary in the system prompt when there is one.
          previousRunningSummary === ""
            ? {
                role: "system" as const,
                content: systemPrompt,
              }
            : {
                role: "system" as const,
                content: mainSystemPrompt({
                  systemPrompt,
                  previousRunningSummary,
                }),
              },
          // Flatten each message to the concatenation of its text parts.
          ...messagesSincePreviousRunningSummary.map((m) => ({
            role: m.role,
            content: m.parts
              .filter((p) => p.type === "text")
              .map((p) => p.text)
              .join(""),
          })),
        ],
        tools: undefined,
        ...parameters,
      });

      // Emit status update
      yield {
        status: "extracting_facts_from_user",
        message: "Extracting facts from user message...",
      } as const;

      /** Extract Facts from the user's message, and add them to the database,
       * linking the Facts with the messages they came from. (Yes, this should
       * be done *after* the model response, not before; because when we run a
       * query to find Facts to inject into the context sent to the model, we
       * don't want Facts from the user's current message to be candidates for
       * injection, because we're sending the user's message unadulterated to
       * the model; there's no reason to inject the same Facts that the model is
       * already using to generate its response.) */
      const factsFromUserMessageResponse =
        await factsCaller.extractFromNewMessages({
          previousRunningSummary,
          messagesSincePreviousRunningSummary: [],
          newMessages: messagesSincePreviousRunningSummary,
        });
      const userFactContents = factsFromUserMessageResponse.object.facts;
      // Skip the insert when nothing was extracted: an INSERT needs at least
      // one row, and there is no reason to hit the database for zero rows.
      const insertedFactsFromUserMessage =
        userFactContents.length > 0
          ? await dbClient
              .insertInto("facts")
              .values(
                userFactContents.map((factContent) => ({
                  userId,
                  sourceMessageId: insertedUserMessage.id,
                  content: factContent,
                  // Set explicitly, matching the assistant-fact insert below.
                  createdAt: new Date().toISOString(),
                }))
              )
              .returningAll()
              .execute()
          : [];

      // Emit status update
      yield {
        status: "generating_summary",
        message: "Generating conversation summary...",
      } as const;

      /** Produce a running summary of the conversation, and save that along
       * with the model's response to the database. The new running summary is
       * based on the previous running summary combined with all the messages
       * since that summary was produced. */
      const runningSummaryResponse =
        await messagesCaller.generateRunningSummary({
          messagesSincePreviousRunningSummary,
          mainResponseContent: mainResponse.text,
          previousRunningSummary,
        });
      const insertedAssistantMessage = (
        await dbClient
          .insertInto("messages")
          .values({
            conversationId,
            // content: mainResponse.text,
            parts: JSON.stringify([{ type: "text", text: mainResponse.text }]),
            runningSummary: runningSummaryResponse.text,
            role: "assistant" as const,
            // The assistant message directly follows the user message.
            index: messages.length,
            createdAt: new Date().toISOString(),
          })
          .returningAll()
          .execute()
      )[0];

      // Emit status update
      yield {
        status: "extracting_facts_from_assistant",
        message: "Extracting facts from assistant response...",
      } as const;

      /** Extract Facts from the model's response, and add them to the database,
       * linking the Facts with the messages they came from. */
      const factsFromAssistantMessageResponse =
        await factsCaller.extractFromNewMessages({
          previousRunningSummary,
          messagesSincePreviousRunningSummary,
          newMessages: [
            {
              role: "assistant" as const,
              // content: mainResponse.text,
              parts: [{ type: "text", text: mainResponse.text }],
            },
          ],
        });
      const assistantFactContents =
        factsFromAssistantMessageResponse.object.facts;
      // Same empty-batch guard as for the user facts above.
      const insertedFactsFromAssistantMessage =
        assistantFactContents.length > 0
          ? await dbClient
              .insertInto("facts")
              .values(
                assistantFactContents.map((factContent) => ({
                  userId,
                  sourceMessageId: insertedAssistantMessage.id,
                  content: factContent,
                  createdAt: new Date().toISOString(),
                }))
              )
              .returningAll()
              .execute()
          : [];

      const insertedFacts = [
        ...insertedFactsFromUserMessage,
        ...insertedFactsFromAssistantMessage,
      ];

      // Emit status update
      yield {
        status: "generating_fact_triggers",
        message: "Generating fact triggers...",
      } as const;

      /** For each Fact produced in the two fact-extraction steps, generate
       * FactTriggers and add them to the database, linking the FactTriggers
       * with the Facts they came from. A FactTrigger is a natural language
       * phrase that describes a situation in which it would be useful to invoke
       * the Fact. (e.g., "When food preferences are discussed"). */
      // Facts are processed one at a time, in insertion order.
      for (const fact of insertedFacts) {
        const factTriggersResponse = await factTriggerCaller.generateFromFact({
          mainResponseContent: mainResponse.text,
          previousRunningSummary,
          messagesSincePreviousRunningSummary,
          fact,
        });
        const triggerContents = factTriggersResponse.object.factTriggers;
        // Nothing to store for this fact; avoid an empty INSERT.
        if (triggerContents.length === 0) {
          continue;
        }
        await dbClient
          .insertInto("fact_triggers")
          .values(
            triggerContents.map((triggerContent) => ({
              sourceFactId: fact.id,
              content: triggerContent,
              priorityMultiplier: 1,
              priorityMultiplierReason: "",
              scopeConversationId: conversationId,
              createdAt: new Date().toISOString(),
            }))
          )
          .returningAll()
          .execute();
      }

      // Emit final result
      yield {
        status: "completed",
        message: "Completed!",
        result: {
          insertedAssistantMessage,
          insertedUserMessage,
          insertedFacts,
        },
      } as const;
    }),
});

/** Factory for a server-side caller bound to the `chat` router. */
export const createCaller = createCallerFactory(chat);