From ae565f69d904635157fe8160e255f5e2decd98ee Mon Sep 17 00:00:00 2001 From: Gabi Villalonga Simon Date: Wed, 28 Jan 2026 13:11:49 -0600 Subject: [PATCH 01/12] container: add setEgressTcp and setEgressHttp RPC methods to Container interface Add new Cap'n Proto schema definitions for container egress routing: - setEgressTcp: configures TCP egress routing to Workers runtime - setEgressHttp: configures HTTP egress routing to Workers runtime Also add Docker API schema additions to support the networking features we will need to introduce proxy-everything. --- src/workerd/io/container.capnp | 11 +++++++++++ src/workerd/server/docker-api.capnp | 18 ++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/src/workerd/io/container.capnp b/src/workerd/io/container.capnp index f79e85defb2..e141b7ef882 100644 --- a/src/workerd/io/container.capnp +++ b/src/workerd/io/container.capnp @@ -110,4 +110,15 @@ interface Container @0x9aaceefc06523bca { # Note that if there is an open connection to the container, the runtime must not shutdown the container. # If there is no activity timeout duration configured and no container connection, it's up to the runtime # to decide when to signal the container to exit. + + setEgressTcp @8 (addr :Text, channelToken :Data); + # Configures egress TCP routing for the container. When the container attempts to connect to the + # specified address, the connection should be routed back to the Workers runtime using the channel token. + + setEgressHttp @9 (hostPort :Text, channelToken :Data); + # Configures egress HTTP routing for the container. When the container attempts to connect to the + # specified host:port, the connection should be routed back to the Workers runtime using the channel token. + # The format of hostPort is '<host>[':'<port>]', i.e. a host optionally followed by a colon and port. + # Implementation across container runtimes might differ; ideally this method should be able to parse both HTTP and HTTPS + # requests, if possible. 
} diff --git a/src/workerd/server/docker-api.capnp b/src/workerd/server/docker-api.capnp index e3d9ca3e5ab..79f1b092fef 100644 --- a/src/workerd/server/docker-api.capnp +++ b/src/workerd/server/docker-api.capnp @@ -162,6 +162,7 @@ struct Docker { cgroupParent @47 :Text $Json.name("CgroupParent"); volumeDriver @48 :Text $Json.name("VolumeDriver"); shmSize @49 :UInt32 $Json.name("ShmSize"); + extraHosts @50 :List(Text) $Json.name("ExtraHosts"); # --add-host entries in "host:ip" format } } @@ -286,4 +287,21 @@ struct Docker { } } } + + # Network inspection response (GET /networks/{id}) + struct NetworkInspectResponse { + name @0 :Text $Json.name("Name"); + id @1 :Text $Json.name("Id"); + ipam @2 :IPAM $Json.name("IPAM"); + + struct IPAM { + driver @0 :Text $Json.name("Driver"); + config @1 :List(IPAMConfig) $Json.name("Config"); + + struct IPAMConfig { + subnet @0 :Text $Json.name("Subnet"); + gateway @1 :Text $Json.name("Gateway"); + } + } + } } From 7bc0fc865d95a3e4641e876fdae2fd3a6895260c Mon Sep 17 00:00:00 2001 From: Gabi Villalonga Simon Date: Wed, 28 Jan 2026 13:11:59 -0600 Subject: [PATCH 02/12] extract channel-token into separate library Refactor BUILD.bazel to split channel-token.c++ and channel-token.h. This allows container-client to depend on channel-token. 
--- src/workerd/server/BUILD.bazel | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/src/workerd/server/BUILD.bazel b/src/workerd/server/BUILD.bazel index 4f547439006..3594a8ee891 100644 --- a/src/workerd/server/BUILD.bazel +++ b/src/workerd/server/BUILD.bazel @@ -204,19 +204,29 @@ wd_cc_library( }), ) +wd_cc_library( + name = "channel-token", + srcs = ["channel-token.c++"], + hdrs = ["channel-token.h"], + deps = [ + ":channel-token_capnp", + "//src/workerd/io", + "//src/workerd/util:entropy", + ], +) + wd_cc_library( name = "server", srcs = [ - "channel-token.c++", "server.c++", ], hdrs = [ - "channel-token.h", "server.h", ], deps = [ ":actor-id-impl", ":alarm-scheduler", + ":channel-token", ":channel-token_capnp", ":container-client", ":facet-tree-index", @@ -268,7 +278,9 @@ wd_cc_library( hdrs = ["container-client.h"], visibility = ["//visibility:public"], deps = [ + ":channel-token", ":docker-api_capnp", + "//src/workerd/io", "//src/workerd/io:container_capnp", "//src/workerd/jsg", "@capnp-cpp//src/capnp/compat:http-over-capnp", From 184ceace789769e8e9ed8c514630aabdbb94b9f0 Mon Sep 17 00:00:00 2001 From: Gabi Villalonga Simon Date: Wed, 28 Jan 2026 13:12:09 -0600 Subject: [PATCH 03/12] container: add setEgressHttp method to Container class Expose setEgressHttp() to JavaScript, allowing Workers to register WorkerEntrypoint bindings for container egress routing. The method is gated behind the workerdExperimental flag. 
--- src/workerd/api/container.c++ | 14 ++++++++++++++ src/workerd/api/container.h | 5 +++++ 2 files changed, 19 insertions(+) diff --git a/src/workerd/api/container.c++ b/src/workerd/api/container.c++ index 24137471ff0..7742e3ca56b 100644 --- a/src/workerd/api/container.c++ +++ b/src/workerd/api/container.c++ @@ -71,6 +71,20 @@ jsg::Promise Container::setInactivityTimeout(jsg::Lock& js, int64_t durati return IoContext::current().awaitIo(js, req.sendIgnoringResult()); } +void Container::setEgressHttp(jsg::Lock& js, kj::String addr, jsg::Ref binding) { + auto& ioctx = IoContext::current(); + auto channel = binding->getSubrequestChannel(ioctx); + + // Get a channel token for RPC usage, the container runtime can use this + // token later to redeem a Fetcher. + auto token = channel->getToken(IoChannelFactory::ChannelTokenUsage::RPC); + + auto req = rpcClient->setEgressHttpRequest(); + req.setHostPort(addr); + req.setChannelToken(token); + ioctx.addTask(req.sendIgnoringResult()); +} + jsg::Promise Container::monitor(jsg::Lock& js) { JSG_REQUIRE(running, Error, "monitor() cannot be called on a container that is not running."); diff --git a/src/workerd/api/container.h b/src/workerd/api/container.h index 1cbacce4027..449b334e48c 100644 --- a/src/workerd/api/container.h +++ b/src/workerd/api/container.h @@ -62,6 +62,7 @@ class Container: public jsg::Object { void signal(jsg::Lock& js, int signo); jsg::Ref getTcpPort(jsg::Lock& js, int port); jsg::Promise setInactivityTimeout(jsg::Lock& js, int64_t durationMs); + void setEgressHttp(jsg::Lock& js, kj::String addr, jsg::Ref binding); // TODO(containers): listenTcp() @@ -73,6 +74,10 @@ class Container: public jsg::Object { JSG_METHOD(signal); JSG_METHOD(getTcpPort); JSG_METHOD(setInactivityTimeout); + + if (flags.getWorkerdExperimental()) { + JSG_METHOD(setEgressHttp); + } } void visitForMemoryInfo(jsg::MemoryTracker& tracker) const { From c6d35347189f06209f3ed65dc2d4cc3f020b8f7e Mon Sep 17 00:00:00 2001 From: Gabi Villalonga 
Simon Date: Wed, 28 Jan 2026 13:12:24 -0600 Subject: [PATCH 04/12] container: implement egress HTTP routing with sidecar container Implement the workerd handling for container egress HTTP routing: - EgressHttpService: HTTP service that handles CONNECT requests from proxy-everything (https://hub.docker.com/r/cloudflare/proxy-everything), it parses tunneled HTTP requests, and forwards them to the appropriate SubrequestChannel based on registered mappings - We need to do proxy-everything container management: we create and monitor a sidecar container (proxy-everything) that shares network namespace with the main container and intercepts outbound traffic via iptables/TPROXY. - Egress listener: HTTP server listening on the Docker bridge gateway that receives proxied requests from proxy-everything. - setEgressHttp RPC implementation that registers address to SubrequestChannel mappings. WebSocket is currently unimplemented. It's a TODO. --- src/workerd/server/container-client.c++ | 438 +++++++++++++++++++++++- src/workerd/server/container-client.h | 48 ++- 2 files changed, 481 insertions(+), 5 deletions(-) diff --git a/src/workerd/server/container-client.c++ b/src/workerd/server/container-client.c++ index 5c118b1f955..23996c47c22 100644 --- a/src/workerd/server/container-client.c++ +++ b/src/workerd/server/container-client.c++ @@ -5,6 +5,7 @@ #include "container-client.h" #include +#include #include #include @@ -107,22 +108,35 @@ ContainerClient::ContainerClient(capnp::ByteStreamFactory& byteStreamFactory, kj::String dockerPath, kj::String containerName, kj::String imageName, + kj::String containerEgressInterceptorImage, kj::TaskSet& waitUntilTasks, - kj::Function cleanupCallback) + kj::Function cleanupCallback, + ChannelTokenHandler& channelTokenHandler) : byteStreamFactory(byteStreamFactory), timer(timer), network(network), dockerPath(kj::mv(dockerPath)), - containerName(kj::encodeUriComponent(kj::mv(containerName))), + 
containerName(kj::encodeUriComponent(kj::str(containerName))), + sidecarContainerName(kj::encodeUriComponent(kj::str(containerName, "-proxy"))), imageName(kj::mv(imageName)), + containerEgressInterceptorImage(kj::mv(containerEgressInterceptorImage)), waitUntilTasks(waitUntilTasks), - cleanupCallback(kj::mv(cleanupCallback)) {} + cleanupCallback(kj::mv(cleanupCallback)), + channelTokenHandler(channelTokenHandler) {} ContainerClient::~ContainerClient() noexcept(false) { + // Stop the egress listener + stopEgressListener(); + // Call the cleanup callback to remove this client from the ActorNamespace map cleanupCallback(); - // Destroy the Docker container + // Destroy the sidecar container first (it depends on the main container's network) + waitUntilTasks.add(dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::DELETE, + kj::str("/containers/", sidecarContainerName, "?force=true")) + .ignoreResult()); + + // Destroy the main Docker container waitUntilTasks.add(dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::DELETE, kj::str("/containers/", containerName, "?force=true")) .ignoreResult()); @@ -175,6 +189,262 @@ class ContainerClient::DockerPort final: public rpc::Container::Port::Server { kj::Maybe> pumpTask; }; +// HTTP service that handles HTTP CONNECT requests from the container sidecar (proxy-everything). +// When the sidecar intercepts container egress traffic, it sends HTTP CONNECT to this service. +// After accepting the CONNECT, the tunnel carries the actual HTTP request from the container, +// which we parse and forward to the appropriate SubrequestChannel based on egressMappings. 
+class ContainerClient::EgressHttpService final: public kj::HttpService { + public: + EgressHttpService(ContainerClient& containerClient, kj::HttpHeaderTable& headerTable) + : containerClient(containerClient), + headerTable(headerTable) {} + + kj::Promise request(kj::HttpMethod method, + kj::StringPtr url, + const kj::HttpHeaders& headers, + kj::AsyncInputStream& requestBody, + Response& response) override { + // Regular HTTP requests are not expected - we only handle CONNECT + co_return co_await response.sendError(405, "Method Not Allowed", headerTable); + } + + kj::Promise connect(kj::StringPtr host, + const kj::HttpHeaders& headers, + kj::AsyncIoStream& connection, + ConnectResponse& response, + kj::HttpConnectSettings settings) override { + // The host header contains the destination address (e.g., "10.0.0.1:9999") + // that the container was trying to connect to. + auto destAddr = kj::str(host); + + // Accept the CONNECT tunnel + kj::HttpHeaders responseHeaders(headerTable); + response.accept(200, "OK", responseHeaders); + + // Check if there's a mapping for this destination + auto maybeChannel = containerClient.egressMappings.find(destAddr); + + if (maybeChannel == kj::none) { + // No mapping found - check if internet access is enabled + if (!containerClient.internetEnabled) { + // Internet access not enabled - close the connection + connection.shutdownWrite(); + co_return; + } + + // Forward to the general internet via raw TCP + // Just do bidirectional byte pumping, no HTTP parsing needed + auto addr = co_await containerClient.network.parseAddress(destAddr); + auto destConn = co_await addr->connect(); + + // Pump bytes bidirectionally: tunnel <-> destination + auto promises = kj::heapArrayBuilder>(2); + + promises.add(connection.pumpTo(*destConn).then( + [&destConn = *destConn](uint64_t) { destConn.shutdownWrite(); })); + + promises.add(destConn->pumpTo(connection).then([&connection](uint64_t) { + connection.shutdownWrite(); + })); + + // Wait for both 
directions to complete, keeping destConn alive + co_await kj::joinPromisesFailFast(promises.finish()).attach(kj::mv(destConn)); + co_return; + } + + // Found a mapping - need to parse HTTP and forward to the SubrequestChannel + auto& channel = KJ_ASSERT_NONNULL(maybeChannel); + + // Parse HTTP requests from the tunnel + auto httpInput = kj::newHttpInputStream(connection, headerTable); + + // Loop to handle multiple requests on the same connection (HTTP/1.1 keep-alive) + while (true) { + // Check if there's more data + bool hasMore = co_await httpInput->awaitNextMessage(); + if (!hasMore) { + // Client closed the connection + co_return; + } + + auto req = co_await httpInput->readRequest(); + + // Forward to the SubrequestChannel + IoChannelFactory::SubrequestMetadata metadata; + auto worker = channel->startRequest(kj::mv(metadata)); + + // Create a response handler that writes back to the tunnel + TunnelHttpResponse tunnelResponse(connection); + + // Forward the request to the worker + co_await worker->request(req.method, req.url, req.headers, *req.body, tunnelResponse); + + // Finalize the response (writes chunked terminator if needed) + co_await tunnelResponse.end(); + + // After the response is complete, shut down the write side to signal EOF + connection.shutdownWrite(); + co_return; + } + } + + private: + ContainerClient& containerClient; + kj::HttpHeaderTable& headerTable; + + // Response implementation that writes HTTP responses back through the tunnel. + // This class serializes the HTTP response and writes it to the tunnel stream. 
+ class TunnelHttpResponse final: public kj::HttpService::Response { + public: + TunnelHttpResponse(kj::AsyncIoStream& tunnel): tunnel(tunnel), isChunked(false) {} + + kj::Own send(uint statusCode, + kj::StringPtr statusText, + const kj::HttpHeaders& headers, + kj::Maybe expectedBodySize = kj::none) override { + + isChunked = (expectedBodySize == kj::none); + auto headersStr = headers.serializeResponse(statusCode, statusText); + + return kj::heap(tunnel, kj::mv(headersStr), isChunked); + } + + kj::Own acceptWebSocket(const kj::HttpHeaders& headers) override { + KJ_FAIL_REQUIRE("WebSocket upgrade not supported through egress tunnel"); + } + + // Called after worker->request() completes to finalize the response + kj::Promise end() { + if (isChunked) { + // Write final empty chunk to terminate chunked encoding + co_await tunnel.write("0\r\n\r\n"_kjb); + } + } + + private: + kj::AsyncIoStream& tunnel; + bool isChunked; + }; + + // Output stream that writes to the tunnel. + // Headers are written on the first write or when the stream ends. + // If chunked mode is enabled, wraps body data in chunked transfer encoding. 
+ class TunnelOutputStream final: public kj::AsyncOutputStream { + public: + TunnelOutputStream(kj::AsyncIoStream& tunnel, kj::String serializedHeaders, bool chunked) + : tunnel(tunnel), + serializedHeaders(kj::mv(serializedHeaders)), + headersWritten(false), + chunked(chunked) {} + + kj::Promise write(kj::ArrayPtr buffer) override { + co_await ensureHeadersWritten(); + if (chunked) { + // Write chunk: size in hex, CRLF, data, CRLF + auto chunkHeader = kj::str(kj::hex(buffer.size()), "\r\n"); + co_await tunnel.write(chunkHeader.asBytes()); + co_await tunnel.write(buffer); + co_await tunnel.write("\r\n"_kjb); + } else { + co_await tunnel.write(buffer); + } + } + + kj::Promise write(kj::ArrayPtr> pieces) override { + co_await ensureHeadersWritten(); + if (chunked) { + // Calculate total size for chunk header + size_t totalSize = 0; + for (auto& piece: pieces) { + totalSize += piece.size(); + } + auto chunkHeader = kj::str(kj::hex(totalSize), "\r\n"); + co_await tunnel.write(chunkHeader.asBytes()); + co_await tunnel.write(pieces); + co_await tunnel.write("\r\n"_kjb); + } else { + co_await tunnel.write(pieces); + } + } + + kj::Promise whenWriteDisconnected() override { + return tunnel.whenWriteDisconnected(); + } + + private: + // Ensure headers are written + kj::Promise ensureHeadersWritten() { + if (!headersWritten) { + headersWritten = true; + co_await tunnel.write(serializedHeaders.asBytes()); + } + } + + kj::AsyncIoStream& tunnel; + kj::String serializedHeaders; + bool headersWritten; + bool chunked; + }; +}; + +kj::Promise ContainerClient::getDockerBridgeGateway() { + // Docker API: GET /networks/bridge + auto response = co_await dockerApiRequest( + network, kj::str(dockerPath), kj::HttpMethod::GET, kj::str("/networks/bridge")); + + if (response.statusCode == 200) { + auto jsonRoot = decodeJsonResponse(response.body); + auto ipamConfig = jsonRoot.getIpam().getConfig(); + if (ipamConfig.size() > 0) { + auto gateway = ipamConfig[0].getGateway(); + if 
(gateway.size() > 0) { + co_return kj::str(gateway); + } + } + } + + // Fallback to default Docker bridge gateway + KJ_LOG(WARNING, "Could not determine Docker bridge gateway, using default 172.17.0.1"); + co_return kj::str("172.17.0.1"); +} + +kj::Promise ContainerClient::startEgressListener(kj::StringPtr listenAddress) { + // Create header table for HTTP parsing + auto headerTable = kj::heap(); + auto& headerTableRef = *headerTable; + egressHeaderTable = kj::mv(headerTable); + + // Create the egress HTTP service + auto service = kj::heap(*this, headerTableRef); + + // Create the HTTP server + auto httpServer = kj::heap(timer, headerTableRef, *service); + auto& httpServerRef = *httpServer; + egressHttpServer = kj::mv(httpServer); + + // Listen on the Docker bridge gateway IP with port 0 to let the OS pick a free port + auto addr = co_await network.parseAddress(kj::str(listenAddress, ":0")); + auto listener = addr->listen(); + + // Get the actual port that was assigned + uint16_t chosenPort = listener->getPort(); + + // Run the server in the background - this promise never completes normally + // We need to detach it and return the port + egressListenerTask = + httpServerRef.listenHttp(*listener) + .attach(kj::mv(listener), kj::mv(service)) + .eagerlyEvaluate([](kj::Exception&& e) { LOG_EXCEPTION("Error in egress listener", e); }); + + co_return chosenPort; +} + +void ContainerClient::stopEgressListener() { + egressListenerTask = kj::none; + egressHttpServer = kj::none; + egressHeaderTable = kj::none; +} + kj::Promise ContainerClient::dockerApiRequest(kj::Network& network, kj::String dockerPath, kj::HttpMethod method, @@ -300,6 +570,10 @@ kj::Promise ContainerClient::createContainer( // We need to set a restart policy to avoid having ambiguous states // where the container we're managing is stuck at "exited" state. 
hostConfig.initRestartPolicy().setName("on-failure"); + // Add host.docker.internal mapping so containers can reach the host + // This is equivalent to --add-host=host.docker.internal:host-gateway + auto extraHosts = hostConfig.initExtraHosts(1); + extraHosts.set(0, "host.docker.internal:host-gateway"); auto response = co_await dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::POST, kj::str("/containers/create?name=", containerName), codec.encode(jsonRoot)); @@ -380,6 +654,104 @@ kj::Promise ContainerClient::destroyContainer() { } } +// Creates the sidecar container for egress proxy. +// The sidecar shares the network namespace with the main container and runs +// proxy-everything to intercept and proxy egress traffic. +kj::Promise ContainerClient::createSidecarContainer(uint16_t egressPort) { + // Docker API: POST /containers/create + // Equivalent to: docker run --cap-add=NET_ADMIN --network container:$(CONTAINER) ... + capnp::JsonCodec codec; + codec.handleByAnnotation(); + capnp::MallocMessageBuilder message; + auto jsonRoot = message.initRoot(); + jsonRoot.setImage(containerEgressInterceptorImage); + + // Pass the egress port to the sidecar via command line flag + auto cmd = jsonRoot.initCmd(2); + cmd.set(0, "--http-egress-port"); + cmd.set(1, kj::str(egressPort)); + + auto hostConfig = jsonRoot.initHostConfig(); + // Share network namespace with the main container + hostConfig.setNetworkMode(kj::str("container:", containerName)); + + // Sidecar needs NET_ADMIN capability for iptables/TPROXY + auto capAdd = hostConfig.initCapAdd(1); + capAdd.set(0, "NET_ADMIN"); + hostConfig.setAutoRemove(true); + + auto response = co_await dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::POST, + kj::str("/containers/create?name=", sidecarContainerName), codec.encode(jsonRoot)); + + // statusCode 409 refers to "conflict". Occurs when a container with the given name exists. + // In that case we destroy and re-create the container. 
+ if (response.statusCode == 409) { + co_await destroySidecarContainer(); + response = co_await dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::POST, + kj::str("/containers/create?name=", sidecarContainerName), codec.encode(jsonRoot)); + } + + // statusCode 201 refers to "container created successfully" + if (response.statusCode != 201) { + JSG_REQUIRE(response.statusCode != 404, Error, "No such image available named ", + containerEgressInterceptorImage, + ". Please ensure the container egress interceptor image is built and available."); + JSG_REQUIRE(response.statusCode != 409, Error, "Sidecar container already exists"); + JSG_FAIL_REQUIRE( + Error, "Create sidecar container failed with [", response.statusCode, "] ", response.body); + } +} + +kj::Promise ContainerClient::startSidecarContainer() { + // Docker API: POST /containers/{id}/start + auto endpoint = kj::str("/containers/", sidecarContainerName, "/start"); + auto response = co_await dockerApiRequest( + network, kj::str(dockerPath), kj::HttpMethod::POST, kj::mv(endpoint), kj::str("")); + // statusCode 304 refers to "container already started" + JSG_REQUIRE(response.statusCode != 304, Error, "Sidecar container already started"); + // statusCode 204 refers to "no error" + JSG_REQUIRE( + response.statusCode == 204, Error, "Starting sidecar container failed with: ", response.body); +} + +kj::Promise ContainerClient::destroySidecarContainer() { + auto endpoint = kj::str("/containers/", sidecarContainerName, "?force=true"); + co_await dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::DELETE, kj::mv(endpoint)) + .ignoreResult(); + auto response = co_await dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::POST, + kj::str("/containers/", sidecarContainerName, "/wait?condition=removed")); + JSG_REQUIRE(response.statusCode == 200 || response.statusCode == 404, Error, + "Waiting for container sidecar removal failed with: ", response.statusCode, response.body); + KJ_LOG(WARNING, 
"Container destroyed"); +} + +kj::Promise ContainerClient::monitorSidecarContainer() { + // Docker API: POST /containers/{id}/wait - wait for container to exit + auto endpoint = kj::str("/containers/", sidecarContainerName, "/wait"); + auto response = co_await dockerApiRequest( + network, kj::str(dockerPath), kj::HttpMethod::POST, kj::mv(endpoint)); + + if (response.statusCode == 200) { + // Container exited - parse the exit code and log it + auto jsonRoot = decodeJsonResponse(response.body); + auto exitCode = jsonRoot.getStatusCode(); + KJ_LOG(WARNING, "Sidecar container exited unexpectedly", sidecarContainerName, exitCode); + + // Fetch the container logs to help diagnose the exit + auto logsEndpoint = + kj::str("/containers/", sidecarContainerName, "/logs?stdout=true&stderr=true&tail=50"); + auto logsResponse = co_await dockerApiRequest( + network, kj::str(dockerPath), kj::HttpMethod::GET, kj::mv(logsEndpoint)); + if (logsResponse.statusCode == 200) { + KJ_LOG(WARNING, "Sidecar container logs:", logsResponse.body); + } + } else if (response.statusCode == 404) { + // Container was removed before we could monitor it - this is normal during shutdown + } else { + KJ_LOG(ERROR, "Failed to monitor sidecar container", response.statusCode, response.body); + } +} + kj::Promise ContainerClient::status(StatusContext context) { const auto [isRunning, _ports] = co_await inspectContainer(); context.getResults().setRunning(isRunning); @@ -399,6 +771,10 @@ kj::Promise ContainerClient::start(StartContext context) { environment = params.getEnvironmentVariables(); } + // Track whether internet access is enabled for this container + internetEnabled = params.getEnableInternet(); + + // Create and start the main user container co_await createContainer(entrypoint, environment); co_await startContainer(); } @@ -430,6 +806,8 @@ kj::Promise ContainerClient::monitor(MonitorContext context) { } kj::Promise ContainerClient::destroy(DestroyContext context) { + // Destroy sidecar first since 
it depends on the main container's network + co_await destroySidecarContainer(); co_await destroyContainer(); } @@ -468,6 +846,58 @@ kj::Promise ContainerClient::listenTcp(ListenTcpContext context) { KJ_UNIMPLEMENTED("listenTcp not implemented for Docker containers - use port mapping instead"); } +kj::Promise ContainerClient::setEgressHttp(SetEgressHttpContext context) { + auto params = context.getParams(); + auto addr = kj::str(params.getHostPort()); + auto tokenBytes = params.getChannelToken(); + + // Wait for any previous setEgressHttp call to complete (serializes sidecar setup) + KJ_IF_SOME(lock, egressSetupLock) { + co_await lock.addBranch(); + } + + // If no egressListenerTask, start one now. + // The biggest disadvantage of doing it here, is that if the workerd process restarts, + // and the container is still running, it might have no connectivity. + if (egressListenerTask == kj::none) { + // Create a promise/fulfiller pair to signal when setup is complete + // TODO: Actually, every RPC in this class would benefit from this. + auto paf = kj::newPromiseAndFulfiller(); + egressSetupLock = paf.promise.fork(); + KJ_DEFER(paf.fulfiller->fulfill()); + + // Get the Docker bridge gateway IP to listen on (only accessible from containers) + auto bridgeGateway = co_await getDockerBridgeGateway(); + + // Start the egress listener first so it's ready when the sidecar starts. + // Use port 0 to let the OS pick a free port dynamically. + egressListenerPort = co_await startEgressListener(bridgeGateway); + + // Create and start the sidecar container that shares the network namespace + // with the main container and intercepts egress traffic. + // Pass the dynamically chosen port so the sidecar knows where to connect. 
+ co_await createSidecarContainer(egressListenerPort); + co_await startSidecarContainer(); + + // Monitor the sidecar container for unexpected exits + waitUntilTasks.add(monitorSidecarContainer()); + } + + // Redeem the channel token to get a SubrequestChannel + auto subrequestChannel = channelTokenHandler.decodeSubrequestChannelToken( + workerd::IoChannelFactory::ChannelTokenUsage::RPC, tokenBytes); + + // Store the mapping + egressMappings.upsert(kj::mv(addr), kj::mv(subrequestChannel), + [](auto& existing, auto&& newValue) { existing = kj::mv(newValue); }); + + co_return; +} + +kj::Promise ContainerClient::setEgressTcp(SetEgressTcpContext context) { + KJ_UNIMPLEMENTED("setEgressTcp not implemented - use setEgressHttp for now"); +} + kj::Own ContainerClient::addRef() { return kj::addRef(*this); } diff --git a/src/workerd/server/container-client.h b/src/workerd/server/container-client.h index 456c8911eb5..246157c630e 100644 --- a/src/workerd/server/container-client.h +++ b/src/workerd/server/container-client.h @@ -5,9 +5,12 @@ #pragma once #include +#include +#include #include #include +#include #include #include #include @@ -32,8 +35,10 @@ class ContainerClient final: public rpc::Container::Server, public kj::Refcounte kj::String dockerPath, kj::String containerName, kj::String imageName, + kj::String containerEgressInterceptorImage, kj::TaskSet& waitUntilTasks, - kj::Function cleanupCallback); + kj::Function cleanupCallback, + ChannelTokenHandler& channelTokenHandler); ~ContainerClient() noexcept(false); @@ -46,6 +51,8 @@ class ContainerClient final: public rpc::Container::Server, public kj::Refcounte kj::Promise getTcpPort(GetTcpPortContext context) override; kj::Promise listenTcp(ListenTcpContext context) override; kj::Promise setInactivityTimeout(SetInactivityTimeoutContext context) override; + kj::Promise setEgressTcp(SetEgressTcpContext context) override; + kj::Promise setEgressHttp(SetEgressHttpContext context) override; kj::Own addRef(); @@ -55,7 +62,12 
@@ class ContainerClient final: public rpc::Container::Server, public kj::Refcounte kj::Network& network; kj::String dockerPath; kj::String containerName; + kj::String sidecarContainerName; kj::String imageName; + + // Container egress interceptor image name (sidecar for egress proxy) + kj::String containerEgressInterceptorImage; + kj::TaskSet& waitUntilTasks; static constexpr kj::StringPtr defaultEnv[] = {"CLOUDFLARE_COUNTRY_A2=XX"_kj, @@ -91,8 +103,42 @@ class ContainerClient final: public rpc::Container::Server, public kj::Refcounte kj::Promise killContainer(uint32_t signal); kj::Promise destroyContainer(); + // Sidecar container management (for egress proxy) + kj::Promise createSidecarContainer(uint16_t egressPort); + kj::Promise startSidecarContainer(); + kj::Promise destroySidecarContainer(); + kj::Promise monitorSidecarContainer(); + // Cleanup callback to remove from ActorNamespace map when destroyed kj::Function cleanupCallback; + + // For redeeming channel tokens received via setEgressHttp + ChannelTokenHandler& channelTokenHandler; + + // Egress HTTP mappings: address -> SubrequestChannel + kj::HashMap> egressMappings; + + // Whether general internet access is enabled for this container + bool internetEnabled = false; + + // Egress HTTP listener for handling container egress via HTTP CONNECT from sidecar + class EgressHttpService; + kj::Maybe> egressHeaderTable; + kj::Maybe> egressHttpServer; + kj::Maybe> egressListenerTask; + + // The dynamically chosen port for the egress listener + uint16_t egressListenerPort = 0; + + // Mutex to serialize setEgressHttp() calls (sidecar setup must complete before adding mappings) + kj::Maybe> egressSetupLock; + + // Get the Docker bridge network gateway IP (e.g., "172.17.0.1") + kj::Promise getDockerBridgeGateway(); + // Start the egress listener on the specified address, returns the chosen port + kj::Promise startEgressListener(kj::StringPtr listenAddress); + // Stop the egress listener + void stopEgressListener(); 
}; } // namespace workerd::server From 269a841e4ec83b335840caee9925ef777f012731 Mon Sep 17 00:00:00 2001 From: Gabi Villalonga Simon Date: Wed, 28 Jan 2026 13:12:34 -0600 Subject: [PATCH 05/12] container: wire up containerEgressInterceptorImage configuration Add containerEgressInterceptorImage field to DockerConfiguration in workerd.capnp with default value 'cloudflare/proxy-everything:main' Pass ChannelTokenHandler and containerEgressInterceptorImage through WorkerService -> ActorNamespace -> ContainerClient chain Update ContainerClient instantiation to include the new parameters --- src/workerd/server/server.c++ | 39 +++++++++++++++++++++++--------- src/workerd/server/workerd.capnp | 5 ++++ 2 files changed, 33 insertions(+), 11 deletions(-) diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index 681f92cb14b..a576b03b901 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -1896,6 +1896,7 @@ class Server::WorkerService final: public Service, LinkCallback linkCallback, AbortActorsCallback abortActorsCallback, kj::Maybe dockerPathParam, + kj::Maybe containerEgressInterceptorImageParam, bool isDynamic) : channelTokenHandler(channelTokenHandler), serviceName(serviceName), @@ -1909,6 +1910,7 @@ class Server::WorkerService final: public Service, waitUntilTasks(*this), abortActorsCallback(kj::mv(abortActorsCallback)), dockerPath(kj::mv(dockerPathParam)), + containerEgressInterceptorImage(kj::mv(containerEgressInterceptorImageParam)), isDynamic(isDynamic) {} // Call immediately after the constructor to set up `actorNamespaces`. 
This can't happen during @@ -1929,9 +1931,9 @@ class Server::WorkerService final: public Service, } auto actorClass = kj::refcounted(*this, entry.key, Frankenvalue()); - auto ns = - kj::heap(kj::mv(actorClass), entry.value, threadContext.getUnsafeTimer(), - threadContext.getByteStreamFactory(), network, dockerPath, waitUntilTasks); + auto ns = kj::heap(kj::mv(actorClass), entry.value, + threadContext.getUnsafeTimer(), threadContext.getByteStreamFactory(), channelTokenHandler, + network, dockerPath, containerEgressInterceptorImage, waitUntilTasks); actorNamespaces.insert(entry.key, kj::mv(ns)); } } @@ -2195,15 +2197,19 @@ class Server::WorkerService final: public Service, const ActorConfig& config, kj::Timer& timer, capnp::ByteStreamFactory& byteStreamFactory, + ChannelTokenHandler& channelTokenHandler, kj::Network& dockerNetwork, kj::Maybe dockerPath, + kj::Maybe containerEgressInterceptorImage, kj::TaskSet& waitUntilTasks) : actorClass(kj::mv(actorClass)), config(config), timer(timer), byteStreamFactory(byteStreamFactory), + channelTokenHandler(channelTokenHandler), dockerNetwork(dockerNetwork), dockerPath(dockerPath), + containerEgressInterceptorImage(containerEgressInterceptorImage), waitUntilTasks(waitUntilTasks) {} // Called at link time to provide needed resources. 
@@ -2854,9 +2860,12 @@ class Server::WorkerService final: public Service, containerClients.erase(containerId); }; + auto& interceptorImage = KJ_ASSERT_NONNULL(containerEgressInterceptorImage, + "containerEgressInterceptorImage must be defined when docker is enabled."); + auto client = kj::refcounted(byteStreamFactory, timer, dockerNetwork, - kj::str(dockerPathRef), kj::str(containerId), kj::str(imageName), waitUntilTasks, - kj::mv(cleanupCallback)); + kj::str(dockerPathRef), kj::str(containerId), kj::str(imageName), + kj::str(interceptorImage), waitUntilTasks, kj::mv(cleanupCallback), channelTokenHandler); // Store raw pointer in map (does not own) containerClients.insert(kj::str(containerId), client.get()); @@ -2901,8 +2910,10 @@ class Server::WorkerService final: public Service, kj::Maybe> cleanupTask; kj::Timer& timer; capnp::ByteStreamFactory& byteStreamFactory; + ChannelTokenHandler& channelTokenHandler; kj::Network& dockerNetwork; kj::Maybe dockerPath; + kj::Maybe containerEgressInterceptorImage; kj::TaskSet& waitUntilTasks; kj::Maybe alarmScheduler; @@ -3141,6 +3152,7 @@ class Server::WorkerService final: public Service, kj::TaskSet waitUntilTasks; AbortActorsCallback abortActorsCallback; kj::Maybe dockerPath; + kj::Maybe containerEgressInterceptorImage; bool isDynamic; class ActorChannelImpl final: public IoChannelFactory::ActorChannel { @@ -4721,23 +4733,28 @@ kj::Promise> Server::makeWorkerImpl(kj::StringPtr }; kj::Maybe dockerPath = kj::none; + kj::Maybe containerEgressInterceptorImage = kj::none; switch (def.containerEngineConf.which()) { case config::Worker::ContainerEngine::NONE: // No container engine configured break; - case config::Worker::ContainerEngine::LOCAL_DOCKER: - dockerPath = kj::str(def.containerEngineConf.getLocalDocker().getSocketPath()); + case config::Worker::ContainerEngine::LOCAL_DOCKER: { + auto dockerConf = def.containerEngineConf.getLocalDocker(); + dockerPath = kj::str(dockerConf.getSocketPath()); + // Cap'n Proto default is 
"cloudflare/container-egress-interceptor:latest" + containerEgressInterceptorImage = kj::str(dockerConf.getContainerEgressInterceptorImage()); break; + } } kj::Maybe serviceName; if (!def.isDynamic) serviceName = name; auto result = kj::refcounted(channelTokenHandler, serviceName, - globalContext->threadContext, monotonicClock, kj::mv(worker), - kj::mv(errorReporter.defaultEntrypoint), kj::mv(errorReporter.namedEntrypoints), - kj::mv(errorReporter.actorClasses), kj::mv(linkCallback), - KJ_BIND_METHOD(*this, abortAllActors), kj::mv(dockerPath), def.isDynamic); + globalContext->threadContext, monotonicClock, kj::mv(worker), kj::mv(errorReporter.defaultEntrypoint), + kj::mv(errorReporter.namedEntrypoints), kj::mv(errorReporter.actorClasses), + kj::mv(linkCallback), KJ_BIND_METHOD(*this, abortAllActors), kj::mv(dockerPath), + kj::mv(containerEgressInterceptorImage), def.isDynamic); result->initActorNamespaces(def.localActorConfigs, network); co_return result; } diff --git a/src/workerd/server/workerd.capnp b/src/workerd/server/workerd.capnp index 2b2bca8d371..07350f58bdb 100644 --- a/src/workerd/server/workerd.capnp +++ b/src/workerd/server/workerd.capnp @@ -733,6 +733,11 @@ struct Worker { struct DockerConfiguration { socketPath @0 :Text; # Path to the Docker socket. + + containerEgressInterceptorImage @1 :Text = "cloudflare/proxy-everything:main"; + # Docker image name for the container egress interceptor sidecar. + # This sidecar intercepts outbound traffic from containers and routes it + # through workerd for egress mappings (setEgressHttp bindings). } } From fb08eb94fd09ce516501730e5d90e07725a02328 Mon Sep 17 00:00:00 2001 From: Gabi Villalonga Simon Date: Wed, 28 Jan 2026 13:12:44 -0600 Subject: [PATCH 06/12] container: add /intercept endpoint to test container image Add a new /intercept HTTP endpoint to the test container that makes an outbound fetch request to a configurable host (via x-host header, defaults to 11.0.0.1). 
This enables testing of egress HTTP routing where container traffic is intercepted and routed back to Workers bindings. --- images/container-client-test/app.js | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/images/container-client-test/app.js b/images/container-client-test/app.js index 8190141809b..3f3a7b40204 100644 --- a/images/container-client-test/app.js +++ b/images/container-client-test/app.js @@ -9,6 +9,23 @@ const server = createServer(function (req, res) { return; } + if (req.url === '/intercept') { + const targetHost = req.headers['x-host'] || '11.0.0.1'; + fetch(`http://${targetHost}`) + .then((result) => result.text()) + .then((body) => { + res.writeHead(200); + res.write(body); + res.end(); + }) + .catch((err) => { + res.writeHead(500); + res.write(err.message); + res.end(); + }); + return; + } + res.writeHead(200, { 'Content-Type': 'text/plain' }); res.write('Hello World!'); res.end(); From 401a305a89e63feb5aa32792adc194c61d9ef46a Mon Sep 17 00:00:00 2001 From: Gabi Villalonga Simon Date: Wed, 28 Jan 2026 13:12:58 -0600 Subject: [PATCH 07/12] container: add tests and TypeScript types for setEgressHttp Add test coverage for the setEgressHttp functionality. Also adds enable_ctx_exports compatibility flag to test config and updates TypeScript type definitions with setEgressHttp signature. 
--- .../container-client/container-client.wd-test | 2 +- .../server/tests/container-client/test.js | 106 +++++++++++++++++- .../experimental/index.d.ts | 1 + .../generated-snapshot/experimental/index.ts | 1 + 4 files changed, 107 insertions(+), 3 deletions(-) diff --git a/src/workerd/server/tests/container-client/container-client.wd-test b/src/workerd/server/tests/container-client/container-client.wd-test index 06e5b2435c3..7dd4321d2d5 100644 --- a/src/workerd/server/tests/container-client/container-client.wd-test +++ b/src/workerd/server/tests/container-client/container-client.wd-test @@ -8,7 +8,7 @@ const unitTests :Workerd.Config = ( modules = [ (name = "worker", esModule = embed "test.js") ], - compatibilityFlags = ["nodejs_compat", "experimental"], + compatibilityFlags = ["enable_ctx_exports", "nodejs_compat", "experimental"], containerEngine = (localDocker = (socketPath = "unix:/var/run/docker.sock")), durableObjectNamespaces = [ ( className = "DurableObjectExample", diff --git a/src/workerd/server/tests/container-client/test.js b/src/workerd/server/tests/container-client/test.js index 1633f67d313..f20b13a46f9 100644 --- a/src/workerd/server/tests/container-client/test.js +++ b/src/workerd/server/tests/container-client/test.js @@ -1,4 +1,4 @@ -import { DurableObject } from 'cloudflare:workers'; +import { DurableObject, WorkerEntrypoint } from 'cloudflare:workers'; import assert from 'node:assert'; import { scheduler } from 'node:timers/promises'; @@ -66,7 +66,7 @@ export class DurableObjectExample extends DurableObject { { let resp; // The retry count here is arbitrary. Can increase it if necessary. - const maxRetries = 6; + const maxRetries = 15; for (let i = 1; i <= maxRetries; i++) { try { resp = await container @@ -260,6 +260,99 @@ export class DurableObjectExample extends DurableObject { getStatus() { return this.ctx.container.running; } + + async ping() { + const container = this.ctx.container; + { + let resp; + // The retry count here is arbitrary. 
Can increase it if necessary. + const maxRetries = 15; + for (let i = 1; i <= maxRetries; i++) { + try { + resp = await container + .getTcpPort(8080) + .fetch('http://foo/bar/baz', { method: 'POST', body: 'hello' }); + break; + } catch (e) { + if (!e.message.includes('container port not found')) { + throw e; + } + console.info( + `Retrying getTcpPort(8080) for the ${i} time due to an error ${e.message}` + ); + console.info(e); + if (i === maxRetries) { + console.error( + `Failed to connect to container ${container.id}. Retried ${i} times` + ); + throw e; + } + await scheduler.wait(500); + } + } + + assert.equal(resp.status, 200); + assert.equal(resp.statusText, 'OK'); + assert.strictEqual(await resp.text(), 'Hello World!'); + } + } + + async testSetEgressHttp() { + const container = this.ctx.container; + if (container.running) { + let monitor = container.monitor().catch((_err) => {}); + await container.destroy(); + await monitor; + } + + assert.strictEqual(container.running, false); + + // Start container + container.start(); + + // wait for container to be available + await this.ping(); + + assert.strictEqual(container.running, true); + + // Set up egress TCP mapping to route requests to the binding + // This registers the binding's channel token with the container runtime + container.setEgressHttp( + '11.0.0.1:9999', + this.ctx.exports.TestService({ props: { id: 1 } }) + ); + + container.setEgressHttp( + '11.0.0.2:9999', + this.ctx.exports.TestService({ props: { id: 2 } }) + ); + + { + const response = await container + .getTcpPort(8080) + .fetch('http://foo/intercept', { + headers: { 'x-host': '11.0.0.1:9999' }, + }); + assert.equal(response.status, 200); + assert.equal(await response.text(), 'hello binding: 1'); + } + + { + const response = await container + .getTcpPort(8080) + .fetch('http://foo/intercept', { + headers: { 'x-host': '11.0.0.2:9999' }, + }); + assert.equal(response.status, 200); + assert.equal(await response.text(), 'hello binding: 2'); + } + } 
+} + +export class TestService extends WorkerEntrypoint { + fetch() { + return new Response('hello binding: ' + this.ctx.props.id); + } } export class DurableObjectExample2 extends DurableObjectExample {} @@ -411,3 +504,12 @@ export const testSetInactivityTimeout = { } }, }; + +// Test setEgressHttp functionality - registers a binding's channel token with the container +export const testSetEgressHttp = { + async test(_ctrl, env) { + const id = env.MY_CONTAINER.idFromName('testSetEgressHttp'); + const stub = env.MY_CONTAINER.get(id); + await stub.testSetEgressHttp(); + }, +}; diff --git a/types/generated-snapshot/experimental/index.d.ts b/types/generated-snapshot/experimental/index.d.ts index d5fce056771..b1088803823 100755 --- a/types/generated-snapshot/experimental/index.d.ts +++ b/types/generated-snapshot/experimental/index.d.ts @@ -3841,6 +3841,7 @@ interface Container { signal(signo: number): void; getTcpPort(port: number): Fetcher; setInactivityTimeout(durationMs: number | bigint): Promise; + setEgressHttp(addr: string, binding: Fetcher): void; } interface ContainerStartupOptions { entrypoint?: string[]; diff --git a/types/generated-snapshot/experimental/index.ts b/types/generated-snapshot/experimental/index.ts index 71c0d95e46c..ccde9b9940e 100755 --- a/types/generated-snapshot/experimental/index.ts +++ b/types/generated-snapshot/experimental/index.ts @@ -3850,6 +3850,7 @@ export interface Container { signal(signo: number): void; getTcpPort(port: number): Fetcher; setInactivityTimeout(durationMs: number | bigint): Promise; + setEgressHttp(addr: string, binding: Fetcher): void; } export interface ContainerStartupOptions { entrypoint?: string[]; From 49b133268b7529ea3a493abdb8b28ba8f6df19b6 Mon Sep 17 00:00:00 2001 From: Gabi Villalonga Simon Date: Wed, 28 Jan 2026 14:46:05 -0600 Subject: [PATCH 08/12] container: Stop having a default image for proxy-everything, and pull it for tests in bazel --- build/deps/oci.MODULE.bazel | 10 +++++++++- images/BUILD.bazel | 
1 + images/container-client-test/BUILD.bazel | 7 +++++++ src/workerd/server/container-client.c++ | 3 ++- src/workerd/server/docker-api.capnp | 14 ++++++++++++++ src/workerd/server/server.c++ | 2 +- .../container-client/container-client.wd-test | 2 +- src/workerd/server/workerd.capnp | 2 +- 8 files changed, 36 insertions(+), 5 deletions(-) diff --git a/build/deps/oci.MODULE.bazel b/build/deps/oci.MODULE.bazel index d7c8f38a4b7..17f58122188 100644 --- a/build/deps/oci.MODULE.bazel +++ b/build/deps/oci.MODULE.bazel @@ -12,4 +12,12 @@ oci.pull( "linux/arm64/v8", ], ) -use_repo(oci, "node_25_slim", "node_25_slim_linux_amd64", "node_25_slim_linux_arm64_v8") +oci.pull( + name = "proxy_everything", + image = "docker.io/cloudflare/proxy-everything:main", + platforms = [ + "linux/amd64", + "linux/arm64/v8", + ], +) +use_repo(oci, "node_25_slim", "node_25_slim_linux_amd64", "node_25_slim_linux_arm64_v8", "proxy_everything", "proxy_everything_linux_amd64", "proxy_everything_linux_arm64_v8") diff --git a/images/BUILD.bazel b/images/BUILD.bazel index 789428f60a3..91c80184435 100644 --- a/images/BUILD.bazel +++ b/images/BUILD.bazel @@ -2,6 +2,7 @@ load("@rules_multirun//:defs.bzl", "command", "multirun") IMAGES = { "container-client-test": "//images/container-client-test:load", + "proxy-everything": "//images/container-client-test:load-proxy-everything", } [ diff --git a/images/container-client-test/BUILD.bazel b/images/container-client-test/BUILD.bazel index c3069f29edf..e8d0cad2aa6 100644 --- a/images/container-client-test/BUILD.bazel +++ b/images/container-client-test/BUILD.bazel @@ -40,3 +40,10 @@ oci_load( repo_tags = ["cloudflare/workerd/container-client-test:latest"], visibility = ["//visibility:public"], ) + +oci_load( + name = "load-proxy-everything", + image = "@proxy_everything", + repo_tags = ["cloudflare/proxy-everything:main"], + visibility = ["//visibility:public"], +) diff --git a/src/workerd/server/container-client.c++ b/src/workerd/server/container-client.c++ 
index 23996c47c22..cfbd64d8389 100644 --- a/src/workerd/server/container-client.c++ +++ b/src/workerd/server/container-client.c++ @@ -189,7 +189,7 @@ class ContainerClient::DockerPort final: public rpc::Container::Port::Server { kj::Maybe> pumpTask; }; -// HTTP service that handles HTTP CONNECT requests from the container sidecar (dockerproxyanything). +// HTTP service that handles HTTP CONNECT requests from the container sidecar (proxy-everything). // When the sidecar intercepts container egress traffic, it sends HTTP CONNECT to this service. // After accepting the CONNECT, the tunnel carries the actual HTTP request from the container, // which we parse and forward to the appropriate SubrequestChannel based on egressMappings. @@ -850,6 +850,7 @@ kj::Promise ContainerClient::setEgressHttp(SetEgressHttpContext context) { auto params = context.getParams(); auto addr = kj::str(params.getHostPort()); auto tokenBytes = params.getChannelToken(); + JSG_REQUIRE(containerEgressInterceptorImage != "", Error, "should be set for setEgressHttp"); // Wait for any previous setEgressHttp call to complete (serializes sidecar setup) KJ_IF_SOME(lock, egressSetupLock) { diff --git a/src/workerd/server/docker-api.capnp b/src/workerd/server/docker-api.capnp index 79f1b092fef..9be3e2a636d 100644 --- a/src/workerd/server/docker-api.capnp +++ b/src/workerd/server/docker-api.capnp @@ -177,6 +177,20 @@ struct Docker { statusCode @0 :Int32 $Json.name("StatusCode"); } + struct ImagePullResponse { + # Response from ImagePull operation (streamed as multiple JSON objects) + status @0 :Text $Json.name("status"); + id @1 :Text $Json.name("id"); # Layer ID (if applicable) + progress @2 :Text $Json.name("progress"); # Progress bar text + progressDetail @3 :ProgressDetail $Json.name("progressDetail"); + error @4 :Text $Json.name("error"); # Error message if pull failed + + struct ProgressDetail { + current @0 :UInt64 $Json.name("current"); + total @1 :UInt64 $Json.name("total"); + } + } + struct 
ContainerState { # Container's running state status @0 :Text $Json.name("Status"); # "created", "running", "paused", "restarting", "removing", "exited", "dead" diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index a576b03b901..d4d770b426d 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -4741,7 +4741,7 @@ kj::Promise> Server::makeWorkerImpl(kj::StringPtr case config::Worker::ContainerEngine::LOCAL_DOCKER: { auto dockerConf = def.containerEngineConf.getLocalDocker(); dockerPath = kj::str(dockerConf.getSocketPath()); - // Cap'n Proto default is "cloudflare/container-egress-interceptor:latest" + // Cap'n Proto default is "cloudflare/proxy-everything:main" containerEgressInterceptorImage = kj::str(dockerConf.getContainerEgressInterceptorImage()); break; } diff --git a/src/workerd/server/tests/container-client/container-client.wd-test b/src/workerd/server/tests/container-client/container-client.wd-test index 7dd4321d2d5..824bdf3713d 100644 --- a/src/workerd/server/tests/container-client/container-client.wd-test +++ b/src/workerd/server/tests/container-client/container-client.wd-test @@ -9,7 +9,7 @@ const unitTests :Workerd.Config = ( (name = "worker", esModule = embed "test.js") ], compatibilityFlags = ["enable_ctx_exports", "nodejs_compat", "experimental"], - containerEngine = (localDocker = (socketPath = "unix:/var/run/docker.sock")), + containerEngine = (localDocker = (socketPath = "unix:/var/run/docker.sock", containerEgressInterceptorImage = "cloudflare/proxy-everything:main")), durableObjectNamespaces = [ ( className = "DurableObjectExample", uniqueKey = "container-client-test-DurableObjectExample", diff --git a/src/workerd/server/workerd.capnp b/src/workerd/server/workerd.capnp index 07350f58bdb..a8c7ac40296 100644 --- a/src/workerd/server/workerd.capnp +++ b/src/workerd/server/workerd.capnp @@ -734,7 +734,7 @@ struct Worker { socketPath @0 :Text; # Path to the Docker socket. 
- containerEgressInterceptorImage @1 :Text = "cloudflare/proxy-everything:main"; + containerEgressInterceptorImage @1 :Text; # Docker image name for the container egress interceptor sidecar. # This sidecar intercepts outbound traffic from containers and routes it # through workerd for egress mappings (setEgressHttp bindings). From 26873cc52e08dad188c4d1016e9df9d30bfdd699 Mon Sep 17 00:00:00 2001 From: Gabi Villalonga Simon Date: Fri, 30 Jan 2026 13:35:41 -0600 Subject: [PATCH 09/12] address some of the PR comments --- images/container-client-test/app.js | 43 ++- src/workerd/api/container.c++ | 23 +- src/workerd/api/container.h | 7 +- src/workerd/io/container.capnp | 7 +- src/workerd/server/container-client.c++ | 354 ++++++++++-------- src/workerd/server/container-client.h | 23 +- src/workerd/server/docker-api.capnp | 14 - src/workerd/server/server.c++ | 5 +- .../server/tests/container-client/test.js | 130 ++++++- src/workerd/server/workerd.capnp | 1 + 10 files changed, 392 insertions(+), 215 deletions(-) diff --git a/images/container-client-test/app.js b/images/container-client-test/app.js index 3f3a7b40204..8c349482796 100644 --- a/images/container-client-test/app.js +++ b/images/container-client-test/app.js @@ -1,11 +1,10 @@ const { createServer } = require('http'); const webSocketEnabled = process.env.WS_ENABLED === 'true'; +const wsProxyTarget = process.env.WS_PROXY_TARGET || null; -// Create HTTP server const server = createServer(function (req, res) { if (req.url === '/ws') { - // WebSocket upgrade will be handled by the WebSocket server return; } @@ -20,7 +19,7 @@ const server = createServer(function (req, res) { }) .catch((err) => { res.writeHead(500); - res.write(err.message); + res.write(`${targetHost} ${err.message}`); res.end(); }); return; @@ -31,30 +30,30 @@ const server = createServer(function (req, res) { res.end(); }); -// Check if WebSocket functionality is enabled if (webSocketEnabled) { const WebSocket = require('ws'); + const wss = new 
WebSocket.Server({ server, path: '/ws' }); - // Create WebSocket server - const wss = new WebSocket.Server({ - server: server, - path: '/ws', - }); - - wss.on('connection', function connection(ws) { - console.log('WebSocket connection established'); - - ws.on('message', function message(data) { - console.log('Received:', data.toString()); - // Echo the message back with prefix - ws.send('Echo: ' + data.toString()); - }); + wss.on('connection', function (clientWs) { + if (wsProxyTarget) { + const targetWs = new WebSocket(`ws://${wsProxyTarget}/ws`); + const ready = new Promise(function (resolve) { + targetWs.on('open', resolve); + }); - ws.on('close', function close() { - console.log('WebSocket connection closed'); - }); + targetWs.on('message', (data) => clientWs.send(data)); + clientWs.on('message', async function (data) { + await ready; + targetWs.send(data); + }); - ws.on('error', console.error); + clientWs.on('close', targetWs.close); + targetWs.on('close', clientWs.close); + } else { + clientWs.on('message', function (data) { + clientWs.send('Echo: ' + data.toString()); + }); + } }); } diff --git a/src/workerd/api/container.c++ b/src/workerd/api/container.c++ index 7742e3ca56b..ed6c0fea03e 100644 --- a/src/workerd/api/container.c++ +++ b/src/workerd/api/container.c++ @@ -71,7 +71,8 @@ jsg::Promise Container::setInactivityTimeout(jsg::Lock& js, int64_t durati return IoContext::current().awaitIo(js, req.sendIgnoringResult()); } -void Container::setEgressHttp(jsg::Lock& js, kj::String addr, jsg::Ref binding) { +jsg::Promise Container::interceptOutboundHttp( + jsg::Lock& js, kj::String addr, jsg::Ref binding) { auto& ioctx = IoContext::current(); auto channel = binding->getSubrequestChannel(ioctx); @@ -82,7 +83,25 @@ void Container::setEgressHttp(jsg::Lock& js, kj::String addr, jsg::Ref auto req = rpcClient->setEgressHttpRequest(); req.setHostPort(addr); req.setChannelToken(token); - ioctx.addTask(req.sendIgnoringResult()); + return ioctx.awaitIo(js, 
req.sendIgnoringResult()); +} + +jsg::Promise Container::interceptAllOutboundHttp(jsg::Lock& js, jsg::Ref binding) { + auto& ioctx = IoContext::current(); + auto channel = binding->getSubrequestChannel(ioctx); + auto token = channel->getToken(IoChannelFactory::ChannelTokenUsage::RPC); + + // Register for all IPv4 and IPv6 addresses (on port 80) + auto reqV4 = rpcClient->setEgressHttpRequest(); + reqV4.setHostPort("0.0.0.0/0"_kj); + reqV4.setChannelToken(token); + + auto reqV6 = rpcClient->setEgressHttpRequest(); + reqV6.setHostPort("::/0"_kj); + reqV6.setChannelToken(token); + + return ioctx.awaitIo( + js, kj::joinPromises(kj::arr(reqV4.sendIgnoringResult(), reqV6.sendIgnoringResult()))); } jsg::Promise Container::monitor(jsg::Lock& js) { diff --git a/src/workerd/api/container.h b/src/workerd/api/container.h index 449b334e48c..f3a13aa33dd 100644 --- a/src/workerd/api/container.h +++ b/src/workerd/api/container.h @@ -62,7 +62,9 @@ class Container: public jsg::Object { void signal(jsg::Lock& js, int signo); jsg::Ref getTcpPort(jsg::Lock& js, int port); jsg::Promise setInactivityTimeout(jsg::Lock& js, int64_t durationMs); - void setEgressHttp(jsg::Lock& js, kj::String addr, jsg::Ref binding); + jsg::Promise interceptOutboundHttp( + jsg::Lock& js, kj::String addr, jsg::Ref binding); + jsg::Promise interceptAllOutboundHttp(jsg::Lock& js, jsg::Ref binding); // TODO(containers): listenTcp() @@ -76,7 +78,8 @@ class Container: public jsg::Object { JSG_METHOD(setInactivityTimeout); if (flags.getWorkerdExperimental()) { - JSG_METHOD(setEgressHttp); + JSG_METHOD(interceptOutboundHttp); + JSG_METHOD(interceptAllOutboundHttp); } } diff --git a/src/workerd/io/container.capnp b/src/workerd/io/container.capnp index e141b7ef882..ffb4b427ccf 100644 --- a/src/workerd/io/container.capnp +++ b/src/workerd/io/container.capnp @@ -112,13 +112,14 @@ interface Container @0x9aaceefc06523bca { # to decide when to signal the container to exit. 
setEgressTcp @8 (addr :Text, channelToken :Data); + # TODO: This method is unimplemented. + # # Configures egress TCP routing for the container. When the container attempts to connect to the # specified address, the connection should be routed back to the Workers runtime using the channel token. setEgressHttp @9 (hostPort :Text, channelToken :Data); # Configures egress HTTP routing for the container. When the container attempts to connect to the # specified host:port, the connection should be routed back to the Workers runtime using the channel token. - # The format of hostPort can be '[':']'. - # Implementation across container runtimes might differ; a perfect world is this method can parse both HTTP and HTTPs - # requests, if possible. + # The format of hostPort can be '[':']'. If port is omitted, it's assumed to only cover port 80. + # This method does not support HTTPs yet. } diff --git a/src/workerd/server/container-client.c++ b/src/workerd/server/container-client.c++ index cfbd64d8389..69c97a5507a 100644 --- a/src/workerd/server/container-client.c++ +++ b/src/workerd/server/container-client.c++ @@ -9,10 +9,14 @@ #include #include +#include +#include + #include #include #include #include +#include #include #include #include @@ -22,6 +26,77 @@ namespace workerd::server { namespace { + +// Parsed address from parseHostPort() +struct ParsedAddress { + kj::CidrRange cidr; + kj::Maybe port; +}; + +struct HostAndPort { + kj::String host; + kj::Maybe port; +}; + +// Strips a port suffix from a string, returning the host and port separately. +// For IPv6, expects brackets: "[::1]:8080" -> ("::1", 8080) +// For IPv4: "10.0.0.1:8080" -> ("10.0.0.1", 8080) +// If no port, returns the host as-is with no port. 
+HostAndPort stripPort(kj::StringPtr str) { + if (str.startsWith("[")) { + // Bracketed IPv6: "[ipv6]" or "[ipv6]:port" + size_t closeBracket = + KJ_REQUIRE_NONNULL(str.findLast(']'), "Unclosed '[' in address string.", str); + + auto host = str.slice(1, closeBracket); + + if (str.size() > closeBracket + 1) { + KJ_REQUIRE( + str.slice(closeBracket + 1).startsWith(":"), "Expected port suffix after ']'.", str); + auto port = KJ_REQUIRE_NONNULL( + str.slice(closeBracket + 2).tryParseAs(), "Invalid port number.", str); + return {kj::str(host), port}; + } + return {kj::str(host), kj::none}; + } + + // No brackets - check if there's exactly one colon (IPv4 with port) + // IPv6 without brackets has 2+ colons and no port suffix supported + KJ_IF_SOME(colonPos, str.findLast(':')) { + auto afterColon = str.slice(colonPos + 1); + KJ_IF_SOME(port, afterColon.tryParseAs()) { + // Valid port - but only treat as port for IPv4 (check no other colons before) + auto beforeColon = str.first(colonPos); + if (beforeColon.findFirst(':') == kj::none) { + // No other colons, so this is IPv4 with port + return {kj::str(beforeColon), port}; + } + } + } + + // No port found + return {kj::str(str), kj::none}; +} + +// Build a CidrRange from a host string, adding /32 or /128 prefix if not present. +kj::CidrRange makeCidr(kj::StringPtr host) { + if (host.findFirst('/') != kj::none) { + return kj::CidrRange(host); + } + // No CIDR prefix - add /32 for IPv4, /128 for IPv6 + bool isIpv6 = host.findFirst(':') != kj::none; + return kj::CidrRange(kj::str(host, isIpv6 ? "/128" : "/32")); +} + +// Parses "host[:port]" strings. 
Handles: +// - IPv4: "10.0.0.1", "10.0.0.1:8080", "10.0.0.0/8", "10.0.0.0/8:8080" +// - IPv6 with brackets: "[::1]", "[::1]:8080", "[fe80::1]", "[fe80::/10]:8080" +// - IPv6 without brackets: "::1", "fe80::1", "fe80::/10" +ParsedAddress parseHostPort(kj::StringPtr str) { + auto hostAndPort = stripPort(str); + return {makeCidr(hostAndPort.host), hostAndPort.port}; +} + kj::StringPtr signalToString(uint32_t signal) { switch (signal) { case 1: @@ -193,6 +268,30 @@ class ContainerClient::DockerPort final: public rpc::Container::Port::Server { // When the sidecar intercepts container egress traffic, it sends HTTP CONNECT to this service. // After accepting the CONNECT, the tunnel carries the actual HTTP request from the container, // which we parse and forward to the appropriate SubrequestChannel based on egressMappings. +// Inner HTTP service that handles requests inside the CONNECT tunnel. +// Forwards requests to the worker binding via SubrequestChannel. +class ContainerClient::InnerEgressService final: public kj::HttpService { + public: + InnerEgressService(IoChannelFactory::SubrequestChannel& channel): channel(channel) {} + + kj::Promise request(kj::HttpMethod method, + kj::StringPtr url, + const kj::HttpHeaders& headers, + kj::AsyncInputStream& requestBody, + Response& response) override { + // Forward to the SubrequestChannel + IoChannelFactory::SubrequestMetadata metadata; + auto worker = channel.startRequest(kj::mv(metadata)); + + // Forward the request to the worker - the response flows back through 'response' + co_await worker->request(method, url, headers, requestBody, response); + } + + private: + IoChannelFactory::SubrequestChannel& channel; +}; + +// Outer HTTP service that handles CONNECT requests from the sidecar. 
class ContainerClient::EgressHttpService final: public kj::HttpService { public: EgressHttpService(ContainerClient& containerClient, kj::HttpHeaderTable& headerTable) @@ -222,169 +321,54 @@ class ContainerClient::EgressHttpService final: public kj::HttpService { response.accept(200, "OK", responseHeaders); // Check if there's a mapping for this destination - auto maybeChannel = containerClient.egressMappings.find(destAddr); - - if (maybeChannel == kj::none) { - // No mapping found - check if internet access is enabled - if (!containerClient.internetEnabled) { - // Internet access not enabled - close the connection - connection.shutdownWrite(); - co_return; - } + auto mapping = containerClient.findEgressMapping(destAddr, /*defaultPort=*/80); - // Forward to the general internet via raw TCP - // Just do bidirectional byte pumping, no HTTP parsing needed - auto addr = co_await containerClient.network.parseAddress(destAddr); - auto destConn = co_await addr->connect(); + KJ_IF_SOME(channel, mapping) { + // Found a mapping - layer an HttpServer on top of the tunnel connection + // to handle HTTP parsing/serialization automatically - // Pump bytes bidirectionally: tunnel <-> destination - auto promises = kj::heapArrayBuilder>(2); + // Create the inner service that forwards to the worker binding + auto innerService = kj::heap(*channel); - promises.add(connection.pumpTo(*destConn).then( - [&destConn = *destConn](uint64_t) { destConn.shutdownWrite(); })); + // Create an HttpServer for the tunnel connection + auto innerServer = + kj::heap(containerClient.timer, headerTable, *innerService); - promises.add(destConn->pumpTo(connection).then([&connection](uint64_t) { - connection.shutdownWrite(); - })); + // Let the HttpServer handle the HTTP traffic inside the tunnel + co_await innerServer->listenHttpCleanDrain(connection) + .attach(kj::mv(innerServer), kj::mv(innerService)); - // Wait for both directions to complete, keeping destConn alive - co_await 
kj::joinPromisesFailFast(promises.finish()).attach(kj::mv(destConn)); co_return; } - // Found a mapping - need to parse HTTP and forward to the SubrequestChannel - auto& channel = KJ_ASSERT_NONNULL(maybeChannel); - - // Parse HTTP requests from the tunnel - auto httpInput = kj::newHttpInputStream(connection, headerTable); - - // Loop to handle multiple requests on the same connection (HTTP/1.1 keep-alive) - while (true) { - // Check if there's more data - bool hasMore = co_await httpInput->awaitNextMessage(); - if (!hasMore) { - // Client closed the connection - co_return; - } - - auto req = co_await httpInput->readRequest(); - - // Forward to the SubrequestChannel - IoChannelFactory::SubrequestMetadata metadata; - auto worker = channel->startRequest(kj::mv(metadata)); + // No mapping found - check if internet access is enabled + if (!containerClient.internetEnabled) { + // Internet access not enabled - close the connection + connection.shutdownWrite(); + co_return; + } - // Create a response handler that writes back to the tunnel - TunnelHttpResponse tunnelResponse(connection); + // Forward to the general internet via raw TCP + // Just do bidirectional byte pumping, no HTTP parsing needed + auto addr = co_await containerClient.network.parseAddress(destAddr); + auto destConn = co_await addr->connect(); - // Forward the request to the worker - co_await worker->request(req.method, req.url, req.headers, *req.body, tunnelResponse); + // Pump bytes bidirectionally: tunnel <-> destination + auto connToDestination = connection.pumpTo(*destConn).then( + [&destConn = *destConn](uint64_t) { destConn.shutdownWrite(); }); - // Finalize the response (writes chunked terminator if needed) - co_await tunnelResponse.end(); + auto destinationToConn = + destConn->pumpTo(connection).then([&connection](uint64_t) { connection.shutdownWrite(); }); - // After the response is complete, shut down the write side to signal EOF - connection.shutdownWrite(); - co_return; - } + // Wait for both 
directions to complete + co_await kj::joinPromisesFailFast( + kj::arr(kj::mv(connToDestination), kj::mv(destinationToConn))); + co_return; } private: ContainerClient& containerClient; kj::HttpHeaderTable& headerTable; - - // Response implementation that writes HTTP responses back through the tunnel. - // This class serializes the HTTP response and writes it to the tunnel stream. - class TunnelHttpResponse final: public kj::HttpService::Response { - public: - TunnelHttpResponse(kj::AsyncIoStream& tunnel): tunnel(tunnel), isChunked(false) {} - - kj::Own send(uint statusCode, - kj::StringPtr statusText, - const kj::HttpHeaders& headers, - kj::Maybe expectedBodySize = kj::none) override { - - isChunked = (expectedBodySize == kj::none); - auto headersStr = headers.serializeResponse(statusCode, statusText); - - return kj::heap(tunnel, kj::mv(headersStr), isChunked); - } - - kj::Own acceptWebSocket(const kj::HttpHeaders& headers) override { - KJ_FAIL_REQUIRE("WebSocket upgrade not supported through egress tunnel"); - } - - // Called after worker->request() completes to finalize the response - kj::Promise end() { - if (isChunked) { - // Write final empty chunk to terminate chunked encoding - co_await tunnel.write("0\r\n\r\n"_kjb); - } - } - - private: - kj::AsyncIoStream& tunnel; - bool isChunked; - }; - - // Output stream that writes to the tunnel. - // Headers are written on the first write or when the stream ends. - // If chunked mode is enabled, wraps body data in chunked transfer encoding. 
- class TunnelOutputStream final: public kj::AsyncOutputStream { - public: - TunnelOutputStream(kj::AsyncIoStream& tunnel, kj::String serializedHeaders, bool chunked) - : tunnel(tunnel), - serializedHeaders(kj::mv(serializedHeaders)), - headersWritten(false), - chunked(chunked) {} - - kj::Promise write(kj::ArrayPtr buffer) override { - co_await ensureHeadersWritten(); - if (chunked) { - // Write chunk: size in hex, CRLF, data, CRLF - auto chunkHeader = kj::str(kj::hex(buffer.size()), "\r\n"); - co_await tunnel.write(chunkHeader.asBytes()); - co_await tunnel.write(buffer); - co_await tunnel.write("\r\n"_kjb); - } else { - co_await tunnel.write(buffer); - } - } - - kj::Promise write(kj::ArrayPtr> pieces) override { - co_await ensureHeadersWritten(); - if (chunked) { - // Calculate total size for chunk header - size_t totalSize = 0; - for (auto& piece: pieces) { - totalSize += piece.size(); - } - auto chunkHeader = kj::str(kj::hex(totalSize), "\r\n"); - co_await tunnel.write(chunkHeader.asBytes()); - co_await tunnel.write(pieces); - co_await tunnel.write("\r\n"_kjb); - } else { - co_await tunnel.write(pieces); - } - } - - kj::Promise whenWriteDisconnected() override { - return tunnel.whenWriteDisconnected(); - } - - private: - // Ensure headers are written - kj::Promise ensureHeadersWritten() { - if (!headersWritten) { - headersWritten = true; - co_await tunnel.write(serializedHeaders.asBytes()); - } - } - - kj::AsyncIoStream& tunnel; - kj::String serializedHeaders; - bool headersWritten; - bool chunked; - }; }; kj::Promise ContainerClient::getDockerBridgeGateway() { @@ -409,18 +393,15 @@ kj::Promise ContainerClient::getDockerBridgeGateway() { } kj::Promise ContainerClient::startEgressListener(kj::StringPtr listenAddress) { - // Create header table for HTTP parsing - auto headerTable = kj::heap(); - auto& headerTableRef = *headerTable; - egressHeaderTable = kj::mv(headerTable); - // Create the egress HTTP service - auto service = kj::heap(*this, headerTableRef); + 
auto service = kj::heap(*this, headerTable); // Create the HTTP server - auto httpServer = kj::heap(timer, headerTableRef, *service); + auto httpServer = kj::heap(timer, headerTable, *service); auto& httpServerRef = *httpServer; - egressHttpServer = kj::mv(httpServer); + + // Attach service to httpServer so ownership is clear - httpServer owns service + egressHttpServer = httpServer.attach(kj::mv(service)); // Listen on the Docker bridge gateway IP with port 0 to let the OS pick a free port auto addr = co_await network.parseAddress(kj::str(listenAddress, ":0")); @@ -430,11 +411,12 @@ kj::Promise ContainerClient::startEgressListener(kj::StringPtr listenA uint16_t chosenPort = listener->getPort(); // Run the server in the background - this promise never completes normally - // We need to detach it and return the port - egressListenerTask = - httpServerRef.listenHttp(*listener) - .attach(kj::mv(listener), kj::mv(service)) - .eagerlyEvaluate([](kj::Exception&& e) { LOG_EXCEPTION("Error in egress listener", e); }); + egressListenerTask = httpServerRef.listenHttp(*listener) + .attach(kj::mv(listener)) + .eagerlyEvaluate([](kj::Exception&& e) { + LOG_EXCEPTION( + "Workerd could not listen in the TCP port to proxy traffic off the docker container", e); + }); co_return chosenPort; } @@ -442,7 +424,6 @@ kj::Promise ContainerClient::startEgressListener(kj::StringPtr listenA void ContainerClient::stopEgressListener() { egressListenerTask = kj::none; egressHttpServer = kj::none; - egressHeaderTable = kj::none; } kj::Promise ContainerClient::dockerApiRequest(kj::Network& network, @@ -846,13 +827,55 @@ kj::Promise ContainerClient::listenTcp(ListenTcpContext context) { KJ_UNIMPLEMENTED("listenTcp not implemented for Docker containers - use port mapping instead"); } +kj::Maybe ContainerClient::findEgressMapping( + kj::StringPtr destAddr, uint16_t defaultPort) { + auto hostAndPort = stripPort(destAddr); + uint16_t port = hostAndPort.port.orDefault(defaultPort); + + struct 
sockaddr_storage ss; + memset(&ss, 0, sizeof(ss)); + + auto* sin = reinterpret_cast(&ss); + auto* sin6 = reinterpret_cast(&ss); + + // This is kind of awful. We could theoretically have a CidrRange + // parse this, but we don't have a way to compare two CidrRanges. + // Ideally, KJ would have a library to parse IPs, and we are able to have a cidr.includes(ip) method. + if (inet_pton(AF_INET, hostAndPort.host.cStr(), &sin->sin_addr) == 1) { + ss.ss_family = AF_INET; + sin->sin_port = htons(port); + } else if (inet_pton(AF_INET6, hostAndPort.host.cStr(), &sin6->sin6_addr) == 1) { + ss.ss_family = AF_INET6; + sin6->sin6_port = htons(port); + } else { + JSG_KJ_EXCEPTION(FAILED, Error, "host is an invalid address"); + } + + // Find a matching mapping + for (auto& mapping: egressMappings) { + if (mapping.cidr.matches(reinterpret_cast(&ss))) { + // CIDR matches, now check port. + // If the port is 0, we match anything. + if (mapping.port == 0 || mapping.port == port) { + return mapping.channel.get(); + } + } + } + + return kj::none; +} + kj::Promise ContainerClient::setEgressHttp(SetEgressHttpContext context) { auto params = context.getParams(); - auto addr = kj::str(params.getHostPort()); + auto hostPortStr = kj::str(params.getHostPort()); auto tokenBytes = params.getChannelToken(); JSG_REQUIRE(containerEgressInterceptorImage != "", Error, "should be set for setEgressHttp"); - // Wait for any previous setEgressHttp call to complete (serializes sidecar setup) + auto parsed = parseHostPort(hostPortStr); + uint16_t port = parsed.port.orDefault(80); + auto cidr = kj::mv(parsed.cidr); + + // Wait for any previous setEgressHttp call to complete KJ_IF_SOME(lock, egressSetupLock) { co_await lock.addBranch(); } @@ -889,8 +912,11 @@ kj::Promise ContainerClient::setEgressHttp(SetEgressHttpContext context) { workerd::IoChannelFactory::ChannelTokenUsage::RPC, tokenBytes); // Store the mapping - egressMappings.upsert(kj::mv(addr), kj::mv(subrequestChannel), - [](auto& existing, auto&& 
newValue) { existing = kj::mv(newValue); }); + egressMappings.add(EgressMapping{ + .cidr = kj::mv(cidr), + .port = port, + .channel = kj::mv(subrequestChannel), + }); co_return; } diff --git a/src/workerd/server/container-client.h b/src/workerd/server/container-client.h index 246157c630e..c7a2c1e0ac5 100644 --- a/src/workerd/server/container-client.h +++ b/src/workerd/server/container-client.h @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -58,6 +59,8 @@ class ContainerClient final: public rpc::Container::Server, public kj::Refcounte private: capnp::ByteStreamFactory& byteStreamFactory; + // Create header table for HTTP parsing + kj::HttpHeaderTable headerTable; kj::Timer& timer; kj::Network& network; kj::String dockerPath; @@ -115,15 +118,29 @@ class ContainerClient final: public rpc::Container::Server, public kj::Refcounte // For redeeming channel tokens received via setEgressHttp ChannelTokenHandler& channelTokenHandler; - // Egress HTTP mappings: address -> SubrequestChannel - kj::HashMap> egressMappings; + // Represents a parsed egress mapping with CIDR and port matching + struct EgressMapping { + // The cidr to match this mapping on + kj::CidrRange cidr; + // Port to match (0 means match all ports) + uint16_t port; + // The channel to route matching connections to + kj::Own channel; + }; + + // Egress HTTP mappings - list of CIDR/port rules to match against + kj::Vector egressMappings; + + // Find a matching egress mapping for the given destination address (host:port format) + kj::Maybe findEgressMapping( + kj::StringPtr destAddr, uint16_t defaultPort); // Whether general internet access is enabled for this container bool internetEnabled = false; // Egress HTTP listener for handling container egress via HTTP CONNECT from sidecar class EgressHttpService; - kj::Maybe> egressHeaderTable; + class InnerEgressService; kj::Maybe> egressHttpServer; kj::Maybe> egressListenerTask; diff --git a/src/workerd/server/docker-api.capnp 
b/src/workerd/server/docker-api.capnp index 9be3e2a636d..79f1b092fef 100644 --- a/src/workerd/server/docker-api.capnp +++ b/src/workerd/server/docker-api.capnp @@ -177,20 +177,6 @@ struct Docker { statusCode @0 :Int32 $Json.name("StatusCode"); } - struct ImagePullResponse { - # Response from ImagePull operation (streamed as multiple JSON objects) - status @0 :Text $Json.name("status"); - id @1 :Text $Json.name("id"); # Layer ID (if applicable) - progress @2 :Text $Json.name("progress"); # Progress bar text - progressDetail @3 :ProgressDetail $Json.name("progressDetail"); - error @4 :Text $Json.name("error"); # Error message if pull failed - - struct ProgressDetail { - current @0 :UInt64 $Json.name("current"); - total @1 :UInt64 $Json.name("total"); - } - } - struct ContainerState { # Container's running state status @0 :Text $Json.name("Status"); # "created", "running", "paused", "restarting", "removing", "exited", "dead" diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index d4d770b426d..2a8bd6982f5 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -4741,8 +4741,9 @@ kj::Promise> Server::makeWorkerImpl(kj::StringPtr case config::Worker::ContainerEngine::LOCAL_DOCKER: { auto dockerConf = def.containerEngineConf.getLocalDocker(); dockerPath = kj::str(dockerConf.getSocketPath()); - // Cap'n Proto default is "cloudflare/proxy-everything:main" - containerEgressInterceptorImage = kj::str(dockerConf.getContainerEgressInterceptorImage()); + if (dockerConf.hasContainerEgressInterceptorImage()) { + containerEgressInterceptorImage = kj::str(dockerConf.getContainerEgressInterceptorImage()); + } break; } } diff --git a/src/workerd/server/tests/container-client/test.js b/src/workerd/server/tests/container-client/test.js index f20b13a46f9..f093ad7d0e5 100644 --- a/src/workerd/server/tests/container-client/test.js +++ b/src/workerd/server/tests/container-client/test.js @@ -317,16 +317,21 @@ export class 
DurableObjectExample extends DurableObject { // Set up egress TCP mapping to route requests to the binding // This registers the binding's channel token with the container runtime - container.setEgressHttp( + await container.interceptOutboundHttp( '11.0.0.1:9999', this.ctx.exports.TestService({ props: { id: 1 } }) ); - container.setEgressHttp( + await container.interceptOutboundHttp( '11.0.0.2:9999', this.ctx.exports.TestService({ props: { id: 2 } }) ); + // we catch all HTTP requests to port 80 + await container.interceptAllOutboundHttp( + this.ctx.exports.TestService({ props: { id: 3 } }) + ); + { const response = await container .getTcpPort(8080) @@ -346,11 +351,121 @@ export class DurableObjectExample extends DurableObject { assert.equal(response.status, 200); assert.equal(await response.text(), 'hello binding: 2'); } + + { + const response = await container + .getTcpPort(8080) + .fetch('http://foo/intercept', { + headers: { 'x-host': '15.0.0.2:80' }, + }); + assert.equal(response.status, 200); + assert.equal(await response.text(), 'hello binding: 3'); + } + + { + const response = await container + .getTcpPort(8080) + .fetch('http://foo/intercept', { + headers: { 'x-host': '[111::]:80' }, + }); + assert.equal(response.status, 200); + assert.equal(await response.text(), 'hello binding: 3'); + } + } + + async testInterceptWebSocket() { + const container = this.ctx.container; + if (container.running) { + const monitor = container.monitor().catch((_err) => {}); + await container.destroy(); + await monitor; + } + + assert.strictEqual(container.running, false); + + // Start container with WebSocket proxy mode enabled + container.start({ + env: { WS_ENABLED: 'true', WS_PROXY_TARGET: '11.0.0.1:9999' }, + }); + + // Wait for container to be available + await this.ping(); + + assert.strictEqual(container.running, true); + + // Set up egress mapping to route WebSocket requests to the binding + await container.interceptOutboundHttp( + '11.0.0.1:9999', + 
this.ctx.exports.TestService({ props: { id: 42 } }) + ); + + // Connect to container's /ws endpoint which proxies to the intercepted address + // Flow: DO -> container:8080/ws -> container connects to 11.0.0.1:9999/ws + // -> sidecar intercepts -> workerd -> TestService worker binding + const res = await container.getTcpPort(8080).fetch('http://foo/ws', { + headers: { + Upgrade: 'websocket', + Connection: 'Upgrade', + 'Sec-WebSocket-Key': 'x3JJHMbDL1EzLkh9GBhXDw==', + 'Sec-WebSocket-Version': '13', + }, + }); + + // Should get WebSocket upgrade response + assert.strictEqual(res.status, 101); + assert.strictEqual(res.headers.get('upgrade'), 'websocket'); + assert.strictEqual(!!res.webSocket, true); + + const ws = res.webSocket; + ws.accept(); + + // Listen for response + const messagePromise = new Promise((resolve) => { + ws.addEventListener( + 'message', + (event) => { + resolve(event.data); + }, + { once: true } + ); + }); + + // Send a test message - should go through the whole chain and come back + ws.send('Hello through intercept!'); + + // Should receive response from TestService binding with id 42 + const response = new TextDecoder().decode(await messagePromise); + assert.strictEqual(response, 'Binding 42: Hello through intercept!'); + + ws.close(); + await container.destroy(); } } export class TestService extends WorkerEntrypoint { - fetch() { + fetch(request) { + // Check if this is a WebSocket upgrade request + const upgradeHeader = request.headers.get('Upgrade'); + if (upgradeHeader && upgradeHeader.toLowerCase() === 'websocket') { + // Handle WebSocket upgrade + const [client, server] = Object.values(new WebSocketPair()); + + server.accept(); + + server.addEventListener('message', (event) => { + // Echo back with binding id prefix + server.send( + `Binding ${this.ctx.props.id}: ${new TextDecoder().decode(event.data)}` + ); + }); + + return new Response(null, { + status: 101, + webSocket: client, + }); + } + + // Regular HTTP request return new 
Response('hello binding: ' + this.ctx.props.id); } } @@ -513,3 +628,12 @@ export const testSetEgressHttp = { await stub.testSetEgressHttp(); }, }; + +// Test WebSocket through interceptOutboundHttp - DO -> container -> worker binding via WebSocket +export const testInterceptWebSocket = { + async test(_ctrl, env) { + const id = env.MY_CONTAINER.idFromName('testInterceptWebSocket'); + const stub = env.MY_CONTAINER.get(id); + await stub.testInterceptWebSocket(); + }, +}; diff --git a/src/workerd/server/workerd.capnp b/src/workerd/server/workerd.capnp index a8c7ac40296..9e1cb23b62a 100644 --- a/src/workerd/server/workerd.capnp +++ b/src/workerd/server/workerd.capnp @@ -738,6 +738,7 @@ struct Worker { # Docker image name for the container egress interceptor sidecar. # This sidecar intercepts outbound traffic from containers and routes it # through workerd for egress mappings (setEgressHttp bindings). + # You can find this image in repositories like DockerHub: https://hub.docker.com/r/cloudflare/proxy-everything } } From ae742566f7800af286f8b4029781d48ec36515d4 Mon Sep 17 00:00:00 2001 From: Gabi Villalonga Simon Date: Thu, 5 Feb 2026 18:29:42 -0600 Subject: [PATCH 10/12] create workerd network for workerd containers and generate new types --- build/deps/oci.MODULE.bazel | 4 +- src/workerd/io/container.capnp | 11 +-- src/workerd/server/container-client.c++ | 86 ++++++++++++++----- src/workerd/server/container-client.h | 16 +++- src/workerd/server/docker-api.capnp | 14 +++ src/workerd/server/server.c++ | 11 +-- .../experimental/index.d.ts | 3 +- .../generated-snapshot/experimental/index.ts | 3 +- 8 files changed, 107 insertions(+), 41 deletions(-) diff --git a/build/deps/oci.MODULE.bazel b/build/deps/oci.MODULE.bazel index 17f58122188..8aece83b8e8 100644 --- a/build/deps/oci.MODULE.bazel +++ b/build/deps/oci.MODULE.bazel @@ -17,7 +17,7 @@ oci.pull( image = "docker.io/cloudflare/proxy-everything:main", platforms = [ "linux/amd64", - "linux/arm64/v8", + "linux/arm64", ], ) 
-use_repo(oci, "node_25_slim", "node_25_slim_linux_amd64", "node_25_slim_linux_arm64_v8", "proxy_everything", "proxy_everything_linux_amd64", "proxy_everything_linux_arm64_v8")
+use_repo(oci, "node_25_slim", "node_25_slim_linux_amd64", "node_25_slim_linux_arm64_v8", "proxy_everything", "proxy_everything_linux_amd64", "proxy_everything_linux_arm64")
diff --git a/src/workerd/io/container.capnp b/src/workerd/io/container.capnp
index ffb4b427ccf..fa9bf192818 100644
--- a/src/workerd/io/container.capnp
+++ b/src/workerd/io/container.capnp
@@ -111,15 +111,12 @@ interface Container @0x9aaceefc06523bca {
   # If there is no activity timeout duration configured and no container connection, it's up to the runtime
   # to decide when to signal the container to exit.
 
-  setEgressTcp @8 (addr :Text, channelToken :Data);
-  # TODO: This method is unimplemented.
-  #
-  # Configures egress TCP routing for the container. When the container attempts to connect to the
-  # specified address, the connection should be routed back to the Workers runtime using the channel token.
-
-  setEgressHttp @9 (hostPort :Text, channelToken :Data);
+  setEgressHttp @8 (hostPort :Text, channelToken :Data);
   # Configures egress HTTP routing for the container. When the container attempts to connect to the
   # specified host:port, the connection should be routed back to the Workers runtime using the channel token.
   # The format of hostPort can be '<host>[':'<port>]'. If port is omitted, it's assumed to only cover port 80.
   # This method does not support HTTPS yet.
+ + + # TODO: setEgressTcp } diff --git a/src/workerd/server/container-client.c++ b/src/workerd/server/container-client.c++ index 69c97a5507a..826a706e837 100644 --- a/src/workerd/server/container-client.c++ +++ b/src/workerd/server/container-client.c++ @@ -9,8 +9,13 @@ #include #include +#if _WIN32 +#include +#undef DELETE +#else #include #include +#endif #include #include @@ -371,25 +376,60 @@ class ContainerClient::EgressHttpService final: public kj::HttpService { kj::HttpHeaderTable& headerTable; }; -kj::Promise ContainerClient::getDockerBridgeGateway() { - // Docker API: GET /networks/bridge - auto response = co_await dockerApiRequest( - network, kj::str(dockerPath), kj::HttpMethod::GET, kj::str("/networks/bridge")); +// The name of the docker workerd network. All containers spawned by Workerd +// will be attached to this network. +constexpr kj::StringPtr WORKERD_NETWORK_NAME = "workerd-network"_kj; + +kj::Promise ContainerClient::getDockerBridgeIPAMConfig() { + // First, try to find or create the workerd-network + // Docker API: GET /networks/workerd-network + auto response = co_await dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::GET, + kj::str("/networks/", WORKERD_NETWORK_NAME)); + + if (response.statusCode == 404) { + // Network doesn't exist, create it + // Equivalent to: docker network create -d bridge --ipv6 workerd-network + co_await createWorkerdNetwork(); + // Re-fetch the network to get the gateway + response = co_await dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::GET, + kj::str("/networks/", WORKERD_NETWORK_NAME)); + } if (response.statusCode == 200) { auto jsonRoot = decodeJsonResponse(response.body); auto ipamConfig = jsonRoot.getIpam().getConfig(); if (ipamConfig.size() > 0) { - auto gateway = ipamConfig[0].getGateway(); - if (gateway.size() > 0) { - co_return kj::str(gateway); - } + auto config = ipamConfig[0]; + co_return IPAMConfigResult{ + .gateway = kj::str(config.getGateway()), + .subnet = 
kj::str(config.getSubnet()), + }; } } - // Fallback to default Docker bridge gateway - KJ_LOG(WARNING, "Could not determine Docker bridge gateway, using default 172.17.0.1"); - co_return kj::str("172.17.0.1"); + JSG_FAIL_REQUIRE(Error, + "Failed to get or create workerd-network. " + "Status: ", + response.statusCode, ", Body: ", response.body); +} + +kj::Promise ContainerClient::createWorkerdNetwork() { + // Docker API: POST /networks/create + // Equivalent to: docker network create -d bridge --ipv6 workerd-network + capnp::JsonCodec codec; + codec.handleByAnnotation(); + capnp::MallocMessageBuilder message; + auto jsonRoot = message.initRoot(); + jsonRoot.setName(WORKERD_NETWORK_NAME); + jsonRoot.setDriver("bridge"); + jsonRoot.setEnableIpv6(true); + + auto response = co_await dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::POST, + kj::str("/networks/create"), codec.encode(jsonRoot)); + + if (response.statusCode != 201 && response.statusCode != 409) { + KJ_LOG(WARNING, "Failed to create workerd-network", response.statusCode, response.body); + } } kj::Promise ContainerClient::startEgressListener(kj::StringPtr listenAddress) { @@ -535,6 +575,8 @@ kj::Promise ContainerClient::createContainer( auto envSize = environment.map([](auto& env) { return env.size(); }).orDefault(0); auto jsonEnv = jsonRoot.initEnv(envSize + kj::size(defaultEnv)); + co_await createWorkerdNetwork(); + KJ_IF_SOME(env, environment) { for (uint32_t i: kj::zeroTo(env.size())) { jsonEnv.set(i, env[i]); @@ -554,7 +596,10 @@ kj::Promise ContainerClient::createContainer( // Add host.docker.internal mapping so containers can reach the host // This is equivalent to --add-host=host.docker.internal:host-gateway auto extraHosts = hostConfig.initExtraHosts(1); - extraHosts.set(0, "host.docker.internal:host-gateway"); + auto ipamConfigForHost = co_await getDockerBridgeIPAMConfig(); + extraHosts.set(0, kj::str("host.docker.internal:", ipamConfigForHost.gateway)); + // Connect the container to 
the workerd-network for IPv6 support and container isolation + hostConfig.setNetworkMode(WORKERD_NETWORK_NAME); auto response = co_await dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::POST, kj::str("/containers/create?name=", containerName), codec.encode(jsonRoot)); @@ -638,7 +683,8 @@ kj::Promise ContainerClient::destroyContainer() { // Creates the sidecar container for egress proxy. // The sidecar shares the network namespace with the main container and runs // proxy-everything to intercept and proxy egress traffic. -kj::Promise ContainerClient::createSidecarContainer(uint16_t egressPort) { +kj::Promise ContainerClient::createSidecarContainer( + uint16_t egressPort, kj::String networkCidr) { // Docker API: POST /containers/create // Equivalent to: docker run --cap-add=NET_ADMIN --network container:$(CONTAINER) ... capnp::JsonCodec codec; @@ -648,9 +694,11 @@ kj::Promise ContainerClient::createSidecarContainer(uint16_t egressPort) { jsonRoot.setImage(containerEgressInterceptorImage); // Pass the egress port to the sidecar via command line flag - auto cmd = jsonRoot.initCmd(2); + auto cmd = jsonRoot.initCmd(4); cmd.set(0, "--http-egress-port"); cmd.set(1, kj::str(egressPort)); + cmd.set(2, "--docker-gateway-cidr"); + cmd.set(3, networkCidr); auto hostConfig = jsonRoot.initHostConfig(); // Share network namespace with the main container @@ -891,16 +939,16 @@ kj::Promise ContainerClient::setEgressHttp(SetEgressHttpContext context) { KJ_DEFER(paf.fulfiller->fulfill()); // Get the Docker bridge gateway IP to listen on (only accessible from containers) - auto bridgeGateway = co_await getDockerBridgeGateway(); + auto ipamConfig = co_await getDockerBridgeIPAMConfig(); // Start the egress listener first so it's ready when the sidecar starts. // Use port 0 to let the OS pick a free port dynamically. 
- egressListenerPort = co_await startEgressListener(bridgeGateway); + egressListenerPort = co_await startEgressListener(ipamConfig.gateway); // Create and start the sidecar container that shares the network namespace // with the main container and intercepts egress traffic. // Pass the dynamically chosen port so the sidecar knows where to connect. - co_await createSidecarContainer(egressListenerPort); + co_await createSidecarContainer(egressListenerPort, kj::mv(ipamConfig.subnet)); co_await startSidecarContainer(); // Monitor the sidecar container for unexpected exits @@ -921,10 +969,6 @@ kj::Promise ContainerClient::setEgressHttp(SetEgressHttpContext context) { co_return; } -kj::Promise ContainerClient::setEgressTcp(SetEgressTcpContext context) { - KJ_UNIMPLEMENTED("setEgressTcp not implemented - use setEgressHttp for now"); -} - kj::Own ContainerClient::addRef() { return kj::addRef(*this); } diff --git a/src/workerd/server/container-client.h b/src/workerd/server/container-client.h index c7a2c1e0ac5..e302ff7a74c 100644 --- a/src/workerd/server/container-client.h +++ b/src/workerd/server/container-client.h @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -52,7 +53,6 @@ class ContainerClient final: public rpc::Container::Server, public kj::Refcounte kj::Promise getTcpPort(GetTcpPortContext context) override; kj::Promise listenTcp(ListenTcpContext context) override; kj::Promise setInactivityTimeout(SetInactivityTimeoutContext context) override; - kj::Promise setEgressTcp(SetEgressTcpContext context) override; kj::Promise setEgressHttp(SetEgressHttpContext context) override; kj::Own addRef(); @@ -92,6 +92,11 @@ class ContainerClient final: public rpc::Container::Server, public kj::Refcounte kj::HashMap ports; }; + struct IPAMConfigResult { + kj::String gateway; + kj::String subnet; + }; + // Docker API v1.50 helper methods static kj::Promise dockerApiRequest(kj::Network& network, kj::String dockerPath, @@ -107,7 +112,7 @@ class ContainerClient 
final: public rpc::Container::Server, public kj::Refcounte kj::Promise destroyContainer(); // Sidecar container management (for egress proxy) - kj::Promise createSidecarContainer(uint16_t egressPort); + kj::Promise createSidecarContainer(uint16_t egressPort, kj::String networkCidr); kj::Promise startSidecarContainer(); kj::Promise destroySidecarContainer(); kj::Promise monitorSidecarContainer(); @@ -150,8 +155,11 @@ class ContainerClient final: public rpc::Container::Server, public kj::Refcounte // Mutex to serialize setEgressHttp() calls (sidecar setup must complete before adding mappings) kj::Maybe> egressSetupLock; - // Get the Docker bridge network gateway IP (e.g., "172.17.0.1") - kj::Promise getDockerBridgeGateway(); + // Get the Docker bridge network gateway IP and subnet. + // Prefers the "workerd-network" bridge, creating it if needed + kj::Promise getDockerBridgeIPAMConfig(); + // Create the workerd-network Docker bridge network with IPv6 support + kj::Promise createWorkerdNetwork(); // Start the egress listener on the specified address, returns the chosen port kj::Promise startEgressListener(kj::StringPtr listenAddress); // Stop the egress listener diff --git a/src/workerd/server/docker-api.capnp b/src/workerd/server/docker-api.capnp index 79f1b092fef..692ce021cee 100644 --- a/src/workerd/server/docker-api.capnp +++ b/src/workerd/server/docker-api.capnp @@ -304,4 +304,18 @@ struct Docker { } } } + + # Network create request (POST /networks/create) + # Equivalent to: docker network create -d bridge --ipv6 workerd-network + struct NetworkCreateRequest { + name @0 :Text $Json.name("Name"); + driver @1 :Text $Json.name("Driver"); # "bridge", "overlay", etc. 
+ enableIpv6 @2 :Bool $Json.name("EnableIPv6"); + } + + # Network create response + struct NetworkCreateResponse { + id @0 :Text $Json.name("Id"); + warning @1 :Text $Json.name("Warning"); + } } diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index 2a8bd6982f5..dc69f1bc8f2 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -4751,11 +4751,12 @@ kj::Promise> Server::makeWorkerImpl(kj::StringPtr kj::Maybe serviceName; if (!def.isDynamic) serviceName = name; - auto result = kj::refcounted(channelTokenHandler, serviceName, - globalContext->threadContext, monotonicClock, kj::mv(worker), kj::mv(errorReporter.defaultEntrypoint), - kj::mv(errorReporter.namedEntrypoints), kj::mv(errorReporter.actorClasses), - kj::mv(linkCallback), KJ_BIND_METHOD(*this, abortAllActors), kj::mv(dockerPath), - kj::mv(containerEgressInterceptorImage), def.isDynamic); + auto result = + kj::refcounted(channelTokenHandler, serviceName, globalContext->threadContext, + monotonicClock, kj::mv(worker), kj::mv(errorReporter.defaultEntrypoint), + kj::mv(errorReporter.namedEntrypoints), kj::mv(errorReporter.actorClasses), + kj::mv(linkCallback), KJ_BIND_METHOD(*this, abortAllActors), kj::mv(dockerPath), + kj::mv(containerEgressInterceptorImage), def.isDynamic); result->initActorNamespaces(def.localActorConfigs, network); co_return result; } diff --git a/types/generated-snapshot/experimental/index.d.ts b/types/generated-snapshot/experimental/index.d.ts index b1088803823..cce9417cadf 100755 --- a/types/generated-snapshot/experimental/index.d.ts +++ b/types/generated-snapshot/experimental/index.d.ts @@ -3841,7 +3841,8 @@ interface Container { signal(signo: number): void; getTcpPort(port: number): Fetcher; setInactivityTimeout(durationMs: number | bigint): Promise; - setEgressHttp(addr: string, binding: Fetcher): void; + interceptOutboundHttp(addr: string, binding: Fetcher): Promise; + interceptAllOutboundHttp(binding: Fetcher): Promise; } interface 
ContainerStartupOptions { entrypoint?: string[]; diff --git a/types/generated-snapshot/experimental/index.ts b/types/generated-snapshot/experimental/index.ts index ccde9b9940e..45fff8f440f 100755 --- a/types/generated-snapshot/experimental/index.ts +++ b/types/generated-snapshot/experimental/index.ts @@ -3850,7 +3850,8 @@ export interface Container { signal(signo: number): void; getTcpPort(port: number): Fetcher; setInactivityTimeout(durationMs: number | bigint): Promise; - setEgressHttp(addr: string, binding: Fetcher): void; + interceptOutboundHttp(addr: string, binding: Fetcher): Promise; + interceptAllOutboundHttp(binding: Fetcher): Promise; } export interface ContainerStartupOptions { entrypoint?: string[]; From fd9440d73041a7f1f08d83d2bc1e7c0bcc3a06f6 Mon Sep 17 00:00:00 2001 From: Gabi Villalonga Simon Date: Tue, 10 Feb 2026 19:01:17 -0600 Subject: [PATCH 11/12] address second pass in MR review --- build/deps/gen/build_deps.MODULE.bazel | 10 +- build/deps/gen/deps.MODULE.bazel | 6 +- src/workerd/api/container.c++ | 4 +- src/workerd/server/container-client.c++ | 196 +++++++----------------- src/workerd/server/container-client.h | 28 ++-- 5 files changed, 78 insertions(+), 166 deletions(-) diff --git a/build/deps/gen/build_deps.MODULE.bazel b/build/deps/gen/build_deps.MODULE.bazel index 325d4136b7f..36a83f37a05 100644 --- a/build/deps/gen/build_deps.MODULE.bazel +++ b/build/deps/gen/build_deps.MODULE.bazel @@ -5,7 +5,7 @@ http = use_extension("@//:build/exts/http.bzl", "http") git_repository = use_repo_rule("@bazel_tools//tools/build_defs/repo:git.bzl", "git_repository") # abseil-cpp -bazel_dep(name = "abseil-cpp", version = "20260107.0") +bazel_dep(name = "abseil-cpp", version = "20260107.1") # apple_support bazel_dep(name = "apple_support", version = "2.2.0") @@ -17,13 +17,13 @@ bazel_dep(name = "aspect_rules_esbuild", version = "0.25.0") bazel_dep(name = "aspect_rules_js", version = "2.9.2") # aspect_rules_lint -bazel_dep(name = "aspect_rules_lint", version 
= "1.13.0") +bazel_dep(name = "aspect_rules_lint", version = "2.1.0") # aspect_rules_ts -bazel_dep(name = "aspect_rules_ts", version = "3.8.3") +bazel_dep(name = "aspect_rules_ts", version = "3.8.4") # bazel_lib -bazel_dep(name = "bazel_lib", version = "3.1.1") +bazel_dep(name = "bazel_lib", version = "3.2.0") # bazel_skylib bazel_dep(name = "bazel_skylib", version = "1.9.0") @@ -117,7 +117,7 @@ bazel_dep(name = "rules_nodejs", version = "6.7.3") bazel_dep(name = "rules_oci", version = "2.2.7") # rules_python -bazel_dep(name = "rules_python", version = "1.8.0") +bazel_dep(name = "rules_python", version = "1.8.4") # rules_shell bazel_dep(name = "rules_shell", version = "0.6.1") diff --git a/build/deps/gen/deps.MODULE.bazel b/build/deps/gen/deps.MODULE.bazel index aa17edf7756..138176bcd45 100644 --- a/build/deps/gen/deps.MODULE.bazel +++ b/build/deps/gen/deps.MODULE.bazel @@ -27,10 +27,10 @@ bazel_dep(name = "brotli", version = "1.2.0") # capnp-cpp http.archive( name = "capnp-cpp", - sha256 = "58a883721d220a3d8d75531a7e7ede3fd87c3d6923caf645faff0c78f8807b23", - strip_prefix = "capnproto-capnproto-79b3170/c++", + sha256 = "2e8519d77eb453463b1f2b1e22f40959fe560c143d78e7a51c606ce3bca30c5b", + strip_prefix = "capnproto-capnproto-ac7d90a/c++", type = "tgz", - url = "https://github.com/capnproto/capnproto/tarball/79b317039adad92da1204929f4047f84dfd17350", + url = "https://github.com/capnproto/capnproto/tarball/ac7d90ae2e171a714be4348718fed6f26b0b85f2", ) use_repo(http, "capnp-cpp") diff --git a/src/workerd/api/container.c++ b/src/workerd/api/container.c++ index ed6c0fea03e..8c3db6e871d 100644 --- a/src/workerd/api/container.c++ +++ b/src/workerd/api/container.c++ @@ -100,8 +100,8 @@ jsg::Promise Container::interceptAllOutboundHttp(jsg::Lock& js, jsg::Ref Container::monitor(jsg::Lock& js) { diff --git a/src/workerd/server/container-client.c++ b/src/workerd/server/container-client.c++ index 826a706e837..168aff845f9 100644 --- a/src/workerd/server/container-client.c++ +++ 
b/src/workerd/server/container-client.c++ @@ -9,14 +9,6 @@ #include #include -#if _WIN32 -#include -#undef DELETE -#else -#include -#include -#endif - #include #include #include @@ -32,7 +24,6 @@ namespace workerd::server { namespace { -// Parsed address from parseHostPort() struct ParsedAddress { kj::CidrRange cidr; kj::Maybe port; @@ -73,13 +64,11 @@ HostAndPort stripPort(kj::StringPtr str) { // Valid port - but only treat as port for IPv4 (check no other colons before) auto beforeColon = str.first(colonPos); if (beforeColon.findFirst(':') == kj::none) { - // No other colons, so this is IPv4 with port return {kj::str(beforeColon), port}; } } } - // No port found return {kj::str(str), kj::none}; } @@ -205,18 +194,16 @@ ContainerClient::ContainerClient(capnp::ByteStreamFactory& byteStreamFactory, channelTokenHandler(channelTokenHandler) {} ContainerClient::~ContainerClient() noexcept(false) { - // Stop the egress listener stopEgressListener(); // Call the cleanup callback to remove this client from the ActorNamespace map cleanupCallback(); - // Destroy the sidecar container first (it depends on the main container's network) + // Sidecar shares main container's network namespace, so must be destroyed first waitUntilTasks.add(dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::DELETE, kj::str("/containers/", sidecarContainerName, "?force=true")) .ignoreResult()); - // Destroy the main Docker container waitUntilTasks.add(dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::DELETE, kj::str("/containers/", containerName, "?force=true")) .ignoreResult()); @@ -275,7 +262,7 @@ class ContainerClient::DockerPort final: public rpc::Container::Port::Server { // which we parse and forward to the appropriate SubrequestChannel based on egressMappings. // Inner HTTP service that handles requests inside the CONNECT tunnel. // Forwards requests to the worker binding via SubrequestChannel. 
-class ContainerClient::InnerEgressService final: public kj::HttpService { +class InnerEgressService final: public kj::HttpService { public: InnerEgressService(IoChannelFactory::SubrequestChannel& channel): channel(channel) {} @@ -284,11 +271,8 @@ class ContainerClient::InnerEgressService final: public kj::HttpService { const kj::HttpHeaders& headers, kj::AsyncInputStream& requestBody, Response& response) override { - // Forward to the SubrequestChannel IoChannelFactory::SubrequestMetadata metadata; auto worker = channel.startRequest(kj::mv(metadata)); - - // Forward the request to the worker - the response flows back through 'response' co_await worker->request(method, url, headers, requestBody, response); } @@ -297,7 +281,7 @@ class ContainerClient::InnerEgressService final: public kj::HttpService { }; // Outer HTTP service that handles CONNECT requests from the sidecar. -class ContainerClient::EgressHttpService final: public kj::HttpService { +class EgressHttpService final: public kj::HttpService { public: EgressHttpService(ContainerClient& containerClient, kj::HttpHeaderTable& headerTable) : containerClient(containerClient), @@ -317,55 +301,40 @@ class ContainerClient::EgressHttpService final: public kj::HttpService { kj::AsyncIoStream& connection, ConnectResponse& response, kj::HttpConnectSettings settings) override { - // The host header contains the destination address (e.g., "10.0.0.1:9999") - // that the container was trying to connect to. 
auto destAddr = kj::str(host); - // Accept the CONNECT tunnel kj::HttpHeaders responseHeaders(headerTable); response.accept(200, "OK", responseHeaders); - // Check if there's a mapping for this destination auto mapping = containerClient.findEgressMapping(destAddr, /*defaultPort=*/80); KJ_IF_SOME(channel, mapping) { - // Found a mapping - layer an HttpServer on top of the tunnel connection - // to handle HTTP parsing/serialization automatically - - // Create the inner service that forwards to the worker binding + // Layer an HttpServer on top of the tunnel to handle HTTP parsing/serialization auto innerService = kj::heap(*channel); - - // Create an HttpServer for the tunnel connection auto innerServer = kj::heap(containerClient.timer, headerTable, *innerService); - // Let the HttpServer handle the HTTP traffic inside the tunnel co_await innerServer->listenHttpCleanDrain(connection) .attach(kj::mv(innerServer), kj::mv(innerService)); co_return; } - // No mapping found - check if internet access is enabled if (!containerClient.internetEnabled) { - // Internet access not enabled - close the connection connection.shutdownWrite(); co_return; } - // Forward to the general internet via raw TCP - // Just do bidirectional byte pumping, no HTTP parsing needed + // No egress mapping and internet enabled, so forward via raw TCP auto addr = co_await containerClient.network.parseAddress(destAddr); auto destConn = co_await addr->connect(); - // Pump bytes bidirectionally: tunnel <-> destination auto connToDestination = connection.pumpTo(*destConn).then( [&destConn = *destConn](uint64_t) { destConn.shutdownWrite(); }); auto destinationToConn = destConn->pumpTo(connection).then([&connection](uint64_t) { connection.shutdownWrite(); }); - // Wait for both directions to complete co_await kj::joinPromisesFailFast( kj::arr(kj::mv(connToDestination), kj::mv(destinationToConn))); co_return; @@ -408,7 +377,7 @@ kj::Promise ContainerClient::getDockerBridgeI } JSG_FAIL_REQUIRE(Error, - 
"Failed to get or create workerd-network. " + "Failed to get workerd-network. " "Status: ", response.statusCode, ", Body: ", response.body); } @@ -428,29 +397,26 @@ kj::Promise ContainerClient::createWorkerdNetwork() { kj::str("/networks/create"), codec.encode(jsonRoot)); if (response.statusCode != 201 && response.statusCode != 409) { - KJ_LOG(WARNING, "Failed to create workerd-network", response.statusCode, response.body); + JSG_FAIL_REQUIRE(Error, + "Failed to create workerd-network." + "Status: ", + response.statusCode, ", Body: ", response.body); } } kj::Promise ContainerClient::startEgressListener(kj::StringPtr listenAddress) { - // Create the egress HTTP service auto service = kj::heap(*this, headerTable); - - // Create the HTTP server auto httpServer = kj::heap(timer, headerTable, *service); auto& httpServerRef = *httpServer; - // Attach service to httpServer so ownership is clear - httpServer owns service egressHttpServer = httpServer.attach(kj::mv(service)); // Listen on the Docker bridge gateway IP with port 0 to let the OS pick a free port auto addr = co_await network.parseAddress(kj::str(listenAddress, ":0")); auto listener = addr->listen(); - // Get the actual port that was assigned uint16_t chosenPort = listener->getPort(); - // Run the server in the background - this promise never completes normally egressListenerTask = httpServerRef.listenHttp(*listener) .attach(kj::mv(listener)) .eagerlyEvaluate([](kj::Exception&& e) { @@ -510,6 +476,7 @@ kj::Promise ContainerClient::inspectContainer( if (response.statusCode == 404) { co_return InspectResponse{.isRunning = false, .ports = {}}; } + JSG_REQUIRE(response.statusCode == 200, Error, "Container inspect failed"); // Parse JSON response auto jsonRoot = decodeJsonResponse(response.body); @@ -685,7 +652,6 @@ kj::Promise ContainerClient::destroyContainer() { // proxy-everything to intercept and proxy egress traffic. 
kj::Promise ContainerClient::createSidecarContainer( uint16_t egressPort, kj::String networkCidr) { - // Docker API: POST /containers/create // Equivalent to: docker run --cap-add=NET_ADMIN --network container:$(CONTAINER) ... capnp::JsonCodec codec; codec.handleByAnnotation(); @@ -693,7 +659,6 @@ kj::Promise ContainerClient::createSidecarContainer( auto jsonRoot = message.initRoot(); jsonRoot.setImage(containerEgressInterceptorImage); - // Pass the egress port to the sidecar via command line flag auto cmd = jsonRoot.initCmd(4); cmd.set(0, "--http-egress-port"); cmd.set(1, kj::str(egressPort)); @@ -712,77 +677,40 @@ kj::Promise ContainerClient::createSidecarContainer( auto response = co_await dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::POST, kj::str("/containers/create?name=", sidecarContainerName), codec.encode(jsonRoot)); - // statusCode 409 refers to "conflict". Occurs when a container with the given name exists. - // In that case we destroy and re-create the container. if (response.statusCode == 409) { - co_await destroySidecarContainer(); - response = co_await dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::POST, - kj::str("/containers/create?name=", sidecarContainerName), codec.encode(jsonRoot)); + // Already created, nothing to do + co_return; } - // statusCode 201 refers to "container created successfully" if (response.statusCode != 201) { JSG_REQUIRE(response.statusCode != 404, Error, "No such image available named ", containerEgressInterceptorImage, ". 
Please ensure the container egress interceptor image is built and available."); - JSG_REQUIRE(response.statusCode != 409, Error, "Sidecar container already exists"); - JSG_FAIL_REQUIRE( - Error, "Create sidecar container failed with [", response.statusCode, "] ", response.body); + JSG_FAIL_REQUIRE(Error, "Failed to create the networking sidecar [", response.statusCode, "] ", + response.body); } } kj::Promise ContainerClient::startSidecarContainer() { - // Docker API: POST /containers/{id}/start auto endpoint = kj::str("/containers/", sidecarContainerName, "/start"); auto response = co_await dockerApiRequest( network, kj::str(dockerPath), kj::HttpMethod::POST, kj::mv(endpoint), kj::str("")); - // statusCode 304 refers to "container already started" - JSG_REQUIRE(response.statusCode != 304, Error, "Sidecar container already started"); - // statusCode 204 refers to "no error" - JSG_REQUIRE( - response.statusCode == 204, Error, "Starting sidecar container failed with: ", response.body); + JSG_REQUIRE(response.statusCode == 204, Error, + "Starting network sidecar container failed with: ", response.body); } kj::Promise ContainerClient::destroySidecarContainer() { auto endpoint = kj::str("/containers/", sidecarContainerName, "?force=true"); - co_await dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::DELETE, kj::mv(endpoint)) - .ignoreResult(); + co_await dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::DELETE, kj::mv(endpoint)); auto response = co_await dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::POST, kj::str("/containers/", sidecarContainerName, "/wait?condition=removed")); JSG_REQUIRE(response.statusCode == 200 || response.statusCode == 404, Error, - "Waiting for container sidecar removal failed with: ", response.statusCode, response.body); - KJ_LOG(WARNING, "Container destroyed"); -} - -kj::Promise ContainerClient::monitorSidecarContainer() { - // Docker API: POST /containers/{id}/wait - wait for container to exit - auto 
endpoint = kj::str("/containers/", sidecarContainerName, "/wait"); - auto response = co_await dockerApiRequest( - network, kj::str(dockerPath), kj::HttpMethod::POST, kj::mv(endpoint)); - - if (response.statusCode == 200) { - // Container exited - parse the exit code and log it - auto jsonRoot = decodeJsonResponse(response.body); - auto exitCode = jsonRoot.getStatusCode(); - KJ_LOG(WARNING, "Sidecar container exited unexpectedly", sidecarContainerName, exitCode); - - // Fetch the container logs to help diagnose the exit - auto logsEndpoint = - kj::str("/containers/", sidecarContainerName, "/logs?stdout=true&stderr=true&tail=50"); - auto logsResponse = co_await dockerApiRequest( - network, kj::str(dockerPath), kj::HttpMethod::GET, kj::mv(logsEndpoint)); - if (logsResponse.statusCode == 200) { - KJ_LOG(WARNING, "Sidecar container logs:", logsResponse.body); - } - } else if (response.statusCode == 404) { - // Container was removed before we could monitor it - this is normal during shutdown - } else { - KJ_LOG(ERROR, "Failed to monitor sidecar container", response.statusCode, response.body); - } + "Destroying docker network sidecar container failed: ", response.statusCode, response.body); } kj::Promise ContainerClient::status(StatusContext context) { const auto [isRunning, _ports] = co_await inspectContainer(); + containerStarted.store(isRunning, std::memory_order_release); context.getResults().setRunning(isRunning); } @@ -796,16 +724,24 @@ kj::Promise ContainerClient::start(StartContext context) { if (params.hasEntrypoint()) { entrypoint = params.getEntrypoint(); } + if (params.hasEnvironmentVariables()) { environment = params.getEnvironmentVariables(); } - // Track whether internet access is enabled for this container + // We record internetEnabled so we can accept/reject container connections that don't go to Workers internetEnabled = params.getEnableInternet(); - // Create and start the main user container co_await createContainer(entrypoint, environment); + + // 
Opt in to the proxy sidecar container only if the user has configured egressMappings + // for now. In the future, it will always run when a user container is running + if (!egressMappings.empty()) { + co_await ensureSidecarStarted(); + } + co_await startContainer(); + containerStarted.store(true, std::memory_order_release); } kj::Promise ContainerClient::monitor(MonitorContext context) { @@ -814,7 +750,6 @@ kj::Promise ContainerClient::monitor(MonitorContext context) { // If it hasn't, we'll give it 3 tries before failing. auto results = context.getResults(); for (int i = 0; i < 3; i++) { - // Docker API: POST /containers/{id}/wait - wait for container to exit auto endpoint = kj::str("/containers/", containerName, "/wait"); auto response = co_await dockerApiRequest( @@ -823,6 +758,8 @@ kj::Promise ContainerClient::monitor(MonitorContext context) { co_await timer.afterDelay(1 * kj::SECONDS); continue; } + + containerStarted.store(false, std::memory_order_release); JSG_REQUIRE(response.statusCode == 200, Error, "Monitoring container failed with: ", response.statusCode, response.body); // Parse JSON response @@ -831,11 +768,12 @@ kj::Promise ContainerClient::monitor(MonitorContext context) { results.setExitCode(statusCode); co_return; } + JSG_FAIL_REQUIRE(Error, "Monitor failed to find container"); } kj::Promise ContainerClient::destroy(DestroyContext context) { - // Destroy sidecar first since it depends on the main container's network + // Sidecar shares main container's network namespace, so must be destroyed first co_await destroySidecarContainer(); co_await destroyContainer(); } @@ -880,28 +818,8 @@ kj::Maybe ContainerClient::findEg auto hostAndPort = stripPort(destAddr); uint16_t port = hostAndPort.port.orDefault(defaultPort); - struct sockaddr_storage ss; - memset(&ss, 0, sizeof(ss)); - - auto* sin = reinterpret_cast(&ss); - auto* sin6 = reinterpret_cast(&ss); - - // This is kind of awful. 
We could theoretically have a CidrRange - // parse this, but we don't have a way to compare two CidrRanges. - // Ideally, KJ would have a library to parse IPs, and we are able to have a cidr.includes(ip) method. - if (inet_pton(AF_INET, hostAndPort.host.cStr(), &sin->sin_addr) == 1) { - ss.ss_family = AF_INET; - sin->sin_port = htons(port); - } else if (inet_pton(AF_INET6, hostAndPort.host.cStr(), &sin6->sin6_addr) == 1) { - ss.ss_family = AF_INET6; - sin6->sin6_port = htons(port); - } else { - JSG_KJ_EXCEPTION(FAILED, Error, "host is an invalid address"); - } - - // Find a matching mapping for (auto& mapping: egressMappings) { - if (mapping.cidr.matches(reinterpret_cast(&ss))) { + if (mapping.cidr.matches(hostAndPort.host)) { // CIDR matches, now check port. // If the port is 0, we match anything. if (mapping.port == 0 || mapping.port == port) { @@ -913,6 +831,21 @@ kj::Maybe ContainerClient::findEg return kj::none; } +kj::Promise ContainerClient::ensureSidecarStarted() { + if (containerSidecarStarted.exchange(true, std::memory_order_acquire)) { + co_return; + } + + KJ_ON_SCOPE_FAILURE(containerSidecarStarted.store(false, std::memory_order_release)); + + // Get the Docker bridge gateway IP to listen on (only accessible from containers) + auto ipamConfig = co_await getDockerBridgeIPAMConfig(); + // Create and start the sidecar container that shares the network namespace + // with the main container and intercepts egress traffic. 
+ co_await createSidecarContainer(egressListenerPort, kj::mv(ipamConfig.subnet)); + co_await startSidecarContainer(); +} + kj::Promise ContainerClient::setEgressHttp(SetEgressHttpContext context) { auto params = context.getParams(); auto hostPortStr = kj::str(params.getHostPort()); @@ -923,43 +856,24 @@ kj::Promise ContainerClient::setEgressHttp(SetEgressHttpContext context) { uint16_t port = parsed.port.orDefault(80); auto cidr = kj::mv(parsed.cidr); - // Wait for any previous setEgressHttp call to complete - KJ_IF_SOME(lock, egressSetupLock) { - co_await lock.addBranch(); - } - - // If no egressListenerTask, start one now. - // The biggest disadvantage of doing it here, is that if the workerd process restarts, - // and the container is still running, it might have no connectivity. if (egressListenerTask == kj::none) { - // Create a promise/fulfiller pair to signal when setup is complete - // TODO: Actually, every RPC in this class would benefit from this. - auto paf = kj::newPromiseAndFulfiller(); - egressSetupLock = paf.promise.fork(); - KJ_DEFER(paf.fulfiller->fulfill()); - // Get the Docker bridge gateway IP to listen on (only accessible from containers) auto ipamConfig = co_await getDockerBridgeIPAMConfig(); // Start the egress listener first so it's ready when the sidecar starts. // Use port 0 to let the OS pick a free port dynamically. egressListenerPort = co_await startEgressListener(ipamConfig.gateway); + } - // Create and start the sidecar container that shares the network namespace - // with the main container and intercepts egress traffic. - // Pass the dynamically chosen port so the sidecar knows where to connect. 
- co_await createSidecarContainer(egressListenerPort, kj::mv(ipamConfig.subnet)); - co_await startSidecarContainer(); - - // Monitor the sidecar container for unexpected exits - waitUntilTasks.add(monitorSidecarContainer()); + if (containerStarted.load(std::memory_order_acquire)) { + // Only try to create and start a sidecar container + // if the user container is running. + co_await ensureSidecarStarted(); } - // Redeem the channel token to get a SubrequestChannel auto subrequestChannel = channelTokenHandler.decodeSubrequestChannelToken( workerd::IoChannelFactory::ChannelTokenUsage::RPC, tokenBytes); - // Store the mapping egressMappings.add(EgressMapping{ .cidr = kj::mv(cidr), .port = port, diff --git a/src/workerd/server/container-client.h b/src/workerd/server/container-client.h index e302ff7a74c..831ae442e64 100644 --- a/src/workerd/server/container-client.h +++ b/src/workerd/server/container-client.h @@ -19,6 +19,8 @@ #include #include +#include + namespace workerd::server { // Docker-based implementation that implements the rpc::Container::Server interface @@ -28,7 +30,7 @@ namespace workerd::server { // // ContainerClient is reference-counted to support actor reconnection with inactivity timeouts. // When setInactivityTimeout() is called, a timer holds a reference to prevent premature -// destruction. The ContainerClient can be shared across multiple actor lifetimes. +// destruction. 
The ContainerClient can be shared across multiple actor lifetimes class ContainerClient final: public rpc::Container::Server, public kj::Refcounted { public: ContainerClient(capnp::ByteStreamFactory& byteStreamFactory, @@ -59,7 +61,6 @@ class ContainerClient final: public rpc::Container::Server, public kj::Refcounte private: capnp::ByteStreamFactory& byteStreamFactory; - // Create header table for HTTP parsing kj::HttpHeaderTable headerTable; kj::Timer& timer; kj::Network& network; @@ -82,6 +83,9 @@ class ContainerClient final: public rpc::Container::Server, public kj::Refcounte // Docker-specific Port implementation class DockerPort; + // EgressHttpService handles CONNECT requests from proxy-anything sidecar + friend class EgressHttpService; + struct Response { kj::uint statusCode; kj::String body; @@ -125,15 +129,11 @@ class ContainerClient final: public rpc::Container::Server, public kj::Refcounte // Represents a parsed egress mapping with CIDR and port matching struct EgressMapping { - // The cidr to match this mapping on kj::CidrRange cidr; - // Port to match (0 means match all ports) - uint16_t port; - // The channel to route matching connections to + uint16_t port; // 0 means match all ports kj::Own channel; }; - // Egress HTTP mappings - list of CIDR/port rules to match against kj::Vector egressMappings; // Find a matching egress mapping for the given destination address (host:port format) @@ -143,18 +143,14 @@ class ContainerClient final: public rpc::Container::Server, public kj::Refcounte // Whether general internet access is enabled for this container bool internetEnabled = false; - // Egress HTTP listener for handling container egress via HTTP CONNECT from sidecar - class EgressHttpService; - class InnerEgressService; + std::atomic_bool containerStarted = false; + std::atomic_bool containerSidecarStarted = false; + kj::Maybe> egressHttpServer; kj::Maybe> egressListenerTask; - // The dynamically chosen port for the egress listener uint16_t 
egressListenerPort = 0; - // Mutex to serialize setEgressHttp() calls (sidecar setup must complete before adding mappings) - kj::Maybe> egressSetupLock; - // Get the Docker bridge network gateway IP and subnet. // Prefers the "workerd-network" bridge, creating it if needed kj::Promise getDockerBridgeIPAMConfig(); @@ -162,8 +158,10 @@ class ContainerClient final: public rpc::Container::Server, public kj::Refcounte kj::Promise createWorkerdNetwork(); // Start the egress listener on the specified address, returns the chosen port kj::Promise startEgressListener(kj::StringPtr listenAddress); - // Stop the egress listener void stopEgressListener(); + // Ensure the egress listener and sidecar container are started exactly once. + // Uses containerSidecarStarted as a guard. Called from both start() and setEgressHttp(). + kj::Promise ensureSidecarStarted(); }; } // namespace workerd::server From 742a039f9d87e1bc8d88bb035cd44f31390aedbb Mon Sep 17 00:00:00 2001 From: Gabi Villalonga Simon Date: Wed, 11 Feb 2026 21:28:53 -0600 Subject: [PATCH 12/12] fixup: Container egress interception image has to be optional until we exit experimental phase --- src/workerd/server/container-client.c++ | 16 ++++++++++------ src/workerd/server/container-client.h | 4 ++-- src/workerd/server/server.c++ | 6 ++---- .../server/tests/container-client/test.js | 17 +++++++++++++++++ 4 files changed, 31 insertions(+), 12 deletions(-) diff --git a/src/workerd/server/container-client.c++ b/src/workerd/server/container-client.c++ index 168aff845f9..0c55e9032a1 100644 --- a/src/workerd/server/container-client.c++ +++ b/src/workerd/server/container-client.c++ @@ -177,7 +177,7 @@ ContainerClient::ContainerClient(capnp::ByteStreamFactory& byteStreamFactory, kj::String dockerPath, kj::String containerName, kj::String imageName, - kj::String containerEgressInterceptorImage, + kj::Maybe containerEgressInterceptorImage, kj::TaskSet& waitUntilTasks, kj::Function cleanupCallback, ChannelTokenHandler& 
channelTokenHandler) @@ -657,7 +657,10 @@ kj::Promise ContainerClient::createSidecarContainer( codec.handleByAnnotation(); capnp::MallocMessageBuilder message; auto jsonRoot = message.initRoot(); - jsonRoot.setImage(containerEgressInterceptorImage); + auto& image = KJ_ASSERT_NONNULL(containerEgressInterceptorImage, + "containerEgressInterceptorImage must be configured to use egress interception. " + "Set it in the localDocker configuration."); + jsonRoot.setImage(image); auto cmd = jsonRoot.initCmd(4); cmd.set(0, "--http-egress-port"); @@ -683,8 +686,7 @@ kj::Promise ContainerClient::createSidecarContainer( } if (response.statusCode != 201) { - JSG_REQUIRE(response.statusCode != 404, Error, "No such image available named ", - containerEgressInterceptorImage, + JSG_REQUIRE(response.statusCode != 404, Error, "No such image available named ", image, ". Please ensure the container egress interceptor image is built and available."); JSG_FAIL_REQUIRE(Error, "Failed to create the networking sidecar [", response.statusCode, "] ", response.body); @@ -733,14 +735,17 @@ kj::Promise ContainerClient::start(StartContext context) { internetEnabled = params.getEnableInternet(); co_await createContainer(entrypoint, environment); + co_await startContainer(); // Opt in to the proxy sidecar container only if the user has configured egressMappings // for now. In the future, it will always run when a user container is running if (!egressMappings.empty()) { + // The user container will be blocked on network connectivity until this finishes. + // When workerd-network is more battle-tested and goes out of experimental so it's non-optional, + // we should make the sidecar start first and _then_ make the user container join the sidecar network. 
co_await ensureSidecarStarted(); } - co_await startContainer(); containerStarted.store(true, std::memory_order_release); } @@ -850,7 +855,6 @@ kj::Promise ContainerClient::setEgressHttp(SetEgressHttpContext context) { auto params = context.getParams(); auto hostPortStr = kj::str(params.getHostPort()); auto tokenBytes = params.getChannelToken(); - JSG_REQUIRE(containerEgressInterceptorImage != "", Error, "should be set for setEgressHttp"); auto parsed = parseHostPort(hostPortStr); uint16_t port = parsed.port.orDefault(80); diff --git a/src/workerd/server/container-client.h b/src/workerd/server/container-client.h index 831ae442e64..7d592393937 100644 --- a/src/workerd/server/container-client.h +++ b/src/workerd/server/container-client.h @@ -39,7 +39,7 @@ class ContainerClient final: public rpc::Container::Server, public kj::Refcounte kj::String dockerPath, kj::String containerName, kj::String imageName, - kj::String containerEgressInterceptorImage, + kj::Maybe containerEgressInterceptorImage, kj::TaskSet& waitUntilTasks, kj::Function cleanupCallback, ChannelTokenHandler& channelTokenHandler); @@ -70,7 +70,7 @@ class ContainerClient final: public rpc::Container::Server, public kj::Refcounte kj::String imageName; // Container egress interceptor image name (sidecar for egress proxy) - kj::String containerEgressInterceptorImage; + kj::Maybe containerEgressInterceptorImage; kj::TaskSet& waitUntilTasks; diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index dc69f1bc8f2..03580e9a8e8 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -2860,12 +2860,10 @@ class Server::WorkerService final: public Service, containerClients.erase(containerId); }; - auto& interceptorImage = KJ_ASSERT_NONNULL(containerEgressInterceptorImage, - "containerEgressInterceptorImage must be defined when docker is enabled."); - auto client = kj::refcounted(byteStreamFactory, timer, dockerNetwork, kj::str(dockerPathRef), kj::str(containerId), 
kj::str(imageName), - kj::str(interceptorImage), waitUntilTasks, kj::mv(cleanupCallback), channelTokenHandler); + containerEgressInterceptorImage.map([](kj::StringPtr s) { return kj::str(s); }), + waitUntilTasks, kj::mv(cleanupCallback), channelTokenHandler); // Store raw pointer in map (does not own) containerClients.insert(kj::str(containerId), client.get()); diff --git a/src/workerd/server/tests/container-client/test.js b/src/workerd/server/tests/container-client/test.js index f093ad7d0e5..f09039081cb 100644 --- a/src/workerd/server/tests/container-client/test.js +++ b/src/workerd/server/tests/container-client/test.js @@ -307,6 +307,13 @@ export class DurableObjectExample extends DurableObject { assert.strictEqual(container.running, false); + // Set up egress TCP mapping to route requests to the binding + // We can configure this even before the container starts. + await container.interceptOutboundHttp( + '1.2.3.4', + this.ctx.exports.TestService({ props: { id: 1234 } }) + ); + // Start container container.start(); @@ -332,6 +339,16 @@ export class DurableObjectExample extends DurableObject { this.ctx.exports.TestService({ props: { id: 3 } }) ); + { + const response = await container + .getTcpPort(8080) + .fetch('http://foo/intercept', { + headers: { 'x-host': '1.2.3.4:80' }, + }); + assert.equal(response.status, 200); + assert.equal(await response.text(), 'hello binding: 1234'); + } + { const response = await container .getTcpPort(8080)