diff --git a/Cargo.lock b/Cargo.lock index b722e3f..b105e3c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -129,7 +129,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28701885014b411b29369a0061b8af72eab5bd2280e40f399ceb33e52a1b0d68" dependencies = [ "agave-feature-set", - "bincode", + "bincode 1.3.3", "digest 0.10.7", "ed25519-dalek 1.0.1", "libsecp256k1", @@ -162,7 +162,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fb9e9323670f5f83063fb645f133404c57d1eeebef35eea029d23ec745987083" dependencies = [ "agave-fs", - "bincode", + "bincode 1.3.3", "bzip2", "crossbeam-channel", "log", @@ -191,7 +191,7 @@ version = "3.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "776409f32d798250aa57e4a0e8e19cc3b5c477fbce2c3ae309f69160470f3e2b" dependencies = [ - "bincode", + "bincode 1.3.3", "libsecp256k1", "num-traits", "solana-account", @@ -276,6 +276,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" dependencies = [ "cfg-if 1.0.4", + "const-random", "getrandom 0.3.4", "once_cell", "version_check", @@ -659,6 +660,214 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" +[[package]] +name = "arrow" +version = "56.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e833808ff2d94ed40d9379848a950d995043c7fb3e81a30b383f4c6033821cc" +dependencies = [ + "arrow-arith", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-csv", + "arrow-data", + "arrow-ipc", + "arrow-json", + "arrow-ord", + "arrow-row", + "arrow-schema", + "arrow-select", + "arrow-string", +] + +[[package]] +name = "arrow-arith" +version = "56.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad08897b81588f60ba983e3ca39bda2b179bdd84dced378e7df81a5313802ef8" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "chrono", + "num 0.4.3", +] + +[[package]] +name = "arrow-array" +version = "56.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8548ca7c070d8db9ce7aa43f37393e4bfcf3f2d3681df278490772fd1673d08d" +dependencies = [ + "ahash 0.8.12", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "chrono", + "half 2.7.1", + "hashbrown 0.16.1", + "num 0.4.3", +] + +[[package]] +name = "arrow-buffer" +version = "56.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e003216336f70446457e280807a73899dd822feaf02087d31febca1363e2fccc" +dependencies = [ + "bytes", + "half 2.7.1", + "num 0.4.3", +] + +[[package]] +name = "arrow-cast" +version = "56.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "919418a0681298d3a77d1a315f625916cb5678ad0d74b9c60108eb15fd083023" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "atoi", + "base64 0.22.1", + "chrono", + "half 2.7.1", + "lexical-core", + "num 0.4.3", + "ryu", +] + +[[package]] +name = "arrow-csv" +version = "56.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa9bf02705b5cf762b6f764c65f04ae9082c7cfc4e96e0c33548ee3f67012eb" +dependencies = [ + "arrow-array", + "arrow-cast", + "arrow-schema", + "chrono", + "csv", + "csv-core", + "regex", +] + +[[package]] +name = "arrow-data" +version = "56.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5c64fff1d142f833d78897a772f2e5b55b36cb3e6320376f0961ab0db7bd6d0" +dependencies = [ + "arrow-buffer", + "arrow-schema", + "half 2.7.1", + "num 0.4.3", +] + +[[package]] +name = "arrow-ipc" +version = "56.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d3594dcddccc7f20fd069bc8e9828ce37220372680ff638c5e00dea427d88f5" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "flatbuffers", +] + +[[package]] +name = "arrow-json" +version = "56.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88cf36502b64a127dc659e3b305f1d993a544eab0d48cce704424e62074dc04b" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-schema", + "chrono", + "half 2.7.1", + "indexmap 2.12.1", + "lexical-core", + "memchr", + "num 0.4.3", + "serde", + "serde_json", + "simdutf8", +] + +[[package]] +name = "arrow-ord" +version = "56.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c8f82583eb4f8d84d4ee55fd1cb306720cddead7596edce95b50ee418edf66f" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", +] + +[[package]] +name = "arrow-row" +version = "56.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d07ba24522229d9085031df6b94605e0f4b26e099fb7cdeec37abd941a73753" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "half 2.7.1", +] + +[[package]] +name = "arrow-schema" +version = "56.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3aa9e59c611ebc291c28582077ef25c97f1975383f1479b12f3b9ffee2ffabe" + +[[package]] +name = "arrow-select" +version = "56.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c41dbbd1e97bfcaee4fcb30e29105fb2c75e4d82ae4de70b792a5d3f66b2e7a" +dependencies = [ + "ahash 0.8.12", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "num 0.4.3", +] + +[[package]] +name = "arrow-string" +version = "56.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53f5183c150fbc619eede22b861ea7c0eebed8eaac0333eaa7f6da5205fd504d" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "memchr", + "num 0.4.3", + "regex", + "regex-syntax", +] + [[package]] name = "ascii" version = "0.9.3" @@ -767,6 +976,15 @@ dependencies = [ "syn 2.0.111", ] +[[package]] +name = "atoi" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528" +dependencies = [ + "num-traits", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -799,6 +1017,36 @@ dependencies = [ "cc", ] +[[package]] +name = "aws-config" +version = "1.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0149602eeaf915158e14029ba0c78dedb8c08d554b024d54c8f239aab46511d" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-sdk-sso", + "aws-sdk-ssooidc", + "aws-sdk-sts", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "hex", + "http 1.4.0", + "ring", + "time", + "tokio", + "tracing", + "url 2.5.7", + "zeroize", +] + [[package]] name = "aws-credential-types" version = "1.2.10" @@ -892,6 +1140,73 @@ dependencies = [ "url 2.5.7", ] +[[package]] +name = "aws-sdk-sso" +version = "1.90.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f18e53542c522459e757f81e274783a78f8c81acdfc8d1522ee8a18b5fb1c66" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "http 0.2.12", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-ssooidc" +version = "1.92.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "532f4d866012ffa724a4385c82e8dd0e59f0ca0e600f3f22d4c03b6824b34e4a" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "http 0.2.12", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-sts" +version = "1.94.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1be6fbbfa1a57724788853a623378223fe828fc4c09b146c992f0c95b6256174" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-query", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "fastrand", + "http 0.2.12", + "regex-lite", + "tracing", +] + [[package]] name = "aws-sigv4" version = "1.3.6" @@ -1032,6 +1347,16 @@ dependencies = [ "aws-smithy-runtime-api", ] +[[package]] +name = "aws-smithy-query" +version = "0.60.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d28a63441360c477465f80c7abac3b9c4d075ca638f982e605b7dc2a2c7156c9" +dependencies = [ + "aws-smithy-types", + "urlencoding", +] + [[package]] name = "aws-smithy-runtime" version = "1.9.4" @@ -1258,6 +1583,26 @@ dependencies = [ "serde", ] +[[package]] +name = "bincode" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36eaf5d7b090263e8150820482d5d93cd964a81e4019913c972f4edcc6edb740" +dependencies = [ + "bincode_derive", + "serde", + "unty", +] + +[[package]] +name = "bincode_derive" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf95709a440f45e986983918d0e8a1f30a9b1df04918fc828670606804ac3c09" +dependencies = [ + "virtue", +] + [[package]] name = "bindgen" version = "0.72.1" @@ -1672,6 +2017,46 @@ dependencies = [ "vec_map", ] +[[package]] +name = "clap" +version = "4.5.58" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63be97961acde393029492ce0be7a1af7e323e6bae9511ebfac33751be5e6806" +dependencies = [ + "clap_builder", + "clap_derive", +] + +[[package]] +name = "clap_builder" +version = "4.5.58" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f13174bda5dfd69d7e947827e5af4b0f2f94a4a3ee92912fba07a66150f21e2" +dependencies = [ + "anstream", + "anstyle", + "clap_lex", + "strsim 0.11.1", +] + +[[package]] +name = "clap_derive" +version = "4.5.55" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a92793da1a46a5f2a02a6f4c46c6496b28c43638adea8306fcb0caa1634f24e5" +dependencies = [ + "heck 0.5.0", + "proc-macro2", + "quote", + "syn 2.0.111", +] + +[[package]] +name = "clap_lex" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a822ea5bc7590f9d40f1ba12c0dc3c2760f3482c6984db1573ad11031420831" + [[package]] name = "clickhouse" version = "0.14.1" @@ -1814,6 +2199,26 @@ version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" +[[package]] +name = "const-random" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359" +dependencies = [ + "const-random-macro", +] + +[[package]] +name = "const-random-macro" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" +dependencies = [ + "getrandom 0.2.16", + "once_cell", + "tiny-keccak", +] + [[package]] name = "const-str" version = "0.4.3" @@ -2010,6 +2415,27 @@ dependencies = [ "subtle", ] +[[package]] +name = "csv" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52cd9d68cf7efc6ddfaaee42e7288d3a99d613d4b50f76ce9827ae0c6e14f938" +dependencies = [ + "csv-core", + "itoa", + "ryu", + "serde_core", +] + +[[package]] +name = "csv-core" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "704a3c26996a80471189265814dbc2c257598b96b8a7feae2d31ace646bb9782" +dependencies = [ + "memchr", +] + [[package]] name = "ctr" version = "0.9.2" @@ -2189,6 +2615,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ececcb659e7ba858fb4f10388c250a7252eb0a27373f1a72b8748afdd248e587" dependencies = [ "powerfmt", + "serde_core", ] [[package]] @@ -2348,6 +2775,12 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813" +[[package]] +name = "dyn-clone" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555" + [[package]] name = "eager" version = "0.1.0" @@ -2735,6 +3168,16 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" +[[package]] +name = "flatbuffers" +version = "25.12.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35f6839d7b3b98adde531effaf34f0c2badc6f4735d26fe74709d8e513a96ef3" +dependencies = [ + "bitflags 2.10.0", + "rustc_version", +] + [[package]] name = "flate2" version = "1.1.5" @@ -2742,6 +3185,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bfe33edd8e85a12a67454e37f8c75e730830d83e313556ab9ebf9ee7fbeb3bfb" dependencies = [ "crc32fast", + "libz-rs-sys", "miniz_oxide", ] @@ -3096,6 +3540,18 @@ version = "1.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b43ede17f21864e81be2fa654110bf1e793774238d86ef8555c37e6519c0403" +[[package]] +name = "half" +version = "2.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ea2d84b969582b4b1864a92dc5d27cd2b77b622a8d79306834f1be5ba20d84b" +dependencies = [ + "cfg-if 1.0.4", + "crunchy", + "num-traits", + "zerocopy", +] + [[package]] name = "hash32" version = "0.3.1" @@ -3176,6 +3632,12 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + [[package]] name = "hermit-abi" version = "0.1.19" @@ -3214,6 +3676,40 @@ version = "0.6.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "12cb882ccb290b8646e554b157ab0b71e64e8d5bef775cd66b6531e52d302669" +[[package]] +name = "history-import-old-faithful" +version = "0.4.0" +dependencies = [ + "arrow", + "aws-config", + "aws-sdk-s3", + "bincode 2.0.1", + "bs58", + "clap 4.5.58", + "clickhouse", + "dashmap", + "futures-util", + "jetstreamer", + "jetstreamer-firehose", + "jetstreamer-plugin", + "log", + "parquet", + "serde", + "serde_with", + "solana-account-decoder", + "solana-address 1.1.0", + "solana-hash", + "solana-reward-info", + "solana-transaction", + "solana-transaction-error", + "solana-transaction-status", + "tempfile", + "thiserror 2.0.17", + "tikv-jemallocator", + "tokio", + "zstd", +] + [[package]] name = "hmac" version = "0.8.1" @@ -3683,6 +4179,7 @@ checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" dependencies = [ "autocfg", "hashbrown 0.12.3", + "serde", ] [[package]] @@ -3694,6 +4191,8 @@ dependencies = [ "equivalent", "hashbrown 0.16.1", "rayon", + "serde", + "serde_core", ] [[package]] @@ -3740,6 +4239,12 @@ dependencies = [ "cfg-if 1.0.4", ] +[[package]] +name = "integer-encoding" +version = "3.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" + [[package]] name = "io-uring" version = "0.7.11" @@ -3833,7 +4338,7 @@ dependencies = [ "aws-smithy-types", "aws-types", "base64 0.22.1", - "bincode", + "bincode 1.3.3", "cid", "crc", "crossbeam-channel", @@ -4133,6 +4638,63 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" +[[package]] +name = "lexical-core" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d8d125a277f807e55a77304455eb7b1cb52f2b18c143b60e766c120bd64a594" +dependencies = [ + "lexical-parse-float", + "lexical-parse-integer", + "lexical-util", + "lexical-write-float", + "lexical-write-integer", +] + +[[package]] +name = "lexical-parse-float" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52a9f232fbd6f550bc0137dcb5f99ab674071ac2d690ac69704593cb4abbea56" +dependencies = [ + "lexical-parse-integer", + "lexical-util", +] + +[[package]] +name = "lexical-parse-integer" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a7a039f8fb9c19c996cd7b2fcce303c1b2874fe1aca544edc85c4a5f8489b34" +dependencies = [ + "lexical-util", +] + +[[package]] +name = "lexical-util" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2604dd126bb14f13fb5d1bd6a66155079cb9fa655b37f875b3a742c705dbed17" + +[[package]] +name = "lexical-write-float" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50c438c87c013188d415fbabbb1dceb44249ab81664efbd31b14ae55dabb6361" +dependencies = [ + "lexical-util", + "lexical-write-integer", +] + +[[package]] +name = "lexical-write-integer" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "409851a618475d2d5796377cad353802345cba92c867d9fbcde9cf4eac4e14df" +dependencies = [ + "lexical-util", +] + [[package]] name = "libc" version = "0.2.177" @@ -4228,6 +4790,15 @@ dependencies = [ "libsecp256k1-core", ] +[[package]] +name = "libz-rs-sys" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c10501e7805cee23da17c7790e59df2870c0d4043ec6d03f67d31e2b53e77415" +dependencies = [ + "zlib-rs", +] + [[package]] name = "libz-sys" version = "1.1.23" @@ -4339,6 +4910,15 @@ dependencies = [ "libc", ] +[[package]] +name = "lz4_flex" +version = "0.11.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08ab2867e3eeeca90e844d1940eab391c9dc5228783db2ed999acbc0a9ed375a" +dependencies = [ + "twox-hash", +] + [[package]] name = "macro_rules_attribute" version = "0.2.2" @@ -4624,10 +5204,24 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b8536030f9fea7127f841b45bb6243b27255787fb4eb83958aa1ef9d2fdc0c36" dependencies = [ "num-bigint 0.2.6", - "num-complex", + "num-complex 0.2.4", "num-integer", "num-iter", - "num-rational", + "num-rational 0.2.4", + "num-traits", +] + +[[package]] +name = "num" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35bd024e8b2ff75562e5f34e7f4905839deb4b22955ef5e73d2fea1b9813cb23" +dependencies = [ + "num-bigint 0.4.6", + "num-complex 0.4.6", + "num-integer", + "num-iter", + "num-rational 0.4.2", "num-traits", ] @@ -4662,6 +5256,15 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-complex" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73f88a1307638156682bada9d7604135552957b7818057dcef22705b4d509495" +dependencies = [ + "num-traits", +] + [[package]] name = "num-conv" version = "0.1.0" @@ -4711,6 +5314,17 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-rational" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f83d14da390562dca69fc84082e73e548e1ad308d24accdedd2720017cb37824" +dependencies = [ + "num-bigint 0.4.6", + "num-integer", + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.19" @@ -4718,6 +5332,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" dependencies = [ "autocfg", + "libm", ] [[package]] @@ -4854,6 +5469,15 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "ordered-float" +version = "2.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68f19d67e5a2795c94e73e0bb1cc1a7edeb2e28efd39e2e1c9b7a40c1108b11c" +dependencies = [ + "num-traits", +] + [[package]] name = "outref" version = "0.5.2" @@ -4934,6 +5558,41 @@ dependencies = [ "windows-link", ] +[[package]] +name = "parquet" +version = "56.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0dbd48ad52d7dccf8ea1b90a3ddbfaea4f69878dd7683e51c507d4bc52b5b27" +dependencies = [ + "ahash 0.8.12", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-ipc", + "arrow-schema", + "arrow-select", + "base64 0.22.1", + "brotli", + "bytes", + "chrono", + "flate2", + "futures 0.3.31", + "half 2.7.1", + "hashbrown 0.16.1", + "lz4_flex", + "num 0.4.3", + "num-bigint 0.4.6", + "paste", + "seq-macro", + "simdutf8", + "snap", + "thrift", + "tokio", + "twox-hash", + "zstd", +] + [[package]] name = "paste" version = "1.0.15" @@ -4986,7 +5645,7 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2fd23b938276f14057220b707937bcb42fa76dda7560e57a2da30cb52d557937" dependencies = [ - "num", + "num 0.2.1", ] [[package]] @@ -5272,7 +5931,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "119533552c9a7ffacc21e099c24a0ac8bb19c2a2a3f363de84cd9b844feab270" dependencies = [ "bytes", - "heck", + "heck 0.4.1", "itertools 0.10.5", "lazy_static", "log", @@ -5622,6 +6281,26 @@ dependencies = [ "spin", ] +[[package]] +name = "ref-cast" +version = "1.0.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f354300ae66f76f1c85c5f84693f0ce81d747e2c3f21a45fef496d89c960bf7d" +dependencies = [ + "ref-cast-impl", +] + +[[package]] +name = "ref-cast-impl" +version = "1.0.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7186006dcb21920990093f30e3dea63b7d6e977bf1256be20c3563a5db070da" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.111", +] + [[package]] name = "regex" version = "1.12.2" @@ -6057,6 +6736,30 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "schemars" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cd191f9397d57d581cddd31014772520aa448f65ef991055d7f61582c65165f" +dependencies = [ + "dyn-clone", + "ref-cast", + "serde", + "serde_json", +] + +[[package]] +name = "schemars" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2b42f36aa1cd011945615b92222f6bf73c599a102a300334cd7f8dbeec726cc" +dependencies = [ + "dyn-clone", + "ref-cast", + "serde", + "serde_json", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -6143,6 +6846,12 @@ version = "1.0.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d767eb0aabc880b29956c35734170f26ed551a859dbd361d140cdbeca61ab1e2" +[[package]] +name = "seq-macro" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bc711410fbe7399f390ca1c3b60ad0f53f80e95c5eb935e52268a0e2cd49acc" + [[package]] name = "seqlock" version = "0.2.0" @@ -6187,7 +6896,7 @@ version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2bef2ebfde456fb76bbcf9f59315333decc4fda0b2b44b420243c11e0f5ec1f5" dependencies = [ - "half", + "half 1.8.3", "serde", ] @@ -6253,8 +6962,17 @@ version = "3.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4fa237f2807440d238e0364a218270b98f767a00d3dada77b1c53ae88940e2e7" dependencies = [ + "base64 0.22.1", + "chrono", + "hex", + "indexmap 1.9.3", + "indexmap 2.12.1", + "schemars 0.9.0", + "schemars 1.2.1", "serde_core", + "serde_json", "serde_with_macros", + "time", ] [[package]] @@ -6422,6 +7140,12 @@ version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d66dc143e6b11c1eddc06d5c423cfc97062865baf299914ab64caa38182078fe" +[[package]] +name = "simdutf8" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3a9fe34e3e7a50316060351f37187a3f546bce95496156754b601a5fa71b76e" + [[package]] name = "simpl" version = "0.1.0" @@ -6478,6 +7202,12 @@ dependencies = [ "time", ] +[[package]] +name = "snap" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b" + [[package]] name = "socket2" version = "0.5.10" @@ -6519,7 +7249,7 @@ version = "3.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "014dcb9293341241dd153b35f89ea906e4170914f4a347a95e7fb07ade47cd6f" dependencies = [ - "bincode", + "bincode 1.3.3", "serde", "serde_bytes", "serde_derive", @@ -6539,7 +7269,7 @@ checksum = "5fd3308940576fd279b73156e29ed398ad1c5424fea9e42cca38b2e6bf98d6a2" dependencies = [ "Inflector", "base64 0.22.1", - "bincode", + "bincode 1.3.3", "bs58", "bv", "serde", @@ -6607,7 +7337,7 @@ checksum = "d7f54a3079b6d1c270c4b3c4ced2f4c218f7e71521411839ba83db3a1826a7fd" dependencies = [ "agave-fs", "ahash 0.8.12", - "bincode", + "bincode 1.3.3", "blake3", "bv", "bytemuck", @@ -6695,7 +7425,7 @@ version = "3.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2f56cac5e70517a2f27d05e5100b20de7182473ffd0035b23ea273307905987" dependencies = [ - "bincode", + "bincode 1.3.3", "bytemuck", "serde", "serde_derive", @@ -6733,7 +7463,7 @@ version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "278a1a5bad62cd9da89ac8d4b7ec444e83caa8ae96aa656dfc27684b28d49a5d" dependencies = [ - "bincode", + "bincode 1.3.3", "serde_core", "solana-instruction-error", ] @@ -6818,7 +7548,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e7cb75c221b02918427762bcebdbfd34c831bd1c66442d2df928fa13f6b73fe" dependencies = [ "agave-syscalls", - "bincode", + "bincode 1.3.3", "qualifier_attr", "solana-account", "solana-bincode", @@ -6904,7 +7634,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b45846c3741bc62c5695c495e4a27f830e65cbde7dd244809337768ae6ad0b6" dependencies = [ "chrono", - "clap", + "clap 2.34.0", "rpassword", "solana-bls-signatures", "solana-clock", @@ -6951,7 +7681,7 @@ dependencies = [ "agave-reserved-account-keys", "base64 0.22.1", "chrono", - "clap", + "clap 2.34.0", "console 0.16.1", "humantime", "indicatif 0.18.3", @@ -6990,7 +7720,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b459e5f9ab10d2ae6959b96db5b1a56310e762e023102159bf9645f2097ecbbf" dependencies = [ "async-trait", - "bincode", + "bincode 1.3.3", "dashmap", "futures 0.3.31", "futures-util", @@ -7143,7 +7873,7 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "63e401ae56aed512821cc7a0adaa412ff97fecd2dff4602be7b1330d2daec0c4" dependencies = [ - "bincode", + "bincode 1.3.3", "serde", "serde_derive", "solana-account", @@ -7161,7 +7891,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0748f2086e095d357408944ba8db6a8c8ba49376cb9f911f3fe2c44c055604f5" dependencies = [ "async-trait", - "bincode", + "bincode 1.3.3", "crossbeam-channel", "futures-util", "indexmap 2.12.1", @@ -7288,7 +8018,7 @@ version = "3.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b32ff86ab80b1e349377536073b50653a489a8767bf6993323f5b1e61ee07c7" dependencies = [ - "bincode", + "bincode 1.3.3", "crossbeam-channel", "dlopen2", "log", @@ -7368,8 +8098,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5ef05bab4daca11502c90fc4febe6e9741b2a229008bfe1af8c85913d1f86515" dependencies = [ "agave-logger", - "bincode", - "clap", + "bincode 1.3.3", + "clap 2.34.0", "crossbeam-channel", "log", "serde", @@ -7400,7 +8130,7 @@ version = "3.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7347ab62e6d47a82e340c865133795b394feea7c2b2771d293f57691c6544c3f" dependencies = [ - "bincode", + "bincode 1.3.3", "serde", "serde_derive", "solana-account", @@ -7463,7 +8193,7 @@ version = "3.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "749eccc960e85c9b33608450093d256006253e1cb436b8380e71777840a3f675" dependencies = [ - "bincode", + "bincode 1.3.3", "chrono", "memmap2 0.5.10", "serde", @@ -7544,9 +8274,9 @@ dependencies = [ "arc-swap", "arrayvec", "assert_matches", - "bincode", + "bincode 1.3.3", "bv", - "clap", + "clap 2.34.0", "crossbeam-channel", "flate2", "indexmap 2.12.1", @@ -7641,7 +8371,7 @@ version = "3.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8df4e8fcba01d7efa647ed20a081c234475df5e11a93acb4393cc2c9a7b99bab" dependencies = [ - "bincode", + "bincode 1.3.3", "serde", "serde_derive", "solana-define-syscall 3.0.0", @@ -7744,7 +8474,7 @@ dependencies = [ "agave-snapshots", "anyhow", "assert_matches", - "bincode", + "bincode 1.3.3", "bitflags 2.10.0", "bytes", "bzip2", @@ -7937,7 +8667,7 @@ version = "3.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "85666605c9fd727f865ed381665db0a8fc29f984a030ecc1e40f43bfb2541623" dependencies = [ - "bincode", + "bincode 1.3.3", "blake3", "lazy_static", "serde", @@ -7989,7 +8719,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2edb6edf83fa8b3d71135cda15d650062120e26021b41683bbbd94f527ed683" dependencies = [ "anyhow", - "bincode", + "bincode 1.3.3", "bytes", "cfg-if 1.0.4", "dashmap", @@ -8058,7 +8788,7 @@ version = "3.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6edf2f25743c95229ac0fdc32f8f5893ef738dbf332c669e9861d33ddb0f469d" dependencies = [ - "bincode", + "bincode 1.3.3", "bitflags 2.10.0", "cfg_eval", "serde", @@ -8073,7 +8803,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60c681205a42c004c5a66d90bebe30f646936041894f20acd941667a412a4a5f" dependencies = [ "ahash 0.8.12", - "bincode", + "bincode 1.3.3", "bv", "bytes", "caps", @@ -8220,7 +8950,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1946dc97d7666617c6eb9231d07ffdfd36841ebef3b08e04e2dd2249c56843a1" dependencies = [ "base64 0.22.1", - "bincode", + "bincode 1.3.3", "itertools 0.12.1", "log", "percentage", @@ -8407,7 +9137,7 @@ dependencies = [ "agave-feature-set", "agave-snapshots", "base64 0.22.1", - "bincode", + "bincode 1.3.3", "bs58", "crossbeam-channel", "dashmap", @@ -8492,7 +9222,7 @@ checksum = "695f5c9e9afbb79269d173db59ec79993720b817212addc0367fc447e12eb0da" dependencies = [ "async-trait", "base64 0.22.1", - "bincode", + "bincode 1.3.3", "bs58", "futures 0.3.31", "indicatif 0.18.3", @@ -8608,7 +9338,7 @@ dependencies = [ "arrayref", "assert_matches", "base64 0.22.1", - "bincode", + "bincode 1.3.3", "blake3", "bv", "bytemuck", @@ -9025,7 +9755,7 @@ checksum = "0ca57335e89774043dbf8f41d1bd300f81e6a61f13cad688eabae1c7b24f0729" dependencies = [ "agave-reserved-account-keys", "backoff", - "bincode", + "bincode 1.3.3", "bytes", "bzip2", "enum-iterator", @@ -9064,7 +9794,7 @@ version = "3.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f59231870d0ddabf9860c0a22ecdef8ff8f778aa4105d7ad8a4ac18bc1efd428" dependencies = [ - "bincode", + "bincode 1.3.3", "bs58", "prost", "protobuf-src", @@ -9262,7 +9992,7 @@ version = "3.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a6a7a47efcfe3ada26f190077a0d90dfcffa7e08fc0174a5fce75b0159761b59" dependencies = [ - "bincode", + "bincode 1.3.3", "log", "serde", "solana-account", @@ -9304,7 +10034,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "63205e68d680bcc315337dec311b616ab32fea0a612db3b883ce4de02e0953f9" dependencies = [ "base64 0.22.1", - "bincode", + "bincode 1.3.3", "lazy_static", "serde", "serde_derive", @@ -9365,7 +10095,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73b148fd0833086cb75a8d38d341752f5c1f082d73d3150cc45b47032e5d0fe8" dependencies = [ "async-trait", - "bincode", + "bincode 1.3.3", "futures-util", "indexmap 2.12.1", "indicatif 0.18.3", @@ -9425,7 +10155,7 @@ version = "3.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "64928e6af3058dcddd6da6680cbe08324b4e071ad73115738235bbaa9e9f72a5" dependencies = [ - "bincode", + "bincode 1.3.3", "serde", "serde_derive", "solana-address 1.1.0", @@ -9447,7 +10177,7 @@ version = "3.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fd9a056caa8b6bc1f47db81e6b92da836b2aa3cf20553ab49a1a2b2ab8fde31e" dependencies = [ - "bincode", + "bincode 1.3.3", "serde", "solana-account", "solana-instruction", @@ -9477,7 +10207,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a34f0be8c181093704032287650a52d2a884eba81614aa4c404a7ec43bca7b7c" dependencies = [ "base64 0.22.1", - "bincode", + "bincode 1.3.3", "log", "rand 0.8.5", "solana-packet", @@ -9495,7 +10225,7 @@ dependencies = [ "Inflector", "agave-reserved-account-keys", "base64 0.22.1", - "bincode", + "bincode 1.3.3", "borsh", "bs58", "log", @@ -9536,7 +10266,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b68112658f8ed0901054d3d1e7fcce3bedab88f190ca1b00ac5f121384cdb3b" dependencies = [ "base64 0.22.1", - "bincode", + "bincode 1.3.3", "bs58", "serde", "serde_json", @@ -9636,7 +10366,7 @@ version = "4.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "db6e123e16bfdd7a81d71b4c4699e0b29580b619f4cd2ef5b6aae1eb85e8979f" dependencies = [ - "bincode", + "bincode 1.3.3", "cfg_eval", "num-derive", "num-traits", @@ -9663,7 +10393,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e25b1cebb26a3e4cce242612beb2bb2ada3a51e99d76a1cbece67591769273e7" dependencies = [ "agave-feature-set", - "bincode", + "bincode 1.3.3", "log", "num-derive", "num-traits", @@ -9713,7 +10443,7 @@ checksum = "9602bcb1f7af15caef92b91132ec2347e1c51a72ecdbefdaefa3eac4b8711475" dependencies = [ "aes-gcm-siv", "base64 0.22.1", - "bincode", + "bincode 1.3.3", "bytemuck", "bytemuck_derive", "curve25519-dalek 4.1.3", @@ -9767,7 +10497,7 @@ checksum = "5cae28b0bffeeb4431c12fb3b95f7afe748d81f7b3862a8a8770a84aff9b8282" dependencies = [ "aes-gcm-siv", "base64 0.22.1", - "bincode", + "bincode 1.3.3", "bytemuck", "bytemuck_derive", "curve25519-dalek 4.1.3", @@ -10098,7 +10828,7 @@ version = "0.24.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e385be0d24f186b4ce2f9982191e7101bb737312ad61c1f2f984f34bcf85d59" dependencies = [ - "heck", + "heck 0.4.1", "proc-macro2", "quote", "rustversion", @@ -10313,6 +11043,37 @@ dependencies = [ "num_cpus", ] +[[package]] +name = "thrift" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e54bc85fc7faa8bc175c4bab5b92ba8d9a3ce893d0e9f42cc455c8ab16a9e09" +dependencies = [ + "byteorder", + "integer-encoding", + "ordered-float", +] + +[[package]] +name = "tikv-jemalloc-sys" +version = "0.6.1+5.3.0-1-ge13ca993e8ccb9ba9847cc330696e02839f328f7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd8aa5b2ab86a2cefa406d889139c162cbb230092f7d1d7cbc1716405d852a3b" +dependencies = [ + "cc", + "libc", +] + +[[package]] +name = "tikv-jemallocator" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0359b4327f954e0567e69fb191cf1436617748813819c94b8cd4a431422d053a" +dependencies = [ + "libc", + "tikv-jemalloc-sys", +] + [[package]] name = "time" version = "0.3.44" @@ -10361,6 +11122,15 @@ dependencies = [ "zeroize", ] +[[package]] +name = "tiny-keccak" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" +dependencies = [ + "crunchy", +] + [[package]] name = "tinystr" version = "0.8.2" @@ -10722,6 +11492,12 @@ dependencies = [ "webpki-roots 0.26.11", ] +[[package]] +name = "twox-hash" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ea3136b675547379c4bd395ca6b938e5ad3c3d20fad76e7fe85f9e0d011419c" + [[package]] name = "typenum" version = "1.19.0" @@ -10822,6 +11598,12 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" +[[package]] +name = "unty" +version = "0.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d49784317cd0d1ee7ec5c716dd598ec5b4483ea832a2dced265471cc0f690ae" + [[package]] name = "unwrap_none" version = "0.1.2" @@ -10861,6 +11643,12 @@ dependencies = [ "serde", ] +[[package]] +name = "urlencoding" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" + [[package]] name = "utf-8" version = "0.7.6" @@ -10907,6 +11695,12 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "virtue" +version = "0.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "051eb1abcf10076295e815102942cc58f9d5e3b4560e46e53c21e8ff6f3af7b1" + [[package]] name = "void" version = "1.0.2" @@ -11752,6 +12546,12 @@ dependencies = [ "syn 2.0.111", ] +[[package]] +name = "zlib-rs" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40990edd51aae2c2b6907af74ffb635029d5788228222c4bb811e9351c0caad3" + [[package]] name = "zstd" version = "0.13.3" diff --git a/Cargo.toml b/Cargo.toml index c77b1e5..b9a730b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["jetstreamer-firehose", "jetstreamer-plugin", "jetstreamer-utils"] +members = ["jetstreamer-firehose", "jetstreamer-plugin", "jetstreamer-utils", "history-import-old-faithful"] [workspace.package] edition = "2024" @@ -97,10 +97,18 @@ ctrlc = "3" indoc = "2" tempfile = "3" libc = "0" +arrow = "56" +aws-config = "1" aws-credential-types = "1" aws-sdk-s3 = "1" aws-smithy-types = "1" aws-types = "1" +bincode_2 = { package = "bincode", version = "2", features = ["serde", "std", "alloc"] } +clap = { version = "4", features = ["derive", "env"] } +parquet = { version = "56", features = ["async"] } +serde_with = { version = "3", features = ["base64"] } +solana-account-decoder = "3" +solana-transaction-error = "3" [dependencies] tokio.workspace = true diff --git a/history-import-old-faithful/Cargo.toml b/history-import-old-faithful/Cargo.toml new file mode 100644 index 0000000..5f234d0 --- /dev/null +++ b/history-import-old-faithful/Cargo.toml @@ -0,0 +1,36 @@ +[package] +name = "history-import-old-faithful" +edition.workspace = true +version.workspace = true + +[dependencies] +arrow.workspace = true +aws-config.workspace = true +aws-sdk-s3.workspace = true +bincode_2.workspace = true +bs58.workspace = true +clap.workspace = true +clickhouse.workspace = true +dashmap.workspace = true +futures-util.workspace = true +log.workspace = true +parquet.workspace = true +serde.workspace = true +serde_with.workspace = true +tempfile.workspace = true +thiserror.workspace = true +tokio = { workspace = true, features = ["full"] } +zstd = { workspace = true, features = ["zstdmt"] } +tikv-jemallocator = "0.6" + +jetstreamer = { path = ".." } +jetstreamer-firehose.workspace = true +jetstreamer-plugin.workspace = true + +solana-account-decoder.workspace = true +solana-address.workspace = true +solana-hash.workspace = true +solana-reward-info.workspace = true +solana-transaction.workspace = true +solana-transaction-error.workspace = true +solana-transaction-status.workspace = true diff --git a/history-import-old-faithful/src/compat_bincode.rs b/history-import-old-faithful/src/compat_bincode.rs new file mode 100644 index 0000000..52b1227 --- /dev/null +++ b/history-import-old-faithful/src/compat_bincode.rs @@ -0,0 +1,20 @@ +//! Bincode helpers matching `nitro_utils::bincode` configuration. +//! +//! Uses bincode 2.x with serde compat, fixed-int encoding, and little-endian +//! byte order — the exact same config as nitro-stream, so serialized bytes are +//! identical. + +use bincode_2::config::Config; +use serde::Serialize; + +pub type EncodeError = bincode_2::error::EncodeError; + +fn config() -> impl Config { + bincode_2::config::standard() + .with_fixed_int_encoding() + .with_little_endian() +} + +pub fn serialize(value: &T) -> Result, EncodeError> { + bincode_2::serde::encode_to_vec(value, config()) +} diff --git a/history-import-old-faithful/src/main.rs b/history-import-old-faithful/src/main.rs new file mode 100644 index 0000000..8b42c08 --- /dev/null +++ b/history-import-old-faithful/src/main.rs @@ -0,0 +1,82 @@ +mod compat_bincode; +mod plugin; +mod types; +mod writer; + +use clap::Parser; +use jetstreamer::JetstreamerRunner; +use std::path::PathBuf; +use tikv_jemallocator::Jemalloc; + +use crate::plugin::ParquetExportPlugin; + +#[global_allocator] +static GLOBAL: Jemalloc = Jemalloc; + +#[derive(Parser)] +#[command(about = "Export Old Faithful block data to partitioned parquet files on S3")] +struct Args { + /// First slot to process (inclusive). + #[arg(long, env)] + start_slot: u64, + + /// Last slot to process (exclusive). + #[arg(long, env)] + end_slot: u64, + + /// S3 bucket for parquet uploads. + #[arg(long, env)] + s3_bucket: String, + + /// S3 key prefix (e.g. "data/v1"). + #[arg(long, env, default_value = "")] + s3_prefix: String, + + /// Number of firehose ingestion threads. + #[arg(long, env = "JETSTREAMER_THREADS", default_value = "4")] + threads: usize, + + /// Optional local root directory where partition files are persisted. + /// Files are written under this root using the same key layout as S3. + #[arg(long, env)] + data_path: Option, + + /// Probe S3 once at startup and skip partitions that already exist. + #[arg( + long, + env = "HISTORY_IMPORT_SKIP_EXISTING_PARTITIONS", + default_value_t = true + )] + skip_existing_partitions: bool, +} + +fn main() -> Result<(), Box> { + let args = Args::parse(); + + assert!( + args.start_slot < args.end_slot, + "start_slot must be less than end_slot" + ); + + let temp_dir = tempfile::tempdir()?; + + let plugin = ParquetExportPlugin::new( + args.s3_bucket, + args.s3_prefix, + temp_dir.path().to_path_buf(), + args.data_path, + args.start_slot, + args.end_slot, + args.threads, + args.skip_existing_partitions, + ); + + JetstreamerRunner::new() + .with_log_level("info") + .with_plugin(Box::new(plugin)) + .with_threads(args.threads) + .with_slot_range_bounds(args.start_slot, args.end_slot) + .run()?; + + Ok(()) +} diff --git a/history-import-old-faithful/src/plugin.rs b/history-import-old-faithful/src/plugin.rs new file mode 100644 index 0000000..ea3ef3b --- /dev/null +++ b/history-import-old-faithful/src/plugin.rs @@ -0,0 +1,979 @@ +use std::{ + collections::BTreeMap, + collections::HashSet, + fs, io, + ops::Range, + path::PathBuf, + sync::OnceLock, + sync::{ + Arc, Mutex, + atomic::{AtomicU64, Ordering}, + }, + time::Duration, +}; + +use clickhouse::Client; +use dashmap::DashMap; +use futures_util::FutureExt; +use jetstreamer_firehose::firehose::{BlockData, TransactionData, generate_subranges}; +use jetstreamer_plugin::{Plugin, PluginFuture}; +use solana_reward_info::RewardType; +use tokio::{ + sync::mpsc::{self, error::TrySendError}, + task::JoinSet, +}; + +use crate::{ + types::{BalanceDiffs, BlockEvent, TransactionTokenBalanceSerde, TxWithMeta}, + writer::{PartitionWriter, PreparedPartitionUpload, SLOTS_PER_PARTITION, slot_to_partition}, +}; + +const MIN_SERIALIZE_QUEUE_CAPACITY: usize = 4; +const MAX_SERIALIZE_QUEUE_CAPACITY: usize = 32; +const SERIALIZE_QUEUE_CAPACITY_PER_THREAD: usize = 1; + +const MIN_PREPARED_QUEUE_CAPACITY: usize = 2; +const MAX_PREPARED_QUEUE_CAPACITY: usize = 16; +const PREPARED_QUEUE_CAPACITY_PER_THREAD: usize = 1; + +const MAX_IN_FLIGHT_UPLOADS: usize = 4; +const MAX_IN_FLIGHT_SERIALIZERS: usize = 8; +const DEFAULT_MEMORY_BUDGET_FRACTION: f64 = 0.70; +const DEFAULT_MEMORY_RESUME_FRACTION: f64 = 0.85; +const DEFAULT_MEMORY_CHECK_INTERVAL_BLOCKS: u64 = 32; +const MEMORY_BACKPRESSURE_POLL_MS: u64 = 50; +const MIN_MEMORY_BUDGET_BYTES: u64 = 512 * 1024 * 1024; +const DEFAULT_MEMORY_BUDGET_BYTES: u64 = 8 * 1024 * 1024 * 1024; +const MEMORY_BUDGET_MB_ENV: &str = "HISTORY_IMPORT_MEMORY_BUDGET_MB"; +const MEMORY_BUDGET_FRACTION_ENV: &str = "HISTORY_IMPORT_MEMORY_BUDGET_FRACTION"; +const MEMORY_RESUME_FRACTION_ENV: &str = "HISTORY_IMPORT_MEMORY_RESUME_FRACTION"; +const MEMORY_CHECK_INTERVAL_BLOCKS_ENV: &str = "HISTORY_IMPORT_MEMORY_CHECK_INTERVAL_BLOCKS"; +const MIN_PARTITION_PROBE_CONCURRENCY: usize = 8; +const MAX_PARTITION_PROBE_CONCURRENCY: usize = 128; +const PARTITION_PROBE_CONCURRENCY_PER_THREAD: usize = 4; + +struct BufferState { + thread_high_watermarks: Vec, + partitions: BTreeMap>, +} + +struct UploadJob { + partition: u64, + blocks: Vec, +} + +struct UploadQueue { + sender: mpsc::Sender, + error: Arc>>, + serializer_manager: tokio::task::JoinHandle>, + uploader_manager: tokio::task::JoinHandle>, +} + +pub struct ParquetExportPlugin { + pending_txs: DashMap>, + state: Mutex, + writer: Arc, + upload_queue: Mutex>, + serialize_queue_capacity: usize, + prepared_queue_capacity: usize, + serialization_concurrency: usize, + upload_concurrency: usize, + thread_ranges: Vec>, + memory_budget_bytes: u64, + memory_resume_bytes: u64, + memory_check_every_n_blocks: u64, + memory_check_counter: AtomicU64, + skip_existing_partitions: bool, + partition_probe_concurrency: usize, + existing_partitions: OnceLock>, + start_slot: u64, + end_slot_exclusive: u64, +} + +impl ParquetExportPlugin { + pub fn new( + s3_bucket: String, + s3_prefix: String, + temp_dir: PathBuf, + data_path: Option, + start_slot: u64, + end_slot_exclusive: u64, + threads: usize, + skip_existing_partitions: bool, + ) -> Self { + let thread_count = threads.max(1); + let slot_range = start_slot..end_slot_exclusive; + let thread_ranges = generate_subranges(&slot_range, thread_count as u64); + let thread_high_watermarks = thread_ranges + .iter() + .map(|thread_range| thread_range.start.saturating_sub(1)) + .collect(); + + let serialize_queue_capacity = Self::derive_queue_capacity( + thread_count, + SERIALIZE_QUEUE_CAPACITY_PER_THREAD, + MIN_SERIALIZE_QUEUE_CAPACITY, + MAX_SERIALIZE_QUEUE_CAPACITY, + ); + let prepared_queue_capacity = Self::derive_queue_capacity( + thread_count, + PREPARED_QUEUE_CAPACITY_PER_THREAD, + MIN_PREPARED_QUEUE_CAPACITY, + MAX_PREPARED_QUEUE_CAPACITY, + ); + let upload_concurrency = Self::derive_upload_concurrency(thread_count); + let serialization_concurrency = Self::derive_serialization_concurrency(thread_count); + let partition_probe_concurrency = Self::derive_queue_capacity( + thread_count, + PARTITION_PROBE_CONCURRENCY_PER_THREAD, + MIN_PARTITION_PROBE_CONCURRENCY, + MAX_PARTITION_PROBE_CONCURRENCY, + ); + let memory_budget_bytes = Self::derive_memory_budget_bytes(); + let memory_resume_bytes = Self::derive_memory_resume_bytes(memory_budget_bytes); + let memory_check_every_n_blocks = Self::derive_memory_check_interval_blocks(); + + Self { + pending_txs: DashMap::new(), + state: Mutex::new(BufferState { + thread_high_watermarks, + partitions: BTreeMap::new(), + }), + writer: Arc::new(PartitionWriter::new( + s3_bucket, s3_prefix, temp_dir, data_path, + )), + upload_queue: Mutex::new(None), + serialize_queue_capacity, + prepared_queue_capacity, + serialization_concurrency, + upload_concurrency, + thread_ranges, + memory_budget_bytes, + memory_resume_bytes, + memory_check_every_n_blocks, + memory_check_counter: AtomicU64::new(0), + skip_existing_partitions, + partition_probe_concurrency, + existing_partitions: OnceLock::new(), + start_slot, + end_slot_exclusive, + } + } + + fn read_meminfo_value_bytes(key: &str) -> Option { + let meminfo = fs::read_to_string("/proc/meminfo").ok()?; + let line = meminfo.lines().find(|line| line.starts_with(key))?; + let value_kib = line.split_whitespace().nth(1)?.parse::().ok()?; + value_kib.checked_mul(1024) + } + + fn read_process_rss_bytes() -> Option { + let status = fs::read_to_string("/proc/self/status").ok()?; + let line = status.lines().find(|line| line.starts_with("VmRSS:"))?; + let value_kib = line.split_whitespace().nth(1)?.parse::().ok()?; + value_kib.checked_mul(1024) + } + + fn parse_env_f64(name: &str) -> Option { + std::env::var(name).ok()?.parse::().ok() + } + + fn parse_env_u64(name: &str) -> Option { + std::env::var(name).ok()?.parse::().ok() + } + + fn derive_memory_budget_bytes() -> u64 { + if let Some(budget_mb) = Self::parse_env_u64(MEMORY_BUDGET_MB_ENV) { + return budget_mb + .saturating_mul(1024 * 1024) + .max(MIN_MEMORY_BUDGET_BYTES); + } + + let fraction = Self::parse_env_f64(MEMORY_BUDGET_FRACTION_ENV) + .unwrap_or(DEFAULT_MEMORY_BUDGET_FRACTION) + .clamp(0.05, 0.95); + + let available = Self::read_meminfo_value_bytes("MemAvailable:"); + let from_available = available + .map(|bytes| ((bytes as f64) * fraction) as u64) + .unwrap_or(DEFAULT_MEMORY_BUDGET_BYTES); + + from_available.max(MIN_MEMORY_BUDGET_BYTES) + } + + fn derive_memory_resume_bytes(memory_budget_bytes: u64) -> u64 { + let resume_fraction = Self::parse_env_f64(MEMORY_RESUME_FRACTION_ENV) + .unwrap_or(DEFAULT_MEMORY_RESUME_FRACTION) + .clamp(0.50, 0.99); + ((memory_budget_bytes as f64) * resume_fraction) as u64 + } + + fn derive_memory_check_interval_blocks() -> u64 { + Self::parse_env_u64(MEMORY_CHECK_INTERVAL_BLOCKS_ENV) + .unwrap_or(DEFAULT_MEMORY_CHECK_INTERVAL_BLOCKS) + .clamp(1, 4096) + } + + fn derive_queue_capacity( + thread_count: usize, + per_thread: usize, + min_capacity: usize, + max_capacity: usize, + ) -> usize { + thread_count + .saturating_mul(per_thread) + .clamp(min_capacity, max_capacity) + } + + fn derive_upload_concurrency(thread_count: usize) -> usize { + thread_count.clamp(1, MAX_IN_FLIGHT_UPLOADS) + } + + fn derive_serialization_concurrency(thread_count: usize) -> usize { + thread_count.clamp(1, MAX_IN_FLIGHT_SERIALIZERS) + } + + fn requested_partition_bounds(&self) -> Option<(u64, u64)> { + if self.end_slot_exclusive <= self.start_slot { + return None; + } + let start_partition = slot_to_partition(self.start_slot); + let end_partition = slot_to_partition(self.end_slot_exclusive.saturating_sub(1)); + Some((start_partition, end_partition)) + } + + fn should_skip_partition(&self, partition: u64) -> bool { + self.skip_existing_partitions + && self + .existing_partitions + .get() + .is_some_and(|existing| existing.contains(&partition)) + } + + async fn probe_existing_partitions( + &self, + ) -> Result, Box> { + if !self.skip_existing_partitions { + return Ok(HashSet::new()); + } + + let Some((start_partition, end_partition)) = self.requested_partition_bounds() else { + return Ok(HashSet::new()); + }; + let total_partitions = end_partition + .saturating_sub(start_partition) + .saturating_add(1); + log::info!( + "Probing S3 for existing partitions in range {}..={} ({} partitions, concurrency {})", + start_partition, + end_partition, + total_partitions, + self.partition_probe_concurrency, + ); + + let mut existing = HashSet::new(); + let mut in_flight = JoinSet::new(); + let mut next_partition = Some(start_partition); + + while next_partition.is_some() || !in_flight.is_empty() { + while let Some(partition) = next_partition { + if in_flight.len() >= self.partition_probe_concurrency { + break; + } + next_partition = if partition == end_partition { + None + } else { + Some(partition + 1) + }; + let writer = self.writer.clone(); + in_flight.spawn(async move { + writer + .partition_exists(partition) + .await + .map(|exists| (partition, exists)) + .map_err(|err| format!("partition {partition}: {err}")) + }); + } + + let Some(join_result) = in_flight.join_next().await else { + break; + }; + match join_result { + Ok(Ok((partition, true))) => { + existing.insert(partition); + } + Ok(Ok((_partition, false))) => {} + Ok(Err(err)) => return Err(Self::boxed_error(format!("S3 probe failed: {err}"))), + Err(join_err) => { + return Err(Self::boxed_error(format!( + "S3 partition probe task panicked: {join_err}" + ))); + } + } + } + + Ok(existing) + } + + fn boxed_error( + message: impl Into, + ) -> Box { + Box::new(io::Error::new(io::ErrorKind::Other, message.into())) + } + + fn format_mib(bytes: u64) -> u64 { + bytes / (1024 * 1024) + } + + fn set_upload_error(error: &Arc>>, message: String) { + let mut guard = error.lock().expect("upload error lock poisoned"); + if guard.is_none() { + *guard = Some(message); + } + } + + fn get_upload_error(error: &Arc>>) -> Option { + error.lock().expect("upload error lock poisoned").clone() + } + + fn get_pipeline_error(&self) -> Option { + let upload_queue = self + .upload_queue + .lock() + .expect("upload queue lock poisoned"); + upload_queue + .as_ref() + .and_then(|queue| Self::get_upload_error(&queue.error)) + } + + async fn maybe_apply_memory_backpressure( + &self, + ) -> Result<(), Box> { + let check_index = self.memory_check_counter.fetch_add(1, Ordering::Relaxed) + 1; + if check_index % self.memory_check_every_n_blocks != 0 { + return Ok(()); + } + + let Some(mut rss_bytes) = Self::read_process_rss_bytes() else { + return Ok(()); + }; + if rss_bytes <= self.memory_budget_bytes { + return Ok(()); + } + + log::warn!( + "RSS {} MiB exceeded memory budget {} MiB; pausing ingestion until below {} MiB", + Self::format_mib(rss_bytes), + Self::format_mib(self.memory_budget_bytes), + Self::format_mib(self.memory_resume_bytes), + ); + + loop { + if let Some(err) = self.get_pipeline_error() { + return Err(Self::boxed_error(format!( + "upload pipeline unavailable during memory backpressure: {err}" + ))); + } + + tokio::time::sleep(Duration::from_millis(MEMORY_BACKPRESSURE_POLL_MS)).await; + + let Some(current_rss_bytes) = Self::read_process_rss_bytes() else { + break; + }; + rss_bytes = current_rss_bytes; + if rss_bytes <= self.memory_resume_bytes { + break; + } + } + + log::info!( + "Memory backpressure released at RSS {} MiB", + Self::format_mib(rss_bytes) + ); + + Ok(()) + } + + fn handle_join_result( + join_result: Result, tokio::task::JoinError>, + error: &Arc>>, + worker_label: &str, + ) -> Result { + match join_result { + Ok(Ok(value)) => Ok(value), + Ok(Err(err)) => { + Self::set_upload_error(error, err.clone()); + Err(err) + } + Err(join_err) => { + let message = format!("{worker_label} task panicked: {join_err}"); + Self::set_upload_error(error, message.clone()); + Err(message) + } + } + } + + async fn send_prepared_partition( + prepared_sender: &mpsc::Sender, + prepared: PreparedPartitionUpload, + error: &Arc>>, + ) -> Result<(), String> { + if prepared.rows_written == 0 { + return Ok(()); + } + + let partition = prepared.partition; + prepared_sender.send(prepared).await.map_err(|_| { + Self::get_upload_error(error) + .unwrap_or_else(|| format!("failed to enqueue prepared partition {partition}")) + }) + } + + async fn run_serialization_manager( + writer: Arc, + mut receiver: mpsc::Receiver, + prepared_sender: mpsc::Sender, + max_in_flight: usize, + error: Arc>>, + ) -> Result<(), String> { + let mut in_flight = JoinSet::new(); + + loop { + if in_flight.len() >= max_in_flight { + if let Some(join_result) = in_flight.join_next().await { + let prepared = + Self::handle_join_result(join_result, &error, "serialization worker")?; + Self::send_prepared_partition(&prepared_sender, prepared, &error).await?; + } + continue; + } + + tokio::select! { + maybe_job = receiver.recv() => { + let Some(job) = maybe_job else { + break; + }; + let writer = writer.clone(); + in_flight.spawn_blocking(move || { + let partition = job.partition; + writer.prepare_partition(partition, job.blocks).map_err(|e| { + format!("partition {partition}: {e}") + }) + }); + } + Some(join_result) = in_flight.join_next(), if !in_flight.is_empty() => { + let prepared = Self::handle_join_result(join_result, &error, "serialization worker")?; + Self::send_prepared_partition(&prepared_sender, prepared, &error).await?; + } + } + } + + while let Some(join_result) = in_flight.join_next().await { + let prepared = Self::handle_join_result(join_result, &error, "serialization worker")?; + Self::send_prepared_partition(&prepared_sender, prepared, &error).await?; + } + + drop(prepared_sender); + + Ok(()) + } + + async fn run_upload_manager( + writer: Arc, + mut receiver: mpsc::Receiver, + max_in_flight: usize, + error: Arc>>, + ) -> Result<(), String> { + let mut in_flight = JoinSet::new(); + + loop { + if in_flight.len() >= max_in_flight { + if let Some(join_result) = in_flight.join_next().await { + Self::handle_join_result(join_result, &error, "upload worker")?; + } + continue; + } + + tokio::select! { + maybe_prepared = receiver.recv() => { + let Some(prepared) = maybe_prepared else { + break; + }; + let writer = writer.clone(); + in_flight.spawn(async move { + let partition = prepared.partition; + writer.upload_prepared_partition(prepared).await.map_err(|e| { + format!("partition {partition}: {e}") + }) + }); + } + Some(join_result) = in_flight.join_next(), if !in_flight.is_empty() => { + Self::handle_join_result(join_result, &error, "upload worker")?; + } + } + } + + while let Some(join_result) = in_flight.join_next().await { + Self::handle_join_result(join_result, &error, "upload worker")?; + } + + Ok(()) + } + + fn convert_transaction(tx_data: &TransactionData) -> TxWithMeta { + let meta = &tx_data.transaction_status_meta; + let error = meta.status.clone().err(); + let logs = meta.log_messages.clone(); + + let balance_diffs = Some(BalanceDiffs { + pre_balances: meta.pre_balances.clone(), + post_balances: meta.post_balances.clone(), + pre_token_balances: meta.pre_token_balances.as_ref().map(|balances| { + balances + .iter() + .cloned() + .map(TransactionTokenBalanceSerde::from) + .collect() + }), + post_token_balances: meta.post_token_balances.as_ref().map(|balances| { + balances + .iter() + .cloned() + .map(TransactionTokenBalanceSerde::from) + .collect() + }), + }); + + TxWithMeta { + transaction: tx_data.transaction.clone(), + error, + balance_diffs, + logs, + } + } + + fn convert_keyed_rewards( + keyed_rewards: &[(solana_address::Address, solana_reward_info::RewardInfo)], + ) -> solana_transaction_status::Rewards { + keyed_rewards + .iter() + .map(|(address, info)| { + use solana_transaction_status::RewardType as TSR; + + solana_transaction_status::Reward { + pubkey: address.to_string(), + lamports: info.lamports, + post_balance: info.post_balance, + reward_type: Some(match info.reward_type { + RewardType::Fee => TSR::Fee, + RewardType::Rent => TSR::Rent, + RewardType::Staking => TSR::Staking, + RewardType::Voting => TSR::Voting, + }), + commission: info.commission, + } + }) + .collect() + } + + fn update_thread_watermark(&self, state: &mut BufferState, thread_id: usize, slot: u64) { + if thread_id >= state.thread_high_watermarks.len() { + state + .thread_high_watermarks + .resize(thread_id + 1, self.start_slot.saturating_sub(1)); + } + + let current = &mut state.thread_high_watermarks[thread_id]; + if slot > *current { + *current = slot; + } + } + + fn partition_bounds(partition: u64) -> (u64, u64) { + let start = partition.saturating_mul(SLOTS_PER_PARTITION); + let end = start.saturating_add(SLOTS_PER_PARTITION); + (start, end) + } + + fn partition_is_ready(&self, state: &BufferState, partition: u64) -> bool { + let (partition_start, partition_end) = Self::partition_bounds(partition); + let mut has_overlapping_thread = false; + + for (thread_id, thread_range) in self.thread_ranges.iter().enumerate() { + let overlap_start = thread_range.start.max(partition_start); + let overlap_end = thread_range.end.min(partition_end); + if overlap_start >= overlap_end { + continue; + } + + has_overlapping_thread = true; + let required_high_watermark = overlap_end.saturating_sub(1); + let current_high_watermark = state + .thread_high_watermarks + .get(thread_id) + .copied() + .unwrap_or(self.start_slot.saturating_sub(1)); + if current_high_watermark < required_high_watermark { + return false; + } + } + + has_overlapping_thread + } + + fn drain_next_ready_partition( + &self, + state: &mut BufferState, + ) -> Option<(u64, Vec)> { + let ready_partition = state + .partitions + .keys() + .copied() + .find(|partition| self.partition_is_ready(state, *partition))?; + + state + .partitions + .remove(&ready_partition) + .map(|slot_map| (ready_partition, slot_map.into_values().collect())) + } + + async fn queue_partition_upload( + &self, + partition: u64, + blocks: Vec, + ) -> Result<(), Box> { + let (sender, upload_error_state) = { + let upload_queue = self + .upload_queue + .lock() + .expect("upload queue lock poisoned"); + let Some(queue) = upload_queue.as_ref() else { + return Err(Self::boxed_error("upload queue not initialized")); + }; + (queue.sender.clone(), queue.error.clone()) + }; + + if let Some(err) = Self::get_upload_error(&upload_error_state) { + return Err(Self::boxed_error(format!( + "upload pipeline unavailable: {err}" + ))); + } + + let job = UploadJob { partition, blocks }; + match sender.try_send(job) { + Ok(()) => {} + Err(TrySendError::Full(job)) => { + log::debug!( + "Serialization queue full (capacity {}), applying backpressure", + self.serialize_queue_capacity, + ); + sender.send(job).await.map_err(|_| { + let reason = Self::get_upload_error(&upload_error_state) + .unwrap_or_else(|| "upload queue closed".to_string()); + Self::boxed_error(format!( + "failed to enqueue partition {partition} for serialization: {reason}" + )) + })?; + } + Err(TrySendError::Closed(_)) => { + let reason = Self::get_upload_error(&upload_error_state) + .unwrap_or_else(|| "upload queue closed".to_string()); + return Err(Self::boxed_error(format!( + "failed to enqueue partition {partition} for serialization: {reason}" + ))); + } + } + + if let Some(err) = Self::get_upload_error(&upload_error_state) { + return Err(Self::boxed_error(format!( + "upload pipeline unavailable: {err}" + ))); + } + + Ok(()) + } + + async fn shutdown_upload_queue( + &self, + ) -> Result<(), Box> { + let queue = { + let mut upload_queue = self + .upload_queue + .lock() + .expect("upload queue lock poisoned"); + upload_queue.take() + }; + + let Some(queue) = queue else { + return Ok(()); + }; + + let UploadQueue { + sender, + error, + serializer_manager, + uploader_manager, + } = queue; + + drop(sender); + + let mut first_error: Option = None; + + match serializer_manager.await { + Ok(Ok(())) => {} + Ok(Err(err)) => { + first_error = Some(format!("serialization manager failed: {err}")); + } + Err(join_err) => { + first_error = Some(format!("serialization manager task panicked: {join_err}")); + } + } + + match uploader_manager.await { + Ok(Ok(())) => {} + Ok(Err(err)) => { + if first_error.is_none() { + first_error = Some(format!("upload manager failed: {err}")); + } + } + Err(join_err) => { + if first_error.is_none() { + first_error = Some(format!("upload manager task panicked: {join_err}")); + } + } + } + + if first_error.is_none() + && let Some(err) = Self::get_upload_error(&error) + { + first_error = Some(err); + } + + if let Some(err) = first_error { + return Err(Self::boxed_error(format!("upload pipeline failed: {err}"))); + } + + Ok(()) + } +} + +impl Plugin for ParquetExportPlugin { + fn name(&self) -> &'static str { + "Parquet Export" + } + + fn on_transaction<'a>( + &'a self, + _thread_id: usize, + _db: Option>, + transaction: &'a TransactionData, + ) -> PluginFuture<'a> { + let slot = transaction.slot; + + async move { + if slot < self.start_slot || slot >= self.end_slot_exclusive { + return Ok(()); + } + + if self.should_skip_partition(slot_to_partition(slot)) { + return Ok(()); + } + + let tx = Self::convert_transaction(transaction); + self.pending_txs.entry(slot).or_default().push(tx); + Ok(()) + } + .boxed() + } + + fn on_block<'a>( + &'a self, + thread_id: usize, + _db: Option>, + block: &'a BlockData, + ) -> PluginFuture<'a> { + async move { + let slot = block.slot(); + + let mut pending_block_event = match block { + BlockData::Block { + slot, + blockhash, + parent_slot, + parent_blockhash, + rewards, + block_time, + .. + } => { + let partition = slot_to_partition(*slot); + if self.should_skip_partition(partition) { + self.pending_txs.remove(slot); + None + } else { + let transactions = self + .pending_txs + .remove(slot) + .map(|(_, txs)| txs) + .unwrap_or_default(); + + let converted_rewards = Self::convert_keyed_rewards(&rewards.keyed_rewards); + + Some(BlockEvent { + slot: *slot, + blockhash: *blockhash, + block_time: block_time.unwrap_or(0), + parent_slot: *parent_slot, + parent_blockhash: *parent_blockhash, + rewards: converted_rewards, + transactions, + }) + } + } + _ => None, + }; + + let mut watermark_updated = false; + loop { + let maybe_ready_partition = { + let mut state = self.state.lock().expect("partition state lock poisoned"); + + if !watermark_updated { + self.update_thread_watermark(&mut state, thread_id, slot); + watermark_updated = true; + } + + if let Some(block_event) = pending_block_event.take() + && block_event.slot >= self.start_slot + && block_event.slot < self.end_slot_exclusive + { + let partition = slot_to_partition(block_event.slot); + let slots = state.partitions.entry(partition).or_default(); + if slots.insert(block_event.slot, block_event).is_some() { + log::warn!( + "Replacing duplicate block event for slot {slot} in partition {partition}" + ); + } + } + + self.drain_next_ready_partition(&mut state) + }; + + let Some((partition, blocks)) = maybe_ready_partition else { + break; + }; + + self.queue_partition_upload(partition, blocks).await?; + } + + self.maybe_apply_memory_backpressure().await?; + + Ok(()) + } + .boxed() + } + + fn on_load(&self, _db: Option>) -> PluginFuture<'_> { + async move { + self.writer.init_s3().await; + + if self.skip_existing_partitions { + let existing = self.probe_existing_partitions().await?; + let existing_count = existing.len(); + if self.existing_partitions.set(existing).is_err() { + return Err(Self::boxed_error( + "existing partition cache already initialized", + )); + } + log::info!( + "Will skip {} partition(s) already present on S3", + existing_count + ); + } else if self.existing_partitions.set(HashSet::new()).is_err() { + return Err(Self::boxed_error( + "existing partition cache already initialized", + )); + } + + let mut upload_queue = self.upload_queue.lock().expect("upload queue lock poisoned"); + if upload_queue.is_none() { + let (sender, serialize_receiver) = mpsc::channel(self.serialize_queue_capacity); + let (prepared_sender, prepared_receiver) = + mpsc::channel(self.prepared_queue_capacity); + let error = Arc::new(Mutex::new(None)); + let serialization_concurrency = self.serialization_concurrency; + let upload_concurrency = self.upload_concurrency; + let writer_for_serializer = self.writer.clone(); + let writer_for_uploader = self.writer.clone(); + let serializer_error = error.clone(); + let uploader_error = error.clone(); + + let serializer_manager = tokio::spawn(async move { + Self::run_serialization_manager( + writer_for_serializer, + serialize_receiver, + prepared_sender, + serialization_concurrency, + serializer_error, + ) + .await + }); + let uploader_manager = tokio::spawn(async move { + Self::run_upload_manager( + writer_for_uploader, + prepared_receiver, + upload_concurrency, + uploader_error, + ) + .await + }); + + *upload_queue = Some(UploadQueue { + sender, + error, + serializer_manager, + uploader_manager, + }); + } + + log::info!( + "Parquet Export plugin initialized upload pipeline: serialize queue {}, prepared queue {}, serialization concurrency {}, upload concurrency {}, memory budget {} MiB (resume {} MiB, check every {} blocks), skip existing partitions {}", + self.serialize_queue_capacity, + self.prepared_queue_capacity, + self.serialization_concurrency, + self.upload_concurrency, + Self::format_mib(self.memory_budget_bytes), + Self::format_mib(self.memory_resume_bytes), + self.memory_check_every_n_blocks, + self.skip_existing_partitions, + ); + Ok(()) + } + .boxed() + } + + fn on_exit(&self, _db: Option>) -> PluginFuture<'_> { + async move { + let remaining = { + let mut state = self.state.lock().expect("partition state lock poisoned"); + std::mem::take(&mut state.partitions) + }; + + if !remaining.is_empty() { + log::info!( + "Flushing {} remaining partition(s) on exit", + remaining.len() + ); + } + + for (partition, slot_map) in remaining { + self.queue_partition_upload(partition, slot_map.into_values().collect()) + .await?; + } + + self.shutdown_upload_queue().await?; + + Ok(()) + } + .boxed() + } +} diff --git a/history-import-old-faithful/src/types.rs b/history-import-old-faithful/src/types.rs new file mode 100644 index 0000000..855ecbf --- /dev/null +++ b/history-import-old-faithful/src/types.rs @@ -0,0 +1,78 @@ +//! Types mirrored from nitro-stream's `history-model` and `nitro-stream` crates. +//! +//! These must produce identical bincode output so that parquet files written here +//! can be read by `history_model::deserialize_block_event`. + +use std::str::FromStr; + +use serde::{Deserialize, Serialize}; +use serde_with::{DisplayFromStr, serde_as}; +use solana_account_decoder::parse_token::UiTokenAmount; +use solana_hash::Hash; +use solana_transaction::versioned::VersionedTransaction; +use solana_transaction_error::TransactionError; +use solana_transaction_status::Rewards; + +/// Mirrors `history_model::types::BlockEvent`. +#[derive(Serialize, Deserialize, Clone)] +pub struct BlockEvent { + pub slot: u64, + pub blockhash: Hash, + pub block_time: i64, + pub parent_slot: u64, + pub parent_blockhash: Hash, + pub rewards: Rewards, + pub transactions: Vec, +} + +/// Mirrors `history_model::types::TxWithMeta`. +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct TxWithMeta { + pub transaction: VersionedTransaction, + pub error: Option, + pub balance_diffs: Option, + pub logs: Option>, +} + +/// Mirrors `nitro_stream::BalanceDiffs`. +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub struct BalanceDiffs { + pub pre_balances: Vec, + pub post_balances: Vec, + pub pre_token_balances: Option>, + pub post_token_balances: Option>, +} + +/// Mirrors `nitro_stream::TransactionTokenBalanceSerde`. +/// +/// The `serde_as(DisplayFromStr)` attributes on the address fields are +/// critical — they match the serde representation used by nitro-stream so that +/// bincode output is byte-identical. +#[serde_as] +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TransactionTokenBalanceSerde { + pub account_index: u8, + #[serde_as(as = "DisplayFromStr")] + pub mint: solana_address::Address, + pub ui_token_amount: UiTokenAmount, + #[serde_as(as = "DisplayFromStr")] + pub owner: solana_address::Address, + #[serde_as(as = "DisplayFromStr")] + pub program_id: solana_address::Address, +} + +impl From for TransactionTokenBalanceSerde { + fn from(value: solana_transaction_status::TransactionTokenBalance) -> Self { + Self { + account_index: value.account_index, + mint: parse_address_or_default(&value.mint), + ui_token_amount: value.ui_token_amount, + owner: parse_address_or_default(&value.owner), + program_id: parse_address_or_default(&value.program_id), + } + } +} + +fn parse_address_or_default(s: &str) -> solana_address::Address { + solana_address::Address::from_str(s).unwrap_or_default() +} diff --git a/history-import-old-faithful/src/writer.rs b/history-import-old-faithful/src/writer.rs new file mode 100644 index 0000000..cc1e3d6 --- /dev/null +++ b/history-import-old-faithful/src/writer.rs @@ -0,0 +1,387 @@ +use std::{ + fs::{self, File}, + io::{BufWriter, Write}, + path::PathBuf, + sync::Arc, + time::{SystemTime, UNIX_EPOCH}, +}; + +use arrow::{ + array::{BinaryBuilder, TimestampSecondBuilder, UInt64Builder}, + datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit}, + record_batch::RecordBatch, +}; +use aws_config::BehaviorVersion; +use aws_sdk_s3::{Client as S3Client, error::ProvideErrorMetadata, primitives::ByteStream}; +use parquet::{ + arrow::ArrowWriter, + basic::Compression, + errors::ParquetError, + file::properties::{WriterProperties, WriterVersion}, +}; +use thiserror::Error; +use tokio::sync::OnceCell; + +use crate::{compat_bincode, types::BlockEvent}; + +pub const SLOTS_PER_PARTITION: u64 = 1_000; +const WRITE_OPTIMIZED_MAX_ROW_GROUP_ROWS: usize = 1_000; +const ZSTD_COMPRESSION_LEVEL: i32 = 6; +const DEFAULT_ZSTD_THREADS_PER_WRITER: u32 = 1; +const MAX_ZSTD_THREADS_PER_WRITER: u32 = 8; +// Keep batch payloads modest to reduce peak RSS while still writing contiguous parquet batches. +const MAX_BINARY_BYTES_PER_BATCH: usize = 1_024 * 1_024 * 1_024 * 3 / 2; + +#[derive(Debug, Error)] +pub enum WriterError { + #[error("parquet error: {0}")] + Parquet(#[from] ParquetError), + #[error("bincode encode error: {0}")] + Bincode(#[from] compat_bincode::EncodeError), + #[error("arrow error: {0}")] + Arrow(#[from] arrow::error::ArrowError), + #[error("I/O error: {0}")] + Io(#[from] std::io::Error), + #[error("S3 put error: {0}")] + S3Put(String), + #[error("S3 head error: {0}")] + S3Head(String), + #[error("ByteStream error: {0}")] + ByteStream(String), +} + +fn blocks_schema() -> SchemaRef { + Arc::new(Schema::new(vec![ + Field::new("slot", DataType::UInt64, false), + Field::new("time", DataType::Timestamp(TimeUnit::Second, None), false), + Field::new("block", DataType::Binary, false), + ])) +} + +struct BlockBatchBuilder { + slot_builder: UInt64Builder, + block_time_builder: TimestampSecondBuilder, + block_builder: BinaryBuilder, + rows: usize, + binary_bytes: usize, +} + +impl BlockBatchBuilder { + fn new() -> Self { + Self { + slot_builder: UInt64Builder::new(), + block_time_builder: TimestampSecondBuilder::new(), + block_builder: BinaryBuilder::new(), + rows: 0, + binary_bytes: 0, + } + } + + fn is_empty(&self) -> bool { + self.rows == 0 + } + + fn can_fit(&self, additional_binary_bytes: usize) -> bool { + self.binary_bytes + .checked_add(additional_binary_bytes) + .is_some_and(|total| total <= MAX_BINARY_BYTES_PER_BATCH) + } + + fn append_serialized_block( + &mut self, + slot: u64, + block_time: i64, + block_data: &[u8], + ) -> Result<(), WriterError> { + self.slot_builder.append_value(slot); + self.block_time_builder.append_value(block_time); + self.block_builder.append_value(block_data); + self.rows += 1; + self.binary_bytes += block_data.len(); + Ok(()) + } + + fn finish(&mut self) -> Result { + let batch = RecordBatch::try_new( + blocks_schema(), + vec![ + Arc::new(self.slot_builder.finish()), + Arc::new(self.block_time_builder.finish()), + Arc::new(self.block_builder.finish()), + ], + )?; + + self.rows = 0; + self.binary_bytes = 0; + + Ok(batch) + } +} + +struct ZstdArrowWriter { + writer: ArrowWriter>>, +} + +impl ZstdArrowWriter { + fn new( + writer: W, + schema: SchemaRef, + zstd_threads_per_writer: u32, + ) -> Result { + let buf_writer = BufWriter::with_capacity(1024 * 1024, writer); + let mut encoder = zstd::Encoder::new(buf_writer, ZSTD_COMPRESSION_LEVEL) + .map_err(|e| ParquetError::External(Box::new(e)))?; + encoder + .multithread(zstd_threads_per_writer) + .map_err(|e| ParquetError::External(Box::new(e)))?; + + let props = WriterProperties::builder() + .set_writer_version(WriterVersion::PARQUET_2_0) + .set_compression(Compression::UNCOMPRESSED) + .set_max_row_group_size(WRITE_OPTIMIZED_MAX_ROW_GROUP_ROWS) + .build(); + + let writer = ArrowWriter::try_new(encoder, schema, Some(props))?; + Ok(Self { writer }) + } + + fn write(&mut self, batch: &RecordBatch) -> Result<(), ParquetError> { + self.writer.write(batch) + } + + fn close(self) -> Result<(), ParquetError> { + let encoder = self.writer.into_inner()?; + encoder + .finish() + .map_err(|e| ParquetError::External(Box::new(e)))?; + Ok(()) + } +} + +pub fn slot_to_partition(slot: u64) -> u64 { + slot / SLOTS_PER_PARTITION +} + +pub struct PreparedPartitionUpload { + pub partition: u64, + pub rows_written: usize, + pub s3_key: String, + pub output_path: PathBuf, + pub should_cleanup: bool, +} + +pub struct PartitionWriter { + s3_bucket: String, + s3_prefix: String, + s3_client: OnceCell, + temp_dir: PathBuf, + data_path: Option, + zstd_threads_per_writer: u32, +} + +impl PartitionWriter { + pub fn new( + s3_bucket: String, + s3_prefix: String, + temp_dir: PathBuf, + data_path: Option, + ) -> Self { + let zstd_threads_per_writer = std::env::var("HISTORY_IMPORT_ZSTD_THREADS_PER_WRITER") + .ok() + .and_then(|value| value.parse::().ok()) + .map(|value| value.clamp(1, MAX_ZSTD_THREADS_PER_WRITER)) + .unwrap_or(DEFAULT_ZSTD_THREADS_PER_WRITER); + + Self { + s3_bucket, + s3_prefix: s3_prefix.trim_matches('/').to_string(), + s3_client: OnceCell::new(), + temp_dir, + data_path, + zstd_threads_per_writer, + } + } + + pub async fn init_s3(&self) -> &S3Client { + self.s3_client + .get_or_init(|| async { + let config = aws_config::defaults(BehaviorVersion::latest()).load().await; + S3Client::new(&config) + }) + .await + } + + pub async fn partition_exists(&self, partition: u64) -> Result { + let client = self.init_s3().await; + let s3_key = self.s3_key_for_partition(partition); + + match client + .head_object() + .bucket(&self.s3_bucket) + .key(&s3_key) + .send() + .await + { + Ok(_) => Ok(true), + Err(err) => { + if let Some(service_err) = err.as_service_error() { + let is_not_found = + matches!(service_err.code(), Some("NotFound" | "NoSuchKey" | "404")); + if is_not_found { + return Ok(false); + } + } + if err.to_string().contains("404") { + return Ok(false); + } + Err(WriterError::S3Head(format!("{err:?}"))) + } + } + } + + fn s3_key_for_partition(&self, partition: u64) -> String { + if self.s3_prefix.is_empty() { + format!("blocks/{partition:010}/block.parquet.zst") + } else { + format!( + "{}/blocks/{partition:010}/block.parquet.zst", + self.s3_prefix + ) + } + } + + fn output_path_for_key( + &self, + partition: u64, + s3_key: &str, + ) -> Result<(PathBuf, bool), WriterError> { + if let Some(root) = &self.data_path { + let local_path = root.join(s3_key); + if let Some(parent) = local_path.parent() { + fs::create_dir_all(parent)?; + } + Ok((local_path, false)) + } else { + let unique_suffix = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|duration| duration.as_nanos()) + .unwrap_or_default(); + let temp_path = self.temp_dir.join(format!( + "{partition:010}-{}-{unique_suffix}.parquet.zst", + std::process::id() + )); + Ok((temp_path, true)) + } + } + + pub fn prepare_partition( + &self, + partition: u64, + mut blocks: Vec, + ) -> Result { + if blocks.is_empty() { + return Ok(PreparedPartitionUpload { + partition, + rows_written: 0, + s3_key: self.s3_key_for_partition(partition), + output_path: PathBuf::new(), + should_cleanup: false, + }); + } + + // Keep deterministic in-partition ordering before serialization. + blocks.sort_by_key(|block| block.slot); + + let s3_key = self.s3_key_for_partition(partition); + let (output_path, should_cleanup) = self.output_path_for_key(partition, &s3_key)?; + + let mut rows_written = 0usize; + { + let file = File::create(&output_path)?; + let mut parquet_writer = + ZstdArrowWriter::new(file, blocks_schema(), self.zstd_threads_per_writer)?; + let mut batch_builder = BlockBatchBuilder::new(); + + for block in blocks.drain(..) { + let block_data = compat_bincode::serialize(&block)?; + if block_data.len() > MAX_BINARY_BYTES_PER_BATCH { + return Err(WriterError::Io(std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!( + "serialized block at slot {} is too large ({} bytes)", + block.slot, + block_data.len(), + ), + ))); + } + + if !batch_builder.is_empty() && !batch_builder.can_fit(block_data.len()) { + let batch = batch_builder.finish()?; + rows_written += batch.num_rows(); + parquet_writer.write(&batch)?; + } + + batch_builder.append_serialized_block(block.slot, block.block_time, &block_data)?; + } + + if !batch_builder.is_empty() { + let batch = batch_builder.finish()?; + rows_written += batch.num_rows(); + parquet_writer.write(&batch)?; + } + + parquet_writer.close()?; + } + + Ok(PreparedPartitionUpload { + partition, + rows_written, + s3_key, + output_path, + should_cleanup, + }) + } + + pub async fn upload_prepared_partition( + &self, + prepared: PreparedPartitionUpload, + ) -> Result<(), WriterError> { + if prepared.rows_written == 0 { + return Ok(()); + } + + let client = self.init_s3().await; + let body = ByteStream::from_path(&prepared.output_path) + .await + .map_err(|e| WriterError::ByteStream(e.to_string()))?; + + client + .put_object() + .bucket(&self.s3_bucket) + .key(&prepared.s3_key) + .body(body) + .send() + .await + .map_err(|e| WriterError::S3Put(format!("{e:?}")))?; + + log::info!( + "Uploaded partition {partition} ({} blocks) to s3://{}/{}", + prepared.rows_written, + self.s3_bucket, + prepared.s3_key, + partition = prepared.partition, + ); + + if prepared.should_cleanup { + fs::remove_file(&prepared.output_path).ok(); + } else { + log::info!( + "Persisted partition {partition} locally at {}", + prepared.output_path.display(), + partition = prepared.partition, + ); + } + + Ok(()) + } +}