get unsynced symbol-asOfDate combos in batches

use `clickhouse.sakal.us` instead of `kubectl port-forward`
main
Avraham Sakal 1 year ago
parent 5c8a54b4b5
commit dabfec86a1

@ -1,2 +1,2 @@
CLICKHOUSE_HOST=http://localhost:8123 CLICKHOUSE_HOST=https://clickhouse.sakal.us
LISTEN_PORT=3005 LISTEN_PORT=3005

@ -3,7 +3,6 @@
"type": "module", "type": "module",
"scripts": { "scripts": {
"build": "esbuild src/*.ts src/**/*.ts --platform=node --outdir=dist --format=esm", "build": "esbuild src/*.ts src/**/*.ts --platform=node --outdir=dist --format=esm",
"build-scripts": "esbuild scripts/*.ts --platform=node --outdir=dist/scripts --format=esm",
"dev:node": "node --watch dist/index.js", "dev:node": "node --watch dist/index.js",
"dev:esbuild": "pnpm run build --watch", "dev:esbuild": "pnpm run build --watch",
"dev": "run-p dev:*" "dev": "run-p dev:*"

@ -1,3 +1,4 @@
import _ from './env.js';
import { createClient as createClickhouseClient } from '@clickhouse/client'; import { createClient as createClickhouseClient } from '@clickhouse/client';
import type { DataFormat } from '@clickhouse/client'; import type { DataFormat } from '@clickhouse/client';

@ -1,7 +1,21 @@
import path from 'path';
import { fileURLToPath } from 'url';
import dotenv from 'dotenv'; import dotenv from 'dotenv';
if(process.env.NODE_DEV==="development"){ /** ES modules cannot use `__dirname`, so we have to mimic its functionality.
dotenv.config({ path:"../.env.development" }); * Taken from [https://flaviocopes.com/fix-dirname-not-defined-es-module-scope/]
*/
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
if(process.env.NODE_ENV==="development"){
const ret = dotenv.config({ path:`${__dirname}/../.env.development` });
if(ret.parsed){
console.log("parsed!", process.env)
}
else{
console.log("not parsed ;-(", ret.error)
}
} }
export default null; export default null;

@ -52,8 +52,7 @@ async function getOptionContracts(underlyingSymbol, asOfDate){
}); });
} }
//await getOptionContracts('AAPL','2024-01-30'); async function getNextBatchOfUnstartedSymbolsAndAsOfDates(previousUnstartedSymbolAndAsOfDate:{symbol:string, asOfDate:string}, limit:number){
async function getNextUnstartedSymbolAndAsOfDate(previousUnstartedSymbolAndAsOfDate:{symbol:string, asOfDate:string}){
const rows = await query<{symbol:string, earliestAsOfDate:string}>(` const rows = await query<{symbol:string, earliestAsOfDate:string}>(`
SELECT SELECT
symbol, symbol,
@ -81,17 +80,12 @@ async function getNextUnstartedSymbolAndAsOfDate(previousUnstartedSymbolAndAsOfD
symbol > '${previousUnstartedSymbolAndAsOfDate.symbol}' symbol > '${previousUnstartedSymbolAndAsOfDate.symbol}'
) )
ORDER BY symbol ASC ORDER BY symbol ASC
LIMIT 1 LIMIT ${limit}
`); `);
if(rows.length === 0){ return rows.map(row=>({
return null; symbol: row.symbol,
} asOfDate: row.earliestAsOfDate,
else{ }));
return {
symbol: rows[0].symbol,
asOfDate: rows[0].earliestAsOfDate,
}
}
} }
/** /**
@ -139,14 +133,28 @@ async function fillSyncStatuses(){
); );
} }
/** First, make sure we know which symbol-asOfDate combinations are
* not yet synced.
*/
await fillSyncStatuses(); await fillSyncStatuses();
/** Second, for each symbol-asOfDate combination whose option contracts
* are not known, make them known.
*
* This queries Polygon with a concurrency of 6.
*/
const q = new pQueue({concurrency: 6}); const q = new pQueue({concurrency: 6});
let nextUnstartedSymbolAndAsOfDate = {symbol:'A', asOfDate:'2022-02-01'}; /** Initialized with the lowest possible symbol and the earliest possible asOfDate.
while((nextUnstartedSymbolAndAsOfDate = await getNextUnstartedSymbolAndAsOfDate(nextUnstartedSymbolAndAsOfDate)) !== null){ * It's passed into `getNextBatchOfUnstartedSymbolsAndAsOfDates()`.
await q.add(async ()=>{ */
console.log(`Getting contracts for ${nextUnstartedSymbolAndAsOfDate.symbol} at ${nextUnstartedSymbolAndAsOfDate.asOfDate}`); let nextBatchOfUnstartedSymbolsAndAsOfDates = [{symbol:'A', asOfDate:'2022-02-01'}];
await getOptionContracts(nextUnstartedSymbolAndAsOfDate.symbol, nextUnstartedSymbolAndAsOfDate.asOfDate); while((nextBatchOfUnstartedSymbolsAndAsOfDates = await getNextBatchOfUnstartedSymbolsAndAsOfDates(nextBatchOfUnstartedSymbolsAndAsOfDates.pop(), 200)) !== null){
}); await pAll(nextBatchOfUnstartedSymbolsAndAsOfDates.map((unstartedSymbolAndAsOfDate)=>
()=>q.add(async ()=>{
console.log(`Getting contracts for ${unstartedSymbolAndAsOfDate.symbol} at ${unstartedSymbolAndAsOfDate.asOfDate}`);
await getOptionContracts(unstartedSymbolAndAsOfDate.symbol, unstartedSymbolAndAsOfDate.asOfDate);
})
));
// don't loop again until the queue has less than 50 items; we don't want it to grow in memory without bound: // don't loop again until the queue has less than 50 items; we don't want it to grow in memory without bound:
console.log("Waiting till less than 50 in queue"); console.log("Waiting till less than 50 in queue");
await q.onSizeLessThan(50); await q.onSizeLessThan(50);

@ -24,7 +24,7 @@ ORDER BY (asOfDate, symbol);
CREATE TABLE option_contracts CREATE TABLE option_contracts
( (
asOfDate Date, asOfDate Date,
symbol String, symbol LowCardinality(String),
expirationDate Date, expirationDate Date,
strike Float32, strike Float32,
type ENUM('call', 'put') type ENUM('call', 'put')

Loading…
Cancel
Save