use environment variables from cloudflare's environment

master
Avraham Sakal 4 weeks ago
parent c98342e886
commit 81338b8831

@ -1,2 +1,2 @@
// export { db } from "./lowdb";
export { db } from "./postgres";
export { getDb, getDbClient } from "./postgres";

@ -10,7 +10,6 @@ import type {
MessageEntity,
} from "./common.ts";
import type { CommittedMessage } from "../types";
import { env } from "../server/env.js";
// export const pool = new Pool({
// connectionString: env.POSTGRES_CONNECTION_STRING as string,
@ -21,244 +20,254 @@ import { env } from "../server/env.js";
// pool,
// });
const dialect = new NeonDialect({
neon: neon(env.POSTGRES_CONNECTION_STRING as string),
});
export function getDbClient(POSTGRES_CONNECTION_STRING: string) {
const dialect = new NeonDialect({
neon: neon(POSTGRES_CONNECTION_STRING as string),
});
// Database interface is passed to Kysely's constructor, and from now on, Kysely
// knows your database structure.
// Dialect is passed to Kysely's constructor, and from now on, Kysely knows how
// to communicate with your database.
export const dbClient = new Kysely<Database>({
dialect,
});
// Database interface is passed to Kysely's constructor, and from now on, Kysely
// knows your database structure.
// Dialect is passed to Kysely's constructor, and from now on, Kysely knows how
// to communicate with your database.
const dbClient = new Kysely<Database>({
dialect,
});
const conversations: ConversationEntity = {
construct: (conversation) => conversation,
create: async (conversation) => {
const insertedRows = await dbClient
.insertInto("conversations")
.values(conversation)
.returningAll()
.execute();
return insertedRows[0];
},
createMany: async (conversations) => {
const insertedRows = await dbClient
.insertInto("conversations")
.values(conversations)
.returningAll()
.execute();
return insertedRows;
},
findAll: async () => {
const rows = await dbClient
.selectFrom("conversations")
.selectAll()
.execute();
return rows;
},
findById: async (id) => {
const row = await dbClient
.selectFrom("conversations")
.selectAll()
.where("id", "=", id)
.execute();
return row[0];
},
update: async (id, data) => {
await dbClient
.updateTable("conversations")
.set(data)
.where("id", "=", id)
.execute();
},
delete: async (id) => {
await dbClient.deleteFrom("conversations").where("id", "=", id).execute();
},
fetchMessages: async (conversationId) => {
const rows = await dbClient
.selectFrom("messages")
.selectAll()
.where("conversationId", "=", conversationId)
.execute();
return rows as Array<CommittedMessage>;
},
};
return dbClient;
}
const facts: FactEntity = {
construct: (fact) => fact,
create: async (fact) => {
const insertedRows = await dbClient
.insertInto("facts")
.values(fact)
.returningAll()
.execute();
return insertedRows[0];
},
createMany: async (facts) => {
const insertedRows = await dbClient
.insertInto("facts")
.values(facts)
.returningAll()
.execute();
return insertedRows;
},
findAll: async () => {
const rows = await dbClient.selectFrom("facts").selectAll().execute();
return rows;
},
findById: async (id) => {
const row = await dbClient
.selectFrom("facts")
.selectAll()
.where("id", "=", id)
.execute();
return row[0];
},
update: async (id, data) => {
await dbClient
.updateTable("facts")
.set(data)
.where("id", "=", id)
.execute();
},
delete: async (id) => {
await dbClient.deleteFrom("facts").where("id", "=", id).execute();
},
findByConversationId: async (conversationId) => {
const rows = await dbClient
.selectFrom("facts")
.innerJoin("messages", "messages.id", "facts.sourceMessageId")
.selectAll("facts")
.where("conversationId", "=", conversationId)
.execute();
return rows;
},
};
export function getDb(POSTGRES_CONNECTION_STRING: string) {
const dbClient = getDbClient(POSTGRES_CONNECTION_STRING);
const factTriggers: FactTriggerEntity = {
construct: (factTrigger) => factTrigger,
create: async (factTrigger) => {
const insertedRows = await dbClient
.insertInto("fact_triggers")
.values(factTrigger)
.returningAll()
.execute();
return insertedRows[0];
},
createMany: async (factTriggers) => {
const insertedRows = await dbClient
.insertInto("fact_triggers")
.values(factTriggers)
.returningAll()
.execute();
return insertedRows;
},
findAll: async () => {
const rows = await dbClient
.selectFrom("fact_triggers")
.selectAll()
.execute();
return rows;
},
findById: async (id) => {
const row = await dbClient
.selectFrom("fact_triggers")
.selectAll()
.where("id", "=", id)
.execute();
return row[0];
},
update: async (id, data) => {
await dbClient
.updateTable("fact_triggers")
.set(data)
.where("id", "=", id)
.execute();
},
delete: async (id) => {
await dbClient.deleteFrom("fact_triggers").where("id", "=", id).execute();
},
findByFactId: async (factId) => {
const rows = await dbClient
.selectFrom("fact_triggers")
.innerJoin("facts", "facts.id", "fact_triggers.sourceFactId")
.selectAll("fact_triggers")
.where("sourceFactId", "=", factId)
.execute();
return rows;
},
findByConversationId: async (conversationId) => {
const rows = await dbClient
.selectFrom("fact_triggers")
.innerJoin("facts", "facts.id", "fact_triggers.sourceFactId")
.innerJoin("messages", "messages.id", "facts.sourceMessageId")
.selectAll("fact_triggers")
.where("messages.conversationId", "=", conversationId)
.execute();
return rows;
},
};
const conversations: ConversationEntity = {
construct: (conversation) => conversation,
create: async (conversation) => {
const insertedRows = await dbClient
.insertInto("conversations")
.values(conversation)
.returningAll()
.execute();
return insertedRows[0];
},
createMany: async (conversations) => {
const insertedRows = await dbClient
.insertInto("conversations")
.values(conversations)
.returningAll()
.execute();
return insertedRows;
},
findAll: async () => {
const rows = await dbClient
.selectFrom("conversations")
.selectAll()
.execute();
return rows;
},
findById: async (id) => {
const row = await dbClient
.selectFrom("conversations")
.selectAll()
.where("id", "=", id)
.execute();
return row[0];
},
update: async (id, data) => {
await dbClient
.updateTable("conversations")
.set(data)
.where("id", "=", id)
.execute();
},
delete: async (id) => {
await dbClient.deleteFrom("conversations").where("id", "=", id).execute();
},
fetchMessages: async (conversationId) => {
const rows = await dbClient
.selectFrom("messages")
.selectAll()
.where("conversationId", "=", conversationId)
.execute();
return rows as Array<CommittedMessage>;
},
};
const messages: MessageEntity = {
construct: (message) => message,
create: async (message) => {
const insertedRows = await dbClient
.insertInto("messages")
.values({ ...message, parts: JSON.stringify(message.parts) })
.returningAll()
.execute();
return insertedRows[0] as CommittedMessage;
},
createMany: async (messages) => {
const insertedRows = await dbClient
.insertInto("messages")
.values(
messages.map((message) => ({
...message,
parts: JSON.stringify(message.parts),
}))
)
.returningAll()
.execute();
return insertedRows as Array<CommittedMessage>;
},
findAll: async () => {
const rows = await dbClient.selectFrom("messages").selectAll().execute();
return rows as Array<CommittedMessage>;
},
findById: async (id) => {
const row = await dbClient
.selectFrom("messages")
.selectAll()
.where("id", "=", id)
.execute();
return row[0] as CommittedMessage;
},
update: async (id, data) => {
await dbClient
.updateTable("messages")
.set(data)
.where("id", "=", id)
.execute();
},
delete: async (id) => {
await dbClient.deleteFrom("messages").where("id", "=", id).execute();
},
findByConversationId: async (conversationId) => {
const rows = (await dbClient
.selectFrom("messages")
.selectAll()
.where("conversationId", "=", conversationId)
.execute()) as Array<CommittedMessage>;
return rows;
},
};
const facts: FactEntity = {
construct: (fact) => fact,
create: async (fact) => {
const insertedRows = await dbClient
.insertInto("facts")
.values(fact)
.returningAll()
.execute();
return insertedRows[0];
},
createMany: async (facts) => {
const insertedRows = await dbClient
.insertInto("facts")
.values(facts)
.returningAll()
.execute();
return insertedRows;
},
findAll: async () => {
const rows = await dbClient.selectFrom("facts").selectAll().execute();
return rows;
},
findById: async (id) => {
const row = await dbClient
.selectFrom("facts")
.selectAll()
.where("id", "=", id)
.execute();
return row[0];
},
update: async (id, data) => {
await dbClient
.updateTable("facts")
.set(data)
.where("id", "=", id)
.execute();
},
delete: async (id) => {
await dbClient.deleteFrom("facts").where("id", "=", id).execute();
},
findByConversationId: async (conversationId) => {
const rows = await dbClient
.selectFrom("facts")
.innerJoin("messages", "messages.id", "facts.sourceMessageId")
.selectAll("facts")
.where("conversationId", "=", conversationId)
.execute();
return rows;
},
};
export const db = {
conversations,
facts,
factTriggers,
messages,
};
const factTriggers: FactTriggerEntity = {
construct: (factTrigger) => factTrigger,
create: async (factTrigger) => {
const insertedRows = await dbClient
.insertInto("fact_triggers")
.values(factTrigger)
.returningAll()
.execute();
return insertedRows[0];
},
createMany: async (factTriggers) => {
const insertedRows = await dbClient
.insertInto("fact_triggers")
.values(factTriggers)
.returningAll()
.execute();
return insertedRows;
},
findAll: async () => {
const rows = await dbClient
.selectFrom("fact_triggers")
.selectAll()
.execute();
return rows;
},
findById: async (id) => {
const row = await dbClient
.selectFrom("fact_triggers")
.selectAll()
.where("id", "=", id)
.execute();
return row[0];
},
update: async (id, data) => {
await dbClient
.updateTable("fact_triggers")
.set(data)
.where("id", "=", id)
.execute();
},
delete: async (id) => {
await dbClient.deleteFrom("fact_triggers").where("id", "=", id).execute();
},
findByFactId: async (factId) => {
const rows = await dbClient
.selectFrom("fact_triggers")
.innerJoin("facts", "facts.id", "fact_triggers.sourceFactId")
.selectAll("fact_triggers")
.where("sourceFactId", "=", factId)
.execute();
return rows;
},
findByConversationId: async (conversationId) => {
const rows = await dbClient
.selectFrom("fact_triggers")
.innerJoin("facts", "facts.id", "fact_triggers.sourceFactId")
.innerJoin("messages", "messages.id", "facts.sourceMessageId")
.selectAll("fact_triggers")
.where("messages.conversationId", "=", conversationId)
.execute();
return rows;
},
};
const messages: MessageEntity = {
construct: (message) => message,
create: async (message) => {
const insertedRows = await dbClient
.insertInto("messages")
.values({ ...message, parts: JSON.stringify(message.parts) })
.returningAll()
.execute();
return insertedRows[0] as CommittedMessage;
},
createMany: async (messages) => {
const insertedRows = await dbClient
.insertInto("messages")
.values(
messages.map((message) => ({
...message,
parts: JSON.stringify(message.parts),
}))
)
.returningAll()
.execute();
return insertedRows as Array<CommittedMessage>;
},
findAll: async () => {
const rows = await dbClient.selectFrom("messages").selectAll().execute();
return rows as Array<CommittedMessage>;
},
findById: async (id) => {
const row = await dbClient
.selectFrom("messages")
.selectAll()
.where("id", "=", id)
.execute();
return row[0] as CommittedMessage;
},
update: async (id, data) => {
await dbClient
.updateTable("messages")
.set(data)
.where("id", "=", id)
.execute();
},
delete: async (id) => {
await dbClient.deleteFrom("messages").where("id", "=", id).execute();
},
findByConversationId: async (conversationId) => {
const rows = (await dbClient
.selectFrom("messages")
.selectAll()
.where("conversationId", "=", conversationId)
.execute()) as Array<CommittedMessage>;
return rows;
},
};
const db = {
conversations,
facts,
factTriggers,
messages,
};
return db;
}

@ -1,11 +1,22 @@
import type { PageContextServer } from "vike/types";
import { createCaller } from "../trpc.js";
import { getDb } from "../../../database/postgres.js";
import { getOpenrouter } from "../provider.js";
import { env } from "../../../server/env.js";
export type Data = Awaited<ReturnType<typeof data>>;
export const data = async (pageContext: PageContextServer) => {
const { id } = pageContext.routeParams;
const caller = createCaller({});
const caller = createCaller({
db: getDb(
(pageContext.env?.POSTGRES_CONNECTION_STRING ||
env.POSTGRES_CONNECTION_STRING) as string
),
openrouter: getOpenrouter(
(pageContext.env?.OPENROUTER_API_KEY || env.OPENROUTER_API_KEY) as string
),
});
const [
conversation,

@ -3,18 +3,17 @@ import {
publicProcedure,
createCallerFactory,
} from "../../trpc/server";
import { db } from "../../database/index";
export const conversations = router({
fetchAll: publicProcedure.query(async () => {
fetchAll: publicProcedure.query(async ({ ctx: { db } }) => {
return await db.conversations.findAll();
}),
fetchOne: publicProcedure
.input((x) => x as { id: string })
.query(async ({ input: { id } }) => {
.query(async ({ input: { id }, ctx: { db } }) => {
return await db.conversations.findById(id);
}),
start: publicProcedure.mutation(async () => {
start: publicProcedure.mutation(async ({ ctx: { db } }) => {
const row = {
title: "New Conversation",
userId: "019900bb-61b3-7333-b760-b27784dfe33b",
@ -23,7 +22,7 @@ export const conversations = router({
}),
deleteOne: publicProcedure
.input((x) => x as { id: string })
.mutation(async ({ input: { id } }) => {
.mutation(async ({ input: { id }, ctx: { db } }) => {
await db.conversations.delete(id);
return { ok: true };
}),
@ -35,13 +34,13 @@ export const conversations = router({
title: string;
}
)
.mutation(async ({ input: { id, title } }) => {
.mutation(async ({ input: { id, title }, ctx: { db } }) => {
await db.conversations.update(id, { title });
return { ok: true };
}),
fetchMessages: publicProcedure
.input((x) => x as { conversationId: string })
.query(async ({ input: { conversationId } }) => {
.query(async ({ input: { conversationId }, ctx: { db } }) => {
return await db.conversations.fetchMessages(conversationId);
}),
});

@ -3,9 +3,8 @@ import {
publicProcedure,
createCallerFactory,
} from "../../trpc/server.js";
import { db } from "../../database/index.js";
import type { DraftMessage } from "../../types.js";
import { openrouter, MODEL_NAME } from "./provider.js";
import { MODEL_NAME } from "./provider.js";
import { generateObject, generateText, jsonSchema } from "ai";
import type { Fact } from "../../database/common.js";
@ -61,12 +60,12 @@ Generate a list of situations in which the fact is useful.`;
export const factTriggers = router({
fetchByFactId: publicProcedure
.input((x) => x as { factId: string })
.query(async ({ input: { factId } }) => {
.query(async ({ input: { factId }, ctx: { db } }) => {
return db.factTriggers.findByFactId(factId);
}),
fetchByConversationId: publicProcedure
.input((x) => x as { conversationId: string })
.query(async ({ input: { conversationId } }) => {
.query(async ({ input: { conversationId }, ctx: { db } }) => {
return await db.factTriggers.findByConversationId(conversationId);
}),
deleteOne: publicProcedure
@ -76,7 +75,7 @@ export const factTriggers = router({
factTriggerId: string;
}
)
.mutation(async ({ input: { factTriggerId } }) => {
.mutation(async ({ input: { factTriggerId }, ctx: { db } }) => {
await db.factTriggers.delete(factTriggerId);
return { ok: true };
}),
@ -88,7 +87,7 @@ export const factTriggers = router({
content: string;
}
)
.mutation(async ({ input: { factTriggerId, content } }) => {
.mutation(async ({ input: { factTriggerId, content }, ctx: { db } }) => {
db.factTriggers.update(factTriggerId, { content });
return { ok: true };
}),
@ -110,6 +109,7 @@ export const factTriggers = router({
mainResponseContent,
fact,
},
ctx: { openrouter },
}) => {
const factTriggers = await generateObject({
model: openrouter(MODEL_NAME),

@ -3,7 +3,6 @@ import {
publicProcedure,
createCallerFactory,
} from "../../trpc/server.js";
import { db } from "../../database/index.js";
import type { DraftMessage } from "../../types.js";
import { MODEL_NAME, openrouter } from "./provider.js";
import { generateObject, generateText, jsonSchema } from "ai";
@ -56,7 +55,7 @@ Extract new facts from these messages.`;
export const facts = router({
fetchByConversationId: publicProcedure
.input((x) => x as { conversationId: string })
.query(async ({ input: { conversationId } }) => {
.query(async ({ input: { conversationId }, ctx: { db } }) => {
return await db.facts.findByConversationId(conversationId);
}),
deleteOne: publicProcedure
@ -66,7 +65,7 @@ export const facts = router({
factId: string;
}
)
.mutation(async ({ input: { factId } }) => {
.mutation(async ({ input: { factId }, ctx: { db } }) => {
await db.facts.delete(factId);
return { ok: true };
}),
@ -78,7 +77,7 @@ export const facts = router({
content: string;
}
)
.mutation(async ({ input: { factId, content } }) => {
.mutation(async ({ input: { factId, content }, ctx: { db } }) => {
await db.facts.update(factId, { content });
return { ok: true };
}),
@ -100,6 +99,7 @@ export const facts = router({
messagesSincePreviousRunningSummary,
newMessages,
},
ctx: { db },
}) => {
const factsFromUserMessageResponse = await generateObject({
model: openrouter(MODEL_NAME),

@ -3,10 +3,9 @@ import {
publicProcedure,
createCallerFactory,
} from "../../trpc/server";
import { MODEL_NAME, openrouter } from "./provider.js";
import { MODEL_NAME } from "./provider.js";
import { generateObject, generateText, jsonSchema } from "ai";
import type { DraftMessage } from "../../types.js";
import { db } from "../../database/index";
const runningSummarySystemPrompt = ({
previousRunningSummary,
@ -51,12 +50,12 @@ Generate a new running summary of the conversation.`;
export const messages = router({
fetchByConversationId: publicProcedure
.input((x) => x as { conversationId: string })
.query(async ({ input: { conversationId } }) => {
.query(async ({ input: { conversationId }, ctx: { db } }) => {
return await db.messages.findByConversationId(conversationId);
}),
deleteOne: publicProcedure
.input((x) => x as { id: string })
.mutation(async ({ input: { id } }) => {
.mutation(async ({ input: { id }, ctx: { db } }) => {
await db.messages.delete(id);
return { success: true };
}),
@ -76,6 +75,7 @@ export const messages = router({
messagesSincePreviousRunningSummary,
mainResponseContent,
},
ctx: { openrouter },
}) => {
const runningSummaryResponse = await generateText({
model: openrouter(MODEL_NAME),

@ -4,6 +4,11 @@ export const openrouter = createOpenRouter({
apiKey: env.OPENROUTER_API_KEY,
});
export function getOpenrouter(OPENROUTER_API_KEY: string) {
return createOpenRouter({
apiKey: OPENROUTER_API_KEY,
});
}
export const MODEL_NAME = "mistralai/mistral-nemo";
// export const MODEL_NAME = "z-ai/glm-4.5-air";
// export const MODEL_NAME = "openai/gpt-5-mini";

@ -14,20 +14,15 @@ import type {
// ConsistencyLevelEnum,
// type NumberArrayId,
// } from "@zilliz/milvus2-sdk-node";
import { db } from "../../database/index.js";
import { conversations } from "./conversations.js";
import { messages } from "./messages.js";
import { facts, createCaller as createCallerFacts } from "./facts.js";
import { createCaller as createCallerMessages } from "./messages.js";
import { createCaller as createCallerFactTriggers } from "./fact-triggers.js";
import { factTriggers } from "./fact-triggers.js";
import { MODEL_NAME, openrouter } from "./provider.js";
import { MODEL_NAME } from "./provider.js";
import type { Fact, FactTrigger } from "../../database/common.js";
const factsCaller = createCallerFacts({});
const messagesCaller = createCallerMessages({});
const factTriggerCaller = createCallerFactTriggers({});
const mainSystemPrompt = ({
systemPrompt,
previousRunningSummary,
@ -54,7 +49,7 @@ export const chat = router({
message: string;
}
)
.subscription(async function* ({ input, signal }) {
.subscription(async function* ({ input, signal, ctx: { openrouter } }) {
const result = streamText({
model: openrouter(MODEL_NAME),
messages: [{ role: "user" as const, content: input.message }],
@ -76,7 +71,12 @@ export const chat = router({
)
.subscription(async function* ({
input: { conversationId, messages, systemPrompt, parameters },
ctx,
}) {
const { db, openrouter } = ctx;
const factsCaller = createCallerFacts(ctx);
const messagesCaller = createCallerMessages(ctx);
const factTriggerCaller = createCallerFactTriggers(ctx);
/** TODO: Save all unsaved messages (i.e. those without an `id`) to the
* database. Is this dangerous? Can an attacker just send a bunch of
* messages, omitting the ids, causing me to save a bunch of them to the

@ -1,7 +1,14 @@
import { appRouter } from "../trpc/router";
// TODO: stop using universal-middleware and directly integrate server middlewares instead and/or use vike-server https://vike.dev/server. (Bati generates boilerplates that use universal-middleware https://github.com/magne4000/universal-middleware to make Bati's internal logic easier. This is temporary and will be removed soon.)
import type { Get, UniversalHandler } from "@universal-middleware/core";
import {
type Get,
type UniversalHandler,
env as getEnv,
} from "@universal-middleware/core";
import { fetchRequestHandler } from "@trpc/server/adapters/fetch";
import { getDb, getDbClient } from "../database/postgres";
import { getOpenrouter } from "../pages/chat/provider";
import { env as processEnv } from "./env.js";
export const trpcHandler = ((endpoint) => (request, context, runtime) => {
return fetchRequestHandler({
@ -9,11 +16,26 @@ export const trpcHandler = ((endpoint) => (request, context, runtime) => {
req: request,
router: appRouter,
createContext({ req, resHeaders }) {
const env = getEnv(runtime);
const dbClient = getDbClient(
(env.POSTGRES_CONNECTION_STRING ||
processEnv.POSTGRES_CONNECTION_STRING) as string
);
const db = getDb(
(env.POSTGRES_CONNECTION_STRING ||
processEnv.POSTGRES_CONNECTION_STRING) as string
);
const openrouter = getOpenrouter(
(env.OPENROUTER_API_KEY || processEnv.OPENROUTER_API_KEY) as string
);
return {
...context,
...runtime,
req,
resHeaders,
dbClient,
db,
openrouter,
};
},
allowMethodOverride: true,

@ -1,12 +1,21 @@
import type { TSchema } from "@sinclair/typebox";
import { TypeCompiler } from "@sinclair/typebox/compiler";
import { initTRPC, TRPCError } from "@trpc/server";
import type { getDb } from "../database/postgres";
import type { getOpenrouter } from "../pages/chat/provider";
/**
* Initialization of tRPC backend
* Should be done only once per backend!
*/
const t = initTRPC.context<object>().create(/*{
const t = initTRPC
.context<
object & {
db: ReturnType<typeof getDb>;
openrouter: ReturnType<typeof getOpenrouter>;
}
>()
.create(/*{
sse: {
maxDurationMs: 5 * 60 * 1_000, // 5 minutes
ping: {

Loading…
Cancel
Save