diff --git a/server/src/scripts/ingest-option-contracts-from-polygon-to-clickhouse.ts b/server/src/scripts/ingest-option-contracts-from-polygon-to-clickhouse.ts new file mode 100644 index 0000000..b177d17 --- /dev/null +++ b/server/src/scripts/ingest-option-contracts-from-polygon-to-clickhouse.ts @@ -0,0 +1,271 @@ +import { clickhouse, query } from "../clickhouse.js"; +import { getApiKey } from "./polygon.js"; +import pAll from "p-all"; +import pQueue from "p-queue"; +import pSeries from "p-series"; + +type PolygonResponse = { + next_url?: string; + results: Array<{ + ticker: string; + expiration_date: string; + strike_price: number; + contract_type: "call" | "put"; + }>; +}; +async function getOptionContracts(underlyingSymbol, asOfDate) { + // first mark the sync of this particular symbol and asOfDate as "pending": + await clickhouse.insert({ + table: "option_contract_sync_statuses", + values: [{ symbol: underlyingSymbol, asOfDate, status: "pending" }], + format: "JSONEachRow", + }); + // then commence the sync with the initial request: + let latestBatchResponse = (await ( + await fetch( + `https://api.polygon.io/v3/reference/options/contracts?underlying_ticker=${underlyingSymbol}&as_of=${asOfDate}&sort=ticker&limit=1000&apiKey=${await getApiKey()}` + ) + ).json()) as PolygonResponse; + let latestBatch = latestBatchResponse.results.map((result) => ({ + asOfDate, + symbol: underlyingSymbol, + expirationDate: result.expiration_date, + strike: result.strike_price, + type: result.contract_type, + })); + await clickhouse.insert({ + table: "option_contracts", + values: latestBatch, + format: "JSONEachRow", + }); + //console.log(latestBatch.results.map((r)=>r.ticker)); + // as long as there's a `next_url`, call that: + while (latestBatchResponse.hasOwnProperty("next_url")) { + latestBatchResponse = (await ( + await fetch(`${latestBatchResponse.next_url}&apiKey=${await getApiKey()}`) + ).json()) as PolygonResponse; + latestBatch = latestBatchResponse.results.map((result) => ({ + asOfDate, + symbol: underlyingSymbol, + expirationDate: result.expiration_date, + strike: result.strike_price, + type: result.contract_type, + })); + //console.log(latestBatch.results.map((r)=>r.ticker)); + await clickhouse.insert({ + table: "option_contracts", + values: latestBatch, + format: "JSONEachRow", + }); + } + await clickhouse.insert({ + table: "option_contract_sync_statuses", + values: [{ symbol: underlyingSymbol, asOfDate, status: "done" }], + format: "JSONEachRow", + }); +} + +async function getNextBatchOfUnstartedSymbolsAndAsOfDates( + previousUnstartedSymbolAndAsOfDate: + | { symbol: string; asOfDate: string } + | undefined, + limit: number +) { + if (typeof previousUnstartedSymbolAndAsOfDate === "undefined") { + return null; + } + // the `OR symbol >` and `OR symbol <` need to be there separately and in that order, to maintain predicatable syncing order + const rows = await query<{ symbol: string; earliestAsOfDate: string }>(` + SELECT + symbol, + first_value(asOfDate) as earliestAsOfDate + FROM ( + SELECT + symbol, + asOfDate, + last_value(status) as latestStatus + FROM ( + SELECT * + FROM option_contract_sync_statuses + ORDER BY asOfDate ASC, symbol ASC + ) + GROUP BY symbol, asOfDate + HAVING latestStatus = 'not-started' + ORDER BY symbol ASC, asOfDate ASC + ) + GROUP BY symbol + HAVING ( + symbol = '${previousUnstartedSymbolAndAsOfDate.symbol}' + AND asOfDate > '${previousUnstartedSymbolAndAsOfDate.asOfDate}' + ) + OR ( + symbol > '${previousUnstartedSymbolAndAsOfDate.symbol}' + ) + OR ( + symbol < '${previousUnstartedSymbolAndAsOfDate.symbol}' + ) + ORDER BY symbol ASC + LIMIT ${limit} + `); + return rows.map((row) => ({ + symbol: row.symbol, + asOfDate: row.earliestAsOfDate, + })); +} + +/** + * For each symbol in `symbols` table, check the latest `asOfDate` + * in `option_contract_sync_statuses` for that symbol. Then fill-in the rest + * of the dates until today's date. + */ +async function fillSyncStatuses() { + const symbols = ( + await query<{ symbol: string }>(` + SELECT symbol from symbols + `) + ).map(({ symbol }) => symbol); + + console.log("symbols", symbols); + await pAll( + symbols.map( + (symbol) => () => + query<{ latestAsOfDate: string }>(` + SELECT + latestAsOfDate + FROM ( + SELECT last_value(asOfDate) as latestAsOfDate + FROM ( + SELECT * + FROM option_contract_sync_statuses + WHERE symbol = '${symbol}' + ORDER BY asOfDate ASC + ) + ) + WHERE latestAsOfDate > '2022-02-18' + `).then((rows) => + clickhouse + .command({ + query: ` + INSERT INTO option_contract_sync_statuses + SELECT + '${symbol}' as symbol, + Date(dateAdd(DAY,number,'${ + rows[0]?.latestAsOfDate || "2022-02-19" + }')) as asOfDate, + 'not-started' as status + FROM system.numbers + WHERE number < dateDiff('days',Date('${ + rows[0]?.latestAsOfDate || "2022-02-19" + }'), Date(now())) + AND number > 0 + `, + }) + .then(() => { + console.log(`Done ${symbol}`); + }) + ) + ), + { concurrency: 6 } + ); +} + +/** First, make sure we know which symbol-asOfDate combinations are + * yet un-synced. + */ +await fillSyncStatuses(); + +/** + * Second, since this is startup time, obviously anything `pending` is not really running. + * So, for each `pending` combo, delete its status, and all contracts synced so far, so as to start afresh. + */ +const pendingSymbolsAndAsOfDates = await query<{ + symbol: string; + asOfDate: string; + latestStatus: "not-started" | "pending" | "done"; +}>(` + SELECT + symbol, + asOfDate, + last_value(status) as latestStatus + FROM ( + SELECT * + FROM option_contract_sync_statuses + ORDER BY asOfDate ASC, symbol ASC + ) + GROUP BY symbol, asOfDate + HAVING latestStatus = 'pending' + ORDER BY symbol ASC, asOfDate ASC +`); +console.log( + "Pending operations:", + pendingSymbolsAndAsOfDates.map( + ({ symbol, asOfDate }) => `${symbol} ${asOfDate}` + ) +); +await pAll( + pendingSymbolsAndAsOfDates.map( + ({ symbol, asOfDate }) => + () => + pSeries([ + // Delete option_contracts first, in case this `pAll` operation fails and we need to restart; so `option_contract_sync_statuses` "pending" rows are still there for the restart + () => + clickhouse.command({ + query: `DELETE FROM option_contracts WHERE symbol = '${symbol}' AND asOfDate = '${asOfDate}'`, + }), + () => + clickhouse.command({ + query: `DELETE FROM option_contract_sync_statuses WHERE symbol = '${symbol}' AND asOfDate = '${asOfDate}' AND status = 'pending'`, + }), + ]) + ) +); + +/** Second, for each symbol-asOfDate combination whose option contracts + * are not known, make them known. + * + * This queries Polygon with a concurrency of 6. + */ +const q = new pQueue({ concurrency: 6 }); +/** Initialized with the lowest possible symbol and the earliest possible asOfDate. + * It's passed into `getNextUnstartedSymbolAndAsOfDate()`. + */ +let nextBatchOfUnstartedSymbolsAndAsOfDates = [ + { symbol: "A", asOfDate: "2022-02-01" }, +]; +while ( + (nextBatchOfUnstartedSymbolsAndAsOfDates = + await getNextBatchOfUnstartedSymbolsAndAsOfDates( + nextBatchOfUnstartedSymbolsAndAsOfDates.pop(), + 200 + )) !== null +) { + await pAll( + nextBatchOfUnstartedSymbolsAndAsOfDates.map( + (unstartedSymbolAndAsOfDate) => () => + q.add(async () => { + console.log( + `Getting contracts for ${unstartedSymbolAndAsOfDate.symbol} at ${unstartedSymbolAndAsOfDate.asOfDate}` + ); + await getOptionContracts( + unstartedSymbolAndAsOfDate.symbol, + unstartedSymbolAndAsOfDate.asOfDate + ); + }) + ) + ); + // don't loop again until the queue has less than 50 items; we don't want it to grow in memory without bound: + console.log("Waiting till less than 50 in queue"); + await q.onSizeLessThan(50); +} +// wait until pending queue operations are done: +await q.onSizeLessThan(1); + +/** + * For each option contract, find its earliest date of existence, and get all quotes from + * then on. + */ + +/*** TODOs ***/ +/* + + Gracefully recover from errors in individual operations. +*/ diff --git a/server/src/scripts/ingest-option-quotes-from-polygon-to-clickhouse.ts b/server/src/scripts/ingest-option-quotes-from-polygon-to-clickhouse.ts index a85cd6f..3f2fbb9 100644 --- a/server/src/scripts/ingest-option-quotes-from-polygon-to-clickhouse.ts +++ b/server/src/scripts/ingest-option-quotes-from-polygon-to-clickhouse.ts @@ -4,248 +4,266 @@ import pAll from "p-all"; import pQueue from "p-queue"; import pSeries from "p-series"; +const optionContractToTicker = ({ symbol, expirationDate, strike, type }) => + `O:${symbol}${expirationDate.substring(2, 4)}${expirationDate.substring( + 5, + 7 + )}${expirationDate.substring(8, 10)}${ + type === "call" ? "C" : "P" + }${Math.floor(strike * 1000) + .toString() + .padStart(8, "0")}`; + type PolygonResponse = { next_url?: string; results: Array<{ - ticker: string; - expiration_date: string; - strike_price: number; - contract_type: "call" | "put"; + c: number; + h: number; + n: number; + l: number; + o: number; + t: number; + v: number; + vw: number; }>; }; -async function getOptionContracts(underlyingSymbol, asOfDate) { - // first mark the sync of this particular symbol and asOfDate as "pending": +async function getOptionAggregates( + asOfDate, + underlyingSymbol, + expirationDate, + strike, + type +) { + const optionContractTicker = optionContractToTicker({ + symbol: underlyingSymbol, + expirationDate, + strike, + type, + }); + // first mark the sync of this particular option contract as "pending": await clickhouse.insert({ - table: "option_contract_sync_statuses", - values: [{ symbol: underlyingSymbol, asOfDate, status: "pending" }], + table: "option_aggregate_sync_statuses", + values: [ + { + asOfDate, + symbol: underlyingSymbol, + expirationDate, + strike, + type, + status: "pending", + }, + ], format: "JSONEachRow", }); - // then commence the sync with the initial request: + let latestBatchResponse = (await ( await fetch( - `https://api.polygon.io/v3/reference/options/contracts?underlying_ticker=${underlyingSymbol}&as_of=${asOfDate}&sort=ticker&limit=1000&apiKey=${await getApiKey()}` + `https://api.polygon.io/v2/aggs/ticker/${optionContractTicker}/range/1/minute/${asOfDate}/${asOfDate}?adjusted=false&sort=asc&limit=50000&apiKey=${await getApiKey()}` ) ).json()) as PolygonResponse; let latestBatch = latestBatchResponse.results.map((result) => ({ - asOfDate, symbol: underlyingSymbol, - expirationDate: result.expiration_date, - strike: result.strike_price, - type: result.contract_type, + expirationDate, + strike, + type, + + tsStart: (result.t || 0) / 1000, + open: result.o, + close: result.c, + low: result.l, + high: result.h, })); await clickhouse.insert({ - table: "option_contracts", + table: "option_aggregates", values: latestBatch, format: "JSONEachRow", }); - //console.log(latestBatch.results.map((r)=>r.ticker)); - // as long as there's a `next_url`, call that: - while (latestBatchResponse.hasOwnProperty("next_url")) { - latestBatchResponse = (await ( - await fetch(`${latestBatchResponse.next_url}&apiKey=${await getApiKey()}`) - ).json()) as PolygonResponse; - latestBatch = latestBatchResponse.results.map((result) => ({ - asOfDate, - symbol: underlyingSymbol, - expirationDate: result.expiration_date, - strike: result.strike_price, - type: result.contract_type, - })); - //console.log(latestBatch.results.map((r)=>r.ticker)); - await clickhouse.insert({ - table: "option_contracts", - values: latestBatch, - format: "JSONEachRow", - }); - } await clickhouse.insert({ - table: "option_contract_sync_statuses", - values: [{ symbol: underlyingSymbol, asOfDate, status: "done" }], + table: "option_contract_quote_sync_statuses", + values: [ + { + asOfDate, + symbol: underlyingSymbol, + expirationDate, + strike, + type, + status: "done", + }, + ], format: "JSONEachRow", }); } -async function getNextBatchOfUnstartedSymbolsAndAsOfDates( - previousUnstartedSymbolAndAsOfDate: - | { symbol: string; asOfDate: string } - | undefined, +type OptionContract = { + symbol: string; + expirationDate: string; + strike: number; + type: "call" | "put"; +}; +type OptionContractDay = OptionContract & { asOfDate: string }; +async function getNextBatchOfUnstartedOptionAggregates( + previousUnstartedOptionContract: OptionContractDay | undefined, limit: number -) { - if (typeof previousUnstartedSymbolAndAsOfDate === "undefined") { +): Promise> { + if (typeof previousUnstartedOptionContract === "undefined") { return; } - const rows = await query<{ symbol: string; earliestAsOfDate: string }>(` - SELECT - symbol, - first_value(asOfDate) as earliestAsOfDate + const optionContractsWithoutAggregates = await query(` + SELECT + asOfDate, + symbol, + expirationDate, + strike, + type, + last_value(status) as latestStatus FROM ( - SELECT - symbol, - asOfDate, - last_value(status) as latestStatus - FROM ( - SELECT * - FROM option_contract_sync_statuses - ORDER BY asOfDate ASC, symbol ASC - ) - GROUP BY symbol, asOfDate - HAVING latestStatus = 'not-started' - ORDER BY symbol ASC, asOfDate ASC + SELECT * + FROM option_aggregate_sync_statuses + ORDER BY ts ASC ) - GROUP BY symbol - HAVING ( - symbol = '${previousUnstartedSymbolAndAsOfDate.symbol}' - AND asOfDate > '${previousUnstartedSymbolAndAsOfDate.asOfDate}' - ) - OR ( - symbol > '${previousUnstartedSymbolAndAsOfDate.symbol}' + GROUP BY asOfDate, symbol, expirationDate, strike, type + HAVING latestStatus = 'not-started' + AND ( + ( + asOfDate = '${previousUnstartedOptionContract.asOfDate}' + AND symbol = '${previousUnstartedOptionContract.symbol}' + AND expirationDate = '${previousUnstartedOptionContract.expirationDate}' + AND strike = ${previousUnstartedOptionContract.strike} + AND type > '${previousUnstartedOptionContract.type}' + ) + OR + ( + asOfDate = '${previousUnstartedOptionContract.asOfDate}' + AND symbol = '${previousUnstartedOptionContract.symbol}' + AND expirationDate = '${previousUnstartedOptionContract.expirationDate}' + AND strike > ${previousUnstartedOptionContract.strike} + ) + OR ( + asOfDate = '${previousUnstartedOptionContract.asOfDate}' + AND symbol = '${previousUnstartedOptionContract.symbol}' + AND expirationDate > '${previousUnstartedOptionContract.expirationDate}' + ) + OR ( + asOfDate = '${previousUnstartedOptionContract.asOfDate}' + AND symbol > '${previousUnstartedOptionContract.symbol}' + ) + OR ( + asOfDate > '${previousUnstartedOptionContract.asOfDate}' + ) ) - ORDER BY symbol ASC + ORDER BY asOfDate, symbol, expirationDate, strike, type LIMIT ${limit} `); - return rows.map((row) => ({ - symbol: row.symbol, - asOfDate: row.earliestAsOfDate, - })); + return optionContractsWithoutAggregates; } /** - * For each symbol in `symbols` table, check the latest `asOfDate` - * in `option_contract_sync_statuses` for that symbol. Then fill-in the rest - * of the dates until today's date. + * First, since this is startup time, obviously anything `pending` is not + * really running. So, for each `pending` option contract (i.e. unique + * combinations of `(symbol, expirationDate, strike, type)` in the + * `option_contracts` table), delete its status, and all quotes synced so far, + * so as to start afresh. */ -async function fillSyncStatuses() { - const symbols = ( - await query<{ symbol: string }>(` - SELECT symbol from symbols - `) - ).map(({ symbol }) => symbol); - - console.log("symbols", symbols); +async function revertPendingSyncs() { + const pendingOptionContracts = await query<{ + symbol: string; + expirationDate: string; + strike: number; + type: "call" | "put"; + latestStatus: "not-started" | "pending" | "done"; + }>(` + SELECT + symbol, + expirationDate, + strike, + type, + last_value(status) as latestStatus + FROM ( + SELECT * + FROM option_aggregate_sync_statuses + ORDER BY symbol, expirationDate, strike, type, ts ASC + ) + GROUP BY symbol, expirationDate, strike, type + HAVING latestStatus = 'pending' + ORDER BY symbol, expirationDate, strike, type + `); + console.log( + "Pending operations:", + pendingOptionContracts.map( + ({ symbol, expirationDate, strike, type }) => + `${symbol} ${expirationDate} ${strike} ${type}` + ) + ); await pAll( - symbols.map( - (symbol) => () => - query<{ latestAsOfDate: string }>(` - SELECT - latestAsOfDate - FROM ( - SELECT last_value(asOfDate) as latestAsOfDate - FROM ( - SELECT * - FROM option_contract_sync_statuses - WHERE symbol = '${symbol}' - ORDER BY asOfDate ASC - ) - ) - WHERE latestAsOfDate > '2022-02-18' - `).then((rows) => - clickhouse - .command({ - query: ` - INSERT INTO option_contract_sync_statuses - SELECT - '${symbol}' as symbol, - Date(dateAdd(DAY,number,'${ - rows[0]?.latestAsOfDate || "2022-02-19" - }')) as asOfDate, - 'not-started' as status - FROM system.numbers - WHERE number < dateDiff('days',Date('${ - rows[0]?.latestAsOfDate || "2022-02-19" - }'), Date(now())) - AND number > 0 - `, - }) - .then(() => { - console.log(`Done ${symbol}`); - }) - ) - ), - { concurrency: 6 } + pendingOptionContracts.map( + ({ symbol, expirationDate, strike, type }) => + () => + pSeries([ + // Delete option_contracts first, in case this `pAll` operation fails and we need to restart; so `option_contract_sync_statuses` "pending" rows are still there for the restart + () => + clickhouse.command({ + query: ` + DELETE FROM option_aggregates + WHERE symbol = '${symbol}' + AND expirationDate = '${expirationDate}' + AND strike = ${strike} + AND type = '${type}' + `, + }), + () => + clickhouse.command({ + query: ` + DELETE FROM option_aggregate_sync_statuses + WHERE symbol = '${symbol}' + AND expirationDate = '${expirationDate}' + AND strike = ${strike} + AND type = '${type}' + AND status = 'pending'`, + }), + ]) + ) ); } +//await revertPendingSyncs(); -/** First, make sure we know which symbol-asOfDate combinations are - * yet un-synced. - */ -await fillSyncStatuses(); - -/** - * Second, since this is startup time, obviously anything `pending` is not really running. - * So, for each `pending` combo, delete its status, and all contracts synced so far, so as to start afresh. - */ -const pendingSymbolsAndAsOfDates = await query<{ - symbol: string; - asOfDate: string; - latestStatus: "not-started" | "pending" | "done"; -}>(` - SELECT - symbol, - asOfDate, - last_value(status) as latestStatus - FROM ( - SELECT * - FROM option_contract_sync_statuses - ORDER BY asOfDate ASC, symbol ASC - ) - GROUP BY symbol, asOfDate - HAVING latestStatus = 'pending' - ORDER BY symbol ASC, asOfDate ASC -`); -console.log( - "Pending operations:", - pendingSymbolsAndAsOfDates.map( - ({ symbol, asOfDate }) => `${symbol} ${asOfDate}` - ) -); -await pAll( - pendingSymbolsAndAsOfDates.map( - ({ symbol, asOfDate }) => - () => - pSeries([ - // Delete option_contracts first, in case this `pAll` operation fails and we need to restart; so `option_contract_sync_statuses` "pending" rows are still there for the restart - () => - clickhouse.command({ - query: `DELETE FROM option_contracts WHERE symbol = '${symbol}' AND asOfDate = '${asOfDate}'`, - }), - () => - clickhouse.command({ - query: `DELETE FROM option_contract_sync_statuses WHERE symbol = '${symbol}' AND asOfDate = '${asOfDate}' AND status = 'pending'`, - }), - ]) - ) -); - -/** Second, for each symbol-asOfDate combination whose option contracts - * are not known, make them known. +/** Second, for each option contract, get all of its quotes. * * This queries Polygon with a concurrency of 6. */ const q = new pQueue({ concurrency: 6 }); -/** Initialized with the lowest possible symbol and the earliest possible asOfDate. +/** Initialized with the lowest possible option contract. * It's passed into `getNextUnstartedSymbolAndAsOfDate()`. */ -let nextBatchOfUnstartedSymbolsAndAsOfDates = [ - { symbol: "A", asOfDate: "2022-02-01" }, +let nextBatchOfUnstartedOptionContracts: Array = [ + { + asOfDate: "2022-02-01", + symbol: "A", + expirationDate: "2022-02-01", + strike: 0, + type: "call", + }, ]; while ( - (nextBatchOfUnstartedSymbolsAndAsOfDates = - await getNextBatchOfUnstartedSymbolsAndAsOfDates( - nextBatchOfUnstartedSymbolsAndAsOfDates.pop(), + (nextBatchOfUnstartedOptionContracts = + await getNextBatchOfUnstartedOptionAggregates( + nextBatchOfUnstartedOptionContracts.pop(), 200 )) !== null ) { await pAll( - nextBatchOfUnstartedSymbolsAndAsOfDates.map( - (unstartedSymbolAndAsOfDate) => () => + nextBatchOfUnstartedOptionContracts.map( + (unstartedOptionContract) => () => q.add(async () => { console.log( - `Getting contracts for ${unstartedSymbolAndAsOfDate.symbol} at ${unstartedSymbolAndAsOfDate.asOfDate}` - ); - await getOptionContracts( - unstartedSymbolAndAsOfDate.symbol, - unstartedSymbolAndAsOfDate.asOfDate + `Getting aggregates for ${unstartedOptionContract.asOfDate} ${unstartedOptionContract.symbol} at ${unstartedOptionContract.expirationDate} ${unstartedOptionContract.strike} ${unstartedOptionContract.type}` ); + // await getOptionAggregates( + // unstartedOptionContract.asOfDate, + // unstartedOptionContract.symbol, + // unstartedOptionContract.expirationDate, + // unstartedOptionContract.strike, + // unstartedOptionContract.type + // ); }) ) ); @@ -256,11 +274,6 @@ while ( // wait until pending queue operations are done: await q.onSizeLessThan(1); -/** - * For each option contract, find its earliest date of existence, and get all quotes from - * then on. - */ - /*** TODOs ***/ /* + Gracefully recover from errors in individual operations. diff --git a/server/src/scripts/polygon.ts b/server/src/scripts/polygon.ts index 6402f35..6b445e4 100644 --- a/server/src/scripts/polygon.ts +++ b/server/src/scripts/polygon.ts @@ -1,4 +1,5 @@ -import pThrottle from 'p-throttle'; +//import pThrottle from 'p-throttle'; -const apiKey = 'H95NTsatM1iTWLUwDLxM2J5zhUVYdCEz'; -export const getApiKey = pThrottle({limit: 5, interval: 60000})(()=>apiKey); \ No newline at end of file +const apiKey = "H95NTsatM1iTWLUwDLxM2J5zhUVYdCEz"; +//export const getApiKey = pThrottle({limit: 5, interval: 60000})(()=>apiKey); +export const getApiKey = () => apiKey; diff --git a/server/tables.sql b/server/tables.sql index d6582b0..5707944 100644 --- a/server/tables.sql +++ b/server/tables.sql @@ -33,6 +33,42 @@ ENGINE MergeTree() PRIMARY KEY (asOfDate, symbol) ORDER BY (asOfDate, symbol, expirationDate, strike, type); + +-- BEGIN: Option Contract Quotes +CREATE TABLE option_aggregate_sync_statuses +( + asOfDate Date, + symbol LowCardinality(String), + expirationDate Date, + strike Float32, + type ENUM('call', 'put'), + status ENUM('not-started','pending','done'), + ts DateTime64 DEFAULT now() +) +ENGINE MergeTree() +ORDER BY (asOfDate, symbol, expirationDate, strike, type, ts); +CREATE MATERIALIZED VIEW option_aggregate_sync_statuses_mv +TO option_aggregate_sync_statuses +AS +SELECT + DISTINCT ON ( + asOfDate, + symbol, + expirationDate, + strike, + type + ) + asOfDate, + symbol, + expirationDate, + strike, + type, + 'not-started' as status, + now() as ts +FROM option_contracts; + +-- END: Option Contract Quotes + CREATE TABLE stock_aggregates ( symbol LowCardinality(String), @@ -51,19 +87,19 @@ CREATE TABLE option_aggregates ( symbol LowCardinality(String), expirationDate Date, - optionType Enum('call', 'put'), - strike Float64, + strike Float32, + type Enum('call', 'put'), - tsStart DateTime32, - open Float64, - close Float64, - low Float64, - high Float64, - volume UInt64, - volumeWeightedPrice Float64 + tsStart DateTime32 CODEC(DoubleDelta(1), ZSTD), + open Float32 CODEC(Delta(2), ZSTD), + close Float32 CODEC(Delta(2), ZSTD), + low Float32 CODEC(Delta(2), ZSTD), + high Float32 CODEC(Delta(2), ZSTD), + volume UInt32 CODEC(T64), + volumeWeightedPrice Float32 CODEC(Delta(2), ZSTD) ) ENGINE MergeTree() -ORDER BY (symbol, expirationDate, optionType, strike, tsStart) +ORDER BY (symbol, expirationDate, strike, type, tsStart) ALTER TABLE option_aggregates ADD INDEX idx_expirationDate expirationDate TYPE minmax GRANULARITY 2; ALTER TABLE option_aggregates ADD INDEX idx_strike strike TYPE minmax GRANULARITY 2;