proxyRoles, String authenticatedPrincipal,
- String originalPrincipal) {
- if (proxyRoles.contains(authenticatedPrincipal)) {
- // Request has come from a proxy
+ /**
+ * Whether the authenticatedPrincipal and the originalPrincipal form a valid pair. This method assumes that
+ * authenticatedPrincipal and originalPrincipal can be equal, as long as they are not a proxy role. This use
+ * case is relevant for the admin server because of the way the proxy handles authentication. The binary protocol
+ * should not use this method.
+ *
+ * <p>Delegates to the four-argument overload with {@code allowNonProxyPrincipalsToBeEqual} set to
+ * {@code true}, passing the peer address of {@code authDataSource}, or {@code null} when
+ * {@code authDataSource} is itself {@code null}.
+ *
+ * @param authenticatedPrincipal the authenticated role to validate
+ * @param originalPrincipal the original principal supplied with the request; may be blank
+ * @param authDataSource source of the peer address handed to the overload; may be {@code null}
+ * @return true when roles are a valid combination and false when roles are an invalid combination
+ */
+ public boolean isValidOriginalPrincipal(String authenticatedPrincipal,
+ String originalPrincipal,
+ AuthenticationDataSource authDataSource) {
+ SocketAddress remoteAddress = authDataSource != null ? authDataSource.getPeerAddress() : null;
+ return isValidOriginalPrincipal(authenticatedPrincipal, originalPrincipal, remoteAddress, true);
+ }
+
+ /**
+ * Validates that the authenticatedPrincipal and the originalPrincipal are a valid combination.
+ * Valid combinations fulfill one of the following two rules:
+ *
+ * 1. The authenticatedPrincipal is in {@link ServiceConfiguration#getProxyRoles()}, if, and only if,
+ * the originalPrincipal is set to a role that is not also in {@link ServiceConfiguration#getProxyRoles()}.
+ *
+ * 2. The authenticatedPrincipal and the originalPrincipal are the same, but are not a proxyRole, when
+ * allowNonProxyPrincipalsToBeEqual is true.
+ *
+ * @return true when roles are a valid combination and false when roles are an invalid combination
+ */
+ public boolean isValidOriginalPrincipal(String authenticatedPrincipal,
+ String originalPrincipal,
+ SocketAddress remoteAddress,
+ boolean allowNonProxyPrincipalsToBeEqual) {
+ String errorMsg = null;
+ if (conf.getProxyRoles().contains(authenticatedPrincipal)) {
if (StringUtils.isBlank(originalPrincipal)) {
- log.warn("Original principal empty in request authenticated as {}", authenticatedPrincipal);
- throw new RestException(Response.Status.UNAUTHORIZED, "Original principal cannot be empty if the "
- + "request is via proxy.");
- }
- if (proxyRoles.contains(originalPrincipal)) {
- log.warn("Original principal {} cannot be a proxy role ({})", originalPrincipal, proxyRoles);
- throw new RestException(Response.Status.UNAUTHORIZED, "Original principal cannot be a proxy role");
+ errorMsg = "originalPrincipal must be provided when connecting with a proxy role.";
+ } else if (conf.getProxyRoles().contains(originalPrincipal)) {
+ errorMsg = "originalPrincipal cannot be a proxy role.";
}
+ } else if (StringUtils.isNotBlank(originalPrincipal)
+ && !(allowNonProxyPrincipalsToBeEqual && originalPrincipal.equals(authenticatedPrincipal))) {
+ errorMsg = "cannot specify originalPrincipal when connecting without valid proxy role.";
+ }
+ if (errorMsg != null) {
+ log.warn("[{}] Illegal combination of role [{}] and originalPrincipal [{}]: {}", remoteAddress,
+ authenticatedPrincipal, originalPrincipal, errorMsg);
+ return false;
+ } else {
+ return true;
}
}
@@ -340,7 +373,9 @@ public CompletableFuture allowTenantOperationAsync(String tenantName,
String originalRole,
String role,
AuthenticationDataSource authData) {
- validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole);
+ if (!isValidOriginalPrincipal(role, originalRole, authData)) {
+ return CompletableFuture.completedFuture(false);
+ }
if (isProxyRole(role)) {
CompletableFuture isRoleAuthorizedFuture = allowTenantOperationAsync(
tenantName, operation, role, authData);
@@ -400,7 +435,9 @@ public CompletableFuture allowNamespaceOperationAsync(NamespaceName nam
String originalRole,
String role,
AuthenticationDataSource authData) {
- validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole);
+ if (!isValidOriginalPrincipal(role, originalRole, authData)) {
+ return CompletableFuture.completedFuture(false);
+ }
if (isProxyRole(role)) {
CompletableFuture isRoleAuthorizedFuture = allowNamespaceOperationAsync(
namespaceName, operation, role, authData);
@@ -442,7 +479,9 @@ public CompletableFuture allowNamespacePolicyOperationAsync(NamespaceNa
String originalRole,
String role,
AuthenticationDataSource authData) {
- validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole);
+ if (!isValidOriginalPrincipal(role, originalRole, authData)) {
+ return CompletableFuture.completedFuture(false);
+ }
if (isProxyRole(role)) {
CompletableFuture isRoleAuthorizedFuture = allowNamespacePolicyOperationAsync(
namespaceName, policy, operation, role, authData);
@@ -503,10 +542,8 @@ public CompletableFuture allowTopicPolicyOperationAsync(TopicName topic
String originalRole,
String role,
AuthenticationDataSource authData) {
- try {
- validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole);
- } catch (RestException e) {
- return FutureUtil.failedFuture(e);
+ if (!isValidOriginalPrincipal(role, originalRole, authData)) {
+ return CompletableFuture.completedFuture(false);
}
if (isProxyRole(role)) {
CompletableFuture isRoleAuthorizedFuture = allowTopicPolicyOperationAsync(
@@ -594,7 +631,9 @@ public CompletableFuture allowTopicOperationAsync(TopicName topicName,
String originalRole,
String role,
AuthenticationDataSource authData) {
- validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole);
+ if (!isValidOriginalPrincipal(role, originalRole, authData)) {
+ return CompletableFuture.completedFuture(false);
+ }
if (isProxyRole(role)) {
CompletableFuture isRoleAuthorizedFuture = allowTopicOperationAsync(
topicName, operation, role, authData);
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java
index 8ca0c121ef1b9..c6b658c3bd025 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java
@@ -85,7 +85,7 @@ public CompletableFuture deleteLocalPoliciesAsync(NamespaceName ns) {
public CompletableFuture deleteLocalPoliciesTenantAsync(String tenant) {
final String localPoliciesPath = joinPath(LOCAL_POLICIES_ROOT, tenant);
CompletableFuture future = new CompletableFuture();
- deleteAsync(localPoliciesPath).whenComplete((ignore, ex) -> {
+ deleteIfExistsAsync(localPoliciesPath).whenComplete((ignore, ex) -> {
if (ex != null && ex.getCause().getCause() instanceof KeeperException) {
future.complete(null);
} else if (ex != null) {
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/AuthorizationServiceTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/AuthorizationServiceTest.java
new file mode 100644
index 0000000000000..6f9dffa11b948
--- /dev/null
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/AuthorizationServiceTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authorization;
+
+import static org.testng.AssertJUnit.assertFalse;
+import static org.testng.AssertJUnit.assertTrue;
+import java.util.HashSet;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
+import org.apache.pulsar.common.policies.data.TenantOperation;
+import org.apache.pulsar.common.policies.data.TopicOperation;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Exercises the role / originalRole checks that {@link AuthorizationService} applies in its
+ * {@code allow*OperationAsync} methods, using {@link MockAuthorizationProvider} as the provider.
+ */
+public class AuthorizationServiceTest {
+
+    private AuthorizationService authorizationService;
+
+    @BeforeClass
+    void beforeClass() throws PulsarServerException {
+        ServiceConfiguration conf = new ServiceConfiguration();
+        conf.setAuthorizationEnabled(true);
+        // Consider both of these proxy roles to make testing more comprehensive
+        HashSet<String> proxyRoles = new HashSet<>();
+        proxyRoles.add("pass.proxy");
+        proxyRoles.add("fail.proxy");
+        conf.setProxyRoles(proxyRoles);
+        conf.setAuthorizationProvider(MockAuthorizationProvider.class.getName());
+        authorizationService = new AuthorizationService(conf, null);
+    }
+
+    /**
+     * Supplies the (role, originalRole, expected result) combinations shared by every test in this class.
+     * See {@link MockAuthorizationProvider} for the implementation of the mock authorization provider.
+     * TestNG binds this provider through its {@code name} attribute, not through the method name.
+     */
+    @DataProvider(name = "roles")
+    public Object[][] rolesProvider() {
+        return new Object[][]{
+                // Schema: role, originalRole, whether authorization should pass
+
+                // Client conditions where original role isn't passed or is blank
+                {"pass.client", null, Boolean.TRUE},
+                {"pass.client", " ", Boolean.TRUE},
+                {"fail.client", null, Boolean.FALSE},
+                {"fail.client", " ", Boolean.FALSE},
+
+                // Proxy conditions where original role isn't passed or is blank
+                {"pass.proxy", null, Boolean.FALSE},
+                {"pass.proxy", " ", Boolean.FALSE},
+                {"fail.proxy", null, Boolean.FALSE},
+                {"fail.proxy", " ", Boolean.FALSE},
+
+                // Normal proxy and client conditions
+                {"pass.proxy", "pass.client", Boolean.TRUE},
+                {"pass.proxy", "fail.client", Boolean.FALSE},
+                {"fail.proxy", "pass.client", Boolean.FALSE},
+                {"fail.proxy", "fail.client", Boolean.FALSE},
+
+                // Not proxy with original principal
+                {"pass.not-proxy", "pass.client", Boolean.FALSE}, // non proxy role can't pass original role
+                {"pass.not-proxy", "fail.client", Boolean.FALSE},
+                {"fail.not-proxy", "pass.client", Boolean.FALSE},
+                {"fail.not-proxy", "fail.client", Boolean.FALSE},
+
+                // Covers an unlikely scenario, but valid in the context of this test
+                {null, "pass.proxy", Boolean.FALSE},
+        };
+    }
+
+    /** Asserts that {@code actual} matches {@code expected}. */
+    private void checkResult(boolean expected, boolean actual) {
+        if (expected) {
+            assertTrue(actual);
+        } else {
+            assertFalse(actual);
+        }
+    }
+
+    @Test(dataProvider = "roles")
+    public void testAllowTenantOperationAsync(String role, String originalRole, boolean shouldPass) throws Exception {
+        boolean isAuthorized = authorizationService.allowTenantOperationAsync("tenant",
+                TenantOperation.DELETE_NAMESPACE, originalRole, role, null).get();
+        checkResult(shouldPass, isAuthorized);
+    }
+
+    @Test(dataProvider = "roles")
+    public void testNamespaceOperationAsync(String role, String originalRole, boolean shouldPass) throws Exception {
+        boolean isAuthorized = authorizationService.allowNamespaceOperationAsync(NamespaceName.get("public/default"),
+                NamespaceOperation.PACKAGES, originalRole, role, null).get();
+        checkResult(shouldPass, isAuthorized);
+    }
+
+    @Test(dataProvider = "roles")
+    public void testTopicOperationAsync(String role, String originalRole, boolean shouldPass) throws Exception {
+        boolean isAuthorized = authorizationService.allowTopicOperationAsync(TopicName.get("topic"),
+                TopicOperation.PRODUCE, originalRole, role, null).get();
+        checkResult(shouldPass, isAuthorized);
+    }
+
+    @Test(dataProvider = "roles")
+    public void testNamespacePolicyOperationAsync(String role, String originalRole, boolean shouldPass)
+            throws Exception {
+        boolean isAuthorized = authorizationService.allowNamespacePolicyOperationAsync(
+                NamespaceName.get("public/default"), PolicyName.ALL, PolicyOperation.READ, originalRole, role, null)
+                .get();
+        checkResult(shouldPass, isAuthorized);
+    }
+
+    @Test(dataProvider = "roles")
+    public void testTopicPolicyOperationAsync(String role, String originalRole, boolean shouldPass) throws Exception {
+        boolean isAuthorized = authorizationService.allowTopicPolicyOperationAsync(TopicName.get("topic"),
+                PolicyName.ALL, PolicyOperation.READ, originalRole, role, null).get();
+        checkResult(shouldPass, isAuthorized);
+    }
+}
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/MockAuthorizationProvider.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/MockAuthorizationProvider.java
new file mode 100644
index 0000000000000..4c939cbd9723a
--- /dev/null
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/MockAuthorizationProvider.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authorization;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
+import org.apache.pulsar.common.policies.data.TenantOperation;
+import org.apache.pulsar.common.policies.data.TopicOperation;
+
+/**
+ * Mock implementation of the authorization provider interface used for testing.
+ * A role is authorized if, and only if, it is non-null and starts with "pass".
+ * The grant/revoke operations do nothing and return an already-completed future.
+ */
+public class MockAuthorizationProvider implements AuthorizationProvider {
+
+    /** Returns a completed future holding {@code true} when {@code role} is non-null and starts with "pass". */
+    private CompletableFuture shouldPass(String role) {
+        return CompletableFuture.completedFuture(role != null && role.startsWith("pass"));
+    }
+
+    @Override
+    public CompletableFuture canProduceAsync(TopicName topicName, String role,
+                                             AuthenticationDataSource authenticationData) {
+        return shouldPass(role);
+    }
+
+    @Override
+    public CompletableFuture canConsumeAsync(TopicName topicName, String role,
+                                             AuthenticationDataSource authenticationData, String subscription) {
+        return shouldPass(role);
+    }
+
+    @Override
+    public CompletableFuture canLookupAsync(TopicName topicName, String role,
+                                            AuthenticationDataSource authenticationData) {
+        return shouldPass(role);
+    }
+
+    @Override
+    public CompletableFuture allowFunctionOpsAsync(NamespaceName namespaceName, String role,
+                                                   AuthenticationDataSource authenticationData) {
+        return shouldPass(role);
+    }
+
+    @Override
+    public CompletableFuture allowSourceOpsAsync(NamespaceName namespaceName, String role,
+                                                 AuthenticationDataSource authenticationData) {
+        return shouldPass(role);
+    }
+
+    @Override
+    public CompletableFuture allowTenantOperationAsync(String tenantName, String role,
+                                                       TenantOperation operation,
+                                                       AuthenticationDataSource authData) {
+        return shouldPass(role);
+    }
+
+    @Override
+    public CompletableFuture allowNamespaceOperationAsync(NamespaceName namespaceName, String role,
+                                                          NamespaceOperation operation,
+                                                          AuthenticationDataSource authData) {
+        return shouldPass(role);
+    }
+
+    @Override
+    public CompletableFuture allowNamespacePolicyOperationAsync(NamespaceName namespaceName, PolicyName policy,
+                                                                PolicyOperation operation, String role,
+                                                                AuthenticationDataSource authData) {
+        return shouldPass(role);
+    }
+
+    @Override
+    public CompletableFuture allowTopicOperationAsync(TopicName topic, String role, TopicOperation operation,
+                                                      AuthenticationDataSource authData) {
+        return shouldPass(role);
+    }
+
+    @Override
+    public CompletableFuture allowTopicPolicyOperationAsync(TopicName topic, String role, PolicyName policy,
+                                                            PolicyOperation operation,
+                                                            AuthenticationDataSource authData) {
+        return shouldPass(role);
+    }
+
+    @Override
+    public CompletableFuture allowSinkOpsAsync(NamespaceName namespaceName, String role,
+                                               AuthenticationDataSource authenticationData) {
+        // Same rule as the function and source variants above, instead of a null future.
+        return shouldPass(role);
+    }
+
+    // The operations below are no-ops in this mock. They return an already-completed future rather than
+    // null so that a caller chaining on the result does not hit a NullPointerException.
+
+    @Override
+    public CompletableFuture grantPermissionAsync(NamespaceName namespace, Set actions, String role,
+                                                  String authDataJson) {
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public CompletableFuture grantSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName,
+                                                              Set roles, String authDataJson) {
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public CompletableFuture revokeSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName,
+                                                               String role, String authDataJson) {
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public CompletableFuture grantPermissionAsync(TopicName topicName, Set actions, String role,
+                                                  String authDataJson) {
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public void close() throws IOException {
+        // Nothing to release.
+    }
+}
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index 074c6fc8a9bc0..7442d95e467e6 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.12.0-SNAPSHOT
+ 3.0.0-SNAPSHOT
..
@@ -697,11 +697,13 @@
com.github.kongchen
swagger-maven-plugin
- 3.1.7
+ 3.1.8
false
+ {{className}}_{{methodName}}
+ json
org.apache.pulsar.broker.admin.v2.Bookies
org.apache.pulsar.broker.admin.v2.BrokerStats
@@ -737,6 +739,8 @@
false
+ {{className}}_{{methodName}}
+ json
org.apache.pulsar.broker.lookup.v2
http,https
/lookup
@@ -754,6 +758,8 @@
false
+ {{className}}_{{methodName}}
+ json
org.apache.pulsar.broker.admin.v3.Functions
http,https
/admin/v3
@@ -771,6 +777,8 @@
false
+ {{className}}_{{methodName}}
+ json
org.apache.pulsar.broker.admin.v3.Transactions
http,https
/admin/v3
@@ -788,6 +796,8 @@
false
+ {{className}}_{{methodName}}
+ json
org.apache.pulsar.broker.admin.v3.Sources
http,https
/admin/v3
@@ -805,6 +815,8 @@
false
+ {{className}}_{{methodName}}
+ json
org.apache.pulsar.broker.admin.v3.Sinks
http,https
/admin/v3
@@ -822,6 +834,8 @@
false
+ {{className}}_{{methodName}}
+ json
org.apache.pulsar.broker.admin.v3.Packages
http,https
/admin/v3
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index ae0fb1a9f283c..4af94c339c82f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1551,6 +1551,7 @@ public synchronized PulsarClient getClient() throws PulsarServerException {
conf.setTlsCiphers(this.getConfiguration().getBrokerClientTlsCiphers());
conf.setTlsProtocols(this.getConfiguration().getBrokerClientTlsProtocols());
conf.setTlsAllowInsecureConnection(this.getConfiguration().isTlsAllowInsecureConnection());
+ conf.setTlsHostnameVerificationEnable(this.getConfiguration().isTlsHostnameVerificationEnabled());
if (this.getConfiguration().isBrokerClientTlsEnabledWithKeyStore()) {
conf.setUseKeyStoreTls(true);
conf.setTlsTrustStoreType(this.getConfiguration().getBrokerClientTlsTrustStoreType());
@@ -1622,7 +1623,8 @@ public synchronized PulsarAdmin getAdminClient() throws PulsarServerException {
.tlsKeyFilePath(conf.getBrokerClientKeyFilePath())
.tlsCertificateFilePath(conf.getBrokerClientCertificateFilePath());
}
- builder.allowTlsInsecureConnection(conf.isTlsAllowInsecureConnection());
+ builder.allowTlsInsecureConnection(conf.isTlsAllowInsecureConnection())
+ .enableTlsHostnameVerification(conf.isTlsHostnameVerificationEnabled());
}
// most of the admin request requires to make zk-call so, keep the max read-timeout based on
@@ -1849,7 +1851,7 @@ public static WorkerConfig initializeWorkerConfigFromBrokerConfig(ServiceConfigu
workerConfig.setMetadataStoreCacheExpirySeconds(brokerConfig.getMetadataStoreCacheExpirySeconds());
workerConfig.setTlsAllowInsecureConnection(brokerConfig.isTlsAllowInsecureConnection());
workerConfig.setTlsEnabled(brokerConfig.isTlsEnabled());
- workerConfig.setTlsEnableHostnameVerification(false);
+ workerConfig.setTlsEnableHostnameVerification(brokerConfig.isTlsHostnameVerificationEnabled());
workerConfig.setBrokerClientTrustCertsFilePath(brokerConfig.getTlsTrustCertsFilePath());
// client in worker will use this config to authenticate with broker
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index d0cf22a86533a..3e3b044ec51b8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -256,12 +256,13 @@ public CompletableFuture removeTransactionMetadataStore(TransactionCoordin
}
}
- public CompletableFuture newTransaction(TransactionCoordinatorID tcId, long timeoutInMills) {
+ public CompletableFuture newTransaction(TransactionCoordinatorID tcId, long timeoutInMills,
+ String owner) {
TransactionMetadataStore store = stores.get(tcId);
if (store == null) {
return FutureUtil.failedFuture(new CoordinatorNotFoundException(tcId));
}
- return store.newTransaction(timeoutInMills);
+ return store.newTransaction(timeoutInMills, owner);
}
public CompletableFuture addProducedPartitionToTxn(TxnID txnId, List partitions) {
@@ -483,6 +484,21 @@ public Map getStores() {
return Collections.unmodifiableMap(stores);
}
+    /**
+     * Checks whether {@code checkOwner} matches the owner recorded in the metadata of transaction {@code txnID}.
+     *
+     * @param txnID the transaction whose metadata is looked up through {@code getTxnMeta}
+     * @param checkOwner the owner to compare against the recorded one
+     * @return a future holding {@code true} when the metadata has no owner or its owner equals
+     *         {@code checkOwner}, and {@code false} otherwise
+     */
+    public CompletableFuture verifyTxnOwnership(TxnID txnID, String checkOwner) {
+        return getTxnMeta(txnID).thenApply(txnMeta ->
+                // owner was null in the old versions or no auth enabled
+                txnMeta.getOwner() == null || txnMeta.getOwner().equals(checkOwner));
+    }
+
+
public void close () {
this.internalPinnedExecutor.shutdown();
stores.forEach((tcId, metadataStore) -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index d5cf6a3e74d0e..19f8d7c437ded 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -38,12 +38,11 @@
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
+import javax.annotation.Nonnull;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
@@ -59,7 +58,8 @@
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
-import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
+import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
@@ -115,6 +115,7 @@
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
+import org.apache.zookeeper.KeeperException;
@Slf4j
public abstract class NamespacesBase extends AdminResource {
@@ -204,87 +205,103 @@ protected CompletableFuture> internalGetNonPersistentTopics(Policie
});
}
- @SuppressWarnings("unchecked")
- protected CompletableFuture internalDeleteNamespaceAsync(boolean force) {
- CompletableFuture preconditionCheck = precheckWhenDeleteNamespace(namespaceName, force);
- return preconditionCheck
+ /**
+ * Deletes the namespace, retrying when topics that were still being created (and not yet fully
+ * recorded in metadata) show up during the deletion.
+ */
+ protected @Nonnull CompletableFuture internalDeleteNamespaceAsync(boolean force) {
+ final CompletableFuture future = new CompletableFuture<>();
+ internalRetryableDeleteNamespaceAsync0(force, 5, future);
+ return future;
+ }
+ private void internalRetryableDeleteNamespaceAsync0(boolean force, int retryTimes,
+ @Nonnull CompletableFuture callback) {
+ precheckWhenDeleteNamespace(namespaceName, force)
.thenCompose(policies -> {
+ final CompletableFuture> topicsFuture;
if (policies == null || CollectionUtils.isEmpty(policies.replication_clusters)){
- return pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName);
- }
- return pulsar().getNamespaceService().getFullListOfTopics(namespaceName);
- })
- .thenCompose(allTopics -> pulsar().getNamespaceService().getFullListOfPartitionedTopic(namespaceName)
- .thenCompose(allPartitionedTopics -> {
- List> topicsSum = new ArrayList<>(2);
- topicsSum.add(allTopics);
- topicsSum.add(allPartitionedTopics);
- return CompletableFuture.completedFuture(topicsSum);
- }))
- .thenCompose(topics -> {
- List allTopics = topics.get(0);
- ArrayList allUserCreatedTopics = new ArrayList<>();
- List allPartitionedTopics = topics.get(1);
- ArrayList allUserCreatedPartitionTopics = new ArrayList<>();
- boolean hasNonSystemTopic = false;
- List allSystemTopics = new ArrayList<>();
- List allPartitionedSystemTopics = new ArrayList<>();
- List topicPolicy = new ArrayList<>();
- List partitionedTopicPolicy = new ArrayList<>();
- for (String topic : allTopics) {
- if (!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) {
- hasNonSystemTopic = true;
- allUserCreatedTopics.add(topic);
- } else {
- if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
- topicPolicy.add(topic);
- } else {
- allSystemTopics.add(topic);
- }
- }
- }
- for (String topic : allPartitionedTopics) {
- if (!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) {
- hasNonSystemTopic = true;
- allUserCreatedPartitionTopics.add(topic);
- } else {
- if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
- partitionedTopicPolicy.add(topic);
- } else {
- allPartitionedSystemTopics.add(topic);
- }
- }
- }
- if (!force) {
- if (hasNonSystemTopic) {
- throw new RestException(Status.CONFLICT, "Cannot delete non empty namespace");
- }
+ topicsFuture = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName);
+ } else {
+ topicsFuture = pulsar().getNamespaceService().getFullListOfTopics(namespaceName);
}
- return namespaceResources().setPoliciesAsync(namespaceName, old -> {
- old.deleted = true;
- return old;
- }).thenCompose(ignore -> {
- return internalDeleteTopicsAsync(allUserCreatedTopics);
- }).thenCompose(ignore -> {
- return internalDeletePartitionedTopicsAsync(allUserCreatedPartitionTopics);
- }).thenCompose(ignore -> {
- return internalDeleteTopicsAsync(allSystemTopics);
- }).thenCompose(ignore__ -> {
- return internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics);
- }).thenCompose(ignore -> {
- return internalDeleteTopicsAsync(topicPolicy);
- }).thenCompose(ignore__ -> {
- return internalDeletePartitionedTopicsAsync(partitionedTopicPolicy);
- });
+ return topicsFuture.thenCompose(allTopics ->
+ pulsar().getNamespaceService().getFullListOfPartitionedTopic(namespaceName)
+ .thenCompose(allPartitionedTopics -> {
+ List> topicsSum = new ArrayList<>(2);
+ topicsSum.add(allTopics);
+ topicsSum.add(allPartitionedTopics);
+ return CompletableFuture.completedFuture(topicsSum);
+ }))
+ .thenCompose(topics -> {
+ List allTopics = topics.get(0);
+ ArrayList allUserCreatedTopics = new ArrayList<>();
+ List allPartitionedTopics = topics.get(1);
+ ArrayList allUserCreatedPartitionTopics = new ArrayList<>();
+ boolean hasNonSystemTopic = false;
+ List allSystemTopics = new ArrayList<>();
+ List allPartitionedSystemTopics = new ArrayList<>();
+ List topicPolicy = new ArrayList<>();
+ List partitionedTopicPolicy = new ArrayList<>();
+ for (String topic : allTopics) {
+ if (!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) {
+ hasNonSystemTopic = true;
+ allUserCreatedTopics.add(topic);
+ } else {
+ if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
+ topicPolicy.add(topic);
+ } else if (!isDeletedAlongWithUserCreatedTopic(topic)) {
+ allSystemTopics.add(topic);
+ }
+ }
+ }
+ for (String topic : allPartitionedTopics) {
+ if (!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) {
+ hasNonSystemTopic = true;
+ allUserCreatedPartitionTopics.add(topic);
+ } else {
+ if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
+ partitionedTopicPolicy.add(topic);
+ } else {
+ allPartitionedSystemTopics.add(topic);
+ }
+ }
+ }
+ if (!force) {
+ if (hasNonSystemTopic) {
+ throw new RestException(Status.CONFLICT, "Cannot delete non empty namespace");
+ }
+ }
+ final CompletableFuture markDeleteFuture;
+ if (policies != null && policies.deleted) {
+ markDeleteFuture = CompletableFuture.completedFuture(null);
+ } else {
+ markDeleteFuture = namespaceResources().setPoliciesAsync(namespaceName, old -> {
+ old.deleted = true;
+ return old;
+ });
+ }
+ return markDeleteFuture.thenCompose(__ ->
+ internalDeleteTopicsAsync(allUserCreatedTopics))
+ .thenCompose(ignore ->
+ internalDeletePartitionedTopicsAsync(allUserCreatedPartitionTopics))
+ .thenCompose(ignore ->
+ internalDeleteTopicsAsync(allSystemTopics))
+ .thenCompose(ignore ->
+ internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics))
+ .thenCompose(ignore ->
+ internalDeleteTopicsAsync(topicPolicy))
+ .thenCompose(ignore ->
+ internalDeletePartitionedTopicsAsync(partitionedTopicPolicy));
+ });
})
.thenCompose(ignore -> pulsar().getNamespaceService()
.getNamespaceBundleFactory().getBundlesAsync(namespaceName))
.thenCompose(bundles -> FutureUtil.waitForAll(bundles.getBundles().stream()
- .map(bundle -> pulsar().getNamespaceService().getOwnerAsync(bundle)
- .thenCompose(owner -> {
+ .map(bundle -> pulsar().getNamespaceService().checkOwnershipPresentAsync(bundle)
+ .thenCompose(present -> {
// check if the bundle is owned by any broker,
// if not then we do not need to delete the bundle
- if (owner.isPresent()) {
+ if (present) {
PulsarAdmin admin;
try {
admin = pulsar().getAdminClient();
@@ -299,7 +316,37 @@ protected CompletableFuture internalDeleteNamespaceAsync(boolean force) {
return CompletableFuture.completedFuture(null);
})
).collect(Collectors.toList())))
- .thenCompose(ignore -> internalClearZkSources());
+ .thenCompose(ignore -> internalClearZkSources())
+ .whenComplete((result, error) -> {
+ if (error != null) {
+ final Throwable rc = FutureUtil.unwrapCompletionException(error);
+ if (rc instanceof MetadataStoreException) {
+ if (rc.getCause() != null && rc.getCause() instanceof KeeperException.NotEmptyException) {
+ log.info("[{}] There are in-flight topics created during the namespace deletion, "
+ + "retry to delete the namespace again.", namespaceName);
+ final int next = retryTimes - 1;
+ if (next > 0) {
+ // async recursive
+ internalRetryableDeleteNamespaceAsync0(force, next, callback);
+ } else {
+ callback.completeExceptionally(
+ new RestException(Status.CONFLICT, "The broker still have in-flight topics"
+ + " created during namespace deletion, please try again."));
+ // drop out recursive
+ }
+ return;
+ }
+ }
+ callback.completeExceptionally(error);
+ return;
+ }
+ callback.complete(result);
+ });
+ }
+
+ /**
+ * Tells whether a system topic can be left out of the explicit system-topic deletion pass
+ * performed while a namespace is being deleted.
+ *
+ * @param topic the topic name to test
+ * @return true if the name ends with {@link SystemTopicNames#PENDING_ACK_STORE_SUFFIX}
+ */
+ private boolean isDeletedAlongWithUserCreatedTopic(String topic) {
+ // A transaction pending-ack topic is deleted when the corresponding subscription is unsubscribed
+ // from its topic, so it goes away together with the user-created topic it belongs to.
+ return topic.endsWith(SystemTopicNames.PENDING_ACK_STORE_SUFFIX);
}
private CompletableFuture internalDeletePartitionedTopicsAsync(List topicNames) {
@@ -886,55 +933,82 @@ protected BookieAffinityGroupData internalGetBookieAffinityGroup() {
}
}
- private void validateLeaderBroker() {
- if (!this.isLeaderBroker()) {
- LeaderBroker leaderBroker = pulsar().getLeaderElectionService().getCurrentLeader().get();
- String leaderBrokerUrl = leaderBroker.getServiceUrl();
- CompletableFuture result = pulsar().getNamespaceService()
- .createLookupResult(leaderBrokerUrl, false, null);
- try {
- LookupResult lookupResult = result.get(2L, TimeUnit.SECONDS);
- String redirectUrl = isRequestHttps() ? lookupResult.getLookupData().getHttpUrlTls()
- : lookupResult.getLookupData().getHttpUrl();
- if (redirectUrl == null) {
- log.error("Redirected broker's service url is not configured");
- throw new RestException(Response.Status.PRECONDITION_FAILED,
- "Redirected broker's service url is not configured.");
- }
- URL url = new URL(redirectUrl);
- URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(url.getHost())
- .port(url.getPort())
- .replaceQueryParam("authoritative",
- false).build();
-
- // Redirect
- if (log.isDebugEnabled()) {
- log.debug("Redirecting the request call to leader - {}", redirect);
- }
- throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
- } catch (MalformedURLException exception) {
- log.error("The leader broker url is malformed - {}", leaderBrokerUrl);
- throw new RestException(exception);
- } catch (ExecutionException | InterruptedException exception) {
- log.error("Leader broker not found - {}", leaderBrokerUrl);
- throw new RestException(exception.getCause());
- } catch (TimeoutException exception) {
- log.error("Leader broker not found within timeout - {}", leaderBrokerUrl);
- throw new RestException(exception);
- }
+ /**
+ * Ensures that the current request is being served by the leader broker.
+ *
+ * <p>Completes normally when this broker is the leader. Otherwise the returned future fails:
+ * with a {@link RestException} ({@code PRECONDITION_FAILED}) when no leader is currently known or the
+ * leader advertises no URL for the request's scheme, with a {@link RestException} wrapping the
+ * {@link MalformedURLException} when that URL cannot be parsed, or with a
+ * {@link WebApplicationException} carrying a temporary redirect of the request URI to the leader
+ * (with {@code authoritative=false}).
+ *
+ * @return a future that completes normally only when this broker is the leader
+ */
+ private CompletableFuture validateLeaderBrokerAsync() {
+ if (this.isLeaderBroker()) {
+ return CompletableFuture.completedFuture(null);
}
+ Optional<LeaderBroker> currentLeaderOpt = pulsar().getLeaderElectionService().getCurrentLeader();
+ if (currentLeaderOpt.isEmpty()) {
+ String errorStr = "The current leader is empty.";
+ log.error(errorStr);
+ return FutureUtil.failedFuture(new RestException(Response.Status.PRECONDITION_FAILED, errorStr));
+ }
+ // Reuse the Optional that was just checked. Asking the election service again could observe an
+ // empty result if leadership is lost in between, and get() would then throw synchronously.
+ LeaderBroker leaderBroker = currentLeaderOpt.get();
+ String leaderBrokerUrl = leaderBroker.getServiceUrl();
+ return pulsar().getNamespaceService()
+ .createLookupResult(leaderBrokerUrl, false, null)
+ .thenCompose(lookupResult -> {
+ // Pick the leader's HTTP(S) address that matches the scheme of the incoming request.
+ String redirectUrl = isRequestHttps() ? lookupResult.getLookupData().getHttpUrlTls()
+ : lookupResult.getLookupData().getHttpUrl();
+ if (redirectUrl == null) {
+ log.error("Redirected broker's service url is not configured");
+ return FutureUtil.failedFuture(new RestException(Response.Status.PRECONDITION_FAILED,
+ "Redirected broker's service url is not configured."));
+ }
+
+ try {
+ URL url = new URL(redirectUrl);
+ // Keep the original request URI, swapping in the leader's host/port and
+ // forcing authoritative=false on the redirected call.
+ URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(url.getHost())
+ .port(url.getPort())
+ .replaceQueryParam("authoritative",
+ false).build();
+ // Redirect
+ if (log.isDebugEnabled()) {
+ log.debug("Redirecting the request call to leader - {}", redirect);
+ }
+ return FutureUtil.failedFuture((
+ new WebApplicationException(Response.temporaryRedirect(redirect).build())));
+ } catch (MalformedURLException exception) {
+ log.error("The leader broker url is malformed - {}", leaderBrokerUrl);
+ return FutureUtil.failedFuture(new RestException(exception));
+ }
+ });
}
- public void setNamespaceBundleAffinity (String bundleRange, String destinationBroker) {
+ /**
+ * Asynchronously records {@code destinationBroker} as the affinity target for {@code bundleRange}.
+ *
+ * <p>A blank destination is a no-op. Otherwise the destination must be among the brokers returned by
+ * {@code getAvailableBrokersAsync()}; if it is not, the returned future fails with
+ * {@link BrokerServiceException.NotAllowedException}. When the load manager extension is enabled, both
+ * the leader validation and the affinity call are skipped; otherwise the request has to pass
+ * {@code validateLeaderBrokerAsync()} before the affinity is set on the load manager.
+ *
+ * @param bundleRange the bundle range the affinity applies to
+ * @param destinationBroker the broker the bundle should be directed to; may be blank
+ * @return a future that completes when the affinity has been set or no action was required
+ */
+ public CompletableFuture setNamespaceBundleAffinityAsync(String bundleRange, String destinationBroker) {
if (StringUtils.isBlank(destinationBroker)) {
- return;
+ return CompletableFuture.completedFuture(null);
}
- validateLeaderBroker();
- pulsar().getLoadManager().get().setNamespaceBundleAffinity(bundleRange, destinationBroker);
+ return pulsar().getLoadManager().get().getAvailableBrokersAsync()
+ .thenCompose(brokers -> {
+ if (!brokers.contains(destinationBroker)) {
+ log.warn("[{}] Failed to unload namespace bundle {}/{} to inactive broker {}.",
+ clientAppId(), namespaceName, bundleRange, destinationBroker);
+ return FutureUtil.failedFuture(new BrokerServiceException.NotAllowedException(
+ "Not allowed unload namespace bundle to inactive destination broker"));
+ }
+ return CompletableFuture.completedFuture(null);
+ })
+ .thenCompose(__ -> {
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config())) {
+ return CompletableFuture.completedFuture(null);
+ }
+ return validateLeaderBrokerAsync();
+ })
+ .thenAccept(__ -> {
+ // For ExtensibleLoadManager, this operation will be ignored.
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config())) {
+ return;
+ }
+ pulsar().getLoadManager().get().setNamespaceBundleAffinity(bundleRange, destinationBroker);
+ });
}
- public CompletableFuture internalUnloadNamespaceBundleAsync(String bundleRange, boolean authoritative) {
+ public CompletableFuture internalUnloadNamespaceBundleAsync(String bundleRange,
+ String destinationBroker,
+ boolean authoritative) {
return validateSuperUserAccessAsync()
+ .thenCompose(__ -> setNamespaceBundleAffinityAsync(bundleRange, destinationBroker))
.thenAccept(__ -> {
checkNotNull(bundleRange, "BundleRange should not be null");
log.info("[{}] Unloading namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange);
@@ -977,10 +1051,11 @@ public CompletableFuture internalUnloadNamespaceBundleAsync(String bundleR
namespaceName, bundleRange);
return CompletableFuture.completedFuture(null);
}
+ Optional destinationBrokerOpt = Optional.ofNullable(destinationBroker);
return validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange,
authoritative, true)
- .thenCompose(nsBundle ->
- pulsar().getNamespaceService().unloadNamespaceBundle(nsBundle));
+ .thenCompose(nsBundle -> pulsar().getNamespaceService()
+ .unloadNamespaceBundle(nsBundle, destinationBrokerOpt));
}));
}
@@ -1028,7 +1103,9 @@ protected CompletableFuture internalSplitNamespaceBundleAsync(String bundl
validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange,
authoritative, false))
.thenCompose(nsBundle -> pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload,
- getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName), splitBoundaries));
+ pulsar().getNamespaceService()
+ .getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName),
+ splitBoundaries));
});
}
@@ -1109,18 +1186,6 @@ private CompletableFuture findHotBundleAsync(NamespaceName name
.getBundleWithHighestThroughputAsync(namespaceName);
}
- private NamespaceBundleSplitAlgorithm getNamespaceBundleSplitAlgorithmByName(String algorithmName) {
- NamespaceBundleSplitAlgorithm algorithm = NamespaceBundleSplitAlgorithm.of(algorithmName);
- if (algorithm == null) {
- algorithm = NamespaceBundleSplitAlgorithm.of(
- pulsar().getConfig().getDefaultNamespaceBundleSplitAlgorithm());
- }
- if (algorithm == null) {
- algorithm = NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO;
- }
- return algorithm;
- }
-
protected void internalSetPublishRate(PublishRate maxPublishMessageRate) {
validateSuperUserAccess();
log.info("[{}] Set namespace publish-rate {}/{}", clientAppId(), namespaceName, maxPublishMessageRate);
@@ -1346,7 +1411,7 @@ protected void internalClearNamespaceBacklog(AsyncResponse asyncResponse, boolea
.getBundles(namespaceName);
for (NamespaceBundle nsBundle : bundles.getBundles()) {
// check if the bundle is owned by any broker, if not then there is no backlog on this bundle to clear
- if (pulsar().getNamespaceService().getOwner(nsBundle).isPresent()) {
+ if (pulsar().getNamespaceService().checkOwnershipPresent(nsBundle)) {
futures.add(pulsar().getAdminClient().namespaces()
.clearNamespaceBundleBacklogAsync(namespaceName.toString(), nsBundle.getBundleRange()));
}
@@ -1411,7 +1476,7 @@ protected void internalClearNamespaceBacklogForSubscription(AsyncResponse asyncR
.getBundles(namespaceName);
for (NamespaceBundle nsBundle : bundles.getBundles()) {
// check if the bundle is owned by any broker, if not then there is no backlog on this bundle to clear
- if (pulsar().getNamespaceService().getOwner(nsBundle).isPresent()) {
+ if (pulsar().getNamespaceService().checkOwnershipPresent(nsBundle)) {
futures.add(pulsar().getAdminClient().namespaces().clearNamespaceBundleBacklogForSubscriptionAsync(
namespaceName.toString(), nsBundle.getBundleRange(), subscription));
}
@@ -1478,7 +1543,7 @@ protected void internalUnsubscribeNamespace(AsyncResponse asyncResponse, String
.getBundles(namespaceName);
for (NamespaceBundle nsBundle : bundles.getBundles()) {
// check if the bundle is owned by any broker, if not then there are no subscriptions
- if (pulsar().getNamespaceService().getOwner(nsBundle).isPresent()) {
+ if (pulsar().getNamespaceService().checkOwnershipPresent(nsBundle)) {
futures.add(pulsar().getAdminClient().namespaces().unsubscribeNamespaceBundleAsync(
namespaceName.toString(), nsBundle.getBundleRange(), subscription));
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index c5d465e747e12..035e32542ed71 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -420,7 +420,7 @@ protected CompletableFuture internalCreateNonPartitionedTopicAsync(boolean
.thenCompose(partitionedTopicMetadata -> {
int currentMetadataPartitions = partitionedTopicMetadata.partitions;
if (currentMetadataPartitions <= 0) {
- throw new RestException(422 /* Unprocessable entity*/,
+ throw new RestException(Status.CONFLICT /* Unprocessable entity*/,
String.format("Topic %s is not the partitioned topic.", topicName));
}
if (expectPartitions < currentMetadataPartitions) {
@@ -2672,7 +2672,7 @@ protected void internalResetCursorOnPosition(AsyncResponse asyncResponse, String
if (topicMetadata.partitions > 0) {
log.warn("[{}] Not supported operation on partitioned-topic {} {}",
clientAppId(), topicName, subName);
- asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+ throw new CompletionException(new RestException(Status.METHOD_NOT_ALLOWED,
"Reset-cursor at position is not allowed for partitioned-topic"));
}
return CompletableFuture.completedFuture(null);
@@ -4317,7 +4317,7 @@ protected void internalOffloadStatus(AsyncResponse asyncResponse, boolean author
});
}
- public static CompletableFuture getPartitionedTopicMetadata(
+ public CompletableFuture getPartitionedTopicMetadata(
PulsarService pulsar, String clientAppId, String originalPrincipal,
AuthenticationDataSource authenticationData, TopicName topicName) {
CompletableFuture metadataFuture = new CompletableFuture<>();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
index f537f0ecdb9eb..d596cbdd39db9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
@@ -215,6 +215,7 @@ private void getTransactionMetadata(TxnMeta txnMeta,
transactionMetadata.status = txnMeta.status().name();
transactionMetadata.openTimestamp = txnMeta.getOpenTimestamp();
transactionMetadata.timeoutAt = txnMeta.getTimeoutAt();
+ transactionMetadata.owner = txnMeta.getOwner();
List> ackedPartitionsFutures = new ArrayList<>();
Map>> ackFutures = new HashMap<>();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index c13441db3dfdb..153e29506c3d0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -46,9 +46,7 @@
import javax.ws.rs.core.Response.Status;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.admin.impl.NamespacesBase;
-import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.web.RestException;
-import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceName;
@@ -250,12 +248,6 @@ public void deleteNamespace(@Suspended final AsyncResponse asyncResponse, @PathP
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(ex -> {
- Throwable cause = FutureUtil.unwrapCompletionException(ex);
- if (cause instanceof PulsarAdminException.ConflictException) {
- log.info("[{}] There are new topics created during the namespace deletion, "
- + "retry to delete the namespace again.", namespaceName);
- pulsar().getExecutor().execute(() -> internalDeleteNamespaceAsync(force));
- }
if (!isRedirectException(ex)) {
log.error("[{}] Failed to delete namespace {}", clientAppId(), namespaceName, ex);
}
@@ -891,34 +883,13 @@ public void unloadNamespaceBundle(@Suspended final AsyncResponse asyncResponse,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("destinationBroker") String destinationBroker) {
validateNamespaceName(property, cluster, namespace);
- pulsar().getLoadManager().get().getAvailableBrokersAsync()
- .thenApply(brokers ->
- StringUtils.isNotBlank(destinationBroker) ? brokers.contains(destinationBroker) : true)
- .thenAccept(isActiveDestination -> {
- if (isActiveDestination) {
- setNamespaceBundleAffinity(bundleRange, destinationBroker);
- internalUnloadNamespaceBundleAsync(bundleRange, authoritative)
- .thenAccept(__ -> {
- log.info("[{}] Successfully unloaded namespace bundle {}",
- clientAppId(), bundleRange);
- asyncResponse.resume(Response.noContent().build());
- })
- .exceptionally(ex -> {
- if (!isRedirectException(ex)) {
- log.error("[{}] Failed to unload namespace bundle {}/{}",
- clientAppId(), namespaceName, bundleRange, ex);
- }
- resumeAsyncResponseExceptionally(asyncResponse, ex);
- return null;
- });
- } else {
- log.warn("[{}] Failed to unload namespace bundle {}/{} to inactive broker {}.",
- clientAppId(), namespaceName, bundleRange, destinationBroker);
- resumeAsyncResponseExceptionally(asyncResponse,
- new BrokerServiceException.NotAllowedException(
- "Not allowed unload namespace bundle to inactive destination broker"));
- }
- }).exceptionally(ex -> {
+ internalUnloadNamespaceBundleAsync(bundleRange, destinationBroker, authoritative)
+ .thenAccept(__ -> {
+ log.info("[{}] Successfully unloaded namespace bundle {}",
+ clientAppId(), bundleRange);
+ asyncResponse.resume(Response.noContent().build());
+ })
+ .exceptionally(ex -> {
if (!isRedirectException(ex)) {
log.error("[{}] Failed to unload namespace bundle {}/{}",
clientAppId(), namespaceName, bundleRange, ex);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 80af5f4ad45b7..55766c173f71f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -46,12 +46,9 @@
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
-import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.admin.impl.NamespacesBase;
import org.apache.pulsar.broker.admin.impl.OffloaderObjectsScannerUtils;
-import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.web.RestException;
-import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.naming.NamespaceName;
@@ -199,12 +196,6 @@ public void deleteNamespace(@Suspended final AsyncResponse asyncResponse, @PathP
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(ex -> {
- Throwable cause = FutureUtil.unwrapCompletionException(ex);
- if (cause instanceof PulsarAdminException.ConflictException) {
- log.info("[{}] There are new topics created during the namespace deletion, "
- + "retry to delete the namespace again.", namespaceName);
- pulsar().getExecutor().execute(() -> internalDeleteNamespaceAsync(force));
- }
if (!isRedirectException(ex)) {
log.error("[{}] Failed to delete namespace {}", clientAppId(), namespaceName, ex);
}
@@ -817,34 +808,13 @@ public void unloadNamespaceBundle(@Suspended final AsyncResponse asyncResponse,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("destinationBroker") String destinationBroker) {
validateNamespaceName(tenant, namespace);
- pulsar().getLoadManager().get().getAvailableBrokersAsync()
- .thenApply(brokers ->
- StringUtils.isNotBlank(destinationBroker) ? brokers.contains(destinationBroker) : true)
- .thenAccept(isActiveDestination -> {
- if (isActiveDestination) {
- setNamespaceBundleAffinity(bundleRange, destinationBroker);
- internalUnloadNamespaceBundleAsync(bundleRange, authoritative)
- .thenAccept(__ -> {
- log.info("[{}] Successfully unloaded namespace bundle {}",
- clientAppId(), bundleRange);
- asyncResponse.resume(Response.noContent().build());
- })
- .exceptionally(ex -> {
- if (!isRedirectException(ex)) {
- log.error("[{}] Failed to unload namespace bundle {}/{}",
- clientAppId(), namespaceName, bundleRange, ex);
- }
- resumeAsyncResponseExceptionally(asyncResponse, ex);
- return null;
- });
- } else {
- log.warn("[{}] Failed to unload namespace bundle {}/{} to inactive broker {}.",
- clientAppId(), namespaceName, bundleRange, destinationBroker);
- resumeAsyncResponseExceptionally(asyncResponse,
- new BrokerServiceException.NotAllowedException(
- "Not allowed unload namespace bundle to inactive destination broker"));
- }
- }).exceptionally(ex -> {
+ internalUnloadNamespaceBundleAsync(bundleRange, destinationBroker, authoritative)
+ .thenAccept(__ -> {
+ log.info("[{}] Successfully unloaded namespace bundle {}",
+ clientAppId(), bundleRange);
+ asyncResponse.resume(Response.noContent().build());
+ })
+ .exceptionally(ex -> {
if (!isRedirectException(ex)) {
log.error("[{}] Failed to unload namespace bundle {}/{}",
clientAppId(), namespaceName, bundleRange, ex);
@@ -2599,7 +2569,7 @@ public void getProperty(
@DELETE
@Path("/{tenant}/{namespace}/property/{key}")
- @ApiOperation(value = "Get property value for a given key on a namespace.")
+ @ApiOperation(value = "Remove property value for a given key on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or namespace doesn't exist"), })
public void removeProperty(
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 7eeaec403d842..477cc8b57129d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1106,7 +1106,9 @@ public void deleteTopic(
ex = new RestException(Response.Status.PRECONDITION_FAILED,
t.getMessage());
}
- if (isManagedLedgerNotFoundException(t)) {
+ if (t instanceof IllegalStateException){
+ ex = new RestException(422/* Unprocessable entity*/, t.getMessage());
+ } else if (isManagedLedgerNotFoundException(t)) {
ex = new RestException(Response.Status.NOT_FOUND,
getTopicNotFoundErrorMessage(topicName.toString()));
} else if (!isRedirectException(ex)) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
index 16648d84e9fba..f0feb8b27d6a1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
@@ -43,12 +43,15 @@ public class BucketDelayedDeliveryTrackerFactory implements DelayedDeliveryTrack
private long delayedDeliveryMinIndexCountPerBucket;
- private long delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds;
+ private int delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds;
+
+ private int delayedDeliveryMaxIndexesPerBucketSnapshotSegment;
@Override
public void initialize(PulsarService pulsarService) throws Exception {
ServiceConfiguration config = pulsarService.getConfig();
bucketSnapshotStorage = new BookkeeperBucketSnapshotStorage(pulsarService);
+ bucketSnapshotStorage.start();
this.timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-delayed-delivery"),
config.getDelayedDeliveryTickTimeMillis(), TimeUnit.MILLISECONDS);
this.tickTimeMillis = config.getDelayedDeliveryTickTimeMillis();
@@ -57,14 +60,16 @@ public void initialize(PulsarService pulsarService) throws Exception {
this.delayedDeliveryMaxNumBuckets = config.getDelayedDeliveryMaxNumBuckets();
this.delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds =
config.getDelayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds();
+ this.delayedDeliveryMaxIndexesPerBucketSnapshotSegment =
+ config.getDelayedDeliveryMaxIndexesPerBucketSnapshotSegment();
}
@Override
public DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers dispatcher) {
return new BucketDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis, isDelayedDeliveryDeliverAtTimeStrict,
bucketSnapshotStorage, delayedDeliveryMinIndexCountPerBucket,
- delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds,
- delayedDeliveryMaxNumBuckets);
+ // The per-segment time step is configured in seconds and handed to the tracker in milliseconds.
+ TimeUnit.SECONDS.toMillis(delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds),
+ delayedDeliveryMaxIndexesPerBucketSnapshotSegment, delayedDeliveryMaxNumBuckets);
}
@Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
index 2f248a441cdee..78229fef25a5a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
@@ -20,6 +20,7 @@
import com.google.common.annotations.Beta;
import java.util.NavigableSet;
+import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
/**
@@ -66,12 +67,6 @@ public interface DelayedDeliveryTracker extends AutoCloseable {
*/
boolean shouldPauseAllDeliveries();
- /**
- * Tells whether this DelayedDeliveryTracker contains this message index,
- * if the tracker is not supported it or disabled this feature also will return false.
- */
- boolean containsMessage(long ledgerId, long entryId);
-
/**
* Reset tick time use zk policies cache.
* @param tickTime
@@ -81,8 +76,10 @@ public interface DelayedDeliveryTracker extends AutoCloseable {
/**
* Clear all delayed messages from the tracker.
+ *
+ * @return a future that completes once the tracker has finished clearing its delayed messages
*/
- void clear();
+ CompletableFuture clear();
/**
* Close the subscription tracker and release all resources.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index f55d5fd11694b..58358b06a46bb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -23,6 +23,7 @@
import java.time.Clock;
import java.util.NavigableSet;
import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -147,8 +148,9 @@ public NavigableSet getScheduledMessages(int maxMessages) {
}
@Override
- public void clear() {
+ public CompletableFuture clear() {
this.priorityQueue.clear();
+ // The in-memory queue is cleared synchronously, so the returned future is already complete.
+ return CompletableFuture.completedFuture(null);
}
@Override
@@ -176,11 +178,6 @@ && getNumberOfDelayedMessages() >= fixedDelayDetectionLookahead
&& !hasMessageAvailable();
}
- @Override
- public boolean containsMessage(long ledgerId, long entryId) {
- return false;
- }
-
protected long nextDeliveryTime() {
return priorityQueue.peekN1();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
index 1cadc6d98e268..e7d4f9301dd36 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
@@ -56,8 +56,8 @@ public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
@Override
public CompletableFuture createBucketSnapshot(SnapshotMetadata snapshotMetadata,
List bucketSnapshotSegments,
- String bucketKey) {
- return createLedger(bucketKey)
+ String bucketKey, String topicName, String cursorName) {
+ return createLedger(bucketKey, topicName, cursorName)
.thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
.thenCompose(__ -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments))
.thenCompose(__ -> closeLedger(ledgerHandle))
@@ -67,7 +67,7 @@ public CompletableFuture createBucketSnapshot(SnapshotMetadata snapshotMet
@Override
public CompletableFuture getBucketSnapshotMetadata(long bucketId) {
+ // Opens the ledger identified by bucketId and parses its entry 0 as the snapshot metadata.
+ // NOTE(review): the replaced getLedgerEntryThenCloseLedger closed the handle after the read;
+ // getLedgerEntry does not, so confirm the opened LedgerHandle is released elsewhere.
return openLedger(bucketId).thenCompose(
- ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, 0, 0).
+ ledgerHandle -> getLedgerEntry(ledgerHandle, 0, 0).
thenApply(entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
}
@@ -75,17 +75,13 @@ public CompletableFuture getBucketSnapshotMetadata(long bucket
public CompletableFuture> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
long lastSegmentEntryId) {
+ // Opens the ledger identified by bucketId, reads the entries from firstSegmentEntryId to
+ // lastSegmentEntryId and parses them as snapshot segments.
+ // NOTE(review): the replaced getLedgerEntryThenCloseLedger closed the handle after the read;
+ // getLedgerEntry does not, so confirm the opened LedgerHandle is released elsewhere.
return openLedger(bucketId).thenCompose(
- ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, firstSegmentEntryId,
+ ledgerHandle -> getLedgerEntry(ledgerHandle, firstSegmentEntryId,
lastSegmentEntryId).thenApply(this::parseSnapshotSegmentEntries));
}
@Override
public CompletableFuture getBucketSnapshotLength(long bucketId) {
- return openLedger(bucketId).thenApply(ledgerHandle -> {
- long length = ledgerHandle.getLength();
- closeLedger(ledgerHandle);
- return length;
- });
+ // Opens the ledger identified by bucketId and reports its length.
+ // NOTE(review): the removed implementation closed the handle after reading the length; this
+ // version does not, so confirm the opened LedgerHandle is released elsewhere.
+ return openLedger(bucketId).thenApply(LedgerHandle::getLength);
}
@Override
@@ -143,9 +139,10 @@ private List parseSnapshotSegmentEntries(Enumeration createLedger(String bucketKey) {
+ private CompletableFuture createLedger(String bucketKey, String topicName, String cursorName) {
CompletableFuture future = new CompletableFuture<>();
- Map metadata = LedgerMetadataUtils.buildMetadataForDelayedIndexBucket(bucketKey);
+ Map metadata = LedgerMetadataUtils.buildMetadataForDelayedIndexBucket(bucketKey,
+ topicName, cursorName);
bookKeeper.asyncCreateLedger(
config.getManagedLedgerDefaultEnsembleSize(),
config.getManagedLedgerDefaultWriteQuorum(),
@@ -154,7 +151,7 @@ private CompletableFuture createLedger(String bucketKey) {
LedgerPassword,
(rc, handle, ctx) -> {
if (rc != BKException.Code.OK) {
- future.completeExceptionally(bkException("Failed to create ledger", rc, -1));
+ future.completeExceptionally(bkException("Create ledger", rc, -1));
} else {
future.complete(handle);
}
@@ -170,7 +167,7 @@ private CompletableFuture openLedger(Long ledgerId) {
LedgerPassword,
(rc, handle, ctx) -> {
if (rc != BKException.Code.OK) {
- future.completeExceptionally(bkException("Failed to open ledger", rc, ledgerId));
+ future.completeExceptionally(bkException("Open ledger", rc, ledgerId));
} else {
future.complete(handle);
}
@@ -184,7 +181,7 @@ private CompletableFuture closeLedger(LedgerHandle ledgerHandle) {
ledgerHandle.asyncClose((rc, handle, ctx) -> {
if (rc != BKException.Code.OK) {
log.warn("Failed to close a Ledger Handle: {}", ledgerHandle.getId());
- future.completeExceptionally(bkException("Failed to close ledger", rc, ledgerHandle.getId()));
+ future.completeExceptionally(bkException("Close ledger", rc, ledgerHandle.getId()));
} else {
future.complete(null);
}
@@ -197,7 +194,7 @@ private CompletableFuture addEntry(LedgerHandle ledgerHandle, byte[] data)
ledgerHandle.asyncAddEntry(data,
(rc, handle, entryId, ctx) -> {
if (rc != BKException.Code.OK) {
- future.completeExceptionally(bkException("Failed to add entry", rc, ledgerHandle.getId()));
+ future.completeExceptionally(bkException("Add entry", rc, ledgerHandle.getId()));
} else {
future.complete(null);
}
@@ -211,17 +208,16 @@ private CompletableFuture addEntry(LedgerHandle ledgerHandle, byte[] data)
});
}
- CompletableFuture> getLedgerEntryThenCloseLedger(LedgerHandle ledger,
- long firstEntryId, long lastEntryId) {
+ CompletableFuture> getLedgerEntry(LedgerHandle ledger,
+ long firstEntryId, long lastEntryId) {
final CompletableFuture> future = new CompletableFuture<>();
ledger.asyncReadEntries(firstEntryId, lastEntryId,
(rc, handle, entries, ctx) -> {
if (rc != BKException.Code.OK) {
- future.completeExceptionally(bkException("Failed to read entry", rc, ledger.getId()));
+ future.completeExceptionally(bkException("Read entry", rc, ledger.getId()));
} else {
future.complete(entries);
}
- closeLedger(handle);
}, null
);
return future;
@@ -231,7 +227,7 @@ private CompletableFuture deleteLedger(long ledgerId) {
CompletableFuture future = new CompletableFuture<>();
bookKeeper.asyncDeleteLedger(ledgerId, (int rc, Object cnx) -> {
if (rc != BKException.Code.OK) {
- future.completeExceptionally(bkException("Failed to delete ledger", rc, ledgerId));
+ future.completeExceptionally(bkException("Delete ledger", rc, ledgerId));
} else {
future.complete(null);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
index 5d2a556337a6e..5b7023be5034e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.delayed.bucket;
+import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.CURSOR_INTERNAL_PROPERTY_PREFIX;
import static org.apache.bookkeeper.mledger.util.Futures.executeWithRetry;
import java.util.HashMap;
import java.util.List;
@@ -31,6 +32,8 @@
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
+import org.apache.pulsar.common.util.Codec;
+import org.apache.pulsar.common.util.FutureUtil;
import org.roaringbitmap.RoaringBitmap;
@Slf4j
@@ -38,11 +41,16 @@
@AllArgsConstructor
abstract class Bucket {
- static final String DELAYED_BUCKET_KEY_PREFIX = "#pulsar.internal.delayed.bucket";
+ static final String DELAYED_BUCKET_KEY_PREFIX = CURSOR_INTERNAL_PROPERTY_PREFIX + "delayed.bucket";
static final String DELIMITER = "_";
static final int MaxRetryTimes = 3;
+ protected final String dispatcherName;
+
protected final ManagedCursor cursor;
+
+ protected final FutureUtil.Sequencer sequencer;
+
protected final BucketSnapshotStorage bucketSnapshotStorage;
long startLedgerId;
@@ -54,17 +62,19 @@ abstract class Bucket {
int lastSegmentEntryId;
- int currentSegmentEntryId;
+ volatile int currentSegmentEntryId;
- long snapshotLength;
+ volatile long snapshotLength;
private volatile Long bucketId;
private volatile CompletableFuture snapshotCreateFuture;
- Bucket(ManagedCursor cursor, BucketSnapshotStorage storage, long startLedgerId, long endLedgerId) {
- this(cursor, storage, startLedgerId, endLedgerId, new HashMap<>(), -1, -1, 0, 0, null, null);
+ Bucket(String dispatcherName, ManagedCursor cursor, FutureUtil.Sequencer sequencer,
+ BucketSnapshotStorage storage, long startLedgerId, long endLedgerId) {
+ this(dispatcherName, cursor, sequencer, storage, startLedgerId, endLedgerId, new HashMap<>(), -1, -1, 0, 0,
+ null, null);
}
boolean containsMessage(long ledgerId, long entryId) {
@@ -126,13 +136,22 @@ CompletableFuture asyncSaveBucketSnapshot(
ImmutableBucket bucket, DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata snapshotMetadata,
List bucketSnapshotSegments) {
final String bucketKey = bucket.bucketKey();
- return bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata, bucketSnapshotSegments, bucketKey)
- .thenCompose(newBucketId -> {
+ final String cursorName = Codec.decode(cursor.getName());
+ final String topicName = dispatcherName.substring(0, dispatcherName.lastIndexOf(" / " + cursorName));
+ return executeWithRetry(
+ () -> bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata, bucketSnapshotSegments, bucketKey,
+ topicName, cursorName)
+ .whenComplete((__, ex) -> {
+ if (ex != null) {
+ log.warn("[{}] Failed to create bucket snapshot, bucketKey: {}",
+ dispatcherName, bucketKey, ex);
+ }
+ }), BucketSnapshotPersistenceException.class, MaxRetryTimes).thenCompose(newBucketId -> {
bucket.setBucketId(newBucketId);
return putBucketKeyId(bucketKey, newBucketId).exceptionally(ex -> {
- log.warn("Failed to record bucketId to cursor property, bucketKey: {}, bucketId: {}",
- bucketKey, bucketId);
+ log.warn("[{}] Failed to record bucketId to cursor property, bucketKey: {}, bucketId: {}",
+ dispatcherName, bucketKey, newBucketId, ex);
return null;
}).thenApply(__ -> newBucketId);
});
@@ -140,12 +159,16 @@ CompletableFuture asyncSaveBucketSnapshot(
private CompletableFuture putBucketKeyId(String bucketKey, Long bucketId) {
Objects.requireNonNull(bucketId);
- return executeWithRetry(() -> cursor.putCursorProperty(bucketKey, String.valueOf(bucketId)),
- ManagedLedgerException.BadVersionException.class, MaxRetryTimes);
+ return sequencer.sequential(() -> {
+ return executeWithRetry(() -> cursor.putCursorProperty(bucketKey, String.valueOf(bucketId)),
+ ManagedLedgerException.BadVersionException.class, MaxRetryTimes);
+ });
}
protected CompletableFuture removeBucketCursorProperty(String bucketKey) {
- return executeWithRetry(() -> cursor.removeCursorProperty(bucketKey),
- ManagedLedgerException.BadVersionException.class, MaxRetryTimes);
+ return sequencer.sequential(() -> {
+ return executeWithRetry(() -> cursor.removeCursorProperty(bucketKey),
+ ManagedLedgerException.BadVersionException.class, MaxRetryTimes);
+ });
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
index 77c1dfb1eea14..a34bd51af98e4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -21,6 +21,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.broker.delayed.bucket.Bucket.DELAYED_BUCKET_KEY_PREFIX;
import static org.apache.pulsar.broker.delayed.bucket.Bucket.DELIMITER;
+import static org.apache.pulsar.broker.delayed.bucket.Bucket.MaxRetryTimes;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Range;
@@ -30,6 +31,8 @@
import io.netty.util.Timeout;
import io.netty.util.Timer;
import java.time.Clock;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -41,6 +44,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
import javax.annotation.concurrent.ThreadSafe;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -53,25 +57,39 @@
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
+import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
+import org.roaringbitmap.RoaringBitmap;
@Slf4j
@ThreadSafe
public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker {
- static final int AsyncOperationTimeoutSeconds = 30;
+ static final CompletableFuture NULL_LONG_PROMISE = CompletableFuture.completedFuture(null);
+
+ static final int AsyncOperationTimeoutSeconds = 60;
+
+ private static final Long INVALID_BUCKET_ID = -1L;
+
+ private static final int MAX_MERGE_NUM = 4;
private final long minIndexCountPerBucket;
- private final long timeStepPerBucketSnapshotSegment;
+ private final long timeStepPerBucketSnapshotSegmentInMillis;
+
+ private final int maxIndexesPerBucketSnapshotSegment;
private final int maxNumBuckets;
private long numberDelayedMessages;
+ @Getter
+ @VisibleForTesting
private final MutableBucket lastMutableBucket;
+ @Getter
+ @VisibleForTesting
private final TripleLongPriorityQueue sharedBucketPriorityQueue;
@Getter
@@ -80,43 +98,51 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker
private final Table snapshotSegmentLastIndexTable;
+ private final BucketDelayedMessageIndexStats stats;
+
public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher,
Timer timer, long tickTimeMillis,
boolean isDelayedDeliveryDeliverAtTimeStrict,
BucketSnapshotStorage bucketSnapshotStorage,
- long minIndexCountPerBucket, long timeStepPerBucketSnapshotSegment,
- int maxNumBuckets) {
+ long minIndexCountPerBucket, long timeStepPerBucketSnapshotSegmentInMillis,
+ int maxIndexesPerBucketSnapshotSegment, int maxNumBuckets) {
this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict,
- bucketSnapshotStorage, minIndexCountPerBucket, timeStepPerBucketSnapshotSegment, maxNumBuckets);
+ bucketSnapshotStorage, minIndexCountPerBucket, timeStepPerBucketSnapshotSegmentInMillis,
+ maxIndexesPerBucketSnapshotSegment, maxNumBuckets);
}
public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher,
Timer timer, long tickTimeMillis, Clock clock,
boolean isDelayedDeliveryDeliverAtTimeStrict,
BucketSnapshotStorage bucketSnapshotStorage,
- long minIndexCountPerBucket, long timeStepPerBucketSnapshotSegment,
- int maxNumBuckets) {
+ long minIndexCountPerBucket, long timeStepPerBucketSnapshotSegmentInMillis,
+ int maxIndexesPerBucketSnapshotSegment, int maxNumBuckets) {
super(dispatcher, timer, tickTimeMillis, clock, isDelayedDeliveryDeliverAtTimeStrict);
this.minIndexCountPerBucket = minIndexCountPerBucket;
- this.timeStepPerBucketSnapshotSegment = timeStepPerBucketSnapshotSegment;
+ this.timeStepPerBucketSnapshotSegmentInMillis = timeStepPerBucketSnapshotSegmentInMillis;
+ this.maxIndexesPerBucketSnapshotSegment = maxIndexesPerBucketSnapshotSegment;
this.maxNumBuckets = maxNumBuckets;
this.sharedBucketPriorityQueue = new TripleLongPriorityQueue();
this.immutableBuckets = TreeRangeMap.create();
this.snapshotSegmentLastIndexTable = HashBasedTable.create();
- ManagedCursor cursor = dispatcher.getCursor();
- this.lastMutableBucket = new MutableBucket(cursor, bucketSnapshotStorage);
+ this.lastMutableBucket =
+ new MutableBucket(dispatcher.getName(), dispatcher.getCursor(), FutureUtil.Sequencer.create(),
+ bucketSnapshotStorage);
+ this.stats = new BucketDelayedMessageIndexStats();
this.numberDelayedMessages = recoverBucketSnapshot();
}
private synchronized long recoverBucketSnapshot() throws RuntimeException {
- ManagedCursor cursor = this.lastMutableBucket.cursor;
+ ManagedCursor cursor = this.lastMutableBucket.getCursor();
+ FutureUtil.Sequencer sequencer = this.lastMutableBucket.getSequencer();
Map, ImmutableBucket> toBeDeletedBucketMap = new HashMap<>();
cursor.getCursorProperties().keySet().forEach(key -> {
if (key.startsWith(DELAYED_BUCKET_KEY_PREFIX)) {
String[] keys = key.split(DELIMITER);
checkArgument(keys.length == 3);
ImmutableBucket immutableBucket =
- new ImmutableBucket(cursor, this.lastMutableBucket.bucketSnapshotStorage,
+ new ImmutableBucket(dispatcher.getName(), cursor, sequencer,
+ this.lastMutableBucket.bucketSnapshotStorage,
Long.parseLong(keys[1]), Long.parseLong(keys[2]));
putAndCleanOverlapRange(Range.closed(immutableBucket.startLedgerId, immutableBucket.endLedgerId),
immutableBucket, toBeDeletedBucketMap);
@@ -125,6 +151,8 @@ private synchronized long recoverBucketSnapshot() throws RuntimeException {
Map, ImmutableBucket> immutableBucketMap = immutableBuckets.asMapOfRanges();
if (immutableBucketMap.isEmpty()) {
+ log.info("[{}] Recover delayed message index bucket snapshot finish, don't find bucket snapshot",
+ dispatcher.getName());
return 0;
}
@@ -137,8 +165,9 @@ private synchronized long recoverBucketSnapshot() throws RuntimeException {
}
try {
- FutureUtil.waitForAll(futures.values()).get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS);
+ FutureUtil.waitForAll(futures.values()).get(AsyncOperationTimeoutSeconds * 2, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
+ log.error("[{}] Failed to recover delayed message index bucket snapshot.", dispatcher.getName(), e);
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
@@ -169,7 +198,7 @@ private synchronized long recoverBucketSnapshot() throws RuntimeException {
ImmutableBucket immutableBucket = mapEntry.getValue();
immutableBucketMap.remove(key);
// delete asynchronously without waiting for completion
- immutableBucket.asyncDeleteBucketSnapshot();
+ immutableBucket.asyncDeleteBucketSnapshot(stats);
}
MutableLong numberDelayedMessages = new MutableLong(0);
@@ -222,7 +251,8 @@ private Optional findImmutableBucket(long ledgerId) {
return Optional.ofNullable(immutableBuckets.get(ledgerId));
}
- private void afterCreateImmutableBucket(Pair immutableBucketDelayedIndexPair) {
+ private void afterCreateImmutableBucket(Pair immutableBucketDelayedIndexPair,
+ long startTime) {
if (immutableBucketDelayedIndexPair != null) {
ImmutableBucket immutableBucket = immutableBucketDelayedIndexPair.getLeft();
immutableBuckets.put(Range.closed(immutableBucket.startLedgerId, immutableBucket.endLedgerId),
@@ -231,10 +261,48 @@ private void afterCreateImmutableBucket(Pair immu
DelayedIndex lastDelayedIndex = immutableBucketDelayedIndexPair.getRight();
snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(), lastDelayedIndex.getEntryId(),
immutableBucket);
- if (log.isDebugEnabled()) {
- log.debug("[{}] Create bucket snapshot, bucket: {}", dispatcher.getName(),
- lastMutableBucket);
- }
+
+ immutableBucket.getSnapshotCreateFuture().ifPresent(createFuture -> {
+ CompletableFuture future = createFuture.handle((bucketId, ex) -> {
+ if (ex == null) {
+ immutableBucket.setSnapshotSegments(null);
+ immutableBucket.asyncUpdateSnapshotLength();
+ log.info("[{}] Creat bucket snapshot finish, bucketKey: {}", dispatcher.getName(),
+ immutableBucket.bucketKey());
+
+ stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.create,
+ System.currentTimeMillis() - startTime);
+
+ return bucketId;
+ }
+
+ log.error("[{}] Failed to create bucket snapshot, bucketKey: {}", dispatcher.getName(),
+ immutableBucket.bucketKey(), ex);
+ stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.create);
+
+ // Put indexes back into the shared queue and downgrade to memory mode
+ synchronized (BucketDelayedDeliveryTracker.this) {
+ immutableBucket.getSnapshotSegments().ifPresent(snapshotSegments -> {
+ for (DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment snapshotSegment :
+ snapshotSegments) {
+ for (DelayedIndex delayedIndex : snapshotSegment.getIndexesList()) {
+ sharedBucketPriorityQueue.add(delayedIndex.getTimestamp(),
+ delayedIndex.getLedgerId(), delayedIndex.getEntryId());
+ }
+ }
+ immutableBucket.setSnapshotSegments(null);
+ });
+
+ immutableBucket.setCurrentSegmentEntryId(immutableBucket.lastSegmentEntryId);
+ immutableBuckets.asMapOfRanges().remove(
+ Range.closed(immutableBucket.startLedgerId, immutableBucket.endLedgerId));
+ // The table is keyed by (ledgerId, entryId) when the last index is put, so the rollback
+ // must remove with the same key; using the timestamp as column key never matches.
+ snapshotSegmentLastIndexTable.remove(lastDelayedIndex.getLedgerId(),
+ lastDelayedIndex.getEntryId());
+ }
+ return INVALID_BUCKET_ID;
+ });
+ immutableBucket.setSnapshotCreateFuture(future);
+ });
}
}
@@ -254,21 +322,18 @@ public synchronized boolean addMessage(long ledgerId, long entryId, long deliver
if (!existBucket && ledgerId > lastMutableBucket.endLedgerId
&& lastMutableBucket.size() >= minIndexCountPerBucket
&& !lastMutableBucket.isEmpty()) {
+ long createStartTime = System.currentTimeMillis();
+ stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.create);
Pair immutableBucketDelayedIndexPair =
- lastMutableBucket.sealBucketAndAsyncPersistent(this.timeStepPerBucketSnapshotSegment,
+ lastMutableBucket.sealBucketAndAsyncPersistent(
+ this.timeStepPerBucketSnapshotSegmentInMillis,
+ this.maxIndexesPerBucketSnapshotSegment,
this.sharedBucketPriorityQueue);
- afterCreateImmutableBucket(immutableBucketDelayedIndexPair);
+ afterCreateImmutableBucket(immutableBucketDelayedIndexPair, createStartTime);
lastMutableBucket.resetLastMutableBucketRange();
if (immutableBuckets.asMapOfRanges().size() > maxNumBuckets) {
- try {
- asyncMergeBucketSnapshot().get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS);
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
- if (e instanceof InterruptedException) {
- Thread.currentThread().interrupt();
- }
- throw new RuntimeException(e);
- }
+ asyncMergeBucketSnapshot();
}
}
@@ -293,51 +358,147 @@ public synchronized boolean addMessage(long ledgerId, long entryId, long deliver
return true;
}
- private synchronized CompletableFuture asyncMergeBucketSnapshot() {
- List values = immutableBuckets.asMapOfRanges().values().stream().toList();
+ private synchronized List selectMergedBuckets(final List values, int mergeNum) {
+ checkArgument(mergeNum < values.size());
long minNumberMessages = Long.MAX_VALUE;
+ long minScheduleTimestamp = Long.MAX_VALUE;
int minIndex = -1;
- for (int i = 0; i + 1 < values.size(); i++) {
- ImmutableBucket bucketL = values.get(i);
- ImmutableBucket bucketR = values.get(i + 1);
- long numberMessages = bucketL.numberBucketDelayedMessages + bucketR.numberBucketDelayedMessages;
- if (numberMessages < minNumberMessages) {
- minNumberMessages = (int) numberMessages;
- minIndex = i;
+ for (int i = 0; i + (mergeNum - 1) < values.size(); i++) {
+ List immutableBuckets = values.subList(i, i + mergeNum);
+ if (immutableBuckets.stream().allMatch(bucket -> {
+ // We should skip the bucket which last segment already been load to memory,
+ // avoid record replicated index.
+ return bucket.lastSegmentEntryId > bucket.currentSegmentEntryId && !bucket.merging;
+ })) {
+ long numberMessages = immutableBuckets.stream()
+ .mapToLong(bucket -> bucket.numberBucketDelayedMessages)
+ .sum();
+ if (numberMessages <= minNumberMessages) {
+ minNumberMessages = numberMessages;
+ long scheduleTimestamp = immutableBuckets.stream()
+ .mapToLong(bucket -> bucket.firstScheduleTimestamps.get(bucket.currentSegmentEntryId + 1))
+ .min().getAsLong();
+ if (scheduleTimestamp < minScheduleTimestamp) {
+ minScheduleTimestamp = scheduleTimestamp;
+ minIndex = i;
+ }
+ }
}
}
- return asyncMergeBucketSnapshot(values.get(minIndex), values.get(minIndex + 1));
+
+ if (minIndex >= 0) {
+ return values.subList(minIndex, minIndex + mergeNum);
+ } else if (mergeNum > 2){
+ return selectMergedBuckets(values, mergeNum - 1);
+ } else {
+ return Collections.emptyList();
+ }
}
- private synchronized CompletableFuture asyncMergeBucketSnapshot(ImmutableBucket bucketA,
- ImmutableBucket bucketB) {
- immutableBuckets.remove(Range.closed(bucketA.startLedgerId, bucketA.endLedgerId));
- immutableBuckets.remove(Range.closed(bucketB.startLedgerId, bucketB.endLedgerId));
-
- CompletableFuture snapshotCreateFutureA =
- bucketA.getSnapshotCreateFuture().orElse(CompletableFuture.completedFuture(null));
- CompletableFuture snapshotCreateFutureB =
- bucketB.getSnapshotCreateFuture().orElse(CompletableFuture.completedFuture(null));
-
- return CompletableFuture.allOf(snapshotCreateFutureA, snapshotCreateFutureB).thenCompose(__ -> {
- CompletableFuture> futureA =
- bucketA.getRemainSnapshotSegment();
- CompletableFuture> futureB =
- bucketB.getRemainSnapshotSegment();
- return futureA.thenCombine(futureB, CombinedSegmentDelayedIndexQueue::wrap)
+ /**
+ * Selects a run of immutable buckets via {@code selectMergedBuckets} and merges them asynchronously.
+ *
+ * <p>The selected buckets are flagged as {@code merging} before the merge starts and un-flagged when the
+ * merge future completes, whether it succeeded or failed. Trigger, success (with elapsed time) and failure
+ * events are recorded in {@code stats}.
+ *
+ * @return the merge future, or an already-completed future when no mergeable buckets were selected
+ */
+ private synchronized CompletableFuture asyncMergeBucketSnapshot() {
+ List immutableBucketList = immutableBuckets.asMapOfRanges().values().stream().toList();
+ List toBeMergeImmutableBuckets = selectMergedBuckets(immutableBucketList, MAX_MERGE_NUM);
+
+ if (toBeMergeImmutableBuckets.isEmpty()) {
+ log.warn("[{}] Can't find able merged buckets", dispatcher.getName());
+ return CompletableFuture.completedFuture(null);
+ }
+
+ // Strip the common key prefix for shorter log output. String#replace is a literal replacement;
+ // replaceAll would interpret the prefix (which contains '.') as a regular expression.
+ final String bucketsStr = toBeMergeImmutableBuckets.stream().map(Bucket::bucketKey).collect(
+ Collectors.joining(",")).replace(DELAYED_BUCKET_KEY_PREFIX + "_", "");
+ if (log.isDebugEnabled()) {
+ // Log at the level the guard checks for.
+ log.debug("[{}] Merging bucket snapshot, bucketKeys: {}", dispatcher.getName(), bucketsStr);
+ }
+
+ for (ImmutableBucket immutableBucket : toBeMergeImmutableBuckets) {
+ immutableBucket.merging = true;
+ }
+
+ long mergeStartTime = System.currentTimeMillis();
+ stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.merge);
+ return asyncMergeBucketSnapshot(toBeMergeImmutableBuckets).whenComplete((__, ex) -> {
+ // Always clear the merging flag, on success and on failure, under the tracker lock.
+ synchronized (this) {
+ for (ImmutableBucket immutableBucket : toBeMergeImmutableBuckets) {
+ immutableBucket.merging = false;
+ }
+ }
+ if (ex != null) {
+ log.error("[{}] Failed to merge bucket snapshot, bucketKeys: {}",
+ dispatcher.getName(), bucketsStr, ex);
+
+ stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.merge);
+ } else {
+ log.info("[{}] Merge bucket snapshot finish, bucketKeys: {}, bucketNum: {}",
+ dispatcher.getName(), bucketsStr, immutableBuckets.asMapOfRanges().size());
+
+ stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.merge,
+ System.currentTimeMillis() - mergeStartTime);
+ }
+ });
+ }
+
+ private synchronized CompletableFuture asyncMergeBucketSnapshot(List buckets) {
+ List> createFutures =
+ buckets.stream().map(bucket -> bucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE))
+ .toList();
+
+ return FutureUtil.waitForAll(createFutures).thenCompose(bucketId -> {
+ if (createFutures.stream().anyMatch(future -> INVALID_BUCKET_ID.equals(future.join()))) {
+ return FutureUtil.failedFuture(new RuntimeException("Can't merge buckets due to bucket create failed"));
+ }
+
+ List>> getRemainFutures =
+ buckets.stream().map(ImmutableBucket::getRemainSnapshotSegment).toList();
+
+ return FutureUtil.waitForAll(getRemainFutures)
+ .thenApply(__ -> {
+ return CombinedSegmentDelayedIndexQueue.wrap(
+ getRemainFutures.stream().map(CompletableFuture::join).toList());
+ })
.thenAccept(combinedDelayedIndexQueue -> {
- Pair immutableBucketDelayedIndexPair =
- lastMutableBucket.createImmutableBucketAndAsyncPersistent(
- timeStepPerBucketSnapshotSegment, sharedBucketPriorityQueue,
- combinedDelayedIndexQueue, bucketA.startLedgerId, bucketB.endLedgerId);
- afterCreateImmutableBucket(immutableBucketDelayedIndexPair);
-
- immutableBucketDelayedIndexPair.getLeft().getSnapshotCreateFuture()
- .orElse(CompletableFuture.completedFuture(null)).thenCompose(___ -> {
- CompletableFuture removeAFuture = bucketA.asyncDeleteBucketSnapshot();
- CompletableFuture removeBFuture = bucketB.asyncDeleteBucketSnapshot();
- return CompletableFuture.allOf(removeAFuture, removeBFuture);
+ synchronized (BucketDelayedDeliveryTracker.this) {
+ long createStartTime = System.currentTimeMillis();
+ stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.create);
+ Pair immutableBucketDelayedIndexPair =
+ lastMutableBucket.createImmutableBucketAndAsyncPersistent(
+ timeStepPerBucketSnapshotSegmentInMillis,
+ maxIndexesPerBucketSnapshotSegment,
+ sharedBucketPriorityQueue, combinedDelayedIndexQueue,
+ buckets.get(0).startLedgerId,
+ buckets.get(buckets.size() - 1).endLedgerId);
+
+ // Merge bit map to new bucket
+ Map delayedIndexBitMap =
+ new HashMap<>(buckets.get(0).getDelayedIndexBitMap());
+ for (int i = 1; i < buckets.size(); i++) {
+ buckets.get(i).delayedIndexBitMap.forEach((ledgerId, bitMapB) -> {
+ delayedIndexBitMap.compute(ledgerId, (k, bitMap) -> {
+ if (bitMap == null) {
+ return bitMapB;
+ }
+
+ bitMap.or(bitMapB);
+ return bitMap;
+ });
});
+ }
+ immutableBucketDelayedIndexPair.getLeft().setDelayedIndexBitMap(delayedIndexBitMap);
+
+ afterCreateImmutableBucket(immutableBucketDelayedIndexPair, createStartTime);
+
+ immutableBucketDelayedIndexPair.getLeft().getSnapshotCreateFuture()
+ .orElse(NULL_LONG_PROMISE).thenCompose(___ -> {
+ List> removeFutures =
+ buckets.stream().map(bucket -> bucket.asyncDeleteBucketSnapshot(stats))
+ .toList();
+ return FutureUtil.waitForAll(removeFutures);
+ });
+
+ for (ImmutableBucket bucket : buckets) {
+ immutableBuckets.asMapOfRanges()
+ .remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId));
+ }
+ }
});
});
}
@@ -392,23 +553,44 @@ public synchronized NavigableSet getScheduledMessages(int maxMessa
long ledgerId = sharedBucketPriorityQueue.peekN2();
long entryId = sharedBucketPriorityQueue.peekN3();
- positions.add(new PositionImpl(ledgerId, entryId));
- sharedBucketPriorityQueue.pop();
- removeIndexBit(ledgerId, entryId);
-
- ImmutableBucket bucket = snapshotSegmentLastIndexTable.remove(ledgerId, entryId);
+ ImmutableBucket bucket = snapshotSegmentLastIndexTable.get(ledgerId, entryId);
if (bucket != null && immutableBuckets.asMapOfRanges().containsValue(bucket)) {
+ if (bucket.merging) {
+ log.info("[{}] Skip load to wait for bucket snapshot merge finish, bucketKey:{}",
+ dispatcher.getName(), bucket.bucketKey());
+ break;
+ }
+
+ final int preSegmentEntryId = bucket.currentSegmentEntryId;
if (log.isDebugEnabled()) {
- log.debug("[{}] Load next snapshot segment, bucket: {}", dispatcher.getName(), bucket);
+ log.debug("[{}] Loading next bucket snapshot segment, bucketKey: {}, nextSegmentEntryId: {}",
+ dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1);
}
// All message of current snapshot segment are scheduled, load next snapshot segment
// TODO make it asynchronous and not blocking this process
try {
+ boolean createFutureDone = bucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE).isDone();
+
+ if (!createFutureDone) {
+ log.info("[{}] Skip load to wait for bucket snapshot create finish, bucketKey:{}",
+ dispatcher.getName(), bucket.bucketKey());
+ break;
+ }
+
+ if (bucket.currentSegmentEntryId == bucket.lastSegmentEntryId) {
+ immutableBuckets.asMapOfRanges().remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId));
+ bucket.asyncDeleteBucketSnapshot(stats);
+ continue;
+ }
+
+ long loadStartTime = System.currentTimeMillis();
+ stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.load);
bucket.asyncLoadNextBucketSnapshotEntry().thenAccept(indexList -> {
if (CollectionUtils.isEmpty(indexList)) {
- immutableBuckets.remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId));
- bucket.asyncDeleteBucketSnapshot();
+ immutableBuckets.asMapOfRanges()
+ .remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId));
+ bucket.asyncDeleteBucketSnapshot(stats);
return;
}
DelayedMessageIndexBucketSnapshotFormat.DelayedIndex
@@ -419,12 +601,36 @@ public synchronized NavigableSet getScheduledMessages(int maxMessa
sharedBucketPriorityQueue.add(index.getTimestamp(), index.getLedgerId(),
index.getEntryId());
}
- }).get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS);
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
- // TODO make this segment load again
- throw new RuntimeException(e);
+ }).whenComplete((__, ex) -> {
+ if (ex != null) {
+ // Back bucket state
+ bucket.setCurrentSegmentEntryId(preSegmentEntryId);
+
+ log.error("[{}] Failed to load bucket snapshot segment, bucketKey: {}, segmentEntryId: {}",
+ dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1, ex);
+
+ stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.load);
+ } else {
+ log.info("[{}] Load next bucket snapshot segment finish, bucketKey: {}, segmentEntryId: {}",
+ dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1);
+
+ stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.load,
+ System.currentTimeMillis() - loadStartTime);
+ }
+ }).get(AsyncOperationTimeoutSeconds * (MaxRetryTimes + 1), TimeUnit.SECONDS);
+ } catch (Exception e) {
+ if (e instanceof InterruptedException) {
+ // Restore the interrupt flag so the interruption is not lost by the broad catch.
+ Thread.currentThread().interrupt();
+ }
+ // Ignore exception to reload this segment on the next schedule.
+ log.error("[{}] An exception occurs when load next bucket snapshot, bucketKey:{}",
+ dispatcher.getName(), bucket.bucketKey(), e);
+ break;
}
}
+ snapshotSegmentLastIndexTable.remove(ledgerId, entryId);
+
+ positions.add(new PositionImpl(ledgerId, entryId));
+
+ sharedBucketPriorityQueue.pop();
+ removeIndexBit(ledgerId, entryId);
--n;
--numberDelayedMessages;
@@ -441,31 +647,39 @@ public boolean shouldPauseAllDeliveries() {
}
@Override
- public synchronized void clear() {
- cleanImmutableBuckets(true);
+ public synchronized CompletableFuture clear() {
+ CompletableFuture future = cleanImmutableBuckets();
sharedBucketPriorityQueue.clear();
lastMutableBucket.clear();
snapshotSegmentLastIndexTable.clear();
numberDelayedMessages = 0;
+ return future;
}
@Override
public synchronized void close() {
super.close();
lastMutableBucket.close();
- cleanImmutableBuckets(false);
sharedBucketPriorityQueue.close();
+ try {
+ // Wait, with a bounded timeout, for snapshot creations that are still in flight.
+ List> completableFutures = immutableBuckets.asMapOfRanges().values().stream()
+ .map(bucket -> bucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE)).toList();
+ FutureUtil.waitForAll(completableFutures).get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ if (e instanceof InterruptedException) {
+ // Restore the interrupt flag so the interruption is not lost by the broad catch.
+ Thread.currentThread().interrupt();
+ }
+ log.warn("[{}] Failed wait to snapshot generate", dispatcher.getName(), e);
+ }
}
- private void cleanImmutableBuckets(boolean delete) {
- if (immutableBuckets != null) {
- Iterator iterator = immutableBuckets.asMapOfRanges().values().iterator();
- while (iterator.hasNext()) {
- ImmutableBucket bucket = iterator.next();
- bucket.clear(delete);
- iterator.remove();
- }
+ private CompletableFuture cleanImmutableBuckets() {
+ List> futures = new ArrayList<>();
+ Iterator iterator = immutableBuckets.asMapOfRanges().values().iterator();
+ while (iterator.hasNext()) {
+ ImmutableBucket bucket = iterator.next();
+ futures.add(bucket.clear(stats));
+ numberDelayedMessages -= bucket.getNumberBucketDelayedMessages();
+ iterator.remove();
}
+ return FutureUtil.waitForAll(futures);
}
private boolean removeIndexBit(long ledgerId, long entryId) {
@@ -477,7 +691,6 @@ private boolean removeIndexBit(long ledgerId, long entryId) {
.orElse(false);
}
- @Override
public boolean containsMessage(long ledgerId, long entryId) {
if (lastMutableBucket.containsMessage(ledgerId, entryId)) {
return true;
@@ -486,4 +699,15 @@ public boolean containsMessage(long ledgerId, long entryId) {
return findImmutableBucket(ledgerId).map(bucket -> bucket.containsMessage(ledgerId, entryId))
.orElse(false);
}
+
+ /**
+ * Refreshes the values recorded in {@code stats} and returns the metric map it generates.
+ *
+ * <p>Recorded values: the number of immutable buckets plus one (presumably counting the last mutable
+ * bucket -- confirm), the size of the shared priority queue plus the size of the last mutable bucket, and
+ * the sum of {@code getSnapshotLength()} over all immutable buckets.
+ *
+ * @return the map produced by {@code stats.genTopicMetricMap()}
+ */
+ public Map genTopicMetricMap() {
+ stats.recordNumOfBuckets(immutableBuckets.asMapOfRanges().size() + 1);
+ stats.recordDelayedMessageIndexLoaded(this.sharedBucketPriorityQueue.size() + this.lastMutableBucket.size());
+ // Accumulate the snapshot length of every immutable bucket currently tracked.
+ MutableLong totalSnapshotLength = new MutableLong();
+ immutableBuckets.asMapOfRanges().values().forEach(immutableBucket -> {
+ totalSnapshotLength.add(immutableBucket.getSnapshotLength());
+ });
+ stats.recordBucketSnapshotSizeBytes(totalSnapshotLength.longValue());
+ return stats.genTopicMetricMap();
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedMessageIndexStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedMessageIndexStats.java
new file mode 100644
index 0000000000000..68788c359d560
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedMessageIndexStats.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.bookkeeper.mledger.util.StatsBuckets;
+import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
+
+/**
+ * Collects statistics about delayed message index buckets and renders them as
+ * {@link TopicMetricBean} entries.
+ *
+ * <p>Three values are plain gauges that callers overwrite through the {@code record*} setters. Operation
+ * counters are kept per (state, type) pair and operation latencies per type; both live in concurrent maps
+ * that are populated lazily on first use.
+ */
+public class BucketDelayedMessageIndexStats {
+
+    // Latency boundaries, in milliseconds, handed to every StatsBuckets histogram.
+    private static final long[] BUCKETS = new long[]{50, 100, 500, 1000, 5000, 30000, 60000};
+
+    /** Outcome label of an operation counter; {@code all} is used for triggered operations. */
+    enum State {
+        succeed,
+        failed,
+        all
+    }
+
+    /** Kind of bucket operation being counted or timed. */
+    enum Type {
+        create,
+        load,
+        delete,
+        merge
+    }
+
+    private static final String BUCKET_TOTAL_NAME = "pulsar_delayed_message_index_bucket_total";
+    private static final String INDEX_LOADED_NAME = "pulsar_delayed_message_index_loaded";
+    private static final String SNAPSHOT_SIZE_BYTES_NAME = "pulsar_delayed_message_index_bucket_snapshot_size_bytes";
+    private static final String OP_COUNT_NAME = "pulsar_delayed_message_index_bucket_op_count";
+    private static final String OP_LATENCY_NAME = "pulsar_delayed_message_index_bucket_op_latency_ms";
+
+    private final AtomicInteger delayedMessageIndexBucketTotal = new AtomicInteger();
+    private final AtomicLong delayedMessageIndexLoaded = new AtomicLong();
+    private final AtomicLong delayedMessageIndexBucketSnapshotSizeBytes = new AtomicLong();
+    // Keyed by Type name.
+    private final Map<String, StatsBuckets> delayedMessageIndexBucketOpLatencyMs = new ConcurrentHashMap<>();
+    // Keyed by joinKey(State name, Type name).
+    private final Map<String, LongAdder> delayedMessageIndexBucketOpCount = new ConcurrentHashMap<>();
+
+    public BucketDelayedMessageIndexStats() {
+    }
+
+    /**
+     * Builds a snapshot of all metrics.
+     *
+     * <p>Operation counters are read with {@code sumThenReset()}, so each call reports the count
+     * accumulated since the previous call. Latency histograms are refreshed before being read, and
+     * histogram buckets whose count is zero are omitted.
+     *
+     * @return a new map from a unique metric key to its {@link TopicMetricBean}
+     */
+    public Map<String, TopicMetricBean> genTopicMetricMap() {
+        Map<String, TopicMetricBean> metrics = new HashMap<>();
+
+        metrics.put(BUCKET_TOTAL_NAME,
+                new TopicMetricBean(BUCKET_TOTAL_NAME, delayedMessageIndexBucketTotal.get(), null));
+
+        metrics.put(INDEX_LOADED_NAME,
+                new TopicMetricBean(INDEX_LOADED_NAME, delayedMessageIndexLoaded.get(), null));
+
+        metrics.put(SNAPSHOT_SIZE_BYTES_NAME,
+                new TopicMetricBean(SNAPSHOT_SIZE_BYTES_NAME, delayedMessageIndexBucketSnapshotSizeBytes.get(), null));
+
+        delayedMessageIndexBucketOpCount.forEach((k, count) -> {
+            // k was produced by joinKey(state, type); neither enum name contains '_'.
+            String[] labels = splitKey(k);
+            String[] labelsAndValues = new String[] {"state", labels[0], "type", labels[1]};
+            String key = OP_COUNT_NAME + joinKey(labelsAndValues);
+            metrics.put(key, new TopicMetricBean(OP_COUNT_NAME, count.sumThenReset(), labelsAndValues));
+        });
+
+        delayedMessageIndexBucketOpLatencyMs.forEach((typeName, statsBuckets) -> {
+            statsBuckets.refresh();
+            long[] buckets = statsBuckets.getBuckets();
+            for (int i = 0; i < buckets.length; i++) {
+                long count = buckets[i];
+                if (count == 0L) {
+                    continue;
+                }
+                String quantile;
+                // An index one past the configured boundaries is reported as the overflow bucket.
+                if (i == BUCKETS.length) {
+                    quantile = "overflow";
+                } else {
+                    quantile = String.valueOf(BUCKETS[i]);
+                }
+                String[] labelsAndValues = new String[] {"type", typeName, "quantile", quantile};
+                String key = OP_LATENCY_NAME + joinKey(labelsAndValues);
+
+                metrics.put(key, new TopicMetricBean(OP_LATENCY_NAME, count, labelsAndValues));
+            }
+            String[] labelsAndValues = new String[] {"type", typeName};
+            metrics.put(OP_LATENCY_NAME + "_count" + joinKey(labelsAndValues),
+                    new TopicMetricBean(OP_LATENCY_NAME + "_count", statsBuckets.getCount(), labelsAndValues));
+            metrics.put(OP_LATENCY_NAME + "_sum" + joinKey(labelsAndValues),
+                    new TopicMetricBean(OP_LATENCY_NAME + "_sum", statsBuckets.getSum(), labelsAndValues));
+        });
+
+        return metrics;
+    }
+
+    /** Overwrites the bucket-total gauge. */
+    public void recordNumOfBuckets(int numOfBuckets) {
+        delayedMessageIndexBucketTotal.set(numOfBuckets);
+    }
+
+    /** Overwrites the loaded-index gauge. */
+    public void recordDelayedMessageIndexLoaded(long num) {
+        delayedMessageIndexLoaded.set(num);
+    }
+
+    /** Overwrites the snapshot-size gauge, in bytes. */
+    public void recordBucketSnapshotSizeBytes(long sizeBytes) {
+        delayedMessageIndexBucketSnapshotSizeBytes.set(sizeBytes);
+    }
+
+    /** Counts one triggered operation of the given type under state {@code all}. */
+    public void recordTriggerEvent(Type eventType) {
+        delayedMessageIndexBucketOpCount.computeIfAbsent(joinKey(State.all.name(), eventType.name()),
+                k -> new LongAdder()).increment();
+    }
+
+    /**
+     * Counts one successful operation of the given type and adds its latency to that type's histogram.
+     *
+     * @param eventType the operation type
+     * @param cost      the latency value to record (the metric name declares milliseconds)
+     */
+    public void recordSuccessEvent(Type eventType, long cost) {
+        delayedMessageIndexBucketOpCount.computeIfAbsent(joinKey(State.succeed.name(), eventType.name()),
+                k -> new LongAdder()).increment();
+        delayedMessageIndexBucketOpLatencyMs.computeIfAbsent(eventType.name(),
+                k -> new StatsBuckets(BUCKETS)).addValue(cost);
+    }
+
+    /** Counts one failed operation of the given type. */
+    public void recordFailEvent(Type eventType) {
+        delayedMessageIndexBucketOpCount.computeIfAbsent(joinKey(State.failed.name(), eventType.name()),
+                k -> new LongAdder()).increment();
+    }
+
+    /** Joins the values with {@code '_'}; inverse of {@link #splitKey(String)} for values without {@code '_'}. */
+    public static String joinKey(String... values) {
+        return String.join("_", values);
+    }
+
+    /** Splits a key on {@code '_'}. */
+    public static String[] splitKey(String key) {
+        return key.split("_");
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketSnapshotStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketSnapshotStorage.java
index c6941e289f1ac..51c89bed47af2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketSnapshotStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketSnapshotStorage.java
@@ -28,14 +28,16 @@ public interface BucketSnapshotStorage {
/**
* Create a delayed message index bucket snapshot with metadata and bucketSnapshotSegments.
*
- * @param snapshotMetadata the metadata of snapshot
+ * @param snapshotMetadata the metadata of snapshot
* @param bucketSnapshotSegments the list of snapshot segments
- * @param bucketKey the key of bucket is used to generate custom storage metadata
+ * @param bucketKey the bucket key, used to generate custom storage metadata
+ * @param topicName the topic name, used to generate custom storage metadata
+ * @param cursorName the cursor name, used to generate custom storage metadata
* @return the future with bucketId(ledgerId).
*/
CompletableFuture createBucketSnapshot(SnapshotMetadata snapshotMetadata,
List bucketSnapshotSegments,
- String bucketKey);
+ String bucketKey, String topicName, String cursorName);
/**
* Get delayed message index bucket snapshot metadata.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/CombinedSegmentDelayedIndexQueue.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/CombinedSegmentDelayedIndexQueue.java
index 3f89cc9fdfb15..5655a26878296 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/CombinedSegmentDelayedIndexQueue.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/CombinedSegmentDelayedIndexQueue.java
@@ -18,37 +18,48 @@
*/
package org.apache.pulsar.broker.delayed.bucket;
+import java.util.Comparator;
import java.util.List;
+import java.util.Objects;
+import java.util.PriorityQueue;
import javax.annotation.concurrent.NotThreadSafe;
+import lombok.AllArgsConstructor;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
@NotThreadSafe
class CombinedSegmentDelayedIndexQueue implements DelayedIndexQueue {
- private final List segmentListA;
- private final List segmentListB;
+ @AllArgsConstructor
+ static class Node {
+ List segmentList;
- private int segmentListACursor = 0;
- private int segmentListBCursor = 0;
- private int segmentACursor = 0;
- private int segmentBCursor = 0;
+ int segmentListCursor;
- private CombinedSegmentDelayedIndexQueue(List segmentListA,
- List segmentListB) {
- this.segmentListA = segmentListA;
- this.segmentListB = segmentListB;
+ int segmentCursor;
}
- public static CombinedSegmentDelayedIndexQueue wrap(
- List segmentListA,
- List segmentListB) {
- return new CombinedSegmentDelayedIndexQueue(segmentListA, segmentListB);
+ private static final Comparator COMPARATOR_NODE = (node1, node2) -> DelayedIndexQueue.COMPARATOR.compare(
+ node1.segmentList.get(node1.segmentListCursor).getIndexes(node1.segmentCursor),
+ node2.segmentList.get(node2.segmentListCursor).getIndexes(node2.segmentCursor));
+
+ private final PriorityQueue kpq;
+
+    /**
+     * Builds the merge heap with one node per segment list that still has an index to offer.
+     *
+     * <p>Each node starts on the first segment of its list that contains at least one index. Lists that
+     * are empty, or that contain only empty segments, contribute no node, so {@link #isEmpty()} is
+     * immediately true for them instead of failing later with an index-out-of-bounds error.
+     *
+     * @param segmentLists the segment lists to merge; may be empty
+     */
+    private CombinedSegmentDelayedIndexQueue(List<List<SnapshotSegment>> segmentLists) {
+        // PriorityQueue rejects an initial capacity below 1, so never pass size() == 0 through.
+        this.kpq = new PriorityQueue<>(Math.max(1, segmentLists.size()), COMPARATOR_NODE);
+        for (List<SnapshotSegment> segmentList : segmentLists) {
+            // Skip leading empty segments: the comparator reads getIndexes(0) of the node's segment.
+            int firstNonEmpty = 0;
+            while (firstNonEmpty < segmentList.size()
+                    && segmentList.get(firstNonEmpty).getIndexesCount() == 0) {
+                firstNonEmpty++;
+            }
+            if (firstNonEmpty < segmentList.size()) {
+                kpq.offer(new Node(segmentList, firstNonEmpty, 0));
+            }
+        }
+    }
+
+    /**
+     * Creates a queue that merges the given segment lists, yielding their indexes in
+     * {@code DelayedIndexQueue.COMPARATOR} order.
+     *
+     * @param segmentLists one list of snapshot segments per source to merge
+     * @return a new combined queue over {@code segmentLists}
+     */
+    public static CombinedSegmentDelayedIndexQueue wrap(List<List<SnapshotSegment>> segmentLists) {
+        return new CombinedSegmentDelayedIndexQueue(segmentLists);
+    }
@Override
public boolean isEmpty() {
- return segmentListACursor >= segmentListA.size() && segmentListBCursor >= segmentListB.size();
+ return kpq.isEmpty();
}
@Override
@@ -62,48 +73,35 @@ public DelayedIndex pop() {
}
private DelayedIndex getValue(boolean needAdvanceCursor) {
- // skip empty segment
- while (segmentListACursor < segmentListA.size()
- && segmentListA.get(segmentListACursor).getIndexesCount() == 0) {
- segmentListACursor++;
- }
- while (segmentListBCursor < segmentListB.size()
- && segmentListB.get(segmentListBCursor).getIndexesCount() == 0) {
- segmentListBCursor++;
- }
+ Node node = kpq.peek();
+ Objects.requireNonNull(node);
- DelayedIndex delayedIndexA = null;
- DelayedIndex delayedIndexB = null;
- if (segmentListACursor >= segmentListA.size()) {
- delayedIndexB = segmentListB.get(segmentListBCursor).getIndexes(segmentBCursor);
- } else if (segmentListBCursor >= segmentListB.size()) {
- delayedIndexA = segmentListA.get(segmentListACursor).getIndexes(segmentACursor);
- } else {
- delayedIndexA = segmentListA.get(segmentListACursor).getIndexes(segmentACursor);
- delayedIndexB = segmentListB.get(segmentListBCursor).getIndexes(segmentBCursor);
+ SnapshotSegment snapshotSegment = node.segmentList.get(node.segmentListCursor);
+ DelayedIndex delayedIndex = snapshotSegment.getIndexes(node.segmentCursor);
+ if (!needAdvanceCursor) {
+ return delayedIndex;
}
- DelayedIndex resultValue;
- if (delayedIndexB == null || (delayedIndexA != null && COMPARATOR.compare(delayedIndexA, delayedIndexB) < 0)) {
- resultValue = delayedIndexA;
- if (needAdvanceCursor) {
- if (++segmentACursor >= segmentListA.get(segmentListACursor).getIndexesCount()) {
- segmentListA.set(segmentListACursor, null);
- ++segmentListACursor;
- segmentACursor = 0;
- }
- }
- } else {
- resultValue = delayedIndexB;
- if (needAdvanceCursor) {
- if (++segmentBCursor >= segmentListB.get(segmentListBCursor).getIndexesCount()) {
- segmentListB.set(segmentListBCursor, null);
- ++segmentListBCursor;
- segmentBCursor = 0;
+ kpq.poll();
+
+ if (node.segmentCursor + 1 < snapshotSegment.getIndexesCount()) {
+ node.segmentCursor++;
+ kpq.offer(node);
+ } else {
+ // help GC
+ node.segmentList.set(node.segmentListCursor, null);
+ while (node.segmentListCursor + 1 < node.segmentList.size()) {
+ node.segmentListCursor++;
+ node.segmentCursor = 0;
+
+ // skip empty segment
+ if (node.segmentList.get(node.segmentListCursor).getIndexesCount() > 0) {
+ kpq.offer(node);
+ break;
}
}
}
- return resultValue;
+ return delayedIndex;
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
index 8348b4999ed80..82e98cefa5d98 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
@@ -18,16 +18,17 @@
*/
package org.apache.pulsar.broker.delayed.bucket;
-import static org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker.AsyncOperationTimeoutSeconds;
+import static org.apache.bookkeeper.mledger.util.Futures.executeWithRetry;
+import static org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker.NULL_LONG_PROMISE;
import com.google.protobuf.ByteString;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
+import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.commons.collections4.CollectionUtils;
@@ -35,13 +36,28 @@
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata;
+import org.apache.pulsar.common.util.FutureUtil;
import org.roaringbitmap.RoaringBitmap;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
@Slf4j
class ImmutableBucket extends Bucket {
- ImmutableBucket(ManagedCursor cursor, BucketSnapshotStorage storage, long startLedgerId, long endLedgerId) {
- super(cursor, storage, startLedgerId, endLedgerId);
+
+ @Setter
+ private List snapshotSegments;
+
+ boolean merging = false;
+
+ @Setter
+ List firstScheduleTimestamps = new ArrayList<>();
+
+ ImmutableBucket(String dispatcherName, ManagedCursor cursor, FutureUtil.Sequencer sequencer,
+ BucketSnapshotStorage storage, long startLedgerId, long endLedgerId) {
+ super(dispatcherName, cursor, sequencer, storage, startLedgerId, endLedgerId);
+ }
+
+ public Optional> getSnapshotSegments() {
+ return Optional.ofNullable(snapshotSegments);
}
CompletableFuture> asyncLoadNextBucketSnapshotEntry() {
@@ -54,61 +70,72 @@ CompletableFuture> asyncRecoverBucketSnapshotEntry(Supplier> asyncLoadNextBucketSnapshotEntry(boolean isRecover,
Supplier cutoffTimeSupplier) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Load next bucket snapshot data, bucket: {}", cursor.getName(), this);
+ final long bucketId = getAndUpdateBucketId();
+ final CompletableFuture loadMetaDataFuture;
+ if (isRecover) {
+ final long cutoffTime = cutoffTimeSupplier.get();
+ // Load Metadata of bucket snapshot
+ final String bucketKey = bucketKey();
+ loadMetaDataFuture = executeWithRetry(() -> bucketSnapshotStorage.getBucketSnapshotMetadata(bucketId)
+ .whenComplete((___, ex) -> {
+ if (ex != null) {
+ log.warn("[{}] Failed to get bucket snapshot metadata,"
+ + " bucketKey: {}, bucketId: {}",
+ dispatcherName, bucketKey, bucketId, ex);
+ }
+ }), BucketSnapshotPersistenceException.class, MaxRetryTimes)
+ .thenApply(snapshotMetadata -> {
+ List metadataList =
+ snapshotMetadata.getMetadataListList();
+
+ // Skip all snapshot segments whose max schedule time is already at or before the cutoff time
+ int nextSnapshotEntryIndex = 0;
+ while (nextSnapshotEntryIndex < metadataList.size()
+ && metadataList.get(nextSnapshotEntryIndex).getMaxScheduleTimestamp() <= cutoffTime) {
+ nextSnapshotEntryIndex++;
+ }
+
+ this.setLastSegmentEntryId(metadataList.size());
+ this.recoverDelayedIndexBitMapAndNumber(nextSnapshotEntryIndex, metadataList);
+ List firstScheduleTimestamps = metadataList.stream().map(
+ SnapshotSegmentMetadata::getMinScheduleTimestamp).toList();
+ this.setFirstScheduleTimestamps(firstScheduleTimestamps);
+
+ return nextSnapshotEntryIndex + 1;
+ });
+ } else {
+ loadMetaDataFuture = CompletableFuture.completedFuture(currentSegmentEntryId + 1);
}
- // Wait bucket snapshot create finish
- CompletableFuture snapshotCreateFuture =
- getSnapshotCreateFuture().orElseGet(() -> CompletableFuture.completedFuture(null))
- .thenApply(__ -> null);
-
- return snapshotCreateFuture.thenCompose(__ -> {
- final long bucketId = getAndUpdateBucketId();
- final CompletableFuture loadMetaDataFuture;
- if (isRecover) {
- final long cutoffTime = cutoffTimeSupplier.get();
- // Load Metadata of bucket snapshot
- loadMetaDataFuture = bucketSnapshotStorage.getBucketSnapshotMetadata(bucketId)
- .thenApply(snapshotMetadata -> {
- List metadataList =
- snapshotMetadata.getMetadataListList();
-
- // Skip all already reach schedule time snapshot segments
- int nextSnapshotEntryIndex = 0;
- while (nextSnapshotEntryIndex < metadataList.size()
- && metadataList.get(nextSnapshotEntryIndex).getMaxScheduleTimestamp() <= cutoffTime) {
- nextSnapshotEntryIndex++;
- }
-
- this.setLastSegmentEntryId(metadataList.size());
- this.recoverDelayedIndexBitMapAndNumber(nextSnapshotEntryIndex, metadataList);
-
- return nextSnapshotEntryIndex + 1;
- });
- } else {
- loadMetaDataFuture = CompletableFuture.completedFuture(currentSegmentEntryId + 1);
+ return loadMetaDataFuture.thenCompose(nextSegmentEntryId -> {
+ if (nextSegmentEntryId > lastSegmentEntryId) {
+ return CompletableFuture.completedFuture(null);
}
- return loadMetaDataFuture.thenCompose(nextSegmentEntryId -> {
- if (nextSegmentEntryId > lastSegmentEntryId) {
- return CompletableFuture.completedFuture(null);
- }
-
- return bucketSnapshotStorage.getBucketSnapshotSegment(bucketId, nextSegmentEntryId, nextSegmentEntryId)
- .thenApply(bucketSnapshotSegments -> {
- if (CollectionUtils.isEmpty(bucketSnapshotSegments)) {
- return Collections.emptyList();
- }
-
- DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment snapshotSegment =
- bucketSnapshotSegments.get(0);
- List indexList =
- snapshotSegment.getIndexesList();
- this.setCurrentSegmentEntryId(nextSegmentEntryId);
- return indexList;
- });
- });
+ return executeWithRetry(
+ () -> bucketSnapshotStorage.getBucketSnapshotSegment(bucketId, nextSegmentEntryId,
+ nextSegmentEntryId).whenComplete((___, ex) -> {
+ if (ex != null) {
+ log.warn("[{}] Failed to get bucket snapshot segment. bucketKey: {},"
+ + " bucketId: {}, segmentEntryId: {}", dispatcherName, bucketKey(),
+ bucketId, nextSegmentEntryId, ex);
+ }
+ }), BucketSnapshotPersistenceException.class, MaxRetryTimes)
+ .thenApply(bucketSnapshotSegments -> {
+ if (CollectionUtils.isEmpty(bucketSnapshotSegments)) {
+ return Collections.emptyList();
+ }
+
+ DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment snapshotSegment =
+ bucketSnapshotSegments.get(0);
+ List indexList =
+ snapshotSegment.getIndexesList();
+ this.setCurrentSegmentEntryId(nextSegmentEntryId);
+ if (isRecover) {
+ this.asyncUpdateSnapshotLength();
+ }
+ return indexList;
+ });
});
}
@@ -134,42 +161,60 @@ private void recoverDelayedIndexBitMapAndNumber(int startSnapshotIndex,
}
CompletableFuture> getRemainSnapshotSegment() {
- return bucketSnapshotStorage.getBucketSnapshotSegment(getAndUpdateBucketId(), currentSegmentEntryId,
- lastSegmentEntryId);
+ int nextSegmentEntryId = currentSegmentEntryId + 1;
+ if (nextSegmentEntryId > lastSegmentEntryId) {
+ return CompletableFuture.completedFuture(Collections.emptyList());
+ }
+ return executeWithRetry(() -> {
+ return bucketSnapshotStorage.getBucketSnapshotSegment(getAndUpdateBucketId(), nextSegmentEntryId,
+ lastSegmentEntryId).whenComplete((__, ex) -> {
+ if (ex != null) {
+ log.warn(
+ "[{}] Failed to get remain bucket snapshot segment, bucketKey: {},"
+ + " nextSegmentEntryId: {}, lastSegmentEntryId: {}",
+ dispatcherName, bucketKey(), nextSegmentEntryId, lastSegmentEntryId, ex);
+ }
+ });
+ }, BucketSnapshotPersistenceException.class, MaxRetryTimes);
}
- CompletableFuture asyncDeleteBucketSnapshot() {
+ CompletableFuture asyncDeleteBucketSnapshot(BucketDelayedMessageIndexStats stats) {
+ long deleteStartTime = System.currentTimeMillis();
+ stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.delete);
String bucketKey = bucketKey();
long bucketId = getAndUpdateBucketId();
return removeBucketCursorProperty(bucketKey).thenCompose(__ ->
- bucketSnapshotStorage.deleteBucketSnapshot(bucketId)).whenComplete((__, ex) -> {
+ executeWithRetry(() -> bucketSnapshotStorage.deleteBucketSnapshot(bucketId),
+ BucketSnapshotPersistenceException.class, MaxRetryTimes)).whenComplete((__, ex) -> {
if (ex != null) {
- log.warn("Failed to delete bucket snapshot, bucketId: {}, bucketKey: {}",
- bucketId, bucketKey, ex);
+ log.error("[{}] Failed to delete bucket snapshot, bucketId: {}, bucketKey: {}",
+ dispatcherName, bucketId, bucketKey, ex);
+
+ stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.delete);
+ } else {
+ log.info("[{}] Delete bucket snapshot finish, bucketId: {}, bucketKey: {}",
+ dispatcherName, bucketId, bucketKey);
+
+ stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.delete,
+ System.currentTimeMillis() - deleteStartTime);
}
});
}
- void clear(boolean delete) {
+ CompletableFuture clear(BucketDelayedMessageIndexStats stats) {
delayedIndexBitMap.clear();
- getSnapshotCreateFuture().ifPresent(snapshotGenerateFuture -> {
- if (delete) {
- snapshotGenerateFuture.cancel(true);
- try {
- asyncDeleteBucketSnapshot().get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS);
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
- if (e instanceof InterruptedException) {
- Thread.currentThread().interrupt();
- }
- throw new RuntimeException(e);
- }
+ return getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE).exceptionally(e -> null)
+ .thenCompose(__ -> asyncDeleteBucketSnapshot(stats));
+ }
+
+ protected CompletableFuture asyncUpdateSnapshotLength() {
+ long bucketId = getAndUpdateBucketId();
+ return bucketSnapshotStorage.getBucketSnapshotLength(bucketId).whenComplete((length, ex) -> {
+ if (ex != null) {
+ log.error("[{}] Failed to get snapshot length, bucketId: {}, bucketKey: {}",
+ dispatcherName, bucketId, bucketKey(), ex);
} else {
- try {
- snapshotGenerateFuture.get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS);
- } catch (Exception e) {
- log.warn("Failed wait to snapshot generate, bucketId: {}, bucketKey: {}", getBucketId(),
- bucketKey());
- }
+ setSnapshotLength(length);
}
});
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
index ad457329c427f..e49ebe9606e01 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
@@ -34,6 +34,7 @@
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
import org.roaringbitmap.RoaringBitmap;
@@ -42,23 +43,30 @@ class MutableBucket extends Bucket implements AutoCloseable {
private final TripleLongPriorityQueue priorityQueue;
- MutableBucket(ManagedCursor cursor,
+ MutableBucket(String dispatcherName, ManagedCursor cursor, FutureUtil.Sequencer sequencer,
BucketSnapshotStorage bucketSnapshotStorage) {
- super(cursor, bucketSnapshotStorage, -1L, -1L);
+ super(dispatcherName, cursor, sequencer, bucketSnapshotStorage, -1L, -1L);
this.priorityQueue = new TripleLongPriorityQueue();
}
Pair sealBucketAndAsyncPersistent(
long timeStepPerBucketSnapshotSegment,
+ int maxIndexesPerBucketSnapshotSegment,
TripleLongPriorityQueue sharedQueue) {
- return createImmutableBucketAndAsyncPersistent(timeStepPerBucketSnapshotSegment, sharedQueue,
+ return createImmutableBucketAndAsyncPersistent(timeStepPerBucketSnapshotSegment,
+ maxIndexesPerBucketSnapshotSegment, sharedQueue,
TripleLongPriorityDelayedIndexQueue.wrap(priorityQueue), startLedgerId, endLedgerId);
}
Pair createImmutableBucketAndAsyncPersistent(
- final long timeStepPerBucketSnapshotSegment,
+ final long timeStepPerBucketSnapshotSegment, final int maxIndexesPerBucketSnapshotSegment,
TripleLongPriorityQueue sharedQueue, DelayedIndexQueue delayedIndexQueue, final long startLedgerId,
final long endLedgerId) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Creating bucket snapshot, startLedgerId: {}, endLedgerId: {}", dispatcherName,
+ startLedgerId, endLedgerId);
+ }
+
if (delayedIndexQueue.isEmpty()) {
return null;
}
@@ -70,11 +78,15 @@ Pair createImmutableBucketAndAsyncPersistent(
SnapshotSegment.Builder snapshotSegmentBuilder = SnapshotSegment.newBuilder();
SnapshotSegmentMetadata.Builder segmentMetadataBuilder = SnapshotSegmentMetadata.newBuilder();
+ List firstScheduleTimestamps = new ArrayList<>();
long currentTimestampUpperLimit = 0;
+ long currentFirstTimestamp = 0L;
while (!delayedIndexQueue.isEmpty()) {
DelayedIndex delayedIndex = delayedIndexQueue.peek();
long timestamp = delayedIndex.getTimestamp();
if (currentTimestampUpperLimit == 0) {
+ currentFirstTimestamp = timestamp;
+ firstScheduleTimestamps.add(currentFirstTimestamp);
currentTimestampUpperLimit = timestamp + timeStepPerBucketSnapshotSegment - 1;
}
@@ -95,8 +107,11 @@ Pair createImmutableBucketAndAsyncPersistent(
snapshotSegmentBuilder.addIndexes(delayedIndex);
- if (delayedIndexQueue.isEmpty() || delayedIndexQueue.peek().getTimestamp() > currentTimestampUpperLimit) {
+ if (delayedIndexQueue.isEmpty() || delayedIndexQueue.peek().getTimestamp() > currentTimestampUpperLimit
+ || (maxIndexesPerBucketSnapshotSegment != -1
+ && snapshotSegmentBuilder.getIndexesCount() >= maxIndexesPerBucketSnapshotSegment)) {
segmentMetadataBuilder.setMaxScheduleTimestamp(timestamp);
+ segmentMetadataBuilder.setMinScheduleTimestamp(currentFirstTimestamp);
currentTimestampUpperLimit = 0;
Iterator> iterator = bitMap.entrySet().iterator();
@@ -122,10 +137,16 @@ Pair createImmutableBucketAndAsyncPersistent(
final int lastSegmentEntryId = segmentMetadataList.size();
- ImmutableBucket bucket = new ImmutableBucket(cursor, bucketSnapshotStorage, startLedgerId, endLedgerId);
+ ImmutableBucket bucket = new ImmutableBucket(dispatcherName, cursor, sequencer, bucketSnapshotStorage,
+ startLedgerId, endLedgerId);
bucket.setCurrentSegmentEntryId(1);
bucket.setNumberBucketDelayedMessages(numMessages);
bucket.setLastSegmentEntryId(lastSegmentEntryId);
+ bucket.setFirstScheduleTimestamps(firstScheduleTimestamps);
+
+ // Skip first segment, because it has already been loaded
+ List snapshotSegments = bucketSnapshotSegments.subList(1, bucketSnapshotSegments.size());
+ bucket.setSnapshotSegments(snapshotSegments);
// Add the first snapshot segment last message to snapshotSegmentLastMessageTable
checkArgument(!bucketSnapshotSegments.isEmpty());
@@ -136,12 +157,6 @@ Pair createImmutableBucketAndAsyncPersistent(
CompletableFuture future = asyncSaveBucketSnapshot(bucket,
bucketSnapshotMetadata, bucketSnapshotSegments);
bucket.setSnapshotCreateFuture(future);
- future.whenComplete((__, ex) -> {
- if (ex != null) {
- //TODO Record create snapshot failed
- log.error("Failed to create snapshot: ", ex);
- }
- });
return result;
}
@@ -169,6 +184,7 @@ void resetLastMutableBucketRange() {
void clear() {
this.resetLastMutableBucketRange();
this.delayedIndexBitMap.clear();
+ this.priorityQueue.clear();
}
public void close() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
index a74730d23e102..faee5799289d0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.intercept;
+import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.Map;
@@ -208,4 +209,9 @@ public void close() {
log.warn("Failed to close the broker interceptor class loader", e);
}
}
+
+ /**
+ * Returns the {@code interceptor} field held by this instance. Intended for tests only.
+ */
+ @VisibleForTesting
+ public BrokerInterceptor getInterceptor() {
+ return interceptor;
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java
index e7f82742a97cc..cef3f0eb609a1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.intercept;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
@@ -277,4 +278,9 @@ public void close() {
private boolean interceptorsEnabled() {
return interceptors != null && !interceptors.isEmpty();
}
+
+ /**
+ * Returns the {@code interceptors} map held by this instance, not a copy. Intended for tests only.
+ * The result may be null: {@code interceptorsEnabled()} explicitly guards against a null map.
+ */
+ @VisibleForTesting
+ public Map getInterceptors() {
+ return interceptors;
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
index e3b94ec94a4c1..30713c91907ef 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
@@ -79,6 +79,15 @@ public OpAddEntry beforeAddEntry(OpAddEntry op, int numberOfMessages) {
return op;
}
+ @Override
+ public void afterFailedAddEntry(int numberOfMessages) {
+ for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) {
+ if (interceptor instanceof AppendIndexMetadataInterceptor) {
+ ((AppendIndexMetadataInterceptor) interceptor).decreaseWithNumberOfMessages(numberOfMessages);
+ }
+ }
+ }
+
@Override
public void onManagedLedgerPropertiesInitialize(Map propertiesMap) {
if (propertiesMap == null || propertiesMap.size() == 0) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingStrategy.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingStrategy.java
index ee136d6e9b851..3cad4b74d9903 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingStrategy.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingStrategy.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.loadbalance;
import com.google.common.collect.Multimap;
+import java.util.Set;
import org.apache.pulsar.broker.ServiceConfiguration;
/**
@@ -36,4 +37,11 @@ public interface LoadSheddingStrategy {
* @return A map from all selected bundles to the brokers on which they reside.
*/
Multimap findBundlesForUnloading(LoadData loadData, ServiceConfiguration conf);
+
+ /**
+ * Triggered when the set of active brokers changes.
+ *
+ * @param activeBrokers active Brokers
+ */
+ default void onActiveBrokersChange(Set activeBrokers) {}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategy.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategy.java
index 2be8200aef5c1..91a619eafc226 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategy.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategy.java
@@ -46,6 +46,13 @@ public interface ModularLoadManagerStrategy {
Optional selectBroker(Set candidates, BundleData bundleToAssign, LoadData loadData,
ServiceConfiguration conf);
+ /**
+ * Triggered when active brokers change.
+ */
+ default void onActiveBrokersChange(Set activeBrokers) {
+
+ }
+
/**
* Create a placement strategy using the configuration.
*
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 66c271ab22eac..3e078d0a5eb20 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -18,6 +18,12 @@
*/
package org.apache.pulsar.broker.loadbalance.extensions;
+import static java.lang.String.format;
+import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.Role.Follower;
+import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.Role.Leader;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Success;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Admin;
+import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -26,29 +32,60 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
+import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
+import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
+import org.apache.pulsar.broker.loadbalance.extensions.filter.AntiAffinityGroupPolicyFilter;
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
+import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerIsolationPoliciesFilter;
+import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerMaxTopicCountFilter;
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerVersionFilter;
+import org.apache.pulsar.broker.loadbalance.extensions.manager.SplitManager;
+import org.apache.pulsar.broker.loadbalance.extensions.manager.UnloadManager;
+import org.apache.pulsar.broker.loadbalance.extensions.models.AssignCounter;
+import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
+import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
+import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
+import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
+import org.apache.pulsar.broker.loadbalance.extensions.policies.AntiAffinityGroupPolicyHelper;
+import org.apache.pulsar.broker.loadbalance.extensions.reporter.BrokerLoadDataReporter;
+import org.apache.pulsar.broker.loadbalance.extensions.reporter.TopBundleLoadDataReporter;
+import org.apache.pulsar.broker.loadbalance.extensions.scheduler.LoadManagerScheduler;
+import org.apache.pulsar.broker.loadbalance.extensions.scheduler.SplitScheduler;
+import org.apache.pulsar.broker.loadbalance.extensions.scheduler.UnloadScheduler;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreException;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreFactory;
import org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy;
import org.apache.pulsar.broker.loadbalance.extensions.strategy.LeastResourceUsageWithWeight;
+import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
+import org.apache.pulsar.common.naming.NamespaceBundle;
+import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
+import org.slf4j.Logger;
@Slf4j
public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
@@ -63,6 +100,10 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
NamespaceName.SYSTEM_NAMESPACE,
"loadbalancer-top-bundles-load-data").toString();
+ private static final long MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS = 200;
+
+ private static final long MONITOR_INTERVAL_IN_MILLIS = 120_000;
+
private PulsarService pulsar;
private ServiceConfiguration conf;
@@ -70,11 +111,22 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
@Getter
private BrokerRegistry brokerRegistry;
+ @Getter
private ServiceUnitStateChannel serviceUnitStateChannel;
+ private AntiAffinityGroupPolicyFilter antiAffinityGroupPolicyFilter;
+
+ @Getter
+ private AntiAffinityGroupPolicyHelper antiAffinityGroupPolicyHelper;
+
private LoadDataStore brokerLoadDataStore;
private LoadDataStore topBundlesLoadDataStore;
+ private LoadManagerScheduler unloadScheduler;
+
+ @Getter
+ private LeaderElectionService leaderElectionService;
+
@Getter
private LoadManagerContext context;
@@ -83,19 +135,57 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
@Getter
private final List brokerFilterPipeline;
+ /**
+ * The load data reporter.
+ */
+ private BrokerLoadDataReporter brokerLoadDataReporter;
+
+ private TopBundleLoadDataReporter topBundleLoadDataReporter;
+
+ private ScheduledFuture brokerLoadDataReportTask;
+ private ScheduledFuture topBundlesLoadDataReportTask;
+
+ private ScheduledFuture monitorTask;
+ private SplitScheduler splitScheduler;
+
+ private UnloadManager unloadManager;
+
+ private SplitManager splitManager;
private boolean started = false;
+ private final AssignCounter assignCounter = new AssignCounter();
+ @Getter
+ private final UnloadCounter unloadCounter = new UnloadCounter();
+ private final SplitCounter splitCounter = new SplitCounter();
+
+ // record load metrics
+ private final AtomicReference> brokerLoadMetrics = new AtomicReference<>();
+ // record unload metrics
+ private final AtomicReference> unloadMetrics = new AtomicReference<>();
+ // record split metrics
+ private final AtomicReference> splitMetrics = new AtomicReference<>();
+
private final ConcurrentOpenHashMap>>
lookupRequests = ConcurrentOpenHashMap.>>newBuilder()
.build();
+ private final CountDownLatch initWaiter = new CountDownLatch(1);
+
+ public enum Role {
+ Leader,
+ Follower
+ }
+
+ private volatile Role role;
/**
* Life cycle: Constructor -> initialize -> start -> close.
*/
public ExtensibleLoadManagerImpl() {
this.brokerFilterPipeline = new ArrayList<>();
+ this.brokerFilterPipeline.add(new BrokerMaxTopicCountFilter());
+ this.brokerFilterPipeline.add(new BrokerIsolationPoliciesFilter());
this.brokerFilterPipeline.add(new BrokerVersionFilter());
// TODO: Make brokerSelectionStrategy configurable.
this.brokerSelectionStrategy = new LeastResourceUsageWithWeight();
@@ -105,19 +195,52 @@ public static boolean isLoadManagerExtensionEnabled(ServiceConfiguration conf) {
return ExtensibleLoadManagerImpl.class.getName().equals(conf.getLoadManagerClassName());
}
+ public static ExtensibleLoadManagerImpl get(LoadManager loadManager) {
+ if (!(loadManager instanceof ExtensibleLoadManagerWrapper loadManagerWrapper)) {
+ throw new IllegalArgumentException("The load manager should be 'ExtensibleLoadManagerWrapper'.");
+ }
+ return loadManagerWrapper.get();
+ }
+
+ public static boolean debug(ServiceConfiguration config, Logger log) {
+ return config.isLoadBalancerDebugModeEnabled() || log.isDebugEnabled();
+ }
+
@Override
public void start() throws PulsarServerException {
if (this.started) {
return;
}
this.brokerRegistry = new BrokerRegistryImpl(pulsar);
+ this.leaderElectionService = new LeaderElectionService(
+ pulsar.getCoordinationService(), pulsar.getSafeWebServiceAddress(),
+ state -> {
+ pulsar.getLoadManagerExecutor().execute(() -> {
+ if (state == LeaderElectionState.Leading) {
+ playLeader();
+ } else {
+ playFollower();
+ }
+ });
+ });
this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar);
this.brokerRegistry.start();
+ this.splitManager = new SplitManager(splitCounter);
+ this.unloadManager = new UnloadManager(unloadCounter);
+ this.serviceUnitStateChannel.listen(unloadManager);
+ this.serviceUnitStateChannel.listen(splitManager);
+ this.leaderElectionService.start();
this.serviceUnitStateChannel.start();
+ this.antiAffinityGroupPolicyHelper =
+ new AntiAffinityGroupPolicyHelper(pulsar, serviceUnitStateChannel);
+ antiAffinityGroupPolicyHelper.listenFailureDomainUpdate();
+ this.antiAffinityGroupPolicyFilter = new AntiAffinityGroupPolicyFilter(antiAffinityGroupPolicyHelper);
+ this.brokerFilterPipeline.add(antiAffinityGroupPolicyFilter);
try {
this.brokerLoadDataStore = LoadDataStoreFactory
.create(pulsar.getClient(), BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class);
+ this.brokerLoadDataStore.startTableView();
this.topBundlesLoadDataStore = LoadDataStoreFactory
.create(pulsar.getClient(), TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class);
} catch (LoadDataStoreException e) {
@@ -129,9 +252,54 @@ public void start() throws PulsarServerException {
.brokerRegistry(brokerRegistry)
.brokerLoadDataStore(brokerLoadDataStore)
.topBundleLoadDataStore(topBundlesLoadDataStore).build();
- // TODO: Start load data reporter.
- // TODO: Start unload scheduler and bundle split scheduler
+ this.brokerLoadDataReporter =
+ new BrokerLoadDataReporter(pulsar, brokerRegistry.getBrokerId(), brokerLoadDataStore);
+
+ this.topBundleLoadDataReporter =
+ new TopBundleLoadDataReporter(pulsar, brokerRegistry.getBrokerId(), topBundlesLoadDataStore);
+ this.serviceUnitStateChannel.listen(brokerLoadDataReporter);
+ this.serviceUnitStateChannel.listen(topBundleLoadDataReporter);
+ var interval = conf.getLoadBalancerReportUpdateMinIntervalMillis();
+ this.brokerLoadDataReportTask = this.pulsar.getLoadManagerExecutor()
+ .scheduleAtFixedRate(() -> {
+ try {
+ brokerLoadDataReporter.reportAsync(false);
+ // TODO: update broker load metrics using getLocalData
+ } catch (Throwable e) {
+ log.error("Failed to run the broker load manager executor job.", e);
+ }
+ },
+ interval,
+ interval, TimeUnit.MILLISECONDS);
+
+ this.topBundlesLoadDataReportTask = this.pulsar.getLoadManagerExecutor()
+ .scheduleAtFixedRate(() -> {
+ try {
+ // TODO: consider excluding the bundles that are in the process of split.
+ topBundleLoadDataReporter.reportAsync(false);
+ } catch (Throwable e) {
+ log.error("Failed to run the top bundles load manager executor job.", e);
+ }
+ },
+ interval,
+ interval, TimeUnit.MILLISECONDS);
+
+ this.monitorTask = this.pulsar.getLoadManagerExecutor()
+ .scheduleAtFixedRate(() -> {
+ monitor();
+ },
+ MONITOR_INTERVAL_IN_MILLIS,
+ MONITOR_INTERVAL_IN_MILLIS, TimeUnit.MILLISECONDS);
+
+ this.unloadScheduler = new UnloadScheduler(
+ pulsar, pulsar.getLoadManagerExecutor(), unloadManager, context, serviceUnitStateChannel,
+ unloadCounter, unloadMetrics);
+ this.unloadScheduler.start();
+ this.splitScheduler = new SplitScheduler(
+ pulsar, serviceUnitStateChannel, splitManager, splitCounter, splitMetrics, context);
+ this.splitScheduler.start();
+ this.initWaiter.countDown();
this.started = true;
}
@@ -139,6 +307,7 @@ public void start() throws PulsarServerException {
public void initialize(PulsarService pulsar) {
this.pulsar = pulsar;
this.conf = pulsar.getConfiguration();
+ this.brokerFilterPipeline.forEach(brokerFilter -> brokerFilter.initialize(pulsar));
}
@Override
@@ -158,15 +327,18 @@ public CompletableFuture> assign(Optional {
if (brokerOpt.isPresent()) {
+ assignCounter.incrementSuccess();
log.info("Selected new owner broker: {} for bundle: {}.", brokerOpt.get(), bundle);
return serviceUnitStateChannel.publishAssignEventAsync(bundle, brokerOpt.get())
.thenApply(Optional::of);
} else {
+ assignCounter.incrementEmpty();
throw new IllegalStateException(
"Failed to select the new owner broker for bundle: " + bundle);
}
});
}
+ assignCounter.incrementSkip();
// Already assigned, return it.
return CompletableFuture.completedFuture(broker);
});
@@ -175,7 +347,7 @@ public CompletableFuture> assign(Optional {
if (broker.isEmpty()) {
String errorMsg = String.format(
- "Failed to look up a broker registry:%s for bundle:%s", broker, bundle);
+ "Failed to get or assign the owner for bundle:%s", bundle);
log.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
@@ -198,7 +370,6 @@ private CompletableFuture> selectAsync(ServiceUnitId bundle) {
BrokerRegistry brokerRegistry = getBrokerRegistry();
return brokerRegistry.getAvailableBrokerLookupDataAsync()
.thenCompose(availableBrokers -> {
- // TODO: Support isolation policies
LoadManagerContext context = this.getContext();
Map availableBrokerCandidates = new HashMap<>(availableBrokers);
@@ -207,11 +378,13 @@ private CompletableFuture> selectAsync(ServiceUnitId bundle) {
List filterPipeline = getBrokerFilterPipeline();
for (final BrokerFilter filter : filterPipeline) {
try {
- filter.filter(availableBrokerCandidates, context);
+ filter.filter(availableBrokerCandidates, bundle, context);
+ // Keep the result of each successful filter so that a later filter failure falls back to it.
+ availableBrokers.keySet().retainAll(availableBrokerCandidates.keySet());
} catch (BrokerFilterException e) {
// TODO: We may need to revisit this error case.
log.error("Failed to filter out brokers.", e);
- availableBrokerCandidates = availableBrokers;
+ availableBrokerCandidates = new HashMap<>(availableBrokers);
}
}
if (availableBrokerCandidates.isEmpty()) {
@@ -226,6 +399,12 @@ private CompletableFuture> selectAsync(ServiceUnitId bundle) {
@Override
public CompletableFuture checkOwnershipAsync(Optional topic, ServiceUnitId bundleUnit) {
+ return getOwnershipAsync(topic, bundleUnit)
+ .thenApply(broker -> brokerRegistry.getBrokerId().equals(broker.orElse(null)));
+ }
+
+ public CompletableFuture> getOwnershipAsync(Optional topic,
+ ServiceUnitId bundleUnit) {
final String bundle = bundleUnit.toString();
CompletableFuture> owner;
if (topic.isPresent() && isInternalTopic(topic.get().toString())) {
@@ -233,8 +412,96 @@ public CompletableFuture checkOwnershipAsync(Optional