diff --git a/envoy/redis/async_client.h b/envoy/redis/async_client.h index 98e8d88ebc00b..a565d2610b88e 100644 --- a/envoy/redis/async_client.h +++ b/envoy/redis/async_client.h @@ -1,10 +1,11 @@ #pragma once #include +#include +#include #include #include #include -#include namespace Envoy { @@ -20,17 +21,39 @@ struct AsyncClientConfig { AsyncClientConfig(std::string&& username, std::string&& password, int op_timeout_milliseconds, std::map&& params) : auth_username_(std::move(username)), auth_password_(std::move(password)), - op_timeout_(op_timeout_milliseconds), buffer_flush_timeout_(3), params_(std::move(params)) { + op_timeout_(op_timeout_milliseconds), + max_buffer_size_before_flush_(parseUint32FromParams(params, "max_buffer_size_before_flush", 1024)), + buffer_flush_timeout_(parseUint32FromParams(params, "buffer_flush_timeout", 3)), + params_(std::move(params)) { } + const std::string auth_username_; const std::string auth_password_; const std::chrono::milliseconds op_timeout_; - const uint32_t max_buffer_size_before_flush_{1024}; + const uint32_t max_buffer_size_before_flush_; const std::chrono::milliseconds buffer_flush_timeout_; const uint32_t max_upstream_unknown_connections_{100}; const bool enable_command_stats_{false}; const std::map params_; + +private: + // Helper function to parse uint32 from params map with default value + static uint32_t parseUint32FromParams(const std::map& params, + const std::string& key, uint32_t default_value) { + auto it = params.find(key); + if (it != params.end()) { + try { + unsigned long value = std::stoul(it->second); + if (value <= std::numeric_limits::max()) { + return static_cast(value); + } + } catch (const std::exception&) { + // If parsing fails, return default value + } + } + return default_value; + } }; /** diff --git a/test/extensions/filters/http/wasm/test_data/test_redis_call_cpp.cc b/test/extensions/filters/http/wasm/test_data/test_redis_call_cpp.cc index 63f7d406fcb6a..c2c0c132f2292 100644 --- a/test/extensions/filters/http/wasm/test_data/test_redis_call_cpp.cc +++ b/test/extensions/filters/http/wasm/test_data/test_redis_call_cpp.cc @@ -30,7 +30,8 @@ static RegisterContextFactory register_RedisCallContext(CONTEXT_FACTORY(RedisCal "redis_call"); bool RedisCallRootContext::onConfigure(size_t) { - redisInit("cluster?db=1", "admin", "123456", 1000); + // Test with buffer configuration parameters + redisInit("cluster?db=1&buffer_flush_timeout=1&max_buffer_size_before_flush=512", "admin", "123456", 1000); return true; } @@ -52,14 +53,16 @@ FilterHeadersStatus RedisCallContext::onRequestHeaders(uint32_t, bool) { auto query = "*3\r\n$3\r\nset\r\n$2\r\nid\r\n$1\r\n1\r\n"; auto path = getRequestHeader(":path"); if (path->view() == "/bad") { - if (root()->redisCall("cluster?db=1", query, callback) != WasmResult::Ok) { + // Test with different buffer params on runtime call + if (root()->redisCall("cluster?db=1&buffer_flush_timeout=2", query, callback) != WasmResult::Ok) { logInfo("redis_call rejected"); } } else { if (root()->redisCall("bogus cluster", query, callback) == WasmResult::Ok) { logError("bogus cluster found error"); } - root()->redisCall("cluster?db=1", query, callback); + // Test with buffer params in query string + root()->redisCall("cluster?db=1&buffer_flush_timeout=1&max_buffer_size_before_flush=512", query, callback); logInfo("onRequestHeaders"); } diff --git a/test/extensions/filters/http/wasm/wasm_filter_test.cc b/test/extensions/filters/http/wasm/wasm_filter_test.cc index a985af3113d36..2404c980ba1fb 100644 --- a/test/extensions/filters/http/wasm/wasm_filter_test.cc +++ b/test/extensions/filters/http/wasm/wasm_filter_test.cc @@ -1,4 +1,5 @@ #include "envoy/grpc/async_client.h" +#include "envoy/redis/async_client.h" #include "source/common/http/message_impl.h" #include "source/extensions/filters/http/wasm/wasm_filter.h" @@ -795,6 +796,115 @@ TEST_P(WasmHttpFilterTest, RedisCall) { EXPECT_NE(callbacks, nullptr); } +#if defined(HIGRESS) +// Unit test for AsyncClientConfig parameter parsing +TEST(RedisAsyncClientConfigTest, ParseBufferParamsFromQueryString) { + // Test with all parameters specified + { + std::map params = { + {"db", "1"}, + {"buffer_flush_timeout", "5"}, + {"max_buffer_size_before_flush", "2048"} + }; + Redis::AsyncClientConfig config("testuser", "testpass", 1000, std::move(params)); + + EXPECT_EQ(config.auth_username_, "testuser"); + EXPECT_EQ(config.auth_password_, "testpass"); + EXPECT_EQ(config.op_timeout_.count(), 1000); + EXPECT_EQ(config.buffer_flush_timeout_.count(), 5); + EXPECT_EQ(config.max_buffer_size_before_flush_, 2048); + EXPECT_EQ(config.params_.at("db"), "1"); + } + + // Test with only buffer_flush_timeout specified (max_buffer uses default) + { + std::map params = { + {"buffer_flush_timeout", "1"} + }; + Redis::AsyncClientConfig config("admin", "123456", 2000, std::move(params)); + + EXPECT_EQ(config.buffer_flush_timeout_.count(), 1); + EXPECT_EQ(config.max_buffer_size_before_flush_, 1024); // default value + } + + // Test with only max_buffer_size_before_flush specified (timeout uses default) + { + std::map params = { + {"max_buffer_size_before_flush", "512"} + }; + Redis::AsyncClientConfig config("admin", "123456", 2000, std::move(params)); + + EXPECT_EQ(config.buffer_flush_timeout_.count(), 3); // default value + EXPECT_EQ(config.max_buffer_size_before_flush_, 512); + } + + // Test with no buffer params (both use defaults) + { + std::map params = { + {"db", "0"} + }; + Redis::AsyncClientConfig config("user", "pass", 500, std::move(params)); + + EXPECT_EQ(config.buffer_flush_timeout_.count(), 3); // default 3ms + EXPECT_EQ(config.max_buffer_size_before_flush_, 1024); // default 1024 bytes + } + + // Test with invalid buffer_flush_timeout (should use default) + { + std::map params = { + {"buffer_flush_timeout", "invalid_number"} + }; + Redis::AsyncClientConfig config("user", "pass", 500, std::move(params)); + + EXPECT_EQ(config.buffer_flush_timeout_.count(), 3); // default due to parse error + } + + // Test with invalid max_buffer_size_before_flush (should use default) + { + std::map params = { + {"max_buffer_size_before_flush", "not_a_number"} + }; + Redis::AsyncClientConfig config("user", "pass", 500, std::move(params)); + + EXPECT_EQ(config.max_buffer_size_before_flush_, 1024); // default due to parse error + } + + // Test with zero values (edge case - disable buffering) + { + std::map params = { + {"buffer_flush_timeout", "0"}, + {"max_buffer_size_before_flush", "0"} + }; + Redis::AsyncClientConfig config("user", "pass", 500, std::move(params)); + + EXPECT_EQ(config.buffer_flush_timeout_.count(), 0); + EXPECT_EQ(config.max_buffer_size_before_flush_, 0); + } + + // Test with very large values (within uint32 range) + { + std::map params = { + {"buffer_flush_timeout", "10000"}, + {"max_buffer_size_before_flush", "1048576"} // 1MB + }; + Redis::AsyncClientConfig config("user", "pass", 500, std::move(params)); + + EXPECT_EQ(config.buffer_flush_timeout_.count(), 10000); + EXPECT_EQ(config.max_buffer_size_before_flush_, 1048576); + } + + // Test with value exceeding uint32 max (should use default) + { + std::map params = { + {"max_buffer_size_before_flush", "99999999999999"} // exceeds uint32::max + }; + Redis::AsyncClientConfig config("user", "pass", 500, std::move(params)); + + EXPECT_EQ(config.max_buffer_size_before_flush_, 1024); // default due to overflow + } +} +#endif + TEST_P(WasmHttpFilterTest, DisableClearRouteCache) { if (std::get<1>(GetParam()) == "rust") { // This feature is not supported in rust test code