From 6b0ebadfc8d457e439338b434e3d495ea1f7c356 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Thu, 22 May 2025 13:30:30 +0800 Subject: [PATCH 1/4] Fix ManagedCursorImpl.individualDeletedMessages concurrent issue --- .../org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 17988135c80aa..b559837d098fd 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -3260,9 +3260,12 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin */ if (getConfig().isUnackedRangesOpenCacheSetEnabled() && getConfig().isPersistIndividualAckAsLongArray()) { try { + lock.readLock().lock(); internalRanges = individualDeletedMessages.toRanges(getConfig().getMaxUnackedRangesToPersist()); } catch (Exception e) { log.warn("[{}]-{} Failed to serialize individualDeletedMessages", ledger.getName(), name, e); + } finally { + lock.readLock().unlock(); } } if (internalRanges != null && !internalRanges.isEmpty()) { From 9bd23940a85381ff9bb3ffbeace76b4b5426edd5 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Thu, 22 May 2025 13:35:45 +0800 Subject: [PATCH 2/4] Fix ManagedCursorImpl.individualDeletedMessages concurrent issue --- .../org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index b559837d098fd..74c5ec5a17982 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -3259,8 +3259,8 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin * and deserialization error. */ if (getConfig().isUnackedRangesOpenCacheSetEnabled() && getConfig().isPersistIndividualAckAsLongArray()) { + lock.readLock().lock(); try { - lock.readLock().lock(); internalRanges = individualDeletedMessages.toRanges(getConfig().getMaxUnackedRangesToPersist()); } catch (Exception e) { log.warn("[{}]-{} Failed to serialize individualDeletedMessages", ledger.getName(), name, e); From edbd379ae18ec4b0e6b4e345b6ff574fd5d6cc4d Mon Sep 17 00:00:00 2001 From: dao-jun Date: Thu, 22 May 2025 18:03:20 +0800 Subject: [PATCH 3/4] Fix ManagedCursorImpl.individualDeletedMessages concurrent issue --- .../org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 74c5ec5a17982..52a58113b4cf7 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -641,6 +641,7 @@ public void recoverIndividualDeletedMessages(PositionInfo positionInfo) { if (positionInfo.getIndividualDeletedMessagesCount() > 0) { recoverIndividualDeletedMessages(positionInfo.getIndividualDeletedMessagesList()); } else if (positionInfo.getIndividualDeletedMessageRangesCount() > 0) { + lock.writeLock().lock(); List rangeList = positionInfo.getIndividualDeletedMessageRangesList(); try { Map rangeMap = rangeList.stream().collect(Collectors.toMap(LongListMap::getKey, @@ -664,6 +665,8 @@ public void recoverIndividualDeletedMessages(PositionInfo positionInfo) { } catch (Exception e) { log.warn("[{}]-{} Failed to recover individualDeletedMessages from serialized data", ledger.getName(), name, e); + } finally { + lock.writeLock().unlock(); } } } From ec7277de6c24c8e339bc85ca40c92ea860854384 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Thu, 22 May 2025 19:13:56 +0800 Subject: [PATCH 4/4] Fix ManagedCursorImpl.individualDeletedMessages concurrent issue --- .../org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 52a58113b4cf7..e73699564a218 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -641,8 +641,8 @@ public void recoverIndividualDeletedMessages(PositionInfo positionInfo) { if (positionInfo.getIndividualDeletedMessagesCount() > 0) { recoverIndividualDeletedMessages(positionInfo.getIndividualDeletedMessagesList()); } else if (positionInfo.getIndividualDeletedMessageRangesCount() > 0) { - lock.writeLock().lock(); List rangeList = positionInfo.getIndividualDeletedMessageRangesList(); + lock.writeLock().lock(); try { Map rangeMap = rangeList.stream().collect(Collectors.toMap(LongListMap::getKey, list -> list.getValuesList().stream().mapToLong(i -> i).toArray()));