diff --git a/router-tests/telemetry/attribute_processor_test.go b/router-tests/telemetry/attribute_processor_test.go new file mode 100644 index 0000000000..b9f76edbac --- /dev/null +++ b/router-tests/telemetry/attribute_processor_test.go @@ -0,0 +1,359 @@ +package telemetry + +import ( + "strings" + "testing" + + "github.com/stretchr/testify/require" + "github.com/wundergraph/cosmo/router-tests/testenv" + "github.com/wundergraph/cosmo/router/core" + "github.com/wundergraph/cosmo/router/pkg/config" + "github.com/wundergraph/cosmo/router/pkg/trace/tracetest" + "go.opentelemetry.io/otel/attribute" + "go.uber.org/zap/zapcore" +) + +// TestAttributeProcessorIntegration tests that the attribute processor configurations +// are properly wired through the router. These tests verify: +// 1. The configuration is properly passed through testenv -> router -> trace provider +// 2. The router functions correctly with various attribute processor configurations +// 3. SanitizeUTF8 logs warnings when invalid UTF-8 is detected (when logging is enabled) +// +// The actual attribute processing logic (redaction, hashing, UTF-8 sanitization) +// is also tested in: +// - router/pkg/trace/attributeprocessor/*_test.go (unit tests) +// - router/pkg/trace/attributeprocessor_integration_test.go (integration tests) +func TestAttributeProcessorIntegration(t *testing.T) { + t.Parallel() + + t.Run("Router works with IPAnonymization Redact enabled", func(t *testing.T) { + t.Parallel() + + exporter := tracetest.NewInMemoryExporter(t) + + testenv.Run(t, &testenv.Config{ + TraceExporter: exporter, + IPAnonymization: &core.IPAnonymizationConfig{ + Enabled: true, + Method: core.Redact, + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: `query { employees { id } }`, + }) + require.Contains(t, res.Body, `"employees"`) + + sn := exporter.GetSpans().Snapshots() + require.NotEmpty(t, sn) + }) + }) + + t.Run("Router works with IPAnonymization Hash enabled", func(t *testing.T) { + t.Parallel() + + exporter := tracetest.NewInMemoryExporter(t) + + testenv.Run(t, &testenv.Config{ + TraceExporter: exporter, + IPAnonymization: &core.IPAnonymizationConfig{ + Enabled: true, + Method: core.Hash, + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: `query { employees { id } }`, + }) + require.Contains(t, res.Body, `"employees"`) + + sn := exporter.GetSpans().Snapshots() + require.NotEmpty(t, sn) + }) + }) + + t.Run("Router works with IPAnonymization disabled", func(t *testing.T) { + t.Parallel() + + exporter := tracetest.NewInMemoryExporter(t) + + testenv.Run(t, &testenv.Config{ + TraceExporter: exporter, + IPAnonymization: &core.IPAnonymizationConfig{ + Enabled: false, + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: `query { employees { id } }`, + }) + require.Contains(t, res.Body, `"employees"`) + + sn := exporter.GetSpans().Snapshots() + require.NotEmpty(t, sn) + }) + }) + + t.Run("Router works with SanitizeUTF8 enabled", func(t *testing.T) { + t.Parallel() + + exporter := tracetest.NewInMemoryExporter(t) + + testenv.Run(t, &testenv.Config{ + TraceExporter: exporter, + TracingSanitizeUTF8: &config.SanitizeUTF8Config{ + Enabled: true, + LogSanitizations: false, + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: `query { employees { id } }`, + }) + require.Contains(t, res.Body, `"employees"`) + + sn := exporter.GetSpans().Snapshots() + require.NotEmpty(t, sn) + }) + }) + + t.Run("SanitizeUTF8 logs warning when invalid UTF-8 is detected", func(t *testing.T) { + t.Parallel() + + exporter := tracetest.NewInMemoryExporter(t) + + // Create a string with invalid UTF-8 bytes + invalidUTF8Value := string([]byte{0x80, 0x81, 0x82}) + sanitizedValue := strings.ToValidUTF8(invalidUTF8Value, "\ufffd") + attrKey := "custom.invalid_utf8_attr" + + testenv.Run(t, &testenv.Config{ + TraceExporter: exporter, + TracingSanitizeUTF8: &config.SanitizeUTF8Config{ + Enabled: true, + LogSanitizations: true, + }, + // Add a custom tracing attribute with invalid UTF-8 as default value + CustomTracingAttributes: []config.CustomAttribute{ + { + Key: attrKey, + Default: invalidUTF8Value, + }, + }, + LogObservation: testenv.LogObservationConfig{ + Enabled: true, + LogLevel: zapcore.WarnLevel, + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: `query { employees { id } }`, + }) + require.Contains(t, res.Body, `"employees"`) + + // Verify that spans were created + sn := exporter.GetSpans().Snapshots() + require.NotEmpty(t, sn) + + // Verify that the invalid UTF-8 attribute was sanitized (replaced with U+FFFD) + sanitizedAttr := attribute.String(attrKey, sanitizedValue) + require.Contains(t, sn[0].Attributes(), sanitizedAttr) + + // Verify that the warning log was emitted + logEntries := xEnv.Observer().FilterMessageSnippet("Invalid UTF-8 in span attribute").All() + require.GreaterOrEqual(t, len(logEntries), 1) + + // Verify the log contains the attribute key + logEntry := logEntries[0] + contextMap := logEntry.ContextMap() + require.Equal(t, attrKey, contextMap["key"]) + }) + }) + + t.Run("SanitizeUTF8 does not log when logging is disabled", func(t *testing.T) { + t.Parallel() + + exporter := tracetest.NewInMemoryExporter(t) + + // Create a string with invalid UTF-8 bytes + invalidUTF8Value := string([]byte{0x80, 0x81, 0x82}) + sanitizedValue := strings.ToValidUTF8(invalidUTF8Value, "\ufffd") + attrKey := "custom.invalid_utf8_attr_no_log" + + testenv.Run(t, &testenv.Config{ + TraceExporter: exporter, + TracingSanitizeUTF8: &config.SanitizeUTF8Config{ + Enabled: true, + LogSanitizations: false, // Logging disabled + }, + CustomTracingAttributes: []config.CustomAttribute{ + { + Key: attrKey, + Default: invalidUTF8Value, + }, + }, + LogObservation: testenv.LogObservationConfig{ + Enabled: true, + LogLevel: zapcore.WarnLevel, + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: `query { employees { id } }`, + }) + require.Contains(t, res.Body, `"employees"`) + + // Verify that spans were created + sn := exporter.GetSpans().Snapshots() + require.NotEmpty(t, sn) + + // Verify that the invalid UTF-8 attribute was still sanitized + sanitizedAttr := attribute.String(attrKey, sanitizedValue) + require.Contains(t, sn[0].Attributes(), sanitizedAttr) + + // Verify that NO warning log was emitted for the sanitization + logEntries := xEnv.Observer().FilterMessageSnippet("Invalid UTF-8 in span attribute").All() + require.Empty(t, logEntries) + }) + }) + + t.Run("Router works with SanitizeUTF8 disabled", func(t *testing.T) { + t.Parallel() + + exporter := tracetest.NewInMemoryExporter(t) + + testenv.Run(t, &testenv.Config{ + TraceExporter: exporter, + TracingSanitizeUTF8: &config.SanitizeUTF8Config{ + Enabled: false, + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: `query { employees { id } }`, + }) + require.Contains(t, res.Body, `"employees"`) + + sn := exporter.GetSpans().Snapshots() + require.NotEmpty(t, sn) + }) + }) + + t.Run("SanitizeUTF8 disabled leaves invalid UTF-8 unchanged", func(t *testing.T) { + t.Parallel() + + exporter := tracetest.NewInMemoryExporter(t) + + // Create a string with invalid UTF-8 bytes + invalidUTF8Value := string([]byte{0x80, 0x81, 0x82}) + attrKey := "custom.invalid_utf8_unchanged" + + testenv.Run(t, &testenv.Config{ + TraceExporter: exporter, + TracingSanitizeUTF8: &config.SanitizeUTF8Config{ + Enabled: false, // Disabled + }, + CustomTracingAttributes: []config.CustomAttribute{ + { + Key: attrKey, + Default: invalidUTF8Value, + }, + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: `query { employees { id } }`, + }) + require.Contains(t, res.Body, `"employees"`) + + // Verify that spans were created + sn := exporter.GetSpans().Snapshots() + require.NotEmpty(t, sn) + + // Verify that the invalid UTF-8 attribute was NOT sanitized + require.Contains(t, sn[0].Attributes(), attribute.String(attrKey, invalidUTF8Value)) + }) + }) + + t.Run("Router works with both IPAnonymization and SanitizeUTF8 enabled", func(t *testing.T) { + t.Parallel() + + exporter := tracetest.NewInMemoryExporter(t) + + testenv.Run(t, &testenv.Config{ + TraceExporter: exporter, + TracingSanitizeUTF8: &config.SanitizeUTF8Config{ + Enabled: true, + LogSanitizations: false, + }, + IPAnonymization: &core.IPAnonymizationConfig{ + Enabled: true, + Method: core.Redact, + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: `query { employees { id } }`, + }) + require.Contains(t, res.Body, `"employees"`) + + sn := exporter.GetSpans().Snapshots() + require.NotEmpty(t, sn) + }) + }) + + t.Run("IPAnonymization redacts http.client_ip attribute", func(t *testing.T) { + t.Parallel() + + exporter := tracetest.NewInMemoryExporter(t) + + testenv.Run(t, &testenv.Config{ + TraceExporter: exporter, + IPAnonymization: &core.IPAnonymizationConfig{ + Enabled: true, + Method: core.Redact, + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: `query { employees { id } }`, + }) + require.Contains(t, res.Body, `"employees"`) + + sn := exporter.GetSpans().Snapshots() + require.NotEmpty(t, sn) + + // Check that http.client_ip is redacted in spans that have it + for _, span := range sn { + for _, attr := range span.Attributes() { + if attr.Key == attribute.Key("http.client_ip") { + require.Equal(t, "[REDACTED]", attr.Value.AsString()) + } + } + } + }) + }) + + t.Run("IPAnonymization hashes http.client_ip attribute", func(t *testing.T) { + t.Parallel() + + exporter := tracetest.NewInMemoryExporter(t) + + testenv.Run(t, &testenv.Config{ + TraceExporter: exporter, + IPAnonymization: &core.IPAnonymizationConfig{ + Enabled: true, + Method: core.Hash, + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: `query { employees { id } }`, + }) + require.Contains(t, res.Body, `"employees"`) + + sn := exporter.GetSpans().Snapshots() + require.NotEmpty(t, sn) + + // Check that http.client_ip is hashed (64 char hex) in spans that have it + for _, span := range sn { + for _, attr := range span.Attributes() { + if attr.Key == attribute.Key("http.client_ip") { + value := attr.Value.AsString() + require.Len(t, value, 64) + require.NotEqual(t, "[REDACTED]", value) + } + } + } + }) + }) +} diff --git a/router-tests/testenv/testenv.go b/router-tests/testenv/testenv.go index bc99b90a1a..bd37715f15 100644 --- a/router-tests/testenv/testenv.go +++ b/router-tests/testenv/testenv.go @@ -308,6 +308,8 @@ type Config struct { DisableParentBasedSampler bool TLSConfig *core.TlsConfig TraceExporter trace.SpanExporter + TracingSanitizeUTF8 *config.SanitizeUTF8Config + IPAnonymization *core.IPAnonymizationConfig CustomMetricAttributes []config.CustomAttribute CustomTelemetryAttributes []config.CustomAttribute CustomTracingAttributes []config.CustomAttribute @@ -1465,19 +1467,25 @@ func configureRouter(listenerAddr string, testConfig *Config, routerConfig *node if testConfig.TraceExporter != nil { testConfig.PropagationConfig.TraceContext = true + tracingConfig := config.Tracing{ + Enabled: true, + SamplingRate: 1, + ParentBasedSampler: !testConfig.DisableParentBasedSampler, + OperationContentAttributes: testConfig.OperationContentAttributes, + Exporters: []config.TracingExporter{}, + Propagation: testConfig.PropagationConfig, + TracingGlobalFeatures: config.TracingGlobalFeatures{}, + ResponseTraceHeader: testConfig.ResponseTraceHeader, + } + + if testConfig.TracingSanitizeUTF8 != nil { + tracingConfig.SanitizeUTF8 = *testConfig.TracingSanitizeUTF8 + } + c := core.TraceConfigFromTelemetry(&config.Telemetry{ ServiceName: "cosmo-router", ResourceAttributes: testConfig.CustomResourceAttributes, - Tracing: config.Tracing{ - Enabled: true, - SamplingRate: 1, - ParentBasedSampler: !testConfig.DisableParentBasedSampler, - OperationContentAttributes: testConfig.OperationContentAttributes, - Exporters: []config.TracingExporter{}, - Propagation: testConfig.PropagationConfig, - TracingGlobalFeatures: config.TracingGlobalFeatures{}, - ResponseTraceHeader: testConfig.ResponseTraceHeader, - }, + Tracing: tracingConfig, }) c.TestMemoryExporter = testConfig.TraceExporter @@ -1487,6 +1495,10 @@ func configureRouter(listenerAddr string, testConfig *Config, routerConfig *node ) } + if testConfig.IPAnonymization != nil { + routerOpts = append(routerOpts, core.WithAnonymization(testConfig.IPAnonymization)) + } + if testConfig.CustomTelemetryAttributes != nil { routerOpts = append(routerOpts, core.WithTelemetryAttributes(testConfig.CustomTelemetryAttributes)) } diff --git a/router/core/router.go b/router/core/router.go index ad4b77cc33..26e31828e5 100644 --- a/router/core/router.go +++ b/router/core/router.go @@ -52,6 +52,7 @@ import ( "github.com/wundergraph/cosmo/router/pkg/otel/otelconfig" "github.com/wundergraph/cosmo/router/pkg/statistics" rtrace "github.com/wundergraph/cosmo/router/pkg/trace" + "github.com/wundergraph/cosmo/router/pkg/trace/attributeprocessor" "github.com/wundergraph/cosmo/router/pkg/watcher" "github.com/wundergraph/graphql-go-tools/v2/pkg/netpoll" ) @@ -813,10 +814,11 @@ func (r *Router) bootstrap(ctx context.Context) error { Logger: r.logger, Config: r.traceConfig, ServiceInstanceID: r.instanceID, - IPAnonymization: &rtrace.IPAnonymizationConfig{ + IPAnonymization: &attributeprocessor.IPAnonymizationConfig{ Enabled: r.ipAnonymization.Enabled, - Method: rtrace.IPAnonymizationMethod(r.ipAnonymization.Method), + Method: attributeprocessor.IPAnonymizationMethod(r.ipAnonymization.Method), }, + SanitizeUTF8: r.traceConfig.SanitizeUTF8, MemoryExporter: r.traceConfig.TestMemoryExporter, }) if err != nil { @@ -2266,6 +2268,10 @@ func TraceConfigFromTelemetry(cfg *config.Telemetry) *rtrace.Config { Propagators: propagators, ResponseTraceHeader: cfg.Tracing.ResponseTraceHeader, OperationContentAttributes: cfg.Tracing.OperationContentAttributes, + SanitizeUTF8: &attributeprocessor.SanitizeUTF8Config{ + Enabled: cfg.Tracing.SanitizeUTF8.Enabled, + LogSanitizations: cfg.Tracing.SanitizeUTF8.LogSanitizations, + }, } } diff --git a/router/pkg/config/config.go b/router/pkg/config/config.go index 8eb71bc5f1..81e19b4935 100644 --- a/router/pkg/config/config.go +++ b/router/pkg/config/config.go @@ -70,6 +70,11 @@ type ResponseTraceHeader struct { HeaderName string `yaml:"header_name" envDefault:"x-wg-trace-id"` } +type SanitizeUTF8Config struct { + Enabled bool `yaml:"enabled" envDefault:"false" env:"TRACING_SANITIZE_UTF8_ENABLED"` + LogSanitizations bool `yaml:"log_sanitizations" envDefault:"false" env:"TRACING_SANITIZE_UTF8_LOG_SANITIZATIONS"` +} + type Tracing struct { Enabled bool `yaml:"enabled" envDefault:"true" env:"TRACING_ENABLED"` SamplingRate float64 `yaml:"sampling_rate" envDefault:"1" env:"TRACING_SAMPLING_RATE"` @@ -82,6 +87,9 @@ type Tracing struct { OperationContentAttributes bool `yaml:"operation_content_attributes" envDefault:"false" env:"TRACING_OPERATION_CONTENT_ATTRIBUTES"` TracingGlobalFeatures `yaml:",inline"` + + // SanitizeUTF8 configures sanitization of invalid UTF-8 sequences in span attribute values + SanitizeUTF8 SanitizeUTF8Config `yaml:"sanitize_utf8"` } type PropagationConfig struct { diff --git a/router/pkg/config/config.schema.json b/router/pkg/config/config.schema.json index a531fa4af3..e453600e10 100644 --- a/router/pkg/config/config.schema.json +++ b/router/pkg/config/config.schema.json @@ -999,6 +999,23 @@ } } } + }, + "sanitize_utf8": { + "type": "object", + "description": "Configuration for sanitizing invalid UTF-8 sequences in span attribute values.", + "additionalProperties": false, + "properties": { + "enabled": { + "type": "boolean", + "default": false, + "description": "Enable the sanitization of invalid UTF-8 sequences in span attribute values. Invalid sequences are replaced with the Unicode replacement character (U+FFFD)." + }, + "log_sanitizations": { + "type": "boolean", + "default": false, + "description": "Log a warning when invalid UTF-8 sequences are sanitized. The log includes the attribute key and original value." + } + } } } }, diff --git a/router/pkg/trace/attributeprocessor/attributes.go b/router/pkg/trace/attributeprocessor/attributes.go new file mode 100644 index 0000000000..50951b2ce4 --- /dev/null +++ b/router/pkg/trace/attributeprocessor/attributes.go @@ -0,0 +1,57 @@ +package attributeprocessor + +import ( + "context" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/trace" +) + +// AttributeTransformer processes a single attribute and returns the modified value. +// Returns (newValue, handled) where handled=true means the attribute was processed +// and subsequent transformers should be skipped for this attribute. +type AttributeTransformer func(kv attribute.KeyValue) (attribute.Value, bool) + +// AttributeProcessor is an OpenTelemetry SpanProcessor that applies +// a chain of transformers to span attributes. +type AttributeProcessor struct { + transformers []AttributeTransformer +} + +// NewAttributeProcessorOption returns an OpenTelemetry SDK TracerProviderOption +// that registers the AttributeProcessor as a SpanProcessor. +func NewAttributeProcessorOption(transformers ...AttributeTransformer) trace.TracerProviderOption { + return trace.WithSpanProcessor(NewAttributeProcessor(transformers...)) +} + +// NewAttributeProcessor creates a new AttributeProcessor with the given transformers. +// Transformers are applied in order until one returns handled=true. +func NewAttributeProcessor(transformers ...AttributeTransformer) AttributeProcessor { + return AttributeProcessor{transformers: transformers} +} + +// OnStart does nothing. +func (c AttributeProcessor) OnStart(_ context.Context, _ trace.ReadWriteSpan) { +} + +// OnEnd applies all transformers to the attributes of the span. +func (c AttributeProcessor) OnEnd(s trace.ReadOnlySpan) { + // We can't change the attribute slice of the span snapshot in OnEnd, but + // we can change the attribute value in the underlying array. + attributes := s.Attributes() + for i := range attributes { + for _, transform := range c.transformers { + if newVal, replace := transform(attributes[i]); replace { + attributes[i].Value = newVal + // Right now we do not process the same attribute via two spans + break + } + } + } +} + +// Shutdown does nothing. +func (AttributeProcessor) Shutdown(context.Context) error { return nil } + +// ForceFlush does nothing. +func (AttributeProcessor) ForceFlush(context.Context) error { return nil } diff --git a/router/pkg/trace/attributeprocessor/attributes_test.go b/router/pkg/trace/attributeprocessor/attributes_test.go new file mode 100644 index 0000000000..46ddec163c --- /dev/null +++ b/router/pkg/trace/attributeprocessor/attributes_test.go @@ -0,0 +1,242 @@ +package attributeprocessor + +import ( + "context" + "strconv" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/trace" + api "go.opentelemetry.io/otel/trace" +) + +// attrRecorder is a test helper that records span attributes +type attrRecorder struct { + attrs []attribute.KeyValue +} + +func (r *attrRecorder) OnEnd(s trace.ReadOnlySpan) { + r.attrs = s.Attributes() +} +func (*attrRecorder) Shutdown(context.Context) error { return nil } +func (*attrRecorder) ForceFlush(context.Context) error { return nil } +func (*attrRecorder) OnStart(_ context.Context, _ trace.ReadWriteSpan) {} + +// testAttributes creates a span with the given attributes at creation time and returns the recorded attributes +func testAttributes(opt trace.TracerProviderOption, attrs ...attribute.KeyValue) []attribute.KeyValue { + r := &attrRecorder{} + tp := trace.NewTracerProvider(opt, trace.WithSpanProcessor(r)) + defer func() { _ = tp.Shutdown(context.Background()) }() + + ctx := context.Background() + tracer := tp.Tracer("testAttributes") + _, s := tracer.Start(ctx, "span name", api.WithAttributes(attrs...)) + s.End() + return r.attrs +} + +// testAttributesAfterCreation creates a span and sets attributes after creation, then returns the recorded attributes +func testAttributesAfterCreation(opt trace.TracerProviderOption, attrs ...attribute.KeyValue) []attribute.KeyValue { + r := &attrRecorder{} + tp := trace.NewTracerProvider(opt, trace.WithSpanProcessor(r)) + defer func() { _ = tp.Shutdown(context.Background()) }() + + ctx := context.Background() + tracer := tp.Tracer("testAttributes") + _, s := tracer.Start(ctx, "span name") + s.SetAttributes(attrs...) + s.End() + return r.attrs +} + +func TestAttributeProcessor(t *testing.T) { + t.Parallel() + + t.Run("NoTransformers", func(t *testing.T) { + t.Parallel() + + // With no transformers, attributes should remain unchanged + name := attribute.String("name", "bob") + count := attribute.Int("count", 42) + + attributes := testAttributes(NewAttributeProcessorOption(), name, count) + require.Contains(t, attributes, name) + require.Contains(t, attributes, count) + }) + + t.Run("EmptyAttributes", func(t *testing.T) { + t.Parallel() + + // With no attributes, nothing should happen + attributes := testAttributes(NewAttributeProcessorOption(RedactKeys([]attribute.Key{"secret"}, Redact))) + require.Empty(t, attributes) + }) + + t.Run("FirstTransformerHandlesAttribute", func(t *testing.T) { + t.Parallel() + + // When first transformer handles an attribute, second transformer should be skipped for that attribute + secretKey := attribute.Key("secret") + otherKey := attribute.Key("other") + secret := attribute.String(string(secretKey), "value") + other := attribute.String(string(otherKey), "other-value") + + // Track which keys the second transformer sees + seenKeys := make(map[attribute.Key]bool) + trackingTransformer := func(kv attribute.KeyValue) (attribute.Value, bool) { + seenKeys[kv.Key] = true + return kv.Value, false + } + + // RedactKeys should handle "secret", so trackingTransformer should NOT see "secret" + // but SHOULD see "other" + attributes := testAttributes( + NewAttributeProcessorOption(RedactKeys([]attribute.Key{secretKey}, Redact), trackingTransformer), + secret, other, + ) + + // secret should be redacted by first transformer + require.Contains(t, attributes, attribute.String(string(secretKey), "[REDACTED]")) + // other should be unchanged + require.Contains(t, attributes, other) + // tracking transformer should NOT have seen "secret" (it was handled by redact) + require.False(t, seenKeys[secretKey], "second transformer should NOT see 'secret' key (handled by first)") + // tracking transformer SHOULD have seen "other" + require.True(t, seenKeys[otherKey], "second transformer should see 'other' key") + }) +} + +func TestMultipleTransformers(t *testing.T) { + t.Parallel() + + t.Run("TransformersAppliedInOrder", func(t *testing.T) { + t.Parallel() + + // First transformer handles "secret" key + // Second transformer handles all strings (SanitizeUTF8) + secretKey := attribute.Key("secret") + otherKey := attribute.Key("other") + + secret := attribute.String(string(secretKey), "value") + invalidUTF8 := attribute.String(string(otherKey), string([]byte{0x80})) + + attributes := testAttributes( + NewAttributeProcessorOption(RedactKeys([]attribute.Key{secretKey}, Redact), SanitizeUTF8(&SanitizeUTF8Config{Enabled: true}, nil)), + secret, invalidUTF8, + ) + + // secret should be redacted + require.Contains(t, attributes, attribute.String(string(secretKey), "[REDACTED]")) + // other should have UTF-8 sanitized + require.Contains(t, attributes, attribute.String(string(otherKey), "\ufffd")) + }) + + t.Run("RedactedAttributeNotSanitized", func(t *testing.T) { + t.Parallel() + + // When an attribute is redacted, it should not be passed to sanitize + // (the redacted value is already valid UTF-8) + key := attribute.Key("password") + invalidUTF8Password := attribute.String(string(key), string([]byte{'s', 'e', 'c', 'r', 'e', 't', 0x80})) + + attributes := testAttributes( + NewAttributeProcessorOption(RedactKeys([]attribute.Key{key}, Redact), SanitizeUTF8(&SanitizeUTF8Config{Enabled: true}, nil)), + invalidUTF8Password, + ) + + // password should be redacted (not sanitized) + require.Contains(t, attributes, attribute.String(string(key), "[REDACTED]")) + }) + + t.Run("MixedAttributeTypes", func(t *testing.T) { + t.Parallel() + + // Test with mixed attribute types - only strings should be affected + secretKey := attribute.Key("secret") + secret := attribute.String(string(secretKey), "value") + count := attribute.Int("count", 42) + flag := attribute.Bool("flag", true) + invalidUTF8 := attribute.String("message", string([]byte{0x80})) + + attributes := testAttributes( + NewAttributeProcessorOption(RedactKeys([]attribute.Key{secretKey}, Redact), SanitizeUTF8(&SanitizeUTF8Config{Enabled: true}, nil)), + secret, count, flag, invalidUTF8, + ) + + require.Contains(t, attributes, attribute.String(string(secretKey), "[REDACTED]")) + require.Contains(t, attributes, count) + require.Contains(t, attributes, flag) + require.Contains(t, attributes, attribute.String("message", "\ufffd")) + }) +} + +// benchSpan is a minimal span implementation for benchmarks +type benchSpan struct { + trace.ReadWriteSpan + attrs []attribute.KeyValue +} + +func (benchSpan) SetAttributes(...attribute.KeyValue) {} +func (s benchSpan) Attributes() []attribute.KeyValue { + return s.attrs +} + +func BenchmarkCombinedTransformers(b *testing.B) { + b.Run("Redact+SanitizeUTF8/0_redacted/16_total", benchCombinedTransformers(0, 16, 0)) + b.Run("Redact+SanitizeUTF8/4_redacted/16_total", benchCombinedTransformers(4, 16, 0)) + b.Run("Redact+SanitizeUTF8/0_redacted/4_invalid/16_total", benchCombinedTransformers(0, 16, 4)) + b.Run("Redact+SanitizeUTF8/4_redacted/4_invalid/16_total", benchCombinedTransformers(4, 16, 4)) + b.Run("Redact+SanitizeUTF8/8_redacted/8_invalid/16_total", benchCombinedTransformers(8, 16, 8)) +} + +func benchCombinedTransformers(redacted, total, invalidUTF8 int) func(*testing.B) { + if redacted > total { + panic("redacted needs to be less than or equal to total") + } + if invalidUTF8 > total-redacted { + panic("invalidUTF8 needs to be less than or equal to total-redacted") + } + + keys := make([]attribute.Key, 0, redacted) + attrs := make([]attribute.KeyValue, total) + + for i := range attrs { + key := attribute.Key(strconv.Itoa(i)) + switch { + case i < redacted: + keys = append(keys, key) + attrs[i] = attribute.KeyValue{ + Key: key, + Value: attribute.StringValue("secret-value"), + } + case i < redacted+invalidUTF8: + // Create invalid UTF-8 string + attrs[i] = attribute.KeyValue{ + Key: key, + Value: attribute.StringValue(string([]byte{0x80, 0x81})), + } + default: + attrs[i] = attribute.KeyValue{ + Key: key, + Value: attribute.StringValue("valid-string"), + } + } + } + + s := benchSpan{attrs: attrs} + ac := NewAttributeProcessor( + RedactKeys(keys, Redact), + SanitizeUTF8(&SanitizeUTF8Config{Enabled: true}, nil), + ) + ctx := context.Background() + + return func(b *testing.B) { + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + ac.OnStart(ctx, s) + ac.OnEnd(s) + } + } +} diff --git a/router/pkg/trace/attributeprocessor/redact.go b/router/pkg/trace/attributeprocessor/redact.go new file mode 100644 index 0000000000..113a197691 --- /dev/null +++ b/router/pkg/trace/attributeprocessor/redact.go @@ -0,0 +1,53 @@ +package attributeprocessor + +import ( + "crypto/sha256" + "encoding/hex" + + "go.opentelemetry.io/otel/attribute" +) + +type ( + IPAnonymizationMethod string + + IPAnonymizationConfig struct { + Enabled bool + Method IPAnonymizationMethod + } +) + +const ( + Hash IPAnonymizationMethod = "hash" + Redact IPAnonymizationMethod = "redact" +) + +// RedactKeys returns a transformer that redacts attributes matching the given keys. +// The redactFunc is called with the original attribute to produce the replacement value. +func RedactKeys(keys []attribute.Key, ipAnonymizationMethod IPAnonymizationMethod) AttributeTransformer { + var rFunc func(attribute.KeyValue) string + + switch ipAnonymizationMethod { + case Hash: + rFunc = func(key attribute.KeyValue) string { + h := sha256.New() + h.Write([]byte(key.Value.AsString())) + return hex.EncodeToString(h.Sum(nil)) + } + case Redact: + rFunc = func(_ attribute.KeyValue) string { + return "[REDACTED]" + } + } + + keySet := make(map[attribute.Key]struct{}, len(keys)) + for _, k := range keys { + keySet[k] = struct{}{} + } + + return func(kv attribute.KeyValue) (attribute.Value, bool) { + if _, ok := keySet[kv.Key]; ok { + return attribute.StringValue(rFunc(kv)), true + } + return kv.Value, false + } +} diff --git a/router/pkg/trace/attributeprocessor/redact_test.go b/router/pkg/trace/attributeprocessor/redact_test.go new file mode 100644 index 0000000000..15b76565a9 --- /dev/null +++ b/router/pkg/trace/attributeprocessor/redact_test.go @@ -0,0 +1,243 @@ +package attributeprocessor + +import ( + "context" + "strconv" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/trace" +) + +func TestRedactKeys(t *testing.T) { + t.Parallel() + + const key = "password" + var ( + name = attribute.String("name", "bob") + eID = attribute.Int("employee-id", 9287) + passStr = attribute.String(key, "super-secret-pswd") + passBool = attribute.Bool(key, true) + replaced = attribute.String(key, "[REDACTED]") + ) + + t.Run("Empty", func(t *testing.T) { + t.Parallel() + + // No transformers means no changes + attributes := testAttributes(NewAttributeProcessorOption(), name, passStr, eID) + require.Contains(t, attributes, name) + require.Contains(t, attributes, eID) + require.Contains(t, attributes, passStr) + }) + t.Run("EmptyAfterCreation", func(t *testing.T) { + t.Parallel() + + attributes := testAttributesAfterCreation(NewAttributeProcessorOption(), name, passStr, eID) + require.Contains(t, attributes, name) + require.Contains(t, attributes, eID) + require.Contains(t, attributes, passStr) + }) + + t.Run("SingleStringAttribute", func(t *testing.T) { + t.Parallel() + + attributes := testAttributes(NewAttributeProcessorOption(RedactKeys([]attribute.Key{key}, Redact)), name, passStr, eID) + require.Contains(t, attributes, name) + require.Contains(t, attributes, eID) + require.Contains(t, attributes, replaced) + }) + t.Run("SingleStringAttributeAfterCreation", func(t *testing.T) { + t.Parallel() + + attributes := testAttributesAfterCreation(NewAttributeProcessorOption(RedactKeys([]attribute.Key{key}, Redact)), name, passStr, eID) + require.Contains(t, attributes, name) + require.Contains(t, attributes, eID) + require.Contains(t, attributes, replaced) + }) + + t.Run("NoMatchingKey", func(t *testing.T) { + t.Parallel() + + attributes := testAttributes(NewAttributeProcessorOption(RedactKeys([]attribute.Key{"secret"}, Redact)), name, passStr, eID) + require.Contains(t, attributes, name) + require.Contains(t, attributes, eID) + require.Contains(t, attributes, passStr) + }) + t.Run("NoMatchingKeyAfterCreation", func(t *testing.T) { + t.Parallel() + + attributes := testAttributesAfterCreation(NewAttributeProcessorOption(RedactKeys([]attribute.Key{"secret"}, Redact)), name, passStr, eID) + require.Contains(t, attributes, name) + require.Contains(t, attributes, eID) + require.Contains(t, attributes, passStr) + }) + + t.Run("DifferentValueTypes", func(t *testing.T) { + t.Parallel() + + attributes := testAttributes(NewAttributeProcessorOption(RedactKeys([]attribute.Key{key}, Redact)), name, passBool, eID) + require.Contains(t, attributes, name) + require.Contains(t, attributes, eID) + require.Contains(t, attributes, replaced) + }) + t.Run("DifferentValueTypesAfterCreation", func(t *testing.T) { + t.Parallel() + + attributes := testAttributesAfterCreation(NewAttributeProcessorOption(RedactKeys([]attribute.Key{key}, Redact)), name, passBool, eID) + require.Contains(t, attributes, name) + require.Contains(t, attributes, eID) + require.Contains(t, attributes, replaced) + }) + + t.Run("MultipleKeys", func(t *testing.T) { + t.Parallel() + + secret := attribute.String("secret", "my-secret") + apiKey := attribute.String("api_key", "my-api-key") + normal := attribute.String("normal", "normal-value") + + attributes := testAttributes( + NewAttributeProcessorOption(RedactKeys([]attribute.Key{"secret", "api_key"}, Redact)), + secret, apiKey, normal, + ) + require.Contains(t, attributes, attribute.String("secret", "[REDACTED]")) + require.Contains(t, attributes, attribute.String("api_key", "[REDACTED]")) + require.Contains(t, attributes, normal) + }) +} + +func TestRedactKeysWithHash(t *testing.T) { + t.Parallel() + + const key = "password" + passStr := attribute.String(key, "super-secret-pswd") + + t.Run("HashMethod", func(t *testing.T) { + t.Parallel() + + attributes := testAttributes(NewAttributeProcessorOption(RedactKeys([]attribute.Key{key}, Hash)), passStr) + + // Find the password attribute + var hashedValue string + for _, attr := range attributes { + if attr.Key == key { + hashedValue = attr.Value.AsString() + break + } + } + + // Hash should be a 64-character hex string (SHA256) + require.Len(t, hashedValue, 64, "Hash should be 64 characters (SHA256 hex)") + require.NotEqual(t, "super-secret-pswd", hashedValue, "Value should be hashed") + require.NotEqual(t, "[REDACTED]", hashedValue, "Value should be hashed, not redacted") + }) + + t.Run("HashIsDeterministic", func(t *testing.T) { + t.Parallel() + + // Same value should produce same hash + attributes1 := testAttributes(NewAttributeProcessorOption(RedactKeys([]attribute.Key{key}, Hash)), passStr) + attributes2 := testAttributes(NewAttributeProcessorOption(RedactKeys([]attribute.Key{key}, Hash)), passStr) + + var hash1, hash2 string + for _, attr := range attributes1 { + if attr.Key == key { + hash1 = attr.Value.AsString() + break + } + } + for _, attr := range attributes2 { + if attr.Key == key { + hash2 = attr.Value.AsString() + break + } + } + + require.Equal(t, "84ac464cfb16339f20b38c5dbd2623514badf48f525c165ebd39091a7969a86c", hash1) + require.Equal(t, hash1, hash2) + }) + + t.Run("DifferentValuesProduceDifferentHashes", func(t *testing.T) { + t.Parallel() + + pass1 := attribute.String(key, "password1") + pass2 := attribute.String(key, "password2") + + attributes1 := testAttributes(NewAttributeProcessorOption(RedactKeys([]attribute.Key{key}, Hash)), pass1) + attributes2 := testAttributes(NewAttributeProcessorOption(RedactKeys([]attribute.Key{key}, Hash)), pass2) + + var hash1, hash2 string + for _, attr := range attributes1 { + if attr.Key == key { + hash1 = attr.Value.AsString() + break + } + } + for _, attr := range attributes2 { + if attr.Key == key { + hash2 = attr.Value.AsString() + break + } + } + + require.NotEqual(t, hash1, hash2, "Different inputs should produce different hashes") + }) +} + +func BenchmarkRedactOnEnd(b *testing.B) { + b.Run("Redact/0/16", benchRedactOnEnd(0, 16, Redact)) + b.Run("Redact/1/16", benchRedactOnEnd(1, 16, Redact)) + b.Run("Redact/4/16", benchRedactOnEnd(4, 16, Redact)) + b.Run("Redact/8/16", benchRedactOnEnd(8, 16, Redact)) + b.Run("Redact/16/16", benchRedactOnEnd(16, 16, Redact)) + b.Run("Hash/0/16", benchRedactOnEnd(0, 16, Hash)) + b.Run("Hash/1/16", benchRedactOnEnd(1, 16, Hash)) + b.Run("Hash/4/16", benchRedactOnEnd(4, 16, Hash)) + b.Run("Hash/8/16", benchRedactOnEnd(8, 16, Hash)) + b.Run("Hash/16/16", benchRedactOnEnd(16, 16, Hash)) +} + +type rwSpan struct { + trace.ReadWriteSpan + + attrs []attribute.KeyValue +} + +func (rwSpan) SetAttributes(...attribute.KeyValue) {} +func (s rwSpan) Attributes() []attribute.KeyValue { + return s.attrs +} + +func benchRedactOnEnd(redacted, total int, method IPAnonymizationMethod) func(*testing.B) { + if redacted > total { + panic("redacted needs to be less than or equal to total") + } + + keys := make([]attribute.Key, 0, redacted) + attrs := make([]attribute.KeyValue, total) + for i := range attrs { + key := attribute.Key(strconv.Itoa(i)) + if i < redacted { + keys = append(keys, key) + } + attrs[i] = attribute.KeyValue{ + Key: key, + Value: attribute.StringValue("sensitive-value-" + strconv.Itoa(i)), + } + } + + s := rwSpan{attrs: attrs} + ac := NewAttributeProcessor(RedactKeys(keys, method)) + ctx := context.Background() + + return func(b *testing.B) { + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + ac.OnStart(ctx, s) + ac.OnEnd(s) + } + } +} diff --git a/router/pkg/trace/attributeprocessor/sanitizeutf8.go b/router/pkg/trace/attributeprocessor/sanitizeutf8.go new file mode 100644 index 0000000000..62808f5e2f --- /dev/null +++ b/router/pkg/trace/attributeprocessor/sanitizeutf8.go @@ -0,0 +1,41 @@ +package attributeprocessor + +import ( + "strings" + "unicode/utf8" + + "go.opentelemetry.io/otel/attribute" + "go.uber.org/zap" +) + +type SanitizeUTF8Config struct { + Enabled bool + LogSanitizations bool +} + +// SanitizeUTF8 returns a transformer that replaces invalid UTF-8 sequences +// with the Unicode replacement character (U+FFFD). +// If config.LogSanitizations is true and logger is provided, it will log a warning +// for each attribute with invalid UTF-8. +func SanitizeUTF8(config *SanitizeUTF8Config, logger *zap.Logger) AttributeTransformer { + if config.LogSanitizations && logger == nil { + logger = zap.NewNop() + } + + return func(kv attribute.KeyValue) (attribute.Value, bool) { + if kv.Value.Type() != attribute.STRING { + return kv.Value, false + } + strValue := kv.Value.AsString() + if strValue == "" || utf8.ValidString(strValue) { + return kv.Value, false + } + if config.LogSanitizations { + logger.Warn("Invalid UTF-8 in span attribute, sanitizing", + zap.String("key", string(kv.Key)), + zap.String("value", strValue), + ) + } + return attribute.StringValue(strings.ToValidUTF8(strValue, "\ufffd")), true + } +} diff --git a/router/pkg/trace/attributeprocessor/sanitizeutf8_test.go b/router/pkg/trace/attributeprocessor/sanitizeutf8_test.go new file mode 100644 index 0000000000..63fbe68b33 --- /dev/null +++ b/router/pkg/trace/attributeprocessor/sanitizeutf8_test.go @@ -0,0 +1,207 @@ +package attributeprocessor + +import ( + "context" + "strconv" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/trace" +) + +func TestSanitizeUTF8(t *testing.T) { + t.Parallel() + + t.Run("ValidUTF8Unchanged", func(t *testing.T) { + t.Parallel() + + validStr := attribute.String("message", "Hello, World!") + attributes := testAttributes(NewAttributeProcessorOption(SanitizeUTF8(&SanitizeUTF8Config{Enabled: true}, nil)), validStr) + require.Contains(t, attributes, validStr) + }) + + t.Run("InvalidUTF8Sanitized", func(t *testing.T) { + t.Parallel() + + // Create an invalid UTF-8 string with a byte sequence that is not valid UTF-8 + // strings.ToValidUTF8 replaces each run of invalid bytes with a single replacement character + invalidBytes := string([]byte{0x80, 0x81, 0x82}) + invalidStr := attribute.String("message", invalidBytes) + expected := attribute.String("message", "\ufffd") + + attributes := testAttributes(NewAttributeProcessorOption(SanitizeUTF8(&SanitizeUTF8Config{Enabled: true}, nil)), invalidStr) + require.Contains(t, attributes, expected) + }) + + t.Run("MixedUTF8Sanitized", func(t *testing.T) { + t.Parallel() + + // Valid UTF-8 followed by invalid bytes + mixedBytes := string([]byte{'H', 'i', 0x80, '!'}) + mixedStr := attribute.String("message", mixedBytes) + expected := attribute.String("message", "Hi\ufffd!") + + attributes := testAttributes(NewAttributeProcessorOption(SanitizeUTF8(&SanitizeUTF8Config{Enabled: true}, nil)), mixedStr) + require.Contains(t, attributes, expected) + }) + + t.Run("NoTransformers", func(t *testing.T) { + t.Parallel() + + invalidBytes := string([]byte{0x80, 0x81, 0x82}) + invalidStr := attribute.String("message", invalidBytes) + + // With no transformers, the invalid string should remain unchanged + attributes := testAttributes(NewAttributeProcessorOption(), invalidStr) + require.Contains(t, attributes, invalidStr) + }) + + t.Run("NonStringAttributesUnchanged", func(t *testing.T) { + t.Parallel() + + intAttr := attribute.Int("count", 42) + boolAttr := attribute.Bool("flag", true) + + attributes := testAttributes(NewAttributeProcessorOption(SanitizeUTF8(&SanitizeUTF8Config{Enabled: true}, nil)), intAttr, boolAttr) + require.Contains(t, attributes, intAttr) + require.Contains(t, attributes, boolAttr) + }) + + t.Run("RedactionTakesPrecedenceOverSanitization", func(t *testing.T) { + t.Parallel() + + const key = "password" + invalidBytes := string([]byte{'s', 'e', 'c', 'r', 'e', 't', 0x80}) + passStr := attribute.String(key, invalidBytes) + expected := attribute.String(key, "[REDACTED]") + + // With both redaction and sanitization, redaction runs first and handles the attribute + attributes := testAttributes(NewAttributeProcessorOption(RedactKeys([]attribute.Key{key}, Redact), SanitizeUTF8(&SanitizeUTF8Config{Enabled: true}, nil)), passStr) + require.Contains(t, attributes, expected) + }) + + t.Run("EmptyStringUnchanged", func(t *testing.T) { + t.Parallel() + + emptyStr := attribute.String("empty", "") + attributes := testAttributes(NewAttributeProcessorOption(SanitizeUTF8(&SanitizeUTF8Config{Enabled: true}, nil)), emptyStr) + require.Contains(t, attributes, emptyStr) + }) + + t.Run("UnicodeCharactersPreserved", func(t *testing.T) { + t.Parallel() + + // Valid UTF-8 with various unicode characters should be preserved + unicodeStr := attribute.String("message", "Hello, 世界! 🌍 Ñoño") + attributes := testAttributes(NewAttributeProcessorOption(SanitizeUTF8(&SanitizeUTF8Config{Enabled: true}, nil)), unicodeStr) + require.Contains(t, attributes, unicodeStr) + }) + + t.Run("MultipleInvalidSequences", func(t *testing.T) { + t.Parallel() + + // Multiple separate invalid sequences should each be replaced + // "Hi" + invalid + "there" + invalid + "!" + mixedBytes := string([]byte{'H', 'i', 0x80, 't', 'h', 'e', 'r', 'e', 0x81, '!'}) + mixedStr := attribute.String("message", mixedBytes) + expected := attribute.String("message", "Hi\ufffdthere\ufffd!") + + attributes := testAttributes(NewAttributeProcessorOption(SanitizeUTF8(&SanitizeUTF8Config{Enabled: true}, nil)), mixedStr) + require.Contains(t, attributes, expected) + }) + + t.Run("StringSliceAttributeUnchanged", func(t *testing.T) { + t.Parallel() + + // String slice attributes should not be modified (only simple strings are processed) + sliceAttr := attribute.StringSlice("tags", []string{"tag1", "tag2"}) + attributes := testAttributes(NewAttributeProcessorOption(SanitizeUTF8(&SanitizeUTF8Config{Enabled: true}, nil)), sliceAttr) + require.Contains(t, attributes, sliceAttr) + }) +} + +// sanitizeSpan is a minimal span implementation for benchmarks +type sanitizeSpan struct { + trace.ReadWriteSpan + attrs []attribute.KeyValue +} + +func (sanitizeSpan) SetAttributes(...attribute.KeyValue) {} +func (s sanitizeSpan) Attributes() []attribute.KeyValue { + return s.attrs +} + +func BenchmarkSanitizeUTF8OnEnd(b *testing.B) { + b.Run("AllValid/16", benchSanitizeUTF8OnEnd(0, 16)) + b.Run("1Invalid/16", benchSanitizeUTF8OnEnd(1, 16)) + b.Run("4Invalid/16", benchSanitizeUTF8OnEnd(4, 16)) + b.Run("8Invalid/16", benchSanitizeUTF8OnEnd(8, 16)) + b.Run("16Invalid/16", benchSanitizeUTF8OnEnd(16, 16)) +} + +func benchSanitizeUTF8OnEnd(invalidCount, total int) func(*testing.B) { + if invalidCount > total { + panic("invalidCount needs to be less than or equal to total") + } + + attrs := make([]attribute.KeyValue, total) + for i := range attrs { + key := attribute.Key(strconv.Itoa(i)) + if i < invalidCount { + // Create invalid UTF-8 string + attrs[i] = attribute.KeyValue{ + Key: key, + Value: attribute.StringValue(string([]byte{0x80, 0x81, 0x82})), + } + } else { + // Create valid UTF-8 string + attrs[i] = attribute.KeyValue{ + Key: key, + Value: attribute.StringValue("valid-utf8-string"), + } + } + } + + s := sanitizeSpan{attrs: attrs} + ac := NewAttributeProcessor(SanitizeUTF8(&SanitizeUTF8Config{Enabled: true}, nil)) + ctx := context.Background() + + return func(b *testing.B) { + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + ac.OnStart(ctx, s) + ac.OnEnd(s) + } + } +} + +func BenchmarkSanitizeUTF8MixedTypes(b *testing.B) { + // Benchmark with mixed attribute types (strings, ints, bools) + attrs := make([]attribute.KeyValue, 16) + for i := range attrs { + key := attribute.Key(strconv.Itoa(i)) + switch i % 4 { + case 0: + attrs[i] = attribute.KeyValue{Key: key, Value: attribute.StringValue("valid")} + case 1: + attrs[i] = attribute.KeyValue{Key: key, Value: attribute.StringValue(string([]byte{0x80}))} + case 2: + attrs[i] = attribute.KeyValue{Key: key, Value: attribute.IntValue(i)} + case 3: + attrs[i] = attribute.KeyValue{Key: key, Value: attribute.BoolValue(true)} + } + } + + s := sanitizeSpan{attrs: attrs} + ac := NewAttributeProcessor(SanitizeUTF8(&SanitizeUTF8Config{Enabled: true}, nil)) + ctx := context.Background() + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + ac.OnStart(ctx, s) + ac.OnEnd(s) + } +} diff --git a/router/pkg/trace/config.go b/router/pkg/trace/config.go index 33d59d9e38..d6362ee48b 100644 --- a/router/pkg/trace/config.go +++ b/router/pkg/trace/config.go @@ -6,6 +6,7 @@ import ( "github.com/wundergraph/cosmo/router/pkg/config" "github.com/wundergraph/cosmo/router/pkg/otel/otelconfig" + "github.com/wundergraph/cosmo/router/pkg/trace/attributeprocessor" "go.opentelemetry.io/otel/attribute" sdktrace "go.opentelemetry.io/otel/sdk/trace" ) @@ -71,6 +72,8 @@ type Config struct { TestMemoryExporter sdktrace.SpanExporter ResponseTraceHeader config.ResponseTraceHeader Attributes []config.CustomAttribute + // SanitizeUTF8 configures sanitization of invalid UTF-8 sequences in span attribute values + SanitizeUTF8 *attributeprocessor.SanitizeUTF8Config } func DefaultExporter(cfg *Config) *ExporterConfig { @@ -122,5 +125,9 @@ func DefaultConfig(serviceVersion string) *Config { Enabled: false, HeaderName: "x-wg-trace-id", }, + SanitizeUTF8: &attributeprocessor.SanitizeUTF8Config{ + Enabled: false, + LogSanitizations: false, + }, } } diff --git a/router/pkg/trace/meter.go b/router/pkg/trace/meter.go index edcd71ac82..54fb1b76f7 100644 --- a/router/pkg/trace/meter.go +++ b/router/pkg/trace/meter.go @@ -2,15 +2,12 @@ package trace import ( "context" - "crypto/sha256" - "encoding/hex" "fmt" "net/url" "github.com/wundergraph/cosmo/router/pkg/otel/otelconfig" - "github.com/wundergraph/cosmo/router/pkg/trace/redact" + "github.com/wundergraph/cosmo/router/pkg/trace/attributeprocessor" "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" "go.opentelemetry.io/otel/sdk/resource" @@ -26,28 +23,18 @@ var ( ) type ( - IPAnonymizationMethod string - - IPAnonymizationConfig struct { - Enabled bool - Method IPAnonymizationMethod - } - ProviderConfig struct { Logger *zap.Logger Config *Config ServiceInstanceID string - IPAnonymization *IPAnonymizationConfig + IPAnonymization *attributeprocessor.IPAnonymizationConfig + // SanitizeUTF8 configures sanitization of invalid UTF-8 sequences in span attribute values + SanitizeUTF8 *attributeprocessor.SanitizeUTF8Config // MemoryExporter is used for testing purposes MemoryExporter sdktrace.SpanExporter } ) -const ( - Hash IPAnonymizationMethod = "hash" - Redact IPAnonymizationMethod = "redact" -) - func createExporter(log *zap.Logger, exp *ExporterConfig) (sdktrace.SpanExporter, error) { u, err := url.Parse(exp.Endpoint) if err != nil { @@ -159,25 +146,19 @@ func NewTracerProvider(ctx context.Context, config *ProviderConfig) (*sdktrace.T ) } - if config.IPAnonymization != nil && config.IPAnonymization.Enabled { - - var rFunc redact.RedactFunc + // Build list of attribute transformers based on config + var transformers []attributeprocessor.AttributeTransformer - switch config.IPAnonymization.Method { - case Hash: - rFunc = func(key attribute.KeyValue) string { - h := sha256.New() - h.Write([]byte(key.Value.AsString())) - return hex.EncodeToString(h.Sum(nil)) - } - case Redact: - rFunc = func(key attribute.KeyValue) string { - return "[REDACTED]" - } - - } + // The orders of the transformers indicate the priority. + if config.IPAnonymization != nil && config.IPAnonymization.Enabled { + transformers = append(transformers, attributeprocessor.RedactKeys(SensitiveAttributes, config.IPAnonymization.Method)) + } + if config.SanitizeUTF8 != nil && config.SanitizeUTF8.Enabled { + transformers = append(transformers, attributeprocessor.SanitizeUTF8(config.SanitizeUTF8, config.Logger)) + } - opts = append(opts, redact.Attributes(SensitiveAttributes, rFunc)) + if len(transformers) > 0 { + opts = append(opts, attributeprocessor.NewAttributeProcessorOption(transformers...)) } if config.Config.Enabled { diff --git a/router/pkg/trace/redact/attributes.go b/router/pkg/trace/redact/attributes.go deleted file mode 100644 index b255e3bd22..0000000000 --- a/router/pkg/trace/redact/attributes.go +++ /dev/null @@ -1,67 +0,0 @@ -package redact - -import ( - "context" - - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/sdk/trace" -) - -type RedactFunc func(key attribute.KeyValue) string - -// Attributes returns an OpenTelemetry SDK TracerProviderOption. It registers -// an OpenTelemetry SpanProcessor that redacts attributes of new spans matching -// the passed keys. -func Attributes(keys []attribute.Key, redactFunc RedactFunc) trace.TracerProviderOption { - r := make(map[attribute.Key]struct{}, len(keys)) - for _, k := range keys { - r[k] = struct{}{} - } - censor := NewAttributeCensor(r, redactFunc) - return trace.WithSpanProcessor(censor) -} - -// AttributeCensor is an OpenTelemetry SpanProcessor that censors attributes of -// new spans. -type AttributeCensor struct { - // args is a slice allocated on creation that is reused when calling - // SetAttributes in OnStart. - args []attribute.KeyValue - redactFunc RedactFunc - replacements map[attribute.Key]struct{} -} - -// NewAttributeCensor returns an AttributeCensor that uses the provided mapping -// of replacement values for a set of keys to redact matching attributes. -// Attributes are matched based on the equality of keys. -func NewAttributeCensor(replacements map[attribute.Key]struct{}, redactFunc RedactFunc) AttributeCensor { - a := AttributeCensor{ - // Allocate a reusable slice to pass to SetAttributes. - args: make([]attribute.KeyValue, 0, len(replacements)), - redactFunc: redactFunc, - replacements: replacements, - } - return a -} - -// OnStart does nothing. -func (c AttributeCensor) OnStart(_ context.Context, _ trace.ReadWriteSpan) { -} - -// OnEnd censors the attributes of s matching the Replacements keys of c. -func (c AttributeCensor) OnEnd(s trace.ReadOnlySpan) { - // We can't change the attribute slice of the span snapshot in OnEnd, but - // we can change the attribute value in the underlying array. - attributes := s.Attributes() - for i := range attributes { - if _, ok := c.replacements[attributes[i].Key]; ok { - attributes[i].Value = attribute.StringValue(c.redactFunc(attributes[i])) - } - } -} - -// Shutdown does nothing. -func (AttributeCensor) Shutdown(context.Context) error { return nil } - -// ForceFlush does nothing. -func (AttributeCensor) ForceFlush(context.Context) error { return nil } diff --git a/router/pkg/trace/redact/attributes_test.go b/router/pkg/trace/redact/attributes_test.go deleted file mode 100644 index c56b38b25a..0000000000 --- a/router/pkg/trace/redact/attributes_test.go +++ /dev/null @@ -1,162 +0,0 @@ -package redact - -import ( - "context" - "strconv" - "testing" - - "github.com/stretchr/testify/assert" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/sdk/trace" - api "go.opentelemetry.io/otel/trace" -) - -type attrRecorder struct { - attrs []attribute.KeyValue -} - -func (r *attrRecorder) OnEnd(s trace.ReadOnlySpan) { - r.attrs = s.Attributes() -} -func (*attrRecorder) Shutdown(context.Context) error { return nil } -func (*attrRecorder) ForceFlush(context.Context) error { return nil } -func (*attrRecorder) OnStart(_ context.Context, _ trace.ReadWriteSpan) {} - -func TestAttributes(t *testing.T) { - const key = "password" - var ( - name = attribute.String("name", "bob") - eID = attribute.Int("employee-id", 9287) - passStr = attribute.String(key, "super-secret-pswd") - passBool = attribute.Bool(key, true) - replaced = attribute.String(key, "REDACTED") - ) - - contains := func(t *testing.T, got []attribute.KeyValue, want ...attribute.KeyValue) { - t.Helper() - for _, w := range want { - assert.Contains(t, got, w) - } - } - - rf := func(kv attribute.KeyValue) string { - return "REDACTED" - } - - t.Run("Empty", func(t *testing.T) { - got := testAttributes(Attributes(nil, rf), name, passStr, eID) - contains(t, got, name, eID, passStr) - }) - t.Run("EmptyAfterCreation", func(t *testing.T) { - got := testAttributesAfterCreation(Attributes(nil, rf), name, passStr, eID) - contains(t, got, name, eID, passStr) - }) - - t.Run("SingleStringAttribute", func(t *testing.T) { - got := testAttributes(Attributes([]attribute.Key{key}, rf), name, passStr, eID) - contains(t, got, name, eID, replaced) - }) - t.Run("SingleStringAttributeAfterCreation", func(t *testing.T) { - got := testAttributesAfterCreation(Attributes([]attribute.Key{key}, rf), name, passStr, eID) - contains(t, got, name, eID, replaced) - }) - - t.Run("NoMatchingKey", func(t *testing.T) { - got := testAttributes(Attributes([]attribute.Key{"secret"}, rf), name, passStr, eID) - contains(t, got, name, eID, passStr) - }) - t.Run("NoMatchingKeyAfterCreation", func(t *testing.T) { - got := testAttributesAfterCreation(Attributes([]attribute.Key{"secret"}, rf), name, passStr, eID) - contains(t, got, name, eID, passStr) - }) - - t.Run("DifferentValueTypes", func(t *testing.T) { - got := testAttributes(Attributes([]attribute.Key{key}, rf), name, passBool, eID) - contains(t, got, name, eID, replaced) - }) - t.Run("DifferentValueTypesAfterCreation", func(t *testing.T) { - got := testAttributesAfterCreation(Attributes([]attribute.Key{key}, rf), name, passBool, eID) - contains(t, got, name, eID, replaced) - }) -} - -func testAttributes(opt trace.TracerProviderOption, attrs ...attribute.KeyValue) []attribute.KeyValue { - r := &attrRecorder{} - tp := trace.NewTracerProvider(opt, trace.WithSpanProcessor(r)) - defer func() { _ = tp.Shutdown(context.Background()) }() - - ctx := context.Background() - tracer := tp.Tracer("testAttributes") - _, s := tracer.Start(ctx, "span name", api.WithAttributes(attrs...)) - s.End() - return r.attrs -} - -func testAttributesAfterCreation(opt trace.TracerProviderOption, attrs ...attribute.KeyValue) []attribute.KeyValue { - r := &attrRecorder{} - tp := trace.NewTracerProvider(opt, trace.WithSpanProcessor(r)) - defer func() { _ = tp.Shutdown(context.Background()) }() - - ctx := context.Background() - tracer := tp.Tracer("testAttributes") - _, s := tracer.Start(ctx, "span name") - s.SetAttributes(attrs...) - s.End() - return r.attrs -} - -func BenchmarkAttributeCensorOnStart(b *testing.B) { - b.Run("0/16", benchAttributeCensorOnStart(0, 16)) - b.Run("1/16", benchAttributeCensorOnStart(1, 16)) - b.Run("2/16", benchAttributeCensorOnStart(2, 16)) - b.Run("4/16", benchAttributeCensorOnStart(4, 16)) - b.Run("8/16", benchAttributeCensorOnStart(8, 16)) - b.Run("16/16", benchAttributeCensorOnStart(16, 16)) -} - -type rwSpan struct { - trace.ReadWriteSpan - - attrs []attribute.KeyValue -} - -func (rwSpan) SetAttributes(...attribute.KeyValue) {} -func (s rwSpan) Attributes() []attribute.KeyValue { - return s.attrs -} - -func benchAttributeCensorOnStart(redacted, total int) func(*testing.B) { - if redacted > total { - panic("redacted needs to be less than or equal to total") - } - - rf := func(kv attribute.KeyValue) string { - return "REDACTED" - } - - replacements := make(map[attribute.Key]struct{}) - attrs := make([]attribute.KeyValue, total) - for i := range attrs { - key := attribute.Key(strconv.Itoa(i)) - if i < redacted { - replacements[key] = struct{}{} - } - attrs[i] = attribute.KeyValue{ - Key: key, - Value: attribute.IntValue(i), - } - } - - s := rwSpan{attrs: attrs} - ac := NewAttributeCensor(replacements, rf) - ctx := context.Background() - - return func(b *testing.B) { - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - ac.OnStart(ctx, s) - ac.OnEnd(s) - } - } -}