option aggregate sync statuses usable by ingest script

main
Avraham Sakal 1 year ago
parent ead2e5bd1b
commit 4580494810

@ -4,7 +4,17 @@ import pAll from "p-all";
import pQueue from "p-queue"; import pQueue from "p-queue";
import pSeries from "p-series"; import pSeries from "p-series";
const optionContractToTicker = ({ symbol, expirationDate, strike, type }) => const optionContractToTicker = ({
symbol,
expirationDate,
strike,
type,
}: {
symbol: string;
expirationDate: string;
strike: number;
type: "call" | "put";
}) =>
`O:${symbol}${expirationDate.substring(2, 4)}${expirationDate.substring( `O:${symbol}${expirationDate.substring(2, 4)}${expirationDate.substring(
5, 5,
7 7
@ -28,11 +38,11 @@ type PolygonResponse = {
}>; }>;
}; };
async function getOptionAggregates( async function getOptionAggregates(
asOfDate, asOfDate: string,
underlyingSymbol, underlyingSymbol: string,
expirationDate, expirationDate: string,
strike, strike: number,
type type: "call" | "put"
) { ) {
const optionContractTicker = optionContractToTicker({ const optionContractTicker = optionContractToTicker({
symbol: underlyingSymbol, symbol: underlyingSymbol,
@ -42,7 +52,7 @@ async function getOptionAggregates(
}); });
// first mark the sync of this particular option contract as "pending": // first mark the sync of this particular option contract as "pending":
await clickhouse.insert({ await clickhouse.insert({
table: "option_aggregate_sync_statuses", table: "amg_option_aggregate_sync_statuses",
values: [ values: [
{ {
asOfDate, asOfDate,
@ -61,6 +71,10 @@ async function getOptionAggregates(
`https://api.polygon.io/v2/aggs/ticker/${optionContractTicker}/range/1/minute/${asOfDate}/${asOfDate}?adjusted=false&sort=asc&limit=50000&apiKey=${await getApiKey()}` `https://api.polygon.io/v2/aggs/ticker/${optionContractTicker}/range/1/minute/${asOfDate}/${asOfDate}?adjusted=false&sort=asc&limit=50000&apiKey=${await getApiKey()}`
) )
).json()) as PolygonResponse; ).json()) as PolygonResponse;
if (!latestBatchResponse.results) {
console.log(latestBatchResponse);
return;
}
let latestBatch = latestBatchResponse.results.map((result) => ({ let latestBatch = latestBatchResponse.results.map((result) => ({
symbol: underlyingSymbol, symbol: underlyingSymbol,
expirationDate, expirationDate,
@ -79,7 +93,7 @@ async function getOptionAggregates(
format: "JSONEachRow", format: "JSONEachRow",
}); });
await clickhouse.insert({ await clickhouse.insert({
table: "option_contract_quote_sync_statuses", table: "amg_option_aggregate_sync_statuses",
values: [ values: [
{ {
asOfDate, asOfDate,
@ -114,16 +128,9 @@ async function getNextBatchOfUnstartedOptionAggregates(
symbol, symbol,
expirationDate, expirationDate,
strike, strike,
type, type
last_value(status) as latestStatus FROM amg_option_aggregate_sync_statuses
FROM ( WHERE (
SELECT *
FROM option_aggregate_sync_statuses
ORDER BY ts ASC
)
GROUP BY asOfDate, symbol, expirationDate, strike, type
HAVING latestStatus = 'not-started'
AND (
( (
asOfDate = '${previousUnstartedOptionContract.asOfDate}' asOfDate = '${previousUnstartedOptionContract.asOfDate}'
AND symbol = '${previousUnstartedOptionContract.symbol}' AND symbol = '${previousUnstartedOptionContract.symbol}'
@ -151,6 +158,7 @@ async function getNextBatchOfUnstartedOptionAggregates(
asOfDate > '${previousUnstartedOptionContract.asOfDate}' asOfDate > '${previousUnstartedOptionContract.asOfDate}'
) )
) )
AND status = 'not-started'
ORDER BY asOfDate, symbol, expirationDate, strike, type ORDER BY asOfDate, symbol, expirationDate, strike, type
LIMIT ${limit} LIMIT ${limit}
`); `);
@ -166,6 +174,7 @@ async function getNextBatchOfUnstartedOptionAggregates(
*/ */
async function revertPendingSyncs() { async function revertPendingSyncs() {
const pendingOptionContracts = await query<{ const pendingOptionContracts = await query<{
asOfDate: string;
symbol: string; symbol: string;
expirationDate: string; expirationDate: string;
strike: number; strike: number;
@ -173,58 +182,69 @@ async function revertPendingSyncs() {
latestStatus: "not-started" | "pending" | "done"; latestStatus: "not-started" | "pending" | "done";
}>(` }>(`
SELECT SELECT
asOfDate,
symbol, symbol,
expirationDate, expirationDate,
strike, strike,
type, type
last_value(status) as latestStatus FROM amg_option_aggregate_sync_statuses
FROM ( WHERE status = 'pending'
SELECT * ORDER BY asOfDate, symbol, expirationDate, strike, type
FROM option_aggregate_sync_statuses
ORDER BY symbol, expirationDate, strike, type, ts ASC
)
GROUP BY symbol, expirationDate, strike, type
HAVING latestStatus = 'pending'
ORDER BY symbol, expirationDate, strike, type
`); `);
console.log( console.log(
"Pending operations:", "Pending operations:",
pendingOptionContracts.map( pendingOptionContracts.map(
({ symbol, expirationDate, strike, type }) => ({ asOfDate, symbol, expirationDate, strike, type }) =>
`${symbol} ${expirationDate} ${strike} ${type}` `${symbol} ${expirationDate} ${strike} ${type} @ ${asOfDate}`
) )
); );
await pAll( await pAll(
pendingOptionContracts.map( pendingOptionContracts.map(
({ symbol, expirationDate, strike, type }) => ({ asOfDate, symbol, expirationDate, strike, type }) =>
() => () =>
pSeries([ pSeries([
// Delete option_contracts first, in case this `pAll` operation fails and we need to restart; so `option_contract_sync_statuses` "pending" rows are still there for the restart // Delete option_contracts first, in case this `pAll` operation fails and we need to restart; so `option_contract_sync_statuses` "pending" rows are still there for the restart
() => () =>
clickhouse.command({ clickhouse
query: ` .command({
query: `
DELETE FROM option_aggregates DELETE FROM option_aggregates
WHERE symbol = '${symbol}' WHERE symbol = '${symbol}'
AND expirationDate = '${expirationDate}' AND expirationDate = '${expirationDate}'
AND strike = ${strike} AND strike = ${strike}
AND type = '${type}' AND type = '${type}'
AND toDate(tsStart) = '${asOfDate}'
`, `,
}), })
.then(() => {
console.log(`Deleted aggregates for `);
}),
() => () =>
clickhouse.command({ clickhouse
query: ` .insert({
DELETE FROM option_aggregate_sync_statuses table: "amg_option_aggregate_sync_statuses",
WHERE symbol = '${symbol}' values: [
AND expirationDate = '${expirationDate}' {
AND strike = ${strike} asOfDate,
AND type = '${type}' symbol,
AND status = 'pending'`, expirationDate,
}), strike,
type,
status: "not-started",
},
],
format: "JSONEachRow",
})
.then(() => {
console.log();
}),
]) ])
) )
); );
} }
//await revertPendingSyncs();
// First, revert 'pending' syncs:
await revertPendingSyncs();
/** Second, for each option contract, get all of its quotes. /** Second, for each option contract, get all of its quotes.
* *
@ -236,7 +256,7 @@ const q = new pQueue({ concurrency: 6 });
*/ */
let nextBatchOfUnstartedOptionContracts: Array<OptionContractDay> = [ let nextBatchOfUnstartedOptionContracts: Array<OptionContractDay> = [
{ {
asOfDate: "2022-02-01", asOfDate: "2022-03-15",
symbol: "A", symbol: "A",
expirationDate: "2022-02-01", expirationDate: "2022-02-01",
strike: 0, strike: 0,
@ -255,15 +275,15 @@ while (
(unstartedOptionContract) => () => (unstartedOptionContract) => () =>
q.add(async () => { q.add(async () => {
console.log( console.log(
`Getting aggregates for ${unstartedOptionContract.asOfDate} ${unstartedOptionContract.symbol} at ${unstartedOptionContract.expirationDate} ${unstartedOptionContract.strike} ${unstartedOptionContract.type}` `Getting aggregates for ${unstartedOptionContract.symbol} ${unstartedOptionContract.expirationDate} ${unstartedOptionContract.strike} ${unstartedOptionContract.type} @ ${unstartedOptionContract.asOfDate}`
);
await getOptionAggregates(
unstartedOptionContract.asOfDate,
unstartedOptionContract.symbol,
unstartedOptionContract.expirationDate,
unstartedOptionContract.strike,
unstartedOptionContract.type
); );
// await getOptionAggregates(
// unstartedOptionContract.asOfDate,
// unstartedOptionContract.symbol,
// unstartedOptionContract.expirationDate,
// unstartedOptionContract.strike,
// unstartedOptionContract.type
// );
}) })
) )
); );

@ -67,6 +67,23 @@ SELECT
now() as ts now() as ts
FROM option_contracts; FROM option_contracts;
-- Sync-status table for per-day option aggregate ingestion. Each status write
-- is a row identified by (asOfDate, symbol, expirationDate, strike, type).
-- `status` is declared as SimpleAggregateFunction(anyLast, ...), so when the
-- AggregatingMergeTree engine merges rows that share the same sorting key it
-- keeps the last status value among them.
-- NOTE(review): `ts` is part of the ORDER BY (sorting) key, so rows written at
-- different timestamps have different keys and presumably are never collapsed
-- into a single row per contract-day. Readers that filter with a plain
-- `status = '...'` predicate (no FINAL / GROUP BY) would then also match
-- superseded rows — confirm this is intended.
CREATE TABLE amg_option_aggregate_sync_statuses (
-- Trading day the aggregates are fetched for.
asOfDate Date,
-- Underlying symbol of the option contract.
symbol LowCardinality(String),
expirationDate Date,
strike Float32,
type ENUM('call', 'put'),
-- Sync state; merged with anyLast across rows that have an identical sorting key.
status SimpleAggregateFunction(anyLast, ENUM('not-started','pending','done')),
-- Defaults to now() when the inserting statement does not supply a value.
ts DateTime64 DEFAULT now()
)
ENGINE=AggregatingMergeTree
ORDER BY (asOfDate, symbol, expirationDate, strike, type, ts);
-- Backfill: copy every existing row of option_aggregate_sync_statuses into
-- amg_option_aggregate_sync_statuses, carrying over the original `ts` values.
-- The SELECT is ordered by the same column list as the target table's ORDER BY key.
INSERT INTO amg_option_aggregate_sync_statuses
SELECT asOfDate, symbol, expirationDate, strike, type, status, ts
FROM option_aggregate_sync_statuses
ORDER BY asOfDate, symbol, expirationDate, strike, type, ts;
-- END: Option Contract Quotes -- END: Option Contract Quotes
CREATE TABLE stock_aggregates CREATE TABLE stock_aggregates

Loading…
Cancel
Save