diff --git a/all_files.vcxitems b/all_files.vcxitems index db938d2..ef2b93b 100644 --- a/all_files.vcxitems +++ b/all_files.vcxitems @@ -78,10 +78,6 @@ - - - - @@ -163,7 +159,6 @@ - @@ -207,7 +202,6 @@ - diff --git a/cdc_rsync/BUILD b/cdc_rsync/BUILD index 33e65a2..f2a451e 100644 --- a/cdc_rsync/BUILD +++ b/cdc_rsync/BUILD @@ -113,7 +113,6 @@ cc_library( deps = [ ":cdc_rsync_client", "//common:build_version", - "//common:port_range_parser", "@com_github_zstd//:zstd", "@com_google_absl//absl/status", ], diff --git a/cdc_rsync/params.cc b/cdc_rsync/params.cc index 7633098..7aba1ff 100644 --- a/cdc_rsync/params.cc +++ b/cdc_rsync/params.cc @@ -21,7 +21,6 @@ #include "absl/strings/str_split.h" #include "common/build_version.h" #include "common/path.h" -#include "common/port_range_parser.h" #include "common/remote_util.h" #include "lib/zstd.h" diff --git a/cdc_stream/BUILD b/cdc_stream/BUILD index e5e4fcf..f664072 100644 --- a/cdc_stream/BUILD +++ b/cdc_stream/BUILD @@ -24,7 +24,6 @@ cc_library( hdrs = ["base_command.h"], deps = [ "//absl_helper:jedec_size_flag", - "//common:port_range_parser", "@com_github_lyra//:lyra", "@com_google_absl//absl/status", "@com_google_absl//absl/strings:str_format", @@ -210,7 +209,6 @@ cc_library( "//common:log", "//common:path", "//common:path_filter", - "//common:port_manager", "//common:process", "//common:remote_util", "//common:sdk_util", diff --git a/cdc_stream/asset_stream_config.cc b/cdc_stream/asset_stream_config.cc index 7cd33f2..096e99c 100644 --- a/cdc_stream/asset_stream_config.cc +++ b/cdc_stream/asset_stream_config.cc @@ -50,11 +50,7 @@ void AssetStreamConfig::RegisterCommandLineFlags(lyra::command& cmd, "asset stream service, default: " + std::to_string(service_port_))); - cmd.add_argument(lyra::opt(base_command.PortRangeParser( - "--forward-port", - &session_cfg_.deprecated_forward_port_first, - &session_cfg_.deprecated_forward_port_last), - "port") + cmd.add_argument(lyra::opt(session_cfg_.deprecated_forward_port_range, "port") .name("--forward-port") .help("[Deprecated, ignored] TCP port or range used for " "SSH port forwarding")); diff --git a/cdc_stream/base_command.cc b/cdc_stream/base_command.cc index 6f0ff33..c3ce5b3 100644 --- a/cdc_stream/base_command.cc +++ b/cdc_stream/base_command.cc @@ -17,7 +17,6 @@ #include "absl/strings/str_format.h" #include "absl/strings/str_split.h" #include "absl_helper/jedec_size_flag.h" -#include "common/port_range_parser.h" #include "lyra/lyra.hpp" namespace cdc_ft { @@ -57,18 +56,6 @@ std::function BaseCommand::JedecParser( }; } -std::function BaseCommand::PortRangeParser( - const char* flag_name, uint16_t* first, uint16_t* last) { - return [flag_name, first, last, - error = &parse_error_](const std::string& value) { - if (!port_range::Parse(value.c_str(), first, last)) { - *error = absl::StrFormat( - "Failed to parse %s=%s, expected or -", - flag_name, value); - } - }; -} - std::function BaseCommand::PosArgValidator( std::string* str) { return [str, invalid_arg = &invalid_arg_](const std::string& value) { diff --git a/cdc_stream/base_command.h b/cdc_stream/base_command.h index f82d824..c1f2b56 100644 --- a/cdc_stream/base_command.h +++ b/cdc_stream/base_command.h @@ -48,13 +48,6 @@ class BaseCommand { std::function JedecParser(const char* flag_name, uint64_t* bytes); - // Parser for single ports "123" or port ranges "123-234". Usage: - // lyra::opt(PortRangeParser("port-flag", &first, &last), "port")) - // Automatically reports a parse failure on error. - std::function PortRangeParser(const char* flag_name, - uint16_t* first, - uint16_t* last); - // Validator that should be used for all positional arguments. Lyra interprets // -u, --unknown_flag as positional argument. This validator makes sure that // a positional argument starting with - is reported as an error. Otherwise, diff --git a/cdc_stream/multi_session.cc b/cdc_stream/multi_session.cc index dc02e55..be6c93f 100644 --- a/cdc_stream/multi_session.cc +++ b/cdc_stream/multi_session.cc @@ -20,7 +20,6 @@ #include "common/path.h" #include "common/path_filter.h" #include "common/platform.h" -#include "common/port_manager.h" #include "common/process.h" #include "common/server_socket.h" #include "common/util.h" diff --git a/cdc_stream/session.cc b/cdc_stream/session.cc index 8a9c667..51adeb7 100644 --- a/cdc_stream/session.cc +++ b/cdc_stream/session.cc @@ -16,7 +16,6 @@ #include "cdc_stream/cdc_fuse_manager.h" #include "common/log.h" -#include "common/port_manager.h" #include "common/status.h" #include "common/status_macros.h" #include "metrics/enums.h" diff --git a/cdc_stream/session_config.h b/cdc_stream/session_config.h index 3222a16..cb7efb9 100644 --- a/cdc_stream/session_config.h +++ b/cdc_stream/session_config.h @@ -59,8 +59,7 @@ struct SessionConfig { // Ports used for local port forwarding. Deprecated as forward ports are // determined automatically now using ephemeral ports. - uint16_t deprecated_forward_port_first = 0; - uint16_t deprecated_forward_port_last = 0; + std::string deprecated_forward_port_range; }; } // namespace cdc_ft diff --git a/common/BUILD b/common/BUILD index 640e7ba..aa9f9fc 100644 --- a/common/BUILD +++ b/common/BUILD @@ -300,54 +300,6 @@ cc_library( hdrs = ["platform.h"], ) -cc_library( - name = "port_manager", - srcs = ["port_manager_win.cc"], - hdrs = ["port_manager.h"], - target_compatible_with = ["@platforms//os:windows"], - deps = [ - ":arch_type", - ":remote_util", - ":status", - ":stopwatch", - ":util", - "@com_google_absl//absl/status:statusor", - ], -) - -cc_test( - name = "port_manager_test", - srcs = ["port_manager_test.cc"], - target_compatible_with = ["@platforms//os:windows"], - deps = [ - ":port_manager", - ":status_test_macros", - ":stub_process", - ":test_main", - ":testing_clock", - "@com_google_googletest//:gtest", - ], -) - -cc_library( - name = "port_range_parser", - srcs = ["port_range_parser.cc"], - hdrs = ["port_range_parser.h"], - deps = [ - "@com_google_absl//absl/strings", - ], -) - -cc_test( - name = "port_range_parser_test", - srcs = ["port_range_parser_test.cc"], - deps = [ - ":port_range_parser", - ":test_main", - "@com_google_googletest//:gtest", - ], -) - cc_library( name = "process", srcs = ["process_win.cc"], diff --git a/common/port_manager.h b/common/port_manager.h deleted file mode 100644 index 19627cd..0000000 --- a/common/port_manager.h +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Copyright 2022 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef COMMON_PORT_MANAGER_H_ -#define COMMON_PORT_MANAGER_H_ - -#include -#include -#include - -#include "absl/status/statusor.h" -#include "common/arch_type.h" -#include "common/clock.h" - -namespace cdc_ft { - -class ProcessFactory; -class RemoteUtil; -class SharedMemory; - -// Class for reserving ports globally. Use if there can be multiple processes -// of the same type that might request ports at the same time, e.g. multiple -// cdc_rsync.exe processes running concurrently. -class PortManager { - public: - // |unique_name| is a globally unique name used for shared memory to - // synchronize port reservation. The range of possible ports managed by this - // instance is [|first_port|, |last_port|]. |process_factory| is a valid - // pointer to a ProcessFactory instance to run processes locally. - // |remote_util| is the RemoteUtil instance to run processes remotely. If it - // is nullptr, no remote ports are reserved. - PortManager(std::string unique_name, int first_port, int last_port, - ProcessFactory* process_factory, RemoteUtil* remote_util, - SystemClock* system_clock = DefaultSystemClock::GetInstance(), - SteadyClock* steady_clock = DefaultSteadyClock::GetInstance()); - ~PortManager(); - - // Reserves a port in the range passed to the constructor. The port is - // released automatically upon destruction if ReleasePort() is not called - // explicitly. - // |remote_timeout_sec| is the timeout for finding available ports on the - // remote instance. - // |remote_arch_type| is the architecture of the remote device. - // Both |remote_timeout_sec| and |remote_arch_type| are ignored if - // |remote_util| is nullptr. Returns a DeadlineExceeded error if the timeout - // is exceeded. Returns a ResourceExhausted error if no ports are available. - absl::StatusOr ReservePort(int remote_timeout_sec, - ArchType remote_arch_type); - - // Releases a reserved port. - absl::Status ReleasePort(int port); - - // - // Lower-level interface for finding available ports directly. - // - - // Finds available ports in the range [first_port, last_port] for port - // forwarding on the local workstation. - // |arch_type| is the architecture of the local device. - // |process_factory| is used to create a netstat process. - // Returns ResourceExhaustedError if no port is available. - static absl::StatusOr> FindAvailableLocalPorts( - int first_port, int last_port, ArchType arch_type, - ProcessFactory* process_factory); - - // Finds available ports in the range [first_port, last_port] for port - // forwarding on the instance. - // |arch_type| is the architecture of the remote device. - // |process_factory| is used to create a netstat process. - // |remote_util| is used to connect to the instance. - // |timeout_sec| is the connection timeout in seconds. - // Returns a DeadlineExceeded error if the timeout is exceeded. - // Returns ResourceExhaustedError if no port is available. - static absl::StatusOr> FindAvailableRemotePorts( - int first_port, int last_port, ArchType arch_type, - ProcessFactory* process_factory, RemoteUtil* remote_util, int timeout_sec, - SteadyClock* steady_clock = DefaultSteadyClock::GetInstance()); - - private: - // Returns a list of available ports in the range [|first_port|, |last_port|] - // from the given |netstat_output|. - // |arch_type| is the architecture of the device where netstat was called. - // Returns ResourceExhaustedError if no port is available. - static absl::StatusOr> FindAvailablePorts( - int first_port, int last_port, const std::string& netstat_output, - ArchType arch_type); - - int first_port_; - int last_port_; - ProcessFactory* process_factory_; - RemoteUtil* remote_util_; - SystemClock* system_clock_; - SteadyClock* steady_clock_; - std::unique_ptr shared_mem_; - std::unordered_set reserved_ports_; -}; - -} // namespace cdc_ft - -#endif // COMMON_PORT_MANAGER_H_ diff --git a/common/port_manager_test.cc b/common/port_manager_test.cc deleted file mode 100644 index 94f40e7..0000000 --- a/common/port_manager_test.cc +++ /dev/null @@ -1,327 +0,0 @@ -// Copyright 2022 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "common/port_manager.h" - -#include "absl/strings/match.h" -#include "common/log.h" -#include "common/remote_util.h" -#include "common/status_test_macros.h" -#include "common/stub_process.h" -#include "common/testing_clock.h" -#include "gtest/gtest.h" - -namespace cdc_ft { -namespace { - -constexpr char kUserHost[] = "user@1.2.3.4"; - -constexpr char kGuid[] = "f77bcdfe-368c-4c45-9f01-230c5e7e2132"; -constexpr int kFirstPort = 44450; -constexpr int kLastPort = 44459; -constexpr int kNumPorts = kLastPort - kFirstPort + 1; - -constexpr int kTimeoutSec = 1; - -constexpr char kWindowsNetstat[] = "netstat -a -n -p tcp"; -constexpr char kLinuxNetstatOrSS[] = - "which ss2 && ss --numeric --listening --tcp || netstat --numeric " - "--listening --tcp"; - -constexpr char kWindowsNetstatOutFmt[] = - "TCP 127.0.0.1:50000 127.0.0.1:%i ESTABLISHED"; -constexpr char kLinuxNetstatOutFmt[] = - "tcp 0 0 0.0.0.0:%i 0.0.0.0:* LISTEN"; -constexpr char kLinuxSSOutFmt[] = - "LISTEN 0 128 " - " 0.0.0.0:%i 0.0.0.0:* "; - -class PortManagerTest : public ::testing::Test { - public: - PortManagerTest() - : remote_util_(kUserHost, /*verbosity=*/0, /*quiet=*/false, - &process_factory_, - /*forward_output_to_log=*/true), - port_manager_(kGuid, kFirstPort, kLastPort, &process_factory_, - &remote_util_, &system_clock_, &steady_clock_) {} - - void SetUp() override { - Log::Initialize(std::make_unique(LogLevel::kInfo)); - } - - void TearDown() override { Log::Shutdown(); } - - protected: - StubProcessFactory process_factory_; - TestingSystemClock system_clock_; - TestingSteadyClock steady_clock_; - RemoteUtil remote_util_; - PortManager port_manager_; -}; - -TEST_F(PortManagerTest, ReservePortSuccess) { - process_factory_.SetProcessOutput(kWindowsNetstat, "", "", 0); - process_factory_.SetProcessOutput(kLinuxNetstatOrSS, "", "", 0); - - absl::StatusOr port = - port_manager_.ReservePort(kTimeoutSec, ArchType::kLinux_x86_64); - ASSERT_OK(port); - EXPECT_EQ(*port, kFirstPort); -} - -TEST_F(PortManagerTest, ReservePortAllLocalPortsTaken) { - std::string local_netstat_out = ""; - for (int port = kFirstPort; port <= kLastPort; ++port) { - local_netstat_out += absl::StrFormat(kWindowsNetstatOutFmt, port); - } - process_factory_.SetProcessOutput(kWindowsNetstat, local_netstat_out, "", 0); - process_factory_.SetProcessOutput(kLinuxNetstatOrSS, "", "", 0); - - absl::StatusOr port = - port_manager_.ReservePort(kTimeoutSec, ArchType::kLinux_x86_64); - EXPECT_TRUE(absl::IsResourceExhausted(port.status())); - EXPECT_TRUE( - absl::StrContains(port.status().message(), "No port available in range")); -} - -TEST_F(PortManagerTest, ReservePortAllRemotePortsTaken) { - std::string remote_netstat_out = ""; - for (int port = kFirstPort; port <= kLastPort; ++port) { - remote_netstat_out += absl::StrFormat(kLinuxNetstatOutFmt, port); - } - process_factory_.SetProcessOutput(kWindowsNetstat, "", "", 0); - process_factory_.SetProcessOutput(kLinuxNetstatOrSS, remote_netstat_out, "", - 0); - - absl::StatusOr port = - port_manager_.ReservePort(kTimeoutSec, ArchType::kLinux_x86_64); - EXPECT_TRUE(absl::IsResourceExhausted(port.status())); - EXPECT_TRUE( - absl::StrContains(port.status().message(), "No port available in range")); -} - -TEST_F(PortManagerTest, ReservePortLocalNetstatFails) { - process_factory_.SetProcessOutput(kWindowsNetstat, "", "", 1); - process_factory_.SetProcessOutput(kLinuxNetstatOrSS, "", "", 0); - - absl::StatusOr port = - port_manager_.ReservePort(kTimeoutSec, ArchType::kLinux_x86_64); - EXPECT_NOT_OK(port); - EXPECT_TRUE(absl::StrContains(port.status().message(), - "Failed to find available local ports")); -} - -TEST_F(PortManagerTest, ReservePortRemoteNetstatFails) { - process_factory_.SetProcessOutput(kWindowsNetstat, "", "", 0); - process_factory_.SetProcessOutput(kLinuxNetstatOrSS, "", "", 1); - - absl::StatusOr port = - port_manager_.ReservePort(kTimeoutSec, ArchType::kLinux_x86_64); - EXPECT_NOT_OK(port); - EXPECT_TRUE(absl::StrContains(port.status().message(), - "Failed to find available remote ports")); -} - -TEST_F(PortManagerTest, ReservePortRemoteNetstatTimesOut) { - process_factory_.SetProcessOutput(kWindowsNetstat, "", "", 0); - process_factory_.SetProcessNeverExits(kLinuxNetstatOrSS); - steady_clock_.AutoAdvance(kTimeoutSec * 2 * 1000); - - absl::StatusOr port = - port_manager_.ReservePort(kTimeoutSec, ArchType::kLinux_x86_64); - EXPECT_NOT_OK(port); - EXPECT_TRUE(absl::IsDeadlineExceeded(port.status())); - EXPECT_TRUE(absl::StrContains(port.status().message(), - "Timeout while running netstat")); -} - -TEST_F(PortManagerTest, ReservePortMultipleInstances) { - process_factory_.SetProcessOutput(kWindowsNetstat, "", "", 0); - process_factory_.SetProcessOutput(kLinuxNetstatOrSS, "", "", 0); - - PortManager port_manager2(kGuid, kFirstPort, kLastPort, &process_factory_, - &remote_util_); - - // Port managers use shared memory, so different instances know about each - // other. This would even work if |port_manager_| and |port_manager2| belonged - // to different processes, but we don't test that here. - EXPECT_EQ(*port_manager_.ReservePort(kTimeoutSec, ArchType::kLinux_x86_64), - kFirstPort + 0); - EXPECT_EQ(*port_manager2.ReservePort(kTimeoutSec, ArchType::kLinux_x86_64), - kFirstPort + 1); - EXPECT_EQ(*port_manager_.ReservePort(kTimeoutSec, ArchType::kLinux_x86_64), - kFirstPort + 2); - EXPECT_EQ(*port_manager2.ReservePort(kTimeoutSec, ArchType::kLinux_x86_64), - kFirstPort + 3); -} - -TEST_F(PortManagerTest, ReservePortReusesPortsInLRUOrder) { - process_factory_.SetProcessOutput(kWindowsNetstat, "", "", 0); - process_factory_.SetProcessOutput(kLinuxNetstatOrSS, "", "", 0); - - for (int n = 0; n < kNumPorts * 2; ++n) { - EXPECT_EQ(*port_manager_.ReservePort(kTimeoutSec, ArchType::kLinux_x86_64), - kFirstPort + n % kNumPorts); - system_clock_.Advance(1000); - } -} - -TEST_F(PortManagerTest, ReleasePort) { - process_factory_.SetProcessOutput(kWindowsNetstat, "", "", 0); - process_factory_.SetProcessOutput(kLinuxNetstatOrSS, "", "", 0); - - absl::StatusOr port = - port_manager_.ReservePort(kTimeoutSec, ArchType::kLinux_x86_64); - EXPECT_EQ(*port, kFirstPort); - EXPECT_OK(port_manager_.ReleasePort(*port)); - port = port_manager_.ReservePort(kTimeoutSec, ArchType::kLinux_x86_64); - EXPECT_EQ(*port, kFirstPort); -} - -TEST_F(PortManagerTest, ReleasePortOnDestruction) { - process_factory_.SetProcessOutput(kWindowsNetstat, "", "", 0); - process_factory_.SetProcessOutput(kLinuxNetstatOrSS, "", "", 0); - - auto port_manager2 = std::make_unique( - kGuid, kFirstPort, kLastPort, &process_factory_, &remote_util_); - EXPECT_EQ(*port_manager2->ReservePort(kTimeoutSec, ArchType::kLinux_x86_64), - kFirstPort + 0); - EXPECT_EQ(*port_manager_.ReservePort(kTimeoutSec, ArchType::kLinux_x86_64), - kFirstPort + 1); - port_manager2.reset(); - EXPECT_EQ(*port_manager_.ReservePort(kTimeoutSec, ArchType::kLinux_x86_64), - kFirstPort + 0); -} - -TEST_F(PortManagerTest, FindAvailableLocalPortsSuccessWindows) { - // First port is in use. - std::string local_netstat_out = - absl::StrFormat(kWindowsNetstatOutFmt, kFirstPort); - process_factory_.SetProcessOutput(kWindowsNetstat, local_netstat_out, "", 0); - - absl::StatusOr> ports = - PortManager::FindAvailableLocalPorts( - kFirstPort, kLastPort, ArchType::kWindows_x86_64, &process_factory_); - ASSERT_OK(ports); - EXPECT_EQ(ports->size(), kNumPorts - 1); - for (int port = kFirstPort + 1; port <= kLastPort; ++port) { - EXPECT_TRUE(ports->find(port) != ports->end()); - } -} - -TEST_F(PortManagerTest, FindAvailableLocalPortsSuccessLinux) { - // First port is in use. - std::string local_netstat_out = - absl::StrFormat(kLinuxNetstatOutFmt, kFirstPort); - process_factory_.SetProcessOutput(kLinuxNetstatOrSS, local_netstat_out, "", - 0); - - absl::StatusOr> ports = - PortManager::FindAvailableLocalPorts( - kFirstPort, kLastPort, ArchType::kLinux_x86_64, &process_factory_); - ASSERT_OK(ports); - EXPECT_EQ(ports->size(), kNumPorts - 1); - for (int port = kFirstPort + 1; port <= kLastPort; ++port) { - EXPECT_TRUE(ports->find(port) != ports->end()); - } -} - -TEST_F(PortManagerTest, FindAvailableLocalPortsFailsNoPorts) { - // All ports are in use. - std::string local_netstat_out = ""; - for (int port = kFirstPort; port <= kLastPort; ++port) { - local_netstat_out += absl::StrFormat(kWindowsNetstatOutFmt, port); - } - process_factory_.SetProcessOutput(kWindowsNetstat, local_netstat_out, "", 0); - - absl::StatusOr> ports = - PortManager::FindAvailableLocalPorts( - kFirstPort, kLastPort, ArchType::kWindows_x86_64, &process_factory_); - EXPECT_TRUE(absl::IsResourceExhausted(ports.status())); - EXPECT_TRUE(absl::StrContains(ports.status().message(), - "No port available in range")); -} - -TEST_F(PortManagerTest, FindAvailableRemotePortsSuccessLinux) { - // First port is in use. - std::string remote_netstat_out = - absl::StrFormat(kLinuxNetstatOutFmt, kFirstPort); - process_factory_.SetProcessOutput(kLinuxNetstatOrSS, remote_netstat_out, "", - 0); - - absl::StatusOr> ports = - PortManager::FindAvailableRemotePorts( - kFirstPort, kLastPort, ArchType::kLinux_x86_64, &process_factory_, - &remote_util_, kTimeoutSec); - ASSERT_OK(ports); - EXPECT_EQ(ports->size(), kNumPorts - 1); - for (int port = kFirstPort + 1; port <= kLastPort; ++port) { - EXPECT_TRUE(ports->find(port) != ports->end()); - } -} - -TEST_F(PortManagerTest, FindAvailableRemotePortsSuccessLinuxSS) { - // First port is in use, but reporting is done by SS, not netsta. - std::string remote_netstat_out = absl::StrFormat(kLinuxSSOutFmt, kFirstPort); - process_factory_.SetProcessOutput(kLinuxNetstatOrSS, remote_netstat_out, "", - 0); - - absl::StatusOr> ports = - PortManager::FindAvailableRemotePorts( - kFirstPort, kLastPort, ArchType::kLinux_x86_64, &process_factory_, - &remote_util_, kTimeoutSec); - ASSERT_OK(ports); - EXPECT_EQ(ports->size(), kNumPorts - 1); - for (int port = kFirstPort + 1; port <= kLastPort; ++port) { - EXPECT_TRUE(ports->find(port) != ports->end()); - } -} - -TEST_F(PortManagerTest, FindAvailableRemotePortsSuccessWindows) { - // First port is in use. - std::string remote_netstat_out = - absl::StrFormat(kWindowsNetstatOutFmt, kFirstPort); - process_factory_.SetProcessOutput(kWindowsNetstat, remote_netstat_out, "", 0); - - absl::StatusOr> ports = - PortManager::FindAvailableRemotePorts( - kFirstPort, kLastPort, ArchType::kWindows_x86_64, &process_factory_, - &remote_util_, kTimeoutSec); - ASSERT_OK(ports); - EXPECT_EQ(ports->size(), kNumPorts - 1); - for (int port = kFirstPort + 1; port <= kLastPort; ++port) { - EXPECT_TRUE(ports->find(port) != ports->end()); - } -} - -TEST_F(PortManagerTest, FindAvailableRemotePortsFailsNoPorts) { - // All ports are in use. - std::string remote_netstat_out = ""; - for (int port = kFirstPort; port <= kLastPort; ++port) { - remote_netstat_out += absl::StrFormat(kLinuxNetstatOutFmt, port); - } - process_factory_.SetProcessOutput(kLinuxNetstatOrSS, remote_netstat_out, "", - 0); - - absl::StatusOr> ports = - PortManager::FindAvailableRemotePorts( - kFirstPort, kLastPort, ArchType::kLinux_x86_64, &process_factory_, - &remote_util_, kTimeoutSec); - EXPECT_TRUE(absl::IsResourceExhausted(ports.status())); - EXPECT_TRUE(absl::StrContains(ports.status().message(), - "No port available in range")); -} - -} // namespace -} // namespace cdc_ft diff --git a/common/port_manager_win.cc b/common/port_manager_win.cc deleted file mode 100644 index 4d64865..0000000 --- a/common/port_manager_win.cc +++ /dev/null @@ -1,359 +0,0 @@ -// Copyright 2022 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "common/port_manager.h" - -#define WIN32_LEAN_AND_MEAN -#include - -#include - -#include "absl/strings/str_split.h" -#include "common/arch_type.h" -#include "common/log.h" -#include "common/process.h" -#include "common/remote_util.h" -#include "common/status.h" -#include "common/status_macros.h" -#include "common/stopwatch.h" -#include "common/util.h" - -namespace cdc_ft { - -constexpr char kErrorArchTypeUnhandled[] = "arch_type_unhandled"; - -// Returns the arch-specific netstat command. -const char* GetNetstatCommand(ArchType arch_type) { - if (IsWindowsArchType(arch_type)) { - // -a to get the connection and ports the computer is listening on. - // -n to get numerical addresses to avoid the overhead of getting names. - // -p tcp to limit the output to TCPv4 connections. - return "netstat -a -n -p tcp"; - } - - if (IsLinuxArchType(arch_type)) { - // Prefer ss over netstat. The flags out output are compatible. - // --numeric to get numerical addresses. - // --listening to get only listening sockets. - // --tcp to get only TCP connections. - return "which ss2 && ss --numeric --listening --tcp || netstat " - "--numeric --listening --tcp"; - } - - assert(!kErrorArchTypeUnhandled); - return ""; -} - -// Returns the arch-specific IP address to filter netstat results by. -const char* GetNetstatFilterIp(ArchType arch_type) { - if (IsWindowsArchType(arch_type)) { - return "127.0.0.1"; - } - - if (IsLinuxArchType(arch_type)) { - return "0.0.0.0"; - } - - assert(!kErrorArchTypeUnhandled); - return ""; -} - -class SharedMemory { - public: - // Creates a new shared memory instance with given |name| and |size| in bytes. - // Different instances with matching names reference the same piece of memory, - // even if they belong to different processes. If shared memory with the given - // |name| already exists, the existing memory is referenced. Otherwise, a new - // piece of memory is allocated and zero-initialized. - SharedMemory(std::string name, size_t size) - : name_(std::move(name)), size_(size) {} - - absl::StatusOr Get() { - // Already initialized? - if (shared_mem_) return shared_mem_; - assert(!map_file_handle_); - - LARGE_INTEGER size; - size.QuadPart = size_; - map_file_handle_ = CreateFileMapping( - INVALID_HANDLE_VALUE, // use paging file - nullptr, // default security - PAGE_READWRITE, // read/write access - size.HighPart, // maximum object size (high-order DWORD) - size.LowPart, // maximum object size (low-order DWORD) - Util::Utf8ToWideStr(name_).c_str()); // name of mapping object - - if (!map_file_handle_) { - return MakeStatus("Failed to create file mapping object: %s", - Util::GetLastWin32Error()); - } - - // The shared memory holds the timestamps when the ports were reserved. - shared_mem_ = MapViewOfFile(map_file_handle_, // handle to map object - FILE_MAP_ALL_ACCESS, // read/write permission - 0, 0, size.QuadPart); - - if (!shared_mem_) { - std::string errorMessage = Util::GetLastWin32Error(); - CloseHandle(map_file_handle_); - map_file_handle_ = nullptr; - return MakeStatus("Failed to map view of file: %s", errorMessage); - } - - return shared_mem_; - } - - ~SharedMemory() { - if (shared_mem_) { - UnmapViewOfFile(shared_mem_); - shared_mem_ = nullptr; - } - - if (map_file_handle_) { - CloseHandle(map_file_handle_); - map_file_handle_ = nullptr; - } - } - - private: - std::string name_; - size_t size_; - HANDLE map_file_handle_ = nullptr; - void* shared_mem_ = nullptr; -}; - -PortManager::PortManager(std::string name, int first_port, int last_port, - ProcessFactory* process_factory, - RemoteUtil* remote_util, SystemClock* system_clock, - SteadyClock* steady_clock) - : first_port_(first_port), - last_port_(last_port), - process_factory_(process_factory), - remote_util_(remote_util), - system_clock_(system_clock), - steady_clock_(steady_clock), - shared_mem_(std::make_unique( - std::move(name), (last_port - first_port + 1) * sizeof(time_t))) { - assert(last_port_ >= first_port_); -} - -PortManager::~PortManager() { - std::vector ports_copy; - ports_copy.insert(ports_copy.end(), reserved_ports_.begin(), - reserved_ports_.end()); - for (int port : ports_copy) { - absl::Status status = ReleasePort(port); - if (!status.ok()) { - LOG_WARNING("Failed to release port %d: %s", port, status.ToString()); - } - } -} - -absl::StatusOr PortManager::ReservePort(int remote_timeout_sec, - ArchType remote_arch_type) { - // Find available port on workstation. - std::unordered_set local_ports; - ASSIGN_OR_RETURN( - local_ports, - FindAvailableLocalPorts(first_port_, last_port_, - ArchType::kWindows_x86_64, process_factory_), - "Failed to find available local ports"); - - // Find available port on remote instance. - std::unordered_set remote_ports = local_ports; - if (remote_util_ != nullptr) { - ASSIGN_OR_RETURN( - remote_ports, - FindAvailableRemotePorts(first_port_, last_port_, remote_arch_type, - process_factory_, remote_util_, - remote_timeout_sec, steady_clock_), - "Failed to find available remote ports"); - } - - // Fetch shared memory. - void* mem; - ASSIGN_OR_RETURN(mem, shared_mem_->Get(), "Failed to get shared memory"); - time_t* port_timestamps = static_cast(mem); - - // Put ports into a multimap to iterate in LRU order. - int num_ports = last_port_ - first_port_ + 1; - std::multimap ports_to_index; - for (int n = 0; n < num_ports; ++n) { - ports_to_index.insert({port_timestamps[n], n}); - } - - // Iterate over the ports, unused first (timestamp 0), the rest in LRU order. - // The ones with timestamps != 0 might either be stuck (e.g. process crashed - // and did not release port) or still in use. - const time_t now = std::chrono::system_clock::to_time_t(system_clock_->Now()); - for (const auto& [port_timestamp, n] : ports_to_index) { - // Note that some other process might have hijacked the port in the - // meantime, hence do an InterlockedCompareExchange. - volatile time_t* ts_ptr = &port_timestamps[n]; - static_assert(sizeof(time_t) == sizeof(uint64_t), "time_t must be 64 bit"); - assert((reinterpret_cast(ts_ptr) & 7) == 0); - if (InterlockedCompareExchange64(ts_ptr, now, port_timestamp) == - port_timestamp) { - int port = first_port_ + n; - LOG_DEBUG("Trying to reserve port %i", port); - - // We have reserved this port. Double-check that it's actually not in use - // on both the local and the remote device - if (local_ports.find(port) == local_ports.end()) { - LOG_DEBUG("Local port %i not available", port); - InterlockedCompareExchange64(ts_ptr, now, port_timestamp); - continue; - } - if (remote_ports.find(port) == remote_ports.end()) { - LOG_DEBUG("Port %i not available on instance", port); - InterlockedCompareExchange64(ts_ptr, now, port_timestamp); - continue; - } - - LOG_DEBUG("Port %i is available", port); - reserved_ports_.insert(port); - return port; - } - } - - return absl::ResourceExhaustedError(absl::StrFormat( - "No port available in range [%i, %i]", first_port_, last_port_)); -} - -absl::Status PortManager::ReleasePort(int port) { - if (reserved_ports_.find(port) == reserved_ports_.end()) - return absl::OkStatus(); - void* mem; - ASSIGN_OR_RETURN(mem, shared_mem_->Get(), "Failed to get shared memory"); - time_t* port_timestamps = static_cast(mem); - volatile time_t* ts_ptr = &port_timestamps[port - first_port_]; - InterlockedExchange64(ts_ptr, 0); - reserved_ports_.erase(port); - return absl::OkStatus(); -} - -// static -absl::StatusOr> PortManager::FindAvailableLocalPorts( - int first_port, int last_port, ArchType arch_type, - ProcessFactory* process_factory) { - // TODO: Use local APIs instead of netstat. - ProcessStartInfo start_info; - start_info.command = GetNetstatCommand(arch_type); - start_info.name = "netstat"; - start_info.flags = ProcessFlags::kNoWindow; - - std::string output; - start_info.stdout_handler = [&output](const char* data, size_t data_size) { - output.append(data, data_size); - return absl::OkStatus(); - }; - std::string errors; - start_info.stderr_handler = [&errors](const char* data, size_t data_size) { - errors.append(data, data_size); - return absl::OkStatus(); - }; - - absl::Status status = process_factory->Run(start_info); - if (!status.ok()) { - return WrapStatus(status, "Failed to run netstat:\n%s", errors); - } - - LOG_DEBUG("netstat (local) output:\n%s", output); - return FindAvailablePorts(first_port, last_port, output, arch_type); -} - -// static -absl::StatusOr> PortManager::FindAvailableRemotePorts( - int first_port, int last_port, ArchType arch_type, - ProcessFactory* process_factory, RemoteUtil* remote_util, int timeout_sec, - SteadyClock* steady_clock) { - std::string remote_command = GetNetstatCommand(arch_type); - ProcessStartInfo start_info = - remote_util->BuildProcessStartInfoForSsh(remote_command, arch_type); - start_info.name = "netstat"; - start_info.flags = ProcessFlags::kNoWindow; - - std::string output; - start_info.stdout_handler = [&output](const char* data, size_t data_size) { - output.append(data, data_size); - return absl::OkStatus(); - }; - std::string errors; - start_info.stderr_handler = [&errors](const char* data, size_t data_size) { - errors.append(data, data_size); - return absl::OkStatus(); - }; - - std::unique_ptr process = process_factory->Create(start_info); - absl::Status status = process->Start(); - if (!status.ok()) return WrapStatus(status, "Failed to start netstat"); - - Stopwatch timeout_timer(steady_clock); - bool is_timeout = false; - auto detect_timeout = [&timeout_timer, timeout_sec, &is_timeout]() { - is_timeout = timeout_timer.ElapsedSeconds() > timeout_sec; - return is_timeout; - }; - status = process->RunUntil(detect_timeout); - if (!status.ok()) return WrapStatus(status, "Failed to run netstat process"); - if (is_timeout) - return absl::DeadlineExceededError("Timeout while running netstat"); - - uint32_t exit_code = process->ExitCode(); - if (exit_code != 0) { - return MakeStatus("netstat process exited with code %u:\n%s", exit_code, - errors); - } - - LOG_DEBUG("netstat (remote) output:\n%s", output); - return FindAvailablePorts(first_port, last_port, output, arch_type); -} - -// static -absl::StatusOr> PortManager::FindAvailablePorts( - int first_port, int last_port, const std::string& netstat_output, - ArchType arch_type) { - std::unordered_set available_ports; - std::vector lines; - const char* filter_ip = GetNetstatFilterIp(arch_type); - for (const auto& line : absl::StrSplit(netstat_output, '\n')) { - if (absl::StrContains(line, filter_ip)) { - lines.push_back(std::string(line)); - } - } - - for (int port = first_port; port <= last_port; ++port) { - bool port_occupied = false; - std::string portToken = absl::StrFormat("%s:%i", filter_ip, port); - for (const std::string& line : lines) { - // Ports in the TIME_WAIT state can be reused. It is common that ports - // stay in this state for O(minutes). - if (absl::StrContains(line, portToken) && - !absl::StrContains(line, "TIME_WAIT")) { - port_occupied = true; - break; - } - } - if (!port_occupied) available_ports.insert(port); - } - - if (available_ports.empty()) { - return absl::ResourceExhaustedError(absl::StrFormat( - "No port available in range [%i, %i]", first_port, last_port)); - } - - return available_ports; -} - -} // namespace cdc_ft diff --git a/common/port_range_parser.cc b/common/port_range_parser.cc deleted file mode 100644 index 2e5c34e..0000000 --- a/common/port_range_parser.cc +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright 2022 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "common/port_range_parser.h" - -#include - -#include "absl/strings/str_split.h" - -namespace cdc_ft { -namespace port_range { - -bool Parse(const char* value, uint16_t* first, uint16_t* last) { - assert(value); - *first = 0; - *last = 0; - std::vector parts = absl::StrSplit(value, '-'); - if (parts.empty() || parts.size() > 2) return false; - const int ifirst = atoi(parts[0].c_str()); - const int ilast = parts.size() > 1 ? atoi(parts[1].c_str()) : ifirst; - if (ifirst <= 0 || ifirst > UINT16_MAX) return false; - if (ilast <= 0 || ilast > UINT16_MAX || ifirst > ilast) return false; - *first = static_cast(ifirst); - *last = static_cast(ilast); - return true; -} - -} // namespace port_range -} // namespace cdc_ft diff --git a/common/port_range_parser.h b/common/port_range_parser.h deleted file mode 100644 index 3e59fb1..0000000 --- a/common/port_range_parser.h +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright 2022 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef COMMON_PORT_RANGE_PARSER_H_ -#define COMMON_PORT_RANGE_PARSER_H_ - -#include - -namespace cdc_ft { -namespace port_range { - -// Parses |value| into a port range |first|-|last|. -// If |value| is a single number a, assigns |first|=|last|=a. -// If |value| is a range a-b, assigns |first|=a, |last|=b. -bool Parse(const char* value, uint16_t* first, uint16_t* last); - -} // namespace port_range -} // namespace cdc_ft - -#endif // COMMON_PORT_RANGE_PARSER_H_ diff --git a/common/port_range_parser_test.cc b/common/port_range_parser_test.cc deleted file mode 100644 index 0a8c247..0000000 --- a/common/port_range_parser_test.cc +++ /dev/null @@ -1,69 +0,0 @@ -// Copyright 2022 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "common/port_range_parser.h" - -#include "gtest/gtest.h" - -namespace cdc_ft { -namespace { - -TEST(PortRangeParserTest, SingleSuccess) { - uint16_t first, last; - EXPECT_TRUE(port_range::Parse("65535", &first, &last)); - EXPECT_EQ(first, 65535); - EXPECT_EQ(last, 65535); -} - -TEST(PortRangeParserTest, RangeSuccess) { - uint16_t first, last; - EXPECT_TRUE(port_range::Parse("1-2", &first, &last)); - EXPECT_EQ(first, 1); - EXPECT_EQ(last, 2); -} - -TEST(ParamsTest, NoValueFail) { - uint16_t first = 1, last = 1; - EXPECT_FALSE(port_range::Parse("", &first, &last)); - EXPECT_EQ(first, 0); - EXPECT_EQ(last, 0); -} - -TEST(ParamsTest, BadValueTooSmallFail) { - uint16_t first, last; - EXPECT_FALSE(port_range::Parse("0", &first, &last)); -} - -TEST(ParamsTest, BadValueNotIntegerFail) { - uint16_t first, last; - EXPECT_FALSE(port_range::Parse("port", &first, &last)); -} - -TEST(ParamsTest, ForwardPort_BadRangeTooBig) { - uint16_t first, last; - EXPECT_FALSE(port_range::Parse("50000-65536", &first, &last)); -} - -TEST(ParamsTest, ForwardPort_BadRangeFirstGtLast) { - uint16_t first, last; - EXPECT_FALSE(port_range::Parse("50001-50000", &first, &last)); -} - -TEST(ParamsTest, ForwardPort_BadRangeTwoMinus) { - uint16_t first, last; - EXPECT_FALSE(port_range::Parse("1-2-3", &first, &last)); -} - -} // namespace -} // namespace cdc_ft diff --git a/tests_common/BUILD b/tests_common/BUILD index 30b6ec9..285952a 100644 --- a/tests_common/BUILD +++ b/tests_common/BUILD @@ -31,8 +31,6 @@ cc_binary( "//common:path", "//common:path_filter", "//common:platform", - "//common:port_manager", - "//common:port_range_parser", "//common:process", "//common:remote_util", "//common:sdk_util",