Compare commits

..

2 Commits

Author SHA1 Message Date
Avraham Sakal 71f72eb474 handle any socket hangups 2024-07-01 08:16:41 -04:00
Avraham Sakal f8279d4932 retry clickhouse insert in case of socket hangup 2024-06-30 21:45:51 -04:00
2 changed files with 46 additions and 21 deletions
+25 -11
View File
@@ -38,11 +38,15 @@ export async function* makeGetOptionContractsIterator(
symbol: string, symbol: string,
date: string date: string
) { ) {
let latestBatchResponse = (await ( let latestBatchResponse = await pRetry(
await fetch( async () =>
`https://api.polygon.io/v3/reference/options/contracts?underlying_ticker=${symbol}&as_of=${date}&sort=ticker&limit=1000&apiKey=${await getApiKey()}` (await (
) await fetch(
).json()) as PolygonOptionContractsResponse; `https://api.polygon.io/v3/reference/options/contracts?underlying_ticker=${symbol}&as_of=${date}&sort=ticker&limit=1000&apiKey=${await getApiKey()}`
)
).json()) as PolygonOptionContractsResponse,
{ forever: true, factor: 2, maxTimeout: 120000 }
);
yield latestBatchResponse.results.map((result) => ({ yield latestBatchResponse.results.map((result) => ({
asOfDate: date, asOfDate: date,
symbol, symbol,
@@ -53,9 +57,15 @@ export async function* makeGetOptionContractsIterator(
// as long as there's a `next_url`, call that: // as long as there's a `next_url`, call that:
while (latestBatchResponse.hasOwnProperty("next_url")) { while (latestBatchResponse.hasOwnProperty("next_url")) {
latestBatchResponse = (await ( latestBatchResponse = await pRetry(
await fetch(`${latestBatchResponse.next_url}&apiKey=${await getApiKey()}`) async () =>
).json()) as PolygonOptionContractsResponse; (await (
await fetch(
`${latestBatchResponse.next_url}&apiKey=${await getApiKey()}`
)
).json()) as PolygonOptionContractsResponse,
{ forever: true, factor: 2, maxTimeout: 120000 }
);
yield latestBatchResponse.results?.map((result) => ({ yield latestBatchResponse.results?.map((result) => ({
asOfDate: date, asOfDate: date,
symbol, symbol,
@@ -106,9 +116,13 @@ export async function* makeGetOptionContractAggregatesIterator({
const asOfDate = currentDateAsDateObject.toISOString().substring(0, 10); const asOfDate = currentDateAsDateObject.toISOString().substring(0, 10);
const url = `https://api.polygon.io/v2/aggs/ticker/${optionContractTicker}/range/1/minute/${asOfDate}/${asOfDate}?adjusted=false&sort=asc&limit=50000&apiKey=${await getApiKey()}`; const url = `https://api.polygon.io/v2/aggs/ticker/${optionContractTicker}/range/1/minute/${asOfDate}/${asOfDate}?adjusted=false&sort=asc&limit=50000&apiKey=${await getApiKey()}`;
let latestBatchResponse; let latestBatchResponse;
latestBatchResponse = (await ( latestBatchResponse = await pRetry(
await fetch(url) async () =>
).json()) as PolygonOptionContractAggregatesResponse; (await (
await fetch(url)
).json()) as PolygonOptionContractAggregatesResponse,
{ forever: true, factor: 2, maxTimeout: 120000 }
);
if (latestBatchResponse.status.toLowerCase() === "ok") { if (latestBatchResponse.status.toLowerCase() === "ok") {
yield latestBatchResponse.results?.map((result) => ({ yield latestBatchResponse.results?.map((result) => ({
symbol, symbol,
+21 -10
View File
@@ -3,6 +3,7 @@ import sqlite3 from "sqlite3";
import { open } from "sqlite"; import { open } from "sqlite";
import { clickhouse, query } from "./clickhouse.js"; import { clickhouse, query } from "./clickhouse.js";
import { OptionContract } from "./polygon.js"; import { OptionContract } from "./polygon.js";
import pRetry from "p-retry";
const sqliteDb = await open({ const sqliteDb = await open({
filename: "/tmp/sync-state.db", filename: "/tmp/sync-state.db",
@@ -119,11 +120,16 @@ export async function pullOptionContracts(symbol: string, date: string) {
date date
)) { )) {
console.log(batch.length); console.log(batch.length);
await clickhouse.insert({ await pRetry(
table: "option_contract_existences", async () => {
values: batch, await clickhouse.insert({
format: "JSONEachRow", table: "option_contract_existences",
}); values: batch,
format: "JSONEachRow",
});
},
{ forever: true, factor: 2, maxTimeout: 120000 }
);
} }
await setPullOptionContractsState(symbol, date, { await setPullOptionContractsState(symbol, date, {
status: OptionContractSyncStatus.COMPLETED, status: OptionContractSyncStatus.COMPLETED,
@@ -157,11 +163,16 @@ export async function pullOptionContractAggregates(
new Date(batch[0].tsStart * 1000), new Date(batch[0].tsStart * 1000),
new Date(batch[batch.length - 1].tsStart * 1000) new Date(batch[batch.length - 1].tsStart * 1000)
); );
await clickhouse.insert({ await pRetry(
table: "option_contract_aggregates", async () => {
values: batch, await clickhouse.insert({
format: "JSONEachRow", table: "option_contract_aggregates",
}); values: batch,
format: "JSONEachRow",
});
},
{ forever: true, factor: 2, maxTimeout: 120000 }
);
} }
} }
await setPullOptionContractAggregatesState(ticker, { await setPullOptionContractAggregatesState(ticker, {