diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/LoopSampler.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/LoopSampler.java
new file mode 100644
index 00000000000..d7a044d2a4f
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/LoopSampler.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.tracing;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Probability-based span sampler that samples spans independently of one
+ * another.
+ *
+ * Each call to {@link #shouldSample()} draws a fresh value from
+ * {@link ThreadLocalRandom} and compares it with the configured probability.
+ */
+public final class LoopSampler {
+  private final double probability;
+
+  /**
+   * Creates a sampler with the given sampling ratio.
+   *
+   * @param ratio fraction of spans to sample; values above 1.0 are capped
+   *              at 1.0
+   * @throws IllegalArgumentException if {@code ratio} is negative or NaN
+   */
+  public LoopSampler(double ratio) {
+    // NaN fails every ordered comparison, so without this check it would
+    // pass the range test below and yield a sampler that never samples.
+    if (Double.isNaN(ratio)) {
+      throw new IllegalArgumentException("Sampling ratio must be a number: " + ratio);
+    }
+    if (ratio < 0) {
+      throw new IllegalArgumentException("Sampling ratio cannot be negative: " + ratio);
+    }
+    // Cap at 1.0 so the stored value is always a valid probability.
+    this.probability = Math.min(ratio, 1.0);
+  }
+
+  /**
+   * Decides whether the next span should be sampled.
+   *
+   * @return {@code true} with the configured probability; always
+   *         {@code false} for probability 0 and always {@code true} for 1.0
+   */
+  public boolean shouldSample() {
+    if (probability <= 0) {
+      return false;
+    }
+    if (probability >= 1.0) {
+      return true;
+    }
+    return ThreadLocalRandom.current().nextDouble() < probability;
+  }
+}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/SpanSampler.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/SpanSampler.java
new file mode 100644
index 00000000000..317573a5e19
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/SpanSampler.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.tracing;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.sdk.trace.data.LinkData;
+import io.opentelemetry.sdk.trace.samplers.Sampler;
+import io.opentelemetry.sdk.trace.samplers.SamplingResult;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Sampler that applies per-span probabilistic sampling to configured span
+ * names. Root spans are always delegated to {@code rootSampler}. A child span
+ * is dropped when its parent is not sampled; otherwise it is sampled by its
+ * {@link LoopSampler} if its name is in {@code spanMap}, else unconditionally.
+ */
+public final class SpanSampler implements Sampler {
+
+  private final Sampler rootSampler;
+  private final Map<String, LoopSampler> spanMap;
+
+  /**
+   * Creates a sampler. Null arguments are rejected here rather than on the
+   * first sampling call.
+   * @param rootSampler sampler consulted for spans without a valid parent
+   * @param spanMap per-span samplers keyed by span name
+   */
+  public SpanSampler(Sampler rootSampler, Map<String, LoopSampler> spanMap) {
+    this.rootSampler = Objects.requireNonNull(rootSampler, "rootSampler");
+    this.spanMap = Objects.requireNonNull(spanMap, "spanMap");
+  }
+
+  @Override
+  public SamplingResult shouldSample(
+      Context parentContext,
+      String traceId,
+      String spanName,
+      SpanKind spanKind,
+      Attributes attributes,
+      List<LinkData> parentLinks) {
+    Span parentSpan = Span.fromContext(parentContext);
+
+    if (!parentSpan.getSpanContext().isValid()) {
+      // Root span: always delegate to the trace-level sampler so that
+      // OTEL_TRACES_SAMPLER_ARG is respected.
+      return rootSampler.shouldSample(parentContext, traceId, spanName,
+          spanKind, attributes, parentLinks);
+    }
+
+    if (!parentSpan.getSpanContext().isSampled()) {
+      // Parent was not sampled: drop the child too, to avoid orphaned spans.
+      return SamplingResult.drop();
+    }
+
+    // Parent was sampled: apply the explicit per-span config if there is one.
+    LoopSampler loopSampler = spanMap.get(spanName);
+    if (loopSampler != null) {
+      boolean sample = loopSampler.shouldSample();
+      return sample ? SamplingResult.recordAndSample() : SamplingResult.drop();
+    }
+
+    // No explicit config for this span: follow the parent's decision.
+    return SamplingResult.recordAndSample();
+  }
+
+  @Override
+  public String getDescription() {
+    return "SpanSampler(spanMap=" + spanMap.keySet() + ")";
+  }
+}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java index 8d6e0fd240f..6ef604bc2ad 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java @@ -33,6 +33,7 @@ import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; import io.opentelemetry.sdk.trace.samplers.Sampler; import java.lang.reflect.Proxy; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import org.apache.hadoop.hdds.conf.ConfigurationSource; @@ -52,6 +53,8 @@ public final class TracingUtil { private static final String OTEL_EXPORTER_OTLP_ENDPOINT_DEFAULT = "http://localhost:4317"; private static final String OTEL_TRACES_SAMPLER_ARG = "OTEL_TRACES_SAMPLER_ARG"; private static final double OTEL_TRACES_SAMPLER_RATIO_DEFAULT = 1.0; + private static final String OTEL_SPAN_SAMPLING_ARG = "OTEL_SPAN_SAMPLING_ARG"; + private static final String OTEL_TRACES_SAMPLER_CONFIG_DEFAULT = ""; private static volatile boolean isInit = false; private static Tracer tracer = OpenTelemetry.noop().getTracer("noop"); @@ -88,21 +91,50 @@ private static
void initialize(String serviceName) { String sampleStrRatio = System.getenv(OTEL_TRACES_SAMPLER_ARG); if (sampleStrRatio != null && !sampleStrRatio.isEmpty()) { samplerRatio = Double.parseDouble(System.getenv(OTEL_TRACES_SAMPLER_ARG)); + LOG.info("Sampling Trace Config = '{}'", samplerRatio); } } catch (NumberFormatException ex) { - // ignore and use the default value. + // log and use the default value. + LOG.warn("Invalid value for {}: '{}'. Falling back to default: {}", + OTEL_TRACES_SAMPLER_ARG, System.getenv(OTEL_TRACES_SAMPLER_ARG), OTEL_TRACES_SAMPLER_RATIO_DEFAULT, ex); } + String spanSamplingConfig = OTEL_TRACES_SAMPLER_CONFIG_DEFAULT; + try { + String spanStrConfig = System.getenv(OTEL_SPAN_SAMPLING_ARG); + if (spanStrConfig != null && !spanStrConfig.isEmpty()) { + spanSamplingConfig = spanStrConfig; + } + LOG.info("Sampling Span Config = '{}'", spanSamplingConfig); + } catch (Exception ex) { + // Log and use the default value. + LOG.warn("Failed to process {}. Falling back to default configuration: {}", + OTEL_SPAN_SAMPLING_ARG, OTEL_TRACES_SAMPLER_CONFIG_DEFAULT, ex); + } + // Pass the config to parseSpanSamplingConfig to get spans to be sampled. + Map spanMap = parseSpanSamplingConfig(spanSamplingConfig); + Resource resource = Resource.create(Attributes.of(AttributeKey.stringKey("service.name"), serviceName)); OtlpGrpcSpanExporter spanExporter = OtlpGrpcSpanExporter.builder() .setEndpoint(otelEndPoint) .build(); SimpleSpanProcessor spanProcessor = SimpleSpanProcessor.builder(spanExporter).build(); + + // Choose sampler based on span sampling config. If it is empty use trace based sampling only. + // else use custom SpanSampler. 
+ Sampler sampler; + if (spanMap.isEmpty()) { + sampler = Sampler.traceIdRatioBased(samplerRatio); + } else { + Sampler rootSampler = Sampler.traceIdRatioBased(samplerRatio); + sampler = new SpanSampler(rootSampler, spanMap); + } + SdkTracerProvider tracerProvider = SdkTracerProvider.builder() .addSpanProcessor(spanProcessor) .setResource(resource) - .setSampler(Sampler.traceIdRatioBased(samplerRatio)) + .setSampler(sampler) .build(); OpenTelemetry openTelemetry = OpenTelemetrySdk.builder() .setTracerProvider(tracerProvider) @@ -177,6 +209,59 @@ public static boolean isTracingEnabled( ScmConfigKeys.HDDS_TRACING_ENABLED_DEFAULT); }
+  /**
+   * Parses the span sampling config into per-span samplers.
+   *
+   * The expected format is a comma-separated list of
+   * {@code spanName:sampleRate} entries, e.g.
+   * {@code createVolume:0.25,createKey:0.5}. Rates greater than 1 are capped
+   * at 1.0. Entries that are malformed, non-numeric, NaN, or not greater
+   * than 0 are logged and ignored.
+   *
+   * @param configStr raw config value; may be null or empty
+   * @return map from span name to its sampler; empty if nothing valid is
+   *         configured
+   */
+  private static Map<String, LoopSampler> parseSpanSamplingConfig(String configStr) {
+    if (configStr == null || configStr.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    Map<String, LoopSampler> result = new HashMap<>();
+    for (String entry : configStr.split(",")) {
+      String trimmed = entry.trim();
+      if (trimmed.isEmpty()) {
+        // Stray comma: nothing was configured, so nothing to report.
+        continue;
+      }
+
+      int colon = trimmed.indexOf(':');
+      if (colon <= 0 || colon >= trimmed.length() - 1) {
+        LOG.warn("Malformed span sampling entry '{}', expected spanName:rate, ignoring it", trimmed);
+        continue;
+      }
+
+      String name = trimmed.substring(0, colon).trim();
+      String val = trimmed.substring(colon + 1).trim();
+
+      try {
+        double rate = Double.parseDouble(val);
+        if (Double.isNaN(rate)) {
+          // Double.parseDouble accepts "NaN", which is not a usable rate.
+          LOG.error("Invalid rate '{}' for span '{}', ignoring sample configuration", val, name);
+        } else if (rate > 0) {
+          // Cap at 1.0 when a number greater than 1 is entered.
+          result.put(name, new LoopSampler(Math.min(rate, 1.0)));
+        } else {
+          LOG.warn("rate for span '{}' is 0 or less, ignoring sample configuration", name);
+        }
+      } catch (NumberFormatException e) {
+        LOG.error("Invalid rate '{}' for span '{}', ignoring sample configuration", val, name);
+      }
+    }
+    return result;
+  }
+
 /** * Execute {@code runnable} inside an activated new span.
* If a parent span exists in the current context, this becomes a child span.
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/tracing/TestSpanSampling.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/tracing/TestSpanSampling.java
new file mode 100644
index 00000000000..fbbc2b0b5b6
--- /dev/null
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/tracing/TestSpanSampling.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.tracing;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanContext;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.api.trace.TraceFlags;
+import io.opentelemetry.api.trace.TraceState;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.sdk.trace.samplers.Sampler;
+import io.opentelemetry.sdk.trace.samplers.SamplingDecision;
+import io.opentelemetry.sdk.trace.samplers.SamplingResult;
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test cases for span sampling functionality.
+ */
+public class TestSpanSampling {
+
+  private static final String TRACE_ID = "ff000000000000000000000000000041";
+  private static final String SPAN_ID = "ff00000000000042";
+
+  /**
+   * Calls the private {@code TracingUtil.parseSpanSamplingConfig} reflectively.
+   */
+  @SuppressWarnings("unchecked")
+  private static Map<String, LoopSampler> parse(String config) throws Exception {
+    Method method = TracingUtil.class.getDeclaredMethod("parseSpanSamplingConfig", String.class);
+    method.setAccessible(true);
+    return (Map<String, LoopSampler>) method.invoke(null, config);
+  }
+
+  /**
+   * Builds a context whose span is a valid parent carrying the given flags.
+   */
+  private static Context contextWithParent(TraceFlags flags) {
+    Span parent = Span.wrap(SpanContext.create(TRACE_ID, SPAN_ID, flags, TraceState.getDefault()));
+    return Context.root().with(parent);
+  }
+
+  /**
+   * Asks the sampler for a decision on an INTERNAL span with no attributes or links.
+   */
+  private static SamplingDecision decide(SpanSampler sampler, Context parentContext, String spanName) {
+    SamplingResult result = sampler.shouldSample(parentContext, TRACE_ID, spanName,
+        SpanKind.INTERNAL, Attributes.empty(), Collections.emptyList());
+    return result.getDecision();
+  }
+
+  /**
+   * Tests that every valid entry of the configuration string ends up in the
+   * resulting Map.
+   */
+  @Test
+  public void testParseSpanSamplingConfigValid() throws Exception {
+    Map<String, LoopSampler> result = parse("createVolume:0.25,createBucket:0.5,createKey:0.75");
+
+    assertThat(result)
+        .hasSize(3)
+        .containsKeys("createVolume", "createBucket", "createKey");
+  }
+
+  /**
+   * Tests that invalid entries (zeros, negative numbers, non-numeric) are
+   * excluded from the resulting Map.
+   */
+  @Test
+  public void testParseSpanSamplingConfigInvalid() throws Exception {
+    Map<String, LoopSampler> result = parse("createVolume:0,createBucket:-0.5,createKey:invalid,writeKey:-1");
+
+    assertThat(result).as("The map should be empty as all inputs were invalid").isEmpty();
+  }
+
+  /**
+   * Tests a mixed configuration to ensure valid entries are preserved while
+   * invalid ones are skipped.
+   */
+  @Test
+  public void testParseSpanSamplingConfigMixed() throws Exception {
+    Map<String, LoopSampler> result = parse("createVolume:0.75,createBucket:0,createKey:-5");
+
+    assertThat(result)
+        .hasSize(1)
+        .containsKey("createVolume")
+        .doesNotContainKeys("createBucket", "createKey");
+  }
+
+  /**
+   * Tests that a root span (no parent) is decided by the root sampler: with an
+   * always-on root sampler it is sampled even though its name is not configured.
+   */
+  @Test
+  public void testSpanSamplingWithTraceSampled() {
+    Map<String, LoopSampler> spanMap = new HashMap<>();
+    spanMap.put("createKey", new LoopSampler(0.5));
+    SpanSampler customSampler = new SpanSampler(Sampler.alwaysOn(), spanMap);
+
+    // Context.root() never carries a span, unlike Context.current(), which
+    // depends on whatever happens to be active on the test thread.
+    assertEquals(SamplingDecision.RECORD_AND_SAMPLE, decide(customSampler, Context.root(), "unknownSpan"));
+  }
+
+  /**
+   * Tests that a root span is dropped when the root sampler is always-off.
+   */
+  @Test
+  public void testSpanSamplingWithTraceNotSampled() {
+    SpanSampler customSampler = new SpanSampler(Sampler.alwaysOff(), new HashMap<>());
+
+    assertEquals(SamplingDecision.DROP, decide(customSampler, Context.root(), "rootSpan"));
+  }
+
+  /**
+   * Tests that a child span is dropped when its parent is not sampled, even if
+   * its own sampler would always sample.
+   */
+  @Test
+  public void testChildDropsWhenParentIsNotSampled() {
+    Map<String, LoopSampler> spanMap = new HashMap<>();
+    spanMap.put("createKey", new LoopSampler(1.0));
+    SpanSampler customSampler = new SpanSampler(Sampler.alwaysOn(), spanMap);
+
+    Context parentContext = contextWithParent(TraceFlags.getDefault());
+
+    assertEquals(SamplingDecision.DROP, decide(customSampler, parentContext, "createKey"));
+  }
+
+  /**
+   * Tests that, under a sampled parent, a configured span follows its own
+   * LoopSampler. The always-off root sampler shows it is not consulted here.
+   */
+  @Test
+  public void testConfiguredChildUsesSpanSamplerWhenParentIsSampled() {
+    Map<String, LoopSampler> spanMap = new HashMap<>();
+    spanMap.put("alwaysSpan", new LoopSampler(1.0));
+    spanMap.put("neverSpan", new LoopSampler(0.0));
+    SpanSampler customSampler = new SpanSampler(Sampler.alwaysOff(), spanMap);
+
+    Context parentContext = contextWithParent(TraceFlags.getSampled());
+
+    assertEquals(SamplingDecision.RECORD_AND_SAMPLE, decide(customSampler, parentContext, "alwaysSpan"));
+    assertEquals(SamplingDecision.DROP, decide(customSampler, parentContext, "neverSpan"));
+  }
+
+  /**
+   * Tests that, under a sampled parent, a span without explicit configuration
+   * is sampled.
+   */
+  @Test
+  public void testUnconfiguredChildFollowsSampledParent() {
+    SpanSampler customSampler = new SpanSampler(Sampler.alwaysOff(), new HashMap<>());
+
+    Context parentContext = contextWithParent(TraceFlags.getSampled());
+
+    assertEquals(SamplingDecision.RECORD_AND_SAMPLE, decide(customSampler, parentContext, "unknownSpan"));
+  }
+}
diff --git a/hadoop-ozone/freon/pom.xml b/hadoop-ozone/freon/pom.xml index 71a9b1c1165..08bbdcd3eac 100644 --- a/hadoop-ozone/freon/pom.xml +++ b/hadoop-ozone/freon/pom.xml @@ -70,6 +70,10 @@ io.opentelemetry opentelemetry-api + + io.opentelemetry + opentelemetry-context + org.apache.commons commons-lang3 diff --git a/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/Freon.java b/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/Freon.java index b2ed1eb8069..c6e21cf7a95 100644 --- a/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/Freon.java +++
b/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/Freon.java @@ -58,7 +58,8 @@ public int execute(String[] argv) { conf = getOzoneConf(); HddsServerUtil.initializeMetrics(conf, "ozone-freon"); TracingUtil.initTracing("freon", conf); - return super.execute(argv); + String spanName = "ozone freon " + String.join(" ", argv); + return TracingUtil.executeInNewSpan(spanName, () -> super.execute(argv)); } @Override diff --git a/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java b/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java index 87ebf95f0b0..5fe14774159 100644 --- a/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java +++ b/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java @@ -27,6 +27,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; import com.google.common.annotations.VisibleForTesting; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.context.Scope; import java.io.FileNotFoundException; import java.io.IOException; import java.io.OutputStream; @@ -311,8 +313,8 @@ public Void call() throws Exception { if (validateWrites) { commonInitialMD = DigestUtils.getDigest(DIGEST_ALGORITHM); for (long nrRemaining = keySize.toBytes(); nrRemaining > 0; - nrRemaining -= bufferSize) { - int curSize = (int)Math.min(bufferSize, nrRemaining); + nrRemaining -= bufferSize) { + int curSize = (int) Math.min(bufferSize, nrRemaining); commonInitialMD.update(keyValueBuffer, 0, curSize); } } @@ -333,8 +335,10 @@ public Void call() throws Exception { LOG.info("validateWrites : {}", validateWrites); LOG.info("Number of Validate Threads: {}", numOfValidateThreads); LOG.info("cleanObjects : {}", cleanObjects); + + Span currentSpan = TracingUtil.getActiveSpan(); for (int i = 0; i < numOfThreads; i++) { - executor.execute(new ObjectCreator()); + executor.execute(new 
ObjectCreator(currentSpan)); } ExecutorService validateExecutor = null; @@ -360,7 +364,7 @@ public Void call() throws Exception { // wait until all keys are added or exception occurred. while ((numberOfKeysAdded.get() != totalKeyCount) - && exception == null) { + && exception == null) { Thread.sleep(CHECK_INTERVAL_MILLIS); } executor.shutdown(); @@ -689,9 +693,9 @@ private static class KeyValidate { /** * Constructs a new ozone keyValidate. * - * @param bucket bucket part - * @param keyName key part - * @param digest digest of this key's full value + * @param bucket bucket part + * @param keyName key part + * @param digest digest of this key's full value */ KeyValidate(OzoneBucket bucket, String keyName, byte[] digest) { this.bucket = bucket; @@ -701,22 +705,32 @@ private static class KeyValidate { } private class ObjectCreator implements Runnable { + private final Span parentSpan; + + ObjectCreator(Span parentSpan) { + this.parentSpan = parentSpan; + } + @Override public void run() { + try (Scope scope = parentSpan.makeCurrent()) { + createObjects(); + } + } + + private void createObjects() { int v; while ((v = volumeCounter.getAndIncrement()) < numOfVolumes) { if (!createVolume(v)) { return; } } - int b; while ((b = bucketCounter.getAndIncrement()) < totalBucketCount) { if (!createBucket(b)) { return; } } - long k; while ((k = keyCounter.getAndIncrement()) < totalKeyCount) { if (!createKey(k)) { @@ -919,7 +933,7 @@ OzoneBucket getBucket(Integer bucketNumber) { * threads). * * @return may return null if this thread is interrupted, or if any other - * thread encounters an exception (and stores it to {@code exception}) + * thread encounters an exception (and stores it to {@code exception}) */ private T waitUntilAddedToMap(Map map, Integer i) { while (exception == null && !map.containsKey(i)) {