stream-ingest flat files from polygon; add a few tables

This commit is contained in:
Avraham Sakal
2024-07-05 15:33:54 -04:00
parent c6e5c76952
commit cb7dbc29e8
4 changed files with 403 additions and 44 deletions
+170
View File
@@ -4,6 +4,17 @@ import { open } from "sqlite";
import { clickhouse, query } from "./clickhouse.js";
import { OptionContract } from "./polygon.js";
import pRetry from "p-retry";
import { createReadStream } from "node:fs";
import zlib from "node:zlib";
import { Transform, Writable } from "stream";
import { pipeline } from "stream/promises";
import { execa } from "execa";
import { rm } from "node:fs/promises";
import { Env } from "@humanwhocodes/env";
const env = new Env();
const { POLYGON_S3_ACCESS_KEY_ID, POLYGON_S3_SECRET_ACCESS_KEY } = env.required;
const sqliteDb = await open({
filename: "/tmp/sync-state.db",
@@ -263,3 +274,162 @@ export async function getOptionContractDateRange({
`);
return rows[0] || { firstDate: null, lastDate: null };
}
function transformCsvLineToObject(line: string) {
const [
ticker,
volume,
open,
close,
high,
low,
window_start,
numberOfTransactions,
] = line.split(",");
const symbol = ticker.substring(2, ticker.length - 15);
const tickerDate = ticker.substring(ticker.length - 15, ticker.length - 9);
const expirationDate = `20${tickerDate.substring(
0,
2
)}-${tickerDate.substring(2, 4)}-${tickerDate.substring(4)}`;
const type =
ticker.substring(ticker.length - 9, ticker.length - 8) === "C"
? "call"
: "put";
const strike = parseInt(ticker.substring(ticker.length - 8)) / 1000;
/** UNIX time in seconds */
const tsStart = parseInt(window_start.substring(0, window_start.length - 9));
return {
symbol,
expirationDate,
strike,
type,
tsStart,
open,
close,
low,
high,
volume,
volumeWeightedPrice: 0,
};
}
export async function uploadCsvToClickhouse(filename: string) {
let buf = "";
for await (const chunk of createReadStream(filename, {
start: 60 /* skip header */,
highWaterMark: 1024 * 1024,
})) {
const lines = buf.concat(chunk).split(/\r?\n/);
buf = lines.pop() ?? "";
await clickhouse.insert({
table: "option_contract_aggregates",
values: lines.map(transformCsvLineToObject),
format: "JSONEachRow",
});
}
if (buf.length) {
// last line, if file does not end with newline
await clickhouse.insert({
table: "option_contract_aggregates",
values: [transformCsvLineToObject(buf)],
format: "JSONEachRow",
});
}
}
const accessKeyId = POLYGON_S3_ACCESS_KEY_ID;
const secretAccessKey = POLYGON_S3_SECRET_ACCESS_KEY;
// const X_Amz_Algorithm = "AWS4-HMAC-SHA256";
// const X_Amz_Credential =
// "bfe011b0-01e7-4c16-aedf-52cb83722c36/20240702/us-east-1/s3/aws4_request";
// const X_Amz_Expires = "900";
// const X_Amz_SignedHeaders = "host";
// const X_Amz_Signature =
// "e9fd0ac569c4d5fd5757ba4104e95ce4cfd0c17b16648ec32f978265d4188f37";
export async function ingestOptionContractAggregateFlatfile(date: string) {
let buf = "";
let skippedFirstLine = false;
const [year, month, day] = date.split("-");
const localFilename = `/tmp/${date}.csv.gz`;
try {
await execa({
env: {
AWS_ACCESS_KEY_ID: accessKeyId,
AWS_SECRET_ACCESS_KEY: secretAccessKey,
S3_ENDPOINT_URL: "https://files.polygon.io",
AWS_REGION: "us-east-1",
},
})("s5cmd", [
"cp",
`s3://flatfiles/us_options_opra/minute_aggs_v1/${year}/${month}/${date}.csv.gz`,
localFilename,
]);
} catch (err) {
return;
// if (err.includes("status code: 404")) {
// return;
// } else {
// console.error("error downloading flatfile from polygon s3", err);
// throw err;
// }
}
try {
await pipeline(
createReadStream(localFilename, {
highWaterMark: 1024 * 1024,
}),
zlib.createGunzip(),
new Transform({
transform(chunk, encoding, next) {
const lines = buf.concat(chunk).split(/\r?\n/);
if (!skippedFirstLine) {
lines.shift();
skippedFirstLine = true;
}
buf = lines.pop();
next(null, lines);
},
objectMode: true,
}),
new Writable({
objectMode: true,
async write(lines, encoding, next) {
// console.log(lines.map(transformCsvLineToObject));
await clickhouse.insert({
table: "option_contract_aggregates",
values: lines.map(transformCsvLineToObject),
format: "JSONEachRow",
});
next();
},
})
);
} catch (err) {
console.error(err);
}
await rm(localFilename);
console.log("done");
}
export async function pullOptionContractAggregatesFromFlatFileSince(
firstDate: string,
lastDate?: string
) {
const currentDateAsDateObject = new Date(firstDate);
const yesterdayAsDateObject = new Date();
yesterdayAsDateObject.setUTCDate(yesterdayAsDateObject.getUTCDate() - 1);
const lastDateAsDateObject = lastDate
? new Date(lastDate)
: yesterdayAsDateObject;
while (currentDateAsDateObject <= lastDateAsDateObject) {
const currentDate = currentDateAsDateObject.toISOString().substring(0, 10);
console.log(`Date: ${currentDate}`);
await ingestOptionContractAggregateFlatfile(currentDate);
currentDateAsDateObject.setUTCDate(
currentDateAsDateObject.getUTCDate() + 1
);
}
}