Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 742414697f | |||
| df4925e3a4 | |||
| cd9dd9fefc |
+6
-5
@@ -37,10 +37,11 @@ export function RpcType<T extends TSchema>(schema: T) {
|
||||
|
||||
const appRouter = router({
|
||||
getAvailableUnderlyings: publicProcedure.query(async (opts) => {
|
||||
// return (await query<{symbol:string}>(`
|
||||
// SELECT DISTINCT(symbol) as symbol FROM option_contract_existences
|
||||
// `))
|
||||
// .map(({symbol})=>symbol);
|
||||
// return (
|
||||
// await query<{ symbol: string }>(`
|
||||
// SELECT DISTINCT(symbol) as symbol FROM option_contract_existences WHERE asOfDate = (SELECT max(asOfDate) FROM option_contract_existences)
|
||||
// `)
|
||||
// ).map(({ symbol }) => symbol);
|
||||
return ["AAPL", "AMD", "GOOGL", "MSFT", "NFLX"];
|
||||
}),
|
||||
getAvailableAsOfDates: publicProcedure
|
||||
@@ -142,7 +143,7 @@ const appRouter = router({
|
||||
SELECT
|
||||
toUnixTimestamp(tsStart) as x,
|
||||
open as y
|
||||
FROM option_aggregates
|
||||
FROM option_contract_aggregates
|
||||
WHERE symbol = '${underlying}'
|
||||
AND expirationDate = '${expirationDate}'
|
||||
AND strike = ${strike}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
// import pThrottle from "p-throttle";
|
||||
import pThrottle from "p-throttle";
|
||||
import pRetry from "p-retry";
|
||||
import { Env } from "@humanwhocodes/env";
|
||||
|
||||
@@ -7,8 +7,8 @@ const env = new Env();
|
||||
const { POLYGON_API_KEY } = env.required;
|
||||
|
||||
const apiKey = POLYGON_API_KEY;
|
||||
// export const getApiKey = pThrottle({ limit: 5, interval: 60000 })(() => apiKey);
|
||||
export const getApiKey = () => apiKey;
|
||||
export const getApiKey = pThrottle({ limit: 5, interval: 72000 })(() => apiKey);
|
||||
// export const getApiKey = () => apiKey;
|
||||
|
||||
export const optionContractToTicker = ({
|
||||
symbol,
|
||||
@@ -154,3 +154,84 @@ export async function* makeGetOptionContractAggregatesIterator({
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
type PolygonStockAggregatesResponse = {
|
||||
next_url?: string;
|
||||
status: string;
|
||||
resultsCount: number;
|
||||
results: Array<{
|
||||
c: number;
|
||||
h: number;
|
||||
n: number;
|
||||
l: number;
|
||||
o: number;
|
||||
t: number;
|
||||
v: number;
|
||||
vw: number;
|
||||
}>;
|
||||
};
|
||||
export async function* makeGetStockAggregatesIterator({
|
||||
symbol,
|
||||
firstDate,
|
||||
lastDate,
|
||||
}: {
|
||||
symbol: string;
|
||||
firstDate: string;
|
||||
lastDate: string;
|
||||
}) {
|
||||
const url = `https://api.polygon.io/v2/aggs/ticker/${symbol}/range/1/minute/${firstDate}/${lastDate}?adjusted=false&sort=asc&limit=10000&apiKey=${await getApiKey()}`;
|
||||
let latestBatchResponse = await pRetry(
|
||||
async () =>
|
||||
(await (await fetch(url)).json()) as PolygonStockAggregatesResponse,
|
||||
{ forever: true, factor: 2, maxTimeout: 120000 }
|
||||
);
|
||||
if (latestBatchResponse.status.toLowerCase() === "ok") {
|
||||
yield latestBatchResponse.results?.map((result) => ({
|
||||
symbol,
|
||||
|
||||
tsStart: (result.t || 0) / 1000,
|
||||
open: result.o,
|
||||
close: result.c,
|
||||
low: result.l,
|
||||
high: result.h,
|
||||
})) || [];
|
||||
} else if (latestBatchResponse.status === "NOT_AUTHORIZED") {
|
||||
console.error("Skipping due to:", latestBatchResponse);
|
||||
} else if (latestBatchResponse.status === "DELAYED") {
|
||||
console.error("Skipping due to:", latestBatchResponse);
|
||||
} else {
|
||||
console.error(latestBatchResponse);
|
||||
throw new Error(`error fetching option contract aggregate ${url}`);
|
||||
}
|
||||
|
||||
// as long as there's a `next_url`, call that:
|
||||
while (latestBatchResponse.hasOwnProperty("next_url")) {
|
||||
latestBatchResponse = await pRetry(
|
||||
async () =>
|
||||
(await (
|
||||
await fetch(
|
||||
`${latestBatchResponse.next_url}&apiKey=${await getApiKey()}`
|
||||
)
|
||||
).json()) as PolygonStockAggregatesResponse,
|
||||
{ forever: true, factor: 2, maxTimeout: 120000 }
|
||||
);
|
||||
if (latestBatchResponse.status.toLowerCase() === "ok") {
|
||||
yield latestBatchResponse.results?.map((result) => ({
|
||||
symbol,
|
||||
|
||||
tsStart: (result.t || 0) / 1000,
|
||||
open: result.o,
|
||||
close: result.c,
|
||||
low: result.l,
|
||||
high: result.h,
|
||||
})) || [];
|
||||
} else if (latestBatchResponse.status === "NOT_AUTHORIZED") {
|
||||
console.error("Skipping due to:", latestBatchResponse);
|
||||
} else if (latestBatchResponse.status === "DELAYED") {
|
||||
console.error("Skipping due to:", latestBatchResponse);
|
||||
} else {
|
||||
console.error(latestBatchResponse);
|
||||
throw new Error(`error fetching option contract aggregate ${url}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
+65
-5
@@ -80,7 +80,7 @@ export async function setPullOptionContractsState(
|
||||
);
|
||||
}
|
||||
|
||||
export async function getPullOptionContractAggregatesState(ticker: string) {
|
||||
export async function getPullAggregatesState(ticker: string) {
|
||||
const state = await sqliteDb.get(
|
||||
`
|
||||
SELECT
|
||||
@@ -102,7 +102,7 @@ const enum OptionContractAggregatesSyncStatus {
|
||||
type OptionContractAggregatesSyncState = {
|
||||
status: OptionContractAggregatesSyncStatus;
|
||||
};
|
||||
export async function setPullOptionContractAggregatesState(
|
||||
export async function setPullAggregatesState(
|
||||
ticker: string,
|
||||
state: OptionContractAggregatesSyncState
|
||||
) {
|
||||
@@ -154,10 +154,10 @@ export async function pullOptionContractAggregates(
|
||||
const ticker = Polygon.optionContractToTicker(optionContract);
|
||||
// check if sync was completed:
|
||||
if (
|
||||
(await getPullOptionContractAggregatesState(ticker))?.status !==
|
||||
(await getPullAggregatesState(ticker))?.status !==
|
||||
OptionContractAggregatesSyncStatus.COMPLETED
|
||||
) {
|
||||
await setPullOptionContractAggregatesState(ticker, {
|
||||
await setPullAggregatesState(ticker, {
|
||||
status: OptionContractAggregatesSyncStatus.STARTED,
|
||||
});
|
||||
const { firstDate } = await getOptionContractDateRange(optionContract);
|
||||
@@ -186,12 +186,72 @@ export async function pullOptionContractAggregates(
|
||||
);
|
||||
}
|
||||
}
|
||||
await setPullOptionContractAggregatesState(ticker, {
|
||||
await setPullAggregatesState(ticker, {
|
||||
status: OptionContractAggregatesSyncStatus.COMPLETED,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
export async function pullStockAggregates(
|
||||
symbol: string,
|
||||
firstDate: string,
|
||||
lastDate: string
|
||||
) {
|
||||
// check if sync was completed:
|
||||
if (
|
||||
(await getPullAggregatesState(symbol))?.status !==
|
||||
OptionContractAggregatesSyncStatus.COMPLETED
|
||||
) {
|
||||
await setPullAggregatesState(symbol, {
|
||||
status: OptionContractAggregatesSyncStatus.STARTED,
|
||||
});
|
||||
for await (const batch of Polygon.makeGetStockAggregatesIterator({
|
||||
symbol,
|
||||
firstDate,
|
||||
lastDate,
|
||||
})) {
|
||||
if (batch.length > 0) {
|
||||
console.log(
|
||||
symbol,
|
||||
new Date(batch[0].tsStart * 1000),
|
||||
new Date(batch[batch.length - 1].tsStart * 1000)
|
||||
);
|
||||
await clickhouse.insert({
|
||||
table: "stock_aggregates",
|
||||
values: batch,
|
||||
format: "JSONEachRow",
|
||||
});
|
||||
// await pRetry(
|
||||
// async () => {
|
||||
// console.log(`inserting ${batch.length} rows`);
|
||||
// await clickhouse.insert({
|
||||
// table: "stock_aggregates",
|
||||
// values: batch,
|
||||
// format: "JSONEachRow",
|
||||
// });
|
||||
// console.log("inserted");
|
||||
// },
|
||||
// { forever: true, factor: 2, maxTimeout: 120000 }
|
||||
// );
|
||||
}
|
||||
}
|
||||
await setPullAggregatesState(symbol, {
|
||||
status: OptionContractAggregatesSyncStatus.COMPLETED,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
export async function pullAllStockAggregates(firstDate, lastDate) {
|
||||
const symbols = (
|
||||
await query(
|
||||
`SELECT DISTINCT(symbol) as symbol FROM option_contract_existences WHERE asOfDate >= '${firstDate}' AND asOfDate <= '${lastDate}'`
|
||||
)
|
||||
).map(({ symbol }) => symbol);
|
||||
for (const symbol of symbols) {
|
||||
await pullStockAggregates(symbol, firstDate, lastDate);
|
||||
}
|
||||
}
|
||||
|
||||
export async function* makeGetOptionContractsIterator(
|
||||
symbol: string,
|
||||
asOfDate: string
|
||||
|
||||
@@ -23,6 +23,21 @@ CREATE TABLE option_contract_existences
|
||||
ENGINE ReplacingMergeTree()
|
||||
PRIMARY KEY (asOfDate, symbol)
|
||||
ORDER BY (asOfDate, symbol, expirationDate, strike, type);
|
||||
CREATE MATERIALIZED VIEW option_contract_existences_mv TO option_contract_existences
|
||||
AS
|
||||
SELECT
|
||||
toDate(tsStart) as asOfDate,
|
||||
symbol,
|
||||
expirationDate,
|
||||
strike,
|
||||
type
|
||||
FROM option_contract_aggregates
|
||||
GROUP BY
|
||||
asOfDate,
|
||||
symbol,
|
||||
expirationDate,
|
||||
strike,
|
||||
type;
|
||||
|
||||
CREATE TABLE option_contracts
|
||||
(
|
||||
|
||||
Reference in New Issue
Block a user