diff --git a/src/workerd/io/trace.c++ b/src/workerd/io/trace.c++ index a12e3bbd8a9..779a1d80a17 100644 --- a/src/workerd/io/trace.c++ +++ b/src/workerd/io/trace.c++ @@ -1516,13 +1516,49 @@ CompleteSpan::CompleteSpan(rpc::UserSpanData::Reader reader) } } -CompleteSpan CompleteSpan::clone() const { - CompleteSpan copy(spanId, parentSpanId, operationName.clone(), startTime, endTime); - copy.tags.reserve(tags.size()); +SpanOpenData::SpanOpenData(rpc::SpanOpenData::Reader reader) + : spanId(reader.getSpanId()), + parentSpanId(reader.getParentSpanId()), + operationName(kj::str(reader.getOperationName())), + startTime(kj::UNIX_EPOCH + reader.getStartTimeNs() * kj::NANOSECONDS) {} + +void SpanOpenData::copyTo(rpc::SpanOpenData::Builder builder) const { + builder.setOperationName(operationName.asPtr()); + builder.setStartTimeNs((startTime - kj::UNIX_EPOCH) / kj::NANOSECONDS); + builder.setSpanId(spanId); + builder.setParentSpanId(parentSpanId); +} + +SpanEndData::SpanEndData(CompleteSpan&& span) + : spanId(span.spanId), + startTime(span.startTime), + endTime(span.endTime), + tags(kj::mv(span.tags)) {} + +SpanEndData::SpanEndData(rpc::SpanEndData::Reader reader) + : spanId(reader.getSpanId()), + startTime(kj::UNIX_EPOCH + reader.getStartTimeNs() * kj::NANOSECONDS), + endTime(kj::UNIX_EPOCH + reader.getEndTimeNs() * kj::NANOSECONDS) { + auto tagsParam = reader.getTags(); + tags.reserve(tagsParam.size()); + for (auto tagParam: tagsParam) { + tags.insert(kj::ConstString(kj::heapString(tagParam.getKey())), + deserializeTagValue(tagParam.getValue())); + } +} + +void SpanEndData::copyTo(rpc::SpanEndData::Builder builder) const { + builder.setStartTimeNs((startTime - kj::UNIX_EPOCH) / kj::NANOSECONDS); + builder.setEndTimeNs((endTime - kj::UNIX_EPOCH) / kj::NANOSECONDS); + builder.setSpanId(spanId); + + auto tagsParam = builder.initTags(tags.size()); + auto i = 0; for (auto& tag: tags) { - copy.tags.insert(tag.key.clone(), spanTagClone(tag.value)); + auto tagParam = tagsParam[i++]; + tagParam.setKey(tag.key.asPtr()); + serializeTagValue(tagParam.initValue(), tag.value); } - return copy; } } // namespace tracing @@ -1534,7 +1570,8 @@ SpanBuilder::SpanBuilder(kj::Maybe> observer, KJ_IF_SOME(obs, observer) { // TODO(o11y): Once we report the user tracing spanOpen event as soon as a span is created, we // should be able to fold this virtual call and just get the timestamp directly. - span.emplace(kj::mv(operationName), startTime.orDefault(obs->getTime())); + kj::Date time = startTime.orDefault([&]() { return obs->getTime(); }); + span.emplace(kj::mv(operationName), time); this->observer = kj::mv(obs); } } diff --git a/src/workerd/io/trace.capnp b/src/workerd/io/trace.capnp index d27e2cfe9fc..3ab995523c1 100644 --- a/src/workerd/io/trace.capnp +++ b/src/workerd/io/trace.capnp @@ -42,3 +42,28 @@ struct UserSpanData { parentSpanId @5 :UInt64; } +struct SpanOpenData { + # Representation of a SpanOpen event, created when a user span is opened. + operationName @0 :Text; + + startTimeNs @1 :Int64; + # Nanoseconds since Unix epoch + + spanId @2 :UInt64; + parentSpanId @3 :UInt64; +} + +struct SpanEndData { + # Representation of an event that indicates completion of a user span. This information is + # provided to the streaming tail worker in the Attributes and SpanClose events. + + # TODO(cleanup): startTimeNs is merely used as a fallback timestamp, consider obsoleting it. + startTimeNs @0 :Int64; + # Nanoseconds since Unix epoch + endTimeNs @1 :Int64; + # Nanoseconds since Unix epoch + + # List of span attributes + tags @2 :List(Tag); + spanId @3 :UInt64; +} diff --git a/src/workerd/io/trace.h b/src/workerd/io/trace.h index dfd34b06b77..1080b7a584c 100644 --- a/src/workerd/io/trace.h +++ b/src/workerd/io/trace.h @@ -632,7 +632,6 @@ struct CompleteSpan { CompleteSpan(rpc::UserSpanData::Reader reader); void copyTo(rpc::UserSpanData::Builder builder) const; - CompleteSpan clone() const; explicit CompleteSpan(tracing::SpanId spanId, tracing::SpanId parentSpanId, kj::ConstString operationName, @@ -646,7 +645,50 @@ struct CompleteSpan { startTime(startTime), endTime(endTime), tags(kj::mv(tags)) {} - kj::String toString() const; +}; + +struct SpanOpenData { + // Represents the data needed for a SpanOpen event + tracing::SpanId spanId; + tracing::SpanId parentSpanId; + + kj::ConstString operationName; + kj::Date startTime; + + SpanOpenData(rpc::SpanOpenData::Reader reader); + void copyTo(rpc::SpanOpenData::Builder builder) const; + explicit SpanOpenData(tracing::SpanId spanId, + tracing::SpanId parentSpanId, + kj::ConstString operationName, + kj::Date startTime) + : spanId(spanId), + parentSpanId(parentSpanId), + operationName(kj::mv(operationName)), + startTime(startTime) {} +}; + +struct SpanEndData { + // Represents the data needed when closing a span, including the Attributes and SpanClose events. + tracing::SpanId spanId; + + kj::Date startTime; + kj::Date endTime; + // Should be Span::TagMap, but we can't forward-declare that. + kj::HashMap tags; + + // Convert CompleteSpan to SpanEndData + explicit SpanEndData(CompleteSpan&& span); + SpanEndData(rpc::SpanEndData::Reader reader); + void copyTo(rpc::SpanEndData::Builder builder) const; + explicit SpanEndData(tracing::SpanId spanId, + kj::Date startTime, + kj::Date endTime, + kj::HashMap tags = + kj::HashMap()) + : spanId(spanId), + startTime(startTime), + endTime(endTime), + tags(kj::mv(tags)) {} }; // A Return mark is used to mark the point at which a span operation returned @@ -1112,8 +1154,10 @@ class SpanObserver: public kj::Refcounted { // Report the span data. Called at the end of the span. // - // This should always be called exactly once per observer. + // This should always be called exactly once per observer at span completion time. virtual void report(const Span& span) = 0; + // Report information about the span onset. + virtual void reportStart(kj::ConstString operationName, kj::Date startTime) = 0; // The current time to be provided for the span. For user tracing, we will override this to // provide I/O time. This *requires* that spans are only created when an IOContext is available diff --git a/src/workerd/io/tracer.c++ b/src/workerd/io/tracer.c++ index 5f401db2c69..922e9b999e3 100644 --- a/src/workerd/io/tracer.c++ +++ b/src/workerd/io/tracer.c++ @@ -124,7 +124,37 @@ void WorkerTracer::addLog(const tracing::InvocationSpanContext& context, } void WorkerTracer::addSpan(tracing::CompleteSpan&& span) { - // This is where we'll actually encode the span. + // The span information is not transmitted via RPC at this point, we can decompose the span into + // spanOpen/spanEnd. + addSpanOpen(span.spanId, span.parentSpanId, kj::mv(span.operationName), span.startTime); + tracing::SpanEndData spanEnd(kj::mv(span)); + addSpanEnd(kj::mv(spanEnd)); +} + +void WorkerTracer::addSpanOpen(tracing::SpanId spanId, + tracing::SpanId parentSpanId, + kj::ConstString operationName, + kj::Date startTime) { + if (pipelineLogLevel == PipelineLogLevel::NONE) { + return; + } + + auto& tailStreamWriter = KJ_UNWRAP_OR_RETURN(maybeTailStreamWriter); + auto& topLevelContext = KJ_ASSERT_NONNULL(topLevelInvocationSpanContext); + // Compose SpanOpen. An all-zero spanId is interpreted as having no spans above this one, thus we + // use the Onset spanId instead (taken from topLevelContext). We go to great lengths to rule out + // getting an all-zero spanId by chance (see SpanId::fromEntropy()), so this should be safe. + if (parentSpanId == tracing::SpanId::nullId) { + parentSpanId = topLevelContext.getSpanId(); + } + size_t spanNameSize = operationName.size(); + auto spanOpenContext = tracing::InvocationSpanContext( + topLevelContext.getTraceId(), topLevelContext.getInvocationId(), parentSpanId); + tailStreamWriter->report( + spanOpenContext, tracing::SpanOpen(spanId, kj::mv(operationName)), startTime, spanNameSize); +} + +void WorkerTracer::addSpanEnd(tracing::SpanEndData&& span) { if (pipelineLogLevel == PipelineLogLevel::NONE) { return; } @@ -136,7 +166,6 @@ void WorkerTracer::addSpan(tracing::CompleteSpan&& span) { adjustSpanTime(span); size_t spanTagsSize = 0; - size_t spanNameSize = span.operationName.size(); for (const Span::TagMap::Entry& tag: span.tags) { spanTagsSize += tag.key.size(); KJ_SWITCH_ONEOF(tag.value) { @@ -153,28 +182,15 @@ void WorkerTracer::addSpan(tracing::CompleteSpan&& span) { } } - // Span events are transmitted together for now. + // Compose Attributes and SpanClose, which are available at span completion time and transmitted + // together. auto& topLevelContext = KJ_ASSERT_NONNULL(topLevelInvocationSpanContext); - // Compose span events. For SpanOpen, an all-zero spanId is interpreted as having no spans above - // this one, thus we use the Onset spanId instead (taken from topLevelContext). We go to great - // lengths to rule out getting an all-zero spanId by chance (see SpanId::fromEntropy()), so this - // should be safe. - tracing::SpanId parentSpanId = span.parentSpanId; - if (parentSpanId == tracing::SpanId::nullId) { - parentSpanId = topLevelContext.getSpanId(); - } - // TODO(o11y): Actually report the spanOpen event at span creation time - auto spanOpenContext = tracing::InvocationSpanContext( - topLevelContext.getTraceId(), topLevelContext.getInvocationId(), parentSpanId); auto spanComponentContext = tracing::InvocationSpanContext( topLevelContext.getTraceId(), topLevelContext.getInvocationId(), span.spanId); - tailStreamWriter->report(spanOpenContext, - tracing::SpanOpen(span.spanId, span.operationName.clone()), span.startTime, spanNameSize); - // If a span manages to exceed the size limit, truncate it by not providing span attributes. if (span.tags.size() && spanTagsSize <= MAX_TRACE_BYTES) { tracing::CustomInfo attr = KJ_MAP(tag, span.tags) { - return tracing::Attribute(tag.key.clone(), kj::mv(tag.value)); + return tracing::Attribute(kj::mv(tag.key), kj::mv(tag.value)); }; tailStreamWriter->report(spanComponentContext, kj::mv(attr), span.startTime, spanTagsSize); } @@ -443,6 +459,56 @@ void BaseTracer::adjustSpanTime(tracing::CompleteSpan& span) { } } +void BaseTracer::adjustSpanTime(tracing::SpanEndData& span) { + // To report I/O time, we need the IOContext to still be alive. + // weakIoContext is only none if we are tracing via RPC (in this case span times have already been + // adjusted) or if we failed to transmit an Onset event (in that case we'll get an error based on + // missing topLevelInvocationSpanContext right after). + if (weakIoContext != kj::none) { + auto& weakIoCtx = KJ_ASSERT_NONNULL(weakIoContext); + weakIoCtx->runIfAlive([this, &span](IoContext& context) { + if (context.hasCurrentIncomingRequest()) { + span.endTime = context.now(); + } else { + // We have an IOContext, but there's no current IncomingRequest. Always log a warning here, + // this should not be happening. Still report completeTime as a useful timestamp if + // available. + bool hasCompleteTime = false; + if (completeTime != kj::UNIX_EPOCH) { + span.endTime = completeTime; + hasCompleteTime = true; + } else { + span.endTime = span.startTime; + } + if (isPredictableModeForTest()) { + KJ_FAIL_ASSERT("reported span without current request", hasCompleteTime); + } else { + LOG_WARNING_PERIODICALLY("reported span without current request"); + } + } + }); + if (!weakIoCtx->isValid()) { + // This can happen if we start a customEvent from this event and cancel it after this IoContext + // gets destroyed. In that case we no longer have an IoContext available and can't get the + // current time, but the outcome timestamp will have already been set. Since the outcome + // timestamp is "late enough", simply use that. + // TODO(o11y): fix this – spans should not be outliving the IoContext. + if (completeTime != kj::UNIX_EPOCH) { + span.endTime = completeTime; + } else { + // Otherwise, we can't actually get an end timestamp that makes sense. Report a zero-duration + // span and log a warning (or fail assert in test mode). + span.endTime = span.startTime; + if (isPredictableModeForTest()) { + KJ_FAIL_ASSERT("reported span after IoContext was deallocated"); + } else { + KJ_LOG(WARNING, "reported span after IoContext was deallocated"); + } + } + } + } +} + void WorkerTracer::setReturn( kj::Maybe timestamp, kj::Maybe fetchResponseInfo) { // Match the behavior of setEventInfo(). Any resolution of the TODO comments in setEventInfo() @@ -517,6 +583,10 @@ void UserSpanObserver::report(const Span& span) { submitter->submitSpan(spanId, parentSpanId, span); } +void UserSpanObserver::reportStart(kj::ConstString operationName, kj::Date startTime) { + submitter->submitSpanOpen(spanId, parentSpanId, kj::mv(operationName), startTime); +} + // Provide I/O time to the tracing system for user spans. kj::Date UserSpanObserver::getTime() { return IoContext::current().now(); diff --git a/src/workerd/io/tracer.h b/src/workerd/io/tracer.h index cc1efefc14b..7a23036ba53 100644 --- a/src/workerd/io/tracer.h +++ b/src/workerd/io/tracer.h @@ -31,8 +31,15 @@ class BaseTracer: public kj::Refcounted { kj::Date timestamp, LogLevel logLevel, kj::String message) = 0; - // Add a span. + // Add a complete span. virtual void addSpan(tracing::CompleteSpan&& span) = 0; + // Add information about a span when it is opened, corresponds to SpanOpen event. + virtual void addSpanOpen(tracing::SpanId spanId, + tracing::SpanId parentSpanId, + kj::ConstString operationName, + kj::Date startTime) = 0; + // Add span events when the span is complete (Attributes and SpanClose). + virtual void addSpanEnd(tracing::SpanEndData&& span) = 0; virtual void addException(const tracing::InvocationSpanContext& context, kj::Date timestamp, @@ -90,6 +97,7 @@ class BaseTracer: public kj::Refcounted { // helper method for addSpan() implementations void adjustSpanTime(tracing::CompleteSpan& span); + void adjustSpanTime(tracing::SpanEndData& span); // Function to create the root span for the new tracing format. kj::Maybe makeUserRequestSpanFunc; @@ -126,6 +134,11 @@ class WorkerTracer final: public BaseTracer { LogLevel logLevel, kj::String message) override; void addSpan(tracing::CompleteSpan&& span) override; + void addSpanOpen(tracing::SpanId spanId, + tracing::SpanId parentSpanId, + kj::ConstString operationName, + kj::Date startTime) override; + void addSpanEnd(tracing::SpanEndData&& span) override; void addException(const tracing::InvocationSpanContext& context, kj::Date timestamp, kj::String name, @@ -182,6 +195,12 @@ class WorkerTracer final: public BaseTracer { class SpanSubmitter: public kj::Refcounted { public: virtual void submitSpan(tracing::SpanId context, tracing::SpanId spanId, const Span& span) = 0; + virtual void submitSpanOpen(tracing::SpanId spanId, + tracing::SpanId parentSpanId, + kj::ConstString operationName, + kj::Date startTime) = 0; + virtual void submitSpanEnd(tracing::SpanId spanId, const Span& span) = 0; + virtual tracing::SpanId makeSpanId() = 0; }; @@ -202,6 +221,7 @@ class UserSpanObserver final: public SpanObserver { kj::Own newChild() override; void report(const Span& span) override; + void reportStart(kj::ConstString operationName, kj::Date startTime) override; kj::Date getTime() override; private: diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index 93fd589c9de..e959c8c009f 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -1701,10 +1701,24 @@ class SequentialSpanSubmitter final: public SpanSubmitter { public: SequentialSpanSubmitter(kj::Own workerTracer): workerTracer(kj::mv(workerTracer)) {} void submitSpan(tracing::SpanId spanId, tracing::SpanId parentSpanId, const Span& span) override { - // We largely recreate the span here which feels inefficient, but is hard to avoid given the - // mismatch between the Span type and the full span information required for OTel. - tracing::CompleteSpan span2( - spanId, parentSpanId, span.operationName.clone(), span.startTime, span.endTime); + // This code path is workerd-only, we can safely decompose this span into its components and + // call submitSpanOpen/submitSpanEnd instead of reimplementing them here. + submitSpanOpen(spanId, parentSpanId, span.operationName.clone(), span.startTime); + submitSpanEnd(spanId, span); + } + + void submitSpanOpen(tracing::SpanId spanId, + tracing::SpanId parentSpanId, + kj::ConstString operationName, + kj::Date startTime) override { + if (isPredictableModeForTest()) { + startTime = kj::UNIX_EPOCH; + } + workerTracer->addSpanOpen(spanId, parentSpanId, kj::mv(operationName), startTime); + } + + void submitSpanEnd(tracing::SpanId spanId, const Span& span) override { + tracing::SpanEndData span2(spanId, span.startTime, span.endTime); span2.tags.reserve(span.tags.size()); for (auto& tag: span.tags) { span2.tags.insert(tag.key.clone(), spanTagClone(tag.value)); @@ -1713,7 +1727,7 @@ class SequentialSpanSubmitter final: public SpanSubmitter { span2.startTime = span2.endTime = kj::UNIX_EPOCH; } - workerTracer->addSpan(kj::mv(span2)); + workerTracer->addSpanEnd(kj::mv(span2)); } tracing::SpanId makeSpanId() override {