diff --git a/buildtools/pom.xml b/buildtools/pom.xml
index c00c351b3e273..155e826d13270 100644
--- a/buildtools/pom.xml
+++ b/buildtools/pom.xml
@@ -48,7 +48,7 @@
1.8
8
3.1.0
- 2.18.0
+ 2.25.3
1.7.32
7.7.1
3.18.0
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index 722ae224f6480..08ce107eab529 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -337,10 +337,10 @@ The Apache Software License, Version 2.0
- jakarta.validation-jakarta.validation-api-2.0.2.jar
- javax.validation-validation-api-1.1.0.Final.jar
* Log4J
- - org.apache.logging.log4j-log4j-api-2.18.0.jar
- - org.apache.logging.log4j-log4j-core-2.18.0.jar
- - org.apache.logging.log4j-log4j-slf4j-impl-2.18.0.jar
- - org.apache.logging.log4j-log4j-web-2.18.0.jar
+ - org.apache.logging.log4j-log4j-api-2.25.3.jar
+ - org.apache.logging.log4j-log4j-core-2.25.3.jar
+ - org.apache.logging.log4j-log4j-slf4j-impl-2.25.3.jar
+ - org.apache.logging.log4j-log4j-web-2.25.3.jar
* Java Native Access JNA
- net.java.dev.jna-jna-jpms-5.12.1.jar
- net.java.dev.jna-jna-platform-jpms-5.12.1.jar
diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt b/distribution/shell/src/assemble/LICENSE.bin.txt
index ab487488b15d5..27ed78d07a76e 100644
--- a/distribution/shell/src/assemble/LICENSE.bin.txt
+++ b/distribution/shell/src/assemble/LICENSE.bin.txt
@@ -380,10 +380,10 @@ The Apache Software License, Version 2.0
- simpleclient_tracer_otel-0.16.0.jar
- simpleclient_tracer_otel_agent-0.16.0.jar
* Log4J
- - log4j-api-2.18.0.jar
- - log4j-core-2.18.0.jar
- - log4j-slf4j-impl-2.18.0.jar
- - log4j-web-2.18.0.jar
+ - log4j-api-2.25.3.jar
+ - log4j-core-2.25.3.jar
+ - log4j-slf4j-impl-2.25.3.jar
+ - log4j-web-2.25.3.jar
* BookKeeper
- bookkeeper-common-allocator-4.16.7.jar
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimator.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimator.java
index bb5bfe7c5f446..ed0bedf8a73d4 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimator.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimator.java
@@ -22,6 +22,7 @@
import java.util.Collection;
import java.util.Map;
import java.util.NavigableMap;
+import java.util.NoSuchElementException;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
@@ -81,17 +82,15 @@ static int internalEstimateEntryCountByBytesSize(int maxEntries, long maxSizeByt
return maxEntries;
}
- // Adjust the read position to ensure it falls within the valid range of available ledgers.
- // This handles special cases such as EARLIEST and LATEST positions by resetting them
- // to the first available ledger or the last active ledger, respectively.
- if (lastLedgerId != null && readPosition.getLedgerId() > lastLedgerId.longValue()) {
- readPosition = PositionImpl.get(lastLedgerId, Math.max(lastLedgerTotalEntries - 1, 0));
- } else if (lastLedgerId == null && readPosition.getLedgerId() > ledgersInfo.lastKey()) {
- Map.Entry lastEntry = ledgersInfo.lastEntry();
- readPosition =
- PositionImpl.get(lastEntry.getKey(), Math.max(lastEntry.getValue().getEntries() - 1, 0));
- } else if (readPosition.getLedgerId() < ledgersInfo.firstKey()) {
- readPosition = PositionImpl.get(ledgersInfo.firstKey(), 0);
+ if (ledgersInfo.isEmpty()) {
+ return 1;
+ }
+
+ try {
+ readPosition = adjustReadPosition(readPosition, ledgersInfo, lastLedgerId, lastLedgerTotalEntries);
+ } catch (NoSuchElementException e) {
+ // there was a race condition where ledgersInfo became empty just before adjustReadPosition was called
+ return 1;
}
long estimatedEntryCount = 0;
@@ -182,4 +181,28 @@ static int internalEstimateEntryCountByBytesSize(int maxEntries, long maxSizeByt
// Ensure at least one entry is always returned as the result
return Math.max((int) Math.min(estimatedEntryCount, maxEntries), 1);
}
+
+ private static Position adjustReadPosition(Position readPosition,
+ NavigableMap
+ ledgersInfo,
+ Long lastLedgerId, long lastLedgerTotalEntries) {
+ // Adjust the read position to ensure it falls within the valid range of available ledgers.
+ // This handles special cases such as EARLIEST and LATEST positions by resetting them
+ // to the first available ledger or the last active ledger, respectively.
+ if (lastLedgerId != null && readPosition.getLedgerId() > lastLedgerId.longValue()) {
+ return PositionImpl.get(lastLedgerId, Math.max(lastLedgerTotalEntries - 1, 0));
+ }
+ long lastKey = ledgersInfo.lastKey();
+ if (lastLedgerId == null && readPosition.getLedgerId() > lastKey) {
+ Map.Entry lastEntry = ledgersInfo.lastEntry();
+ if (lastEntry != null && lastEntry.getKey() == lastKey) {
+ return PositionImpl.get(lastEntry.getKey(), Math.max(lastEntry.getValue().getEntries() - 1, 0));
+ }
+ }
+ long firstKey = ledgersInfo.firstKey();
+ if (readPosition.getLedgerId() < firstKey) {
+ return PositionImpl.get(firstKey, 0);
+ }
+ return readPosition;
+ }
}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimatorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimatorTest.java
index e6c77b73596e6..8477321552cdd 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimatorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimatorTest.java
@@ -19,9 +19,12 @@
package org.apache.bookkeeper.mledger.impl;
import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import java.util.HashSet;
import java.util.NavigableMap;
+import java.util.NoSuchElementException;
import java.util.Set;
import java.util.TreeMap;
import org.apache.bookkeeper.mledger.Position;
@@ -288,4 +291,41 @@ public void testMaxSizeIsLongMAX_VALUE() {
int result = estimateEntryCountByBytesSize(Long.MAX_VALUE);
assertEquals(result, maxEntries);
}
+
+ @Test
+ public void testNoLedgers() {
+ readPosition = PositionImpl.EARLIEST;
+ // remove all ledgers from ledgersInfo
+ ledgersInfo.clear();
+ int result = estimateEntryCountByBytesSize(5_000_000);
+ // expect that result is 1 because the estimation couldn't be done
+ assertEquals(result, 1);
+ }
+
+ @Test
+ public void testNoLedgersRaceFirstKey() {
+ readPosition = PositionImpl.EARLIEST;
+ // remove all ledgers from ledgersInfo
+ ledgersInfo = mock(NavigableMap.class);
+ when(ledgersInfo.isEmpty()).thenReturn(false);
+ when(ledgersInfo.firstKey()).thenThrow(NoSuchElementException.class);
+ when(ledgersInfo.lastKey()).thenReturn(1L);
+ int result = estimateEntryCountByBytesSize(5_000_000);
+ // expect that result is 1 because the estimation couldn't be done
+ assertEquals(result, 1);
+ }
+
+ @Test
+ public void testNoLedgersRaceLastKey() {
+ readPosition = PositionImpl.EARLIEST;
+ // remove all ledgers from ledgersInfo
+ ledgersInfo = mock(NavigableMap.class);
+ lastLedgerId = null;
+ when(ledgersInfo.isEmpty()).thenReturn(false);
+ when(ledgersInfo.firstKey()).thenReturn(1L);
+ when(ledgersInfo.lastKey()).thenThrow(NoSuchElementException.class);
+ int result = estimateEntryCountByBytesSize(5_000_000);
+ // expect that result is 1 because the estimation couldn't be done
+ assertEquals(result, 1);
+ }
}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 8641479df8f56..f421da6a748f7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -152,7 +152,7 @@ flexible messaging model and an intuitive client API.
7.9.2
1.7.32
4.4
- 2.18.0
+ 2.25.3
1.78.1
1.81
@@ -263,9 +263,9 @@ flexible messaging model and an intuitive client API.
3.3.2
- 1.20.4
+ 1.21.4
- 3.4.0
+ 3.7.0
2.2
5.4.0
1.1.1
diff --git a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenID.java b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenID.java
index 08b7e7928e8b2..a70f9e931173b 100644
--- a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenID.java
+++ b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenID.java
@@ -421,7 +421,6 @@ DecodedJWT verifyJWT(PublicKey publicKey,
.withAnyOfAudience(allowedAudiences)
.withClaimPresence(RegisteredClaims.ISSUED_AT)
.withClaimPresence(RegisteredClaims.EXPIRES_AT)
- .withClaimPresence(RegisteredClaims.NOT_BEFORE)
.withClaimPresence(RegisteredClaims.SUBJECT);
if (isRoleClaimNotSubject) {
diff --git a/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDTest.java b/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDTest.java
index 74abffe9c38e8..4cc3dcd46c765 100644
--- a/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDTest.java
+++ b/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.authentication.oidc;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertNull;
import com.auth0.jwt.JWT;
import com.auth0.jwt.interfaces.DecodedJWT;
@@ -174,6 +175,20 @@ public void ensureFutureNBFFails() throws Exception {
() -> basicProvider.verifyJWT(keyPair.getPublic(), SignatureAlgorithm.RS256.getValue(), jwt));
}
+ @Test
+ public void ensureWithoutNBFSucceeds() throws Exception {
+ KeyPair keyPair = Keys.keyPairFor(SignatureAlgorithm.RS256);
+ DefaultJwtBuilder defaultJwtBuilder = new DefaultJwtBuilder();
+ addValidMandatoryClaims(defaultJwtBuilder, basicProviderAudience);
+ // remove "nbf" claim
+ defaultJwtBuilder.setNotBefore(null);
+ defaultJwtBuilder.signWith(keyPair.getPrivate());
+ DecodedJWT jwt = JWT.decode(defaultJwtBuilder.compact());
+ assertThat(jwt.getNotBefore()).isNull();
+ assertThat(jwt.getClaims().get("nbf")).isNull();
+ basicProvider.verifyJWT(keyPair.getPublic(), SignatureAlgorithm.RS256.getValue(), jwt);
+ }
+
@Test
public void ensureFutureIATFails() throws Exception {
KeyPair keyPair = Keys.keyPairFor(SignatureAlgorithm.RS256);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index ba9f0769e74ec..b1e8cf85f589e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -126,7 +126,7 @@
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.SafeCollectionUtils;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
-import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
+import org.apache.pulsar.common.util.collections.ConcurrentBitSet;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.slf4j.Logger;
@@ -3015,7 +3015,7 @@ private CompletableFuture doTransactionAcknowledgeForResponse(MessageId me
} else {
if (Commands.peerSupportsMultiMessageAcknowledgment(
getClientCnx().getRemoteEndpointProtocolVersion())) {
- List> entriesToAck =
+ List> entriesToAck =
new ArrayList<>(chunkMsgIds.length);
for (MessageIdImpl cMsgId : chunkMsgIds) {
if (cMsgId != null && chunkMsgIds.length > 1) {
@@ -3053,7 +3053,7 @@ private CompletableFuture doTransactionAcknowledgeForResponse(MessageId me
}
private ByteBuf newMultiTransactionMessageAck(long consumerId, TxnID txnID,
- List> entries,
+ List> entries,
long requestID) {
BaseCommand cmd = newMultiMessageAckCommon(entries);
cmd.getAck()
@@ -3072,7 +3072,7 @@ protected BaseCommand initialValue() throws Exception {
}
};
- private static BaseCommand newMultiMessageAckCommon(List> entries) {
+ private static BaseCommand newMultiMessageAckCommon(List> entries) {
BaseCommand cmd = LOCAL_BASE_COMMAND.get()
.clear()
.setType(BaseCommand.Type.ACK);
@@ -3081,7 +3081,7 @@ private static BaseCommand newMultiMessageAckCommon(List pendingIndividualAcks;
@VisibleForTesting
- final ConcurrentSkipListMap pendingIndividualBatchIndexAcks;
+ final ConcurrentSkipListMap pendingIndividualBatchIndexAcks;
private final ScheduledFuture> scheduledTask;
private final boolean batchIndexAckEnabled;
@@ -133,7 +133,7 @@ public boolean isDuplicate(MessageId messageId) {
return true;
}
if (messageIdAdv.getBatchIndex() >= 0) {
- ConcurrentBitSetRecyclable bitSet = pendingIndividualBatchIndexAcks.get(key);
+ ConcurrentBitSet bitSet = pendingIndividualBatchIndexAcks.get(key);
return bitSet != null && !bitSet.get(messageIdAdv.getBatchIndex());
}
return false;
@@ -327,21 +327,22 @@ private CompletableFuture doCumulativeAck(MessageIdAdv messageId, Map doIndividualBatchAckAsync(MessageIdAdv msgId) {
- ConcurrentBitSetRecyclable bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent(
+ ConcurrentBitSet bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent(
MessageIdAdvUtils.discardBatch(msgId), __ -> {
final BitSet ackSet = msgId.getAckSet();
- final ConcurrentBitSetRecyclable value;
+ final ConcurrentBitSet value;
if (ackSet != null) {
synchronized (ackSet) {
if (!ackSet.isEmpty()) {
- value = ConcurrentBitSetRecyclable.create(ackSet);
+ value = new ConcurrentBitSet();
+ value.or(ackSet);
} else {
- value = ConcurrentBitSetRecyclable.create();
+ value = new ConcurrentBitSet();
value.set(0, msgId.getBatchSize());
}
}
} else {
- value = ConcurrentBitSetRecyclable.create();
+ value = new ConcurrentBitSet();
value.set(0, msgId.getBatchSize());
}
return value;
@@ -445,7 +446,7 @@ private void flushAsync(ClientCnx cnx) {
}
// Flush all individual acks
- List> entriesToAck =
+ List> entriesToAck =
new ArrayList<>(pendingIndividualAcks.size() + pendingIndividualBatchIndexAcks.size());
if (!pendingIndividualAcks.isEmpty()) {
if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())) {
@@ -487,7 +488,7 @@ private void flushAsync(ClientCnx cnx) {
}
while (true) {
- Map.Entry entry =
+ Map.Entry entry =
pendingIndividualBatchIndexAcks.pollFirstEntry();
if (entry == null) {
// The entry has been removed in a different thread
@@ -539,7 +540,7 @@ private CompletableFuture newImmediateAckAndFlush(long consumerId, Message
// cumulative ack chunk by the last messageId
if (chunkMsgIds != null && ackType != AckType.Cumulative) {
if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())) {
- List> entriesToAck = new ArrayList<>(chunkMsgIds.length);
+ List> entriesToAck = new ArrayList<>(chunkMsgIds.length);
for (MessageIdImpl cMsgId : chunkMsgIds) {
if (cMsgId != null && chunkMsgIds.length > 1) {
entriesToAck.add(Triple.of(cMsgId.getLedgerId(), cMsgId.getEntryId(), null));
@@ -568,7 +569,7 @@ private CompletableFuture newMessageAckCommandAndWrite(
long entryId, BitSetRecyclable ackSet, AckType ackType,
Map properties, boolean flush,
TimedCompletableFuture timedCompletableFuture,
- List> entriesToAck) {
+ List> entriesToAck) {
if (consumer.isAckReceiptEnabled()) {
final long requestId = consumer.getClient().newRequestId();
final ByteBuf cmd;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
index ebcc99f8557f4..a3254b10949b7 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
@@ -31,8 +31,10 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
+import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -407,6 +409,54 @@ public void testDoIndividualBatchAckAsync() {
tracker.close();
}
+ @Test
+ public void testDoIndividualBatchAckNeverAffectIsDuplicate() throws Exception {
+ ConsumerConfigurationData> conf = new ConsumerConfigurationData<>();
+ conf.setMaxAcknowledgmentGroupSize(1);
+ PersistentAcknowledgmentsGroupingTracker tracker =
+ new PersistentAcknowledgmentsGroupingTracker(consumer, conf, eventLoopGroup);
+
+ BatchMessageIdImpl batchMessageId0 = new BatchMessageIdImpl(5, 1, 0, 0, 10, null);
+ BatchMessageIdImpl batchMessageId1 = new BatchMessageIdImpl(5, 1, 0, 1, 10, null);
+
+ int loops = 10000;
+ int addAcknowledgmentThreadCount = 10;
+ List addAcknowledgmentThreads = new ArrayList<>(addAcknowledgmentThreadCount);
+ for (int i = 0; i < addAcknowledgmentThreadCount; i++) {
+ Thread addAcknowledgmentThread = new Thread(() -> {
+ for (int j = 0; j < loops; j++) {
+ tracker.addAcknowledgment(batchMessageId0, AckType.Individual, Collections.emptyMap());
+ }
+ }, "doIndividualBatchAck-thread-" + i);
+ addAcknowledgmentThread.start();
+ addAcknowledgmentThreads.add(addAcknowledgmentThread);
+ }
+
+ int isDuplicateThreadCount = 10;
+ AtomicBoolean assertResult = new AtomicBoolean();
+ List isDuplicateThreads = new ArrayList<>(isDuplicateThreadCount);
+ for (int i = 0; i < isDuplicateThreadCount; i++) {
+ Thread isDuplicateThread = new Thread(() -> {
+ for (int j = 0; j < loops; j++) {
+ boolean duplicate = tracker.isDuplicate(batchMessageId1);
+ assertResult.set(assertResult.get() || duplicate);
+ }
+ }, "isDuplicate-thread-" + i);
+ isDuplicateThread.start();
+ isDuplicateThreads.add(isDuplicateThread);
+ }
+
+ for (Thread addAcknowledgmentThread : addAcknowledgmentThreads) {
+ addAcknowledgmentThread.join();
+ }
+
+ for (Thread isDuplicateThread : isDuplicateThreads) {
+ isDuplicateThread.join();
+ }
+
+ assertFalse(assertResult.get());
+ }
+
public class ClientCnxTest extends ClientCnx {
public ClientCnxTest(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index d8cfceb536c73..e6161971ab19e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -105,7 +105,7 @@
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
-import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
+import org.apache.pulsar.common.util.collections.ConcurrentBitSet;
@UtilityClass
@Slf4j
@@ -970,7 +970,7 @@ public static ByteBuf newLookupErrorResponse(ServerError error, String errorMsg,
}
public static ByteBuf newMultiTransactionMessageAck(long consumerId, TxnID txnID,
- List> entries) {
+ List> entries) {
BaseCommand cmd = newMultiMessageAckCommon(entries);
cmd.getAck()
.setConsumerId(consumerId)
@@ -980,14 +980,14 @@ public static ByteBuf newMultiTransactionMessageAck(long consumerId, TxnID txnID
return serializeWithSize(cmd);
}
- private static BaseCommand newMultiMessageAckCommon(List> entries) {
+ private static BaseCommand newMultiMessageAckCommon(List> entries) {
BaseCommand cmd = localCmd(Type.ACK);
CommandAck ack = cmd.setAck();
int entriesCount = entries.size();
for (int i = 0; i < entriesCount; i++) {
long ledgerId = entries.get(i).getLeft();
long entryId = entries.get(i).getMiddle();
- ConcurrentBitSetRecyclable bitSet = entries.get(i).getRight();
+ ConcurrentBitSet bitSet = entries.get(i).getRight();
MessageIdData msgId = ack.addMessageId()
.setLedgerId(ledgerId)
.setEntryId(entryId);
@@ -996,7 +996,6 @@ private static BaseCommand newMultiMessageAckCommon(List> entries,
+ List> entries,
long requestId) {
BaseCommand cmd = newMultiMessageAckCommon(entries);
cmd.getAck()
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java
index 0ba409b2d7d17..d29e4b8240fde 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java
@@ -27,6 +27,7 @@
/**
* Safe multithreaded version of {@code BitSet} and leverage netty recycler.
*/
+@Deprecated
@EqualsAndHashCode(callSuper = true)
public class ConcurrentBitSetRecyclable extends ConcurrentBitSet {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 4c1f7f8d7cb9e..88b78b30a1e00 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -147,7 +147,12 @@ enum State {
Closing,
- Closed,
+ Closed;
+
+ boolean isAuthenticatedState() {
+ return this == ProxyLookupRequests
+ || this == ProxyConnectionToBroker;
+ }
}
ConnectionPool getConnectionPool() {
@@ -431,6 +436,10 @@ private void handleBrokerConnected(DirectProxyHandler directProxyHandler, Comman
final ByteBuf msg = Commands.newConnected(connected.getProtocolVersion(), maxMessageSize,
connected.hasFeatureFlags() && connected.getFeatureFlags().isSupportsTopicWatchers());
writeAndFlush(msg);
+ // Start auth refresh task only if we are not forwarding authorization credentials
+ if (!service.getConfiguration().isForwardAuthorizationCredentials()) {
+ startAuthRefreshTaskIfNotStarted();
+ }
} else {
LOG.warn("[{}] Channel is {}. ProxyConnection is in {}. "
+ "Closing connection to broker '{}'.",
@@ -512,16 +521,44 @@ protected void authChallengeSuccessCallback(AuthData authChallenge) {
}
}
+ private void startAuthRefreshTaskIfNotStarted() {
+ if (service.getConfiguration().isAuthenticationEnabled()
+ && service.getConfiguration().getAuthenticationRefreshCheckSeconds() > 0
+ && authRefreshTask == null) {
+ authRefreshTask = ctx.executor().scheduleAtFixedRate(
+ Runnables.catchingAndLoggingThrowables(
+ this::refreshAuthenticationCredentialsAndCloseIfTooExpired),
+ service.getConfiguration().getAuthenticationRefreshCheckSeconds(),
+ service.getConfiguration().getAuthenticationRefreshCheckSeconds(),
+ TimeUnit.SECONDS);
+ }
+ }
+
private void refreshAuthenticationCredentialsAndCloseIfTooExpired() {
assert ctx.executor().inEventLoop();
- if (state != State.ProxyLookupRequests) {
- // Happens when an exception is thrown that causes this connection to close.
+
+ // Only check expiration in authenticated states
+ if (!state.isAuthenticatedState()) {
return;
- } else if (!authState.isExpired()) {
+ }
+
+ if (!authState.isExpired()) {
// Credentials are still valid. Nothing to do at this point
return;
}
+ // If we are not forwarding authorization credentials to the broker, the broker cannot
+ // refresh the client's credentials. In this case, we must close the connection immediately
+ // when credentials expire.
+ if (!service.getConfiguration().isForwardAuthorizationCredentials()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("[{}] Closing connection because client credentials have expired and "
+ + "forwardAuthorizationCredentials is disabled (broker cannot refresh)", remoteAddress);
+ }
+ ctx.close();
+ return;
+ }
+
if (System.nanoTime() - authChallengeSentTime
> TimeUnit.SECONDS.toNanos(service.getConfiguration().getAuthenticationRefreshCheckSeconds())) {
LOG.warn("[{}] Closing connection after timeout on refreshing auth credentials", remoteAddress);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
index 3207c2c3d6a7e..b255500c46364 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
@@ -18,13 +18,15 @@
*/
package org.apache.pulsar.proxy.server;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.spy;
-
import com.google.common.collect.Sets;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
-
import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -32,14 +34,16 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-
+import java.util.concurrent.TimeUnit;
import javax.naming.AuthenticationException;
-
+import javax.net.ssl.SSLSession;
import lombok.Cleanup;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
@@ -48,8 +52,11 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ProducerImpl;
+import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.policies.data.AuthAction;
+import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -58,214 +65,347 @@
import org.testng.annotations.Test;
public class ProxyAuthenticationTest extends ProducerConsumerBase {
- private static final Logger log = LoggerFactory.getLogger(ProxyAuthenticationTest.class);
-
- public static class BasicAuthenticationData implements AuthenticationDataProvider {
- private final String authParam;
-
- public BasicAuthenticationData(String authParam) {
- this.authParam = authParam;
- }
-
- public boolean hasDataFromCommand() {
- return true;
- }
-
- public String getCommandData() {
- return authParam;
- }
-
- public boolean hasDataForHttp() {
- return true;
- }
-
- @Override
- public Set> getHttpHeaders() {
- Map headers = new HashMap<>();
- headers.put("BasicAuthentication", authParam);
- return headers.entrySet();
- }
- }
-
- public static class BasicAuthentication implements Authentication {
-
- private String authParam;
-
- @Override
- public void close() throws IOException {
- // noop
- }
-
- @Override
- public String getAuthMethodName() {
- return "BasicAuthentication";
- }
-
- @Override
- public AuthenticationDataProvider getAuthData() throws PulsarClientException {
- try {
- return new BasicAuthenticationData(authParam);
- } catch (Exception e) {
- throw new PulsarClientException(e);
- }
- }
-
- @Override
- public void configure(Map authParams) {
- this.authParam = String.format("{\"entityType\": \"%s\", \"expiryTime\": \"%s\"}",
- authParams.get("entityType"), authParams.get("expiryTime"));
- }
-
- @Override
- public void start() throws PulsarClientException {
- // noop
- }
- }
-
- public static class BasicAuthenticationProvider implements AuthenticationProvider {
-
- @Override
- public void close() throws IOException {
- }
-
- @Override
- public void initialize(ServiceConfiguration config) throws IOException {
- }
-
- @Override
- public String getAuthMethodName() {
- return "BasicAuthentication";
- }
-
- @Override
- public CompletableFuture authenticateAsync(AuthenticationDataSource authData) {
- String commandData = null;
- if (authData.hasDataFromCommand()) {
- commandData = authData.getCommandData();
- } else if (authData.hasDataFromHttp()) {
- commandData = authData.getHttpHeader("BasicAuthentication");
- }
-
- JsonObject element = JsonParser.parseString(commandData).getAsJsonObject();
- log.info("Have log of {}", element);
- long expiryTimeInMillis = Long.parseLong(element.get("expiryTime").getAsString());
- long currentTimeInMillis = System.currentTimeMillis();
- if (expiryTimeInMillis < currentTimeInMillis) {
- log.warn("Auth failed due to timeout");
- return CompletableFuture
- .failedFuture(new AuthenticationException("Authentication data has been expired"));
- }
- final String result = element.get("entityType").getAsString();
- // Run in another thread to attempt to test the async logic
- return CompletableFuture.supplyAsync(() -> result);
- }
- }
-
- @BeforeMethod
- @Override
- protected void setup() throws Exception {
- conf.setAuthenticationEnabled(true);
- conf.setAuthorizationEnabled(true);
- conf.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
- // Expires after an hour
- conf.setBrokerClientAuthenticationParameters(
- "entityType:admin,expiryTime:" + (System.currentTimeMillis() + 3600 * 1000));
-
- Set superUserRoles = new HashSet<>();
- superUserRoles.add("admin");
- conf.setSuperUserRoles(superUserRoles);
-
- Set providers = new HashSet<>();
- providers.add(BasicAuthenticationProvider.class.getName());
- conf.setAuthenticationProviders(providers);
-
- conf.setClusterName("test");
- Set proxyRoles = new HashSet<>();
- proxyRoles.add("proxy");
- conf.setProxyRoles(proxyRoles);
+ private static final Logger log = LoggerFactory.getLogger(ProxyAuthenticationTest.class);
+ private static final String CLUSTER_NAME = "test";
+
+ public static class BasicAuthenticationData implements AuthenticationDataProvider {
+ private final String authParam;
+
+ public BasicAuthenticationData(String authParam) {
+ this.authParam = authParam;
+ }
+
+ public boolean hasDataFromCommand() {
+ return true;
+ }
+
+ public String getCommandData() {
+ return authParam;
+ }
+
+ public boolean hasDataForHttp() {
+ return true;
+ }
+
+ @Override
+ public Set> getHttpHeaders() {
+ Map headers = new HashMap<>();
+ headers.put("BasicAuthentication", authParam);
+ headers.put("X-Pulsar-Auth-Method-Name", "BasicAuthentication");
+ return headers.entrySet();
+ }
+ }
+
+ public static class BasicAuthentication implements Authentication {
+
+ private String authParam;
+
+ @Override
+ public void close() throws IOException {
+ // noop
+ }
+
+ @Override
+ public String getAuthMethodName() {
+ return "BasicAuthentication";
+ }
+
+ @Override
+ public AuthenticationDataProvider getAuthData() throws PulsarClientException {
+ try {
+ return new BasicAuthenticationData(authParam);
+ } catch (Exception e) {
+ throw new PulsarClientException(e);
+ }
+ }
+
+ @Override
+ public void configure(Map authParams) {
+ this.authParam = String.format("{\"entityType\": \"%s\", \"expiryTime\": \"%s\"}",
+ authParams.get("entityType"), authParams.get("expiryTime"));
+ }
+
+ @Override
+ public void start() throws PulsarClientException {
+ // noop
+ }
+ }
+
+ public static class BasicAuthenticationState implements AuthenticationState {
+ private final long expiryTimeInMillis;
+ private final String authRole;
+ private final AuthenticationDataSource authenticationDataSource;
+
+ private static boolean isExpired(long expiryTimeInMillis) {
+ return System.currentTimeMillis() > expiryTimeInMillis;
+ }
+
+ private static String[] parseAuthData(String commandData) {
+ JsonObject element = JsonParser.parseString(commandData).getAsJsonObject();
+ long expiryTimeInMillis = Long.parseLong(element.get("expiryTime").getAsString());
+ if (isExpired(expiryTimeInMillis)) {
+ throw new IllegalArgumentException("Credentials have expired");
+ }
+ String role = element.get("entityType").getAsString();
+ return new String[]{role, String.valueOf(expiryTimeInMillis)};
+ }
+
+ public BasicAuthenticationState(AuthenticationDataSource authData) {
+ this(authData.hasDataFromCommand() ? authData.getCommandData()
+ : authData.getHttpHeader("BasicAuthentication"));
+ }
+
+ public BasicAuthenticationState(AuthData authData) {
+ this(new String(authData.getBytes(), StandardCharsets.UTF_8));
+ }
+
+ private BasicAuthenticationState(String commandData) {
+ String[] parsed = parseAuthData(commandData);
+ this.authRole = parsed[0];
+ this.expiryTimeInMillis = Long.parseLong(parsed[1]);
+ this.authenticationDataSource = new AuthenticationDataCommand(commandData, null, null);
+ }
+
+ @Override
+ public String getAuthRole() {
+ return authRole;
+ }
+
+ @Override
+ public AuthData authenticate(AuthData authData) throws AuthenticationException {
+ return null; // Authentication complete
+ }
+
+ @Override
+ public CompletableFuture authenticateAsync(AuthData authData) {
+ return CompletableFuture.completedFuture(null); // Authentication complete
+ }
+
+ @Override
+ public AuthenticationDataSource getAuthDataSource() {
+ return authenticationDataSource;
+ }
+
+ @Override
+ public boolean isComplete() {
+ return authRole != null;
+ }
+
+ @Override
+ public boolean isExpired() {
+ return isExpired(expiryTimeInMillis);
+ }
+ }
+
+ public static class BasicAuthenticationProvider implements AuthenticationProvider {
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public void initialize(ServiceConfiguration config) throws IOException {
+ }
+
+ @Override
+ public String getAuthMethodName() {
+ return "BasicAuthentication";
+ }
+
+ @Override
+ public AuthenticationState newAuthState(AuthData authData, SocketAddress remoteAddress, SSLSession sslSession) {
+ return new BasicAuthenticationState(authData);
+ }
+
+ @Override
+ public CompletableFuture authenticateAsync(AuthenticationDataSource authData) {
+ BasicAuthenticationState basicAuthenticationState = new BasicAuthenticationState(authData);
+ return CompletableFuture.supplyAsync(basicAuthenticationState::getAuthRole);
+ }
+ }
+
+ @BeforeMethod
+ @Override
+ protected void setup() throws Exception {
+ conf.setAuthenticationEnabled(true);
+ conf.setAuthorizationEnabled(true);
+ conf.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
+ // Expires after an hour
+ conf.setBrokerClientAuthenticationParameters(
+ "entityType:admin,expiryTime:" + (System.currentTimeMillis() + 3600 * 1000));
+
+ Set superUserRoles = new HashSet<>();
+ superUserRoles.add("admin");
+ conf.setSuperUserRoles(superUserRoles);
+
+ Set providers = new HashSet<>();
+ providers.add(BasicAuthenticationProvider.class.getName());
+ conf.setAuthenticationProviders(providers);
+
+ conf.setClusterName(CLUSTER_NAME);
+ Set proxyRoles = new HashSet<>();
+ proxyRoles.add("proxy");
+ conf.setProxyRoles(proxyRoles);
conf.setAuthenticateOriginalAuthData(true);
- super.init();
-
- updateAdminClient();
- producerBaseSetup();
- }
-
- @Override
- @AfterMethod(alwaysRun = true)
- protected void cleanup() throws Exception {
- super.internalCleanup();
- }
-
- @Test
- void testAuthentication() throws Exception {
- log.info("-- Starting {} test --", methodName);
-
- // Step 1: Create Admin Client
- updateAdminClient();
- // create a client which connects to proxy and pass authData
- String namespaceName = "my-property/my-ns";
- String topicName = "persistent://my-property/my-ns/my-topic1";
- String subscriptionName = "my-subscriber-name";
- // expires after 60 seconds
- String clientAuthParams = "entityType:client,expiryTime:" + (System.currentTimeMillis() + 60 * 1000);
- // expires after 60 seconds
- String proxyAuthParams = "entityType:proxy,expiryTime:" + (System.currentTimeMillis() + 60 * 1000);
-
- admin.namespaces().grantPermissionOnNamespace(namespaceName, "proxy",
- Sets.newHashSet(AuthAction.consume, AuthAction.produce));
- admin.namespaces().grantPermissionOnNamespace(namespaceName, "client",
- Sets.newHashSet(AuthAction.consume, AuthAction.produce));
-
- // Step 2: Try to use proxy Client as a normal Client - expect exception
- ProxyConfiguration proxyConfig = new ProxyConfiguration();
- proxyConfig.setAuthenticationEnabled(true);
- proxyConfig.setServicePort(Optional.of(0));
- proxyConfig.setBrokerProxyAllowedTargetPorts("*");
- proxyConfig.setWebServicePort(Optional.of(0));
- proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
-
- proxyConfig.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
- proxyConfig.setBrokerClientAuthenticationParameters(proxyAuthParams);
-
- Set providers = new HashSet<>();
- providers.add(BasicAuthenticationProvider.class.getName());
- proxyConfig.setAuthenticationProviders(providers);
- proxyConfig.setForwardAuthorizationCredentials(true);
+ super.init();
+
+ updateAdminClient();
+ producerBaseSetup();
+ }
+
+ @Override
+ @AfterMethod(alwaysRun = true)
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ void testAuthentication() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ // Step 1: Create Admin Client
+ updateAdminClient();
+ // create a client which connects to proxy and pass authData
+ String namespaceName = "my-property/my-ns";
+ String topicName = "persistent://my-property/my-ns/my-topic1";
+ String subscriptionName = "my-subscriber-name";
+ // expires after 60 seconds
+ String clientAuthParams = "entityType:client,expiryTime:" + (System.currentTimeMillis() + 60 * 1000);
+ // expires after 60 seconds
+ String proxyAuthParams = "entityType:proxy,expiryTime:" + (System.currentTimeMillis() + 60 * 1000);
+
+ admin.namespaces().grantPermissionOnNamespace(namespaceName, "proxy",
+ Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+ admin.namespaces().grantPermissionOnNamespace(namespaceName, "client",
+ Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+
+ // Step 2: Try to use proxy Client as a normal Client - expect exception
+ ProxyConfiguration proxyConfig = new ProxyConfiguration();
+ proxyConfig.setAuthenticationEnabled(true);
+ proxyConfig.setServicePort(Optional.of(0));
+ proxyConfig.setBrokerProxyAllowedTargetPorts("*");
+ proxyConfig.setWebServicePort(Optional.of(0));
+ proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
+ proxyConfig.setClusterName(CLUSTER_NAME);
+
+ proxyConfig.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
+ proxyConfig.setBrokerClientAuthenticationParameters(proxyAuthParams);
+
+ Set providers = new HashSet<>();
+ providers.add(BasicAuthenticationProvider.class.getName());
+ proxyConfig.setAuthenticationProviders(providers);
+ proxyConfig.setForwardAuthorizationCredentials(true);
AuthenticationService authenticationService = new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig));
- @Cleanup
- final Authentication proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
- proxyConfig.getBrokerClientAuthenticationParameters());
- proxyClientAuthentication.start();
- @Cleanup
- ProxyService proxyService = new ProxyService(proxyConfig, authenticationService, proxyClientAuthentication);
-
- proxyService.start();
- final String proxyServiceUrl = proxyService.getServiceUrl();
-
- // Step 3: Pass correct client params and use multiple connections
- @Cleanup
- PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, clientAuthParams, 3);
- proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
- proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
- proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
-
- // Step 4: Ensure that all client contexts share the same auth provider
- Assert.assertTrue(proxyService.getClientCnxs().size() >= 3, "expect at least 3 clients");
- proxyService.getClientCnxs().stream().forEach((cnx) -> {
- Assert.assertSame(cnx.authenticationProvider, proxyService.getAuthenticationService().getAuthenticationProvider("BasicAuthentication"));
- });
- }
-
- private void updateAdminClient() throws PulsarClientException {
- // Expires after an hour
- String adminAuthParams = "entityType:admin,expiryTime:" + (System.currentTimeMillis() + 3600 * 1000);
- admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
- .authentication(BasicAuthentication.class.getName(), adminAuthParams).build());
- }
-
- private PulsarClient createPulsarClient(String proxyServiceUrl, String authParams, int numberOfConnections) throws PulsarClientException {
- return PulsarClient.builder().serviceUrl(proxyServiceUrl)
- .authentication(BasicAuthentication.class.getName(), authParams).connectionsPerBroker(numberOfConnections).build();
- }
+ @Cleanup
+ final Authentication proxyClientAuthentication =
+ AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
+ @Cleanup
+ ProxyService proxyService = new ProxyService(proxyConfig, authenticationService, proxyClientAuthentication);
+
+ proxyService.start();
+ final String proxyServiceUrl = proxyService.getServiceUrl();
+
+ // Step 3: Pass correct client params and use multiple connections
+ @Cleanup
+ PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, clientAuthParams, 3);
+ proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
+ proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
+ proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
+
+ // Step 4: Ensure that all client contexts share the same auth provider
+ Assert.assertTrue(proxyService.getClientCnxs().size() >= 3, "expect at least 3 clients");
+ proxyService.getClientCnxs().stream().forEach((cnx) -> {
+ Assert.assertSame(cnx.authenticationProvider,
+ proxyService.getAuthenticationService().getAuthenticationProvider("BasicAuthentication"));
+ });
+ }
+
+ private void updateAdminClient() throws PulsarClientException {
+ // Expires after an hour
+ String adminAuthParams = "entityType:admin,expiryTime:" + (System.currentTimeMillis() + 3600 * 1000);
+ admin.close();
+ admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
+ .authentication(BasicAuthentication.class.getName(), adminAuthParams).build());
+ }
+
+ private PulsarClient createPulsarClient(String proxyServiceUrl, String authParams, int numberOfConnections)
+ throws PulsarClientException {
+ return PulsarClient.builder().serviceUrl(proxyServiceUrl)
+ .authentication(BasicAuthentication.class.getName(), authParams)
+ .connectionsPerBroker(numberOfConnections).build();
+ }
+
+ @Test
+ void testClientDisconnectWhenCredentialsExpireWithoutForwardAuth() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ String namespaceName = "my-property/my-ns";
+ String topicName = "persistent://my-property/my-ns/my-topic1";
+
+ admin.namespaces().grantPermissionOnNamespace(namespaceName, "proxy",
+ Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+ admin.namespaces().grantPermissionOnNamespace(namespaceName, "client",
+ Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+
+ // Important: When forwardAuthorizationCredentials=false, broker should not authenticate original auth data
+ // because the proxy doesn't forward it. Set authenticateOriginalAuthData=false to match this behavior.
+ conf.setAuthenticateOriginalAuthData(false);
+
+ ProxyConfiguration proxyConfig = new ProxyConfiguration();
+ proxyConfig.setAuthenticationEnabled(true);
+ proxyConfig.setAuthenticationRefreshCheckSeconds(2); // Check every 2 seconds
+ proxyConfig.setServicePort(Optional.of(0));
+ proxyConfig.setBrokerProxyAllowedTargetPorts("*");
+ proxyConfig.setWebServicePort(Optional.of(0));
+ proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
+ proxyConfig.setClusterName(CLUSTER_NAME);
+
+ // Proxy auth with long expiry
+ String proxyAuthParams = "entityType:proxy,expiryTime:" + (System.currentTimeMillis() + 3600 * 1000);
+ proxyConfig.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
+ proxyConfig.setBrokerClientAuthenticationParameters(proxyAuthParams);
+
+ Set providers = new HashSet<>();
+ providers.add(BasicAuthenticationProvider.class.getName());
+ proxyConfig.setAuthenticationProviders(providers);
+ proxyConfig.setForwardAuthorizationCredentials(false);
+
+ @Cleanup
+ AuthenticationService authenticationService = new AuthenticationService(
+ PulsarConfigurationLoader.convertFrom(proxyConfig));
+ @Cleanup
+ final Authentication proxyClientAuthentication =
+ AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
+ @Cleanup
+ ProxyService proxyService = new ProxyService(proxyConfig, authenticationService, proxyClientAuthentication);
+ proxyService.start();
+ final String proxyServiceUrl = proxyService.getServiceUrl();
+
+ // Create client with credentials that will expire in 3 seconds
+ long clientExpireTime = System.currentTimeMillis() + 3 * 1000;
+ String clientAuthParams = "entityType:client,expiryTime:" + clientExpireTime;
+
+ @Cleanup
+ PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, clientAuthParams, 1);
+
+ @Cleanup
+ var producer =
+ proxyClient.newProducer(Schema.BYTES).topic(topicName).sendTimeout(5, TimeUnit.SECONDS).create();
+ producer.send("test message".getBytes());
+
+ Awaitility.await().untilAsserted(() -> {
+ assertThatThrownBy(() -> producer.send("test message after expiry".getBytes()))
+ .isExactlyInstanceOf(PulsarClientException.TimeoutException.class);
+ });
+
+ if (producer instanceof ProducerImpl producerImpl) {
+ long lastDisconnectedTimestamp = producerImpl.getLastDisconnectedTimestamp();
+ assertThat(lastDisconnectedTimestamp).isGreaterThan(clientExpireTime);
+ }
+ }
}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
index b058e4af8301e..56a6c64395c91 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
@@ -177,37 +177,25 @@ public void testAuthDataRefresh(boolean forwardAuthData) throws Exception {
PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) pulsarClient;
pulsarClient.getPartitionsForTopic(topic).get();
- Set> connections = pulsarClientImpl.getCnxPool().getConnections();
- Awaitility.await().during(5, SECONDS).untilAsserted(() -> {
- pulsarClient.getPartitionsForTopic(topic).get();
- assertTrue(connections.stream().allMatch(n -> {
- try {
- ClientCnx clientCnx = n.get();
- long timestamp = clientCnx.getLastDisconnectedTimestamp();
- return timestamp == 0;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }));
- });
+ // Verify initial connection state
+ Set> connections = pulsarClientImpl.getCnxPool().getConnections();
- // Force all connections from proxy to broker to close and therefore require the proxy to re-authenticate with
- // the broker. (The client doesn't lose this connection.)
- restartBroker();
-
- // Rerun assertion to ensure that it still works
- Awaitility.await().during(5, SECONDS).untilAsserted(() -> {
- pulsarClient.getPartitionsForTopic(topic).get();
- assertTrue(connections.stream().allMatch(n -> {
- try {
- ClientCnx clientCnx = n.get();
- long timestamp = clientCnx.getLastDisconnectedTimestamp();
- return timestamp == 0;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }));
- });
+ Awaitility.await()
+ .during(5, SECONDS)
+ .untilAsserted(() -> {
+ for (CompletableFuture cf : connections) {
+ try {
+ ClientCnx clientCnx = cf.get();
+ long timestamp = clientCnx.getLastDisconnectedTimestamp();
+ // If forwardAuthData is false, the broker cannot see the client's authentication data.
+ // As a result, the broker cannot perform any refresh operations on the client's auth data.
+ // Only the proxy has visibility of the client's connection state.
+ assertTrue(forwardAuthData ? timestamp == 0 : timestamp > 0);
+ } catch (Exception e) {
+ throw new AssertionError("Failed to get connection state", e);
+ }
+ }
+ });
}
}
diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE
index 8db1c31264525..5724e64120b8a 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -347,9 +347,9 @@ The Apache Software License, Version 2.0
- leveldb-0.12.jar
- leveldb-api-0.12.jar
* Log4j
- - log4j-api-2.18.0.jar
- - log4j-core-2.18.0.jar
- - log4j-slf4j-impl-2.18.0.jar
+ - log4j-api-2.25.3.jar
+ - log4j-core-2.25.3.jar
+ - log4j-slf4j-impl-2.25.3.jar
* Log4j implemented over SLF4J
- log4j-over-slf4j-1.7.32.jar
* Lucene Common Analyzers