From efbcbade11fa1ebc46f4e4b42cc996cff56a82b0 Mon Sep 17 00:00:00 2001 From: Avraham Sakal Date: Sun, 17 Mar 2024 12:02:41 -0400 Subject: [PATCH] implement retries --- server/package.json | 1 + server/pnpm-lock.yaml | 26 ++++ ...ption-quotes-from-polygon-to-clickhouse.ts | 114 +++++++++++------- 3 files changed, 95 insertions(+), 46 deletions(-) diff --git a/server/package.json b/server/package.json index 677ed0b..5a30e2d 100644 --- a/server/package.json +++ b/server/package.json @@ -15,6 +15,7 @@ "dotenv": "^16.4.1", "p-all": "^5.0.0", "p-queue": "^8.0.1", + "p-retry": "^6.2.0", "p-series": "^3.0.0", "p-throttle": "^6.1.0" }, diff --git a/server/pnpm-lock.yaml b/server/pnpm-lock.yaml index bb4b9b6..492cd5f 100644 --- a/server/pnpm-lock.yaml +++ b/server/pnpm-lock.yaml @@ -26,6 +26,9 @@ dependencies: p-queue: specifier: ^8.0.1 version: 8.0.1 + p-retry: + specifier: ^6.2.0 + version: 6.2.0 p-series: specifier: ^3.0.0 version: 3.0.0 @@ -290,6 +293,10 @@ packages: undici-types: 5.26.5 dev: true + /@types/retry@0.12.2: + resolution: {integrity: sha512-XISRgDJ2Tc5q4TRqvgJtzsRkFYNJzZrhTdtMoGVBttwzzQJkPnS3WWTFc7kuDRoPtPakl+T+OfdEUjYJj7Jbow==} + dev: false + /ansi-styles@3.2.1: resolution: {integrity: sha512-VT0ZI6kZRdTh8YyJw3SMbYm/u+NqfsAxEpWO0Pf9sq8/e94WxxOpPKx9FR1FlyCtOVDNOQ+8ntlqFxiRc+r5qA==} engines: {node: '>=4'} @@ -674,6 +681,11 @@ packages: engines: {node: '>= 0.4'} dev: true + /is-network-error@1.0.1: + resolution: {integrity: sha512-OwQXkwBJeESyhFw+OumbJVD58BFBJJI5OM5S1+eyrDKlgDZPX2XNT5gXS56GSD3NPbbwUuMlR1Q71SRp5SobuQ==} + engines: {node: '>=16'} + dev: false + /is-number-object@1.0.7: resolution: {integrity: sha512-k1U0IRzLMo7ZlYIfzRu23Oh6MiIFasgpb9X76eqfFZAqwH44UI4KTBvBYIZ1dSL9ZzChTB9ShHfLkR4pdW5krQ==} engines: {node: '>= 0.4'} @@ -828,6 +840,15 @@ packages: p-timeout: 6.1.2 dev: false + /p-retry@6.2.0: + resolution: {integrity: sha512-JA6nkq6hKyWLLasXQXUrO4z8BUZGUt/LjlJxx8Gb2+2ntodU/SS63YZ8b0LUTbQ8ZB9iwOfhEPhg4ykKnn2KsA==} + engines: {node: '>=16.17'} + dependencies: + '@types/retry': 0.12.2 + is-network-error: 1.0.1 + retry: 0.13.1 + dev: false + /p-series@3.0.0: resolution: {integrity: sha512-geaabIwiqy+jN4vuJROl1rpMJT/myHAMAfdubPQGJT3Grr8td+ogWvTk2qLsNlhYXcoZZAfl01pfq7lK3/gYKQ==} engines: {node: ^12.20.0 || ^14.13.1 || >=16.0.0} @@ -905,6 +926,11 @@ packages: supports-preserve-symlinks-flag: 1.0.0 dev: true + /retry@0.13.1: + resolution: {integrity: sha512-XQBQ3I8W1Cge0Seh+6gjj03LbmRFWuoszgK9ooCpwYIrhhoO80pfq4cUkU5DkknwfOfFteRwlZ56PYOGYyFWdg==} + engines: {node: '>= 4'} + dev: false + /safe-array-concat@1.0.1: resolution: {integrity: sha512-6XbUAseYE2KtOuGueyeobCySj9L4+66Tn6KQMOPQJrAJEowYKW/YR/MGJZl7FdydUdaFu4LYyDZjxf4/Nmo23Q==} engines: {node: '>=0.4'} diff --git a/server/src/scripts/ingest-option-quotes-from-polygon-to-clickhouse.ts b/server/src/scripts/ingest-option-quotes-from-polygon-to-clickhouse.ts index 1393cc7..a95f3b5 100644 --- a/server/src/scripts/ingest-option-quotes-from-polygon-to-clickhouse.ts +++ b/server/src/scripts/ingest-option-quotes-from-polygon-to-clickhouse.ts @@ -3,6 +3,7 @@ import { getApiKey } from "./polygon.js"; import pAll from "p-all"; import pQueue from "p-queue"; import pSeries from "p-series"; +import pRetry from "p-retry"; const optionContractToTicker = ({ symbol, @@ -51,26 +52,34 @@ async function getOptionAggregates( type, }); // first mark the sync of this particular option contract as "pending": - await clickhouse.insert({ - table: "amg_option_aggregate_sync_statuses", - values: [ - { - asOfDate, - symbol: underlyingSymbol, - expirationDate, - strike, - type, - status: "pending", - }, - ], - format: "JSONEachRow", - }); + await pRetry( + () => + clickhouse.insert({ + table: "amg_option_aggregate_sync_statuses", + values: [ + { + asOfDate, + symbol: underlyingSymbol, + expirationDate, + strike, + type, + status: "pending", + }, + ], + format: "JSONEachRow", + }), + { retries: 5, factor: 2, minTimeout: 1000, maxTimeout: 60 * 1000 } + ); - let latestBatchResponse = (await ( - await fetch( - `https://api.polygon.io/v2/aggs/ticker/${optionContractTicker}/range/1/minute/${asOfDate}/${asOfDate}?adjusted=false&sort=asc&limit=50000&apiKey=${await getApiKey()}` - ) - ).json()) as PolygonResponse; + let latestBatchResponse = await pRetry( + async () => + (await ( + await fetch( + `https://api.polygon.io/v2/aggs/ticker/${optionContractTicker}/range/1/minute/${asOfDate}/${asOfDate}?adjusted=false&sort=asc&limit=50000&apiKey=${await getApiKey()}` + ) + ).json()) as PolygonResponse, + { retries: 5, factor: 2, minTimeout: 1000, maxTimeout: 60 * 1000 } + ); if (!latestBatchResponse.results) { console.log(latestBatchResponse); return; @@ -87,25 +96,33 @@ async function getOptionAggregates( low: result.l, high: result.h, })); - await clickhouse.insert({ - table: "option_aggregates", - values: latestBatch, - format: "JSONEachRow", - }); - await clickhouse.insert({ - table: "amg_option_aggregate_sync_statuses", - values: [ - { - asOfDate, - symbol: underlyingSymbol, - expirationDate, - strike, - type, - status: "done", - }, - ], - format: "JSONEachRow", - }); + await pRetry( + () => + clickhouse.insert({ + table: "option_aggregates", + values: latestBatch, + format: "JSONEachRow", + }), + { retries: 5, factor: 2, minTimeout: 1000, maxTimeout: 60 * 1000 } + ); + await pRetry( + () => + clickhouse.insert({ + table: "amg_option_aggregate_sync_statuses", + values: [ + { + asOfDate, + symbol: underlyingSymbol, + expirationDate, + strike, + type, + status: "done", + }, + ], + format: "JSONEachRow", + }), + { retries: 5, factor: 2, minTimeout: 1000, maxTimeout: 60 * 1000 } + ); } type OptionContract = { @@ -122,7 +139,9 @@ async function getNextBatchOfUnstartedOptionAggregates( if (typeof previousUnstartedOptionContract === "undefined") { return; } - const optionContractsWithoutAggregates = await query(` + const optionContractsWithoutAggregates = await pRetry( + () => + query(` SELECT asOfDate, symbol, @@ -161,7 +180,9 @@ async function getNextBatchOfUnstartedOptionAggregates( AND status = 'not-started' ORDER BY asOfDate, symbol, expirationDate, strike, type LIMIT ${limit} - `); + `), + { retries: 5, factor: 2, minTimeout: 1000, maxTimeout: 60 * 1000 } + ); return optionContractsWithoutAggregates; } @@ -239,7 +260,8 @@ async function revertPendingSyncs() { console.log(); }), ]) - ) + ), + { concurrency: 1 } ); } @@ -250,13 +272,13 @@ async function revertPendingSyncs() { * * This queries Polygon with a concurrency of 6. */ -const q = new pQueue({ concurrency: 60 }); +const q = new pQueue({ concurrency: 6 }); /** Initialized with the lowest possible option contract. * It's passed into `getNextUnstartedSymbolAndAsOfDate()`. */ let nextBatchOfUnstartedOptionContracts: Array = [ { - asOfDate: "2022-03-15", + asOfDate: "2022-03-18", symbol: "A", expirationDate: "2022-02-01", strike: 0, @@ -287,9 +309,9 @@ while ( }) ) ); - // don't loop again until the queue has less than 50 items; we don't want it to grow in memory without bound: - console.log("Waiting till less than 50 in queue"); - await q.onSizeLessThan(50); + // don't loop again until the queue has less than 2 items; we don't want it to grow in memory without bound: + console.log("Waiting till less than 2 in queue"); + await q.onSizeLessThan(2); } // wait until pending queue operations are done: await q.onSizeLessThan(1);