diff --git a/build/deps/gen/build_deps.MODULE.bazel b/build/deps/gen/build_deps.MODULE.bazel
index 325d4136b7f..36a83f37a05 100644
--- a/build/deps/gen/build_deps.MODULE.bazel
+++ b/build/deps/gen/build_deps.MODULE.bazel
@@ -5,7 +5,7 @@ http = use_extension("@//:build/exts/http.bzl", "http")
 git_repository = use_repo_rule("@bazel_tools//tools/build_defs/repo:git.bzl", "git_repository")
 
 # abseil-cpp
-bazel_dep(name = "abseil-cpp", version = "20260107.0")
+bazel_dep(name = "abseil-cpp", version = "20260107.1")
 
 # apple_support
 bazel_dep(name = "apple_support", version = "2.2.0")
@@ -17,13 +17,13 @@ bazel_dep(name = "aspect_rules_esbuild", version = "0.25.0")
 bazel_dep(name = "aspect_rules_js", version = "2.9.2")
 
 # aspect_rules_lint
-bazel_dep(name = "aspect_rules_lint", version = "1.13.0")
+bazel_dep(name = "aspect_rules_lint", version = "2.1.0")
 
 # aspect_rules_ts
-bazel_dep(name = "aspect_rules_ts", version = "3.8.3")
+bazel_dep(name = "aspect_rules_ts", version = "3.8.4")
 
 # bazel_lib
-bazel_dep(name = "bazel_lib", version = "3.1.1")
+bazel_dep(name = "bazel_lib", version = "3.2.0")
 
 # bazel_skylib
 bazel_dep(name = "bazel_skylib", version = "1.9.0")
@@ -117,7 +117,7 @@ bazel_dep(name = "rules_nodejs", version = "6.7.3")
 bazel_dep(name = "rules_oci", version = "2.2.7")
 
 # rules_python
-bazel_dep(name = "rules_python", version = "1.8.0")
+bazel_dep(name = "rules_python", version = "1.8.4")
 
 # rules_shell
 bazel_dep(name = "rules_shell", version = "0.6.1")
diff --git a/build/deps/gen/deps.MODULE.bazel b/build/deps/gen/deps.MODULE.bazel
index aa17edf7756..138176bcd45 100644
--- a/build/deps/gen/deps.MODULE.bazel
+++ b/build/deps/gen/deps.MODULE.bazel
@@ -27,10 +27,10 @@ bazel_dep(name = "brotli", version = "1.2.0")
 # capnp-cpp
 http.archive(
     name = "capnp-cpp",
-    sha256 = "58a883721d220a3d8d75531a7e7ede3fd87c3d6923caf645faff0c78f8807b23",
-    strip_prefix = "capnproto-capnproto-79b3170/c++",
+    sha256 = "2e8519d77eb453463b1f2b1e22f40959fe560c143d78e7a51c606ce3bca30c5b",
+    strip_prefix = "capnproto-capnproto-ac7d90a/c++",
     type = "tgz",
-    url = "https://github.com/capnproto/capnproto/tarball/79b317039adad92da1204929f4047f84dfd17350",
+    url = "https://github.com/capnproto/capnproto/tarball/ac7d90ae2e171a714be4348718fed6f26b0b85f2",
 )
 use_repo(http, "capnp-cpp")
diff --git a/build/deps/oci.MODULE.bazel b/build/deps/oci.MODULE.bazel
index d7c8f38a4b7..8aece83b8e8 100644
--- a/build/deps/oci.MODULE.bazel
+++ b/build/deps/oci.MODULE.bazel
@@ -12,4 +12,12 @@ oci.pull(
         "linux/arm64/v8",
     ],
 )
-use_repo(oci, "node_25_slim", "node_25_slim_linux_amd64", "node_25_slim_linux_arm64_v8")
+oci.pull(
+    name = "proxy_everything",
+    image = "docker.io/cloudflare/proxy-everything:main",
+    platforms = [
+        "linux/amd64",
+        "linux/arm64",
+    ],
+)
+use_repo(oci, "node_25_slim", "node_25_slim_linux_amd64", "node_25_slim_linux_arm64_v8", "proxy_everything", "proxy_everything_linux_amd64", "proxy_everything_linux_arm64")
diff --git a/images/BUILD.bazel b/images/BUILD.bazel
index 789428f60a3..91c80184435 100644
--- a/images/BUILD.bazel
+++ b/images/BUILD.bazel
@@ -2,6 +2,7 @@ load("@rules_multirun//:defs.bzl", "command", "multirun")
 
 IMAGES = {
     "container-client-test": "//images/container-client-test:load",
+    "proxy-everything": "//images/container-client-test:load-proxy-everything",
 }
 
 [
diff --git a/images/container-client-test/BUILD.bazel b/images/container-client-test/BUILD.bazel
index c3069f29edf..e8d0cad2aa6 100644
--- a/images/container-client-test/BUILD.bazel
+++ b/images/container-client-test/BUILD.bazel
@@ -40,3 +40,10 @@ oci_load(
     repo_tags = ["cloudflare/workerd/container-client-test:latest"],
     visibility = ["//visibility:public"],
 )
+
+oci_load(
+    name = "load-proxy-everything",
+    image = "@proxy_everything",
+    repo_tags = ["cloudflare/proxy-everything:main"],
+    visibility = ["//visibility:public"],
+)
diff --git a/images/container-client-test/app.js b/images/container-client-test/app.js
index 8190141809b..8c349482796 100644
--- a/images/container-client-test/app.js
+++ b/images/container-client-test/app.js
@@ -1,11 +1,27 @@
 const { createServer } = require('http');
 
 const webSocketEnabled = process.env.WS_ENABLED === 'true';
+const wsProxyTarget = process.env.WS_PROXY_TARGET || null;
 
-// Create HTTP server
 const server = createServer(function (req, res) {
   if (req.url === '/ws') {
-    // WebSocket upgrade will be handled by the WebSocket server
+    return;
+  }
+
+  if (req.url === '/intercept') {
+    const targetHost = req.headers['x-host'] || '11.0.0.1';
+    fetch(`http://${targetHost}`)
+      .then((result) => result.text())
+      .then((body) => {
+        res.writeHead(200);
+        res.write(body);
+        res.end();
+      })
+      .catch((err) => {
+        res.writeHead(500);
+        res.write(`${targetHost} ${err.message}`);
+        res.end();
+      });
     return;
   }
 
@@ -14,30 +30,30 @@ const server = createServer(function (req, res) {
   res.end();
 });
 
-// Check if WebSocket functionality is enabled
 if (webSocketEnabled) {
   const WebSocket = require('ws');
+  const wss = new WebSocket.Server({ server, path: '/ws' });
 
-  // Create WebSocket server
-  const wss = new WebSocket.Server({
-    server: server,
-    path: '/ws',
-  });
-
-  wss.on('connection', function connection(ws) {
-    console.log('WebSocket connection established');
-
-    ws.on('message', function message(data) {
-      console.log('Received:', data.toString());
-      // Echo the message back with prefix
-      ws.send('Echo: ' + data.toString());
-    });
+  wss.on('connection', function (clientWs) {
+    if (wsProxyTarget) {
+      const targetWs = new WebSocket(`ws://${wsProxyTarget}/ws`);
+      const ready = new Promise(function (resolve) {
+        targetWs.on('open', resolve);
+      });
 
-    ws.on('close', function close() {
-      console.log('WebSocket connection closed');
-    });
+      targetWs.on('message', (data) => clientWs.send(data));
+      clientWs.on('message', async function (data) {
+        await ready;
+        targetWs.send(data);
+      });
 
-    ws.on('error', console.error);
+      clientWs.on('close', () => targetWs.close());
+      targetWs.on('close', () => clientWs.close());
+    } else {
+      clientWs.on('message', function (data) {
+        clientWs.send('Echo: ' + data.toString());
+      });
+    }
   });
 }
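Editor's note: the proxied WebSocket pairing in app.js gates only the client-to-target direction on the upstream handshake. A distilled sketch of the same pairing pattern follows (TypeScript against the `ws` package; the function name is illustrative and not part of the image):

```ts
import WebSocket from 'ws';

// Pair an accepted client socket with an upstream target, holding client
// frames until the upstream handshake completes.
function pair(clientWs: WebSocket, targetUrl: string): void {
  const targetWs = new WebSocket(targetUrl);
  const ready = new Promise<void>((resolve) => targetWs.on('open', () => resolve()));

  clientWs.on('message', async (data) => {
    await ready; // don't send before the upstream connection is open
    targetWs.send(data);
  });
  targetWs.on('message', (data) => clientWs.send(data));

  // close() must be invoked as a method call: passing `targetWs.close` as a
  // bare listener would rebind `this` to the emitting socket.
  clientWs.on('close', () => targetWs.close());
  targetWs.on('close', () => clientWs.close());
}
```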
diff --git a/src/workerd/api/container.c++ b/src/workerd/api/container.c++
index 24137471ff0..8c3db6e871d 100644
--- a/src/workerd/api/container.c++
+++ b/src/workerd/api/container.c++
@@ -71,6 +71,39 @@ jsg::Promise<void> Container::setInactivityTimeout(jsg::Lock& js, int64_t durati
   return IoContext::current().awaitIo(js, req.sendIgnoringResult());
 }
 
+jsg::Promise<void> Container::interceptOutboundHttp(
+    jsg::Lock& js, kj::String addr, jsg::Ref<Fetcher> binding) {
+  auto& ioctx = IoContext::current();
+  auto channel = binding->getSubrequestChannel(ioctx);
+
+  // Get a channel token for RPC usage; the container runtime can use this
+  // token later to redeem a Fetcher.
+  auto token = channel->getToken(IoChannelFactory::ChannelTokenUsage::RPC);
+
+  auto req = rpcClient->setEgressHttpRequest();
+  req.setHostPort(addr);
+  req.setChannelToken(token);
+  return ioctx.awaitIo(js, req.sendIgnoringResult());
+}
+
+jsg::Promise<void> Container::interceptAllOutboundHttp(jsg::Lock& js, jsg::Ref<Fetcher> binding) {
+  auto& ioctx = IoContext::current();
+  auto channel = binding->getSubrequestChannel(ioctx);
+  auto token = channel->getToken(IoChannelFactory::ChannelTokenUsage::RPC);
+
+  // Register for all IPv4 and IPv6 addresses (on port 80).
+  auto reqV4 = rpcClient->setEgressHttpRequest();
+  reqV4.setHostPort("0.0.0.0/0"_kj);
+  reqV4.setChannelToken(token);
+
+  auto reqV6 = rpcClient->setEgressHttpRequest();
+  reqV6.setHostPort("::/0"_kj);
+  reqV6.setChannelToken(token);
+
+  return ioctx.awaitIo(js,
+      kj::joinPromisesFailFast(kj::arr(reqV4.sendIgnoringResult(), reqV6.sendIgnoringResult())));
+}
+
 jsg::Promise<void> Container::monitor(jsg::Lock& js) {
   JSG_REQUIRE(running, Error, "monitor() cannot be called on a container that is not running.");
diff --git a/src/workerd/api/container.h b/src/workerd/api/container.h
index 1cbacce4027..f3a13aa33dd 100644
--- a/src/workerd/api/container.h
+++ b/src/workerd/api/container.h
@@ -62,6 +62,9 @@ class Container: public jsg::Object {
   void signal(jsg::Lock& js, int signo);
   jsg::Ref<Fetcher> getTcpPort(jsg::Lock& js, int port);
   jsg::Promise<void> setInactivityTimeout(jsg::Lock& js, int64_t durationMs);
+  jsg::Promise<void> interceptOutboundHttp(
+      jsg::Lock& js, kj::String addr, jsg::Ref<Fetcher> binding);
+  jsg::Promise<void> interceptAllOutboundHttp(jsg::Lock& js, jsg::Ref<Fetcher> binding);
 
   // TODO(containers): listenTcp()
 
@@ -73,6 +76,11 @@ class Container: public jsg::Object {
     JSG_METHOD(signal);
     JSG_METHOD(getTcpPort);
     JSG_METHOD(setInactivityTimeout);
+
+    if (flags.getWorkerdExperimental()) {
+      JSG_METHOD(interceptOutboundHttp);
+      JSG_METHOD(interceptAllOutboundHttp);
+    }
   }
 
   void visitForMemoryInfo(jsg::MemoryTracker& tracker) const {
diff --git a/src/workerd/io/container.capnp b/src/workerd/io/container.capnp
index f79e85defb2..fa9bf192818 100644
--- a/src/workerd/io/container.capnp
+++ b/src/workerd/io/container.capnp
@@ -110,4 +110,13 @@ interface Container @0x9aaceefc06523bca {
   # Note that if there is an open connection to the container, the runtime must not shutdown the container.
   # If there is no activity timeout duration configured and no container connection, it's up to the runtime
   # to decide when to signal the container to exit.
+
+  setEgressHttp @8 (hostPort :Text, channelToken :Data);
+  # Configures egress HTTP routing for the container. When the container attempts to connect to the
+  # specified host:port, the connection should be routed back to the Workers runtime using the channel token.
+  # The format of hostPort is '<host>[':'<port>]'. If the port is omitted, the mapping covers only port 80.
+  # This method does not support HTTPS yet.
+
+
+  # TODO: setEgressTcp
 }
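Editor's note: the two jsg methods plus the setEgressHttp RPC define the user-facing flow. A sketch of how a Durable Object is expected to call them (class and binding names are illustrative; the methods are registered only under the experimental flag, per the JSG_RESOURCE block above):

```ts
import { DurableObject } from 'cloudflare:workers';

export class Example extends DurableObject {
  async startWithInterception(): Promise<void> {
    const container = this.ctx.container;
    // Route container requests for 10.1.2.3:8080 into a service binding.
    await container.interceptOutboundHttp('10.1.2.3:8080', this.env.MY_SERVICE);
    // CIDR form; omitting the port covers only port 80 per the capnp contract.
    await container.interceptOutboundHttp('10.9.0.0/16', this.env.MY_SERVICE);
    container.start();
  }
}
```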
diff --git a/src/workerd/server/BUILD.bazel b/src/workerd/server/BUILD.bazel
index 4f547439006..3594a8ee891 100644
--- a/src/workerd/server/BUILD.bazel
+++ b/src/workerd/server/BUILD.bazel
@@ -204,19 +204,29 @@ wd_cc_library(
     }),
 )
 
+wd_cc_library(
+    name = "channel-token",
+    srcs = ["channel-token.c++"],
+    hdrs = ["channel-token.h"],
+    deps = [
+        ":channel-token_capnp",
+        "//src/workerd/io",
+        "//src/workerd/util:entropy",
+    ],
+)
+
 wd_cc_library(
     name = "server",
     srcs = [
-        "channel-token.c++",
         "server.c++",
     ],
     hdrs = [
-        "channel-token.h",
         "server.h",
     ],
     deps = [
         ":actor-id-impl",
         ":alarm-scheduler",
+        ":channel-token",
         ":channel-token_capnp",
         ":container-client",
         ":facet-tree-index",
@@ -268,7 +278,9 @@ wd_cc_library(
     hdrs = ["container-client.h"],
     visibility = ["//visibility:public"],
     deps = [
+        ":channel-token",
         ":docker-api_capnp",
+        "//src/workerd/io",
         "//src/workerd/io:container_capnp",
         "//src/workerd/jsg",
         "@capnp-cpp//src/capnp/compat:http-over-capnp",
diff --git a/src/workerd/server/container-client.c++ b/src/workerd/server/container-client.c++
index 5c118b1f955..0c55e9032a1 100644
--- a/src/workerd/server/container-client.c++
+++ b/src/workerd/server/container-client.c++
@@ -5,6 +5,7 @@
 #include "container-client.h"
 
 #include
+#include
 #include
 #include
@@ -12,6 +13,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -21,6 +23,74 @@ namespace workerd::server {
 
 namespace {
+
+struct ParsedAddress {
+  kj::CidrRange cidr;
+  kj::Maybe<uint16_t> port;
+};
+
+struct HostAndPort {
+  kj::String host;
+  kj::Maybe<uint16_t> port;
+};
+
+// Strips a port suffix from a string, returning the host and port separately.
+// For IPv6, expects brackets: "[::1]:8080" -> ("::1", 8080)
+// For IPv4: "10.0.0.1:8080" -> ("10.0.0.1", 8080)
+// If no port, returns the host as-is with no port.
+HostAndPort stripPort(kj::StringPtr str) {
+  if (str.startsWith("[")) {
+    // Bracketed IPv6: "[ipv6]" or "[ipv6]:port"
+    size_t closeBracket =
+        KJ_REQUIRE_NONNULL(str.findLast(']'), "Unclosed '[' in address string.", str);
+
+    auto host = str.slice(1, closeBracket);
+
+    if (str.size() > closeBracket + 1) {
+      KJ_REQUIRE(
+          str.slice(closeBracket + 1).startsWith(":"), "Expected port suffix after ']'.", str);
+      auto port = KJ_REQUIRE_NONNULL(
+          str.slice(closeBracket + 2).tryParseAs<uint16_t>(), "Invalid port number.", str);
+      return {kj::str(host), port};
+    }
+    return {kj::str(host), kj::none};
+  }
+
+  // No brackets - check if there's exactly one colon (IPv4 with port).
+  // IPv6 without brackets has 2+ colons; no port suffix is supported there.
+  KJ_IF_SOME(colonPos, str.findLast(':')) {
+    auto afterColon = str.slice(colonPos + 1);
+    KJ_IF_SOME(port, afterColon.tryParseAs<uint16_t>()) {
+      // Valid port - but only treat it as a port for IPv4 (check there are no other colons before).
+      auto beforeColon = str.first(colonPos);
+      if (beforeColon.findFirst(':') == kj::none) {
+        return {kj::str(beforeColon), port};
+      }
+    }
+  }
+
+  return {kj::str(str), kj::none};
+}
+
+// Build a CidrRange from a host string, adding a /32 or /128 prefix if not present.
+kj::CidrRange makeCidr(kj::StringPtr host) {
+  if (host.findFirst('/') != kj::none) {
+    return kj::CidrRange(host);
+  }
+  // No CIDR prefix - add /32 for IPv4, /128 for IPv6.
+  bool isIpv6 = host.findFirst(':') != kj::none;
+  return kj::CidrRange(kj::str(host, isIpv6 ? "/128" : "/32"));
+}
+
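Editor's note: the stripPort() splitting rules distill to the following (TypeScript sketch for reference, not the actual implementation; bracketed-IPv6 error handling is simplified):

```ts
// A trailing ':<digits>' counts as a port only when the host is bracketed
// IPv6 or a colon-free IPv4 address/hostname.
function stripPort(s: string): { host: string; port?: number } {
  if (s.startsWith('[')) {
    const close = s.indexOf(']');
    if (close < 0) throw new Error(`Unclosed '[' in ${s}`);
    const m = /^:(\d+)$/.exec(s.slice(close + 1));
    return m ? { host: s.slice(1, close), port: Number(m[1]) } : { host: s.slice(1, close) };
  }
  const colon = s.lastIndexOf(':');
  // More than one colon without brackets means bare IPv6: no port suffix.
  if (colon !== -1 && s.indexOf(':') === colon && /^\d+$/.test(s.slice(colon + 1))) {
    return { host: s.slice(0, colon), port: Number(s.slice(colon + 1)) };
  }
  return { host: s };
}
// stripPort('10.0.0.1:8080') -> { host: '10.0.0.1', port: 8080 }
// stripPort('[::1]:8080')    -> { host: '::1', port: 8080 }
// stripPort('fe80::1')       -> { host: 'fe80::1' }
```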
+// Parses "host[:port]" strings. Handles:
+// - IPv4: "10.0.0.1", "10.0.0.1:8080", "10.0.0.0/8", "10.0.0.0/8:8080"
+// - IPv6 with brackets: "[::1]", "[::1]:8080", "[fe80::1]", "[fe80::/10]:8080"
+// - IPv6 without brackets: "::1", "fe80::1", "fe80::/10"
+ParsedAddress parseHostPort(kj::StringPtr str) {
+  auto hostAndPort = stripPort(str);
+  return {makeCidr(hostAndPort.host), hostAndPort.port};
+}
+
 kj::StringPtr signalToString(uint32_t signal) {
   switch (signal) {
     case 1:
@@ -107,22 +177,33 @@ ContainerClient::ContainerClient(capnp::ByteStreamFactory& byteStreamFactory,
     kj::String dockerPath,
     kj::String containerName,
     kj::String imageName,
+    kj::Maybe<kj::String> containerEgressInterceptorImage,
     kj::TaskSet& waitUntilTasks,
-    kj::Function<void()> cleanupCallback)
+    kj::Function<void()> cleanupCallback,
+    ChannelTokenHandler& channelTokenHandler)
     : byteStreamFactory(byteStreamFactory),
       timer(timer),
       network(network),
       dockerPath(kj::mv(dockerPath)),
-      containerName(kj::encodeUriComponent(kj::mv(containerName))),
+      containerName(kj::encodeUriComponent(kj::str(containerName))),
+      sidecarContainerName(kj::encodeUriComponent(kj::str(containerName, "-proxy"))),
       imageName(kj::mv(imageName)),
+      containerEgressInterceptorImage(kj::mv(containerEgressInterceptorImage)),
       waitUntilTasks(waitUntilTasks),
-      cleanupCallback(kj::mv(cleanupCallback)) {}
+      cleanupCallback(kj::mv(cleanupCallback)),
+      channelTokenHandler(channelTokenHandler) {}
 
 ContainerClient::~ContainerClient() noexcept(false) {
+  stopEgressListener();
+
   // Call the cleanup callback to remove this client from the ActorNamespace map
   cleanupCallback();
 
-  // Destroy the Docker container
+  // Sidecar shares the main container's network namespace, so it must be destroyed first.
+  waitUntilTasks.add(dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::DELETE,
+      kj::str("/containers/", sidecarContainerName, "?force=true"))
+                         .ignoreResult());
+
   waitUntilTasks.add(dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::DELETE,
       kj::str("/containers/", containerName, "?force=true"))
                          .ignoreResult());
@@ -175,6 +256,182 @@ class ContainerClient::DockerPort final: public rpc::Container::Port::Server {
   kj::Maybe<kj::Promise<void>> pumpTask;
 };
 
+// HTTP service that handles HTTP CONNECT requests from the container sidecar (proxy-everything).
+// When the sidecar intercepts container egress traffic, it sends HTTP CONNECT to this service.
+// After accepting the CONNECT, the tunnel carries the actual HTTP request from the container,
+// which we parse and forward to the appropriate SubrequestChannel based on egressMappings.
+
+// Inner HTTP service that handles requests inside the CONNECT tunnel.
+// Forwards requests to the worker binding via SubrequestChannel.
+class InnerEgressService final: public kj::HttpService {
+ public:
+  InnerEgressService(IoChannelFactory::SubrequestChannel& channel): channel(channel) {}
+
+  kj::Promise<void> request(kj::HttpMethod method,
+      kj::StringPtr url,
+      const kj::HttpHeaders& headers,
+      kj::AsyncInputStream& requestBody,
+      Response& response) override {
+    IoChannelFactory::SubrequestMetadata metadata;
+    auto worker = channel.startRequest(kj::mv(metadata));
+    co_await worker->request(method, url, headers, requestBody, response);
+  }
+
+ private:
+  IoChannelFactory::SubrequestChannel& channel;
+};
+
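Editor's note: the sidecar reaches this service over a plain HTTP CONNECT tunnel. The CONNECT target names the container's original destination, and the bytes inside the tunnel are the container's ordinary HTTP/1.1 request. A sketch of the client side of that handshake, as assumed here for illustration (not proxy-everything's actual code):

```ts
import net from 'node:net';

// Open a tunnel to the gateway listener, then speak plain HTTP/1.1 for the
// intercepted request. `dest` is the container's original "host:port" target.
function tunnel(gatewayHost: string, gatewayPort: number, dest: string): void {
  const sock = net.connect(gatewayPort, gatewayHost, () => {
    sock.write(`CONNECT ${dest} HTTP/1.1\r\nHost: ${dest}\r\n\r\n`);
  });
  sock.once('data', (chunk) => {
    if (!chunk.toString().startsWith('HTTP/1.1 200')) return sock.destroy();
    // Tunnel established; forward the container's real request bytes.
    sock.write(`GET / HTTP/1.1\r\nHost: ${dest}\r\nConnection: close\r\n\r\n`);
    sock.on('data', (d) => process.stdout.write(d));
  });
}
```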
+// Outer HTTP service that handles CONNECT requests from the sidecar.
+class EgressHttpService final: public kj::HttpService {
+ public:
+  EgressHttpService(ContainerClient& containerClient, kj::HttpHeaderTable& headerTable)
+      : containerClient(containerClient),
+        headerTable(headerTable) {}
+
+  kj::Promise<void> request(kj::HttpMethod method,
+      kj::StringPtr url,
+      const kj::HttpHeaders& headers,
+      kj::AsyncInputStream& requestBody,
+      Response& response) override {
+    // Regular HTTP requests are not expected - we only handle CONNECT.
+    co_return co_await response.sendError(405, "Method Not Allowed", headerTable);
+  }
+
+  kj::Promise<void> connect(kj::StringPtr host,
+      const kj::HttpHeaders& headers,
+      kj::AsyncIoStream& connection,
+      ConnectResponse& response,
+      kj::HttpConnectSettings settings) override {
+    auto destAddr = kj::str(host);
+
+    kj::HttpHeaders responseHeaders(headerTable);
+    response.accept(200, "OK", responseHeaders);
+
+    auto mapping = containerClient.findEgressMapping(destAddr, /*defaultPort=*/80);
+
+    KJ_IF_SOME(channel, mapping) {
+      // Layer an HttpServer on top of the tunnel to handle HTTP parsing/serialization.
+      auto innerService = kj::heap<InnerEgressService>(channel);
+      auto innerServer =
+          kj::heap<kj::HttpServer>(containerClient.timer, headerTable, *innerService);
+
+      co_await innerServer->listenHttpCleanDrain(connection)
+          .attach(kj::mv(innerServer), kj::mv(innerService));
+
+      co_return;
+    }
+
+    if (!containerClient.internetEnabled) {
+      connection.shutdownWrite();
+      co_return;
+    }
+
+    // No egress mapping matched and internet access is enabled, so forward via raw TCP.
+    auto addr = co_await containerClient.network.parseAddress(destAddr);
+    auto destConn = co_await addr->connect();
+
+    auto connToDestination = connection.pumpTo(*destConn).then(
+        [&destConn = *destConn](uint64_t) { destConn.shutdownWrite(); });
+
+    auto destinationToConn =
+        destConn->pumpTo(connection).then([&connection](uint64_t) { connection.shutdownWrite(); });
+
+    co_await kj::joinPromisesFailFast(
+        kj::arr(kj::mv(connToDestination), kj::mv(destinationToConn)));
+    co_return;
+  }
+
+ private:
+  ContainerClient& containerClient;
+  kj::HttpHeaderTable& headerTable;
+};
+
+// The name of the docker workerd network. All containers spawned by workerd
+// will be attached to this network.
+constexpr kj::StringPtr WORKERD_NETWORK_NAME = "workerd-network"_kj;
+
+kj::Promise<ContainerClient::IPAMConfigResult> ContainerClient::getDockerBridgeIPAMConfig() {
+  // First, try to find or create the workerd-network.
+  // Docker API: GET /networks/workerd-network
+  auto response = co_await dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::GET,
+      kj::str("/networks/", WORKERD_NETWORK_NAME));
+
+  if (response.statusCode == 404) {
+    // Network doesn't exist, create it.
+    // Equivalent to: docker network create -d bridge --ipv6 workerd-network
+    co_await createWorkerdNetwork();
+    // Re-fetch the network to get the gateway.
+    response = co_await dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::GET,
+        kj::str("/networks/", WORKERD_NETWORK_NAME));
+  }
+
+  if (response.statusCode == 200) {
+    auto jsonRoot = decodeJsonResponse<Docker::NetworkInspectResponse>(response.body);
+    auto ipamConfig = jsonRoot.getIpam().getConfig();
+    if (ipamConfig.size() > 0) {
+      auto config = ipamConfig[0];
+      co_return IPAMConfigResult{
+        .gateway = kj::str(config.getGateway()),
+        .subnet = kj::str(config.getSubnet()),
+      };
+    }
+  }
+
+  JSG_FAIL_REQUIRE(Error,
+      "Failed to get workerd-network. "
" + "Status: ", + response.statusCode, ", Body: ", response.body); +} + +kj::Promise ContainerClient::createWorkerdNetwork() { + // Docker API: POST /networks/create + // Equivalent to: docker network create -d bridge --ipv6 workerd-network + capnp::JsonCodec codec; + codec.handleByAnnotation(); + capnp::MallocMessageBuilder message; + auto jsonRoot = message.initRoot(); + jsonRoot.setName(WORKERD_NETWORK_NAME); + jsonRoot.setDriver("bridge"); + jsonRoot.setEnableIpv6(true); + + auto response = co_await dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::POST, + kj::str("/networks/create"), codec.encode(jsonRoot)); + + if (response.statusCode != 201 && response.statusCode != 409) { + JSG_FAIL_REQUIRE(Error, + "Failed to create workerd-network." + "Status: ", + response.statusCode, ", Body: ", response.body); + } +} + +kj::Promise ContainerClient::startEgressListener(kj::StringPtr listenAddress) { + auto service = kj::heap(*this, headerTable); + auto httpServer = kj::heap(timer, headerTable, *service); + auto& httpServerRef = *httpServer; + + egressHttpServer = httpServer.attach(kj::mv(service)); + + // Listen on the Docker bridge gateway IP with port 0 to let the OS pick a free port + auto addr = co_await network.parseAddress(kj::str(listenAddress, ":0")); + auto listener = addr->listen(); + + uint16_t chosenPort = listener->getPort(); + + egressListenerTask = httpServerRef.listenHttp(*listener) + .attach(kj::mv(listener)) + .eagerlyEvaluate([](kj::Exception&& e) { + LOG_EXCEPTION( + "Workerd could not listen in the TCP port to proxy traffic off the docker container", e); + }); + + co_return chosenPort; +} + +void ContainerClient::stopEgressListener() { + egressListenerTask = kj::none; + egressHttpServer = kj::none; +} + kj::Promise ContainerClient::dockerApiRequest(kj::Network& network, kj::String dockerPath, kj::HttpMethod method, @@ -219,6 +476,7 @@ kj::Promise ContainerClient::inspectContainer( if (response.statusCode == 404) { co_return InspectResponse{.isRunning = false, .ports = {}}; } + JSG_REQUIRE(response.statusCode == 200, Error, "Container inspect failed"); // Parse JSON response auto jsonRoot = decodeJsonResponse(response.body); @@ -284,6 +542,8 @@ kj::Promise ContainerClient::createContainer( auto envSize = environment.map([](auto& env) { return env.size(); }).orDefault(0); auto jsonEnv = jsonRoot.initEnv(envSize + kj::size(defaultEnv)); + co_await createWorkerdNetwork(); + KJ_IF_SOME(env, environment) { for (uint32_t i: kj::zeroTo(env.size())) { jsonEnv.set(i, env[i]); @@ -300,6 +560,13 @@ kj::Promise ContainerClient::createContainer( // We need to set a restart policy to avoid having ambiguous states // where the container we're managing is stuck at "exited" state. 
 
 kj::Promise<ContainerClient::Response> ContainerClient::dockerApiRequest(kj::Network& network,
     kj::String dockerPath,
     kj::HttpMethod method,
@@ -219,6 +476,7 @@ kj::Promise<ContainerClient::InspectResponse> ContainerClient::inspectContainer(
   if (response.statusCode == 404) {
     co_return InspectResponse{.isRunning = false, .ports = {}};
   }
+  JSG_REQUIRE(response.statusCode == 200, Error, "Container inspect failed");
 
   // Parse JSON response
   auto jsonRoot = decodeJsonResponse(response.body);
@@ -284,6 +542,8 @@ kj::Promise<void> ContainerClient::createContainer(
   auto envSize = environment.map([](auto& env) { return env.size(); }).orDefault(0);
   auto jsonEnv = jsonRoot.initEnv(envSize + kj::size(defaultEnv));
 
+  co_await createWorkerdNetwork();
+
   KJ_IF_SOME(env, environment) {
     for (uint32_t i: kj::zeroTo(env.size())) {
       jsonEnv.set(i, env[i]);
@@ -300,6 +560,13 @@
   // We need to set a restart policy to avoid having ambiguous states
   // where the container we're managing is stuck at "exited" state.
   hostConfig.initRestartPolicy().setName("on-failure");
+  // Add a host.docker.internal mapping so containers can reach the host.
+  // This is equivalent to --add-host=host.docker.internal:host-gateway.
+  auto extraHosts = hostConfig.initExtraHosts(1);
+  auto ipamConfigForHost = co_await getDockerBridgeIPAMConfig();
+  extraHosts.set(0, kj::str("host.docker.internal:", ipamConfigForHost.gateway));
+  // Connect the container to the workerd-network for IPv6 support and container isolation.
+  hostConfig.setNetworkMode(WORKERD_NETWORK_NAME);
 
   auto response = co_await dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::POST,
       kj::str("/containers/create?name=", containerName), codec.encode(jsonRoot));
@@ -380,8 +647,72 @@ kj::Promise<void> ContainerClient::destroyContainer() {
   }
 }
 
+// Creates the sidecar container for the egress proxy.
+// The sidecar shares the network namespace with the main container and runs
+// proxy-everything to intercept and proxy egress traffic.
+kj::Promise<void> ContainerClient::createSidecarContainer(
+    uint16_t egressPort, kj::String networkCidr) {
+  // Equivalent to: docker run --cap-add=NET_ADMIN --network container:$(CONTAINER) ...
+  capnp::JsonCodec codec;
+  codec.handleByAnnotation();
+  capnp::MallocMessageBuilder message;
+  auto jsonRoot = message.initRoot();
+  auto& image = KJ_ASSERT_NONNULL(containerEgressInterceptorImage,
+      "containerEgressInterceptorImage must be configured to use egress interception. "
+      "Set it in the localDocker configuration.");
+  jsonRoot.setImage(image);
+
+  auto cmd = jsonRoot.initCmd(4);
+  cmd.set(0, "--http-egress-port");
+  cmd.set(1, kj::str(egressPort));
+  cmd.set(2, "--docker-gateway-cidr");
+  cmd.set(3, networkCidr);
+
+  auto hostConfig = jsonRoot.initHostConfig();
+  // Share the network namespace with the main container.
+  hostConfig.setNetworkMode(kj::str("container:", containerName));
+
+  // The sidecar needs the NET_ADMIN capability for iptables/TPROXY.
+  auto capAdd = hostConfig.initCapAdd(1);
+  capAdd.set(0, "NET_ADMIN");
+  hostConfig.setAutoRemove(true);
+
+  auto response = co_await dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::POST,
+      kj::str("/containers/create?name=", sidecarContainerName), codec.encode(jsonRoot));
+
+  if (response.statusCode == 409) {
+    // Already created, nothing to do.
+    co_return;
+  }
+
+  if (response.statusCode != 201) {
+    JSG_REQUIRE(response.statusCode != 404, Error, "No such image available named ", image,
+        ". Please ensure the container egress interceptor image is built and available.");
+    JSG_FAIL_REQUIRE(Error, "Failed to create the networking sidecar [", response.statusCode,
+        "] ", response.body);
+  }
+}
+
+kj::Promise<void> ContainerClient::startSidecarContainer() {
+  auto endpoint = kj::str("/containers/", sidecarContainerName, "/start");
+  auto response = co_await dockerApiRequest(
+      network, kj::str(dockerPath), kj::HttpMethod::POST, kj::mv(endpoint), kj::str(""));
+  JSG_REQUIRE(response.statusCode == 204, Error,
+      "Starting network sidecar container failed with: ", response.body);
+}
+
+kj::Promise<void> ContainerClient::destroySidecarContainer() {
+  auto endpoint = kj::str("/containers/", sidecarContainerName, "?force=true");
+  co_await dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::DELETE, kj::mv(endpoint));
+  auto response = co_await dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::POST,
+      kj::str("/containers/", sidecarContainerName, "/wait?condition=removed"));
+  JSG_REQUIRE(response.statusCode == 200 || response.statusCode == 404, Error,
+      "Destroying docker network sidecar container failed: ", response.statusCode, response.body);
+}
+
 kj::Promise<void> ContainerClient::status(StatusContext context) {
   const auto [isRunning, _ports] = co_await inspectContainer();
+  containerStarted.store(isRunning, std::memory_order_release);
   context.getResults().setRunning(isRunning);
 }
 
@@ -395,12 +726,27 @@ kj::Promise<void> ContainerClient::start(StartContext context) {
   if (params.hasEntrypoint()) {
     entrypoint = params.getEntrypoint();
   }
+
   if (params.hasEnvironmentVariables()) {
     environment = params.getEnvironmentVariables();
   }
 
+  // Record internetEnabled so we can accept or reject container connections that don't go to Workers.
+  internetEnabled = params.getEnableInternet();
+
   co_await createContainer(entrypoint, environment);
   co_await startContainer();
+
+  // For now, opt in to the proxy sidecar container only if the user has configured
+  // egressMappings. In the future, it will always run alongside the user container.
+  if (!egressMappings.empty()) {
+    // The user container will be blocked on network connectivity until this finishes.
+    // Once workerd-network is more battle-tested and no longer experimental (so the sidecar is
+    // non-optional), we should start the sidecar first and _then_ have the user container join
+    // the sidecar's network.
+    co_await ensureSidecarStarted();
+  }
+
+  containerStarted.store(true, std::memory_order_release);
 }
 
 kj::Promise<void> ContainerClient::monitor(MonitorContext context) {
@@ -409,7 +755,6 @@
   // If it hasn't, we'll give it 3 tries before failing.
   auto results = context.getResults();
   for (int i = 0; i < 3; i++) {
-    // Docker API: POST /containers/{id}/wait - wait for container to exit
     auto endpoint = kj::str("/containers/", containerName, "/wait");
 
     auto response = co_await dockerApiRequest(
@@ -418,6 +763,8 @@
       co_await timer.afterDelay(1 * kj::SECONDS);
       continue;
     }
+
+    containerStarted.store(false, std::memory_order_release);
     JSG_REQUIRE(response.statusCode == 200, Error,
         "Monitoring container failed with: ", response.statusCode, response.body);
     // Parse JSON response
@@ -426,10 +773,13 @@
     results.setExitCode(statusCode);
     co_return;
   }
+
   JSG_FAIL_REQUIRE(Error, "Monitor failed to find container");
 }
 
 kj::Promise<void> ContainerClient::destroy(DestroyContext context) {
+  // The sidecar shares the main container's network namespace, so it must be destroyed first.
+  co_await destroySidecarContainer();
   co_await destroyContainer();
 }
 
@@ -468,6 +818,75 @@ kj::Promise<void> ContainerClient::listenTcp(ListenTcpContext context) {
   KJ_UNIMPLEMENTED("listenTcp not implemented for Docker containers - use port mapping instead");
 }
 
+kj::Maybe<IoChannelFactory::SubrequestChannel&> ContainerClient::findEgressMapping(
+    kj::StringPtr destAddr, uint16_t defaultPort) {
+  auto hostAndPort = stripPort(destAddr);
+  uint16_t port = hostAndPort.port.orDefault(defaultPort);
+
+  for (auto& mapping: egressMappings) {
+    if (mapping.cidr.matches(hostAndPort.host)) {
+      // The CIDR matches; now check the port.
+      // A mapping port of 0 matches any port.
+      if (mapping.port == 0 || mapping.port == port) {
+        return mapping.channel.get();
+      }
+    }
+  }
+
+  return kj::none;
+}
+
+kj::Promise<void> ContainerClient::ensureSidecarStarted() {
+  if (containerSidecarStarted.exchange(true, std::memory_order_acquire)) {
+    co_return;
+  }
+
+  KJ_ON_SCOPE_FAILURE(containerSidecarStarted.store(false, std::memory_order_release));
+
+  // Get the Docker bridge gateway IP to listen on (only accessible from containers).
+  auto ipamConfig = co_await getDockerBridgeIPAMConfig();
+  // Create and start the sidecar container that shares the network namespace
+  // with the main container and intercepts egress traffic.
+  co_await createSidecarContainer(egressListenerPort, kj::mv(ipamConfig.subnet));
+  co_await startSidecarContainer();
+}
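Editor's note: ensureSidecarStarted() combines an atomic exchange with KJ_ON_SCOPE_FAILURE so concurrent callers start the sidecar at most once, and a failed attempt re-arms the guard. The same idea in TypeScript, where promise memoization stands in for the atomic (sketch):

```ts
// Run an async initializer at most once, resetting the guard on failure so a
// later caller can retry (mirrors the containerSidecarStarted guard above).
function once<T>(init: () => Promise<T>): () => Promise<T> {
  let pending: Promise<T> | undefined;
  return () => {
    pending ??= init().catch((err) => {
      pending = undefined; // re-arm on failure, like KJ_ON_SCOPE_FAILURE
      throw err;
    });
    return pending;
  };
}

const ensureSidecarStarted = once(async () => {
  // ... create and start the sidecar container ...
});
```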
+
+kj::Promise<void> ContainerClient::setEgressHttp(SetEgressHttpContext context) {
+  auto params = context.getParams();
+  auto hostPortStr = kj::str(params.getHostPort());
+  auto tokenBytes = params.getChannelToken();
+
+  auto parsed = parseHostPort(hostPortStr);
+  uint16_t port = parsed.port.orDefault(80);
+  auto cidr = kj::mv(parsed.cidr);
+
+  if (egressListenerTask == kj::none) {
+    // Get the Docker bridge gateway IP to listen on (only accessible from containers).
+    auto ipamConfig = co_await getDockerBridgeIPAMConfig();
+
+    // Start the egress listener first so it's ready when the sidecar starts.
+    // Use port 0 to let the OS pick a free port dynamically.
+    egressListenerPort = co_await startEgressListener(ipamConfig.gateway);
+  }
+
+  if (containerStarted.load(std::memory_order_acquire)) {
+    // Only try to create and start a sidecar container
+    // if the user container is running.
+    co_await ensureSidecarStarted();
+  }
+
+  auto subrequestChannel = channelTokenHandler.decodeSubrequestChannelToken(
+      workerd::IoChannelFactory::ChannelTokenUsage::RPC, tokenBytes);
+
+  egressMappings.add(EgressMapping{
+    .cidr = kj::mv(cidr),
+    .port = port,
+    .channel = kj::mv(subrequestChannel),
+  });
+
+  co_return;
+}
+
 kj::Own<ContainerClient> ContainerClient::addRef() {
   return kj::addRef(*this);
 }
diff --git a/src/workerd/server/container-client.h b/src/workerd/server/container-client.h
index 456c8911eb5..7d592393937 100644
--- a/src/workerd/server/container-client.h
+++ b/src/workerd/server/container-client.h
@@ -5,15 +5,22 @@
 #pragma once
 
 #include
+#include
+#include
+#include
 #include
 #include
+#include
 #include
+#include
 #include
 #include
 #include
 #include
 
+#include
+
 namespace workerd::server {
 
 // Docker-based implementation that implements the rpc::Container::Server interface
@@ -23,7 +30,7 @@ namespace workerd::server {
 //
 // ContainerClient is reference-counted to support actor reconnection with inactivity timeouts.
 // When setInactivityTimeout() is called, a timer holds a reference to prevent premature
 // destruction. The ContainerClient can be shared across multiple actor lifetimes.
 class ContainerClient final: public rpc::Container::Server, public kj::Refcounted {
  public:
   ContainerClient(capnp::ByteStreamFactory& byteStreamFactory,
@@ -32,8 +39,10 @@ class ContainerClient final: public rpc::Container::Server, public kj::Refcounte
       kj::String dockerPath,
       kj::String containerName,
       kj::String imageName,
+      kj::Maybe<kj::String> containerEgressInterceptorImage,
       kj::TaskSet& waitUntilTasks,
-      kj::Function<void()> cleanupCallback);
+      kj::Function<void()> cleanupCallback,
+      ChannelTokenHandler& channelTokenHandler);
 
   ~ContainerClient() noexcept(false);
 
@@ -46,16 +55,23 @@ class ContainerClient final: public rpc::Container::Server, public kj::Refcounte
   kj::Promise<void> getTcpPort(GetTcpPortContext context) override;
   kj::Promise<void> listenTcp(ListenTcpContext context) override;
   kj::Promise<void> setInactivityTimeout(SetInactivityTimeoutContext context) override;
+  kj::Promise<void> setEgressHttp(SetEgressHttpContext context) override;
 
   kj::Own<ContainerClient> addRef();
 
  private:
   capnp::ByteStreamFactory& byteStreamFactory;
+  kj::HttpHeaderTable headerTable;
   kj::Timer& timer;
   kj::Network& network;
   kj::String dockerPath;
   kj::String containerName;
+  kj::String sidecarContainerName;
   kj::String imageName;
+
+  // Container egress interceptor image name (the sidecar for the egress proxy)
+  kj::Maybe<kj::String> containerEgressInterceptorImage;
+
   kj::TaskSet& waitUntilTasks;
 
   static constexpr kj::StringPtr defaultEnv[] = {"CLOUDFLARE_COUNTRY_A2=XX"_kj,
@@ -67,6 +83,9 @@ class ContainerClient final: public rpc::Container::Server, public kj::Refcounte
   // Docker-specific Port implementation
   class DockerPort;
 
+  // EgressHttpService handles CONNECT requests from the proxy-everything sidecar
+  friend class EgressHttpService;
+
   struct Response {
     kj::uint statusCode;
     kj::String body;
@@ -77,6 +96,11 @@ class ContainerClient final: public rpc::Container::Server, public kj::Refcounte
     kj::HashMap ports;
   };
 
+  struct IPAMConfigResult {
+    kj::String gateway;
+    kj::String subnet;
+  };
+
   // Docker API v1.50 helper methods
   static kj::Promise<Response> dockerApiRequest(kj::Network& network,
       kj::String dockerPath,
@@ -91,8 +115,53 @@ class ContainerClient final: public rpc::Container::Server, public kj::Refcounte
   kj::Promise<void> killContainer(uint32_t signal);
   kj::Promise<void> destroyContainer();
 
+  // Sidecar container management (for the egress proxy)
+  kj::Promise<void> createSidecarContainer(uint16_t egressPort, kj::String networkCidr);
+  kj::Promise<void> startSidecarContainer();
+  kj::Promise<void> destroySidecarContainer();
+  kj::Promise<void> monitorSidecarContainer();
+
   // Cleanup callback to remove from ActorNamespace map when destroyed
   kj::Function<void()> cleanupCallback;
+
+  // For redeeming channel tokens received via setEgressHttp
+  ChannelTokenHandler& channelTokenHandler;
+
+  // Represents a parsed egress mapping with CIDR and port matching
+  struct EgressMapping {
+    kj::CidrRange cidr;
+    uint16_t port;  // 0 means match all ports
+    kj::Own<IoChannelFactory::SubrequestChannel> channel;
+  };
+
+  kj::Vector<EgressMapping> egressMappings;
+
+  // Find a matching egress mapping for the given destination address (host:port format)
+  kj::Maybe<IoChannelFactory::SubrequestChannel&> findEgressMapping(
+      kj::StringPtr destAddr, uint16_t defaultPort);
+
+  // Whether general internet access is enabled for this container
+  bool internetEnabled = false;
+
+  std::atomic_bool containerStarted = false;
+  std::atomic_bool containerSidecarStarted = false;
+
+  kj::Maybe<kj::Own<kj::HttpServer>> egressHttpServer;
+  kj::Maybe<kj::Promise<void>> egressListenerTask;
+
+  uint16_t egressListenerPort = 0;
+
+  // Get the Docker bridge network gateway IP and subnet.
+  // Prefers the "workerd-network" bridge, creating it if needed.
+  kj::Promise<IPAMConfigResult> getDockerBridgeIPAMConfig();
+  // Create the workerd-network Docker bridge network with IPv6 support.
+  kj::Promise<void> createWorkerdNetwork();
+  // Start the egress listener on the specified address; returns the chosen port.
+  kj::Promise<uint16_t> startEgressListener(kj::StringPtr listenAddress);
+  void stopEgressListener();
+  // Ensure the egress listener and sidecar container are started exactly once.
+  // Uses containerSidecarStarted as a guard. Called from both start() and setEgressHttp().
+  kj::Promise<void> ensureSidecarStarted();
 };
 
 }  // namespace workerd::server
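Editor's note: the egress lookup declared above is first-match-wins in registration order, with port 0 acting as a wildcard. A compact model of that behavior (TypeScript sketch; `cidr` stands in for kj::CidrRange::matches):

```ts
interface EgressMapping {
  cidr: (host: string) => boolean; // stand-in for kj::CidrRange::matches
  port: number;                    // 0 matches any port
  channel: unknown;                // stand-in for the SubrequestChannel
}

function findEgressMapping(
  mappings: EgressMapping[],
  host: string,
  port: number,
): unknown {
  // First match wins, so specific mappings should be registered before
  // catch-alls like 0.0.0.0/0 or ::/0.
  for (const m of mappings) {
    if (m.cidr(host) && (m.port === 0 || m.port === port)) return m.channel;
  }
  return undefined;
}
```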
diff --git a/src/workerd/server/docker-api.capnp b/src/workerd/server/docker-api.capnp
index e3d9ca3e5ab..692ce021cee 100644
--- a/src/workerd/server/docker-api.capnp
+++ b/src/workerd/server/docker-api.capnp
@@ -162,6 +162,7 @@ struct Docker {
       cgroupParent @47 :Text $Json.name("CgroupParent");
       volumeDriver @48 :Text $Json.name("VolumeDriver");
       shmSize @49 :UInt32 $Json.name("ShmSize");
+      extraHosts @50 :List(Text) $Json.name("ExtraHosts");  # --add-host entries in "host:ip" format
     }
   }
@@ -286,4 +287,35 @@ struct Docker {
       }
     }
   }
+
+  # Network inspection response (GET /networks/{id})
+  struct NetworkInspectResponse {
+    name @0 :Text $Json.name("Name");
+    id @1 :Text $Json.name("Id");
+    ipam @2 :IPAM $Json.name("IPAM");
+
+    struct IPAM {
+      driver @0 :Text $Json.name("Driver");
+      config @1 :List(IPAMConfig) $Json.name("Config");
+
+      struct IPAMConfig {
+        subnet @0 :Text $Json.name("Subnet");
+        gateway @1 :Text $Json.name("Gateway");
+      }
+    }
+  }
+
+  # Network create request (POST /networks/create)
+  # Equivalent to: docker network create -d bridge --ipv6 workerd-network
+  struct NetworkCreateRequest {
+    name @0 :Text $Json.name("Name");
+    driver @1 :Text $Json.name("Driver");  # "bridge", "overlay", etc.
+    enableIpv6 @2 :Bool $Json.name("EnableIPv6");
+  }
+
+  # Network create response
+  struct NetworkCreateResponse {
+    id @0 :Text $Json.name("Id");
+    warning @1 :Text $Json.name("Warning");
+  }
 }
diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++
index 681f92cb14b..03580e9a8e8 100644
--- a/src/workerd/server/server.c++
+++ b/src/workerd/server/server.c++
@@ -1896,6 +1896,7 @@ class Server::WorkerService final: public Service,
       LinkCallback linkCallback,
       AbortActorsCallback abortActorsCallback,
       kj::Maybe<kj::String> dockerPathParam,
+      kj::Maybe<kj::String> containerEgressInterceptorImageParam,
       bool isDynamic)
       : channelTokenHandler(channelTokenHandler),
         serviceName(serviceName),
@@ -1909,6 +1910,7 @@
         waitUntilTasks(*this),
         abortActorsCallback(kj::mv(abortActorsCallback)),
         dockerPath(kj::mv(dockerPathParam)),
+        containerEgressInterceptorImage(kj::mv(containerEgressInterceptorImageParam)),
         isDynamic(isDynamic) {}
 
   // Call immediately after the constructor to set up `actorNamespaces`. This can't happen during
@@ -1929,9 +1931,9 @@
       }
 
       auto actorClass = kj::refcounted(*this, entry.key, Frankenvalue());
-      auto ns =
-          kj::heap<ActorNamespace>(kj::mv(actorClass), entry.value, threadContext.getUnsafeTimer(),
-              threadContext.getByteStreamFactory(), network, dockerPath, waitUntilTasks);
+      auto ns = kj::heap<ActorNamespace>(kj::mv(actorClass), entry.value,
+          threadContext.getUnsafeTimer(), threadContext.getByteStreamFactory(), channelTokenHandler,
+          network, dockerPath, containerEgressInterceptorImage, waitUntilTasks);
       actorNamespaces.insert(entry.key, kj::mv(ns));
     }
   }
@@ -2195,15 +2197,19 @@
         const ActorConfig& config,
         kj::Timer& timer,
         capnp::ByteStreamFactory& byteStreamFactory,
+        ChannelTokenHandler& channelTokenHandler,
         kj::Network& dockerNetwork,
         kj::Maybe<kj::StringPtr> dockerPath,
+        kj::Maybe<kj::StringPtr> containerEgressInterceptorImage,
        kj::TaskSet& waitUntilTasks)
        : actorClass(kj::mv(actorClass)),
          config(config),
          timer(timer),
          byteStreamFactory(byteStreamFactory),
+          channelTokenHandler(channelTokenHandler),
          dockerNetwork(dockerNetwork),
          dockerPath(dockerPath),
+          containerEgressInterceptorImage(containerEgressInterceptorImage),
          waitUntilTasks(waitUntilTasks) {}
 
     // Called at link time to provide needed resources.
@@ -2855,8 +2861,9 @@
       };
 
       auto client = kj::refcounted<ContainerClient>(byteStreamFactory, timer, dockerNetwork,
-          kj::str(dockerPathRef), kj::str(containerId), kj::str(imageName), waitUntilTasks,
-          kj::mv(cleanupCallback));
+          kj::str(dockerPathRef), kj::str(containerId), kj::str(imageName),
+          containerEgressInterceptorImage.map([](kj::StringPtr s) { return kj::str(s); }),
+          waitUntilTasks, kj::mv(cleanupCallback), channelTokenHandler);
 
       // Store raw pointer in map (does not own)
       containerClients.insert(kj::str(containerId), client.get());
@@ -2901,8 +2908,10 @@
     kj::Maybe<kj::Promise<void>> cleanupTask;
     kj::Timer& timer;
     capnp::ByteStreamFactory& byteStreamFactory;
+    ChannelTokenHandler& channelTokenHandler;
     kj::Network& dockerNetwork;
     kj::Maybe<kj::StringPtr> dockerPath;
+    kj::Maybe<kj::StringPtr> containerEgressInterceptorImage;
     kj::TaskSet& waitUntilTasks;
 
     kj::Maybe alarmScheduler;
@@ -3141,6 +3150,7 @@
   kj::TaskSet waitUntilTasks;
   AbortActorsCallback abortActorsCallback;
   kj::Maybe<kj::String> dockerPath;
+  kj::Maybe<kj::String> containerEgressInterceptorImage;
   bool isDynamic;
 
   class ActorChannelImpl final: public IoChannelFactory::ActorChannel {
@@ -4721,23 +4731,30 @@ kj::Promise<kj::Own<Server::Service>> Server::makeWorkerImpl(kj::StringPtr
   };
 
   kj::Maybe<kj::String> dockerPath = kj::none;
+  kj::Maybe<kj::String> containerEgressInterceptorImage = kj::none;
   switch (def.containerEngineConf.which()) {
     case config::Worker::ContainerEngine::NONE:
       // No container engine configured
       break;
-    case config::Worker::ContainerEngine::LOCAL_DOCKER:
-      dockerPath = kj::str(def.containerEngineConf.getLocalDocker().getSocketPath());
+    case config::Worker::ContainerEngine::LOCAL_DOCKER: {
+      auto dockerConf = def.containerEngineConf.getLocalDocker();
+      dockerPath = kj::str(dockerConf.getSocketPath());
+      if (dockerConf.hasContainerEgressInterceptorImage()) {
+        containerEgressInterceptorImage = kj::str(dockerConf.getContainerEgressInterceptorImage());
+      }
       break;
+    }
   }
 
   kj::Maybe<kj::StringPtr> serviceName;
   if (!def.isDynamic) serviceName = name;
 
-  auto result = kj::refcounted<WorkerService>(channelTokenHandler, serviceName,
-      globalContext->threadContext, monotonicClock, kj::mv(worker),
-      kj::mv(errorReporter.defaultEntrypoint), kj::mv(errorReporter.namedEntrypoints),
-      kj::mv(errorReporter.actorClasses), kj::mv(linkCallback),
-      KJ_BIND_METHOD(*this, abortAllActors), kj::mv(dockerPath), def.isDynamic);
+  auto result =
+      kj::refcounted<WorkerService>(channelTokenHandler, serviceName, globalContext->threadContext,
+          monotonicClock, kj::mv(worker), kj::mv(errorReporter.defaultEntrypoint),
+          kj::mv(errorReporter.namedEntrypoints), kj::mv(errorReporter.actorClasses),
+          kj::mv(linkCallback), KJ_BIND_METHOD(*this, abortAllActors), kj::mv(dockerPath),
+          kj::mv(containerEgressInterceptorImage), def.isDynamic);
   result->initActorNamespaces(def.localActorConfigs, network);
   co_return result;
 }
diff --git a/src/workerd/server/tests/container-client/container-client.wd-test b/src/workerd/server/tests/container-client/container-client.wd-test
index 06e5b2435c3..824bdf3713d 100644
--- a/src/workerd/server/tests/container-client/container-client.wd-test
+++ b/src/workerd/server/tests/container-client/container-client.wd-test
@@ -8,8 +8,8 @@ const unitTests :Workerd.Config = (
       modules = [
        (name = "worker", esModule = embed "test.js")
      ],
-      compatibilityFlags = ["nodejs_compat", "experimental"],
-      containerEngine = (localDocker = (socketPath = "unix:/var/run/docker.sock")),
+      compatibilityFlags = ["enable_ctx_exports", "nodejs_compat", "experimental"],
+      containerEngine = (localDocker = (socketPath = "unix:/var/run/docker.sock", containerEgressInterceptorImage = "cloudflare/proxy-everything:main")),
       durableObjectNamespaces = [
         ( className = "DurableObjectExample",
           uniqueKey = "container-client-test-DurableObjectExample",
diff --git a/src/workerd/server/tests/container-client/test.js b/src/workerd/server/tests/container-client/test.js
index 1633f67d313..f09039081cb 100644
--- a/src/workerd/server/tests/container-client/test.js
+++ b/src/workerd/server/tests/container-client/test.js
@@ -1,4 +1,4 @@
-import { DurableObject } from 'cloudflare:workers';
+import { DurableObject, WorkerEntrypoint } from 'cloudflare:workers';
 import assert from 'node:assert';
 import { scheduler } from 'node:timers/promises';
 
@@ -66,7 +66,7 @@ export class DurableObjectExample extends DurableObject {
     {
       let resp;
       // The retry count here is arbitrary. Can increase it if necessary.
-      const maxRetries = 6;
+      const maxRetries = 15;
       for (let i = 1; i <= maxRetries; i++) {
         try {
           resp = await container
@@ -260,6 +260,231 @@ export class DurableObjectExample extends DurableObject {
   getStatus() {
     return this.ctx.container.running;
   }
+
+  async ping() {
+    const container = this.ctx.container;
+    {
+      let resp;
+      // The retry count here is arbitrary. Can increase it if necessary.
+      const maxRetries = 15;
+      for (let i = 1; i <= maxRetries; i++) {
+        try {
+          resp = await container
+            .getTcpPort(8080)
+            .fetch('http://foo/bar/baz', { method: 'POST', body: 'hello' });
+          break;
+        } catch (e) {
+          if (!e.message.includes('container port not found')) {
+            throw e;
+          }
+          console.info(
+            `Retrying getTcpPort(8080) for the ${i}th time due to an error: ${e.message}`
+          );
+          console.info(e);
+          if (i === maxRetries) {
+            console.error(
+              `Failed to connect to container ${container.id}. Retried ${i} times`
+            );
+            throw e;
+          }
+          await scheduler.wait(500);
+        }
+      }
+
+      assert.equal(resp.status, 200);
+      assert.equal(resp.statusText, 'OK');
+      assert.strictEqual(await resp.text(), 'Hello World!');
+    }
+  }
+
+  async testSetEgressHttp() {
+    const container = this.ctx.container;
+    if (container.running) {
+      let monitor = container.monitor().catch((_err) => {});
+      await container.destroy();
+      await monitor;
+    }
+
+    assert.strictEqual(container.running, false);
+
+    // Set up an egress HTTP mapping to route requests to the binding.
+    // We can configure this even before the container starts.
+    await container.interceptOutboundHttp(
+      '1.2.3.4',
+      this.ctx.exports.TestService({ props: { id: 1234 } })
+    );
+
+    // Start the container
+    container.start();
+
+    // Wait for the container to be available
+    await this.ping();
+
+    assert.strictEqual(container.running, true);
+
+    // Set up an egress HTTP mapping to route requests to the binding.
+    // This registers the binding's channel token with the container runtime.
+    await container.interceptOutboundHttp(
+      '11.0.0.1:9999',
+      this.ctx.exports.TestService({ props: { id: 1 } })
+    );
+
+    await container.interceptOutboundHttp(
+      '11.0.0.2:9999',
+      this.ctx.exports.TestService({ props: { id: 2 } })
+    );
+
+    // We catch all HTTP requests to port 80.
+    await container.interceptAllOutboundHttp(
+      this.ctx.exports.TestService({ props: { id: 3 } })
+    );
+
+    {
+      const response = await container
+        .getTcpPort(8080)
+        .fetch('http://foo/intercept', {
+          headers: { 'x-host': '1.2.3.4:80' },
+        });
+      assert.equal(response.status, 200);
+      assert.equal(await response.text(), 'hello binding: 1234');
+    }
+
+    {
+      const response = await container
+        .getTcpPort(8080)
+        .fetch('http://foo/intercept', {
+          headers: { 'x-host': '11.0.0.1:9999' },
+        });
+      assert.equal(response.status, 200);
+      assert.equal(await response.text(), 'hello binding: 1');
+    }
+
+    {
+      const response = await container
+        .getTcpPort(8080)
+        .fetch('http://foo/intercept', {
+          headers: { 'x-host': '11.0.0.2:9999' },
+        });
+      assert.equal(response.status, 200);
+      assert.equal(await response.text(), 'hello binding: 2');
+    }
+
+    {
+      const response = await container
+        .getTcpPort(8080)
+        .fetch('http://foo/intercept', {
+          headers: { 'x-host': '15.0.0.2:80' },
+        });
+      assert.equal(response.status, 200);
+      assert.equal(await response.text(), 'hello binding: 3');
+    }
+
+    {
+      const response = await container
+        .getTcpPort(8080)
+        .fetch('http://foo/intercept', {
+          headers: { 'x-host': '[111::]:80' },
+        });
+      assert.equal(response.status, 200);
+      assert.equal(await response.text(), 'hello binding: 3');
+    }
+  }
+
+  async testInterceptWebSocket() {
+    const container = this.ctx.container;
+    if (container.running) {
+      const monitor = container.monitor().catch((_err) => {});
+      await container.destroy();
+      await monitor;
+    }
+
+    assert.strictEqual(container.running, false);
+
+    // Start the container with WebSocket proxy mode enabled
+    container.start({
+      env: { WS_ENABLED: 'true', WS_PROXY_TARGET: '11.0.0.1:9999' },
+    });
+
+    // Wait for the container to be available
+    await this.ping();
+
+    assert.strictEqual(container.running, true);
+
+    // Set up an egress mapping to route WebSocket requests to the binding
+    await container.interceptOutboundHttp(
+      '11.0.0.1:9999',
+      this.ctx.exports.TestService({ props: { id: 42 } })
+    );
+
+    // Connect to the container's /ws endpoint, which proxies to the intercepted address.
+    // Flow: DO -> container:8080/ws -> container connects to 11.0.0.1:9999/ws
+    //   -> sidecar intercepts -> workerd -> TestService worker binding
+    const res = await container.getTcpPort(8080).fetch('http://foo/ws', {
+      headers: {
+        Upgrade: 'websocket',
+        Connection: 'Upgrade',
+        'Sec-WebSocket-Key': 'x3JJHMbDL1EzLkh9GBhXDw==',
+        'Sec-WebSocket-Version': '13',
+      },
+    });
+
+    // We should get a WebSocket upgrade response
+    assert.strictEqual(res.status, 101);
+    assert.strictEqual(res.headers.get('upgrade'), 'websocket');
+    assert.strictEqual(!!res.webSocket, true);
+
+    const ws = res.webSocket;
+    ws.accept();
+
+    // Listen for the response
+    const messagePromise = new Promise((resolve) => {
+      ws.addEventListener(
+        'message',
+        (event) => {
+          resolve(event.data);
+        },
+        { once: true }
+      );
+    });
+
+    // Send a test message - it should go through the whole chain and come back
+    ws.send('Hello through intercept!');
+
+    // We should receive a response from the TestService binding with id 42
+    const response = new TextDecoder().decode(await messagePromise);
+    assert.strictEqual(response, 'Binding 42: Hello through intercept!');
+
+    ws.close();
+    await container.destroy();
+  }
+}
+
+export class TestService extends WorkerEntrypoint {
+  fetch(request) {
+    // Check if this is a WebSocket upgrade request
+    const upgradeHeader = request.headers.get('Upgrade');
+    if (upgradeHeader && upgradeHeader.toLowerCase() === 'websocket') {
+      // Handle the WebSocket upgrade
+      const [client, server] = Object.values(new WebSocketPair());
+
+      server.accept();
+
+      server.addEventListener('message', (event) => {
+        // Echo back with the binding id as a prefix
+        server.send(
+          `Binding ${this.ctx.props.id}: ${new TextDecoder().decode(event.data)}`
+        );
+      });
+
+      return new Response(null, {
+        status: 101,
+        webSocket: client,
+      });
+    }
+
+    // Regular HTTP request
+    return new Response('hello binding: ' + this.ctx.props.id);
+  }
+}
 
 export class DurableObjectExample2 extends DurableObjectExample {}
@@ -411,3 +636,21 @@
       }
     },
   };
+
+// Test setEgressHttp functionality - registers a binding's channel token with the container
+export const testSetEgressHttp = {
+  async test(_ctrl, env) {
+    const id = env.MY_CONTAINER.idFromName('testSetEgressHttp');
+    const stub = env.MY_CONTAINER.get(id);
+    await stub.testSetEgressHttp();
+  },
+};
+
+// Test WebSocket through interceptOutboundHttp - DO -> container -> worker binding via WebSocket
+export const testInterceptWebSocket = {
+  async test(_ctrl, env) {
+    const id = env.MY_CONTAINER.idFromName('testInterceptWebSocket');
+    const stub = env.MY_CONTAINER.get(id);
+    await stub.testInterceptWebSocket();
+  },
+};
diff --git a/src/workerd/server/workerd.capnp b/src/workerd/server/workerd.capnp
index 2b2bca8d371..9e1cb23b62a 100644
--- a/src/workerd/server/workerd.capnp
+++ b/src/workerd/server/workerd.capnp
@@ -733,6 +733,12 @@ struct Worker {
   struct DockerConfiguration {
     socketPath @0 :Text;
     # Path to the Docker socket.
+
+    containerEgressInterceptorImage @1 :Text;
+    # Docker image name for the container egress interceptor sidecar.
+    # This sidecar intercepts outbound traffic from containers and routes it
+    # through workerd for egress mappings (setEgressHttp bindings).
+    # You can find this image on registries such as Docker Hub: https://hub.docker.com/r/cloudflare/proxy-everything
   }
 }
 
diff --git a/types/generated-snapshot/experimental/index.d.ts b/types/generated-snapshot/experimental/index.d.ts
index d5fce056771..cce9417cadf 100755
--- a/types/generated-snapshot/experimental/index.d.ts
+++ b/types/generated-snapshot/experimental/index.d.ts
@@ -3841,6 +3841,8 @@ interface Container {
   signal(signo: number): void;
   getTcpPort(port: number): Fetcher;
   setInactivityTimeout(durationMs: number | bigint): Promise<void>;
+  interceptOutboundHttp(addr: string, binding: Fetcher): Promise<void>;
+  interceptAllOutboundHttp(binding: Fetcher): Promise<void>;
 }
 interface ContainerStartupOptions {
   entrypoint?: string[];
diff --git a/types/generated-snapshot/experimental/index.ts b/types/generated-snapshot/experimental/index.ts
index 71c0d95e46c..45fff8f440f 100755
--- a/types/generated-snapshot/experimental/index.ts
+++ b/types/generated-snapshot/experimental/index.ts
@@ -3850,6 +3850,8 @@ export interface Container {
   signal(signo: number): void;
   getTcpPort(port: number): Fetcher;
   setInactivityTimeout(durationMs: number | bigint): Promise<void>;
+  interceptOutboundHttp(addr: string, binding: Fetcher): Promise<void>;
+  interceptAllOutboundHttp(binding: Fetcher): Promise<void>;
 }
 export interface ContainerStartupOptions {
   entrypoint?: string[];
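Editor's note: the generated types above surface the two new Container methods. A compact end-to-end sketch of how they compose with the rest of this change (illustrative names; assumes the experimental compat flag and a localDocker config with containerEgressInterceptorImage set):

```ts
import { DurableObject, WorkerEntrypoint } from 'cloudflare:workers';

// Receives container egress that matched an intercept rule.
export class Sink extends WorkerEntrypoint {
  fetch(_req: Request): Response {
    return new Response('handled in the Worker');
  }
}

export class MyContainer extends DurableObject {
  async startIntercepted(): Promise<void> {
    const c = this.ctx.container!;
    // Mappings are checked in registration order, so register the specific
    // CIDR+port rule before the port-80 catch-all.
    await c.interceptOutboundHttp('10.1.0.0/16:8080', this.ctx.exports.Sink({ props: {} }));
    await c.interceptAllOutboundHttp(this.ctx.exports.Sink({ props: {} }));
    c.start();
  }
}
```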