From 458049481010ad2e38e3f2a1046642646e119cc8 Mon Sep 17 00:00:00 2001 From: Avraham Sakal Date: Tue, 12 Mar 2024 22:19:37 -0400 Subject: [PATCH] option aggregate sync statuses usable by ingest script --- ...ption-quotes-from-polygon-to-clickhouse.ts | 126 ++++++++++-------- server/tables.sql | 17 +++ 2 files changed, 90 insertions(+), 53 deletions(-) diff --git a/server/src/scripts/ingest-option-quotes-from-polygon-to-clickhouse.ts b/server/src/scripts/ingest-option-quotes-from-polygon-to-clickhouse.ts index 3f2fbb9..868d1e2 100644 --- a/server/src/scripts/ingest-option-quotes-from-polygon-to-clickhouse.ts +++ b/server/src/scripts/ingest-option-quotes-from-polygon-to-clickhouse.ts @@ -4,7 +4,17 @@ import pAll from "p-all"; import pQueue from "p-queue"; import pSeries from "p-series"; -const optionContractToTicker = ({ symbol, expirationDate, strike, type }) => +const optionContractToTicker = ({ + symbol, + expirationDate, + strike, + type, +}: { + symbol: string; + expirationDate: string; + strike: number; + type: "call" | "put"; +}) => `O:${symbol}${expirationDate.substring(2, 4)}${expirationDate.substring( 5, 7 @@ -28,11 +38,11 @@ type PolygonResponse = { }>; }; async function getOptionAggregates( - asOfDate, - underlyingSymbol, - expirationDate, - strike, - type + asOfDate: string, + underlyingSymbol: string, + expirationDate: string, + strike: number, + type: "call" | "put" ) { const optionContractTicker = optionContractToTicker({ symbol: underlyingSymbol, @@ -42,7 +52,7 @@ async function getOptionAggregates( }); // first mark the sync of this particular option contract as "pending": await clickhouse.insert({ - table: "option_aggregate_sync_statuses", + table: "amg_option_aggregate_sync_statuses", values: [ { asOfDate, @@ -61,6 +71,10 @@ async function getOptionAggregates( `https://api.polygon.io/v2/aggs/ticker/${optionContractTicker}/range/1/minute/${asOfDate}/${asOfDate}?adjusted=false&sort=asc&limit=50000&apiKey=${await getApiKey()}` ) ).json()) as PolygonResponse; + if (!latestBatchResponse.results) { + console.log(latestBatchResponse); + return; + } let latestBatch = latestBatchResponse.results.map((result) => ({ symbol: underlyingSymbol, expirationDate, @@ -79,7 +93,7 @@ async function getOptionAggregates( format: "JSONEachRow", }); await clickhouse.insert({ - table: "option_contract_quote_sync_statuses", + table: "amg_option_aggregate_sync_statuses", values: [ { asOfDate, @@ -114,16 +128,9 @@ async function getNextBatchOfUnstartedOptionAggregates( symbol, expirationDate, strike, - type, - last_value(status) as latestStatus - FROM ( - SELECT * - FROM option_aggregate_sync_statuses - ORDER BY ts ASC - ) - GROUP BY asOfDate, symbol, expirationDate, strike, type - HAVING latestStatus = 'not-started' - AND ( + type + FROM amg_option_aggregate_sync_statuses + WHERE ( ( asOfDate = '${previousUnstartedOptionContract.asOfDate}' AND symbol = '${previousUnstartedOptionContract.symbol}' @@ -151,6 +158,7 @@ async function getNextBatchOfUnstartedOptionAggregates( asOfDate > '${previousUnstartedOptionContract.asOfDate}' ) ) + AND status = 'not-started' ORDER BY asOfDate, symbol, expirationDate, strike, type LIMIT ${limit} `); @@ -166,6 +174,7 @@ async function getNextBatchOfUnstartedOptionAggregates( */ async function revertPendingSyncs() { const pendingOptionContracts = await query<{ + asOfDate: string; symbol: string; expirationDate: string; strike: number; @@ -173,58 +182,69 @@ async function revertPendingSyncs() { latestStatus: "not-started" | "pending" | "done"; }>(` SELECT + asOfDate, symbol, expirationDate, strike, - type, - last_value(status) as latestStatus - FROM ( - SELECT * - FROM option_aggregate_sync_statuses - ORDER BY symbol, expirationDate, strike, type, ts ASC - ) - GROUP BY symbol, expirationDate, strike, type - HAVING latestStatus = 'pending' - ORDER BY symbol, expirationDate, strike, type + type + FROM amg_option_aggregate_sync_statuses + WHERE status = 'pending' + ORDER BY asOfDate, symbol, expirationDate, strike, type `); console.log( "Pending operations:", pendingOptionContracts.map( - ({ symbol, expirationDate, strike, type }) => - `${symbol} ${expirationDate} ${strike} ${type}` + ({ asOfDate, symbol, expirationDate, strike, type }) => + `${symbol} ${expirationDate} ${strike} ${type} @ ${asOfDate}` ) ); await pAll( pendingOptionContracts.map( - ({ symbol, expirationDate, strike, type }) => + ({ asOfDate, symbol, expirationDate, strike, type }) => () => pSeries([ // Delete option_contracts first, in case this `pAll` operation fails and we need to restart; so `option_contract_sync_statuses` "pending" rows are still there for the restart () => - clickhouse.command({ - query: ` + clickhouse + .command({ + query: ` DELETE FROM option_aggregates WHERE symbol = '${symbol}' AND expirationDate = '${expirationDate}' AND strike = ${strike} AND type = '${type}' + AND toDate(tsStart) = '${asOfDate}' `, - }), + }) + .then(() => { + console.log(`Deleted aggregates for `); + }), () => - clickhouse.command({ - query: ` - DELETE FROM option_aggregate_sync_statuses - WHERE symbol = '${symbol}' - AND expirationDate = '${expirationDate}' - AND strike = ${strike} - AND type = '${type}' - AND status = 'pending'`, - }), + clickhouse + .insert({ + table: "amg_option_aggregate_sync_statuses", + values: [ + { + asOfDate, + symbol, + expirationDate, + strike, + type, + status: "not-started", + }, + ], + format: "JSONEachRow", + }) + .then(() => { + console.log(); + }), ]) ) ); } -//await revertPendingSyncs(); + +// First, revert 'pending' syncs: +await revertPendingSyncs(); /** Second, for each option contract, get all of its quotes. * @@ -236,7 +256,7 @@ const q = new pQueue({ concurrency: 6 }); */ let nextBatchOfUnstartedOptionContracts: Array = [ { - asOfDate: "2022-02-01", + asOfDate: "2022-03-15", symbol: "A", expirationDate: "2022-02-01", strike: 0, @@ -255,15 +275,15 @@ while ( (unstartedOptionContract) => () => q.add(async () => { console.log( - `Getting aggregates for ${unstartedOptionContract.asOfDate} ${unstartedOptionContract.symbol} at ${unstartedOptionContract.expirationDate} ${unstartedOptionContract.strike} ${unstartedOptionContract.type}` + `Getting aggregates for ${unstartedOptionContract.symbol} ${unstartedOptionContract.expirationDate} ${unstartedOptionContract.strike} ${unstartedOptionContract.type} @ ${unstartedOptionContract.asOfDate}` + ); + await getOptionAggregates( + unstartedOptionContract.asOfDate, + unstartedOptionContract.symbol, + unstartedOptionContract.expirationDate, + unstartedOptionContract.strike, + unstartedOptionContract.type ); - // await getOptionAggregates( - // unstartedOptionContract.asOfDate, - // unstartedOptionContract.symbol, - // unstartedOptionContract.expirationDate, - // unstartedOptionContract.strike, - // unstartedOptionContract.type - // ); }) ) ); diff --git a/server/tables.sql b/server/tables.sql index 5707944..938a83f 100644 --- a/server/tables.sql +++ b/server/tables.sql @@ -67,6 +67,23 @@ SELECT now() as ts FROM option_contracts; +CREATE TABLE amg_option_aggregate_sync_statuses ( + asOfDate Date, + symbol LowCardinality(String), + expirationDate Date, + strike Float32, + type ENUM('call', 'put'), + status SimpleAggregateFunction(anyLast, ENUM('not-started','pending','done')), + ts DateTime64 DEFAULT now() +) +ENGINE=AggregatingMergeTree +ORDER BY (asOfDate, symbol, expirationDate, strike, type, ts); + +INSERT INTO amg_option_aggregate_sync_statuses +SELECT asOfDate, symbol, expirationDate, strike, type, status, ts +FROM option_aggregate_sync_statuses +ORDER BY asOfDate, symbol, expirationDate, strike, type, ts; + -- END: Option Contract Quotes CREATE TABLE stock_aggregates