From b496a35a2710315b95e0f6ed4d99f4ca42e9856b Mon Sep 17 00:00:00 2001
From: Avraham Sakal
Date: Tue, 13 Jun 2023 00:35:06 -0400
Subject: [PATCH] begin move to postgres

---
 .gitignore       |   1 +
 ingest.ts        | 124 ----------------------------------
 package.json     |   2 +-
 src/index.ts     |  31 ++-------
 src/ingest.ts    | 172 +++++++++++++++++++++++++++++++++++++++++++++++
 startPostgres.sh |   7 ++
 yarn.lock        |  15 ++---
 7 files changed, 190 insertions(+), 162 deletions(-)
 delete mode 100644 ingest.ts
 create mode 100644 src/ingest.ts
 create mode 100755 startPostgres.sh

diff --git a/.gitignore b/.gitignore
index 8af3d39..52e7fe1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,4 +4,5 @@
 *.duckdb.wal
 *.parquet
 dist
+postgres
 node_modules
\ No newline at end of file
diff --git a/ingest.ts b/ingest.ts
deleted file mode 100644
index a0ad6bb..0000000
--- a/ingest.ts
+++ /dev/null
@@ -1,124 +0,0 @@
-import fs from 'fs/promises';
-import duckdb from 'duckdb';
-const db = new duckdb.Database('quotes.duckdb'); // or a file name for a persistent DB
-
-const pathToCsvs = "/home/brian/Downloads/options-data";
-
-const statements = [
-  "CREATE TYPE OPTION_TYPE as ENUM ('put', 'call');",
-  "CREATE TYPE OPTION_STYLE as ENUM ('A', 'E');",
-  `CREATE TABLE IF NOT EXISTS option_quote (
-    contract VARCHAR GENERATED ALWAYS AS (
-      CONCAT(
-        underlying ,
-        RIGHT(YEAR(expiration)::VARCHAR,2) ,
-        LPAD(MONTH(expiration)::VARCHAR,2,'0') ,
-        LPAD(DAY(expiration)::VARCHAR,2,'0') ,
-        (CASE WHEN type = 'call' THEN 'C' ELSE 'P' END) ,
-        LPAD(((strike*1000)::INTEGER)::VARCHAR,8,'0')
-      )
-    ) VIRTUAL,
-    underlying VARCHAR,
-    expiration DATE,
-    type OPTION_TYPE,
-    strike FLOAT,
-    style OPTION_STYLE,
-    bid FLOAT,
-    bid_size INTEGER DEFAULT 0,
-    ask FLOAT,
-    ask_size INTEGER DEFAULT 0,
-    volume INTEGER,
-    open_interest INTEGER,
-    quote_date DATE,
-    delta FLOAT,
-    gamma FLOAT,
-    theta FLOAT,
-    vega FLOAT,
-    implied_volatility FLOAT
-  );`,
-  `CREATE TABLE IF NOT EXISTS stock_quote (
-    quote_date DATE,
-    symbol VARCHAR,
-    open FLOAT DEFAULT 0.0,
-    high FLOAT DEFAULT 0.0,
-    low FLOAT DEFAULT 0.0,
-    close FLOAT DEFAULT 0.0,
-    volume FLOAT DEFAULT 0.0,
-    adjust_close FLOAT DEFAULT 0.0
-  );`
-];
-
-try {
-  const files = await fs.readdir(pathToCsvs);
-  for (const filename of files){
-    const fileExtension = filename.substring(filename.length-11);
-    if(fileExtension === 'options.csv' || fileExtension === 'options.cvs'){
-      const quoteDate = filename.substring(0,10);
-      statements.push(`INSERT INTO option_quote (
-        SELECT
-          underlying,
-          expiration,
-          type,
-          strike,
-          style,
-          bid,
-          bid_size,
-          ask,
-          ask_size,
-          volume,
-          open_interest,
-          quote_date,
-          delta,
-          gamma,
-          theta,
-          vega,
-          implied_volatility
-        FROM read_csv_auto('${pathToCsvs}/${filename}')
-      );`);
-      statements.push(`INSERT INTO stock_quote (
-        SELECT
-          '${quoteDate}',
-          symbol,
-          open,
-          high,
-          low,
-          close,
-          volume,
-          adjust_close
-        FROM read_csv_auto('${pathToCsvs}/${quoteDate}stocks.cvs')
-      );`);
-    }
-  }
-
-  console.log(statements);
-  db.exec(statements.join(' '), (err)=>{
-    if(err){
-      console.error(err);
-      return;
-    }
-    db.all("SELECT contract FROM option_quote WHERE underlying = 'TSLA' LIMIT 10", (err, res)=>{
-      if(err){
-        console.error(err);
-        return;
-      }
-      console.log(res[0])
-    });
-  })
-} catch (err) {
-  console.error(err);
-}
-/*
-db.run(`CREATE TABLE option_quote AS SELECT * FROM read_csv_auto('${filename}')`, (err)=>{
-  if (err) {
-    throw err;
-  }
-  db.run(``);
-  db.all("SELECT count(*) AS count FROM prices WHERE underlying = 'TSLA'", function(err, res) {
-    if (err) {
-      throw err;
-    }
-    console.log(res[0].count)
-  });
-
-});
-*/
\ No newline at end of file
diff --git a/package.json b/package.json
index 9517a74..2f66fa7 100644
--- a/package.json
+++ b/package.json
@@ -6,7 +6,7 @@
   },
   "dependencies": {
     "csv-parse": "^5.4.0",
-    "lowdb": "^6.0.1"
+    "postgres": "^3.3.5"
   },
   "devDependencies": {
     "@types/node": "^18.16.3",
diff --git a/src/index.ts b/src/index.ts
index bd77856..9f347a0 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -1,28 +1,7 @@
-import { join, dirname } from 'node:path'
-import { fileURLToPath } from 'node:url'
-import fs from 'node:fs/promises';
+import { ingestOptions, ingestStocks, sql } from './ingest';
 
-import { Low } from 'lowdb'
-import { JSONFile } from 'lowdb/node'
+const sourceDataDir = '/home/avraham/programming/calendar-optimizer-csv';
+//await ingestStocks(sourceDataDir);
+await ingestOptions(sourceDataDir);
 
-import {parse} from 'csv-parse/sync';
-
-
-// initialize lowdb:
-// db.json file path
-const __dirname = dirname(fileURLToPath(import.meta.url))
-const file = join(__dirname, 'db.json')
-
-// Configure lowdb to write data to JSON file
-const adapter = new JSONFile<{ stocks: Array, options: Array }>(file)
-const defaultData = { stocks: [], options: [] }
-const db = new Low(adapter, defaultData)
-
-// read each csv, and ingest each row into lowdb
-const csvDir = '/home/avraham/programming/calendar-optimizer-csv/';
-const csvFiles = await fs.readdir(csvDir);
-await Promise.all(csvFiles.filter((csvFile)=>csvFile.substring(10,16)==='stocks').map(async (csvFile)=>{
-  db.data.stocks.push( parse(await fs.readFile(join(csvDir, csvFile))) );
-}));
-
-await db.write();
\ No newline at end of file
+await sql.end({timeout:60});
\ No newline at end of file
diff --git a/src/ingest.ts b/src/ingest.ts
new file mode 100644
index 0000000..e3af0a5
--- /dev/null
+++ b/src/ingest.ts
@@ -0,0 +1,172 @@
+import { join, dirname } from 'node:path'
+import fs from 'node:fs/promises';
+
+import postgres from 'postgres';
+
+import {parse} from 'csv-parse/sync';
+
+export const sql = postgres({
+  host: '127.0.0.1',
+  port: 5432,
+  user: 'postgres',
+  password: 'buginoo'
+});
+
+
+try {
+await sql`CREATE TYPE OPTION_TYPE as ENUM ('put', 'call');`;
+await sql`CREATE TYPE OPTION_STYLE as ENUM ('A', 'E');`;
+}
+catch(err){}
+
+/*
+contract VARCHAR(20) GENERATED ALWAYS AS (
+    CONCAT(
+      underlying ,
+      RIGHT(YEAR(expiration)::VARCHAR,2) ,
+      LPAD(MONTH(expiration)::VARCHAR,2,'0') ,
+      LPAD(DAY(expiration)::VARCHAR,2,'0') ,
+      (CASE WHEN type = 'call' THEN 'C' ELSE 'P' END) ,
+      LPAD(((strike*1000)::INTEGER)::VARCHAR,8,'0')
+    )
+  ),
+ */
+await sql`CREATE TABLE IF NOT EXISTS option_quote (
+    underlying VARCHAR(4),
+    expiration DATE,
+    type OPTION_TYPE,
+    strike FLOAT,
+    style OPTION_STYLE,
+    bid FLOAT,
+    bid_size INTEGER DEFAULT 0,
+    ask FLOAT,
+    ask_size INTEGER DEFAULT 0,
+    volume INTEGER,
+    open_interest INTEGER,
+    quote_date DATE,
+    delta FLOAT,
+    gamma FLOAT,
+    theta FLOAT,
+    vega FLOAT,
+    implied_volatility FLOAT
+  );`;
+await sql`CREATE TABLE IF NOT EXISTS stock_quote (
+  quote_date DATE,
+  symbol VARCHAR,
+  open FLOAT DEFAULT 0.0,
+  high FLOAT DEFAULT 0.0,
+  low FLOAT DEFAULT 0.0,
+  close FLOAT DEFAULT 0.0,
+  volume FLOAT DEFAULT 0.0,
+  adjust_close FLOAT DEFAULT 0.0
+);`;
+
+export async function ingestStocks(sourceDataDir:string):Promise<void>{
+  // read each csv, and ingest each row into postgres:
+  const csvFiles = await fs.readdir(sourceDataDir);
+  await Promise.all(csvFiles.filter((csvFile)=>csvFile.substring(10,16)==='stocks').map(async (csvFile)=>{
+    const quoteDate = csvFile.substring(0,10);
+    const rows = parse(await fs.readFile(join(sourceDataDir, csvFile)));
+    await Promise.all(rows.map(async ([symbol, open, high, low, close, volume, adjust_close])=>{
+      open = Number(open);
+      high = Number(high);
+      low = Number(low);
+      close = Number(close);
+      volume = Number(volume);
+      adjust_close = Number(adjust_close);
+      try{
+        await sql`insert into "stock_quote" (
+          quote_date,
+          symbol,
+          open,
+          high,
+          low,
+          close,
+          volume,
+          adjust_close
+        ) values (
+          ${quoteDate},
+          ${symbol},
+          ${open},
+          ${high},
+          ${low},
+          ${close},
+          ${volume},
+          ${adjust_close || 0}
+        );`;
+        console.log(`${quoteDate} ${symbol}`);
+      }
+      catch(err){
+        console.error(err);
+      }
+    }));
+  }));
+}
+
+export async function ingestOptions(sourceDataDir:string):Promise<void>{
+  // read each csv, and ingest each row into postgres:
+  const csvFiles = await fs.readdir(sourceDataDir);
+  await Promise.all(csvFiles.filter((csvFile)=>csvFile.substring(10,17)==='options').map(async (csvFile)=>{
+    const quoteDate = csvFile.substring(0,10);
+    const rows = parse(await fs.readFile(join(sourceDataDir, csvFile)));
+    await Promise.all(rows.map(async ([underlying, expiration, type, strike, style, bid, bid_size, ask, ask_size, volume, open_interest, quote_date, delta, gamma, theta, vega, implied_volatility])=>{
+      expiration=Number(expiration);
+      strike=Number(strike);
+      bid=Number(bid);
+      bid_size=Number(bid_size);
+      ask=Number(ask);
+      ask_size=Number(ask_size);
+      volume=Number(volume);
+      open_interest=Number(open_interest);
+      quote_date=Number(quote_date)
+      delta=Number(delta);
+      gamma=Number(gamma);
+      theta=Number(theta);
+      vega=Number(vega);
+      implied_volatility=Number(implied_volatility);
+      try{
+        await sql`insert into "option_quote" (
+          underlying,
+          expiration,
+          type,
+          strike,
+          style,
+          bid,
+          bid_size,
+          ask,
+          ask_size,
+          volume,
+          open_interest,
+          quote_date,
+          delta,
+          gamma,
+          theta,
+          vega,
+          implied_volatility
+        ) values (
+          ${underlying},
+          ${expiration},
+          ${type},
+          ${strike},
+          ${style},
+          ${bid},
+          ${bid_size},
+          ${ask},
+          ${ask_size},
+          ${volume},
+          ${open_interest},
+          ${quote_date},
+          ${delta},
+          ${gamma},
+          ${theta},
+          ${vega},
+          ${implied_volatility}
+        );`;
+        console.log(`options ${quoteDate} ${underlying}`);
+      }
+      catch(err){
+        console.error(err);
+      }
+    }));
+  }));
+}
\ No newline at end of file
diff --git a/startPostgres.sh b/startPostgres.sh
new file mode 100755
index 0000000..c554dac
--- /dev/null
+++ b/startPostgres.sh
@@ -0,0 +1,7 @@
+#!/bin/sh
+docker run --rm \
+  --name 'postgres-calendar-optimizer' \
+  --network host \
+  -e POSTGRES_PASSWORD=buginoo \
+  -v "${PWD}/postgres:/var/lib/postgresql/data" \
+  postgres:15.3-bullseye
\ No newline at end of file
diff --git a/yarn.lock b/yarn.lock
index dc26bf1..0c6a366 100644
--- a/yarn.lock
+++ b/yarn.lock
@@ -150,14 +150,7 @@ esbuild@^0.18.0:
     "@esbuild/win32-ia32" "0.18.0"
     "@esbuild/win32-x64" "0.18.0"
 
-lowdb@^6.0.1:
-  version "6.0.1"
-  resolved "https://registry.yarnpkg.com/lowdb/-/lowdb-6.0.1.tgz#2c84ae74340fa81ace9c8f17b2fd7fbf9544e7d4"
-  integrity sha512-1ktuKYLlQzAWwl4/PQkIr8JzNXgcTM6rAhpXaQ6BR+VwI98Q8ZwMFhBOn9u0ldcW3K/WWzhYpS3xyGTshgVGzA==
-  dependencies:
-    steno "^3.0.0"
-
-steno@^3.0.0:
-  version "3.0.0"
-  resolved "https://registry.yarnpkg.com/steno/-/steno-3.0.0.tgz#212a11e8ef3646b610efc8953842f556fd0df28f"
-  integrity sha512-uZtn7Ht9yXLiYgOsmo8btj4+f7VxyYheMt8g6F1ANjyqByQXEE2Gygjgenp3otHH1TlHsS4JAaRGv5wJ1wvMNw==
+postgres@^3.3.5:
+  version "3.3.5"
+  resolved "https://registry.yarnpkg.com/postgres/-/postgres-3.3.5.tgz#8431605aae2112759d50e362fc08759165455677"
+  integrity sha512-+JD93VELV9gHkqpV5gdL5/70HdGtEw4/XE1S4BC8f1mcPmdib3K5XsKVbnR1XcAyC41zOnifJ+9YRKxdIsXiUw==