functioning polygon-to-clickhouse sync script for option contracts

main
Avraham Sakal 1 year ago
parent c9dc6c8c3d
commit de48720d17

@ -15,6 +15,7 @@
"dotenv": "^16.4.1", "dotenv": "^16.4.1",
"p-all": "^5.0.0", "p-all": "^5.0.0",
"p-queue": "^8.0.1", "p-queue": "^8.0.1",
"p-series": "^3.0.0",
"p-throttle": "^6.1.0" "p-throttle": "^6.1.0"
}, },
"devDependencies": { "devDependencies": {

@ -26,6 +26,9 @@ dependencies:
p-queue: p-queue:
specifier: ^8.0.1 specifier: ^8.0.1
version: 8.0.1 version: 8.0.1
p-series:
specifier: ^3.0.0
version: 3.0.0
p-throttle: p-throttle:
specifier: ^6.1.0 specifier: ^6.1.0
version: 6.1.0 version: 6.1.0
@ -825,6 +828,11 @@ packages:
p-timeout: 6.1.2 p-timeout: 6.1.2
dev: false dev: false
/p-series@3.0.0:
resolution: {integrity: sha512-geaabIwiqy+jN4vuJROl1rpMJT/myHAMAfdubPQGJT3Grr8td+ogWvTk2qLsNlhYXcoZZAfl01pfq7lK3/gYKQ==}
engines: {node: ^12.20.0 || ^14.13.1 || >=16.0.0}
dev: false
/p-throttle@6.1.0: /p-throttle@6.1.0:
resolution: {integrity: sha512-eQMdGTxk2+047La67wefUtt0tEHh7D+C8Jl7QXoFCuIiNYeQ9zWs2AZiJdIAs72rSXZ06t11me2bgalRNdy3SQ==} resolution: {integrity: sha512-eQMdGTxk2+047La67wefUtt0tEHh7D+C8Jl7QXoFCuIiNYeQ9zWs2AZiJdIAs72rSXZ06t11me2bgalRNdy3SQ==}
engines: {node: '>=18'} engines: {node: '>=18'}

@ -1,59 +1,80 @@
import { clickhouse, query } from "../clickhouse.js"; import { clickhouse, query } from "../clickhouse.js";
import { getApiKey } from "./polygon.js"; import { getApiKey } from "./polygon.js";
import pAll from 'p-all'; import pAll from "p-all";
import pQueue from 'p-queue'; import pQueue from "p-queue";
import pSeries from "p-series";
type PolygonResponse = {next_url?:string, results:Array<{ticker:string, expiration_date:string, strike_price:number, contract_type:'call'|'put'}>}; type PolygonResponse = {
async function getOptionContracts(underlyingSymbol, asOfDate){ next_url?: string;
results: Array<{
ticker: string;
expiration_date: string;
strike_price: number;
contract_type: "call" | "put";
}>;
};
async function getOptionContracts(underlyingSymbol, asOfDate) {
// first mark the sync of this particular symbol and asOfDate as "pending": // first mark the sync of this particular symbol and asOfDate as "pending":
await clickhouse.insert({ await clickhouse.insert({
table: 'option_contract_sync_statuses', table: "option_contract_sync_statuses",
values: [{symbol: underlyingSymbol, asOfDate, status: 'pending'}], values: [{ symbol: underlyingSymbol, asOfDate, status: "pending" }],
format: 'JSONEachRow', format: "JSONEachRow",
}); });
// then commence the sync with the initial request: // then commence the sync with the initial request:
let latestBatchResponse = await (await fetch(`https://api.polygon.io/v3/reference/options/contracts?underlying_ticker=${underlyingSymbol}&as_of=${asOfDate}&sort=ticker&limit=1000&apiKey=${await getApiKey()}`)).json() as PolygonResponse; let latestBatchResponse = (await (
let latestBatch = latestBatchResponse.results await fetch(
.map((result)=>({ `https://api.polygon.io/v3/reference/options/contracts?underlying_ticker=${underlyingSymbol}&as_of=${asOfDate}&sort=ticker&limit=1000&apiKey=${await getApiKey()}`
)
).json()) as PolygonResponse;
let latestBatch = latestBatchResponse.results.map((result) => ({
asOfDate,
symbol: underlyingSymbol,
expirationDate: result.expiration_date,
strike: result.strike_price,
type: result.contract_type,
}));
await clickhouse.insert({
table: "option_contracts",
values: latestBatch,
format: "JSONEachRow",
});
//console.log(latestBatch.results.map((r)=>r.ticker));
// as long as there's a `next_url`, call that:
while (latestBatchResponse.hasOwnProperty("next_url")) {
latestBatchResponse = (await (
await fetch(`${latestBatchResponse.next_url}&apiKey=${await getApiKey()}`)
).json()) as PolygonResponse;
latestBatch = latestBatchResponse.results.map((result) => ({
asOfDate, asOfDate,
symbol: underlyingSymbol, symbol: underlyingSymbol,
expirationDate: result.expiration_date, expirationDate: result.expiration_date,
strike: result.strike_price, strike: result.strike_price,
type: result.contract_type, type: result.contract_type,
})); }));
await clickhouse.insert({
table: 'option_contracts',
values: latestBatch,
format: 'JSONEachRow',
});
//console.log(latestBatch.results.map((r)=>r.ticker));
// as long as there's a `next_url`, call that:
while(latestBatchResponse.hasOwnProperty('next_url')){
latestBatchResponse = await (await fetch(`${latestBatchResponse.next_url}&apiKey=${await getApiKey()}`)).json() as PolygonResponse;
latestBatch = latestBatchResponse.results
.map((result)=>({
asOfDate,
symbol: underlyingSymbol,
expirationDate: result.expiration_date,
strike: result.strike_price,
type: result.contract_type,
}));
//console.log(latestBatch.results.map((r)=>r.ticker)); //console.log(latestBatch.results.map((r)=>r.ticker));
await clickhouse.insert({ await clickhouse.insert({
table: 'option_contracts', table: "option_contracts",
values: latestBatch, values: latestBatch,
format: 'JSONEachRow', format: "JSONEachRow",
}); });
} }
await clickhouse.insert({ await clickhouse.insert({
table: 'option_contract_sync_statuses', table: "option_contract_sync_statuses",
values: [{symbol: underlyingSymbol, asOfDate, status: 'done'}], values: [{ symbol: underlyingSymbol, asOfDate, status: "done" }],
format: 'JSONEachRow', format: "JSONEachRow",
}); });
} }
async function getNextBatchOfUnstartedSymbolsAndAsOfDates(previousUnstartedSymbolAndAsOfDate:{symbol:string, asOfDate:string}, limit:number){ async function getNextBatchOfUnstartedSymbolsAndAsOfDates(
const rows = await query<{symbol:string, earliestAsOfDate:string}>(` previousUnstartedSymbolAndAsOfDate:
| { symbol: string; asOfDate: string }
| undefined,
limit: number
) {
if (typeof previousUnstartedSymbolAndAsOfDate === "undefined") {
return;
}
const rows = await query<{ symbol: string; earliestAsOfDate: string }>(`
SELECT SELECT
symbol, symbol,
first_value(asOfDate) as earliestAsOfDate first_value(asOfDate) as earliestAsOfDate
@ -82,26 +103,29 @@ async function getNextBatchOfUnstartedSymbolsAndAsOfDates(previousUnstartedSymbo
ORDER BY symbol ASC ORDER BY symbol ASC
LIMIT ${limit} LIMIT ${limit}
`); `);
return rows.map(row=>({ return rows.map((row) => ({
symbol: row.symbol, symbol: row.symbol,
asOfDate: row.earliestAsOfDate, asOfDate: row.earliestAsOfDate,
})); }));
} }
/** /**
* For each symbol in `symbols` table, check the latest `asOfDate` * For each symbol in `symbols` table, check the latest `asOfDate`
* in `option_contract_sync_statuses` for that symbol. Then fill-in the rest * in `option_contract_sync_statuses` for that symbol. Then fill-in the rest
* of the dates until today's date. * of the dates until today's date.
*/ */
async function fillSyncStatuses(){ async function fillSyncStatuses() {
const symbols = (await query<{symbol:string}>(` const symbols = (
await query<{ symbol: string }>(`
SELECT symbol from symbols SELECT symbol from symbols
`)).map(({symbol})=>symbol); `)
).map(({ symbol }) => symbol);
console.log('symbols', symbols);
await pAll(symbols.map( console.log("symbols", symbols);
(symbol)=> await pAll(
()=>query<{latestAsOfDate:string}>(` symbols.map(
(symbol) => () =>
query<{ latestAsOfDate: string }>(`
SELECT SELECT
latestAsOfDate latestAsOfDate
FROM ( FROM (
@ -114,47 +138,117 @@ async function fillSyncStatuses(){
) )
) )
WHERE latestAsOfDate > '2022-02-18' WHERE latestAsOfDate > '2022-02-18'
`).then((rows)=> `).then((rows) =>
clickhouse.command({ clickhouse
query: ` .command({
query: `
INSERT INTO option_contract_sync_statuses INSERT INTO option_contract_sync_statuses
SELECT SELECT
'${symbol}' as symbol, '${symbol}' as symbol,
Date(dateAdd(DAY,number,'${rows[0]?.latestAsOfDate || '2022-02-19'}')) as asOfDate, Date(dateAdd(DAY,number,'${
rows[0]?.latestAsOfDate || "2022-02-19"
}')) as asOfDate,
'not-started' as status 'not-started' as status
FROM system.numbers FROM system.numbers
WHERE number < dateDiff('days',Date('${rows[0]?.latestAsOfDate || '2022-02-19'}'), Date(now())) WHERE number < dateDiff('days',Date('${
rows[0]?.latestAsOfDate || "2022-02-19"
}'), Date(now()))
AND number > 0 AND number > 0
` `,
}).then(()=>{console.log(`Done ${symbol}`);}) })
) .then(() => {
), console.log(`Done ${symbol}`);
{concurrency: 6} })
)
),
{ concurrency: 6 }
); );
} }
/** First, make sure we know which symbol-asOfDate combinations are /** First, make sure we know which symbol-asOfDate combinations are
* yet un-synced. * yet un-synced.
*/ */
await fillSyncStatuses(); await fillSyncStatuses();
/** Second, for each symbol-asOfDate combination whose option contracts /**
* Second, since this is startup time, obviously anything `pending` is not really running.
* So, for each `pending` combo, delete its status, and all contracts synced so far, so as to start afresh.
*/
/**
 * Every symbol/asOfDate combination whose status, as reported by
 * `last_value(status)` over its rows in `option_contract_sync_statuses`, is `pending`.
 *
 * NOTE(review): the inner ORDER BY sorts by asOfDate and symbol only, which are the
 * GROUP BY keys themselves, so it does not obviously define which status row is
 * "last" within a group — confirm that `last_value` really yields the most recently
 * inserted status for each combination.
 */
const pendingSymbolsAndAsOfDates = await query<{
  symbol: string;
  asOfDate: string;
  latestStatus: "not-started" | "pending" | "done";
}>(`
SELECT
symbol,
asOfDate,
last_value(status) as latestStatus
FROM (
SELECT *
FROM option_contract_sync_statuses
ORDER BY asOfDate ASC, symbol ASC
)
GROUP BY symbol, asOfDate
HAVING latestStatus = 'pending'
ORDER BY symbol ASC, asOfDate ASC
`);
// Log what is about to be reset, as "<symbol> <asOfDate>" strings.
console.log(
  "Pending operations:",
  pendingSymbolsAndAsOfDates.map(
    ({ symbol, asOfDate }) => `${symbol} ${asOfDate}`
  )
);
/**
 * Reset each pending combination: within one combination the two deletes run strictly
 * in order (via `pSeries`); different combinations run in parallel, capped at 6 at a
 * time to match the concurrency limit used by the other `pAll` call and the queue in
 * this script, instead of firing every DELETE at once.
 */
await pAll(
  pendingSymbolsAndAsOfDates.map(
    ({ symbol, asOfDate }) =>
      () =>
        pSeries([
          // Delete option_contracts first, in case this `pAll` operation fails and we need to restart; so `option_contract_sync_statuses` "pending" rows are still there for the restart
          () =>
            clickhouse.command({
              query: `DELETE FROM option_contracts WHERE symbol = '${symbol}' AND asOfDate = '${asOfDate}'`,
            }),
          // Only then drop the `pending` status rows for this combination; rows with any other status are left untouched.
          () =>
            clickhouse.command({
              query: `DELETE FROM option_contract_sync_statuses WHERE symbol = '${symbol}' AND asOfDate = '${asOfDate}' AND status = 'pending'`,
            }),
        ])
  ),
  { concurrency: 6 }
);
/** Third, for each symbol-asOfDate combination whose option contracts
* are not known, make them known. * are not known, make them known.
* *
* This queries Polygon with a concurrency of 6. * This queries Polygon with a concurrency of 6.
*/ */
const q = new pQueue({concurrency: 6}); const q = new pQueue({ concurrency: 6 });
/** Initialized with the lowest possible symbol and the earliest possible asOfDate. /** Initialized with the lowest possible symbol and the earliest possible asOfDate.
* It's passed into `getNextBatchOfUnstartedSymbolsAndAsOfDates()`. * It's passed into `getNextBatchOfUnstartedSymbolsAndAsOfDates()`.
*/ */
let nextBatchOfUnstartedSymbolsAndAsOfDates = [{symbol:'A', asOfDate:'2022-02-01'}]; let nextBatchOfUnstartedSymbolsAndAsOfDates = [
while((nextBatchOfUnstartedSymbolsAndAsOfDates = await getNextBatchOfUnstartedSymbolsAndAsOfDates(nextBatchOfUnstartedSymbolsAndAsOfDates.pop(), 200)) !== null){ { symbol: "A", asOfDate: "2022-02-01" },
await pAll(nextBatchOfUnstartedSymbolsAndAsOfDates.map((unstartedSymbolAndAsOfDate)=> ];
()=>q.add(async ()=>{ while (
console.log(`Getting contracts for ${unstartedSymbolAndAsOfDate.symbol} at ${unstartedSymbolAndAsOfDate.asOfDate}`); (nextBatchOfUnstartedSymbolsAndAsOfDates =
await getOptionContracts(unstartedSymbolAndAsOfDate.symbol, unstartedSymbolAndAsOfDate.asOfDate); await getNextBatchOfUnstartedSymbolsAndAsOfDates(
}) nextBatchOfUnstartedSymbolsAndAsOfDates.pop(),
)); 200
)) !== null
) {
await pAll(
nextBatchOfUnstartedSymbolsAndAsOfDates.map(
(unstartedSymbolAndAsOfDate) => () =>
q.add(async () => {
console.log(
`Getting contracts for ${unstartedSymbolAndAsOfDate.symbol} at ${unstartedSymbolAndAsOfDate.asOfDate}`
);
await getOptionContracts(
unstartedSymbolAndAsOfDate.symbol,
unstartedSymbolAndAsOfDate.asOfDate
);
})
)
);
// don't loop again until the queue has less than 50 items; we don't want it to grow in memory without bound: // don't loop again until the queue has less than 50 items; we don't want it to grow in memory without bound:
console.log("Waiting till less than 50 in queue"); console.log("Waiting till less than 50 in queue");
await q.onSizeLessThan(50); await q.onSizeLessThan(50);
@ -162,8 +256,12 @@ while((nextBatchOfUnstartedSymbolsAndAsOfDates = await getNextBatchOfUnstartedSy
// wait until pending queue operations are done: // wait until pending queue operations are done:
await q.onSizeLessThan(1); await q.onSizeLessThan(1);
/**
* For each option contract, find its earliest date of existence, and get all quotes from
* then on.
*/
/*** TODOs ***/ /*** TODOs ***/
/* /*
+ Gracefully recover from errors in individual operations. + Gracefully recover from errors in individual operations.
+ If program stops and is restarted, it should restart anything `pending`: erase existing option contracts for that symbol/asOfDate */
*/

Loading…
Cancel
Save