From 339a6b890ab251a9ef3ce51ae373f1175684e1e4 Mon Sep 17 00:00:00 2001 From: Dream95 Date: Tue, 27 Jan 2026 21:08:24 +0800 Subject: [PATCH] [improve][broker] Add http produce backlog quota check - Add http produce backlog quota check for both destination_storage and message_age Signed-off-by: Dream95 --- .../apache/pulsar/broker/rest/TopicsBase.java | 60 ++++++++------ .../pulsar/broker/admin/TopicsTest.java | 83 +++++++++++++++++++ 2 files changed, 117 insertions(+), 26 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java index 425e715a1e5f0..e71b250ee965b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java @@ -88,6 +88,7 @@ import org.apache.pulsar.common.compression.CompressionCodecProvider; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; +import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.TopicOperation; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.schema.SchemaData; @@ -272,35 +273,42 @@ private void internalPublishMessages(TopicName topicName, ProducerMessages reque private CompletableFuture publishSingleMessageToPartition(String topic, Message message) { CompletableFuture publishResult = new CompletableFuture<>(); pulsar().getBrokerService().getTopic(topic, false) - .thenAccept(t -> { - // TODO: Check message backlog and fail if backlog too large. - if (!t.isPresent()) { - // Topic not found, and remove from owning partition list. - publishResult.completeExceptionally(new BrokerServiceException.TopicNotFoundException("Topic not " - + "owned by current broker.")); - TopicName topicName = TopicName.get(topic); - pulsar().getBrokerService().getOwningTopics().get(topicName.getPartitionedTopicName()) - .remove(topicName.getPartitionIndex()); - } else { - try { - ByteBuf headersAndPayload = messageToByteBuf(message); - try { - Topic topicObj = t.get(); - topicObj.publishMessage(headersAndPayload, - RestMessagePublishContext.get(publishResult, topicObj, System.nanoTime())); - } finally { - headersAndPayload.release(); + .thenCompose(tOpt -> { + if (tOpt.isEmpty()) { + publishResult.completeExceptionally( + new BrokerServiceException.TopicNotFoundException("Topic not " + + "owned by current broker.")); + TopicName tn = TopicName.get(topic); + pulsar().getBrokerService().getOwningTopics().get(tn.getPartitionedTopicName()) + .remove(tn.getPartitionIndex()); + return CompletableFuture.completedFuture(null); } - } catch (Exception e) { + Topic topicObj = tOpt.get(); + CompletableFuture backlogQuotaCheckFuture = CompletableFuture.allOf( + topicObj.checkBacklogQuotaExceeded(message.getProducerName(), + BacklogQuota.BacklogQuotaType.destination_storage), + topicObj.checkBacklogQuotaExceeded(message.getProducerName(), + BacklogQuota.BacklogQuotaType.message_age)); + return backlogQuotaCheckFuture.thenRun(() -> { + ByteBuf headersAndPayload = messageToByteBuf(message); + try { + topicObj.publishMessage(headersAndPayload, + RestMessagePublishContext.get(publishResult, topicObj, System.nanoTime())); + } finally { + headersAndPayload.release(); + } + }); + }) + .exceptionally(ex -> { + Throwable cause = FutureUtil.unwrapCompletionException(ex); if (log.isDebugEnabled()) { - log.debug("Fail to publish single messages to topic {}: {} ", - topicName, e.getCause()); + log.debug("Fail to publish single message to topic {}: {}", topic, cause.getMessage()); } - publishResult.completeExceptionally(e); - } - } - }); - + if (!publishResult.isDone()) { + publishResult.completeExceptionally(cause); + } + return null; + }); return publishResult; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java index bcbd133ce5cd9..99cea2f8204df 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.admin; +import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyString; @@ -66,6 +67,7 @@ import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; @@ -82,6 +84,7 @@ import org.apache.pulsar.client.impl.schema.generic.GenericJsonSchema; import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl; import org.apache.pulsar.common.naming.TopicDomain; +import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.schema.KeyValue; @@ -931,4 +934,84 @@ public void testProduceWithAutoConsumeSchema() throws Exception { } } + @Test + public void testProduceWithBacklogQuotaSizeExceeded() throws Exception { + String namespaceName = testTenant + "/" + testNamespace; + String topicName = "persistent://" + namespaceName + "/" + testTopicName; + admin.topics().createNonPartitionedTopic(topicName); + BacklogQuota backlogQuota = BacklogQuota.builder() + .limitSize(0) + .retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception) + .build(); + admin.namespaces().setBacklogQuota(namespaceName, backlogQuota, + BacklogQuota.BacklogQuotaType.destination_storage); + + AsyncResponse asyncResponse = mock(AsyncResponse.class); + ProducerMessages producerMessages = new ProducerMessages(); + String message = "[{\"payload\":\"rest-produce\"}]"; + producerMessages.setMessages(createMessages(message)); + topics.produceOnPersistentTopic(asyncResponse, testTenant, testNamespace, testTopicName, + false, producerMessages); + ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(Response.class); + verify(asyncResponse, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.OK.getStatusCode()); + Object responseEntity = responseCaptor.getValue().getEntity(); + Assert.assertTrue(responseEntity instanceof ProducerAcks); + ProducerAcks response = (ProducerAcks) responseEntity; + Assert.assertEquals(response.getMessagePublishResults().size(), 1); + for (int index = 0; index < response.getMessagePublishResults().size(); index++) { + Assert.assertEquals(response.getMessagePublishResults().get(index).getErrorCode(), 2); + Assert.assertTrue(response.getMessagePublishResults().get(index).getErrorMsg() + .contains("Cannot create producer on topic with backlog quota exceeded")); + } + } + + @Test + public void testProduceWithBacklogQuotaTimeExceeded() throws Exception { + pulsar.getConfiguration().setPreciseTimeBasedBacklogQuotaCheck(true); + String namespaceName = testTenant + "/" + testNamespace; + String topicName = "persistent://" + namespaceName + "/" + testTopicName; + admin.topics().createNonPartitionedTopic(topicName); + BacklogQuota backlogQuota = BacklogQuota.builder() + .limitTime(1) + .retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception) + .build(); + admin.namespaces().setBacklogQuota(namespaceName, backlogQuota, + BacklogQuota.BacklogQuotaType.message_age); + admin.topics().createSubscription(topicName, "time-quota-sub", MessageId.earliest); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(topicName) + .create(); + producer.send("backlog-message"); + Topic topic = pulsar.getBrokerService().getTopic(topicName, false) + .get() + .orElseThrow(() -> new IllegalStateException("Topic not loaded: " + topicName)); + PersistentTopic persistentTopic = (PersistentTopic) topic; + await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> + Assert.assertTrue(persistentTopic.checkTimeBacklogExceeded(true).get())); + + AsyncResponse asyncResponse = mock(AsyncResponse.class); + ProducerMessages producerMessages = new ProducerMessages(); + String message = "[" + + "{\"key\":\"my-key\",\"payload\":\"RestProducer:1\",\"eventTime\":1603045262772,\"sequenceId\":1}," + + "{\"key\":\"my-key\",\"payload\":\"RestProducer:2\",\"eventTime\":1603045262772,\"sequenceId\":2}]"; + producerMessages.setMessages(createMessages(message)); + topics.produceOnPersistentTopic(asyncResponse, testTenant, testNamespace, testTopicName, + false, producerMessages); + ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(Response.class); + verify(asyncResponse, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.OK.getStatusCode()); + Object responseEntity = responseCaptor.getValue().getEntity(); + Assert.assertTrue(responseEntity instanceof ProducerAcks); + ProducerAcks response = (ProducerAcks) responseEntity; + Assert.assertEquals(response.getMessagePublishResults().size(), 2); + for (int index = 0; index < response.getMessagePublishResults().size(); index++) { + Assert.assertEquals(response.getMessagePublishResults().get(index).getErrorCode(), 2); + Assert.assertTrue(response.getMessagePublishResults().get(index).getErrorMsg() + .contains("Cannot create producer on topic with backlog quota exceeded")); + } + } + }