import * as Polygon from "./polygon.js"; import sqlite3 from "sqlite3"; import { open } from "sqlite"; import { clickhouse, query } from "./clickhouse.js"; import { OptionContract } from "./polygon.js"; const sqliteDb = await open({ filename: "/tmp/sync-state.db", driver: sqlite3.Database, }); await sqliteDb.exec(` CREATE TABLE IF NOT EXISTS option_contract_sync_states ( symbol TEXT, date TEXT, status TINYINT, UNIQUE (symbol, date) ON CONFLICT REPLACE ) `); await sqliteDb.exec(` CREATE TABLE IF NOT EXISTS option_contract_aggregates_sync_states ( ticker TEXT, status TINYINT, UNIQUE (ticker) ON CONFLICT REPLACE ) `); export async function getPullOptionContractsState( symbol: string, date: string ) { const state = await sqliteDb.get( ` SELECT * FROM option_contract_sync_states WHERE symbol = :symbol AND date = :date `, { ":symbol": symbol, ":date": date, } ); return state; } const enum OptionContractSyncStatus { STARTED = 0, COMPLETED = 1, } type OptionContractSyncState = { status: OptionContractSyncStatus; }; export async function setPullOptionContractsState( symbol: string, date: string, state: OptionContractSyncState ) { await sqliteDb.run( ` INSERT INTO option_contract_sync_states (symbol, date, status) VALUES (:symbol, :date, :status) `, { ":symbol": symbol, ":date": date, ":status": state.status, } ); } export async function getPullOptionContractAggregatesState(ticker: string) { const state = await sqliteDb.get( ` SELECT * FROM option_contract_aggregates_sync_states WHERE ticker = :ticker `, { ":ticker": ticker, } ); return state; } const enum OptionContractAggregatesSyncStatus { STARTED = 0, COMPLETED = 1, } type OptionContractAggregatesSyncState = { status: OptionContractAggregatesSyncStatus; }; export async function setPullOptionContractAggregatesState( ticker: string, state: OptionContractAggregatesSyncState ) { await sqliteDb.run( ` INSERT INTO option_contract_aggregates_sync_states (ticker, status) VALUES (:ticker, :status) `, { ":ticker": ticker, ":status": state.status, } ); } export async function pullOptionContracts(symbol: string, date: string) { // check if sync was completed: if ( (await getPullOptionContractsState(symbol, date))?.status !== OptionContractSyncStatus.COMPLETED ) { await setPullOptionContractsState(symbol, date, { status: OptionContractSyncStatus.STARTED, }); for await (const batch of Polygon.makeGetOptionContractsIterator( symbol, date )) { console.log(batch.length); await clickhouse.insert({ table: "option_contract_existences", values: batch, format: "JSONEachRow", }); } await setPullOptionContractsState(symbol, date, { status: OptionContractSyncStatus.COMPLETED, }); } } export async function pullOptionContractAggregates( optionContract: OptionContract ) { const ticker = Polygon.optionContractToTicker(optionContract); // check if sync was completed: if ( (await getPullOptionContractAggregatesState(ticker))?.status !== OptionContractAggregatesSyncStatus.COMPLETED ) { await setPullOptionContractAggregatesState(ticker, { status: OptionContractAggregatesSyncStatus.STARTED, }); const { firstDate } = await getOptionContractDateRange(optionContract); for await (const batch of Polygon.makeGetOptionContractAggregatesIterator({ ...optionContract, firstDate, })) { if (batch.length > 0) { console.log( optionContract.symbol, optionContract.expirationDate, optionContract.strike, optionContract.type, new Date(batch[0].tsStart * 1000), new Date(batch[batch.length - 1].tsStart * 1000) ); await clickhouse.insert({ table: "option_contract_aggregates", values: batch, format: "JSONEachRow", }); } } await setPullOptionContractAggregatesState(ticker, { status: OptionContractAggregatesSyncStatus.COMPLETED, }); } } export async function* makeGetOptionContractsIterator( symbol: string, asOfDate: string ) { const limit = 2000; let offset = 0; let batch; do { batch = await query(` SELECT * FROM option_contract_existences WHERE asOfDate = '${asOfDate}' AND symbol = '${symbol}' ORDER BY expirationDate ASC, strike ASC, type ASC LIMIT ${limit} OFFSET ${offset} `); yield batch; offset = offset + limit; } while (batch.length === limit); } export async function pullOptionContractsSince( symbol: string, firstDate: string ) { const currentDateAsDateObject = new Date(firstDate); const yesterdayAsDateObject = new Date(); yesterdayAsDateObject.setUTCDate(yesterdayAsDateObject.getUTCDate() - 1); while (currentDateAsDateObject <= yesterdayAsDateObject) { const currentDate = currentDateAsDateObject.toISOString().substring(0, 10); console.log(`Date: ${currentDate}:`); await pullOptionContracts(symbol, currentDate); currentDateAsDateObject.setUTCDate( currentDateAsDateObject.getUTCDate() + 1 ); } } export async function pullOptionContractAggregatesSince( symbol: string, firstDate: string ) { const currentDateAsDateObject = new Date(firstDate); const yesterdayAsDateObject = new Date(); yesterdayAsDateObject.setUTCDate(yesterdayAsDateObject.getUTCDate() - 1); while (currentDateAsDateObject <= yesterdayAsDateObject) { const currentDate = currentDateAsDateObject.toISOString().substring(0, 10); console.log(`Date: ${currentDate}`); for await (const optionContracts of makeGetOptionContractsIterator( symbol, currentDate )) { for (const optionContract of optionContracts) { await pullOptionContractAggregates(optionContract); } } currentDateAsDateObject.setUTCDate( currentDateAsDateObject.getUTCDate() + 1 ); } } export async function getOptionContractDateRange({ symbol, expirationDate, strike, type, }: OptionContract) { const rows = await query<{ firstDate: string; lastDate: string }>(` SELECT min(asOfDate) AS firstDate, max(asOfDate) AS lastDate FROM option_contract_existences WHERE symbol = '${symbol}' AND expirationDate = '${expirationDate}' AND strike = ${strike} AND type = '${type}' `); return rows[0] || { firstDate: null, lastDate: null }; }