ingest stocks from polygon
							parent
							
								
									12d6bec403
								
							
						
					
					
						commit
						f67c092ed2
					
				| @ -0,0 +1,105 @@ | |||||||
|  | //import { restClient as PolygonClient } from '@polygon.io/client-js';
 | ||||||
|  | import { createClient as createClickhouseClient } from '@clickhouse/client' | ||||||
|  | 
 | ||||||
|  | const apiKey = "H95NTsatM1iTWLUwDLxM2J5zhUVYdCEz"; | ||||||
|  | //const polygonClient = PolygonClient(apiKey, "https://api.polygon.io", {pagination: false, trace: true,}); // automatically call `next_url` if there is one
 | ||||||
|  | 
 | ||||||
|  | const clickhouse = createClickhouseClient({username:'avraham', password:'buginoo'}); | ||||||
|  | 
 | ||||||
|  | const stockAggregatesTableName = "stock_aggregates"; | ||||||
|  | async function deleteClickhouseTable(){ | ||||||
|  |   await clickhouse.command({ | ||||||
|  |     query: `DROP TABLE IF EXISTS ${stockAggregatesTableName}`, | ||||||
|  |   }) | ||||||
|  | } | ||||||
|  | async function createClickhouseTable(){ | ||||||
|  |   await clickhouse.command({ | ||||||
|  |     query: ` | ||||||
|  |       CREATE TABLE ${stockAggregatesTableName} | ||||||
|  |       -- (id UInt64, name String) | ||||||
|  |       ( | ||||||
|  |         symbol LowCardinality(String),  | ||||||
|  |         tsStart DateTime32, | ||||||
|  |         open Decimal64(9),  | ||||||
|  |         close Decimal64(9),  | ||||||
|  |         low Decimal64(9),  | ||||||
|  |         high Decimal64(9),  | ||||||
|  |         volume UInt64,  | ||||||
|  |         volume_weighted_price Decimal64(9) | ||||||
|  |       ) | ||||||
|  |       ENGINE MergeTree() | ||||||
|  |       ORDER BY (symbol, tsStart) | ||||||
|  |     `,
 | ||||||
|  |   }) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | async function insertResultsIntoClickhouse(stockAggregatesResult){ | ||||||
|  |   await clickhouse.insert({ | ||||||
|  |     table: stockAggregatesTableName, | ||||||
|  |     // structure should match the desired format, JSONEachRow in this example
 | ||||||
|  |     values: stockAggregatesResult.results.map(r=>({ | ||||||
|  |       symbol: stockAggregatesResult.ticker,  | ||||||
|  |       tsStart: (r.t || 0)/1000, | ||||||
|  |       open: r.o,  | ||||||
|  |       close: r.c,  | ||||||
|  |       low: r.l,  | ||||||
|  |       high: r.h,  | ||||||
|  |       volume: r.v,  | ||||||
|  |       volume_weighted_price: r.vw | ||||||
|  |     })), | ||||||
|  |     format: 'JSONEachRow', | ||||||
|  |   }); | ||||||
|  |   console.log('inserted', stockAggregatesResult.ticker); | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | function sleep(ms){ | ||||||
|  |   return new Promise((resolve)=>{ | ||||||
|  |     setTimeout(resolve, ms); | ||||||
|  |   }); | ||||||
|  | } | ||||||
|  | /** | ||||||
|  |  * Get the underlying price aggregates for every 10-minute period starting 2 years ago (the lookback period for the free tier) until now | ||||||
|  |  */ | ||||||
|  | async function fetchStockAggregates(symbol, fromDate, toDate, limit){ | ||||||
|  |   let stockAggregatesResult = await (await fetch(`https://api.polygon.io/v2/aggs/ticker/${symbol}/range/10/minute/${fromDate}/${toDate}?adjusted=false&sort=asc&limit=${limit}&apiKey=${apiKey}`)).json(); | ||||||
|  |   console.log(stockAggregatesResult.resultsCount); | ||||||
|  |   if(stockAggregatesResult.status === 'OK' && typeof stockAggregatesResult.results !== "undefined" && stockAggregatesResult.results.length > 0){ | ||||||
|  |     await insertResultsIntoClickhouse(stockAggregatesResult); | ||||||
|  |     while(stockAggregatesResult.hasOwnProperty("next_url")){ | ||||||
|  |       await sleep(13000); | ||||||
|  |       stockAggregatesResult = await (await fetch(`${stockAggregatesResult.next_url}&apiKey=${apiKey}`)).json(); | ||||||
|  |       if(stockAggregatesResult.status === 'OK' && typeof stockAggregatesResult.results !== "undefined" && stockAggregatesResult.results.length > 0){ | ||||||
|  |         console.log(stockAggregatesResult.resultsCount); | ||||||
|  |         await insertResultsIntoClickhouse(stockAggregatesResult); | ||||||
|  |       } | ||||||
|  |       else{ | ||||||
|  |         console.log(stockAggregatesResult); | ||||||
|  |       } | ||||||
|  |     } | ||||||
|  |   } | ||||||
|  |   else{ | ||||||
|  |     console.log(stockAggregatesResult); | ||||||
|  |   } | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | await deleteClickhouseTable(); | ||||||
|  | await createClickhouseTable(); | ||||||
|  | for(let symbol of ["AAPL", "GOOGL", "MSFT", "TSLA", "AMD", "NFLX", "SPY"]){ | ||||||
|  |   await fetchStockAggregates(symbol, "2021-11-27", "2023-11-25", 50000); | ||||||
|  |   await sleep(13000); | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | /** | ||||||
|  |  * For each day (not aggregate), look-up the available option contracts (i.e. | ||||||
|  |  * all unique type/strike/exp combinations). Limit is 1000, but it gives you a | ||||||
|  |  * "nextUrl", so keep calling that until all results are in. | ||||||
|  |  */ | ||||||
|  | 
 | ||||||
|  | /** | ||||||
|  |  * For each aggregate and the contracts on that day, get option aggregates | ||||||
|  |  */ | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | await clickhouse.close(); | ||||||
					Loading…
					
					
				
		Reference in New Issue