Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 26 additions & 3 deletions envoy/redis/async_client.h
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
#pragma once

#include <chrono>
#include <limits>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <map>

namespace Envoy {

Expand All @@ -20,17 +21,39 @@ struct AsyncClientConfig {
AsyncClientConfig(std::string&& username, std::string&& password, int op_timeout_milliseconds,
std::map<std::string, std::string>&& params)
: auth_username_(std::move(username)), auth_password_(std::move(password)),
op_timeout_(op_timeout_milliseconds), buffer_flush_timeout_(3), params_(std::move(params)) {
op_timeout_(op_timeout_milliseconds),
max_buffer_size_before_flush_(parseUint32FromParams(params, "max_buffer_size_before_flush", 1024)),
buffer_flush_timeout_(parseUint32FromParams(params, "buffer_flush_timeout", 3)),
params_(std::move(params)) {
}

const std::string auth_username_;
const std::string auth_password_;

const std::chrono::milliseconds op_timeout_;
const uint32_t max_buffer_size_before_flush_{1024};
const uint32_t max_buffer_size_before_flush_;
const std::chrono::milliseconds buffer_flush_timeout_;
const uint32_t max_upstream_unknown_connections_{100};
const bool enable_command_stats_{false};
const std::map<std::string, std::string> params_;

private:
// Helper function to parse uint32 from params map with default value
static uint32_t parseUint32FromParams(const std::map<std::string, std::string>& params,
const std::string& key, uint32_t default_value) {
auto it = params.find(key);
if (it != params.end()) {
try {
unsigned long value = std::stoul(it->second);
if (value <= std::numeric_limits<uint32_t>::max()) {
return static_cast<uint32_t>(value);
}
} catch (const std::exception&) {
// If parsing fails, return default value
}
}
return default_value;
}
};

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ static RegisterContextFactory register_RedisCallContext(CONTEXT_FACTORY(RedisCal
"redis_call");

bool RedisCallRootContext::onConfigure(size_t) {
redisInit("cluster?db=1", "admin", "123456", 1000);
// Test with buffer configuration parameters
redisInit("cluster?db=1&buffer_flush_timeout=1&max_buffer_size_before_flush=512", "admin", "123456", 1000);
return true;
}

Expand All @@ -52,14 +53,16 @@ FilterHeadersStatus RedisCallContext::onRequestHeaders(uint32_t, bool) {
auto query = "*3\r\n$3\r\nset\r\n$2\r\nid\r\n$1\r\n1\r\n";
auto path = getRequestHeader(":path");
if (path->view() == "/bad") {
if (root()->redisCall("cluster?db=1", query, callback) != WasmResult::Ok) {
// Test with different buffer params on runtime call
if (root()->redisCall("cluster?db=1&buffer_flush_timeout=2", query, callback) != WasmResult::Ok) {
logInfo("redis_call rejected");
}
} else {
if (root()->redisCall("bogus cluster", query, callback) == WasmResult::Ok) {
logError("bogus cluster found error");
}
root()->redisCall("cluster?db=1", query, callback);
// Test with buffer params in query string
root()->redisCall("cluster?db=1&buffer_flush_timeout=1&max_buffer_size_before_flush=512", query, callback);
logInfo("onRequestHeaders");
}

Expand Down
110 changes: 110 additions & 0 deletions test/extensions/filters/http/wasm/wasm_filter_test.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "envoy/grpc/async_client.h"
#include "envoy/redis/async_client.h"

#include "source/common/http/message_impl.h"
#include "source/extensions/filters/http/wasm/wasm_filter.h"
Expand Down Expand Up @@ -795,6 +796,115 @@ TEST_P(WasmHttpFilterTest, RedisCall) {
EXPECT_NE(callbacks, nullptr);
}

#if defined(HIGRESS)
// Unit test for AsyncClientConfig parameter parsing
TEST(RedisAsyncClientConfigTest, ParseBufferParamsFromQueryString) {
// Test with all parameters specified
{
std::map<std::string, std::string> params = {
{"db", "1"},
{"buffer_flush_timeout", "5"},
{"max_buffer_size_before_flush", "2048"}
};
Redis::AsyncClientConfig config("testuser", "testpass", 1000, std::move(params));

EXPECT_EQ(config.auth_username_, "testuser");
EXPECT_EQ(config.auth_password_, "testpass");
EXPECT_EQ(config.op_timeout_.count(), 1000);
EXPECT_EQ(config.buffer_flush_timeout_.count(), 5);
EXPECT_EQ(config.max_buffer_size_before_flush_, 2048);
EXPECT_EQ(config.params_.at("db"), "1");
}

// Test with only buffer_flush_timeout specified (max_buffer uses default)
{
std::map<std::string, std::string> params = {
{"buffer_flush_timeout", "1"}
};
Redis::AsyncClientConfig config("admin", "123456", 2000, std::move(params));

EXPECT_EQ(config.buffer_flush_timeout_.count(), 1);
EXPECT_EQ(config.max_buffer_size_before_flush_, 1024); // default value
}

// Test with only max_buffer_size_before_flush specified (timeout uses default)
{
std::map<std::string, std::string> params = {
{"max_buffer_size_before_flush", "512"}
};
Redis::AsyncClientConfig config("admin", "123456", 2000, std::move(params));

EXPECT_EQ(config.buffer_flush_timeout_.count(), 3); // default value
EXPECT_EQ(config.max_buffer_size_before_flush_, 512);
}

// Test with no buffer params (both use defaults)
{
std::map<std::string, std::string> params = {
{"db", "0"}
};
Redis::AsyncClientConfig config("user", "pass", 500, std::move(params));

EXPECT_EQ(config.buffer_flush_timeout_.count(), 3); // default 3ms
EXPECT_EQ(config.max_buffer_size_before_flush_, 1024); // default 1024 bytes
}

// Test with invalid buffer_flush_timeout (should use default)
{
std::map<std::string, std::string> params = {
{"buffer_flush_timeout", "invalid_number"}
};
Redis::AsyncClientConfig config("user", "pass", 500, std::move(params));

EXPECT_EQ(config.buffer_flush_timeout_.count(), 3); // default due to parse error
}

// Test with invalid max_buffer_size_before_flush (should use default)
{
std::map<std::string, std::string> params = {
{"max_buffer_size_before_flush", "not_a_number"}
};
Redis::AsyncClientConfig config("user", "pass", 500, std::move(params));

EXPECT_EQ(config.max_buffer_size_before_flush_, 1024); // default due to parse error
}

// Test with zero values (edge case - disable buffering)
{
std::map<std::string, std::string> params = {
{"buffer_flush_timeout", "0"},
{"max_buffer_size_before_flush", "0"}
};
Redis::AsyncClientConfig config("user", "pass", 500, std::move(params));

EXPECT_EQ(config.buffer_flush_timeout_.count(), 0);
EXPECT_EQ(config.max_buffer_size_before_flush_, 0);
}

// Test with very large values (within uint32 range)
{
std::map<std::string, std::string> params = {
{"buffer_flush_timeout", "10000"},
{"max_buffer_size_before_flush", "1048576"} // 1MB
};
Redis::AsyncClientConfig config("user", "pass", 500, std::move(params));

EXPECT_EQ(config.buffer_flush_timeout_.count(), 10000);
EXPECT_EQ(config.max_buffer_size_before_flush_, 1048576);
}

// Test with value exceeding uint32 max (should use default)
{
std::map<std::string, std::string> params = {
{"max_buffer_size_before_flush", "99999999999999"} // exceeds uint32::max
};
Redis::AsyncClientConfig config("user", "pass", 500, std::move(params));

EXPECT_EQ(config.max_buffer_size_before_flush_, 1024); // default due to overflow
}
}
#endif

TEST_P(WasmHttpFilterTest, DisableClearRouteCache) {
if (std::get<1>(GetParam()) == "rust") {
// This feature is not supported in rust test code
Expand Down