Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ proto_library(
"//envoy/extensions/transport_sockets/raw_buffer/v3:pkg",
"//envoy/extensions/transport_sockets/tap/v3:pkg",
"//envoy/extensions/transport_sockets/tls/v3:pkg",
"//envoy/extensions/upstreams/http/cluster_sensitive/v3:pkg",
"//envoy/extensions/upstreams/http/generic/v3:pkg",
"//envoy/extensions/upstreams/http/http/v3:pkg",
"//envoy/extensions/upstreams/http/tcp/v3:pkg",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# DO NOT EDIT. This file is generated by tools/proto_format/proto_sync.py.

load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package")

licenses(["notice"]) # Apache 2

api_proto_package(
deps = ["@com_github_cncf_udpa//udpa/annotations:pkg"],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
syntax = "proto3";

package envoy.extensions.upstreams.http.cluster_sensitive.v3;

import "udpa/annotations/status.proto";

option java_package = "io.envoyproxy.envoy.extensions.upstreams.http.cluster_sensitive.v3";
option java_outer_classname = "HttpConnectionPoolProtoOuterClass";
option java_multiple_files = true;
option (udpa.annotations.file_status).package_version_status = ACTIVE;

// [#protodoc-title: Http Connection Pool]

// A connection pool which forwards downstream HTTP as HTTP to upstream.
// [#extension: envoy.upstreams.http.cluster_sensitive]
message HttpConnectionPoolProto {
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mark

}
1 change: 1 addition & 0 deletions api/versioning/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ proto_library(
"//envoy/extensions/transport_sockets/raw_buffer/v3:pkg",
"//envoy/extensions/transport_sockets/tap/v3:pkg",
"//envoy/extensions/transport_sockets/tls/v3:pkg",
"//envoy/extensions/upstreams/http/cluster_sensitive/v3:pkg",
"//envoy/extensions/upstreams/http/generic/v3:pkg",
"//envoy/extensions/upstreams/http/http/v3:pkg",
"//envoy/extensions/upstreams/http/tcp/v3:pkg",
Expand Down
2 changes: 1 addition & 1 deletion source/common/router/router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ std::unique_ptr<GenericConnPool> Filter::createConnPool() {
cluster_->upstreamConfig().value());
} else {
factory = &Envoy::Config::Utility::getAndCheckFactoryByName<GenericConnPoolFactory>(
"envoy.filters.connection_pools.http.generic");
"envoy.filters.connection_pools.http.cluster_sensitive");
}
const bool should_tcp_proxy =
route_entry_->connectConfig().has_value() &&
Expand Down
1 change: 1 addition & 0 deletions source/extensions/extensions_build_config.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ EXTENSIONS = {
#
"envoy.upstreams.http.http": "//source/extensions/upstreams/http/http:config",
"envoy.upstreams.http.tcp": "//source/extensions/upstreams/http/tcp:config",
"envoy.upstreams.http.cluster_sensitive": "//source/extensions/upstreams/http/cluster_sensitive:config",

#
# Watchdog actions
Expand Down
72 changes: 72 additions & 0 deletions source/extensions/upstreams/http/cluster_sensitive/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
load(
"//bazel:envoy_build_system.bzl",
"envoy_cc_extension",
"envoy_cc_library",
"envoy_extension_package",
)

licenses(["notice"]) # Apache 2

envoy_extension_package()

envoy_cc_extension(
name = "config",
srcs = [
"config.cc",
],
hdrs = [
"config.h",
],
security_posture = "robust_to_untrusted_downstream",
visibility = ["//visibility:public"],
deps = [
":upstream_request_lib",
"//source/extensions/upstreams/http/http:upstream_request_lib",
"//source/extensions/upstreams/http/tcp:upstream_request_lib",
"@envoy_api//envoy/extensions/upstreams/http/cluster_sensitive/v3:pkg_cc_proto",
],
)

envoy_cc_library(
name = "upstream_request_lib",
deps = [
":http_upstream_request_lib",
":tcp_upstream_request_lib",
],
)

envoy_cc_library(
name = "http_upstream_request_lib",
srcs = [
"http_upstream_request.cc",
],
hdrs = [
"http_upstream_request.h",
],
deps = [
"//include/envoy/http:codes_interface",
"//include/envoy/http:conn_pool_interface",
"//include/envoy/http:filter_interface",
"//include/envoy/upstream:cluster_manager_interface",
"//include/envoy/upstream:upstream_interface",
"//source/common/common:assert_lib",
"//source/common/common:minimal_logger_lib",
"//source/common/http:codes_lib",
"//source/common/http:header_map_lib",
"//source/common/http:headers_lib",
"//source/common/http:message_lib",
"//source/common/network:application_protocol_lib",
"//source/common/network:transport_socket_options_lib",
"//source/common/router:router_lib",
"//source/common/upstream:load_balancer_lib",
"//source/extensions/common/proxy_protocol:proxy_protocol_header_lib",
"//source/extensions/upstreams/http/http:upstream_request_lib",
],
)

envoy_cc_library(
name = "tcp_upstream_request_lib",
deps = [
"//source/extensions/upstreams/http/http:upstream_request_lib",
],
)
32 changes: 32 additions & 0 deletions source/extensions/upstreams/http/cluster_sensitive/config.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#include "extensions/upstreams/http/cluster_sensitive/config.h"

#include "extensions/upstreams/http/cluster_sensitive/http_upstream_request.h"
#include "extensions/upstreams/http/tcp/upstream_request.h"

namespace Envoy {
namespace Extensions {
namespace Upstreams {
namespace Http {
namespace ClusterSensitive {

Router::GenericConnPoolPtr ClusterSensitiveGenericConnPoolFactory::createGenericConnPool(
Upstream::ClusterManager& cm, bool is_connect, const Router::RouteEntry& route_entry,
absl::optional<Envoy::Http::Protocol> downstream_protocol,
Upstream::LoadBalancerContext* ctx) const {
if (is_connect) {
auto ret = std::make_unique<Upstreams::Http::Tcp::TcpConnPool>(cm, is_connect, route_entry,
downstream_protocol, ctx);
return (ret->valid() ? std::move(ret) : nullptr);
}
auto ret = std::make_unique<Upstreams::Http::ClusterSensitive::HttpConnPool>(
cm, is_connect, route_entry, downstream_protocol, ctx);
return (ret->valid() ? std::move(ret) : nullptr);
}

REGISTER_FACTORY(ClusterSensitiveGenericConnPoolFactory, Router::GenericConnPoolFactory);

} // namespace ClusterSensitive
} // namespace Http
} // namespace Upstreams
} // namespace Extensions
} // namespace Envoy
40 changes: 40 additions & 0 deletions source/extensions/upstreams/http/cluster_sensitive/config.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#pragma once

#include "envoy/extensions/upstreams/http/cluster_sensitive/v3/http_connection_pool.pb.h"
#include "envoy/registry/registry.h"
#include "envoy/router/router.h"

namespace Envoy {
namespace Extensions {
namespace Upstreams {
namespace Http {
namespace ClusterSensitive {

/**
* Config registration for the HttpConnPool. @see Router::GenericConnPoolFactory
*/
class ClusterSensitiveGenericConnPoolFactory : public Router::GenericConnPoolFactory {
public:
std::string name() const override {
return "envoy.filters.connection_pools.http.cluster_sensitive";
}
std::string category() const override { return "envoy.upstreams"; }
Router::GenericConnPoolPtr
createGenericConnPool(Upstream::ClusterManager& cm, bool is_connect,
const Router::RouteEntry& route_entry,
absl::optional<Envoy::Http::Protocol> downstream_protocol,
Upstream::LoadBalancerContext* ctx) const override;

ProtobufTypes::MessagePtr createEmptyConfigProto() override {
return std::make_unique<
envoy::extensions::upstreams::http::cluster_sensitive::v3::HttpConnectionPoolProto>();
}
};

DECLARE_FACTORY(ClusterSensitiveGenericConnPoolFactory);

} // namespace ClusterSensitive
} // namespace Http
} // namespace Upstreams
} // namespace Extensions
} // namespace Envoy
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#include "extensions/upstreams/http/cluster_sensitive/http_upstream_request.h"

#include <cstdint>
#include <memory>

#include "envoy/event/dispatcher.h"
#include "envoy/event/timer.h"
#include "envoy/grpc/status.h"
#include "envoy/http/conn_pool.h"
#include "envoy/upstream/cluster_manager.h"
#include "envoy/upstream/upstream.h"

#include "common/common/assert.h"
#include "common/common/utility.h"
#include "common/http/codes.h"
#include "common/http/header_map_impl.h"
#include "common/http/headers.h"
#include "common/http/message_impl.h"
#include "common/http/utility.h"
#include "common/router/router.h"

using Envoy::Router::GenericConnectionPoolCallbacks;

namespace Envoy {
namespace Extensions {
namespace Upstreams {
namespace Http {
namespace ClusterSensitive {

void HttpConnPool::newStream(GenericConnectionPoolCallbacks* callbacks) {
callbacks_ = callbacks;
// It's possible for a reset to happen inline within the newStream() call. In this case, we
// might get deleted inline as well. Only write the returned handle out if it is not nullptr to
// deal with this case.
Envoy::Http::ConnectionPool::Cancellable* handle =
conn_pool_->newStream(callbacks->upstreamToDownstream(), *this);
if (handle) {
conn_pool_stream_handle_ = handle;
}
}

bool HttpConnPool::cancelAnyPendingStream() {
if (conn_pool_stream_handle_) {
conn_pool_stream_handle_->cancel(ConnectionPool::CancelPolicy::Default);
conn_pool_stream_handle_ = nullptr;
return true;
}
return false;
}

absl::optional<Envoy::Http::Protocol> HttpConnPool::protocol() const {
return conn_pool_->protocol();
}

void HttpConnPool::onPoolFailure(ConnectionPool::PoolFailureReason reason,
absl::string_view transport_failure_reason,
Upstream::HostDescriptionConstSharedPtr host) {
callbacks_->onPoolFailure(reason, transport_failure_reason, host);
}

void HttpConnPool::onPoolReady(Envoy::Http::RequestEncoder& request_encoder,
Upstream::HostDescriptionConstSharedPtr host,
const StreamInfo::StreamInfo& info) {
conn_pool_stream_handle_ = nullptr;
auto upstream =
std::make_unique<HttpUpstream>(callbacks_->upstreamToDownstream(), &request_encoder, host);
callbacks_->onPoolReady(std::move(upstream), host,
request_encoder.getStream().connectionLocalAddress(), info);
}

} // namespace ClusterSensitive
} // namespace Http
} // namespace Upstreams
} // namespace Extensions
} // namespace Envoy
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
#pragma once

#include <cstdint>
#include <memory>
#include <string>
#include <string_view>

#include "envoy/http/codes.h"
#include "envoy/http/conn_pool.h"

#include "common/common/logger.h"
#include "common/config/well_known_names.h"
#include "common/http/header_map_impl.h"
#include "common/router/upstream_request.h"

#include "extensions/upstreams/http/http/upstream_request.h"

namespace Envoy {
namespace Extensions {
namespace Upstreams {
namespace Http {
namespace ClusterSensitive {

class HttpConnPool : public Router::GenericConnPool, public Envoy::Http::ConnectionPool::Callbacks {
public:
// GenericConnPool
HttpConnPool(Upstream::ClusterManager& cm, bool is_connect, const Router::RouteEntry& route_entry,
absl::optional<Envoy::Http::Protocol> downstream_protocol,
Upstream::LoadBalancerContext* ctx) {
ASSERT(!is_connect);
conn_pool_ = cm.httpConnPoolForCluster(route_entry.clusterName(), route_entry.priority(),
downstream_protocol, ctx);
}
void newStream(Router::GenericConnectionPoolCallbacks* callbacks) override;
bool cancelAnyPendingStream() override;
absl::optional<Envoy::Http::Protocol> protocol() const override;

// Http::ConnectionPool::Callbacks
void onPoolFailure(ConnectionPool::PoolFailureReason reason,
absl::string_view transport_failure_reason,
Upstream::HostDescriptionConstSharedPtr host) override;
void onPoolReady(Envoy::Http::RequestEncoder& callbacks_encoder,
Upstream::HostDescriptionConstSharedPtr host,
const StreamInfo::StreamInfo& info) override;
Upstream::HostDescriptionConstSharedPtr host() const override { return conn_pool_->host(); }

bool valid() { return conn_pool_ != nullptr; }

private:
// Points to the actual connection pool to create streams from.
Envoy::Http::ConnectionPool::Instance* conn_pool_{};
Envoy::Http::ConnectionPool::Cancellable* conn_pool_stream_handle_{};
Router::GenericConnectionPoolCallbacks* callbacks_{};
};

class HttpUpstream : public Router::GenericUpstream {
public:
HttpUpstream(Router::UpstreamToDownstream& upstream_request, Envoy::Http::RequestEncoder* encoder,
Upstream::HostDescriptionConstSharedPtr host)
: sub_upstream_(upstream_request, encoder), host_(host) {}

// GenericUpstream
void encodeData(Buffer::Instance& data, bool end_stream) override {
sub_upstream_.encodeData(data, end_stream);
}

void encodeMetadata(const Envoy::Http::MetadataMapVector& metadata_map_vector) override {
sub_upstream_.encodeMetadata(metadata_map_vector);
}

void encodeHeaders(const Envoy::Http::RequestHeaderMap& headers, bool end_stream) override {
auto dup = Envoy::Http::RequestHeaderMapImpl::create();
Envoy::Http::HeaderMapImpl::copyFrom(*dup, headers);
dup->setCopy(Envoy::Http::LowerCaseString("X-istio-test"), "lambdai");
// Sanitize original port header.
dup->remove(Envoy::Http::LowerCaseString("X-istio-original-port"));
if (auto filter_metadata = host_->cluster().metadata().filter_metadata().find("istio");
filter_metadata != host_->cluster().metadata().filter_metadata().end()) {
ENVOY_LOG_MISC(warn, "lambdai: find filter_metadata from {}",
host_->cluster().metadata().DebugString());
const ProtobufWkt::Struct& data_struct = filter_metadata->second;
const auto& fields = data_struct.fields();
if (auto iter = fields.find("original_port"); iter != fields.end()) {
if (iter->second.kind_case() == ProtobufWkt::Value::kStringValue) {
dup->setCopy(Envoy::Http::LowerCaseString("X-istio-original-port"),
iter->second.string_value());
}
}
}
sub_upstream_.encodeHeaders(*dup, end_stream);
}

void encodeTrailers(const Envoy::Http::RequestTrailerMap& trailers) override {
sub_upstream_.encodeTrailers(trailers);
}

void readDisable(bool disable) override { sub_upstream_.readDisable(disable); }

void resetStream() override { sub_upstream_.resetStream(); }

private:
Upstreams::Http::Http::HttpUpstream sub_upstream_;
Upstream::HostDescriptionConstSharedPtr host_{};
};

} // namespace ClusterSensitive
} // namespace Http
} // namespace Upstreams
} // namespace Extensions
} // namespace Envoy
Loading