diff --git a/include/envoy/network/connection_handler.h b/include/envoy/network/connection_handler.h index 8df955119503f..d752b49b74f0b 100644 --- a/include/envoy/network/connection_handler.h +++ b/include/envoy/network/connection_handler.h @@ -19,6 +19,31 @@ class ConnectionHandler { public: virtual ~ConnectionHandler() = default; + /** + * Used by ConnectionHandler to manage listeners. + */ + class ActiveListener { + public: + virtual ~ActiveListener() = default; + + /** + * @return the tag value as configured. + */ + virtual uint64_t listenerTag() PURE; + + /** + * @return the actual Listener object. + */ + virtual Listener* listener() PURE; + + /** + * Destroy the actual Listener it wraps. + */ + virtual void destroy() PURE; + }; + + using ActiveListenerPtr = std::unique_ptr; + /** * @return uint64_t the number of active connections owned by the handler. */ @@ -57,6 +82,10 @@ class ConnectionHandler { */ virtual void stopListeners(uint64_t listener_tag) PURE; + virtual bool updateListener( + uint64_t listener_tag, + std::function listener_update_func) PURE; + /** * Stop all listeners. This will not close any connections and is used for draining. */ @@ -78,31 +107,6 @@ class ConnectionHandler { * @return the stat prefix used for per-handler stats. */ virtual const std::string& statPrefix() PURE; - - /** - * Used by ConnectionHandler to manage listeners. - */ - class ActiveListener { - public: - virtual ~ActiveListener() = default; - - /** - * @return the tag value as configured. - */ - virtual uint64_t listenerTag() PURE; - - /** - * @return the actual Listener object. - */ - virtual Listener* listener() PURE; - - /** - * Destroy the actual Listener it wraps. - */ - virtual void destroy() PURE; - }; - - using ActiveListenerPtr = std::unique_ptr; }; using ConnectionHandlerPtr = std::unique_ptr; diff --git a/include/envoy/network/filter.h b/include/envoy/network/filter.h index e446b1bf7a2c5..26b1644e5fa3b 100644 --- a/include/envoy/network/filter.h +++ b/include/envoy/network/filter.h @@ -331,6 +331,13 @@ class FilterChain { * const std::vector& a list of filters to be used by the new connection. */ virtual const std::vector& networkFilterFactories() const PURE; + + /** + * @brief Get the Tag object associated with this filter chain. + * + * @return int64_t + */ + virtual int64_t getTag() const { return 0; } }; using FilterChainSharedPtr = std::shared_ptr; @@ -351,6 +358,8 @@ class FilterChainManager { virtual const FilterChain* findFilterChain(const ConnectionSocket& socket) const PURE; }; +using FilterChainManagerSharedPtr = std::shared_ptr; + /** * Callbacks used by individual UDP listener read filter instances to communicate with the filter * manager. diff --git a/include/envoy/network/listener.h b/include/envoy/network/listener.h index fbb0ff7c17f97..a26286d2c5093 100644 --- a/include/envoy/network/listener.h +++ b/include/envoy/network/listener.h @@ -29,6 +29,12 @@ class ListenerConfig { */ virtual FilterChainManager& filterChainManager() PURE; + /** + * @return FilterChainManagerSharedPtr the factory for adding and searching through configured + * filter chains. + */ + virtual FilterChainManagerSharedPtr sharedFilterChainManager() PURE; + /** * @return FilterChainFactory& the factory for setting up the filter chain on a new * connection. @@ -111,7 +117,7 @@ class ListenerConfig { }; /** - * Callbacks invoked by a listener. + * Callbacks invoked by a tcp listener. */ class ListenerCallbacks { public: @@ -122,6 +128,13 @@ class ListenerCallbacks { * @param socket supplies the socket that is moved into the callee. */ virtual void onAccept(ConnectionSocketPtr&& socket) PURE; + + /** + * Called when a new filter chain manager is updated. + * + * @param filter_chain_manager + */ + virtual void onNewFilterChainManger(FilterChainManager& filter_chain_manager) PURE; }; /** diff --git a/include/envoy/server/worker.h b/include/envoy/server/worker.h index ef924996edded..d52a95ee8219c 100644 --- a/include/envoy/server/worker.h +++ b/include/envoy/server/worker.h @@ -74,6 +74,11 @@ class Worker { */ virtual void stopListener(Network::ListenerConfig& listener, std::function completion) PURE; + + virtual void updateListener( + uint64_t listener_tag, + std::function listener_update_func, + std::function completion) PURE; }; using WorkerPtr = std::unique_ptr; diff --git a/source/common/network/listener_impl.cc b/source/common/network/listener_impl.cc index 84498125d5118..a72f1150327cc 100644 --- a/source/common/network/listener_impl.cc +++ b/source/common/network/listener_impl.cc @@ -18,7 +18,7 @@ namespace Network { void ListenerImpl::listenCallback(evconnlistener*, evutil_socket_t fd, sockaddr* remote_addr, int remote_addr_len, void* arg) { - ListenerImpl* listener = static_cast(arg); + auto* listener = static_cast(arg); // Create the IoSocketHandleImpl for the fd here. IoHandlePtr io_handle = std::make_unique(fd); diff --git a/source/server/BUILD b/source/server/BUILD index ee4de6e782dff..f55c93a9af5bd 100644 --- a/source/server/BUILD +++ b/source/server/BUILD @@ -271,6 +271,7 @@ envoy_cc_library( ":drain_manager_lib", ":filter_chain_manager_lib", ":listener_manager_impl", + ":tag_generator_lib", ":transport_socket_config_lib", ":well_known_names_lib", "//include/envoy/server:active_udp_listener_config_interface", @@ -333,9 +334,11 @@ envoy_cc_library( "//include/envoy/server:transport_socket_config_interface", "//source/common/common:empty_string", "//source/common/config:utility_lib", + "//source/common/init:manager_lib", "//source/common/network:cidr_range_lib", "//source/common/network:lc_trie_lib", "//source/server:configuration_lib", + "//source/server:tag_generator_lib", "@envoy_api//envoy/api/v2/listener:pkg_cc_proto", ], ) @@ -500,3 +503,16 @@ envoy_cc_library( "//include/envoy/server:active_udp_listener_config_interface", ], ) + +envoy_cc_library( + name = "tag_generator_lib", + srcs = ["tag_generator_batch_impl.cc"], + hdrs = [ + "tag_generator.h", + "tag_generator_batch_impl.h", + ], + deps = [ + "//source/common/protobuf:utility_lib", + "@envoy_api//envoy/api/v2/listener:pkg_cc_proto", + ], +) diff --git a/source/server/connection_handler_impl.cc b/source/server/connection_handler_impl.cc index ed342c64379bb..7541b467cac57 100644 --- a/source/server/connection_handler_impl.cc +++ b/source/server/connection_handler_impl.cc @@ -56,6 +56,21 @@ void ConnectionHandlerImpl::removeListeners(uint64_t listener_tag) { } } +bool ConnectionHandlerImpl::updateListener( + uint64_t listener_tag, + std::function activeListenerUpdate) { + bool res = false; + for (auto& listener : listeners_) { + // listener type: std::pair + if (listener.second.listener_->listenerTag() == listener_tag) { + // TODO(lambdai): explore in which condition there are more than one active listeners sharing + // the same tag + res |= activeListenerUpdate(*listener.second.listener_); + } + } + return res; +} + void ConnectionHandlerImpl::stopListeners(uint64_t listener_tag) { for (auto& listener : listeners_) { if (listener.second.listener_->listenerTag() == listener_tag) { @@ -86,7 +101,12 @@ void ConnectionHandlerImpl::enableListeners() { void ConnectionHandlerImpl::ActiveTcpListener::removeConnection(ActiveTcpConnection& connection) { ENVOY_CONN_LOG(debug, "adding to cleanup list", *connection.connection_); - ActiveTcpConnectionPtr removed = connection.removeFromList(connections_); + auto connections = tagged_connections_.find(connection.tag_); + ASSERT(connections != tagged_connections_.end()); + ActiveTcpConnectionPtr removed = connection.removeFromList(connections->second); + if (connections->second.empty()) { + tagged_connections_.erase(connections); + } parent_.dispatcher_.deferredDelete(std::move(removed)); } @@ -111,7 +131,8 @@ ConnectionHandlerImpl::ActiveTcpListener::ActiveTcpListener(ConnectionHandlerImp Network::ListenerConfig& config) : ConnectionHandlerImpl::ActiveListenerImplBase(parent, config), parent_(parent), listener_(std::move(listener)), listener_filters_timeout_(config.listenerFiltersTimeout()), - continue_on_listener_filters_timeout_(config.continueOnListenerFiltersTimeout()) { + continue_on_listener_filters_timeout_(config.continueOnListenerFiltersTimeout()), + active_filter_chain_manager_(config.sharedFilterChainManager()) { config.connectionBalancer().registerHandler(*this); } @@ -125,8 +146,11 @@ ConnectionHandlerImpl::ActiveTcpListener::~ActiveTcpListener() { parent_.dispatcher_.deferredDelete(std::move(removed)); } - while (!connections_.empty()) { - connections_.front()->connection_->close(Network::ConnectionCloseType::NoFlush); + for (auto& kv : tagged_connections_) { + auto& connections = kv.second; + while (!connections.empty()) { + connections.front()->connection_->close(Network::ConnectionCloseType::NoFlush); + } } parent_.dispatcher_.clearDeferredDeleteList(); @@ -270,7 +294,7 @@ void ConnectionHandlerImpl::ActiveTcpSocket::newConnection() { // Particularly the assigned events need to reset before assigning new events in the follow up. accept_filters_.clear(); // Create a new connection on this listener. - listener_.newConnection(std::move(socket_)); + listener_.newConnection(*this); } } @@ -278,6 +302,13 @@ void ConnectionHandlerImpl::ActiveTcpListener::onAccept(Network::ConnectionSocke onAcceptWorker(std::move(socket), config_.handOffRestoredDestinationConnections(), false); } +void ConnectionHandlerImpl::ActiveTcpListener::onNewFilterChainManger( + Network::FilterChainManager& filter_chain_manager) { + UNREFERENCED_PARAMETER(filter_chain_manager); + // obtain the tags traits + // drain the connections out of the tracking tags +} + void ConnectionHandlerImpl::ActiveTcpListener::onAcceptWorker( Network::ConnectionSocketPtr&& socket, bool hand_off_restored_destination_connections, bool rebalanced) { @@ -306,21 +337,23 @@ void ConnectionHandlerImpl::ActiveTcpListener::onAcceptWorker( } void ConnectionHandlerImpl::ActiveTcpListener::newConnection( - Network::ConnectionSocketPtr&& socket) { + ConnectionHandlerImpl::ActiveTcpSocket& tcp_socket) { // Find matching filter chain. - const auto filter_chain = config_.filterChainManager().findFilterChain(*socket); + auto snapped_filter_chain_manager = active_filter_chain_manager_; + const auto filter_chain = snapped_filter_chain_manager->findFilterChain(*tcp_socket.socket_); if (filter_chain == nullptr) { ENVOY_LOG(debug, "closing connection: no matching filter chain found"); stats_.no_filter_chain_match_.inc(); - socket->close(); + tcp_socket.socket_->close(); return; } auto transport_socket = filter_chain->transportSocketFactory().createTransportSocket(nullptr); - ActiveTcpConnectionPtr active_connection(new ActiveTcpConnection( - *this, - parent_.dispatcher_.createServerConnection(std::move(socket), std::move(transport_socket)), - parent_.dispatcher_.timeSource())); + ActiveTcpConnectionPtr active_connection( + new ActiveTcpConnection(*this, + parent_.dispatcher_.createServerConnection( + std::move(tcp_socket.socket_), std::move(transport_socket)), + parent_.dispatcher_.timeSource())); active_connection->connection_->setBufferLimits(config_.perConnectionBufferLimitBytes()); const bool empty_filter_chain = !config_.filterChainFactory().createNetworkFilterChain( @@ -334,7 +367,8 @@ void ConnectionHandlerImpl::ActiveTcpListener::newConnection( if (active_connection->connection_->state() != Network::Connection::State::Closed) { ENVOY_CONN_LOG(debug, "new connection", *active_connection->connection_); active_connection->connection_->addConnectionCallbacks(*active_connection); - active_connection->moveIntoList(std::move(active_connection), connections_); + auto& connections = tagged_connections_[active_connection->tag_]; + active_connection->moveIntoList(std::move(active_connection), connections); } } diff --git a/source/server/connection_handler_impl.h b/source/server/connection_handler_impl.h index cc809fcf4ea3e..790d457e0680f 100644 --- a/source/server/connection_handler_impl.h +++ b/source/server/connection_handler_impl.h @@ -72,6 +72,9 @@ class ConnectionHandlerImpl : public Network::ConnectionHandler, void stopListeners() override; void disableListeners() override; void enableListeners() override; + bool updateListener(uint64_t listener_tag, + std::function + listener_update_func) override; const std::string& statPrefix() override { return per_handler_stat_prefix_; } /** @@ -115,6 +118,7 @@ class ConnectionHandlerImpl : public Network::ConnectionHandler, // Network::ListenerCallbacks void onAccept(Network::ConnectionSocketPtr&& socket) override; + void onNewFilterChainManger(Network::FilterChainManager& filter_chain_manager) override; // ActiveListenerImplBase Network::Listener* listener() override { return listener_.get(); } @@ -134,15 +138,21 @@ class ConnectionHandlerImpl : public Network::ConnectionHandler, /** * Create a new connection from a socket accepted by the listener. */ - void newConnection(Network::ConnectionSocketPtr&& socket); + void newConnection(ActiveTcpSocket& socket); + + void + updateFilterChainManager(const Network::FilterChainManagerSharedPtr& new_filter_chain_manager); ConnectionHandlerImpl& parent_; Network::ListenerPtr listener_; const std::chrono::milliseconds listener_filters_timeout_; const bool continue_on_listener_filters_timeout_; + // Sockets going through listener filter chain std::list sockets_; - std::list connections_; - + // Connections completed listener filter chain and currently going through network filter chain + absl::flat_hash_map> tagged_connections_; + // The filter chain manager which should serve the new connections. + Network::FilterChainManagerSharedPtr active_filter_chain_manager_; // The number of connections currently active on this listener. This is typically used for // connection balancing across per-handler listeners. std::atomic num_listener_connections_{}; @@ -172,6 +182,7 @@ class ConnectionHandlerImpl : public Network::ConnectionHandler, ActiveTcpListener& listener_; Network::ConnectionPtr connection_; Stats::TimespanPtr conn_length_; + int64_t tag_{0}; }; /** @@ -185,7 +196,8 @@ class ConnectionHandlerImpl : public Network::ConnectionHandler, bool hand_off_restored_destination_connections) : listener_(listener), socket_(std::move(socket)), hand_off_restored_destination_connections_(hand_off_restored_destination_connections), - iter_(accept_filters_.end()) { + iter_(accept_filters_.end()), + snapped_filter_chain_manager_(listener.active_filter_chain_manager_) { listener_.stats_.downstream_pre_cx_active_.inc(); } ~ActiveTcpSocket() override { @@ -224,6 +236,7 @@ class ConnectionHandlerImpl : public Network::ConnectionHandler, const bool hand_off_restored_destination_connections_; std::list accept_filters_; std::list::iterator iter_; + Network::FilterChainManagerSharedPtr snapped_filter_chain_manager_; Event::TimerPtr timer_; }; diff --git a/source/server/filter_chain_manager_impl.h b/source/server/filter_chain_manager_impl.h index 6a06bcc7c6a05..5765e40805c66 100644 --- a/source/server/filter_chain_manager_impl.h +++ b/source/server/filter_chain_manager_impl.h @@ -4,11 +4,15 @@ #include "envoy/api/v2/listener/listener.pb.h" #include "envoy/server/transport_socket_config.h" +#include "envoy/thread_local/thread_local.h" #include "common/common/logger.h" +#include "common/init/manager_impl.h" #include "common/network/cidr_range.h" #include "common/network/lc_trie.h" +#include "server/tag_generator_batch_impl.h" + #include "absl/container/flat_hash_map.h" namespace Envoy { @@ -21,6 +25,20 @@ class FilterChainFactoryBuilder { buildFilterChain(const ::envoy::api::v2::listener::FilterChain& filter_chain) const PURE; }; +class FilterChainManagerImpl; + +class ThreadLocalFilterChainManagerHelper : public ThreadLocal::ThreadLocalObject { +public: + // The FCM which can be snapped by worker. + std::shared_ptr filter_chain_manager_; + // The per worker listener which owns the thread local filter chain manager + Network::ListenerCallbacks* listener_; + + // Below could be mutated by main thread. Worker thread should access with causion. + std::unique_ptr fcm_init_manager_; + std::unique_ptr fcm_init_watcher_; + std::unique_ptr> filter_chains_trait_; +}; /** * Implementation of FilterChainManager. */ @@ -137,9 +155,9 @@ class FilterChainManagerImpl : public Network::FilterChainManager, class FilterChainImpl : public Network::FilterChain { public: FilterChainImpl(Network::TransportSocketFactoryPtr&& transport_socket_factory, - std::vector&& filters_factory) + std::vector&& filters_factory, int64_t tag = 0) : transport_socket_factory_(std::move(transport_socket_factory)), - filters_factory_(std::move(filters_factory)) {} + filters_factory_(std::move(filters_factory)), tag_(tag) {} // Network::FilterChain const Network::TransportSocketFactory& transportSocketFactory() const override { @@ -150,9 +168,12 @@ class FilterChainImpl : public Network::FilterChain { return filters_factory_; } + int64_t getTag() const override { return tag_; } + private: const Network::TransportSocketFactoryPtr transport_socket_factory_; const std::vector filters_factory_; + const int64_t tag_{0}; }; } // namespace Server diff --git a/source/server/http/admin.h b/source/server/http/admin.h index 3ace506ef6efd..cbba86ec18734 100644 --- a/source/server/http/admin.h +++ b/source/server/http/admin.h @@ -332,6 +332,12 @@ class AdminImpl : public Admin, // Network::ListenerConfig Network::FilterChainManager& filterChainManager() override { return parent_; } + // TODO: replace this if the interface is agreed + Network::FilterChainManagerSharedPtr sharedFilterChainManager() override { + return Network::FilterChainManagerSharedPtr(&parent_, + // A fake deleter + [](auto) {}); + } Network::FilterChainFactory& filterChainFactory() override { return parent_; } Network::Socket& socket() override { return parent_.mutable_socket(); } const Network::Socket& socket() const override { return parent_.mutable_socket(); } diff --git a/source/server/listener_impl.cc b/source/server/listener_impl.cc index bec30d337b542..7acdba4f938be 100644 --- a/source/server/listener_impl.cc +++ b/source/server/listener_impl.cc @@ -31,7 +31,8 @@ ListenerImpl::ListenerImpl(const envoy::api::v2::Listener& config, const std::st bool workers_started, uint64_t hash, ProtobufMessage::ValidationVisitor& validation_visitor) : parent_(parent), address_(Network::Address::resolveProtoAddress(config.address())), - filter_chain_manager_(address_), + filter_chain_manager_(std::make_shared(address_)), + fcm_tls_(parent_.server_.threadLocal().allocateSlot()), socket_type_(Network::Utility::protobufAddressSocketType(config.address())), global_scope_(parent_.server_.stats().createScope("")), listener_scope_( @@ -122,8 +123,8 @@ ListenerImpl::ListenerImpl(const envoy::api::v2::Listener& config, const std::st parent_.server_.random(), parent_.server_.stats(), parent_.server_.singletonManager(), parent_.server_.threadLocal(), validation_visitor, parent_.server_.api()); factory_context.setInitManager(initManager()); - ListenerFilterChainFactoryBuilder builder(*this, factory_context); - filter_chain_manager_.addFilterChain(config.filter_chains(), builder); + ListenerFilterChainFactoryBuilder builder(*this, factory_context, filter_chain_tag_generator_); + filter_chain_manager_->addFilterChain(config.filter_chains(), builder); if (socket_type_ == Network::Address::SocketType::Datagram) { return; @@ -283,6 +284,61 @@ void ListenerImpl::initialize() { } } +bool ListenerImpl::takeOver(const envoy::api::v2::Listener& config) { + auto fcm_helper = std::make_shared(); + fcm_helper->fcm_init_manager_ = std::make_unique("fcm_take_over"); + fcm_helper->filter_chain_manager_ = std::make_shared(address_); + Server::Configuration::TransportSocketFactoryContextImpl factory_context( + parent_.server_.admin(), parent_.server_.sslContextManager(), *listener_scope_, + parent_.server_.clusterManager(), parent_.server_.localInfo(), parent_.server_.dispatcher(), + parent_.server_.random(), parent_.server_.stats(), parent_.server_.singletonManager(), + parent_.server_.threadLocal(), + parent_.server_.messageValidationContext().dynamicValidationVisitor(), parent_.server_.api()); + factory_context.setInitManager(*fcm_helper->fcm_init_manager_); + ListenerFilterChainFactoryBuilder builder(*this, factory_context, filter_chain_tag_generator_); + auto tags = + builder.submitFilterChains(*fcm_helper->filter_chain_manager_, config.filter_chains()); + + // TODO(lambdai): determine the correct strategy for concurrent take over. Cancel previous or + // allow it? + pending_fcms_.emplace_back(fcm_helper); + + fcm_helper->fcm_init_watcher_ = std::make_unique( + "fcm_take_over", [fcm_helper, tags = std::move(tags), this]() { + /* + tls_->runOnAllThreads([update_fn]() + -> ThreadLocal::ThreadLocalObjectSharedPtr { + auto prev_thread_local_config = std::dynamic_pointer_cast(previous); + prev_thread_local_config->config_ = update_fn(prev_thread_local_config->config_); + return previous; + }); + */ + fcm_tls_->runOnAllThreads([fcm_helper]( + ThreadLocal::ThreadLocalObjectSharedPtr previous_fcm) mutable + -> ThreadLocal::ThreadLocalObjectSharedPtr { + auto prev_thread_local_config = + std::dynamic_pointer_cast(previous_fcm); + auto new_thread_local_config = std::make_shared(); + UNREFERENCED_PARAMETER(prev_thread_local_config); + // new_thread_local_config.replace(prev_thread_local_config); + return new_thread_local_config; + // replace: + // fetch the listener instance + // moving connection according to tag + // drain + // init: + // fill listener instance + }); + parent_.updateFilterChainManager(listener_tag_, *fcm_helper, tags); + // Remove all the fcms which were added prior to this fcm, if any. + // This logic is correct no matter the insert strategy above. + pending_fcms_.erase(pending_fcms_.begin(), + std::find(pending_fcms_.begin(), pending_fcms_.end(), fcm_helper)); + }); + fcm_helper->fcm_init_manager_->initialize(*fcm_helper->fcm_init_watcher_); + return true; +} + Init::Manager& ListenerImpl::initManager() { // See initialize() for why we choose different init managers to return. if (workers_started_) { diff --git a/source/server/listener_impl.h b/source/server/listener_impl.h index 8cbe954f7226f..9a29930ebe192 100644 --- a/source/server/listener_impl.h +++ b/source/server/listener_impl.h @@ -12,6 +12,7 @@ #include "common/init/manager_impl.h" #include "server/filter_chain_manager_impl.h" +#include "server/tag_generator_batch_impl.h" namespace Envoy { namespace Server { @@ -64,6 +65,15 @@ class ListenerImpl : public Network::ListenerConfig, return ret; } + /** + * Update this active listener with the new listener proto config. + * May throw EnvoyException. + * + * @param config the new listener config + * @return true if update succeeds + */ + bool takeOver(const envoy::api::v2::Listener& config); + Network::Address::InstanceConstSharedPtr address() const { return address_; } Network::Address::SocketType socketType() const { return socket_type_; } const envoy::api::v2::Listener& config() const { return config_; } @@ -77,7 +87,10 @@ class ListenerImpl : public Network::ListenerConfig, const std::string& versionInfo() const { return version_info_; } // Network::ListenerConfig - Network::FilterChainManager& filterChainManager() override { return filter_chain_manager_; } + Network::FilterChainManager& filterChainManager() override { return *filter_chain_manager_; } + Network::FilterChainManagerSharedPtr sharedFilterChainManager() override { + return fcm_tls_->getTyped().filter_chain_manager_; + } Network::FilterChainFactory& filterChainFactory() override { return *this; } Network::Socket& socket() override { return *socket_; } const Network::Socket& socket() const override { return *socket_; } @@ -151,7 +164,7 @@ class ListenerImpl : public Network::ListenerConfig, private: void addListenSocketOption(const Network::Socket::OptionConstSharedPtr& option) { ensureSocketOptions(); - listen_socket_options_->emplace_back(std::move(option)); + listen_socket_options_->emplace_back(option); } void addListenSocketOptions(const Network::Socket::OptionsSharedPtr& options) { ensureSocketOptions(); @@ -159,8 +172,13 @@ class ListenerImpl : public Network::ListenerConfig, } ListenerManagerImpl& parent_; + TagGeneratorBatchImpl filter_chain_tag_generator_; Network::Address::InstanceConstSharedPtr address_; - FilterChainManagerImpl filter_chain_manager_; + // Active filter_chain_manager + std::shared_ptr filter_chain_manager_; + // The pending filter chain managers in the order of insertion. + std::list> pending_fcms_; + ThreadLocal::SlotPtr fcm_tls_; Network::Address::SocketType socket_type_; Network::SocketSharedPtr socket_; diff --git a/source/server/listener_manager_impl.cc b/source/server/listener_manager_impl.cc index 488e2e8a8f8ff..e52dea26183dc 100644 --- a/source/server/listener_manager_impl.cc +++ b/source/server/listener_manager_impl.cc @@ -65,7 +65,6 @@ void fillState(envoy::admin::v2alpha::ListenersConfigDump_DynamicListenerState& state.mutable_listener()->MergeFrom(listener.config()); TimestampUtil::systemClockToTimestamp(listener.last_updated_, *(state.mutable_last_updated())); } - } // namespace std::vector ProdListenerComponentFactory::createNetworkFilterFactoryList_( @@ -341,7 +340,13 @@ bool ListenerManagerImpl::addOrUpdateListenerInternal(const envoy::api::v2::List ENVOY_LOG(debug, "duplicate/locked listener '{}'. no add/update", name); return false; } + if (existing_active_listener != active_listeners_.end() && + couldTakeOver(**existing_active_listener, config)) { + (*existing_active_listener)->takeOver(config); + // TODO: after take over + // metric, initialize + } ListenerImplPtr new_listener(new ListenerImpl( config, version_info, *this, name, added_via_api, workers_started_, hash, added_via_api ? server_.messageValidationContext().dynamicValidationVisitor() @@ -441,6 +446,35 @@ bool ListenerManagerImpl::addOrUpdateListenerInternal(const envoy::api::v2::List return true; } +// TODO(lambdai): Improve efficiency and false negative. +bool ListenerManagerImpl::isFilterChainOnlyUpdate(const envoy::api::v2::Listener& existing_config, + const envoy::api::v2::Listener& new_config) { + auto lhs = existing_config; + auto rhs = new_config; + lhs.clear_filter_chains(); + rhs.clear_filter_chains(); + return lhs.DebugString() == rhs.DebugString(); + + /* try below + bool IsEmptyDigitalMediaData( + const ads_proto::DigitalMediaData& digital_media_data) { + // Note that we set the app_id and vendor of the digital media data, so that + // the AppSignal is self-describing. Ignore these fields. + proto2::util::MessageDifferencer differ; + differ.IgnoreField(kAppIdFieldDescriptor); + differ.IgnoreField(kVendorFieldDescriptor); + + return differ.Compare(digital_media_data, + ads_proto::DigitalMediaData::default_instance()); + } + */ +} + +bool ListenerManagerImpl::couldTakeOver(ListenerImpl& existing_listener, + const envoy::api::v2::Listener& new_config) { + return isFilterChainOnlyUpdate(existing_listener.config(), new_config); +} + bool ListenerManagerImpl::hasListenerWithAddress(const ListenerList& list, const Network::Address::Instance& address) { for (const auto& listener : list) { @@ -664,6 +698,26 @@ void ListenerManagerImpl::stopListener(Network::ListenerConfig& listener, } } +bool ListenerManagerImpl::updateFilterChainManager( + uint64_t listener_tag, ThreadLocalFilterChainManagerHelper& filter_chain_helper, + TagGenerator::Tags filter_chain_tags) { + UNREFERENCED_PARAMETER(filter_chain_helper); + UNREFERENCED_PARAMETER(filter_chain_tags); + for (const auto& worker : workers_) { + worker->updateListener( + listener_tag, + /* update listener func */ + [&filter_chain_helper, + filter_chain_tags](Network::ConnectionHandler::ActiveListener&) -> bool { + UNREFERENCED_PARAMETER(filter_chain_helper); + return true; + }, + /* completion */ + [](bool) {}); + } + return true; +} + void ListenerManagerImpl::stopListeners(StopListenersType stop_listeners_type) { stop_listeners_type_ = stop_listeners_type; for (Network::ListenerConfig& listener : listeners()) { @@ -706,11 +760,28 @@ void ListenerManagerImpl::endListenerUpdate(FailureStates&& failure_states) { ListenerFilterChainFactoryBuilder::ListenerFilterChainFactoryBuilder( ListenerImpl& listener, - Server::Configuration::TransportSocketFactoryContextImpl& factory_context) - : parent_(listener), factory_context_(factory_context) {} + Server::Configuration::TransportSocketFactoryContextImpl& factory_context, + TagGeneratorBatchImpl& tag_generator) + : parent_(listener), factory_context_(factory_context), tag_generator_(tag_generator) {} std::unique_ptr ListenerFilterChainFactoryBuilder::buildFilterChain( const ::envoy::api::v2::listener::FilterChain& filter_chain) const { + ListenerFilterChainFactoryBuilder::InternalBuilder builder(*this); + return builder.buildFilterChainInternal(filter_chain, 0); +} + +TagGenerator::Tags ListenerFilterChainFactoryBuilder::submitFilterChains( + FilterChainManagerImpl& fcm, + absl::Span filter_chain_span) { + + auto tags = tag_generator_.addFilterChains(filter_chain_span); + fcm.addFilterChain(filter_chain_span, *this); + return tags; +} + +std::unique_ptr +ListenerFilterChainFactoryBuilder::InternalBuilder::buildFilterChainInternal( + const ::envoy::api::v2::listener::FilterChain& filter_chain, uint64_t tag) const { // If the cluster doesn't have transport socket configured, then use the default "raw_buffer" // transport socket or BoringSSL-based "tls" transport socket if TLS settings are configured. // We copy by value first then override if necessary. @@ -728,15 +799,23 @@ std::unique_ptr ListenerFilterChainFactoryBuilder::buildFi auto& config_factory = Config::Utility::getAndCheckFactory< Server::Configuration::DownstreamTransportSocketConfigFactory>(transport_socket.name()); ProtobufTypes::MessagePtr message = Config::Utility::translateToFactoryConfig( - transport_socket, parent_.messageValidationVisitor(), config_factory); + transport_socket, outer_builder_.parent_.messageValidationVisitor(), config_factory); std::vector server_names(filter_chain.filter_chain_match().server_names().begin(), filter_chain.filter_chain_match().server_names().end()); return std::make_unique( - config_factory.createTransportSocketFactory(*message, factory_context_, + config_factory.createTransportSocketFactory(*message, outer_builder_.factory_context_, std::move(server_names)), - parent_.parent_.factory_.createNetworkFilterFactoryList(filter_chain.filters(), parent_)); + outer_builder_.parent_.parent_.factory_.createNetworkFilterFactoryList( + filter_chain.filters(), outer_builder_.parent_), + tag); +} + +/* static */ +const ListenerFilterChainFactoryBuilder::TagsIndex& +ListenerFilterChainFactoryBuilder::InternalBuilder::emptyTags() { + CONSTRUCT_ON_FIRST_USE(TagsIndex, {}); } } // namespace Server diff --git a/source/server/listener_manager_impl.h b/source/server/listener_manager_impl.h index 37bf491e4a7a5..be67f516d8727 100644 --- a/source/server/listener_manager_impl.h +++ b/source/server/listener_manager_impl.h @@ -120,9 +120,18 @@ class ListenerManagerImpl : public ListenerManager, Logger::Loggable; ListenerFilterChainFactoryBuilder( - ListenerImpl& listener, Configuration::TransportSocketFactoryContextImpl& factory_context); + ListenerImpl& listener, Configuration::TransportSocketFactoryContextImpl& factory_context, + TagGeneratorBatchImpl& tag_generator); + std::unique_ptr buildFilterChain(const ::envoy::api::v2::listener::FilterChain& filter_chain) const override; + // consider rewrite ListenerFilterChainFactoryBuilder so that submitFilterChains is the only + // interface. + TagGenerator::Tags submitFilterChains( + FilterChainManagerImpl& fcm, + absl::Span filter_chain_span); + private: + class InternalBuilder : public FilterChainFactoryBuilder { + public: + InternalBuilder(const ListenerFilterChainFactoryBuilder& outer_builder, + const TagsIndex& filter_chain_index = emptyTags()) + : outer_builder_(outer_builder), filter_chain_index_(filter_chain_index) {} + + std::unique_ptr + buildFilterChain(const ::envoy::api::v2::listener::FilterChain& filter_chain) const override { + auto iter = filter_chain_index_.find(&filter_chain); + auto tag = iter != filter_chain_index_.end() ? iter->second : 0; + return buildFilterChainInternal(filter_chain, tag); + } + static const TagsIndex& emptyTags(); + const ListenerFilterChainFactoryBuilder& outer_builder_; + const TagsIndex& filter_chain_index_; + + std::unique_ptr + buildFilterChainInternal(const ::envoy::api::v2::listener::FilterChain& filter_chain, + uint64_t tag) const; + }; ListenerImpl& parent_; Configuration::TransportSocketFactoryContextImpl& factory_context_; + TagGeneratorBatchImpl& tag_generator_; }; } // namespace Server diff --git a/source/server/tag_generator.h b/source/server/tag_generator.h new file mode 100644 index 0000000000000..c4998bb1f1683 --- /dev/null +++ b/source/server/tag_generator.h @@ -0,0 +1,23 @@ +#pragma once + +#include +#include + +#include "envoy/common/pure.h" + +namespace Envoy { +namespace Server { + +/** + * Generate tag for filter chain. The tag is used to identify the connections belong to the tag + * asociated filter chain. If two filter chains are different, their tags must be different. Two + * identical filter chains should shared the same tag but it is not required. + */ +class TagGenerator { +public: + using Tags = std::set; + virtual ~TagGenerator() = default; + virtual Tags getTags() PURE; +}; +} // namespace Server +} // namespace Envoy \ No newline at end of file diff --git a/source/server/tag_generator_batch_impl.cc b/source/server/tag_generator_batch_impl.cc new file mode 100644 index 0000000000000..2da6659af8699 --- /dev/null +++ b/source/server/tag_generator_batch_impl.cc @@ -0,0 +1,26 @@ +#include "server/tag_generator_batch_impl.h" + +namespace Envoy { +namespace Server { +TagGeneratorBatchImpl::~TagGeneratorBatchImpl() = default; + +TagGenerator::Tags TagGeneratorBatchImpl::addFilterChains( + absl::Span filter_chain_span) { + TagGenerator::Tags tags; + for (const auto& fc : filter_chain_span) { + const auto& kv = filter_chains_.emplace(*fc, ++next_tag_); + tags.insert(kv.second ? next_tag_ : kv.first->second); + } + return tags; +} + +TagGenerator::Tags TagGeneratorBatchImpl::getTags() { + TagGenerator::Tags res; + for (const auto& kv : filter_chains_) { + res.insert(kv.second); + } + return res; +} + +} // namespace Server +} // namespace Envoy diff --git a/source/server/tag_generator_batch_impl.h b/source/server/tag_generator_batch_impl.h new file mode 100644 index 0000000000000..05125d18cc274 --- /dev/null +++ b/source/server/tag_generator_batch_impl.h @@ -0,0 +1,30 @@ +#pragma once + +#include "envoy/api/v2/listener/listener.pb.h" + +#include "common/protobuf/utility.h" + +#include "tag_generator.h" + +namespace Envoy { +namespace Server { + +/** + * Generate tag for filter chain. The tag is used to identify the connections belong to the tag + * asociated filter chain. If two filter chains are different, their tags must be different. Two + * identical filter chains should shared the same tag but it is not required. + */ +class TagGeneratorBatchImpl : public TagGenerator { +public: + ~TagGeneratorBatchImpl() override; + TagGenerator::Tags getTags() override; + TagGenerator::Tags addFilterChains( + absl::Span filter_chain_span); + +private: + std::unordered_map + filter_chains_; + uint64_t next_tag_{0}; +}; +} // namespace Server +} // namespace Envoy diff --git a/source/server/worker_impl.cc b/source/server/worker_impl.cc index 58371525c6b0f..96e58a6d140f7 100644 --- a/source/server/worker_impl.cc +++ b/source/server/worker_impl.cc @@ -99,6 +99,15 @@ void WorkerImpl::stopListener(Network::ListenerConfig& listener, std::function listener_update_func, + std::function completion) { + dispatcher_->post([this, listener_tag, listener_update_func, completion]() -> void { + completion(handler_->updateListener(listener_tag, listener_update_func)); + }); +} + void WorkerImpl::threadRoutine(GuardDog& guard_dog) { ENVOY_LOG(debug, "worker entering dispatch loop"); // The watch dog must be created after the dispatcher starts running and has post events flushed, diff --git a/source/server/worker_impl.h b/source/server/worker_impl.h index 9d9720a261ee1..d64cb512a846f 100644 --- a/source/server/worker_impl.h +++ b/source/server/worker_impl.h @@ -49,6 +49,10 @@ class WorkerImpl : public Worker, Logger::Loggable { void initializeStats(Stats::Scope& scope, const std::string& prefix) override; void stop() override; void stopListener(Network::ListenerConfig& listener, std::function completion) override; + void updateListener( + uint64_t listener_tag, + std::function listener_update_func, + std::function completion) override; private: void threadRoutine(GuardDog& guard_dog); diff --git a/test/mocks/network/mocks.h b/test/mocks/network/mocks.h index 2387e363db9d1..cf0ad5335194e 100644 --- a/test/mocks/network/mocks.h +++ b/test/mocks/network/mocks.h @@ -301,6 +301,7 @@ class MockListenerConfig : public ListenerConfig { ~MockListenerConfig() override; MOCK_METHOD0(filterChainManager, FilterChainManager&()); + MOCK_METHOD0(sharedFilterChainManager, FilterChainManagerSharedPtr()); MOCK_METHOD0(filterChainFactory, FilterChainFactory&()); MOCK_METHOD0(socket, Socket&()); MOCK_CONST_METHOD0(socket, const Socket&()); diff --git a/test/server/connection_handler_test.cc b/test/server/connection_handler_test.cc index 4ce3f337b79cb..d9ec1c9225dd3 100644 --- a/test/server/connection_handler_test.cc +++ b/test/server/connection_handler_test.cc @@ -864,6 +864,30 @@ TEST_F(ConnectionHandlerTest, UdpListenerNoFilterThrowsException) { } } +TEST_F(ConnectionHandlerTest, ActiveTcpSocketDestroyAcceptFilters) { + Network::MockListener* listener = new NiceMock(); + Network::ListenerCallbacks* listener_callbacks; + TestListener* test_listener = addListener(1, true, false, "test_listener"); + EXPECT_CALL(dispatcher_, createListener_(_, _, _)) + .WillOnce( + Invoke([&](Network::Socket&, Network::ListenerCallbacks& cb, bool) -> Network::Listener* { + listener_callbacks = &cb; + return listener; + })); + EXPECT_CALL(test_listener->socket_, localAddress()); + handler_->addListener(*test_listener); + + ConnectionHandlerImpl connection_handler_impl; + ActiveTcpListener tcp_listener(connection_handler_impl, test_listener); + Network::MockConnectionSocket* connection = new NiceMock(); + Network::ConnectionSocketPtr connection_socket(connection); + ActiveTcpSocket tcp_socket(tcp_listener, std::move(connection_socket), false); + EXPECT_CALL(dispatcher, createServerConnection(_, _)).InvokeThat([]() { + EXPECT_TRUE(tcp_socket.accept_filter_.empty()); + }); + tcp_socket.onTimeout(); +} + } // namespace } // namespace Server } // namespace Envoy diff --git a/test/server/listener_manager_impl_test.cc b/test/server/listener_manager_impl_test.cc index 3bc26e054a449..4bdb662c27992 100644 --- a/test/server/listener_manager_impl_test.cc +++ b/test/server/listener_manager_impl_test.cc @@ -3483,6 +3483,34 @@ TEST_F(ListenerManagerImplWithRealFiltersTest, VerifyIgnoreExpirationWithCA) { EXPECT_NO_THROW(manager_->addOrUpdateListener(parseListenerFromV2Yaml(yaml), "", true)); } +TEST(ListenerManagerUtils, CompareListenerConfigWithSelf) { + envoy::api::v2::Listener c0; + ASSERT_TRUE(ListenerManagerImpl::isFilterChainOnlyUpdate(c0, c0)); + c0.set_continue_on_listener_filters_timeout(true); + ASSERT_TRUE(ListenerManagerImpl::isFilterChainOnlyUpdate(c0, c0)); + c0.add_filter_chains(); + ASSERT_TRUE(ListenerManagerImpl::isFilterChainOnlyUpdate(c0, c0)); +} + +TEST(ListenerManagerUtils, CompareListenerConfigWithNetworkFilters) { + envoy::api::v2::Listener c0; + envoy::api::v2::Listener c1; + c0.add_filter_chains()->set_name("common"); + c1.add_filter_chains()->set_name("common"); + ASSERT_TRUE(ListenerManagerImpl::isFilterChainOnlyUpdate(c0, c1)); + c0.add_filter_chains()->set_name("c0"); + c1.add_filter_chains()->set_name("c1"); + ASSERT_TRUE(ListenerManagerImpl::isFilterChainOnlyUpdate(c0, c1)); +} + +TEST(ListenerManagerUtils, CompareListenerConfigWithOtherFields) { + envoy::api::v2::Listener c0; + envoy::api::v2::Listener c1; + c0.set_continue_on_listener_filters_timeout(true); + c1.set_continue_on_listener_filters_timeout(false); + ASSERT_FALSE(ListenerManagerImpl::isFilterChainOnlyUpdate(c0, c1)); +} + } // namespace } // namespace Server } // namespace Envoy