From 93d5ac8a30765531397777c536948492c31598f6 Mon Sep 17 00:00:00 2001 From: avraham Date: Wed, 7 Aug 2024 21:57:03 -0400 Subject: [PATCH] refactor; backtest using lmdbx --- server/TODO.md | 2 + server/package.json | 4 +- server/src/backtest.ts | 55 ++++----- .../clickhouse.ts} | 6 +- .../interfaces.ts} | 2 +- .../lmdbx.ts} | 2 +- .../optiondb-lmdbx.ts} | 115 ++++++++++-------- server/src/interfaces.ts | 38 +++--- server/src/lib/clickhouse.ts | 31 +++-- server/src/lib/{util.ts => utils/nextDate.ts} | 0 server/src/lib/utils/retry.ts | 61 ++++++++++ .../clickhouse.ts} | 6 +- .../interfaces.ts} | 2 +- .../{optiondb.lmdbx.ts => optiondb/lmdbx.ts} | 43 ++++--- server/src/scripts/clickhouse-to-lmdbx.ts | 28 +++-- server/src/stockdb.clickhouse.ts | 59 --------- server/src/stockdb.interfaces.ts | 7 -- server/src/stockdb.lmdbx.ts | 80 ------------ server/src/stockdb/clickhouse.ts | 62 ++++++++++ server/src/stockdb/interfaces.ts | 7 ++ server/src/stockdb/lmdbx.ts | 83 +++++++++++++ 21 files changed, 397 insertions(+), 296 deletions(-) create mode 100644 server/TODO.md rename server/src/{calendardb.clickhouse.ts => calendardb/clickhouse.ts} (96%) rename server/src/{calendardb.interfaces.ts => calendardb/interfaces.ts} (90%) rename server/src/{calendardb.lmdbx.ts => calendardb/lmdbx.ts} (98%) rename server/src/{calendardb.optiondb.lmdbx.ts => calendardb/optiondb-lmdbx.ts} (62%) rename server/src/lib/{util.ts => utils/nextDate.ts} (100%) create mode 100644 server/src/lib/utils/retry.ts rename server/src/{optiondb.clickhouse.ts => optiondb/clickhouse.ts} (94%) rename server/src/{optiondb.interfaces.ts => optiondb/interfaces.ts} (82%) rename server/src/{optiondb.lmdbx.ts => optiondb/lmdbx.ts} (78%) delete mode 100644 server/src/stockdb.clickhouse.ts delete mode 100644 server/src/stockdb.interfaces.ts delete mode 100644 server/src/stockdb.lmdbx.ts create mode 100644 server/src/stockdb/clickhouse.ts create mode 100644 server/src/stockdb/interfaces.ts create mode 100644 server/src/stockdb/lmdbx.ts diff --git a/server/TODO.md b/server/TODO.md new file mode 100644 index 0000000..8139624 --- /dev/null +++ b/server/TODO.md @@ -0,0 +1,2 @@ +- Ingest stock/underlying aggregates from flatfiles +- Create backtesting function to step through each minute of every day. diff --git a/server/package.json b/server/package.json index 80e5e9c..18cf47f 100644 --- a/server/package.json +++ b/server/package.json @@ -2,7 +2,7 @@ "private": true, "type": "module", "scripts": { - "build": "esbuild src/*.ts src/**/*.ts --platform=node --outdir=dist --format=esm", + "build": "esbuild src/*.ts src/**/*.ts src/**/**/*.ts --platform=node --outdir=dist --format=esm", "dev:node": "node --watch dist/index.js", "dev:esbuild": "pnpm run build --watch", "dev": "run-p dev:*" @@ -30,4 +30,4 @@ "npm-run-all": "^4.1.5", "typescript": "^5.3.3" } -} +} \ No newline at end of file diff --git a/server/src/backtest.ts b/server/src/backtest.ts index 9d8e589..dde5a56 100644 --- a/server/src/backtest.ts +++ b/server/src/backtest.ts @@ -1,8 +1,8 @@ -import { stockDatabase } from "./stockdb.clickhouse.js"; -import { calendarDatabase } from "./calendardb.optiondb.lmdbx.js"; -import type { CalendarKey } from "./calendardb.interfaces.js"; +import { stockDatabase } from "./stockdb/clickhouse.js"; +import { calendarDatabase } from "./calendardb/optiondb-lmdbx.js"; +import type { CalendarKey } from "./calendardb/interfaces.js"; import type { Aggregate } from "./interfaces.js"; -import { nextDate } from "./lib/util.js"; +import { nextDate } from "./lib/utils/nextDate.js"; type BacktestInput = { symbol: string; @@ -10,14 +10,14 @@ type BacktestInput = { endDate: string; /** Between 0 and 1. The frequency that similar calendars have historically ended (i.e. within the last hour) at a higher price than the current calendar's price. */ historicalProbabilityOfSuccess?: number; - initialAvailableValue?: number; + initialBuyingPower?: number; }; export async function backtest({ symbol, startDate, endDate, historicalProbabilityOfSuccess = 0.8, - initialAvailableValue: initialBuyingPower = 2000, + initialBuyingPower = 2000, }: BacktestInput) { let buyingPower = initialBuyingPower; const portfolio = new Set(); @@ -27,42 +27,27 @@ export async function backtest({ date <= endDate; date = nextDate(date), didBuyCalendar = false ) { - console.log("Current Date:", date); const calendars = await calendarDatabase.getCalendars({ key: { symbol }, date, }); const stockAggregates = await stockDatabase.getAggregates({ - key: symbol, + key: { symbol }, date, }); - const calendarsAggregates = new Map< - CalendarKey, - Array, "tsStart" | "open" | "close">> - >(); - for (const calendar of calendars) { - calendarsAggregates.set( - calendar, - await calendarDatabase.getAggregates({ - key: { - ...calendar, - }, - date, - }) - ); - } // for each minute of that day for which we have a stock candlestick: for (const stockAggregate of stockAggregates) { // console.log("Current Time:", new Date(stockAggregate.tsStart)); // filter-out calendars that are far-from-the-money (10%) + console.log("Current Date:", date, stockAggregate.tsStart); const calendarsNearTheMoney = calendars.filter( ({ strike }) => - Math.abs((stockAggregate.open - strike) / stockAggregate.open) < 0.1 + Math.abs((stockAggregate.open - strike) / stockAggregate.open) < 0.1, ); // for each relevant calendar on that day: for (const calendar of calendarsNearTheMoney) { const strikePercentageFromTheMoney = Math.abs( - (stockAggregate.open - calendar.strike) / stockAggregate.open + (stockAggregate.open - calendar.strike) / stockAggregate.open, ); /** In days. */ const calendarSpan = @@ -76,16 +61,26 @@ export async function backtest({ strikePercentageFromTheMoney, historicalProbabilityOfSuccess, }); - const calendarAggregates = calendarsAggregates.get(calendar); + const calendarAggregates = calendarDatabase.getAggregatesSync({ + key: { + ...calendar, + }, + date, + }); + // console.log( + // "Calendar Aggregates:", + // calendar, + // calendarAggregates.length, + // ); const calendarAggregateAtCurrentTime = calendarAggregates.find( - ({ tsStart }) => tsStart === stockAggregate.tsStart + ({ tsStart }) => tsStart === stockAggregate.tsStart, ); // if there exists a matching calendar candlestick for the current minute: if (calendarAggregateAtCurrentTime) { // if the current candlestick is a good price (i.e. less than the target price): const minCalendarPriceInCandlestick = Math.min( calendarAggregateAtCurrentTime.open, - calendarAggregateAtCurrentTime.close + calendarAggregateAtCurrentTime.close, ); if ( minCalendarPriceInCandlestick < targetCalendarPrice && @@ -104,7 +99,7 @@ export async function backtest({ minCalendarPriceInCandlestick * 100, "...$", buyingPower, - "left" + "left", ); didBuyCalendar = true; } @@ -136,7 +131,7 @@ export async function backtest({ calendarClosingPrice, "...$", buyingPower, - "left" + "left", ); } } diff --git a/server/src/calendardb.clickhouse.ts b/server/src/calendardb/clickhouse.ts similarity index 96% rename from server/src/calendardb.clickhouse.ts rename to server/src/calendardb/clickhouse.ts index dac31bb..5616f89 100644 --- a/server/src/calendardb.clickhouse.ts +++ b/server/src/calendardb/clickhouse.ts @@ -1,6 +1,6 @@ -import type { CalendarDatabase, CalendarKey } from "./calendardb.interfaces.js"; -import type { Aggregate } from "./interfaces.js"; -import { query } from "./lib/clickhouse.js"; +import type { CalendarDatabase, CalendarKey } from "./interfaces.js"; +import type { Aggregate } from "../interfaces.js"; +import { query } from "../lib/clickhouse.js"; function makeCalendarDatabase(): CalendarDatabase { const calendarDatabase: Omit = { diff --git a/server/src/calendardb.interfaces.ts b/server/src/calendardb/interfaces.ts similarity index 90% rename from server/src/calendardb.interfaces.ts rename to server/src/calendardb/interfaces.ts index 2495c80..26110da 100644 --- a/server/src/calendardb.interfaces.ts +++ b/server/src/calendardb/interfaces.ts @@ -1,4 +1,4 @@ -import type { AggregateDatabase } from "./interfaces.js"; +import type { AggregateDatabase } from "../interfaces.js"; export type CalendarKey = { symbol: string; diff --git a/server/src/calendardb.lmdbx.ts b/server/src/calendardb/lmdbx.ts similarity index 98% rename from server/src/calendardb.lmdbx.ts rename to server/src/calendardb/lmdbx.ts index 2e4bc6e..b8f27df 100644 --- a/server/src/calendardb.lmdbx.ts +++ b/server/src/calendardb/lmdbx.ts @@ -1,4 +1,4 @@ -import type { CalendarDatabase } from "./calendardb.interfaces.js"; +import type { CalendarDatabase } from "./interfaces.js"; import { open } from "lmdbx"; const calendarAggregatesDb = open({ diff --git a/server/src/calendardb.optiondb.lmdbx.ts b/server/src/calendardb/optiondb-lmdbx.ts similarity index 62% rename from server/src/calendardb.optiondb.lmdbx.ts rename to server/src/calendardb/optiondb-lmdbx.ts index 5e927c3..7f5bf75 100644 --- a/server/src/calendardb.optiondb.lmdbx.ts +++ b/server/src/calendardb/optiondb-lmdbx.ts @@ -1,10 +1,64 @@ -import { optionContractDatabase } from "./optiondb.lmdbx.js"; -import type { CalendarDatabase } from "./calendardb.interfaces.js"; +import { optionContractDatabase } from "../optiondb/lmdbx.js"; +import type { CalendarDatabase } from "./interfaces.js"; /** Largest possible key according to the `ordered-binary` (used by lmdbx) docs. */ const MAXIMUM_KEY = Buffer.from([0xff]); function makeCalendarDatabase(): CalendarDatabase { + const getAggregatesSync = ({ + key: { symbol, frontExpirationDate, backExpirationDate, strike, type }, + date, + }) => { + const frontOptionContractAggregates = + optionContractDatabase.getAggregatesSync({ + date, + key: { symbol, expirationDate: frontExpirationDate, strike, type }, + }); + const backOptionContractAggregates = + optionContractDatabase.getAggregatesSync({ + date, + key: { symbol, expirationDate: backExpirationDate, strike, type }, + }); + const calendarAggregates = []; + let i = 0; + let j = 0; + while ( + i < frontOptionContractAggregates.length && + j < backOptionContractAggregates.length + ) { + if ( + frontOptionContractAggregates[i].tsStart === + backOptionContractAggregates[j].tsStart + ) { + calendarAggregates.push({ + tsStart: frontOptionContractAggregates[i].tsStart, + open: + backOptionContractAggregates[j].open - + frontOptionContractAggregates[i].open, + close: + backOptionContractAggregates[j].close - + frontOptionContractAggregates[i].close, + // the high and low are not exactly correct since we don't know if each contract's high and low happened at the same moment as the other: + high: + backOptionContractAggregates[j].high - + frontOptionContractAggregates[i].high, + low: + backOptionContractAggregates[j].low - + frontOptionContractAggregates[i].low, + }); + i++; + j++; + } else if ( + frontOptionContractAggregates[i].tsStart > + backOptionContractAggregates[j].tsStart + ) { + j++; + } else { + i++; + } + } + return calendarAggregates; + }; const calendarDatabase: Omit = { getKeys: async ({ key: { symbol }, date }) => { const optionContracts = await optionContractDatabase.getOptionContracts({ @@ -27,57 +81,12 @@ function makeCalendarDatabase(): CalendarDatabase { getAggregates: async ({ key: { symbol, frontExpirationDate, backExpirationDate, strike, type }, date, - }) => { - const frontOptionContractAggregates = - await optionContractDatabase.getAggregates({ - date, - key: { symbol, expirationDate: frontExpirationDate, strike, type }, - }); - const backOptionContractAggregates = - await optionContractDatabase.getAggregates({ - date, - key: { symbol, expirationDate: backExpirationDate, strike, type }, - }); - const calendarAggregates = []; - let i = 0; - let j = 0; - while ( - i < frontOptionContractAggregates.length && - j < backOptionContractAggregates.length - ) { - if ( - frontOptionContractAggregates[i].tsStart === - backOptionContractAggregates[j].tsStart - ) { - calendarAggregates.push({ - tsStart: frontOptionContractAggregates[i].tsStart, - open: - backOptionContractAggregates[j].open - - frontOptionContractAggregates[i].open, - close: - backOptionContractAggregates[j].close - - frontOptionContractAggregates[i].close, - // the high and low are not exactly correct since we don't know if each contract's high and low happened at the same moment as the other: - high: - backOptionContractAggregates[j].high - - frontOptionContractAggregates[i].high, - low: - backOptionContractAggregates[j].low - - frontOptionContractAggregates[i].low, - }); - i++; - j++; - } else if ( - frontOptionContractAggregates[i].tsStart > - backOptionContractAggregates[j].tsStart - ) { - j++; - } else { - i++; - } - } - return calendarAggregates; - }, + }) => + getAggregatesSync({ + key: { symbol, frontExpirationDate, backExpirationDate, strike, type }, + date, + }), + getAggregatesSync, insertAggregates: async (aggregates) => { // right now, no-op }, diff --git a/server/src/interfaces.ts b/server/src/interfaces.ts index b53ad0c..f66f048 100644 --- a/server/src/interfaces.ts +++ b/server/src/interfaces.ts @@ -1,25 +1,29 @@ export type Candlestick = { - open: number; - close: number; - high: number; - low: number; + open: number; + close: number; + high: number; + low: number; }; export type Aggregate = { - key: T; - /** UNIX time in milliseconds */ - tsStart: number; + key: T; + /** UNIX time in milliseconds */ + tsStart: number; } & Candlestick; export type AggregateDatabase = { - getKeys: ({ - key, - date, - }: { key?: T | Partial; date?: string }) => Promise>; - getAggregates: ({ - key, - date, - }: { key: T; date: string }) => Promise, "key">>>; - insertAggregates: (aggregates: Array>) => Promise; - getClosingPrice: ({ key }: { key: T }) => Promise; + getKeys: ({ + key, + date, + }: { key?: T | Partial; date?: string }) => Promise>; + getAggregates: ({ + key, + date, + }: { key: T; date: string }) => Promise, "key">>>; + getAggregatesSync?: ({ + key, + date, + }: { key: T; date: string }) => Array, "key">>; + insertAggregates: (aggregates: Array>) => Promise; + getClosingPrice: ({ key }: { key: T }) => Promise; }; diff --git a/server/src/lib/clickhouse.ts b/server/src/lib/clickhouse.ts index 08dfddd..e1ba563 100644 --- a/server/src/lib/clickhouse.ts +++ b/server/src/lib/clickhouse.ts @@ -1,6 +1,7 @@ import { createClient as createClickhouseClient } from "@clickhouse/client"; import type { DataFormat } from "@clickhouse/client"; import { Env } from "@humanwhocodes/env"; +import { retry } from "./utils/retry.js"; const env = new Env(); @@ -11,21 +12,27 @@ export const clickhouse = createClickhouseClient({ host: CLICKHOUSE_HOST, username: CLICKHOUSE_USER, password: CLICKHOUSE_PASS, + keep_alive: { + enabled: true, + socket_ttl: 2500, + }, }); export async function query( queryString: string, - format: DataFormat = "JSONEachRow" + format: DataFormat = "JSONEachRow", ): Promise> { - return await ( - await clickhouse.query({ - query: queryString, - format, - clickhouse_settings: { - output_format_json_quote_64bit_integers: 0, - //output_format_json_quote_64bit_floats: false, - //output_format_json_quote_64bit_decimals: false, - }, - }) - ).json(); + return await retry( + async () => { + const result = await clickhouse.query({ + query: queryString, + format, + clickhouse_settings: { + output_format_json_quote_64bit_integers: 0, + }, + }); + return await result.json(); + }, + { maxRetries: 5 }, + ); } diff --git a/server/src/lib/util.ts b/server/src/lib/utils/nextDate.ts similarity index 100% rename from server/src/lib/util.ts rename to server/src/lib/utils/nextDate.ts diff --git a/server/src/lib/utils/retry.ts b/server/src/lib/utils/retry.ts new file mode 100644 index 0000000..d82ae9a --- /dev/null +++ b/server/src/lib/utils/retry.ts @@ -0,0 +1,61 @@ +type RetryDecision = { + shouldRetry: boolean; + maxRetries?: number; + delay?: number; +}; + +type RetryOptions = { + maxRetries?: number; + delay?: number; + shouldRetry?: (error: unknown) => RetryDecision; +}; + +export async function retry( + fn: () => Promise, + options: RetryOptions = {}, +): Promise { + const { + maxRetries: defaultMaxRetries = 3, + delay: defaultDelay = 1000, + shouldRetry = retryOnAnyError, + } = options; + + let attempt = 1; + while (true) { + try { + return await fn(); + } catch (error) { + const decision = shouldRetry(error); + if (!decision.shouldRetry) throw error; + + const currentMaxRetries = decision.maxRetries ?? defaultMaxRetries; + const currentDelay = decision.delay ?? defaultDelay; + + if (attempt >= currentMaxRetries) throw error; + + console.warn( + `Error occurred, retrying (attempt ${attempt}/${currentMaxRetries})...`, + ); + console.error(error); + await new Promise((resolve) => setTimeout(resolve, currentDelay)); + attempt++; + } + } +} + +export const retryOnAnyError = (): RetryDecision => ({ shouldRetry: true }); +export const retryOnTimeout = (error: unknown): RetryDecision => + error instanceof Error && error.message.includes("timeout") + ? { shouldRetry: true, maxRetries: 5, delay: 2000 } + : { shouldRetry: false }; +export const retryOnErrorType = + (errorType: new () => Error, options?: Partial) => + (error: unknown) => + error instanceof errorType + ? { shouldRetry: true, ...options } + : { shouldRetry: false }; +export const retryOnErrorSubstring = + (substring: string, options?: Partial) => (error: unknown) => + error instanceof Error && error.message.includes(substring) + ? { shouldRetry: true, ...options } + : { shouldRetry: false }; diff --git a/server/src/optiondb.clickhouse.ts b/server/src/optiondb/clickhouse.ts similarity index 94% rename from server/src/optiondb.clickhouse.ts rename to server/src/optiondb/clickhouse.ts index 7f80add..beacd04 100644 --- a/server/src/optiondb.clickhouse.ts +++ b/server/src/optiondb/clickhouse.ts @@ -1,9 +1,9 @@ import type { OptionContractDatabase, OptionContractKey, -} from "./optiondb.interfaces.js"; -import type { Aggregate } from "./interfaces.js"; -import { clickhouse, query } from "./lib/clickhouse.js"; +} from "./interfaces.js"; +import type { Aggregate } from "../interfaces.js"; +import { clickhouse, query } from "../lib/clickhouse.js"; function makeOptionContractDatabase(): OptionContractDatabase { const optionContractDatabase: Omit< diff --git a/server/src/optiondb.interfaces.ts b/server/src/optiondb/interfaces.ts similarity index 82% rename from server/src/optiondb.interfaces.ts rename to server/src/optiondb/interfaces.ts index 6b56d97..e98b878 100644 --- a/server/src/optiondb.interfaces.ts +++ b/server/src/optiondb/interfaces.ts @@ -1,4 +1,4 @@ -import type { AggregateDatabase } from "./interfaces.js"; +import type { AggregateDatabase } from "../interfaces.js"; export type OptionContractKey = { symbol: string; diff --git a/server/src/optiondb.lmdbx.ts b/server/src/optiondb/lmdbx.ts similarity index 78% rename from server/src/optiondb.lmdbx.ts rename to server/src/optiondb/lmdbx.ts index be69408..127898d 100644 --- a/server/src/optiondb.lmdbx.ts +++ b/server/src/optiondb/lmdbx.ts @@ -1,4 +1,4 @@ -import type { OptionContractDatabase } from "./optiondb.interfaces.js"; +import type { OptionContractDatabase } from "./interfaces.js"; import { open } from "lmdbx"; const optionContractAggregatesDb = open({ @@ -17,6 +17,25 @@ const optionContractExistenceDb = open({ const MAXIMUM_KEY = Buffer.from([0xff]); function makeOptionContractDatabase(): OptionContractDatabase { + const getAggregatesSync = ({ + key: { symbol, expirationDate, strike, type }, + date, + }) => { + const startOfDayUnix = new Date(`${date}T00:00:00Z`).valueOf(); + const endOfDayUnix = startOfDayUnix + 3600 * 24 * 1000; + return optionContractAggregatesDb + .getRange({ + start: [symbol, expirationDate, strike, type, startOfDayUnix], + end: [symbol, expirationDate, strike, type, endOfDayUnix], + }) + .map(({ value }) => ({ + tsStart: value.tsStart, + open: value.open, + close: value.close, + high: value.high, + low: value.low, + })).asArray; + }; const optionContractDatabase: Omit< OptionContractDatabase, "getOptionContracts" @@ -34,25 +53,15 @@ function makeOptionContractDatabase(): OptionContractDatabase { type: key[4], })).asArray; }, + getAggregatesSync, getAggregates: async ({ key: { symbol, expirationDate, strike, type }, date, - }) => { - const startOfDayUnix = new Date(`${date}T00:00:00Z`).valueOf(); - const endOfDayUnix = startOfDayUnix + 3600 * 24 * 1000; - return optionContractAggregatesDb - .getRange({ - start: [symbol, expirationDate, strike, type, startOfDayUnix], - end: [symbol, expirationDate, strike, type, endOfDayUnix], - }) - .map(({ value }) => ({ - tsStart: value.tsStart, - open: value.open, - close: value.close, - high: value.high, - low: value.low, - })).asArray; - }, + }) => + getAggregatesSync({ + key: { symbol, expirationDate, strike, type }, + date, + }), insertAggregates: async (aggregates) => { await optionContractExistenceDb.batch(() => { for (const aggregate of aggregates) { diff --git a/server/src/scripts/clickhouse-to-lmdbx.ts b/server/src/scripts/clickhouse-to-lmdbx.ts index 180d550..6d83243 100644 --- a/server/src/scripts/clickhouse-to-lmdbx.ts +++ b/server/src/scripts/clickhouse-to-lmdbx.ts @@ -1,10 +1,12 @@ import type { AggregateDatabase } from "../interfaces.js"; -// import { stockDatabase as stockDatabaseClickhouse } from "../stockdb.clickhouse.js"; -// import { stockDatabase as stockDatabaseLmdbx } from "../stockdb.lmdbx.js"; -import { optionContractDatabase as optionContractDatabaseClickhouse } from "../optiondb.clickhouse.js"; -import { optionContractDatabase as optionContractDatabaseLmdbx } from "../optiondb.lmdbx.js"; +import { stockDatabase as stockDatabaseClickhouse } from "../stockdb/clickhouse.js"; +import { stockDatabase as stockDatabaseLmdbx } from "../stockdb/lmdbx.js"; +// import { optionContractDatabase as optionContractDatabaseClickhouse } from "../optiondb.clickhouse.js"; +// import { optionContractDatabase as optionContractDatabaseLmdbx } from "../optiondb.lmdbx.js"; import { nextDate } from "../lib/utils/nextDate.js"; import { retry, retryOnTimeout } from "../lib/utils/retry.js"; +import type { OptionContractKey } from "../optiondb/interfaces.js"; +import type { StockKey } from "../stockdb/interfaces.js"; async function syncAggregates({ fromDatabase, @@ -24,7 +26,10 @@ async function syncAggregates({ } const symbols = ["AMD", "AAPL", "MSFT", "GOOGL", "NFLX", "NVDA"]; -async function run() { +async function run({ + fromDatabase, + toDatabase, +}: { fromDatabase: AggregateDatabase; toDatabase: AggregateDatabase }) { const startDate = process.argv[2]; const endDate = process.argv[3]; @@ -39,8 +44,8 @@ async function run() { console.log(date, symbol); const keys = await retry( () => - optionContractDatabaseClickhouse.getKeys({ - key: { symbol }, + fromDatabase.getKeys({ + key: { symbol } as T, date, }), { shouldRetry: retryOnTimeout }, @@ -51,8 +56,8 @@ async function run() { await retry( () => syncAggregates({ - fromDatabase: optionContractDatabaseClickhouse, - toDatabase: optionContractDatabaseLmdbx, + fromDatabase, + toDatabase, key, date, }), @@ -63,4 +68,7 @@ async function run() { } } -await run(); +await run({ + fromDatabase: stockDatabaseClickhouse, + toDatabase: stockDatabaseLmdbx, +}); diff --git a/server/src/stockdb.clickhouse.ts b/server/src/stockdb.clickhouse.ts deleted file mode 100644 index 0277109..0000000 --- a/server/src/stockdb.clickhouse.ts +++ /dev/null @@ -1,59 +0,0 @@ -import type { StockDatabase, StockKey } from "./stockdb.interfaces.js"; -import type { Aggregate } from "./interfaces.js"; -import { clickhouse, query } from "./lib/clickhouse.js"; - -function makeStockDatabase(): StockDatabase { - const stockDatabase: Omit = { - getKeys: async ({ date }) => { - return ( - await query(` - SELECT DISTINCT symbol FROM stock_aggregates WHERE toDate(tsStart) = '${date}' - `) - ).map(({ symbol }) => symbol); - }, - getAggregates: async ({ key: symbol, date }) => { - return ( - await query, "key">>(` - SELECT - toUnixTimestamp(tsStart) as tsStart, - open, - close, - high, - low - FROM stock_aggregates - WHERE symbol = '${symbol}' - AND toDate(tsStart) = '${date}' - ORDER BY tsStart ASC - `) - ).map((aggregate) => ({ - ...aggregate, - tsStart: aggregate.tsStart * 1000, // unfortunately, `toUnixTimestamp` only returns second-precision - })); - }, - insertAggregates: async (aggregates) => { - // stock existence is taken care of by clickhouse materialized view - await clickhouse.insert({ - table: "stock_aggregates", - values: aggregates.map(({ key, tsStart, open, close, high, low }) => ({ - symbol: key, - tsStart, - open, - close, - high, - low, - })), - }); - }, - getClosingPrice: async ({ key }) => { - // no-op: not used since stocks don't have a "closing" price, unlike options. - return 0; - }, - }; - - return { - ...stockDatabase, - getSymbols: stockDatabase.getKeys, - }; -} - -export const stockDatabase: StockDatabase = makeStockDatabase(); diff --git a/server/src/stockdb.interfaces.ts b/server/src/stockdb.interfaces.ts deleted file mode 100644 index bf71707..0000000 --- a/server/src/stockdb.interfaces.ts +++ /dev/null @@ -1,7 +0,0 @@ -import type { AggregateDatabase } from "./interfaces.js"; - -export type StockKey = string; - -export type StockDatabase = AggregateDatabase & { - getSymbols: AggregateDatabase["getKeys"]; -}; diff --git a/server/src/stockdb.lmdbx.ts b/server/src/stockdb.lmdbx.ts deleted file mode 100644 index 12d98b0..0000000 --- a/server/src/stockdb.lmdbx.ts +++ /dev/null @@ -1,80 +0,0 @@ -import type { StockDatabase } from "./stockdb.interfaces.js"; -import { open } from "lmdbx"; - -const stockAggregatesDb = open({ - path: "/tmp/stock-aggregates.db", - // any options go here, we can turn on compression like this: - compression: true, -}); - -const stockExistenceDb = open({ - path: "/tmp/stock-existence.db", - // any options go here, we can turn on compression like this: - compression: true, -}); - -/** Largest possible key according to the `ordered-binary` (used by lmdbx) docs. */ -const MAXIMUM_KEY = Buffer.from([0xff]); - -function makeStockDatabase(): StockDatabase { - const stockDatabase: Omit = { - getKeys: async ({ date }) => { - return stockExistenceDb - .getRange({ - start: [date], - end: [date, MAXIMUM_KEY], - }) - .map(({ key }) => key[1]).asArray; - }, - getAggregates: async ({ key: symbol, date }) => { - const startOfDayUnix = new Date(`${date}T00:00:00Z`).valueOf(); - const endOfDayUnix = startOfDayUnix + 3600 * 24 * 1000; - return stockAggregatesDb - .getRange({ - start: [symbol, startOfDayUnix], - end: [symbol, endOfDayUnix], - }) - .map(({ key, value }) => ({ - tsStart: key[1], - open: value.open, - close: value.close, - high: value.high, - low: value.low, - })).asArray; - }, - insertAggregates: async (aggregates) => { - await stockExistenceDb.batch(() => { - for (const aggregate of aggregates) { - stockExistenceDb.put( - [ - new Date(aggregate.tsStart).toISOString().substring(0, 10), - aggregate.key, - ], - null, - ); - } - }); - await stockAggregatesDb.batch(() => { - for (const aggregate of aggregates) { - stockAggregatesDb.put([aggregate.key, aggregate.tsStart], { - open: aggregate.open, - close: aggregate.close, - high: aggregate.high, - low: aggregate.low, - }); - } - }); - }, - getClosingPrice: async ({ key }) => { - // no-op: not used since stocks don't have a "closing" price, unlike options. - return 0; - }, - }; - - return { - ...stockDatabase, - getSymbols: stockDatabase.getKeys, - }; -} - -export const stockDatabase: StockDatabase = makeStockDatabase(); diff --git a/server/src/stockdb/clickhouse.ts b/server/src/stockdb/clickhouse.ts new file mode 100644 index 0000000..121c708 --- /dev/null +++ b/server/src/stockdb/clickhouse.ts @@ -0,0 +1,62 @@ +import type { StockDatabase, StockKey } from "./interfaces.js"; +import type { Aggregate } from "../interfaces.js"; +import { clickhouse, query } from "../lib/clickhouse.js"; + +function makeStockDatabase(): StockDatabase { + const stockDatabase: Omit = { + getKeys: async ({ date, key }) => { + if (key?.symbol) { + return [key as StockKey]; + } + return await query(` + SELECT DISTINCT symbol FROM stock_aggregates WHERE toDate(tsStart) = '${date}' + `); + }, + getAggregates: async ({ key: { symbol }, date }) => { + return ( + await query, "key">>(` + SELECT + toUnixTimestamp(tsStart) as tsStart, + open, + close, + high, + low + FROM stock_aggregates + WHERE symbol = '${symbol}' + AND toDate(tsStart) = '${date}' + ORDER BY tsStart ASC + `) + ).map((aggregate) => ({ + ...aggregate, + tsStart: aggregate.tsStart * 1000, // unfortunately, `toUnixTimestamp` only returns second-precision + })); + }, + insertAggregates: async (aggregates) => { + // stock existence is taken care of by clickhouse materialized view + await clickhouse.insert({ + table: "stock_aggregates", + values: aggregates.map( + ({ key: { symbol }, tsStart, open, close, high, low }) => ({ + symbol, + tsStart, + open, + close, + high, + low, + }), + ), + }); + }, + getClosingPrice: async ({ key }) => { + // no-op: not used since stocks don't have a "closing" price, unlike options. + return 0; + }, + }; + + return { + ...stockDatabase, + getSymbols: stockDatabase.getKeys, + }; +} + +export const stockDatabase: StockDatabase = makeStockDatabase(); diff --git a/server/src/stockdb/interfaces.ts b/server/src/stockdb/interfaces.ts new file mode 100644 index 0000000..7cf371e --- /dev/null +++ b/server/src/stockdb/interfaces.ts @@ -0,0 +1,7 @@ +import type { AggregateDatabase } from "../interfaces.js"; + +export type StockKey = { symbol: string }; + +export type StockDatabase = AggregateDatabase & { + getSymbols: AggregateDatabase["getKeys"]; +}; diff --git a/server/src/stockdb/lmdbx.ts b/server/src/stockdb/lmdbx.ts new file mode 100644 index 0000000..61cfe30 --- /dev/null +++ b/server/src/stockdb/lmdbx.ts @@ -0,0 +1,83 @@ +import type { StockDatabase, StockKey } from "./interfaces.js"; +import { open } from "lmdbx"; + +const stockAggregatesDb = open({ + path: "./stock-aggregates.db", + // any options go here, we can turn on compression like this: + compression: true, +}); + +const stockExistenceDb = open({ + path: "./stock-existence.db", + // any options go here, we can turn on compression like this: + compression: true, +}); + +/** Largest possible key according to the `ordered-binary` (used by lmdbx) docs. */ +const MAXIMUM_KEY = Buffer.from([0xff]); + +function makeStockDatabase(): StockDatabase { + const stockDatabase: Omit = { + getKeys: async ({ date, key }) => { + if (key?.symbol) { + return [key as StockKey]; + } + return stockExistenceDb + .getRange({ + start: [date], + end: [date, MAXIMUM_KEY], + }) + .map(({ key }) => ({ symbol: key[1] })).asArray; + }, + getAggregates: async ({ key: { symbol }, date }) => { + const startOfDayUnix = new Date(`${date}T00:00:00Z`).valueOf(); + const endOfDayUnix = startOfDayUnix + 3600 * 24 * 1000; + return stockAggregatesDb + .getRange({ + start: [symbol, startOfDayUnix], + end: [symbol, endOfDayUnix], + }) + .map(({ key, value }) => ({ + tsStart: key[1], + open: value.open, + close: value.close, + high: value.high, + low: value.low, + })).asArray; + }, + insertAggregates: async (aggregates) => { + await stockExistenceDb.batch(() => { + for (const aggregate of aggregates) { + stockExistenceDb.put( + [ + new Date(aggregate.tsStart).toISOString().substring(0, 10), + aggregate.key.symbol, + ], + null, + ); + } + }); + await stockAggregatesDb.batch(() => { + for (const aggregate of aggregates) { + stockAggregatesDb.put([aggregate.key.symbol, aggregate.tsStart], { + open: aggregate.open, + close: aggregate.close, + high: aggregate.high, + low: aggregate.low, + }); + } + }); + }, + getClosingPrice: async ({ key }) => { + // no-op: not used since stocks don't have a "closing" price, unlike options. + return 0; + }, + }; + + return { + ...stockDatabase, + getSymbols: stockDatabase.getKeys, + }; +} + +export const stockDatabase: StockDatabase = makeStockDatabase();