sync option contracts for every asOfDate
This commit is contained in:
@@ -15,6 +15,7 @@
|
|||||||
"cors": "^2.8.5",
|
"cors": "^2.8.5",
|
||||||
"dotenv": "^16.4.1",
|
"dotenv": "^16.4.1",
|
||||||
"p-all": "^5.0.0",
|
"p-all": "^5.0.0",
|
||||||
|
"p-queue": "^8.0.1",
|
||||||
"p-throttle": "^6.1.0"
|
"p-throttle": "^6.1.0"
|
||||||
},
|
},
|
||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
|
|||||||
Generated
+20
@@ -23,6 +23,9 @@ dependencies:
|
|||||||
p-all:
|
p-all:
|
||||||
specifier: ^5.0.0
|
specifier: ^5.0.0
|
||||||
version: 5.0.0
|
version: 5.0.0
|
||||||
|
p-queue:
|
||||||
|
specifier: ^8.0.1
|
||||||
|
version: 8.0.1
|
||||||
p-throttle:
|
p-throttle:
|
||||||
specifier: ^6.1.0
|
specifier: ^6.1.0
|
||||||
version: 6.1.0
|
version: 6.1.0
|
||||||
@@ -505,6 +508,10 @@ packages:
|
|||||||
engines: {node: '>=0.8.0'}
|
engines: {node: '>=0.8.0'}
|
||||||
dev: true
|
dev: true
|
||||||
|
|
||||||
|
/eventemitter3@5.0.1:
|
||||||
|
resolution: {integrity: sha512-GWkBvjiSZK87ELrYOSESUYeVIc9mvLLf/nXalMOS5dYrgZq9o5OVkbZAVM06CVxYsCwH9BDZFPlQTlPA1j4ahA==}
|
||||||
|
dev: false
|
||||||
|
|
||||||
/for-each@0.3.3:
|
/for-each@0.3.3:
|
||||||
resolution: {integrity: sha512-jqYfLp7mo9vIyQf8ykW2v7A+2N4QjeCeI5+Dz9XraiO1ign81wjiH7Fb9vSOWvQfNtmSa4H2RoQTrrXivdUZmw==}
|
resolution: {integrity: sha512-jqYfLp7mo9vIyQf8ykW2v7A+2N4QjeCeI5+Dz9XraiO1ign81wjiH7Fb9vSOWvQfNtmSa4H2RoQTrrXivdUZmw==}
|
||||||
dependencies:
|
dependencies:
|
||||||
@@ -810,11 +817,24 @@ packages:
|
|||||||
engines: {node: '>=16'}
|
engines: {node: '>=16'}
|
||||||
dev: false
|
dev: false
|
||||||
|
|
||||||
|
/p-queue@8.0.1:
|
||||||
|
resolution: {integrity: sha512-NXzu9aQJTAzbBqOt2hwsR63ea7yvxJc0PwN/zobNAudYfb1B7R08SzB4TsLeSbUCuG467NhnoT0oO6w1qRO+BA==}
|
||||||
|
engines: {node: '>=18'}
|
||||||
|
dependencies:
|
||||||
|
eventemitter3: 5.0.1
|
||||||
|
p-timeout: 6.1.2
|
||||||
|
dev: false
|
||||||
|
|
||||||
/p-throttle@6.1.0:
|
/p-throttle@6.1.0:
|
||||||
resolution: {integrity: sha512-eQMdGTxk2+047La67wefUtt0tEHh7D+C8Jl7QXoFCuIiNYeQ9zWs2AZiJdIAs72rSXZ06t11me2bgalRNdy3SQ==}
|
resolution: {integrity: sha512-eQMdGTxk2+047La67wefUtt0tEHh7D+C8Jl7QXoFCuIiNYeQ9zWs2AZiJdIAs72rSXZ06t11me2bgalRNdy3SQ==}
|
||||||
engines: {node: '>=18'}
|
engines: {node: '>=18'}
|
||||||
dev: false
|
dev: false
|
||||||
|
|
||||||
|
/p-timeout@6.1.2:
|
||||||
|
resolution: {integrity: sha512-UbD77BuZ9Bc9aABo74gfXhNvzC9Tx7SxtHSh1fxvx3jTLLYvmVhiQZZrJzqqU0jKbN32kb5VOKiLEQI/3bIjgQ==}
|
||||||
|
engines: {node: '>=14.16'}
|
||||||
|
dev: false
|
||||||
|
|
||||||
/parse-json@4.0.0:
|
/parse-json@4.0.0:
|
||||||
resolution: {integrity: sha512-aOIos8bujGN93/8Ox/jPLh7RwVnPEysynVFE+fQZyg6jKELEHwzgKdLRFHUgXJL6kylijVSBC4BvN9OmsB48Rw==}
|
resolution: {integrity: sha512-aOIos8bujGN93/8Ox/jPLh7RwVnPEysynVFE+fQZyg6jKELEHwzgKdLRFHUgXJL6kylijVSBC4BvN9OmsB48Rw==}
|
||||||
engines: {node: '>=4'}
|
engines: {node: '>=4'}
|
||||||
|
|||||||
@@ -1,26 +1,106 @@
|
|||||||
import { clickhouse, query } from "../clickhouse.js";
|
import { clickhouse, query } from "../clickhouse.js";
|
||||||
import { getApiKey } from "./polygon.js";
|
import { getApiKey } from "./polygon.js";
|
||||||
import pAll from 'p-all';
|
import pAll from 'p-all';
|
||||||
|
import pQueue from 'p-queue';
|
||||||
|
|
||||||
type PolygonResponse = {next_url?:string, results:Array<{ticker:string}>};
|
type PolygonResponse = {next_url?:string, results:Array<{ticker:string, expiration_date:string, strike_price:number, contract_type:'call'|'put'}>};
|
||||||
async function getOptionContracts(underlyingSymbol, asOfDate){
|
async function getOptionContracts(underlyingSymbol, asOfDate){
|
||||||
let latestBatch = await (await fetch(`https://api.polygon.io/v3/reference/options/contracts?underlying_ticker=${underlyingSymbol}&as_of=${asOfDate}&sort=ticker&limit=1000&apiKey=${await getApiKey()}`)).json() as PolygonResponse;
|
// first mark the sync of this particular symbol and asOfDate as "pending":
|
||||||
console.log(latestBatch.results.map((r)=>r.ticker));
|
await clickhouse.insert({
|
||||||
while(latestBatch.hasOwnProperty('next_url')){
|
table: 'option_contract_sync_statuses',
|
||||||
latestBatch = await (await fetch(`${latestBatch.next_url}&apiKey=${await getApiKey()}`)).json() as PolygonResponse;
|
values: [{symbol: underlyingSymbol, asOfDate, status: 'pending'}],
|
||||||
console.log(latestBatch.results.map((r)=>r.ticker));
|
format: 'JSONEachRow',
|
||||||
|
});
|
||||||
|
// then commence the sync with the initial request:
|
||||||
|
let latestBatchResponse = await (await fetch(`https://api.polygon.io/v3/reference/options/contracts?underlying_ticker=${underlyingSymbol}&as_of=${asOfDate}&sort=ticker&limit=1000&apiKey=${await getApiKey()}`)).json() as PolygonResponse;
|
||||||
|
let latestBatch = latestBatchResponse.results
|
||||||
|
.map((result)=>({
|
||||||
|
asOfDate,
|
||||||
|
symbol: underlyingSymbol,
|
||||||
|
expirationDate: result.expiration_date,
|
||||||
|
strike: result.strike_price,
|
||||||
|
type: result.contract_type,
|
||||||
|
}));
|
||||||
|
await clickhouse.insert({
|
||||||
|
table: 'option_contracts',
|
||||||
|
values: latestBatch,
|
||||||
|
format: 'JSONEachRow',
|
||||||
|
});
|
||||||
|
//console.log(latestBatch.results.map((r)=>r.ticker));
|
||||||
|
// as long as there's a `next_url`, call that:
|
||||||
|
while(latestBatchResponse.hasOwnProperty('next_url')){
|
||||||
|
latestBatchResponse = await (await fetch(`${latestBatchResponse.next_url}&apiKey=${await getApiKey()}`)).json() as PolygonResponse;
|
||||||
|
latestBatch = latestBatchResponse.results
|
||||||
|
.map((result)=>({
|
||||||
|
asOfDate,
|
||||||
|
symbol: underlyingSymbol,
|
||||||
|
expirationDate: result.expiration_date,
|
||||||
|
strike: result.strike_price,
|
||||||
|
type: result.contract_type,
|
||||||
|
}));
|
||||||
|
//console.log(latestBatch.results.map((r)=>r.ticker));
|
||||||
|
await clickhouse.insert({
|
||||||
|
table: 'option_contracts',
|
||||||
|
values: latestBatch,
|
||||||
|
format: 'JSONEachRow',
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
await clickhouse.insert({
|
||||||
|
table: 'option_contract_sync_statuses',
|
||||||
|
values: [{symbol: underlyingSymbol, asOfDate, status: 'done'}],
|
||||||
|
format: 'JSONEachRow',
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
//await getOptionContracts('AAPL','2024-01-30');
|
//await getOptionContracts('AAPL','2024-01-30');
|
||||||
|
async function getNextUnstartedSymbolAndAsOfDate(previousUnstartedSymbolAndAsOfDate:{symbol:string, asOfDate:string}){
|
||||||
|
const rows = await query<{symbol:string, earliestAsOfDate:string}>(`
|
||||||
|
SELECT
|
||||||
|
symbol,
|
||||||
|
first_value(asOfDate) as earliestAsOfDate
|
||||||
|
FROM (
|
||||||
|
SELECT
|
||||||
|
symbol,
|
||||||
|
asOfDate,
|
||||||
|
last_value(status) as latestStatus
|
||||||
|
FROM (
|
||||||
|
SELECT *
|
||||||
|
FROM option_contract_sync_statuses
|
||||||
|
ORDER BY asOfDate ASC, symbol ASC
|
||||||
|
)
|
||||||
|
GROUP BY symbol, asOfDate
|
||||||
|
HAVING latestStatus = 'not-started'
|
||||||
|
ORDER BY symbol ASC, asOfDate ASC
|
||||||
|
)
|
||||||
|
GROUP BY symbol
|
||||||
|
HAVING (
|
||||||
|
symbol = '${previousUnstartedSymbolAndAsOfDate.symbol}'
|
||||||
|
AND asOfDate > '${previousUnstartedSymbolAndAsOfDate.asOfDate}'
|
||||||
|
)
|
||||||
|
OR (
|
||||||
|
symbol > '${previousUnstartedSymbolAndAsOfDate.symbol}'
|
||||||
|
)
|
||||||
|
ORDER BY symbol ASC
|
||||||
|
LIMIT 1
|
||||||
|
`);
|
||||||
|
if(rows.length === 0){
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
else{
|
||||||
|
return {
|
||||||
|
symbol: rows[0].symbol,
|
||||||
|
asOfDate: rows[0].earliestAsOfDate,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* For each symbol in `symbols` table, check the latest `asOfDate`
|
* For each symbol in `symbols` table, check the latest `asOfDate`
|
||||||
* in `symbol_sync_statuses` for that symbol. Then fill-in the rest
|
* in `option_contract_sync_statuses` for that symbol. Then fill-in the rest
|
||||||
* of the dates until today's date.
|
* of the dates until today's date.
|
||||||
*/
|
*/
|
||||||
async function fillSyncStatuses(){
|
async function fillSyncStatuses(){
|
||||||
const symbols = (await query(`
|
const symbols = (await query<{symbol:string}>(`
|
||||||
SELECT symbol from symbols
|
SELECT symbol from symbols
|
||||||
`)).map(({symbol})=>symbol);
|
`)).map(({symbol})=>symbol);
|
||||||
|
|
||||||
@@ -30,11 +110,11 @@ async function fillSyncStatuses(){
|
|||||||
()=>query<{latestAsOfDate:string}>(`
|
()=>query<{latestAsOfDate:string}>(`
|
||||||
SELECT
|
SELECT
|
||||||
latestAsOfDate
|
latestAsOfDate
|
||||||
FROM(
|
FROM (
|
||||||
SELECT last_value(asOfDate) as latestAsOfDate
|
SELECT last_value(asOfDate) as latestAsOfDate
|
||||||
FROM (
|
FROM (
|
||||||
SELECT *
|
SELECT *
|
||||||
FROM symbol_sync_statuses
|
FROM option_contract_sync_statuses
|
||||||
WHERE symbol = '${symbol}'
|
WHERE symbol = '${symbol}'
|
||||||
ORDER BY asOfDate ASC
|
ORDER BY asOfDate ASC
|
||||||
)
|
)
|
||||||
@@ -43,7 +123,7 @@ async function fillSyncStatuses(){
|
|||||||
`).then((rows)=>
|
`).then((rows)=>
|
||||||
clickhouse.command({
|
clickhouse.command({
|
||||||
query: `
|
query: `
|
||||||
INSERT INTO symbol_sync_statuses
|
INSERT INTO option_contract_sync_statuses
|
||||||
SELECT
|
SELECT
|
||||||
'${symbol}' as symbol,
|
'${symbol}' as symbol,
|
||||||
Date(dateAdd(DAY,number,'${rows[0]?.latestAsOfDate || '2022-02-19'}')) as asOfDate,
|
Date(dateAdd(DAY,number,'${rows[0]?.latestAsOfDate || '2022-02-19'}')) as asOfDate,
|
||||||
@@ -59,4 +139,15 @@ async function fillSyncStatuses(){
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
await fillSyncStatuses();
|
await fillSyncStatuses();
|
||||||
|
const q = new pQueue({concurrency: 6});
|
||||||
|
let nextUnstartedSymbolAndAsOfDate = {symbol:'A', asOfDate:'2022-02-01'};
|
||||||
|
while((nextUnstartedSymbolAndAsOfDate = await getNextUnstartedSymbolAndAsOfDate(nextUnstartedSymbolAndAsOfDate)) !== null){
|
||||||
|
await q.add(async ()=>{
|
||||||
|
console.log(`Getting contracts for ${nextUnstartedSymbolAndAsOfDate.symbol} at ${nextUnstartedSymbolAndAsOfDate.asOfDate}`);
|
||||||
|
await getOptionContracts(nextUnstartedSymbolAndAsOfDate.symbol, nextUnstartedSymbolAndAsOfDate.asOfDate);
|
||||||
|
});
|
||||||
|
// don't loop again until the queue has less than 50 items; we don't want it to grow in memory without bound:
|
||||||
|
console.log("Waiting till less than 50 in queue");
|
||||||
|
await q.onSizeLessThan(50);
|
||||||
|
}
|
||||||
+13
-1
@@ -12,7 +12,7 @@ CREATE TABLE symbols
|
|||||||
ENGINE MergeTree()
|
ENGINE MergeTree()
|
||||||
ORDER BY (symbol);
|
ORDER BY (symbol);
|
||||||
|
|
||||||
CREATE TABLE symbol_sync_statuses
|
CREATE TABLE option_contract_sync_statuses
|
||||||
(
|
(
|
||||||
symbol String,
|
symbol String,
|
||||||
asOfDate Date,
|
asOfDate Date,
|
||||||
@@ -21,6 +21,18 @@ CREATE TABLE symbol_sync_statuses
|
|||||||
ENGINE MergeTree()
|
ENGINE MergeTree()
|
||||||
ORDER BY (asOfDate, symbol);
|
ORDER BY (asOfDate, symbol);
|
||||||
|
|
||||||
|
CREATE TABLE option_contracts
|
||||||
|
(
|
||||||
|
asOfDate Date,
|
||||||
|
symbol String,
|
||||||
|
expirationDate Date,
|
||||||
|
strike Float32,
|
||||||
|
type ENUM('call', 'put')
|
||||||
|
)
|
||||||
|
ENGINE MergeTree()
|
||||||
|
PRIMARY KEY (asOfDate, symbol)
|
||||||
|
ORDER BY (asOfDate, symbol, expirationDate, strike, type);
|
||||||
|
|
||||||
CREATE TABLE stock_aggregates
|
CREATE TABLE stock_aggregates
|
||||||
(
|
(
|
||||||
symbol LowCardinality(String),
|
symbol LowCardinality(String),
|
||||||
|
|||||||
Reference in New Issue
Block a user