From 684a376fc5627d8333663d3fdc21e63e68cbc187 Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Thu, 27 Oct 2022 10:10:13 -0300 Subject: [PATCH] feat: Introduce MongoDb storage --- Cargo.lock | 757 ++++++++++++++++++++++++++++++++++++++++- Cargo.toml | 4 + src/storage/mod.rs | 27 ++ src/storage/mongodb.rs | 266 +++++++++++++++ 4 files changed, 1040 insertions(+), 14 deletions(-) create mode 100644 src/storage/mongodb.rs diff --git a/Cargo.lock b/Cargo.lock index 80f5af66..e73af661 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8,6 +8,17 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "ahash" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47" +dependencies = [ + "getrandom", + "once_cell", + "version_check", +] + [[package]] name = "aho-corasick" version = "0.7.18" @@ -17,6 +28,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "ascii" version = "1.0.0" @@ -100,6 +120,34 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "block-buffer" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69cce20737498f97b993470a6e536b8523f0af7892a4f928cceb1ac5e52ebe7e" +dependencies = [ + "generic-array", +] + +[[package]] +name = "bson" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99d76085681585d39016f4d3841eb019201fc54d2dd0d92ad1e4fab3bfb32754" +dependencies = [ + "ahash", + "base64 0.13.0", + "hex", + "indexmap", + "lazy_static", + "rand", + "serde", + "serde_bytes", + "serde_json", + "time 0.3.9", + "uuid 1.2.1", +] + [[package]] name = "bumpalo" version = "3.11.0" @@ -136,6 +184,21 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chrono" +version = "0.4.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfd4d1b31faaa3a89d7934dbded3111da0d2ef28e3ebccdb4f0179f5929d1ef1" +dependencies = [ + "iana-time-zone", + "js-sys", + "num-integer", + "num-traits", + "time 0.1.43", + "wasm-bindgen", + "winapi", +] + [[package]] name = "chunked_transfer" version = "1.4.0" @@ -181,6 +244,16 @@ dependencies = [ "os_str_bytes", ] +[[package]] +name = "codespan-reporting" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3538270d33cc669650c4b093848450d380def10c331d38c768e34cac80576e6e" +dependencies = [ + "termcolor", + "unicode-width", +] + [[package]] name = "combine" version = "4.6.3" @@ -237,6 +310,15 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc" +[[package]] +name = "cpufeatures" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28d997bd5e24a5928dd43e46dc529867e207907fe0b239c3477d924f7f2ca320" +dependencies = [ + "libc", +] + [[package]] name = "crc32fast" version = "1.3.2" @@ -315,12 +397,66 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "crypto-common" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" +dependencies = [ + "generic-array", + "typenum", +] + [[package]] name = "cryptoxide" version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "129eabb7b0b78644a3a7e7cf220714aba47463bb281f69fa7a71ca5d12564cca" +[[package]] +name = "cxx" +version = "1.0.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e599641dff337570f6aa9c304ecca92341d30bf72e1c50287869ed6a36615a6" +dependencies = [ + "cc", + "cxxbridge-flags", + "cxxbridge-macro", + "link-cplusplus", +] + +[[package]] +name = "cxx-build" +version = "1.0.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60e2434bc22249c056e12d2e87db46380730da0f2648471edea3e8e11834a892" +dependencies = [ + "cc", + "codespan-reporting", + "once_cell", + "proc-macro2", + "quote", + "scratch", + "syn", +] + +[[package]] +name = "cxxbridge-flags" +version = "1.0.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3894ad0c6d517cb5a4ce8ec20b37cd0ea31b480fe582a104c5db67ae21270853" + +[[package]] +name = "cxxbridge-macro" +version = "1.0.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34fa7e395dc1c001083c7eed28c8f0f0b5a225610f3b6284675f444af6fab86b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "darling" version = "0.13.4" @@ -356,6 +492,34 @@ dependencies = [ "syn", ] +[[package]] +name = "data-encoding" +version = "2.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ee2393c4a91429dffb4bedf19f4d6abf27d8a732c8ce4980305d782e5426d57" + +[[package]] +name = "derivative" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "digest" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adfbc57365a37acbd2ebf2b64d7e69bb766e2fea813521ed536f5d0520dcf86c" +dependencies = [ + "block-buffer", + "crypto-common", + "subtle", +] + [[package]] name = "dtoa" version = "0.4.8" @@ -409,6 +573,18 @@ dependencies = [ "cfg-if 1.0.0", ] +[[package]] +name = "enum-as-inner" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21cdad81446a7f7dc43f6a77409efeb9733d2fa65553efef6018ef257c959b73" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "env_logger" version = "0.9.0" @@ -590,6 +766,16 @@ dependencies = [ "thiserror", ] +[[package]] +name = "generic-array" +version = "0.14.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bff49e947297f3312447abdca79f45f4738097cc82b06e72054d2223f601f1b9" +dependencies = [ + "typenum", + "version_check", +] + [[package]] name = "getrandom" version = "0.2.6" @@ -653,6 +839,26 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "hmac" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" +dependencies = [ + "digest", +] + +[[package]] +name = "hostname" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c731c3e10504cc8ed35cfe2f1db4c9274c3d35fa486e3b31df46f068ef3e867" +dependencies = [ + "libc", + "match_cfg", + "winapi", +] + [[package]] name = "http" version = "0.2.8" @@ -730,6 +936,30 @@ dependencies = [ "tokio-native-tls", ] +[[package]] +name = "iana-time-zone" +version = "0.1.51" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5a6ef98976b22b3b7f2f3a806f858cb862044cfa66805aa3ad84cb3d3b785ed" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "wasm-bindgen", + "winapi", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0703ae284fc167426161c2e3f1da3ea71d94b21bedbcc9494e92b28e334e3dca" +dependencies = [ + "cxx", + "cxx-build", +] + [[package]] name = "ident_case" version = "1.0.1" @@ -777,6 +1007,18 @@ dependencies = [ "cfg-if 1.0.0", ] +[[package]] +name = "ipconfig" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "723519edce41262b05d4143ceb95050e4c614f483e78e9fd9e39a8275a84ad98" +dependencies = [ + "socket2", + "widestring", + "winapi", + "winreg 0.7.0", +] + [[package]] name = "ipnet" version = "2.5.0" @@ -821,9 +1063,24 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.121" +version = "0.2.137" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "efaa7b300f3b5fe8eb6bf21ce3895e1751d9665086af2d64b42f19701015ff4f" +checksum = "fc7fcc620a3bff7cdd7a365be3376c97191aeaccc2a603e600951e452615bf89" + +[[package]] +name = "link-cplusplus" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9272ab7b96c9046fbc5bc56c06c117cb639fe2d509df0c421cad82d2915cf369" +dependencies = [ + "cc", +] + +[[package]] +name = "linked-hash-map" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" [[package]] name = "lock_api" @@ -844,12 +1101,36 @@ dependencies = [ "cfg-if 1.0.0", ] +[[package]] +name = "lru-cache" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31e24f1ad8321ca0e8a1e0ac13f23cb668e6f5466c2c57319f6a5cf1cc8e3b1c" +dependencies = [ + "linked-hash-map", +] + +[[package]] +name = "match_cfg" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4" + [[package]] name = "matches" version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f" +[[package]] +name = "md-5" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6365506850d44bff6e2fbcb5176cf63650e48bd45ef2fe2665ae1570e0f4b9ca" +dependencies = [ + "digest", +] + [[package]] name = "memchr" version = "2.4.1" @@ -944,7 +1225,53 @@ dependencies = [ "libc", "log", "wasi 0.11.0+wasi-snapshot-preview1", - "windows-sys", + "windows-sys 0.36.1", +] + +[[package]] +name = "mongodb" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5a1df476ac9541b0e4fdc8e2cc48884e66c92c933cd17a1fd75e68caf75752e" +dependencies = [ + "async-trait", + "base64 0.13.0", + "bitflags", + "bson", + "chrono", + "derivative", + "futures-core", + "futures-executor", + "futures-util", + "hex", + "hmac", + "lazy_static", + "md-5", + "os_info", + "pbkdf2", + "percent-encoding", + "rand", + "rustc_version_runtime", + "rustls", + "rustls-pemfile", + "serde", + "serde_bytes", + "serde_with", + "sha-1", + "sha2", + "socket2", + "stringprep", + "strsim", + "take_mut", + "thiserror", + "tokio", + "tokio-rustls", + "tokio-util", + "trust-dns-proto", + "trust-dns-resolver", + "typed-builder", + "uuid 0.8.2", + "webpki-roots", ] [[package]] @@ -986,6 +1313,16 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "num-integer" +version = "0.1.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "225d3389fb3509a24c93f5c29eb6bde2586b98d9f016636dff58d7c6f7569cd9" +dependencies = [ + "autocfg", + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.14" @@ -1081,6 +1418,16 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "os_info" +version = "3.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4750134fb6a5d49afc80777394ad5d95b04bc12068c6abb92fae8f43817270f" +dependencies = [ + "log", + "winapi", +] + [[package]] name = "os_str_bytes" version = "6.0.0" @@ -1199,7 +1546,17 @@ checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" dependencies = [ "instant", "lock_api", - "parking_lot_core", + "parking_lot_core 0.8.5", +] + +[[package]] +name = "parking_lot" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" +dependencies = [ + "lock_api", + "parking_lot_core 0.9.4", ] [[package]] @@ -1216,12 +1573,34 @@ dependencies = [ "winapi", ] +[[package]] +name = "parking_lot_core" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4dc9e0dc2adc1c69d09143aff38d3d30c5c3f0df0dad82e6d25547af174ebec0" +dependencies = [ + "cfg-if 1.0.0", + "libc", + "redox_syscall", + "smallvec", + "windows-sys 0.42.0", +] + [[package]] name = "pathdiff" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8835116a5c179084a830efb3adc117ab007512b535bc1a21c991d3b32a6b44dd" +[[package]] +name = "pbkdf2" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "271779f35b581956db91a3e55737327a03aa051e90b1c47aeb189508533adfd7" +dependencies = [ + "digest", +] + [[package]] name = "percent-encoding" version = "2.1.0" @@ -1295,7 +1674,7 @@ dependencies = [ "fnv", "lazy_static", "memchr", - "parking_lot", + "parking_lot 0.11.2", "thiserror", ] @@ -1311,6 +1690,12 @@ dependencies = [ "tiny_http", ] +[[package]] +name = "quick-error" +version = "1.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" + [[package]] name = "quote" version = "1.0.17" @@ -1460,7 +1845,32 @@ dependencies = [ "wasm-bindgen", "wasm-bindgen-futures", "web-sys", - "winreg", + "winreg 0.10.1", +] + +[[package]] +name = "resolv-conf" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52e44394d2086d010551b14b53b1f24e31647570cd1deb0379e2c21b329aba00" +dependencies = [ + "hostname", + "quick-error", +] + +[[package]] +name = "ring" +version = "0.16.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" +dependencies = [ + "cc", + "libc", + "once_cell", + "spin", + "untrusted", + "web-sys", + "winapi", ] [[package]] @@ -1472,6 +1882,37 @@ dependencies = [ "semver", ] +[[package]] +name = "rustc_version_runtime" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d31b7153270ebf48bf91c65ae5b0c00e749c4cfad505f66530ac74950249582f" +dependencies = [ + "rustc_version", + "semver", +] + +[[package]] +name = "rustls" +version = "0.20.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "539a2bfe908f471bfa933876bd1eb6a19cf2176d375f82ef7f99530a40e48c2c" +dependencies = [ + "log", + "ring", + "sct", + "webpki", +] + +[[package]] +name = "rustls-pemfile" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ee86d63972a7c661d1536fefe8c3c8407321c3df668891286de28abcd087360" +dependencies = [ + "base64 0.13.0", +] + [[package]] name = "ryu" version = "1.0.9" @@ -1485,7 +1926,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "88d6731146462ea25d9244b2ed5fd1d716d25c52e4d54aa4fb0f3c4e9854dbe2" dependencies = [ "lazy_static", - "windows-sys", + "windows-sys 0.36.1", ] [[package]] @@ -1494,6 +1935,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" +[[package]] +name = "scratch" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8132065adcfd6e02db789d9285a0deb2f3fcb04002865ab67d5fb103533898" + [[package]] name = "scrolls" version = "0.4.2" @@ -1511,6 +1958,7 @@ dependencies = [ "log", "merge", "minicbor 0.14.2", + "mongodb", "net2", "openssl", "pallas", @@ -1524,6 +1972,16 @@ dependencies = [ "tokio", ] +[[package]] +name = "sct" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "security-framework" version = "2.6.1" @@ -1571,6 +2029,15 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde_bytes" +version = "0.11.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfc50e8183eeeb6178dcb167ae34a8051d63535023ae38b5d8d12beae193d37b" +dependencies = [ + "serde", +] + [[package]] name = "serde_derive" version = "1.0.144" @@ -1588,6 +2055,7 @@ version = "1.0.79" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e8d9fa5c3b304765ce1fd9c4c8a3de2c8db365a5b91be52f186efc675681d95" dependencies = [ + "indexmap", "itoa 1.0.1", "ryu", "serde", @@ -1627,6 +2095,17 @@ dependencies = [ "syn", ] +[[package]] +name = "sha-1" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "028f48d513f9678cda28f6e4064755b3fbb2af6acd672f2c209b62323f7aea0f" +dependencies = [ + "cfg-if 1.0.0", + "cpufeatures", + "digest", +] + [[package]] name = "sha1" version = "0.6.1" @@ -1642,6 +2121,17 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012" +[[package]] +name = "sha2" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82e6b795fe2e3b1e845bafcb27aa35405c4d47cdfc92af5fc8d3002f76cebdc0" +dependencies = [ + "cfg-if 1.0.0", + "cpufeatures", + "digest", +] + [[package]] name = "slab" version = "0.4.7" @@ -1664,7 +2154,7 @@ dependencies = [ "fxhash", "libc", "log", - "parking_lot", + "parking_lot 0.11.2", ] [[package]] @@ -1683,12 +2173,34 @@ dependencies = [ "winapi", ] +[[package]] +name = "spin" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" + +[[package]] +name = "stringprep" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ee348cb74b87454fff4b551cbf727025810a004f88aeacae7f85b87f4e9a1c1" +dependencies = [ + "unicode-bidi", + "unicode-normalization", +] + [[package]] name = "strsim" version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" +[[package]] +name = "subtle" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" + [[package]] name = "syn" version = "1.0.90" @@ -1700,6 +2212,12 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "take_mut" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f764005d11ee5f36500a149ace24e00e3da98b0158b3e2d53a7495660d3f4d60" + [[package]] name = "tempfile" version = "3.3.0" @@ -1759,6 +2277,16 @@ dependencies = [ "syn", ] +[[package]] +name = "time" +version = "0.1.43" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "time" version = "0.3.9" @@ -1786,7 +2314,7 @@ dependencies = [ "ascii", "chunked_transfer", "log", - "time", + "time 0.3.9", "url", ] @@ -1820,9 +2348,21 @@ dependencies = [ "once_cell", "pin-project-lite", "socket2", + "tokio-macros", "winapi", ] +[[package]] +name = "tokio-macros" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9724f9a975fb987ef7a3cd9be0350edcbe130698af5b8f7a631e23d42d052484" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tokio-native-tls" version = "0.3.0" @@ -1833,6 +2373,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-rustls" +version = "0.23.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" +dependencies = [ + "rustls", + "tokio", + "webpki", +] + [[package]] name = "tokio-util" version = "0.7.4" @@ -1882,12 +2433,74 @@ dependencies = [ "once_cell", ] +[[package]] +name = "trust-dns-proto" +version = "0.21.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c31f240f59877c3d4bb3b3ea0ec5a6a0cff07323580ff8c7a605cd7d08b255d" +dependencies = [ + "async-trait", + "cfg-if 1.0.0", + "data-encoding", + "enum-as-inner", + "futures-channel", + "futures-io", + "futures-util", + "idna", + "ipnet", + "lazy_static", + "log", + "rand", + "smallvec", + "thiserror", + "tinyvec", + "tokio", + "url", +] + +[[package]] +name = "trust-dns-resolver" +version = "0.21.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4ba72c2ea84515690c9fcef4c6c660bb9df3036ed1051686de84605b74fd558" +dependencies = [ + "cfg-if 1.0.0", + "futures-util", + "ipconfig", + "lazy_static", + "log", + "lru-cache", + "parking_lot 0.12.1", + "resolv-conf", + "smallvec", + "thiserror", + "tokio", + "trust-dns-proto", +] + [[package]] name = "try-lock" version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" +[[package]] +name = "typed-builder" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89851716b67b937e393b3daa8423e67ddfc4bbbf1654bcf05488e95e0828db0c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "typenum" +version = "1.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dcf81ac59edc17cc8697ff311e8f5ef2d99fcbd9817b34cec66f90b6c3dfd987" + [[package]] name = "unicode-bidi" version = "0.3.7" @@ -1915,6 +2528,12 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3" +[[package]] +name = "untrusted" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" + [[package]] name = "url" version = "2.2.2" @@ -1927,6 +2546,25 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "uuid" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7" +dependencies = [ + "getrandom", +] + +[[package]] +name = "uuid" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "feb41e78f93363bb2df8b0e86a2ca30eed7806ea16ea0c790d757cf93f79be83" +dependencies = [ + "getrandom", + "serde", +] + [[package]] name = "vcpkg" version = "0.2.15" @@ -2043,6 +2681,31 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd" +dependencies = [ + "ring", + "untrusted", +] + +[[package]] +name = "webpki-roots" +version = "0.22.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "368bfe657969fb01238bb756d351dcade285e0f6fcbd36dcb23359a5169975be" +dependencies = [ + "webpki", +] + +[[package]] +name = "widestring" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17882f045410753661207383517a6f62ec3dbeb6a4ed2acce01f0728238d1983" + [[package]] name = "winapi" version = "0.3.9" @@ -2080,43 +2743,109 @@ version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ea04155a16a59f9eab786fe12a4a450e75cdb175f9e0d80da1e17db09f55b8d2" dependencies = [ - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_msvc", + "windows_aarch64_msvc 0.36.1", + "windows_i686_gnu 0.36.1", + "windows_i686_msvc 0.36.1", + "windows_x86_64_gnu 0.36.1", + "windows_x86_64_msvc 0.36.1", +] + +[[package]] +name = "windows-sys" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a3e1820f08b8513f676f7ab6c1f99ff312fb97b553d30ff4dd86f9f15728aa7" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc 0.42.0", + "windows_i686_gnu 0.42.0", + "windows_i686_msvc 0.42.0", + "windows_x86_64_gnu 0.42.0", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc 0.42.0", ] +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d2aa71f6f0cbe00ae5167d90ef3cfe66527d6f613ca78ac8024c3ccab9a19e" + [[package]] name = "windows_aarch64_msvc" version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47" +[[package]] +name = "windows_aarch64_msvc" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd0f252f5a35cac83d6311b2e795981f5ee6e67eb1f9a7f64eb4500fbc4dcdb4" + [[package]] name = "windows_i686_gnu" version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6" +[[package]] +name = "windows_i686_gnu" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbeae19f6716841636c28d695375df17562ca208b2b7d0dc47635a50ae6c5de7" + [[package]] name = "windows_i686_msvc" version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024" +[[package]] +name = "windows_i686_msvc" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "84c12f65daa39dd2babe6e442988fc329d6243fdce47d7d2d155b8d874862246" + [[package]] name = "windows_x86_64_gnu" version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1" +[[package]] +name = "windows_x86_64_gnu" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf7b1b21b5362cbc318f686150e5bcea75ecedc74dd157d874d754a2ca44b0ed" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09d525d2ba30eeb3297665bd434a54297e4170c7f1a44cad4ef58095b4cd2028" + [[package]] name = "windows_x86_64_msvc" version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" +[[package]] +name = "windows_x86_64_msvc" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40009d85759725a34da6d89a94e63d7bdc50a862acf0dbc7c8e488f1edcb6f5" + +[[package]] +name = "winreg" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0120db82e8a1e0b9fb3345a539c478767c0048d842860994d96113d5b667bd69" +dependencies = [ + "winapi", +] + [[package]] name = "winreg" version = "0.10.1" diff --git a/Cargo.toml b/Cargo.toml index 8a7e976f..2d952f28 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,6 +45,9 @@ tokio = { version = "1.21.1", features = ["rt-multi-thread"], optional = true } # elastic feature elasticsearch = { version = "8.4.0-alpha.1", optional = true } +# mongodb feature +mongodb = { version = "2.3.1", optional = true } + # tui feature indicatif = { version = "0.17.0-rc.11", optional = true } @@ -54,6 +57,7 @@ openssl = { version = "0.10", optional = true, features = ["vendored"] } [features] async = ["futures", "tokio"] elastic = ["elasticsearch", "async", "openssl"] +storage-mongodb = ["mongodb", "async"] unstable = ["elastic"] tui = ["indicatif"] default = ["tui"] diff --git a/src/storage/mod.rs b/src/storage/mod.rs index c335347f..6bf5f07b 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -4,6 +4,9 @@ pub mod skip; #[cfg(feature = "elastic")] pub mod elastic; +#[cfg(feature = "storage-mongodb")] +pub mod mongodb; + use gasket::messaging::TwoPhaseInputPort; use serde::Deserialize; @@ -21,6 +24,9 @@ pub enum Config { #[cfg(feature = "elastic")] Elastic(elastic::Config), + + #[cfg(feature = "storage-mongodb")] + MongoDb(mongodb::Config), } impl Config { @@ -36,6 +42,9 @@ impl Config { #[cfg(feature = "elastic")] Config::Elastic(c) => Bootstrapper::Elastic(c.bootstrapper(chain, intersect, policy)), + + #[cfg(feature = "storage-mongodb")] + Config::MongoDb(c) => Bootstrapper::MongoDb(c.bootstrapper(chain, intersect, policy)), } } } @@ -46,6 +55,9 @@ pub enum Bootstrapper { #[cfg(feature = "elastic")] Elastic(elastic::Bootstrapper), + + #[cfg(feature = "storage-mongodb")] + MongoDb(mongodb::Bootstrapper), } impl Bootstrapper { @@ -56,6 +68,9 @@ impl Bootstrapper { #[cfg(feature = "elastic")] Bootstrapper::Elastic(x) => x.borrow_input_port(), + + #[cfg(feature = "storage-mongodb")] + Bootstrapper::MongoDb(x) => x.borrow_input_port(), } } @@ -66,6 +81,9 @@ impl Bootstrapper { #[cfg(feature = "elastic")] Bootstrapper::Elastic(x) => Cursor::Elastic(x.build_cursor()), + + #[cfg(feature = "storage-mongodb")] + Bootstrapper::MongoDb(x) => Cursor::MongoDb(x.build_cursor()), } } @@ -76,6 +94,9 @@ impl Bootstrapper { #[cfg(feature = "elastic")] Bootstrapper::Elastic(x) => x.spawn_stages(pipeline), + + #[cfg(feature = "storage-mongodb")] + Bootstrapper::MongoDb(x) => x.spawn_stages(pipeline), } } } @@ -86,6 +107,9 @@ pub enum Cursor { #[cfg(feature = "elastic")] Elastic(elastic::Cursor), + + #[cfg(feature = "storage-mongodb")] + MongoDb(mongodb::Cursor), } impl Cursor { @@ -96,6 +120,9 @@ impl Cursor { #[cfg(feature = "elastic")] Cursor::Elastic(x) => x.last_point(), + + #[cfg(feature = "storage-mongodb")] + Cursor::MongoDb(x) => x.last_point(), } } } diff --git a/src/storage/mongodb.rs b/src/storage/mongodb.rs new file mode 100644 index 00000000..242dfc92 --- /dev/null +++ b/src/storage/mongodb.rs @@ -0,0 +1,266 @@ +use std::time::Duration; + +use futures::stream::StreamExt; + +use gasket::{ + error::AsWorkError, + runtime::{spawn_stage, WorkOutcome}, +}; + +use mongodb::{ + bson::{doc, spec::BinarySubtype, Binary, Bson, Decimal128, Document}, + options::ClientOptions, + Client, Database, +}; +use serde::Deserialize; + +use crate::prelude::*; +use crate::{bootstrap, crosscut, model, Error}; + +type InputPort = gasket::messaging::TwoPhaseInputPort; + +#[derive(Deserialize, Clone)] +pub struct Config { + pub connection_url: String, + pub database: String, + pub worker_threads: Option, + pub username: Option, + pub password: Option, +} + +impl Config { + pub fn bootstrapper( + self, + _chain: &crosscut::ChainWellKnownInfo, + _intersect: &crosscut::IntersectConfig, + policy: &crosscut::policies::RuntimePolicy, + ) -> Bootstrapper { + Bootstrapper { + config: self, + policy: policy.clone(), + input: Default::default(), + } + } +} + +pub struct Bootstrapper { + config: Config, + policy: crosscut::policies::RuntimePolicy, + input: InputPort, +} + +impl Bootstrapper { + pub fn borrow_input_port(&mut self) -> &'_ mut InputPort { + &mut self.input + } + + pub fn build_cursor(&self) -> Cursor { + Cursor {} + } + + pub fn spawn_stages(self, pipeline: &mut bootstrap::Pipeline) { + let threads = self.config.worker_threads.unwrap_or(3); + + let worker = Worker { + config: self.config, + policy: self.policy, + database: None, + runtime: tokio::runtime::Builder::new_multi_thread() + .worker_threads(threads) + .enable_io() + .enable_time() + .build() + .expect("couldn't setup tokio async runtime"), + input: self.input, + ops_count: Default::default(), + }; + + pipeline.register_stage(spawn_stage( + worker, + gasket::runtime::Policy { + tick_timeout: Some(Duration::from_secs(600)), + bootstrap_retry: gasket::retries::Policy { + max_retries: 20, + backoff_unit: Duration::from_secs(1), + backoff_factor: 2, + max_backoff: Duration::from_secs(60), + }, + ..Default::default() + }, + Some("mongodb"), + )); + } +} + +pub struct Cursor {} + +impl Cursor { + pub fn last_point(&mut self) -> Result, crate::Error> { + Ok(None) + } +} + +pub struct Worker { + config: Config, + database: Option, + runtime: tokio::runtime::Runtime, + policy: crosscut::policies::RuntimePolicy, + ops_count: gasket::metrics::Counter, + input: InputPort, +} + +impl Worker { + async fn connect(&self) -> Result { + // Parse a connection string into an options struct. + let options = ClientOptions::parse(&self.config.connection_url) + .await + .map_err(|err| Error::config(err.to_string())) + .or_panic()?; + + let client = Client::with_options(options).or_restart()?; + let database = client.database(&self.config.database); + + Ok(database) + } +} + +const BATCH_SIZE: usize = 40; + +#[derive(Default)] +struct Batch { + block_end: Option, + items: Vec, +} + +fn recv_batch(input: &mut InputPort) -> Result { + let mut batch = Batch::default(); + + loop { + match input.recv_or_idle() { + Ok(x) => match x.payload { + model::CRDTCommand::BlockStarting(_) => (), + model::CRDTCommand::BlockFinished(_) => { + batch.block_end = Some(x.payload); + return Ok(batch); + } + _ => { + batch.items.push(x.payload); + } + }, + Err(gasket::error::Error::RecvIdle) => return Ok(batch), + Err(err) => return Err(err), + }; + + if batch.items.len() >= BATCH_SIZE { + return Ok(batch); + } + } +} + +fn value_to_bson(value: model::Value) -> Bson { + match value { + model::Value::String(x) => x.into(), + model::Value::BigInt(x) => Bson::Decimal128(Decimal128::from_bytes(x.to_be_bytes())), + model::Value::Cbor(x) => Bson::Binary(Binary { + bytes: x, + subtype: BinarySubtype::UserDefined(1), + }), + model::Value::Json(x) => Bson::try_from(x).unwrap(), + } +} + +// TODO: collection name should be defined by the prefix of the key of the +// command. The problem is that we're concatenating the string upstream, so we +// don't have access to the clean prefix value. We could parse it, but it's a +// very ugly solution. A solution I like better is to keep the key prefix as an +// independent value in the command enum and let the storage stage decide how to +// persist it. The later solution requires several changes that we'll be treated +// as a different PR. In the meantime, we'll keep working in this MongoDb sink +// sending everything to a single, hardcoded collection. +const BAD_HARDCODED_COLLECTION: &str = &"catchall"; + +async fn apply_command( + cmd: model::CRDTCommand, + db: &Database, +) -> Result<(), mongodb::error::Error> { + match cmd { + model::CRDTCommand::BlockStarting(_) => Ok(()), + model::CRDTCommand::AnyWriteWins(key, value) => { + db.collection::(BAD_HARDCODED_COLLECTION) + .update_one( + doc! { "_id": &key }, + doc! { "_id": &key, "value": value_to_bson(value) }, + None, + ) + .await?; + + Ok(()) + } + model::CRDTCommand::BlockFinished(_) => { + log::warn!("MongoDb storage doesn't support cursors ATM"); + Ok(()) + } + _ => todo!(), + } +} + +async fn apply_batch( + batch: Batch, + db: &Database, + policy: &crosscut::policies::RuntimePolicy, +) -> Result<(), gasket::error::Error> { + let mut stream = futures::stream::iter(batch.items) + .map(|cmd| apply_command(cmd, db)) + .buffer_unordered(10); + + while let Some(op) = stream.next().await { + op.map_err(|e| Error::StorageError(e.to_string())) + .apply_policy(policy) + .or_retry()?; + } + + // we process the block end after the rest of the commands to ensure that no + // other change from the block remains pending in the async queue + if let Some(block_end) = batch.block_end { + apply_command(block_end, db) + .await + .map_err(|e| Error::StorageError(e.to_string())) + .apply_policy(policy) + .or_panic()?; + } + + Ok(()) +} + +impl gasket::runtime::Worker for Worker { + fn metrics(&self) -> gasket::metrics::Registry { + gasket::metrics::Builder::new() + .with_counter("storage_ops", &self.ops_count) + .build() + } + + fn work(&mut self) -> gasket::runtime::WorkResult { + let batch = recv_batch(&mut self.input)?; + let count = batch.items.len(); + let db = self.database.as_ref().unwrap(); + + self.runtime + .block_on(async { apply_batch(batch, db, &self.policy).await })?; + + self.ops_count.inc(count as u64); + self.input.commit(); + + Ok(WorkOutcome::Partial) + } + + fn bootstrap(&mut self) -> Result<(), gasket::error::Error> { + let db = self.runtime.block_on(self.connect())?; + self.database = Some(db); + + Ok(()) + } + + fn teardown(&mut self) -> Result<(), gasket::error::Error> { + Ok(()) + } +}