|
|
|
@@ -1,20 +1,32 @@
|
|
|
|
|
import { clickhouse, query } from "../clickhouse.js";
|
|
|
|
|
import { getApiKey } from "./polygon.js";
|
|
|
|
|
import pAll from 'p-all';
|
|
|
|
|
import pQueue from 'p-queue';
|
|
|
|
|
import pAll from "p-all";
|
|
|
|
|
import pQueue from "p-queue";
|
|
|
|
|
import pSeries from "p-series";
|
|
|
|
|
|
|
|
|
|
type PolygonResponse = {next_url?:string, results:Array<{ticker:string, expiration_date:string, strike_price:number, contract_type:'call'|'put'}>};
|
|
|
|
|
type PolygonResponse = {
|
|
|
|
|
next_url?: string;
|
|
|
|
|
results: Array<{
|
|
|
|
|
ticker: string;
|
|
|
|
|
expiration_date: string;
|
|
|
|
|
strike_price: number;
|
|
|
|
|
contract_type: "call" | "put";
|
|
|
|
|
}>;
|
|
|
|
|
};
|
|
|
|
|
async function getOptionContracts(underlyingSymbol, asOfDate) {
|
|
|
|
|
// first mark the sync of this particular symbol and asOfDate as "pending":
|
|
|
|
|
await clickhouse.insert({
|
|
|
|
|
table: 'option_contract_sync_statuses',
|
|
|
|
|
values: [{symbol: underlyingSymbol, asOfDate, status: 'pending'}],
|
|
|
|
|
format: 'JSONEachRow',
|
|
|
|
|
table: "option_contract_sync_statuses",
|
|
|
|
|
values: [{ symbol: underlyingSymbol, asOfDate, status: "pending" }],
|
|
|
|
|
format: "JSONEachRow",
|
|
|
|
|
});
|
|
|
|
|
// then commence the sync with the initial request:
|
|
|
|
|
let latestBatchResponse = await (await fetch(`https://api.polygon.io/v3/reference/options/contracts?underlying_ticker=${underlyingSymbol}&as_of=${asOfDate}&sort=ticker&limit=1000&apiKey=${await getApiKey()}`)).json() as PolygonResponse;
|
|
|
|
|
let latestBatch = latestBatchResponse.results
|
|
|
|
|
.map((result)=>({
|
|
|
|
|
let latestBatchResponse = (await (
|
|
|
|
|
await fetch(
|
|
|
|
|
`https://api.polygon.io/v3/reference/options/contracts?underlying_ticker=${underlyingSymbol}&as_of=${asOfDate}&sort=ticker&limit=1000&apiKey=${await getApiKey()}`
|
|
|
|
|
)
|
|
|
|
|
).json()) as PolygonResponse;
|
|
|
|
|
let latestBatch = latestBatchResponse.results.map((result) => ({
|
|
|
|
|
asOfDate,
|
|
|
|
|
symbol: underlyingSymbol,
|
|
|
|
|
expirationDate: result.expiration_date,
|
|
|
|
@@ -22,16 +34,17 @@ async function getOptionContracts(underlyingSymbol, asOfDate){
|
|
|
|
|
type: result.contract_type,
|
|
|
|
|
}));
|
|
|
|
|
await clickhouse.insert({
|
|
|
|
|
table: 'option_contracts',
|
|
|
|
|
table: "option_contracts",
|
|
|
|
|
values: latestBatch,
|
|
|
|
|
format: 'JSONEachRow',
|
|
|
|
|
format: "JSONEachRow",
|
|
|
|
|
});
|
|
|
|
|
//console.log(latestBatch.results.map((r)=>r.ticker));
|
|
|
|
|
// as long as there's a `next_url`, call that:
|
|
|
|
|
while(latestBatchResponse.hasOwnProperty('next_url')){
|
|
|
|
|
latestBatchResponse = await (await fetch(`${latestBatchResponse.next_url}&apiKey=${await getApiKey()}`)).json() as PolygonResponse;
|
|
|
|
|
latestBatch = latestBatchResponse.results
|
|
|
|
|
.map((result)=>({
|
|
|
|
|
while (latestBatchResponse.hasOwnProperty("next_url")) {
|
|
|
|
|
latestBatchResponse = (await (
|
|
|
|
|
await fetch(`${latestBatchResponse.next_url}&apiKey=${await getApiKey()}`)
|
|
|
|
|
).json()) as PolygonResponse;
|
|
|
|
|
latestBatch = latestBatchResponse.results.map((result) => ({
|
|
|
|
|
asOfDate,
|
|
|
|
|
symbol: underlyingSymbol,
|
|
|
|
|
expirationDate: result.expiration_date,
|
|
|
|
@@ -40,20 +53,28 @@ async function getOptionContracts(underlyingSymbol, asOfDate){
|
|
|
|
|
}));
|
|
|
|
|
//console.log(latestBatch.results.map((r)=>r.ticker));
|
|
|
|
|
await clickhouse.insert({
|
|
|
|
|
table: 'option_contracts',
|
|
|
|
|
table: "option_contracts",
|
|
|
|
|
values: latestBatch,
|
|
|
|
|
format: 'JSONEachRow',
|
|
|
|
|
format: "JSONEachRow",
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
await clickhouse.insert({
|
|
|
|
|
table: 'option_contract_sync_statuses',
|
|
|
|
|
values: [{symbol: underlyingSymbol, asOfDate, status: 'done'}],
|
|
|
|
|
format: 'JSONEachRow',
|
|
|
|
|
table: "option_contract_sync_statuses",
|
|
|
|
|
values: [{ symbol: underlyingSymbol, asOfDate, status: "done" }],
|
|
|
|
|
format: "JSONEachRow",
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async function getNextBatchOfUnstartedSymbolsAndAsOfDates(previousUnstartedSymbolAndAsOfDate:{symbol:string, asOfDate:string}, limit:number){
|
|
|
|
|
const rows = await query<{symbol:string, earliestAsOfDate:string}>(`
|
|
|
|
|
async function getNextBatchOfUnstartedSymbolsAndAsOfDates(
|
|
|
|
|
previousUnstartedSymbolAndAsOfDate:
|
|
|
|
|
| { symbol: string; asOfDate: string }
|
|
|
|
|
| undefined,
|
|
|
|
|
limit: number
|
|
|
|
|
) {
|
|
|
|
|
if (typeof previousUnstartedSymbolAndAsOfDate === "undefined") {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
const rows = await query<{ symbol: string; earliestAsOfDate: string }>(`
|
|
|
|
|
SELECT
|
|
|
|
|
symbol,
|
|
|
|
|
first_value(asOfDate) as earliestAsOfDate
|
|
|
|
@@ -82,7 +103,7 @@ async function getNextBatchOfUnstartedSymbolsAndAsOfDates(previousUnstartedSymbo
|
|
|
|
|
ORDER BY symbol ASC
|
|
|
|
|
LIMIT ${limit}
|
|
|
|
|
`);
|
|
|
|
|
return rows.map(row=>({
|
|
|
|
|
return rows.map((row) => ({
|
|
|
|
|
symbol: row.symbol,
|
|
|
|
|
asOfDate: row.earliestAsOfDate,
|
|
|
|
|
}));
|
|
|
|
@@ -94,14 +115,17 @@ async function getNextBatchOfUnstartedSymbolsAndAsOfDates(previousUnstartedSymbo
|
|
|
|
|
* of the dates until today's date.
|
|
|
|
|
*/
|
|
|
|
|
async function fillSyncStatuses() {
|
|
|
|
|
const symbols = (await query<{symbol:string}>(`
|
|
|
|
|
const symbols = (
|
|
|
|
|
await query<{ symbol: string }>(`
|
|
|
|
|
SELECT symbol from symbols
|
|
|
|
|
`)).map(({symbol})=>symbol);
|
|
|
|
|
`)
|
|
|
|
|
).map(({ symbol }) => symbol);
|
|
|
|
|
|
|
|
|
|
console.log('symbols', symbols);
|
|
|
|
|
await pAll(symbols.map(
|
|
|
|
|
(symbol)=>
|
|
|
|
|
()=>query<{latestAsOfDate:string}>(`
|
|
|
|
|
console.log("symbols", symbols);
|
|
|
|
|
await pAll(
|
|
|
|
|
symbols.map(
|
|
|
|
|
(symbol) => () =>
|
|
|
|
|
query<{ latestAsOfDate: string }>(`
|
|
|
|
|
SELECT
|
|
|
|
|
latestAsOfDate
|
|
|
|
|
FROM (
|
|
|
|
@@ -115,18 +139,26 @@ async function fillSyncStatuses(){
|
|
|
|
|
)
|
|
|
|
|
WHERE latestAsOfDate > '2022-02-18'
|
|
|
|
|
`).then((rows) =>
|
|
|
|
|
clickhouse.command({
|
|
|
|
|
clickhouse
|
|
|
|
|
.command({
|
|
|
|
|
query: `
|
|
|
|
|
INSERT INTO option_contract_sync_statuses
|
|
|
|
|
SELECT
|
|
|
|
|
'${symbol}' as symbol,
|
|
|
|
|
Date(dateAdd(DAY,number,'${rows[0]?.latestAsOfDate || '2022-02-19'}')) as asOfDate,
|
|
|
|
|
Date(dateAdd(DAY,number,'${
|
|
|
|
|
rows[0]?.latestAsOfDate || "2022-02-19"
|
|
|
|
|
}')) as asOfDate,
|
|
|
|
|
'not-started' as status
|
|
|
|
|
FROM system.numbers
|
|
|
|
|
WHERE number < dateDiff('days',Date('${rows[0]?.latestAsOfDate || '2022-02-19'}'), Date(now()))
|
|
|
|
|
WHERE number < dateDiff('days',Date('${
|
|
|
|
|
rows[0]?.latestAsOfDate || "2022-02-19"
|
|
|
|
|
}'), Date(now()))
|
|
|
|
|
AND number > 0
|
|
|
|
|
`
|
|
|
|
|
}).then(()=>{console.log(`Done ${symbol}`);})
|
|
|
|
|
`,
|
|
|
|
|
})
|
|
|
|
|
.then(() => {
|
|
|
|
|
console.log(`Done ${symbol}`);
|
|
|
|
|
})
|
|
|
|
|
)
|
|
|
|
|
),
|
|
|
|
|
{ concurrency: 6 }
|
|
|
|
@@ -138,6 +170,52 @@ async function fillSyncStatuses(){
|
|
|
|
|
*/
|
|
|
|
|
await fillSyncStatuses();
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Second, since this is startup time, obviously anything `pending` is not really running.
|
|
|
|
|
* So, for each `pending` combo, delete its status, and all contracts synced so far, so as to start afresh.
|
|
|
|
|
*/
|
|
|
|
|
const pendingSymbolsAndAsOfDates = await query<{
|
|
|
|
|
symbol: string;
|
|
|
|
|
asOfDate: string;
|
|
|
|
|
latestStatus: "not-started" | "pending" | "done";
|
|
|
|
|
}>(`
|
|
|
|
|
SELECT
|
|
|
|
|
symbol,
|
|
|
|
|
asOfDate,
|
|
|
|
|
last_value(status) as latestStatus
|
|
|
|
|
FROM (
|
|
|
|
|
SELECT *
|
|
|
|
|
FROM option_contract_sync_statuses
|
|
|
|
|
ORDER BY asOfDate ASC, symbol ASC
|
|
|
|
|
)
|
|
|
|
|
GROUP BY symbol, asOfDate
|
|
|
|
|
HAVING latestStatus = 'pending'
|
|
|
|
|
ORDER BY symbol ASC, asOfDate ASC
|
|
|
|
|
`);
|
|
|
|
|
console.log(
|
|
|
|
|
"Pending operations:",
|
|
|
|
|
pendingSymbolsAndAsOfDates.map(
|
|
|
|
|
({ symbol, asOfDate }) => `${symbol} ${asOfDate}`
|
|
|
|
|
)
|
|
|
|
|
);
|
|
|
|
|
await pAll(
|
|
|
|
|
pendingSymbolsAndAsOfDates.map(
|
|
|
|
|
({ symbol, asOfDate }) =>
|
|
|
|
|
() =>
|
|
|
|
|
pSeries([
|
|
|
|
|
// Delete option_contracts first, in case this `pAll` operation fails and we need to restart; so `option_contract_sync_statuses` "pending" rows are still there for the restart
|
|
|
|
|
() =>
|
|
|
|
|
clickhouse.command({
|
|
|
|
|
query: `DELETE FROM option_contracts WHERE symbol = '${symbol}' AND asOfDate = '${asOfDate}'`,
|
|
|
|
|
}),
|
|
|
|
|
() =>
|
|
|
|
|
clickhouse.command({
|
|
|
|
|
query: `DELETE FROM option_contract_sync_statuses WHERE symbol = '${symbol}' AND asOfDate = '${asOfDate}' AND status = 'pending'`,
|
|
|
|
|
}),
|
|
|
|
|
])
|
|
|
|
|
)
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
/** Second, for each symbol-asOfDate combination whose option contracts
|
|
|
|
|
* are not known, make them known.
|
|
|
|
|
*
|
|
|
|
@@ -147,14 +225,30 @@ const q = new pQueue({concurrency: 6});
|
|
|
|
|
/** Initialized with the lowest possible symbol and the earliest possible asOfDate.
|
|
|
|
|
* It's passed into `getNextUnstartedSymbolAndAsOfDate()`.
|
|
|
|
|
*/
|
|
|
|
|
let nextBatchOfUnstartedSymbolsAndAsOfDates = [{symbol:'A', asOfDate:'2022-02-01'}];
|
|
|
|
|
while((nextBatchOfUnstartedSymbolsAndAsOfDates = await getNextBatchOfUnstartedSymbolsAndAsOfDates(nextBatchOfUnstartedSymbolsAndAsOfDates.pop(), 200)) !== null){
|
|
|
|
|
await pAll(nextBatchOfUnstartedSymbolsAndAsOfDates.map((unstartedSymbolAndAsOfDate)=>
|
|
|
|
|
()=>q.add(async ()=>{
|
|
|
|
|
console.log(`Getting contracts for ${unstartedSymbolAndAsOfDate.symbol} at ${unstartedSymbolAndAsOfDate.asOfDate}`);
|
|
|
|
|
await getOptionContracts(unstartedSymbolAndAsOfDate.symbol, unstartedSymbolAndAsOfDate.asOfDate);
|
|
|
|
|
let nextBatchOfUnstartedSymbolsAndAsOfDates = [
|
|
|
|
|
{ symbol: "A", asOfDate: "2022-02-01" },
|
|
|
|
|
];
|
|
|
|
|
while (
|
|
|
|
|
(nextBatchOfUnstartedSymbolsAndAsOfDates =
|
|
|
|
|
await getNextBatchOfUnstartedSymbolsAndAsOfDates(
|
|
|
|
|
nextBatchOfUnstartedSymbolsAndAsOfDates.pop(),
|
|
|
|
|
200
|
|
|
|
|
)) !== null
|
|
|
|
|
) {
|
|
|
|
|
await pAll(
|
|
|
|
|
nextBatchOfUnstartedSymbolsAndAsOfDates.map(
|
|
|
|
|
(unstartedSymbolAndAsOfDate) => () =>
|
|
|
|
|
q.add(async () => {
|
|
|
|
|
console.log(
|
|
|
|
|
`Getting contracts for ${unstartedSymbolAndAsOfDate.symbol} at ${unstartedSymbolAndAsOfDate.asOfDate}`
|
|
|
|
|
);
|
|
|
|
|
await getOptionContracts(
|
|
|
|
|
unstartedSymbolAndAsOfDate.symbol,
|
|
|
|
|
unstartedSymbolAndAsOfDate.asOfDate
|
|
|
|
|
);
|
|
|
|
|
})
|
|
|
|
|
));
|
|
|
|
|
)
|
|
|
|
|
);
|
|
|
|
|
// don't loop again until the queue has less than 50 items; we don't want it to grow in memory without bound:
|
|
|
|
|
console.log("Waiting till less than 50 in queue");
|
|
|
|
|
await q.onSizeLessThan(50);
|
|
|
|
@@ -162,8 +256,12 @@ while((nextBatchOfUnstartedSymbolsAndAsOfDates = await getNextBatchOfUnstartedSy
|
|
|
|
|
// wait until pending queue operations are done:
|
|
|
|
|
await q.onSizeLessThan(1);
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* For each option contract, find its earliest date of existence, and get all quotes from
|
|
|
|
|
* then on.
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
/*** TODOs ***/
|
|
|
|
|
/*
|
|
|
|
|
+ Gracefully recover from errors in individual operations.
|
|
|
|
|
+ If program stops and is restarted, it should restart anything `pending`: erase existing option contracts for that symbol/asOfDate
|
|
|
|
|
*/
|