diff --git a/server/src/scripts/ingest-option-quotes-from-polygon-to-clickhouse.ts b/server/src/scripts/ingest-option-quotes-from-polygon-to-clickhouse.ts index 475e814..d362fcf 100644 --- a/server/src/scripts/ingest-option-quotes-from-polygon-to-clickhouse.ts +++ b/server/src/scripts/ingest-option-quotes-from-polygon-to-clickhouse.ts @@ -27,6 +27,8 @@ const optionContractToTicker = ({ type PolygonResponse = { next_url?: string; + status: string; + resultsCount: number; results: Array<{ c: number; h: number; @@ -80,31 +82,33 @@ async function getOptionAggregates( ).json()) as PolygonResponse, { retries: 5, factor: 2, minTimeout: 1000, maxTimeout: 60 * 1000 } ); - if (!latestBatchResponse.results) { + if (latestBatchResponse.status.toLowerCase() !== "ok") { console.log(latestBatchResponse); return; } - let latestBatch = latestBatchResponse.results.map((result) => ({ - symbol: underlyingSymbol, - expirationDate, - strike, - type, + if (latestBatchResponse.resultsCount > 0) { + let latestBatch = latestBatchResponse.results.map((result) => ({ + symbol: underlyingSymbol, + expirationDate, + strike, + type, - tsStart: (result.t || 0) / 1000, - open: result.o, - close: result.c, - low: result.l, - high: result.h, - })); - await pRetry( - () => - clickhouse.insert({ - table: "option_aggregates", - values: latestBatch, - format: "JSONEachRow", - }), - { retries: 5, factor: 2, minTimeout: 1000, maxTimeout: 60 * 1000 } - ); + tsStart: (result.t || 0) / 1000, + open: result.o, + close: result.c, + low: result.l, + high: result.h, + })); + await pRetry( + () => + clickhouse.insert({ + table: "option_aggregates", + values: latestBatch, + format: "JSONEachRow", + }), + { retries: 5, factor: 2, minTimeout: 1000, maxTimeout: 60 * 1000 } + ); + } await pRetry( () => clickhouse.insert({ @@ -137,19 +141,19 @@ async function getNextBatchOfUnstartedOptionAggregates( limit: number ): Promise> { if (typeof previousUnstartedOptionContract === "undefined") { - return; + return []; } - const optionContractsWithoutAggregates = await pRetry( - () => - query(` + const queryContents = ` SELECT asOfDate, symbol, expirationDate, strike, - type + type, + argMax(status, ts) as status FROM amg_option_aggregate_sync_statuses - WHERE ( + WHERE symbol IN ['AAPL','AMD','GOOGL','MSFT','NFLX'] + AND ( ( asOfDate = '${previousUnstartedOptionContract.asOfDate}' AND symbol = '${previousUnstartedOptionContract.symbol}' @@ -177,13 +181,18 @@ async function getNextBatchOfUnstartedOptionAggregates( asOfDate > '${previousUnstartedOptionContract.asOfDate}' ) ) - AND status = 'not-started' + GROUP BY asOfDate, symbol, expirationDate, strike, type + HAVING status = 'not-started' ORDER BY asOfDate, symbol, expirationDate, strike, type LIMIT ${limit} - `), + `; + //console.log(queryContents); + const optionContractsWithoutAggregates = await pRetry( + () => query(queryContents), { retries: 5, factor: 2, minTimeout: 1000, maxTimeout: 60 * 1000 } ); - return optionContractsWithoutAggregates; + console.log(`Got ${optionContractsWithoutAggregates.length} records`); + return optionContractsWithoutAggregates || []; } /** @@ -194,91 +203,97 @@ async function getNextBatchOfUnstartedOptionAggregates( * so as to start afresh. */ async function revertPendingSyncs() { - const pendingOptionContracts = await query<{ - asOfDate: string; - symbol: string; - expirationDate: string; - strike: number; - type: "call" | "put"; - latestStatus: "not-started" | "pending" | "done"; - }>(` - SELECT + const batchSize = 1000; + let pendingOptionContracts; + do { + pendingOptionContracts = await query<{ + asOfDate: string; + symbol: string; + expirationDate: string; + strike: number; + type: "call" | "put"; + latestStatus: "not-started" | "pending" | "done"; + }>(` + SELECT asOfDate, symbol, expirationDate, strike, - type + type, + argMax(status, ts) as status FROM amg_option_aggregate_sync_statuses - WHERE status = 'pending' - ORDER BY asOfDate, symbol, expirationDate, strike, type - `); - console.log( - "Pending operations:", - pendingOptionContracts.map( - ({ asOfDate, symbol, expirationDate, strike, type }) => - `${symbol} ${expirationDate} ${strike} ${type} @ ${asOfDate}` - ) - ); - await pAll( - pendingOptionContracts.map( - ({ asOfDate, symbol, expirationDate, strike, type }) => - () => - pSeries([ - // Delete option_contracts first, in case this `pAll` operation fails and we need to restart; so `option_contract_sync_statuses` "pending" rows are still there for the restart - () => - clickhouse - .command({ - query: ` - DELETE FROM option_aggregates - WHERE symbol = '${symbol}' - AND expirationDate = '${expirationDate}' - AND strike = ${strike} - AND type = '${type}' - AND toDate(tsStart) = '${asOfDate}' - `, - }) - .then(() => { - console.log(`Deleted aggregates for `); - }), - () => - clickhouse - .insert({ - table: "amg_option_aggregate_sync_statuses", - values: [ - { - asOfDate, - symbol, - expirationDate, - strike, - type, - status: "not-started", - }, - ], - format: "JSONEachRow", - }) - .then(() => { - console.log(); - }), - ]) - ), - { concurrency: 1 } - ); + WHERE symbol IN ['AAPL','AMD','GOOGL','MSFT','NFLX'] + GROUP BY asOfDate, symbol, expirationDate, strike, type + HAVING status = 'pending' + LIMIT ${batchSize} + `); + console.log( + "Pending operations:", + pendingOptionContracts.map( + ({ asOfDate, symbol, expirationDate, strike, type }) => + `${symbol} ${expirationDate} ${strike} ${type} @ ${asOfDate}` + ) + ); + await pSeries([ + // Delete option_contracts first, in case this `pAll` operation fails and we need to restart; so `option_contract_sync_statuses` "pending" rows are still there for the restart + () => + clickhouse + .command({ + query: ` + DELETE FROM option_aggregates + WHERE (symbol, expirationDate, strike, type, toDate(tsStart)) + IN [${pendingOptionContracts + .map( + ({ asOfDate, symbol, expirationDate, strike, type }) => + `('${symbol}', '${expirationDate}', ${strike}, '${type}', '${asOfDate}')` + ) + .join(",")} + ] + `, + }) + .then(() => { + console.log(`Deleted ${pendingOptionContracts.length} aggregates`); + }), + () => + clickhouse + .insert({ + table: "amg_option_aggregate_sync_statuses", + values: pendingOptionContracts.map( + ({ asOfDate, symbol, expirationDate, strike, type }) => ({ + asOfDate, + symbol, + expirationDate, + strike, + type, + status: "not-started", + }) + ), + format: "JSONEachRow", + }) + .then(() => {}), + ]); + } while (pendingOptionContracts.length === batchSize); + await clickhouse.command({ + query: ` + OPTIMIZE TABLE amg_option_aggregate_sync_statuses FINAL + `, + }); } // First, revert 'pending' syncs: -//await revertPendingSyncs(); +await revertPendingSyncs(); /** Second, for each option contract, get all of its quotes. * - * This queries Polygon with a concurrency of 6. + * This queries Polygon with a concurrency of 16. */ -const q = new pQueue({ concurrency: 6 }); +const q = new pQueue({ concurrency: 16 }); /** Initialized with the lowest possible option contract. * It's passed into `getNextUnstartedSymbolAndAsOfDate()`. */ let nextBatchOfUnstartedOptionContracts: Array = [ { - asOfDate: "2022-04-05", + asOfDate: "2022-03-27", symbol: "A", expirationDate: "2022-02-01", strike: 0, @@ -289,8 +304,8 @@ while ( (nextBatchOfUnstartedOptionContracts = await getNextBatchOfUnstartedOptionAggregates( nextBatchOfUnstartedOptionContracts.pop(), - 200 - )) !== null + 100 + )).length !== 0 ) { await pAll( nextBatchOfUnstartedOptionContracts.map( diff --git a/server/tables.sql b/server/tables.sql index 938a83f..76f22b5 100644 --- a/server/tables.sql +++ b/server/tables.sql @@ -77,7 +77,7 @@ CREATE TABLE amg_option_aggregate_sync_statuses ( ts DateTime64 DEFAULT now() ) ENGINE=AggregatingMergeTree -ORDER BY (asOfDate, symbol, expirationDate, strike, type, ts); +ORDER BY (asOfDate, symbol, expirationDate, strike, type); INSERT INTO amg_option_aggregate_sync_statuses SELECT asOfDate, symbol, expirationDate, strike, type, status, ts