Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,9 @@ GEM
logger
faraday-net_http (3.4.1)
net-http (>= 0.5.0)
faraday-retry (2.3.2)
faraday-retry (2.4.0)
faraday (~> 2.0)
google-protobuf (4.33.0-arm64-darwin)
bigdecimal
rake (>= 13)
google-protobuf (4.33.0-x86_64-linux-gnu)
google-protobuf (4.33.4)
bigdecimal
rake (>= 13)
googleapis-common-protos-types (1.22.0)
Expand Down Expand Up @@ -130,7 +127,7 @@ GEM
simplecov_json_formatter (0.1.4)
unicode-display_width (3.2.0)
unicode-emoji (~> 4.1)
unicode-emoji (4.1.0)
unicode-emoji (4.2.0)
uri (1.0.4)
webmock (3.25.1)
addressable (>= 2.8.0)
Expand Down
21 changes: 5 additions & 16 deletions lib/langfuse/propagation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,6 @@ def self._get_propagated_baggage_key(key)
def self._get_span_key_from_baggage_key(baggage_key)
return nil unless baggage_key.start_with?(BAGGAGE_PREFIX)

# Remove prefix
suffix = baggage_key[BAGGAGE_PREFIX.length..]

# Handle metadata keys (format: langfuse_metadata_{key_name})
Expand All @@ -373,17 +372,7 @@ def self._get_span_key_from_baggage_key(baggage_key)
return "#{OtelAttributes::TRACE_METADATA}.#{metadata_key}"
end

# Map standard keys
case suffix
when "user_id"
_get_propagated_span_key("user_id")
when "session_id"
_get_propagated_span_key("session_id")
when "version"
_get_propagated_span_key("version")
when "tags"
_get_propagated_span_key("tags")
end
SPAN_KEY_MAP[suffix]
end

# Check if baggage API is available
Expand All @@ -404,7 +393,7 @@ def self.baggage_available?
def self._extract_baggage_attributes(context)
return {} unless baggage_available?

baggage = OpenTelemetry::Baggage.value(context: context)
baggage = OpenTelemetry::Baggage.values(context: context)
return {} unless baggage.is_a?(Hash)

attributes = {}
Expand Down Expand Up @@ -453,12 +442,12 @@ def self._set_baggage_attribute(context:, key:, value:, baggage_key:)
if key == "metadata" && value.is_a?(Hash)
value.each do |k, v|
entry_key = "#{baggage_key}_#{k}"
context = OpenTelemetry::Baggage.set_value(context: context, key: entry_key, value: v.to_s)
context = OpenTelemetry::Baggage.set_value(entry_key, v.to_s, context: context)
end
elsif key == "tags" && value.is_a?(Array)
context = OpenTelemetry::Baggage.set_value(context: context, key: baggage_key, value: value.join(","))
context = OpenTelemetry::Baggage.set_value(baggage_key, value.join(","), context: context)
else
context = OpenTelemetry::Baggage.set_value(context: context, key: baggage_key, value: value.to_s)
context = OpenTelemetry::Baggage.set_value(baggage_key, value.to_s, context: context)
end
context
rescue StandardError => e
Expand Down
84 changes: 36 additions & 48 deletions spec/langfuse/propagation_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,27 @@
end
end

shared_context "with baggage mock" do
before do
baggage_module = Module.new do
def self.set_value(key, value, context: nil)
ctx = context || OpenTelemetry::Context.current
new_baggage = (ctx.value("baggage") || {}).dup
new_baggage[key] = value
ctx.set_value("baggage", new_baggage)
end

def self.values(context: nil)
ctx = context || OpenTelemetry::Context.current
ctx.value("baggage") || {}
end
end

stub_const("OpenTelemetry::Baggage", baggage_module)
allow(described_class).to receive(:baggage_available?).and_return(true)
end
end

describe ".propagate_attributes" do
context "when called without a block" do
it "raises ArgumentError" do
Expand Down Expand Up @@ -254,60 +275,44 @@
end

context "when baggage is available" do
before do
# Mock OpenTelemetry::Baggage to be available
baggage_module = Module.new do
def self.set_value(context:, key:, value:)
new_baggage = (context.value("baggage") || {}).dup
new_baggage[key] = value
context.set_value("baggage", new_baggage)
end

def self.value(context:)
context.value("baggage") || {}
end
end

stub_const("OpenTelemetry::Baggage", baggage_module)
allow(described_class).to receive(:baggage_available?).and_return(true)
end
include_context "with baggage mock"

it "sets baggage attributes when as_baggage is true" do
described_class.propagate_attributes(user_id: "user_123", as_baggage: true) do
context = OpenTelemetry::Context.current
baggage = OpenTelemetry::Baggage.value(context: context)
baggage = OpenTelemetry::Baggage.values(context: context)
expect(baggage["langfuse_user_id"]).to eq("user_123")
end
end

it "sets baggage for session_id" do
described_class.propagate_attributes(session_id: "session_abc", as_baggage: true) do
context = OpenTelemetry::Context.current
baggage = OpenTelemetry::Baggage.value(context: context)
baggage = OpenTelemetry::Baggage.values(context: context)
expect(baggage["langfuse_session_id"]).to eq("session_abc")
end
end

it "sets baggage for version" do
described_class.propagate_attributes(version: "v1.2.3", as_baggage: true) do
context = OpenTelemetry::Context.current
baggage = OpenTelemetry::Baggage.value(context: context)
baggage = OpenTelemetry::Baggage.values(context: context)
expect(baggage["langfuse_version"]).to eq("v1.2.3")
end
end

it "sets baggage for tags as comma-separated string" do
described_class.propagate_attributes(tags: %w[tag1 tag2], as_baggage: true) do
context = OpenTelemetry::Context.current
baggage = OpenTelemetry::Baggage.value(context: context)
baggage = OpenTelemetry::Baggage.values(context: context)
expect(baggage["langfuse_tags"]).to eq("tag1,tag2")
end
end

it "sets baggage for metadata with prefixed keys" do
described_class.propagate_attributes(metadata: { env: "prod", region: "us-east" }, as_baggage: true) do
context = OpenTelemetry::Context.current
baggage = OpenTelemetry::Baggage.value(context: context)
baggage = OpenTelemetry::Baggage.values(context: context)
expect(baggage["langfuse_metadata_env"]).to eq("prod")
expect(baggage["langfuse_metadata_region"]).to eq("us-east")
end
Expand Down Expand Up @@ -376,30 +381,13 @@ def self.value(context:)
end

context "with baggage extraction" do
before do
# Mock OpenTelemetry::Baggage to be available
baggage_module = Module.new do
def self.set_value(context:, key:, value:)
new_baggage = (context.value("baggage") || {}).dup
new_baggage[key] = value
context.set_value("baggage", new_baggage)
end

def self.value(context:)
context.value("baggage") || {}
end
end

stub_const("OpenTelemetry::Baggage", baggage_module)
allow(described_class).to receive(:baggage_available?).and_return(true)
end
include_context "with baggage mock"

it "extracts attributes from baggage" do
# Set baggage directly
context = OpenTelemetry::Context.current
context = OpenTelemetry::Baggage.set_value(context: context, key: "langfuse_user_id", value: "baggage_user")
context = OpenTelemetry::Baggage.set_value(context: context, key: "langfuse_session_id",
value: "baggage_session")
context = OpenTelemetry::Baggage.set_value("langfuse_user_id", "baggage_user", context: context)
context = OpenTelemetry::Baggage.set_value("langfuse_session_id", "baggage_session", context: context)

attrs = described_class.get_propagated_attributes_from_context(context)

Expand All @@ -409,7 +397,7 @@ def self.value(context:)

it "extracts tags from baggage as comma-separated string" do
context = OpenTelemetry::Context.current
context = OpenTelemetry::Baggage.set_value(context: context, key: "langfuse_tags", value: "tag1,tag2,tag3")
context = OpenTelemetry::Baggage.set_value("langfuse_tags", "tag1,tag2,tag3", context: context)

attrs = described_class.get_propagated_attributes_from_context(context)

Expand All @@ -418,8 +406,8 @@ def self.value(context:)

it "extracts metadata keys from baggage" do
context = OpenTelemetry::Context.current
context = OpenTelemetry::Baggage.set_value(context: context, key: "langfuse_metadata_env", value: "production")
context = OpenTelemetry::Baggage.set_value(context: context, key: "langfuse_metadata_region", value: "us-east")
context = OpenTelemetry::Baggage.set_value("langfuse_metadata_env", "production", context: context)
context = OpenTelemetry::Baggage.set_value("langfuse_metadata_region", "us-east", context: context)

attrs = described_class.get_propagated_attributes_from_context(context)

Expand All @@ -429,8 +417,8 @@ def self.value(context:)

it "ignores non-Langfuse baggage keys" do
context = OpenTelemetry::Context.current
context = OpenTelemetry::Baggage.set_value(context: context, key: "other_key", value: "other_value")
context = OpenTelemetry::Baggage.set_value(context: context, key: "langfuse_user_id", value: "user_123")
context = OpenTelemetry::Baggage.set_value("other_key", "other_value", context: context)
context = OpenTelemetry::Baggage.set_value("langfuse_user_id", "user_123", context: context)

attrs = described_class.get_propagated_attributes_from_context(context)

Expand All @@ -440,7 +428,7 @@ def self.value(context:)

it "handles baggage extraction errors gracefully" do
# Mock baggage to raise an error
allow(OpenTelemetry::Baggage).to receive(:value).and_raise(StandardError.new("Baggage error"))
allow(OpenTelemetry::Baggage).to receive(:values).and_raise(StandardError.new("Baggage error"))
allow(described_class).to receive(:baggage_available?).and_return(true)

expect(Langfuse.configuration.logger).to receive(:debug).with(/Baggage extraction failed/)
Expand Down
Loading