From 1a293fd2fd8cb92efa75659adf80ca305965b91f Mon Sep 17 00:00:00 2001 From: Ely Delva Date: Thu, 6 Nov 2025 05:24:48 +0100 Subject: [PATCH 1/3] feat: update dependencies and refactor Redis connection handling Add new dependencies for improved functionality, including allocator-api2, cassowary, and compact_str. Refactor Redis connection handling by introducing a new RedisPool structure, enhancing error handling, and implementing retry logic for connection failures. Update configuration to support console modes and improve CLI commands for better user experience. --- Cargo.lock | 353 ++++++++++++++++++++++++++++++++-- Cargo.toml | 8 +- src/client/mod.rs | 364 ++++++++++++++++++++++++++++++++++++ src/{ => command}/cli.rs | 3 +- src/command/executor.rs | 2 +- src/command/mod.rs | 5 + src/config.rs | 29 +++ src/connection_pool.rs | 132 ------------- src/console/banner.rs | 15 ++ src/console/console.rs | 287 ++++++++++++++++++++++++++++ src/console/mod.rs | 8 + src/console/render.rs | 204 ++++++++++++++++++++ src/console/state.rs | 36 ++++ src/console/status.rs | 58 ++++++ src/handlers/command.rs | 6 +- src/handlers/pipeline.rs | 6 +- src/handlers/transaction.rs | 184 ++++++++++++------ src/http/mod.rs | 2 + src/{ => http}/routes.rs | 6 +- src/{ => http}/server.rs | 12 +- src/main.rs | 240 ++++++++++++++++++------ src/models/mod.rs | 11 ++ src/{ => models}/models.rs | 1 + src/pool_manager.rs | 80 -------- src/redis_client.rs | 57 ------ src/{ => utils}/auth.rs | 2 +- src/{ => utils}/encoding.rs | 0 src/{ => utils}/errors.rs | 1 + src/utils/mod.rs | 5 + 29 files changed, 1701 insertions(+), 416 deletions(-) create mode 100644 src/client/mod.rs rename src/{ => command}/cli.rs (99%) create mode 100644 src/command/mod.rs delete mode 100644 src/connection_pool.rs create mode 100644 src/console/banner.rs create mode 100644 src/console/console.rs create mode 100644 src/console/mod.rs create mode 100644 src/console/render.rs create mode 100644 src/console/state.rs create mode 100644 src/console/status.rs create mode 100644 src/http/mod.rs rename src/{ => http}/routes.rs (95%) rename src/{ => http}/server.rs (69%) create mode 100644 src/models/mod.rs rename src/{ => models}/models.rs (99%) delete mode 100644 src/pool_manager.rs delete mode 100644 src/redis_client.rs rename src/{ => utils}/auth.rs (97%) rename src/{ => utils}/encoding.rs (100%) rename src/{ => utils}/errors.rs (98%) create mode 100644 src/utils/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 5fb5c29..4f7d07f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11,6 +11,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" + [[package]] name = "anstream" version = "0.6.21" @@ -241,6 +247,21 @@ version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" +[[package]] +name = "cassowary" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df8670b8c7b9dae1793364eafadf7239c40d669904660c5960d74cfd80b46a53" + +[[package]] +name = "castaway" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dec551ab6e7578819132c713a93c022a05d60159dc86e7a7050223577484c55a" +dependencies = [ + "rustversion", +] + [[package]] name = "cfg-if" version = "1.0.4" @@ -307,6 +328,19 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "compact_str" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f86b9c4c00838774a6d902ef931eff7470720c51d90c2e32cfe15dc304737b3f" +dependencies = [ + "castaway", + "cfg-if", + "itoa", + "ryu", + "static_assertions", +] + [[package]] name = "console" version = "0.15.11" @@ -316,7 +350,7 @@ dependencies = [ "encode_unicode", "libc", "once_cell", - "unicode-width", + "unicode-width 0.2.2", "windows-sys 0.59.0", ] @@ -331,16 +365,44 @@ dependencies = [ ] [[package]] -name = "dashmap" -version = "5.5.3" +name = "crossterm" +version = "0.27.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" +checksum = "f476fe445d41c9e991fd07515a6f463074b782242ccf4a5b7b1d1012e70824df" dependencies = [ - "cfg-if", - "hashbrown", - "lock_api", - "once_cell", - "parking_lot_core", + "bitflags 2.10.0", + "crossterm_winapi", + "libc", + "mio 0.8.11", + "parking_lot", + "signal-hook", + "signal-hook-mio", + "winapi", +] + +[[package]] +name = "crossterm" +version = "0.28.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "829d955a0bb380ef178a640b91779e3987da38c9aea133b20614cfed8cdea9c6" +dependencies = [ + "bitflags 2.10.0", + "crossterm_winapi", + "mio 1.1.0", + "parking_lot", + "rustix", + "signal-hook", + "signal-hook-mio", + "winapi", +] + +[[package]] +name = "crossterm_winapi" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acdd7c62a3665c7f6830a51635d9ac9b23ed385797f70a83bb8bafe9c572ab2b" +dependencies = [ + "winapi", ] [[package]] @@ -363,18 +425,46 @@ dependencies = [ "syn", ] +[[package]] +name = "either" +version = "1.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" + [[package]] name = "encode_unicode" version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34aa73646ffb006b8f5147f3dc182bd4bcb190227ce861fc4a4844bf8e3cb2c0" +[[package]] +name = "equivalent" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" + +[[package]] +name = "errno" +version = "0.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" +dependencies = [ + "libc", + "windows-sys 0.61.2", +] + [[package]] name = "fnv" version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foldhash" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" + [[package]] name = "form_urlencoded" version = "1.2.2" @@ -486,9 +576,14 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.14.5" +version = "0.15.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" +dependencies = [ + "allocator-api2", + "equivalent", + "foldhash", +] [[package]] name = "heck" @@ -732,6 +827,15 @@ version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" +[[package]] +name = "itertools" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.15" @@ -750,6 +854,12 @@ version = "0.2.177" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2874a2af47a2325c2001a6e6fad9b16a53b802102b528163885171cf92b15976" +[[package]] +name = "linux-raw-sys" +version = "0.4.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d26c52dbd32dccf2d10cac7725f8eae5296885fb5703b261f7d0a0739ec807ab" + [[package]] name = "litemap" version = "0.8.1" @@ -771,6 +881,15 @@ version = "0.4.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432" +[[package]] +name = "lru" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38" +dependencies = [ + "hashbrown", +] + [[package]] name = "matchers" version = "0.2.0" @@ -798,6 +917,18 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "mio" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" +dependencies = [ + "libc", + "log", + "wasi", + "windows-sys 0.48.0", +] + [[package]] name = "mio" version = "1.1.0" @@ -805,6 +936,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69d83b0086dc8ecf3ce9ae2874b2d1290252e2a30720bea58a5c6639b0092873" dependencies = [ "libc", + "log", "wasi", "windows-sys 0.61.2", ] @@ -859,6 +991,12 @@ dependencies = [ "windows-link", ] +[[package]] +name = "paste" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" + [[package]] name = "percent-encoding" version = "2.3.2" @@ -978,6 +1116,27 @@ dependencies = [ "getrandom", ] +[[package]] +name = "ratatui" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d16546c5b5962abf8ce6e2881e722b4e0ae3b6f1a08a26ae3573c55853ca68d3" +dependencies = [ + "bitflags 2.10.0", + "cassowary", + "compact_str", + "crossterm 0.27.0", + "itertools", + "lru", + "paste", + "stability", + "strum", + "strum_macros", + "unicode-segmentation", + "unicode-truncate", + "unicode-width 0.1.14", +] + [[package]] name = "redis" version = "0.24.0" @@ -1028,6 +1187,19 @@ version = "0.8.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58" +[[package]] +name = "rustix" +version = "0.38.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdb5bc1ae2baa591800df16c9ca78619bf65c0488b41b96ccec5d11220d8c154" +dependencies = [ + "bitflags 2.10.0", + "errno", + "libc", + "linux-raw-sys", + "windows-sys 0.59.0", +] + [[package]] name = "rustversion" version = "1.0.22" @@ -1127,6 +1299,28 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "signal-hook" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d881a16cf4426aa584979d30bd82cb33429027e42122b169753d6ef1085ed6e2" +dependencies = [ + "libc", + "signal-hook-registry", +] + +[[package]] +name = "signal-hook-mio" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b75a19a7a740b25bc7944bdee6172368f988763b744e3d4dfe753f6b4ece40cc" +dependencies = [ + "libc", + "mio 0.8.11", + "mio 1.1.0", + "signal-hook", +] + [[package]] name = "signal-hook-registry" version = "1.4.6" @@ -1178,6 +1372,16 @@ dependencies = [ "windows-sys 0.60.2", ] +[[package]] +name = "stability" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d904e7009df136af5297832a3ace3370cd14ff1546a232f4f185036c2736fcac" +dependencies = [ + "quote", + "syn", +] + [[package]] name = "stable_deref_trait" version = "1.2.1" @@ -1186,34 +1390,60 @@ checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596" [[package]] name = "stashless" -version = "0.1.0" +version = "0.2.0" dependencies = [ - "anyhow", "axum 0.7.9", "axum-test", "base64", "clap", "console", - "dashmap", + "crossterm 0.28.1", "rand", + "ratatui", "redis", "serde", "serde_json", - "serde_urlencoded", "thiserror", "tokio", - "tower 0.4.13", "tower-http", "tracing", "tracing-subscriber", ] +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "strsim" version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" +[[package]] +name = "strum" +version = "0.26.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06" +dependencies = [ + "strum_macros", +] + +[[package]] +name = "strum_macros" +version = "0.26.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "rustversion", + "syn", +] + [[package]] name = "syn" version = "2.0.108" @@ -1326,7 +1556,7 @@ checksum = "ff360e02eab121e0bc37a2d3b4d4dc622e6eda3a8e5253d5435ecf5bd4c68408" dependencies = [ "bytes", "libc", - "mio", + "mio 1.1.0", "parking_lot", "pin-project-lite", "signal-hook-registry", @@ -1504,6 +1734,29 @@ version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9312f7c4f6ff9069b165498234ce8be658059c6728633667c526e27dc2cf1df5" +[[package]] +name = "unicode-segmentation" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493" + +[[package]] +name = "unicode-truncate" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3644627a5af5fa321c95b9b235a72fd24cd29c648c2c379431e6628655627bf" +dependencies = [ + "itertools", + "unicode-segmentation", + "unicode-width 0.1.14", +] + +[[package]] +name = "unicode-width" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dd6e30e90baa6f72411720665d41d89b9a3d039dc45b8faea1ddd07f617f6af" + [[package]] name = "unicode-width" version = "0.2.2" @@ -1589,6 +1842,15 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets 0.48.5", +] + [[package]] name = "windows-sys" version = "0.52.0" @@ -1625,6 +1887,21 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows-targets" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" +dependencies = [ + "windows_aarch64_gnullvm 0.48.5", + "windows_aarch64_msvc 0.48.5", + "windows_i686_gnu 0.48.5", + "windows_i686_msvc 0.48.5", + "windows_x86_64_gnu 0.48.5", + "windows_x86_64_gnullvm 0.48.5", + "windows_x86_64_msvc 0.48.5", +] + [[package]] name = "windows-targets" version = "0.52.6" @@ -1658,6 +1935,12 @@ dependencies = [ "windows_x86_64_msvc 0.53.1", ] +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" + [[package]] name = "windows_aarch64_gnullvm" version = "0.52.6" @@ -1670,6 +1953,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53" +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" + [[package]] name = "windows_aarch64_msvc" version = "0.52.6" @@ -1682,6 +1971,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006" +[[package]] +name = "windows_i686_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" + [[package]] name = "windows_i686_gnu" version = "0.52.6" @@ -1706,6 +2001,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c" +[[package]] +name = "windows_i686_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" + [[package]] name = "windows_i686_msvc" version = "0.52.6" @@ -1718,6 +2019,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2" +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" + [[package]] name = "windows_x86_64_gnu" version = "0.52.6" @@ -1730,6 +2037,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" + [[package]] name = "windows_x86_64_gnullvm" version = "0.52.6" @@ -1742,6 +2055,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1" +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" + [[package]] name = "windows_x86_64_msvc" version = "0.52.6" diff --git a/Cargo.toml b/Cargo.toml index d52b8c2..d3a260e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "stashless" -version = "0.1.0" +version = "0.2.0" edition = "2021" [dependencies] @@ -12,15 +12,13 @@ tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -anyhow = "1.0" thiserror = "1.0" -tower = "0.4" tower-http = { version = "0.5", features = ["cors", "limit"] } rand = "0.8" console = "0.15" base64 = "0.21" -serde_urlencoded = "0.7" -dashmap = "5.5" +ratatui = { version = "0.27", default-features = false, features = ["crossterm"] } +crossterm = "0.28" [dev-dependencies] axum-test = "9.0" diff --git a/src/client/mod.rs b/src/client/mod.rs new file mode 100644 index 0000000..f814a12 --- /dev/null +++ b/src/client/mod.rs @@ -0,0 +1,364 @@ +use crate::config::Config; +use crate::console::Console; +use redis::aio::ConnectionManager; +use redis::{Client, RedisError}; +use std::sync::{ + atomic::{AtomicU32, Ordering}, + Arc, +}; +use std::time::Duration; +use tokio::sync::Semaphore; +use tokio::time::sleep; + +#[derive(Clone)] +pub struct RedisPool { + client: Arc, + semaphore: Arc, + console: Option>, + max_retry: i32, +} + +impl RedisPool { + #[allow(dead_code)] + pub async fn new(config: &Config) -> Result { + Self::new_with_console(config, None).await + } + + pub async fn new_with_console( + config: &Config, + console: Option>, + ) -> Result { + let url = config.redis_url(); + + // Configure client with connection timeouts + let client = Client::open(url)?; + + // Test connection + let mut conn = client.get_connection_manager().await?; + redis::cmd("PING") + .query_async::<_, String>(&mut conn) + .await?; + + Ok(Self { + client: Arc::new(client), + semaphore: Arc::new(Semaphore::new(config.max_connections)), + console, + max_retry: config.max_retry, + }) + } + + /// Check if an error indicates a connection failure that requires reconnection + fn is_connection_error(&self, error: &RedisError) -> bool { + match error.kind() { + redis::ErrorKind::IoError => true, + redis::ErrorKind::ResponseError => { + // Check if it's a connection-related response error + let error_msg = error.to_string().to_lowercase(); + error_msg.contains("connection") + || error_msg.contains("broken pipe") + || error_msg.contains("connection reset") + || error_msg.contains("closed") + || error_msg.contains("connection refused") + } + _ => false, + } + } + + /// Log disconnection and update console status + fn log_disconnection(&self, error: &str) { + if let Some(console) = &self.console { + let _ = console.log_warn(format!("Redis connection lost: {}", error)); + if let Err(e) = console.update_redis_status(crate::console::Status::Reconnecting) { + let _ = console.log_error(format!("Failed to update console status: {}", e)); + } + } + } + + /// Log reconnection attempt + fn log_reconnection_attempt(&self, attempt: u32) { + if let Some(console) = &self.console { + let _ = console.log_info(format!( + "Attempting to reconnect to Redis (attempt {})", + attempt + )); + } + } + + /// Log connection attempt + fn log_connection_attempt(&self, attempt: u32) { + if let Some(console) = &self.console { + let _ = console.log_debug(format!("Testing Redis connection (attempt {})", attempt)); + } + } + + /// Log successful connection + fn log_connection_success(&self) { + if let Some(console) = &self.console { + let _ = console.log_debug("Redis connection successful".to_string()); + } + } + + /// Log successful reconnection and update console status + fn log_reconnection_success(&self) { + if let Some(console) = &self.console { + let _ = console.log_info("Redis connection restored".to_string()); + if let Err(e) = console.update_redis_status(crate::console::Status::Connected) { + let _ = console.log_error(format!("Failed to update console status: {}", e)); + } + } + } + + /// Log reconnection failure + fn log_reconnection_failure(&self, error: &str) { + if let Some(console) = &self.console { + let _ = console.log_error(format!("Failed to reconnect to Redis: {}", error)); + } + } + + /// Ping Redis with retry logic + pub async fn ping_with_retry(&self) -> Result { + const RETRY_DELAY: Duration = Duration::from_millis(500); + + let mut attempt = 1u32; + loop { + match self.get_connection().await { + Ok(mut conn) => { + match redis::cmd("PING").query_async::<_, String>(&mut conn).await { + Ok(result) => { + // Success on retry - log reconnection + if attempt > 1 { + self.log_reconnection_success(); + } + return Ok(result); + } + Err(e) => { + if self.is_connection_error(&e) { + let error_msg = e.to_string(); + if attempt == 1 { + self.log_disconnection(&error_msg); + } + + if self.should_retry(attempt) { + attempt += 1; + self.log_reconnection_attempt(attempt); + sleep(RETRY_DELAY).await; + continue; + } else { + self.log_reconnection_failure(&error_msg); + return Err(e); + } + } else { + // Non-connection error, return immediately + return Err(e); + } + } + } + } + Err(e) => { + if self.is_connection_error(&e) { + let error_msg = e.to_string(); + if attempt == 1 { + self.log_disconnection(&error_msg); + } + + if self.should_retry(attempt) { + attempt += 1; + self.log_reconnection_attempt(attempt); + sleep(RETRY_DELAY).await; + continue; + } else { + self.log_reconnection_failure(&error_msg); + return Err(e); + } + } else { + return Err(e); + } + } + } + } + } + + /// Check if we should retry based on max_retry setting + fn should_retry(&self, attempt: u32) -> bool { + if self.max_retry == -1 { + // Infinite retries + true + } else { + attempt < self.max_retry as u32 + } + } + + /// Get the maximum number of retries (-1 for infinite) + pub fn max_retry(&self) -> i32 { + self.max_retry + } + + pub async fn get_connection(&self) -> Result { + // Acquire permit from semaphore to limit concurrent connections + let _permit = self.semaphore.acquire().await.map_err(|_| { + RedisError::from(( + redis::ErrorKind::IoError, + "Failed to acquire connection permit", + )) + })?; + + match self.client.get_connection_manager().await { + Ok(conn) => { + // Log to console occasionally to show activity (every 10th connection) + static CONNECTION_COUNT: AtomicU32 = AtomicU32::new(0); + let count = CONNECTION_COUNT.fetch_add(1, Ordering::Relaxed); + if count % 10 == 0 && count > 0 { + self.log_connection_success(); + } + Ok(conn) + } + Err(e) => { + // Check if it's a connection error and log it + if self.is_connection_error(&e) { + self.log_disconnection(&e.to_string()); + } + Err(e) + } + } + } + + pub async fn execute_command( + &self, + cmd: redis::Cmd, + ) -> Result { + const RETRY_DELAY: Duration = Duration::from_secs(1); + + let mut attempt = 1u32; + loop { + // Log connection attempt (only for retries to avoid spam) + if attempt > 1 { + self.log_connection_attempt(attempt); + } + + match self.get_connection().await { + Ok(mut conn) => { + match cmd.query_async(&mut conn).await { + Ok(result) => { + // If we had retried, log success + if attempt > 1 { + self.log_reconnection_success(); + } + return Ok(result); + } + Err(e) => { + if self.is_connection_error(&e) { + let error_msg = e.to_string(); + if attempt == 1 { + self.log_disconnection(&error_msg); + } + + if self.should_retry(attempt) { + attempt += 1; + self.log_reconnection_attempt(attempt); + sleep(RETRY_DELAY * (attempt - 1)).await; + continue; + } else { + self.log_reconnection_failure(&error_msg); + return Err(e); + } + } else { + // Non-connection error, return immediately + return Err(e); + } + } + } + } + Err(e) => { + if self.is_connection_error(&e) { + let error_msg = e.to_string(); + if attempt == 1 { + self.log_disconnection(&error_msg); + } + + if self.should_retry(attempt) { + attempt += 1; + self.log_reconnection_attempt(attempt); + sleep(RETRY_DELAY * (attempt - 1)).await; + continue; + } else { + self.log_reconnection_failure(&error_msg); + return Err(e); + } + } else { + return Err(e); + } + } + } + } + } + + pub async fn execute_pipeline( + &self, + pipeline: &mut redis::Pipeline, + ) -> Result, RedisError> { + const RETRY_DELAY: Duration = Duration::from_secs(1); + + let mut attempt = 1u32; + loop { + // Log connection attempt (only for retries to avoid spam) + if attempt > 1 { + self.log_connection_attempt(attempt); + } + + match self.get_connection().await { + Ok(mut conn) => { + match pipeline.query_async(&mut conn).await { + Ok(result) => { + // If we had retried, log success + if attempt > 1 { + self.log_reconnection_success(); + } + return Ok(result); + } + Err(e) => { + if self.is_connection_error(&e) { + let error_msg = e.to_string(); + if attempt == 1 { + self.log_disconnection(&error_msg); + } + + if self.should_retry(attempt) { + attempt += 1; + self.log_reconnection_attempt(attempt); + sleep(RETRY_DELAY * (attempt - 1)).await; + continue; + } else { + self.log_reconnection_failure(&error_msg); + return Err(e); + } + } else { + // Non-connection error, return immediately + return Err(e); + } + } + } + } + Err(e) => { + if self.is_connection_error(&e) { + let error_msg = e.to_string(); + if attempt == 1 { + self.log_disconnection(&error_msg); + } + + if self.should_retry(attempt) { + attempt += 1; + self.log_reconnection_attempt(attempt); + sleep(RETRY_DELAY * (attempt - 1)).await; + continue; + } else { + self.log_reconnection_failure(&error_msg); + return Err(e); + } + } else { + return Err(e); + } + } + } + } + } +} diff --git a/src/cli.rs b/src/command/cli.rs similarity index 99% rename from src/cli.rs rename to src/command/cli.rs index d9f55a4..24a47a0 100644 --- a/src/cli.rs +++ b/src/command/cli.rs @@ -2,7 +2,7 @@ use clap::{Parser, Subcommand}; use console::{style, Emoji}; use crate::config::Config; use crate::redis_client::RedisClientWrapper; -use crate::command::CommandExecutor; +use crate::command::executor::CommandExecutor; #[derive(Parser)] #[command(name = "stashless")] @@ -193,3 +193,4 @@ pub fn handle_version() -> Result<(), String> { println!("{}", style("Redis-over-HTTP adapter compatible with Upstash SDK").dim()); Ok(()) } + diff --git a/src/command/executor.rs b/src/command/executor.rs index 5a274bb..128c7a6 100644 --- a/src/command/executor.rs +++ b/src/command/executor.rs @@ -1,4 +1,4 @@ -use crate::error::AppError; +use crate::utils::AppError; use crate::command::registry::CommandRegistry; use redis::aio::ConnectionManager; use redis::Value as RedisValue; diff --git a/src/command/mod.rs b/src/command/mod.rs new file mode 100644 index 0000000..ca8c7ac --- /dev/null +++ b/src/command/mod.rs @@ -0,0 +1,5 @@ +pub mod executor; +pub mod cli; + +pub use executor::CommandExecutor; + diff --git a/src/config.rs b/src/config.rs index 5c7e1bf..2fcc026 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,5 +1,21 @@ use std::env; +#[derive(Debug, Clone)] +pub enum ConsoleMode { + Rich, + Standard, +} + +impl ConsoleMode { + pub fn from_str(s: &str) -> Self { + match s.to_lowercase().as_str() { + "standard" => ConsoleMode::Standard, + "rich" => ConsoleMode::Rich, + _ => ConsoleMode::Standard, + } + } +} + #[derive(Debug, Clone)] pub struct Config { pub redis_host: String, @@ -8,6 +24,8 @@ pub struct Config { pub port: u16, pub token: String, pub max_connections: usize, + pub max_retry: i32, + pub console_mode: ConsoleMode, } impl Config { @@ -42,6 +60,14 @@ impl Config { return Err("SLASHLESS_MAX_CONNECTION must be greater than 0".to_string()); } + let console_mode = env::var("SLASHLESS_MODE").unwrap_or_else(|_| "standard".to_string()); + let console_mode = ConsoleMode::from_str(&console_mode); + + let max_retry = env::var("SLASHLESS_MAX_RETRY") + .unwrap_or_else(|_| "-1".to_string()) + .parse::() + .map_err(|_| "SLASHLESS_MAX_RETRY must be a valid integer")?; + Ok(Self { redis_host, redis_port, @@ -49,6 +75,8 @@ impl Config { port, token, max_connections, + max_retry, + console_mode, }) } @@ -60,6 +88,7 @@ impl Config { format!("{}:{}", self.host, self.port) } + #[allow(dead_code)] pub fn masked_token(&self) -> String { if self.token.len() <= 8 { format!("{}***", &self.token[..1.min(self.token.len())]) diff --git a/src/connection_pool.rs b/src/connection_pool.rs deleted file mode 100644 index c779555..0000000 --- a/src/connection_pool.rs +++ /dev/null @@ -1,132 +0,0 @@ -use crate::error::AppError; -use crate::token_resolver::TokenConfig; -use dashmap::DashMap; -use redis::aio::ConnectionManager; -use redis::Client as RedisClient; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; -use std::time::{Duration, Instant}; -use tokio::time::interval; - -pub struct ConnectionPool { - connections: Arc>, - config: TokenConfig, - current_index: AtomicUsize, - last_used: Arc>, -} - -impl ConnectionPool { - pub fn new(config: TokenConfig) -> Self { - Self { - connections: Arc::new(DashMap::new()), - config, - current_index: AtomicUsize::new(0), - last_used: Arc::new(tokio::sync::RwLock::new(Instant::now())), - } - } - - async fn ensure_connections(&self) -> Result<(), AppError> { - if !self.connections.is_empty() { - // Mettre à jour le timestamp de dernière utilisation - *self.last_used.write().await = Instant::now(); - return Ok(()); - } - - // Lazy initialization : créer les connexions à la première utilisation - let max_conns = self.config.max_connections as usize; - - tracing::debug!("Creating {} connections for pool {}", max_conns, self.config.srh_id); - - for i in 0..max_conns { - let connection = Self::create_connection(&self.config.connection_string).await?; - self.connections.insert(i, connection); - } - - *self.last_used.write().await = Instant::now(); - tracing::info!("Created {} connections for pool {}", max_conns, self.config.srh_id); - - Ok(()) - } - - async fn create_connection(connection_string: &str) -> Result { - let client = RedisClient::open(connection_string) - .map_err(|e| AppError::RedisError(e))?; - - let connection_manager = ConnectionManager::new(client).await - .map_err(|e| AppError::RedisError(e))?; - - Ok(connection_manager) - } - - pub async fn borrow_connection(&self) -> Result { - self.ensure_connections().await?; - - // Round-robin pour distribuer les requêtes - let index = self.current_index.fetch_add(1, Ordering::Relaxed); - let pool_size = self.connections.len(); - - if pool_size == 0 { - return Err(AppError::ConfigError("No connections available".to_string())); - } - - let actual_index = index % pool_size; - - // Mettre à jour le timestamp - *self.last_used.write().await = Instant::now(); - - // Obtenir la connexion (ConnectionManager implémente Clone) - let connection = self.connections - .get(&actual_index) - .ok_or_else(|| AppError::ConfigError("Connection not found".to_string()))? - .clone(); - - Ok(connection) - } - - pub async fn get_last_used(&self) -> Instant { - *self.last_used.read().await - } - - pub async fn destroy(&self) { - tracing::debug!("Destroying pool {}", self.config.srh_id); - self.connections.clear(); - } - - pub fn srh_id(&self) -> &str { - &self.config.srh_id - } -} - -// Task de nettoyage des pools inactifs -pub async fn start_idle_cleanup_task( - pools: Arc>>, -) { - let mut interval = interval(Duration::from_secs(60)); // Vérifier toutes les minutes - let idle_timeout = Duration::from_secs(15 * 60); // 15 minutes - - loop { - interval.tick().await; - - let now = Instant::now(); - let mut to_remove = Vec::new(); - - // Vérifier tous les pools - for entry in pools.iter() { - let last_used = entry.value().get_last_used().await; - let elapsed = now.duration_since(last_used); - - if elapsed >= idle_timeout { - tracing::info!("Pool {} has been idle for {:?}, destroying it", entry.key(), elapsed); - to_remove.push(entry.key().clone()); - } - } - - // Supprimer les pools inactifs - for key in to_remove { - if let Some((_, pool)) = pools.remove(&key) { - pool.destroy().await; - } - } - } -} - diff --git a/src/console/banner.rs b/src/console/banner.rs new file mode 100644 index 0000000..f6ff3b0 --- /dev/null +++ b/src/console/banner.rs @@ -0,0 +1,15 @@ +pub const BANNER: &str = r#" + :#%%%%%%%%%%%%%%*. .:#%%%%%%%%%%%%%%+. + .*@@@@@@@@@@@@@@%: -@@@@@@@@@@@@@@@+. + .=@@@@@@@@@@@@@@@= .*@@@@@@@@@@@@@@@: + .-%@@@@@@@@@@@@@@@+. .#@@@@@@@@@@@@@@#: + =@@@@@@@@@@@@@@@@@@#. :%@@@@@@@@@@@@@@*. + .*@@@@@@@@@@@@@@@@@@@@%- =@@@@@@@@@@@@@@@= + :#@@@@@@@@@@@@@@@@@@@@@@@%@@@@@@@@@@@@@@%- + :%@@@@@@@@@@@@@@@@@@@@@@@-.+@@@@@@@@@@@@%: + .=@@@@@@@@@@@@@@@@@@@@@@@%: -%@@@@@@@@@*. + .*@@@@@@@@@@@@@@@@@@@@@@@*. .:#@@@@@@@=. + .#@@@@@@@@@@@@@@@@@@@@@@@+. .#@@@@@-. + :@@@@@@@@@@@@@@@@@@@@@@@@- =@@%. +.=@@@@@@@@@@@@@@@@@@@@@@@#. .-=. +"#; diff --git a/src/console/console.rs b/src/console/console.rs new file mode 100644 index 0000000..ef5ad69 --- /dev/null +++ b/src/console/console.rs @@ -0,0 +1,287 @@ +use crate::config::ConsoleMode; +use crossterm::{ + event::{self, Event, KeyCode, KeyEventKind, KeyModifiers}, + execute, + terminal::{disable_raw_mode, enable_raw_mode}, +}; +use ratatui::{backend::CrosstermBackend, Terminal}; +use std::io::{self, stdout, Write}; +use tokio::sync::mpsc; + +use super::render::render_console; +use super::state::{ConsoleCommand, ConsoleState}; +use super::status::Status; + +#[derive(Clone)] +pub struct Console { + sender: Option>, + mode: ConsoleMode, +} + +impl Console { + pub fn new( + server_address: String, + redis_address: String, + connections: usize, + max_retry: i32, + version: String, + mode: ConsoleMode, + ) -> io::Result<(Self, Option>)> { + match mode { + ConsoleMode::Rich => { + // Clear screen first + print!("\x1B[2J\x1B[1;1H"); + io::stdout().flush().unwrap(); + + // Enable raw mode + enable_raw_mode()?; + + // Create channel for commands + let (sender, mut receiver) = mpsc::unbounded_channel(); + + // Create channel for shutdown signal (from keyboard input) + let (shutdown_sender, shutdown_receiver) = mpsc::unbounded_channel(); + let shutdown_sender_for_thread = shutdown_sender.clone(); + + // Create initial state + let mut state = ConsoleState::new( + server_address, + redis_address, + connections, + max_retry, + version, + ); + + // Initialize terminal in a blocking task + let stdout = stdout(); + let backend = CrosstermBackend::new(stdout); + let mut terminal = Terminal::new(backend)?; + + // Spawn rendering task that runs in a blocking thread + tokio::task::spawn_blocking(move || { + loop { + // Check for keyboard input (non-blocking) + if event::poll(std::time::Duration::from_millis(10)).unwrap_or(false) { + if let Ok(Event::Key(key_event)) = event::read() { + // Check for Ctrl+C + if key_event.kind == KeyEventKind::Press { + match key_event.code { + KeyCode::Char('c') + if key_event + .modifiers + .contains(KeyModifiers::CONTROL) => + { + // Ctrl+C detected - send shutdown signal + let _ = shutdown_sender_for_thread.send(()); + break; + } + KeyCode::Char('C') + if key_event + .modifiers + .contains(KeyModifiers::CONTROL) => + { + // Ctrl+C detected (uppercase) + let _ = shutdown_sender_for_thread.send(()); + break; + } + _ => {} + } + } + } + } + + // Try to receive all pending commands + let mut should_render = false; + + // Process all available commands + while let Ok(cmd) = receiver.try_recv() { + match cmd { + ConsoleCommand::UpdateServerStatus(status) => { + state.server_status = status; + should_render = true; + } + ConsoleCommand::UpdateRedisStatus(status) => { + state.redis_status = status; + should_render = true; + } + } + } + + // Render if we have updates + if should_render { + if let Err(e) = terminal.draw(|f| { + render_console(f, &state, &ConsoleMode::Rich); + }) { + tracing::error!("Failed to render console: {}", e); + break; + } + } + + // Small sleep to avoid busy-waiting + std::thread::sleep(std::time::Duration::from_millis(50)); + } + }); + + Ok(( + Self { + sender: Some(sender), + mode, + }, + Some(shutdown_receiver), + )) + } + ConsoleMode::Standard => { + // Standard mode: no console UI, just log to tracing + tracing::info!("Starting server in standard mode"); + tracing::info!("Server: {}", server_address); + tracing::info!("Redis: {}", redis_address); + tracing::info!("Max connections: {}", connections); + let retry_display = if max_retry == -1 { + "unlimited".to_string() + } else { + max_retry.to_string() + }; + tracing::info!("Max retry: {}", retry_display); + tracing::info!("Version: {}", version); + + Ok((Self { sender: None, mode }, None)) + } + } + } + + pub fn update_server_status(&self, status: Status) -> io::Result<()> { + match &self.mode { + ConsoleMode::Rich => { + if let Some(ref sender) = self.sender { + sender + .send(ConsoleCommand::UpdateServerStatus(status)) + .map_err(|e| { + io::Error::new( + io::ErrorKind::Other, + format!("Failed to send command: {}", e), + ) + })?; + } + } + ConsoleMode::Standard => { + tracing::info!("Server status: {}", status.text()); + } + } + Ok(()) + } + + pub fn update_redis_status(&self, status: Status) -> io::Result<()> { + match &self.mode { + ConsoleMode::Rich => { + if let Some(ref sender) = self.sender { + sender + .send(ConsoleCommand::UpdateRedisStatus(status)) + .map_err(|e| { + io::Error::new( + io::ErrorKind::Other, + format!("Failed to send command: {}", e), + ) + })?; + } + } + ConsoleMode::Standard => { + tracing::info!("Redis status: {}", status.text()); + } + } + Ok(()) + } + + pub fn log_info(&self, message: String) -> io::Result<()> { + match &self.mode { + ConsoleMode::Rich => { + // In rich mode, logs are ignored (not displayed) + } + ConsoleMode::Standard => { + tracing::info!("{}", message); + } + } + Ok(()) + } + + pub fn log_error(&self, message: String) -> io::Result<()> { + match &self.mode { + ConsoleMode::Rich => { + // In rich mode, logs are ignored (not displayed) + } + ConsoleMode::Standard => { + tracing::error!("{}", message); + } + } + Ok(()) + } + + pub fn log_warn(&self, message: String) -> io::Result<()> { + match &self.mode { + ConsoleMode::Rich => { + // In rich mode, logs are ignored (not displayed) + } + ConsoleMode::Standard => { + tracing::warn!("{}", message); + } + } + Ok(()) + } + + #[allow(dead_code)] + pub fn log_debug(&self, message: String) -> io::Result<()> { + match &self.mode { + ConsoleMode::Rich => { + // In rich mode, logs are ignored (not displayed) + } + ConsoleMode::Standard => { + tracing::debug!("{}", message); + } + } + Ok(()) + } + + #[allow(dead_code)] + pub fn mode(&self) -> &ConsoleMode { + &self.mode + } + + pub fn cleanup(&self) -> io::Result<()> { + match &self.mode { + ConsoleMode::Rich => { + // Restore terminal state - this must happen immediately + // First disable raw mode to restore normal terminal behavior + // This will stop ^C from being displayed + if let Err(e) = disable_raw_mode() { + // If disable_raw_mode fails, try to continue anyway + eprintln!("Warning: Failed to disable raw mode: {}", e); + } + + // Then show cursor and restore terminal + if let Err(e) = execute!(io::stdout(), crossterm::cursor::Show,) { + eprintln!("Warning: Failed to show cursor: {}", e); + } + + // Clear any pending input that might show ^C + // Note: In raw mode, stdin might not be readable, so we just flush + // The important thing is that raw mode is disabled first + + // Flush stdout to ensure all output is written + let _ = io::stdout().flush(); + + // Clear the screen and move cursor to top-left + print!("\x1B[2J\x1B[1;1H"); + let _ = io::stdout().flush(); + } + ConsoleMode::Standard => { + // Nothing to clean up in standard mode + } + } + Ok(()) + } +} + +impl Drop for Console { + fn drop(&mut self) { + let _ = self.cleanup(); + } +} diff --git a/src/console/mod.rs b/src/console/mod.rs new file mode 100644 index 0000000..da6862b --- /dev/null +++ b/src/console/mod.rs @@ -0,0 +1,8 @@ +pub mod banner; +pub mod console; +pub mod render; +pub mod state; +pub mod status; + +pub use console::Console; +pub use status::Status; diff --git a/src/console/render.rs b/src/console/render.rs new file mode 100644 index 0000000..a3a5fab --- /dev/null +++ b/src/console/render.rs @@ -0,0 +1,204 @@ +use super::banner::BANNER; +use super::state::ConsoleState; +use crate::config::ConsoleMode; +use ratatui::{ + layout::{Alignment, Constraint, Layout, Rect}, + style::{Color, Modifier, Style}, + text::{Line, Span}, + widgets::Paragraph, + Frame, +}; + +pub fn render_console(f: &mut Frame, state: &ConsoleState, mode: &ConsoleMode) { + let area = f.size(); + + // Calculate banner height + let banner_height = 17; + + // In rich mode, don't show logs section + let vertical = match mode { + ConsoleMode::Rich => Layout::vertical([ + Constraint::Length(banner_height), + Constraint::Length(1), + Constraint::Length(2), + Constraint::Length(4), + ]) + .split(area), + ConsoleMode::Standard => { + // This shouldn't be called in standard mode, but just in case + Layout::vertical([ + Constraint::Length(banner_height), + Constraint::Length(1), + Constraint::Length(2), + Constraint::Length(4), + ]) + .split(area) + } + }; + + // Render banner + render_banner(f, vertical[0], &state.version); + + // Status section + let status_area = vertical[2]; + let status_layout = + Layout::vertical([Constraint::Length(1), Constraint::Length(1)]).split(status_area); + + // Server status + let server_label = "Server Status "; + let server_value = state.server_status.text(); + let server_line = Line::from(vec![ + Span::styled(server_label, Style::default()), + Span::styled(server_value, state.server_status.style()), + ]); + let server_para = Paragraph::new(server_line).alignment(Alignment::Left); + let server_area = Rect { + x: status_layout[0].x + 2, + y: status_layout[0].y, + width: status_layout[0].width.saturating_sub(2), + height: status_layout[0].height, + }; + f.render_widget(server_para, server_area); + + // Redis status + let redis_label = "Redis Status "; + let redis_value = state.redis_status.text(); + let redis_line = Line::from(vec![ + Span::styled(redis_label, Style::default()), + Span::styled(redis_value, state.redis_status.style()), + ]); + let redis_para = Paragraph::new(redis_line).alignment(Alignment::Left); + let redis_area = Rect { + x: status_layout[1].x + 2, + y: status_layout[1].y, + width: status_layout[1].width.saturating_sub(2), + height: status_layout[1].height, + }; + f.render_widget(redis_para, redis_area); + + // Config section + let config_area = vertical[3]; + let config_layout = Layout::vertical([ + Constraint::Length(1), + Constraint::Length(1), + Constraint::Length(1), + Constraint::Length(1), + ]) + .split(config_area); + + // Server config + let server_config_label = "Listen Address "; + let server_config_line = Line::from(vec![ + Span::styled(server_config_label, Style::default().fg(Color::Gray)), + Span::styled( + state.server_address.clone(), + Style::default().fg(Color::DarkGray), + ), + ]); + let server_config = Paragraph::new(server_config_line).alignment(Alignment::Left); + let server_config_area = Rect { + x: config_layout[0].x + 2, + y: config_layout[0].y, + width: config_layout[0].width.saturating_sub(2), + height: config_layout[0].height, + }; + f.render_widget(server_config, server_config_area); + + // Redis config + let redis_config_label = "Redis Endpoint "; + let redis_config_line = Line::from(vec![ + Span::styled(redis_config_label, Style::default().fg(Color::Gray)), + Span::styled( + state.redis_address.clone(), + Style::default().fg(Color::DarkGray), + ), + ]); + let redis_config = Paragraph::new(redis_config_line).alignment(Alignment::Left); + let redis_config_area = Rect { + x: config_layout[1].x + 2, + y: config_layout[1].y, + width: config_layout[1].width.saturating_sub(2), + height: config_layout[1].height, + }; + f.render_widget(redis_config, redis_config_area); + + // Connections config + let connections_config_label = "Pool Size "; + let connections_config_line = Line::from(vec![ + Span::styled(connections_config_label, Style::default().fg(Color::Gray)), + Span::styled( + state.connections.to_string(), + Style::default().fg(Color::DarkGray), + ), + ]); + let connections_config = Paragraph::new(connections_config_line).alignment(Alignment::Left); + let connections_config_area = Rect { + x: config_layout[2].x + 2, + y: config_layout[2].y, + width: config_layout[2].width.saturating_sub(2), + height: config_layout[2].height, + }; + f.render_widget(connections_config, connections_config_area); + + // Max retry config + let max_retry_display = if state.max_retry == -1 { + "unlimited".to_string() + } else { + state.max_retry.to_string() + }; + let max_retry_config_label = "Max Retries "; + let max_retry_config_line = Line::from(vec![ + Span::styled(max_retry_config_label, Style::default().fg(Color::Gray)), + Span::styled(max_retry_display, Style::default().fg(Color::DarkGray)), + ]); + let max_retry_config = Paragraph::new(max_retry_config_line).alignment(Alignment::Left); + let max_retry_config_area = Rect { + x: config_layout[3].x + 2, + y: config_layout[3].y, + width: config_layout[3].width.saturating_sub(2), + height: config_layout[3].height, + }; + f.render_widget(max_retry_config, max_retry_config_area); +} + +pub fn render_banner(f: &mut Frame, area: Rect, version: &str) { + let lines: Vec<&str> = BANNER.lines().collect(); + let mut displayed_lines = Vec::new(); + + for line in lines.iter() { + if !line.trim().is_empty() { + displayed_lines.push(*line); + } + } + + let banner_layout = Layout::vertical([ + Constraint::Length(3), + Constraint::Length(displayed_lines.len() as u16), + Constraint::Length(1), + Constraint::Length(1), + ]) + .split(area); + + // Render banner ASCII art + for (i, line) in displayed_lines.iter().enumerate() { + if i < banner_layout[1].height as usize { + let para = Paragraph::new(*line).style( + Style::default() + .fg(Color::Cyan) + .add_modifier(Modifier::BOLD), + ); + let line_area = Rect { + x: banner_layout[1].x, + y: banner_layout[1].y + i as u16, + width: banner_layout[1].width, + height: 1, + }; + f.render_widget(para, line_area); + } + } + + // Render version + let version_text = format!(" v{}", version); + let version_para = Paragraph::new(version_text).style(Style::default().fg(Color::DarkGray)); + f.render_widget(version_para, banner_layout[3]); +} diff --git a/src/console/state.rs b/src/console/state.rs new file mode 100644 index 0000000..19b4881 --- /dev/null +++ b/src/console/state.rs @@ -0,0 +1,36 @@ +use super::status::Status; + +pub enum ConsoleCommand { + UpdateServerStatus(Status), + UpdateRedisStatus(Status), +} + +pub struct ConsoleState { + pub server_status: Status, + pub redis_status: Status, + pub server_address: String, + pub redis_address: String, + pub connections: usize, + pub max_retry: i32, + pub version: String, +} + +impl ConsoleState { + pub fn new( + server_address: String, + redis_address: String, + connections: usize, + max_retry: i32, + version: String, + ) -> Self { + Self { + server_status: Status::Starting, + redis_status: Status::Establishing, + server_address, + redis_address, + connections, + max_retry, + version, + } + } +} diff --git a/src/console/status.rs b/src/console/status.rs new file mode 100644 index 0000000..5fdc274 --- /dev/null +++ b/src/console/status.rs @@ -0,0 +1,58 @@ +use ratatui::style::{Color, Modifier, Style}; + +pub enum Status { + Connected, + Ready, + Starting, + Establishing, + #[allow(dead_code)] + Disconnected, + Reconnecting, + BindError, + ConnectionError, +} + +impl Status { + pub fn text(&self) -> &'static str { + match self { + Status::Connected => "CONNECTED", + Status::Ready => "READY", + Status::Starting => "STARTING", + Status::Establishing => "ESTABLISHING CONNECTION", + Status::Disconnected => "DISCONNECTED", + Status::Reconnecting => "RECONNECTING", + Status::BindError => "BIND ERROR", + Status::ConnectionError => "CONNECTION ERROR", + } + } + + pub fn color(&self) -> Color { + match self { + Status::Connected => Color::Green, + Status::Ready => Color::Green, + Status::Starting => Color::Gray, + Status::Establishing => Color::DarkGray, + Status::Disconnected => Color::Yellow, + Status::Reconnecting => Color::Gray, + Status::BindError => Color::Red, + Status::ConnectionError => Color::Red, + } + } + + pub fn style(&self) -> Style { + Style::default().fg(self.color()).add_modifier( + if matches!( + self, + Status::Connected + | Status::Ready + | Status::Disconnected + | Status::BindError + | Status::ConnectionError + ) { + Modifier::BOLD + } else { + Modifier::empty() + }, + ) + } +} diff --git a/src/handlers/command.rs b/src/handlers/command.rs index 79338df..4381ff9 100644 --- a/src/handlers/command.rs +++ b/src/handlers/command.rs @@ -1,6 +1,6 @@ -use crate::encoding::encode_result; -use crate::errors::AppError; -use crate::redis_client::RedisPool; +use crate::client::RedisPool; +use crate::utils::encoding::encode_result; +use crate::utils::AppError; use axum::extract::State; use axum::Json; use serde_json::{json, Value}; diff --git a/src/handlers/pipeline.rs b/src/handlers/pipeline.rs index 103cc6b..6388956 100644 --- a/src/handlers/pipeline.rs +++ b/src/handlers/pipeline.rs @@ -1,7 +1,7 @@ -use crate::encoding::encode_result; -use crate::errors::AppError; +use crate::client::RedisPool; use crate::handlers::command::redis_value_to_json; -use crate::redis_client::RedisPool; +use crate::utils::encoding::encode_result; +use crate::utils::AppError; use axum::extract::State; use axum::Json; use serde_json::{json, Value}; diff --git a/src/handlers/transaction.rs b/src/handlers/transaction.rs index a1fabd2..7db3714 100644 --- a/src/handlers/transaction.rs +++ b/src/handlers/transaction.rs @@ -1,10 +1,12 @@ -use crate::encoding::encode_result; -use crate::errors::AppError; +use crate::client::RedisPool; use crate::handlers::command::redis_value_to_json; -use crate::redis_client::RedisPool; +use crate::utils::encoding::encode_result; +use crate::utils::AppError; use axum::extract::State; use axum::Json; use serde_json::{json, Value}; +use std::time::Duration; +use tokio::time::sleep; pub async fn handle_transaction_internal( State(pool): State, @@ -37,70 +39,138 @@ pub async fn handle_transaction_internal( } } - let mut conn = pool.get_connection().await.map_err(AppError::Redis)?; + // Transactions need retry logic too, but we must retry the entire transaction + let max_retry = pool.max_retry(); + let mut attempt = 1u32; - // Execute MULTI - let multi_result: Result = - redis::cmd("MULTI").query_async(&mut conn).await; + loop { + match pool.get_connection().await { + Ok(mut conn) => { + // Execute MULTI + match redis::cmd("MULTI") + .query_async::<_, String>(&mut conn) + .await + { + Ok(_) => { + // Execute all commands in transaction + let mut transaction_failed = false; + for cmd_array in command_arrays { + let cmd_args: Vec = cmd_array + .as_array() + .unwrap() + .iter() + .map(|v| match v { + Value::String(s) => s.clone(), + Value::Number(n) => n.to_string(), + Value::Bool(b) => b.to_string(), + Value::Null => "".to_string(), + _ => v.to_string(), + }) + .collect(); - match multi_result { - Ok(_) => { - // Execute all commands in transaction - for cmd_array in command_arrays { - let cmd_args: Vec = cmd_array - .as_array() - .unwrap() - .iter() - .map(|v| match v { - Value::String(s) => s.clone(), - Value::Number(n) => n.to_string(), - Value::Bool(b) => b.to_string(), - Value::Null => "".to_string(), - _ => v.to_string(), - }) - .collect(); + if cmd_args.is_empty() { + continue; + } - if cmd_args.is_empty() { - continue; - } - - let mut cmd = redis::cmd(&cmd_args[0]); - for arg in cmd_args.iter().skip(1) { - cmd.arg(arg); - } + let mut cmd = redis::cmd(&cmd_args[0]); + for arg in cmd_args.iter().skip(1) { + cmd.arg(arg); + } - let _: String = cmd.query_async(&mut conn).await.map_err(AppError::Redis)?; - } + match cmd.query_async::<_, String>(&mut conn).await { + Ok(_) => {} + Err(e) => { + // Check if it's a connection error + if matches!(e.kind(), redis::ErrorKind::IoError) { + transaction_failed = true; + break; + } + // For other errors during transaction, continue + } + } + } - // Execute EXEC - let exec_result: Result, redis::RedisError> = - redis::cmd("EXEC").query_async(&mut conn).await; + if transaction_failed { + if max_retry == -1 || attempt < max_retry as u32 { + attempt += 1; + sleep(Duration::from_secs(1) * (attempt - 1)).await; + continue; + } else { + break; + } + } - match exec_result { - Ok(redis_values) => { - let responses: Vec = redis_values - .into_iter() - .map(|v| { - let json_value = redis_value_to_json(v); - json!({ "result": json_value }) - }) - .collect(); + // Execute EXEC + match redis::cmd("EXEC") + .query_async::<_, Vec>(&mut conn) + .await + { + Ok(redis_values) => { + let responses: Vec = redis_values + .into_iter() + .map(|v| { + let json_value = redis_value_to_json(v); + json!({ "result": json_value }) + }) + .collect(); - let response = Value::Array(responses); - let encoded = encode_result(&response, encoding_enabled); - Ok(Json(encoded)) + let response = Value::Array(responses); + let encoded = encode_result(&response, encoding_enabled); + return Ok(Json(encoded)); + } + Err(e) => { + // Check if it's a connection error + if matches!(e.kind(), redis::ErrorKind::IoError) { + if max_retry == -1 || attempt < max_retry as u32 { + attempt += 1; + sleep(Duration::from_secs(1) * (attempt - 1)).await; + continue; + } else { + break; + } + } + let error_msg = e.to_string(); + let response = json!({ "error": error_msg }); + return Ok(Json(response)); + } + } + } + Err(e) => { + // Check if it's a connection error + if matches!(e.kind(), redis::ErrorKind::IoError) { + if max_retry == -1 || attempt < max_retry as u32 { + attempt += 1; + sleep(Duration::from_secs(1) * (attempt - 1)).await; + continue; + } else { + break; + } + } + let error_msg = AppError::Redis(e).to_string(); + let response = json!({ "error": error_msg }); + return Ok(Json(response)); + } } - Err(e) => { - let error_msg = e.to_string(); - let response = json!({ "error": error_msg }); - Ok(Json(response)) + } + Err(e) => { + // Connection error, retry + if matches!(e.kind(), redis::ErrorKind::IoError) { + if max_retry == -1 || attempt < max_retry as u32 { + attempt += 1; + sleep(Duration::from_secs(1) * (attempt - 1)).await; + continue; + } else { + break; + } } + let error_msg = AppError::Redis(e).to_string(); + let response = json!({ "error": error_msg }); + return Ok(Json(response)); } } - Err(e) => { - let error_msg = AppError::Redis(e).to_string(); - let response = json!({ "error": error_msg }); - Ok(Json(response)) - } } + + // Fallback error + let response = json!({ "error": "Transaction failed after retries" }); + Ok(Json(response)) } diff --git a/src/http/mod.rs b/src/http/mod.rs new file mode 100644 index 0000000..f2ec6ff --- /dev/null +++ b/src/http/mod.rs @@ -0,0 +1,2 @@ +pub mod routes; +pub mod server; diff --git a/src/routes.rs b/src/http/routes.rs similarity index 95% rename from src/routes.rs rename to src/http/routes.rs index 3fd1286..f65407a 100644 --- a/src/routes.rs +++ b/src/http/routes.rs @@ -1,8 +1,8 @@ -use crate::auth::{check_encoding_header, extract_bearer_token, validate_token}; +use crate::client::RedisPool; use crate::config::Config; -use crate::errors::AppError; use crate::handlers::{command, pipeline, transaction}; -use crate::redis_client::RedisPool; +use crate::utils::auth::{check_encoding_header, extract_bearer_token, validate_token}; +use crate::utils::AppError; use axum::{ body::Body, extract::{Request, State}, diff --git a/src/server.rs b/src/http/server.rs similarity index 69% rename from src/server.rs rename to src/http/server.rs index f0437ec..d0a0cfe 100644 --- a/src/server.rs +++ b/src/http/server.rs @@ -1,17 +1,16 @@ use crate::config::Config; -use crate::errors::AppError; +use crate::utils::AppError; use axum::Router; use tokio::net::TcpListener; use tower_http::{cors::CorsLayer, limit::RequestBodyLimitLayer}; +#[allow(dead_code)] pub async fn start_server(router: Router, config: &Config) -> Result<(), AppError> { let addr = config.server_address(); let listener = TcpListener::bind(&addr) .await .map_err(|e| AppError::ServerError(format!("Failed to bind to {}: {}", addr, e)))?; - tracing::info!("Server listening on {}", addr); - let app = router .layer(CorsLayer::permissive()) .layer(RequestBodyLimitLayer::new(10 * 1024 * 1024)); // 10MB limit @@ -22,3 +21,10 @@ pub async fn start_server(router: Router, config: &Config) -> Result<(), AppErro Ok(()) } + +pub async fn bind_server(config: &Config) -> Result { + let addr = config.server_address(); + TcpListener::bind(&addr) + .await + .map_err(|e| AppError::ServerError(format!("Failed to bind to {}: {}", addr, e))) +} diff --git a/src/main.rs b/src/main.rs index 9b323b1..1780bc7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,75 +1,209 @@ -mod auth; +mod client; mod config; -mod encoding; -mod errors; +mod console; mod handlers; -mod redis_client; -mod routes; -mod server; +mod http; +mod utils; +use client::RedisPool; use config::Config; -use errors::AppError; -use redis_client::RedisPool; -use routes::create_router; -use server::start_server; -use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; +use http::routes::create_router; +use http::server::bind_server; +use std::sync::Arc; +use tracing_subscriber::{ + fmt::format::{DefaultFields, Format}, + layer::SubscriberExt, + util::SubscriberInitExt, + EnvFilter, +}; +use utils::AppError; const VERSION: &str = env!("CARGO_PKG_VERSION"); #[tokio::main] async fn main() -> Result<(), Box> { - // Initialize tracing subscriber with custom format + // Initialize tracing subscriber + let format = Format::default() + .with_target(false) + .with_thread_ids(false) + .with_thread_names(false) + .with_line_number(false) + .with_file(false) + .with_timer(tracing_subscriber::fmt::time::SystemTime); + tracing_subscriber::registry() .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"))) .with( tracing_subscriber::fmt::layer() - .with_target(true) - .with_thread_ids(false) - .with_thread_names(false) - .with_line_number(false) - .with_file(false), + .event_format(format) + .fmt_fields(DefaultFields::new()), ) .init(); - // Load configuration - let config = Config::from_env().map_err(|e| { - eprintln!("Configuration error: {}", e); - e - })?; - - // Print startup banner - tracing::info!("Starting stashless v{}", VERSION); - tracing::info!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"); - tracing::info!("Configuration:"); - tracing::info!( - " Redis URL: redis://{}:{}", - config.redis_host, - config.redis_port - ); - tracing::info!(" Redis Password: not set"); - tracing::info!(" Bearer Token: {}", config.masked_token()); - tracing::info!(" Max Connections: {}", config.max_connections); - tracing::info!(" Server Address: {}", config.server_address()); - tracing::info!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"); - - // Initialize Redis connection pool - tracing::info!("Connecting to Redis at {}...", config.redis_url()); - let pool = RedisPool::new(&config).await.map_err(|e| { - let msg = format!("Failed to connect to Redis: {}", e); - tracing::error!("{}", msg); - AppError::ConnectionError(msg) - })?; - - tracing::info!( - "Redis connection pool initialized with {} max connections", - config.max_connections - ); + // Load configuration first to get console mode + let config = match Config::from_env() { + Ok(config) => config, + Err(e) => { + eprintln!("Configuration error: {}", e); + eprintln!("Please set SLASHLESS_TOKEN environment variable"); + std::process::exit(1); + } + }; + + // Create console with initial state (banner is integrated in console) + // Console::new now returns both the console and an optional shutdown receiver + let (console, shutdown_receiver) = console::Console::new( + config.server_address(), + format!("{}:{}", config.redis_host, config.redis_port), + config.max_connections, + config.max_retry, + VERSION.to_string(), + config.console_mode.clone(), + ) + .map_err(|e| format!("Failed to initialize console: {}", e))?; + + // Initial render + console.update_server_status(console::Status::Starting)?; + console.update_redis_status(console::Status::Establishing)?; + console.log_info("Initializing server".to_string())?; + + // Bind server (with error handling) + let listener = match bind_server(&config).await { + Ok(listener) => { + console.update_server_status(console::Status::Ready)?; + console.log_info(format!("Server bound to {}", config.server_address()))?; + listener + } + Err(e) => { + console.update_server_status(console::Status::BindError)?; + let error_msg = e.to_string(); + console.log_error(format!("Failed to bind to server: {}", error_msg.clone()))?; + let _ = console.cleanup(); + std::process::exit(1); + } + }; + + // Initialize Redis connection pool (with error handling) + console.log_info(format!( + "Connecting to Redis at {}:{}", + config.redis_host, config.redis_port + ))?; + + // Console is now Clone-safe (uses channel internally) + let console_arc = Arc::new(console.clone()); + let pool = match RedisPool::new_with_console(&config, Some(console_arc.clone())).await { + Ok(pool) => { + let _ = console.update_redis_status(console::Status::Connected); + let _ = console.log_info("Redis connection established".to_string()); + pool + } + Err(e) => { + let _ = console.update_redis_status(console::Status::ConnectionError); + let error_msg = format!("Failed to connect to Redis: {}", e); + let _ = console.log_error(error_msg); + let _ = console.cleanup(); + std::process::exit(1); + } + }; // Create router - let router = create_router(pool, config.clone()); + let router = create_router(pool.clone(), config.clone()); + let _ = console.log_info("Router created".to_string()); + + // Start Redis healthcheck task - ping every 2 seconds + let pool_for_healthcheck = pool.clone(); + let console_for_healthcheck = console.clone(); + tokio::spawn(async move { + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(2)); // 2 seconds + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + interval.tick().await; + + // Ping Redis with retry (max 3 attempts) + match pool_for_healthcheck.ping_with_retry().await { + Ok(pong) => { + tracing::debug!("Redis PING successful: {}", pong); + // Connection is OK, no need to log every time + } + Err(e) => { + // All retry attempts failed - connection is lost + tracing::error!("Redis healthcheck failed after all retries: {}", e); + if let Err(log_err) = console_for_healthcheck + .log_error(format!("Redis healthcheck failed: {}", e)) + { + tracing::error!("Failed to log healthcheck error: {}", log_err); + } + } + } + } + }); + + // Start serving (this will block until server stops) + let _ = console.log_info("Starting HTTP server".to_string()); + let app = router + .layer(tower_http::cors::CorsLayer::permissive()) + .layer(tower_http::limit::RequestBodyLimitLayer::new( + 10 * 1024 * 1024, + )); + + // Handle shutdown signal + let console_for_shutdown = console.clone(); + let shutdown_signal: std::pin::Pin + Send>> = + match config.console_mode { + config::ConsoleMode::Rich => { + // In rich mode, wait for Ctrl+C from the console's keyboard input handler + if let Some(mut receiver) = shutdown_receiver { + Box::pin(async move { + // Wait for Ctrl+C signal from the console thread + // The console thread will detect Ctrl+C and send a signal + let _ = receiver.recv().await; + // Cleanup the terminal when shutdown is triggered (this clears the screen) + let _ = console_for_shutdown.cleanup(); + // Message is displayed after cleanup clears the screen + println!("Shutting down gracefully..."); + }) + } else { + // This should never happen, but handle it gracefully + Box::pin(async move { + tokio::signal::ctrl_c() + .await + .expect("Failed to install Ctrl+C handler"); + // Cleanup the terminal when shutdown is triggered (this clears the screen) + let _ = console_for_shutdown.cleanup(); + // Message is displayed after cleanup clears the screen + println!("Shutting down gracefully..."); + }) + } + } + config::ConsoleMode::Standard => { + // In standard mode, use standard tokio signal + Box::pin(async move { + tokio::signal::ctrl_c() + .await + .expect("Failed to install Ctrl+C handler"); + let _ = + console_for_shutdown.log_warn("Shutting down gracefully...".to_string()); + let _ = console_for_shutdown.cleanup(); + }) + } + }; + + // Start server with graceful shutdown + let server_result = axum::serve(listener, app) + .with_graceful_shutdown(shutdown_signal) + .await + .map_err(|e| { + let _ = console.log_error(format!("Server error: {}", e)); + AppError::ServerError(format!("Server error: {}", e)) + }); + + // Always cleanup console to restore terminal state + if matches!(config.console_mode, config::ConsoleMode::Rich) { + let _ = console.cleanup(); + } - // Start server - start_server(router, &config).await?; + server_result?; Ok(()) } diff --git a/src/models/mod.rs b/src/models/mod.rs new file mode 100644 index 0000000..f774521 --- /dev/null +++ b/src/models/mod.rs @@ -0,0 +1,11 @@ +pub mod models; + +pub use models::{ + CommandRequest, CommandResponse, HealthResponse, PipelineRequest, PipelineResponse, +}; + +#[cfg(test)] +mod tests { + pub use super::models::tests; +} + diff --git a/src/models.rs b/src/models/models.rs similarity index 99% rename from src/models.rs rename to src/models/models.rs index ee01ab6..6da3248 100644 --- a/src/models.rs +++ b/src/models/models.rs @@ -75,3 +75,4 @@ mod tests { assert!(parsed["error"].is_string()); } } + diff --git a/src/pool_manager.rs b/src/pool_manager.rs deleted file mode 100644 index 4596aab..0000000 --- a/src/pool_manager.rs +++ /dev/null @@ -1,80 +0,0 @@ -use crate::connection_pool::{ConnectionPool, start_idle_cleanup_task}; -use crate::error::AppError; -use crate::token_resolver::TokenResolver; -use dashmap::DashMap; -use std::sync::Arc; -use tokio::task::JoinHandle; - -pub struct PoolManager { - pools: Arc>>, - token_resolver: Arc, - cleanup_handle: Option>, -} - -impl PoolManager { - pub fn new(token_resolver: Arc) -> Self { - let pools = Arc::new(DashMap::new()); - - // Démarrer la tâche de nettoyage des pools inactifs - let pools_clone = pools.clone(); - let cleanup_handle = Some(tokio::spawn(async move { - start_idle_cleanup_task(pools_clone).await; - })); - - Self { - pools, - token_resolver, - cleanup_handle, - } - } - - /// Lookup or start pattern : récupère un pool existant ou en crée un nouveau - pub async fn lookup_or_start(&self, token: &str) -> Result, AppError> { - // Résoudre le token vers une configuration - let token_config = self.token_resolver.resolve(token).await?; - - let srh_id = token_config.srh_id.clone(); - - // Vérifier si le pool existe déjà - if let Some(pool) = self.pools.get(&srh_id) { - return Ok(pool.clone()); - } - - // Créer un nouveau pool (lazy initialization) - let pool = Arc::new(ConnectionPool::new(token_config)); - - // Insérer dans le registry (avec gestion de race condition) - // Si un autre thread a créé le pool entre-temps, utiliser celui existant - match self.pools.entry(srh_id.clone()) { - dashmap::mapref::entry::Entry::Occupied(entry) => { - Ok(entry.get().clone()) - } - dashmap::mapref::entry::Entry::Vacant(entry) => { - entry.insert(pool.clone()); - tracing::info!("Created new pool for srh_id: {}", srh_id); - Ok(pool) - } - } - } - - pub async fn get_pool(&self, srh_id: &str) -> Option> { - self.pools.get(srh_id).map(|entry| entry.clone()) - } - - pub async fn destroy_pool(&self, srh_id: &str) { - if let Some((_, pool)) = self.pools.remove(srh_id) { - pool.destroy().await; - tracing::info!("Destroyed pool {}", srh_id); - } - } -} - -impl Drop for PoolManager { - fn drop(&mut self) { - // Arrêter la tâche de nettoyage - if let Some(handle) = self.cleanup_handle.take() { - handle.abort(); - } - } -} - diff --git a/src/redis_client.rs b/src/redis_client.rs deleted file mode 100644 index ac670e9..0000000 --- a/src/redis_client.rs +++ /dev/null @@ -1,57 +0,0 @@ -use crate::config::Config; -use redis::aio::ConnectionManager; -use redis::{Client, RedisError}; -use std::sync::Arc; -use tokio::sync::Semaphore; - -#[derive(Clone)] -pub struct RedisPool { - client: Arc, - semaphore: Arc, -} - -impl RedisPool { - pub async fn new(config: &Config) -> Result { - let url = config.redis_url(); - let client = Client::open(url)?; - - // Test connection - let mut conn = client.get_connection_manager().await?; - redis::cmd("PING") - .query_async::<_, String>(&mut conn) - .await?; - - Ok(Self { - client: Arc::new(client), - semaphore: Arc::new(Semaphore::new(config.max_connections)), - }) - } - - pub async fn get_connection(&self) -> Result { - // Acquire permit from semaphore to limit concurrent connections - let _permit = self.semaphore.acquire().await.map_err(|_| { - RedisError::from(( - redis::ErrorKind::IoError, - "Failed to acquire connection permit", - )) - })?; - - self.client.get_connection_manager().await - } - - pub async fn execute_command( - &self, - cmd: redis::Cmd, - ) -> Result { - let mut conn = self.get_connection().await?; - cmd.query_async(&mut conn).await - } - - pub async fn execute_pipeline( - &self, - pipeline: &mut redis::Pipeline, - ) -> Result, RedisError> { - let mut conn = self.get_connection().await?; - pipeline.query_async(&mut conn).await - } -} diff --git a/src/auth.rs b/src/utils/auth.rs similarity index 97% rename from src/auth.rs rename to src/utils/auth.rs index 8dee770..e726585 100644 --- a/src/auth.rs +++ b/src/utils/auth.rs @@ -1,5 +1,5 @@ +use super::AppError; use crate::config::Config; -use crate::errors::AppError; use axum::extract::Request; pub fn extract_bearer_token(request: &Request) -> Result { diff --git a/src/encoding.rs b/src/utils/encoding.rs similarity index 100% rename from src/encoding.rs rename to src/utils/encoding.rs diff --git a/src/errors.rs b/src/utils/errors.rs similarity index 98% rename from src/errors.rs rename to src/utils/errors.rs index a9398d0..2348bdd 100644 --- a/src/errors.rs +++ b/src/utils/errors.rs @@ -15,6 +15,7 @@ pub enum AppError { #[error("Malformed request: {0}")] MalformedRequest(String), + #[allow(dead_code)] #[error("Connection error: {0}")] ConnectionError(String), diff --git a/src/utils/mod.rs b/src/utils/mod.rs new file mode 100644 index 0000000..abffef6 --- /dev/null +++ b/src/utils/mod.rs @@ -0,0 +1,5 @@ +pub mod auth; +pub mod encoding; +pub mod errors; + +pub use errors::AppError; From e4165031603d635e0d28e85d457d1e048be06c65 Mon Sep 17 00:00:00 2001 From: Ely Delva Date: Thu, 6 Nov 2025 05:28:27 +0100 Subject: [PATCH 2/3] docs: enhance README with new configuration options and console modes Add instructions for creating a new configuration using a .env file, detailing environment variables for Redis and server settings. Introduce console modes with usage examples, improving clarity for users. Update existing sections for better organization and readability. --- README.md | 48 +++++++++++++++++++++++++++++++++++++----- src/client/mod.rs | 2 +- src/console/console.rs | 10 ++------- src/console/mod.rs | 1 + 4 files changed, 47 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index 414e892..b5a1da9 100644 --- a/README.md +++ b/README.md @@ -47,10 +47,6 @@ docker run -d \ That's it! The API is available at `http://localhost:3000`. -**Available Docker image tags:** -- `latest` - Latest build from main branch -- `0.1.0` (or current version) - Specific version tag -- All images are automatically built and pushed to [GitHub Container Registry](https://github.com/thebackslashs/slashless/pkgs/container/stashless) on every push to main ### Docker Compose @@ -65,7 +61,6 @@ export SLASHLESS_TOKEN=your-secret-token-here docker-compose up -d ``` - ### From Source ```bash @@ -86,6 +81,30 @@ export SLASHLESS_TOKEN=your-secret-token All configuration is done via environment variables. Only `SLASHLESS_TOKEN` is required. +### Creating a New Configuration + +To create a new configuration, you can use a `.env` file in your project root: + +```bash +# Generate a secure token first (or use stashless generate-token) +# Then create a .env file: + +cat > .env << EOF +SLASHLESS_TOKEN=your-secret-token-here +SLASHLESS_REDIS_HOST=127.0.0.1 +SLASHLESS_REDIS_PORT=6379 +SLASHLESS_HOST=0.0.0.0 +SLASHLESS_PORT=3000 +SLASHLESS_MAX_CONNECTION=3 +SLASHLESS_MAX_RETRY=-1 +SLASHLESS_MODE=standard +EOF +``` + +Or use the `stashless generate-token` command to generate a secure token, then add it to your `.env` file. + +### Configuration Variables + | Variable | Default | Description | |----------|---------|-------------| | `SLASHLESS_REDIS_HOST` | `127.0.0.1` | Redis host | @@ -94,6 +113,25 @@ All configuration is done via environment variables. Only `SLASHLESS_TOKEN` is r | `SLASHLESS_PORT` | `3000` | HTTP port | | `SLASHLESS_TOKEN` | **Required** | Bearer token for auth | | `SLASHLESS_MAX_CONNECTION` | `3` | Connection pool size | +| `SLASHLESS_MAX_RETRY` | `-1` | Maximum Redis connection retry attempts (-1 for unlimited) | +| `SLASHLESS_MODE` | `standard` | Console display mode (`standard` or `rich`) | + +### Console Modes + +Stashless supports two console display modes: + +- **`standard`** (default): Simple text-based output with standard logging. Best for production environments, CI/CD pipelines, and when running in the background. +- **`rich`**: Enhanced terminal UI with real-time status updates, colored output, and interactive console. Best for local development and monitoring. + +Set the mode using the `SLASHLESS_MODE` environment variable: + +```bash +# Standard mode (default) +export SLASHLESS_MODE=standard + +# Rich mode +export SLASHLESS_MODE=rich +``` You can also use a `.env` file - just export it before running. diff --git a/src/client/mod.rs b/src/client/mod.rs index f814a12..4fd1214 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -207,7 +207,7 @@ impl RedisPool { // Log to console occasionally to show activity (every 10th connection) static CONNECTION_COUNT: AtomicU32 = AtomicU32::new(0); let count = CONNECTION_COUNT.fetch_add(1, Ordering::Relaxed); - if count % 10 == 0 && count > 0 { + if count.is_multiple_of(10) && count > 0 { self.log_connection_success(); } Ok(conn) diff --git a/src/console/console.rs b/src/console/console.rs index ef5ad69..7dc3c02 100644 --- a/src/console/console.rs +++ b/src/console/console.rs @@ -156,10 +156,7 @@ impl Console { sender .send(ConsoleCommand::UpdateServerStatus(status)) .map_err(|e| { - io::Error::new( - io::ErrorKind::Other, - format!("Failed to send command: {}", e), - ) + io::Error::other(format!("Failed to send command: {}", e)) })?; } } @@ -177,10 +174,7 @@ impl Console { sender .send(ConsoleCommand::UpdateRedisStatus(status)) .map_err(|e| { - io::Error::new( - io::ErrorKind::Other, - format!("Failed to send command: {}", e), - ) + io::Error::other(format!("Failed to send command: {}", e)) })?; } } diff --git a/src/console/mod.rs b/src/console/mod.rs index da6862b..bb25ab9 100644 --- a/src/console/mod.rs +++ b/src/console/mod.rs @@ -1,4 +1,5 @@ pub mod banner; +#[allow(clippy::module_inception)] pub mod console; pub mod render; pub mod state; From e933c2a2102f5ff134dfb867dd8cd76263ab1b6d Mon Sep 17 00:00:00 2001 From: Ely Delva Date: Thu, 6 Nov 2025 05:34:14 +0100 Subject: [PATCH 3/3] fix: format --- src/console/console.rs | 8 ++------ src/models/models.rs | 1 + 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/src/console/console.rs b/src/console/console.rs index 7dc3c02..8a27839 100644 --- a/src/console/console.rs +++ b/src/console/console.rs @@ -155,9 +155,7 @@ impl Console { if let Some(ref sender) = self.sender { sender .send(ConsoleCommand::UpdateServerStatus(status)) - .map_err(|e| { - io::Error::other(format!("Failed to send command: {}", e)) - })?; + .map_err(|e| io::Error::other(format!("Failed to send command: {}", e)))?; } } ConsoleMode::Standard => { @@ -173,9 +171,7 @@ impl Console { if let Some(ref sender) = self.sender { sender .send(ConsoleCommand::UpdateRedisStatus(status)) - .map_err(|e| { - io::Error::other(format!("Failed to send command: {}", e)) - })?; + .map_err(|e| io::Error::other(format!("Failed to send command: {}", e)))?; } } ConsoleMode::Standard => { diff --git a/src/models/models.rs b/src/models/models.rs index 6da3248..ad9783f 100644 --- a/src/models/models.rs +++ b/src/models/models.rs @@ -76,3 +76,4 @@ mod tests { } } +