diff --git a/src/workerd/api/tests/http-test.js b/src/workerd/api/tests/http-test.js index 33ec6e0bd35..b498f044725 100644 --- a/src/workerd/api/tests/http-test.js +++ b/src/workerd/api/tests/http-test.js @@ -13,6 +13,20 @@ export default { if (pathname === '/body-length') { return Response.json(Object.fromEntries(request.headers)); } + if (pathname === '/consume-body') { + const body = await request.text(); + return new Response(`Received ${body.length} bytes`); + } + if (pathname === '/streaming-response') { + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode('chunk1')); + controller.enqueue(new TextEncoder().encode('chunk2')); + controller.close(); + }, + }); + return new Response(stream); + } if (pathname === '/web-socket') { const pair = new WebSocketPair(); pair[0].addEventListener('message', (event) => { @@ -89,6 +103,28 @@ export default { assert.strictEqual(scheduledLastCtrl.scheduledTime, 1000); assert.strictEqual(scheduledLastCtrl.cron, '* * * * 30'); } + + { + const response = await env.SERVICE.fetch( + 'http://placeholder/consume-body', + { + method: 'POST', + body: 'hello', + } + ); + const text = await response.text(); + assert.strictEqual(text, 'Received 5 bytes'); + } + + { + const response = await env.SERVICE.fetch( + 'http://placeholder/streaming-response' + ); + const text = await response.text(); + // "chunk1" + "chunk2" = 12 bytes + assert.strictEqual(text, 'chunk1chunk2'); + assert.strictEqual(text.length, 12); + } }, }; diff --git a/src/workerd/api/tests/tail-worker-test.js b/src/workerd/api/tests/tail-worker-test.js index 752a0fc1d8f..17387c86bb8 100644 --- a/src/workerd/api/tests/tail-worker-test.js +++ b/src/workerd/api/tests/tail-worker-test.js @@ -85,11 +85,13 @@ export const test = { let expected = [ // http-test.js: fetch and scheduled events get reported correctly. // First event is emitted by the test() event, which allows us to get some coverage for span tracing. - '{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","scriptTags":[],"info":{"type":"custom"}}{"type":"spanOpen","name":"fetch","spanId":"0000000000000001"}{"type":"attributes","info":[{"name":"network.protocol.name","value":"http"},{"name":"network.protocol.version","value":"HTTP/1.1"},{"name":"http.request.method","value":"POST"},{"name":"url.full","value":"http://placeholder/body-length"},{"name":"http.request.body.size","value":"3"},{"name":"http.response.status_code","value":"200"},{"name":"http.response.body.size","value":"22"}]}{"type":"spanClose","outcome":"ok"}{"type":"spanOpen","name":"fetch","spanId":"0000000000000002"}{"type":"attributes","info":[{"name":"network.protocol.name","value":"http"},{"name":"network.protocol.version","value":"HTTP/1.1"},{"name":"http.request.method","value":"POST"},{"name":"url.full","value":"http://placeholder/body-length"},{"name":"http.response.status_code","value":"200"},{"name":"http.response.body.size","value":"31"}]}{"type":"spanClose","outcome":"ok"}{"type":"spanOpen","name":"scheduled","spanId":"0000000000000003"}{"type":"spanClose","outcome":"ok"}{"type":"spanOpen","name":"scheduled","spanId":"0000000000000004"}{"type":"spanClose","outcome":"ok"}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}', - '{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","scriptTags":[],"info":{"type":"fetch","method":"POST","url":"http://placeholder/body-length","headers":[]}}{"type":"return","info":{"type":"fetch","statusCode":200}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}', - '{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","scriptTags":[],"info":{"type":"fetch","method":"POST","url":"http://placeholder/body-length","headers":[]}}{"type":"return","info":{"type":"fetch","statusCode":200}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}', + '{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","scriptTags":[],"info":{"type":"custom"}}{"type":"spanOpen","name":"fetch","spanId":"0000000000000001"}{"type":"attributes","info":[{"name":"network.protocol.name","value":"http"},{"name":"network.protocol.version","value":"HTTP/1.1"},{"name":"http.request.method","value":"POST"},{"name":"url.full","value":"http://placeholder/body-length"},{"name":"http.request.body.size","value":"3"},{"name":"http.response.status_code","value":"200"},{"name":"http.response.body.size","value":"22"}]}{"type":"spanClose","outcome":"ok"}{"type":"spanOpen","name":"fetch","spanId":"0000000000000002"}{"type":"attributes","info":[{"name":"network.protocol.name","value":"http"},{"name":"network.protocol.version","value":"HTTP/1.1"},{"name":"http.request.method","value":"POST"},{"name":"url.full","value":"http://placeholder/body-length"},{"name":"http.response.status_code","value":"200"},{"name":"http.response.body.size","value":"31"}]}{"type":"spanClose","outcome":"ok"}{"type":"spanOpen","name":"scheduled","spanId":"0000000000000003"}{"type":"spanClose","outcome":"ok"}{"type":"spanOpen","name":"scheduled","spanId":"0000000000000004"}{"type":"spanClose","outcome":"ok"}{"type":"spanOpen","name":"fetch","spanId":"0000000000000005"}{"type":"attributes","info":[{"name":"network.protocol.name","value":"http"},{"name":"network.protocol.version","value":"HTTP/1.1"},{"name":"http.request.method","value":"POST"},{"name":"url.full","value":"http://placeholder/consume-body"},{"name":"http.request.body.size","value":"5"},{"name":"http.response.status_code","value":"200"},{"name":"http.response.body.size","value":"16"}]}{"type":"spanClose","outcome":"ok"}{"type":"spanOpen","name":"fetch","spanId":"0000000000000006"}{"type":"attributes","info":[{"name":"network.protocol.name","value":"http"},{"name":"network.protocol.version","value":"HTTP/1.1"},{"name":"http.request.method","value":"GET"},{"name":"url.full","value":"http://placeholder/streaming-response"},{"name":"http.response.status_code","value":"200"},{"name":"http.response.body.size","value":"12"}]}{"type":"spanClose","outcome":"ok"}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}', + '{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","scriptTags":[],"info":{"type":"fetch","method":"POST","url":"http://placeholder/body-length","headers":[]}}{"type":"return","info":{"type":"fetch","statusCode":200}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0,"responseBodySize":22,"requestBodySize":3}', + '{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","scriptTags":[],"info":{"type":"fetch","method":"POST","url":"http://placeholder/body-length","headers":[]}}{"type":"return","info":{"type":"fetch","statusCode":200}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0,"responseBodySize":31,"requestBodySize":3}', '{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","scriptTags":[],"info":{"type":"scheduled","scheduledTime":"1970-01-01T00:00:00.000Z","cron":""}}{"type":"return"}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}', '{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","scriptTags":[],"info":{"type":"scheduled","scheduledTime":"1970-01-01T00:00:00.000Z","cron":"* * * * 30"}}{"type":"return"}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}', + '{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","scriptTags":[],"info":{"type":"fetch","method":"POST","url":"http://placeholder/consume-body","headers":[]}}{"type":"return","info":{"type":"fetch","statusCode":200}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0,"responseBodySize":16,"requestBodySize":5}', + '{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","scriptTags":[],"info":{"type":"fetch","method":"GET","url":"http://placeholder/streaming-response","headers":[]}}{"type":"return","info":{"type":"fetch","statusCode":200}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0,"responseBodySize":12}', '{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","entrypoint":"cacheMode","scriptTags":[],"info":{"type":"custom"}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}', '{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","scriptTags":[],"info":{"type":"fetch","method":"GET","url":"http://placeholder/not-found","headers":[]}}{"type":"return","info":{"type":"fetch","statusCode":404}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}', '{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","scriptTags":[],"info":{"type":"fetch","method":"GET","url":"http://placeholder/web-socket","headers":[{"name":"upgrade","value":"websocket"}]}}{"type":"exception","name":"Error","message":"The Workers runtime canceled this request because it detected that your Worker\'s code had hung and would never generate a response. Refer to: https://developers.cloudflare.com/workers/observability/errors/"}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}', diff --git a/src/workerd/api/trace.c++ b/src/workerd/api/trace.c++ index 972e2cf9cdc..22f48967e11 100644 --- a/src/workerd/api/trace.c++ +++ b/src/workerd/api/trace.c++ @@ -368,12 +368,32 @@ jsg::Ref TraceItem::FetchEventInfo::Request: TraceItem::FetchEventInfo::Response::Response( const Trace& trace, const tracing::FetchResponseInfo& responseInfo) - : status(responseInfo.statusCode) {} + : status(responseInfo.statusCode), + bodySize(trace.responseBodySize), + requestBodySize(trace.requestBodySize) {} uint16_t TraceItem::FetchEventInfo::Response::getStatus() { return status; } +jsg::Optional TraceItem::FetchEventInfo::Response::getBodySize() { + // Return null if bodySize is unknown (kj::none), otherwise return the size. + // Note: Converting uint64_t to double may lose precision for sizes larger than 2^53 bytes + // (approximately 9 petabytes), though this is unlikely in practice. + KJ_IF_SOME(size, bodySize) { + return static_cast(size); + } + return kj::none; +} + +jsg::Optional TraceItem::FetchEventInfo::Response::getRequestBodySize() { + // Return null if requestBodySize is unknown (kj::none), otherwise return the size. + KJ_IF_SOME(size, requestBodySize) { + return static_cast(size); + } + return kj::none; +} + TraceItem::JsRpcEventInfo::JsRpcEventInfo( const Trace& trace, const tracing::JsRpcEventInfo& eventInfo) : rpcMethod(kj::str(eventInfo.methodName)) {} diff --git a/src/workerd/api/trace.h b/src/workerd/api/trace.h index 9cf9e6b50f0..7d513086862 100644 --- a/src/workerd/api/trace.h +++ b/src/workerd/api/trace.h @@ -250,13 +250,19 @@ class TraceItem::FetchEventInfo::Response final: public jsg::Object { explicit Response(const Trace& trace, const tracing::FetchResponseInfo& responseInfo); uint16_t getStatus(); + jsg::Optional getBodySize(); + jsg::Optional getRequestBodySize(); JSG_RESOURCE_TYPE(Response) { JSG_LAZY_READONLY_INSTANCE_PROPERTY(status, getStatus); + JSG_LAZY_READONLY_INSTANCE_PROPERTY(bodySize, getBodySize); + JSG_LAZY_READONLY_INSTANCE_PROPERTY(requestBodySize, getRequestBodySize); } private: uint16_t status; + kj::Maybe bodySize; + kj::Maybe requestBodySize; }; class TraceItem::JsRpcEventInfo final: public jsg::Object { diff --git a/src/workerd/io/trace-stream.c++ b/src/workerd/io/trace-stream.c++ index 8c6045e7c3b..9f088c4422a 100644 --- a/src/workerd/io/trace-stream.c++ +++ b/src/workerd/io/trace-stream.c++ @@ -19,6 +19,8 @@ namespace { V(ALARM, "alarm") \ V(ATTRIBUTES, "attributes") \ V(BATCHSIZE, "batchSize") \ + V(RESPONSEBODYSIZE, "responseBodySize") \ + V(REQUESTBODYSIZE, "requestBodySize") \ V(CANCELED, "canceled") \ V(CHANNEL, "channel") \ V(CFJSON, "cfJson") \ @@ -175,9 +177,10 @@ jsg::JsValue ToJs(jsg::Lock& js, kj::ArrayPtr attributes, Strin } jsg::JsValue ToJs(jsg::Lock& js, const FetchResponseInfo& info, StringCache& cache) { - static const kj::StringPtr keys[] = {TYPE_STR, STATUSCODE_STR}; - jsg::JsValue values[] = {cache.get(js, FETCH_STR), js.num(info.statusCode)}; - return js.obj(kj::arrayPtr(keys), kj::arrayPtr(values)); + auto obj = js.obj(); + obj.set(js, TYPE_STR, cache.get(js, FETCH_STR)); + obj.set(js, STATUSCODE_STR, js.num(info.statusCode)); + return obj; } jsg::JsValue ToJs(jsg::Lock& js, const FetchEventInfo& info, StringCache& cache) { @@ -199,7 +202,6 @@ jsg::JsValue ToJs(jsg::Lock& js, const FetchEventInfo& info, StringCache& cache) obj.set(js, HEADERS_STR, js.arr(info.headers.asPtr(), [&cache, &ToJs](jsg::Lock& js, const auto& header) { return ToJs(js, header, cache); })); - return obj; } @@ -403,7 +405,12 @@ jsg::JsValue ToJs(jsg::Lock& js, const Outcome& outcome, StringCache& cache) { obj.set(js, CPUTIME_STR, js.num(cpuTime)); obj.set(js, WALLTIME_STR, js.num(wallTime)); - + KJ_IF_SOME(size, outcome.responseBodySize) { + obj.set(js, RESPONSEBODYSIZE_STR, js.num(static_cast(size))); + } + KJ_IF_SOME(size, outcome.requestBodySize) { + obj.set(js, REQUESTBODYSIZE_STR, js.num(static_cast(size))); + } return obj; } diff --git a/src/workerd/io/trace-test.c++ b/src/workerd/io/trace-test.c++ index aab27b2ac23..e005e7e5bf8 100644 --- a/src/workerd/io/trace-test.c++ +++ b/src/workerd/io/trace-test.c++ @@ -496,11 +496,63 @@ KJ_TEST("Read/Write Outcome works") { KJ_ASSERT(info2.outcome == EventOutcome::EXCEPTION); KJ_ASSERT(info2.wallTime == 2 * kj::MILLISECONDS); KJ_ASSERT(info2.cpuTime == 1 * kj::MILLISECONDS); + KJ_ASSERT(info2.responseBodySize == kj::none); // Default value (unknown) + KJ_ASSERT(info2.requestBodySize == kj::none); // Default value (unknown) Outcome info3 = info.clone(); KJ_ASSERT(info3.outcome == EventOutcome::EXCEPTION); KJ_ASSERT(info3.wallTime == 2 * kj::MILLISECONDS); KJ_ASSERT(info3.cpuTime == 1 * kj::MILLISECONDS); + KJ_ASSERT(info3.responseBodySize == kj::none); // Default value (unknown) + KJ_ASSERT(info3.requestBodySize == kj::none); // Default value (unknown) +} + +KJ_TEST("Read/Write Outcome with body sizes works") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + kj::Maybe responseBodySize = 54321; + kj::Maybe requestBodySize = 12345; + Outcome info(EventOutcome::OK, 1 * kj::MILLISECONDS, 2 * kj::MILLISECONDS, responseBodySize, + requestBodySize); + info.copyTo(infoBuilder); + + auto reader = infoBuilder.asReader(); + Outcome info2(reader); + KJ_ASSERT(info2.outcome == EventOutcome::OK); + KJ_ASSERT(info2.wallTime == 2 * kj::MILLISECONDS); + KJ_ASSERT(info2.cpuTime == 1 * kj::MILLISECONDS); + KJ_ASSERT(KJ_ASSERT_NONNULL(info2.responseBodySize) == 54321); + KJ_ASSERT(KJ_ASSERT_NONNULL(info2.requestBodySize) == 12345); + + Outcome info3 = info.clone(); + KJ_ASSERT(info3.outcome == EventOutcome::OK); + KJ_ASSERT(info3.wallTime == 2 * kj::MILLISECONDS); + KJ_ASSERT(info3.cpuTime == 1 * kj::MILLISECONDS); + KJ_ASSERT(KJ_ASSERT_NONNULL(info3.responseBodySize) == 54321); + KJ_ASSERT(KJ_ASSERT_NONNULL(info3.requestBodySize) == 12345); +} + +KJ_TEST("Read/Write Outcome with zero body sizes works") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + // Zero body sizes should be distinguishable from unknown + kj::Maybe responseBodySize = uint64_t{0}; + kj::Maybe requestBodySize = uint64_t{0}; + Outcome info(EventOutcome::OK, 1 * kj::MILLISECONDS, 2 * kj::MILLISECONDS, responseBodySize, + requestBodySize); + info.copyTo(infoBuilder); + + auto reader = infoBuilder.asReader(); + Outcome info2(reader); + KJ_ASSERT(info2.outcome == EventOutcome::OK); + KJ_ASSERT(KJ_ASSERT_NONNULL(info2.responseBodySize) == 0); // Known to be zero, not unknown + KJ_ASSERT(KJ_ASSERT_NONNULL(info2.requestBodySize) == 0); // Known to be zero, not unknown + + Outcome info3 = info.clone(); + KJ_ASSERT(KJ_ASSERT_NONNULL(info3.responseBodySize) == 0); // Known to be zero, not unknown + KJ_ASSERT(KJ_ASSERT_NONNULL(info3.requestBodySize) == 0); // Known to be zero, not unknown } KJ_TEST("Read/Write TailEvent works") { diff --git a/src/workerd/io/trace.c++ b/src/workerd/io/trace.c++ index 8cf6f1d9080..2435aced9f0 100644 --- a/src/workerd/io/trace.c++ +++ b/src/workerd/io/trace.c++ @@ -1280,24 +1280,45 @@ Onset Onset::clone() const { KJ_MAP(attr, attributes) { return attr.clone(); }); } -Outcome::Outcome(EventOutcome outcome, kj::Duration cpuTime, kj::Duration wallTime) +Outcome::Outcome(EventOutcome outcome, + kj::Duration cpuTime, + kj::Duration wallTime, + kj::Maybe responseBodySize, + kj::Maybe requestBodySize) : outcome(outcome), cpuTime(cpuTime), - wallTime(wallTime) {} + wallTime(wallTime), + responseBodySize(responseBodySize), + requestBodySize(requestBodySize) {} Outcome::Outcome(rpc::Trace::Outcome::Reader reader) : outcome(reader.getOutcome()), cpuTime(reader.getCpuTime() * kj::MILLISECONDS), - wallTime(reader.getWallTime() * kj::MILLISECONDS) {} + wallTime(reader.getWallTime() * kj::MILLISECONDS) { + if (reader.getHasResponseBodySize()) { + responseBodySize = reader.getResponseBodySize(); + } + if (reader.getHasRequestBodySize()) { + requestBodySize = reader.getRequestBodySize(); + } +} void Outcome::copyTo(rpc::Trace::Outcome::Builder builder) const { builder.setOutcome(outcome); builder.setCpuTime(cpuTime / kj::MILLISECONDS); builder.setWallTime(wallTime / kj::MILLISECONDS); + KJ_IF_SOME(size, responseBodySize) { + builder.setResponseBodySize(size); + builder.setHasResponseBodySize(true); + } + KJ_IF_SOME(size, requestBodySize) { + builder.setRequestBodySize(size); + builder.setHasRequestBodySize(true); + } } Outcome Outcome::clone() const { - return Outcome(outcome, cpuTime, wallTime); + return Outcome(outcome, cpuTime, wallTime, responseBodySize, requestBodySize); } TailEvent::TailEvent( diff --git a/src/workerd/io/trace.h b/src/workerd/io/trace.h index fdd643a70d1..a65cf35267d 100644 --- a/src/workerd/io/trace.h +++ b/src/workerd/io/trace.h @@ -735,7 +735,11 @@ Onset::Info readOnsetInfo(const rpc::Trace::Onset::Info::Reader& info); void writeOnsetInfo(const tracing::Onset::Info& info, rpc::Trace::Onset::Info::Builder& builder); struct Outcome final { - explicit Outcome(EventOutcome outcome, kj::Duration cpuTime, kj::Duration wallTime); + explicit Outcome(EventOutcome outcome, + kj::Duration cpuTime, + kj::Duration wallTime, + kj::Maybe responseBodySize = kj::none, + kj::Maybe requestBodySize = kj::none); Outcome(rpc::Trace::Outcome::Reader reader); Outcome(Outcome&&) = default; Outcome& operator=(Outcome&&) = default; @@ -744,6 +748,8 @@ struct Outcome final { EventOutcome outcome = EventOutcome::OK; kj::Duration cpuTime; kj::Duration wallTime; + kj::Maybe responseBodySize; + kj::Maybe requestBodySize; void copyTo(rpc::Trace::Outcome::Builder builder) const; Outcome clone() const; @@ -869,6 +875,8 @@ class Trace final: public kj::Refcounted { kj::Duration cpuTime; kj::Duration wallTime; + kj::Maybe responseBodySize; + kj::Maybe requestBodySize; bool truncated = false; bool exceededLogLimit = false; diff --git a/src/workerd/io/tracer.c++ b/src/workerd/io/tracer.c++ index b11314a23d5..51eb231290f 100644 --- a/src/workerd/io/tracer.c++ +++ b/src/workerd/io/tracer.c++ @@ -57,11 +57,14 @@ WorkerTracer::~WorkerTracer() noexcept(false) { } if (isPredictableModeForTest()) { writer->report(spanContext, - tracing::Outcome(trace->outcome, 0 * kj::MILLISECONDS, 0 * kj::MILLISECONDS), + tracing::Outcome(trace->outcome, 0 * kj::MILLISECONDS, 0 * kj::MILLISECONDS, + trace->responseBodySize, trace->requestBodySize), completeTime); } else { writer->report(spanContext, - tracing::Outcome(trace->outcome, trace->cpuTime, trace->wallTime), completeTime); + tracing::Outcome(trace->outcome, trace->cpuTime, trace->wallTime, + trace->responseBodySize, trace->requestBodySize), + completeTime); } } else if (!markedUnused) { // If no span context is available, we have a streaming tail worker set up but shut down the @@ -345,10 +348,20 @@ void WorkerTracer::setEventInfoInternal( } } -void WorkerTracer::setOutcome(EventOutcome outcome, kj::Duration cpuTime, kj::Duration wallTime) { +void WorkerTracer::setOutcome(EventOutcome outcome, + kj::Duration cpuTime, + kj::Duration wallTime, + kj::Maybe responseBodySize, + kj::Maybe requestBodySize) { trace->outcome = outcome; trace->cpuTime = cpuTime; trace->wallTime = wallTime; + if (responseBodySize != kj::none) { + trace->responseBodySize = responseBodySize; + } + if (requestBodySize != kj::none) { + trace->requestBodySize = requestBodySize; + } // Defer reporting the actual outcome event to the WorkerTracer destructor: The outcome is // reported when the metrics request is deallocated, but with ctx.waitUntil() there might be spans @@ -359,6 +372,12 @@ void WorkerTracer::setOutcome(EventOutcome outcome, kj::Duration cpuTime, kj::Du // fixed size. } +void WorkerTracer::setBodySizes( + kj::Maybe responseBodySize, kj::Maybe requestBodySize) { + trace->responseBodySize = responseBodySize; + trace->requestBodySize = requestBodySize; +} + void WorkerTracer::recordTimestamp(kj::Date timestamp) { if (completeTime == kj::UNIX_EPOCH) { completeTime = timestamp; diff --git a/src/workerd/io/tracer.h b/src/workerd/io/tracer.h index cc1efefc14b..5d60071c1f9 100644 --- a/src/workerd/io/tracer.h +++ b/src/workerd/io/tracer.h @@ -58,7 +58,14 @@ class BaseTracer: public kj::Refcounted { // Reports the outcome event of the worker invocation. For Streaming Tail Worker, this will be the // final event, causing the stream to terminate. - virtual void setOutcome(EventOutcome outcome, kj::Duration cpuTime, kj::Duration wallTime) = 0; + virtual void setOutcome(EventOutcome outcome, + kj::Duration cpuTime, + kj::Duration wallTime, + kj::Maybe responseBodySize = kj::none, + kj::Maybe requestBodySize = kj::none) = 0; + + virtual void setBodySizes( + kj::Maybe responseBodySize, kj::Maybe requestBodySize) = 0; // Report time as seen from the incoming Request when the request is complete, since it will not // be available afterwards. @@ -146,7 +153,13 @@ class WorkerTracer final: public BaseTracer { void setEventInfoInternal( const tracing::InvocationSpanContext& context, kj::Date timestamp, tracing::EventInfo&& info); - void setOutcome(EventOutcome outcome, kj::Duration cpuTime, kj::Duration wallTime) override; + void setOutcome(EventOutcome outcome, + kj::Duration cpuTime, + kj::Duration wallTime, + kj::Maybe responseBodySize = kj::none, + kj::Maybe requestBodySize = kj::none) override; + void setBodySizes( + kj::Maybe responseBodySize, kj::Maybe requestBodySize) override; virtual void recordTimestamp(kj::Date timestamp) override; // Set a worker-level tag/attribute to be provided in the onset event. diff --git a/src/workerd/io/worker-entrypoint.c++ b/src/workerd/io/worker-entrypoint.c++ index 655b5e8a1fc..8b5e1f597b6 100644 --- a/src/workerd/io/worker-entrypoint.c++ +++ b/src/workerd/io/worker-entrypoint.c++ @@ -25,6 +25,10 @@ namespace workerd { namespace { + +struct ResponseBodyByteCounter; +struct RequestBodyByteCounter; + // Wrapper around a Worker that handles receiving a new event from the outside. In particular, // this handles: // - Creating a IoContext and making it current. @@ -98,6 +102,12 @@ class WorkerEntrypoint final: public WorkerInterface { bool loggedExceptionEarlier = false; kj::Maybe> abortController; + // For deferred trace reporting after response body streaming completes + kj::Maybe> traceByteCounter; + kj::Maybe> traceRequestByteCounter; + kj::Maybe deferredWorkerTracer; + uint deferredHttpResponseStatus = 0; + void init(kj::Own worker, kj::Maybe> actor, kj::Own limitEnforcer, @@ -125,11 +135,103 @@ class WorkerEntrypoint final: public WorkerInterface { kj::Maybe cfBlobJson); }; +// A refcounted byte counter that can be shared between the stream wrapper and +// the code that needs to read the final byte count after streaming completes. +// Note: kj I/O objects are single-threaded, so no atomic operations are needed. +struct ResponseBodyByteCounter: public kj::Refcounted { + uint64_t bytesWritten = 0; + + void add(size_t bytes) { + bytesWritten += bytes; + } + + uint64_t get() const { + return bytesWritten; + } +}; + +// A refcounted byte counter for tracking request body bytes read. +struct RequestBodyByteCounter: public kj::Refcounted { + uint64_t bytesRead = 0; + + void add(size_t bytes) { + bytesRead += bytes; + } + + uint64_t get() const { + return bytesRead; + } +}; + +class ByteCountingOutputStream final: public kj::AsyncOutputStream { + public: + ByteCountingOutputStream( + kj::Own inner, kj::Own counter) + : inner(kj::mv(inner)), + counter(kj::mv(counter)) {} + + kj::Promise write(kj::ArrayPtr buffer) override { + counter->add(buffer.size()); + return inner->write(buffer); + } + + kj::Promise write(kj::ArrayPtr> pieces) override { + for (const auto& piece: pieces) { + counter->add(piece.size()); + } + return inner->write(pieces); + } + + kj::Promise whenWriteDisconnected() override { + return inner->whenWriteDisconnected(); + } + + // We intentionally don't override tryPumpFrom() because we need to count bytes + // and the optimized pump paths don't give us access to the byte count. + // Returning kj::none causes fallback to the regular write() path. + + private: + kj::Own inner; + kj::Own counter; +}; + +class ByteCountingInputStream final: public kj::AsyncInputStream { + public: + ByteCountingInputStream(kj::AsyncInputStream& inner, kj::Own counter) + : inner(inner), + counter(kj::mv(counter)) {} + + kj::Promise tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { + return inner.tryRead(buffer, minBytes, maxBytes).then([this](size_t bytesRead) { + counter->add(bytesRead); + return bytesRead; + }); + } + + kj::Maybe tryGetLength() override { + return inner.tryGetLength(); + } + + kj::Promise pumpTo(kj::AsyncOutputStream& output, uint64_t amount) override { + // We can't use the optimized pumpTo from the inner stream because we need + // to count the bytes. Instead, we use the default implementation which + // calls tryRead() in a loop, allowing us to count each chunk. + return kj::AsyncInputStream::pumpTo(output, amount); + } + + private: + kj::AsyncInputStream& inner; + kj::Own counter; +}; + // Simple wrapper around `HttpService::Response` to let us know if the response was sent -// already. +// already. Also optionally wraps the output stream with byte counting for trace events +// when tracing is enabled. class WorkerEntrypoint::ResponseSentTracker final: public kj::HttpService::Response { public: - ResponseSentTracker(kj::HttpService::Response& inner): inner(inner) {} + ResponseSentTracker(kj::HttpService::Response& inner, bool enableByteCount) + : inner(inner), + enableByteCount(enableByteCount) {} KJ_DISALLOW_COPY_AND_MOVE(ResponseSentTracker); bool isSent() const { @@ -138,6 +240,16 @@ class WorkerEntrypoint::ResponseSentTracker final: public kj::HttpService::Respo uint getHttpResponseStatus() const { return httpResponseStatus; } + kj::Maybe getExpectedBodySize() const { + return expectedBodySize; + } + + // Transfers ownership of the byte counter for deferred access after streaming. + // Returns kj::none if byte counting is not enabled or if already called. + // The returned ownership should be held until after proxyTask completes. + kj::Maybe> takeByteCounter() { + return kj::mv(byteCounter); + } kj::Own send(uint statusCode, kj::StringPtr statusText, @@ -147,7 +259,18 @@ class WorkerEntrypoint::ResponseSentTracker final: public kj::HttpService::Respo "workerd", "WorkerEntrypoint::ResponseSentTracker::send()", "statusCode", statusCode); sent = true; httpResponseStatus = statusCode; - return inner.send(statusCode, statusText, headers, expectedBodySize); + this->expectedBodySize = expectedBodySize; + + auto stream = inner.send(statusCode, statusText, headers, expectedBodySize); + + // Only wrap with byte counting when tracing is enabled to avoid performance + // overhead for workers without tail workers. + if (enableByteCount) { + auto counter = kj::refcounted(); + byteCounter = kj::addRef(*counter); + return kj::heap(kj::mv(stream), kj::mv(counter)); + } + return stream; } kj::Own acceptWebSocket(const kj::HttpHeaders& headers) override { @@ -158,8 +281,11 @@ class WorkerEntrypoint::ResponseSentTracker final: public kj::HttpService::Respo private: uint httpResponseStatus = 0; + kj::Maybe expectedBodySize; + kj::Maybe> byteCounter; kj::HttpService::Response& inner; bool sent = false; + bool enableByteCount = false; }; kj::Own WorkerEntrypoint::construct(ThreadContext& threadContext, @@ -254,7 +380,9 @@ kj::Promise WorkerEntrypoint::request(kj::HttpMethod method, this->incomingRequest = kj::none; auto& context = incomingRequest->getContext(); - auto wrappedResponse = kj::heap(response); + // Enable byte counting only when tracing is enabled to avoid performance overhead. + bool hasTracer = incomingRequest->getWorkerTracer() != kj::none; + auto wrappedResponse = kj::heap(response, hasTracer); bool isActor = context.getActor() != kj::none; // HACK: Capture workerTracer directly, it's unclear how to acquire the right tracer from context @@ -299,8 +427,24 @@ kj::Promise WorkerEntrypoint::request(kj::HttpMethod method, TRACE_EVENT_BEGIN("workerd", "WorkerEntrypoint::request() waiting on context", PERFETTO_TRACK_FROM_POINTER(&context), PERFETTO_FLOW_FROM_POINTER(this)); + // Create request body byte counter if tracing is enabled. + // We wrap the request body stream to count actual bytes consumed. + kj::Maybe> wrappedRequestBody; + if (hasTracer) { + auto counter = kj::refcounted(); + traceRequestByteCounter = kj::addRef(*counter); + wrappedRequestBody = kj::heap(requestBody, kj::mv(counter)); + } + + // Get reference to the appropriate request body stream to pass to handler. + // If tracing is enabled, use the wrapped stream; otherwise use the original. + kj::AsyncInputStream& requestBodyForHandler = + wrappedRequestBody.map([](auto& wrapped) -> kj::AsyncInputStream& { + return *wrapped; + }).orDefault(requestBody); + return context - .run([this, &context, method, url, &headers, &requestBody, + .run([this, &context, method, url, &headers, &requestBodyForHandler, &metrics = incomingRequest->getMetrics(), &wrappedResponse = *wrappedResponse, entrypointName = entrypointName](Worker::Lock& lock) mutable { TRACE_EVENT_END("workerd", PERFETTO_TRACK_FROM_POINTER(&context)); @@ -319,22 +463,20 @@ kj::Promise WorkerEntrypoint::request(kj::HttpMethod method, ->getSignal()); } - return lock.getGlobalScope().request(method, url, headers, requestBody, wrappedResponse, - cfBlobJson, lock, + return lock.getGlobalScope().request(method, url, headers, requestBodyForHandler, + wrappedResponse, cfBlobJson, lock, lock.getExportedHandler(entrypointName, kj::mv(props), context.getActor()), kj::mv(signal)); }) - .then([this, &context, &wrappedResponse = *wrappedResponse, workerTracer]( + .then([this, &wrappedResponse = *wrappedResponse, workerTracer]( api::DeferredProxy deferredProxy) { TRACE_EVENT("workerd", "WorkerEntrypoint::request() deferred proxy step", PERFETTO_FLOW_FROM_POINTER(this)); proxyTask = kj::mv(deferredProxy.proxyTask); + KJ_IF_SOME(t, workerTracer) { - auto httpResponseStatus = wrappedResponse.getHttpResponseStatus(); - if (httpResponseStatus != 0) { - t.setReturn(context.now(), tracing::FetchResponseInfo(httpResponseStatus)); - } else { - t.setReturn(context.now()); - } + deferredWorkerTracer = t; + deferredHttpResponseStatus = wrappedResponse.getHttpResponseStatus(); + traceByteCounter = wrappedResponse.takeByteCounter(); } }) .catch_([this, &context](kj::Exception&& exception) mutable -> kj::Promise { @@ -389,11 +531,50 @@ kj::Promise WorkerEntrypoint::request(kj::HttpMethod method, // Now that the IoContext is dropped (unless it had waitUntil()s), we can finish proxying // without pinning it or the isolate into memory. KJ_IF_SOME(p, proxyTask) { - return p.catch_([metrics = kj::mv(metrics)](kj::Exception&& e) mutable -> kj::Promise { + return p + .then([this]() { + KJ_IF_SOME(t, deferredWorkerTracer) { + if (deferredHttpResponseStatus != 0) { + t.setReturn(kj::none, tracing::FetchResponseInfo(deferredHttpResponseStatus)); + } else { + t.setReturn(kj::none); + } + kj::Maybe responseBodySize; + KJ_IF_SOME(counter, traceByteCounter) { + responseBodySize = counter->get(); + } + kj::Maybe requestBodySize; + KJ_IF_SOME(counter, traceRequestByteCounter) { + requestBodySize = counter->get(); + } + t.setBodySizes(responseBodySize, requestBodySize); + } + deferredWorkerTracer = kj::none; + traceByteCounter = kj::none; + traceRequestByteCounter = kj::none; + }).catch_([this, metrics = kj::mv(metrics)](kj::Exception&& e) mutable -> kj::Promise { + deferredWorkerTracer = kj::none; + traceByteCounter = kj::none; + traceRequestByteCounter = kj::none; metrics->reportFailure(e, RequestObserver::FailureSource::DEFERRED_PROXY); return kj::mv(e); }); } else { + KJ_IF_SOME(t, deferredWorkerTracer) { + if (deferredHttpResponseStatus != 0) { + t.setReturn(kj::none, tracing::FetchResponseInfo(deferredHttpResponseStatus)); + } else { + t.setReturn(kj::none); + } + kj::Maybe requestBodySize; + KJ_IF_SOME(counter, traceRequestByteCounter) { + requestBodySize = counter->get(); + } + t.setBodySizes(kj::none, requestBodySize); + } + deferredWorkerTracer = kj::none; + traceByteCounter = kj::none; + traceRequestByteCounter = kj::none; return kj::READY_NOW; } }) @@ -401,7 +582,8 @@ kj::Promise WorkerEntrypoint::request(kj::HttpMethod method, // If we're being cancelled, we need to make sure `proxyTask` gets canceled. proxyTask = kj::none; })) - .catch_([this, wrappedResponse = kj::mv(wrappedResponse), isActor, method, url, &headers, + .catch_([this, wrappedResponse = kj::mv(wrappedResponse), + wrappedRequestBody = kj::mv(wrappedRequestBody), isActor, method, url, &headers, &requestBody, metrics = kj::mv(metricsForCatch), workerTracer](kj::Exception&& exception) mutable -> kj::Promise { // Don't return errors to end user. diff --git a/src/workerd/io/worker-interface.capnp b/src/workerd/io/worker-interface.capnp index 027bec0f0bd..b0c8ebf5ed6 100644 --- a/src/workerd/io/worker-interface.capnp +++ b/src/workerd/io/worker-interface.capnp @@ -268,6 +268,10 @@ struct Trace @0x8e8d911203762d34 { outcome @0 :EventOutcome; cpuTime @1 :UInt64; wallTime @2 :UInt64; + responseBodySize @3 :UInt64; + hasResponseBodySize @4 :Bool; + requestBodySize @5 :UInt64; + hasRequestBodySize @6 :Bool; } struct TailEvent {