diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java index 43a95cc86016f..3f82b64acc14f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java @@ -49,6 +49,12 @@ public ConsumerBusyException(String msg) { } } + public static class ConsumerClosedException extends BrokerServiceException { + public ConsumerClosedException(String msg) { + super(msg); + } + } + public static class ProducerBusyException extends BrokerServiceException { public ProducerBusyException(String msg) { super(msg); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index d1bc4953d11ca..6d63a987eb1e3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -62,6 +62,7 @@ import java.util.stream.Collectors; import javax.naming.AuthenticationException; import javax.net.ssl.SSLSession; +import javax.ws.rs.NotAuthorizedException; import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.Response; import lombok.Getter; @@ -89,9 +90,11 @@ import org.apache.pulsar.broker.namespace.LookupOptions; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; +import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerClosedException; import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException; import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionNotFoundException; +import org.apache.pulsar.broker.service.BrokerServiceException.TopicMigratedException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicNotFoundException; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; @@ -1258,8 +1261,11 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { // epoch using different consumer futures, and only remove the consumer future from the map // if subscribe failed . CompletableFuture consumerFuture = new CompletableFuture<>(); - CompletableFuture existingConsumerFuture = - consumers.putIfAbsent(consumerId, consumerFuture); + consumerFuture.exceptionally((ex) -> { + consumers.remove(consumerId, consumerFuture); + return null; + }); + CompletableFuture existingConsumerFuture = consumers.putIfAbsent(consumerId, consumerFuture); isAuthorizedFuture.thenApply(isAuthorized -> { if (isAuthorized) { if (log.isDebugEnabled()) { @@ -1274,7 +1280,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { service.getPulsar().getConfiguration().getMaxConsumerMetadataSize()); } catch (IllegalArgumentException iae) { final String msg = iae.getMessage(); - consumers.remove(consumerId, consumerFuture); + consumerFuture.completeExceptionally(iae); commandSender.sendErrorResponse(requestId, ServerError.MetadataError, msg); return null; } @@ -1396,7 +1402,8 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { if (consumer.checkAndApplyTopicMigration()) { log.info("[{}] Disconnecting consumer {} on migrated subscription on topic {} / {}", remoteAddress, consumerId, subscriptionName, topicName); - consumers.remove(consumerId, consumerFuture); + consumerFuture.completeExceptionally( + new TopicMigratedException("Topic has been migrated")); return; } @@ -1423,7 +1430,8 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { + " after timeout on client side {}: {}", remoteAddress, consumer, e.getMessage()); } - consumers.remove(consumerId, consumerFuture); + consumerFuture.completeExceptionally(new ConsumerClosedException( + "Cleared consumer created after timeout on client side")); } }) @@ -1450,7 +1458,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { log.info("consumer client doesn't support topic migration handling {}-{}-{}", topicName, remoteAddress, consumerId); } - consumers.remove(consumerId, consumerFuture); + consumerFuture.completeExceptionally(exception); closeConsumer(consumerId, Optional.empty()); return null; } @@ -1471,7 +1479,6 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { BrokerServiceException.getClientErrorCode(exception.getCause()), exception.getCause().getMessage()); } - consumers.remove(consumerId, consumerFuture); return null; @@ -1479,13 +1486,13 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { } else { String msg = "Client is not authorized to subscribe"; log.warn("[{}] {} with role {}", remoteAddress, msg, getPrincipal()); - consumers.remove(consumerId, consumerFuture); + consumerFuture.completeExceptionally(new NotAuthorizedException(msg)); writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg)); } return null; }).exceptionally(ex -> { logAuthException(remoteAddress, "subscribe", getPrincipal(), Optional.of(topicName), ex); - consumers.remove(consumerId, consumerFuture); + consumerFuture.completeExceptionally(ex); commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, ex.getMessage()); return null; }); @@ -2229,6 +2236,7 @@ protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) { // create operation will complete, the new consumer will be discarded. log.info("[{}] Closed consumer before its creation was completed. consumerId={}", remoteAddress, consumerId); + consumers.remove(consumerId, consumerFuture); commandSender.sendSuccessResponse(requestId); return; } @@ -2236,6 +2244,7 @@ protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) { if (consumerFuture.isCompletedExceptionally()) { log.info("[{}] Closed consumer that already failed to be created. consumerId={}", remoteAddress, consumerId); + consumers.remove(consumerId, consumerFuture); commandSender.sendSuccessResponse(requestId); return; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 4d734081e43cd..9cbaa60690daa 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -2436,13 +2436,6 @@ public void testSubscribeBookieTimeout() throws Exception { null /* assignedBrokerServiceUrl */, null /* assignedBrokerServiceUrlTls */); channel.writeInbound(closeConsumer); - ByteBuf subscribe2 = Commands.newSubscribe(successTopicName, // - successSubName, 1 /* consumer id */, 3 /* request id */, SubType.Exclusive, 0, - "test" /* consumer name */, 0 /* avoid reseting cursor */); - channel.writeInbound(subscribe2); - - openTopicFail.get().run(); - Object response; // Close succeeds @@ -2450,30 +2443,52 @@ public void testSubscribeBookieTimeout() throws Exception { assertEquals(response.getClass(), CommandSuccess.class); assertEquals(((CommandSuccess) response).getRequestId(), 2); - // Subscribe fails - response = getResponse(); - assertEquals(response.getClass(), CommandError.class); - assertEquals(((CommandError) response).getRequestId(), 3); + openTopicFail.get().run(); + + // We should not receive response for 1st consumer, since it was cancelled by the close + assertTrue(channel.outboundMessages().isEmpty()); + assertTrue(channel.isActive()); Awaitility.await().until(() -> !serverCnx.hasConsumer(1)); - ByteBuf subscribe3 = Commands.newSubscribe(successTopicName, // - successSubName, 1 /* consumer id */, 4 /* request id */, SubType.Exclusive, 0, + ByteBuf subscribe2 = Commands.newSubscribe(successTopicName, // + successSubName, 1 /* consumer id */, 3 /* request id */, SubType.Exclusive, 0, "test" /* consumer name */, 0 /* avoid reseting cursor */); - channel.writeInbound(subscribe3); + channel.writeInbound(subscribe2); openTopicSuccess.get().run(); // Subscribe succeeds response = getResponse(); assertEquals(response.getClass(), CommandSuccess.class); - assertEquals(((CommandSuccess) response).getRequestId(), 4); + assertEquals(((CommandSuccess) response).getRequestId(), 3); - Thread.sleep(100); + channel.finish(); + } - // We should not receive response for 1st producer, since it was cancelled by the close - assertTrue(channel.outboundMessages().isEmpty()); - assertTrue(channel.isActive()); + @Test + public void testCleanupCacheWhenHandleCloseConsumer() throws Exception { + resetChannel(); + setChannelConnected(); + + CompletableFuture consumerFuture = new CompletableFuture<>(); + int consumerId = 1; + + // Assumer a consumer has been added to the cache + serverCnx.getConsumers().put(1, consumerFuture); + + ByteBuf closeConsumer = Commands.newCloseConsumer(consumerId /* consumer id */, 1 /* request id */, + null /* assignedBrokerServiceUrl */, null /* assignedBrokerServiceUrlTls */); + channel.writeInbound(closeConsumer); + + Object response; + + // Close succeeds + response = getResponse(); + assertEquals(response.getClass(), CommandSuccess.class); + assertEquals(((CommandSuccess) response).getRequestId(), 1); + + assertFalse(serverCnx.hasConsumer(consumerId)); channel.finish(); }