From dabfec86a16e058eb357901958bc92bc48cc8cc7 Mon Sep 17 00:00:00 2001 From: Avraham Sakal Date: Sun, 25 Feb 2024 15:18:15 -0500 Subject: [PATCH] get unsynced symbol-asOfDate combos in batches use `clickhouse.sakal.us` instead of `kubectl port-forward` --- server/.env.development | 2 +- server/package.json | 1 - server/src/clickhouse.ts | 1 + server/src/env.ts | 18 +++++++- ...ption-quotes-from-polygon-to-clickhouse.ts | 44 +++++++++++-------- server/tables.sql | 2 +- 6 files changed, 45 insertions(+), 23 deletions(-) diff --git a/server/.env.development b/server/.env.development index cca970c..2c363ca 100644 --- a/server/.env.development +++ b/server/.env.development @@ -1,2 +1,2 @@ -CLICKHOUSE_HOST=http://localhost:8123 +CLICKHOUSE_HOST=https://clickhouse.sakal.us LISTEN_PORT=3005 \ No newline at end of file diff --git a/server/package.json b/server/package.json index 68bed06..491d539 100644 --- a/server/package.json +++ b/server/package.json @@ -3,7 +3,6 @@ "type": "module", "scripts": { "build": "esbuild src/*.ts src/**/*.ts --platform=node --outdir=dist --format=esm", - "build-scripts": "esbuild scripts/*.ts --platform=node --outdir=dist/scripts --format=esm", "dev:node": "node --watch dist/index.js", "dev:esbuild": "pnpm run build --watch", "dev": "run-p dev:*" diff --git a/server/src/clickhouse.ts b/server/src/clickhouse.ts index 4a99b43..bc2fbf1 100644 --- a/server/src/clickhouse.ts +++ b/server/src/clickhouse.ts @@ -1,3 +1,4 @@ +import _ from './env.js'; import { createClient as createClickhouseClient } from '@clickhouse/client'; import type { DataFormat } from '@clickhouse/client'; diff --git a/server/src/env.ts b/server/src/env.ts index 428b1a9..e90ea8d 100644 --- a/server/src/env.ts +++ b/server/src/env.ts @@ -1,7 +1,21 @@ +import path from 'path'; +import { fileURLToPath } from 'url'; import dotenv from 'dotenv'; -if(process.env.NODE_DEV==="development"){ - dotenv.config({ path:"../.env.development" }); +/** ES modules cannot use `__dirname`, so we have to mimic its functionality. + * Taken from [https://flaviocopes.com/fix-dirname-not-defined-es-module-scope/] + */ +const __filename = fileURLToPath(import.meta.url); +const __dirname = path.dirname(__filename); + +if(process.env.NODE_ENV==="development"){ + const ret = dotenv.config({ path:`${__dirname}/../.env.development` }); + if(ret.parsed){ + console.log("parsed!", process.env) + } + else{ + console.log("not parsed ;-(", ret.error) + } } export default null; \ No newline at end of file diff --git a/server/src/scripts/ingest-option-quotes-from-polygon-to-clickhouse.ts b/server/src/scripts/ingest-option-quotes-from-polygon-to-clickhouse.ts index 86952e2..53c32e4 100644 --- a/server/src/scripts/ingest-option-quotes-from-polygon-to-clickhouse.ts +++ b/server/src/scripts/ingest-option-quotes-from-polygon-to-clickhouse.ts @@ -52,8 +52,7 @@ async function getOptionContracts(underlyingSymbol, asOfDate){ }); } -//await getOptionContracts('AAPL','2024-01-30'); -async function getNextUnstartedSymbolAndAsOfDate(previousUnstartedSymbolAndAsOfDate:{symbol:string, asOfDate:string}){ +async function getNextBatchOfUnstartedSymbolsAndAsOfDates(previousUnstartedSymbolAndAsOfDate:{symbol:string, asOfDate:string}, limit:number){ const rows = await query<{symbol:string, earliestAsOfDate:string}>(` SELECT symbol, @@ -81,17 +80,12 @@ async function getNextUnstartedSymbolAndAsOfDate(previousUnstartedSymbolAndAsOfD symbol > '${previousUnstartedSymbolAndAsOfDate.symbol}' ) ORDER BY symbol ASC - LIMIT 1 + LIMIT ${limit} `); - if(rows.length === 0){ - return null; - } - else{ - return { - symbol: rows[0].symbol, - asOfDate: rows[0].earliestAsOfDate, - } - } + return rows.map(row=>({ + symbol: row.symbol, + asOfDate: row.earliestAsOfDate, + })); } /** @@ -139,14 +133,28 @@ async function fillSyncStatuses(){ ); } +/** First, make sure we know which symbol-asOfDate combinations are + * yet un-synced. +*/ await fillSyncStatuses(); + +/** Second, for each symbol-asOfDate combination whose option contracts + * are not known, make them known. + * + * This queries Polygon with a concurrency of 6. + */ const q = new pQueue({concurrency: 6}); -let nextUnstartedSymbolAndAsOfDate = {symbol:'A', asOfDate:'2022-02-01'}; -while((nextUnstartedSymbolAndAsOfDate = await getNextUnstartedSymbolAndAsOfDate(nextUnstartedSymbolAndAsOfDate)) !== null){ - await q.add(async ()=>{ - console.log(`Getting contracts for ${nextUnstartedSymbolAndAsOfDate.symbol} at ${nextUnstartedSymbolAndAsOfDate.asOfDate}`); - await getOptionContracts(nextUnstartedSymbolAndAsOfDate.symbol, nextUnstartedSymbolAndAsOfDate.asOfDate); - }); +/** Initialized with the lowest possible symbol and the earliest possible asOfDate. + * It's passed into `getNextUnstartedSymbolAndAsOfDate()`. + */ +let nextBatchOfUnstartedSymbolsAndAsOfDates = [{symbol:'A', asOfDate:'2022-02-01'}]; +while((nextBatchOfUnstartedSymbolsAndAsOfDates = await getNextBatchOfUnstartedSymbolsAndAsOfDates(nextBatchOfUnstartedSymbolsAndAsOfDates.pop(), 200)) !== null){ + await pAll(nextBatchOfUnstartedSymbolsAndAsOfDates.map((unstartedSymbolAndAsOfDate)=> + ()=>q.add(async ()=>{ + console.log(`Getting contracts for ${unstartedSymbolAndAsOfDate.symbol} at ${unstartedSymbolAndAsOfDate.asOfDate}`); + await getOptionContracts(unstartedSymbolAndAsOfDate.symbol, unstartedSymbolAndAsOfDate.asOfDate); + }) + )); // don't loop again until the queue has less than 50 items; we don't want it to grow in memory without bound: console.log("Waiting till less than 50 in queue"); await q.onSizeLessThan(50); diff --git a/server/tables.sql b/server/tables.sql index 5e4a605..d6582b0 100644 --- a/server/tables.sql +++ b/server/tables.sql @@ -24,7 +24,7 @@ ORDER BY (asOfDate, symbol); CREATE TABLE option_contracts ( asOfDate Date, - symbol String, + symbol LowCardinality(String), expirationDate Date, strike Float32, type ENUM('call', 'put')