From 70cef7e83c85c85a6f7a5fd07a9ea26f09eb20c9 Mon Sep 17 00:00:00 2001 From: sravani-revuri Date: Fri, 20 Mar 2026 22:23:50 +0530 Subject: [PATCH 1/5] HDDS-14814. Unify Fragmented Traces for Freon randomkeys Command --- .../hadoop/hdds/tracing/LoopSampler.java | 48 +++++ .../hadoop/hdds/tracing/SpanSampler.java | 84 ++++++++ .../hadoop/hdds/tracing/TracingUtil.java | 108 ++++++++++- .../hadoop/hdds/tracing/TestSpanSampling.java | 183 ++++++++++++++++++ .../org/apache/hadoop/ozone/freon/Freon.java | 3 +- .../ozone/freon/RandomKeyGenerator.java | 58 +++--- 6 files changed, 455 insertions(+), 29 deletions(-) create mode 100644 hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/LoopSampler.java create mode 100644 hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/SpanSampler.java create mode 100644 hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/tracing/TestSpanSampling.java diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/LoopSampler.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/LoopSampler.java new file mode 100644 index 000000000000..124d20356884 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/LoopSampler.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.tracing; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * Generic span sampler that samples every Nth span. + * Uses a counter to ensure accurate 1-in-N sampling without dropping + */ +public final class LoopSampler { + private final long sampleInterval; + private final AtomicLong counter = new AtomicLong(0); + + /** + * @param sampleInterval sample every Nth span (e.g. 1000 = 1 in 1000) + */ + public LoopSampler(long sampleInterval) { + if (sampleInterval <= 0) { + throw new IllegalArgumentException("sampleInterval must be positive: " + sampleInterval); + } + this.sampleInterval = sampleInterval; + } + + /** + * Returns true to sample this span, false to drop. + * Thread-safe; provides deterministic 1-in-N sampling. + */ + public boolean shouldSample() { + long count = counter.incrementAndGet(); + return (count % sampleInterval) == 0; + } +} diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/SpanSampler.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/SpanSampler.java new file mode 100644 index 000000000000..237bfc55769e --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/SpanSampler.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.tracing; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.context.Context; +import io.opentelemetry.sdk.trace.samplers.Sampler; +import io.opentelemetry.sdk.trace.samplers.SamplingResult; +import java.util.List; +import java.util.Map; + +/** + * Custom Sampler that applies span-level sampling for configured + * span names, and delegates to parent-based strategy otherwise. + * When a span name is in the configured spanMap, uses LoopSampler for + * deterministic 1-in-N sampling, otherwise follows the parent span's + * sampling decision. + */ +public final class SpanSampler implements Sampler { + + private final Sampler rootSampler; + private final Map spanMap; + + public SpanSampler(Sampler rootSampler, + Map spanMap) { + this.rootSampler = rootSampler; + this.spanMap = spanMap; + } + + @Override + public SamplingResult shouldSample(Context parentContext, String traceId, + String spanName, SpanKind spanKind, Attributes attributes, + List parentLinks) { + + // First, check if we have a valid parent span + io.opentelemetry.api.trace.Span parentSpan = + io.opentelemetry.api.trace.Span.fromContext(parentContext); + + if (!parentSpan.getSpanContext().isValid()) { + // Root span: always delegate to trace-level sampler + // This ensures OTEL_TRACES_SAMPLER_ARG=0.5 is respected + return rootSampler.shouldSample(parentContext, traceId, spanName, + spanKind, attributes, parentLinks); + } + + // Child span: check parent's sampling status first + if (!parentSpan.getSpanContext().isSampled()) { + // Parent was not sampled, so this child should not be sampled either + // This prevents orphaned spans + return SamplingResult.drop(); + } + + // Parent was sampled, now check if this span has explicit sampling config + LoopSampler loopSampler = spanMap.get(spanName); + if (loopSampler != null) { + boolean sample = loopSampler.shouldSample(); + return sample ? SamplingResult.recordAndSample() : SamplingResult.drop(); + } + + // No explicit config for this span, follow parent's sampling decision + return SamplingResult.recordAndSample(); + } + + @Override + public String getDescription() { + return "SpanSamplingCustomSampler(spanMap=" + spanMap.keySet() + ")"; + } +} diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java index 8d6e0fd240f3..b06382393beb 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java @@ -52,6 +52,8 @@ public final class TracingUtil { private static final String OTEL_EXPORTER_OTLP_ENDPOINT_DEFAULT = "http://localhost:4317"; private static final String OTEL_TRACES_SAMPLER_ARG = "OTEL_TRACES_SAMPLER_ARG"; private static final double OTEL_TRACES_SAMPLER_RATIO_DEFAULT = 1.0; + private static final String OTEL_SPAN_SAMPLING = "OTEL_SPAN_SAMPLING"; + private static final String OTEL_TRACES_SAMPLER_CONFIG_DEFAULT = ""; private static volatile boolean isInit = false; private static Tracer tracer = OpenTelemetry.noop().getTracer("noop"); @@ -69,7 +71,7 @@ public static void initTracing( } try { - initialize(serviceName); + initialize(serviceName, conf); isInit = true; LOG.info("Initialized tracing service: {}", serviceName); } catch (Exception e) { @@ -77,7 +79,7 @@ public static void initTracing( } } - private static void initialize(String serviceName) { + private static void initialize(String serviceName, ConfigurationSource conf) { String otelEndPoint = System.getenv(OTEL_EXPORTER_OTLP_ENDPOINT); if (otelEndPoint == null || otelEndPoint.isEmpty()) { otelEndPoint = OTEL_EXPORTER_OTLP_ENDPOINT_DEFAULT; @@ -93,16 +95,39 @@ private static void initialize(String serviceName) { // ignore and use the default value. } + String spanSamplingConfig = OTEL_TRACES_SAMPLER_CONFIG_DEFAULT; + try { + String spanStrConfig = System.getenv(OTEL_SPAN_SAMPLING); + if (spanStrConfig != null && !spanStrConfig.isEmpty()) { + spanSamplingConfig = spanStrConfig; + } + } catch (Exception ex) { + // ignore and use the default value. + } + // Pass the config to parseSpanSamplingConfig to get spans to eb sampled. + Map spanMap = parseSpanSamplingConfig(spanSamplingConfig != null ? spanSamplingConfig : ""); + Resource resource = Resource.create(Attributes.of(AttributeKey.stringKey("service.name"), serviceName)); OtlpGrpcSpanExporter spanExporter = OtlpGrpcSpanExporter.builder() .setEndpoint(otelEndPoint) .build(); SimpleSpanProcessor spanProcessor = SimpleSpanProcessor.builder(spanExporter).build(); + + // Choose sampler based on span sampling config. If it is empty use trace based sampling only. + // else use custom SpanSampler. + Sampler sampler; + if (spanMap.isEmpty()) { + sampler = Sampler.traceIdRatioBased(samplerRatio); + } else { + Sampler rootSampler = Sampler.traceIdRatioBased(samplerRatio); + sampler = new SpanSampler(rootSampler, spanMap); + } + SdkTracerProvider tracerProvider = SdkTracerProvider.builder() .addSpanProcessor(spanProcessor) .setResource(resource) - .setSampler(Sampler.traceIdRatioBased(samplerRatio)) + .setSampler(sampler) .build(); OpenTelemetry openTelemetry = OpenTelemetrySdk.builder() .setTracerProvider(tracerProvider) @@ -177,6 +202,41 @@ public static boolean isTracingEnabled( ScmConfigKeys.HDDS_TRACING_ENABLED_DEFAULT); } + /** Function to parse span sampling config. The input is in thr form :. + * The sample rate must be a natural number (1,2,3). Any value other than that will LOG an error. + * */ + private static Map parseSpanSamplingConfig(String configStr) { + Map result = new HashMap<>(); + if (configStr == null || configStr.isEmpty()) { + return result; + } + + for (String entry : configStr.split(",")) { + String trimmed = entry.trim(); + int colon = trimmed.indexOf(':'); + if (colon <= 0 || colon >= trimmed.length() - 1) { + continue; + } + + String name = trimmed.substring(0, colon).trim(); + String val = trimmed.substring(colon + 1).trim(); + + try { + // Long.parseLong strictly rejects decimals (throws NumberFormatException) + long interval = Long.parseLong(val); + + // LoopSampler constructor strictly rejects <= 0 (throws IllegalArgumentException) + result.put(name, new LoopSampler(interval)); + + } catch (NumberFormatException e) { + LOG.error("Invalid ratio '{}' for span '{}': decimals not allowed", val, name); + } catch (IllegalArgumentException e) { + LOG.error("Invalid ratio '{}' for span '{}': {}", val, name, e.getMessage()); + } + } + return result; + } + /** * Execute {@code runnable} inside an activated new span. * If a parent span exists in the current context, this becomes a child span. @@ -257,6 +317,48 @@ public static Span getActiveSpan() { return Span.current(); } + /** + * Import a parent span context and make it current without creating a new span. + * When next span is created, it will use this context as parent. + * + * @param encodedParent Encoded parent span context + * @return A Scope that should be closed when done, or null if no valid parent + */ + public static Scope importAndActivateContext(String encodedParent) { + if (encodedParent == null || encodedParent.isEmpty()) { + return null; + } + + W3CTraceContextPropagator propagator = W3CTraceContextPropagator.getInstance(); + Context extractedContext = propagator.extract(Context.current(), encodedParent, new TextExtractor()); + + if (Span.fromContext(extractedContext).getSpanContext().isValid()) { + return extractedContext.makeCurrent(); + } + + return null; + } + + /** + * Execute the given runnable with the imported parent span context activated. + * This propagates the trace context to the current thread without creating + * a new span. + * + * @param encodedParent Encoded parent span context (can be null or empty) + * @param runnable The code to execute within the imported context + */ + public static void executeWithImportedContext( + String encodedParent, CheckedRunnable runnable) throws E { + Scope scope = importAndActivateContext(encodedParent); + try { + runnable.run(); + } finally { + if (scope != null) { + scope.close(); + } + } + } + /** * AutoCloseable interface for tracing span but no exception is thrown in close. */ diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/tracing/TestSpanSampling.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/tracing/TestSpanSampling.java new file mode 100644 index 000000000000..9be9907df908 --- /dev/null +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/tracing/TestSpanSampling.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.tracing; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.TraceFlags; +import io.opentelemetry.api.trace.TraceState; +import io.opentelemetry.context.Context; +import io.opentelemetry.sdk.trace.samplers.Sampler; +import io.opentelemetry.sdk.trace.samplers.SamplingDecision; +import io.opentelemetry.sdk.trace.samplers.SamplingResult; +import java.lang.reflect.Method; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.junit.jupiter.api.Test; + +/** + * Test cases for span sampling functionality. + */ +public class TestSpanSampling { + + /** + * Validates the logic of the LoopSampler through the following cases: + * 1. Invalid intervals (0 or negative) throw an {@link IllegalArgumentException}. + * 2. Sampling occurs exactly every Nth attempt for a given interval. + * 3. An interval of 1 results in every single attempt being sampled. + */ + @Test + public void testLoopSamplerLogic() { + // Tests invalid values + assertThrows(IllegalArgumentException.class, () -> new LoopSampler(0)); + assertThrows(IllegalArgumentException.class, () -> new LoopSampler(-1)); + + // Tests functionality of a correct value + LoopSampler sampler3 = new LoopSampler(3); + assertFalse(sampler3.shouldSample(), "1st span should not be sampled"); + assertFalse(sampler3.shouldSample(), "2nd span should not be sampled"); + assertTrue(sampler3.shouldSample(), "3rd span should be sampled"); + assertFalse(sampler3.shouldSample(), "4th span should not be sampled"); + assertFalse(sampler3.shouldSample(), "5th span should not be sampled"); + assertTrue(sampler3.shouldSample(), "6th span should be sampled"); + + // Tests every span is sampled for a value of 1 + LoopSampler sampler1 = new LoopSampler(1); + for (int i = 1; i <= 10; i++) { + assertTrue(sampler1.shouldSample(), "Span " + i + " should be sampled when interval is 1"); + } + } + + /** + * Tests that valid configuration strings result in a Map + * containing the correct LoopSampler objects. + */ + @Test + public void testParseSpanSamplingConfigValid() throws Exception { + String config = "createVolume:1,createBucket:2,createKey:10"; + Method method = TracingUtil.class.getDeclaredMethod("parseSpanSamplingConfig", String.class); + method.setAccessible(true); + Map result = (Map) method.invoke(null, config); + + // Verify all 3 valid entries exist + assertEquals(3, result.size()); + assertTrue(result.containsKey("createVolume")); + assertTrue(result.containsKey("createBucket")); + assertTrue(result.containsKey("createKey")); + } + + /** + * Tests that invalid entries (decimals, zeros, text, negative numbers) are caught + * by the try-catch blocks and excluded from the resulting Map. + */ + @Test + public void testParseSpanSamplingConfigInvalid() throws Exception { + String config = "createVolume:0.5,createBucket:0,createKey:-4.5,writeKey:-1"; + + Method method = TracingUtil.class.getDeclaredMethod("parseSpanSamplingConfig", String.class); + method.setAccessible(true); + + Map result = (Map) method.invoke(null, config); + + // Verify the map is empty because every entry was invalid + assertTrue(result.isEmpty(), "The map should be empty as all inputs were invalid"); + } + + /** + * Tests a mixed configuration to ensure valid entries are + * preserved while invalid ones are skipped. + */ + @Test + public void testParseSpanSamplingConfigMixed() throws Exception { + String config = "createVolume:1,createBucket:0.5"; + + Method method = TracingUtil.class.getDeclaredMethod("parseSpanSamplingConfig", String.class); + method.setAccessible(true); + + Map result = (Map) method.invoke(null, config); + + // Verify createVolume is kept and createBucket is discarded + assertEquals(1, result.size()); + assertTrue(result.containsKey("createVolume")); + assertFalse(result.containsKey("createBucket")); + } + + /** Tests a SpanSampler dropping appropriate samples according to Config. + * (e.g., keeping every 2nd "createKey"). + */ + @Test + public void testSpanSamplingWithConfiguration() { + Map spanMap = new HashMap<>(); + spanMap.put("createKey", new LoopSampler(2)); + + Sampler rootSampler = Sampler.alwaysOn(); + SpanSampler customSampler = new SpanSampler(rootSampler, spanMap); + + // Create a parent span to move from rootSampler to customSampler logic. + Span parentSpan = Span.wrap(SpanContext.create( + "00000000000000000000000000000001", + "0000000000000002", + TraceFlags.getSampled(), + TraceState.getDefault())); + Context parentContext = Context.root().with(parentSpan); + + // result1 will drop and result2 will be sampled according to config + SamplingResult result1 = customSampler.shouldSample(parentContext, "trace1", "createKey", + SpanKind.INTERNAL, Attributes.empty(), Collections.emptyList()); + assertEquals(SamplingDecision.DROP, result1.getDecision()); + SamplingResult result2 = customSampler.shouldSample(parentContext, "trace1", "createKey", + SpanKind.INTERNAL, Attributes.empty(), Collections.emptyList()); + assertEquals(SamplingDecision.RECORD_AND_SAMPLE, result2.getDecision()); + } + + @Test + public void testSpanSamplingWithTraceSampled() { + Map spanMap = new HashMap<>(); + spanMap.put("createKey", new LoopSampler(2)); + + Sampler rootSampler = Sampler.alwaysOn(); + SpanSampler customSampler = new SpanSampler(rootSampler, spanMap); + Context parentContext = Context.current(); + SamplingResult result = customSampler.shouldSample(parentContext, "trace1", "unknownSpan", + SpanKind.INTERNAL, Attributes.empty(), Collections.emptyList()); + + // Since no parent and not configured, should use root sampler and sample span. + assertEquals(SamplingDecision.RECORD_AND_SAMPLE, result.getDecision()); + } + + @Test + public void testSpanSamplingWithTraceNotSampled() { + Map spanMap = new HashMap<>(); + Sampler rootSampler = Sampler.alwaysOff(); + SpanSampler customSampler = new SpanSampler(rootSampler, spanMap); + Context parentContext = Context.current(); + + // Root span with alwaysOff should not be sampled. + SamplingResult rootResult = customSampler.shouldSample(parentContext, "trace1", "rootSpan", + SpanKind.INTERNAL, Attributes.empty(), Collections.emptyList()); + assertEquals(SamplingDecision.DROP, rootResult.getDecision()); + } +} diff --git a/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/Freon.java b/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/Freon.java index b2ed1eb80691..c6e21cf7a955 100644 --- a/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/Freon.java +++ b/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/Freon.java @@ -58,7 +58,8 @@ public int execute(String[] argv) { conf = getOzoneConf(); HddsServerUtil.initializeMetrics(conf, "ozone-freon"); TracingUtil.initTracing("freon", conf); - return super.execute(argv); + String spanName = "ozone freon " + String.join(" ", argv); + return TracingUtil.executeInNewSpan(spanName, () -> super.execute(argv)); } @Override diff --git a/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java b/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java index 87ebf95f0b06..0fbe26c2fbe9 100644 --- a/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java +++ b/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java @@ -311,8 +311,8 @@ public Void call() throws Exception { if (validateWrites) { commonInitialMD = DigestUtils.getDigest(DIGEST_ALGORITHM); for (long nrRemaining = keySize.toBytes(); nrRemaining > 0; - nrRemaining -= bufferSize) { - int curSize = (int)Math.min(bufferSize, nrRemaining); + nrRemaining -= bufferSize) { + int curSize = (int) Math.min(bufferSize, nrRemaining); commonInitialMD.update(keyValueBuffer, 0, curSize); } } @@ -333,8 +333,10 @@ public Void call() throws Exception { LOG.info("validateWrites : {}", validateWrites); LOG.info("Number of Validate Threads: {}", numOfValidateThreads); LOG.info("cleanObjects : {}", cleanObjects); + + String currentSpanContext = TracingUtil.exportCurrentSpan(); for (int i = 0; i < numOfThreads; i++) { - executor.execute(new ObjectCreator()); + executor.execute(new ObjectCreator(currentSpanContext)); } ExecutorService validateExecutor = null; @@ -360,7 +362,7 @@ public Void call() throws Exception { // wait until all keys are added or exception occurred. while ((numberOfKeysAdded.get() != totalKeyCount) - && exception == null) { + && exception == null) { Thread.sleep(CHECK_INTERVAL_MILLIS); } executor.shutdown(); @@ -689,9 +691,9 @@ private static class KeyValidate { /** * Constructs a new ozone keyValidate. * - * @param bucket bucket part - * @param keyName key part - * @param digest digest of this key's full value + * @param bucket bucket part + * @param keyName key part + * @param digest digest of this key's full value */ KeyValidate(OzoneBucket bucket, String keyName, byte[] digest) { this.bucket = bucket; @@ -701,28 +703,34 @@ private static class KeyValidate { } private class ObjectCreator implements Runnable { + private final String parentSpanContext; + + ObjectCreator(String parentSpanContext) { + this.parentSpanContext = parentSpanContext; + } + @Override public void run() { - int v; - while ((v = volumeCounter.getAndIncrement()) < numOfVolumes) { - if (!createVolume(v)) { - return; + TracingUtil.executeWithImportedContext(parentSpanContext, () -> { + int v; + while ((v = volumeCounter.getAndIncrement()) < numOfVolumes) { + if (!createVolume(v)) { + return; + } } - } - - int b; - while ((b = bucketCounter.getAndIncrement()) < totalBucketCount) { - if (!createBucket(b)) { - return; + int b; + while ((b = bucketCounter.getAndIncrement()) < totalBucketCount) { + if (!createBucket(b)) { + return; + } } - } - - long k; - while ((k = keyCounter.getAndIncrement()) < totalKeyCount) { - if (!createKey(k)) { - return; + long k; + while ((k = keyCounter.getAndIncrement()) < totalKeyCount) { + if (!createKey(k)) { + return; + } } - } + }); } } @@ -919,7 +927,7 @@ OzoneBucket getBucket(Integer bucketNumber) { * threads). * * @return may return null if this thread is interrupted, or if any other - * thread encounters an exception (and stores it to {@code exception}) + * thread encounters an exception (and stores it to {@code exception}) */ private T waitUntilAddedToMap(Map map, Integer i) { while (exception == null && !map.containsKey(i)) { From 03ee9c6dddfc3d2c35a10e4d68bca8ea3c0104ec Mon Sep 17 00:00:00 2001 From: sravani-revuri Date: Fri, 20 Mar 2026 22:55:48 +0530 Subject: [PATCH 2/5] fixed findbugs --- .../main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java index b06382393beb..8b12041b7dbb 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java @@ -105,7 +105,7 @@ private static void initialize(String serviceName, ConfigurationSource conf) { // ignore and use the default value. } // Pass the config to parseSpanSamplingConfig to get spans to eb sampled. - Map spanMap = parseSpanSamplingConfig(spanSamplingConfig != null ? spanSamplingConfig : ""); + Map spanMap = parseSpanSamplingConfig(spanSamplingConfig); Resource resource = Resource.create(Attributes.of(AttributeKey.stringKey("service.name"), serviceName)); OtlpGrpcSpanExporter spanExporter = OtlpGrpcSpanExporter.builder() From 6b1bef4357ce8ffd04cee70236456475573f309e Mon Sep 17 00:00:00 2001 From: sravani-revuri Date: Mon, 23 Mar 2026 11:35:56 +0530 Subject: [PATCH 3/5] addressed comments --- .../java/org/apache/hadoop/hdds/tracing/TracingUtil.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java index 8b12041b7dbb..980b31fbfdb4 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java @@ -71,7 +71,7 @@ public static void initTracing( } try { - initialize(serviceName, conf); + initialize(serviceName); isInit = true; LOG.info("Initialized tracing service: {}", serviceName); } catch (Exception e) { @@ -79,7 +79,7 @@ public static void initTracing( } } - private static void initialize(String serviceName, ConfigurationSource conf) { + private static void initialize(String serviceName) { String otelEndPoint = System.getenv(OTEL_EXPORTER_OTLP_ENDPOINT); if (otelEndPoint == null || otelEndPoint.isEmpty()) { otelEndPoint = OTEL_EXPORTER_OTLP_ENDPOINT_DEFAULT; @@ -104,7 +104,7 @@ private static void initialize(String serviceName, ConfigurationSource conf) { } catch (Exception ex) { // ignore and use the default value. } - // Pass the config to parseSpanSamplingConfig to get spans to eb sampled. + // Pass the config to parseSpanSamplingConfig to get spans to be sampled. Map spanMap = parseSpanSamplingConfig(spanSamplingConfig); Resource resource = Resource.create(Attributes.of(AttributeKey.stringKey("service.name"), serviceName)); @@ -202,7 +202,7 @@ public static boolean isTracingEnabled( ScmConfigKeys.HDDS_TRACING_ENABLED_DEFAULT); } - /** Function to parse span sampling config. The input is in thr form :. + /** Function to parse span sampling config. The input is in the form :. * The sample rate must be a natural number (1,2,3). Any value other than that will LOG an error. * */ private static Map parseSpanSamplingConfig(String configStr) { From c419d36b5832dfec3155e08b37d32e53df6a0aa8 Mon Sep 17 00:00:00 2001 From: sravani-revuri Date: Wed, 25 Mar 2026 12:19:08 +0530 Subject: [PATCH 4/5] addressed comments and implemented random sampling --- .../hadoop/hdds/tracing/LoopSampler.java | 34 +++--- .../hadoop/hdds/tracing/SpanSampler.java | 1 + .../hadoop/hdds/tracing/TracingUtil.java | 81 ++++--------- .../hadoop/hdds/tracing/TestSpanSampling.java | 107 +++++++----------- hadoop-ozone/freon/pom.xml | 4 + .../ozone/freon/RandomKeyGenerator.java | 16 +-- 6 files changed, 94 insertions(+), 149 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/LoopSampler.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/LoopSampler.java index 124d20356884..d7a044d2a4f1 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/LoopSampler.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/LoopSampler.java @@ -17,32 +17,30 @@ package org.apache.hadoop.hdds.tracing; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.ThreadLocalRandom; /** - * Generic span sampler that samples every Nth span. - * Uses a counter to ensure accurate 1-in-N sampling without dropping + * Probability-based span sampler that samples spans independently. + * Uses ThreadLocalRandom for probability decisions. */ public final class LoopSampler { - private final long sampleInterval; - private final AtomicLong counter = new AtomicLong(0); + private final double probability; - /** - * @param sampleInterval sample every Nth span (e.g. 1000 = 1 in 1000) - */ - public LoopSampler(long sampleInterval) { - if (sampleInterval <= 0) { - throw new IllegalArgumentException("sampleInterval must be positive: " + sampleInterval); + public LoopSampler(double ratio) { + if (ratio < 0) { + throw new IllegalArgumentException("Sampling ratio cannot be negative: " + ratio); } - this.sampleInterval = sampleInterval; + // Cap at 1.0 to prevent logic errors + this.probability = Math.min(ratio, 1.0); } - /** - * Returns true to sample this span, false to drop. - * Thread-safe; provides deterministic 1-in-N sampling. - */ public boolean shouldSample() { - long count = counter.incrementAndGet(); - return (count % sampleInterval) == 0; + if (probability <= 0) { + return false; + } + if (probability >= 1.0) { + return true; + } + return ThreadLocalRandom.current().nextDouble() < probability; } } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/SpanSampler.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/SpanSampler.java index 237bfc55769e..9fa6bf828b6d 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/SpanSampler.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/SpanSampler.java @@ -60,6 +60,7 @@ public SamplingResult shouldSample(Context parentContext, String traceId, } // Child span: check parent's sampling status first + // after the process of sampling trace / parent span then check if it is sampled or not. if (!parentSpan.getSpanContext().isSampled()) { // Parent was not sampled, so this child should not be sampled either // This prevents orphaned spans diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java index 980b31fbfdb4..6ef604bc2ad9 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/TracingUtil.java @@ -33,6 +33,7 @@ import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; import io.opentelemetry.sdk.trace.samplers.Sampler; import java.lang.reflect.Proxy; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import org.apache.hadoop.hdds.conf.ConfigurationSource; @@ -52,7 +53,7 @@ public final class TracingUtil { private static final String OTEL_EXPORTER_OTLP_ENDPOINT_DEFAULT = "http://localhost:4317"; private static final String OTEL_TRACES_SAMPLER_ARG = "OTEL_TRACES_SAMPLER_ARG"; private static final double OTEL_TRACES_SAMPLER_RATIO_DEFAULT = 1.0; - private static final String OTEL_SPAN_SAMPLING = "OTEL_SPAN_SAMPLING"; + private static final String OTEL_SPAN_SAMPLING_ARG = "OTEL_SPAN_SAMPLING_ARG"; private static final String OTEL_TRACES_SAMPLER_CONFIG_DEFAULT = ""; private static volatile boolean isInit = false; @@ -90,19 +91,25 @@ private static void initialize(String serviceName) { String sampleStrRatio = System.getenv(OTEL_TRACES_SAMPLER_ARG); if (sampleStrRatio != null && !sampleStrRatio.isEmpty()) { samplerRatio = Double.parseDouble(System.getenv(OTEL_TRACES_SAMPLER_ARG)); + LOG.info("Sampling Trace Config = '{}'", samplerRatio); } } catch (NumberFormatException ex) { - // ignore and use the default value. + // log and use the default value. + LOG.warn("Invalid value for {}: '{}'. Falling back to default: {}", + OTEL_TRACES_SAMPLER_ARG, System.getenv(OTEL_TRACES_SAMPLER_ARG), OTEL_TRACES_SAMPLER_RATIO_DEFAULT, ex); } String spanSamplingConfig = OTEL_TRACES_SAMPLER_CONFIG_DEFAULT; try { - String spanStrConfig = System.getenv(OTEL_SPAN_SAMPLING); + String spanStrConfig = System.getenv(OTEL_SPAN_SAMPLING_ARG); if (spanStrConfig != null && !spanStrConfig.isEmpty()) { spanSamplingConfig = spanStrConfig; } + LOG.info("Sampling Span Config = '{}'", spanSamplingConfig); } catch (Exception ex) { - // ignore and use the default value. + // Log and use the default value. + LOG.warn("Failed to process {}. Falling back to default configuration: {}", + OTEL_SPAN_SAMPLING_ARG, OTEL_TRACES_SAMPLER_CONFIG_DEFAULT, ex); } // Pass the config to parseSpanSamplingConfig to get spans to be sampled. Map spanMap = parseSpanSamplingConfig(spanSamplingConfig); @@ -203,17 +210,18 @@ public static boolean isTracingEnabled( } /** Function to parse span sampling config. The input is in the form :. - * The sample rate must be a natural number (1,2,3). Any value other than that will LOG an error. + * The sample rate must be a number between 0 and 1. Any value other than that will LOG an error. * */ private static Map parseSpanSamplingConfig(String configStr) { Map result = new HashMap<>(); if (configStr == null || configStr.isEmpty()) { - return result; + return Collections.emptyMap(); } for (String entry : configStr.split(",")) { String trimmed = entry.trim(); int colon = trimmed.indexOf(':'); + if (colon <= 0 || colon >= trimmed.length() - 1) { continue; } @@ -222,16 +230,17 @@ private static Map parseSpanSamplingConfig(String configStr String val = trimmed.substring(colon + 1).trim(); try { - // Long.parseLong strictly rejects decimals (throws NumberFormatException) - long interval = Long.parseLong(val); - - // LoopSampler constructor strictly rejects <= 0 (throws IllegalArgumentException) - result.put(name, new LoopSampler(interval)); - + double rate = Double.parseDouble(val); + //if the rate is less than or equal to zero , no sampling config is taken for that key value pair. + if (rate > 0) { + // cap it at 1.0 when a number greater than 1 is entered + double effectiveRate = Math.min(rate, 1.0); + result.put(name, new LoopSampler(effectiveRate)); + } else { + LOG.warn("rate for span '{}' is 0 or less, ignoring sample configuration", name); + } } catch (NumberFormatException e) { - LOG.error("Invalid ratio '{}' for span '{}': decimals not allowed", val, name); - } catch (IllegalArgumentException e) { - LOG.error("Invalid ratio '{}' for span '{}': {}", val, name, e.getMessage()); + LOG.error("Invalid rate '{}' for span '{}', ignoring sample configuration", val, name); } } return result; @@ -317,48 +326,6 @@ public static Span getActiveSpan() { return Span.current(); } - /** - * Import a parent span context and make it current without creating a new span. - * When next span is created, it will use this context as parent. - * - * @param encodedParent Encoded parent span context - * @return A Scope that should be closed when done, or null if no valid parent - */ - public static Scope importAndActivateContext(String encodedParent) { - if (encodedParent == null || encodedParent.isEmpty()) { - return null; - } - - W3CTraceContextPropagator propagator = W3CTraceContextPropagator.getInstance(); - Context extractedContext = propagator.extract(Context.current(), encodedParent, new TextExtractor()); - - if (Span.fromContext(extractedContext).getSpanContext().isValid()) { - return extractedContext.makeCurrent(); - } - - return null; - } - - /** - * Execute the given runnable with the imported parent span context activated. - * This propagates the trace context to the current thread without creating - * a new span. - * - * @param encodedParent Encoded parent span context (can be null or empty) - * @param runnable The code to execute within the imported context - */ - public static void executeWithImportedContext( - String encodedParent, CheckedRunnable runnable) throws E { - Scope scope = importAndActivateContext(encodedParent); - try { - runnable.run(); - } finally { - if (scope != null) { - scope.close(); - } - } - } - /** * AutoCloseable interface for tracing span but no exception is thrown in close. */ diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/tracing/TestSpanSampling.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/tracing/TestSpanSampling.java index 9be9907df908..d02836e888b0 100644 --- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/tracing/TestSpanSampling.java +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/tracing/TestSpanSampling.java @@ -19,15 +19,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.api.trace.SpanKind; -import io.opentelemetry.api.trace.TraceFlags; -import io.opentelemetry.api.trace.TraceState; import io.opentelemetry.context.Context; import io.opentelemetry.sdk.trace.samplers.Sampler; import io.opentelemetry.sdk.trace.samplers.SamplingDecision; @@ -43,41 +38,13 @@ */ public class TestSpanSampling { - /** - * Validates the logic of the LoopSampler through the following cases: - * 1. Invalid intervals (0 or negative) throw an {@link IllegalArgumentException}. - * 2. Sampling occurs exactly every Nth attempt for a given interval. - * 3. An interval of 1 results in every single attempt being sampled. - */ - @Test - public void testLoopSamplerLogic() { - // Tests invalid values - assertThrows(IllegalArgumentException.class, () -> new LoopSampler(0)); - assertThrows(IllegalArgumentException.class, () -> new LoopSampler(-1)); - - // Tests functionality of a correct value - LoopSampler sampler3 = new LoopSampler(3); - assertFalse(sampler3.shouldSample(), "1st span should not be sampled"); - assertFalse(sampler3.shouldSample(), "2nd span should not be sampled"); - assertTrue(sampler3.shouldSample(), "3rd span should be sampled"); - assertFalse(sampler3.shouldSample(), "4th span should not be sampled"); - assertFalse(sampler3.shouldSample(), "5th span should not be sampled"); - assertTrue(sampler3.shouldSample(), "6th span should be sampled"); - - // Tests every span is sampled for a value of 1 - LoopSampler sampler1 = new LoopSampler(1); - for (int i = 1; i <= 10; i++) { - assertTrue(sampler1.shouldSample(), "Span " + i + " should be sampled when interval is 1"); - } - } - /** * Tests that valid configuration strings result in a Map * containing the correct LoopSampler objects. */ @Test public void testParseSpanSamplingConfigValid() throws Exception { - String config = "createVolume:1,createBucket:2,createKey:10"; + String config = "createVolume:0.25,createBucket:0.5,createKey:0.75"; Method method = TracingUtil.class.getDeclaredMethod("parseSpanSamplingConfig", String.class); method.setAccessible(true); Map result = (Map) method.invoke(null, config); @@ -90,12 +57,12 @@ public void testParseSpanSamplingConfigValid() throws Exception { } /** - * Tests that invalid entries (decimals, zeros, text, negative numbers) are caught + * Tests that invalid entries (zeros, negative numbers, non-numeric) are caught * by the try-catch blocks and excluded from the resulting Map. */ @Test public void testParseSpanSamplingConfigInvalid() throws Exception { - String config = "createVolume:0.5,createBucket:0,createKey:-4.5,writeKey:-1"; + String config = "createVolume:0,createBucket:-0.5,createKey:invalid,writeKey:-1"; Method method = TracingUtil.class.getDeclaredMethod("parseSpanSamplingConfig", String.class); method.setAccessible(true); @@ -112,51 +79,28 @@ public void testParseSpanSamplingConfigInvalid() throws Exception { */ @Test public void testParseSpanSamplingConfigMixed() throws Exception { - String config = "createVolume:1,createBucket:0.5"; + String config = "createVolume:0.75,createBucket:0,createKey:-5"; Method method = TracingUtil.class.getDeclaredMethod("parseSpanSamplingConfig", String.class); method.setAccessible(true); Map result = (Map) method.invoke(null, config); - // Verify createVolume is kept and createBucket is discarded + // Verify only createVolume is kept assertEquals(1, result.size()); assertTrue(result.containsKey("createVolume")); assertFalse(result.containsKey("createBucket")); + assertFalse(result.containsKey("createKey")); } - /** Tests a SpanSampler dropping appropriate samples according to Config. - * (e.g., keeping every 2nd "createKey"). - */ - @Test - public void testSpanSamplingWithConfiguration() { - Map spanMap = new HashMap<>(); - spanMap.put("createKey", new LoopSampler(2)); - - Sampler rootSampler = Sampler.alwaysOn(); - SpanSampler customSampler = new SpanSampler(rootSampler, spanMap); - - // Create a parent span to move from rootSampler to customSampler logic. - Span parentSpan = Span.wrap(SpanContext.create( - "00000000000000000000000000000001", - "0000000000000002", - TraceFlags.getSampled(), - TraceState.getDefault())); - Context parentContext = Context.root().with(parentSpan); - - // result1 will drop and result2 will be sampled according to config - SamplingResult result1 = customSampler.shouldSample(parentContext, "trace1", "createKey", - SpanKind.INTERNAL, Attributes.empty(), Collections.emptyList()); - assertEquals(SamplingDecision.DROP, result1.getDecision()); - SamplingResult result2 = customSampler.shouldSample(parentContext, "trace1", "createKey", - SpanKind.INTERNAL, Attributes.empty(), Collections.emptyList()); - assertEquals(SamplingDecision.RECORD_AND_SAMPLE, result2.getDecision()); - } - + /** + * Test to show sampling of span only if trace is sampled. + * This shows priority given to trace. + * */ @Test public void testSpanSamplingWithTraceSampled() { Map spanMap = new HashMap<>(); - spanMap.put("createKey", new LoopSampler(2)); + spanMap.put("createKey", new LoopSampler(0.5)); Sampler rootSampler = Sampler.alwaysOn(); SpanSampler customSampler = new SpanSampler(rootSampler, spanMap); @@ -168,6 +112,10 @@ public void testSpanSamplingWithTraceSampled() { assertEquals(SamplingDecision.RECORD_AND_SAMPLE, result.getDecision()); } + /** + * Test to show dropping of span only if trace is not sample sampled. + * This shows priority given to Trace. + * */ @Test public void testSpanSamplingWithTraceNotSampled() { Map spanMap = new HashMap<>(); @@ -180,4 +128,29 @@ public void testSpanSamplingWithTraceNotSampled() { SpanKind.INTERNAL, Attributes.empty(), Collections.emptyList()); assertEquals(SamplingDecision.DROP, rootResult.getDecision()); } + + @Test + public void testChildDropsWhenParentIsNotSampled() { + Map spanMap = new HashMap<>(); + // Even if we set this to 100% (always sample), it should be ignored + spanMap.put("createKey", new LoopSampler(1.0)); + + SpanSampler customSampler = new SpanSampler(Sampler.alwaysOn(), spanMap); + + // 1. Manually create a NOT SAMPLED parent context + io.opentelemetry.api.trace.Span parentSpan = io.opentelemetry.api.trace.Span.wrap( + io.opentelemetry.api.trace.SpanContext.create( + "ff000000000000000000000000000041", + "ff00000000000042", + io.opentelemetry.api.trace.TraceFlags.getDefault(), // turns off sampling bit + io.opentelemetry.api.trace.TraceState.getDefault())); + + Context parentContext = Context.root().with(parentSpan); + + SamplingResult result = customSampler.shouldSample(parentContext, "trace1", "createKey", + SpanKind.INTERNAL, Attributes.empty(), Collections.emptyList()); + + //Assert that span is dropped because parent was not sampled + assertEquals(SamplingDecision.DROP, result.getDecision()); + } } diff --git a/hadoop-ozone/freon/pom.xml b/hadoop-ozone/freon/pom.xml index 71a9b1c11653..08bbdcd3eac9 100644 --- a/hadoop-ozone/freon/pom.xml +++ b/hadoop-ozone/freon/pom.xml @@ -70,6 +70,10 @@ io.opentelemetry opentelemetry-api + + io.opentelemetry + opentelemetry-context + org.apache.commons commons-lang3 diff --git a/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java b/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java index 0fbe26c2fbe9..bb80956b7669 100644 --- a/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java +++ b/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java @@ -27,6 +27,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; import com.google.common.annotations.VisibleForTesting; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.context.Scope; import java.io.FileNotFoundException; import java.io.IOException; import java.io.OutputStream; @@ -334,9 +336,9 @@ public Void call() throws Exception { LOG.info("Number of Validate Threads: {}", numOfValidateThreads); LOG.info("cleanObjects : {}", cleanObjects); - String currentSpanContext = TracingUtil.exportCurrentSpan(); + Span currentSpan = TracingUtil.getActiveSpan(); for (int i = 0; i < numOfThreads; i++) { - executor.execute(new ObjectCreator(currentSpanContext)); + executor.execute(new ObjectCreator(currentSpan)); } ExecutorService validateExecutor = null; @@ -703,15 +705,15 @@ private static class KeyValidate { } private class ObjectCreator implements Runnable { - private final String parentSpanContext; + private final Span parentSpan; - ObjectCreator(String parentSpanContext) { - this.parentSpanContext = parentSpanContext; + ObjectCreator(Span parentSpan) { + this.parentSpan = parentSpan; } @Override public void run() { - TracingUtil.executeWithImportedContext(parentSpanContext, () -> { + try (Scope scope = parentSpan.makeCurrent()) { int v; while ((v = volumeCounter.getAndIncrement()) < numOfVolumes) { if (!createVolume(v)) { @@ -730,7 +732,7 @@ public void run() { return; } } - }); + } } } From 80350c2a8b87bbb8ece64966c66823b23fe42f13 Mon Sep 17 00:00:00 2001 From: sravani-revuri Date: Wed, 25 Mar 2026 21:31:06 +0530 Subject: [PATCH 5/5] addressed comments --- .../hadoop/hdds/tracing/SpanSampler.java | 23 +++++---- .../hadoop/hdds/tracing/TestSpanSampling.java | 51 +++++++++---------- .../ozone/freon/RandomKeyGenerator.java | 34 +++++++------ 3 files changed, 57 insertions(+), 51 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/SpanSampler.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/SpanSampler.java index 9fa6bf828b6d..317573a5e19f 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/SpanSampler.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/tracing/SpanSampler.java @@ -18,8 +18,10 @@ package org.apache.hadoop.hdds.tracing; import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.context.Context; +import io.opentelemetry.sdk.trace.data.LinkData; import io.opentelemetry.sdk.trace.samplers.Sampler; import io.opentelemetry.sdk.trace.samplers.SamplingResult; import java.util.List; @@ -29,7 +31,7 @@ * Custom Sampler that applies span-level sampling for configured * span names, and delegates to parent-based strategy otherwise. * When a span name is in the configured spanMap, uses LoopSampler for - * deterministic 1-in-N sampling, otherwise follows the parent span's + * probabilistic sampling, otherwise follows the parent span's * sampling decision. */ public final class SpanSampler implements Sampler { @@ -44,17 +46,20 @@ public SpanSampler(Sampler rootSampler, } @Override - public SamplingResult shouldSample(Context parentContext, String traceId, - String spanName, SpanKind spanKind, Attributes attributes, - List parentLinks) { + public SamplingResult shouldSample( + Context parentContext, + String traceId, + String spanName, + SpanKind spanKind, + Attributes attributes, + List parentLinks) { - // First, check if we have a valid parent span - io.opentelemetry.api.trace.Span parentSpan = - io.opentelemetry.api.trace.Span.fromContext(parentContext); + Span parentSpan = Span.fromContext(parentContext); + // check if we have a valid parent span if (!parentSpan.getSpanContext().isValid()) { // Root span: always delegate to trace-level sampler - // This ensures OTEL_TRACES_SAMPLER_ARG=0.5 is respected + // This ensures OTEL_TRACES_SAMPLER_ARG is respected return rootSampler.shouldSample(parentContext, traceId, spanName, spanKind, attributes, parentLinks); } @@ -80,6 +85,6 @@ public SamplingResult shouldSample(Context parentContext, String traceId, @Override public String getDescription() { - return "SpanSamplingCustomSampler(spanMap=" + spanMap.keySet() + ")"; + return "SpanSampler(spanMap=" + spanMap.keySet() + ")"; } } diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/tracing/TestSpanSampling.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/tracing/TestSpanSampling.java index d02836e888b0..fbbc2b0b5b69 100644 --- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/tracing/TestSpanSampling.java +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/tracing/TestSpanSampling.java @@ -17,12 +17,13 @@ package org.apache.hadoop.hdds.tracing; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.TraceFlags; +import io.opentelemetry.api.trace.TraceState; import io.opentelemetry.context.Context; import io.opentelemetry.sdk.trace.samplers.Sampler; import io.opentelemetry.sdk.trace.samplers.SamplingDecision; @@ -49,11 +50,10 @@ public void testParseSpanSamplingConfigValid() throws Exception { method.setAccessible(true); Map result = (Map) method.invoke(null, config); - // Verify all 3 valid entries exist - assertEquals(3, result.size()); - assertTrue(result.containsKey("createVolume")); - assertTrue(result.containsKey("createBucket")); - assertTrue(result.containsKey("createKey")); + assertThat(result) + .hasSize(3) + .containsKeys("createVolume", "createBucket", "createKey"); + } /** @@ -63,14 +63,11 @@ public void testParseSpanSamplingConfigValid() throws Exception { @Test public void testParseSpanSamplingConfigInvalid() throws Exception { String config = "createVolume:0,createBucket:-0.5,createKey:invalid,writeKey:-1"; - Method method = TracingUtil.class.getDeclaredMethod("parseSpanSamplingConfig", String.class); method.setAccessible(true); - Map result = (Map) method.invoke(null, config); - // Verify the map is empty because every entry was invalid - assertTrue(result.isEmpty(), "The map should be empty as all inputs were invalid"); + assertThat(result).as("The map should be empty as all inputs were invalid").isEmpty(); } /** @@ -86,17 +83,16 @@ public void testParseSpanSamplingConfigMixed() throws Exception { Map result = (Map) method.invoke(null, config); - // Verify only createVolume is kept - assertEquals(1, result.size()); - assertTrue(result.containsKey("createVolume")); - assertFalse(result.containsKey("createBucket")); - assertFalse(result.containsKey("createKey")); + assertThat(result) + .hasSize(1) + .containsKey("createVolume") + .doesNotContainKeys("createBucket", "createKey"); } /** - * Test to show sampling of span only if trace is sampled. - * This shows priority given to trace. - * */ + * Test to show sampling of span only if trace is sampled. + * Trace is always sampled and span name is not mentioned in config, Hence it will be sampled. + */ @Test public void testSpanSamplingWithTraceSampled() { Map spanMap = new HashMap<>(); @@ -123,34 +119,35 @@ public void testSpanSamplingWithTraceNotSampled() { SpanSampler customSampler = new SpanSampler(rootSampler, spanMap); Context parentContext = Context.current(); - // Root span with alwaysOff should not be sampled. - SamplingResult rootResult = customSampler.shouldSample(parentContext, "trace1", "rootSpan", + SamplingResult result = customSampler.shouldSample(parentContext, "trace1", "rootSpan", SpanKind.INTERNAL, Attributes.empty(), Collections.emptyList()); - assertEquals(SamplingDecision.DROP, rootResult.getDecision()); + + // Root span with alwaysOff should not be sampled. + assertEquals(SamplingDecision.DROP, result.getDecision()); } + /** + * Test to show child span is not sampled when parent span is also not sampled. + */ @Test public void testChildDropsWhenParentIsNotSampled() { Map spanMap = new HashMap<>(); - // Even if we set this to 100% (always sample), it should be ignored spanMap.put("createKey", new LoopSampler(1.0)); SpanSampler customSampler = new SpanSampler(Sampler.alwaysOn(), spanMap); - // 1. Manually create a NOT SAMPLED parent context io.opentelemetry.api.trace.Span parentSpan = io.opentelemetry.api.trace.Span.wrap( io.opentelemetry.api.trace.SpanContext.create( "ff000000000000000000000000000041", "ff00000000000042", - io.opentelemetry.api.trace.TraceFlags.getDefault(), // turns off sampling bit - io.opentelemetry.api.trace.TraceState.getDefault())); + TraceFlags.getDefault(), + TraceState.getDefault())); Context parentContext = Context.root().with(parentSpan); SamplingResult result = customSampler.shouldSample(parentContext, "trace1", "createKey", SpanKind.INTERNAL, Attributes.empty(), Collections.emptyList()); - //Assert that span is dropped because parent was not sampled assertEquals(SamplingDecision.DROP, result.getDecision()); } } diff --git a/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java b/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java index bb80956b7669..5fe147741598 100644 --- a/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java +++ b/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java @@ -714,23 +714,27 @@ private class ObjectCreator implements Runnable { @Override public void run() { try (Scope scope = parentSpan.makeCurrent()) { - int v; - while ((v = volumeCounter.getAndIncrement()) < numOfVolumes) { - if (!createVolume(v)) { - return; - } + createObjects(); + } + } + + private void createObjects() { + int v; + while ((v = volumeCounter.getAndIncrement()) < numOfVolumes) { + if (!createVolume(v)) { + return; } - int b; - while ((b = bucketCounter.getAndIncrement()) < totalBucketCount) { - if (!createBucket(b)) { - return; - } + } + int b; + while ((b = bucketCounter.getAndIncrement()) < totalBucketCount) { + if (!createBucket(b)) { + return; } - long k; - while ((k = keyCounter.getAndIncrement()) < totalKeyCount) { - if (!createKey(k)) { - return; - } + } + long k; + while ((k = keyCounter.getAndIncrement()) < totalKeyCount) { + if (!createKey(k)) { + return; } } }