Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 43 additions & 6 deletions src/workerd/io/trace.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1516,13 +1516,49 @@ CompleteSpan::CompleteSpan(rpc::UserSpanData::Reader reader)
}
}

CompleteSpan CompleteSpan::clone() const {
CompleteSpan copy(spanId, parentSpanId, operationName.clone(), startTime, endTime);
copy.tags.reserve(tags.size());
// Deserializes a SpanOpenData from its capnp reader.
//
// The operation name is copied out of the message into an owned string, and the start time is
// rebuilt from the serialized nanoseconds-since-Unix-epoch value.
SpanOpenData::SpanOpenData(rpc::SpanOpenData::Reader reader)
: spanId(reader.getSpanId()),
parentSpanId(reader.getParentSpanId()),
operationName(kj::str(reader.getOperationName())),
startTime(kj::UNIX_EPOCH + reader.getStartTimeNs() * kj::NANOSECONDS) {}

// Serializes this SpanOpen payload into the given capnp builder.
void SpanOpenData::copyTo(rpc::SpanOpenData::Builder builder) const {
  // Identity of the span and of its parent.
  builder.setSpanId(spanId);
  builder.setParentSpanId(parentSpanId);

  // Timestamps travel over the wire as nanoseconds since the Unix epoch.
  const auto startTimeNs = (startTime - kj::UNIX_EPOCH) / kj::NANOSECONDS;
  builder.setStartTimeNs(startTimeNs);

  builder.setOperationName(operationName.asPtr());
}

// Builds the span-end payload from a completed span.
//
// The span's tags are taken over by move; the span ID and both timestamps are copied. The span's
// parentSpanId and operationName are not read here.
SpanEndData::SpanEndData(CompleteSpan&& span)
: spanId(span.spanId),
startTime(span.startTime),
endTime(span.endTime),
tags(kj::mv(span.tags)) {}

// Deserializes a SpanEndData from its capnp reader.
//
// Both timestamps are rebuilt from nanoseconds-since-Unix-epoch values; every serialized tag is
// copied into the owned tag map.
SpanEndData::SpanEndData(rpc::SpanEndData::Reader reader)
    : spanId(reader.getSpanId()),
      startTime(kj::UNIX_EPOCH + reader.getStartTimeNs() * kj::NANOSECONDS),
      endTime(kj::UNIX_EPOCH + reader.getEndTimeNs() * kj::NANOSECONDS) {
  auto serializedTags = reader.getTags();

  // Size the map once up front so inserting the tags does not trigger rehashing.
  tags.reserve(serializedTags.size());

  for (auto entry: serializedTags) {
    // The key must outlive the message, so copy it onto the heap.
    auto key = kj::ConstString(kj::heapString(entry.getKey()));
    auto value = deserializeTagValue(entry.getValue());
    tags.insert(kj::mv(key), kj::mv(value));
  }
}

void SpanEndData::copyTo(rpc::SpanEndData::Builder builder) const {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You'll notice that this is also duplicated with CompleteSpan, but that will go away in the follow-up as discussed.

builder.setStartTimeNs((startTime - kj::UNIX_EPOCH) / kj::NANOSECONDS);
builder.setEndTimeNs((endTime - kj::UNIX_EPOCH) / kj::NANOSECONDS);
builder.setSpanId(spanId);

auto tagsParam = builder.initTags(tags.size());
auto i = 0;
for (auto& tag: tags) {
copy.tags.insert(tag.key.clone(), spanTagClone(tag.value));
auto tagParam = tagsParam[i++];
tagParam.setKey(tag.key.asPtr());
serializeTagValue(tagParam.initValue(), tag.value);
}
return copy;
}
} // namespace tracing

Expand All @@ -1534,7 +1570,8 @@ SpanBuilder::SpanBuilder(kj::Maybe<kj::Own<SpanObserver>> observer,
KJ_IF_SOME(obs, observer) {
// TODO(o11y): Once we report the user tracing spanOpen event as soon as a span is created, we
// should be able to fold this virtual call and just get the timestamp directly.
span.emplace(kj::mv(operationName), startTime.orDefault(obs->getTime()));
kj::Date time = startTime.orDefault([&]() { return obs->getTime(); });
span.emplace(kj::mv(operationName), time);
this->observer = kj::mv(obs);
}
}
Expand Down
25 changes: 25 additions & 0 deletions src/workerd/io/trace.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,28 @@ struct UserSpanData {
parentSpanId @5 :UInt64;
}

struct SpanOpenData {
# Representation of a SpanOpen event, created when a user span is opened.
operationName @0 :Text;
# Name of the operation the span describes.

startTimeNs @1 :Int64;
# Start time of the span, in nanoseconds since Unix epoch.

spanId @2 :UInt64;
# ID of the span being opened.
parentSpanId @3 :UInt64;
# ID of the parent span.
# NOTE(review): presumably all-zero when the span has no parent span – confirm against the
# tracer, which substitutes the Onset span ID in that case.
}

struct SpanEndData {
# Representation of an event that indicates completion of a user span. This information is
# provided to the streaming tail worker in the Attributes and SpanClose events.

# TODO(cleanup): startTimeNs is merely used as a fallback timestamp, consider obsoleting it.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only used as a fallback timestamp in the case of errors, but @mar-cf urged me to still support it.

startTimeNs @0 :Int64;
# Nanoseconds since Unix epoch
endTimeNs @1 :Int64;
# Nanoseconds since Unix epoch

# List of span attributes
tags @2 :List(Tag);
spanId @3 :UInt64;
}
50 changes: 47 additions & 3 deletions src/workerd/io/trace.h
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,6 @@ struct CompleteSpan {

CompleteSpan(rpc::UserSpanData::Reader reader);
void copyTo(rpc::UserSpanData::Builder builder) const;
CompleteSpan clone() const;
explicit CompleteSpan(tracing::SpanId spanId,
tracing::SpanId parentSpanId,
kj::ConstString operationName,
Expand All @@ -646,7 +645,50 @@ struct CompleteSpan {
startTime(startTime),
endTime(endTime),
tags(kj::mv(tags)) {}
kj::String toString() const;
};

// Data carried by a SpanOpen event: the identity of the span, its parent, the operation name and
// the start time. Can be converted to and from its capnp form (rpc::SpanOpenData).
struct SpanOpenData {
// Represents the data needed for a SpanOpen event
// ID of the span being opened.
tracing::SpanId spanId;
// ID of the parent span.
tracing::SpanId parentSpanId;

// Name of the operation this span describes.
kj::ConstString operationName;
// Time at which the span was opened.
kj::Date startTime;

// Deserialize from the capnp representation.
SpanOpenData(rpc::SpanOpenData::Reader reader);
// Serialize into the given capnp builder.
void copyTo(rpc::SpanOpenData::Builder builder) const;
// Construct directly from the individual fields; takes ownership of operationName.
explicit SpanOpenData(tracing::SpanId spanId,
tracing::SpanId parentSpanId,
kj::ConstString operationName,
kj::Date startTime)
: spanId(spanId),
parentSpanId(parentSpanId),
operationName(kj::mv(operationName)),
startTime(startTime) {}
};

struct SpanEndData {
// Represents the data needed when closing a span, including the Attributes and SpanClose events.
tracing::SpanId spanId;

kj::Date startTime;
kj::Date endTime;
// Should be Span::TagMap, but we can't forward-declare that.
kj::HashMap<kj::ConstString, tracing::Attribute::Value> tags;

// Convert CompleteSpan to SpanEndData
explicit SpanEndData(CompleteSpan&& span);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will also no longer be needed with the follow-up.

SpanEndData(rpc::SpanEndData::Reader reader);
void copyTo(rpc::SpanEndData::Builder builder) const;
explicit SpanEndData(tracing::SpanId spanId,
kj::Date startTime,
kj::Date endTime,
kj::HashMap<kj::ConstString, tracing::Attribute::Value> tags =
kj::HashMap<kj::ConstString, tracing::Attribute::Value>())
: spanId(spanId),
startTime(startTime),
endTime(endTime),
tags(kj::mv(tags)) {}
};

// A Return mark is used to mark the point at which a span operation returned
Expand Down Expand Up @@ -1112,8 +1154,10 @@ class SpanObserver: public kj::Refcounted {

// Report the span data. Called at the end of the span.
//
// This should always be called exactly once per observer.
// This should always be called exactly once per observer at span completion time.
virtual void report(const Span& span) = 0;
Copy link
Contributor Author

@fhanau fhanau Feb 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For user tracing, report() will only be used to transmit the span end data in the follow-up. For internal tracing, it will continue to be used to transmit the full span. reportStart() is not yet used and will only be used in the user tracing framework (no-op otherwise).

// Report information about the span onset.
virtual void reportStart(kj::ConstString operationName, kj::Date startTime) = 0;

// The current time to be provided for the span. For user tracing, we will override this to
// provide I/O time. This *requires* that spans are only created when an IOContext is available
Expand Down
106 changes: 88 additions & 18 deletions src/workerd/io/tracer.c++
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,37 @@ void WorkerTracer::addLog(const tracing::InvocationSpanContext& context,
}

// Reports a complete span by splitting it into its SpanOpen part and its span-end part.
void WorkerTracer::addSpan(tracing::CompleteSpan&& span) {
  // The span has not been transmitted via RPC at this point, so it can be decomposed locally.
  // The open event goes first and takes over the operation name, which the span-end data does
  // not need.
  addSpanOpen(span.spanId, span.parentSpanId, kj::mv(span.operationName), span.startTime);

  // Everything else (span ID, timestamps, tags) is handed over to the span-end data.
  addSpanEnd(tracing::SpanEndData(kj::mv(span)));
}

// Reports a SpanOpen event for the given span to the tail stream.
//
// Nothing is reported when the pipeline log level is NONE or when no tail stream writer is set.
void WorkerTracer::addSpanOpen(tracing::SpanId spanId,
    tracing::SpanId parentSpanId,
    kj::ConstString operationName,
    kj::Date startTime) {
  if (pipelineLogLevel == PipelineLogLevel::NONE) return;

  auto& writer = KJ_UNWRAP_OR_RETURN(maybeTailStreamWriter);
  auto& rootContext = KJ_ASSERT_NONNULL(topLevelInvocationSpanContext);

  // Capture the name length now: the name itself is moved into the SpanOpen event below.
  const size_t nameSize = operationName.size();

  // An all-zero parent ID means there is no span above this one, so the span is attached to the
  // Onset span instead, whose ID comes from the top-level context. An all-zero ID is not expected
  // to be generated by chance (see SpanId::fromEntropy()), so this should be safe.
  if (parentSpanId == tracing::SpanId::nullId) {
    parentSpanId = rootContext.getSpanId();
  }

  tracing::InvocationSpanContext openContext(
      rootContext.getTraceId(), rootContext.getInvocationId(), parentSpanId);
  writer->report(
      openContext, tracing::SpanOpen(spanId, kj::mv(operationName)), startTime, nameSize);
}

void WorkerTracer::addSpanEnd(tracing::SpanEndData&& span) {
if (pipelineLogLevel == PipelineLogLevel::NONE) {
return;
}
Expand All @@ -136,7 +166,6 @@ void WorkerTracer::addSpan(tracing::CompleteSpan&& span) {
adjustSpanTime(span);

size_t spanTagsSize = 0;
size_t spanNameSize = span.operationName.size();
for (const Span::TagMap::Entry& tag: span.tags) {
spanTagsSize += tag.key.size();
KJ_SWITCH_ONEOF(tag.value) {
Expand All @@ -153,28 +182,15 @@ void WorkerTracer::addSpan(tracing::CompleteSpan&& span) {
}
}

// Span events are transmitted together for now.
// Compose Attributes and SpanClose, which are available at span completion time and transmitted
// together.
auto& topLevelContext = KJ_ASSERT_NONNULL(topLevelInvocationSpanContext);
// Compose span events. For SpanOpen, an all-zero spanId is interpreted as having no spans above
// this one, thus we use the Onset spanId instead (taken from topLevelContext). We go to great
// lengths to rule out getting an all-zero spanId by chance (see SpanId::fromEntropy()), so this
// should be safe.
tracing::SpanId parentSpanId = span.parentSpanId;
if (parentSpanId == tracing::SpanId::nullId) {
parentSpanId = topLevelContext.getSpanId();
}
// TODO(o11y): Actually report the spanOpen event at span creation time
auto spanOpenContext = tracing::InvocationSpanContext(
topLevelContext.getTraceId(), topLevelContext.getInvocationId(), parentSpanId);
auto spanComponentContext = tracing::InvocationSpanContext(
topLevelContext.getTraceId(), topLevelContext.getInvocationId(), span.spanId);

tailStreamWriter->report(spanOpenContext,
tracing::SpanOpen(span.spanId, span.operationName.clone()), span.startTime, spanNameSize);
// If a span manages to exceed the size limit, truncate it by not providing span attributes.
if (span.tags.size() && spanTagsSize <= MAX_TRACE_BYTES) {
tracing::CustomInfo attr = KJ_MAP(tag, span.tags) {
return tracing::Attribute(tag.key.clone(), kj::mv(tag.value));
return tracing::Attribute(kj::mv(tag.key), kj::mv(tag.value));
};
tailStreamWriter->report(spanComponentContext, kj::mv(attr), span.startTime, spanTagsSize);
}
Expand Down Expand Up @@ -443,6 +459,56 @@ void BaseTracer::adjustSpanTime(tracing::CompleteSpan& span) {
}
}

void BaseTracer::adjustSpanTime(tracing::SpanEndData& span) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is copied from BaseTracer::adjustSpanTime(tracing::CompleteSpan&), will be deduplicated in the follow-up PR. Note that the operationName variable is no longer available, but if we need to debug an error here we can still infer the operation using the spanId, which maps to the span's SpanOpen event which has the operationName.

// To report I/O time, we need the IOContext to still be alive.
// weakIoContext is only none if we are tracing via RPC (in this case span times have already been
// adjusted) or if we failed to transmit an Onset event (in that case we'll get an error based on
// missing topLevelInvocationSpanContext right after).
if (weakIoContext != kj::none) {
auto& weakIoCtx = KJ_ASSERT_NONNULL(weakIoContext);
weakIoCtx->runIfAlive([this, &span](IoContext& context) {
if (context.hasCurrentIncomingRequest()) {
span.endTime = context.now();
} else {
// We have an IOContext, but there's no current IncomingRequest. Always log a warning here,
// this should not be happening. Still report completeTime as a useful timestamp if
// available.
bool hasCompleteTime = false;
if (completeTime != kj::UNIX_EPOCH) {
span.endTime = completeTime;
hasCompleteTime = true;
} else {
span.endTime = span.startTime;
}
if (isPredictableModeForTest()) {
KJ_FAIL_ASSERT("reported span without current request", hasCompleteTime);
} else {
LOG_WARNING_PERIODICALLY("reported span without current request");
}
}
});
if (!weakIoCtx->isValid()) {
// This can happen if we start a customEvent from this event and cancel it after this IoContext
// gets destroyed. In that case we no longer have an IoContext available and can't get the
// current time, but the outcome timestamp will have already been set. Since the outcome
// timestamp is "late enough", simply use that.
// TODO(o11y): fix this – spans should not be outliving the IoContext.
if (completeTime != kj::UNIX_EPOCH) {
span.endTime = completeTime;
} else {
// Otherwise, we can't actually get an end timestamp that makes sense. Report a zero-duration
// span and log a warning (or fail assert in test mode).
span.endTime = span.startTime;
if (isPredictableModeForTest()) {
KJ_FAIL_ASSERT("reported span after IoContext was deallocated");
} else {
KJ_LOG(WARNING, "reported span after IoContext was deallocated");
}
}
}
}
}

void WorkerTracer::setReturn(
kj::Maybe<kj::Date> timestamp, kj::Maybe<tracing::FetchResponseInfo> fetchResponseInfo) {
// Match the behavior of setEventInfo(). Any resolution of the TODO comments in setEventInfo()
Expand Down Expand Up @@ -517,6 +583,10 @@ void UserSpanObserver::report(const Span& span) {
submitter->submitSpan(spanId, parentSpanId, span);
}

// Forwards the span-onset information to the submitter, together with this observer's spanId and
// parentSpanId. Ownership of operationName is passed on by move.
void UserSpanObserver::reportStart(kj::ConstString operationName, kj::Date startTime) {
submitter->submitSpanOpen(spanId, parentSpanId, kj::mv(operationName), startTime);
}

// Provide I/O time to the tracing system for user spans.
kj::Date UserSpanObserver::getTime() {
return IoContext::current().now();
Expand Down
22 changes: 21 additions & 1 deletion src/workerd/io/tracer.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,15 @@ class BaseTracer: public kj::Refcounted {
kj::Date timestamp,
LogLevel logLevel,
kj::String message) = 0;
// Add a span.
// Add a complete span.
virtual void addSpan(tracing::CompleteSpan&& span) = 0;
// Add information about a span when it is opened, corresponds to SpanOpen event.
virtual void addSpanOpen(tracing::SpanId spanId,
tracing::SpanId parentSpanId,
kj::ConstString operationName,
kj::Date startTime) = 0;
// Add span events when the span is complete (Attributes and SpanClose).
virtual void addSpanEnd(tracing::SpanEndData&& span) = 0;

virtual void addException(const tracing::InvocationSpanContext& context,
kj::Date timestamp,
Expand Down Expand Up @@ -90,6 +97,7 @@ class BaseTracer: public kj::Refcounted {

// helper method for addSpan() implementations
void adjustSpanTime(tracing::CompleteSpan& span);
void adjustSpanTime(tracing::SpanEndData& span);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These will be deduplicated in the follow-up PR


// Function to create the root span for the new tracing format.
kj::Maybe<MakeUserRequestSpanFunc> makeUserRequestSpanFunc;
Expand Down Expand Up @@ -126,6 +134,11 @@ class WorkerTracer final: public BaseTracer {
LogLevel logLevel,
kj::String message) override;
void addSpan(tracing::CompleteSpan&& span) override;
void addSpanOpen(tracing::SpanId spanId,
tracing::SpanId parentSpanId,
kj::ConstString operationName,
kj::Date startTime) override;
void addSpanEnd(tracing::SpanEndData&& span) override;
void addException(const tracing::InvocationSpanContext& context,
kj::Date timestamp,
kj::String name,
Expand Down Expand Up @@ -182,6 +195,12 @@ class WorkerTracer final: public BaseTracer {
// Refcounted interface through which span events are submitted. UserSpanObserver forwards its
// report() and reportStart() calls to submitSpan() and submitSpanOpen() respectively.
class SpanSubmitter: public kj::Refcounted {
public:
// Submit a complete span.
// NOTE(review): UserSpanObserver::report() calls this as submitSpan(spanId, parentSpanId, span),
// so the parameter names `context` and `spanId` look out of date – confirm against the
// implementations and consider renaming them to spanId/parentSpanId.
virtual void submitSpan(tracing::SpanId context, tracing::SpanId spanId, const Span& span) = 0;
// Submit the information available when a span is opened. Takes ownership of operationName.
virtual void submitSpanOpen(tracing::SpanId spanId,
tracing::SpanId parentSpanId,
kj::ConstString operationName,
kj::Date startTime) = 0;
// Submit the information available when the span identified by spanId completes.
virtual void submitSpanEnd(tracing::SpanId spanId, const Span& span) = 0;

// Returns a span ID; presumably a fresh ID for a newly created span – TODO confirm.
virtual tracing::SpanId makeSpanId() = 0;
};

Expand All @@ -202,6 +221,7 @@ class UserSpanObserver final: public SpanObserver {

kj::Own<SpanObserver> newChild() override;
void report(const Span& span) override;
void reportStart(kj::ConstString operationName, kj::Date startTime) override;
kj::Date getTime() override;

private:
Expand Down
Loading
Loading