use pnpm, clickhouse; ingest scripts

This commit is contained in:
2023-12-11 16:06:26 -05:00
parent 1dc049c7ee
commit 795a903eba
5 changed files with 778 additions and 615 deletions
+60 -39
View File
@@ -1,18 +1,33 @@
//import { restClient as PolygonClient } from '@polygon.io/client-js';
import { createClient as createClickhouseClient } from '@clickhouse/client'
import { sleep, getPolygonApiKey } from './util.mjs';
const apiKey = "H95NTsatM1iTWLUwDLxM2J5zhUVYdCEz";
//const polygonClient = PolygonClient(apiKey, "https://api.polygon.io", {pagination: false, trace: true,}); // automatically call `next_url` if there is one
const clickhouse = createClickhouseClient({username:'avraham', password:'buginoo'});
const optionAggregatesTableName = "option_aggregates";
const optionContractsTableName = "option_aggregates";
async function deleteClickhouseTable(){
await clickhouse.command({
query: `DROP TABLE IF EXISTS ${optionAggregatesTableName}`,
query: `DROP TABLE IF EXISTS ${optionContractsTableName}`,
})
}
async function createClickhouseTable(){
await clickhouse.command({
query: `
CREATE TABLE ${optionContractsTableName}
(
symbol LowCardinality(String),
asOfDate Date,
expirationDate Date,
optionType Enum('call', 'put'),
strike Decimal64(9)
)
ENGINE MergeTree()
ORDER BY (symbol, asOfDate, expirationDate, optionType, strike)
`,
});
/*
await clickhouse.command({
query: `
CREATE TABLE ${optionAggregatesTableName}
@@ -32,65 +47,71 @@ async function createClickhouseTable(){
volume_weighted_price Decimal64(9)
)
ENGINE MergeTree()
ORDER BY (symbol, tsStart)
ORDER BY (tsStart, symbol, expirationDate, optionType, strike)
`,
})
});
*/
}
async function insertResultsIntoClickhouse(optionAggregatesResult){
async function insertResultsIntoClickhouse(symbol, asOfDate, optionContractsResult){
await clickhouse.insert({
table: optionAggregatesTableName,
table: optionContractsTableName,
// structure should match the desired format, JSONEachRow in this example
values: optionAggregatesResult.results.map(r=>({
symbol: optionAggregatesResult.ticker,
tsStart: (r.t || 0)/1000,
open: r.o,
close: r.c,
low: r.l,
high: r.h,
volume: r.v,
volume_weighted_price: r.vw
values: optionContractsResult.results.map(r=>({
symbol,
asOfDate,
expirationDate: r.expiration_date,
optionType: r.contract_type,
strike: r.strike_price
})),
format: 'JSONEachRow',
});
console.log('inserted', optionAggregatesResult.ticker);
console.log('inserted', symbol);
}
function sleep(ms){
return new Promise((resolve)=>{
setTimeout(resolve, ms);
});
}
/**
* Get the underlying price aggregates for every 10-minute period starting 2 years ago (the lookback period for the free tier) until now
* Get all available option expirations and strikes, from the point-of-view of a given date
*/
async function fetchOptionAggregates(symbol, fromDate, toDate, limit){
let optionAggregatesResult = await (await fetch(`https://api.polygon.io/v2/aggs/ticker/${symbol}/range/10/minute/${fromDate}/${toDate}?adjusted=false&sort=asc&limit=${limit}&apiKey=${apiKey}`)).json();
console.log(optionAggregatesResult.resultsCount);
if(optionAggregatesResult.status === 'OK' && typeof optionAggregatesResult.results !== "undefined" && optionAggregatesResult.results.length > 0){
await insertResultsIntoClickhouse(optionAggregatesResult);
while(optionAggregatesResult.hasOwnProperty("next_url")){
await sleep(13000);
optionAggregatesResult = await (await fetch(`${optionAggregatesResult.next_url}&apiKey=${apiKey}`)).json();
if(optionAggregatesResult.status === 'OK' && typeof optionAggregatesResult.results !== "undefined" && optionAggregatesResult.results.length > 0){
console.log(optionAggregatesResult.resultsCount);
await insertResultsIntoClickhouse(optionAggregatesResult);
async function fetchAvailableOptionExpirationsAndStrikesAsOfDate(symbol, asOfDate){
const limit = 1000;
let apiKey = await getPolygonApiKey();
let optionContractsResult = await (await fetch(`https://api.polygon.io/v3/reference/options/contracts?underlying_ticker=${symbol}&contract_type=call&as_of=${asOfDate}&expired=false&order=asc&limit=${limit}&sort=expiration_date&apiKey=${apiKey}`)).json();
console.log(optionContractsResult.results.length);
if(optionContractsResult.status === 'OK' && typeof optionContractsResult.results !== "undefined" && optionContractsResult.results.length > 0){
await insertResultsIntoClickhouse(symbol, asOfDate, optionContractsResult);
while(optionContractsResult.hasOwnProperty("next_url")){
apiKey = await getPolygonApiKey();
optionContractsResult = await (await fetch(`${optionContractsResult.next_url}&apiKey=${apiKey}`)).json();
if(optionContractsResult.status === 'OK' && typeof optionContractsResult.results !== "undefined" && optionContractsResult.results.length > 0){
console.log(optionContractsResult.results.length);
await insertResultsIntoClickhouse(symbol, asOfDate, optionContractsResult);
}
else{
console.log(optionAggregatesResult);
console.log(optionContractsResult);
}
}
}
else{
console.log(optionAggregatesResult);
console.log(optionContractsResult);
}
}
const stockAggregatesTableName = "stock_aggregates";
async function fetchMarketDates(){
return (await (await clickhouse.query({
query: `SELECT DISTINCT(Date(tsStart)) FROM ${stockAggregatesTableName}`,
format: 'JSONCompactEachRow'
})).json()).map((r)=>r[0]);
}
await deleteClickhouseTable();
await createClickhouseTable();
for(let symbol of ["AAPL", "GOOGL", "MSFT", "TSLA", "AMD", "NFLX", "SPY"]){
//await fetchOptionAggregates(symbol, "2021-11-27", "2023-11-25", 50000);
await sleep(13000);
const marketDates = await fetchMarketDates();
for(let asOfDate of marketDates){
for(let symbol of ["AAPL", "GOOGL", "MSFT", "TSLA", "AMD", "NFLX", "SPY"]){
await fetchAvailableOptionExpirationsAndStrikesAsOfDate(symbol, asOfDate);
}
}
+21
View File
@@ -0,0 +1,21 @@
export function sleep(ms){
return new Promise((resolve)=>{
setTimeout(resolve, ms);
});
}
/**
* Returns the api key, but applies a rate limit.
*/
export const getPolygonApiKey = ((rateLimit)=>{
const apiKey = "H95NTsatM1iTWLUwDLxM2J5zhUVYdCEz";
let lastInvocation = 0;
return async ()=>{
const now = Date.now();
if(now-lastInvocation < rateLimit){
await sleep(rateLimit - (now-lastInvocation));
}
lastInvocation = Date.now();
return apiKey;
}
})(13000);