diff --git a/server/package.json b/server/package.json index 5a12faf..68bed06 100644 --- a/server/package.json +++ b/server/package.json @@ -15,6 +15,7 @@ "cors": "^2.8.5", "dotenv": "^16.4.1", "p-all": "^5.0.0", + "p-queue": "^8.0.1", "p-throttle": "^6.1.0" }, "devDependencies": { diff --git a/server/pnpm-lock.yaml b/server/pnpm-lock.yaml index 2ab5077..eebfb2a 100644 --- a/server/pnpm-lock.yaml +++ b/server/pnpm-lock.yaml @@ -23,6 +23,9 @@ dependencies: p-all: specifier: ^5.0.0 version: 5.0.0 + p-queue: + specifier: ^8.0.1 + version: 8.0.1 p-throttle: specifier: ^6.1.0 version: 6.1.0 @@ -505,6 +508,10 @@ packages: engines: {node: '>=0.8.0'} dev: true + /eventemitter3@5.0.1: + resolution: {integrity: sha512-GWkBvjiSZK87ELrYOSESUYeVIc9mvLLf/nXalMOS5dYrgZq9o5OVkbZAVM06CVxYsCwH9BDZFPlQTlPA1j4ahA==} + dev: false + /for-each@0.3.3: resolution: {integrity: sha512-jqYfLp7mo9vIyQf8ykW2v7A+2N4QjeCeI5+Dz9XraiO1ign81wjiH7Fb9vSOWvQfNtmSa4H2RoQTrrXivdUZmw==} dependencies: @@ -810,11 +817,24 @@ packages: engines: {node: '>=16'} dev: false + /p-queue@8.0.1: + resolution: {integrity: sha512-NXzu9aQJTAzbBqOt2hwsR63ea7yvxJc0PwN/zobNAudYfb1B7R08SzB4TsLeSbUCuG467NhnoT0oO6w1qRO+BA==} + engines: {node: '>=18'} + dependencies: + eventemitter3: 5.0.1 + p-timeout: 6.1.2 + dev: false + /p-throttle@6.1.0: resolution: {integrity: sha512-eQMdGTxk2+047La67wefUtt0tEHh7D+C8Jl7QXoFCuIiNYeQ9zWs2AZiJdIAs72rSXZ06t11me2bgalRNdy3SQ==} engines: {node: '>=18'} dev: false + /p-timeout@6.1.2: + resolution: {integrity: sha512-UbD77BuZ9Bc9aABo74gfXhNvzC9Tx7SxtHSh1fxvx3jTLLYvmVhiQZZrJzqqU0jKbN32kb5VOKiLEQI/3bIjgQ==} + engines: {node: '>=14.16'} + dev: false + /parse-json@4.0.0: resolution: {integrity: sha512-aOIos8bujGN93/8Ox/jPLh7RwVnPEysynVFE+fQZyg6jKELEHwzgKdLRFHUgXJL6kylijVSBC4BvN9OmsB48Rw==} engines: {node: '>=4'} diff --git a/server/src/scripts/ingest-option-quotes-from-polygon-to-clickhouse.ts b/server/src/scripts/ingest-option-quotes-from-polygon-to-clickhouse.ts index 4d57b13..cb1eefe 100644 --- a/server/src/scripts/ingest-option-quotes-from-polygon-to-clickhouse.ts +++ b/server/src/scripts/ingest-option-quotes-from-polygon-to-clickhouse.ts @@ -1,26 +1,106 @@ import { clickhouse, query } from "../clickhouse.js"; import { getApiKey } from "./polygon.js"; import pAll from 'p-all'; +import pQueue from 'p-queue'; -type PolygonResponse = {next_url?:string, results:Array<{ticker:string}>}; +type PolygonResponse = {next_url?:string, results:Array<{ticker:string, expiration_date:string, strike_price:number, contract_type:'call'|'put'}>}; async function getOptionContracts(underlyingSymbol, asOfDate){ - let latestBatch = await (await fetch(`https://api.polygon.io/v3/reference/options/contracts?underlying_ticker=${underlyingSymbol}&as_of=${asOfDate}&sort=ticker&limit=1000&apiKey=${await getApiKey()}`)).json() as PolygonResponse; - console.log(latestBatch.results.map((r)=>r.ticker)); - while(latestBatch.hasOwnProperty('next_url')){ - latestBatch = await (await fetch(`${latestBatch.next_url}&apiKey=${await getApiKey()}`)).json() as PolygonResponse; - console.log(latestBatch.results.map((r)=>r.ticker)); + // first mark the sync of this particular symbol and asOfDate as "pending": + await clickhouse.insert({ + table: 'option_contract_sync_statuses', + values: [{symbol: underlyingSymbol, asOfDate, status: 'pending'}], + format: 'JSONEachRow', + }); + // then commence the sync with the initial request: + let latestBatchResponse = await (await fetch(`https://api.polygon.io/v3/reference/options/contracts?underlying_ticker=${underlyingSymbol}&as_of=${asOfDate}&sort=ticker&limit=1000&apiKey=${await getApiKey()}`)).json() as PolygonResponse; + let latestBatch = latestBatchResponse.results + .map((result)=>({ + asOfDate, + symbol: underlyingSymbol, + expirationDate: result.expiration_date, + strike: result.strike_price, + type: result.contract_type, + })); + await clickhouse.insert({ + table: 'option_contracts', + values: latestBatch, + format: 'JSONEachRow', + }); + //console.log(latestBatch.results.map((r)=>r.ticker)); + // as long as there's a `next_url`, call that: + while(latestBatchResponse.hasOwnProperty('next_url')){ + latestBatchResponse = await (await fetch(`${latestBatchResponse.next_url}&apiKey=${await getApiKey()}`)).json() as PolygonResponse; + latestBatch = latestBatchResponse.results + .map((result)=>({ + asOfDate, + symbol: underlyingSymbol, + expirationDate: result.expiration_date, + strike: result.strike_price, + type: result.contract_type, + })); + //console.log(latestBatch.results.map((r)=>r.ticker)); + await clickhouse.insert({ + table: 'option_contracts', + values: latestBatch, + format: 'JSONEachRow', + }); } + await clickhouse.insert({ + table: 'option_contract_sync_statuses', + values: [{symbol: underlyingSymbol, asOfDate, status: 'done'}], + format: 'JSONEachRow', + }); } //await getOptionContracts('AAPL','2024-01-30'); +async function getNextUnstartedSymbolAndAsOfDate(previousUnstartedSymbolAndAsOfDate:{symbol:string, asOfDate:string}){ + const rows = await query<{symbol:string, earliestAsOfDate:string}>(` + SELECT + symbol, + first_value(asOfDate) as earliestAsOfDate + FROM ( + SELECT + symbol, + asOfDate, + last_value(status) as latestStatus + FROM ( + SELECT * + FROM option_contract_sync_statuses + ORDER BY asOfDate ASC, symbol ASC + ) + GROUP BY symbol, asOfDate + HAVING latestStatus = 'not-started' + ORDER BY symbol ASC, asOfDate ASC + ) + GROUP BY symbol + HAVING ( + symbol = '${previousUnstartedSymbolAndAsOfDate.symbol}' + AND asOfDate > '${previousUnstartedSymbolAndAsOfDate.asOfDate}' + ) + OR ( + symbol > '${previousUnstartedSymbolAndAsOfDate.symbol}' + ) + ORDER BY symbol ASC + LIMIT 1 + `); + if(rows.length === 0){ + return null; + } + else{ + return { + symbol: rows[0].symbol, + asOfDate: rows[0].earliestAsOfDate, + } + } +} /** * For each symbol in `symbols` table, check the latest `asOfDate` - * in `symbol_sync_statuses` for that symbol. Then fill-in the rest + * in `option_contract_sync_statuses` for that symbol. Then fill-in the rest * of the dates until today's date. */ async function fillSyncStatuses(){ - const symbols = (await query(` + const symbols = (await query<{symbol:string}>(` SELECT symbol from symbols `)).map(({symbol})=>symbol); @@ -30,11 +110,11 @@ async function fillSyncStatuses(){ ()=>query<{latestAsOfDate:string}>(` SELECT latestAsOfDate - FROM( + FROM ( SELECT last_value(asOfDate) as latestAsOfDate FROM ( SELECT * - FROM symbol_sync_statuses + FROM option_contract_sync_statuses WHERE symbol = '${symbol}' ORDER BY asOfDate ASC ) @@ -43,7 +123,7 @@ async function fillSyncStatuses(){ `).then((rows)=> clickhouse.command({ query: ` - INSERT INTO symbol_sync_statuses + INSERT INTO option_contract_sync_statuses SELECT '${symbol}' as symbol, Date(dateAdd(DAY,number,'${rows[0]?.latestAsOfDate || '2022-02-19'}')) as asOfDate, @@ -59,4 +139,15 @@ async function fillSyncStatuses(){ ); } -await fillSyncStatuses(); \ No newline at end of file +await fillSyncStatuses(); +const q = new pQueue({concurrency: 6}); +let nextUnstartedSymbolAndAsOfDate = {symbol:'A', asOfDate:'2022-02-01'}; +while((nextUnstartedSymbolAndAsOfDate = await getNextUnstartedSymbolAndAsOfDate(nextUnstartedSymbolAndAsOfDate)) !== null){ + await q.add(async ()=>{ + console.log(`Getting contracts for ${nextUnstartedSymbolAndAsOfDate.symbol} at ${nextUnstartedSymbolAndAsOfDate.asOfDate}`); + await getOptionContracts(nextUnstartedSymbolAndAsOfDate.symbol, nextUnstartedSymbolAndAsOfDate.asOfDate); + }); + // don't loop again until the queue has less than 50 items; we don't want it to grow in memory without bound: + console.log("Waiting till less than 50 in queue"); + await q.onSizeLessThan(50); +} \ No newline at end of file diff --git a/server/tables.sql b/server/tables.sql index ff9b047..5e4a605 100644 --- a/server/tables.sql +++ b/server/tables.sql @@ -12,7 +12,7 @@ CREATE TABLE symbols ENGINE MergeTree() ORDER BY (symbol); -CREATE TABLE symbol_sync_statuses +CREATE TABLE option_contract_sync_statuses ( symbol String, asOfDate Date, @@ -21,6 +21,18 @@ CREATE TABLE symbol_sync_statuses ENGINE MergeTree() ORDER BY (asOfDate, symbol); +CREATE TABLE option_contracts +( + asOfDate Date, + symbol String, + expirationDate Date, + strike Float32, + type ENUM('call', 'put') +) +ENGINE MergeTree() +PRIMARY KEY (asOfDate, symbol) +ORDER BY (asOfDate, symbol, expirationDate, strike, type); + CREATE TABLE stock_aggregates ( symbol LowCardinality(String),