Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion buildtools/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.release>8</maven.compiler.release>
<surefire.version>3.1.0</surefire.version>
<log4j2.version>2.18.0</log4j2.version>
<log4j2.version>2.25.3</log4j2.version>
<slf4j.version>1.7.32</slf4j.version>
<testng.version>7.7.1</testng.version>
<commons-lang3.version>3.18.0</commons-lang3.version>
Expand Down
8 changes: 4 additions & 4 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -337,10 +337,10 @@ The Apache Software License, Version 2.0
- jakarta.validation-jakarta.validation-api-2.0.2.jar
- javax.validation-validation-api-1.1.0.Final.jar
* Log4J
- org.apache.logging.log4j-log4j-api-2.18.0.jar
- org.apache.logging.log4j-log4j-core-2.18.0.jar
- org.apache.logging.log4j-log4j-slf4j-impl-2.18.0.jar
- org.apache.logging.log4j-log4j-web-2.18.0.jar
- org.apache.logging.log4j-log4j-api-2.25.3.jar
- org.apache.logging.log4j-log4j-core-2.25.3.jar
- org.apache.logging.log4j-log4j-slf4j-impl-2.25.3.jar
- org.apache.logging.log4j-log4j-web-2.25.3.jar
* Java Native Access JNA
- net.java.dev.jna-jna-jpms-5.12.1.jar
- net.java.dev.jna-jna-platform-jpms-5.12.1.jar
Expand Down
8 changes: 4 additions & 4 deletions distribution/shell/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -380,10 +380,10 @@ The Apache Software License, Version 2.0
- simpleclient_tracer_otel-0.16.0.jar
- simpleclient_tracer_otel_agent-0.16.0.jar
* Log4J
- log4j-api-2.18.0.jar
- log4j-core-2.18.0.jar
- log4j-slf4j-impl-2.18.0.jar
- log4j-web-2.18.0.jar
- log4j-api-2.25.3.jar
- log4j-core-2.25.3.jar
- log4j-slf4j-impl-2.25.3.jar
- log4j-web-2.25.3.jar

* BookKeeper
- bookkeeper-common-allocator-4.16.7.jar
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Collection;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NoSuchElementException;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
Expand Down Expand Up @@ -81,17 +82,15 @@ static int internalEstimateEntryCountByBytesSize(int maxEntries, long maxSizeByt
return maxEntries;
}

// Adjust the read position to ensure it falls within the valid range of available ledgers.
// This handles special cases such as EARLIEST and LATEST positions by resetting them
// to the first available ledger or the last active ledger, respectively.
if (lastLedgerId != null && readPosition.getLedgerId() > lastLedgerId.longValue()) {
readPosition = PositionImpl.get(lastLedgerId, Math.max(lastLedgerTotalEntries - 1, 0));
} else if (lastLedgerId == null && readPosition.getLedgerId() > ledgersInfo.lastKey()) {
Map.Entry<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> lastEntry = ledgersInfo.lastEntry();
readPosition =
PositionImpl.get(lastEntry.getKey(), Math.max(lastEntry.getValue().getEntries() - 1, 0));
} else if (readPosition.getLedgerId() < ledgersInfo.firstKey()) {
readPosition = PositionImpl.get(ledgersInfo.firstKey(), 0);
if (ledgersInfo.isEmpty()) {
return 1;
}

try {
readPosition = adjustReadPosition(readPosition, ledgersInfo, lastLedgerId, lastLedgerTotalEntries);
} catch (NoSuchElementException e) {
// there was a race condition where ledgersInfo became empty just before adjustReadPosition was called
return 1;
}

long estimatedEntryCount = 0;
Expand Down Expand Up @@ -182,4 +181,28 @@ static int internalEstimateEntryCountByBytesSize(int maxEntries, long maxSizeByt
// Ensure at least one entry is always returned as the result
return Math.max((int) Math.min(estimatedEntryCount, maxEntries), 1);
}

private static Position adjustReadPosition(Position readPosition,
NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo>
ledgersInfo,
Long lastLedgerId, long lastLedgerTotalEntries) {
// Adjust the read position to ensure it falls within the valid range of available ledgers.
// This handles special cases such as EARLIEST and LATEST positions by resetting them
// to the first available ledger or the last active ledger, respectively.
if (lastLedgerId != null && readPosition.getLedgerId() > lastLedgerId.longValue()) {
return PositionImpl.get(lastLedgerId, Math.max(lastLedgerTotalEntries - 1, 0));
}
long lastKey = ledgersInfo.lastKey();
if (lastLedgerId == null && readPosition.getLedgerId() > lastKey) {
Map.Entry<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> lastEntry = ledgersInfo.lastEntry();
if (lastEntry != null && lastEntry.getKey() == lastKey) {
return PositionImpl.get(lastEntry.getKey(), Math.max(lastEntry.getValue().getEntries() - 1, 0));
}
}
long firstKey = ledgersInfo.firstKey();
if (readPosition.getLedgerId() < firstKey) {
return PositionImpl.get(firstKey, 0);
}
return readPosition;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@
package org.apache.bookkeeper.mledger.impl;

import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import java.util.HashSet;
import java.util.NavigableMap;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.TreeMap;
import org.apache.bookkeeper.mledger.Position;
Expand Down Expand Up @@ -288,4 +291,41 @@ public void testMaxSizeIsLongMAX_VALUE() {
int result = estimateEntryCountByBytesSize(Long.MAX_VALUE);
assertEquals(result, maxEntries);
}

@Test
public void testNoLedgers() {
readPosition = PositionImpl.EARLIEST;
// remove all ledgers from ledgersInfo
ledgersInfo.clear();
int result = estimateEntryCountByBytesSize(5_000_000);
// expect that result is 1 because the estimation couldn't be done
assertEquals(result, 1);
}

@Test
public void testNoLedgersRaceFirstKey() {
readPosition = PositionImpl.EARLIEST;
// remove all ledgers from ledgersInfo
ledgersInfo = mock(NavigableMap.class);
when(ledgersInfo.isEmpty()).thenReturn(false);
when(ledgersInfo.firstKey()).thenThrow(NoSuchElementException.class);
when(ledgersInfo.lastKey()).thenReturn(1L);
int result = estimateEntryCountByBytesSize(5_000_000);
// expect that result is 1 because the estimation couldn't be done
assertEquals(result, 1);
}

@Test
public void testNoLedgersRaceLastKey() {
readPosition = PositionImpl.EARLIEST;
// remove all ledgers from ledgersInfo
ledgersInfo = mock(NavigableMap.class);
lastLedgerId = null;
when(ledgersInfo.isEmpty()).thenReturn(false);
when(ledgersInfo.firstKey()).thenReturn(1L);
when(ledgersInfo.lastKey()).thenThrow(NoSuchElementException.class);
int result = estimateEntryCountByBytesSize(5_000_000);
// expect that result is 1 because the estimation couldn't be done
assertEquals(result, 1);
}
}
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ flexible messaging model and an intuitive client API.</description>
<rocksdb.version>7.9.2</rocksdb.version>
<slf4j.version>1.7.32</slf4j.version>
<commons.collections4.version>4.4</commons.collections4.version>
<log4j2.version>2.18.0</log4j2.version>
<log4j2.version>2.25.3</log4j2.version>
<!-- bouncycastle dependencies aren't necessarily aligned -->
<bouncycastle.bcprov-jdk18on.version>1.78.1</bouncycastle.bcprov-jdk18on.version>
<bouncycastle.bcpkix-jdk18on.version>1.81</bouncycastle.bcpkix-jdk18on.version>
Expand Down Expand Up @@ -263,9 +263,9 @@ flexible messaging model and an intuitive client API.</description>
<failsafe.version>3.3.2</failsafe.version>

<!-- test dependencies -->
<testcontainers.version>1.20.4</testcontainers.version>
<testcontainers.version>1.21.4</testcontainers.version>
<!-- Set docker-java.version to the version of docker-java used in org.testcontainers:testcontainers pom -->
<docker-java.version>3.4.0</docker-java.version>
<docker-java.version>3.7.0</docker-java.version>
<hamcrest.version>2.2</hamcrest.version>
<restassured.version>5.4.0</restassured.version>
<kerby.version>1.1.1</kerby.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,6 @@ DecodedJWT verifyJWT(PublicKey publicKey,
.withAnyOfAudience(allowedAudiences)
.withClaimPresence(RegisteredClaims.ISSUED_AT)
.withClaimPresence(RegisteredClaims.EXPIRES_AT)
.withClaimPresence(RegisteredClaims.NOT_BEFORE)
.withClaimPresence(RegisteredClaims.SUBJECT);

if (isRoleClaimNotSubject) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.authentication.oidc;

import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertNull;
import com.auth0.jwt.JWT;
import com.auth0.jwt.interfaces.DecodedJWT;
Expand Down Expand Up @@ -174,6 +175,20 @@ public void ensureFutureNBFFails() throws Exception {
() -> basicProvider.verifyJWT(keyPair.getPublic(), SignatureAlgorithm.RS256.getValue(), jwt));
}

@Test
public void ensureWithoutNBFSucceeds() throws Exception {
KeyPair keyPair = Keys.keyPairFor(SignatureAlgorithm.RS256);
DefaultJwtBuilder defaultJwtBuilder = new DefaultJwtBuilder();
addValidMandatoryClaims(defaultJwtBuilder, basicProviderAudience);
// remove "nbf" claim
defaultJwtBuilder.setNotBefore(null);
defaultJwtBuilder.signWith(keyPair.getPrivate());
DecodedJWT jwt = JWT.decode(defaultJwtBuilder.compact());
assertThat(jwt.getNotBefore()).isNull();
assertThat(jwt.getClaims().get("nbf")).isNull();
basicProvider.verifyJWT(keyPair.getPublic(), SignatureAlgorithm.RS256.getValue(), jwt);
}

@Test
public void ensureFutureIATFails() throws Exception {
KeyPair keyPair = Keys.keyPairFor(SignatureAlgorithm.RS256);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.SafeCollectionUtils;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentBitSet;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.slf4j.Logger;
Expand Down Expand Up @@ -3015,7 +3015,7 @@ private CompletableFuture<Void> doTransactionAcknowledgeForResponse(MessageId me
} else {
if (Commands.peerSupportsMultiMessageAcknowledgment(
getClientCnx().getRemoteEndpointProtocolVersion())) {
List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck =
List<Triple<Long, Long, ConcurrentBitSet>> entriesToAck =
new ArrayList<>(chunkMsgIds.length);
for (MessageIdImpl cMsgId : chunkMsgIds) {
if (cMsgId != null && chunkMsgIds.length > 1) {
Expand Down Expand Up @@ -3053,7 +3053,7 @@ private CompletableFuture<Void> doTransactionAcknowledgeForResponse(MessageId me
}

private ByteBuf newMultiTransactionMessageAck(long consumerId, TxnID txnID,
List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entries,
List<Triple<Long, Long, ConcurrentBitSet>> entries,
long requestID) {
BaseCommand cmd = newMultiMessageAckCommon(entries);
cmd.getAck()
Expand All @@ -3072,7 +3072,7 @@ protected BaseCommand initialValue() throws Exception {
}
};

private static BaseCommand newMultiMessageAckCommon(List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entries) {
private static BaseCommand newMultiMessageAckCommon(List<Triple<Long, Long, ConcurrentBitSet>> entries) {
BaseCommand cmd = LOCAL_BASE_COMMAND.get()
.clear()
.setType(BaseCommand.Type.ACK);
Expand All @@ -3081,7 +3081,7 @@ private static BaseCommand newMultiMessageAckCommon(List<Triple<Long, Long, Conc
for (int i = 0; i < entriesCount; i++) {
long ledgerId = entries.get(i).getLeft();
long entryId = entries.get(i).getMiddle();
ConcurrentBitSetRecyclable bitSet = entries.get(i).getRight();
ConcurrentBitSet bitSet = entries.get(i).getRight();
MessageIdData msgId = ack.addMessageId()
.setLedgerId(ledgerId)
.setEntryId(entryId);
Expand All @@ -3090,7 +3090,6 @@ private static BaseCommand newMultiMessageAckCommon(List<Triple<Long, Long, Conc
for (int j = 0; j < ackSet.length; j++) {
msgId.addAckSet(ackSet[j]);
}
bitSet.recycle();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentBitSet;

/**
* Group the acknowledgements for a certain time and then sends them out in a single protobuf command.
Expand Down Expand Up @@ -83,7 +83,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
*/
private final ConcurrentSkipListSet<MessageIdAdv> pendingIndividualAcks;
@VisibleForTesting
final ConcurrentSkipListMap<MessageIdAdv, ConcurrentBitSetRecyclable> pendingIndividualBatchIndexAcks;
final ConcurrentSkipListMap<MessageIdAdv, ConcurrentBitSet> pendingIndividualBatchIndexAcks;

private final ScheduledFuture<?> scheduledTask;
private final boolean batchIndexAckEnabled;
Expand Down Expand Up @@ -133,7 +133,7 @@ public boolean isDuplicate(MessageId messageId) {
return true;
}
if (messageIdAdv.getBatchIndex() >= 0) {
ConcurrentBitSetRecyclable bitSet = pendingIndividualBatchIndexAcks.get(key);
ConcurrentBitSet bitSet = pendingIndividualBatchIndexAcks.get(key);
return bitSet != null && !bitSet.get(messageIdAdv.getBatchIndex());
}
return false;
Expand Down Expand Up @@ -327,21 +327,22 @@ private CompletableFuture<Void> doCumulativeAck(MessageIdAdv messageId, Map<Stri

@VisibleForTesting
CompletableFuture<Void> doIndividualBatchAckAsync(MessageIdAdv msgId) {
ConcurrentBitSetRecyclable bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent(
ConcurrentBitSet bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent(
MessageIdAdvUtils.discardBatch(msgId), __ -> {
final BitSet ackSet = msgId.getAckSet();
final ConcurrentBitSetRecyclable value;
final ConcurrentBitSet value;
if (ackSet != null) {
synchronized (ackSet) {
if (!ackSet.isEmpty()) {
value = ConcurrentBitSetRecyclable.create(ackSet);
value = new ConcurrentBitSet();
value.or(ackSet);
} else {
value = ConcurrentBitSetRecyclable.create();
value = new ConcurrentBitSet();
value.set(0, msgId.getBatchSize());
}
}
} else {
value = ConcurrentBitSetRecyclable.create();
value = new ConcurrentBitSet();
value.set(0, msgId.getBatchSize());
}
return value;
Expand Down Expand Up @@ -445,7 +446,7 @@ private void flushAsync(ClientCnx cnx) {
}

// Flush all individual acks
List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck =
List<Triple<Long, Long, ConcurrentBitSet>> entriesToAck =
new ArrayList<>(pendingIndividualAcks.size() + pendingIndividualBatchIndexAcks.size());
if (!pendingIndividualAcks.isEmpty()) {
if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())) {
Expand Down Expand Up @@ -487,7 +488,7 @@ private void flushAsync(ClientCnx cnx) {
}

while (true) {
Map.Entry<MessageIdAdv, ConcurrentBitSetRecyclable> entry =
Map.Entry<MessageIdAdv, ConcurrentBitSet> entry =
pendingIndividualBatchIndexAcks.pollFirstEntry();
if (entry == null) {
// The entry has been removed in a different thread
Expand Down Expand Up @@ -539,7 +540,7 @@ private CompletableFuture<Void> newImmediateAckAndFlush(long consumerId, Message
// cumulative ack chunk by the last messageId
if (chunkMsgIds != null && ackType != AckType.Cumulative) {
if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())) {
List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck = new ArrayList<>(chunkMsgIds.length);
List<Triple<Long, Long, ConcurrentBitSet>> entriesToAck = new ArrayList<>(chunkMsgIds.length);
for (MessageIdImpl cMsgId : chunkMsgIds) {
if (cMsgId != null && chunkMsgIds.length > 1) {
entriesToAck.add(Triple.of(cMsgId.getLedgerId(), cMsgId.getEntryId(), null));
Expand Down Expand Up @@ -568,7 +569,7 @@ private CompletableFuture<Void> newMessageAckCommandAndWrite(
long entryId, BitSetRecyclable ackSet, AckType ackType,
Map<String, Long> properties, boolean flush,
TimedCompletableFuture<Void> timedCompletableFuture,
List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck) {
List<Triple<Long, Long, ConcurrentBitSet>> entriesToAck) {
if (consumer.isAckReceiptEnabled()) {
final long requestId = consumer.getClient().newRequestId();
final ByteBuf cmd;
Expand Down
Loading
Loading