add stock aggregate fetching functions

main
Avraham Sakal 10 months ago
parent df4925e3a4
commit 742414697f

@ -1,4 +1,4 @@
// import pThrottle from "p-throttle";
import pThrottle from "p-throttle";
import pRetry from "p-retry";
import { Env } from "@humanwhocodes/env";
@ -7,8 +7,8 @@ const env = new Env();
const { POLYGON_API_KEY } = env.required;
const apiKey = POLYGON_API_KEY;
// export const getApiKey = pThrottle({ limit: 5, interval: 60000 })(() => apiKey);
export const getApiKey = () => apiKey;
export const getApiKey = pThrottle({ limit: 5, interval: 72000 })(() => apiKey);
// export const getApiKey = () => apiKey;
export const optionContractToTicker = ({
symbol,
@ -154,3 +154,84 @@ export async function* makeGetOptionContractAggregatesIterator({
);
}
}
type PolygonStockAggregatesResponse = {
next_url?: string;
status: string;
resultsCount: number;
results: Array<{
c: number;
h: number;
n: number;
l: number;
o: number;
t: number;
v: number;
vw: number;
}>;
};
export async function* makeGetStockAggregatesIterator({
symbol,
firstDate,
lastDate,
}: {
symbol: string;
firstDate: string;
lastDate: string;
}) {
const url = `https://api.polygon.io/v2/aggs/ticker/${symbol}/range/1/minute/${firstDate}/${lastDate}?adjusted=false&sort=asc&limit=10000&apiKey=${await getApiKey()}`;
let latestBatchResponse = await pRetry(
async () =>
(await (await fetch(url)).json()) as PolygonStockAggregatesResponse,
{ forever: true, factor: 2, maxTimeout: 120000 }
);
if (latestBatchResponse.status.toLowerCase() === "ok") {
yield latestBatchResponse.results?.map((result) => ({
symbol,
tsStart: (result.t || 0) / 1000,
open: result.o,
close: result.c,
low: result.l,
high: result.h,
})) || [];
} else if (latestBatchResponse.status === "NOT_AUTHORIZED") {
console.error("Skipping due to:", latestBatchResponse);
} else if (latestBatchResponse.status === "DELAYED") {
console.error("Skipping due to:", latestBatchResponse);
} else {
console.error(latestBatchResponse);
throw new Error(`error fetching option contract aggregate ${url}`);
}
// as long as there's a `next_url`, call that:
while (latestBatchResponse.hasOwnProperty("next_url")) {
latestBatchResponse = await pRetry(
async () =>
(await (
await fetch(
`${latestBatchResponse.next_url}&apiKey=${await getApiKey()}`
)
).json()) as PolygonStockAggregatesResponse,
{ forever: true, factor: 2, maxTimeout: 120000 }
);
if (latestBatchResponse.status.toLowerCase() === "ok") {
yield latestBatchResponse.results?.map((result) => ({
symbol,
tsStart: (result.t || 0) / 1000,
open: result.o,
close: result.c,
low: result.l,
high: result.h,
})) || [];
} else if (latestBatchResponse.status === "NOT_AUTHORIZED") {
console.error("Skipping due to:", latestBatchResponse);
} else if (latestBatchResponse.status === "DELAYED") {
console.error("Skipping due to:", latestBatchResponse);
} else {
console.error(latestBatchResponse);
throw new Error(`error fetching option contract aggregate ${url}`);
}
}
}

@ -80,7 +80,7 @@ export async function setPullOptionContractsState(
);
}
export async function getPullOptionContractAggregatesState(ticker: string) {
export async function getPullAggregatesState(ticker: string) {
const state = await sqliteDb.get(
`
SELECT
@ -102,7 +102,7 @@ const enum OptionContractAggregatesSyncStatus {
type OptionContractAggregatesSyncState = {
status: OptionContractAggregatesSyncStatus;
};
export async function setPullOptionContractAggregatesState(
export async function setPullAggregatesState(
ticker: string,
state: OptionContractAggregatesSyncState
) {
@ -154,10 +154,10 @@ export async function pullOptionContractAggregates(
const ticker = Polygon.optionContractToTicker(optionContract);
// check if sync was completed:
if (
(await getPullOptionContractAggregatesState(ticker))?.status !==
(await getPullAggregatesState(ticker))?.status !==
OptionContractAggregatesSyncStatus.COMPLETED
) {
await setPullOptionContractAggregatesState(ticker, {
await setPullAggregatesState(ticker, {
status: OptionContractAggregatesSyncStatus.STARTED,
});
const { firstDate } = await getOptionContractDateRange(optionContract);
@ -186,12 +186,72 @@ export async function pullOptionContractAggregates(
);
}
}
await setPullOptionContractAggregatesState(ticker, {
await setPullAggregatesState(ticker, {
status: OptionContractAggregatesSyncStatus.COMPLETED,
});
}
}
export async function pullStockAggregates(
symbol: string,
firstDate: string,
lastDate: string
) {
// check if sync was completed:
if (
(await getPullAggregatesState(symbol))?.status !==
OptionContractAggregatesSyncStatus.COMPLETED
) {
await setPullAggregatesState(symbol, {
status: OptionContractAggregatesSyncStatus.STARTED,
});
for await (const batch of Polygon.makeGetStockAggregatesIterator({
symbol,
firstDate,
lastDate,
})) {
if (batch.length > 0) {
console.log(
symbol,
new Date(batch[0].tsStart * 1000),
new Date(batch[batch.length - 1].tsStart * 1000)
);
await clickhouse.insert({
table: "stock_aggregates",
values: batch,
format: "JSONEachRow",
});
// await pRetry(
// async () => {
// console.log(`inserting ${batch.length} rows`);
// await clickhouse.insert({
// table: "stock_aggregates",
// values: batch,
// format: "JSONEachRow",
// });
// console.log("inserted");
// },
// { forever: true, factor: 2, maxTimeout: 120000 }
// );
}
}
await setPullAggregatesState(symbol, {
status: OptionContractAggregatesSyncStatus.COMPLETED,
});
}
}
export async function pullAllStockAggregates(firstDate, lastDate) {
const symbols = (
await query(
`SELECT DISTINCT(symbol) as symbol FROM option_contract_existences WHERE asOfDate >= '${firstDate}' AND asOfDate <= '${lastDate}'`
)
).map(({ symbol }) => symbol);
for (const symbol of symbols) {
await pullStockAggregates(symbol, firstDate, lastDate);
}
}
export async function* makeGetOptionContractsIterator(
symbol: string,
asOfDate: string

Loading…
Cancel
Save