diff --git a/src/ingest-options-aggregates-from-polygon.mjs b/src/ingest-options-aggregates-from-polygon.mjs new file mode 100644 index 0000000..07837e0 --- /dev/null +++ b/src/ingest-options-aggregates-from-polygon.mjs @@ -0,0 +1,129 @@ +//import { restClient as PolygonClient } from '@polygon.io/client-js'; +import { createClient as createClickhouseClient } from '@clickhouse/client' +import { sleep, getPolygonApiKey } from './util.mjs'; + +//const polygonClient = PolygonClient(apiKey, "https://api.polygon.io", {pagination: false, trace: true,}); // automatically call `next_url` if there is one + +const clickhouse = createClickhouseClient({username:'avraham', password:'buginoo'}); + +const optionAggregatesTableName = "option_aggregates"; +async function deleteClickhouseTable(){ + await clickhouse.command({ + query: `DROP TABLE IF EXISTS ${optionAggregatesTableName}`, + }) +} +async function createClickhouseTable(){ + await clickhouse.command({ + query: ` + CREATE TABLE ${optionAggregatesTableName} + ( + symbol LowCardinality(String), + expirationDate Date, + optionType Enum('call', 'put'), + strike Decimal64(9), + + tsStart DateTime32, + open Decimal64(9), + close Decimal64(9), + low Decimal64(9), + high Decimal64(9), + volume UInt64, + volumeWeightedPrice Decimal64(9) + ) + ENGINE MergeTree() + ORDER BY (symbol, expirationDate, optionType, strike, tsStart) + `, + }); +} + +async function insertOptionAggregatesIntoClickhouse({symbol, expirationDate, optionType, strike, optionAggregatesResponse}){ + await clickhouse.insert({ + table: optionAggregatesTableName, + // structure should match the desired format, JSONEachRow in this example + values: optionAggregatesResponse.results.map(r=>({ + symbol, + expirationDate, + optionType, + strike, + + tsStart: (r.t || 0)/1000, + open: r.o, + close: r.c, + low: r.l, + high: r.h, + volume: r.v, + volumeWeightedPrice: r.vw + })), + format: 'JSONEachRow', + }); +} + +async function fetchOptionAggregates({marketDate, optionContractTicker}){ + const limit = 5000; + let apiKey = await getPolygonApiKey(); + try{ + return await (await fetch(`https://api.polygon.io/v2/aggs/ticker/${optionContractTicker}/range/10/minute/${marketDate}/${marketDate}?adjusted=true&sort=asc&limit=${limit}&apiKey=${apiKey}`)).json(); + } + catch(err){ + console.error('fetchOptionAggregates() error', err); + return fetchOptionAggregates({marketDate, optionContractTicker}) + } +} + +const optionContractsTableName = "option_contracts"; +async function fetchMarketDates(){ + return (await (await clickhouse.query({ + query: `SELECT DISTINCT(asOfDate) AS asOfDate FROM ${optionContractsTableName} ORDER BY asOfDate ASC`, + format: 'JSONCompactEachRow' + })).json()).map((r)=>r[0]); +} + +//await deleteClickhouseTable(); +//await createClickhouseTable(); + +async function fetchOptionContractsAsOfDate(symbol, asOfDate){ + return (await (await clickhouse.query({ + query: `SELECT expirationDate, optionType, strike FROM ${optionContractsTableName} WHERE symbol = '${symbol}' AND asOfDate = '${asOfDate}' ORDER BY strike ASC`, + format: 'JSONEachRow' + })).json()); +} + +const marketDates = await fetchMarketDates(); +const underlyingSymbols = ["AAPL", "GOOGL", "MSFT", "TSLA", "AMD", "NFLX", "SPY"] +const continueFrom = ['2022-01-03', 'MSFT']; +for(let asOfDate of marketDates){ + if(asOfDate < continueFrom[0]){ continue; } + for(let underlyingSymbol of underlyingSymbols){ + if(asOfDate === continueFrom[0] && underlyingSymbols.indexOf(underlyingSymbol) <= underlyingSymbols.indexOf(continueFrom[1])){ continue; } + const optionContractRows = await fetchOptionContractsAsOfDate(underlyingSymbol, asOfDate); + const optionContractTickers = optionContractRows + //.slice(Math.floor(optionContractRows.length/2)-5, Math.floor(optionContractRows.length/2)+5) + .map(optionContractRow => `O:${underlyingSymbol}${optionContractRow.expirationDate.substring(2,4)}${optionContractRow.expirationDate.substring(5,7)}${optionContractRow.expirationDate.substring(8,10)}${optionContractRow.optionType==='call'?'C':'P'}${(Math.floor(optionContractRow.strike*1000)).toString().padStart(8,'0')}`); + console.log(optionContractTickers); + for(let i=0; i < optionContractTickers.length; i++){ + const optionContractTicker = optionContractTickers[i]; + const optionContractRow = optionContractRows[i]; + const optionAggregatesResponse = await fetchOptionAggregates({marketDate: asOfDate, optionContractTicker}); + if(optionAggregatesResponse.status === 'OK'){ + if(typeof optionAggregatesResponse.results !== "undefined" && optionAggregatesResponse.results.length > 0){ + try{ + await insertOptionAggregatesIntoClickhouse({symbol: underlyingSymbol, expirationDate:optionContractRow.expirationDate, optionType:optionContractRow.optionType, strike:optionContractRow.strike, optionAggregatesResponse}); + } + catch(err){ + console.error('clickhouse error', err); + } + console.log('inserted', optionAggregatesResponse.results.length, asOfDate, underlyingSymbol, optionContractRow.expirationDate, optionContractRow.strike); + } + else{ + console.error(`No results for ${optionContractTicker} @ ${asOfDate}`); + } + } + else{ + console.error(optionAggregatesResponse); + } + } + } +} + + +await clickhouse.close(); \ No newline at end of file diff --git a/src/util.mjs b/src/util.mjs index 618512c..ca4919e 100644 --- a/src/util.mjs +++ b/src/util.mjs @@ -9,6 +9,7 @@ export function sleep(ms){ */ export const getPolygonApiKey = ((rateLimit)=>{ const apiKey = "H95NTsatM1iTWLUwDLxM2J5zhUVYdCEz"; + return async ()=>apiKey; let lastInvocation = 0; return async ()=>{ const now = Date.now();