Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/envoy/service/ext_proc/v3/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ licenses(["notice"]) # Apache 2
api_proto_package(
has_services = True,
deps = [
"//envoy/annotations:pkg",
"//envoy/config/core/v3:pkg",
"//envoy/extensions/filters/http/ext_proc/v3:pkg",
"//envoy/type/v3:pkg",
Expand Down
14 changes: 6 additions & 8 deletions api/envoy/service/ext_proc/v3/external_processor.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ syntax = "proto3";

package envoy.service.ext_proc.v3;

import "envoy/annotations/deprecation.proto";
import "envoy/config/core/v3/base.proto";
import "envoy/extensions/filters/http/ext_proc/v3/processing_mode.proto";
import "envoy/type/v3/http_status.proto";
Expand Down Expand Up @@ -113,7 +114,6 @@ message ProcessingRequest {
// Dynamic metadata associated with the request.
config.core.v3.Metadata metadata_context = 8;

// [#not-implemented-hide:]
// The values of properties selected by the ``request_attributes``
// or ``response_attributes`` list in the configuration. Each entry
// in the list is populated from the standard
Expand Down Expand Up @@ -211,13 +211,11 @@ message HttpHeaders {
config.core.v3.HeaderMap headers = 1;

// [#not-implemented-hide:]
// TODO(jbohanon) reserve field as part of rework detailed in https://github.com/envoyproxy/envoy/issues/32125
// The values of properties selected by the ``request_attributes``
// or ``response_attributes`` list in the configuration. Each entry
// in the list is populated
// from the standard :ref:`attributes <arch_overview_attributes>`
// supported across Envoy.
map<string, google.protobuf.Struct> attributes = 2;
// This field is deprecated and not implemented. Attributes will be sent in
// the top-level :ref:`attributes <envoy_v3_api_field_service.ext_proc.v3.ProcessingRequest.attributes`
// field.
map<string, google.protobuf.Struct> attributes = 2
[deprecated = true, (envoy.annotations.deprecated_at_minor_version) = "3.0"];

// If true, then there is no message body associated with this
// request or response.
Expand Down
54 changes: 26 additions & 28 deletions source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,7 @@ void Filter::onDestroy() {
}

FilterHeadersStatus Filter::onHeaders(ProcessorState& state,
Http::RequestOrResponseHeaderMap& headers, bool end_stream,
ProtobufWkt::Struct* proto) {
Http::RequestOrResponseHeaderMap& headers, bool end_stream) {
switch (openStream()) {
case StreamOpenState::Error:
return FilterHeadersStatus::StopIteration;
Expand All @@ -291,14 +290,12 @@ FilterHeadersStatus Filter::onHeaders(ProcessorState& state,
state.setHeaders(&headers);
state.setHasNoBody(end_stream);
ProcessingRequest req;
addAttributes(state, req);
addDynamicMetadata(state, req);
auto* headers_req = state.mutableHeaders(req);
MutationUtils::headersToProto(headers, config_->allowedHeaders(), config_->disallowedHeaders(),
*headers_req->mutable_headers());
headers_req->set_end_of_stream(end_stream);
if (proto != nullptr) {
(*headers_req->mutable_attributes())[FilterName] = *proto;
}
state.onStartProcessorCall(std::bind(&Filter::onMessageTimeout, this), config_->messageTimeout(),
ProcessorState::CallbackState::HeadersCallback);
ENVOY_LOG(debug, "Sending headers message");
Expand All @@ -315,19 +312,14 @@ FilterHeadersStatus Filter::decodeHeaders(RequestHeaderMap& headers, bool end_st
decoding_state_.setCompleteBodyAvailable(true);
}

// Set the request headers on decoding and encoding state in case they are
// needed later.
decoding_state_.setRequestHeaders(&headers);
encoding_state_.setRequestHeaders(&headers);

FilterHeadersStatus status = FilterHeadersStatus::Continue;
if (decoding_state_.sendHeaders()) {
ProtobufWkt::Struct proto;

if (config_->expressionManager().hasRequestExpr()) {
auto activation_ptr = Filters::Common::Expr::createActivation(
&config_->expressionManager().localInfo(), decoding_state_.callbacks()->streamInfo(),
&headers, nullptr, nullptr);
proto = config_->expressionManager().evaluateRequestAttributes(*activation_ptr);
}

status = onHeaders(decoding_state_, headers, end_stream,
config_->expressionManager().hasRequestExpr() ? &proto : nullptr);
status = onHeaders(decoding_state_, headers, end_stream);
ENVOY_LOG(trace, "onHeaders returning {}", static_cast<int>(status));
} else {
ENVOY_LOG(trace, "decodeHeaders: Skipped header processing");
Expand Down Expand Up @@ -590,7 +582,7 @@ FilterTrailersStatus Filter::onTrailers(ProcessorState& state, Http::HeaderMap&
FilterTrailersStatus Filter::decodeTrailers(RequestTrailerMap& trailers) {
ENVOY_LOG(trace, "decodeTrailers");
const auto status = onTrailers(decoding_state_, trailers);
ENVOY_LOG(trace, "encodeTrailers returning {}", static_cast<int>(status));
ENVOY_LOG(trace, "decodeTrailers returning {}", static_cast<int>(status));
return status;
}

Expand All @@ -605,17 +597,7 @@ FilterHeadersStatus Filter::encodeHeaders(ResponseHeaderMap& headers, bool end_s

FilterHeadersStatus status = FilterHeadersStatus::Continue;
if (!processing_complete_ && encoding_state_.sendHeaders()) {
ProtobufWkt::Struct proto;

if (config_->expressionManager().hasResponseExpr()) {
auto activation_ptr = Filters::Common::Expr::createActivation(
&config_->expressionManager().localInfo(), encoding_state_.callbacks()->streamInfo(),
nullptr, &headers, nullptr);
proto = config_->expressionManager().evaluateResponseAttributes(*activation_ptr);
}

status = onHeaders(encoding_state_, headers, end_stream,
config_->expressionManager().hasResponseExpr() ? &proto : nullptr);
status = onHeaders(encoding_state_, headers, end_stream);
ENVOY_LOG(trace, "onHeaders returns {}", static_cast<int>(status));
} else {
ENVOY_LOG(trace, "encodeHeaders: Skipped header processing");
Expand Down Expand Up @@ -650,6 +632,7 @@ ProcessingRequest Filter::setupBodyChunk(ProcessorState& state, const Buffer::In
bool end_stream) {
ENVOY_LOG(debug, "Sending a body chunk of {} bytes, end_stream {}", data.length(), end_stream);
ProcessingRequest req;
addAttributes(state, req);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dont we need the same for the response bodies as well? or is this function called in the response body path as well?

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is called from the onData helper method which is called on both decode and encode

addDynamicMetadata(state, req);
auto* body_req = state.mutableBody(req);
body_req->set_end_of_stream(end_stream);
Expand All @@ -667,6 +650,7 @@ void Filter::sendBodyChunk(ProcessorState& state, ProcessorState::CallbackState

void Filter::sendTrailers(ProcessorState& state, const Http::HeaderMap& trailers) {
ProcessingRequest req;
addAttributes(state, req);
addDynamicMetadata(state, req);
auto* trailers_req = state.mutableTrailers(req);
MutationUtils::headersToProto(trailers, config_->allowedHeaders(), config_->disallowedHeaders(),
Expand Down Expand Up @@ -771,6 +755,20 @@ void Filter::addDynamicMetadata(const ProcessorState& state, ProcessingRequest&
*req.mutable_metadata_context() = forwarding_metadata;
}

void Filter::addAttributes(const ProcessorState& state, ProcessingRequest& req) {
if (!state.sendAttributes(config_->expressionManager())) {
return;
}

auto activation_ptr = Filters::Common::Expr::createActivation(
&config_->expressionManager().localInfo(), state.callbacks()->streamInfo(),
state.requestHeaders(), state.responseHeaders(), state.trailers());
attributes = config_->expressionManager().evaluateRequestAttributes(*activation_ptr);

state.setSentAttributes(true);
(*req.mutable_attributes())[FilterName] = *attributes;
}

void Filter::setDynamicMetadata(Http::StreamFilterCallbacks* cb, const ProcessorState& state,
const ProcessingResponse& response) {
if (state.untypedReceivingMetadataNamespaces().empty() || !response.has_dynamic_metadata()) {
Expand Down
5 changes: 3 additions & 2 deletions source/extensions/filters/http/ext_proc/ext_proc.h
Original file line number Diff line number Diff line change
Expand Up @@ -373,8 +373,7 @@ class Filter : public Logger::Loggable<Logger::Id::ext_proc>,
void sendImmediateResponse(const envoy::service::ext_proc::v3::ImmediateResponse& response);

Http::FilterHeadersStatus onHeaders(ProcessorState& state,
Http::RequestOrResponseHeaderMap& headers, bool end_stream,
ProtobufWkt::Struct* proto);
Http::RequestOrResponseHeaderMap& headers, bool end_stream);

// Return a pair of whether to terminate returning the current result.
std::pair<bool, Http::FilterDataStatus> sendStreamChunk(ProcessorState& state);
Expand All @@ -386,6 +385,8 @@ class Filter : public Logger::Loggable<Logger::Id::ext_proc>,
void setDecoderDynamicMetadata(const envoy::service::ext_proc::v3::ProcessingResponse& response);
void addDynamicMetadata(const ProcessorState& state,
envoy::service::ext_proc::v3::ProcessingRequest& req);
void addAttributes(const ProcessorState& state,
envoy::service::ext_proc::v3::ProcessingRequest& req);

const FilterConfigSharedPtr config_;
const ExternalProcessorClientPtr client_;
Expand Down
28 changes: 28 additions & 0 deletions source/extensions/filters/http/ext_proc/processor_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "source/common/common/logger.h"

#include "absl/status/status.h"
#include "matching_utils.h"

namespace Envoy {
namespace Extensions {
Expand Down Expand Up @@ -134,8 +135,12 @@ class ProcessorState : public Logger::Loggable<Logger::Id::ext_proc> {
return body_mode_;
}

void setRequestHeaders(Http::RequestOrResponseHeaderMap* headers) { request_headers_ = headers; }
void setHeaders(Http::RequestOrResponseHeaderMap* headers) { headers_ = headers; }
void setTrailers(Http::HeaderMap* trailers) { trailers_ = trailers; }
const Http::RequestOrResponseHeaderMap* requestHeaders() const { return request_headers_; };
virtual const Http::RequestOrResponseHeaderMap* responseHeaders() const PURE;
const Http::HeaderMap* responseTrailers() const { return trailers_; }

void onStartProcessorCall(Event::TimerCb cb, std::chrono::milliseconds timeout,
CallbackState callback_state);
Expand Down Expand Up @@ -202,6 +207,10 @@ class ProcessorState : public Logger::Loggable<Logger::Id::ext_proc> {

virtual Http::StreamFilterCallbacks* callbacks() const PURE;

virtual bool sendAttributes(const ExpressionManager& mgr) const PURE;

void setSentAttributes(bool sent) { attributes_sent_ = sent; }

protected:
void setBodyMode(
envoy::extensions::filters::http::ext_proc::v3::ProcessingMode_BodySendMode body_mode);
Expand Down Expand Up @@ -236,6 +245,10 @@ class ProcessorState : public Logger::Loggable<Logger::Id::ext_proc> {
// The specific mode for body handling
envoy::extensions::filters::http::ext_proc::v3::ProcessingMode_BodySendMode body_mode_;

// The request_headers_ field is guaranteed to hold the pointer to the request
// headers as set in decodeHeaders. This allows both decoding and encoding states
// to have access to the request headers map.
Http::RequestOrResponseHeaderMap* request_headers_ = nullptr;
Http::RequestOrResponseHeaderMap* headers_ = nullptr;
Http::HeaderMap* trailers_ = nullptr;
Event::TimerPtr message_timer_;
Expand All @@ -250,6 +263,9 @@ class ProcessorState : public Logger::Loggable<Logger::Id::ext_proc> {
const std::vector<std::string>* typed_forwarding_namespaces_{};
const std::vector<std::string>* untyped_receiving_namespaces_{};

// If true, the attributes for this processing state have already been sent.
bool attributes_sent_{};

private:
virtual void clearRouteCache(const envoy::service::ext_proc::v3::CommonResponse&) {}
};
Expand Down Expand Up @@ -324,6 +340,12 @@ class DecodingProcessorState : public ProcessorState {

Http::StreamFilterCallbacks* callbacks() const override { return decoder_callbacks_; }

bool sendAttributes(const ExpressionManager& mgr) const override {
return !attributes_sent_ && mgr.hasRequestExpr();
}

const Http::RequestOrResponseHeaderMap* responseHeaders() const override { return nullptr; }

private:
void setProcessingModeInternal(
const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& mode);
Expand Down Expand Up @@ -404,6 +426,12 @@ class EncodingProcessorState : public ProcessorState {

Http::StreamFilterCallbacks* callbacks() const override { return encoder_callbacks_; }

bool sendAttributes(const ExpressionManager& mgr) const override {
return !attributes_sent_ && mgr.hasResponseExpr();
}

const Http::RequestOrResponseHeaderMap* responseHeaders() const override { return headers_; }

private:
void setProcessingModeInternal(
const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& mode);
Expand Down
46 changes: 38 additions & 8 deletions test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3429,12 +3429,11 @@ TEST_P(ExtProcIntegrationTest, SendAndReceiveDynamicMetadata) {
}

#if defined(USE_CEL_PARSER)
// Test the filter using the default configuration by connecting to
// an ext_proc server that responds to the request_headers message
// by requesting to modify the request headers.
TEST_P(ExtProcIntegrationTest, GetAndSetRequestResponseAttributes) {
TEST_P(ExtProcIntegrationTest, RequestResponseAttributes) {
proto_config_.mutable_processing_mode()->set_request_header_mode(ProcessingMode::SEND);
proto_config_.mutable_processing_mode()->set_request_trailer_mode(ProcessingMode::SEND);
proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SEND);
proto_config_.mutable_processing_mode()->set_response_trailer_mode(ProcessingMode::SEND);
proto_config_.mutable_request_attributes()->Add("request.path");
proto_config_.mutable_request_attributes()->Add("request.method");
proto_config_.mutable_request_attributes()->Add("request.scheme");
Expand All @@ -3445,29 +3444,60 @@ TEST_P(ExtProcIntegrationTest, GetAndSetRequestResponseAttributes) {
initializeConfig();
HttpIntegrationTest::initialize();
auto response = sendDownstreamRequest(absl::nullopt);
processRequestHeadersMessage(
*grpc_upstreams_[0], true, [](const HttpHeaders& req, HeadersResponse&) {

// Handle request headers message.
processGenericMessage(
*grpc_upstreams_[0], true, [](const ProcessingRequest& req, ProcessingResponse&) {
EXPECT_TRUE(req.has_request_headers());
EXPECT_EQ(req.attributes().size(), 1);
auto proto_struct = req.attributes().at("envoy.filters.http.ext_proc");
EXPECT_EQ(proto_struct.fields().at("request.path").string_value(), "/");
EXPECT_EQ(proto_struct.fields().at("request.method").string_value(), "GET");
EXPECT_EQ(proto_struct.fields().at("request.scheme").string_value(), "http");
EXPECT_EQ(proto_struct.fields().at("connection.mtls").bool_value(), false);

// Make sure we are not including any data in the deprecated HttpHeaders.attributes.
EXPECT_TRUE(req.request_headers().attributes().empty());
return true;
});

// Handle request trailers message, making sure we did not send request attributes again.
processGenericMessage(*grpc_upstreams_[0], false,
[](const ProcessingRequest& req, ProcessingResponse&) {
EXPECT_TRUE(req.has_request_trailers());
EXPECT_TRUE(req.attributes().empty());
return true;
});

handleUpstreamRequest();

processResponseHeadersMessage(
*grpc_upstreams_[0], false, [](const HttpHeaders& req, HeadersResponse&) {
// Handle response headers message.
processGenericMessage(
*grpc_upstreams_[0], false, [](const ProcessingRequest& req, ProcessingResponse&) {
EXPECT_TRUE(req.has_response_headers());
EXPECT_EQ(req.attributes().size(), 1);
auto proto_struct = req.attributes().at("envoy.filters.http.ext_proc");
EXPECT_EQ(proto_struct.fields().at("response.code").string_value(), "200");
EXPECT_EQ(proto_struct.fields().at("response.code_details").string_value(),
StreamInfo::ResponseCodeDetails::get().ViaUpstream);

// Make sure we didn't include request attributes in the response-path processing request.
EXPECT_FALSE(proto_struct.fields().contains("request.method"));

// Make sure we are not including any data in the deprecated HttpHeaders.attributes.
EXPECT_TRUE(req.response_headers().attributes().empty());
return true;
});

// Handle response trailers message, making sure we did not send request or response attributes
// again.
processGenericMessage(*grpc_upstreams_[0], false,
[](const ProcessingRequest& req, ProcessingResponse&) {
EXPECT_TRUE(req.has_response_trailers());
EXPECT_TRUE(req.attributes().empty());
return true;
});

verifyDownstreamResponse(*response, 200);
}
#endif
Expand Down