Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 742414697f | |||
| df4925e3a4 | |||
| cd9dd9fefc |
+6
-5
@@ -37,10 +37,11 @@ export function RpcType<T extends TSchema>(schema: T) {
|
|||||||
|
|
||||||
const appRouter = router({
|
const appRouter = router({
|
||||||
getAvailableUnderlyings: publicProcedure.query(async (opts) => {
|
getAvailableUnderlyings: publicProcedure.query(async (opts) => {
|
||||||
// return (await query<{symbol:string}>(`
|
// return (
|
||||||
// SELECT DISTINCT(symbol) as symbol FROM option_contract_existences
|
// await query<{ symbol: string }>(`
|
||||||
// `))
|
// SELECT DISTINCT(symbol) as symbol FROM option_contract_existences WHERE asOfDate = (SELECT max(asOfDate) FROM option_contract_existences)
|
||||||
// .map(({symbol})=>symbol);
|
// `)
|
||||||
|
// ).map(({ symbol }) => symbol);
|
||||||
return ["AAPL", "AMD", "GOOGL", "MSFT", "NFLX"];
|
return ["AAPL", "AMD", "GOOGL", "MSFT", "NFLX"];
|
||||||
}),
|
}),
|
||||||
getAvailableAsOfDates: publicProcedure
|
getAvailableAsOfDates: publicProcedure
|
||||||
@@ -142,7 +143,7 @@ const appRouter = router({
|
|||||||
SELECT
|
SELECT
|
||||||
toUnixTimestamp(tsStart) as x,
|
toUnixTimestamp(tsStart) as x,
|
||||||
open as y
|
open as y
|
||||||
FROM option_aggregates
|
FROM option_contract_aggregates
|
||||||
WHERE symbol = '${underlying}'
|
WHERE symbol = '${underlying}'
|
||||||
AND expirationDate = '${expirationDate}'
|
AND expirationDate = '${expirationDate}'
|
||||||
AND strike = ${strike}
|
AND strike = ${strike}
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
// import pThrottle from "p-throttle";
|
import pThrottle from "p-throttle";
|
||||||
import pRetry from "p-retry";
|
import pRetry from "p-retry";
|
||||||
import { Env } from "@humanwhocodes/env";
|
import { Env } from "@humanwhocodes/env";
|
||||||
|
|
||||||
@@ -7,8 +7,8 @@ const env = new Env();
|
|||||||
const { POLYGON_API_KEY } = env.required;
|
const { POLYGON_API_KEY } = env.required;
|
||||||
|
|
||||||
const apiKey = POLYGON_API_KEY;
|
const apiKey = POLYGON_API_KEY;
|
||||||
// export const getApiKey = pThrottle({ limit: 5, interval: 60000 })(() => apiKey);
|
export const getApiKey = pThrottle({ limit: 5, interval: 72000 })(() => apiKey);
|
||||||
export const getApiKey = () => apiKey;
|
// export const getApiKey = () => apiKey;
|
||||||
|
|
||||||
export const optionContractToTicker = ({
|
export const optionContractToTicker = ({
|
||||||
symbol,
|
symbol,
|
||||||
@@ -154,3 +154,84 @@ export async function* makeGetOptionContractAggregatesIterator({
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type PolygonStockAggregatesResponse = {
|
||||||
|
next_url?: string;
|
||||||
|
status: string;
|
||||||
|
resultsCount: number;
|
||||||
|
results: Array<{
|
||||||
|
c: number;
|
||||||
|
h: number;
|
||||||
|
n: number;
|
||||||
|
l: number;
|
||||||
|
o: number;
|
||||||
|
t: number;
|
||||||
|
v: number;
|
||||||
|
vw: number;
|
||||||
|
}>;
|
||||||
|
};
|
||||||
|
export async function* makeGetStockAggregatesIterator({
|
||||||
|
symbol,
|
||||||
|
firstDate,
|
||||||
|
lastDate,
|
||||||
|
}: {
|
||||||
|
symbol: string;
|
||||||
|
firstDate: string;
|
||||||
|
lastDate: string;
|
||||||
|
}) {
|
||||||
|
const url = `https://api.polygon.io/v2/aggs/ticker/${symbol}/range/1/minute/${firstDate}/${lastDate}?adjusted=false&sort=asc&limit=10000&apiKey=${await getApiKey()}`;
|
||||||
|
let latestBatchResponse = await pRetry(
|
||||||
|
async () =>
|
||||||
|
(await (await fetch(url)).json()) as PolygonStockAggregatesResponse,
|
||||||
|
{ forever: true, factor: 2, maxTimeout: 120000 }
|
||||||
|
);
|
||||||
|
if (latestBatchResponse.status.toLowerCase() === "ok") {
|
||||||
|
yield latestBatchResponse.results?.map((result) => ({
|
||||||
|
symbol,
|
||||||
|
|
||||||
|
tsStart: (result.t || 0) / 1000,
|
||||||
|
open: result.o,
|
||||||
|
close: result.c,
|
||||||
|
low: result.l,
|
||||||
|
high: result.h,
|
||||||
|
})) || [];
|
||||||
|
} else if (latestBatchResponse.status === "NOT_AUTHORIZED") {
|
||||||
|
console.error("Skipping due to:", latestBatchResponse);
|
||||||
|
} else if (latestBatchResponse.status === "DELAYED") {
|
||||||
|
console.error("Skipping due to:", latestBatchResponse);
|
||||||
|
} else {
|
||||||
|
console.error(latestBatchResponse);
|
||||||
|
throw new Error(`error fetching option contract aggregate ${url}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
// as long as there's a `next_url`, call that:
|
||||||
|
while (latestBatchResponse.hasOwnProperty("next_url")) {
|
||||||
|
latestBatchResponse = await pRetry(
|
||||||
|
async () =>
|
||||||
|
(await (
|
||||||
|
await fetch(
|
||||||
|
`${latestBatchResponse.next_url}&apiKey=${await getApiKey()}`
|
||||||
|
)
|
||||||
|
).json()) as PolygonStockAggregatesResponse,
|
||||||
|
{ forever: true, factor: 2, maxTimeout: 120000 }
|
||||||
|
);
|
||||||
|
if (latestBatchResponse.status.toLowerCase() === "ok") {
|
||||||
|
yield latestBatchResponse.results?.map((result) => ({
|
||||||
|
symbol,
|
||||||
|
|
||||||
|
tsStart: (result.t || 0) / 1000,
|
||||||
|
open: result.o,
|
||||||
|
close: result.c,
|
||||||
|
low: result.l,
|
||||||
|
high: result.h,
|
||||||
|
})) || [];
|
||||||
|
} else if (latestBatchResponse.status === "NOT_AUTHORIZED") {
|
||||||
|
console.error("Skipping due to:", latestBatchResponse);
|
||||||
|
} else if (latestBatchResponse.status === "DELAYED") {
|
||||||
|
console.error("Skipping due to:", latestBatchResponse);
|
||||||
|
} else {
|
||||||
|
console.error(latestBatchResponse);
|
||||||
|
throw new Error(`error fetching option contract aggregate ${url}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
+65
-5
@@ -80,7 +80,7 @@ export async function setPullOptionContractsState(
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function getPullOptionContractAggregatesState(ticker: string) {
|
export async function getPullAggregatesState(ticker: string) {
|
||||||
const state = await sqliteDb.get(
|
const state = await sqliteDb.get(
|
||||||
`
|
`
|
||||||
SELECT
|
SELECT
|
||||||
@@ -102,7 +102,7 @@ const enum OptionContractAggregatesSyncStatus {
|
|||||||
type OptionContractAggregatesSyncState = {
|
type OptionContractAggregatesSyncState = {
|
||||||
status: OptionContractAggregatesSyncStatus;
|
status: OptionContractAggregatesSyncStatus;
|
||||||
};
|
};
|
||||||
export async function setPullOptionContractAggregatesState(
|
export async function setPullAggregatesState(
|
||||||
ticker: string,
|
ticker: string,
|
||||||
state: OptionContractAggregatesSyncState
|
state: OptionContractAggregatesSyncState
|
||||||
) {
|
) {
|
||||||
@@ -154,10 +154,10 @@ export async function pullOptionContractAggregates(
|
|||||||
const ticker = Polygon.optionContractToTicker(optionContract);
|
const ticker = Polygon.optionContractToTicker(optionContract);
|
||||||
// check if sync was completed:
|
// check if sync was completed:
|
||||||
if (
|
if (
|
||||||
(await getPullOptionContractAggregatesState(ticker))?.status !==
|
(await getPullAggregatesState(ticker))?.status !==
|
||||||
OptionContractAggregatesSyncStatus.COMPLETED
|
OptionContractAggregatesSyncStatus.COMPLETED
|
||||||
) {
|
) {
|
||||||
await setPullOptionContractAggregatesState(ticker, {
|
await setPullAggregatesState(ticker, {
|
||||||
status: OptionContractAggregatesSyncStatus.STARTED,
|
status: OptionContractAggregatesSyncStatus.STARTED,
|
||||||
});
|
});
|
||||||
const { firstDate } = await getOptionContractDateRange(optionContract);
|
const { firstDate } = await getOptionContractDateRange(optionContract);
|
||||||
@@ -186,12 +186,72 @@ export async function pullOptionContractAggregates(
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
await setPullOptionContractAggregatesState(ticker, {
|
await setPullAggregatesState(ticker, {
|
||||||
status: OptionContractAggregatesSyncStatus.COMPLETED,
|
status: OptionContractAggregatesSyncStatus.COMPLETED,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export async function pullStockAggregates(
|
||||||
|
symbol: string,
|
||||||
|
firstDate: string,
|
||||||
|
lastDate: string
|
||||||
|
) {
|
||||||
|
// check if sync was completed:
|
||||||
|
if (
|
||||||
|
(await getPullAggregatesState(symbol))?.status !==
|
||||||
|
OptionContractAggregatesSyncStatus.COMPLETED
|
||||||
|
) {
|
||||||
|
await setPullAggregatesState(symbol, {
|
||||||
|
status: OptionContractAggregatesSyncStatus.STARTED,
|
||||||
|
});
|
||||||
|
for await (const batch of Polygon.makeGetStockAggregatesIterator({
|
||||||
|
symbol,
|
||||||
|
firstDate,
|
||||||
|
lastDate,
|
||||||
|
})) {
|
||||||
|
if (batch.length > 0) {
|
||||||
|
console.log(
|
||||||
|
symbol,
|
||||||
|
new Date(batch[0].tsStart * 1000),
|
||||||
|
new Date(batch[batch.length - 1].tsStart * 1000)
|
||||||
|
);
|
||||||
|
await clickhouse.insert({
|
||||||
|
table: "stock_aggregates",
|
||||||
|
values: batch,
|
||||||
|
format: "JSONEachRow",
|
||||||
|
});
|
||||||
|
// await pRetry(
|
||||||
|
// async () => {
|
||||||
|
// console.log(`inserting ${batch.length} rows`);
|
||||||
|
// await clickhouse.insert({
|
||||||
|
// table: "stock_aggregates",
|
||||||
|
// values: batch,
|
||||||
|
// format: "JSONEachRow",
|
||||||
|
// });
|
||||||
|
// console.log("inserted");
|
||||||
|
// },
|
||||||
|
// { forever: true, factor: 2, maxTimeout: 120000 }
|
||||||
|
// );
|
||||||
|
}
|
||||||
|
}
|
||||||
|
await setPullAggregatesState(symbol, {
|
||||||
|
status: OptionContractAggregatesSyncStatus.COMPLETED,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function pullAllStockAggregates(firstDate, lastDate) {
|
||||||
|
const symbols = (
|
||||||
|
await query(
|
||||||
|
`SELECT DISTINCT(symbol) as symbol FROM option_contract_existences WHERE asOfDate >= '${firstDate}' AND asOfDate <= '${lastDate}'`
|
||||||
|
)
|
||||||
|
).map(({ symbol }) => symbol);
|
||||||
|
for (const symbol of symbols) {
|
||||||
|
await pullStockAggregates(symbol, firstDate, lastDate);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
export async function* makeGetOptionContractsIterator(
|
export async function* makeGetOptionContractsIterator(
|
||||||
symbol: string,
|
symbol: string,
|
||||||
asOfDate: string
|
asOfDate: string
|
||||||
|
|||||||
@@ -23,6 +23,21 @@ CREATE TABLE option_contract_existences
|
|||||||
ENGINE ReplacingMergeTree()
|
ENGINE ReplacingMergeTree()
|
||||||
PRIMARY KEY (asOfDate, symbol)
|
PRIMARY KEY (asOfDate, symbol)
|
||||||
ORDER BY (asOfDate, symbol, expirationDate, strike, type);
|
ORDER BY (asOfDate, symbol, expirationDate, strike, type);
|
||||||
|
CREATE MATERIALIZED VIEW option_contract_existences_mv TO option_contract_existences
|
||||||
|
AS
|
||||||
|
SELECT
|
||||||
|
toDate(tsStart) as asOfDate,
|
||||||
|
symbol,
|
||||||
|
expirationDate,
|
||||||
|
strike,
|
||||||
|
type
|
||||||
|
FROM option_contract_aggregates
|
||||||
|
GROUP BY
|
||||||
|
asOfDate,
|
||||||
|
symbol,
|
||||||
|
expirationDate,
|
||||||
|
strike,
|
||||||
|
type;
|
||||||
|
|
||||||
CREATE TABLE option_contracts
|
CREATE TABLE option_contracts
|
||||||
(
|
(
|
||||||
|
|||||||
Reference in New Issue
Block a user