import { clickhouse, query } from "../clickhouse.js";
import { getApiKey } from "./polygon.js";
import pAll from "p-all";
import pQueue from "p-queue";
import pSeries from "p-series";
import pRetry from "p-retry";

/** Identifies a single option contract. */
type OptionContract = {
  symbol: string;
  expirationDate: string;
  strike: number;
  type: "call" | "put";
};

/** An option contract paired with the trading day whose aggregates are synced. */
type OptionContractDay = OptionContract & { asOfDate: string };

/** Status values written to `amg_option_aggregate_sync_statuses` by this file. */
type SyncStatus = "not-started" | "pending" | "done";

/** Shape of the Polygon aggregates response fields this file reads. */
type PolygonResponse = {
  next_url?: string;
  // Optional because the code below explicitly handles a response without it.
  results?: Array<{
    c: number;
    h: number;
    n: number;
    l: number;
    o: number;
    t: number;
    v: number;
    vw: number;
  }>;
};

/** Number of contract-days fetched from ClickHouse per loop iteration. */
const BATCH_SIZE = 200;

/**
 * Runs `operation` through `p-retry` with the retry policy used throughout
 * this file (5 retries, factor 2, timeouts between 1s and 60s).
 */
const withRetry = <T>(operation: () => PromiseLike<T> | T): Promise<T> =>
  pRetry(operation, {
    retries: 5,
    factor: 2,
    minTimeout: 1000,
    maxTimeout: 60 * 1000,
  });

/** Formats a contract-day for log output. */
const describeContractDay = ({
  asOfDate,
  symbol,
  expirationDate,
  strike,
  type,
}: OptionContractDay): string =>
  `${symbol} ${expirationDate} ${strike} ${type} @ ${asOfDate}`;

/**
 * Builds the `O:`-prefixed option ticker used in the Polygon URL:
 * symbol, then YYMMDD taken from `expirationDate` (expected as `YYYY-MM-DD`),
 * then `C`/`P`, then the strike multiplied by 1000 and zero-padded to 8 digits.
 */
const optionContractToTicker = ({
  symbol,
  expirationDate,
  strike,
  type,
}: {
  symbol: string;
  expirationDate: string;
  strike: number;
  type: "call" | "put";
}): string => {
  const yy = expirationDate.substring(2, 4);
  const mm = expirationDate.substring(5, 7);
  const dd = expirationDate.substring(8, 10);
  const side = type === "call" ? "C" : "P";
  // Round rather than floor: floating-point products such as 1.005 * 1000
  // evaluate to 1004.9999999999999, which floor would truncate to 1004.
  const strikeField = Math.round(strike * 1000)
    .toString()
    .padStart(8, "0");
  return `O:${symbol}${yy}${mm}${dd}${side}${strikeField}`;
};

/**
 * Inserts one status row for a contract-day into
 * `amg_option_aggregate_sync_statuses`. Does not retry on its own.
 */
const insertSyncStatus = (
  { asOfDate, symbol, expirationDate, strike, type }: OptionContractDay,
  status: SyncStatus
) =>
  clickhouse.insert({
    table: "amg_option_aggregate_sync_statuses",
    values: [{ asOfDate, symbol, expirationDate, strike, type, status }],
    format: "JSONEachRow",
  });

/**
 * Downloads the one-minute aggregates of a single option contract for a
 * single day from Polygon and stores them in `option_aggregates`.
 *
 * The contract-day is marked "pending" before the download and "done" after
 * the rows have been inserted. Every network/database step is retried.
 *
 * @param asOfDate day to fetch; used as both range start and end in the URL
 * @param underlyingSymbol symbol of the underlying
 * @param expirationDate contract expiration date (`YYYY-MM-DD`)
 * @param strike contract strike price
 * @param type "call" or "put"
 */
async function getOptionAggregates(
  asOfDate: string,
  underlyingSymbol: string,
  expirationDate: string,
  strike: number,
  type: "call" | "put"
): Promise<void> {
  const contractDay: OptionContractDay = {
    asOfDate,
    symbol: underlyingSymbol,
    expirationDate,
    strike,
    type,
  };
  const optionContractTicker = optionContractToTicker({
    symbol: underlyingSymbol,
    expirationDate,
    strike,
    type,
  });

  // First mark the sync of this particular option contract as "pending":
  await withRetry(() => insertSyncStatus(contractDay, "pending"));

  const response = await withRetry(async () => {
    const httpResponse = await fetch(
      `https://api.polygon.io/v2/aggs/ticker/${optionContractTicker}/range/1/minute/${asOfDate}/${asOfDate}?adjusted=false&sort=asc&limit=50000&apiKey=${await getApiKey()}`
    );
    return (await httpResponse.json()) as PolygonResponse;
  });

  if (!response.results) {
    // NOTE(review): the status row stays "pending" on this path. If a
    // response without `results` simply means "no trades that day", it
    // should probably be marked "done" instead — confirm against the API.
    console.log(response);
    return;
  }

  const rows = response.results.map((bar) => ({
    symbol: underlyingSymbol,
    expirationDate,
    strike,
    type,
    // `t` is divided by 1000 before storage (presumably ms -> s; verify
    // against the `option_aggregates` column type).
    tsStart: (bar.t || 0) / 1000,
    open: bar.o,
    close: bar.c,
    low: bar.l,
    high: bar.h,
  }));

  await withRetry(() =>
    clickhouse.insert({
      table: "option_aggregates",
      values: rows,
      format: "JSONEachRow",
    })
  );

  await withRetry(() => insertSyncStatus(contractDay, "done"));
}

/**
 * Fetches the next page of contract-days whose status is 'not-started',
 * strictly after `previousUnstartedOptionContract` in
 * `(asOfDate, symbol, expirationDate, strike, type)` order.
 *
 * @param previousUnstartedOptionContract cursor: the last contract-day of the
 *   previous page, or `undefined` when there is no cursor
 * @param limit maximum number of rows to return
 * @returns the next page (possibly empty), or `undefined` when no cursor was
 *   supplied
 */
async function getNextBatchOfUnstartedOptionAggregates(
  previousUnstartedOptionContract: OptionContractDay | undefined,
  limit: number
): Promise<Array<OptionContractDay> | undefined> {
  if (typeof previousUnstartedOptionContract === "undefined") {
    return undefined;
  }
  const { asOfDate, symbol, expirationDate, strike, type } =
    previousUnstartedOptionContract;

  // Keyset pagination: each OR-branch fixes a prefix of the sort key to the
  // cursor's values and requires the next column to be strictly greater.
  return withRetry(() =>
    query<OptionContractDay>(`
      SELECT asOfDate, symbol, expirationDate, strike, type
      FROM amg_option_aggregate_sync_statuses
      WHERE (
        (
          asOfDate = '${asOfDate}'
          AND symbol = '${symbol}'
          AND expirationDate = '${expirationDate}'
          AND strike = ${strike}
          AND type > '${type}'
        ) OR (
          asOfDate = '${asOfDate}'
          AND symbol = '${symbol}'
          AND expirationDate = '${expirationDate}'
          AND strike > ${strike}
        ) OR (
          asOfDate = '${asOfDate}'
          AND symbol = '${symbol}'
          AND expirationDate > '${expirationDate}'
        ) OR (
          asOfDate = '${asOfDate}'
          AND symbol > '${symbol}'
        ) OR (
          asOfDate > '${asOfDate}'
        )
      )
      AND status = 'not-started'
      ORDER BY asOfDate, symbol, expirationDate, strike, type
      LIMIT ${limit}
    `)
  );
}

/**
 * Intended for startup, when nothing marked `pending` can actually still be
 * running. For every contract-day whose status is 'pending', deletes the
 * rows already stored for it in `option_aggregates` and then writes a
 * 'not-started' status row, so the sync starts afresh.
 *
 * Contract-days are processed one at a time.
 */
async function revertPendingSyncs(): Promise<void> {
  const pendingContractDays = await query<OptionContractDay>(`
    SELECT asOfDate, symbol, expirationDate, strike, type
    FROM amg_option_aggregate_sync_statuses
    WHERE status = 'pending'
    ORDER BY asOfDate, symbol, expirationDate, strike, type
  `);

  console.log(
    "Pending operations:",
    pendingContractDays.map(describeContractDay)
  );

  await pAll(
    pendingContractDays.map((contractDay) => () => {
      const { asOfDate, symbol, expirationDate, strike, type } = contractDay;
      const label = describeContractDay(contractDay);
      return pSeries([
        // Delete the aggregates first: if this run fails part-way, the
        // 'pending' status row is still there to be picked up on restart.
        async () => {
          await clickhouse.command({
            query: `
              DELETE FROM option_aggregates
              WHERE symbol = '${symbol}'
                AND expirationDate = '${expirationDate}'
                AND strike = ${strike}
                AND type = '${type}'
                AND toDate(tsStart) = '${asOfDate}'
            `,
          });
          console.log(`Deleted aggregates for ${label}`);
        },
        async () => {
          await insertSyncStatus(contractDay, "not-started");
          console.log(`Reset sync status to not-started for ${label}`);
        },
      ]);
    }),
    { concurrency: 1 }
  );
}

// First, revert 'pending' syncs:
//await revertPendingSyncs();

/** Second, for each option contract, get all of its aggregates.
 *
 * This queries Polygon with a concurrency of 6.
 */
const q = new pQueue({ concurrency: 6 });

/** Initialized with the lowest possible option contract.
 * It's passed into `getNextBatchOfUnstartedOptionAggregates()` as the first
 * cursor; afterwards the cursor is the last contract-day of each batch.
 */
let cursor: OptionContractDay | undefined = {
  asOfDate: "2022-03-18",
  symbol: "A",
  expirationDate: "2022-02-01",
  strike: 0,
  type: "call",
};

for (;;) {
  const batch: Array<OptionContractDay> | undefined =
    await getNextBatchOfUnstartedOptionAggregates(cursor, BATCH_SIZE);

  // Stop once nothing is left. An empty batch must end the loop, otherwise
  // there is no last element to use as the next cursor.
  if (batch === undefined || batch.length === 0) {
    break;
  }

  await pAll(
    batch.map(
      (unstartedOptionContract) => () =>
        q.add(async () => {
          console.log(
            `Getting aggregates for ${describeContractDay(
              unstartedOptionContract
            )}`
          );
          await getOptionAggregates(
            unstartedOptionContract.asOfDate,
            unstartedOptionContract.symbol,
            unstartedOptionContract.expirationDate,
            unstartedOptionContract.strike,
            unstartedOptionContract.type
          );
        })
    )
  );

  // Don't loop again until the queue has less than 2 items; we don't want it
  // to grow in memory without bound:
  console.log("Waiting till less than 2 in queue");
  await q.onSizeLessThan(2);

  // The batch is ordered by the pagination key, so its last element is the
  // cursor for the next page.
  cursor = batch[batch.length - 1];
}

// Wait until pending queue operations are done:
await q.onSizeLessThan(1);

/*** TODOs ***/
/*
 + Gracefully recover from errors in individual operations.
 */