Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
462 changes: 462 additions & 0 deletions bazel/protobuf_hash_cache.patch

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion bazel/repositories.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,8 @@ def _com_google_protobuf():

external_http_archive(
"com_google_protobuf",
patches = ["@envoy//bazel:protobuf.patch"],
patches = ["@envoy//bazel:protobuf.patch",
"@envoy//bazel:protobuf_hash_cache.patch"],
patch_args = ["-p1"],
)

Expand Down
1 change: 1 addition & 0 deletions envoy/config/subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ using SubscriptionPtr = std::unique_ptr<Subscription>;
COUNTER(update_failure) \
COUNTER(update_rejected) \
COUNTER(update_success) \
GAUGE(last_update_success, NeverImport) \
GAUGE(update_time, NeverImport) \
GAUGE(version, NeverImport) \
HISTOGRAM(update_duration, Milliseconds) \
Expand Down
4 changes: 2 additions & 2 deletions source/common/http/conn_manager_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
namespace Envoy {
namespace Http {

#if defined(ALIMESH)
#if defined(HIGRESS)
#define HIGRESS_EXT_HTTP_CONN_MAN_STATS(COUNTER, GAUGE, HISTOGRAM) \
COUNTER(downstream_rq_retry_scope_found_total) \
COUNTER(downstream_rq_retry_scope_not_found_total)
Expand Down Expand Up @@ -98,7 +98,7 @@ namespace Http {
*/
struct ConnectionManagerNamedStats {
ALL_HTTP_CONN_MAN_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT, GENERATE_HISTOGRAM_STRUCT)
#if defined(ALIMESH)
#if defined(HIGRESS)
HIGRESS_EXT_HTTP_CONN_MAN_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT,
GENERATE_HISTOGRAM_STRUCT)
#endif
Expand Down
9 changes: 8 additions & 1 deletion source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ bool requestWasConnect(const RequestHeaderMapSharedPtr& headers, Protocol protoc
ConnectionManagerStats ConnectionManagerImpl::generateStats(const std::string& prefix,
Stats::Scope& scope) {
return ConnectionManagerStats(
#if defined(ALIMESH)
#if defined(HIGRESS)
{ALL_HTTP_CONN_MAN_STATS(POOL_COUNTER_PREFIX(scope, prefix), POOL_GAUGE_PREFIX(scope, prefix),
POOL_HISTOGRAM_PREFIX(scope, prefix))
HIGRESS_EXT_HTTP_CONN_MAN_STATS(POOL_COUNTER_PREFIX(scope, prefix),
Expand Down Expand Up @@ -2228,6 +2228,8 @@ void ConnectionManagerImpl::ActiveStream::recreateStream(
// Prevent the stream from being used through the commonContinue process of
// ActiveStreamDecoderFilter or ActiveStreamEncoderFilter.
filter_manager_.interruptContinue();
const auto& original_remote_address =
filter_manager_.streamInfo().downstreamAddressProvider().remoteAddress();
#else
const auto& buffered_request_data = filter_manager_.bufferedRequestData();
const bool proxy_body = buffered_request_data != nullptr && buffered_request_data->length() > 0;
Expand All @@ -2246,6 +2248,11 @@ void ConnectionManagerImpl::ActiveStream::recreateStream(

RequestDecoder& new_stream = connection_manager_.newStream(*response_encoder, true);

#if defined(HIGRESS)
auto& active_stream = static_cast<ActiveStream&>(new_stream);
active_stream.filter_manager_.setDownstreamRemoteAddress(original_remote_address);
#endif

// Set the new RequestDecoder on the ResponseEncoder. Even though all of the decoder callbacks
// have already been called at this point, the encoder still needs the new decoder for deferred
// logging in some cases.
Expand Down
20 changes: 20 additions & 0 deletions source/common/http/filter_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,16 @@ void ActiveStreamDecoderFilter::injectDecodedDataToFilterChain(Buffer::Instance&
headers_continued_ = true;
doHeaders(false);
}
#if defined(HIGRESS)
// Fix: When injecting data with end_stream=true, we must set remote_decode_complete_ flag
// to ensure subsequent filter chain iterations (e.g., via commonContinue) correctly recognize
// the stream is complete. Without this, if a downstream filter returns StopIteration and later
// resumes via continueDecoding()->commonContinue()->doData(), the complete() check would
// incorrectly return false, causing end_stream state inconsistency across the filter chain.
if (end_stream) {
parent_.state_.remote_decode_complete_ = true;
}
#endif
parent_.decodeData(this, data, end_stream,
FilterManager::FilterIterationStartState::CanStartFromCurrent);
}
Expand Down Expand Up @@ -1679,6 +1689,16 @@ void ActiveStreamEncoderFilter::injectEncodedDataToFilterChain(Buffer::Instance&
headers_continued_ = true;
doHeaders(false);
}
#if defined(HIGRESS)
// Fix: When injecting data with end_stream=true, we must set local_complete_ flag to ensure
// subsequent filter chain iterations (e.g., via commonContinue) correctly recognize the stream
// is complete. Without this, if a downstream filter returns StopIteration and later resumes
// via continueEncoding()->commonContinue()->doData(), the complete() check would incorrectly
// return false, causing end_stream state inconsistency across the filter chain.
if (end_stream) {
parent_.state_.local_complete_ = true;
}
#endif
parent_.encodeData(this, data, end_stream,
FilterManager::FilterIterationStartState::CanStartFromCurrent);
}
Expand Down
13 changes: 13 additions & 0 deletions source/common/protobuf/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,19 @@ class MessageUtil {
static std::string sanitizeUtf8String(absl::string_view str);
};

#if defined(HIGRESS) && defined(ENVOY_ENABLE_FULL_PROTOS)
class HashCachedMessageUtil : public MessageUtil {
public:
// Single-argument overload: hash functor returning the message's cached hash value.
// The return type is std::size_t (matching hash() in this class). Declaring it as bool would
// implicitly convert every hash to 0 or 1, so all keys of a hash container using this functor
// would land in at most two distinct hash values.
std::size_t operator()(const Protobuf::Message& message) const {
  return message.GetCachedHashValue();
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Hash Functor Returns Wrong Type, Catastrophic Collisions

The hash functor operator() in HashCachedMessageUtil returns bool instead of std::size_t. When used as the hash template parameter in absl::node_hash_map and absl::flat_hash_map, this causes the hash value to be implicitly converted to bool (0 or 1), resulting in catastrophic hash collisions where all messages hash to either 0 or 1, breaking hash map functionality.

Fix in Cursor Fix in Web


// Two-argument overload: reports two messages as equal when their cached hash values match.
// NOTE(review): only the cached hashes are compared, not the message contents, so two different
// messages whose hashes collide would compare equal here — confirm this is intended.
bool operator()(const Protobuf::Message& lhs, const Protobuf::Message& rhs) const {
  const auto lhs_hash = lhs.GetCachedHashValue();
  const auto rhs_hash = rhs.GetCachedHashValue();
  return lhs_hash == rhs_hash;
}

// Static counterpart of the single-argument functor: returns the cached hash value stored on
// the message.
static std::size_t hash(const Protobuf::Message& message) {
  return message.GetCachedHashValue();
}
};
#endif

class ValueUtil {
public:
static std::size_t hash(const ProtobufWkt::Value& value) { return MessageUtil::hash(value); }
Expand Down
5 changes: 4 additions & 1 deletion source/common/rds/route_config_update_receiver_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ class RouteConfigUpdateReceiverImpl : public RouteConfigUpdateReceiver {
public:
RouteConfigUpdateReceiverImpl(ConfigTraits& config_traits, ProtoTraits& proto_traits,
Server::Configuration::ServerFactoryContext& factory_context);

// Computes the hash of a route configuration proto. When both HIGRESS and
// ENVOY_ENABLE_FULL_PROTOS are defined, HashCachedMessageUtil::hash is used; otherwise
// MessageUtil::hash is used.
uint64_t getHash(const Protobuf::Message& rc) const {
#if defined(HIGRESS) && defined(ENVOY_ENABLE_FULL_PROTOS)
  return HashCachedMessageUtil::hash(rc);
#else
  return MessageUtil::hash(rc);
#endif
}
// Returns true when new_hash differs from the last recorded config hash (last_config_hash_),
// and false when the two are identical.
bool checkHash(uint64_t new_hash) const {
  return last_config_hash_ != new_hash;
}
// Records hash as the last config hash, which checkHash() compares against.
void updateHash(uint64_t hash) {
  last_config_hash_ = hash;
}
void updateConfig(std::unique_ptr<Protobuf::Message>&& route_config_proto);
Expand Down
41 changes: 40 additions & 1 deletion source/extensions/common/wasm/context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ constexpr std::string_view ClearRouteCacheKey = "clear_route_cache";
constexpr std::string_view DisableClearRouteCache = "off";
constexpr std::string_view SetDecoderBufferLimit = "set_decoder_buffer_limit";
constexpr std::string_view SetEncoderBufferLimit = "set_encoder_buffer_limit";
constexpr std::string_view WasmRebuildKey = "wasm_need_rebuild";

bool stringViewToUint32(std::string_view str, uint32_t& out_value) {
try {
Expand Down Expand Up @@ -455,10 +456,17 @@ WasmResult serializeValue(Filters::Common::Expr::CelValue value, std::string* re
return WasmResult::SerializationFailure;
}

#if defined(HIGRESS)
#define PROPERTY_TOKENS(_f) \
_f(NODE) _f(LISTENER_DIRECTION) _f(LISTENER_METADATA) _f(CLUSTER_NAME) _f(CLUSTER_METADATA) \
_f(ROUTE_NAME) _f(ROUTE_METADATA) _f(PLUGIN_NAME) _f(UPSTREAM_HOST_METADATA) \
_f(PLUGIN_ROOT_ID) _f(PLUGIN_VM_ID) _f(PLUGIN_VM_MEMORY) _f(CONNECTION_ID)
#else
#define PROPERTY_TOKENS(_f) \
_f(NODE) _f(LISTENER_DIRECTION) _f(LISTENER_METADATA) _f(CLUSTER_NAME) _f(CLUSTER_METADATA) \
_f(ROUTE_NAME) _f(ROUTE_METADATA) _f(PLUGIN_NAME) _f(UPSTREAM_HOST_METADATA) \
_f(PLUGIN_ROOT_ID) _f(PLUGIN_VM_ID) _f(CONNECTION_ID)
#endif

static inline std::string downCase(std::string s) {
std::transform(s.begin(), s.end(), s.begin(), [](unsigned char c) { return std::tolower(c); });
Expand Down Expand Up @@ -613,6 +621,13 @@ Context::findValue(absl::string_view name, Protobuf::Arena* arena, bool last) co
return CelValue::CreateStringView(toAbslStringView(root_id()));
case PropertyToken::PLUGIN_VM_ID:
return CelValue::CreateStringView(toAbslStringView(wasm()->vm_id()));
#if defined(HIGRESS)
case PropertyToken::PLUGIN_VM_MEMORY:
if (wasm() && wasm()->wasm_vm()) {
return CelValue::CreateUint64(wasm()->wasm_vm()->getMemorySize());
}
break;
#endif
}
return {};
}
Expand Down Expand Up @@ -720,12 +735,30 @@ Http::HeaderMap* Context::getMap(WasmHeaderMapType type) {
}

const Http::HeaderMap* Context::getConstMap(WasmHeaderMapType type) {
#if defined(HIGRESS)
const StreamInfo::StreamInfo* request_stream_info = getConstRequestStreamInfo();
#endif
switch (type) {
case WasmHeaderMapType::RequestHeaders:
if (access_log_phase_) {
return access_log_request_headers_;
}
#if defined(HIGRESS)
// Fallback mechanism for retrieving request headers:
// 1. First try the cached request_headers_ pointer (most common case)
// 2. If null, attempt to retrieve from StreamInfo (e.g., after internal redirects or
// when headers are stored in stream info but not directly cached)
// 3. Return nullptr if both sources are unavailable
if (request_headers_ != nullptr) {
return request_headers_;
}
if (request_stream_info == nullptr) {
return nullptr;
}
return request_stream_info->getRequestHeaders();
#else
return request_headers_;
#endif
case WasmHeaderMapType::RequestTrailers:
if (access_log_phase_) {
return nullptr;
Expand Down Expand Up @@ -1354,7 +1387,13 @@ WasmResult Context::setProperty(std::string_view path, std::string_view value) {
prototype.life_span_);
}
#if defined(HIGRESS)
if (path == ClearRouteCacheKey) {
if (path == WasmRebuildKey) {
if (wasm_) {
wasm_->setShouldRebuild(true);
ENVOY_LOG(debug, "Wasm rebuild flag set by plugin");
}
return WasmResult::Ok;
} else if (path == ClearRouteCacheKey) {
disable_clear_route_cache_ = value == DisableClearRouteCache;
} else if (path == SetDecoderBufferLimit && decoder_callbacks_) {
uint32_t buffer_limit;
Expand Down
1 change: 1 addition & 0 deletions source/extensions/common/wasm/stats_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ struct CreateWasmStats {
COUNTER(created) \
GAUGE(active, NeverImport) \
PLUGIN_COUNTER(recover_total) \
PLUGIN_COUNTER(rebuild_total) \
PLUGIN_COUNTER(crash_total) \
PLUGIN_COUNTER(recover_error) \
PLUGIN_GAUGE(crash, NeverImport)
Expand Down
20 changes: 13 additions & 7 deletions source/extensions/common/wasm/wasm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ WasmEvent failStateToWasmEvent(FailState state) {
PANIC("corrupt enum");
}

const int MIN_RECOVER_INTERVAL_SECONDS = 5;
const int MIN_RECOVER_INTERVAL_SECONDS = 1;
#endif

} // namespace
Expand Down Expand Up @@ -195,7 +195,7 @@ Wasm::~Wasm() {
}

#if defined(HIGRESS)
bool PluginHandleSharedPtrThreadLocal::recover() {
bool PluginHandleSharedPtrThreadLocal::rebuild(bool is_fail_recovery) {
if (handle_ == nullptr || handle_->wasmHandle() == nullptr ||
handle_->wasmHandle()->wasm() == nullptr) {
ENVOY_LOG(warn, "wasm has not been initialized");
Expand All @@ -204,16 +204,22 @@ bool PluginHandleSharedPtrThreadLocal::recover() {
auto& dispatcher = handle_->wasmHandle()->wasm()->dispatcher();
auto now = dispatcher.timeSource().monotonicTime() + cache_time_offset_for_testing;
if (now - last_recover_time_ < std::chrono::seconds(MIN_RECOVER_INTERVAL_SECONDS)) {
ENVOY_LOG(debug, "recover interval has not been reached");
ENVOY_LOG(info, "rebuild interval has not been reached");
return false;
}
// Even if recovery fails, it will be retried after the interval
// Even if rebuild fails, it will be retried after the interval
last_recover_time_ = now;
std::shared_ptr<PluginHandleBase> new_handle;
if (handle_->doRecover(new_handle)) {
if (handle_->rebuild(new_handle)) {
handle_ = std::static_pointer_cast<PluginHandle>(new_handle);
handle_->wasmHandle()->wasm()->lifecycleStats().recover_total_.inc();
ENVOY_LOG(info, "wasm vm recover from crash success");
// Increment appropriate metrics based on rebuild type
if (is_fail_recovery) {
handle_->wasmHandle()->wasm()->lifecycleStats().recover_total_.inc();
ENVOY_LOG(info, "wasm vm recover from crash success");
} else {
handle_->wasmHandle()->wasm()->lifecycleStats().rebuild_total_.inc();
ENVOY_LOG(info, "wasm vm rebuild success");
}
return true;
}
return false;
Expand Down
2 changes: 1 addition & 1 deletion source/extensions/common/wasm/wasm.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ class PluginHandleSharedPtrThreadLocal : public ThreadLocal::ThreadLocalObject,
public Logger::Loggable<Logger::Id::wasm> {
public:
PluginHandleSharedPtrThreadLocal(PluginHandleSharedPtr handle) : handle_(handle){};
bool recover();
bool rebuild(bool is_fail_recovery = false);
#else
class PluginHandleSharedPtrThreadLocal : public ThreadLocal::ThreadLocalObject {
public:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ void GrpcSubscriptionImpl::onConfigUpdate(const std::vector<Config::DecodedResou
std::chrono::milliseconds update_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
dispatcher_.timeSource().monotonicTime() - start);
stats_.update_success_.inc();
#ifdef ALIMESH
stats_.last_update_success_.set(true);
#endif
stats_.update_attempt_.inc();
stats_.update_time_.set(DateUtil::nowToMilliseconds(dispatcher_.timeSource()));
stats_.version_.set(HashUtil::xxHash64(version_info));
Expand All @@ -101,6 +104,9 @@ void GrpcSubscriptionImpl::onConfigUpdate(
std::chrono::milliseconds update_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
dispatcher_.timeSource().monotonicTime() - start);
stats_.update_success_.inc();
#ifdef ALIMESH
stats_.last_update_success_.set(true);
#endif
stats_.update_time_.set(DateUtil::nowToMilliseconds(dispatcher_.timeSource()));
stats_.version_.set(HashUtil::xxHash64(system_version_info));
stats_.version_text_.set(system_version_info);
Expand All @@ -112,10 +118,16 @@ void GrpcSubscriptionImpl::onConfigUpdateFailed(ConfigUpdateFailureReason reason
switch (reason) {
case Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure:
stats_.update_failure_.inc();
#ifdef ALIMESH
stats_.last_update_success_.set(false);
#endif
ENVOY_LOG(debug, "gRPC update for {} failed", type_url_);
break;
case Envoy::Config::ConfigUpdateFailureReason::FetchTimedout:
stats_.init_fetch_timeout_.inc();
#ifdef ALIMESH
stats_.last_update_success_.set(false);
#endif
disableInitFetchTimeoutTimer();
ENVOY_LOG(warn, "gRPC config: initial fetch timed out for {}", type_url_);
callbacks_.onConfigUpdateFailed(reason, e);
Expand All @@ -125,6 +137,9 @@ void GrpcSubscriptionImpl::onConfigUpdateFailed(ConfigUpdateFailureReason reason
ASSERT(e != nullptr);
disableInitFetchTimeoutTimer();
stats_.update_rejected_.inc();
#ifdef ALIMESH
stats_.last_update_success_.set(false);
#endif
ENVOY_LOG(warn, "gRPC config for {} rejected: {}", type_url_, e->what());
callbacks_.onConfigUpdateFailed(reason, e);
break;
Expand Down
13 changes: 12 additions & 1 deletion source/extensions/filters/http/wasm/wasm_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,25 @@ class FilterConfig : Logger::Loggable<Logger::Id::wasm> {
failed = true;
} else if (wasm->isFailed()) {
ENVOY_LOG(info, "wasm vm is crashed, try to recover");
if (opt_ref->recover()) {
if (opt_ref->rebuild(true)) {
ENVOY_LOG(info, "wasm vm recover success");
wasm = opt_ref->handle()->wasmHandle()->wasm().get();
handle = opt_ref->handle();
} else {
ENVOY_LOG(info, "wasm vm recover failed");
failed = true;
}
} else if (wasm->shouldRebuild()) {
ENVOY_LOG(info, "wasm vm requested rebuild, try to rebuild");
if (opt_ref->rebuild(false)) {
ENVOY_LOG(info, "wasm vm rebuild success");
wasm = opt_ref->handle()->wasmHandle()->wasm().get();
handle = opt_ref->handle();
// Reset rebuild state
wasm->setShouldRebuild(false);
} else {
ENVOY_LOG(info, "wasm vm rebuild failed, still using the stale one");
}
}
if (failed) {
if (handle->plugin()->fail_open_) {
Expand Down
2 changes: 1 addition & 1 deletion source/extensions/filters/network/wasm/wasm_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class FilterConfig : Logger::Loggable<Logger::Id::wasm> {
failed = true;
} else if (wasm->isFailed()) {
ENVOY_LOG(info, "wasm vm is crashed, try to recover");
if (opt_ref->recover()) {
if (opt_ref->rebuild(true)) {
ENVOY_LOG(info, "wasm vm recover success");
wasm = opt_ref->handle()->wasmHandle()->wasm().get();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ ::Envoy::Http::FilterHeadersStatus RedirectPolicy::encodeHeaders(
downstream_headers->setPath(path_and_query);
#if defined(HIGRESS)
}
auto original_upstream_cluster = encoder_callbacks->streamInfo().upstreamClusterInfo();
#endif
if (decoder_callbacks->downstreamCallbacks()) {
decoder_callbacks->downstreamCallbacks()->clearRouteCache();
Expand Down Expand Up @@ -307,9 +308,15 @@ ::Envoy::Http::FilterHeadersStatus RedirectPolicy::encodeHeaders(
// Cache the original response code.
absl::optional<::Envoy::Http::Code> original_response_code;
#if defined(HIGRESS)
if (original_upstream_cluster.has_value()) {
encoder_callbacks->streamInfo().setUpstreamClusterInfo(*original_upstream_cluster);
}
absl::optional<uint64_t> current_code =
::Envoy::Http::Utility::getResponseStatusOrNullopt(headers);
if (current_code.has_value()) {
encoder_callbacks->streamInfo().setResponseCode(static_cast<uint32_t>(*current_code));
}
if (keep_original_response_code_) {
absl::optional<uint64_t> current_code =
::Envoy::Http::Utility::getResponseStatusOrNullopt(headers);
if (current_code.has_value()) {
original_response_code = static_cast<::Envoy::Http::Code>(*current_code);
}
Expand Down
Loading