From de48720d17017b96e878d2ec1fdfa3f1b6e5ed6f Mon Sep 17 00:00:00 2001 From: Avraham Sakal Date: Thu, 29 Feb 2024 18:52:37 -0500 Subject: [PATCH] functioning polygon-to-clickhouse sync script for option contracts --- server/package.json | 1 + server/pnpm-lock.yaml | 8 + ...ption-quotes-from-polygon-to-clickhouse.ts | 236 +++++++++++++----- 3 files changed, 176 insertions(+), 69 deletions(-) diff --git a/server/package.json b/server/package.json index 491d539..677ed0b 100644 --- a/server/package.json +++ b/server/package.json @@ -15,6 +15,7 @@ "dotenv": "^16.4.1", "p-all": "^5.0.0", "p-queue": "^8.0.1", + "p-series": "^3.0.0", "p-throttle": "^6.1.0" }, "devDependencies": { diff --git a/server/pnpm-lock.yaml b/server/pnpm-lock.yaml index eebfb2a..bb4b9b6 100644 --- a/server/pnpm-lock.yaml +++ b/server/pnpm-lock.yaml @@ -26,6 +26,9 @@ dependencies: p-queue: specifier: ^8.0.1 version: 8.0.1 + p-series: + specifier: ^3.0.0 + version: 3.0.0 p-throttle: specifier: ^6.1.0 version: 6.1.0 @@ -825,6 +828,11 @@ packages: p-timeout: 6.1.2 dev: false + /p-series@3.0.0: + resolution: {integrity: sha512-geaabIwiqy+jN4vuJROl1rpMJT/myHAMAfdubPQGJT3Grr8td+ogWvTk2qLsNlhYXcoZZAfl01pfq7lK3/gYKQ==} + engines: {node: ^12.20.0 || ^14.13.1 || >=16.0.0} + dev: false + /p-throttle@6.1.0: resolution: {integrity: sha512-eQMdGTxk2+047La67wefUtt0tEHh7D+C8Jl7QXoFCuIiNYeQ9zWs2AZiJdIAs72rSXZ06t11me2bgalRNdy3SQ==} engines: {node: '>=18'} diff --git a/server/src/scripts/ingest-option-quotes-from-polygon-to-clickhouse.ts b/server/src/scripts/ingest-option-quotes-from-polygon-to-clickhouse.ts index 53c32e4..a85cd6f 100644 --- a/server/src/scripts/ingest-option-quotes-from-polygon-to-clickhouse.ts +++ b/server/src/scripts/ingest-option-quotes-from-polygon-to-clickhouse.ts @@ -1,59 +1,80 @@ import { clickhouse, query } from "../clickhouse.js"; import { getApiKey } from "./polygon.js"; -import pAll from 'p-all'; -import pQueue from 'p-queue'; +import pAll from "p-all"; +import pQueue from "p-queue"; +import pSeries from "p-series"; -type PolygonResponse = {next_url?:string, results:Array<{ticker:string, expiration_date:string, strike_price:number, contract_type:'call'|'put'}>}; -async function getOptionContracts(underlyingSymbol, asOfDate){ +type PolygonResponse = { + next_url?: string; + results: Array<{ + ticker: string; + expiration_date: string; + strike_price: number; + contract_type: "call" | "put"; + }>; +}; +async function getOptionContracts(underlyingSymbol, asOfDate) { // first mark the sync of this particular symbol and asOfDate as "pending": await clickhouse.insert({ - table: 'option_contract_sync_statuses', - values: [{symbol: underlyingSymbol, asOfDate, status: 'pending'}], - format: 'JSONEachRow', + table: "option_contract_sync_statuses", + values: [{ symbol: underlyingSymbol, asOfDate, status: "pending" }], + format: "JSONEachRow", }); // then commence the sync with the initial request: - let latestBatchResponse = await (await fetch(`https://api.polygon.io/v3/reference/options/contracts?underlying_ticker=${underlyingSymbol}&as_of=${asOfDate}&sort=ticker&limit=1000&apiKey=${await getApiKey()}`)).json() as PolygonResponse; - let latestBatch = latestBatchResponse.results - .map((result)=>({ + let latestBatchResponse = (await ( + await fetch( + `https://api.polygon.io/v3/reference/options/contracts?underlying_ticker=${underlyingSymbol}&as_of=${asOfDate}&sort=ticker&limit=1000&apiKey=${await getApiKey()}` + ) + ).json()) as PolygonResponse; + let latestBatch = latestBatchResponse.results.map((result) => ({ + asOfDate, + symbol: underlyingSymbol, + expirationDate: result.expiration_date, + strike: result.strike_price, + type: result.contract_type, + })); + await clickhouse.insert({ + table: "option_contracts", + values: latestBatch, + format: "JSONEachRow", + }); + //console.log(latestBatch.results.map((r)=>r.ticker)); + // as long as there's a `next_url`, call that: + while (latestBatchResponse.hasOwnProperty("next_url")) { + latestBatchResponse = (await ( + await fetch(`${latestBatchResponse.next_url}&apiKey=${await getApiKey()}`) + ).json()) as PolygonResponse; + latestBatch = latestBatchResponse.results.map((result) => ({ asOfDate, symbol: underlyingSymbol, expirationDate: result.expiration_date, strike: result.strike_price, type: result.contract_type, })); - await clickhouse.insert({ - table: 'option_contracts', - values: latestBatch, - format: 'JSONEachRow', - }); - //console.log(latestBatch.results.map((r)=>r.ticker)); - // as long as there's a `next_url`, call that: - while(latestBatchResponse.hasOwnProperty('next_url')){ - latestBatchResponse = await (await fetch(`${latestBatchResponse.next_url}&apiKey=${await getApiKey()}`)).json() as PolygonResponse; - latestBatch = latestBatchResponse.results - .map((result)=>({ - asOfDate, - symbol: underlyingSymbol, - expirationDate: result.expiration_date, - strike: result.strike_price, - type: result.contract_type, - })); //console.log(latestBatch.results.map((r)=>r.ticker)); await clickhouse.insert({ - table: 'option_contracts', + table: "option_contracts", values: latestBatch, - format: 'JSONEachRow', + format: "JSONEachRow", }); } await clickhouse.insert({ - table: 'option_contract_sync_statuses', - values: [{symbol: underlyingSymbol, asOfDate, status: 'done'}], - format: 'JSONEachRow', + table: "option_contract_sync_statuses", + values: [{ symbol: underlyingSymbol, asOfDate, status: "done" }], + format: "JSONEachRow", }); } -async function getNextBatchOfUnstartedSymbolsAndAsOfDates(previousUnstartedSymbolAndAsOfDate:{symbol:string, asOfDate:string}, limit:number){ - const rows = await query<{symbol:string, earliestAsOfDate:string}>(` +async function getNextBatchOfUnstartedSymbolsAndAsOfDates( + previousUnstartedSymbolAndAsOfDate: + | { symbol: string; asOfDate: string } + | undefined, + limit: number +) { + if (typeof previousUnstartedSymbolAndAsOfDate === "undefined") { + return; + } + const rows = await query<{ symbol: string; earliestAsOfDate: string }>(` SELECT symbol, first_value(asOfDate) as earliestAsOfDate @@ -82,26 +103,29 @@ async function getNextBatchOfUnstartedSymbolsAndAsOfDates(previousUnstartedSymbo ORDER BY symbol ASC LIMIT ${limit} `); - return rows.map(row=>({ + return rows.map((row) => ({ symbol: row.symbol, asOfDate: row.earliestAsOfDate, })); } /** - * For each symbol in `symbols` table, check the latest `asOfDate` - * in `option_contract_sync_statuses` for that symbol. Then fill-in the rest + * For each symbol in `symbols` table, check the latest `asOfDate` + * in `option_contract_sync_statuses` for that symbol. Then fill-in the rest * of the dates until today's date. */ -async function fillSyncStatuses(){ - const symbols = (await query<{symbol:string}>(` +async function fillSyncStatuses() { + const symbols = ( + await query<{ symbol: string }>(` SELECT symbol from symbols - `)).map(({symbol})=>symbol); - - console.log('symbols', symbols); - await pAll(symbols.map( - (symbol)=> - ()=>query<{latestAsOfDate:string}>(` + `) + ).map(({ symbol }) => symbol); + + console.log("symbols", symbols); + await pAll( + symbols.map( + (symbol) => () => + query<{ latestAsOfDate: string }>(` SELECT latestAsOfDate FROM ( @@ -114,47 +138,117 @@ async function fillSyncStatuses(){ ) ) WHERE latestAsOfDate > '2022-02-18' - `).then((rows)=> - clickhouse.command({ - query: ` + `).then((rows) => + clickhouse + .command({ + query: ` INSERT INTO option_contract_sync_statuses SELECT '${symbol}' as symbol, - Date(dateAdd(DAY,number,'${rows[0]?.latestAsOfDate || '2022-02-19'}')) as asOfDate, + Date(dateAdd(DAY,number,'${ + rows[0]?.latestAsOfDate || "2022-02-19" + }')) as asOfDate, 'not-started' as status FROM system.numbers - WHERE number < dateDiff('days',Date('${rows[0]?.latestAsOfDate || '2022-02-19'}'), Date(now())) + WHERE number < dateDiff('days',Date('${ + rows[0]?.latestAsOfDate || "2022-02-19" + }'), Date(now())) AND number > 0 - ` - }).then(()=>{console.log(`Done ${symbol}`);}) - ) - ), - {concurrency: 6} + `, + }) + .then(() => { + console.log(`Done ${symbol}`); + }) + ) + ), + { concurrency: 6 } ); } /** First, make sure we know which symbol-asOfDate combinations are * yet un-synced. -*/ + */ await fillSyncStatuses(); -/** Second, for each symbol-asOfDate combination whose option contracts +/** + * Second, since this is startup time, obviously anything `pending` is not really running. + * So, for each `pending` combo, delete its status, and all contracts synced so far, so as to start afresh. + */ +const pendingSymbolsAndAsOfDates = await query<{ + symbol: string; + asOfDate: string; + latestStatus: "not-started" | "pending" | "done"; +}>(` + SELECT + symbol, + asOfDate, + last_value(status) as latestStatus + FROM ( + SELECT * + FROM option_contract_sync_statuses + ORDER BY asOfDate ASC, symbol ASC + ) + GROUP BY symbol, asOfDate + HAVING latestStatus = 'pending' + ORDER BY symbol ASC, asOfDate ASC +`); +console.log( + "Pending operations:", + pendingSymbolsAndAsOfDates.map( + ({ symbol, asOfDate }) => `${symbol} ${asOfDate}` + ) +); +await pAll( + pendingSymbolsAndAsOfDates.map( + ({ symbol, asOfDate }) => + () => + pSeries([ + // Delete option_contracts first, in case this `pAll` operation fails and we need to restart; so `option_contract_sync_statuses` "pending" rows are still there for the restart + () => + clickhouse.command({ + query: `DELETE FROM option_contracts WHERE symbol = '${symbol}' AND asOfDate = '${asOfDate}'`, + }), + () => + clickhouse.command({ + query: `DELETE FROM option_contract_sync_statuses WHERE symbol = '${symbol}' AND asOfDate = '${asOfDate}' AND status = 'pending'`, + }), + ]) + ) +); + +/** Second, for each symbol-asOfDate combination whose option contracts * are not known, make them known. - * + * * This queries Polygon with a concurrency of 6. */ -const q = new pQueue({concurrency: 6}); +const q = new pQueue({ concurrency: 6 }); /** Initialized with the lowest possible symbol and the earliest possible asOfDate. * It's passed into `getNextUnstartedSymbolAndAsOfDate()`. */ -let nextBatchOfUnstartedSymbolsAndAsOfDates = [{symbol:'A', asOfDate:'2022-02-01'}]; -while((nextBatchOfUnstartedSymbolsAndAsOfDates = await getNextBatchOfUnstartedSymbolsAndAsOfDates(nextBatchOfUnstartedSymbolsAndAsOfDates.pop(), 200)) !== null){ - await pAll(nextBatchOfUnstartedSymbolsAndAsOfDates.map((unstartedSymbolAndAsOfDate)=> - ()=>q.add(async ()=>{ - console.log(`Getting contracts for ${unstartedSymbolAndAsOfDate.symbol} at ${unstartedSymbolAndAsOfDate.asOfDate}`); - await getOptionContracts(unstartedSymbolAndAsOfDate.symbol, unstartedSymbolAndAsOfDate.asOfDate); - }) - )); +let nextBatchOfUnstartedSymbolsAndAsOfDates = [ + { symbol: "A", asOfDate: "2022-02-01" }, +]; +while ( + (nextBatchOfUnstartedSymbolsAndAsOfDates = + await getNextBatchOfUnstartedSymbolsAndAsOfDates( + nextBatchOfUnstartedSymbolsAndAsOfDates.pop(), + 200 + )) !== null +) { + await pAll( + nextBatchOfUnstartedSymbolsAndAsOfDates.map( + (unstartedSymbolAndAsOfDate) => () => + q.add(async () => { + console.log( + `Getting contracts for ${unstartedSymbolAndAsOfDate.symbol} at ${unstartedSymbolAndAsOfDate.asOfDate}` + ); + await getOptionContracts( + unstartedSymbolAndAsOfDate.symbol, + unstartedSymbolAndAsOfDate.asOfDate + ); + }) + ) + ); // don't loop again until the queue has less than 50 items; we don't want it to grow in memory without bound: console.log("Waiting till less than 50 in queue"); await q.onSizeLessThan(50); @@ -162,8 +256,12 @@ while((nextBatchOfUnstartedSymbolsAndAsOfDates = await getNextBatchOfUnstartedSy // wait until pending queue operations are done: await q.onSizeLessThan(1); +/** + * For each option contract, find its earliest date of existence, and get all quotes from + * then on. + */ + /*** TODOs ***/ /* + Gracefully recover from errors in individual operations. - + If program stops and is restarted, it should restart anything `pending`: erase existing option contracts for that symbol/asOfDate -*/ \ No newline at end of file +*/