From 6bae79540fe9cde0eb335bcb44692080726e3b63 Mon Sep 17 00:00:00 2001 From: James M Snell Date: Tue, 21 Nov 2023 17:43:13 -0800 Subject: [PATCH 1/2] TCP ingress worker support Implements the connect handler and tcp-ingress for a worker See the samples/tcp-ingress for an example Eliminate code duplication in server.c++ Require experimental flag for tcp ingress --- samples/tcp-ingress/config.capnp | 19 ++ samples/tcp-ingress/worker.js | 11 ++ src/workerd/api/global-scope.c++ | 88 ++++++++++ src/workerd/api/global-scope.h | 45 ++++- src/workerd/api/tests/BUILD.bazel | 6 + src/workerd/api/tests/tcp-ingress-test.js | 26 +++ .../api/tests/tcp-ingress-test.wd-test | 18 ++ src/workerd/api/trace.c++ | 28 +++ src/workerd/api/trace.h | 29 ++- src/workerd/io/trace-stream.c++ | 4 + src/workerd/io/trace.c++ | 29 +++ src/workerd/io/trace.h | 12 ++ src/workerd/io/tracer.c++ | 9 + src/workerd/io/worker-entrypoint.c++ | 151 +++++++++++++--- src/workerd/io/worker-interface.capnp | 6 + src/workerd/server/server.c++ | 166 ++++++++++++++---- src/workerd/server/server.h | 5 + src/workerd/server/workerd.capnp | 3 + src/workerd/util/stream-utils.c++ | 2 +- src/workerd/util/stream-utils.h | 2 +- 20 files changed, 587 insertions(+), 72 deletions(-) create mode 100644 samples/tcp-ingress/config.capnp create mode 100644 samples/tcp-ingress/worker.js create mode 100644 src/workerd/api/tests/tcp-ingress-test.js create mode 100644 src/workerd/api/tests/tcp-ingress-test.wd-test diff --git a/samples/tcp-ingress/config.capnp b/samples/tcp-ingress/config.capnp new file mode 100644 index 00000000000..5b339d50283 --- /dev/null +++ b/samples/tcp-ingress/config.capnp @@ -0,0 +1,19 @@ +using Workerd = import "/workerd/workerd.capnp"; + +const tcpIngressExample :Workerd.Config = ( + services = [ + (name = "main", worker = .worker), + ], + + sockets = [ + ( name = "http", address = "*:8080", http = (), service = "main" ), + ( name = "tcp", address = "*:8081", tcp = (), service = "main" ) + ] +); + +const worker :Workerd.Worker = ( + modules = [ + (name = "worker", esModule = embed "worker.js") + ], + compatibilityDate = "2023-02-28", +); diff --git a/samples/tcp-ingress/worker.js b/samples/tcp-ingress/worker.js new file mode 100644 index 00000000000..594485572bf --- /dev/null +++ b/samples/tcp-ingress/worker.js @@ -0,0 +1,11 @@ + +export default { + async fetch(req) { + return new Response("ok"); + }, + + connect({inbound, cf}) { + console.log(cf); + return inbound.pipeThrough(new IdentityTransformStream()); + } +}; diff --git a/src/workerd/api/global-scope.c++ b/src/workerd/api/global-scope.c++ index 6c5c23ec648..563c0efdd21 100644 --- a/src/workerd/api/global-scope.c++ +++ b/src/workerd/api/global-scope.c++ @@ -118,6 +118,94 @@ void ServiceWorkerGlobalScope::clear() { unhandledRejections.clear(); } +kj::Promise> ServiceWorkerGlobalScope::connect(kj::AsyncIoStream& connection, + kj::HttpService::ConnectResponse& response, + kj::Maybe cfBlobJson, + Worker::Lock& lock, + kj::Maybe exportedHandler) { + ExportedHandler& eh = JSG_REQUIRE_NONNULL(exportedHandler, Error, + "Connect ingress is not currently supported with Service Workers syntax."); + KJ_REQUIRE(FeatureFlags::get(lock).getWorkerdExperimental(), + "TCP ingress requires the experimental flag."); + + kj::HttpHeaderTable table; + kj::HttpHeaders headers(table); + + KJ_IF_SOME(handler, eh.connect) { + // Has a connect handler! + response.accept(200, "OK", headers); + + // TODO(cleanup): There's a fair amount of duplication between this and + // the request() method, and much of this could likely be cleaned up to + // use co_awaits rather than the promise syntax. + auto ownConnection = newNeuterableIoStream(connection); + auto deferredNeuter = kj::defer([ownConnection = kj::addRef(*ownConnection)]() mutable { + ownConnection->neuter(makeNeuterException(NeuterReason::CLIENT_DISCONNECTED)); + }); + KJ_ON_SCOPE_FAILURE(ownConnection->neuter(makeNeuterException(NeuterReason::THREW_EXCEPTION))); + auto conn2 = kj::addRef(*ownConnection); + + auto& ioContext = IoContext::current(); + jsg::Lock& js = lock; + + CfProperty cf(cfBlobJson); + + auto conn = newSystemMultiStream(kj::addRef(*ownConnection), ioContext); + auto jsInbound = jsg::alloc(ioContext, kj::mv(conn.readable)); + + kj::Maybe span = ioContext.makeTraceSpan("connect_handler"_kjc); + auto event = jsg::alloc(kj::mv(jsInbound), kj::mv(cf)); + auto promise = handler(js, kj::mv(event), eh.env.addRef(js), eh.getCtx()); + + struct RefcountedBool: public kj::Refcounted { + bool value; + RefcountedBool(bool value): value(value) {} + }; + auto canceled = kj::refcounted(false); + + return ioContext + .awaitJs(js, + promise.then(js, + ioContext.addFunctor([canceled = kj::addRef(*canceled), + outbound = kj::mv(conn.writable), span = kj::mv(span)]( + jsg::Lock& js, jsg::Ref jsOutbound) mutable + -> IoOwn>> { + auto& context = IoContext::current(); + span = kj::none; + if (canceled->value) { + // The client disconnected before the response was ready. The outbound + // is a dangling reference, let's not use it. + return context.addObject(kj::heap(addNoopDeferredProxy(kj::READY_NOW))); + } else { + return context.addObject(kj::heap(jsOutbound->pumpTo(js, kj::mv(outbound), true))); + } + }))) + .attach(kj::defer([canceled = kj::mv(canceled)]() mutable { canceled->value = true; })) + .then( + [ownConnection = kj::mv(ownConnection), deferredNeuter = kj::mv(deferredNeuter)]( + DeferredProxy deferredProxy) mutable { + deferredProxy.proxyTask = deferredProxy.proxyTask + .then([connection = kj::addRef(*ownConnection)]() mutable { + connection->neuter(makeNeuterException(NeuterReason::SENT_RESPONSE)); + }, [connection = kj::addRef(*ownConnection)](kj::Exception&& e) mutable { + connection->neuter(makeNeuterException(NeuterReason::THREW_EXCEPTION)); + kj::throwFatalException(kj::mv(e)); + }).attach(kj::mv(deferredNeuter)); + + return deferredProxy; + }, + [connection = kj::mv(conn2)](kj::Exception&& e) mutable -> DeferredProxy { + // HACK: We depend on the fact that the success-case lambda above hasn't been destroyed yet + // so `deferredNeuter` hasn't been destroyed yet. + connection->neuter(makeNeuterException(NeuterReason::THREW_EXCEPTION)); + kj::throwFatalException(kj::mv(e)); + }); + } + lock.logWarningOnce("Received a connect event but we lack a handler. " + "Did you remember to export a connect() function?"); + JSG_FAIL_REQUIRE(Error, "Handler does not export a connect() function."); +} + kj::Promise> ServiceWorkerGlobalScope::request(kj::HttpMethod method, kj::StringPtr url, const kj::HttpHeaders& headers, diff --git a/src/workerd/api/global-scope.h b/src/workerd/api/global-scope.h index cd75ad0233a..0c0894b67bd 100644 --- a/src/workerd/api/global-scope.h +++ b/src/workerd/api/global-scope.h @@ -282,6 +282,34 @@ class AlarmInvocationInfo: public jsg::Object { uint32_t retryCount = 0; }; +class ConnectEvent final: public Event { + public: + static jsg::Ref constructor() = delete; + + ConnectEvent(jsg::Ref inbound, CfProperty&& cf) + : Event("connect"), + inbound(kj::mv(inbound)), + cf(kj::mv(cf)) {} + + jsg::Ref getInbound() { + return inbound.addRef(); + } + + // Returns the `cf` field containing Cloudflare feature flags. + jsg::Optional getCf(jsg::Lock& js) { + return cf.get(js); + } + + JSG_RESOURCE_TYPE(ConnectEvent) { + JSG_READONLY_PROTOTYPE_PROPERTY(inbound, getInbound); + JSG_READONLY_PROTOTYPE_PROPERTY(cf, getCf); + } + + private: + jsg::Ref inbound; + CfProperty cf; +}; + // Type signature for handlers exported from the root module. // // We define each handler method as a LenientOptional rather than as a plain Optional in order to @@ -296,6 +324,11 @@ struct ExportedHandler { using TailHandler = kj::Promise(kj::Array> events, jsg::Value env, jsg::Optional> ctx); + using ConnectHandler = jsg::Promise>(jsg::Ref connect, + jsg::Value env, + jsg::Optional> ctx); + jsg::LenientOptional> connect; + jsg::LenientOptional> tail; jsg::LenientOptional> trace; @@ -332,6 +365,7 @@ struct ExportedHandler { jsg::SelfRef self; JSG_STRUCT(fetch, + connect, tail, trace, tailStream, @@ -349,6 +383,7 @@ struct ExportedHandler { JSG_STRUCT_TS_DEFINE( type ExportedHandlerFetchHandler = (request: Request>, env: Env, ctx: ExecutionContext) => Response | Promise; + type ExportedHandlerConnectHandler = (readable: ReadableStream, env: Env, ctx: ExecutionContext) => ReadableStream | Promise; type ExportedHandlerTailHandler = (events: TraceItem[], env: Env, ctx: ExecutionContext) => void | Promise; type ExportedHandlerTraceHandler = (traces: TraceItem[], env: Env, ctx: ExecutionContext) => void | Promise; type ExportedHandlerTailStreamHandler = (event : TailStream.TailEvent, env: Env, ctx: ExecutionContext) => TailStream.TailEventHandlerType | Promise; @@ -359,6 +394,7 @@ struct ExportedHandler { JSG_STRUCT_TS_OVERRIDE( { email?: EmailExportedHandler; fetch?: ExportedHandlerFetchHandler; + connect?: ExportedHandlerConnectHandler; tail?: ExportedHandlerTailHandler; trace?: ExportedHandlerTraceHandler; tailStream?: ExportedHandlerTailStreamHandler; @@ -458,6 +494,13 @@ class ServiceWorkerGlobalScope: public WorkerGlobalScope { // TODO(cleanup): Factor out the shared code used between old-style event listeners vs. module // exports and move that code somewhere more appropriate. + // Received TCP/socket ingress (called from C++, not JS). + kj::Promise> connect(kj::AsyncIoStream& connection, + kj::HttpService::ConnectResponse& response, + kj::Maybe cfBlobJson, + Worker::Lock& lock, + kj::Maybe exportedHandler); + // Received sendTraces (called from C++, not JS). void sendTraces(kj::ArrayPtr> traces, Worker::Lock& lock, @@ -926,6 +969,6 @@ class ServiceWorkerGlobalScope: public WorkerGlobalScope { api::WorkerGlobalScope, api::ServiceWorkerGlobalScope, api::TestController, \ api::ExecutionContext, api::ExportedHandler, \ api::ServiceWorkerGlobalScope::StructuredCloneOptions, api::Navigator, \ - api::AlarmInvocationInfo, api::Immediate, api::Cloudflare + api::AlarmInvocationInfo, api::Immediate, api::ConnectEvent, api::Cloudflare // The list of global-scope.h types that are added to worker.c++'s JSG_DECLARE_ISOLATE_TYPE } // namespace workerd::api diff --git a/src/workerd/api/tests/BUILD.bazel b/src/workerd/api/tests/BUILD.bazel index 4422b8e2dd8..33ef972c719 100644 --- a/src/workerd/api/tests/BUILD.bazel +++ b/src/workerd/api/tests/BUILD.bazel @@ -13,6 +13,12 @@ wd_test( data = ["actor-alarms-delete-test.js"], ) +wd_test( + src = "tcp-ingress-test.wd-test", + args = ["--experimental"], + data = ["tcp-ingress-test.js"], +) + wd_test( src = "actor-alarms-test.wd-test", args = ["--experimental"], diff --git a/src/workerd/api/tests/tcp-ingress-test.js b/src/workerd/api/tests/tcp-ingress-test.js new file mode 100644 index 00000000000..1f83b5eeeef --- /dev/null +++ b/src/workerd/api/tests/tcp-ingress-test.js @@ -0,0 +1,26 @@ +import { connect } from 'cloudflare:sockets'; +import { ok, strictEqual } from 'assert'; + +export const newFunction = { + async test() { + const socket = connect('localhost:8081'); + await socket.opened; + const dec = new TextDecoder(); + let result = ''; + for await (const chunk of socket.readable) { + result += dec.decode(chunk, { stream: true }); + } + result += dec.decode(); + strictEqual(result, 'hello'); + await socket.closed; + }, +}; + +export default { + connect({ inbound, cf }) { + const enc = new TextEncoder(); + ok(inbound instanceof ReadableStream); + strictEqual(typeof cf.clientIp, 'string'); + return ReadableStream.from([enc.encode('hello')]); + }, +}; diff --git a/src/workerd/api/tests/tcp-ingress-test.wd-test b/src/workerd/api/tests/tcp-ingress-test.wd-test new file mode 100644 index 00000000000..08b7300cd78 --- /dev/null +++ b/src/workerd/api/tests/tcp-ingress-test.wd-test @@ -0,0 +1,18 @@ +using Workerd = import "/workerd/workerd.capnp"; + +const unitTests :Workerd.Config = ( + services = [ + ( name = "tcp-ingress-test", + worker = ( + modules = [ + (name = "worker", esModule = embed "tcp-ingress-test.js"), + ], + compatibilityFlags = ["nodejs_compat_v2", "experimental"], + ) + ), + ( name = "internet", network = ( allow = ["private"] ) ) + ], + sockets = [ + (name = "tcp", address = "*:8081", tcp = (), service = "tcp-ingress-test") + ] +); diff --git a/src/workerd/api/trace.c++ b/src/workerd/api/trace.c++ index 972e2cf9cdc..c0a62e4e0cc 100644 --- a/src/workerd/api/trace.c++ +++ b/src/workerd/api/trace.c++ @@ -144,6 +144,9 @@ kj::Maybe getTraceEvent(jsg::Lock& js, const Trace& trace) KJ_CASE_ONEOF(scheduled, tracing::ScheduledEventInfo) { return kj::Maybe(js.alloc(trace, scheduled)); } + KJ_CASE_ONEOF(connect, tracing::ConnectEventInfo) { + return kj::Maybe(jsg::alloc(js, trace, connect)); + } KJ_CASE_ONEOF(alarm, tracing::AlarmEventInfo) { return kj::Maybe(js.alloc(trace, alarm)); } @@ -228,6 +231,9 @@ kj::Maybe TraceItem::getEvent(jsg::Lock& js) { KJ_CASE_ONEOF(info, jsg::Ref) { return info.addRef(); } + KJ_CASE_ONEOF(info, jsg::Ref) { + return info.addRef(); + } } KJ_UNREACHABLE; }); @@ -710,6 +716,9 @@ void TraceItem::visitForMemoryInfo(jsg::MemoryTracker& tracker) const { KJ_CASE_ONEOF(info, jsg::Ref) { tracker.trackField("eventInfo", info); } + KJ_CASE_ONEOF(info, jsg::Ref) { + tracker.trackField("eventInfo", info); + } } } for (const auto& log: logs) { @@ -758,4 +767,23 @@ void TraceItem::HibernatableWebSocketEventInfo::visitForMemoryInfo( } } +namespace { +jsg::Optional> getConnectCf( + jsg::Lock& js, const tracing::ConnectEventInfo& eventInfo) { + const auto& cfJson = eventInfo.cfJson; + if (cfJson.size() > 0) { + return js.parseJson(cfJson).cast(js); + } + return kj::none; +}; +} // namespace + +TraceItem::ConnectEventInfo::ConnectEventInfo( + jsg::Lock& js, const Trace& trace, const tracing::ConnectEventInfo& eventInfo) + : cf(getConnectCf(js, eventInfo)) {} + +jsg::Optional> TraceItem::ConnectEventInfo::getCf(jsg::Lock& js) { + return cf.map([&](jsg::V8Ref& obj) { return obj.addRef(js); }); +} + } // namespace workerd::api diff --git a/src/workerd/api/trace.h b/src/workerd/api/trace.h index 9cf9e6b50f0..e817fda2308 100644 --- a/src/workerd/api/trace.h +++ b/src/workerd/api/trace.h @@ -71,6 +71,7 @@ class TraceItem final: public jsg::Object { public: class FetchEventInfo; class JsRpcEventInfo; + class ConnectEventInfo; class ScheduledEventInfo; class AlarmEventInfo; class QueueEventInfo; @@ -83,6 +84,7 @@ class TraceItem final: public jsg::Object { using EventInfo = kj::OneOf, jsg::Ref, + jsg::Ref, jsg::Ref, jsg::Ref, jsg::Ref, @@ -282,6 +284,21 @@ class TraceItem::JsRpcEventInfo final: public jsg::Object { kj::String rpcMethod; }; +class TraceItem::ConnectEventInfo final: public jsg::Object { + public: + explicit ConnectEventInfo( + jsg::Lock& js, const Trace& trace, const tracing::ConnectEventInfo& eventInfo); + + jsg::Optional> getCf(jsg::Lock& js); + + JSG_RESOURCE_TYPE(ConnectEventInfo) { + JSG_LAZY_READONLY_INSTANCE_PROPERTY(cf, getCf); + } + + private: + jsg::Optional> cf; +}; + class TraceItem::ScheduledEventInfo final: public jsg::Object { public: explicit ScheduledEventInfo(const Trace& trace, const tracing::ScheduledEventInfo& eventInfo); @@ -644,12 +661,12 @@ class TraceCustomEvent final: public WorkerInterface::CustomEvent { #define EW_TRACE_ISOLATE_TYPES \ api::ScriptVersion, api::TailEvent, api::TraceItem, api::TraceItem::AlarmEventInfo, \ - api::TraceItem::CustomEventInfo, api::TraceItem::ScheduledEventInfo, \ - api::TraceItem::QueueEventInfo, api::TraceItem::EmailEventInfo, \ - api::TraceItem::TailEventInfo, api::TraceItem::TailEventInfo::TailItem, \ - api::TraceItem::FetchEventInfo, api::TraceItem::FetchEventInfo::Request, \ - api::TraceItem::FetchEventInfo::Response, api::TraceItem::JsRpcEventInfo, \ - api::TraceItem::HibernatableWebSocketEventInfo, \ + api::TraceItem::ConnectEventInfo, api::TraceItem::CustomEventInfo, \ + api::TraceItem::ScheduledEventInfo, api::TraceItem::QueueEventInfo, \ + api::TraceItem::EmailEventInfo, api::TraceItem::TailEventInfo, \ + api::TraceItem::TailEventInfo::TailItem, api::TraceItem::FetchEventInfo, \ + api::TraceItem::FetchEventInfo::Request, api::TraceItem::FetchEventInfo::Response, \ + api::TraceItem::JsRpcEventInfo, api::TraceItem::HibernatableWebSocketEventInfo, \ api::TraceItem::HibernatableWebSocketEventInfo::Message, \ api::TraceItem::HibernatableWebSocketEventInfo::Close, \ api::TraceItem::HibernatableWebSocketEventInfo::Error, api::TraceLog, api::TraceException, \ diff --git a/src/workerd/io/trace-stream.c++ b/src/workerd/io/trace-stream.c++ index 5e6d6c98cdc..3793e113674 100644 --- a/src/workerd/io/trace-stream.c++ +++ b/src/workerd/io/trace-stream.c++ @@ -385,6 +385,10 @@ jsg::JsValue ToJs(jsg::Lock& js, const Onset& onset, StringCache& cache) { KJ_CASE_ONEOF(hws, HibernatableWebSocketEventInfo) { obj.set(js, INFO_STR, ToJs(js, hws, cache)); } + // TODO + KJ_CASE_ONEOF(custom, ConnectEventInfo) { + // obj.set(js, INFO_STR, ToJs(js, custom, cache)); + } KJ_CASE_ONEOF(custom, CustomEventInfo) { obj.set(js, INFO_STR, ToJs(js, custom, cache)); } diff --git a/src/workerd/io/trace.c++ b/src/workerd/io/trace.c++ index a12e3bbd8a9..2c14af0c78f 100644 --- a/src/workerd/io/trace.c++ +++ b/src/workerd/io/trace.c++ @@ -326,6 +326,19 @@ static kj::HttpMethod validateMethod(capnp::HttpMethod method) { } // namespace +ConnectEventInfo::ConnectEventInfo(kj::String cfJson): cfJson(kj::mv(cfJson)) {} + +ConnectEventInfo::ConnectEventInfo(rpc::Trace::ConnectEventInfo::Reader reader) + : cfJson(kj::str(reader.getCfJson())) {} + +void ConnectEventInfo::copyTo(rpc::Trace::ConnectEventInfo::Builder builder) const { + builder.setCfJson(cfJson); +} + +ConnectEventInfo ConnectEventInfo::clone() const { + return ConnectEventInfo(kj::str(cfJson)); +} + FetchEventInfo::FetchEventInfo( kj::HttpMethod method, kj::String url, kj::String cfJson, kj::Array
headers) : method(method), @@ -774,6 +787,10 @@ void Trace::copyTo(rpc::Trace::Builder builder) const { auto jsRpcBuilder = eventInfoBuilder.initJsRpc(); jsRpc.copyTo(jsRpcBuilder); } + KJ_CASE_ONEOF(connect, tracing::ConnectEventInfo) { + auto connectBuilder = eventInfoBuilder.initConnect(); + connect.copyTo(connectBuilder); + } KJ_CASE_ONEOF(scheduled, tracing::ScheduledEventInfo) { auto scheduledBuilder = eventInfoBuilder.initScheduled(); scheduled.copyTo(scheduledBuilder); @@ -880,6 +897,9 @@ void Trace::mergeFrom(rpc::Trace::Reader reader, PipelineLogLevel pipelineLogLev case rpc::Trace::EventInfo::Which::JS_RPC: eventInfo = tracing::JsRpcEventInfo(e.getJsRpc()); break; + case rpc::Trace::EventInfo::Which::CONNECT: + eventInfo = tracing::ConnectEventInfo(e.getConnect()); + break; case rpc::Trace::EventInfo::Which::SCHEDULED: eventInfo = tracing::ScheduledEventInfo(e.getScheduled()); break; @@ -1096,6 +1116,9 @@ Onset::Info readOnsetInfo(const rpc::Trace::Onset::Info::Reader& info) { case rpc::Trace::Onset::Info::JS_RPC: { return JsRpcEventInfo(info.getJsRpc()); } + case rpc::Trace::Onset::Info::CONNECT: { + return ConnectEventInfo(info.getConnect()); + } case rpc::Trace::Onset::Info::SCHEDULED: { return ScheduledEventInfo(info.getScheduled()); } @@ -1126,6 +1149,9 @@ void writeOnsetInfo(const Onset::Info& info, rpc::Trace::Onset::Info::Builder& i KJ_CASE_ONEOF(fetch, FetchEventInfo) { fetch.copyTo(infoBuilder.initFetch()); } + KJ_CASE_ONEOF(fetch, ConnectEventInfo) { + fetch.copyTo(infoBuilder.initConnect()); + } KJ_CASE_ONEOF(jsrpc, JsRpcEventInfo) { jsrpc.copyTo(infoBuilder.initJsRpc()); } @@ -1278,6 +1304,9 @@ EventInfo cloneEventInfo(const EventInfo& info) { KJ_CASE_ONEOF(fetch, FetchEventInfo) { return fetch.clone(); } + KJ_CASE_ONEOF(connect, ConnectEventInfo) { + return connect.clone(); + } KJ_CASE_ONEOF(jsrpc, JsRpcEventInfo) { return jsrpc.clone(); } diff --git a/src/workerd/io/trace.h b/src/workerd/io/trace.h index dfd34b06b77..a84d3ccfa5e 100644 --- a/src/workerd/io/trace.h +++ b/src/workerd/io/trace.h @@ -362,6 +362,17 @@ struct JsRpcEventInfo final { kj::String toString() const; }; +class ConnectEventInfo { + public: + explicit ConnectEventInfo(kj::String cfJson); + explicit ConnectEventInfo(rpc::Trace::ConnectEventInfo::Reader reader); + + kj::String cfJson; + + void copyTo(rpc::Trace::ConnectEventInfo::Builder builder) const; + ConnectEventInfo clone() const; +}; + // Describes a scheduled request struct ScheduledEventInfo final { explicit ScheduledEventInfo(double scheduledTime, kj::String cron); @@ -577,6 +588,7 @@ using EventInfo = kj::OneOf; EventInfo cloneEventInfo(const EventInfo& info); diff --git a/src/workerd/io/tracer.c++ b/src/workerd/io/tracer.c++ index 5f401db2c69..27b78abc9e6 100644 --- a/src/workerd/io/tracer.c++ +++ b/src/workerd/io/tracer.c++ @@ -303,6 +303,15 @@ void WorkerTracer::setEventInfoInternal( info = tracing::FetchEventInfo(fetch.method, {}, {}, {}); } } + KJ_CASE_ONEOF(connect, tracing::ConnectEventInfo) { + eventSize += connect.cfJson.size(); + if (eventSize > MAX_TRACE_BYTES) { + trace->logs.add(timestamp, LogLevel::WARN, + kj::str("[\"Trace resource limit exceeded; could not capture event info.\"]")); + trace->eventInfo = tracing::ConnectEventInfo(kj::str()); + return; + } + } KJ_CASE_ONEOF_DEFAULT {} } diff --git a/src/workerd/io/worker-entrypoint.c++ b/src/workerd/io/worker-entrypoint.c++ index 1114fd38850..8a200574867 100644 --- a/src/workerd/io/worker-entrypoint.c++ +++ b/src/workerd/io/worker-entrypoint.c++ @@ -242,6 +242,30 @@ void WorkerEntrypoint::init(kj::Own worker, .attach(kj::mv(actor)); } +kj::Exception exceptionToPropagate2(bool isInternalException, kj::Exception&& exception) { + if (isInternalException) { + // We've already logged it here, the only thing that matters to the client is that we failed + // due to an internal error. Note that this does not need to be labeled "remote." since jsg + // will sanitize it as an internal error. Note that we use `setDescription()` to preserve + // the exception type for `jsg::exceptionToJs(...)` downstream. + exception.setDescription(kj::str("worker_do_not_log; Request failed due to internal error")); + return kj::mv(exception); + } else { + // We do not care how many remote capnp servers this went through since we are returning + // it to the worker via jsg. + // TODO(someday) We also do this stripping when making the tunneled exception for + // `jsg::isTunneledException(...)`. It would be lovely if we could simply store some type + // instead of `loggedExceptionEarlier`. It would save use some work. + auto description = jsg::stripRemoteExceptionPrefix(exception.getDescription()); + if (!description.startsWith("remote.")) { + // If we already were annotated as remote from some other worker entrypoint, no point + // adding an additional prefix. + exception.setDescription(kj::str("remote.", description)); + } + return kj::mv(exception); + } +}; + kj::Promise WorkerEntrypoint::request(kj::HttpMethod method, kj::StringPtr url, const kj::HttpHeaders& headers, @@ -426,31 +450,6 @@ kj::Promise WorkerEntrypoint::request(kj::HttpMethod method, } } - auto exceptionToPropagate = [&]() { - if (isInternalException) { - // We've already logged it here, the only thing that matters to the client is that we failed - // due to an internal error. Note that this does not need to be labeled "remote." since jsg - // will sanitize it as an internal error. Note that we use `setDescription()` to preserve - // the exception type for `jsg::exceptionToJs(...)` downstream. - exception.setDescription( - kj::str("worker_do_not_log; Request failed due to internal error")); - return kj::mv(exception); - } else { - // We do not care how many remote capnp servers this went through since we are returning - // it to the worker via jsg. - // TODO(someday) We also do this stripping when making the tunneled exception for - // `jsg::isTunneledException(...)`. It would be lovely if we could simply store some type - // instead of `loggedExceptionEarlier`. It would save use some work. - auto description = jsg::stripRemoteExceptionPrefix(exception.getDescription()); - if (!description.startsWith("remote.")) { - // If we already were annotated as remote from some other worker entrypoint, no point - // adding an additional prefix. - exception.setDescription(kj::str("remote.", description)); - } - return kj::mv(exception); - } - }; - if (wrappedResponse->isSent()) { // We can't fail open if the response was already sent, so set `failOpenService` null so that // that branch isn't taken below. @@ -462,7 +461,7 @@ kj::Promise WorkerEntrypoint::request(kj::HttpMethod method, // TODO(cleanup): We'd really like to tunnel exceptions any time a worker is calling another // worker, not just for actors (and W2W below), but getting that right will require cleaning // up error handling more generally. - return exceptionToPropagate(); + return exceptionToPropagate2(isInternalException, kj::mv(exception)); } else KJ_IF_SOME(service, failOpenService) { // Fall back to origin. @@ -496,7 +495,7 @@ kj::Promise WorkerEntrypoint::request(kj::HttpMethod method, // Like with the isActor check, we want to return exceptions back to the caller. // We don't want to handle this case the same as the isActor case though, since we want // fail-open to operate normally, which means this case must happen after fail-open handling. - return exceptionToPropagate(); + return exceptionToPropagate2(isInternalException, kj::mv(exception)); } else { // Return error. @@ -539,6 +538,7 @@ kj::Promise WorkerEntrypoint::connect(kj::StringPtr host, incomingRequest->delivered(); auto& context = incomingRequest->getContext(); + // TODO: Does this block interfere with connect stuff below, does drain() get duplicated? KJ_DEFER({ // Since we called incomingRequest->delivered, we are obliged to call `drain()`. auto promise = incomingRequest->drain().attach(kj::mv(incomingRequest)); @@ -557,7 +557,102 @@ kj::Promise WorkerEntrypoint::connect(kj::StringPtr host, return next->connect(host, headers, connection, response, settings); } - JSG_FAIL_REQUIRE(TypeError, "Incoming CONNECT on a worker not supported"); + bool isActor = context.getActor() != kj::none; + + KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) { + kj::String cfJson; + KJ_IF_SOME(c, cfBlobJson) { + cfJson = kj::str(c); + } + + t.setEventInfo(*incomingRequest, tracing::ConnectEventInfo(kj::mv(cfJson))); + } + + auto metricsForCatch = kj::addRef(incomingRequest->getMetrics()); + + return context + .run([this, &context, &connection, &response, entrypointName = entrypointName]( + Worker::Lock& lock) mutable { + jsg::AsyncContextFrame::StorageScope traceScope = context.makeAsyncTraceScope(lock); + + return lock.getGlobalScope().connect(connection, response, cfBlobJson, lock, + lock.getExportedHandler(entrypointName, kj::mv(props), context.getActor())); + }) + .then([this](api::DeferredProxy deferredProxy) { + proxyTask = kj::mv(deferredProxy.proxyTask); + }) + .exclusiveJoin(context.onAbort()) + .catch_([this, &context](kj::Exception&& exception) mutable -> kj::Promise { + // Log JS exceptions to the JS console, if fiddle is attached. This also has the effect of + // logging internal errors to syslog. + loggedExceptionEarlier = true; + context.logUncaughtExceptionAsync(UncaughtExceptionSource::REQUEST_HANDLER, kj::cp(exception)); + + // Do not allow the exception to escape the isolate without waiting for the output gate to + // open. Note that in the success path, this is taken care of in `FetchEvent::respondWith()`. + return context.waitForOutputLocks().then( + [exception = kj::mv(exception)]() mutable -> kj::Promise { + return kj::mv(exception); + }); + }) + .attach(kj::defer([this, incomingRequest = kj::mv(incomingRequest), &context]() mutable { + // The request has been canceled, but allow it to continue executing in the background. + auto promise = incomingRequest->drain().attach(kj::mv(incomingRequest)); + waitUntilTasks.add(maybeAddGcPassForTest(context, kj::mv(promise))); + })) + .then([this]() -> kj::Promise { + // Now that the IoContext is dropped (unless it had waitUntil()s), we can finish proxying + // without pinning it or the isolate into memory. + KJ_IF_SOME(p, proxyTask) { + return kj::mv(p); + } else { + return kj::READY_NOW; + } + }) + .attach(kj::defer([this]() mutable { + // If we're being cancelled, we need to make sure `proxyTask` gets canceled. + proxyTask = kj::none; + })) + .catch_([this, isActor, &response, metrics = kj::mv(metricsForCatch)]( + kj::Exception&& exception) mutable -> kj::Promise { + // Don't return errors to end user. + + auto isInternalException = !jsg::isTunneledException(exception.getDescription()) && + !jsg::isDoNotLogException(exception.getDescription()); + if (!loggedExceptionEarlier) { + // This exception seems to have originated during the deferred proxy task, so it was not + // logged to the IoContext earlier. + if (exception.getType() != kj::Exception::Type::DISCONNECTED && isInternalException) { + LOG_EXCEPTION("workerEntrypoint", exception); + } else { + KJ_LOG(INFO, exception); // Run with --verbose to see exception logs. + } + } + + if (isActor || tunnelExceptions) { + // We want to tunnel exceptions from actors back to the caller. + // TODO(cleanup): We'd really like to tunnel exceptions any time a worker is calling another + // worker, not just for actors (and W2W below), but getting that right will require cleaning + // up error handling more generally. + return exceptionToPropagate2(isInternalException, kj::mv(exception)); + } else { + // Return error. + + // We're catching the exception and replacing it with 5xx, but metrics should still indicate + // an exception. + metrics->reportFailure(exception); + + kj::HttpHeaders headers(threadContext.getHeaderTable()); + if (exception.getType() == kj::Exception::Type::OVERLOADED) { + response.reject(503, "Service Unavailable", headers, static_cast(0)); + } else { + response.reject(500, "Internal Server Error", headers, static_cast(0)); + } + // TODO: Set return event here? + + return kj::READY_NOW; + } + }); } kj::Promise WorkerEntrypoint::prewarm(kj::StringPtr url) { diff --git a/src/workerd/io/worker-interface.capnp b/src/workerd/io/worker-interface.capnp index a594026a885..ce5b1cf11ef 100644 --- a/src/workerd/io/worker-interface.capnp +++ b/src/workerd/io/worker-interface.capnp @@ -97,6 +97,7 @@ struct Trace @0x8e8d911203762d34 { email @16 :EmailEventInfo; trace @18 :TraceEventInfo; hibernatableWebSocket @20 :HibernatableWebSocketEventInfo; + connect @28 :ConnectEventInfo; } struct FetchEventInfo { method @0 :HttpMethod; @@ -114,6 +115,10 @@ struct Trace @0x8e8d911203762d34 { methodName @0 :Text; } + struct ConnectEventInfo { + cfJson @0 :Text; + } + struct ScheduledEventInfo { scheduledTime @0 :Float64; cron @1 :Text; @@ -269,6 +274,7 @@ struct Trace @0x8e8d911203762d34 { email @5 :EmailEventInfo; trace @6 :TraceEventInfo; hibernatableWebSocket @7 :HibernatableWebSocketEventInfo; + connect @9 : ConnectEventInfo; custom @8 :CustomEventInfo; } } diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index 93fd589c9de..7eff0d08bf6 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -33,6 +33,7 @@ #include #include #include +#include #include #include #include @@ -5028,6 +5029,41 @@ class Server::WorkerdBootstrapImpl final: public rpc::WorkerdBootstrap::Server { }; }; +namespace { +// Used by both the Server:HttpListener and Server::TcpListener +kj::Maybe processCfBlobHeader(kj::AuthenticatedStream& stream) { + kj::PeerIdentity* peerId; + + KJ_IF_SOME(tlsId, kj::dynamicDowncastIfAvailable(*stream.peerIdentity)) { + peerId = &tlsId.getNetworkIdentity(); + + // TODO(someday): Add client certificate info to the cf blob? At present, KJ only + // supplies the common name, but that doesn't even seem to be one of the fields that + // Cloudflare-hosted Workers receive. We should probably try to match those. + } else { + peerId = stream.peerIdentity; + } + + KJ_IF_SOME(remote, kj::dynamicDowncastIfAvailable(*peerId)) { + return kj::str("{\"clientIp\": \"", escapeJsonString(remote.toString()), "\"}"); + } else KJ_IF_SOME(local, kj::dynamicDowncastIfAvailable(*peerId)) { + auto creds = local.getCredentials(); + + kj::Vector parts; + KJ_IF_SOME(p, creds.pid) { + parts.add(kj::str("\"clientPid\":", p)); + } + KJ_IF_SOME(u, creds.uid) { + parts.add(kj::str("\"clientUid\":", u)); + } + + return kj::str("{", kj::strArray(parts, ","), "}"); + } + + return kj::none; +} +} // namespace + class Server::HttpListener final: public kj::Refcounted { public: HttpListener(Server& owner, @@ -5055,36 +5091,7 @@ class Server::HttpListener final: public kj::Refcounted { kj::Maybe cfBlobJson; if (!rewriter->hasCfBlobHeader()) { - // Construct a cf blob describing the client identity. - - kj::PeerIdentity* peerId; - - KJ_IF_SOME(tlsId, - kj::dynamicDowncastIfAvailable(*stream.peerIdentity)) { - peerId = &tlsId.getNetworkIdentity(); - - // TODO(someday): Add client certificate info to the cf blob? At present, KJ only - // supplies the common name, but that doesn't even seem to be one of the fields that - // Cloudflare-hosted Workers receive. We should probably try to match those. - } else { - peerId = stream.peerIdentity; - } - - KJ_IF_SOME(remote, kj::dynamicDowncastIfAvailable(*peerId)) { - cfBlobJson = kj::str("{\"clientIp\": ", escapeJsonString(remote.toString()), "}"); - } else KJ_IF_SOME(local, kj::dynamicDowncastIfAvailable(*peerId)) { - auto creds = local.getCredentials(); - - kj::Vector parts; - KJ_IF_SOME(p, creds.pid) { - parts.add(kj::str("\"clientPid\":", p)); - } - KJ_IF_SOME(u, creds.uid) { - parts.add(kj::str("\"clientUid\":", u)); - } - - cfBlobJson = kj::str("{", kj::strArray(parts, ","), "}"); - } + cfBlobJson = processCfBlobHeader(stream); } auto conn = kj::heap(*this, kj::mv(cfBlobJson)); @@ -5240,6 +5247,63 @@ class Server::HttpListener final: public kj::Refcounted { }; }; +class Server::TcpListener final: public kj::Refcounted { + public: + TcpListener(Server& owner, + kj::Own listener, + kj::Own service, + kj::HttpHeaderTable& headerTable, + kj::Own rewriter) + : owner(owner), + listener(kj::mv(listener)), + service(kj::mv(service)), + headerTable(headerTable), + rewriter(kj::mv(rewriter)) {} + + kj::Promise run() { + for (;;) { + kj::AuthenticatedStream stream = co_await listener->acceptAuthenticated(); + + kj::Maybe cfBlobJson; + if (!rewriter->hasCfBlobHeader()) { + cfBlobJson = processCfBlobHeader(stream); + } + + IoChannelFactory::SubrequestMetadata metadata; + metadata.cfBlobJson = mapCopyString(cfBlobJson); + auto req = service->startRequest(kj::mv(metadata)); + auto response = kj::heap(); + kj::HttpHeaders headers(headerTable); + // The empty string here is the host parameter that is required by the API + // but is not actually used in the implementation at this point. + owner.tasks.add(req->connect(""_kj, headers, *stream.stream, *response, {}) + .attach(kj::mv(stream.stream), kj::mv(response)) + .attach(kj::mv(req))); + } + } + + private: + Server& owner; + kj::Own listener; + kj::Own service; + kj::HttpHeaderTable& headerTable; + kj::Own rewriter; + + struct ResponseWrapper final: public kj::HttpService::ConnectResponse { + void accept( + uint statusCode, kj::StringPtr statusText, const kj::HttpHeaders& headers) override { + // Ok.. we're accepting the connection... anything to do? + } + kj::Own reject(uint statusCode, + kj::StringPtr statusText, + const kj::HttpHeaders& headers, + kj::Maybe expectedBodySize = kj::none) override { + // Doh... we're rejecting the connection... anything to do? + return newNullOutputStream(); + } + }; +}; + kj::Promise Server::listenHttp(kj::Own listener, kj::Own service, kj::StringPtr physicalProtocol, @@ -5250,6 +5314,14 @@ kj::Promise Server::listenHttp(kj::Own listener, co_return co_await obj->run(); } +kj::Promise Server::listenTcp(kj::Own listener, + kj::Own service, + kj::Own rewriter) { + auto obj = kj::refcounted( + *this, kj::mv(listener), kj::mv(service), globalContext->headerTable, kj::mv(rewriter)); + co_return co_await obj->run(); +} + // ======================================================================================= // Debug port for exposing all services via RPC @@ -5704,13 +5776,16 @@ kj::Promise Server::listenOnSockets(config::Config::Reader config, config::HttpOptions::Reader httpOptions; kj::Maybe> tls; kj::StringPtr physicalProtocol; + bool isHttp = false; switch (sock.which()) { case config::Socket::HTTP: + isHttp = true; defaultPort = 80; httpOptions = sock.getHttp(); physicalProtocol = "http"; goto validSocket; case config::Socket::HTTPS: { + isHttp = true; auto https = sock.getHttps(); defaultPort = 443; httpOptions = https.getOptions(); @@ -5718,6 +5793,16 @@ kj::Promise Server::listenOnSockets(config::Config::Reader config, physicalProtocol = "https"; goto validSocket; } + case config::Socket::TCP: { + isHttp = false; + auto tcp = sock.getTcp(); + // No default port + // No physical protocol mention here. + if (tcp.hasTlsOptions()) { + tls = makeTlsContext(tcp.getTlsOptions()); + } + goto validSocket; + } } reportConfigError(kj::str("Encountered unknown socket type in \"", name, "\". Was the config compiled with a " @@ -5744,14 +5829,20 @@ kj::Promise Server::listenOnSockets(config::Config::Reader config, })(kj::mv(listener), kj::mv(t)); } - // Need to create rewriter before waiting on anything since `headerTableBuilder` will no longer - // be available later. + // Need to create rewriter before waiting on anything since `headerTableBuilder` will + // no longer be available later. auto rewriter = kj::heap(httpOptions, headerTableBuilder); auto handle = kj::coCapture( - [this, service = kj::mv(service), rewriter = kj::mv(rewriter), physicalProtocol, name]( + [this, service = kj::mv(service), rewriter = kj::mv(rewriter), physicalProtocol, name, + isHttp]( kj::Promise> promise) mutable -> kj::Promise { - TRACE_EVENT("workerd", "setup listenHttp"); + if (isHttp) { + TRACE_EVENT("workerd", "setup listenHttp"); + } else { + TRACE_EVENT("workerd", "setup listenTcp"); + } + auto listener = co_await promise; KJ_IF_SOME(stream, controlOverride) { auto message = kj::str("{\"event\":\"listen\",\"socket\":\"", name, @@ -5762,7 +5853,12 @@ kj::Promise Server::listenOnSockets(config::Config::Reader config, KJ_LOG(ERROR, e); } } - co_await listenHttp(kj::mv(listener), kj::mv(service), physicalProtocol, kj::mv(rewriter)); + + if (isHttp) { + co_await listenHttp(kj::mv(listener), kj::mv(service), physicalProtocol, kj::mv(rewriter)); + } else { + co_await listenTcp(kj::mv(listener), kj::mv(service), kj::mv(rewriter)); + } }); tasks.add(handle(kj::mv(listener)).exclusiveJoin(forkedDrainWhen.addBranch())); } diff --git a/src/workerd/server/server.h b/src/workerd/server/server.h index 6162b1107f8..87f1f03eea2 100644 --- a/src/workerd/server/server.h +++ b/src/workerd/server/server.h @@ -292,6 +292,10 @@ class Server final: private kj::TaskSet::ErrorHandler, private ChannelTokenHandl kj::Promise listenDebugPort(kj::Own listener); + kj::Promise listenTcp(kj::Own listener, + kj::Own service, + kj::Own rewriter); + class InvalidConfigService; class InvalidConfigActorClass; class ExternalHttpService; @@ -303,6 +307,7 @@ class Server final: private kj::TaskSet::ErrorHandler, private ChannelTokenHandl class WorkerdBootstrapImpl; class HttpListener; class DebugPortListener; + class TcpListener; struct ErrorReporter; struct ConfigErrorReporter; diff --git a/src/workerd/server/workerd.capnp b/src/workerd/server/workerd.capnp index 2b2bca8d371..df01c51c883 100644 --- a/src/workerd/server/workerd.capnp +++ b/src/workerd/server/workerd.capnp @@ -143,6 +143,9 @@ struct Socket { options @3 :HttpOptions; tlsOptions @4 :TlsOptions; } + tcp :group { + tlsOptions @6 :TlsOptions; + } # TODO(someday): TCP, TCP proxy, SMTP, Cap'n Proto, ... } diff --git a/src/workerd/util/stream-utils.c++ b/src/workerd/util/stream-utils.c++ index 361f9dc3881..006cc57c3a7 100644 --- a/src/workerd/util/stream-utils.c++ +++ b/src/workerd/util/stream-utils.c++ @@ -242,7 +242,7 @@ kj::Own newNeuterableInputStream(kj::AsyncInputStream& in } kj::Own newNeuterableIoStream(kj::AsyncIoStream& inner) { - return kj::heap(inner); + return kj::refcounted(inner); } } // namespace workerd diff --git a/src/workerd/util/stream-utils.h b/src/workerd/util/stream-utils.h index 3e0c23f0458..5f6dba72ccf 100644 --- a/src/workerd/util/stream-utils.h +++ b/src/workerd/util/stream-utils.h @@ -28,7 +28,7 @@ class NeuterableInputStream: public kj::AsyncInputStream, public kj::Refcounted virtual void neuter(kj::Exception ex) = 0; }; -class NeuterableIoStream: public kj::AsyncIoStream { +class NeuterableIoStream: public kj::AsyncIoStream, public kj::Refcounted { public: virtual void neuter(kj::Exception ex) = 0; }; From 540658bb2774cdea3bcc88917c63a9978a6d0472 Mon Sep 17 00:00:00 2001 From: Felix Hanau Date: Tue, 27 Jan 2026 20:20:48 +0000 Subject: [PATCH 2/2] [WIP] --- samples/tcp-ingress/config.capnp | 1 + src/workerd/api/global-scope.c++ | 30 ++++++----- src/workerd/api/system-streams.c++ | 9 ++++ src/workerd/api/system-streams.h | 5 ++ src/workerd/api/tests/BUILD.bazel | 5 +- ...ngress-test.js => connect-handler-test.js} | 1 + ...t.wd-test => connect-handler-test.wd-test} | 8 +-- src/workerd/api/tests/js-rpc-test.js | 20 ++++++++ .../api/tests/tail-worker-test-receiver.js | 2 +- src/workerd/api/tests/tail-worker-test.js | 3 ++ .../api/tests/tail-worker-test.wd-test | 13 +++++ src/workerd/io/trace-stream.c++ | 13 +++-- src/workerd/io/worker-entrypoint.c++ | 50 ++++++++++++------- src/workerd/server/server.c++ | 15 ++++-- types/defines/rpc.d.ts | 1 + .../experimental/index.d.ts | 15 ++++++ .../generated-snapshot/experimental/index.ts | 15 ++++++ types/generated-snapshot/latest/index.d.ts | 15 ++++++ types/generated-snapshot/latest/index.ts | 15 ++++++ 19 files changed, 187 insertions(+), 49 deletions(-) rename src/workerd/api/tests/{tcp-ingress-test.js => connect-handler-test.js} (94%) rename src/workerd/api/tests/{tcp-ingress-test.wd-test => connect-handler-test.wd-test} (58%) diff --git a/samples/tcp-ingress/config.capnp b/samples/tcp-ingress/config.capnp index 5b339d50283..418ff5931fb 100644 --- a/samples/tcp-ingress/config.capnp +++ b/samples/tcp-ingress/config.capnp @@ -15,5 +15,6 @@ const worker :Workerd.Worker = ( modules = [ (name = "worker", esModule = embed "worker.js") ], + compatibilityFlags = ["nodejs_compat_v2", "experimental"], compatibilityDate = "2023-02-28", ); diff --git a/src/workerd/api/global-scope.c++ b/src/workerd/api/global-scope.c++ index 563c0efdd21..c076ac54b67 100644 --- a/src/workerd/api/global-scope.c++ +++ b/src/workerd/api/global-scope.c++ @@ -86,6 +86,7 @@ jsg::LenientOptional mapAddRef(jsg::Lock& js, jsg::LenientOptional& functi ExportedHandler ExportedHandler::clone(jsg::Lock& js) { return ExportedHandler{ .fetch{mapAddRef(js, fetch)}, + .connect{mapAddRef(js, connect)}, .tail{mapAddRef(js, tail)}, .trace{mapAddRef(js, trace)}, .tailStream{mapAddRef(js, tailStream)}, @@ -128,11 +129,10 @@ kj::Promise> ServiceWorkerGlobalScope::connect(kj::AsyncIoSt KJ_REQUIRE(FeatureFlags::get(lock).getWorkerdExperimental(), "TCP ingress requires the experimental flag."); - kj::HttpHeaderTable table; - kj::HttpHeaders headers(table); - KJ_IF_SOME(handler, eh.connect) { // Has a connect handler! + kj::HttpHeaderTable table; + kj::HttpHeaders headers(table); response.accept(200, "OK", headers); // TODO(cleanup): There's a fair amount of duplication between this and @@ -150,37 +150,35 @@ kj::Promise> ServiceWorkerGlobalScope::connect(kj::AsyncIoSt CfProperty cf(cfBlobJson); - auto conn = newSystemMultiStream(kj::addRef(*ownConnection), ioContext); - auto jsInbound = jsg::alloc(ioContext, kj::mv(conn.readable)); + auto ownConn2 = kj::addRef(*ownConnection); + auto conn = newSystemMultiStream(ownConn2, ioContext); + auto jsInbound = js.alloc(ioContext, kj::mv(conn.readable)); kj::Maybe span = ioContext.makeTraceSpan("connect_handler"_kjc); - auto event = jsg::alloc(kj::mv(jsInbound), kj::mv(cf)); + auto event = js.alloc(kj::mv(jsInbound), kj::mv(cf)); auto promise = handler(js, kj::mv(event), eh.env.addRef(js), eh.getCtx()); - struct RefcountedBool: public kj::Refcounted { - bool value; - RefcountedBool(bool value): value(value) {} - }; - auto canceled = kj::refcounted(false); + auto canceled = kj::refcounted>(false); return ioContext .awaitJs(js, promise.then(js, - ioContext.addFunctor([canceled = kj::addRef(*canceled), + ioContext.addFunctor([canceled = canceled->addWrappedRef(), outbound = kj::mv(conn.writable), span = kj::mv(span)]( jsg::Lock& js, jsg::Ref jsOutbound) mutable -> IoOwn>> { auto& context = IoContext::current(); span = kj::none; - if (canceled->value) { - // The client disconnected before the response was ready. The outbound - // is a dangling reference, let's not use it. + if (*canceled) { + // The client disconnected before the response was ready. The outbound is a dangling + // reference, let's not use it. return context.addObject(kj::heap(addNoopDeferredProxy(kj::READY_NOW))); } else { return context.addObject(kj::heap(jsOutbound->pumpTo(js, kj::mv(outbound), true))); } }))) - .attach(kj::defer([canceled = kj::mv(canceled)]() mutable { canceled->value = true; })) + .attach( + kj::defer([canceled = kj::mv(canceled)]() mutable { canceled->getWrapped() = true; })) .then( [ownConnection = kj::mv(ownConnection), deferredNeuter = kj::mv(deferredNeuter)]( DeferredProxy deferredProxy) mutable { diff --git a/src/workerd/api/system-streams.c++ b/src/workerd/api/system-streams.c++ index 29803822c66..bc716aea31e 100644 --- a/src/workerd/api/system-streams.c++ +++ b/src/workerd/api/system-streams.c++ @@ -6,6 +6,8 @@ #include "util.h" +#include + #include #include #include @@ -388,6 +390,13 @@ SystemMultiStream newSystemMultiStream( .writable = kj::heap( stream.addWrappedRef(), StreamEncoding::IDENTITY, context)}; } +SystemMultiStream newSystemMultiStream(kj::Own& stream, IoContext& context) { + + return {.readable = kj::heap( + kj::addRef(*stream), StreamEncoding::IDENTITY, context), + .writable = + kj::heap(kj::addRef(*stream), StreamEncoding::IDENTITY, context)}; +} ContentEncodingOptions::ContentEncodingOptions(CompatibilityFlags::Reader flags) : brotliEnabled(flags.getBrotliContentEncoding()) {} diff --git a/src/workerd/api/system-streams.h b/src/workerd/api/system-streams.h index 8c4fb23b08a..dbcc20074cd 100644 --- a/src/workerd/api/system-streams.h +++ b/src/workerd/api/system-streams.h @@ -12,6 +12,10 @@ #include #include +namespace workerd { +class NeuterableIoStream; +}; + namespace workerd::api { // A ReadableStreamSource which automatically decodes its underlying stream. It does so lazily -- if @@ -41,6 +45,7 @@ struct SystemMultiStream { // A combo ReadableStreamSource and WritableStreamSink. SystemMultiStream newSystemMultiStream(kj::RefcountedWrapper>& stream, IoContext& context = IoContext::current()); +SystemMultiStream newSystemMultiStream(kj::Own& stream, IoContext& context); struct ContentEncodingOptions { bool brotliEnabled = false; diff --git a/src/workerd/api/tests/BUILD.bazel b/src/workerd/api/tests/BUILD.bazel index 33ef972c719..c089ce82bf7 100644 --- a/src/workerd/api/tests/BUILD.bazel +++ b/src/workerd/api/tests/BUILD.bazel @@ -14,9 +14,9 @@ wd_test( ) wd_test( - src = "tcp-ingress-test.wd-test", + src = "connect-handler-test.wd-test", args = ["--experimental"], - data = ["tcp-ingress-test.js"], + data = ["connect-handler-test.js"], ) wd_test( @@ -42,6 +42,7 @@ wd_test( "tail-worker-test-invalid.js", "tail-worker-test-jsrpc.js", "websocket-hibernation.js", + "connect-handler-test.js", ], ) diff --git a/src/workerd/api/tests/tcp-ingress-test.js b/src/workerd/api/tests/connect-handler-test.js similarity index 94% rename from src/workerd/api/tests/tcp-ingress-test.js rename to src/workerd/api/tests/connect-handler-test.js index 1f83b5eeeef..9afb3f90a65 100644 --- a/src/workerd/api/tests/tcp-ingress-test.js +++ b/src/workerd/api/tests/connect-handler-test.js @@ -8,6 +8,7 @@ export const newFunction = { const dec = new TextDecoder(); let result = ''; for await (const chunk of socket.readable) { + dec.decode(chunk, { stream: true }); result += dec.decode(chunk, { stream: true }); } result += dec.decode(); diff --git a/src/workerd/api/tests/tcp-ingress-test.wd-test b/src/workerd/api/tests/connect-handler-test.wd-test similarity index 58% rename from src/workerd/api/tests/tcp-ingress-test.wd-test rename to src/workerd/api/tests/connect-handler-test.wd-test index 08b7300cd78..10fc1c11091 100644 --- a/src/workerd/api/tests/tcp-ingress-test.wd-test +++ b/src/workerd/api/tests/connect-handler-test.wd-test @@ -2,17 +2,17 @@ using Workerd = import "/workerd/workerd.capnp"; const unitTests :Workerd.Config = ( services = [ - ( name = "tcp-ingress-test", + ( name = "connect-handler-test", worker = ( modules = [ - (name = "worker", esModule = embed "tcp-ingress-test.js"), + (name = "worker", esModule = embed "connect-handler-test.js"), ], - compatibilityFlags = ["nodejs_compat_v2", "experimental"], + compatibilityFlags = ["nodejs_compat_v2", "experimental", "streams_enable_constructors"], ) ), ( name = "internet", network = ( allow = ["private"] ) ) ], sockets = [ - (name = "tcp", address = "*:8081", tcp = (), service = "tcp-ingress-test") + (name = "tcp", address = "*:8081", tcp = (), service = "connect-handler-test") ] ); diff --git a/src/workerd/api/tests/js-rpc-test.js b/src/workerd/api/tests/js-rpc-test.js index 2f9e84ccabc..4619f44c988 100644 --- a/src/workerd/api/tests/js-rpc-test.js +++ b/src/workerd/api/tests/js-rpc-test.js @@ -194,6 +194,11 @@ export class MyService extends WorkerEntrypoint { return new Response('method = ' + req.method + ', url = ' + req.url); } + async connect(arg) { + const enc = new TextEncoder(); + return ReadableStream.from([enc.encode('hello')]); + } + // Define a property to test behavior of property accessors. get nonFunctionProperty() { return { foo: 123 }; @@ -631,6 +636,21 @@ export let extendingEntrypointClasses = { assert.equal(svc instanceof WorkerEntrypoint, true); }, }; +export let connectBinding = { + async test(controller, env, ctx) { + let socket = await env.MyService.connect('localhost:8081'); + await socket.opened; + const dec = new TextDecoder(); + let result = ''; + for await (const chunk of socket.readable) { + dec.decode(chunk, { stream: true }); + result += dec.decode(chunk, { stream: true }); + } + result += dec.decode(); + assert.strictEqual(result, 'hello'); + await socket.closed; + }, +}; export let namedServiceBinding = { async test(controller, env, ctx) { diff --git a/src/workerd/api/tests/tail-worker-test-receiver.js b/src/workerd/api/tests/tail-worker-test-receiver.js index 22767022f85..faf5bb8eab0 100644 --- a/src/workerd/api/tests/tail-worker-test-receiver.js +++ b/src/workerd/api/tests/tail-worker-test-receiver.js @@ -32,7 +32,7 @@ export const test = { // The shared tail worker we configured only produces onset and outcome events, so every trace is identical here. // Number of traces based on how often main tail worker is invoked from previous tests - let numTraces = 28; + let numTraces = 30; let basicTrace = '{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","scriptTags":[],"info":{"type":"trace","traces":[]}}{"type":"return"}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}'; assert.deepStrictEqual( diff --git a/src/workerd/api/tests/tail-worker-test.js b/src/workerd/api/tests/tail-worker-test.js index aa3429e48cb..06715e718a7 100644 --- a/src/workerd/api/tests/tail-worker-test.js +++ b/src/workerd/api/tests/tail-worker-test.js @@ -131,6 +131,9 @@ export const test = { // Test for transient objects - getCounter returns an object with methods // All transient calls happen in a single trace event, with only the entrypoint method reported '{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","entrypoint":"MyService","scriptTags":[],"info":{"type":"jsrpc"}}{"type":"attributes","info":[{"name":"jsrpc.method","value":"getCounter"}]}{"type":"log","level":"log","message":["bar"]}{"type":"log","level":"log","message":["getCounter called"]}{"type":"return"}{"type":"log","level":"log","message":["increment called on transient"]}{"type":"log","level":"log","message":["getValue called on transient"]}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}', + // tests/connect-handler-test.js: connect events + '{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","entrypoint":"newFunction","scriptTags":[],"info":{"type":"custom"}}{"type":"spanOpen","name":"connect","spanId":"0000000000000001"}{"type":"spanClose","outcome":"ok"}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}', + '{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","scriptTags":[],"info":{"type":"connect"}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}', ]; assert.deepStrictEqual(response, expected); diff --git a/src/workerd/api/tests/tail-worker-test.wd-test b/src/workerd/api/tests/tail-worker-test.wd-test index b8c8aa67b08..85fed7e18f2 100644 --- a/src/workerd/api/tests/tail-worker-test.wd-test +++ b/src/workerd/api/tests/tail-worker-test.wd-test @@ -32,6 +32,16 @@ const unitTests :Workerd.Config = ( (name = "alarms", worker = .alarmsWorker), (name = "hiber", worker = .hiberWorker), (name = "js-rpc-test", worker = .jsRpcWorker), + ( name = "connect-handler-test", + worker = ( + modules = [ + (name = "worker", esModule = embed "connect-handler-test.js"), + ], + compatibilityFlags = ["nodejs_compat_v2", "experimental", "streams_enable_constructors"], + streamingTails = ["log"], + ) + ), + ( name = "internet", network = ( allow = ["private"] ) ), (name = "TEST_TMPDIR", disk = (writable = true)), # Dummy buffered tail worker (gets traces from alarms worker and produces trace for main tracer) (name = "buffered", worker = .logBuffered, ), @@ -50,6 +60,9 @@ const unitTests :Workerd.Config = ( ), ) ], + sockets = [ + (name = "tcp", address = "*:8081", tcp = (), service = "connect-handler-test") + ] ); const alarmsWorker :Workerd.Worker = ( diff --git a/src/workerd/io/trace-stream.c++ b/src/workerd/io/trace-stream.c++ index 3793e113674..557f7bbf924 100644 --- a/src/workerd/io/trace-stream.c++ +++ b/src/workerd/io/trace-stream.c++ @@ -24,6 +24,7 @@ namespace { V(CFJSON, "cfJson") \ V(CLOSE, "close") \ V(CODE, "code") \ + V(CONNECT, "connect") \ V(COUNT, "count") \ V(CPUTIME, "cpuTime") \ V(CRON, "cron") \ @@ -292,6 +293,13 @@ jsg::JsValue ToJs(jsg::Lock& js, const HibernatableWebSocketEventInfo& info, Str return obj; } +// TODO +jsg::JsValue ToJs(jsg::Lock& js, const ConnectEventInfo& info, StringCache& cache) { + auto obj = js.obj(); + obj.set(js, TYPE_STR, cache.get(js, CONNECT_STR)); + return obj; +} + jsg::JsValue ToJs(jsg::Lock& js, const CustomEventInfo& info, StringCache& cache) { auto obj = js.obj(); obj.set(js, TYPE_STR, cache.get(js, CUSTOM_STR)); @@ -385,9 +393,8 @@ jsg::JsValue ToJs(jsg::Lock& js, const Onset& onset, StringCache& cache) { KJ_CASE_ONEOF(hws, HibernatableWebSocketEventInfo) { obj.set(js, INFO_STR, ToJs(js, hws, cache)); } - // TODO - KJ_CASE_ONEOF(custom, ConnectEventInfo) { - // obj.set(js, INFO_STR, ToJs(js, custom, cache)); + KJ_CASE_ONEOF(connect, ConnectEventInfo) { + obj.set(js, INFO_STR, ToJs(js, connect, cache)); } KJ_CASE_ONEOF(custom, CustomEventInfo) { obj.set(js, INFO_STR, ToJs(js, custom, cache)); diff --git a/src/workerd/io/worker-entrypoint.c++ b/src/workerd/io/worker-entrypoint.c++ index 8a200574867..a0725e92202 100644 --- a/src/workerd/io/worker-entrypoint.c++ +++ b/src/workerd/io/worker-entrypoint.c++ @@ -242,7 +242,7 @@ void WorkerEntrypoint::init(kj::Own worker, .attach(kj::mv(actor)); } -kj::Exception exceptionToPropagate2(bool isInternalException, kj::Exception&& exception) { +kj::Exception exceptionToPropagate(bool isInternalException, kj::Exception&& exception) { if (isInternalException) { // We've already logged it here, the only thing that matters to the client is that we failed // due to an internal error. Note that this does not need to be labeled "remote." since jsg @@ -461,7 +461,7 @@ kj::Promise WorkerEntrypoint::request(kj::HttpMethod method, // TODO(cleanup): We'd really like to tunnel exceptions any time a worker is calling another // worker, not just for actors (and W2W below), but getting that right will require cleaning // up error handling more generally. - return exceptionToPropagate2(isInternalException, kj::mv(exception)); + return exceptionToPropagate(isInternalException, kj::mv(exception)); } else KJ_IF_SOME(service, failOpenService) { // Fall back to origin. @@ -495,7 +495,7 @@ kj::Promise WorkerEntrypoint::request(kj::HttpMethod method, // Like with the isActor check, we want to return exceptions back to the caller. // We don't want to handle this case the same as the isActor case though, since we want // fail-open to operate normally, which means this case must happen after fail-open handling. - return exceptionToPropagate2(isInternalException, kj::mv(exception)); + return exceptionToPropagate(isInternalException, kj::mv(exception)); } else { // Return error. @@ -532,20 +532,16 @@ kj::Promise WorkerEntrypoint::connect(kj::StringPtr host, auto incomingRequest = kj::mv(KJ_REQUIRE_NONNULL(this->incomingRequest, "connect() can only be called once")); this->incomingRequest = kj::none; - // Whenever we implement incoming connections over the `connect` handler we need to remember to - // add tracing `onset` and `return` events using setEventInfo()/setReturn(), as with the other - // event types here. - incomingRequest->delivered(); auto& context = incomingRequest->getContext(); - // TODO: Does this block interfere with connect stuff below, does drain() get duplicated? - KJ_DEFER({ - // Since we called incomingRequest->delivered, we are obliged to call `drain()`. - auto promise = incomingRequest->drain().attach(kj::mv(incomingRequest)); - waitUntilTasks.add(maybeAddGcPassForTest(context, kj::mv(promise))); - }); - if (context.getWorker().getIsolate().getApi().getFeatureFlags().getConnectPassThrough()) { + incomingRequest->delivered(); + + KJ_DEFER({ + // Since we called incomingRequest->delivered, we are obliged to call `drain()`. + auto promise = incomingRequest->drain().attach(kj::mv(incomingRequest)); + waitUntilTasks.add(maybeAddGcPassForTest(context, kj::mv(promise))); + }); // connect_pass_through feature flag means we should just forward the connect request on to // the global outbound. @@ -557,6 +553,17 @@ kj::Promise WorkerEntrypoint::connect(kj::StringPtr host, return next->connect(host, headers, connection, response, settings); } + if (!context.getWorker().getIsolate().getApi().getFeatureFlags().getWorkerdExperimental()) { + incomingRequest->delivered(); + + KJ_DEFER({ + // Since we called incomingRequest->delivered, we are obliged to call `drain()`. + auto promise = incomingRequest->drain().attach(kj::mv(incomingRequest)); + waitUntilTasks.add(maybeAddGcPassForTest(context, kj::mv(promise))); + }); + JSG_FAIL_REQUIRE(TypeError, "Incoming CONNECT on a worker not supported"); + } + bool isActor = context.getActor() != kj::none; KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) { @@ -567,8 +574,10 @@ kj::Promise WorkerEntrypoint::connect(kj::StringPtr host, t.setEventInfo(*incomingRequest, tracing::ConnectEventInfo(kj::mv(cfJson))); } + incomingRequest->delivered(); auto metricsForCatch = kj::addRef(incomingRequest->getMetrics()); + auto metricsForProxyTask = kj::addRef(incomingRequest->getMetrics()); return context .run([this, &context, &connection, &response, entrypointName = entrypointName]( @@ -581,7 +590,6 @@ kj::Promise WorkerEntrypoint::connect(kj::StringPtr host, .then([this](api::DeferredProxy deferredProxy) { proxyTask = kj::mv(deferredProxy.proxyTask); }) - .exclusiveJoin(context.onAbort()) .catch_([this, &context](kj::Exception&& exception) mutable -> kj::Promise { // Log JS exceptions to the JS console, if fiddle is attached. This also has the effect of // logging internal errors to syslog. @@ -599,12 +607,18 @@ kj::Promise WorkerEntrypoint::connect(kj::StringPtr host, // The request has been canceled, but allow it to continue executing in the background. auto promise = incomingRequest->drain().attach(kj::mv(incomingRequest)); waitUntilTasks.add(maybeAddGcPassForTest(context, kj::mv(promise))); + //})) + // .then([this, metrics = kj::mv(metricsForProxyTask)]() -> kj::Promise { })) - .then([this]() -> kj::Promise { + .then([this, metrics = kj::mv(metricsForProxyTask)]() mutable -> kj::Promise { // Now that the IoContext is dropped (unless it had waitUntil()s), we can finish proxying // without pinning it or the isolate into memory. KJ_IF_SOME(p, proxyTask) { - return kj::mv(p); + //return kj::mv(p); + return p.catch_([metrics = kj::mv(metrics)](kj::Exception&& e) mutable -> kj::Promise { + metrics->reportFailure(e, RequestObserver::FailureSource::DEFERRED_PROXY); + return kj::mv(e); + }); } else { return kj::READY_NOW; } @@ -634,7 +648,7 @@ kj::Promise WorkerEntrypoint::connect(kj::StringPtr host, // TODO(cleanup): We'd really like to tunnel exceptions any time a worker is calling another // worker, not just for actors (and W2W below), but getting that right will require cleaning // up error handling more generally. - return exceptionToPropagate2(isInternalException, kj::mv(exception)); + return exceptionToPropagate(isInternalException, kj::mv(exception)); } else { // Return error. diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index 7eff0d08bf6..5ab7b4b5f39 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -5045,7 +5045,7 @@ kj::Maybe processCfBlobHeader(kj::AuthenticatedStream& stream) { } KJ_IF_SOME(remote, kj::dynamicDowncastIfAvailable(*peerId)) { - return kj::str("{\"clientIp\": \"", escapeJsonString(remote.toString()), "\"}"); + return kj::str("{\"clientIp\": ", escapeJsonString(remote.toString()), "}"); } else KJ_IF_SOME(local, kj::dynamicDowncastIfAvailable(*peerId)) { auto creds = local.getCredentials(); @@ -5261,8 +5261,10 @@ class Server::TcpListener final: public kj::Refcounted { rewriter(kj::mv(rewriter)) {} kj::Promise run() { + TRACE_EVENT("workerd", "TcpListener::run"); for (;;) { kj::AuthenticatedStream stream = co_await listener->acceptAuthenticated(); + TRACE_EVENT("workerd", "TcpListener handle connection"); kj::Maybe cfBlobJson; if (!rewriter->hasCfBlobHeader()) { @@ -5274,8 +5276,8 @@ class Server::TcpListener final: public kj::Refcounted { auto req = service->startRequest(kj::mv(metadata)); auto response = kj::heap(); kj::HttpHeaders headers(headerTable); - // The empty string here is the host parameter that is required by the API - // but is not actually used in the implementation at this point. + // TODO(now): The empty string here is the host parameter that is required by the API but is + // not actually used in the implementation at this point. owner.tasks.add(req->connect(""_kj, headers, *stream.stream, *response, {}) .attach(kj::mv(stream.stream), kj::mv(response)) .attach(kj::mv(req))); @@ -5289,16 +5291,19 @@ class Server::TcpListener final: public kj::Refcounted { kj::HttpHeaderTable& headerTable; kj::Own rewriter; + // TODO: Would using a plain ConnectResponse work here too? struct ResponseWrapper final: public kj::HttpService::ConnectResponse { void accept( uint statusCode, kj::StringPtr statusText, const kj::HttpHeaders& headers) override { // Ok.. we're accepting the connection... anything to do? + KJ_LOG(WARNING, "accepted TCP stream", statusCode, statusText); } kj::Own reject(uint statusCode, kj::StringPtr statusText, const kj::HttpHeaders& headers, kj::Maybe expectedBodySize = kj::none) override { // Doh... we're rejecting the connection... anything to do? + KJ_LOG(WARNING, "rejected TCP stream"); return newNullOutputStream(); } }; @@ -5829,8 +5834,8 @@ kj::Promise Server::listenOnSockets(config::Config::Reader config, })(kj::mv(listener), kj::mv(t)); } - // Need to create rewriter before waiting on anything since `headerTableBuilder` will - // no longer be available later. + // Need to create rewriter before waiting on anything since `headerTableBuilder` will no longer + // be available later. auto rewriter = kj::heap(httpOptions, headerTableBuilder); auto handle = kj::coCapture( diff --git a/types/defines/rpc.d.ts b/types/defines/rpc.d.ts index cc3ef17bf1c..5277e11697b 100644 --- a/types/defines/rpc.d.ts +++ b/types/defines/rpc.d.ts @@ -239,6 +239,7 @@ declare namespace CloudflareWorkersModule { email?(message: ForwardableEmailMessage): void | Promise; fetch?(request: Request): Response | Promise; + connect?(socket: Socket): Socket | Promise; queue?(batch: MessageBatch): void | Promise; scheduled?(controller: ScheduledController): void | Promise; tail?(events: TraceItem[]): void | Promise; diff --git a/types/generated-snapshot/experimental/index.d.ts b/types/generated-snapshot/experimental/index.d.ts index d5fce056771..8dbdc373150 100755 --- a/types/generated-snapshot/experimental/index.d.ts +++ b/types/generated-snapshot/experimental/index.d.ts @@ -498,6 +498,11 @@ type ExportedHandlerFetchHandler = ( env: Env, ctx: ExecutionContext, ) => Response | Promise; +type ExportedHandlerConnectHandler = ( + readable: ReadableStream, + env: Env, + ctx: ExecutionContext, +) => ReadableStream | Promise; type ExportedHandlerTailHandler = ( events: TraceItem[], env: Env, @@ -534,6 +539,7 @@ interface ExportedHandler< CfHostMetadata = unknown, > { fetch?: ExportedHandlerFetchHandler; + connect?: ExportedHandlerConnectHandler; tail?: ExportedHandlerTailHandler; trace?: ExportedHandlerTraceHandler; tailStream?: ExportedHandlerTailStreamHandler; @@ -557,6 +563,10 @@ interface AlarmInvocationInfo { readonly isRetry: boolean; readonly retryCount: number; } +interface ConnectEvent { + get inbound(): ReadableStream; + get cf(): any | undefined; +} interface Cloudflare { readonly compatibilityFlags: Record; } @@ -3205,6 +3215,7 @@ interface TraceItem { | ( | TraceItemFetchEventInfo | TraceItemJsRpcEventInfo + | TraceItemConnectEventInfo | TraceItemScheduledEventInfo | TraceItemAlarmEventInfo | TraceItemQueueEventInfo @@ -3233,6 +3244,9 @@ interface TraceItem { interface TraceItemAlarmEventInfo { readonly scheduledTime: Date; } +interface TraceItemConnectEventInfo { + readonly cf?: any; +} interface TraceItemCustomEventInfo {} interface TraceItemScheduledEventInfo { readonly scheduledTime: number; @@ -12154,6 +12168,7 @@ declare namespace CloudflareWorkersModule { constructor(ctx: ExecutionContext, env: Env); email?(message: ForwardableEmailMessage): void | Promise; fetch?(request: Request): Response | Promise; + connect?(socket: Socket): Socket | Promise; queue?(batch: MessageBatch): void | Promise; scheduled?(controller: ScheduledController): void | Promise; tail?(events: TraceItem[]): void | Promise; diff --git a/types/generated-snapshot/experimental/index.ts b/types/generated-snapshot/experimental/index.ts index 71c0d95e46c..4b25fd2df71 100755 --- a/types/generated-snapshot/experimental/index.ts +++ b/types/generated-snapshot/experimental/index.ts @@ -503,6 +503,11 @@ export type ExportedHandlerFetchHandler< env: Env, ctx: ExecutionContext, ) => Response | Promise; +export type ExportedHandlerConnectHandler = ( + readable: ReadableStream, + env: Env, + ctx: ExecutionContext, +) => ReadableStream | Promise; export type ExportedHandlerTailHandler = ( events: TraceItem[], env: Env, @@ -539,6 +544,7 @@ export interface ExportedHandler< CfHostMetadata = unknown, > { fetch?: ExportedHandlerFetchHandler; + connect?: ExportedHandlerConnectHandler; tail?: ExportedHandlerTailHandler; trace?: ExportedHandlerTraceHandler; tailStream?: ExportedHandlerTailStreamHandler; @@ -562,6 +568,10 @@ export interface AlarmInvocationInfo { readonly isRetry: boolean; readonly retryCount: number; } +export interface ConnectEvent { + get inbound(): ReadableStream; + get cf(): any | undefined; +} export interface Cloudflare { readonly compatibilityFlags: Record; } @@ -3214,6 +3224,7 @@ export interface TraceItem { | ( | TraceItemFetchEventInfo | TraceItemJsRpcEventInfo + | TraceItemConnectEventInfo | TraceItemScheduledEventInfo | TraceItemAlarmEventInfo | TraceItemQueueEventInfo @@ -3242,6 +3253,9 @@ export interface TraceItem { export interface TraceItemAlarmEventInfo { readonly scheduledTime: Date; } +export interface TraceItemConnectEventInfo { + readonly cf?: any; +} export interface TraceItemCustomEventInfo {} export interface TraceItemScheduledEventInfo { readonly scheduledTime: number; @@ -12122,6 +12136,7 @@ export declare namespace CloudflareWorkersModule { constructor(ctx: ExecutionContext, env: Env); email?(message: ForwardableEmailMessage): void | Promise; fetch?(request: Request): Response | Promise; + connect?(socket: Socket): Socket | Promise; queue?(batch: MessageBatch): void | Promise; scheduled?(controller: ScheduledController): void | Promise; tail?(events: TraceItem[]): void | Promise; diff --git a/types/generated-snapshot/latest/index.d.ts b/types/generated-snapshot/latest/index.d.ts index e037d3df8bb..b43901f5522 100755 --- a/types/generated-snapshot/latest/index.d.ts +++ b/types/generated-snapshot/latest/index.d.ts @@ -485,6 +485,11 @@ type ExportedHandlerFetchHandler = ( env: Env, ctx: ExecutionContext, ) => Response | Promise; +type ExportedHandlerConnectHandler = ( + readable: ReadableStream, + env: Env, + ctx: ExecutionContext, +) => ReadableStream | Promise; type ExportedHandlerTailHandler = ( events: TraceItem[], env: Env, @@ -521,6 +526,7 @@ interface ExportedHandler< CfHostMetadata = unknown, > { fetch?: ExportedHandlerFetchHandler; + connect?: ExportedHandlerConnectHandler; tail?: ExportedHandlerTailHandler; trace?: ExportedHandlerTraceHandler; tailStream?: ExportedHandlerTailStreamHandler; @@ -543,6 +549,10 @@ interface AlarmInvocationInfo { readonly isRetry: boolean; readonly retryCount: number; } +interface ConnectEvent { + get inbound(): ReadableStream; + get cf(): any | undefined; +} interface Cloudflare { readonly compatibilityFlags: Record; } @@ -3111,6 +3121,7 @@ interface TraceItem { | ( | TraceItemFetchEventInfo | TraceItemJsRpcEventInfo + | TraceItemConnectEventInfo | TraceItemScheduledEventInfo | TraceItemAlarmEventInfo | TraceItemQueueEventInfo @@ -3139,6 +3150,9 @@ interface TraceItem { interface TraceItemAlarmEventInfo { readonly scheduledTime: Date; } +interface TraceItemConnectEventInfo { + readonly cf?: any; +} interface TraceItemCustomEventInfo {} interface TraceItemScheduledEventInfo { readonly scheduledTime: number; @@ -11550,6 +11564,7 @@ declare namespace CloudflareWorkersModule { constructor(ctx: ExecutionContext, env: Env); email?(message: ForwardableEmailMessage): void | Promise; fetch?(request: Request): Response | Promise; + connect?(socket: Socket): Socket | Promise; queue?(batch: MessageBatch): void | Promise; scheduled?(controller: ScheduledController): void | Promise; tail?(events: TraceItem[]): void | Promise; diff --git a/types/generated-snapshot/latest/index.ts b/types/generated-snapshot/latest/index.ts index a8607b690f6..2b4c7960bc6 100755 --- a/types/generated-snapshot/latest/index.ts +++ b/types/generated-snapshot/latest/index.ts @@ -490,6 +490,11 @@ export type ExportedHandlerFetchHandler< env: Env, ctx: ExecutionContext, ) => Response | Promise; +export type ExportedHandlerConnectHandler = ( + readable: ReadableStream, + env: Env, + ctx: ExecutionContext, +) => ReadableStream | Promise; export type ExportedHandlerTailHandler = ( events: TraceItem[], env: Env, @@ -526,6 +531,7 @@ export interface ExportedHandler< CfHostMetadata = unknown, > { fetch?: ExportedHandlerFetchHandler; + connect?: ExportedHandlerConnectHandler; tail?: ExportedHandlerTailHandler; trace?: ExportedHandlerTraceHandler; tailStream?: ExportedHandlerTailStreamHandler; @@ -548,6 +554,10 @@ export interface AlarmInvocationInfo { readonly isRetry: boolean; readonly retryCount: number; } +export interface ConnectEvent { + get inbound(): ReadableStream; + get cf(): any | undefined; +} export interface Cloudflare { readonly compatibilityFlags: Record; } @@ -3120,6 +3130,7 @@ export interface TraceItem { | ( | TraceItemFetchEventInfo | TraceItemJsRpcEventInfo + | TraceItemConnectEventInfo | TraceItemScheduledEventInfo | TraceItemAlarmEventInfo | TraceItemQueueEventInfo @@ -3148,6 +3159,9 @@ export interface TraceItem { export interface TraceItemAlarmEventInfo { readonly scheduledTime: Date; } +export interface TraceItemConnectEventInfo { + readonly cf?: any; +} export interface TraceItemCustomEventInfo {} export interface TraceItemScheduledEventInfo { readonly scheduledTime: number; @@ -11518,6 +11532,7 @@ export declare namespace CloudflareWorkersModule { constructor(ctx: ExecutionContext, env: Env); email?(message: ForwardableEmailMessage): void | Promise; fetch?(request: Request): Response | Promise; + connect?(socket: Socket): Socket | Promise; queue?(batch: MessageBatch): void | Promise; scheduled?(controller: ScheduledController): void | Promise; tail?(events: TraceItem[]): void | Promise;