refactor; backtest using lmdbx
parent
1d83cd419a
commit
93d5ac8a30
@ -0,0 +1,2 @@
|
||||
- Ingest stock/underlying aggregates from flatfiles
|
||||
- Create backtesting function to step through each minute of every day.
|
@ -1,6 +1,6 @@
|
||||
import type { CalendarDatabase, CalendarKey } from "./calendardb.interfaces.js";
|
||||
import type { Aggregate } from "./interfaces.js";
|
||||
import { query } from "./lib/clickhouse.js";
|
||||
import type { CalendarDatabase, CalendarKey } from "./interfaces.js";
|
||||
import type { Aggregate } from "../interfaces.js";
|
||||
import { query } from "../lib/clickhouse.js";
|
||||
|
||||
function makeCalendarDatabase(): CalendarDatabase {
|
||||
const calendarDatabase: Omit<CalendarDatabase, "getCalendars"> = {
|
@ -1,4 +1,4 @@
|
||||
import type { AggregateDatabase } from "./interfaces.js";
|
||||
import type { AggregateDatabase } from "../interfaces.js";
|
||||
|
||||
export type CalendarKey = {
|
||||
symbol: string;
|
@ -1,4 +1,4 @@
|
||||
import type { CalendarDatabase } from "./calendardb.interfaces.js";
|
||||
import type { CalendarDatabase } from "./interfaces.js";
|
||||
import { open } from "lmdbx";
|
||||
|
||||
const calendarAggregatesDb = open({
|
@ -1,25 +1,29 @@
|
||||
export type Candlestick = {
|
||||
open: number;
|
||||
close: number;
|
||||
high: number;
|
||||
low: number;
|
||||
open: number;
|
||||
close: number;
|
||||
high: number;
|
||||
low: number;
|
||||
};
|
||||
|
||||
export type Aggregate<T> = {
|
||||
key: T;
|
||||
/** UNIX time in milliseconds */
|
||||
tsStart: number;
|
||||
key: T;
|
||||
/** UNIX time in milliseconds */
|
||||
tsStart: number;
|
||||
} & Candlestick;
|
||||
|
||||
export type AggregateDatabase<T> = {
|
||||
getKeys: ({
|
||||
key,
|
||||
date,
|
||||
}: { key?: T | Partial<T>; date?: string }) => Promise<Array<T>>;
|
||||
getAggregates: ({
|
||||
key,
|
||||
date,
|
||||
}: { key: T; date: string }) => Promise<Array<Omit<Aggregate<T>, "key">>>;
|
||||
insertAggregates: (aggregates: Array<Aggregate<T>>) => Promise<void>;
|
||||
getClosingPrice: ({ key }: { key: T }) => Promise<number>;
|
||||
getKeys: ({
|
||||
key,
|
||||
date,
|
||||
}: { key?: T | Partial<T>; date?: string }) => Promise<Array<T>>;
|
||||
getAggregates: ({
|
||||
key,
|
||||
date,
|
||||
}: { key: T; date: string }) => Promise<Array<Omit<Aggregate<T>, "key">>>;
|
||||
getAggregatesSync?: ({
|
||||
key,
|
||||
date,
|
||||
}: { key: T; date: string }) => Array<Omit<Aggregate<T>, "key">>;
|
||||
insertAggregates: (aggregates: Array<Aggregate<T>>) => Promise<void>;
|
||||
getClosingPrice: ({ key }: { key: T }) => Promise<number>;
|
||||
};
|
||||
|
@ -0,0 +1,61 @@
|
||||
type RetryDecision = {
|
||||
shouldRetry: boolean;
|
||||
maxRetries?: number;
|
||||
delay?: number;
|
||||
};
|
||||
|
||||
type RetryOptions = {
|
||||
maxRetries?: number;
|
||||
delay?: number;
|
||||
shouldRetry?: (error: unknown) => RetryDecision;
|
||||
};
|
||||
|
||||
export async function retry<T>(
|
||||
fn: () => Promise<T>,
|
||||
options: RetryOptions = {},
|
||||
): Promise<T> {
|
||||
const {
|
||||
maxRetries: defaultMaxRetries = 3,
|
||||
delay: defaultDelay = 1000,
|
||||
shouldRetry = retryOnAnyError,
|
||||
} = options;
|
||||
|
||||
let attempt = 1;
|
||||
while (true) {
|
||||
try {
|
||||
return await fn();
|
||||
} catch (error) {
|
||||
const decision = shouldRetry(error);
|
||||
if (!decision.shouldRetry) throw error;
|
||||
|
||||
const currentMaxRetries = decision.maxRetries ?? defaultMaxRetries;
|
||||
const currentDelay = decision.delay ?? defaultDelay;
|
||||
|
||||
if (attempt >= currentMaxRetries) throw error;
|
||||
|
||||
console.warn(
|
||||
`Error occurred, retrying (attempt ${attempt}/${currentMaxRetries})...`,
|
||||
);
|
||||
console.error(error);
|
||||
await new Promise((resolve) => setTimeout(resolve, currentDelay));
|
||||
attempt++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export const retryOnAnyError = (): RetryDecision => ({ shouldRetry: true });
|
||||
export const retryOnTimeout = (error: unknown): RetryDecision =>
|
||||
error instanceof Error && error.message.includes("timeout")
|
||||
? { shouldRetry: true, maxRetries: 5, delay: 2000 }
|
||||
: { shouldRetry: false };
|
||||
export const retryOnErrorType =
|
||||
(errorType: new () => Error, options?: Partial<RetryDecision>) =>
|
||||
(error: unknown) =>
|
||||
error instanceof errorType
|
||||
? { shouldRetry: true, ...options }
|
||||
: { shouldRetry: false };
|
||||
export const retryOnErrorSubstring =
|
||||
(substring: string, options?: Partial<RetryDecision>) => (error: unknown) =>
|
||||
error instanceof Error && error.message.includes(substring)
|
||||
? { shouldRetry: true, ...options }
|
||||
: { shouldRetry: false };
|
@ -1,9 +1,9 @@
|
||||
import type {
|
||||
OptionContractDatabase,
|
||||
OptionContractKey,
|
||||
} from "./optiondb.interfaces.js";
|
||||
import type { Aggregate } from "./interfaces.js";
|
||||
import { clickhouse, query } from "./lib/clickhouse.js";
|
||||
} from "./interfaces.js";
|
||||
import type { Aggregate } from "../interfaces.js";
|
||||
import { clickhouse, query } from "../lib/clickhouse.js";
|
||||
|
||||
function makeOptionContractDatabase(): OptionContractDatabase {
|
||||
const optionContractDatabase: Omit<
|
@ -1,4 +1,4 @@
|
||||
import type { AggregateDatabase } from "./interfaces.js";
|
||||
import type { AggregateDatabase } from "../interfaces.js";
|
||||
|
||||
export type OptionContractKey = {
|
||||
symbol: string;
|
@ -1,59 +0,0 @@
|
||||
import type { StockDatabase, StockKey } from "./stockdb.interfaces.js";
|
||||
import type { Aggregate } from "./interfaces.js";
|
||||
import { clickhouse, query } from "./lib/clickhouse.js";
|
||||
|
||||
function makeStockDatabase(): StockDatabase {
|
||||
const stockDatabase: Omit<StockDatabase, "getSymbols"> = {
|
||||
getKeys: async ({ date }) => {
|
||||
return (
|
||||
await query(`
|
||||
SELECT DISTINCT symbol FROM stock_aggregates WHERE toDate(tsStart) = '${date}'
|
||||
`)
|
||||
).map(({ symbol }) => symbol);
|
||||
},
|
||||
getAggregates: async ({ key: symbol, date }) => {
|
||||
return (
|
||||
await query<Omit<Aggregate<StockKey>, "key">>(`
|
||||
SELECT
|
||||
toUnixTimestamp(tsStart) as tsStart,
|
||||
open,
|
||||
close,
|
||||
high,
|
||||
low
|
||||
FROM stock_aggregates
|
||||
WHERE symbol = '${symbol}'
|
||||
AND toDate(tsStart) = '${date}'
|
||||
ORDER BY tsStart ASC
|
||||
`)
|
||||
).map((aggregate) => ({
|
||||
...aggregate,
|
||||
tsStart: aggregate.tsStart * 1000, // unfortunately, `toUnixTimestamp` only returns second-precision
|
||||
}));
|
||||
},
|
||||
insertAggregates: async (aggregates) => {
|
||||
// stock existence is taken care of by clickhouse materialized view
|
||||
await clickhouse.insert({
|
||||
table: "stock_aggregates",
|
||||
values: aggregates.map(({ key, tsStart, open, close, high, low }) => ({
|
||||
symbol: key,
|
||||
tsStart,
|
||||
open,
|
||||
close,
|
||||
high,
|
||||
low,
|
||||
})),
|
||||
});
|
||||
},
|
||||
getClosingPrice: async ({ key }) => {
|
||||
// no-op: not used since stocks don't have a "closing" price, unlike options.
|
||||
return 0;
|
||||
},
|
||||
};
|
||||
|
||||
return {
|
||||
...stockDatabase,
|
||||
getSymbols: stockDatabase.getKeys,
|
||||
};
|
||||
}
|
||||
|
||||
export const stockDatabase: StockDatabase = makeStockDatabase();
|
@ -1,7 +0,0 @@
|
||||
import type { AggregateDatabase } from "./interfaces.js";
|
||||
|
||||
export type StockKey = string;
|
||||
|
||||
export type StockDatabase = AggregateDatabase<StockKey> & {
|
||||
getSymbols: AggregateDatabase<StockKey>["getKeys"];
|
||||
};
|
@ -1,80 +0,0 @@
|
||||
import type { StockDatabase } from "./stockdb.interfaces.js";
|
||||
import { open } from "lmdbx";
|
||||
|
||||
const stockAggregatesDb = open({
|
||||
path: "/tmp/stock-aggregates.db",
|
||||
// any options go here, we can turn on compression like this:
|
||||
compression: true,
|
||||
});
|
||||
|
||||
const stockExistenceDb = open({
|
||||
path: "/tmp/stock-existence.db",
|
||||
// any options go here, we can turn on compression like this:
|
||||
compression: true,
|
||||
});
|
||||
|
||||
/** Largest possible key according to the `ordered-binary` (used by lmdbx) docs. */
|
||||
const MAXIMUM_KEY = Buffer.from([0xff]);
|
||||
|
||||
function makeStockDatabase(): StockDatabase {
|
||||
const stockDatabase: Omit<StockDatabase, "getSymbols"> = {
|
||||
getKeys: async ({ date }) => {
|
||||
return stockExistenceDb
|
||||
.getRange({
|
||||
start: [date],
|
||||
end: [date, MAXIMUM_KEY],
|
||||
})
|
||||
.map(({ key }) => key[1]).asArray;
|
||||
},
|
||||
getAggregates: async ({ key: symbol, date }) => {
|
||||
const startOfDayUnix = new Date(`${date}T00:00:00Z`).valueOf();
|
||||
const endOfDayUnix = startOfDayUnix + 3600 * 24 * 1000;
|
||||
return stockAggregatesDb
|
||||
.getRange({
|
||||
start: [symbol, startOfDayUnix],
|
||||
end: [symbol, endOfDayUnix],
|
||||
})
|
||||
.map(({ key, value }) => ({
|
||||
tsStart: key[1],
|
||||
open: value.open,
|
||||
close: value.close,
|
||||
high: value.high,
|
||||
low: value.low,
|
||||
})).asArray;
|
||||
},
|
||||
insertAggregates: async (aggregates) => {
|
||||
await stockExistenceDb.batch(() => {
|
||||
for (const aggregate of aggregates) {
|
||||
stockExistenceDb.put(
|
||||
[
|
||||
new Date(aggregate.tsStart).toISOString().substring(0, 10),
|
||||
aggregate.key,
|
||||
],
|
||||
null,
|
||||
);
|
||||
}
|
||||
});
|
||||
await stockAggregatesDb.batch(() => {
|
||||
for (const aggregate of aggregates) {
|
||||
stockAggregatesDb.put([aggregate.key, aggregate.tsStart], {
|
||||
open: aggregate.open,
|
||||
close: aggregate.close,
|
||||
high: aggregate.high,
|
||||
low: aggregate.low,
|
||||
});
|
||||
}
|
||||
});
|
||||
},
|
||||
getClosingPrice: async ({ key }) => {
|
||||
// no-op: not used since stocks don't have a "closing" price, unlike options.
|
||||
return 0;
|
||||
},
|
||||
};
|
||||
|
||||
return {
|
||||
...stockDatabase,
|
||||
getSymbols: stockDatabase.getKeys,
|
||||
};
|
||||
}
|
||||
|
||||
export const stockDatabase: StockDatabase = makeStockDatabase();
|
@ -0,0 +1,62 @@
|
||||
import type { StockDatabase, StockKey } from "./interfaces.js";
|
||||
import type { Aggregate } from "../interfaces.js";
|
||||
import { clickhouse, query } from "../lib/clickhouse.js";
|
||||
|
||||
function makeStockDatabase(): StockDatabase {
|
||||
const stockDatabase: Omit<StockDatabase, "getSymbols"> = {
|
||||
getKeys: async ({ date, key }) => {
|
||||
if (key?.symbol) {
|
||||
return [key as StockKey];
|
||||
}
|
||||
return await query(`
|
||||
SELECT DISTINCT symbol FROM stock_aggregates WHERE toDate(tsStart) = '${date}'
|
||||
`);
|
||||
},
|
||||
getAggregates: async ({ key: { symbol }, date }) => {
|
||||
return (
|
||||
await query<Omit<Aggregate<StockKey>, "key">>(`
|
||||
SELECT
|
||||
toUnixTimestamp(tsStart) as tsStart,
|
||||
open,
|
||||
close,
|
||||
high,
|
||||
low
|
||||
FROM stock_aggregates
|
||||
WHERE symbol = '${symbol}'
|
||||
AND toDate(tsStart) = '${date}'
|
||||
ORDER BY tsStart ASC
|
||||
`)
|
||||
).map((aggregate) => ({
|
||||
...aggregate,
|
||||
tsStart: aggregate.tsStart * 1000, // unfortunately, `toUnixTimestamp` only returns second-precision
|
||||
}));
|
||||
},
|
||||
insertAggregates: async (aggregates) => {
|
||||
// stock existence is taken care of by clickhouse materialized view
|
||||
await clickhouse.insert({
|
||||
table: "stock_aggregates",
|
||||
values: aggregates.map(
|
||||
({ key: { symbol }, tsStart, open, close, high, low }) => ({
|
||||
symbol,
|
||||
tsStart,
|
||||
open,
|
||||
close,
|
||||
high,
|
||||
low,
|
||||
}),
|
||||
),
|
||||
});
|
||||
},
|
||||
getClosingPrice: async ({ key }) => {
|
||||
// no-op: not used since stocks don't have a "closing" price, unlike options.
|
||||
return 0;
|
||||
},
|
||||
};
|
||||
|
||||
return {
|
||||
...stockDatabase,
|
||||
getSymbols: stockDatabase.getKeys,
|
||||
};
|
||||
}
|
||||
|
||||
export const stockDatabase: StockDatabase = makeStockDatabase();
|
@ -0,0 +1,7 @@
|
||||
import type { AggregateDatabase } from "../interfaces.js";
|
||||
|
||||
export type StockKey = { symbol: string };
|
||||
|
||||
export type StockDatabase = AggregateDatabase<StockKey> & {
|
||||
getSymbols: AggregateDatabase<StockKey>["getKeys"];
|
||||
};
|
@ -0,0 +1,83 @@
|
||||
import type { StockDatabase, StockKey } from "./interfaces.js";
|
||||
import { open } from "lmdbx";
|
||||
|
||||
const stockAggregatesDb = open({
|
||||
path: "./stock-aggregates.db",
|
||||
// any options go here, we can turn on compression like this:
|
||||
compression: true,
|
||||
});
|
||||
|
||||
const stockExistenceDb = open({
|
||||
path: "./stock-existence.db",
|
||||
// any options go here, we can turn on compression like this:
|
||||
compression: true,
|
||||
});
|
||||
|
||||
/** Largest possible key according to the `ordered-binary` (used by lmdbx) docs. */
|
||||
const MAXIMUM_KEY = Buffer.from([0xff]);
|
||||
|
||||
function makeStockDatabase(): StockDatabase {
|
||||
const stockDatabase: Omit<StockDatabase, "getSymbols"> = {
|
||||
getKeys: async ({ date, key }) => {
|
||||
if (key?.symbol) {
|
||||
return [key as StockKey];
|
||||
}
|
||||
return stockExistenceDb
|
||||
.getRange({
|
||||
start: [date],
|
||||
end: [date, MAXIMUM_KEY],
|
||||
})
|
||||
.map(({ key }) => ({ symbol: key[1] })).asArray;
|
||||
},
|
||||
getAggregates: async ({ key: { symbol }, date }) => {
|
||||
const startOfDayUnix = new Date(`${date}T00:00:00Z`).valueOf();
|
||||
const endOfDayUnix = startOfDayUnix + 3600 * 24 * 1000;
|
||||
return stockAggregatesDb
|
||||
.getRange({
|
||||
start: [symbol, startOfDayUnix],
|
||||
end: [symbol, endOfDayUnix],
|
||||
})
|
||||
.map(({ key, value }) => ({
|
||||
tsStart: key[1],
|
||||
open: value.open,
|
||||
close: value.close,
|
||||
high: value.high,
|
||||
low: value.low,
|
||||
})).asArray;
|
||||
},
|
||||
insertAggregates: async (aggregates) => {
|
||||
await stockExistenceDb.batch(() => {
|
||||
for (const aggregate of aggregates) {
|
||||
stockExistenceDb.put(
|
||||
[
|
||||
new Date(aggregate.tsStart).toISOString().substring(0, 10),
|
||||
aggregate.key.symbol,
|
||||
],
|
||||
null,
|
||||
);
|
||||
}
|
||||
});
|
||||
await stockAggregatesDb.batch(() => {
|
||||
for (const aggregate of aggregates) {
|
||||
stockAggregatesDb.put([aggregate.key.symbol, aggregate.tsStart], {
|
||||
open: aggregate.open,
|
||||
close: aggregate.close,
|
||||
high: aggregate.high,
|
||||
low: aggregate.low,
|
||||
});
|
||||
}
|
||||
});
|
||||
},
|
||||
getClosingPrice: async ({ key }) => {
|
||||
// no-op: not used since stocks don't have a "closing" price, unlike options.
|
||||
return 0;
|
||||
},
|
||||
};
|
||||
|
||||
return {
|
||||
...stockDatabase,
|
||||
getSymbols: stockDatabase.getKeys,
|
||||
};
|
||||
}
|
||||
|
||||
export const stockDatabase: StockDatabase = makeStockDatabase();
|
Loading…
Reference in New Issue