Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions src/workerd/api/tests/http-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,20 @@ export default {
if (pathname === '/body-length') {
return Response.json(Object.fromEntries(request.headers));
}
if (pathname === '/consume-body') {
const body = await request.text();
return new Response(`Received ${body.length} bytes`);
}
if (pathname === '/streaming-response') {
const stream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode('chunk1'));
controller.enqueue(new TextEncoder().encode('chunk2'));
controller.close();
},
});
return new Response(stream);
}
if (pathname === '/web-socket') {
const pair = new WebSocketPair();
pair[0].addEventListener('message', (event) => {
Expand Down Expand Up @@ -89,6 +103,28 @@ export default {
assert.strictEqual(scheduledLastCtrl.scheduledTime, 1000);
assert.strictEqual(scheduledLastCtrl.cron, '* * * * 30');
}

{
const response = await env.SERVICE.fetch(
'http://placeholder/consume-body',
{
method: 'POST',
body: 'hello',
}
);
const text = await response.text();
assert.strictEqual(text, 'Received 5 bytes');
}

{
const response = await env.SERVICE.fetch(
'http://placeholder/streaming-response'
);
const text = await response.text();
// "chunk1" + "chunk2" = 12 bytes
assert.strictEqual(text, 'chunk1chunk2');
assert.strictEqual(text.length, 12);
}
},
};

Expand Down
8 changes: 5 additions & 3 deletions src/workerd/api/tests/tail-worker-test.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 21 additions & 1 deletion src/workerd/api/trace.c++
Original file line number Diff line number Diff line change
Expand Up @@ -368,12 +368,32 @@ jsg::Ref<TraceItem::FetchEventInfo::Request> TraceItem::FetchEventInfo::Request:

TraceItem::FetchEventInfo::Response::Response(
const Trace& trace, const tracing::FetchResponseInfo& responseInfo)
: status(responseInfo.statusCode) {}
: status(responseInfo.statusCode),
bodySize(trace.responseBodySize),
requestBodySize(trace.requestBodySize) {}

uint16_t TraceItem::FetchEventInfo::Response::getStatus() {
return status;
}

jsg::Optional<double> TraceItem::FetchEventInfo::Response::getBodySize() {
// Return null if bodySize is unknown (kj::none), otherwise return the size.
// Note: Converting uint64_t to double may lose precision for sizes larger than 2^53 bytes
// (approximately 9 petabytes), though this is unlikely in practice.
KJ_IF_SOME(size, bodySize) {
return static_cast<double>(size);
}
return kj::none;
}

jsg::Optional<double> TraceItem::FetchEventInfo::Response::getRequestBodySize() {
// Return null if requestBodySize is unknown (kj::none), otherwise return the size.
KJ_IF_SOME(size, requestBodySize) {
return static_cast<double>(size);
}
return kj::none;
}

TraceItem::JsRpcEventInfo::JsRpcEventInfo(
const Trace& trace, const tracing::JsRpcEventInfo& eventInfo)
: rpcMethod(kj::str(eventInfo.methodName)) {}
Expand Down
6 changes: 6 additions & 0 deletions src/workerd/api/trace.h
Original file line number Diff line number Diff line change
Expand Up @@ -250,13 +250,19 @@ class TraceItem::FetchEventInfo::Response final: public jsg::Object {
explicit Response(const Trace& trace, const tracing::FetchResponseInfo& responseInfo);

uint16_t getStatus();
jsg::Optional<double> getBodySize();
jsg::Optional<double> getRequestBodySize();

JSG_RESOURCE_TYPE(Response) {
JSG_LAZY_READONLY_INSTANCE_PROPERTY(status, getStatus);
JSG_LAZY_READONLY_INSTANCE_PROPERTY(bodySize, getBodySize);
JSG_LAZY_READONLY_INSTANCE_PROPERTY(requestBodySize, getRequestBodySize);
}

private:
uint16_t status;
kj::Maybe<uint64_t> bodySize;
kj::Maybe<uint64_t> requestBodySize;
};

class TraceItem::JsRpcEventInfo final: public jsg::Object {
Expand Down
17 changes: 12 additions & 5 deletions src/workerd/io/trace-stream.c++
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ namespace {
V(ALARM, "alarm") \
V(ATTRIBUTES, "attributes") \
V(BATCHSIZE, "batchSize") \
V(RESPONSEBODYSIZE, "responseBodySize") \
V(REQUESTBODYSIZE, "requestBodySize") \
V(CANCELED, "canceled") \
V(CHANNEL, "channel") \
V(CFJSON, "cfJson") \
Expand Down Expand Up @@ -175,9 +177,10 @@ jsg::JsValue ToJs(jsg::Lock& js, kj::ArrayPtr<const Attribute> attributes, Strin
}

jsg::JsValue ToJs(jsg::Lock& js, const FetchResponseInfo& info, StringCache& cache) {
static const kj::StringPtr keys[] = {TYPE_STR, STATUSCODE_STR};
jsg::JsValue values[] = {cache.get(js, FETCH_STR), js.num(info.statusCode)};
return js.obj(kj::arrayPtr(keys), kj::arrayPtr(values));
auto obj = js.obj();
obj.set(js, TYPE_STR, cache.get(js, FETCH_STR));
obj.set(js, STATUSCODE_STR, js.num(info.statusCode));
return obj;
}

jsg::JsValue ToJs(jsg::Lock& js, const FetchEventInfo& info, StringCache& cache) {
Expand All @@ -199,7 +202,6 @@ jsg::JsValue ToJs(jsg::Lock& js, const FetchEventInfo& info, StringCache& cache)
obj.set(js, HEADERS_STR,
js.arr(info.headers.asPtr(),
[&cache, &ToJs](jsg::Lock& js, const auto& header) { return ToJs(js, header, cache); }));

return obj;
}

Expand Down Expand Up @@ -403,7 +405,12 @@ jsg::JsValue ToJs(jsg::Lock& js, const Outcome& outcome, StringCache& cache) {

obj.set(js, CPUTIME_STR, js.num(cpuTime));
obj.set(js, WALLTIME_STR, js.num(wallTime));

KJ_IF_SOME(size, outcome.responseBodySize) {
obj.set(js, RESPONSEBODYSIZE_STR, js.num(static_cast<double>(size)));
}
KJ_IF_SOME(size, outcome.requestBodySize) {
obj.set(js, REQUESTBODYSIZE_STR, js.num(static_cast<double>(size)));
}
return obj;
}

Expand Down
52 changes: 52 additions & 0 deletions src/workerd/io/trace-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -496,11 +496,63 @@ KJ_TEST("Read/Write Outcome works") {
KJ_ASSERT(info2.outcome == EventOutcome::EXCEPTION);
KJ_ASSERT(info2.wallTime == 2 * kj::MILLISECONDS);
KJ_ASSERT(info2.cpuTime == 1 * kj::MILLISECONDS);
KJ_ASSERT(info2.responseBodySize == kj::none); // Default value (unknown)
KJ_ASSERT(info2.requestBodySize == kj::none); // Default value (unknown)

Outcome info3 = info.clone();
KJ_ASSERT(info3.outcome == EventOutcome::EXCEPTION);
KJ_ASSERT(info3.wallTime == 2 * kj::MILLISECONDS);
KJ_ASSERT(info3.cpuTime == 1 * kj::MILLISECONDS);
KJ_ASSERT(info3.responseBodySize == kj::none); // Default value (unknown)
KJ_ASSERT(info3.requestBodySize == kj::none); // Default value (unknown)
}

KJ_TEST("Read/Write Outcome with body sizes works") {
capnp::MallocMessageBuilder builder;
auto infoBuilder = builder.initRoot<rpc::Trace::Outcome>();

kj::Maybe<uint64_t> responseBodySize = 54321;
kj::Maybe<uint64_t> requestBodySize = 12345;
Outcome info(EventOutcome::OK, 1 * kj::MILLISECONDS, 2 * kj::MILLISECONDS, responseBodySize,
requestBodySize);
info.copyTo(infoBuilder);

auto reader = infoBuilder.asReader();
Outcome info2(reader);
KJ_ASSERT(info2.outcome == EventOutcome::OK);
KJ_ASSERT(info2.wallTime == 2 * kj::MILLISECONDS);
KJ_ASSERT(info2.cpuTime == 1 * kj::MILLISECONDS);
KJ_ASSERT(KJ_ASSERT_NONNULL(info2.responseBodySize) == 54321);
KJ_ASSERT(KJ_ASSERT_NONNULL(info2.requestBodySize) == 12345);

Outcome info3 = info.clone();
KJ_ASSERT(info3.outcome == EventOutcome::OK);
KJ_ASSERT(info3.wallTime == 2 * kj::MILLISECONDS);
KJ_ASSERT(info3.cpuTime == 1 * kj::MILLISECONDS);
KJ_ASSERT(KJ_ASSERT_NONNULL(info3.responseBodySize) == 54321);
KJ_ASSERT(KJ_ASSERT_NONNULL(info3.requestBodySize) == 12345);
}

KJ_TEST("Read/Write Outcome with zero body sizes works") {
capnp::MallocMessageBuilder builder;
auto infoBuilder = builder.initRoot<rpc::Trace::Outcome>();

// Zero body sizes should be distinguishable from unknown
kj::Maybe<uint64_t> responseBodySize = uint64_t{0};
kj::Maybe<uint64_t> requestBodySize = uint64_t{0};
Outcome info(EventOutcome::OK, 1 * kj::MILLISECONDS, 2 * kj::MILLISECONDS, responseBodySize,
requestBodySize);
info.copyTo(infoBuilder);

auto reader = infoBuilder.asReader();
Outcome info2(reader);
KJ_ASSERT(info2.outcome == EventOutcome::OK);
KJ_ASSERT(KJ_ASSERT_NONNULL(info2.responseBodySize) == 0); // Known to be zero, not unknown
KJ_ASSERT(KJ_ASSERT_NONNULL(info2.requestBodySize) == 0); // Known to be zero, not unknown

Outcome info3 = info.clone();
KJ_ASSERT(KJ_ASSERT_NONNULL(info3.responseBodySize) == 0); // Known to be zero, not unknown
KJ_ASSERT(KJ_ASSERT_NONNULL(info3.requestBodySize) == 0); // Known to be zero, not unknown
}

KJ_TEST("Read/Write TailEvent works") {
Expand Down
29 changes: 25 additions & 4 deletions src/workerd/io/trace.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1280,24 +1280,45 @@ Onset Onset::clone() const {
KJ_MAP(attr, attributes) { return attr.clone(); });
}

Outcome::Outcome(EventOutcome outcome, kj::Duration cpuTime, kj::Duration wallTime)
Outcome::Outcome(EventOutcome outcome,
kj::Duration cpuTime,
kj::Duration wallTime,
kj::Maybe<uint64_t> responseBodySize,
kj::Maybe<uint64_t> requestBodySize)
: outcome(outcome),
cpuTime(cpuTime),
wallTime(wallTime) {}
wallTime(wallTime),
responseBodySize(responseBodySize),
requestBodySize(requestBodySize) {}

Outcome::Outcome(rpc::Trace::Outcome::Reader reader)
: outcome(reader.getOutcome()),
cpuTime(reader.getCpuTime() * kj::MILLISECONDS),
wallTime(reader.getWallTime() * kj::MILLISECONDS) {}
wallTime(reader.getWallTime() * kj::MILLISECONDS) {
if (reader.getHasResponseBodySize()) {
responseBodySize = reader.getResponseBodySize();
}
if (reader.getHasRequestBodySize()) {
requestBodySize = reader.getRequestBodySize();
}
}

void Outcome::copyTo(rpc::Trace::Outcome::Builder builder) const {
builder.setOutcome(outcome);
builder.setCpuTime(cpuTime / kj::MILLISECONDS);
builder.setWallTime(wallTime / kj::MILLISECONDS);
KJ_IF_SOME(size, responseBodySize) {
builder.setResponseBodySize(size);
builder.setHasResponseBodySize(true);
}
KJ_IF_SOME(size, requestBodySize) {
builder.setRequestBodySize(size);
builder.setHasRequestBodySize(true);
}
}

Outcome Outcome::clone() const {
return Outcome(outcome, cpuTime, wallTime);
return Outcome(outcome, cpuTime, wallTime, responseBodySize, requestBodySize);
}

TailEvent::TailEvent(
Expand Down
10 changes: 9 additions & 1 deletion src/workerd/io/trace.h
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,11 @@ Onset::Info readOnsetInfo(const rpc::Trace::Onset::Info::Reader& info);
void writeOnsetInfo(const tracing::Onset::Info& info, rpc::Trace::Onset::Info::Builder& builder);

struct Outcome final {
explicit Outcome(EventOutcome outcome, kj::Duration cpuTime, kj::Duration wallTime);
explicit Outcome(EventOutcome outcome,
kj::Duration cpuTime,
kj::Duration wallTime,
kj::Maybe<uint64_t> responseBodySize = kj::none,
kj::Maybe<uint64_t> requestBodySize = kj::none);
Outcome(rpc::Trace::Outcome::Reader reader);
Outcome(Outcome&&) = default;
Outcome& operator=(Outcome&&) = default;
Expand All @@ -744,6 +748,8 @@ struct Outcome final {
EventOutcome outcome = EventOutcome::OK;
kj::Duration cpuTime;
kj::Duration wallTime;
kj::Maybe<uint64_t> responseBodySize;
kj::Maybe<uint64_t> requestBodySize;

void copyTo(rpc::Trace::Outcome::Builder builder) const;
Outcome clone() const;
Expand Down Expand Up @@ -869,6 +875,8 @@ class Trace final: public kj::Refcounted {

kj::Duration cpuTime;
kj::Duration wallTime;
kj::Maybe<uint64_t> responseBodySize;
kj::Maybe<uint64_t> requestBodySize;

bool truncated = false;
bool exceededLogLimit = false;
Expand Down
Loading
Loading