diff --git a/README.md b/README.md index b7010bf0..d3d271f4 100644 --- a/README.md +++ b/README.md @@ -34,8 +34,9 @@ This is why `ci-queue` optionally allows to put failed tests back into the queue Two implementations are provided, please refer to the respective documentations: - - [Python](python/) - - [Ruby](ruby/) +- [Python](python/) +- [Ruby](ruby/) +- [Rust](rust/) ## Redis Requirements @@ -48,4 +49,3 @@ Bug reports and pull requests are welcome on GitHub at https://github.com/Shopif ## License The code is available as open source under the terms of the [MIT License](http://opensource.org/licenses/MIT). - diff --git a/rust/.gitignore b/rust/.gitignore new file mode 100644 index 00000000..725d1fb2 --- /dev/null +++ b/rust/.gitignore @@ -0,0 +1,4 @@ +# Rust build artifacts +/target/ +**/*.rs.bk +*.pdb \ No newline at end of file diff --git a/rust/Cargo.lock b/rust/Cargo.lock new file mode 100644 index 00000000..ad2ae45a --- /dev/null +++ b/rust/Cargo.lock @@ -0,0 +1,1058 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 4 + +[[package]] +name = "anstream" +version = "0.6.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ae563653d1938f79b1ab1b5e668c87c76a9930414574a6583a7b7e11a8e6192" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "862ed96ca487e809f1c8e5a8447f6ee2cf102f846893800b20cebdf541fc6bbd" + +[[package]] +name = "anstyle-parse" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e7644824f0aa2c7b9384579234ef10eb7efb6a0deb83f9630a49594dd9c15c2" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e231f6134f61b71076a3eab506c379d4f36122f2af15a9ff04415ea4c3339e2" +dependencies = [ + "windows-sys 0.60.2", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e0633414522a32ffaac8ac6cc8f748e090c5717661fddeea04219e2344f5f2a" +dependencies = [ + "anstyle", + "once_cell_polyfill", + "windows-sys 0.60.2", +] + +[[package]] +name = "autocfg" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" + +[[package]] +name = "bitflags" +version = "2.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a65b545ab31d687cff52899d4890855fec459eb6afe0da6417b8a18da87aa29" + +[[package]] +name = "bumpalo" +version = "3.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" + +[[package]] +name = "bytes" +version = "1.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" + +[[package]] +name = "cfg-if" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fd1289c04a9ea8cb22300a459a72a385d7c73d3259e2ed7dcb2af674838cfa9" + +[[package]] +name = "ci-queue-core" +version = "0.1.0" +dependencies = [ + "redis", + "thiserror", + "uuid", +] + +[[package]] +name = "ci-queue-playwright" +version = "0.1.0" +dependencies = [ + "ci-queue-core", + "clap", + "rand", + "serde", + "serde_json", + "thiserror", + "which", +] + +[[package]] +name = "clap" +version = "4.5.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fc0e74a703892159f5ae7d3aac52c8e6c392f5ae5f359c70b5881d60aaac318" +dependencies = [ + "clap_builder", + "clap_derive", +] + +[[package]] +name = "clap_builder" +version = "4.5.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3e7f4214277f3c7aa526a59dd3fbe306a370daee1f8b7b8c987069cd8e888a8" +dependencies = [ + "anstream", + "anstyle", + "clap_lex", + "strsim", +] + +[[package]] +name = "clap_derive" +version = "4.5.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14cb31bb0a7d536caef2639baa7fad459e15c3144efefa6dbd1c84562c4739f6" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "clap_lex" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b94f61472cee1439c0b966b47e3aca9ae07e45d070759512cd390ea2bebc6675" + +[[package]] +name = "colorchoice" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" + +[[package]] +name = "combine" +version = "4.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" +dependencies = [ + "bytes", + "memchr", +] + +[[package]] +name = "displaydoc" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "env_home" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7f84e12ccf0a7ddc17a6c41c93326024c42920d7ee630d04950e6926645c0fe" + +[[package]] +name = "errno" +version = "0.3.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "778e2ac28f6c47af28e4907f13ffd1e1ddbd400980a9abd7c8df189bf578a5ad" +dependencies = [ + "libc", + "windows-sys 0.60.2", +] + +[[package]] +name = "form_urlencoded" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456" +dependencies = [ + "percent-encoding", +] + +[[package]] +name = "getrandom" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592" +dependencies = [ + "cfg-if", + "libc", + "wasi 0.11.1+wasi-snapshot-preview1", +] + +[[package]] +name = "getrandom" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4" +dependencies = [ + "cfg-if", + "libc", + "r-efi", + "wasi 0.14.2+wasi-0.2.4", +] + +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + +[[package]] +name = "icu_collections" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "200072f5d0e3614556f94a9930d5dc3e0662a652823904c3a75dc3b0af7fee47" +dependencies = [ + "displaydoc", + "potential_utf", + "yoke", + "zerofrom", + "zerovec", +] + +[[package]] +name = "icu_locale_core" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0cde2700ccaed3872079a65fb1a78f6c0a36c91570f28755dda67bc8f7d9f00a" +dependencies = [ + "displaydoc", + "litemap", + "tinystr", + "writeable", + "zerovec", +] + +[[package]] +name = "icu_normalizer" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "436880e8e18df4d7bbc06d58432329d6458cc84531f7ac5f024e93deadb37979" +dependencies = [ + "displaydoc", + "icu_collections", + "icu_normalizer_data", + "icu_properties", + "icu_provider", + "smallvec", + "zerovec", +] + +[[package]] +name = "icu_normalizer_data" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00210d6893afc98edb752b664b8890f0ef174c8adbb8d0be9710fa66fbbf72d3" + +[[package]] +name = "icu_properties" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "016c619c1eeb94efb86809b015c58f479963de65bdb6253345c1a1276f22e32b" +dependencies = [ + "displaydoc", + "icu_collections", + "icu_locale_core", + "icu_properties_data", + "icu_provider", + "potential_utf", + "zerotrie", + "zerovec", +] + +[[package]] +name = "icu_properties_data" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "298459143998310acd25ffe6810ed544932242d3f07083eee1084d83a71bd632" + +[[package]] +name = "icu_provider" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03c80da27b5f4187909049ee2d72f276f0d9f99a42c306bd0131ecfe04d8e5af" +dependencies = [ + "displaydoc", + "icu_locale_core", + "stable_deref_trait", + "tinystr", + "writeable", + "yoke", + "zerofrom", + "zerotrie", + "zerovec", +] + +[[package]] +name = "idna" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "686f825264d630750a544639377bae737628043f20d38bbc029e8f29ea968a7e" +dependencies = [ + "idna_adapter", + "smallvec", + "utf8_iter", +] + +[[package]] +name = "idna_adapter" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3acae9609540aa318d1bc588455225fb2085b9ed0c4f6bd0d9d5bcd86f1a0344" +dependencies = [ + "icu_normalizer", + "icu_properties", +] + +[[package]] +name = "is_terminal_polyfill" +version = "1.70.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" + +[[package]] +name = "itoa" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" + +[[package]] +name = "js-sys" +version = "0.3.77" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cfaf33c695fc6e08064efbc1f72ec937429614f25eef83af942d0e227c3a28f" +dependencies = [ + "once_cell", + "wasm-bindgen", +] + +[[package]] +name = "libc" +version = "0.2.175" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a82ae493e598baaea5209805c49bbf2ea7de956d50d7da0da1164f9c6d28543" + +[[package]] +name = "linux-raw-sys" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd945864f07fe9f5371a27ad7b52a172b4b499999f1d97574c9fa68373937e12" + +[[package]] +name = "litemap" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "241eaef5fd12c88705a01fc1066c48c4b36e0dd4377dcdc7ec3942cea7a69956" + +[[package]] +name = "log" +version = "0.4.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" + +[[package]] +name = "memchr" +version = "2.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0" + +[[package]] +name = "num-bigint" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9" +dependencies = [ + "num-integer", + "num-traits", +] + +[[package]] +name = "num-integer" +version = "0.1.46" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f" +dependencies = [ + "num-traits", +] + +[[package]] +name = "num-traits" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" +dependencies = [ + "autocfg", +] + +[[package]] +name = "once_cell" +version = "1.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" + +[[package]] +name = "once_cell_polyfill" +version = "1.70.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4895175b425cb1f87721b59f0f286c2092bd4af812243672510e1ac53e2e0ad" + +[[package]] +name = "percent-encoding" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" + +[[package]] +name = "potential_utf" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5a7c30837279ca13e7c867e9e40053bc68740f988cb07f7ca6df43cc734b585" +dependencies = [ + "zerovec", +] + +[[package]] +name = "ppv-lite86" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9" +dependencies = [ + "zerocopy", +] + +[[package]] +name = "proc-macro2" +version = "1.0.101" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89ae43fd86e4158d6db51ad8e2b80f313af9cc74f5c0e03ccb87de09998732de" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1885c039570dc00dcb4ff087a89e185fd56bae234ddc7f056a945bf36467248d" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "r-efi" +version = "5.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" + +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom 0.2.16", +] + +[[package]] +name = "redis" +version = "0.32.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7cd3650deebc68526b304898b192fa4102a4ef0b9ada24da096559cb60e0eef8" +dependencies = [ + "combine", + "itoa", + "num-bigint", + "percent-encoding", + "ryu", + "sha1_smol", + "socket2", + "url", +] + +[[package]] +name = "rustix" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11181fbabf243db407ef8df94a6ce0b2f9a733bd8be4ad02b4eda9602296cac8" +dependencies = [ + "bitflags", + "errno", + "libc", + "linux-raw-sys", + "windows-sys 0.60.2", +] + +[[package]] +name = "rustversion" +version = "1.0.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" + +[[package]] +name = "ryu" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" + +[[package]] +name = "serde" +version = "1.0.219" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.219" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.143" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d401abef1d108fbd9cbaebc3e46611f4b1021f714a0597a71f41ee463f5f4a5a" +dependencies = [ + "itoa", + "memchr", + "ryu", + "serde", +] + +[[package]] +name = "sha1_smol" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d" + +[[package]] +name = "smallvec" +version = "1.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" + +[[package]] +name = "socket2" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "233504af464074f9d066d7b5416c5f9b894a5862a6506e306f7b816cdd6f1807" +dependencies = [ + "libc", + "windows-sys 0.59.0", +] + +[[package]] +name = "stable_deref_trait" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" + +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + +[[package]] +name = "syn" +version = "2.0.106" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ede7c438028d4436d71104916910f5bb611972c5cfd7f89b8300a8186e6fada6" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "synstructure" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "thiserror" +version = "2.0.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3467d614147380f2e4e374161426ff399c91084acd2363eaf549172b3d5e60c0" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "2.0.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c5e1be1c48b9172ee610da68fd9cd2770e7a4056cb3fc98710ee6906f0c7960" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tinystr" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d4f6d1145dcb577acf783d4e601bc1d76a13337bb54e6233add580b07344c8b" +dependencies = [ + "displaydoc", + "zerovec", +] + +[[package]] +name = "unicode-ident" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" + +[[package]] +name = "url" +version = "2.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32f8b686cadd1473f4bd0117a5d28d36b1ade384ea9b5069a1c40aefed7fda60" +dependencies = [ + "form_urlencoded", + "idna", + "percent-encoding", +] + +[[package]] +name = "utf8_iter" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" + +[[package]] +name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + +[[package]] +name = "uuid" +version = "1.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f33196643e165781c20a5ead5582283a7dacbb87855d867fbc2df3f81eddc1be" +dependencies = [ + "getrandom 0.3.3", + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "wasi" +version = "0.11.1+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" + +[[package]] +name = "wasi" +version = "0.14.2+wasi-0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9683f9a5a998d873c0d21fcbe3c083009670149a8fab228644b8bd36b2c48cb3" +dependencies = [ + "wit-bindgen-rt", +] + +[[package]] +name = "wasm-bindgen" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1edc8929d7499fc4e8f0be2262a241556cfc54a0bea223790e71446f2aab1ef5" +dependencies = [ + "cfg-if", + "once_cell", + "rustversion", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f0a0651a5c2bc21487bde11ee802ccaf4c51935d0d3d42a6101f98161700bc6" +dependencies = [ + "bumpalo", + "log", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fe63fc6d09ed3792bd0897b314f53de8e16568c2b3f7982f468c0bf9bd0b407" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ae87ea40c9f689fc23f209965b6fb8a99ad69aeeb0231408be24920604395de" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a05d73b933a847d6cccdda8f838a22ff101ad9bf93e33684f39c1f5f0eece3d" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "which" +version = "8.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3fabb953106c3c8eea8306e4393700d7657561cb43122571b172bbfb7c7ba1d" +dependencies = [ + "env_home", + "rustix", + "winsafe", +] + +[[package]] +name = "windows-link" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a" + +[[package]] +name = "windows-sys" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" +dependencies = [ + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-sys" +version = "0.60.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb" +dependencies = [ + "windows-targets 0.53.3", +] + +[[package]] +name = "windows-targets" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" +dependencies = [ + "windows_aarch64_gnullvm 0.52.6", + "windows_aarch64_msvc 0.52.6", + "windows_i686_gnu 0.52.6", + "windows_i686_gnullvm 0.52.6", + "windows_i686_msvc 0.52.6", + "windows_x86_64_gnu 0.52.6", + "windows_x86_64_gnullvm 0.52.6", + "windows_x86_64_msvc 0.52.6", +] + +[[package]] +name = "windows-targets" +version = "0.53.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5fe6031c4041849d7c496a8ded650796e7b6ecc19df1a431c1a363342e5dc91" +dependencies = [ + "windows-link", + "windows_aarch64_gnullvm 0.53.0", + "windows_aarch64_msvc 0.53.0", + "windows_i686_gnu 0.53.0", + "windows_i686_gnullvm 0.53.0", + "windows_i686_msvc 0.53.0", + "windows_x86_64_gnu 0.53.0", + "windows_x86_64_gnullvm 0.53.0", + "windows_x86_64_msvc 0.53.0", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86b8d5f90ddd19cb4a147a5fa63ca848db3df085e25fee3cc10b39b6eebae764" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7651a1f62a11b8cbd5e0d42526e55f2c99886c77e007179efff86c2b137e66c" + +[[package]] +name = "windows_i686_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" + +[[package]] +name = "windows_i686_gnu" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1dc67659d35f387f5f6c479dc4e28f1d4bb90ddd1a5d3da2e5d97b42d6272c3" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ce6ccbdedbf6d6354471319e781c0dfef054c81fbc7cf83f338a4296c0cae11" + +[[package]] +name = "windows_i686_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" + +[[package]] +name = "windows_i686_msvc" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "581fee95406bb13382d2f65cd4a908ca7b1e4c2f1917f143ba16efe98a589b5d" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e55b5ac9ea33f2fc1716d1742db15574fd6fc8dadc51caab1c16a3d3b4190ba" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a6e035dd0599267ce1ee132e51c27dd29437f63325753051e71dd9e42406c57" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "271414315aff87387382ec3d271b52d7ae78726f5d44ac98b4f4030c91880486" + +[[package]] +name = "winsafe" +version = "0.0.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d135d17ab770252ad95e9a872d365cf3090e3be864a34ab46f48555993efc904" + +[[package]] +name = "wit-bindgen-rt" +version = "0.39.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f42320e61fe2cfd34354ecb597f86f413484a798ba44a8ca1165c58d42da6c1" +dependencies = [ + "bitflags", +] + +[[package]] +name = "writeable" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea2f10b9bb0928dfb1b42b65e1f9e36f7f54dbdf08457afefb38afcdec4fa2bb" + +[[package]] +name = "yoke" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f41bb01b8226ef4bfd589436a297c53d118f65921786300e427be8d487695cc" +dependencies = [ + "serde", + "stable_deref_trait", + "yoke-derive", + "zerofrom", +] + +[[package]] +name = "yoke-derive" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38da3c9736e16c5d3c8c597a9aaa5d1fa565d0532ae05e27c24aa62fb32c0ab6" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] + +[[package]] +name = "zerocopy" +version = "0.8.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1039dd0d3c310cf05de012d8a39ff557cb0d23087fd44cad61df08fc31907a2f" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.8.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ecf5b4cc5364572d7f4c329661bcc82724222973f2cab6f050a4e5c22f75181" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "zerofrom" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50cc42e0333e05660c3587f3bf9d0478688e15d870fab3346451ce7f8c9fbea5" +dependencies = [ + "zerofrom-derive", +] + +[[package]] +name = "zerofrom-derive" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] + +[[package]] +name = "zerotrie" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36f0bbd478583f79edad978b407914f61b2972f5af6fa089686016be8f9af595" +dependencies = [ + "displaydoc", + "yoke", + "zerofrom", +] + +[[package]] +name = "zerovec" +version = "0.11.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7aa2bd55086f1ab526693ecbe444205da57e25f4489879da80635a46d90e73b" +dependencies = [ + "yoke", + "zerofrom", + "zerovec-derive", +] + +[[package]] +name = "zerovec-derive" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b96237efa0c878c64bd89c436f661be4e46b2f3eff1ebb976f7ef2321d2f58f" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] diff --git a/rust/Cargo.toml b/rust/Cargo.toml new file mode 100644 index 00000000..c9b49ec6 --- /dev/null +++ b/rust/Cargo.toml @@ -0,0 +1,3 @@ +[workspace] +members = ["ci-queue-core", "ci-queue-playwright"] +resolver = "2" \ No newline at end of file diff --git a/rust/README.md b/rust/README.md new file mode 100644 index 00000000..a01d29af --- /dev/null +++ b/rust/README.md @@ -0,0 +1,137 @@ +# ciqueue + +`ciqueue` offers a collection of queue implementations to offer automatic distribution of tests on CI. + +Using them requires an integration with the test framework runner. + +## Why a queue? + +One big problem with distributed test suites, is test imbalance. Meaning that one worker would spend 10 minutes while all the others are done after 1 minute. +There are algorithms available to balance perfectly your workers, but in practice your test performance tend to vary, and it's easier to consider tests as work unit in a queue and let workers pop them as fast as possible. + +Another advantage is that if you lose workers along the way, using a queue the other workers can pick up the job, making you resilient to failures. + +## Installation + +Add this to your `Cargo.toml`: + +```toml +[dependencies] +ci-queue-core = "0.1.0" +``` + +## Integrations + +### Playwright + +There is a Playwright integration that can be used as follows. +To start up the workers which execute the tests, run a command like the following on each worker node: + +```bash +playwright-queue --queue redis://:6379 --build run --worker --max-requeues --requeue-tolerance +``` + + + +## Implementing a new integration + +The reference implementation is the minitest one (Ruby). + +### Basic Interface + +All queue implementations implement the `Queue` trait and are iterable. To pop units of work off the queue, simply iterate over it. + +The simplest integration could look like this: + +```rust +use ci_queue_core::{StaticQueue, QueueConfig}; + +let tests = vec![ + "tests/foo.rs::test_foo".to_string(), + "tests/bar.rs::test_bar".to_string(), +]; +let config = QueueConfig::default(); +let mut queue = StaticQueue::new(tests, config); + +while let Some(test) = queue.next() { + let result = run_one_test(&test); // that part is heavily dependent on the test framework + queue.acknowledge(&test); + reporter.record(result); +} +``` + +Once a test was ran, the integration should call `queue.acknowledge`, otherwise the test could be reassigned to another worker. + +### Requeueing + +The larger a test suite gets, the more likely it is to break because of a transient issue. +In such context, it might be desirable to try the test again on another worker. + +To support requeueing, the integration can call `requeue` instead of `acknowledge`. +A complete integration should look like this: + +```rust +while let Some(test) = queue.next() { + let result = run_one_test(&test); // that part is heavily dependent on the test framework + + // Only attempt to requeue if the test failed. + // The method will return `false` if the test couldn't be requeued + if result.failed && queue.requeue(&test) { + // Since the test will run again, it should be marked as skipped, or a similar status + result.failed = false; + result.skipped = true; + reporter.record(result); + } else if queue.acknowledge(&test) || !result.failed { + // If the test was already acknowledged by another worker (we timed out) + // Then we only record it if it was successful. + reporter.record(result); + } +} +``` + +## Implementations + +`ciqueue` provides several queue implementations that can be swapped to implement many functionalities + +### Common parameters + +All implementations share the following constructor parameters: + +`tests`: should be a vector of strings. If you wish to randomize the test order (heavily recommended), you have to shuffle the list before you instantiate the queue. + +`max_requeues`: defines how many times a single test can be requeued. + +`requeue_tolerance`: defines how many requeues can be performed in total. Example, if your test suite contains 1000 tests, requeue_tolerance=0.05, means up to 5% of the suite can be requeued, so 50 tests. + +### `ci_queue_core::StaticQueue` + +The simplest implementation, mostly useful as a base class. + +The tests are held in memory, and not distributed. + +### `ci_queue_core::distributed::Worker` + +This one takes a few more arguments: + +`redis_url`: the Redis connection URL to use. + +`timeout`: the duration in seconds, after which a test, if not acknowledged, should be considered lost and re-assigned to another worker. Make sure this value is higher than your slowest test. + +`worker_id`: a unique identifier for your worker. It MUST be different for all your workers in a build. Your CI system likely provides a useful environment variable for it, e.g. `CIRCLE_NODE_INDEX` or `BUILDKITE_PARALLEL_JOB`. + +`build_id`: a unique identifier for your build. It MUST be the same for all workers in a build. Your system likely provides a useful environment variable for it, e.g. `CIRCLE_BUILD_NUM` or `BUILDKITE_BUILD_ID`. + +This implementation will use the passed Redis client to distribute the tests among all the workers sharing the same `build_id`. + +The first worker connected is automatically elected as the leader, and will push the test list inside Redis, once done all the workers will pop the tests one by one. +Which means any worker can crash at any point, without compromising the entire build. + + diff --git a/rust/ci-queue-core/Cargo.toml b/rust/ci-queue-core/Cargo.toml new file mode 100644 index 00000000..e940f3ff --- /dev/null +++ b/rust/ci-queue-core/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "ci-queue-core" +version = "0.1.0" +edition = "2024" + +[dependencies] +redis = "0.32.5" +thiserror = "2.0.16" +uuid = { version = "1.18.0", features = ["v4"] } diff --git a/rust/ci-queue-core/src/lib.rs b/rust/ci-queue-core/src/lib.rs new file mode 100644 index 00000000..bdbcb83a --- /dev/null +++ b/rust/ci-queue-core/src/lib.rs @@ -0,0 +1,5 @@ +pub mod queue; + +pub use queue::{Queue, QueueConfig, TestResult}; +pub use queue::static_queue::StaticQueue; +pub use queue::distributed::{Worker, Supervisor}; \ No newline at end of file diff --git a/rust/ci-queue-core/src/queue.rs b/rust/ci-queue-core/src/queue.rs new file mode 100644 index 00000000..fa0825a9 --- /dev/null +++ b/rust/ci-queue-core/src/queue.rs @@ -0,0 +1,85 @@ +use std::{collections::HashMap, hash::Hash}; + +pub mod static_queue; +pub mod distributed; + +/// Configuration for queue behavior +#[derive(Debug, Clone)] +pub struct QueueConfig { + pub max_requeues: usize, + pub requeue_tolerance: f64, + pub timeout: usize, +} + +impl Default for QueueConfig { + fn default() -> Self { + Self { + max_requeues: 0, + requeue_tolerance: 0.0, + timeout: 60, + } + } +} + +/// Result of a test execution +#[derive(Debug, Clone, PartialEq)] +pub enum TestResult { + Passed, + Failed, + Skipped, +} + +pub trait TestIdentifier: Clone + Hash + Eq { + fn to_redis_value(&self) -> String; + fn from_redis_value(s: &str, registry: &TestRegistry) -> Option; +} + +// Default implementation for String - unnecessary to use registry +impl TestIdentifier for String { + fn to_redis_value(&self) -> String { + self.clone() + } + + fn from_redis_value(s: &str, _registry: &TestRegistry) -> Option { + Some(s.to_string()) + } +} + +#[derive(Debug)] +pub struct TestRegistry { + tests: HashMap, +} + +impl TestRegistry { + pub fn new(tests: &[T]) -> Self { + Self { + tests: tests.into_iter().map(|t| (t.to_redis_value(), t.clone())).collect(), + } + } + + pub fn get(&self, s: &str) -> Option<&T> { + self.tests.get(s) + } +} + +pub trait Queue: Iterator { + /// Get the total number of tests + fn total(&self) -> usize; + + /// Get the current progress + fn progress(&mut self) -> usize; + + /// Get the remaining tests count + fn len(&mut self) -> usize; + + /// Check if the queue is empty + fn is_empty(&mut self) -> bool { + self.len() == 0 + } + + /// Acknowledge a test as completed + fn acknowledge(&mut self, test: &T) -> bool; + + /// Requeue a failed test + fn requeue(&mut self, test: &T) -> bool; +} \ No newline at end of file diff --git a/rust/ci-queue-core/src/queue/distributed.rs b/rust/ci-queue-core/src/queue/distributed.rs new file mode 100644 index 00000000..1b63eece --- /dev/null +++ b/rust/ci-queue-core/src/queue/distributed.rs @@ -0,0 +1,147 @@ +use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use redis::{Commands, Connection}; + +pub mod scripts; +pub mod worker; +pub mod supervisor; + +pub use worker::Worker; +pub use supervisor::Supervisor; + +const KEY_PREFIX: &str = "build"; +const DEFAULT_MASTER_WAIT_TIMEOUT: Duration = Duration::from_secs(10); +const DEFAULT_REQUEUE_OFFSET: i64 = 42; + +#[derive(Debug, thiserror::Error)] +#[error("Master worker is {status:?} after {timeout:?} waiting")] +pub struct LostMasterError { + status: Option, + timeout: Duration, +} + +#[derive(Debug, thiserror::Error)] +#[error("Reservation error: {message}")] +pub struct ReservationError { + message: String, +} + +/// Base functionality shared by Worker and Supervisor +pub struct Base { + pub redis: Connection, + pub build_id: String, + pub is_master: bool, + pub total: Option, + master_wait_timeout: Duration, +} + +impl Base { + pub fn new(redis: Connection, build_id: String) -> Self { + Self { + redis, + build_id, + is_master: false, + total: None, + master_wait_timeout: DEFAULT_MASTER_WAIT_TIMEOUT, + } + } + + pub fn set_master_wait_timeout(&mut self, timeout: Duration) { + self.master_wait_timeout = timeout; + } + + pub fn key(&self, parts: &[&str]) -> String { + let mut key_parts = vec![KEY_PREFIX, &self.build_id]; + key_parts.extend_from_slice(parts); + key_parts.join(":") + } + + pub fn timestamp() -> f64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs_f64() + } + + pub fn master_status(&mut self) -> Option { + self.redis + .get::<_, Option>(self.key(&["master-status"])) + .ok() + .flatten() + } + + pub fn wait_for_master(&mut self) -> Result<(), LostMasterError> { + if self.is_master { + return Ok(()); + } + + let start = SystemTime::now(); + loop { + match self.master_status() { + Some(status) if status == "ready" || status == "finished" => return Ok(()), + status => { + let elapsed = SystemTime::now().duration_since(start).unwrap_or_default(); + if elapsed > self.master_wait_timeout { + return Err(LostMasterError { + status, + timeout: self.master_wait_timeout, + }); + } + } + } + std::thread::sleep(Duration::from_millis(100)); + } + } + + /// Get queue length (items in queue + items running) + pub fn len(&mut self) -> usize { + let queue_len: usize = self.redis + .llen(self.key(&["queue"])) + .unwrap_or(0); + + let running_len: usize = self.redis + .zcard(self.key(&["running"])) + .unwrap_or(0); + + queue_len + running_len + } + + pub fn is_empty(&mut self) -> bool { + self.len() == 0 + } + + pub fn progress(&mut self) -> usize { + self.total.unwrap_or(0).saturating_sub(self.len()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_key_generation() { + // Test the key generation logic directly without Base struct + let build_id = "test-build-123"; + + // Replicate the key generation logic + fn make_key(build_id: &str, parts: &[&str]) -> String { + let mut key_parts = vec![KEY_PREFIX, build_id]; + key_parts.extend_from_slice(parts); + key_parts.join(":") + } + + assert_eq!(make_key(build_id, &["queue"]), "build:test-build-123:queue"); + assert_eq!(make_key(build_id, &["worker", "w1", "queue"]), "build:test-build-123:worker:w1:queue"); + assert_eq!(make_key(build_id, &[]), "build:test-build-123"); + } + + #[test] + fn test_timestamp() { + let ts1 = Base::timestamp(); + std::thread::sleep(Duration::from_millis(10)); + let ts2 = Base::timestamp(); + + assert!(ts2 > ts1); + assert!(ts1 > 0.0); + } +} \ No newline at end of file diff --git a/rust/ci-queue-core/src/queue/distributed/scripts.rs b/rust/ci-queue-core/src/queue/distributed/scripts.rs new file mode 100644 index 00000000..a95c9e8e --- /dev/null +++ b/rust/ci-queue-core/src/queue/distributed/scripts.rs @@ -0,0 +1,61 @@ +pub enum Script { + Reserve, + ReserveLost, + Acknowledge, + Requeue, + Release, + Heartbeat, +} + +impl Script { + pub fn name(&self) -> &'static str { + match self { + Script::Reserve => "reserve", + Script::ReserveLost => "reserve_lost", + Script::Acknowledge => "acknowledge", + Script::Requeue => "requeue", + Script::Release => "release", + Script::Heartbeat => "heartbeat", + } + } + + pub fn content(&self) -> &'static str { + match self { + Script::Reserve => include_str!("../../../../../redis/reserve.lua"), + Script::ReserveLost => include_str!("../../../../../redis/reserve_lost.lua"), + Script::Acknowledge => include_str!("../../../../../redis/acknowledge.lua"), + Script::Requeue => include_str!("../../../../../redis/requeue.lua"), + Script::Release => include_str!("../../../../../redis/release.lua"), + Script::Heartbeat => include_str!("../../../../../redis/heartbeat.lua"), + } + } + + pub fn eval(&self, redis: &mut redis::Connection, keys: Vec, args: Vec) -> redis::RedisResult { + let script = redis::Script::new(self.content()); + script.key(keys) + .arg(args) + .invoke(redis) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_scripts_load() { + // Ensure all scripts are loaded correctly + assert!(!Script::Reserve.content().is_empty()); + assert!(!Script::ReserveLost.content().is_empty()); + assert!(!Script::Acknowledge.content().is_empty()); + assert!(!Script::Requeue.content().is_empty()); + assert!(!Script::Release.content().is_empty()); + assert!(!Script::Heartbeat.content().is_empty()); + + // Check that scripts contain expected Redis commands + assert!(Script::Reserve.content().contains("rpop")); + assert!(Script::ReserveLost.content().contains("zrangebyscore")); + assert!(Script::Acknowledge.content().contains("zrem")); + assert!(Script::Requeue.content().contains("linsert")); + } +} \ No newline at end of file diff --git a/rust/ci-queue-core/src/queue/distributed/supervisor.rs b/rust/ci-queue-core/src/queue/distributed/supervisor.rs new file mode 100644 index 00000000..c5e0868b --- /dev/null +++ b/rust/ci-queue-core/src/queue/distributed/supervisor.rs @@ -0,0 +1,42 @@ +use std::time::Duration; + +use super::{Base}; + +/// Supervisor monitors the queue and waits for all workers to complete +pub struct Supervisor { + base: Base, +} + +impl Supervisor { + pub fn new(redis_url: &str, build_id: String) -> Result { + let client = redis::Client::open(redis_url)?; + let conn = client.get_connection()?; + let base = Base::new(conn, build_id); + + Ok(Self { base }) + } + + pub fn wait_for_workers(&mut self) -> bool { + if self.base.wait_for_master().is_err() { + return false; + } + + while !self.base.is_empty() { + std::thread::sleep(Duration::from_millis(100)); + } + + true + } + + pub fn len(&mut self) -> usize { + self.base.len() + } + + pub fn is_empty(&mut self) -> bool { + self.base.is_empty() + } + + pub fn progress(&mut self) -> usize { + self.base.progress() + } +} \ No newline at end of file diff --git a/rust/ci-queue-core/src/queue/distributed/worker.rs b/rust/ci-queue-core/src/queue/distributed/worker.rs new file mode 100644 index 00000000..d3c1b41c --- /dev/null +++ b/rust/ci-queue-core/src/queue/distributed/worker.rs @@ -0,0 +1,249 @@ +use std::collections::HashSet; +use std::time::Duration; +use redis::Commands; +use super::{Base, ReservationError, DEFAULT_REQUEUE_OFFSET}; +use super::scripts::Script; +use crate::queue::{Queue, QueueConfig, TestIdentifier, TestRegistry}; +use uuid::Uuid; + +/// Distributed worker that processes tests from the Redis queue +pub struct Worker { + base: Base, + config: QueueConfig, + worker_id: String, + shutdown_required: bool, + reserved_tests: HashSet, + test_registry: TestRegistry, +} + +impl Worker { + pub fn new( + redis_url: &str, + build_id: String, + worker_id: Option, + tests: Vec, + config: QueueConfig, + ) -> Result { + let client = redis::Client::open(redis_url)?; + let conn = client.get_connection()?; + + let mut base = Base::new(conn, build_id); + base.total = Some(tests.len()); + + let mut worker = Self { + base, + config, + worker_id: worker_id.unwrap_or_else(|| Uuid::new_v4().to_string()), + shutdown_required: false, + reserved_tests: HashSet::new(), + test_registry: TestRegistry::new(&tests), + }; + + // Attempt to become master and push tests + worker.push_tests(&tests)?; + + Ok(worker) + } + + pub fn set_master_wait_timeout(&mut self, timeout: Duration) { + self.base.set_master_wait_timeout(timeout); + } + + pub fn shutdown(&mut self) { + self.shutdown_required = true; + } + + pub fn is_shutdown_required(&self) -> bool { + self.shutdown_required + } + + pub fn is_master(&self) -> bool { + self.base.is_master + } + + /// Push tests to the queue (master election) + fn push_tests(&mut self, tests: &[T]) -> redis::RedisResult<()> { + let master_key = self.base.key(&["master-status"]); + + // Try to become master using SETNX + let became_master: bool = self.base.redis.set_nx(&master_key, "setup")?; + + if became_master { + self.base.is_master = true; + + let queue_key = self.base.key(&["queue"]); + let total_key = self.base.key(&["total"]); + + redis::pipe() + .atomic() + .lpush(&queue_key, &tests.iter().map(|t| t.to_redis_value()).collect::>()) + .set(&total_key, tests.len()) + .set(&master_key, "ready") + .query::<()>(&mut self.base.redis)?; + } + + // Register as a worker + let workers_key = self.base.key(&["workers"]); + self.base.redis.sadd::<_, _, ()>(&workers_key, &self.worker_id)?; + + Ok(()) + } + + /// Reserve a test from the queue + fn reserve(&mut self) -> Option { + // First try to get a lost test, then a normal test + self.try_reserve_lost_test() + .or_else(|| self.try_reserve_test()) + } + + /// Try to reserve a test that was lost (timed out) + fn try_reserve_lost_test(&mut self) -> Option { + let keys = vec![ + self.base.key(&["running"]), + self.base.key(&["completed"]), + self.base.key(&["worker", &self.worker_id, "queue"]), + self.base.key(&["owners"]), + ]; + + let args = vec![ + Base::timestamp().to_string(), + self.config.timeout.to_string(), + ]; + + match Script::ReserveLost.eval(&mut self.base.redis, keys, args) { + Ok(redis::Value::BulkString(data)) if !data.is_empty() => { + String::from_utf8(data) + .ok() + .and_then(|s| T::from_redis_value(&s, &self.test_registry)) + } + _ => None, + } + } + + /// Try to reserve a normal test from the queue + fn try_reserve_test(&mut self) -> Option { + let keys = vec![ + self.base.key(&["queue"]), + self.base.key(&["running"]), + self.base.key(&["processed"]), + self.base.key(&["worker", &self.worker_id, "queue"]), + self.base.key(&["owners"]), + ]; + + let args = vec![Base::timestamp().to_string()]; + + match Script::Reserve.eval(&mut self.base.redis, keys, args) { + Ok(redis::Value::BulkString(data)) if !data.is_empty() => { + String::from_utf8(data) + .ok() + .and_then(|s| T::from_redis_value(&s, &self.test_registry)) + } + _ => None, + } + } + + pub fn acknowledge_test(&mut self, test: &T) -> Result { + // Check if we have this test reserved + if !self.reserved_tests.remove(test) { + return Err(ReservationError { + message: format!("Test '{}' was not reserved by this worker", test.to_redis_value()), + }); + } + + let keys = vec![ + self.base.key(&["running"]), + self.base.key(&["processed"]), + self.base.key(&["owners"]), + self.base.key(&["error-reports"]), + ]; + + let args = vec![ + test.to_redis_value(), + String::new(), // No error + "28800".to_string(), // TTL: 8 hours + ]; + + match Script::Acknowledge.eval(&mut self.base.redis, keys, args) { + Ok(redis::Value::Int(1)) => Ok(true), + _ => Ok(false), + } + } + + /// Requeue a failed test + pub fn requeue_test(&mut self, test: &T) -> bool { + let global_max = (self.config.requeue_tolerance * self.base.total.unwrap_or(0) as f64).ceil() as usize; + + if self.config.max_requeues == 0 || global_max == 0 { + return false; + } + + let keys = vec![ + self.base.key(&["processed"]), + self.base.key(&["requeues-count"]), + self.base.key(&["queue"]), + self.base.key(&["running"]), + self.base.key(&["worker", &self.worker_id, "queue"]), + self.base.key(&["owners"]), + self.base.key(&["error-reports"]), + ]; + + let args = vec![ + self.config.max_requeues.to_string(), + global_max.to_string(), + test.to_redis_value(), + DEFAULT_REQUEUE_OFFSET.to_string(), + ]; + + match Script::Requeue.eval(&mut self.base.redis, keys, args) { + Ok(redis::Value::Int(1)) => { + self.reserved_tests.remove(test); + true + } + _ => false, + } + } +} + +impl Queue for Worker { + fn total(&self) -> usize { + self.base.total.unwrap_or(0) + } + + fn progress(&mut self) -> usize { + self.base.progress() + } + + fn len(&mut self) -> usize { + self.base.len() + } + + fn acknowledge(&mut self, test: &T) -> bool { + self.acknowledge_test(test).unwrap_or(false) + } + + fn requeue(&mut self, test: &T) -> bool { + self.requeue_test(test) + } +} + +impl Iterator for Worker { + type Item = T; + + fn next(&mut self) -> Option { + if self.base.wait_for_master().is_err() { + return None; + } + + if self.shutdown_required || self.base.is_empty() { + return None; + } + + if let Some(test) = self.reserve() { + self.reserved_tests.insert(test.clone()); + Some(test) + } else { + std::thread::sleep(Duration::from_millis(50)); + self.next() + } + } +} \ No newline at end of file diff --git a/rust/ci-queue-core/src/queue/static_queue.rs b/rust/ci-queue-core/src/queue/static_queue.rs new file mode 100644 index 00000000..54b29828 --- /dev/null +++ b/rust/ci-queue-core/src/queue/static_queue.rs @@ -0,0 +1,234 @@ +use std::collections::{HashMap, VecDeque}; +use crate::queue::TestIdentifier; + +use super::{Queue, QueueConfig}; + +/// Static queue implementation - holds tests in memory +pub struct StaticQueue { + queue: VecDeque, + progress: usize, + total: usize, + config: QueueConfig, + requeues: HashMap, + global_requeue_count: usize, +} + +impl StaticQueue { + /// Create a new static queue with the given tests + pub fn new(tests: Vec, config: QueueConfig) -> Self { + let total = tests.len(); + Self { + queue: tests.into_iter().collect(), + progress: 0, + total, + config, + requeues: HashMap::new(), + global_requeue_count: 0, + } + } + + /// Calculate the maximum global requeues allowed + fn global_max_requeues(&self) -> usize { + (self.config.requeue_tolerance * self.total as f64).ceil() as usize + } + + /// Check if a test can be requeued + fn should_requeue(&self, test: &T) -> bool { + // Check if we've disabled requeues entirely + if self.config.max_requeues == 0 || self.global_max_requeues() == 0 { + return false; + } + + let test_requeues = self.requeues.get(test).copied().unwrap_or(0); + test_requeues < self.config.max_requeues + && self.global_requeue_count < self.global_max_requeues() + } +} + +impl Queue for StaticQueue { + fn total(&self) -> usize { + self.total + } + + fn progress(&mut self) -> usize { + self.progress + } + + fn len(&mut self) -> usize { + self.queue.len() + } + + fn acknowledge(&mut self, _test: &T) -> bool { + // In static queue, acknowledge always succeeds + true + } + + fn requeue(&mut self, test: &T) -> bool { + if !self.should_requeue(test) { + return false; + } + + *self.requeues.entry(test.clone()).or_insert(0) += 1; + self.global_requeue_count += 1; + + // Insert at the front of the queue (like Python implementation) + self.queue.push_front(test.clone()); + true + } +} + +impl Iterator for StaticQueue { + type Item = T; + + fn next(&mut self) -> Option { + if let Some(test) = self.queue.pop_front() { + self.progress += 1; + Some(test) + } else { + None + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_basic_queue_iteration() { + let tests = vec!["test1".to_string(), "test2".to_string(), "test3".to_string()]; + let mut queue = StaticQueue::new(tests.clone(), QueueConfig::default()); + + assert_eq!(queue.total(), 3); + assert_eq!(queue.len(), 3); + assert_eq!(queue.progress(), 0); + + assert_eq!(queue.next(), Some("test1".to_string())); + assert_eq!(queue.progress(), 1); + assert_eq!(queue.len(), 2); + + assert_eq!(queue.next(), Some("test2".to_string())); + assert_eq!(queue.next(), Some("test3".to_string())); + assert_eq!(queue.next(), None); + + assert_eq!(queue.progress(), 3); + assert_eq!(queue.len(), 0); + } + + #[test] + fn test_requeue_respects_max_per_test() { + let config = QueueConfig { + max_requeues: 2, + requeue_tolerance: 3.0, // Allow 300% to be requeued (3 requeues for 1 test) + timeout: 60, + }; + let tests = vec!["test1".to_string()]; + let mut queue = StaticQueue::new(tests, config); + + // First run + let test = queue.next().unwrap(); + assert_eq!(test, "test1"); + + // First requeue - should succeed + assert!(queue.requeue(&test), "First requeue should succeed"); + assert_eq!(queue.len(), 1); + assert_eq!(queue.requeues.get("test1").copied().unwrap_or(0), 1); + + // Run again + let test = queue.next().unwrap(); + assert_eq!(test, "test1"); + + // Second requeue - should succeed (at limit) + assert!(queue.requeue(&test), "Second requeue should succeed (at limit)"); + assert_eq!(queue.requeues.get("test1").copied().unwrap_or(0), 2); + + // Run again + let test = queue.next().unwrap(); + + // Third requeue - should fail (exceeds max_requeues of 2) + assert!(!queue.requeue(&test), "Third requeue should fail"); + assert_eq!(queue.len(), 0); + } + + #[test] + fn test_requeue_respects_global_tolerance() { + let config = QueueConfig { + max_requeues: 10, // High per-test limit + requeue_tolerance: 0.5, // Only 50% of tests can be requeued + timeout: 60, + }; + let tests = vec!["test1".to_string(), "test2".to_string()]; + let mut queue = StaticQueue::new(tests, config); + + // With 2 tests and 0.5 tolerance, only 1 requeue total is allowed + // (0.5 * 2 = 1.0, ceil(1.0) = 1) + + // Run first test + let test1 = queue.next().unwrap(); + assert!(queue.requeue(&test1)); + + // Run second test + let test2 = queue.next().unwrap(); + // This should fail because we've hit global limit + assert!(!queue.requeue(&test2)); + } + + #[test] + fn test_requeue_inserts_at_front() { + let config = QueueConfig { + max_requeues: 1, + requeue_tolerance: 1.0, + timeout: 60, + }; + let tests = vec!["test1".to_string(), "test2".to_string(), "test3".to_string()]; + let mut queue = StaticQueue::new(tests, config); + + // Get first test + let test1 = queue.next().unwrap(); + assert_eq!(test1, "test1"); + + // Requeue it + assert!(queue.requeue(&test1)); + + // Next should be the requeued test (inserted at front) + assert_eq!(queue.next().unwrap(), "test1"); + // Then continue with original order + assert_eq!(queue.next().unwrap(), "test2"); + assert_eq!(queue.next().unwrap(), "test3"); + } + + #[test] + fn test_zero_requeue_config() { + let config = QueueConfig { + max_requeues: 0, + requeue_tolerance: 0.0, + timeout: 60, + }; + let tests = vec!["test1".to_string()]; + let mut queue = StaticQueue::new(tests, config); + + let test = queue.next().unwrap(); + // Should not allow any requeues + assert!(!queue.requeue(&test)); + } + + #[test] + fn test_requeue_tolerance_ceiling() { + // Test that requeue tolerance is ceiling'd, not floored + let config = QueueConfig { + max_requeues: 10, + requeue_tolerance: 0.15, // 15% of 3 = 0.45 + timeout: 60, + }; + let tests = vec!["test1".to_string(), "test2".to_string(), "test3".to_string()]; + let mut queue = StaticQueue::new(tests, config); + + // ceil(0.45) = 1, so one requeue should be allowed + let test1 = queue.next().unwrap(); + assert!(queue.requeue(&test1)); + + // Second requeue should fail + let test2 = queue.next().unwrap(); + assert!(!queue.requeue(&test2)); + } +} \ No newline at end of file diff --git a/rust/ci-queue-core/tests/utils.rs b/rust/ci-queue-core/tests/utils.rs new file mode 100644 index 00000000..ea1d393b --- /dev/null +++ b/rust/ci-queue-core/tests/utils.rs @@ -0,0 +1,62 @@ +use std::process::Command; + +pub struct RedisTestContext { + pub build_id: String, +} + +impl RedisTestContext { + pub fn new(test_name: &str) -> Option { + if !Self::redis_available() { + eprintln!("Skipping integration test: Redis not available"); + return None; + } + + let build_id = format!("{}-{}", test_name, std::process::id()); + let context = Self { build_id }; + context.cleanup(); + Some(context) + } + + fn redis_available() -> bool { + Command::new("redis-cli") + .arg("ping") + .output() + .map(|output| output.status.success()) + .unwrap_or(false) + } + + fn cleanup(&self) { + let _ = Command::new("redis-cli") + .arg("--scan") + .arg("--pattern") + .arg(&format!("build:{}:*", self.build_id)) + .output() + .and_then(|output| { + let keys = String::from_utf8_lossy(&output.stdout); + for key in keys.lines() { + let _ = Command::new("redis-cli") + .arg("del") + .arg(key) + .output(); + } + Ok(()) + }); + } +} + +impl Drop for RedisTestContext { + fn drop(&mut self) { + self.cleanup(); + } +} + +// Macro to skip tests when Redis is not available +#[macro_export] +macro_rules! redis_test { + ($context:ident, $body:block) => { + let Some($context) = crate::utils::RedisTestContext::new(stringify!($context)) else { + return; + }; + $body + }; +} diff --git a/rust/ci-queue-core/tests/worker_integration_tests.rs b/rust/ci-queue-core/tests/worker_integration_tests.rs new file mode 100644 index 00000000..b4c77929 --- /dev/null +++ b/rust/ci-queue-core/tests/worker_integration_tests.rs @@ -0,0 +1,303 @@ +mod utils; + +use ci_queue_core::queue::distributed::Worker; +use ci_queue_core::queue::{Queue, QueueConfig}; +use std::time::Duration; + +#[test] +#[ignore] +fn test_single_worker_processes_all_tests() { + redis_test!(ctx, { + let tests = vec![ + "test::module1::test_a".to_string(), + "test::module1::test_b".to_string(), + "test::module2::test_c".to_string(), + ]; + + let mut worker = Worker::new( + "redis://127.0.0.1/", + ctx.build_id.clone(), + Some("worker-1".to_string()), + tests.clone(), + QueueConfig::default(), + ).expect("Failed to create worker"); + + assert!(worker.is_master()); + assert_eq!(worker.total(), 3); + + let mut processed = Vec::new(); + while let Some(test) = worker.next() { + processed.push(test.clone()); + assert!(worker.acknowledge(&test)); + } + + assert_eq!(processed.len(), 3); + + let progress = worker.progress(); + assert_eq!(progress, 3); + + let remaining = worker.len(); + assert_eq!(remaining, 0); + }); +} + +#[test] +#[ignore] +fn test_multiple_workers_share_queue() { + redis_test!(ctx, { + let tests = vec![ + "test1".to_string(), + "test2".to_string(), + "test3".to_string(), + "test4".to_string(), + "test5".to_string(), + "test6".to_string(), + ]; + + let mut worker1 = Worker::new( + "redis://127.0.0.1/", + ctx.build_id.clone(), + Some("worker-1".to_string()), + tests.clone(), + QueueConfig::default(), + ).expect("Failed to create worker1"); + + let mut worker2 = Worker::new( + "redis://127.0.0.1/", + ctx.build_id.clone(), + Some("worker-2".to_string()), + tests.clone(), + QueueConfig::default(), + ).expect("Failed to create worker2"); + + assert!(worker1.is_master() != worker2.is_master()); + + let mut all_processed = Vec::new(); + + for _ in 0..3 { + if let Some(test) = worker1.next() { + all_processed.push(test.clone()); + assert!(worker1.acknowledge(&test)); + } + + if let Some(test) = worker2.next() { + all_processed.push(test.clone()); + assert!(worker2.acknowledge(&test)); + } + } + + assert_eq!(all_processed.len(), 6); + + let unique_tests: std::collections::HashSet<_> = all_processed.iter().collect(); + assert_eq!(unique_tests.len(), 6); + }); +} + +#[test] +#[ignore] +fn test_worker_requeue_failed_test() { + redis_test!(ctx, { + let tests = vec![ + "test1".to_string(), + "test2".to_string(), + ]; + + let config = QueueConfig { + max_requeues: 2, + requeue_tolerance: 1.0, + timeout: 60, + }; + + let mut worker = Worker::new( + "redis://127.0.0.1/", + ctx.build_id.clone(), + Some("worker-1".to_string()), + tests.clone(), + config, + ).expect("Failed to create worker"); + + let test1 = worker.next().expect("Should get test1"); + assert_eq!(test1, "test1"); + + assert!(worker.requeue(&test1)); + + let test2 = worker.next().expect("Should get test2"); + assert_eq!(test2, "test2"); + assert!(worker.acknowledge(&test2)); + + let requeued_test = worker.next().expect("Should get requeued test1"); + assert_eq!(requeued_test, "test1"); + assert!(worker.acknowledge(&requeued_test)); + + assert_eq!(worker.next(), None); + }); +} + +#[test] +#[ignore] +fn test_worker_timeout_and_recovery() { + redis_test!(ctx, { + let tests = vec!["test1".to_string(), "test2".to_string()]; + + let config = QueueConfig { + max_requeues: 2, + requeue_tolerance: 1.0, + timeout: 1, + }; + + let mut worker1 = Worker::new( + "redis://127.0.0.1/", + ctx.build_id.clone(), + Some("worker-1".to_string()), + tests.clone(), + config.clone(), + ).expect("Failed to create worker1"); + + + let test1 = worker1.next().expect("Should get test1"); + + std::thread::sleep(Duration::from_secs(2)); + + let mut worker2 = Worker::new( + "redis://127.0.0.1/", + ctx.build_id.clone(), + Some("worker-2".to_string()), + tests.clone(), + config.clone(), + ).expect("Failed to create worker2"); + + let recovered_test = worker2.next(); + assert!(recovered_test.is_some()); + + if let Some(test) = recovered_test { + assert_eq!(test, test1); + assert!(worker2.acknowledge(&test)); + } + }); +} + +#[test] +#[ignore] +fn test_worker_shutdown() { + redis_test!(ctx, { + let tests = vec![ + "test1".to_string(), + "test2".to_string(), + "test3".to_string(), + ]; + + let mut worker = Worker::new( + "redis://127.0.0.1/", + ctx.build_id.clone(), + Some("worker-1".to_string()), + tests.clone(), + QueueConfig::default(), + ).expect("Failed to create worker"); + + let test1 = worker.next().expect("Should get test1"); + assert!(worker.acknowledge(&test1)); + + worker.shutdown(); + assert!(worker.is_shutdown_required()); + + assert_eq!(worker.next(), None); + }); +} + +#[test] +#[ignore] +fn test_max_requeues_limit() { + redis_test!(ctx, { + // Use 2 tests so that with tolerance 1.0, we get global_max = 2 + // This allows us to test that per-test limit (max_requeues=2) is enforced + let tests = vec!["test1".to_string(), "test2".to_string()]; + + let config = QueueConfig { + max_requeues: 2, + requeue_tolerance: 1.0, // With 2 tests, global_max = ceil(1.0 * 2) = 2 + timeout: 60, + }; + + let mut worker = Worker::new( + "redis://127.0.0.1/", + ctx.build_id.clone(), + Some("worker-1".to_string()), + tests.clone(), + config, + ).expect("Failed to create worker"); + + // Get test1 first + let test1 = worker.next().expect("Should get test1"); + assert_eq!(test1, "test1"); + + // First requeue of test1 should succeed + assert!(worker.requeue(&test1), "First requeue of test1 should succeed"); + + // Get test2 + let test2 = worker.next().expect("Should get test2"); + assert_eq!(test2, "test2"); + // Just acknowledge test2 to move on + assert!(worker.acknowledge(&test2)); + + // Get test1 again (it was requeued) + let test1_again = worker.next().expect("Should get test1 again"); + assert_eq!(test1_again, "test1"); + + // Second requeue of test1 should succeed (still under max_requeues=2) + assert!(worker.requeue(&test1_again), "Second requeue of test1 should succeed"); + + // Get test1 for the third time + let test1_third = worker.next().expect("Should get test1 for third time"); + assert_eq!(test1_third, "test1"); + + // Third requeue attempt should fail (exceeds max_requeues=2) + assert!(!worker.requeue(&test1_third), "Third requeue should fail (exceeded max)"); + + // Must acknowledge it now + assert!(worker.acknowledge(&test1_third)); + }); +} + +#[test] +#[ignore] +fn test_lost_master_timeout() { + redis_test!(ctx, { + let tests = vec!["test1".to_string(), "test2".to_string()]; + + // Create master worker + let master = Worker::new( + "redis://127.0.0.1/", + ctx.build_id.clone(), + Some("master".to_string()), + tests.clone(), + QueueConfig::default(), + ).expect("Failed to create master"); + + assert!(master.is_master()); + + // Set master status to something other than "ready" or "finished" + // This simulates a stuck or crashed master + let client = redis::Client::open("redis://127.0.0.1/").unwrap(); + let mut con = client.get_connection().unwrap(); + let master_key = format!("build:{}:master-status", ctx.build_id); + + let _: () = redis::cmd("SET") + .arg(&master_key) + .arg("stuck") + .query(&mut con) + .unwrap(); + + // Create another worker and expect it to timeout after the master wait timeout + let mut worker = Worker::new( + "redis://127.0.0.1/", + ctx.build_id.clone(), + Some("worker-1".to_string()), + tests.clone(), + QueueConfig::default(), + ).expect("Failed to create worker"); + worker.set_master_wait_timeout(Duration::from_millis(50)); + + // Expect the worker to timeout - it should not be able to get a test + assert!(worker.next().is_none()); + }); +} \ No newline at end of file diff --git a/rust/ci-queue-playwright/Cargo.toml b/rust/ci-queue-playwright/Cargo.toml new file mode 100644 index 00000000..e9bf7ba1 --- /dev/null +++ b/rust/ci-queue-playwright/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "ci-queue-playwright" +version = "0.1.0" +edition = "2024" + +[dependencies] +ci-queue-core = { path = "../ci-queue-core" } +clap = { version = "4.5.45", features = ["derive", "env"] } +rand = "0.8" +serde = { version = "1.0.219", features = ["derive"] } +serde_json = "1.0.143" +thiserror = "2.0.16" +which = "8.0.0" diff --git a/rust/ci-queue-playwright/src/bin/playwright-queue.rs b/rust/ci-queue-playwright/src/bin/playwright-queue.rs new file mode 100644 index 00000000..4a46a6bd --- /dev/null +++ b/rust/ci-queue-playwright/src/bin/playwright-queue.rs @@ -0,0 +1,61 @@ +use clap::Parser; +use ci_queue_core::{QueueConfig, Worker}; +use ci_queue_playwright::playwright::{ + cli::{Args, Commands}, list_tests, node_runner, process_queue +}; +use rand::{SeedableRng, seq::SliceRandom}; +use rand::rngs::StdRng; + +#[derive(Debug, thiserror::Error)] +enum AppError { + #[error("Executor error: {0}")] + Executor(#[from] ci_queue_playwright::playwright::executor::ExecutorError), + + #[error("Worker initialization failed: {0}")] + WorkerInit(String), + + #[error("Node runner error: {0}")] + NodeRunner(#[from] node_runner::NodeRunnerError), +} + +fn main() -> Result<(), AppError> { + let args = Args::parse(); + + match args.command { + Commands::Run { max_requeues, requeue_tolerance, seed, timeout, worker } => { + let runner = args.runner.unwrap_or(node_runner::detect_node_runner()?); + + let config = QueueConfig { + max_requeues, + requeue_tolerance, + timeout, + }; + + let mut tests = list_tests(&runner)?; + + // Randomize tests based on seed if provided + if let Some(seed_str) = seed { + let mut rng = StdRng::from_seed(string_to_seed(&seed_str)); + tests.shuffle(&mut rng); + } + + println!("Running {} tests", tests.len()); + + let worker = Worker::new(&args.queue, args.build.clone(), worker, tests, config) + .map_err(|e| AppError::WorkerInit(e.to_string()))?; + + process_queue(&runner, Box::new(worker))?; + } + } + + Ok(()) +} + +fn string_to_seed(seed_str: &str) -> [u8; 32] { + let mut seed_bytes = [0u8; 32]; + let seed_hash = seed_str.as_bytes(); + for (i, &byte) in seed_hash.iter().enumerate().take(32) { + seed_bytes[i] = byte; + } + seed_bytes +} \ No newline at end of file diff --git a/rust/ci-queue-playwright/src/lib.rs b/rust/ci-queue-playwright/src/lib.rs new file mode 100644 index 00000000..64903b83 --- /dev/null +++ b/rust/ci-queue-playwright/src/lib.rs @@ -0,0 +1 @@ +pub mod playwright; \ No newline at end of file diff --git a/rust/ci-queue-playwright/src/playwright.rs b/rust/ci-queue-playwright/src/playwright.rs new file mode 100644 index 00000000..5a0b7c9a --- /dev/null +++ b/rust/ci-queue-playwright/src/playwright.rs @@ -0,0 +1,9 @@ +pub mod types; +pub mod node_runner; +pub mod executor; + +pub mod cli; + +pub use types::*; +pub use node_runner::*; +pub use executor::*; \ No newline at end of file diff --git a/rust/ci-queue-playwright/src/playwright/cli.rs b/rust/ci-queue-playwright/src/playwright/cli.rs new file mode 100644 index 00000000..cd1380a1 --- /dev/null +++ b/rust/ci-queue-playwright/src/playwright/cli.rs @@ -0,0 +1,142 @@ +use clap::{Parser, Subcommand}; + +use crate::playwright::node_runner; + +#[derive(Parser, Debug)] +#[command(version)] +#[command(name = "playwright-queue")] +#[command(about = "Distributed Playwright test runner using ci-queue")] +#[command(flatten_help = true)] +pub struct Args { + #[command(subcommand)] + pub command: Commands, + + #[arg(long, env = "BUILD_ID", help = "Unique identifier for the workload. All workers working on the same suite of tests must have the same build identifier")] + pub build: String, + + // TODO: Implement from minitest-queue + // #[arg(long, help = "Count defines how often each test in the grind list is going to be run")] + // grind_count: Option, + + // TODO: Implement from minitest-queue + // #[arg(long, help = "Path to the file that includes the list of tests to grind")] + // grind_list: Option, + + // TODO: Implement from minitest-queue + // #[arg(long, help = "Optional. Sets a prefix for the build id in case a single CI build runs multiple independent test suites")] + // namespace: Option, + + #[arg(long, env = "QUEUE_URL", help = "URL of the queue, e.g. redis://example.com")] + pub queue: String, + + #[arg(long, value_enum, env = "NODE_RUNNER", help = "Node runner to use for executing tests")] + pub runner: Option, +} + +#[derive(Subcommand, Debug)] +pub enum Commands { + Run { + // TODO: Implement from minitest-queue + // #[arg(long, help = "Path to debug log file for e.g. Redis instrumentation")] + // debug_log: Option, + + // TODO: Implement from minitest-queue + // #[arg(long, help = "Defines a file where flaky tests during the execution are written to in json format")] + // export_flaky_tests_file: Option, + + // TODO: Implement from minitest-queue + // #[arg(long, help = "Defines a file where the test failures are written to in the json format")] + // failure_file: Option, + + // TODO: Implement from minitest-queue + // #[arg(long, help = "If heartbeat is enabled, a background process will periodically signal it's still processing the current test (in seconds)")] + // heartbeat: Option, + + // TODO: Implement from minitest-queue + // #[arg(long, default_value = "30", help = "Specify a timeout after which all workers are inactive (e.g. died) (in seconds)")] + // inactive_workers_timeout: usize, + + // TODO: Implement from minitest-queue + // #[arg(long, help = "Defines after how many consecutive failures the worker will be considered unhealthy and terminate itself")] + // max_consecutive_failures: Option, + + // TODO: Implement from minitest-queue + // #[arg(long, help = "Defines how long ci-queue should maximally run in seconds")] + // max_duration: Option, + + #[arg(long, default_value = "0", help = "Defines how many times a single test can be requeued")] + max_requeues: usize, + + // TODO: Implement from minitest-queue + // #[arg(long, help = "Defines how many user test tests can fail")] + // max_test_failed: Option, + + // TODO: Implement from minitest-queue + // #[arg(long, help = "Set the time limit of the execution time from grinds on a given test (in milliseconds, decimal allowed)")] + // max_test_duration: Option, + + // TODO: Implement from minitest-queue + // #[arg(long, default_value = "0.5", help = "The percentile for max-test-duration. Must be within the range 0 < percentile <= 1")] + // max_test_duration_percentile: f64, + + // TODO: Implement from minitest-queue + // #[arg(long, default_value = "30", help = "Specify a timeout to elect the leader and populate the queue (in seconds)")] + // queue_init_timeout: usize, + + // TODO: Implement from minitest-queue + // #[arg(long, default_value = "28800", help = "Defines how long the test report remain after the test run, in seconds. Defaults to 28,800 (8 hours)")] + // redis_ttl: usize, + + // TODO: Implement from minitest-queue + // #[arg(long, default_value = "30", help = "Specify a timeout after which the report command will fail if not all tests have been processed (in seconds)")] + // report_timeout: usize, + + #[arg(long, default_value = "0.0", help = "Defines how many requeues can happen overall, based on the test suite size (e.g. 0.05 for 5%)")] + requeue_tolerance: f64, + + #[arg(long, help = "Specify a seed used to shuffle the test suite. If not provided, the tests will be run in the default order provided by `playwright test --list`.")] + seed: Option, + + #[arg(long, default_value = "60", help = "Timeout after which if a test hasn't completed, it will be picked up by another worker (in seconds)")] + timeout: usize, + + // TODO: Implement from minitest-queue + // #[arg(long, help = "Must set this option in report and report_grind command if you set --max-test-duration in the report_grind")] + // track_test_duration: bool, + + // TODO: Implement from minitest-queue + // #[arg(short, long, help = "Verbose. Show progress processing files")] + // verbose: bool, + + // TODO: Implement from minitest-queue + // #[arg(long, help = "Defines a file where warnings during the execution are written to")] + // warnings_file: Option, + + #[arg(long, env = "WORKER_ID", help = "A unique identifier for this worker. Must be consistent to allow retries")] + worker: Option, + }, + + // TODO: Implement from minitest-queue + // Report { + // #[arg(long, default_value = "30", help = "Specify a timeout after which all workers are inactive (e.g. died) (in seconds)")] + // inactive_workers_timeout: usize, + + // #[arg(long, default_value = "30", help = "Specify a timeout after which the report command will fail if not all tests have been processed (in seconds)")] + // report_timeout: usize, + + // #[arg(long, help = "Must set this option in report and report_grind command if you set --max-test-duration in the report_grind")] + // track_test_duration: bool, + + // #[arg(short, long, help = "Verbose. Show progress processing files")] + // verbose: bool, + // } + + // TODO: Implement from minitest-queue + // Retry {} + + // TODO: Implement from minitest-queue + // Bisect { + // #[arg(long)] + // failing_test: String, + // } +} diff --git a/rust/ci-queue-playwright/src/playwright/executor.rs b/rust/ci-queue-playwright/src/playwright/executor.rs new file mode 100644 index 00000000..b3234326 --- /dev/null +++ b/rust/ci-queue-playwright/src/playwright/executor.rs @@ -0,0 +1,121 @@ +use std::process::{Command, Stdio}; +use crate::playwright::types::{PlaywrightTest, PlaywrightListOutput}; +use crate::playwright::node_runner::{NodeRunner, NodeRunnerError}; +use ci_queue_core::Queue; + +#[derive(Debug, thiserror::Error)] +pub enum ExecutorError { + #[error("Node runner error: {0}")] + NodeRunner(#[from] NodeRunnerError), + + #[error("Failed to execute playwright test --list: {0}")] + ListCommand(#[from] std::io::Error), + + #[error("Failed to list tests: {0}")] + ListTestsFailed(String), + + #[error("Failed to parse Playwright JSON output: {0}")] + ParseJson(#[from] serde_json::Error), +} + +pub fn list_tests(runner: &NodeRunner) -> Result, ExecutorError> { + let cmd: &'static str = runner.command(); + let mut args = runner.args(); + args.extend(["test", "--list", "--reporter=json"].iter().map(|s| s.to_string())); + + let output = Command::new(&cmd) + .args(&args) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .output()?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(ExecutorError::ListTestsFailed(stderr.to_string())); + } + + let stdout = String::from_utf8_lossy(&output.stdout); + + let list_output: PlaywrightListOutput = serde_json::from_str(&stdout)?; + + let mut tests = Vec::new(); + for suite in list_output.suites { + for spec in suite.specs { + tests.push(PlaywrightTest { + title: spec.title, + file: spec.file, + line: spec.line, + column: spec.column, + }); + } + } + + Ok(tests) +} + +pub fn run_test(runner: &NodeRunner, test: &PlaywrightTest) -> Result { + let cmd: &'static str = runner.command(); + let mut args = runner.args(); + + // TODO: extend with passing through options to the test command + args.extend(["test", &format!("{}:{}", test.file, test.line.unwrap_or(0))].iter().map(|s| s.to_string())); + + println!("Running: {} ({}:{})", test.title, test.file, test.line.unwrap_or(0)); + + let status = Command::new(&cmd) + .args(&args) + .status()?; + + Ok(status.success()) +} + +pub fn process_queue(runner: &NodeRunner, mut queue: Box>) -> Result<(), ExecutorError> { + let mut passed = 0; + let mut failed = Vec::new(); + let mut test_count = 0; + + while let Some(test) = queue.next() { + test_count += 1; + + println!("\n[{}] ", test_count); + + match run_test(runner, &test) { + Ok(true) => { + println!(" ✅ PASSED"); + passed += 1; + } + Ok(false) => { + println!(" ❌ FAILED"); + + if !queue.requeue(&test) { + failed.push(test.clone()); + } else { + println!(" 🔄 Requeued for retry"); + } + } + Err(e) => { + eprintln!(" ⚠️ Error running test: {}", e); + failed.push(test.clone()); + } + } + + queue.acknowledge(&test); + } + + println!("\n========================================"); + println!("Test Results Summary"); + println!("========================================"); + println!("Total: {}", test_count); + println!("Passed: {}", passed); + println!("Failed: {}", failed.len()); + + if !failed.is_empty() { + println!("\nFailed tests:"); + for test in &failed { + println!(" - {} ({})", test.title, test.file); + } + std::process::exit(1); + } + + Ok(()) +} \ No newline at end of file diff --git a/rust/ci-queue-playwright/src/playwright/node_runner.rs b/rust/ci-queue-playwright/src/playwright/node_runner.rs new file mode 100644 index 00000000..e18bcdc5 --- /dev/null +++ b/rust/ci-queue-playwright/src/playwright/node_runner.rs @@ -0,0 +1,63 @@ +use clap::ValueEnum; + +#[derive(ValueEnum, Clone, Copy, Debug)] +pub enum NodeRunner { + Pnpm, + Yarn, + Npm, + Npx, +} + +const SEARCH_ORDER: &[NodeRunner] = &[NodeRunner::Pnpm, NodeRunner::Yarn, NodeRunner::Npm, NodeRunner::Npx]; + +impl NodeRunner { + pub fn command(&self) -> &'static str { + match self { + Self::Pnpm => "pnpm", + Self::Yarn => "yarn", + Self::Npm => "npm", + Self::Npx => "npx", + } + } + + pub fn args(&self) -> Vec { + match self { + Self::Pnpm => vec!["exec", "playwright"], + Self::Yarn => vec!["playwright"], + Self::Npm => vec!["exec", "--", "playwright"], + Self::Npx => vec!["playwright"], + } + .into_iter() + .map(String::from) + .collect() + } +} + +#[derive(Debug, thiserror::Error)] +pub enum NodeRunnerError { + #[error("No Node.js package manager found (tried: pnpm, yarn, npm, npx)")] + NoPackageManagerFound, +} + +pub fn detect_node_runner() -> Result { + SEARCH_ORDER + .iter() + .find(|runner| which::which(runner.command()).is_ok()) + .copied() + .ok_or(NodeRunnerError::NoPackageManagerFound) +} + + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_detect_node_runner() { + let result = detect_node_runner(); + if result.is_ok() { + let runner = result.unwrap(); + assert!(["pnpm", "yarn", "npm", "npx"].contains(&runner.command())); + } + } +} \ No newline at end of file diff --git a/rust/ci-queue-playwright/src/playwright/types.rs b/rust/ci-queue-playwright/src/playwright/types.rs new file mode 100644 index 00000000..d4d40838 --- /dev/null +++ b/rust/ci-queue-playwright/src/playwright/types.rs @@ -0,0 +1,61 @@ +use ci_queue_core::queue::{TestIdentifier, TestRegistry}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Deserialize, Serialize, Clone, Hash, PartialEq, Eq)] +pub struct PlaywrightTest { + pub title: String, + pub file: String, + pub line: Option, + pub column: Option, +} + +impl TestIdentifier for PlaywrightTest { + fn to_redis_value(&self) -> String { + format!("{}:{}:{}", self.file, self.line.unwrap_or(0), self.title) + } + + fn from_redis_value(s: &str, registry: &TestRegistry) -> Option { + registry.get(s).cloned() + } +} + +#[derive(Debug, Deserialize)] +#[allow(dead_code)] +pub struct PlaywrightListOutput { + #[serde(default)] + pub config: serde_json::Value, + #[serde(default)] + pub errors: Vec, + #[serde(default)] + pub suites: Vec, + #[serde(default)] + pub stats: serde_json::Value, +} + +#[derive(Debug, Deserialize)] +#[allow(dead_code)] +pub struct PlaywrightSuite { + pub title: String, + pub file: String, + #[serde(default)] + pub line: u32, + #[serde(default)] + pub column: u32, + #[serde(default)] + pub specs: Vec, +} + +#[derive(Debug, Deserialize)] +#[allow(dead_code)] +pub struct PlaywrightSpec { + pub title: String, + pub file: String, + pub line: Option, + pub column: Option, + #[serde(default)] + pub tags: Vec, + #[serde(default)] + pub tests: Vec, + #[serde(default)] + pub ok: bool, +} \ No newline at end of file