From 0e1e69df969a472a62abb346eaac857a15e4a3f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?JB=20Onofr=C3=A9?= Date: Fri, 13 Mar 2026 17:01:20 +0100 Subject: [PATCH] Add OpenTelemetry distributed tracing support New activemq-opentelemetry module that instruments the broker using the OpenTelemetry API. The plugin traces send (PRODUCER), dispatch (CONSUMER), and acknowledge (INTERNAL) operations with W3C TraceContext propagation and standard OTel messaging semantic conventions. Depends on opentelemetry-api only; users bring their own SDK and exporter at runtime. Included in the distribution with example configuration. --- activemq-opentelemetry/pom.xml | 115 +++++++++ .../ActiveMQMessageTextMapGetter.java | 53 ++++ .../ActiveMQMessageTextMapSetter.java | 41 +++ .../OpenTelemetryBrokerPlugin.java | 202 +++++++++++++++ .../OpenTelemetryBrokerPluginTest.java | 244 ++++++++++++++++++ assembly/pom.xml | 4 + assembly/src/release/conf/activemq.xml | 9 + .../examples/conf/activemq-opentelemetry.xml | 120 +++++++++ bom/pom.xml | 5 + pom.xml | 16 ++ 10 files changed, 809 insertions(+) create mode 100644 activemq-opentelemetry/pom.xml create mode 100644 activemq-opentelemetry/src/main/java/org/apache/activemq/broker/util/opentelemetry/ActiveMQMessageTextMapGetter.java create mode 100644 activemq-opentelemetry/src/main/java/org/apache/activemq/broker/util/opentelemetry/ActiveMQMessageTextMapSetter.java create mode 100644 activemq-opentelemetry/src/main/java/org/apache/activemq/broker/util/opentelemetry/OpenTelemetryBrokerPlugin.java create mode 100644 activemq-opentelemetry/src/test/java/org/apache/activemq/broker/util/opentelemetry/OpenTelemetryBrokerPluginTest.java create mode 100644 assembly/src/release/examples/conf/activemq-opentelemetry.xml diff --git a/activemq-opentelemetry/pom.xml b/activemq-opentelemetry/pom.xml new file mode 100644 index 00000000000..e60ad1809c9 --- /dev/null +++ b/activemq-opentelemetry/pom.xml @@ -0,0 +1,115 @@ + + + + + 4.0.0 + + + org.apache.activemq + activemq-parent + 6.3.0-SNAPSHOT + + + activemq-opentelemetry + bundle + ActiveMQ :: OpenTelemetry + ActiveMQ OpenTelemetry tracing support + + + + + + + + ${project.groupId} + activemq-broker + + + + io.opentelemetry + opentelemetry-api + + + io.opentelemetry + opentelemetry-context + + + + + + + ${project.groupId} + activemq-kahadb-store + test + + + ${project.groupId} + activemq-broker + test-jar + test + + + io.opentelemetry + opentelemetry-sdk-testing + test + + + io.opentelemetry + opentelemetry-sdk + test + + + junit + junit + test + + + org.apache.logging.log4j + log4j-slf4j2-impl + test + + + org.apache.logging.log4j + log4j-core + test + + + + + + + org.apache.felix + maven-bundle-plugin + true + + + org.apache.activemq.opentelemetry + org.apache.activemq.broker.util.opentelemetry*;version=${project.version};-noimport:=true;-split-package:=merge-first + + org.apache.activemq*;version=${project.version};resolution:=optional, + io.opentelemetry.*;resolution:=optional, + * + + <_noee>true + + + + + + + diff --git a/activemq-opentelemetry/src/main/java/org/apache/activemq/broker/util/opentelemetry/ActiveMQMessageTextMapGetter.java b/activemq-opentelemetry/src/main/java/org/apache/activemq/broker/util/opentelemetry/ActiveMQMessageTextMapGetter.java new file mode 100644 index 00000000000..a3835ad5690 --- /dev/null +++ b/activemq-opentelemetry/src/main/java/org/apache/activemq/broker/util/opentelemetry/ActiveMQMessageTextMapGetter.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.util.opentelemetry; + +import java.io.IOException; +import java.util.Collections; + +import io.opentelemetry.context.propagation.TextMapGetter; +import org.apache.activemq.command.Message; + +public class ActiveMQMessageTextMapGetter implements TextMapGetter { + + public static final ActiveMQMessageTextMapGetter INSTANCE = new ActiveMQMessageTextMapGetter(); + + @Override + public Iterable keys(Message message) { + try { + if (message.getProperties() != null) { + return message.getProperties().keySet(); + } + } catch (IOException e) { + // ignore + } + return Collections.emptyList(); + } + + @Override + public String get(Message message, String key) { + try { + Object value = message.getProperty(key); + if (value instanceof String) { + return (String) value; + } + } catch (IOException e) { + // ignore + } + return null; + } +} diff --git a/activemq-opentelemetry/src/main/java/org/apache/activemq/broker/util/opentelemetry/ActiveMQMessageTextMapSetter.java b/activemq-opentelemetry/src/main/java/org/apache/activemq/broker/util/opentelemetry/ActiveMQMessageTextMapSetter.java new file mode 100644 index 00000000000..152129ad4a6 --- /dev/null +++ b/activemq-opentelemetry/src/main/java/org/apache/activemq/broker/util/opentelemetry/ActiveMQMessageTextMapSetter.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.util.opentelemetry; + +import java.io.IOException; + +import io.opentelemetry.context.propagation.TextMapSetter; +import org.apache.activemq.command.Message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ActiveMQMessageTextMapSetter implements TextMapSetter { + + private static final Logger LOG = LoggerFactory.getLogger(ActiveMQMessageTextMapSetter.class); + + public static final ActiveMQMessageTextMapSetter INSTANCE = new ActiveMQMessageTextMapSetter(); + + @Override + public void set(Message message, String key, String value) { + try { + message.setProperty(key, value); + message.setMarshalledProperties(null); + } catch (IOException e) { + LOG.warn("Failed to set trace context property '{}' on message", key, e); + } + } +} diff --git a/activemq-opentelemetry/src/main/java/org/apache/activemq/broker/util/opentelemetry/OpenTelemetryBrokerPlugin.java b/activemq-opentelemetry/src/main/java/org/apache/activemq/broker/util/opentelemetry/OpenTelemetryBrokerPlugin.java new file mode 100644 index 00000000000..b30bd9b5b86 --- /dev/null +++ b/activemq-opentelemetry/src/main/java/org/apache/activemq/broker/util/opentelemetry/OpenTelemetryBrokerPlugin.java @@ -0,0 +1,202 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.util.opentelemetry; + +import java.util.concurrent.ConcurrentHashMap; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.TextMapPropagator; +import org.apache.activemq.broker.BrokerPluginSupport; +import org.apache.activemq.broker.ConsumerBrokerExchange; +import org.apache.activemq.broker.ProducerBrokerExchange; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageDispatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A broker plugin that provides OpenTelemetry distributed tracing support. + * Traces message send, dispatch, and acknowledge operations with proper + * context propagation via W3C TraceContext. + * + * @org.apache.xbean.XBean element="openTelemetryPlugin" + */ +public class OpenTelemetryBrokerPlugin extends BrokerPluginSupport { + + private static final Logger LOG = LoggerFactory.getLogger(OpenTelemetryBrokerPlugin.class); + private static final String INSTRUMENTATION_NAME = "org.apache.activemq"; + + private boolean traceProducer = true; + private boolean traceConsumer = true; + private boolean traceAcknowledge = true; + + private final ConcurrentHashMap dispatchSpans = new ConcurrentHashMap<>(); + + public boolean isTraceProducer() { + return traceProducer; + } + + public void setTraceProducer(boolean traceProducer) { + this.traceProducer = traceProducer; + } + + public boolean isTraceConsumer() { + return traceConsumer; + } + + public void setTraceConsumer(boolean traceConsumer) { + this.traceConsumer = traceConsumer; + } + + public boolean isTraceAcknowledge() { + return traceAcknowledge; + } + + public void setTraceAcknowledge(boolean traceAcknowledge) { + this.traceAcknowledge = traceAcknowledge; + } + + @Override + public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { + if (!traceProducer) { + super.send(producerExchange, messageSend); + return; + } + + TextMapPropagator propagator = GlobalOpenTelemetry.getPropagators().getTextMapPropagator(); + Tracer tracer = GlobalOpenTelemetry.getTracer(INSTRUMENTATION_NAME); + + // Extract any existing context from the message (e.g., set by client) + Context parentContext = propagator.extract(Context.current(), messageSend, ActiveMQMessageTextMapGetter.INSTANCE); + + String destinationName = messageSend.getDestination().getPhysicalName(); + SpanBuilder spanBuilder = tracer.spanBuilder(destinationName + " publish") + .setSpanKind(SpanKind.PRODUCER) + .setParent(parentContext) + .setAttribute("messaging.system", "activemq") + .setAttribute("messaging.destination.name", destinationName) + .setAttribute("messaging.operation", "publish"); + + if (messageSend.getMessageId() != null) { + spanBuilder.setAttribute("messaging.message.id", messageSend.getMessageId().toString()); + } + if (producerExchange.getConnectionContext() != null + && producerExchange.getConnectionContext().getClientId() != null) { + spanBuilder.setAttribute("messaging.client_id", producerExchange.getConnectionContext().getClientId()); + } + + Span span = spanBuilder.startSpan(); + try { + // Inject trace context into the message for downstream propagation + Context contextWithSpan = parentContext.with(span); + propagator.inject(contextWithSpan, messageSend, ActiveMQMessageTextMapSetter.INSTANCE); + + super.send(producerExchange, messageSend); + } catch (Exception e) { + span.setStatus(StatusCode.ERROR, e.getMessage()); + span.recordException(e); + throw e; + } finally { + span.end(); + } + } + + @Override + public void preProcessDispatch(MessageDispatch messageDispatch) { + if (traceConsumer && messageDispatch != null && messageDispatch.getMessage() != null) { + try { + TextMapPropagator propagator = GlobalOpenTelemetry.getPropagators().getTextMapPropagator(); + Tracer tracer = GlobalOpenTelemetry.getTracer(INSTRUMENTATION_NAME); + + Message message = messageDispatch.getMessage(); + Context extractedContext = propagator.extract(Context.current(), message, ActiveMQMessageTextMapGetter.INSTANCE); + + String destinationName = message.getDestination().getPhysicalName(); + SpanBuilder spanBuilder = tracer.spanBuilder(destinationName + " deliver") + .setSpanKind(SpanKind.CONSUMER) + .setParent(extractedContext) + .setAttribute("messaging.system", "activemq") + .setAttribute("messaging.destination.name", destinationName) + .setAttribute("messaging.operation", "deliver"); + + if (message.getMessageId() != null) { + spanBuilder.setAttribute("messaging.message.id", message.getMessageId().toString()); + } + + Span span = spanBuilder.startSpan(); + dispatchSpans.put(messageDispatch, span); + } catch (Exception e) { + LOG.warn("Failed to create deliver span", e); + } + } + super.preProcessDispatch(messageDispatch); + } + + @Override + public void postProcessDispatch(MessageDispatch messageDispatch) { + if (messageDispatch != null) { + Span span = dispatchSpans.remove(messageDispatch); + if (span != null) { + span.end(); + } + } + super.postProcessDispatch(messageDispatch); + } + + @Override + public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { + if (!traceAcknowledge) { + super.acknowledge(consumerExchange, ack); + return; + } + + Tracer tracer = GlobalOpenTelemetry.getTracer(INSTRUMENTATION_NAME); + + String destinationName = ack.getDestination() != null ? ack.getDestination().getPhysicalName() : "unknown"; + SpanBuilder spanBuilder = tracer.spanBuilder(destinationName + " ack") + .setSpanKind(SpanKind.INTERNAL) + .setAttribute("messaging.system", "activemq") + .setAttribute("messaging.destination.name", destinationName) + .setAttribute("messaging.operation", "ack"); + + if (ack.getLastMessageId() != null) { + spanBuilder.setAttribute("messaging.message.id", ack.getLastMessageId().toString()); + } + if (consumerExchange.getConnectionContext() != null + && consumerExchange.getConnectionContext().getClientId() != null) { + spanBuilder.setAttribute("messaging.client_id", consumerExchange.getConnectionContext().getClientId()); + } + + Span span = spanBuilder.startSpan(); + try { + super.acknowledge(consumerExchange, ack); + } catch (Exception e) { + span.setStatus(StatusCode.ERROR, e.getMessage()); + span.recordException(e); + throw e; + } finally { + span.end(); + } + } +} diff --git a/activemq-opentelemetry/src/test/java/org/apache/activemq/broker/util/opentelemetry/OpenTelemetryBrokerPluginTest.java b/activemq-opentelemetry/src/test/java/org/apache/activemq/broker/util/opentelemetry/OpenTelemetryBrokerPluginTest.java new file mode 100644 index 00000000000..80834d464f7 --- /dev/null +++ b/activemq-opentelemetry/src/test/java/org/apache/activemq/broker/util/opentelemetry/OpenTelemetryBrokerPluginTest.java @@ -0,0 +1,244 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.util.opentelemetry; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.List; + +import jakarta.jms.Connection; +import jakarta.jms.DeliveryMode; +import jakarta.jms.MessageConsumer; +import jakarta.jms.MessageProducer; +import jakarta.jms.Session; +import jakarta.jms.TextMessage; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.propagation.ContextPropagators; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class OpenTelemetryBrokerPluginTest { + + private BrokerService broker; + private TransportConnector connector; + private InMemorySpanExporter spanExporter; + private Connection connection; + private Session session; + private final String queueName = "TEST.OTEL"; + + @Before + public void setUp() throws Exception { + GlobalOpenTelemetry.resetForTest(); + + spanExporter = InMemorySpanExporter.create(); + SdkTracerProvider tracerProvider = SdkTracerProvider.builder() + .addSpanProcessor(SimpleSpanProcessor.create(spanExporter)) + .build(); + + OpenTelemetrySdk.builder() + .setTracerProvider(tracerProvider) + .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance())) + .buildAndRegisterGlobal(); + + OpenTelemetryBrokerPlugin plugin = new OpenTelemetryBrokerPlugin(); + + broker = new BrokerService(); + broker.setBrokerName("otelTestBroker"); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.setPlugins(new BrokerPlugin[]{plugin}); + connector = broker.addConnector("tcp://localhost:0"); + broker.start(); + broker.waitUntilStarted(); + + connection = new ActiveMQConnectionFactory(connector.getConnectUri()).createConnection(); + connection.start(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + @After + public void tearDown() throws Exception { + if (session != null) session.close(); + if (connection != null) connection.close(); + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + } + GlobalOpenTelemetry.resetForTest(); + } + + @Test + public void testPublishSpanCreatedOnSend() throws Exception { + MessageProducer producer = session.createProducer(session.createQueue(queueName)); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + TextMessage message = session.createTextMessage("test"); + producer.send(message); + producer.close(); + + // Allow async broker processing to complete + Thread.sleep(500); + + List spans = spanExporter.getFinishedSpanItems(); + assertTrue("Expected at least one span", spans.size() >= 1); + + SpanData publishSpan = findSpan(spans, queueName + " publish"); + assertNotNull("Publish span should exist", publishSpan); + assertEquals(SpanKind.PRODUCER, publishSpan.getKind()); + assertEquals("activemq", publishSpan.getAttributes().get(io.opentelemetry.api.common.AttributeKey.stringKey("messaging.system"))); + assertEquals(queueName, publishSpan.getAttributes().get(io.opentelemetry.api.common.AttributeKey.stringKey("messaging.destination.name"))); + assertEquals("publish", publishSpan.getAttributes().get(io.opentelemetry.api.common.AttributeKey.stringKey("messaging.operation"))); + } + + @Test + public void testTraceContextPropagatedInMessageProperties() throws Exception { + MessageProducer producer = session.createProducer(session.createQueue(queueName)); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + TextMessage message = session.createTextMessage("propagation test"); + producer.send(message); + producer.close(); + + MessageConsumer consumer = session.createConsumer(session.createQueue(queueName)); + jakarta.jms.Message received = consumer.receive(5000); + assertNotNull("Should receive message", received); + + // The message should have traceparent property injected by the plugin + String traceparent = received.getStringProperty("traceparent"); + assertNotNull("traceparent property should be set", traceparent); + assertTrue("traceparent should follow W3C format", traceparent.startsWith("00-")); + + consumer.close(); + } + + @Test + public void testDeliverSpanCreatedOnDispatch() throws Exception { + MessageConsumer consumer = session.createConsumer(session.createQueue(queueName)); + + MessageProducer producer = session.createProducer(session.createQueue(queueName)); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + producer.send(session.createTextMessage("deliver test")); + producer.close(); + + jakarta.jms.Message received = consumer.receive(5000); + assertNotNull("Should receive message", received); + consumer.close(); + + // Allow time for postProcessDispatch + Thread.sleep(500); + + List spans = spanExporter.getFinishedSpanItems(); + SpanData deliverSpan = findSpan(spans, queueName + " deliver"); + assertNotNull("Deliver span should exist", deliverSpan); + assertEquals(SpanKind.CONSUMER, deliverSpan.getKind()); + assertEquals("activemq", deliverSpan.getAttributes().get(io.opentelemetry.api.common.AttributeKey.stringKey("messaging.system"))); + assertEquals("deliver", deliverSpan.getAttributes().get(io.opentelemetry.api.common.AttributeKey.stringKey("messaging.operation"))); + } + + @Test + public void testAckSpanCreatedOnAcknowledge() throws Exception { + MessageConsumer consumer = session.createConsumer(session.createQueue(queueName)); + + MessageProducer producer = session.createProducer(session.createQueue(queueName)); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + producer.send(session.createTextMessage("ack test")); + producer.close(); + + jakarta.jms.Message received = consumer.receive(5000); + assertNotNull("Should receive message", received); + consumer.close(); + + // Allow time for ack processing + Thread.sleep(500); + + List spans = spanExporter.getFinishedSpanItems(); + SpanData ackSpan = findSpan(spans, queueName + " ack"); + assertNotNull("Ack span should exist", ackSpan); + assertEquals(SpanKind.INTERNAL, ackSpan.getKind()); + assertEquals("activemq", ackSpan.getAttributes().get(io.opentelemetry.api.common.AttributeKey.stringKey("messaging.system"))); + assertEquals("ack", ackSpan.getAttributes().get(io.opentelemetry.api.common.AttributeKey.stringKey("messaging.operation"))); + } + + @Test + public void testDisabledProducerTracing() throws Exception { + // Stop and reconfigure with tracing disabled + connection.close(); + broker.stop(); + broker.waitUntilStopped(); + spanExporter.reset(); + + OpenTelemetryBrokerPlugin plugin = new OpenTelemetryBrokerPlugin(); + plugin.setTraceProducer(false); + plugin.setTraceConsumer(false); + plugin.setTraceAcknowledge(false); + + broker = new BrokerService(); + broker.setBrokerName("otelTestBroker2"); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.setPlugins(new BrokerPlugin[]{plugin}); + connector = broker.addConnector("tcp://localhost:0"); + broker.start(); + broker.waitUntilStarted(); + + connection = new ActiveMQConnectionFactory(connector.getConnectUri()).createConnection(); + connection.start(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = session.createProducer(session.createQueue(queueName)); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + producer.send(session.createTextMessage("no trace")); + producer.close(); + + MessageConsumer consumer = session.createConsumer(session.createQueue(queueName)); + consumer.receive(5000); + consumer.close(); + + Thread.sleep(500); + + List spans = spanExporter.getFinishedSpanItems(); + assertFalse("Should have no publish spans when tracing disabled", + spans.stream().anyMatch(s -> s.getName().contains("publish"))); + assertFalse("Should have no deliver spans when tracing disabled", + spans.stream().anyMatch(s -> s.getName().contains("deliver"))); + assertFalse("Should have no ack spans when tracing disabled", + spans.stream().anyMatch(s -> s.getName().contains("ack"))); + } + + private SpanData findSpan(List spans, String name) { + return spans.stream() + .filter(s -> s.getName().equals(name)) + .findFirst() + .orElse(null); + } +} diff --git a/assembly/pom.xml b/assembly/pom.xml index e4a9922b4dc..a86496989ac 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -136,6 +136,10 @@ ${project.groupId} activemq-shiro + + ${project.groupId} + activemq-opentelemetry + ${project.groupId} activemq-spring diff --git a/assembly/src/release/conf/activemq.xml b/assembly/src/release/conf/activemq.xml index 9f32bd90f59..33b18e2a501 100644 --- a/assembly/src/release/conf/activemq.xml +++ b/assembly/src/release/conf/activemq.xml @@ -96,6 +96,15 @@ + + + + + + + + + file:${activemq.conf}/credentials.properties + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/bom/pom.xml b/bom/pom.xml index d70f15f14eb..11a9fcf9440 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -147,6 +147,11 @@ activemq-shiro ${project.version} + + org.apache.activemq + activemq-opentelemetry + ${project.version} + org.apache.activemq activemq-spring diff --git a/pom.xml b/pom.xml index 2745d052477..9d7b3650daf 100644 --- a/pom.xml +++ b/pom.xml @@ -96,6 +96,7 @@ 4.2.10.Final 2.1.0 2.1.0 + 1.43.0 2.0.17 1.1.2 6.2.17 @@ -211,6 +212,7 @@ activemq-rar activemq-run activemq-shiro + activemq-opentelemetry activemq-spring activemq-runtime-config activemq-tooling @@ -348,6 +350,11 @@ activemq-shiro ${project.version} + + org.apache.activemq + activemq-opentelemetry + ${project.version} + org.apache.activemq activemq-spring @@ -661,6 +668,15 @@ true + + + io.opentelemetry + opentelemetry-bom + ${opentelemetry-version} + pom + import + + org.springframework