diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/coordination/impl/LockManagerTest.java similarity index 91% rename from pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java rename to pulsar-metadata/src/test/java/org/apache/pulsar/metadata/coordination/impl/LockManagerTest.java index ebd60bad5507d..9a2500b8c4c13 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/coordination/impl/LockManagerTest.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.metadata; +package org.apache.pulsar.metadata.coordination.impl; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; @@ -27,6 +27,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; +import java.util.List; import java.util.Optional; import java.util.concurrent.CompletionException; import java.util.concurrent.CountDownLatch; @@ -34,18 +35,18 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import lombok.Cleanup; +import org.apache.bookkeeper.discover.BookieServiceInfo; import org.apache.pulsar.common.util.ObjectMapperFactory; -import org.apache.pulsar.metadata.api.GetResult; -import org.apache.pulsar.metadata.api.MetadataCache; -import org.apache.pulsar.metadata.api.MetadataStoreConfig; +import org.apache.pulsar.metadata.BaseMetadataStoreTest; +import org.apache.pulsar.metadata.api.*; import org.apache.pulsar.metadata.api.MetadataStoreException.LockBusyException; import org.apache.pulsar.metadata.api.coordination.CoordinationService; import org.apache.pulsar.metadata.api.coordination.LockManager; import org.apache.pulsar.metadata.api.coordination.ResourceLock; import org.apache.pulsar.metadata.api.extended.CreateOption; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; -import org.apache.pulsar.metadata.coordination.impl.CoordinationServiceImpl; import org.awaitility.Awaitility; +import org.testng.Assert; import org.testng.annotations.Test; public class LockManagerTest extends BaseMetadataStoreTest { @@ -383,4 +384,29 @@ public void lockDeletedAndReacquired(String provider, Supplier urlSuppli assertFalse(lock.getLockExpiredFuture().isDone()); }); } + + @Test(dataProvider = "impl") + public void lockDeletedAndReacquiredWithBookieInfo(String __, Supplier urlSupplier) throws Exception { + @Cleanup + MetadataStoreExtended store = MetadataStoreExtended.create(urlSupplier.get(), + MetadataStoreConfig.builder().fsyncEnable(false).build()); + final List notifications = new ArrayList<>(); + store.registerListener(notifications::add); + @Cleanup + CoordinationService coordinationService = new CoordinationServiceImpl(store); + @Cleanup + LockManager lockManager = coordinationService.getLockManager(BookieServiceInfo.class); + String key = newKey(); + ResourceLockImpl lock = (ResourceLockImpl)lockManager.acquireLock(key, new BookieServiceInfo()).join(); + lock.silentRevalidateOnce().join(); + // wait for 1 sec + Thread.sleep(1000); + + Assert.assertEquals(notifications.size(), 1); + final Notification notification = notifications.get(0); + Assert.assertEquals(notification.getType(), NotificationType.Created); + Assert.assertEquals(notification.getPath(), key); + + } + }