Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions samples/tcp-ingress/config.capnp
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using Workerd = import "/workerd/workerd.capnp";

const tcpIngressExample :Workerd.Config = (
services = [
(name = "main", worker = .worker),
],

sockets = [
( name = "http", address = "*:8080", http = (), service = "main" ),
( name = "tcp", address = "*:8081", tcp = (), service = "main" )
]
);

const worker :Workerd.Worker = (
modules = [
(name = "worker", esModule = embed "worker.js")
],
compatibilityFlags = ["nodejs_compat_v2", "experimental"],
compatibilityDate = "2023-02-28",
);
11 changes: 11 additions & 0 deletions samples/tcp-ingress/worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@

export default {
async fetch(req) {
return new Response("ok");
},

connect({inbound, cf}) {
console.log(cf);
return inbound.pipeThrough(new IdentityTransformStream());
}
};
86 changes: 86 additions & 0 deletions src/workerd/api/global-scope.c++
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ jsg::LenientOptional<T> mapAddRef(jsg::Lock& js, jsg::LenientOptional<T>& functi
ExportedHandler ExportedHandler::clone(jsg::Lock& js) {
return ExportedHandler{
.fetch{mapAddRef(js, fetch)},
.connect{mapAddRef(js, connect)},
.tail{mapAddRef(js, tail)},
.trace{mapAddRef(js, trace)},
.tailStream{mapAddRef(js, tailStream)},
Expand Down Expand Up @@ -118,6 +119,91 @@ void ServiceWorkerGlobalScope::clear() {
unhandledRejections.clear();
}

kj::Promise<DeferredProxy<void>> ServiceWorkerGlobalScope::connect(kj::AsyncIoStream& connection,
kj::HttpService::ConnectResponse& response,
kj::Maybe<kj::StringPtr> cfBlobJson,
Worker::Lock& lock,
kj::Maybe<ExportedHandler&> exportedHandler) {
ExportedHandler& eh = JSG_REQUIRE_NONNULL(exportedHandler, Error,
"Connect ingress is not currently supported with Service Workers syntax.");
KJ_REQUIRE(FeatureFlags::get(lock).getWorkerdExperimental(),
"TCP ingress requires the experimental flag.");

KJ_IF_SOME(handler, eh.connect) {
// Has a connect handler!
kj::HttpHeaderTable table;
kj::HttpHeaders headers(table);
response.accept(200, "OK", headers);

// TODO(cleanup): There's a fair amount of duplication between this and
// the request() method, and much of this could likely be cleaned up to
// use co_awaits rather than the promise syntax.
auto ownConnection = newNeuterableIoStream(connection);
auto deferredNeuter = kj::defer([ownConnection = kj::addRef(*ownConnection)]() mutable {
ownConnection->neuter(makeNeuterException(NeuterReason::CLIENT_DISCONNECTED));
});
KJ_ON_SCOPE_FAILURE(ownConnection->neuter(makeNeuterException(NeuterReason::THREW_EXCEPTION)));
auto conn2 = kj::addRef(*ownConnection);

auto& ioContext = IoContext::current();
jsg::Lock& js = lock;

CfProperty cf(cfBlobJson);

auto ownConn2 = kj::addRef(*ownConnection);
auto conn = newSystemMultiStream(ownConn2, ioContext);
auto jsInbound = js.alloc<ReadableStream>(ioContext, kj::mv(conn.readable));

kj::Maybe<SpanBuilder> span = ioContext.makeTraceSpan("connect_handler"_kjc);
auto event = js.alloc<ConnectEvent>(kj::mv(jsInbound), kj::mv(cf));
auto promise = handler(js, kj::mv(event), eh.env.addRef(js), eh.getCtx());

auto canceled = kj::refcounted<kj::RefcountedWrapper<bool>>(false);

return ioContext
.awaitJs(js,
promise.then(js,
ioContext.addFunctor([canceled = canceled->addWrappedRef(),
outbound = kj::mv(conn.writable), span = kj::mv(span)](
jsg::Lock& js, jsg::Ref<ReadableStream> jsOutbound) mutable
-> IoOwn<kj::Promise<DeferredProxy<void>>> {
auto& context = IoContext::current();
span = kj::none;
if (*canceled) {
// The client disconnected before the response was ready. The outbound is a dangling
// reference, let's not use it.
return context.addObject(kj::heap(addNoopDeferredProxy(kj::READY_NOW)));
} else {
return context.addObject(kj::heap(jsOutbound->pumpTo(js, kj::mv(outbound), true)));
}
})))
.attach(
kj::defer([canceled = kj::mv(canceled)]() mutable { canceled->getWrapped() = true; }))
.then(
[ownConnection = kj::mv(ownConnection), deferredNeuter = kj::mv(deferredNeuter)](
DeferredProxy<void> deferredProxy) mutable {
deferredProxy.proxyTask = deferredProxy.proxyTask
.then([connection = kj::addRef(*ownConnection)]() mutable {
connection->neuter(makeNeuterException(NeuterReason::SENT_RESPONSE));
}, [connection = kj::addRef(*ownConnection)](kj::Exception&& e) mutable {
connection->neuter(makeNeuterException(NeuterReason::THREW_EXCEPTION));
kj::throwFatalException(kj::mv(e));
}).attach(kj::mv(deferredNeuter));

return deferredProxy;
},
[connection = kj::mv(conn2)](kj::Exception&& e) mutable -> DeferredProxy<void> {
// HACK: We depend on the fact that the success-case lambda above hasn't been destroyed yet
// so `deferredNeuter` hasn't been destroyed yet.
connection->neuter(makeNeuterException(NeuterReason::THREW_EXCEPTION));
kj::throwFatalException(kj::mv(e));
});
}
lock.logWarningOnce("Received a connect event but we lack a handler. "
"Did you remember to export a connect() function?");
JSG_FAIL_REQUIRE(Error, "Handler does not export a connect() function.");
}

kj::Promise<DeferredProxy<void>> ServiceWorkerGlobalScope::request(kj::HttpMethod method,
kj::StringPtr url,
const kj::HttpHeaders& headers,
Expand Down
45 changes: 44 additions & 1 deletion src/workerd/api/global-scope.h
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,34 @@ class AlarmInvocationInfo: public jsg::Object {
uint32_t retryCount = 0;
};

class ConnectEvent final: public Event {
public:
static jsg::Ref<ConnectEvent> constructor() = delete;

ConnectEvent(jsg::Ref<ReadableStream> inbound, CfProperty&& cf)
: Event("connect"),
inbound(kj::mv(inbound)),
cf(kj::mv(cf)) {}

jsg::Ref<ReadableStream> getInbound() {
return inbound.addRef();
}

// Returns the `cf` field containing Cloudflare feature flags.
jsg::Optional<jsg::JsObject> getCf(jsg::Lock& js) {
return cf.get(js);
}

JSG_RESOURCE_TYPE(ConnectEvent) {
JSG_READONLY_PROTOTYPE_PROPERTY(inbound, getInbound);
JSG_READONLY_PROTOTYPE_PROPERTY(cf, getCf);
}

private:
jsg::Ref<ReadableStream> inbound;
CfProperty cf;
};

// Type signature for handlers exported from the root module.
//
// We define each handler method as a LenientOptional rather than as a plain Optional in order to
Expand All @@ -296,6 +324,11 @@ struct ExportedHandler {
using TailHandler = kj::Promise<void>(kj::Array<jsg::Ref<TraceItem>> events,
jsg::Value env,
jsg::Optional<jsg::Ref<ExecutionContext>> ctx);
using ConnectHandler = jsg::Promise<jsg::Ref<api::ReadableStream>>(jsg::Ref<ConnectEvent> connect,
jsg::Value env,
jsg::Optional<jsg::Ref<ExecutionContext>> ctx);
jsg::LenientOptional<jsg::Function<ConnectHandler>> connect;

jsg::LenientOptional<jsg::Function<TailHandler>> tail;
jsg::LenientOptional<jsg::Function<TailHandler>> trace;

Expand Down Expand Up @@ -332,6 +365,7 @@ struct ExportedHandler {
jsg::SelfRef self;

JSG_STRUCT(fetch,
connect,
tail,
trace,
tailStream,
Expand All @@ -349,6 +383,7 @@ struct ExportedHandler {

JSG_STRUCT_TS_DEFINE(
type ExportedHandlerFetchHandler<Env = unknown, CfHostMetadata = unknown> = (request: Request<CfHostMetadata, IncomingRequestCfProperties<CfHostMetadata>>, env: Env, ctx: ExecutionContext) => Response | Promise<Response>;
type ExportedHandlerConnectHandler<Env = unknown> = (readable: ReadableStream, env: Env, ctx: ExecutionContext) => ReadableStream | Promise<ReadableStream>;
type ExportedHandlerTailHandler<Env = unknown> = (events: TraceItem[], env: Env, ctx: ExecutionContext) => void | Promise<void>;
type ExportedHandlerTraceHandler<Env = unknown> = (traces: TraceItem[], env: Env, ctx: ExecutionContext) => void | Promise<void>;
type ExportedHandlerTailStreamHandler<Env = unknown> = (event : TailStream.TailEvent<TailStream.Onset>, env: Env, ctx: ExecutionContext) => TailStream.TailEventHandlerType | Promise<TailStream.TailEventHandlerType>;
Expand All @@ -359,6 +394,7 @@ struct ExportedHandler {
JSG_STRUCT_TS_OVERRIDE(<Env = unknown, QueueHandlerMessage = unknown, CfHostMetadata = unknown> {
email?: EmailExportedHandler<Env>;
fetch?: ExportedHandlerFetchHandler<Env, CfHostMetadata>;
connect?: ExportedHandlerConnectHandler<Env>;
tail?: ExportedHandlerTailHandler<Env>;
trace?: ExportedHandlerTraceHandler<Env>;
tailStream?: ExportedHandlerTailStreamHandler<Env>;
Expand Down Expand Up @@ -458,6 +494,13 @@ class ServiceWorkerGlobalScope: public WorkerGlobalScope {
// TODO(cleanup): Factor out the shared code used between old-style event listeners vs. module
// exports and move that code somewhere more appropriate.

// Received TCP/socket ingress (called from C++, not JS).
kj::Promise<DeferredProxy<void>> connect(kj::AsyncIoStream& connection,
kj::HttpService::ConnectResponse& response,
kj::Maybe<kj::StringPtr> cfBlobJson,
Worker::Lock& lock,
kj::Maybe<ExportedHandler&> exportedHandler);

// Received sendTraces (called from C++, not JS).
void sendTraces(kj::ArrayPtr<kj::Own<Trace>> traces,
Worker::Lock& lock,
Expand Down Expand Up @@ -926,6 +969,6 @@ class ServiceWorkerGlobalScope: public WorkerGlobalScope {
api::WorkerGlobalScope, api::ServiceWorkerGlobalScope, api::TestController, \
api::ExecutionContext, api::ExportedHandler, \
api::ServiceWorkerGlobalScope::StructuredCloneOptions, api::Navigator, \
api::AlarmInvocationInfo, api::Immediate, api::Cloudflare
api::AlarmInvocationInfo, api::Immediate, api::ConnectEvent, api::Cloudflare
// The list of global-scope.h types that are added to worker.c++'s JSG_DECLARE_ISOLATE_TYPE
} // namespace workerd::api
9 changes: 9 additions & 0 deletions src/workerd/api/system-streams.c++
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

#include "util.h"

#include <workerd/util/stream-utils.h>

#include <kj/compat/brotli.h>
#include <kj/compat/gzip.h>
#include <kj/one-of.h>
Expand Down Expand Up @@ -388,6 +390,13 @@ SystemMultiStream newSystemMultiStream(
.writable = kj::heap<EncodedAsyncOutputStream>(
stream.addWrappedRef(), StreamEncoding::IDENTITY, context)};
}
SystemMultiStream newSystemMultiStream(kj::Own<NeuterableIoStream>& stream, IoContext& context) {

return {.readable = kj::heap<EncodedAsyncInputStream>(
kj::addRef(*stream), StreamEncoding::IDENTITY, context),
.writable =
kj::heap<EncodedAsyncOutputStream>(kj::addRef(*stream), StreamEncoding::IDENTITY, context)};
}

ContentEncodingOptions::ContentEncodingOptions(CompatibilityFlags::Reader flags)
: brotliEnabled(flags.getBrotliContentEncoding()) {}
Expand Down
5 changes: 5 additions & 0 deletions src/workerd/api/system-streams.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
#include <workerd/io/compatibility-date.capnp.h>
#include <workerd/io/io-context.h>

namespace workerd {
class NeuterableIoStream;
};

namespace workerd::api {

// A ReadableStreamSource which automatically decodes its underlying stream. It does so lazily -- if
Expand Down Expand Up @@ -41,6 +45,7 @@ struct SystemMultiStream {
// A combo ReadableStreamSource and WritableStreamSink.
SystemMultiStream newSystemMultiStream(kj::RefcountedWrapper<kj::Own<kj::AsyncIoStream>>& stream,
IoContext& context = IoContext::current());
SystemMultiStream newSystemMultiStream(kj::Own<NeuterableIoStream>& stream, IoContext& context);

struct ContentEncodingOptions {
bool brotliEnabled = false;
Expand Down
7 changes: 7 additions & 0 deletions src/workerd/api/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ wd_test(
data = ["actor-alarms-delete-test.js"],
)

wd_test(
src = "connect-handler-test.wd-test",
args = ["--experimental"],
data = ["connect-handler-test.js"],
)

wd_test(
src = "actor-alarms-test.wd-test",
args = ["--experimental"],
Expand All @@ -36,6 +42,7 @@ wd_test(
"tail-worker-test-invalid.js",
"tail-worker-test-jsrpc.js",
"websocket-hibernation.js",
"connect-handler-test.js",
],
)

Expand Down
27 changes: 27 additions & 0 deletions src/workerd/api/tests/connect-handler-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { connect } from 'cloudflare:sockets';
import { ok, strictEqual } from 'assert';

export const newFunction = {
async test() {
const socket = connect('localhost:8081');
await socket.opened;
const dec = new TextDecoder();
let result = '';
for await (const chunk of socket.readable) {
dec.decode(chunk, { stream: true });
result += dec.decode(chunk, { stream: true });
}
result += dec.decode();
strictEqual(result, 'hello');
await socket.closed;
},
};

export default {
connect({ inbound, cf }) {
const enc = new TextEncoder();
ok(inbound instanceof ReadableStream);
strictEqual(typeof cf.clientIp, 'string');
return ReadableStream.from([enc.encode('hello')]);
},
};
18 changes: 18 additions & 0 deletions src/workerd/api/tests/connect-handler-test.wd-test
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using Workerd = import "/workerd/workerd.capnp";

const unitTests :Workerd.Config = (
services = [
( name = "connect-handler-test",
worker = (
modules = [
(name = "worker", esModule = embed "connect-handler-test.js"),
],
compatibilityFlags = ["nodejs_compat_v2", "experimental", "streams_enable_constructors"],
)
),
( name = "internet", network = ( allow = ["private"] ) )
],
sockets = [
(name = "tcp", address = "*:8081", tcp = (), service = "connect-handler-test")
]
);
20 changes: 20 additions & 0 deletions src/workerd/api/tests/js-rpc-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,11 @@ export class MyService extends WorkerEntrypoint {
return new Response('method = ' + req.method + ', url = ' + req.url);
}

async connect(arg) {
const enc = new TextEncoder();
return ReadableStream.from([enc.encode('hello')]);
}

// Define a property to test behavior of property accessors.
get nonFunctionProperty() {
return { foo: 123 };
Expand Down Expand Up @@ -631,6 +636,21 @@ export let extendingEntrypointClasses = {
assert.equal(svc instanceof WorkerEntrypoint, true);
},
};
export let connectBinding = {
async test(controller, env, ctx) {
let socket = await env.MyService.connect('localhost:8081');
await socket.opened;
const dec = new TextDecoder();
let result = '';
for await (const chunk of socket.readable) {
dec.decode(chunk, { stream: true });
result += dec.decode(chunk, { stream: true });
}
result += dec.decode();
assert.strictEqual(result, 'hello');
await socket.closed;
},
};

export let namedServiceBinding = {
async test(controller, env, ctx) {
Expand Down
2 changes: 1 addition & 1 deletion src/workerd/api/tests/tail-worker-test-receiver.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export const test = {

// The shared tail worker we configured only produces onset and outcome events, so every trace is identical here.
// Number of traces based on how often main tail worker is invoked from previous tests
let numTraces = 28;
let numTraces = 30;
let basicTrace =
'{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","scriptTags":[],"info":{"type":"trace","traces":[]}}{"type":"return"}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}';
assert.deepStrictEqual(
Expand Down
3 changes: 3 additions & 0 deletions src/workerd/api/tests/tail-worker-test.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading