fix ingestion script not marking `done` when no results are returned

main
Avraham Sakal 1 year ago
parent a0970a7fca
commit 5614586b66

@ -27,6 +27,8 @@ const optionContractToTicker = ({
type PolygonResponse = { type PolygonResponse = {
next_url?: string; next_url?: string;
status: string;
resultsCount: number;
results: Array<{ results: Array<{
c: number; c: number;
h: number; h: number;
@ -80,31 +82,33 @@ async function getOptionAggregates(
).json()) as PolygonResponse, ).json()) as PolygonResponse,
{ retries: 5, factor: 2, minTimeout: 1000, maxTimeout: 60 * 1000 } { retries: 5, factor: 2, minTimeout: 1000, maxTimeout: 60 * 1000 }
); );
if (!latestBatchResponse.results) { if (latestBatchResponse.status.toLowerCase() !== "ok") {
console.log(latestBatchResponse); console.log(latestBatchResponse);
return; return;
} }
let latestBatch = latestBatchResponse.results.map((result) => ({ if (latestBatchResponse.resultsCount > 0) {
symbol: underlyingSymbol, let latestBatch = latestBatchResponse.results.map((result) => ({
expirationDate, symbol: underlyingSymbol,
strike, expirationDate,
type, strike,
type,
tsStart: (result.t || 0) / 1000, tsStart: (result.t || 0) / 1000,
open: result.o, open: result.o,
close: result.c, close: result.c,
low: result.l, low: result.l,
high: result.h, high: result.h,
})); }));
await pRetry( await pRetry(
() => () =>
clickhouse.insert({ clickhouse.insert({
table: "option_aggregates", table: "option_aggregates",
values: latestBatch, values: latestBatch,
format: "JSONEachRow", format: "JSONEachRow",
}), }),
{ retries: 5, factor: 2, minTimeout: 1000, maxTimeout: 60 * 1000 } { retries: 5, factor: 2, minTimeout: 1000, maxTimeout: 60 * 1000 }
); );
}
await pRetry( await pRetry(
() => () =>
clickhouse.insert({ clickhouse.insert({
@ -137,19 +141,19 @@ async function getNextBatchOfUnstartedOptionAggregates(
limit: number limit: number
): Promise<Array<OptionContractDay>> { ): Promise<Array<OptionContractDay>> {
if (typeof previousUnstartedOptionContract === "undefined") { if (typeof previousUnstartedOptionContract === "undefined") {
return; return [];
} }
const optionContractsWithoutAggregates = await pRetry( const queryContents = `
() =>
query<OptionContractDay>(`
SELECT SELECT
asOfDate, asOfDate,
symbol, symbol,
expirationDate, expirationDate,
strike, strike,
type type,
argMax(status, ts) as status
FROM amg_option_aggregate_sync_statuses FROM amg_option_aggregate_sync_statuses
WHERE ( WHERE symbol IN ['AAPL','AMD','GOOGL','MSFT','NFLX']
AND (
( (
asOfDate = '${previousUnstartedOptionContract.asOfDate}' asOfDate = '${previousUnstartedOptionContract.asOfDate}'
AND symbol = '${previousUnstartedOptionContract.symbol}' AND symbol = '${previousUnstartedOptionContract.symbol}'
@ -177,13 +181,18 @@ async function getNextBatchOfUnstartedOptionAggregates(
asOfDate > '${previousUnstartedOptionContract.asOfDate}' asOfDate > '${previousUnstartedOptionContract.asOfDate}'
) )
) )
AND status = 'not-started' GROUP BY asOfDate, symbol, expirationDate, strike, type
HAVING status = 'not-started'
ORDER BY asOfDate, symbol, expirationDate, strike, type ORDER BY asOfDate, symbol, expirationDate, strike, type
LIMIT ${limit} LIMIT ${limit}
`), `;
//console.log(queryContents);
const optionContractsWithoutAggregates = await pRetry(
() => query<OptionContractDay>(queryContents),
{ retries: 5, factor: 2, minTimeout: 1000, maxTimeout: 60 * 1000 } { retries: 5, factor: 2, minTimeout: 1000, maxTimeout: 60 * 1000 }
); );
return optionContractsWithoutAggregates; console.log(`Got ${optionContractsWithoutAggregates.length} records`);
return optionContractsWithoutAggregates || [];
} }
/** /**
@ -194,91 +203,97 @@ async function getNextBatchOfUnstartedOptionAggregates(
* so as to start afresh. * so as to start afresh.
*/ */
async function revertPendingSyncs() { async function revertPendingSyncs() {
const pendingOptionContracts = await query<{ const batchSize = 1000;
asOfDate: string; let pendingOptionContracts;
symbol: string; do {
expirationDate: string; pendingOptionContracts = await query<{
strike: number; asOfDate: string;
type: "call" | "put"; symbol: string;
latestStatus: "not-started" | "pending" | "done"; expirationDate: string;
}>(` strike: number;
SELECT type: "call" | "put";
latestStatus: "not-started" | "pending" | "done";
}>(`
SELECT
asOfDate, asOfDate,
symbol, symbol,
expirationDate, expirationDate,
strike, strike,
type type,
argMax(status, ts) as status
FROM amg_option_aggregate_sync_statuses FROM amg_option_aggregate_sync_statuses
WHERE status = 'pending' WHERE symbol IN ['AAPL','AMD','GOOGL','MSFT','NFLX']
ORDER BY asOfDate, symbol, expirationDate, strike, type GROUP BY asOfDate, symbol, expirationDate, strike, type
`); HAVING status = 'pending'
console.log( LIMIT ${batchSize}
"Pending operations:", `);
pendingOptionContracts.map( console.log(
({ asOfDate, symbol, expirationDate, strike, type }) => "Pending operations:",
`${symbol} ${expirationDate} ${strike} ${type} @ ${asOfDate}` pendingOptionContracts.map(
) ({ asOfDate, symbol, expirationDate, strike, type }) =>
); `${symbol} ${expirationDate} ${strike} ${type} @ ${asOfDate}`
await pAll( )
pendingOptionContracts.map( );
({ asOfDate, symbol, expirationDate, strike, type }) => await pSeries([
() => // Delete from `option_aggregates` first, in case this `pSeries` operation fails and we need to restart; so the `amg_option_aggregate_sync_statuses` "pending" rows are still there for the restart
pSeries([ () =>
// Delete from `option_aggregates` first, in case this `pAll` operation fails and we need to restart; so the `amg_option_aggregate_sync_statuses` "pending" rows are still there for the restart clickhouse
() => .command({
clickhouse query: `
.command({ DELETE FROM option_aggregates
query: ` WHERE (symbol, expirationDate, strike, type, toDate(tsStart))
DELETE FROM option_aggregates IN [${pendingOptionContracts
WHERE symbol = '${symbol}' .map(
AND expirationDate = '${expirationDate}' ({ asOfDate, symbol, expirationDate, strike, type }) =>
AND strike = ${strike} `('${symbol}', '${expirationDate}', ${strike}, '${type}', '${asOfDate}')`
AND type = '${type}' )
AND toDate(tsStart) = '${asOfDate}' .join(",")}
`, ]
}) `,
.then(() => { })
console.log(`Deleted aggregates for `); .then(() => {
}), console.log(`Deleted ${pendingOptionContracts.length} aggregates`);
() => }),
clickhouse () =>
.insert({ clickhouse
table: "amg_option_aggregate_sync_statuses", .insert({
values: [ table: "amg_option_aggregate_sync_statuses",
{ values: pendingOptionContracts.map(
asOfDate, ({ asOfDate, symbol, expirationDate, strike, type }) => ({
symbol, asOfDate,
expirationDate, symbol,
strike, expirationDate,
type, strike,
status: "not-started", type,
}, status: "not-started",
], })
format: "JSONEachRow", ),
}) format: "JSONEachRow",
.then(() => { })
console.log(); .then(() => {}),
}), ]);
]) } while (pendingOptionContracts.length === batchSize);
), await clickhouse.command({
{ concurrency: 1 } query: `
); OPTIMIZE TABLE amg_option_aggregate_sync_statuses FINAL
`,
});
} }
// First, revert 'pending' syncs: // First, revert 'pending' syncs:
//await revertPendingSyncs(); await revertPendingSyncs();
/** Second, for each option contract, get all of its quotes. /** Second, for each option contract, get all of its quotes.
* *
* This queries Polygon with a concurrency of 6. * This queries Polygon with a concurrency of 16.
*/ */
const q = new pQueue({ concurrency: 6 }); const q = new pQueue({ concurrency: 16 });
/** Initialized with the lowest possible option contract. /** Initialized with the lowest possible option contract.
* Its last element is passed into `getNextBatchOfUnstartedOptionAggregates()`. * Its last element is passed into `getNextBatchOfUnstartedOptionAggregates()`.
*/ */
let nextBatchOfUnstartedOptionContracts: Array<OptionContractDay> = [ let nextBatchOfUnstartedOptionContracts: Array<OptionContractDay> = [
{ {
asOfDate: "2022-04-05", asOfDate: "2022-03-27",
symbol: "A", symbol: "A",
expirationDate: "2022-02-01", expirationDate: "2022-02-01",
strike: 0, strike: 0,
@ -289,8 +304,8 @@ while (
(nextBatchOfUnstartedOptionContracts = (nextBatchOfUnstartedOptionContracts =
await getNextBatchOfUnstartedOptionAggregates( await getNextBatchOfUnstartedOptionAggregates(
nextBatchOfUnstartedOptionContracts.pop(), nextBatchOfUnstartedOptionContracts.pop(),
200 100
)) !== null )).length !== 0
) { ) {
await pAll( await pAll(
nextBatchOfUnstartedOptionContracts.map( nextBatchOfUnstartedOptionContracts.map(

@ -77,7 +77,7 @@ CREATE TABLE amg_option_aggregate_sync_statuses (
ts DateTime64 DEFAULT now() ts DateTime64 DEFAULT now()
) )
ENGINE=AggregatingMergeTree ENGINE=AggregatingMergeTree
ORDER BY (asOfDate, symbol, expirationDate, strike, type, ts); ORDER BY (asOfDate, symbol, expirationDate, strike, type);
INSERT INTO amg_option_aggregate_sync_statuses INSERT INTO amg_option_aggregate_sync_statuses
SELECT asOfDate, symbol, expirationDate, strike, type, status, ts SELECT asOfDate, symbol, expirationDate, strike, type, status, ts

Loading…
Cancel
Save