diff --git a/.gitignore b/.gitignore old mode 100644 new mode 100755 index 12c78f2..17059b2 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ /target/ +/scripts/ /.idea/ -/*.iml \ No newline at end of file +/*.iml +scripts_achal/credentials diff --git a/Cargo.lock b/Cargo.lock index 0e67c77..51ac852 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10,53 +10,101 @@ checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" [[package]] name = "ahash" -version = "0.8.3" +version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c99f64d1e06488f620f932677e24bc6e2897582980441ae90a671415bd7ec2f" +checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" dependencies = [ "cfg-if", "getrandom", "once_cell", "version_check", + "zerocopy", ] +[[package]] +name = "anstream" +version = "0.6.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d96bd03f33fe50a863e394ee9718a706f988b9079b20c3784fb726e7678b62fb" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8901269c6307e8d93993578286ac0edf7f195079ffff5ebdeea6a59ffb7e36bc" + +[[package]] +name = "anstyle-parse" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c75ac65da39e5fe5ab759307499ddad880d724eed2f6ce5b5e8a26f4f387928c" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e28923312444cdd728e4738b3f9c9cac739500909bb3d3c94b43551b16517648" +dependencies = [ + "windows-sys", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"1cd54b81ec8d6180e24654d0b371ad22fc3dd083b6ff8ba325b72e00c87660a7" +dependencies = [ + "anstyle", + "windows-sys", +] + +[[package]] +name = "anyhow" +version = "1.0.80" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ad32ce52e4161730f7098c077cd2ed6229b5804ccf99e5366be1ab72a98b4e1" + [[package]] name = "bff" version = "0.1.0" dependencies = [ "ahash", + "anyhow", "byteorder", "clap", "flate2", + "glob", + "human_bytes", + "indicatif", "rand", "serde_json", + "sysinfo", "threadpool", "unicode-segmentation", ] -[[package]] -name = "bitflags" -version = "1.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" - -[[package]] -name = "bitflags" -version = "2.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "487f1e0fcbe47deb8b0574e646def1c903389d95241dd1bbcc6ce4a715dfc0c1" - [[package]] name = "byteorder" -version = "1.4.3" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "cc" -version = "1.0.79" +version = "1.0.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f" +checksum = "8cd6604a82acf3039f1144f54b8eb34e91ffba622051189e71b781822d5ee1f5" [[package]] name = "cfg-if" @@ -66,27 +114,33 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "clap" -version = "4.1.11" +version = "4.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42dfd32784433290c51d92c438bb72ea5063797fc3cc9a21a8c4346bebbb2098" +checksum = "b230ab84b0ffdf890d5a10abdbc8b83ae1c4918275daea1ab8801f71536b2651" dependencies = [ - "bitflags 2.0.2", + "clap_builder", 
"clap_derive", +] + +[[package]] +name = "clap_builder" +version = "4.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae129e2e766ae0ec03484e609954119f123cc1fe650337e155d03b022f24f7b4" +dependencies = [ + "anstream", + "anstyle", "clap_lex", - "is-terminal", - "once_cell", "strsim", - "termcolor", ] [[package]] name = "clap_derive" -version = "4.1.9" +version = "4.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fddf67631444a3a3e3e5ac51c36a5e01335302de677bd78759eaa90ab1f46644" +checksum = "307bc0538d5f0f83b8248db3087aa92fe504e4691294d0c96c0eabc33f47ba47" dependencies = [ "heck", - "proc-macro-error", "proc-macro2", "quote", "syn", @@ -94,57 +148,95 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.3.3" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "033f6b7a4acb1f358c742aaca805c939ee73b4c6209ae4318ec7aca81c42e646" -dependencies = [ - "os_str_bytes", -] +checksum = "98cc8fbded0c607b7ba9dd60cd98df59af97e84d24e49c8557331cfc26d301ce" [[package]] name = "cmake" -version = "0.1.49" +version = "0.1.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db34956e100b30725f2eb215f90d4871051239535632f84fea3bc92722c66b7c" +checksum = "a31c789563b815f77f4250caee12365734369f942439b7defd71e18a48197130" dependencies = [ "cc", ] +[[package]] +name = "colorchoice" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" + +[[package]] +name = "console" +version = "0.15.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e1f83fc076bd6dd27517eacdf25fef6c4dfe5f1d7448bafaaf3a26f13b5e4eb" +dependencies = [ + "encode_unicode", + "lazy_static", + "libc", + "unicode-width", + "windows-sys", +] + +[[package]] +name = "core-foundation-sys" +version = "0.8.6" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" + [[package]] name = "crc32fast" -version = "1.3.2" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" +checksum = "b3855a8a784b474f333699ef2bbca9db2c4a1f6d9088a90a2d25b1eb53111eaa" dependencies = [ "cfg-if", ] [[package]] -name = "errno" -version = "0.2.8" +name = "crossbeam-deque" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f639046355ee4f37944e44f60642c6f3a7efa3cf6b78c78a0d989a8ce6c396a1" +checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d" dependencies = [ - "errno-dragonfly", - "libc", - "winapi", + "crossbeam-epoch", + "crossbeam-utils", ] [[package]] -name = "errno-dragonfly" -version = "0.1.2" +name = "crossbeam-epoch" +version = "0.9.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" dependencies = [ - "cc", - "libc", + "crossbeam-utils", ] +[[package]] +name = "crossbeam-utils" +version = "0.8.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" + +[[package]] +name = "either" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11157ac094ffbdde99aa67b23417ebdd801842852b500e395a45a9c0aac03e4a" + +[[package]] +name = "encode_unicode" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" + [[package]] name = "flate2" -version = "1.0.25" +version = "1.0.28" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8a2db397cb1c8772f31494cb8917e48cd1e64f0fa7efac59fbd741a0a8ce841" +checksum = "46303f565772937ffe1d394a4fac6f411c6013172fadde9dcdb1e147a086940e" dependencies = [ "crc32fast", "libz-ng-sys", @@ -153,15 +245,21 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.2.8" +version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c05aeb6a22b8f62540c194aac980f2115af067bfe15a0734d7277a768d396b31" +checksum = "190092ea657667030ac6a35e305e62fc4dd69fd98ac98631e5d3a2b1575a12b5" dependencies = [ "cfg-if", "libc", "wasi", ] +[[package]] +name = "glob" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" + [[package]] name = "heck" version = "0.4.1" @@ -170,100 +268,111 @@ checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" [[package]] name = "hermit-abi" -version = "0.2.6" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee512640fe35acbfb4bb779db6f0d80704c2cacfa2e39b601ef3e3f47d1ae4c7" -dependencies = [ - "libc", -] +checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" [[package]] -name = "hermit-abi" -version = "0.3.1" +name = "human_bytes" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fed44880c466736ef9a5c5b5facefb5ed0785676d0c02d612db14e54f0d84286" +checksum = "91f255a4535024abf7640cb288260811fc14794f62b063652ed349f9a6c2348e" [[package]] -name = "io-lifetimes" -version = "1.0.9" +name = "indicatif" +version = "0.17.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09270fd4fa1111bc614ed2246c7ef56239a3063d5be0d1ec3b589c505d400aeb" +checksum = "763a5a8f45087d6bcea4222e7b72c291a054edf80e4ef6efd2a4979878c7bea3" dependencies = [ - "hermit-abi 0.3.1", - "libc", - "windows-sys", 
+ "console", + "instant", + "number_prefix", + "portable-atomic", + "unicode-width", ] [[package]] -name = "is-terminal" -version = "0.4.5" +name = "instant" +version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8687c819457e979cc940d09cb16e42a1bf70aa6b60a549de6d3a62a0ee90c69e" +checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" dependencies = [ - "hermit-abi 0.3.1", - "io-lifetimes", - "rustix", - "windows-sys", + "cfg-if", ] [[package]] name = "itoa" -version = "1.0.6" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c" + +[[package]] +name = "lazy_static" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "453ad9f582a441959e5f0d088b02ce04cfe8d51a8eaf077f12ac6d3e94164ca6" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.140" +version = "0.2.153" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99227334921fae1a979cf0bfdfcc6b3e5ce376ef57e16fb6fb3ea2ed6095f80c" +checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" [[package]] name = "libz-ng-sys" -version = "1.1.8" +version = "1.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4399ae96a9966bf581e726de86969f803a81b7ce795fcd5480e640589457e0f2" +checksum = "c6409efc61b12687963e602df8ecf70e8ddacf95bc6576bcf16e3ac6328083c5" dependencies = [ "cmake", "libc", ] [[package]] -name = "linux-raw-sys" -version = "0.1.4" +name = "miniz_oxide" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f051f77a7c8e6957c0696eac88f26b0117e54f52d3fc682ab19397a8812846a4" +checksum = "9d811f3e15f28568be3407c8e7fdb6514c1cda3cb30683f15b6a1a1dc4ea14a7" +dependencies = [ + "adler", +] [[package]] -name = "miniz_oxide" 
-version = "0.6.2" +name = "ntapi" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b275950c28b37e794e8c55d88aeb5e139d0ce23fdbbeda68f8d7174abdf9e8fa" +checksum = "e8a3895c6391c39d7fe7ebc444a87eb2991b2a0bc718fdabd071eec617fc68e4" dependencies = [ - "adler", + "winapi", ] [[package]] name = "num_cpus" -version = "1.15.0" +version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fac9e2da13b5eb447a6ce3d392f23a29d8694bff781bf03a16cd9ac8697593b" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" dependencies = [ - "hermit-abi 0.2.6", + "hermit-abi", "libc", ] +[[package]] +name = "number_prefix" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3" + [[package]] name = "once_cell" -version = "1.17.1" +version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7e5500299e16ebb147ae15a00a942af264cf3688f47923b8fc2cd5858f23ad3" +checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" [[package]] -name = "os_str_bytes" -version = "6.5.0" +name = "portable-atomic" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ceedf44fb00f2d1984b0bc98102627ce622e083e49a5bacdb3e514fa4238e267" +checksum = "7170ef9988bc169ba16dd36a7fa041e5c4cbeb6a35b76d4c03daded371eae7c0" [[package]] name = "ppv-lite86" @@ -271,44 +380,20 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" -[[package]] -name = "proc-macro-error" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" -dependencies = [ - "proc-macro-error-attr", - "proc-macro2", - "quote", - "syn", - 
"version_check", -] - -[[package]] -name = "proc-macro-error-attr" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" -dependencies = [ - "proc-macro2", - "quote", - "version_check", -] - [[package]] name = "proc-macro2" -version = "1.0.53" +version = "1.0.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba466839c78239c09faf015484e5cc04860f88242cff4d03eb038f04b4699b73" +checksum = "e2422ad645d89c99f8f3e6b88a9fdeca7fabeac836b1002371c4367c8f984aae" dependencies = [ "unicode-ident", ] [[package]] name = "quote" -version = "1.0.26" +version = "1.0.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4424af4bf778aae2051a77b60283332f386554255d722233d09fbfc7e30da2fc" +checksum = "291ec9ab5efd934aaf503a6466c5d5251535d108ee747472c3977cc5acc868ef" dependencies = [ "proc-macro2", ] @@ -344,36 +429,56 @@ dependencies = [ ] [[package]] -name = "rustix" -version = "0.36.11" +name = "rayon" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db4165c9963ab29e422d6c26fbc1d37f15bace6b2810221f9d925023480fcf0e" +checksum = "e4963ed1bc86e4f3ee217022bd855b297cef07fb9eac5dfa1f788b220b49b3bd" dependencies = [ - "bitflags 1.3.2", - "errno", - "io-lifetimes", - "libc", - "linux-raw-sys", - "windows-sys", + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", ] [[package]] name = "ryu" -version = "1.0.13" +version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f91339c0467de62360649f8d3e185ca8de4224ff281f66000de5eb2a77a79041" +checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1" 
[[package]] name = "serde" -version = "1.0.158" +version = "1.0.197" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fb1c873e1b9b056a4dc4c0c198b24c3ffa059243875552b2bd0933b1aee4ce2" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.197" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "771d4d9c4163ee138805e12c710dd365e4f44be8be0503cb1bb9eb989425d9c9" +checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] [[package]] name = "serde_json" -version = "1.0.94" +version = "1.0.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1c533a59c9d8a93a09c6ab31f0fd5e5f4dd1b8fc9434804029839884765d04ea" +checksum = "c5f09b1bd632ef549eaa9f60a1f8de742bdbc698e6cee2095fc84dde5f549ae0" dependencies = [ "itoa", "ryu", @@ -382,15 +487,15 @@ dependencies = [ [[package]] name = "strsim" -version = "0.10.0" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" +checksum = "5ee073c9e4cd00e28217186dbe12796d692868f432bf2e97ee73bed0c56dfa01" [[package]] name = "syn" -version = "1.0.109" +version = "2.0.52" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +checksum = "b699d15b36d1f02c3e7c69f8ffef53de37aefae075d8488d4ba1a7788d574a07" dependencies = [ "proc-macro2", "quote", @@ -398,12 +503,18 @@ dependencies = [ ] [[package]] -name = "termcolor" -version = "1.2.0" +name = "sysinfo" +version = "0.30.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be55cf8942feac5c765c2c993422806843c9a9a45d4d5c407ad6dd2ea95eb9b6" +checksum = "0c385888ef380a852a16209afc8cfad22795dd8873d69c9a14d2e2088f118d18" dependencies = [ - "winapi-util", + "cfg-if", + 
"core-foundation-sys", + "libc", + "ntapi", + "once_cell", + "rayon", + "windows", ] [[package]] @@ -417,15 +528,27 @@ dependencies = [ [[package]] name = "unicode-ident" -version = "1.0.8" +version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5464a87b239f13a63a501f2701565754bae92d243d4bb7eb12f6d57d2269bf4" +checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" [[package]] name = "unicode-segmentation" -version = "1.10.1" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4c87d22b6e3f4a18d4d40ef354e97c90fcb14dd91d7dc0aa9d8a1172ebf7202" + +[[package]] +name = "unicode-width" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51733f11c9c4f72aa0c160008246859e340b00807569a0da0e7a1079b27ba85" + +[[package]] +name = "utf8parse" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1dd624098567895118886609431a7c3b8f516e41d30e0643f03d94592a147e36" +checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" [[package]] name = "version_check" @@ -456,34 +579,44 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" [[package]] -name = "winapi-util" -version = "0.1.5" +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "windows" +version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" +checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be" dependencies = [ - "winapi", + "windows-core", + "windows-targets", ] [[package]] -name = "winapi-x86_64-pc-windows-gnu" 
-version = "0.4.0" +name = "windows-core" +version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +dependencies = [ + "windows-targets", +] [[package]] name = "windows-sys" -version = "0.45.0" +version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" dependencies = [ "windows-targets", ] [[package]] name = "windows-targets" -version = "0.42.2" +version = "0.52.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071" +checksum = "7dd37b7e5ab9018759f893a1952c9420d060016fc19a472b4bb20d1bdd694d1b" dependencies = [ "windows_aarch64_gnullvm", "windows_aarch64_msvc", @@ -496,42 +629,62 @@ dependencies = [ [[package]] name = "windows_aarch64_gnullvm" -version = "0.42.2" +version = "0.52.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" +checksum = "bcf46cf4c365c6f2d1cc93ce535f2c8b244591df96ceee75d8e83deb70a9cac9" [[package]] name = "windows_aarch64_msvc" -version = "0.42.2" +version = "0.52.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" +checksum = "da9f259dd3bcf6990b55bffd094c4f7235817ba4ceebde8e6d11cd0c5633b675" [[package]] name = "windows_i686_gnu" -version = "0.42.2" +version = "0.52.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" +checksum = "b474d8268f99e0995f25b9f095bc7434632601028cf86590aea5c8a5cb7801d3" [[package]] name 
= "windows_i686_msvc" -version = "0.42.2" +version = "0.52.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" +checksum = "1515e9a29e5bed743cb4415a9ecf5dfca648ce85ee42e15873c3cd8610ff8e02" [[package]] name = "windows_x86_64_gnu" -version = "0.42.2" +version = "0.52.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" +checksum = "5eee091590e89cc02ad514ffe3ead9eb6b660aedca2183455434b93546371a03" [[package]] name = "windows_x86_64_gnullvm" -version = "0.42.2" +version = "0.52.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" +checksum = "77ca79f2451b49fa9e2af39f0747fe999fcda4f5e241b2898624dca97a1f2177" [[package]] name = "windows_x86_64_msvc" -version = "0.42.2" +version = "0.52.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32b752e52a2da0ddfbdbcc6fceadfeede4c939ed16d13e648833a61dfb611ed8" + +[[package]] +name = "zerocopy" +version = "0.7.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" +checksum = "74d4d3961e53fa4c9a25a8637fc2bfaf2595b3d3ae34875568a5cf64787716be" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] diff --git a/Cargo.toml b/Cargo.toml old mode 100644 new mode 100755 index ad43e7d..22b32f5 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,14 +3,25 @@ name = "bff" version = "0.1.0" edition = "2021" + # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html 
[dependencies] +sysinfo="0.30.7" clap = { version = "4.1.11", features = ["derive"] } -flate2 = { version = "1.0", features = ["zlib-ng"], default-features = false } +flate2 = { version = "1.0.28", features = ["zlib-ng"], default-features = false } serde_json = "1.0" unicode-segmentation = "1.7" rand = "0.8.4" ahash = { version = "0.8.1", features = ["runtime-rng"] } byteorder = "1" -threadpool = "1.8.1" \ No newline at end of file +threadpool = "1.8.1" +human_bytes = "0.4.3" +indicatif = "0.17.8" +glob = "0.3.1" +anyhow = "1.0.80" +tokio = { version = "1.36.0", features = ["full"] } +aws-config = "1.1.8" +aws-sdk-s3 = "1.19.1" +async-compression = {version ="0.4.6", features=["all"]} +rayon = "1.10.0" \ No newline at end of file diff --git a/README.md b/README.md index 2e500b7..5501c99 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,7 @@ BFF === The big friendly filter 😁 +(originally written by Dirk @ AI2, updated by me) Getting started --------------- @@ -12,77 +13,37 @@ Getting started 2. Run `cargo build --release`. It places the binary at `target/release/bff`. 3. Run `./target/release/bff --help` to see the available options. + Examples -------- - -### Deduplicating a file against itself - -This is how you deduplicate a file against itself: -```bash -target/release/bff \ - --bloom-filter-file filter.bff \ - --bloom-filter-size 274877906944 \ - --expected-ngram-count 1000000000 \ - --output-directory deduped/ \ - input.json.gz -``` - -This creates the filter at `filter.bff`, with a size of 256 GB. -This size should be a little smaller than the amount of main memory you have. -It calculates the optimal setup for the filter based on the expected number of ngrams. -Getting that number right is very important. -If in doubt, guess high. -It's safer to guess a higher number than a lower number. -The filter will be created in memory, and only written to disk at the end of the job. 
- -### Deduplicating multiple files - -To get a lot of speed out of `bff`, you have to process multiple files at once: -```bash -target/release/bff \ - --bloom-filter-file filter.bff \ - --bloom-filter-size 274877906944 \ - --expected-ngram-count 1000000000 \ - --output-directory deduped/ \ - *.json.gz +There are three modes: `bff` (local input -> local output), `bff-remote` (S3 input -> S3 output), and `sysreq` (for assessing system requirements). We always need an input, output, false positive rate, and expected number of ngrams. But then there are some optional hyperparameters: + +- `--min-ngram-size`: In paragraph/both mode, we ignore any paragraphs shorter than this. Defaults to 20. +- `--max-ngram-size`: The "working width" of the ngram shingles: e.g., for long paragraphs/documents, we check membership over ngrams of this size. Defaults to 20. +- `--filtering-threshold`: If at least this fraction of ngrams is present, we remove the entire paragraph/document. Defaults to 0.8. + +And some REMOTE ONLY arguments: +- `--shard-num`: For large numbers of files, sharding is helpful. This selects some subset of the files. Defaults to 0. +- `--num-shards`: Dictates how many shards we have. Defaults to 1. + +### Deduplicating local files: +For files that exist locally, say a directory `to_be_deduped/`, we can output deduplicated versions of these files in `has_been_deduped/` like: +```cargo run --release bff \ + --inputs to_be_deduped \ + --output-directory has_been_deduped \ + --expected-ngram-count 12345678 \ + --fp-rate 0.01 ``` -Each input file will run in its own thread, and the filter will be shared between them. -In the end, as before the filter will be written to disk. 
- -### Pre-load the filter - -You can stick ngrams into the filter ahead of time, for example if you want to decontaminate your dataset: -```bash -target/release/bff \ - --bloom-filter-file decontaminating_filter.bff \ - --bloom-filter-size 274877906944 \ - --expected-ngram-count 1000000000 \ - --output-directory deduped/ \ - --filtering-threshold 1.0 \ - my_test_set.json.gz +### Deduplicating remote files +For files that exist on S3, say with the prefix `s3://my-bucket/to_be_deduped/`, we can output deduplicated versions of these files in `s3://my-bucket/has_been_deduped` like: +``` cargo run --release bff-remote \ +--bucket my-bucket \ +--input-dir to_be_deduped \ +--output-dir has_been_deduped \ +--expected-ngram-count 12345678 \ +--fp-rate 0.01 ``` -This will copy the output unchanged to the `deduped/` directory, but it will also produce a filter that you can use afterwards. -It is important that you still take a good guess at the ngram count you expect to see when you do the actual -deduplication. -The parameters of the bloom filter are baked in when you first create the file, so you have to guess right the -first time. - -### Only decontaminate - -If you only want to decontaminate, but not deduplicate against itself, you can do that by using the filter -you just created in the previous step: -```bash -target/release/bff \ - --bloom-filter-file decontaminating_filter.bff \ - --bloom-filter-size 274877906944 \ - --expected-ngram-count 1000000000 \ - --output-directory deduped/ \ - --update-bloom-filter false \ - *.json.gz -``` +There are also some options to preload or save the bloom filter itself, but you can check the code for those. -If you are using the filter this way, you can use the number of ngrams in the decontamination set for the -`--expected-ngram-count` parameter. -Since this is usually much smaller, it might make the filter run faster. 
diff --git a/scripts/s5cmd_wrapper.sh b/scripts/s5cmd_wrapper.sh new file mode 100755 index 0000000..b4efa84 --- /dev/null +++ b/scripts/s5cmd_wrapper.sh @@ -0,0 +1,116 @@ +#!/bin/bash + + +# Default args +fp_rate="0.01" +filtering_threshold="0.8" +min_ngram_size="20" +max_ngram_size="20" +remove_type="both" +shard_num="0" +total_shards="1" + + +# Parsing the args + +while [[ $# -gt 0 ]]; do + case $1 in + --s3-input=*) # REQUIRED + s3_input="${1#*=}" + shift + ;; + --s3-output=*) # REQUIRED + s3_output="${1#*=}" + shift + ;; + --local-dir=*) #REQUIRED + local_dir="${1#*=}" + shift + ;; + --expected-ngram-count=*) # REQUIRED + expected_ngram_count="${1#*=}" + shift + ;; + --fp-rate=*) # default 0.01 + fp_rate="${1#*=}" + shift + ;; + --min-ngram-size=*) # default 20 + min_ngram_size="${1#*=}" + shift + ;; + --max-ngram-size=*) # default 20 + max_ngram_size="${1#*=}" + shift + ;; + --filtering-threshold=*) # default 0.8 + filtering_threshold="${1#*=}" + shift + ;; + --remove-type=*) # default both + remove_type="${1#*=}" + shift + ;; + --shard-num=*) # default 0 + shard_num="${1#*=}" + shift + ;; + --total-shards=*) # default 1 + total_shards="${1#*=}" + shift + ;; + *) + echo "Unknown argument: $1" + shift + ;; + esac +done + +# Run some asserts that the up/down is specified correctly +if [[ -z $s3_input ]]; then + echo "Error: --s3-input is required and cannot be empty." + exit 1 +fi + +if [[ -z $s3_output ]]; then + echo "Error: --s3-output is required and cannot be empty." + exit 1 +fi + +if [[ -z $local_dir ]]; then + echo "Error: --local-dir is required and cannot be empty." + exit 1 +fi + + +# Actually do the thing: +input_dir=${local_dir%/}/input/ +output_dir=${local_dir%/}/output/ + +mkdir -p $input_dir +mkdir -p $output_dir + +echo "Downloading S3 files to local..." +s5cmd cp --show-progress "${s3_input%/}/*" $input_dir + + +echo "Running BFF..." 
+cargo run --release bff \ +--inputs $input_dir \ +--output-directory $output_dir \ +--expected-ngram-count $expected_ngram_count \ +--fp-rate $fp_rate \ +--min-ngram-size $min_ngram_size \ +--max-ngram-size $max_ngram_size \ +--filtering-threshold $filtering_threshold \ +--remove-type $remove_type \ +--shard-num $shard_num \ +--total-shards $total_shards \ + + +echo "Uploading dedup'ed data to S3..." +s5cmd cp --show-progress $output_dir $s3_output + + + + diff --git a/src/main.rs b/src/main.rs old mode 100644 new mode 100755 index 1fad7c7..903ea16 --- a/src/main.rs +++ b/src/main.rs @@ -1,103 +1,156 @@ use ahash::RandomState; +use anyhow::{anyhow, Result}; use byteorder::{LittleEndian, NativeEndian, ReadBytesExt, WriteBytesExt}; -use clap::Parser; +use clap::{Parser, Subcommand}; use flate2::read::MultiGzDecoder; use flate2::write::GzEncoder; use flate2::Compression; -use rand::Rng; +use rand::{Rng, thread_rng}; +use rand::seq::SliceRandom; use serde_json::Value; use std::clone::Clone; use std::collections::VecDeque; -use std::fs::OpenOptions; use std::hash::{BuildHasher, Hash, Hasher}; use std::io; use std::io::{BufRead, BufReader, BufWriter, Write}; use std::mem::size_of; -use std::path::PathBuf; +use std::path::{PathBuf}; use std::sync::atomic::{AtomicU32, Ordering}; -use std::sync::Arc; use std::thread::available_parallelism; use threadpool::ThreadPool; use unicode_segmentation::UnicodeSegmentation; +use rayon::prelude::*; +use sysinfo::{ + System, +}; +use glob::glob; +use human_bytes::human_bytes; +use std::fs::{OpenOptions, remove_file, create_dir_all}; +use std::sync::{Arc, Mutex}; +use indicatif::{ProgressBar,ProgressStyle}; +use std::time::{Instant}; + + +#[derive(Parser)] +#[clap(author, version, about, long_about = None)] +struct ArgParser { + #[clap(subcommand)] + command: Commands, +} -#[derive(Parser, Debug)] -struct Args { - #[arg(long)] - bloom_filter_file: PathBuf, - - /// The size of the bloom filter in bytes. 
If the filter already exists, this parameter is - /// ignored. - #[arg(long)] - bloom_filter_size: usize, - - /// The number of expected ngrams. This is used to calculate the optimal number of hashers. - /// If the filter already exists, this parameter is ignored. - #[arg(long)] - expected_ngram_count: usize, - - /// The smallest ngram size to consider. Paragraphs that have fewer than this number of tokens - /// are not deduplicated and always kept. These ngrams are never added to the bloom filter. - /// Note that this value only matters if the paragraph has fewer tokens than the max ngram size. - #[arg(long, default_value_t = 5)] - min_ngram_size: usize, +#[derive(Subcommand, Debug)] +enum Commands { + #[clap(arg_required_else_help = true)] + Bff { + /// (List of) directories or files that are jsonl.gz files + #[arg(required=true, long)] + inputs: Vec, + + /// Output directory where the deduplicated files will end up. + /// These will have the same basename as the inputs, so it is up to you to ensure no collisions here! + #[arg(required=true, long)] + output_directory: PathBuf, + + /// If specified, tries to load the bloom filter from this file, and will save once complete. + /// If unspecified, will not save the bloom filter at the end + #[arg(long)] + bloom_filter_file: Option, + + /// The number of expected ngrams. This is used to calculate the optimal number of hashers. + /// If the filter already exists, this parameter is ignored. + #[arg(required=true, long)] + expected_ngram_count: usize, + + /// The desired false positive rate + /// Note that this is a per-ngram FP rate, and not a per-paragraph rate + #[arg(required=true, long)] + fp_rate: f64, + + /// The smallest ngram size to consider. Paragraphs that have fewer than this number of tokens + /// are not deduplicated and always kept. These ngrams are never added to the bloom filter. + /// Note that this value only matters if the paragraph has fewer tokens than the max ngram size. 
+ #[arg(long, default_value_t = 20)] + min_ngram_size: usize, + + /// The largest ngram size to consider. Paragraphs are deduplicated based on the number of + /// ngrams of this size that are already present in the bloom filter. + #[arg(long, default_value_t = 20)] + max_ngram_size: usize, + + /// If this fraction of ngrams of the max ngram size are already present in the bloom filter, + /// the paragraph is considered a duplicate and is discarded. + /// Set this to 0 to never produce any output. This is useful when you want to prime the filter + /// with some content that should be considered duplicates, without deduplicating that content + /// itself. + #[arg(long, default_value_t = 0.80)] + filtering_threshold: f64, + + /// Which "BFF mode" we're in. We have options of 'paragraph', 'document', 'both' + /// indicating we remove individual paragraphs/documents if duplicated + /// The logic for "both" mode is a bit subtle. See comments below + #[arg(long, default_value_t = RemoveType::Paragraph, value_enum)] + remove_type: RemoveType, + + /// Whether or not to update the bloom filter. If this is true, the filter is not updated, but + /// the input is still deduplicated based on the filter. Default is false. + #[arg(long, default_value_t = false)] + no_update_bloom_filter: bool, + + /// If this is true, we keep the input intact, but we add an annotation to each document that + /// explains which spans from the text would have been deleted. + #[arg(long, default_value_t = false)] + annotate: bool, + + /// The number of threads to use for processing. + /// If this is 0, the number of threads is automatically determined. 
+ #[arg(long, short = 't', default_value_t = 0)] + threads: usize, + + /// If this flag is present, we will never save a bloom filter to disk + #[arg(long, default_value_t = false)] + no_save_bloom_filter: bool, + + + /// Turn this flag on if we don't want to use a progress bar + /// Helpful when running through ssh and wanting to check progress via logs and not a terminal + #[arg(long, default_value_t = false)] + no_progress_bar: bool, + + /// For virtual "sharding", this param and the next one subselect files to deduplicate together + /// Defaults to no virtual sharding + #[arg(long, default_value_t=0)] + shard_num: usize, + + #[arg(long, default_value_t=1)] + total_shards: usize, + }, + + Sysreq { + /// Handy tool to help guess RAM requirements for a given pool of data + #[arg(required=true, long)] + expected_ngram_count: usize, + #[arg(required=true, long)] + fp_rate: f64 + }, + +} - /// The largest ngram size to consider. Paragraphs are deduplicated based on the number of - /// ngrams of this size that are already present in the bloom filter. - #[arg(long, default_value_t = 13)] - max_ngram_size: usize, +#[derive(Debug, Clone, Eq, PartialEq, clap::ValueEnum)] +enum RemoveType { + /// Types for what we check to see if is a duplicate - /// If this fraction of ngrams of the max ngram size are already present in the bloom filter, - /// the paragraph is considered a duplicate and is discarded. - /// Set this to 0 to never produce any output. This is useful when you want to prime the filter - /// with some content that should be considered duplicates, without deduplicating that content - /// itself. - #[arg(long, default_value_t = 0.80)] - filtering_threshold: f64, + ///Checks each paragraph of size >=min_ngram_size if it is duplicated. 
If so, it gets removed + Paragraph, + + /// Checks if enough of the ngrams of size ==max_ngram_size (or just one ngram if tokens in range [min_ngram_size, max_ngram_size]) + /// and if enough are present in filter, the whole document gets removed + Document, - /// Whether or not to update the bloom filter. If this is true, the filter is not updated, but - /// the input is still deduplicated based on the filter. Default is false. - #[arg(long, default_value_t = false)] - no_update_bloom_filter: bool, - - /// If this is true, we keep the input intact, but we add an annotation to each document that - /// explains which spans from the text would have been deleted. - #[arg(long, default_value_t = false)] - annotate_only: bool, - - /// If this is true, we only write out document id and source, and annotate which spans would - /// have been deleted. This produces an attribute file per the llm-data specification. - #[arg(long, default_value_t = false)] - annotate_attribute_only: bool, - - /// If you want ngrams to span across paragraph breaks, set this to true. - /// This also means that bff will only remove a complete document at a time. When this happens - /// the resulting document will be empty. This also means that deduplication within a document - /// no longer works. All in all, it might be best to only use this when you're also using - /// --annotate-only. - #[arg(long, default_value_t = false)] - whole_document: bool, - - /// If you want to always match whole paragraphs instead of ngrams, set this to true. - /// Paragraphs smaller than min_ngram_size will still be excluded. - #[arg(long, default_value_t = false)] - whole_paragraphs: bool, - - /// The number of threads to use for processing. - /// If this is 0, the number of threads is automatically determined. - #[arg(long, short = 't', default_value_t = 0)] - threads: usize, - - /// Input files. These are expected to be gzip compressed newline-delimited JSON files with a - /// "text" field. 
- #[arg(index = 1)] - inputs: Vec, - - /// Output directory. The output files will have the same name as the input files, but be placed - /// in this directory. - #[arg(long, short = 'o')] - output_directory: PathBuf, + /// Does paragraph removal, BUT if enough of the paragraph ngram checks are contained, removes the whole document + Both, } + fn tokenize(s: &str) -> impl Iterator { s.split_word_bounds().filter(|w| { for c in w.chars() { @@ -109,6 +162,11 @@ fn tokenize(s: &str) -> impl Iterator { }) } + +/*============================================================= += Bloom Filter stuff = +==============================================================*/ + struct BloomFilter { bits: Vec, hash_builder_seeds: Vec<[u64; 4]>, // RandomState does not store its seeds, so we have to store them ourselves. @@ -137,19 +195,6 @@ impl BloomFilter { (1.0 - (1.0 - (1.0 / m)).powf(k * n)).powf(k) } - fn suggest_size_in_bytes(expected_elements: usize) -> usize { - let mut size_in_bytes = 1024 * 1024; - while size_in_bytes < usize::MAX / 2 - && Self::prob_of_false_positive( - size_in_bytes, - expected_elements, - Self::optimal_number_of_hashers(size_in_bytes, expected_elements), - ) > 0.01 - { - size_in_bytes *= 2; - } - size_in_bytes - } fn my_prob_of_false_positive(&self, expected_elements: usize) -> f64 { Self::prob_of_false_positive( @@ -163,6 +208,17 @@ impl BloomFilter { self.bits.len() * size_of::() } + fn calculate_sparsity(&self) -> f64 { + let set_bits:usize = self.bits.par_iter() + .map(|atomic| { + let value = atomic.load(std::sync::atomic::Ordering::Relaxed); + value.count_ones() as usize + }) + .sum(); + let total_bits = self.size_in_bytes() * 8; + return (set_bits as f64) / (total_bits as f64); + } + fn new(size_in_bytes: usize, num_hashers: usize) -> Self { let mut rng = rand::thread_rng(); let mut hash_builder_seeds = Vec::with_capacity(num_hashers); @@ -175,12 +231,11 @@ impl BloomFilter { hash_builder_seeds.push(seeds); } - let mut bits = Vec::new(); let 
number_of_u32 = size_in_bytes / size_of::(); - bits.reserve_exact(number_of_u32); - for _ in 0..number_of_u32 { - bits.push(AtomicU32::new(0)); - } + let bits = { + (0..number_of_u32).into_par_iter().map(|_| AtomicU32::default()).collect() + }; + Self { bits, @@ -189,6 +244,8 @@ impl BloomFilter { } } + + fn from_file(path: &PathBuf) -> io::Result { let mut file = OpenOptions::new() .read(true) @@ -304,7 +361,6 @@ impl BloomFilter { return false; } } - true } @@ -313,8 +369,91 @@ impl BloomFilter { let hashes = self.hashes(s); self.contains_hashes(&hashes) } + + + fn from_args(bloom_filter_file: Option, expected_ngram_count: usize, fp_rate: f64,) -> Self { + /* Uses the CLI args to build a bloom filter + Logic: + - Check if file exists, if so, just load it and return + - Get size: + + if size is explicitly speciifed, use this + + otherwise, compute based on ngrams + fp rate + - Return + */ + + let bloom_filter = match &bloom_filter_file { + Some(path) if path.exists() => { + println!("Loading bloom filter from {:?}...", path); + BloomFilter::from_file(&path).unwrap() + } + _ => { + println!("Creating new bloom filter..."); + let bloom_filter_size = compute_bloom_size(fp_rate, expected_ngram_count, true); + let num_hashers = BloomFilter::optimal_number_of_hashers( + bloom_filter_size, + expected_ngram_count, + ); + BloomFilter::new(bloom_filter_size, num_hashers) + } + }; + + println!("Bloom filter has size {} | FP Rate {:?}", + human_bytes(bloom_filter.size_in_bytes() as f64), + bloom_filter.my_prob_of_false_positive(expected_ngram_count)); + bloom_filter + } + +} + + + +fn compute_bloom_size(fp_rate: f64, expected_ngram_count: usize, limit_to_sys: bool) -> usize { + /* Uses binary search to find optimal size of bloom filter using optimal number of hashers + and provided ngram counts + */ + // compute 90% of system ram + let mut sys = System::new_all(); + sys.refresh_all(); + + + let mut lo = 1 as usize; + + let mut hi = if limit_to_sys { + ((sys.total_memory() as 
f64) * 0.9) as usize + } else { + 420_744_073_709_551_615 as usize + }; + + + // Save some time by checking endpoint first + if limit_to_sys && BloomFilter::prob_of_false_positive(hi, expected_ngram_count, + BloomFilter::optimal_number_of_hashers(hi, expected_ngram_count)) > fp_rate { + println!( + "WARNING: To achieve desired false-positive rate, you'd need >90% of system RAM. Defaulting to 90% \ + system RAM."); + return hi; + } + + // Then do binary search to find optimal size + while lo < hi-1 { + let mid = lo + (hi - lo) / 2; + let num_hashers = BloomFilter::optimal_number_of_hashers(mid, expected_ngram_count); + let computed_fp = BloomFilter::prob_of_false_positive(mid, expected_ngram_count, num_hashers) ; + if computed_fp > fp_rate { + // FP rate too high, need to go bigger + lo = mid + 1; + } else { + // FP rate too low, can make bloom filter smaller + hi = mid -1; + } + } + hi } + + + + #[allow(clippy::too_many_arguments)] // TODO : abstract parameters into a struct fn process_file( input_file: &PathBuf, @@ -322,13 +461,14 @@ fn process_file( bloom_filter: &Arc, max_ngram_size: usize, min_ngram_size: usize, - update_bloom_filter: bool, + remove_type: &RemoveType, filtering_threshold: f64, - annotate_only: bool, - annotate_attribute_only: bool, - whole_document: bool, - whole_paragraphs: bool, -) -> Result<(), io::Error> { + no_update_bloom_filter: bool, + annotate: bool, + pbar_option: &Option>>, +) -> Result<(usize, usize), io::Error> { + + // Setup input/output writers let input_file = OpenOptions::new() .read(true) .write(false) @@ -336,6 +476,7 @@ fn process_file( .open(input_file)?; let reader = BufReader::with_capacity(1024 * 1024, MultiGzDecoder::new(input_file)); + let output_file_pathbuf = output_file; let output_file = OpenOptions::new() .read(false) .write(true) @@ -347,183 +488,343 @@ fn process_file( GzEncoder::new(output_file, Compression::default()), ); + // Loop over lines and do bff stuff + let mut count = 0; + let mut fully_skipped = 0; + 
let mut removed_text_bytes = 0; + let mut total_text_bytes = 0; for line in reader.lines() { - let line = line.unwrap(); - let mut data: Value = serde_json::from_str(&line).unwrap(); - let text = data["text"].as_str().unwrap(); + count += 1; + let (dedup_data, removed_line_bytes, total_line_bytes) = process_line(&line.unwrap(), &bloom_filter, + min_ngram_size, max_ngram_size, + remove_type, filtering_threshold, + no_update_bloom_filter, annotate); + removed_text_bytes += removed_line_bytes; + total_text_bytes += total_line_bytes; + if dedup_data.get("text").unwrap().as_str().unwrap().trim().is_empty() { + fully_skipped += 1 + } + else { + serde_json::to_writer(&mut writer, &dedup_data)?; + writer.write_all(b"\n")?; + } + } - let newlines = if whole_document { - vec![0, text.len()] - } else { - let mut newlines = Vec::new(); - newlines.push(0); - for i in text.match_indices('\n') { - newlines.push(i.0); - } - newlines.push(text.len()); - newlines - }; - let mut windows_to_remove = Vec::new(); - let mut total_contained_ngrams = 0; - - for paragraph_window in newlines.windows(2) { - let paragraph = &text[paragraph_window[0]..paragraph_window[1]]; - - // calculate hashes for the paragraph - let mut hashes: Vec> = Vec::new(); - let mut ngram: VecDeque<&str> = VecDeque::with_capacity(max_ngram_size); - for token in tokenize(paragraph) { - ngram.push_back(token); - // If not hashing whole paragraphs, add ngrams to the bloom filter as they reach max size - if !whole_paragraphs && ngram.len() >= max_ngram_size { - hashes.push(bloom_filter.hashes(&ngram)); - ngram.pop_front(); - } - } - // If the paragraph was too short, put in a shorter ngram, so we can dedupe short - // paragraphs exactly. 
- if hashes.is_empty() && ngram.len() >= min_ngram_size { + if count == fully_skipped { + remove_file(output_file_pathbuf)?; + } + match pbar_option { + Some(pbar) => { + let pb = pbar.lock().unwrap(); + pb.inc(1); + } + None => (), + } + Ok((removed_text_bytes, total_text_bytes)) +} + + +fn process_line(line: &String, bloom_filter: &BloomFilter, min_ngram_size: usize, max_ngram_size: usize, + remove_type: &RemoveType, filtering_threshold: f64, no_update_bloom_filter: bool, annotate: bool) -> + (serde_json::Value, usize, usize) { + // Main BFF logic: processes a single json document + // Does the following (handling the {paragraph, document, both} cases) + // 1. Breaks document into units (paragraph/both -> paragraph; document -> full text) + // 2. For each unit, tokenize and + // a. if num_tokens < min_ngram_size: do nothing, leave this unit intact + // b. if num_tokens >= max_ngram_size: break unit into ngram-shingling of max_ngram_size + // c. else, full unit is treated as one ngram + // 3. Check containment of each ngram in bloom filter. + // a. If > filtering_threshold contained, mark unit for deletion + // 4. If unit survives step 3, add all ngrams into bloom filter + // 5. 
BOTH-mode ONLY: If total_contained_ngrams * threshold >= total_ngrams, omit the WHOLE document + + // Outputs are (output_json, total_removed_bytes, total_input_bytes) + // If annotate is turned on, nothing gets removed, text is left intact, but byte-windows-removed + + + let mut data: Value = serde_json::from_str(&line).unwrap(); + let mut total_bytes = 0; + let mut removed_bytes = 0; + let text = data["text"].as_str().unwrap(); + + // Step 1: Break text into "units" + let newlines = if *remove_type == RemoveType::Document { + vec![0, text.len()] + } else { + let mut newlines = Vec::new(); + newlines.push(0); + for i in text.match_indices('\n') { + newlines.push(i.0); + } + newlines.push(text.len()); + newlines + }; + let mut windows_to_remove = Vec::new(); + + + let mut total_ngrams = 0; + let mut total_contained_ngrams = 0; + for paragraph_window in newlines.windows(2) { + let paragraph = &text[paragraph_window[0]..paragraph_window[1]]; + total_bytes += paragraph.len(); + + + // Step 2: Tokenize and chunk into ngram shinglings, hash each one for the bff + let mut hashes: Vec> = Vec::new(); + let mut ngram: VecDeque<&str> = VecDeque::with_capacity(max_ngram_size); + for token in tokenize(paragraph) { + ngram.push_back(token); + if ngram.len() >= max_ngram_size { // Step 2b: ngram shingling long enough hashes.push(bloom_filter.hashes(&ngram)); + ngram.pop_front(); } + } + // Step 2c: unit is short, but not TOO SHORT + if hashes.is_empty() && ngram.len() >= min_ngram_size { + hashes.push(bloom_filter.hashes(&ngram)); + } - let contained_ngrams = hashes - .iter() - .filter(|ngram| bloom_filter.contains_hashes(ngram)) - .count(); - total_contained_ngrams += contained_ngrams; - - // calculate how many ngrams are in the bloom filter - let number_of_ngrams = hashes.len(); - - // produce output - let too_many_duplicate_ngrams = - contained_ngrams as f64 / number_of_ngrams as f64 > filtering_threshold; - if too_many_duplicate_ngrams { - 
windows_to_remove.push(paragraph_window); - } else if update_bloom_filter { - for ngram in hashes { - bloom_filter.insert_hashes(&ngram); - } + + // Step 3: check containment of ngrams + let contained_ngrams = hashes + .iter() + .filter(|ngram| bloom_filter.contains_hashes(ngram)) + .count(); + total_ngrams += hashes.len(); + total_contained_ngrams += contained_ngrams; + let number_of_ngrams = hashes.len() as f64; + //windows_to_remove.ansoteuhoausenh(); + let should_remove = contained_ngrams as f64 / number_of_ngrams > filtering_threshold; + if should_remove { + windows_to_remove.push(paragraph_window); + removed_bytes += paragraph.len(); + } else if !no_update_bloom_filter { + // Step 4: add all ngrams to the bloom filter if we don't remove it + for ngram in hashes { + bloom_filter.insert_hashes(&ngram); } } + } - // if annotate_attribute_only or annotate_only, add the annotation to the json - if annotate_attribute_only || annotate_only { - data["bff_duplicate_spans"] = serde_json::to_value(windows_to_remove).unwrap(); - data["bff_contained_ngram_count"] = - serde_json::to_value(total_contained_ngrams).unwrap(); - } else { - let mut output_paragraphs = String::new(); - let mut last_end = 0; - for paragraph_window in windows_to_remove { - output_paragraphs.push_str(&text[last_end..paragraph_window[0]]); - last_end = paragraph_window[1]; - } - output_paragraphs.push_str(&text[last_end..]); - data["text"] = Value::String(output_paragraphs); - data["bff_contained_ngram_count_before_dedupe"] = - serde_json::to_value(total_contained_ngrams).unwrap(); + // Step 5: Handle the both case + let temp_window = vec![0, text.len()]; + if *remove_type == RemoveType::Both && + (total_contained_ngrams as f64) / (total_ngrams as f64) > filtering_threshold { + windows_to_remove.clear(); + windows_to_remove.push(&temp_window); + } + + // Format outputs: + if annotate { + data["bff_duplicate_spans"] = serde_json::to_value(windows_to_remove).unwrap(); + 
data["bff_contained_ngram_count"] = serde_json::to_value(total_contained_ngrams).unwrap(); + } else { + let mut output_paragraphs = String::new(); + let mut last_end = 0; + for paragraph_window in windows_to_remove { + output_paragraphs.push_str(&text[last_end..paragraph_window[0]]); + last_end = paragraph_window[1]; } + output_paragraphs.push_str(&text[last_end..]); + data["text"] = Value::String(output_paragraphs); + } + + (data, removed_bytes, total_bytes) +} - if annotate_attribute_only { - // Allowed fields - let allowed_fields = [ - "bff_duplicate_spans", - "bff_contained_ngram_count", - "id", - "source", - ]; - // Iterate through the keys of the JSON object and remove any field that is not in the allowed_fields list - if let Value::Object(ref mut map) = data { - map.retain(|key, _| allowed_fields.contains(&key.as_str())); - } - + + +/*======================================================== += I/O Stuff = +========================================================*/ + +fn expand_dirs(paths: &[PathBuf]) -> Result> { + let mut files = vec![]; + for path in paths { + if path.is_dir() { + let path_str = path + .to_str() + .ok_or_else(|| anyhow!("invalid path '{}'", path.to_string_lossy()))?; + for entry in glob(&format!("{}/**/*.json*.gz", path_str))? 
{ + files.push(entry?.to_path_buf()); + } + } else { + files.push(path.clone()); } + } + Ok(files) +} - serde_json::to_writer(&mut writer, &data)?; - writer.write_all(b"\n")?; +fn create_dir_if_not_exists(path: &PathBuf) -> Result<(), std::io::Error> { + match create_dir_all(path) { + Ok(_) => Ok(()), + Err(err) => { + if err.kind() == std::io::ErrorKind::AlreadyExists { + Ok(()) + } else { + Err(err) + } + } } +} + + +/*============================================================= += Main Function = +=============================================================*/ + + +fn main() -> Result<()> { + let args = ArgParser::parse(); + + match &args.command { + Commands::Bff {inputs, output_directory, bloom_filter_file, expected_ngram_count, + fp_rate, min_ngram_size, max_ngram_size, filtering_threshold, + remove_type, no_update_bloom_filter, annotate, + threads, no_save_bloom_filter, no_progress_bar, shard_num, total_shards} => + { + assert!(shard_num < total_shards, "Shard num must be < total shards"); + bff(inputs, output_directory, bloom_filter_file, expected_ngram_count, + fp_rate, min_ngram_size, max_ngram_size, filtering_threshold, + remove_type, no_update_bloom_filter, annotate, + threads, no_save_bloom_filter, no_progress_bar, shard_num, total_shards)?; + }, + Commands::Sysreq {expected_ngram_count, fp_rate} => { + let bff_size = compute_bloom_size(*fp_rate, *expected_ngram_count, false); + let num_hashers = BloomFilter::optimal_number_of_hashers(bff_size, *expected_ngram_count); + println!("To handle {} tokens with fp rate {}, you'd need a filter of size {} and {} hashers", + expected_ngram_count, fp_rate, human_bytes(bff_size as f64), num_hashers); + }, + + } Ok(()) } -fn main() { - let args = Args::parse(); - let threads = if args.threads == 0 { + + +fn bff(inputs: &Vec, output_directory: &PathBuf, bloom_filter_file: &Option, + expected_ngram_count: &usize, fp_rate: &f64, min_ngram_size: &usize, max_ngram_size: &usize, + filtering_threshold: &f64, remove_type: 
&RemoveType, no_update_bloom_filter: &bool, + annotate: &bool, threads: &usize, no_save_bloom_filter: &bool, + no_progress_bar: &bool, shard_num: &usize, total_shards: &usize) -> Result<()> { + + + // SETUP PHASE: + // Set up {output_location, filter, inputs, threading, progress bar} + let start_time = Instant::now(); + create_dir_if_not_exists(output_directory).unwrap(); + let bloom_filter = Arc::new(BloomFilter::from_args(bloom_filter_file.clone(), *expected_ngram_count, *fp_rate)); + + + // Setup input files + let all_inputs = expand_dirs(inputs).unwrap(); + let mut shard: Vec = Vec::new(); + let mut idx = *shard_num; + while idx < all_inputs.len() { + shard.push(all_inputs[idx].clone()); + idx += total_shards; + } + let mut rng = thread_rng(); + shard.shuffle(&mut rng); + + // Setup threads + let threads = if *threads == 0 { available_parallelism().unwrap().get() } else { - args.threads + *threads }; - let bloom_filter = if args.bloom_filter_file.exists() { - println!("Loading bloom filter from {:?}...", args.bloom_filter_file); - BloomFilter::from_file(&args.bloom_filter_file).unwrap() - } else { - println!("Creating new bloom filter..."); - let num_hashers = BloomFilter::optimal_number_of_hashers( - args.bloom_filter_size, - args.expected_ngram_count, + // Setup progress bar + let pbar = ProgressBar::new(shard.len() as u64) + .with_style( + ProgressStyle::with_template( + "Files {human_pos}/{human_len} [{elapsed_precise}/{duration_precise}] [{wide_bar:.cyan/blue}]", + ).unwrap() ); - BloomFilter::new(args.bloom_filter_size, num_hashers) - }; - let bloom_filter = Arc::new(bloom_filter); - println!( - "Bloom filter loaded. 
({} hashers)", - bloom_filter.hash_builders.len() - ); + let pbar = Arc::new(Mutex::new(pbar)); + if !no_progress_bar { + pbar.lock().unwrap().inc(0); // Makes pbar show up with 0/N files complete + } + println!("Completed setup phase in {:?} seconds", start_time.elapsed().as_secs()); - let p = bloom_filter.my_prob_of_false_positive(args.expected_ngram_count); - if p >= 0.5 { - println!( - "WARNING: Probability of a false positive after {} elements is {}.", - args.expected_ngram_count, p - ); - } else { - println!( - "Probability of a false positive after {} elements: {}", - args.expected_ngram_count, p - ); - } - - let suggested_size = BloomFilter::suggest_size_in_bytes(args.expected_ngram_count); - if suggested_size * 2 < bloom_filter.size_in_bytes() { - println!( - "WARNING: Your bloom filter is more than twice as large as suggested for {} elements. \ - This is good for accuracy, but it is much slower, and likely not worth the trade-off.", - args.expected_ngram_count - ); - } + // LOOP PHASE(using threadpool) + let loop_start_time = Instant::now(); + let total_bytes = Arc::new(Mutex::new(0)); + let removed_bytes = Arc::new(Mutex::new(0)); let threadpool = ThreadPool::new(threads); - for input in args.inputs { - let mut output = args.output_directory.clone(); - output.push(input.file_name().unwrap()); + for input in shard { + let output = output_directory.clone().join(input.file_name().unwrap()); let bloom_filter = bloom_filter.clone(); + let pbar_option: Option>> = if *no_progress_bar { + None + } else { + Some(pbar.clone()) + }; + let min_ngram_size = min_ngram_size.clone(); + let max_ngram_size = max_ngram_size.clone(); + let filtering_threshold = filtering_threshold.clone(); + let remove_type = remove_type.clone(); + let no_update_bloom_filter = no_update_bloom_filter.clone(); + let annotate = annotate.clone(); + let no_progress_bar = no_progress_bar.clone(); + let total_bytes = Arc::clone(&total_bytes); + let removed_bytes = Arc::clone(&removed_bytes); + 
threadpool.execute(move || { - println!("Processing {input:?}..."); - process_file( + if no_progress_bar { + println!("Processing {input:?}..."); + } + let (removed_doc_bytes, total_doc_bytes) = process_file( &input, &output, &bloom_filter, - args.max_ngram_size, - args.min_ngram_size, - !args.no_update_bloom_filter, - args.filtering_threshold, - args.annotate_only, - args.annotate_attribute_only, - args.whole_document, - args.whole_paragraphs, - ) - .unwrap(); + max_ngram_size, + min_ngram_size, + &remove_type, + filtering_threshold.clone(), + no_update_bloom_filter.clone(), + annotate.clone(), + &pbar_option + ).unwrap(); + + let mut total_guard = total_bytes.lock().unwrap(); + *total_guard += total_doc_bytes; + + let mut removed_guard = removed_bytes.lock().unwrap(); + *removed_guard += removed_doc_bytes; }); } threadpool.join(); + println!("Completed filtering all files in {:?} seconds", + loop_start_time.elapsed().as_secs()); + + + // FINALIZE PHASE + // Save bloom filter + match &bloom_filter_file { + Some(path) => { + if !no_update_bloom_filter && !no_save_bloom_filter { + let write_start_time = Instant::now(); + println!("Writing bloom filter to {:?}...", path); + bloom_filter.write_to_file(&path).unwrap(); + println!("...Bloom filter written in {:?} seconds.", write_start_time.elapsed().as_secs()); + } - if !args.no_update_bloom_filter { - println!("Writing bloom filter to {:?}...", args.bloom_filter_file); - bloom_filter.write_to_file(&args.bloom_filter_file).unwrap(); - println!("Bloom filter written."); + } + _ => {} } -} + // Print out summary + println!("After running, BFF sparsity was {:?}", bloom_filter.calculate_sparsity()); + println!("Completed full BFF run in {:?} seconds", start_time.elapsed().as_secs()); + + let total_bytes = *total_bytes.lock().unwrap(); + let removed_bytes = *removed_bytes.lock().unwrap(); + println!("Stats: Saw {} of text | Removed {} of them", + human_bytes(total_bytes as f64), removed_bytes as f64 / total_bytes as f64); + 
Ok(()) +} \ No newline at end of file