From 742414697f0e2e44623da792f0be132e119a1023 Mon Sep 17 00:00:00 2001 From: Avraham Sakal Date: Fri, 12 Jul 2024 19:39:13 -0400 Subject: [PATCH] add stock aggregate fetching functions --- server/src/lib/polygon.ts | 87 +++++++++++++++++++++++++++++++++++++-- server/src/lib/sync.ts | 70 ++++++++++++++++++++++++++++--- 2 files changed, 149 insertions(+), 8 deletions(-) diff --git a/server/src/lib/polygon.ts b/server/src/lib/polygon.ts index 3bff4e8..f0cf431 100644 --- a/server/src/lib/polygon.ts +++ b/server/src/lib/polygon.ts @@ -1,4 +1,4 @@ -// import pThrottle from "p-throttle"; +import pThrottle from "p-throttle"; import pRetry from "p-retry"; import { Env } from "@humanwhocodes/env"; @@ -7,8 +7,8 @@ const env = new Env(); const { POLYGON_API_KEY } = env.required; const apiKey = POLYGON_API_KEY; -// export const getApiKey = pThrottle({ limit: 5, interval: 60000 })(() => apiKey); -export const getApiKey = () => apiKey; +export const getApiKey = pThrottle({ limit: 5, interval: 72000 })(() => apiKey); +// export const getApiKey = () => apiKey; export const optionContractToTicker = ({ symbol, @@ -154,3 +154,84 @@ export async function* makeGetOptionContractAggregatesIterator({ ); } } + +type PolygonStockAggregatesResponse = { + next_url?: string; + status: string; + resultsCount: number; + results: Array<{ + c: number; + h: number; + n: number; + l: number; + o: number; + t: number; + v: number; + vw: number; + }>; +}; +export async function* makeGetStockAggregatesIterator({ + symbol, + firstDate, + lastDate, +}: { + symbol: string; + firstDate: string; + lastDate: string; +}) { + const url = `https://api.polygon.io/v2/aggs/ticker/${symbol}/range/1/minute/${firstDate}/${lastDate}?adjusted=false&sort=asc&limit=10000&apiKey=${await getApiKey()}`; + let latestBatchResponse = await pRetry( + async () => + (await (await fetch(url)).json()) as PolygonStockAggregatesResponse, + { forever: true, factor: 2, maxTimeout: 120000 } + ); + if (latestBatchResponse.status.toLowerCase() === "ok") { + yield latestBatchResponse.results?.map((result) => ({ + symbol, + + tsStart: (result.t || 0) / 1000, + open: result.o, + close: result.c, + low: result.l, + high: result.h, + })) || []; + } else if (latestBatchResponse.status === "NOT_AUTHORIZED") { + console.error("Skipping due to:", latestBatchResponse); + } else if (latestBatchResponse.status === "DELAYED") { + console.error("Skipping due to:", latestBatchResponse); + } else { + console.error(latestBatchResponse); + throw new Error(`error fetching option contract aggregate ${url}`); + } + + // as long as there's a `next_url`, call that: + while (latestBatchResponse.hasOwnProperty("next_url")) { + latestBatchResponse = await pRetry( + async () => + (await ( + await fetch( + `${latestBatchResponse.next_url}&apiKey=${await getApiKey()}` + ) + ).json()) as PolygonStockAggregatesResponse, + { forever: true, factor: 2, maxTimeout: 120000 } + ); + if (latestBatchResponse.status.toLowerCase() === "ok") { + yield latestBatchResponse.results?.map((result) => ({ + symbol, + + tsStart: (result.t || 0) / 1000, + open: result.o, + close: result.c, + low: result.l, + high: result.h, + })) || []; + } else if (latestBatchResponse.status === "NOT_AUTHORIZED") { + console.error("Skipping due to:", latestBatchResponse); + } else if (latestBatchResponse.status === "DELAYED") { + console.error("Skipping due to:", latestBatchResponse); + } else { + console.error(latestBatchResponse); + throw new Error(`error fetching option contract aggregate ${url}`); + } + } +} diff --git a/server/src/lib/sync.ts b/server/src/lib/sync.ts index 19f44fd..2b137ea 100644 --- a/server/src/lib/sync.ts +++ b/server/src/lib/sync.ts @@ -80,7 +80,7 @@ export async function setPullOptionContractsState( ); } -export async function getPullOptionContractAggregatesState(ticker: string) { +export async function getPullAggregatesState(ticker: string) { const state = await sqliteDb.get( ` SELECT @@ -102,7 +102,7 @@ const enum OptionContractAggregatesSyncStatus { type OptionContractAggregatesSyncState = { status: OptionContractAggregatesSyncStatus; }; -export async function setPullOptionContractAggregatesState( +export async function setPullAggregatesState( ticker: string, state: OptionContractAggregatesSyncState ) { @@ -154,10 +154,10 @@ export async function pullOptionContractAggregates( const ticker = Polygon.optionContractToTicker(optionContract); // check if sync was completed: if ( - (await getPullOptionContractAggregatesState(ticker))?.status !== + (await getPullAggregatesState(ticker))?.status !== OptionContractAggregatesSyncStatus.COMPLETED ) { - await setPullOptionContractAggregatesState(ticker, { + await setPullAggregatesState(ticker, { status: OptionContractAggregatesSyncStatus.STARTED, }); const { firstDate } = await getOptionContractDateRange(optionContract); @@ -186,12 +186,72 @@ export async function pullOptionContractAggregates( ); } } - await setPullOptionContractAggregatesState(ticker, { + await setPullAggregatesState(ticker, { status: OptionContractAggregatesSyncStatus.COMPLETED, }); } } +export async function pullStockAggregates( + symbol: string, + firstDate: string, + lastDate: string +) { + // check if sync was completed: + if ( + (await getPullAggregatesState(symbol))?.status !== + OptionContractAggregatesSyncStatus.COMPLETED + ) { + await setPullAggregatesState(symbol, { + status: OptionContractAggregatesSyncStatus.STARTED, + }); + for await (const batch of Polygon.makeGetStockAggregatesIterator({ + symbol, + firstDate, + lastDate, + })) { + if (batch.length > 0) { + console.log( + symbol, + new Date(batch[0].tsStart * 1000), + new Date(batch[batch.length - 1].tsStart * 1000) + ); + await clickhouse.insert({ + table: "stock_aggregates", + values: batch, + format: "JSONEachRow", + }); + // await pRetry( + // async () => { + // console.log(`inserting ${batch.length} rows`); + // await clickhouse.insert({ + // table: "stock_aggregates", + // values: batch, + // format: "JSONEachRow", + // }); + // console.log("inserted"); + // }, + // { forever: true, factor: 2, maxTimeout: 120000 } + // ); + } + } + await setPullAggregatesState(symbol, { + status: OptionContractAggregatesSyncStatus.COMPLETED, + }); + } +} + +export async function pullAllStockAggregates(firstDate, lastDate) { + const symbols = ( + await query( + `SELECT DISTINCT(symbol) as symbol FROM option_contract_existences WHERE asOfDate >= '${firstDate}' AND asOfDate <= '${lastDate}'` + ) + ).map(({ symbol }) => symbol); + for (const symbol of symbols) { + await pullStockAggregates(symbol, firstDate, lastDate); + } +} + export async function* makeGetOptionContractsIterator( symbol: string, asOfDate: string