use sqlite for sync state; use sync.ts lib instead of scripts

This commit is contained in:
Avraham Sakal
2024-06-30 17:31:10 -04:00
parent ad66397639
commit e0a2bc395e
10 changed files with 2130 additions and 609 deletions
+28
View File
@@ -0,0 +1,28 @@
import _ from "../env.js";
import { createClient as createClickhouseClient } from "@clickhouse/client";
import type { DataFormat } from "@clickhouse/client";
// Reference the env import so that it is not tree-shaken away:
console.log(_);
/**
 * ClickHouse client instance exported for use by other modules.
 *
 * Connection settings are taken from the environment when present:
 * - `CLICKHOUSE_HOST` (falls back to `http://localhost:8123`)
 * - `CLICKHOUSE_USERNAME`
 * - `CLICKHOUSE_PASSWORD`
 *
 * NOTE(security): the literal username/password fallbacks are kept only so
 * that existing setups keep working unchanged. Credentials should not live in
 * source control — set the environment variables and remove the fallbacks.
 */
export const clickhouse = createClickhouseClient({
  host: process.env.CLICKHOUSE_HOST || "http://localhost:8123",
  username: process.env.CLICKHOUSE_USERNAME || "avraham",
  password: process.env.CLICKHOUSE_PASSWORD || "buginoo",
});
/**
 * Sends a query to ClickHouse through the shared `clickhouse` client and
 * resolves with the JSON-decoded result.
 *
 * The request turns off `output_format_json_quote_64bit_integers`, i.e. it
 * asks the server not to quote 64-bit integers in JSON output.
 *
 * @param queryString Query text, sent to the server as-is.
 * @param format ClickHouse output format; defaults to `"JSONEachRow"`.
 * @returns The value produced by the result set's `json()` method, typed by
 *   the caller as `Array<T>` (no runtime validation is performed).
 */
export async function query<T>(
  queryString: string,
  format: DataFormat = "JSONEachRow"
): Promise<Array<T>> {
  const resultSet = await clickhouse.query({
    query: queryString,
    format,
    clickhouse_settings: {
      output_format_json_quote_64bit_integers: 0,
      //output_format_json_quote_64bit_floats: false,
      //output_format_json_quote_64bit_decimals: false,
    },
  });
  return await resultSet.json();
}
+138
View File
@@ -0,0 +1,138 @@
// import pThrottle from "p-throttle";
import pRetry from "p-retry";
// NOTE(security): this fallback key is committed to source control. It is kept
// only for backward compatibility; prefer setting POLYGON_API_KEY and rotating
// this key.
const apiKey = "H95NTsatM1iTWLUwDLxM2J5zhUVYdCEz";
// export const getApiKey = pThrottle({ limit: 5, interval: 60000 })(() => apiKey);
/**
 * Returns the API key appended to Polygon request URLs.
 *
 * The `POLYGON_API_KEY` environment variable is read on every call and wins
 * when it is set to a non-empty value; otherwise the built-in key is returned.
 */
export const getApiKey = (): string => process.env.POLYGON_API_KEY || apiKey;
/**
 * Builds the `O:`-prefixed option ticker for a contract:
 * `O:<symbol><YY><MM><DD><C|P><strike * 1000, zero-padded to 8 digits>`.
 *
 * @param symbol Underlying symbol, inserted verbatim.
 * @param expirationDate Date string sliced at fixed offsets as `YYYY-MM-DD`
 *   (characters 2-3, 5-6 and 8-9 are used).
 * @param strike Strike price; encoded in thousandths.
 * @param type `"call"` maps to `C`, anything else to `P`.
 * @returns The ticker string.
 */
export const optionContractToTicker = ({
  symbol,
  expirationDate,
  strike,
  type,
}: {
  symbol: string;
  expirationDate: string;
  strike: number;
  type: "call" | "put";
}) => {
  const year = expirationDate.substring(2, 4);
  const month = expirationDate.substring(5, 7);
  const day = expirationDate.substring(8, 10);
  const side = type === "call" ? "C" : "P";
  // Round instead of floor: binary floating point can land just below the
  // intended integer (1.005 * 1000 === 1004.9999999999999), and flooring would
  // then encode the wrong strike.
  const strikeInThousandths = Math.round(strike * 1000)
    .toString()
    .padStart(8, "0");
  return `O:${symbol}${year}${month}${day}${side}${strikeInThousandths}`;
};
/**
 * Shape assumed for one page of the Polygon option-contracts reference
 * response. Only the fields listed here are described; the response is cast
 * to this type without runtime validation.
 */
type PolygonOptionContractsResponse = {
  /** URL of the following page, when there is one. */
  next_url?: string;
  /** Contracts contained in this page. */
  results: {
    ticker: string;
    expiration_date: string;
    strike_price: number;
    contract_type: "call" | "put";
  }[];
};
/**
 * Async generator over the option contracts Polygon lists for `symbol` as of
 * `date`. Each yielded value is one page of contracts (up to 1000 requested
 * per page), mapped to this project's field names.
 *
 * Pages are followed through the response's `next_url` until it is absent.
 * A page without a `results` array yields an empty batch instead of throwing,
 * for the first page as well as for later ones.
 *
 * @param symbol Underlying ticker to list contracts for.
 * @param date Value sent as the `as_of` query parameter; also copied into
 *   every yielded row as `asOfDate`.
 */
export async function* makeGetOptionContractsIterator(
  symbol: string,
  date: string
) {
  // Single place that converts a response page into output rows.
  const toContracts = (page: PolygonOptionContractsResponse) =>
    (page.results ?? []).map((result) => ({
      asOfDate: date,
      symbol,
      expirationDate: result.expiration_date,
      strike: result.strike_price,
      type: result.contract_type,
    }));

  let nextUrl: string | undefined =
    `https://api.polygon.io/v3/reference/options/contracts?underlying_ticker=${encodeURIComponent(
      symbol
    )}&as_of=${encodeURIComponent(date)}&sort=ticker&limit=1000`;
  // as long as there's a `next_url`, call that:
  while (nextUrl) {
    // The API key is appended to every request, including `next_url` ones.
    const response = await fetch(`${nextUrl}&apiKey=${await getApiKey()}`);
    const page = (await response.json()) as PolygonOptionContractsResponse;
    yield toContracts(page);
    nextUrl = page.next_url;
  }
}
/**
 * Shape assumed for a Polygon aggregates ("aggs") response. The response is
 * cast to this type without runtime validation.
 */
type PolygonOptionContractAggregatesResponse = {
  next_url?: string;
  /** Compared case-insensitively against "ok" by the consumer in this file. */
  status: string;
  resultsCount: number;
  /** One entry per aggregate bar. */
  results: {
    /** Mapped to `close` by the consumer in this file. */
    c: number;
    /** Mapped to `high`. */
    h: number;
    /** Not read in this file — presumably the trade count; confirm against the API docs. */
    n: number;
    /** Mapped to `low`. */
    l: number;
    /** Mapped to `open`. */
    o: number;
    /** Bar timestamp; divided by 1000 to produce `tsStart` (presumably milliseconds — confirm). */
    t: number;
    /** Not read in this file — presumably volume; confirm. */
    v: number;
    /** Not read in this file — presumably volume-weighted price; confirm. */
    vw: number;
  }[];
};
/**
 * Identifies a single option contract: underlying symbol, expiration date,
 * strike price and side.
 */
export type OptionContract = {
// Underlying symbol; inserted verbatim into the ticker by `optionContractToTicker`.
symbol: string;
// Date string; `optionContractToTicker` slices it at fixed offsets as `YYYY-MM-DD`.
expirationDate: string;
// Strike price; encoded in thousandths when building the ticker.
strike: number;
// Contract side.
type: "call" | "put";
};
/**
 * Async generator over 1-minute aggregates of one option contract, fetched
 * from Polygon one calendar day (UTC) per request.
 *
 * Starting at `firstDate` and ending at `expirationDate` (inclusive), each day
 * produces one yielded batch; a day whose response has no `results` yields an
 * empty array. Every request is wrapped in `pRetry` (5 retries, exponential
 * backoff between 1s and 60s).
 *
 * @throws Error when a response's `status` is not "ok" (case-insensitive);
 *   the offending response is logged with `console.error` first.
 */
export async function* makeGetOptionContractAggregatesIterator({
  symbol,
  expirationDate,
  strike,
  type,
  firstDate,
}: OptionContract & { firstDate: string }) {
  const ticker = optionContractToTicker({
    symbol,
    expirationDate,
    strike,
    type,
  });
  const lastDay = new Date(expirationDate);

  for (
    const day = new Date(firstDate);
    day <= lastDay;
    day.setUTCDate(day.getUTCDate() + 1)
  ) {
    const asOfDate = day.toISOString().substring(0, 10);

    // One request covering exactly `asOfDate`.
    const fetchDay = async () => {
      const response = await fetch(
        `https://api.polygon.io/v2/aggs/ticker/${ticker}/range/1/minute/${asOfDate}/${asOfDate}?adjusted=false&sort=asc&limit=50000&apiKey=${await getApiKey()}`
      );
      return (await response.json()) as PolygonOptionContractAggregatesResponse;
    };
    const dayResponse = await pRetry(fetchDay, {
      retries: 5,
      factor: 2,
      minTimeout: 1000,
      maxTimeout: 60 * 1000,
    });

    if (dayResponse.status.toLowerCase() !== "ok") {
      console.error(dayResponse);
      throw new Error(`error fetching option contract aggregate ${ticker}`);
    }

    const bars = dayResponse.results?.map((bar) => ({
      symbol,
      expirationDate,
      strike,
      type,
      tsStart: (bar.t || 0) / 1000,
      open: bar.o,
      close: bar.c,
      low: bar.l,
      high: bar.h,
    }));
    yield bars || [];
  }
}
+243
View File
@@ -0,0 +1,243 @@
import * as Polygon from "./polygon.js";
import sqlite3 from "sqlite3";
import { open } from "sqlite";
import { clickhouse, query } from "./clickhouse.js";
import { OptionContract } from "./polygon.js";
// SQLite database that stores the sync progress markers used by this module.
const sqliteDb = await open({
  driver: sqlite3.Database,
  filename: "/tmp/sync-state.db",
});
// Make sure both state tables exist. Each table's UNIQUE constraint is
// declared with ON CONFLICT REPLACE, so inserting an existing key overwrites
// the stored row.
for (const createTableSql of [
  `
CREATE TABLE IF NOT EXISTS option_contract_sync_states (
symbol TEXT,
date TEXT,
status TINYINT,
UNIQUE (symbol, date) ON CONFLICT REPLACE
)
`,
  `
CREATE TABLE IF NOT EXISTS option_contract_aggregates_sync_states (
ticker TEXT,
status TINYINT,
UNIQUE (ticker) ON CONFLICT REPLACE
)
`,
]) {
  await sqliteDb.exec(createTableSql);
}
/**
 * Looks up the stored option-contract sync state for a symbol/date pair.
 *
 * @returns The matching row of `option_contract_sync_states`, or whatever
 *   `sqliteDb.get` resolves with when no row matches.
 */
export async function getPullOptionContractsState(
  symbol: string,
  date: string
) {
  const params = {
    ":symbol": symbol,
    ":date": date,
  };
  return await sqliteDb.get(
    `
SELECT
*
FROM option_contract_sync_states
WHERE symbol = :symbol
AND date = :date
`,
    params
  );
}
/**
 * Progress marker written to the `status` column of
 * `option_contract_sync_states`.
 */
const enum OptionContractSyncStatus {
  /** Written before a pull begins. */
  STARTED = 0,
  /** Written after a pull has finished. */
  COMPLETED = 1,
}
/** State payload accepted by `setPullOptionContractsState`. */
interface OptionContractSyncState {
  status: OptionContractSyncStatus;
}
/**
 * Stores the option-contract sync state for a symbol/date pair.
 *
 * This is a plain INSERT; the table's `UNIQUE (symbol, date) ON CONFLICT
 * REPLACE` constraint makes it overwrite an existing row for the same pair.
 */
export async function setPullOptionContractsState(
  symbol: string,
  date: string,
  state: OptionContractSyncState
) {
  const { status } = state;
  const params = {
    ":symbol": symbol,
    ":date": date,
    ":status": status,
  };
  await sqliteDb.run(
    `
INSERT INTO option_contract_sync_states (symbol, date, status) VALUES (:symbol, :date, :status)
`,
    params
  );
}
/**
 * Looks up the stored aggregates sync state for an option ticker.
 *
 * @returns The matching row of `option_contract_aggregates_sync_states`, or
 *   whatever `sqliteDb.get` resolves with when no row matches.
 */
export async function getPullOptionContractAggregatesState(ticker: string) {
  const params = { ":ticker": ticker };
  return await sqliteDb.get(
    `
SELECT
*
FROM option_contract_aggregates_sync_states
WHERE ticker = :ticker
`,
    params
  );
}
/**
 * Progress marker written to the `status` column of
 * `option_contract_aggregates_sync_states`.
 */
const enum OptionContractAggregatesSyncStatus {
  /** Written before a pull begins. */
  STARTED = 0,
  /** Written after a pull has finished. */
  COMPLETED = 1,
}
/** State payload accepted by `setPullOptionContractAggregatesState`. */
interface OptionContractAggregatesSyncState {
  status: OptionContractAggregatesSyncStatus;
}
/**
 * Stores the aggregates sync state for an option ticker.
 *
 * This is a plain INSERT; the table's `UNIQUE (ticker) ON CONFLICT REPLACE`
 * constraint makes it overwrite an existing row for the same ticker.
 */
export async function setPullOptionContractAggregatesState(
  ticker: string,
  state: OptionContractAggregatesSyncState
) {
  const { status } = state;
  const params = {
    ":ticker": ticker,
    ":status": status,
  };
  await sqliteDb.run(
    `
INSERT INTO option_contract_aggregates_sync_states (ticker, status) VALUES (:ticker, :status)
`,
    params
  );
}
/**
 * Pulls the option contracts of `symbol` as of `date` from Polygon and inserts
 * them into the ClickHouse table `option_contract_existences`.
 *
 * Does nothing when the stored sync state for this symbol/date is already
 * COMPLETED. Otherwise the state is set to STARTED, every batch from the
 * Polygon iterator is inserted (its length is logged), and the state is set
 * to COMPLETED once all batches have been processed.
 */
export async function pullOptionContracts(symbol: string, date: string) {
  // check if sync was completed:
  const previousState = await getPullOptionContractsState(symbol, date);
  if (previousState?.status === OptionContractSyncStatus.COMPLETED) {
    return;
  }

  await setPullOptionContractsState(symbol, date, {
    status: OptionContractSyncStatus.STARTED,
  });

  const batches = Polygon.makeGetOptionContractsIterator(symbol, date);
  for await (const contracts of batches) {
    console.log(contracts.length);
    await clickhouse.insert({
      table: "option_contract_existences",
      values: contracts,
      format: "JSONEachRow",
    });
  }

  await setPullOptionContractsState(symbol, date, {
    status: OptionContractSyncStatus.COMPLETED,
  });
}
/**
 * Pulls the minute aggregates of one option contract from Polygon and inserts
 * them into the ClickHouse table `option_contract_aggregates`.
 *
 * Does nothing when the stored sync state for the contract's ticker is already
 * COMPLETED. Otherwise the state is set to STARTED, the first date is looked
 * up via `getOptionContractDateRange`, every batch from the Polygon iterator
 * is inserted (its length is logged), and the state is set to COMPLETED.
 *
 * @param optionContract Contract to pull aggregates for.
 * @throws Error when no first date is known for the contract. Without this
 *   guard a null `firstDate` would reach `new Date(null)`, which is the Unix
 *   epoch, and the iterator would request every day since 1970.
 */
export async function pullOptionContractAggregates(
  optionContract: OptionContract
) {
  const ticker = Polygon.optionContractToTicker(optionContract);
  // check if sync was completed:
  const previousState = await getPullOptionContractAggregatesState(ticker);
  if (previousState?.status === OptionContractAggregatesSyncStatus.COMPLETED) {
    return;
  }

  await setPullOptionContractAggregatesState(ticker, {
    status: OptionContractAggregatesSyncStatus.STARTED,
  });

  const { firstDate } = await getOptionContractDateRange(optionContract);
  if (!firstDate) {
    // The state stays STARTED, so a later run will try this contract again.
    throw new Error(
      `no known date range for option contract ${ticker}; pull its option contracts first`
    );
  }

  for await (const batch of Polygon.makeGetOptionContractAggregatesIterator({
    ...optionContract,
    firstDate,
  })) {
    console.log(batch.length);
    await clickhouse.insert({
      table: "option_contract_aggregates",
      values: batch,
      format: "JSONEachRow",
    });
  }

  await setPullOptionContractAggregatesState(ticker, {
    status: OptionContractAggregatesSyncStatus.COMPLETED,
  });
}
/**
 * Async generator that pages through the rows of `option_contract_existences`
 * for one symbol and as-of date, 2000 rows per page, ordered by expiration
 * date, strike and type.
 *
 * Every page is yielded as an array (including a final short or empty one);
 * iteration stops after the first page that is not completely full.
 *
 * NOTE(review): `symbol` and `asOfDate` are interpolated into the SQL text
 * unescaped — callers must pass trusted values.
 */
export async function* makeGetOptionContractsIterator(
  symbol: string,
  asOfDate: string
) {
  const pageSize = 2000;
  for (let offset = 0; ; offset += pageSize) {
    const page = await query(`
SELECT
*
FROM option_contract_existences
WHERE asOfDate = '${asOfDate}'
AND symbol = '${symbol}'
ORDER BY expirationDate ASC, strike ASC, type ASC
LIMIT ${pageSize}
OFFSET ${offset}
`);
    yield page;
    // A page that is not full means there is nothing left to read.
    if (page.length !== pageSize) {
      break;
    }
  }
}
/**
 * Runs `pullOptionContracts` for `symbol` once per calendar day (UTC), from
 * `firstDate` up to and including yesterday, logging each date before it is
 * processed.
 *
 * @param firstDate Date string parsed with `new Date(...)`.
 */
export async function pullOptionContractsSince(
  symbol: string,
  firstDate: string
) {
  const day = new Date(firstDate);
  const yesterday = new Date();
  yesterday.setUTCDate(yesterday.getUTCDate() - 1);

  for (; day <= yesterday; day.setUTCDate(day.getUTCDate() + 1)) {
    // Only the YYYY-MM-DD prefix of the ISO timestamp is used.
    const currentDate = day.toISOString().substring(0, 10);
    console.log(`Date: ${currentDate}:`);
    await pullOptionContracts(symbol, currentDate);
  }
}
/**
 * For every calendar day (UTC) from `firstDate` up to and including yesterday,
 * reads the option contracts stored for `symbol` on that day and runs
 * `pullOptionContractAggregates` for each of them, then logs the date.
 *
 * @param symbol Underlying symbol whose stored contracts are processed.
 * @param firstDate Date string parsed with `new Date(...)`.
 */
export async function pullOptionContractAggregatesSince(
  symbol: string,
  firstDate: string
) {
  const currentDateAsDateObject = new Date(firstDate);
  const yesterdayAsDateObject = new Date();
  yesterdayAsDateObject.setUTCDate(yesterdayAsDateObject.getUTCDate() - 1);
  while (currentDateAsDateObject <= yesterdayAsDateObject) {
    const currentDate = currentDateAsDateObject.toISOString().substring(0, 10);
    for await (const optionContracts of makeGetOptionContractsIterator(
      symbol,
      currentDate
    )) {
      // The iterator yields batches (arrays of rows), while
      // `pullOptionContractAggregates` expects one contract at a time, so each
      // batch has to be unpacked. The rows come back untyped from the query
      // helper, hence the assertion.
      for (const optionContract of optionContracts as OptionContract[]) {
        await pullOptionContractAggregates(optionContract);
      }
    }
    console.log(`Date: ${currentDate}`);
    currentDateAsDateObject.setUTCDate(
      currentDateAsDateObject.getUTCDate() + 1
    );
  }
}
/**
 * Queries `option_contract_existences` for the smallest and largest
 * `asOfDate` recorded for the given contract.
 *
 * @returns The first result row (`firstDate` / `lastDate`), or an object with
 *   both fields set to `null` when the query produced no row.
 *
 * NOTE(review): the contract fields are interpolated into the SQL text
 * unescaped — callers must pass trusted values.
 */
export async function getOptionContractDateRange({
  symbol,
  expirationDate,
  strike,
  type,
}: OptionContract) {
  const rangeSql = `
SELECT
min(asOfDate) AS firstDate,
max(asOfDate) AS lastDate
FROM option_contract_existences
WHERE symbol = '${symbol}'
AND expirationDate = '${expirationDate}'
AND strike = ${strike}
AND type = '${type}'
`;
  const [range] = await query<{ firstDate: string; lastDate: string }>(
    rangeSql
  );
  return range || { firstDate: null, lastDate: null };
}