diff --git a/server/package.json b/server/package.json index a61a926..5ceab0b 100644 --- a/server/package.json +++ b/server/package.json @@ -9,10 +9,12 @@ }, "dependencies": { "@clickhouse/client": "^0.2.7", + "@humanwhocodes/env": "^3.0.5", "@sinclair/typebox": "^0.32.5", "@trpc/server": "^10.45.0", "cors": "^2.8.5", "dotenv": "^16.4.1", + "execa": "^9.3.0", "p-all": "^5.0.0", "p-queue": "^8.0.1", "p-retry": "^6.2.0", diff --git a/server/pnpm-lock.yaml b/server/pnpm-lock.yaml index 73c9265..199ff4b 100644 --- a/server/pnpm-lock.yaml +++ b/server/pnpm-lock.yaml @@ -11,6 +11,9 @@ importers: '@clickhouse/client': specifier: ^0.2.7 version: 0.2.7 + '@humanwhocodes/env': + specifier: ^3.0.5 + version: 3.0.5 '@sinclair/typebox': specifier: ^0.32.5 version: 0.32.5 @@ -23,6 +26,9 @@ importers: dotenv: specifier: ^16.4.1 version: 16.4.1 + execa: + specifier: ^9.3.0 + version: 9.3.0 p-all: specifier: ^5.0.0 version: 5.0.0 @@ -211,6 +217,9 @@ packages: '@gar/promisify@1.1.3': resolution: {integrity: sha512-k2Ty1JcVojjJFwrg/ThKi2ujJ7XNLYaFGNB/bWT9wGR+oSMJHMa5w+CUq6p/pVrKeNNgA7pCqEcjSnHVoqJQFw==} + '@humanwhocodes/env@3.0.5': + resolution: {integrity: sha512-IpnujSwQ93i/amSy4GoynqaOAjbYKAI1b28JmPogfEytAh2aSjOE2ZlFnOAuXHUt3OQA41RvU0JL4lzTnVKeIw==} + '@npmcli/fs@1.1.1': resolution: {integrity: sha512-8KG5RD0GVP4ydEzRn/I4BNDuxDtqVbOdm8675T49OIG/NGhaK0pjPX7ZcDlvKYbA+ulvVK3ztfcF4uBdOxuJbQ==} @@ -219,9 +228,16 @@ packages: engines: {node: '>=10'} deprecated: This functionality has been moved to @npmcli/fs + '@sec-ant/readable-stream@0.4.1': + resolution: {integrity: sha512-831qok9r2t8AlxLko40y2ebgSDhenenCatLVeW/uBtnHPyhHOvG0C7TvfgecV+wHzIm5KUICgzmVpWS+IMEAeg==} + '@sinclair/typebox@0.32.5': resolution: {integrity: sha512-0M6FyxZwIEu/Ly6W+l7iYqiZQYJ8khLOJGzg+cxivNKRKqk9hctcuDC0UYI7B9vNgycExA8w40m4M3yDKW37RA==} + '@sindresorhus/merge-streams@4.0.0': + resolution: {integrity: sha512-tlqY9xq5ukxTUZBmoOp+m61cqwQD5pHJtFY3Mn8CA8ps6yghLH/Hw8UPdqg4OLmFW3IFlcXnQNmo/dh8HzXYIQ==} + engines: {node: '>=18'} + '@tootallnate/once@1.1.2': resolution: {integrity: sha512-RbzJvlNzmRq5c3O09UipeuXno4tA1FE6ikOjxZK0tuxVv3412l64l5t1W5pj4+rJq9vpkm/kwiR07aZXnsKPxw==} engines: {node: '>= 6'} @@ -344,6 +360,10 @@ packages: resolution: {integrity: sha512-eTVLrBSt7fjbDygz805pMnstIs2VTBNkRm0qxZd+M7A5XDdxVRWO5MxGBXZhjY4cqLYLdtrGqRf8mBPmzwSpWQ==} engines: {node: '>=4.8'} + cross-spawn@7.0.3: + resolution: {integrity: sha512-iRDPJKUPVEND7dHPO8rkbOnPpyDygcDFtWjpeWNCgy8WP2rXcxXL8TskReQl6OrB2G7+UJrags1q15Fudc7G6w==} + engines: {node: '>= 8'} + debug@4.3.5: resolution: {integrity: sha512-pt0bNEmneDIvdL1Xsd9oDQ/wrQRkXDT4AUWlNZNPKvW5x/jyO9VFXkJUP07vQ2upmw5PlaITaPKc31jK13V+jg==} engines: {node: '>=6.0'} @@ -423,10 +443,18 @@ packages: eventemitter3@5.0.1: resolution: {integrity: sha512-GWkBvjiSZK87ELrYOSESUYeVIc9mvLLf/nXalMOS5dYrgZq9o5OVkbZAVM06CVxYsCwH9BDZFPlQTlPA1j4ahA==} + execa@9.3.0: + resolution: {integrity: sha512-l6JFbqnHEadBoVAVpN5dl2yCyfX28WoBAGaoQcNmLLSedOxTxcn2Qa83s8I/PA5i56vWru2OHOtrwF7Om2vqlg==} + engines: {node: ^18.19.0 || >=20.5.0} + expand-template@2.0.3: resolution: {integrity: sha512-XYfuKMvj4O35f/pOXLObndIRvyQ+/+6AhODh+OKWj9S9498pHHn/IMszH+gt0fBCRWMNfk1ZSp5x3AifmnI2vg==} engines: {node: '>=6'} + figures@6.1.0: + resolution: {integrity: sha512-d+l3qxjSesT4V7v2fh+QnmFnUWv9lSpjarhShNTgBOfA0ttejbQUAlHLitbjkoRiDulW0OPoQPYIGhIC8ohejg==} + engines: {node: '>=18'} + file-uri-to-path@1.0.0: resolution: {integrity: sha512-0Zt+s3L7Vf1biwWZ29aARiVYLx7iMGnEUl9x33fbB/j3jR81u/O2LbqK+Bm1CDSNDKVtJ/YjwY7TUd5SkeLQLw==} @@ -461,6 +489,10 @@ packages: get-intrinsic@1.2.2: resolution: {integrity: sha512-0gSo4ml/0j98Y3lngkFEot/zhiCeWsbYIlZ+uZOVgzLyLaUw7wxUL+nCTP0XJvJg1AXulJRI3UJi8GsbDuxdGA==} + get-stream@9.0.1: + resolution: {integrity: sha512-kVCxPF3vQM/N0B1PmoqVUqgHP+EeVjmZSQn+1oCRPxd2P21P2F19lIgbR3HBosbB1PUhOAoctJnfEn2GbN2eZA==} + engines: {node: '>=18'} + get-symbol-description@1.0.0: resolution: {integrity: sha512-2EmdH1YvIQiZpltCNgkuiUnyukzxM/R6NDJX31Ke3BG1Nq5b0S2PhX59UKi9vZpPDQVdqn+1IcaAwnzTT5vCjw==} engines: {node: '>= 0.4'} @@ -525,6 +557,10 @@ packages: resolution: {integrity: sha512-dFcAjpTQFgoLMzC2VwU+C/CbS7uRL0lWmxDITmqm7C+7F0Odmj6s9l6alZc6AELXhrnggM2CeWSXHGOdX2YtwA==} engines: {node: '>= 6'} + human-signals@7.0.0: + resolution: {integrity: sha512-74kytxOUSvNbjrT9KisAbaTZ/eJwD/LrbM/kh5j0IhPuJzwuA19dWvniFGwBzN9rVjg+O/e+F310PjObDXS+9Q==} + engines: {node: '>=18.18.0'} + humanize-ms@1.2.1: resolution: {integrity: sha512-Fl70vYtsAFb/C06PTS9dZBo7ihau+Tu/DNCk/OyHhea07S+aeMWpFFkUaXRa8fI+ScZbEI8dfSxwY7gxZ9SAVQ==} @@ -607,6 +643,10 @@ packages: resolution: {integrity: sha512-k1U0IRzLMo7ZlYIfzRu23Oh6MiIFasgpb9X76eqfFZAqwH44UI4KTBvBYIZ1dSL9ZzChTB9ShHfLkR4pdW5krQ==} engines: {node: '>= 0.4'} + is-plain-obj@4.1.0: + resolution: {integrity: sha512-+Pgi+vMuUNkJyExiMBt5IlFoMyKnr5zhJ4Uspz58WOhBF5QoIZkFyNHIbBAtHwzVAgk5RtndVNsDRN61/mmDqg==} + engines: {node: '>=12'} + is-regex@1.1.4: resolution: {integrity: sha512-kvRdxDsxZjhzUX07ZnLydzS1TU/TJlTUHHY4YLL87e37oUA49DfkLqgy+VjFocowy29cKvcSiu+kIv728jTTVg==} engines: {node: '>= 0.4'} @@ -614,6 +654,10 @@ packages: is-shared-array-buffer@1.0.2: resolution: {integrity: sha512-sqN2UDu1/0y6uvXyStCOzyhAjCSlHceFoMKJW8W9EU9cvic/QdsZ0kEU93HEy3IUEFZIiH/3w+AH/UQbPHNdhA==} + is-stream@4.0.1: + resolution: {integrity: sha512-Dnz92NInDqYckGEUJv689RbRiTSEHCQ7wOVeALbkOz999YpqT46yMRIGtSNl2iCL1waAZSx40+h59NV/EwzV/A==} + engines: {node: '>=18'} + is-string@1.0.7: resolution: {integrity: sha512-tE2UXzivje6ofPW7l23cjDOMa09gb7xlAqG6jG5ej6uPV32TlWP3NKPigtaGeHNu9fohccRYvIiZMfOOnOYUtg==} engines: {node: '>= 0.4'} @@ -626,6 +670,10 @@ packages: resolution: {integrity: sha512-Z14TF2JNG8Lss5/HMqt0//T9JeHXttXy5pH/DBU4vi98ozO2btxzq9MwYDZYnKwU8nRsz/+GVFVRDq3DkVuSPg==} engines: {node: '>= 0.4'} + is-unicode-supported@2.0.0: + resolution: {integrity: sha512-FRdAyx5lusK1iHG0TWpVtk9+1i+GjrzRffhDg4ovQ7mcidMQ6mj+MhKPmvh7Xwyv5gIS06ns49CA7Sqg7lC22Q==} + engines: {node: '>=18'} + is-weakref@1.0.2: resolution: {integrity: sha512-qctsuLZmIQ0+vSSMfoVvyFe2+GSEvnmZ2ezTup1SBse9+twCCeial6EEi3Nc2KFcf6+qz2FBPnjXsk8xhKSaPQ==} @@ -749,6 +797,10 @@ packages: engines: {node: '>= 4'} hasBin: true + npm-run-path@5.3.0: + resolution: {integrity: sha512-ppwTtiJZq0O/ai0z7yfudtBpWIoxM8yE6nHi1X47eFR2EWORqfbu6CnPlNsjeN683eT0qG6H/Pyf9fCcvjnnnQ==} + engines: {node: ^12.20.0 || ^14.13.1 || >=16.0.0} + npmlog@6.0.2: resolution: {integrity: sha512-/vBvz5Jfr9dT/aFWd0FIRf+T/Q2WBsLENygUaFUqstqsycmZAP/t5BvFJTK0viFmSUxiUKTUplWy5vt+rvKIxg==} engines: {node: ^12.13.0 || ^14.15.0 || >=16.0.0} @@ -808,6 +860,10 @@ packages: resolution: {integrity: sha512-aOIos8bujGN93/8Ox/jPLh7RwVnPEysynVFE+fQZyg6jKELEHwzgKdLRFHUgXJL6kylijVSBC4BvN9OmsB48Rw==} engines: {node: '>=4'} + parse-ms@4.0.0: + resolution: {integrity: sha512-TXfryirbmq34y8QBwgqCVLi+8oA3oWx2eAnSn62ITyEhEYaWRlVZ2DvMM9eZbMs/RfxPu/PK/aBLyGj4IrqMHw==} + engines: {node: '>=18'} + path-is-absolute@1.0.1: resolution: {integrity: sha512-AVbw3UJ2e9bq64vSaS9Am0fje1Pa8pbGqTTsmXfaIiMpnr5DlDhfJOuLj9Sf95ZPVDAUerDfEk88MPmPe7UCQg==} engines: {node: '>=0.10.0'} @@ -816,6 +872,14 @@ packages: resolution: {integrity: sha512-fEHGKCSmUSDPv4uoj8AlD+joPlq3peND+HRYyxFz4KPw4z926S/b8rIuFs2FYJg3BwsxJf6A9/3eIdLaYC+9Dw==} engines: {node: '>=4'} + path-key@3.1.1: + resolution: {integrity: sha512-ojmeN0qd+y0jszEtoY48r0Peq5dwMEkIlCOu6Q5f41lfkswXuKtYrhgoTpLnyIcHm24Uhqx+5Tqm2InSwLhE6Q==} + engines: {node: '>=8'} + + path-key@4.0.0: + resolution: {integrity: sha512-haREypq7xkM7ErfgIyA0z+Bj4AGKlMSdlQE2jvJo6huWD1EdkKYV+G/T4nq0YEF2vgTT8kqMFKo1uHn950r4SQ==} + engines: {node: '>=12'} + path-parse@1.0.7: resolution: {integrity: sha512-LDJzPVEEEPR+y48z93A0Ed0yXb8pAByGWo/k5YYdYgpY2/2EsOsksJrq7lOHxryrVOn1ejG6oAp8ahvOIQD8sw==} @@ -837,6 +901,10 @@ packages: engines: {node: '>=10'} hasBin: true + pretty-ms@9.0.0: + resolution: {integrity: sha512-E9e9HJ9R9NasGOgPaPE8VMeiPKAyWR5jcFpNnwIejslIhWqdqOrb2wShBsncMPUb+BcCd2OPYfh7p2W6oemTng==} + engines: {node: '>=18'} + promise-inflight@1.0.1: resolution: {integrity: sha512-6zWPyEOFaQBJYcGMHBKTKJ3u6TBsnMFOIZSa6ce1e/ZrrsOlnHRHbabMjLiBYKp+n44X9eUI6VUPaukCXHuG4g==} peerDependencies: @@ -922,10 +990,18 @@ packages: resolution: {integrity: sha512-EV3L1+UQWGor21OmnvojK36mhg+TyIKDh3iFBKBohr5xeXIhNBcx8oWdgkTEEQ+BEFFYdLRuqMfd5L84N1V5Vg==} engines: {node: '>=0.10.0'} + shebang-command@2.0.0: + resolution: {integrity: sha512-kHxr2zZpYtdmrN1qDjrrX/Z1rR1kG8Dx+gkpK1G4eXmvXswmcE1hTWBWYUzlraYw1/yZp6YuDY77YtvbN0dmDA==} + engines: {node: '>=8'} + shebang-regex@1.0.0: resolution: {integrity: sha512-wpoSFAxys6b2a2wHZ1XpDSgD7N9iVjg29Ph9uV/uaP9Ex/KXlkTZTeddxDPSYQpgvzKLGJke2UU0AzoGCjNIvQ==} engines: {node: '>=0.10.0'} + shebang-regex@3.0.0: + resolution: {integrity: sha512-7++dFhtcx3353uBaq8DDR4NuxBetBzC7ZQOhmTQInHEd6bSrXdiEyzCvG07Z44UYdLShWUyXt5M/yhz8ekcb1A==} + engines: {node: '>=8'} + shell-quote@1.8.1: resolution: {integrity: sha512-6j1W9l1iAs/4xYBI1SYOVZyFcCis9b4KCLQ8fgAGG07QvzaRLVVRQvAy85yNmmZSjYjg4MWh4gNvlPujU/5LpA==} @@ -935,6 +1011,10 @@ packages: signal-exit@3.0.7: resolution: {integrity: sha512-wnD2ZE+l+SPC/uoS0vXeE9L1+0wuaMqKlfz9AMUo38JsyLSBWSFcHR1Rri62LZc12vLr1gb3jl7iwQhgwpAbGQ==} + signal-exit@4.1.0: + resolution: {integrity: sha512-bzyZ1e88w9O1iNJbKnOlvYTrWPDl46O1bG0D3XInv+9tkPrxrN8jUUTiFlDkkmKWgn1M6CfIA13SuGqOa9Korw==} + engines: {node: '>=14'} + simple-concat@1.0.1: resolution: {integrity: sha512-cSFtAPtRhljv69IK0hTVZQ+OfE9nePi/rtJmw5UjHeVyVroEqJXP1sFztKUy1qU+xvz3u/sfYJLa947b7nAN2Q==} @@ -1007,6 +1087,10 @@ packages: resolution: {integrity: sha512-vavAMRXOgBVNF6nyEEmL3DBK19iRpDcoIwW+swQ+CbGiu7lju6t+JklA1MHweoWtadgt4ISVUsXLyDq34ddcwA==} engines: {node: '>=4'} + strip-final-newline@4.0.0: + resolution: {integrity: sha512-aulFJcD6YK8V1G7iRB5tigAP4TsHBZZrOV8pjV++zdUwmeV8uzbY7yn6h9MswN62adStNZFuCIx4haBnRuMDaw==} + engines: {node: '>=18'} + strip-json-comments@2.0.1: resolution: {integrity: sha512-4gB8na07fecVVkOI6Rs4e7T6NOTki5EmL7TUduTs6bu3EdnSycntVJ4re8kgZA+wx9IueI2Y11bfbgwtzuE0KQ==} engines: {node: '>=0.10.0'} @@ -1100,6 +1184,10 @@ packages: yallist@4.0.0: resolution: {integrity: sha512-3wdGidZyq5PB084XLES5TpOSRA3wjXAlIWMhum2kRcv/41Sn2emQ0dycQW4uZXLejwKvg6EsvbdlVL+FYEct7A==} + yoctocolors@2.1.0: + resolution: {integrity: sha512-FsQpXXeOEe05tcJN4Z2eicuC6+6KiJdBbPOAChanSkwwjZ277XGsh8wh/HaPuGeifTiw/7dgAzabitu2bnDvRg==} + engines: {node: '>=18'} + snapshots: '@clickhouse/client-common@0.2.7': {} @@ -1180,6 +1268,8 @@ snapshots: '@gar/promisify@1.1.3': optional: true + '@humanwhocodes/env@3.0.5': {} + '@npmcli/fs@1.1.1': dependencies: '@gar/promisify': 1.1.3 @@ -1192,8 +1282,12 @@ snapshots: rimraf: 3.0.2 optional: true + '@sec-ant/readable-stream@0.4.1': {} + '@sinclair/typebox@0.32.5': {} + '@sindresorhus/merge-streams@4.0.0': {} + '@tootallnate/once@1.1.2': optional: true @@ -1357,6 +1451,12 @@ snapshots: shebang-command: 1.2.0 which: 1.3.1 + cross-spawn@7.0.3: + dependencies: + path-key: 3.1.1 + shebang-command: 2.0.0 + which: 2.0.2 + debug@4.3.5: dependencies: ms: 2.1.2 @@ -1493,8 +1593,27 @@ snapshots: eventemitter3@5.0.1: {} + execa@9.3.0: + dependencies: + '@sindresorhus/merge-streams': 4.0.0 + cross-spawn: 7.0.3 + figures: 6.1.0 + get-stream: 9.0.1 + human-signals: 7.0.0 + is-plain-obj: 4.1.0 + is-stream: 4.0.1 + npm-run-path: 5.3.0 + pretty-ms: 9.0.0 + signal-exit: 4.1.0 + strip-final-newline: 4.0.0 + yoctocolors: 2.1.0 + expand-template@2.0.3: {} + figures@6.1.0: + dependencies: + is-unicode-supported: 2.0.0 + file-uri-to-path@1.0.0: {} for-each@0.3.3: @@ -1540,6 +1659,11 @@ snapshots: has-symbols: 1.0.3 hasown: 2.0.0 + get-stream@9.0.1: + dependencies: + '@sec-ant/readable-stream': 0.4.1 + is-stream: 4.0.1 + get-symbol-description@1.0.0: dependencies: call-bind: 1.0.5 @@ -1612,6 +1736,8 @@ snapshots: - supports-color optional: true + human-signals@7.0.0: {} + humanize-ms@1.2.1: dependencies: ms: 2.1.3 @@ -1696,6 +1822,8 @@ snapshots: dependencies: has-tostringtag: 1.0.0 + is-plain-obj@4.1.0: {} + is-regex@1.1.4: dependencies: call-bind: 1.0.5 @@ -1705,6 +1833,8 @@ snapshots: dependencies: call-bind: 1.0.5 + is-stream@4.0.1: {} + is-string@1.0.7: dependencies: has-tostringtag: 1.0.0 @@ -1717,6 +1847,8 @@ snapshots: dependencies: which-typed-array: 1.1.13 + is-unicode-supported@2.0.0: {} + is-weakref@1.0.2: dependencies: call-bind: 1.0.5 @@ -1879,6 +2011,10 @@ snapshots: shell-quote: 1.8.1 string.prototype.padend: 3.1.5 + npm-run-path@5.3.0: + dependencies: + path-key: 4.0.0 + npmlog@6.0.2: dependencies: are-we-there-yet: 3.0.1 @@ -1937,11 +2073,17 @@ snapshots: error-ex: 1.3.2 json-parse-better-errors: 1.0.2 + parse-ms@4.0.0: {} + path-is-absolute@1.0.1: optional: true path-key@2.0.1: {} + path-key@3.1.1: {} + + path-key@4.0.0: {} + path-parse@1.0.7: {} path-type@3.0.0: @@ -1967,6 +2109,10 @@ snapshots: tar-fs: 2.1.1 tunnel-agent: 0.6.0 + pretty-ms@9.0.0: + dependencies: + parse-ms: 4.0.0 + promise-inflight@1.0.1: optional: true @@ -2064,8 +2210,14 @@ snapshots: dependencies: shebang-regex: 1.0.0 + shebang-command@2.0.0: + dependencies: + shebang-regex: 3.0.0 + shebang-regex@1.0.0: {} + shebang-regex@3.0.0: {} + shell-quote@1.8.1: {} side-channel@1.0.4: @@ -2077,6 +2229,8 @@ snapshots: signal-exit@3.0.7: optional: true + signal-exit@4.1.0: {} + simple-concat@1.0.1: {} simple-get@4.0.1: @@ -2181,6 +2335,8 @@ snapshots: strip-bom@3.0.0: {} + strip-final-newline@4.0.0: {} + strip-json-comments@2.0.1: {} supports-color@5.5.0: @@ -2297,7 +2453,6 @@ snapshots: which@2.0.2: dependencies: isexe: 2.0.0 - optional: true wide-align@1.1.5: dependencies: @@ -2307,3 +2462,5 @@ snapshots: wrappy@1.0.2: {} yallist@4.0.0: {} + + yoctocolors@2.1.0: {} diff --git a/server/src/lib/sync.ts b/server/src/lib/sync.ts index bd0dc43..19f44fd 100644 --- a/server/src/lib/sync.ts +++ b/server/src/lib/sync.ts @@ -4,6 +4,17 @@ import { open } from "sqlite"; import { clickhouse, query } from "./clickhouse.js"; import { OptionContract } from "./polygon.js"; import pRetry from "p-retry"; +import { createReadStream } from "node:fs"; +import zlib from "node:zlib"; +import { Transform, Writable } from "stream"; +import { pipeline } from "stream/promises"; +import { execa } from "execa"; +import { rm } from "node:fs/promises"; +import { Env } from "@humanwhocodes/env"; + +const env = new Env(); + +const { POLYGON_S3_ACCESS_KEY_ID, POLYGON_S3_SECRET_ACCESS_KEY } = env.required; const sqliteDb = await open({ filename: "/tmp/sync-state.db", @@ -263,3 +274,162 @@ export async function getOptionContractDateRange({ `); return rows[0] || { firstDate: null, lastDate: null }; } + +function transformCsvLineToObject(line: string) { + const [ + ticker, + volume, + open, + close, + high, + low, + window_start, + numberOfTransactions, + ] = line.split(","); + const symbol = ticker.substring(2, ticker.length - 15); + const tickerDate = ticker.substring(ticker.length - 15, ticker.length - 9); + const expirationDate = `20${tickerDate.substring( + 0, + 2 + )}-${tickerDate.substring(2, 4)}-${tickerDate.substring(4)}`; + const type = + ticker.substring(ticker.length - 9, ticker.length - 8) === "C" + ? "call" + : "put"; + const strike = parseInt(ticker.substring(ticker.length - 8)) / 1000; + /** UNIX time in seconds */ + const tsStart = parseInt(window_start.substring(0, window_start.length - 9)); + return { + symbol, + expirationDate, + strike, + type, + tsStart, + open, + close, + low, + high, + volume, + volumeWeightedPrice: 0, + }; +} + +export async function uploadCsvToClickhouse(filename: string) { + let buf = ""; + for await (const chunk of createReadStream(filename, { + start: 60 /* skip header */, + highWaterMark: 1024 * 1024, + })) { + const lines = buf.concat(chunk).split(/\r?\n/); + buf = lines.pop() ?? ""; + + await clickhouse.insert({ + table: "option_contract_aggregates", + values: lines.map(transformCsvLineToObject), + format: "JSONEachRow", + }); + } + if (buf.length) { + // last line, if file does not end with newline + await clickhouse.insert({ + table: "option_contract_aggregates", + values: [transformCsvLineToObject(buf)], + format: "JSONEachRow", + }); + } +} + +const accessKeyId = POLYGON_S3_ACCESS_KEY_ID; +const secretAccessKey = POLYGON_S3_SECRET_ACCESS_KEY; +// const X_Amz_Algorithm = "AWS4-HMAC-SHA256"; +// const X_Amz_Credential = +// "bfe011b0-01e7-4c16-aedf-52cb83722c36/20240702/us-east-1/s3/aws4_request"; +// const X_Amz_Expires = "900"; +// const X_Amz_SignedHeaders = "host"; +// const X_Amz_Signature = +// "e9fd0ac569c4d5fd5757ba4104e95ce4cfd0c17b16648ec32f978265d4188f37"; +export async function ingestOptionContractAggregateFlatfile(date: string) { + let buf = ""; + let skippedFirstLine = false; + const [year, month, day] = date.split("-"); + const localFilename = `/tmp/${date}.csv.gz`; + try { + await execa({ + env: { + AWS_ACCESS_KEY_ID: accessKeyId, + AWS_SECRET_ACCESS_KEY: secretAccessKey, + S3_ENDPOINT_URL: "https://files.polygon.io", + AWS_REGION: "us-east-1", + }, + })("s5cmd", [ + "cp", + `s3://flatfiles/us_options_opra/minute_aggs_v1/${year}/${month}/${date}.csv.gz`, + localFilename, + ]); + } catch (err) { + return; + // if (err.includes("status code: 404")) { + // return; + // } else { + // console.error("error downloading flatfile from polygon s3", err); + // throw err; + // } + } + + try { + await pipeline( + createReadStream(localFilename, { + highWaterMark: 1024 * 1024, + }), + zlib.createGunzip(), + new Transform({ + transform(chunk, encoding, next) { + const lines = buf.concat(chunk).split(/\r?\n/); + if (!skippedFirstLine) { + lines.shift(); + skippedFirstLine = true; + } + buf = lines.pop(); + next(null, lines); + }, + objectMode: true, + }), + new Writable({ + objectMode: true, + async write(lines, encoding, next) { + // console.log(lines.map(transformCsvLineToObject)); + await clickhouse.insert({ + table: "option_contract_aggregates", + values: lines.map(transformCsvLineToObject), + format: "JSONEachRow", + }); + next(); + }, + }) + ); + } catch (err) { + console.error(err); + } + await rm(localFilename); + console.log("done"); +} + +export async function pullOptionContractAggregatesFromFlatFileSince( + firstDate: string, + lastDate?: string +) { + const currentDateAsDateObject = new Date(firstDate); + const yesterdayAsDateObject = new Date(); + yesterdayAsDateObject.setUTCDate(yesterdayAsDateObject.getUTCDate() - 1); + const lastDateAsDateObject = lastDate + ? new Date(lastDate) + : yesterdayAsDateObject; + while (currentDateAsDateObject <= lastDateAsDateObject) { + const currentDate = currentDateAsDateObject.toISOString().substring(0, 10); + console.log(`Date: ${currentDate}`); + await ingestOptionContractAggregateFlatfile(currentDate); + currentDateAsDateObject.setUTCDate( + currentDateAsDateObject.getUTCDate() + 1 + ); + } +} diff --git a/server/tables.sql b/server/tables.sql index 76f22b5..c9fc63f 100644 --- a/server/tables.sql +++ b/server/tables.sql @@ -12,16 +12,7 @@ CREATE TABLE symbols ENGINE MergeTree() ORDER BY (symbol); -CREATE TABLE option_contract_sync_statuses -( - symbol String, - asOfDate Date, - status ENUM('not-started','pending','done') -) -ENGINE MergeTree() -ORDER BY (asOfDate, symbol); - -CREATE TABLE option_contracts +CREATE TABLE option_contract_existences ( asOfDate Date, symbol LowCardinality(String), @@ -29,60 +20,39 @@ CREATE TABLE option_contracts strike Float32, type ENUM('call', 'put') ) -ENGINE MergeTree() +ENGINE ReplacingMergeTree() PRIMARY KEY (asOfDate, symbol) ORDER BY (asOfDate, symbol, expirationDate, strike, type); - --- BEGIN: Option Contract Quotes -CREATE TABLE option_aggregate_sync_statuses +CREATE TABLE option_contracts ( - asOfDate Date, symbol LowCardinality(String), expirationDate Date, strike Float32, - type ENUM('call', 'put'), - status ENUM('not-started','pending','done'), - ts DateTime64 DEFAULT now() + type ENUM('call', 'put') ) -ENGINE MergeTree() -ORDER BY (asOfDate, symbol, expirationDate, strike, type, ts); -CREATE MATERIALIZED VIEW option_aggregate_sync_statuses_mv -TO option_aggregate_sync_statuses +ENGINE ReplacingMergeTree() +PRIMARY KEY (symbol, expirationDate) +ORDER BY (symbol, expirationDate, strike, type); + +CREATE MATERIALIZED VIEW option_contracts_mv +TO option_contracts AS SELECT DISTINCT ON ( - asOfDate, symbol, expirationDate, strike, type ) - asOfDate, symbol, expirationDate, strike, - type, - 'not-started' as status, - now() as ts -FROM option_contracts; + type +FROM option_contract_existences; -CREATE TABLE amg_option_aggregate_sync_statuses ( - asOfDate Date, - symbol LowCardinality(String), - expirationDate Date, - strike Float32, - type ENUM('call', 'put'), - status SimpleAggregateFunction(anyLast, ENUM('not-started','pending','done')), - ts DateTime64 DEFAULT now() -) -ENGINE=AggregatingMergeTree -ORDER BY (asOfDate, symbol, expirationDate, strike, type); -INSERT INTO amg_option_aggregate_sync_statuses -SELECT asOfDate, symbol, expirationDate, strike, type, status, ts -FROM option_aggregate_sync_statuses -ORDER BY asOfDate, symbol, expirationDate, strike, type, ts; +-- BEGIN: Option Contract Quotes -- END: Option Contract Quotes @@ -122,6 +92,66 @@ ALTER TABLE option_aggregates ADD INDEX idx_expirationDate expirationDate TYPE m ALTER TABLE option_aggregates ADD INDEX idx_strike strike TYPE minmax GRANULARITY 2; ALTER TABLE option_aggregates ADD INDEX idx_tsStart tsStart TYPE minmax GRANULARITY 2; +CREATE TABLE option_contract_aggregates +( + symbol LowCardinality(String), + expirationDate Date, + strike Float32, + type Enum('call', 'put'), + + tsStart DateTime32 CODEC(DoubleDelta(1), ZSTD), + open Float32 CODEC(Delta(2), ZSTD), + close Float32 CODEC(Delta(2), ZSTD), + low Float32 CODEC(Delta(2), ZSTD), + high Float32 CODEC(Delta(2), ZSTD), + volume UInt32 CODEC(T64), + volumeWeightedPrice Float32 CODEC(Delta(2), ZSTD) +) +ENGINE ReplacingMergeTree() +ORDER BY (symbol, expirationDate, strike, type, tsStart) + +CREATE TABLE option_histories_last_day +( + symbol LowCardinality(String), + expirationDate Date, + strike Float64, + type Enum('call', 'put'), + + tsStart DateTime32, + open Float64, + minutesToFront UInt16, + underlyingPrice Float64, + strikePercentageFromUnderlyingPrice Float64 +) +ENGINE MergeTree() +ORDER BY (symbol, minutesToFront, strikePercentageFromUnderlyingPrice) + +INSERT INTO option_histories_last_day +SELECT + option_aggregates.symbol as symbol, + option_aggregates.expirationDate as expirationDate, + option_aggregates.strike as strike, + option_aggregates.type as type, + option_aggregates.tsStart as tsStart, + option_aggregates.open as open, + date_diff('minute', tsStart, timestamp_add(expirationDate, INTERVAL 16 HOUR)) as minutesToFront, + stock_aggregates.open as underlyingPrice Float64, + (strike-underlyingPrice)/underlyingPrice as strikePercentageFromUnderlyingPrice Float64 +FROM ( + SELECT + symbol, + expirationDate, + strike, + type, + tsStart, + open + FROM option_aggregates + WHERE toDate(tsStart) = expirationDate +) as option_aggregates +INNER JOIN stock_aggregates +ON option_aggregates.symbol = stock_aggregates.symbol +AND option_aggregates.tsStart = stock_aggregates.tsStart + CREATE TABLE option_histories ( symbol LowCardinality(String),