From f67c092ed2a3906828b1ea1efd2a28468ebe98ab Mon Sep 17 00:00:00 2001 From: Avraham Sakal Date: Sun, 26 Nov 2023 21:04:28 -0500 Subject: [PATCH] ingest stocks from polygon --- src/ingest-from-polygon.mjs | 105 ++++++++++++++++++++++++++++++++++++ 1 file changed, 105 insertions(+) create mode 100644 src/ingest-from-polygon.mjs diff --git a/src/ingest-from-polygon.mjs b/src/ingest-from-polygon.mjs new file mode 100644 index 0000000..3901798 --- /dev/null +++ b/src/ingest-from-polygon.mjs @@ -0,0 +1,105 @@ +//import { restClient as PolygonClient } from '@polygon.io/client-js'; +import { createClient as createClickhouseClient } from '@clickhouse/client' + +const apiKey = "H95NTsatM1iTWLUwDLxM2J5zhUVYdCEz"; +//const polygonClient = PolygonClient(apiKey, "https://api.polygon.io", {pagination: false, trace: true,}); // automatically call `next_url` if there is one + +const clickhouse = createClickhouseClient({username:'avraham', password:'buginoo'}); + +const stockAggregatesTableName = "stock_aggregates"; +async function deleteClickhouseTable(){ + await clickhouse.command({ + query: `DROP TABLE IF EXISTS ${stockAggregatesTableName}`, + }) +} +async function createClickhouseTable(){ + await clickhouse.command({ + query: ` + CREATE TABLE ${stockAggregatesTableName} + -- (id UInt64, name String) + ( + symbol LowCardinality(String), + tsStart DateTime32, + open Decimal64(9), + close Decimal64(9), + low Decimal64(9), + high Decimal64(9), + volume UInt64, + volume_weighted_price Decimal64(9) + ) + ENGINE MergeTree() + ORDER BY (symbol, tsStart) + `, + }) +} + +async function insertResultsIntoClickhouse(stockAggregatesResult){ + await clickhouse.insert({ + table: stockAggregatesTableName, + // structure should match the desired format, JSONEachRow in this example + values: stockAggregatesResult.results.map(r=>({ + symbol: stockAggregatesResult.ticker, + tsStart: (r.t || 0)/1000, + open: r.o, + close: r.c, + low: r.l, + high: r.h, + volume: r.v, + volume_weighted_price: r.vw + })), + format: 'JSONEachRow', + }); + console.log('inserted', stockAggregatesResult.ticker); +} + +function sleep(ms){ + return new Promise((resolve)=>{ + setTimeout(resolve, ms); + }); +} +/** + * Get the underlying price aggregates for every 10-minute period starting 2 years ago (the lookback period for the free tier) until now + */ +async function fetchStockAggregates(symbol, fromDate, toDate, limit){ + let stockAggregatesResult = await (await fetch(`https://api.polygon.io/v2/aggs/ticker/${symbol}/range/10/minute/${fromDate}/${toDate}?adjusted=false&sort=asc&limit=${limit}&apiKey=${apiKey}`)).json(); + console.log(stockAggregatesResult.resultsCount); + if(stockAggregatesResult.status === 'OK' && typeof stockAggregatesResult.results !== "undefined" && stockAggregatesResult.results.length > 0){ + await insertResultsIntoClickhouse(stockAggregatesResult); + while(stockAggregatesResult.hasOwnProperty("next_url")){ + await sleep(13000); + stockAggregatesResult = await (await fetch(`${stockAggregatesResult.next_url}&apiKey=${apiKey}`)).json(); + if(stockAggregatesResult.status === 'OK' && typeof stockAggregatesResult.results !== "undefined" && stockAggregatesResult.results.length > 0){ + console.log(stockAggregatesResult.resultsCount); + await insertResultsIntoClickhouse(stockAggregatesResult); + } + else{ + console.log(stockAggregatesResult); + } + } + } + else{ + console.log(stockAggregatesResult); + } +} + +await deleteClickhouseTable(); +await createClickhouseTable(); +for(let symbol of ["AAPL", "GOOGL", "MSFT", "TSLA", "AMD", "NFLX", "SPY"]){ + await fetchStockAggregates(symbol, "2021-11-27", "2023-11-25", 50000); + await sleep(13000); +} + + +/** + * For each day (not aggregate), look-up the available option contracts (i.e. + * all unique type/strike/exp combinations). Limit is 1000, but it gives you a + * "nextUrl", so keep calling that until all results are in. + */ + +/** + * For each aggregate and the contracts on that day, get option aggregates + */ + + + +await clickhouse.close(); \ No newline at end of file