Compare commits

...

3 Commits

Author SHA1 Message Date
Avraham Sakal 742414697f add stock aggregate fetching functions 2024-07-12 19:39:13 -04:00
Avraham Sakal df4925e3a4 add materialized view to keep individual contract existences 2024-07-05 16:47:15 -04:00
Avraham Sakal cd9dd9fefc I don't have the underlying data yet 2024-07-05 16:42:04 -04:00
4 changed files with 170 additions and 13 deletions
+6 -5
View File
@@ -37,10 +37,11 @@ export function RpcType<T extends TSchema>(schema: T) {
const appRouter = router({
getAvailableUnderlyings: publicProcedure.query(async (opts) => {
// return (await query<{symbol:string}>(`
// SELECT DISTINCT(symbol) as symbol FROM option_contract_existences
// `))
// .map(({symbol})=>symbol);
// return (
// await query<{ symbol: string }>(`
// SELECT DISTINCT(symbol) as symbol FROM option_contract_existences WHERE asOfDate = (SELECT max(asOfDate) FROM option_contract_existences)
// `)
// ).map(({ symbol }) => symbol);
return ["AAPL", "AMD", "GOOGL", "MSFT", "NFLX"];
}),
getAvailableAsOfDates: publicProcedure
@@ -142,7 +143,7 @@ const appRouter = router({
SELECT
toUnixTimestamp(tsStart) as x,
open as y
FROM option_aggregates
FROM option_contract_aggregates
WHERE symbol = '${underlying}'
AND expirationDate = '${expirationDate}'
AND strike = ${strike}
+84 -3
View File
@@ -1,4 +1,4 @@
// import pThrottle from "p-throttle";
import pThrottle from "p-throttle";
import pRetry from "p-retry";
import { Env } from "@humanwhocodes/env";
@@ -7,8 +7,8 @@ const env = new Env();
const { POLYGON_API_KEY } = env.required;
const apiKey = POLYGON_API_KEY;
// export const getApiKey = pThrottle({ limit: 5, interval: 60000 })(() => apiKey);
export const getApiKey = () => apiKey;
export const getApiKey = pThrottle({ limit: 5, interval: 72000 })(() => apiKey);
// export const getApiKey = () => apiKey;
export const optionContractToTicker = ({
symbol,
@@ -154,3 +154,84 @@ export async function* makeGetOptionContractAggregatesIterator({
);
}
}
type PolygonStockAggregatesResponse = {
next_url?: string;
status: string;
resultsCount: number;
results: Array<{
c: number;
h: number;
n: number;
l: number;
o: number;
t: number;
v: number;
vw: number;
}>;
};
export async function* makeGetStockAggregatesIterator({
symbol,
firstDate,
lastDate,
}: {
symbol: string;
firstDate: string;
lastDate: string;
}) {
const url = `https://api.polygon.io/v2/aggs/ticker/${symbol}/range/1/minute/${firstDate}/${lastDate}?adjusted=false&sort=asc&limit=10000&apiKey=${await getApiKey()}`;
let latestBatchResponse = await pRetry(
async () =>
(await (await fetch(url)).json()) as PolygonStockAggregatesResponse,
{ forever: true, factor: 2, maxTimeout: 120000 }
);
if (latestBatchResponse.status.toLowerCase() === "ok") {
yield latestBatchResponse.results?.map((result) => ({
symbol,
tsStart: (result.t || 0) / 1000,
open: result.o,
close: result.c,
low: result.l,
high: result.h,
})) || [];
} else if (latestBatchResponse.status === "NOT_AUTHORIZED") {
console.error("Skipping due to:", latestBatchResponse);
} else if (latestBatchResponse.status === "DELAYED") {
console.error("Skipping due to:", latestBatchResponse);
} else {
console.error(latestBatchResponse);
throw new Error(`error fetching option contract aggregate ${url}`);
}
// as long as there's a `next_url`, call that:
while (latestBatchResponse.hasOwnProperty("next_url")) {
latestBatchResponse = await pRetry(
async () =>
(await (
await fetch(
`${latestBatchResponse.next_url}&apiKey=${await getApiKey()}`
)
).json()) as PolygonStockAggregatesResponse,
{ forever: true, factor: 2, maxTimeout: 120000 }
);
if (latestBatchResponse.status.toLowerCase() === "ok") {
yield latestBatchResponse.results?.map((result) => ({
symbol,
tsStart: (result.t || 0) / 1000,
open: result.o,
close: result.c,
low: result.l,
high: result.h,
})) || [];
} else if (latestBatchResponse.status === "NOT_AUTHORIZED") {
console.error("Skipping due to:", latestBatchResponse);
} else if (latestBatchResponse.status === "DELAYED") {
console.error("Skipping due to:", latestBatchResponse);
} else {
console.error(latestBatchResponse);
throw new Error(`error fetching option contract aggregate ${url}`);
}
}
}
+65 -5
View File
@@ -80,7 +80,7 @@ export async function setPullOptionContractsState(
);
}
export async function getPullOptionContractAggregatesState(ticker: string) {
export async function getPullAggregatesState(ticker: string) {
const state = await sqliteDb.get(
`
SELECT
@@ -102,7 +102,7 @@ const enum OptionContractAggregatesSyncStatus {
type OptionContractAggregatesSyncState = {
status: OptionContractAggregatesSyncStatus;
};
export async function setPullOptionContractAggregatesState(
export async function setPullAggregatesState(
ticker: string,
state: OptionContractAggregatesSyncState
) {
@@ -154,10 +154,10 @@ export async function pullOptionContractAggregates(
const ticker = Polygon.optionContractToTicker(optionContract);
// check if sync was completed:
if (
(await getPullOptionContractAggregatesState(ticker))?.status !==
(await getPullAggregatesState(ticker))?.status !==
OptionContractAggregatesSyncStatus.COMPLETED
) {
await setPullOptionContractAggregatesState(ticker, {
await setPullAggregatesState(ticker, {
status: OptionContractAggregatesSyncStatus.STARTED,
});
const { firstDate } = await getOptionContractDateRange(optionContract);
@@ -186,12 +186,72 @@ export async function pullOptionContractAggregates(
);
}
}
await setPullOptionContractAggregatesState(ticker, {
await setPullAggregatesState(ticker, {
status: OptionContractAggregatesSyncStatus.COMPLETED,
});
}
}
export async function pullStockAggregates(
symbol: string,
firstDate: string,
lastDate: string
) {
// check if sync was completed:
if (
(await getPullAggregatesState(symbol))?.status !==
OptionContractAggregatesSyncStatus.COMPLETED
) {
await setPullAggregatesState(symbol, {
status: OptionContractAggregatesSyncStatus.STARTED,
});
for await (const batch of Polygon.makeGetStockAggregatesIterator({
symbol,
firstDate,
lastDate,
})) {
if (batch.length > 0) {
console.log(
symbol,
new Date(batch[0].tsStart * 1000),
new Date(batch[batch.length - 1].tsStart * 1000)
);
await clickhouse.insert({
table: "stock_aggregates",
values: batch,
format: "JSONEachRow",
});
// await pRetry(
// async () => {
// console.log(`inserting ${batch.length} rows`);
// await clickhouse.insert({
// table: "stock_aggregates",
// values: batch,
// format: "JSONEachRow",
// });
// console.log("inserted");
// },
// { forever: true, factor: 2, maxTimeout: 120000 }
// );
}
}
await setPullAggregatesState(symbol, {
status: OptionContractAggregatesSyncStatus.COMPLETED,
});
}
}
export async function pullAllStockAggregates(firstDate, lastDate) {
const symbols = (
await query(
`SELECT DISTINCT(symbol) as symbol FROM option_contract_existences WHERE asOfDate >= '${firstDate}' AND asOfDate <= '${lastDate}'`
)
).map(({ symbol }) => symbol);
for (const symbol of symbols) {
await pullStockAggregates(symbol, firstDate, lastDate);
}
}
export async function* makeGetOptionContractsIterator(
symbol: string,
asOfDate: string
+15
View File
@@ -23,6 +23,21 @@ CREATE TABLE option_contract_existences
ENGINE ReplacingMergeTree()
PRIMARY KEY (asOfDate, symbol)
ORDER BY (asOfDate, symbol, expirationDate, strike, type);
CREATE MATERIALIZED VIEW option_contract_existences_mv TO option_contract_existences
AS
SELECT
toDate(tsStart) as asOfDate,
symbol,
expirationDate,
strike,
type
FROM option_contract_aggregates
GROUP BY
asOfDate,
symbol,
expirationDate,
strike,
type;
CREATE TABLE option_contracts
(