From d1652b9d052d32fc2a0bf0bcb08a1a3138d9ef77 Mon Sep 17 00:00:00 2001 From: YukiInu Date: Thu, 13 Nov 2025 14:36:29 +0100 Subject: [PATCH] Fixing problem with retry messages in Interconnect --- CHANGELOG.md | 1 + .../daemon/handler/ADaemonMessageHandler.java | 44 +++++++------------ .../daemon/jms/InterconnectMessageSender.java | 33 ++++++++++---- .../daemon/model/InterconnectContext.java | 6 +-- .../daemon/util/DaemonExceptionMapper.java | 12 ++--- .../de/taimos/dvalin/jms/JmsConnector.java | 31 ++++++------- 6 files changed, 63 insertions(+), 64 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1df05365..2dda6990 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ * Swagger 2.2.40 * Velocity Engine 2.4.1 * Fixing DaemonExceptionMapper +* Fixing problem with retry messages in Interconnect * Fixed vulnerabilities: CVE-2024-13009(Jetty), CVE-2025-23184(Apache CXF), CVE-2024-57699 (Json-smart),CVE-2025-27533 (ActiveMQ) # 1.37 diff --git a/interconnect/core/src/main/java/de/taimos/dvalin/interconnect/core/daemon/handler/ADaemonMessageHandler.java b/interconnect/core/src/main/java/de/taimos/dvalin/interconnect/core/daemon/handler/ADaemonMessageHandler.java index d0deacfe..6b0a25fc 100644 --- a/interconnect/core/src/main/java/de/taimos/dvalin/interconnect/core/daemon/handler/ADaemonMessageHandler.java +++ b/interconnect/core/src/main/java/de/taimos/dvalin/interconnect/core/daemon/handler/ADaemonMessageHandler.java @@ -149,17 +149,18 @@ private Message decryptIfNecessary(ICryptoService cryptoService, Message message private void handleWithReply(IDaemonHandler handler, InterconnectResponseContext context) throws Exception { try { - final InterconnectObject responseIco = this.handleRequest(handler, context.getCreateResponseMethod(), - context.getReceivedContext().getRequestIco()); + final InterconnectObject responseIco = this.handleRequest(handler, context.getCreateResponseMethod(), context.getReceivedContext().getRequestIco()); context.setResponseICO(responseIco); if (this.duration(context) == HandlingDurationType.TIMEOUT) { return; } this.reply(context); } catch (final DaemonError e) { - this.getLogger().debug("DaemonError for " + context.getCreateResponseMethod().getMethod().getName() + "(" + - context.getReceivedContext().getIcoClass().getSimpleName() + ")" + " with " + - de.taimos.dvalin.interconnect.model.InterconnectContext.getContext(), e); + if (e.getNumber().get() < 0) { + this.getLogger().error("DaemonError for " + context.getCreateResponseMethod().getMethod().getName() + "(" + context.getReceivedContext().getIcoClass().getSimpleName() + ")" + " with " + de.taimos.dvalin.interconnect.model.InterconnectContext.getContext(), e); + } else { + this.getLogger().debug("DaemonError for " + context.getCreateResponseMethod().getMethod().getName() + "(" + context.getReceivedContext().getIcoClass().getSimpleName() + ")" + " with " + de.taimos.dvalin.interconnect.model.InterconnectContext.getContext(), e); + } this.sendErrorResponse(e, context); } } @@ -175,13 +176,9 @@ private void sendErrorResponse(DaemonError e, InterconnectResponseContext contex } private void updateThreadContext(InterconnectResponseContext context) throws Exception { - de.taimos.dvalin.interconnect.model.InterconnectContext.setUuid( - ADaemonMessageHandler.getUuid(context.getReceivedMessage(), - context.getReceivedContext().getIcoClass())); - de.taimos.dvalin.interconnect.model.InterconnectContext.setDeliveryCount( - this.getDeliveryCount(context.getReceivedMessage())); - de.taimos.dvalin.interconnect.model.InterconnectContext.setRedelivered( - context.getReceivedMessage().getJMSRedelivered()); + de.taimos.dvalin.interconnect.model.InterconnectContext.setUuid(ADaemonMessageHandler.getUuid(context.getReceivedMessage(), context.getReceivedContext().getIcoClass())); + de.taimos.dvalin.interconnect.model.InterconnectContext.setDeliveryCount(this.getDeliveryCount(context.getReceivedMessage())); + de.taimos.dvalin.interconnect.model.InterconnectContext.setRedelivered(context.getReceivedMessage().getJMSRedelivered()); Class ivoClass; if (context.getReceivedContext() instanceof IVO) { ivoClass = ADaemonMessageHandler.uncheckedCast(context.getResponseICO()); @@ -192,10 +189,7 @@ private void updateThreadContext(InterconnectResponseContext context) throws Exc private static DaemonMethod getDaemonMethod(RegistryEntry registryEntry, InterconnectResponseContext context) throws Exception { final DaemonMethod method = registryEntry.getMethod(); if (method.isSecure() != context.getReceivedContext().isSecure()) { - throw new Exception( - "Insecure call (is " + context.getReceivedContext().isSecure() + " should be " + method.isSecure() + - ") for " + context.getReceivedContext().getIcoClass().getSimpleName() + " from " + - context.getReceivedMessage().getJMSReplyTo()); + throw new Exception("Insecure call (is " + context.getReceivedContext().isSecure() + " should be " + method.isSecure() + ") for " + context.getReceivedContext().getIcoClass().getSimpleName() + " from " + context.getReceivedMessage().getJMSReplyTo()); } return method; } @@ -203,9 +197,7 @@ private static DaemonMethod getDaemonMethod(RegistryEntry registryEntry, Interco private RegistryEntry getRegistryEntry(InterconnectResponseContext context) throws Exception { final RegistryEntry registryEntry = this.registry.get(context.getReceivedContext().getIcoClass()); if (registryEntry == null) { - throw new Exception( - "No registered method found for " + context.getReceivedContext().getIcoClass().getSimpleName() + - " from " + context.getReceivedMessage().getJMSReplyTo()); + throw new Exception("No registered method found for " + context.getReceivedContext().getIcoClass().getSimpleName() + " from " + context.getReceivedMessage().getJMSReplyTo()); } return registryEntry; } @@ -241,8 +233,7 @@ private void logInvoke(InterconnectResponseContext context) { .append("(").append(context.getReceivedContext().getIcoClass().getSimpleName()).append(")"); if (context.getReceivedContext().getRequestIco() instanceof IPageable) { sbInvokeLog // - .append(" at Page ").append(((IPageable) context.getReceivedContext().getRequestIco()).getOffset()) - .append(";").append(((IPageable) context.getReceivedContext().getRequestIco()).getLimit()); + .append(" at Page ").append(((IPageable) context.getReceivedContext().getRequestIco()).getOffset()).append(";").append(((IPageable) context.getReceivedContext().getRequestIco()).getLimit()); } sbInvokeLog.append(" with ").append(de.taimos.dvalin.interconnect.model.InterconnectContext.getContext()); this.getLogger().info(sbInvokeLog.toString()); @@ -267,15 +258,12 @@ private int getDeliveryCount(Message message) throws JMSException { private static UUID getUuid(Message message, Class icoClass) throws Exception { final String requestUUID = message.getStringProperty(InterconnectContext.HEADER_REQUEST_UUID); if (requestUUID == null) { - throw new Exception("No request UUID found in message with " + icoClass.getSimpleName() + " from " + - message.getJMSReplyTo()); + throw new Exception("No request UUID found in message with " + icoClass.getSimpleName() + " from " + message.getJMSReplyTo()); } try { return UUID.fromString(requestUUID); } catch (final IllegalArgumentException e) { - throw new Exception( - "No valid request UUID " + requestUUID + " message with " + icoClass.getSimpleName() + " from " + - message.getJMSReplyTo()); + throw new Exception("No valid request UUID " + requestUUID + " message with " + icoClass.getSimpleName() + " from " + message.getJMSReplyTo()); } } @@ -314,9 +302,7 @@ private InterconnectObject handleRequest(final IDaemonHandler handler, final Dae throw new IdemponentRetryException(targetException); } - this.getLogger().error( - "Exception in non-idempotent " + method.getMethod().getName() + "(" + ico.getClass().getSimpleName() + - ")" + " with " + de.taimos.dvalin.interconnect.model.InterconnectContext.getContext(), e); + this.getLogger().error("Exception in non-idempotent " + method.getMethod().getName() + "(" + ico.getClass().getSimpleName() + ")" + " with " + de.taimos.dvalin.interconnect.model.InterconnectContext.getContext(), e); throw new DaemonError(FrameworkErrors.FRAMEWORK_ERROR, targetException); } catch (final Exception e) { throw new RuntimeException(e); diff --git a/interconnect/core/src/main/java/de/taimos/dvalin/interconnect/core/daemon/jms/InterconnectMessageSender.java b/interconnect/core/src/main/java/de/taimos/dvalin/interconnect/core/daemon/jms/InterconnectMessageSender.java index 2a38f877..10d1c2dc 100644 --- a/interconnect/core/src/main/java/de/taimos/dvalin/interconnect/core/daemon/jms/InterconnectMessageSender.java +++ b/interconnect/core/src/main/java/de/taimos/dvalin/interconnect/core/daemon/jms/InterconnectMessageSender.java @@ -26,6 +26,9 @@ import de.taimos.dvalin.interconnect.core.daemon.exceptions.UnexpectedTypeException; import de.taimos.dvalin.interconnect.core.daemon.model.InterconnectContext; import de.taimos.dvalin.interconnect.core.daemon.util.DaemonExceptionMapper; +import de.taimos.dvalin.interconnect.core.exceptions.InfrastructureException; +import de.taimos.dvalin.interconnect.core.exceptions.SerializationException; +import de.taimos.dvalin.interconnect.core.exceptions.TimeoutException; import de.taimos.dvalin.interconnect.model.FutureImpl; import de.taimos.dvalin.interconnect.model.InterconnectList; import de.taimos.dvalin.interconnect.model.InterconnectMapper; @@ -35,11 +38,10 @@ import de.taimos.dvalin.interconnect.model.service.DaemonErrorNumber; import de.taimos.dvalin.interconnect.model.service.DaemonScanner; import de.taimos.dvalin.jms.IJmsConnector; +import de.taimos.dvalin.jms.exceptions.CommunicationFailureException; +import de.taimos.dvalin.jms.exceptions.CommunicationFailureException.CommunicationError; import de.taimos.dvalin.jms.exceptions.CreationException; import de.taimos.dvalin.jms.exceptions.CreationException.Source; -import de.taimos.dvalin.interconnect.core.exceptions.InfrastructureException; -import de.taimos.dvalin.interconnect.core.exceptions.SerializationException; -import de.taimos.dvalin.interconnect.core.exceptions.TimeoutException; import de.taimos.dvalin.jms.model.JmsResponseContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -100,13 +102,26 @@ public void sendRequest(InterconnectContext interconnectContext) throws DaemonEr private boolean checkForRetry(InterconnectContext so, InfrastructureException e) throws TimeoutException { if (!so.isIdempotent()) { + this.logger.warn("No message retry due to missing idempotency."); return false; } - if (!(e instanceof CreationException || e instanceof TimeoutException)) { - return false; + + if (e instanceof TimeoutException) { + return true; } - return !(e instanceof CreationException) || - Source.DESTINATION.equals(((CreationException) e).getExceptionSource()); + + if (e instanceof CreationException) { + return Source.DESTINATION.equals(((CreationException) e).getExceptionSource()); + } + + if (e instanceof CommunicationFailureException) { + if (CommunicationError.SEND.equals(((CommunicationFailureException) e).getCommunicationError())) { + this.logger.warn("Retrying message send because of: {}", FrameworkErrors.SEND_ERROR); + return true; + } + } + + return false; } private void sendRequestRetry(InterconnectContext so) throws DaemonError, TimeoutException { @@ -162,10 +177,12 @@ private R request(InterconnectContext requestObject, Class responseClazz) try { responseObject = this.jmsConnector.request(requestObject); } catch (SerializationException e) { - throw new RuntimeException(e); + DaemonExceptionMapper.mapAndThrow(e); } catch (InfrastructureException e) { if (this.checkForRetry(requestObject, e)) { responseObject = this.sendSyncRequestRetry(requestObject); + } else { + DaemonExceptionMapper.mapAndThrow(e); } } diff --git a/interconnect/core/src/main/java/de/taimos/dvalin/interconnect/core/daemon/model/InterconnectContext.java b/interconnect/core/src/main/java/de/taimos/dvalin/interconnect/core/daemon/model/InterconnectContext.java index 2ddf4f12..f1980502 100644 --- a/interconnect/core/src/main/java/de/taimos/dvalin/interconnect/core/daemon/model/InterconnectContext.java +++ b/interconnect/core/src/main/java/de/taimos/dvalin/interconnect/core/daemon/model/InterconnectContext.java @@ -1,9 +1,9 @@ package de.taimos.dvalin.interconnect.core.daemon.model; import com.google.common.base.Preconditions; +import de.taimos.dvalin.interconnect.core.exceptions.InfrastructureException; import de.taimos.dvalin.interconnect.model.InterconnectMapper; import de.taimos.dvalin.interconnect.model.InterconnectObject; -import de.taimos.dvalin.interconnect.core.exceptions.InfrastructureException; import de.taimos.dvalin.jms.model.JmsContext; import de.taimos.dvalin.jms.model.JmsTarget; @@ -150,13 +150,13 @@ public InterconnectContext createResponseContext(InterconnectObject responseICO) return new InterconnectContextBuilder(this)// .withDestination(this.getReplyToDestination()) // .withRequestICO(responseICO) // - .withIdempotent(false) // + .withIdempotent(true) // .withTarget(JmsTarget.DESTINATION).build(); } return new InterconnectContextBuilder(this)// .withDestinationName(this.getReplyToQueueName()) // .withRequestICO(responseICO) // - .withIdempotent(false) // + .withIdempotent(true) // .withTarget(JmsTarget.QUEUE).build(); } } diff --git a/interconnect/core/src/main/java/de/taimos/dvalin/interconnect/core/daemon/util/DaemonExceptionMapper.java b/interconnect/core/src/main/java/de/taimos/dvalin/interconnect/core/daemon/util/DaemonExceptionMapper.java index 7413c176..c0bdf131 100644 --- a/interconnect/core/src/main/java/de/taimos/dvalin/interconnect/core/daemon/util/DaemonExceptionMapper.java +++ b/interconnect/core/src/main/java/de/taimos/dvalin/interconnect/core/daemon/util/DaemonExceptionMapper.java @@ -2,16 +2,16 @@ import de.taimos.dvalin.interconnect.core.daemon.exceptions.FrameworkErrors; import de.taimos.dvalin.interconnect.core.daemon.exceptions.UnexpectedTypeException; +import de.taimos.dvalin.interconnect.core.exceptions.InfrastructureException; +import de.taimos.dvalin.interconnect.core.exceptions.MessageCryptoException; +import de.taimos.dvalin.interconnect.core.exceptions.SerializationException; +import de.taimos.dvalin.interconnect.core.exceptions.TimeoutException; import de.taimos.dvalin.interconnect.model.service.DaemonError; import de.taimos.dvalin.interconnect.model.service.DaemonErrorNumber; import de.taimos.dvalin.jms.exceptions.CommunicationFailureException; import de.taimos.dvalin.jms.exceptions.CommunicationFailureException.CommunicationError; import de.taimos.dvalin.jms.exceptions.CreationException; import de.taimos.dvalin.jms.exceptions.CreationException.Source; -import de.taimos.dvalin.interconnect.core.exceptions.InfrastructureException; -import de.taimos.dvalin.interconnect.core.exceptions.MessageCryptoException; -import de.taimos.dvalin.interconnect.core.exceptions.SerializationException; -import de.taimos.dvalin.interconnect.core.exceptions.TimeoutException; /** * Copyright 2024 Cinovo AG
@@ -77,10 +77,10 @@ private static Exception handleInfrastructureException(InfrastructureException e private static Exception handleCommunicationFailureException(CommunicationFailureException e) { if (CommunicationError.SEND.equals(e.getCommunicationError())) { - return new DaemonError(FrameworkErrors.SEND_ERROR, e); + return new DaemonError(FrameworkErrors.SEND_ERROR, e.getCause()); } if (CommunicationError.RECEIVE.equals(e.getCommunicationError())) { - return new DaemonError(FrameworkErrors.RECEIVE_ERROR, e); + return new DaemonError(FrameworkErrors.RECEIVE_ERROR, e.getCause()); } if (CommunicationError.INVALID_RESPONSE.equals(e.getCommunicationError())) { return new DaemonError(FrameworkErrors.INVALID_RESPONSE_ERROR, e); diff --git a/jms/core/src/main/java/de/taimos/dvalin/jms/JmsConnector.java b/jms/core/src/main/java/de/taimos/dvalin/jms/JmsConnector.java index 79751853..1c66dd21 100644 --- a/jms/core/src/main/java/de/taimos/dvalin/jms/JmsConnector.java +++ b/jms/core/src/main/java/de/taimos/dvalin/jms/JmsConnector.java @@ -1,13 +1,13 @@ package de.taimos.dvalin.jms; +import de.taimos.dvalin.interconnect.core.exceptions.InfrastructureException; +import de.taimos.dvalin.interconnect.core.exceptions.SerializationException; +import de.taimos.dvalin.interconnect.core.exceptions.TimeoutException; import de.taimos.dvalin.jms.crypto.ICryptoService; import de.taimos.dvalin.jms.exceptions.CommunicationFailureException; import de.taimos.dvalin.jms.exceptions.CommunicationFailureException.CommunicationError; import de.taimos.dvalin.jms.exceptions.CreationException; import de.taimos.dvalin.jms.exceptions.CreationException.Source; -import de.taimos.dvalin.interconnect.core.exceptions.InfrastructureException; -import de.taimos.dvalin.interconnect.core.exceptions.SerializationException; -import de.taimos.dvalin.interconnect.core.exceptions.TimeoutException; import de.taimos.dvalin.jms.model.JmsContext; import de.taimos.dvalin.jms.model.JmsResponseContext; import org.slf4j.Logger; @@ -56,11 +56,11 @@ public JmsConnector(ConnectionFactory connectionFactory, ICryptoService cryptoSe @Override public void send(@Nonnull JmsContext context) throws SerializationException, InfrastructureException { try (Connection connection = this.connectionFactory.createConnection()) { + connection.start(); try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) { final Destination destination = JmsConnector.createDestination(session, context); Message txt = this.createTextMessageForDestination(session, context, null); - JmsConnector.sendMessage(session, destination, txt, - context.getTimeToLive(), context.getPriority()); + JmsConnector.sendMessage(session, destination, txt, context.getTimeToLive(), context.getPriority()); } catch (final JMSException e) { throw new CreationException(Source.SESSION, e); } @@ -72,12 +72,10 @@ public void send(@Nonnull JmsContext context) throws SerializationException, Inf @Override public JmsResponseContext receive(@Nonnull JmsContext context) throws InfrastructureException, SerializationException { try (Connection connection = this.connectionFactory.createConnection()) { + connection.start(); try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) { - try (MessageConsumer consumer = session.createConsumer(JmsConnector.createDestination(session, context), - context.getSelector())) { - connection.start(); - Message message = this.syncReceiveSingleMessage(consumer, context.getReceiveTimeout(), - context.isSecure()); + try (MessageConsumer consumer = session.createConsumer(JmsConnector.createDestination(session, context), context.getSelector())) { + Message message = this.syncReceiveSingleMessage(consumer, context.getReceiveTimeout(), context.isSecure()); return new JmsResponseContext<>(message); } catch (final JMSException e) { throw new CreationException(Source.CONSUMER, e); @@ -93,6 +91,7 @@ public JmsResponseContext receive(@Nonnull JmsContext context @Override public JmsResponseContext request(@Nonnull JmsContext context) throws SerializationException, InfrastructureException { try (Connection connection = this.connectionFactory.createConnection()) { + connection.start(); try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) { try { final TemporaryQueue temporaryQueue = session.createTemporaryQueue(); @@ -100,11 +99,8 @@ public JmsResponseContext request(@Nonnull JmsContext context final Message txt = this.createTextMessageForDestination(session, context, temporaryQueue); try (MessageConsumer consumer = session.createConsumer(temporaryQueue, context.getSelector())) { - connection.start(); - JmsConnector.sendMessage(session, requestQueue, txt, - context.getTimeToLive(), context.getPriority()); - Message response = this.syncReceiveSingleMessage(consumer, context.getReceiveTimeout(), - context.isSecure()); + JmsConnector.sendMessage(session, requestQueue, txt, context.getTimeToLive(), context.getPriority()); + Message response = this.syncReceiveSingleMessage(consumer, context.getReceiveTimeout(), context.isSecure()); return new JmsResponseContext<>(response); } catch (final JMSException e) { throw new CreationException(Source.CONSUMER); @@ -135,14 +131,13 @@ private static void sendMessage(Session session, Destination destination, Messag @Override public List receiveBulkFromDestination(JmsContext context, final int maxMessages) throws InfrastructureException, SerializationException { try (Connection connection = this.connectionFactory.createConnection()) { + connection.start(); try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) { Destination destination = JmsConnector.createDestination(session, context); List messages = new ArrayList<>(); try (MessageConsumer consumer = session.createConsumer(destination, context.getSelector())) { - connection.start(); while (messages.size() < maxMessages) { - Message message = this.syncReceiveSingleMessage(consumer, context.getReceiveTimeout(), - context.isSecure()); + Message message = this.syncReceiveSingleMessage(consumer, context.getReceiveTimeout(), context.isSecure()); messages.add(message); } } catch (final JMSException e) {