From 79321b42e7e563365576d315d6a3f9b8e013e93a Mon Sep 17 00:00:00 2001 From: Jiawen Wang <2876645134@qq.com> Date: Wed, 9 Oct 2024 16:40:37 +0800 Subject: [PATCH] [fix][ml][PIP-327] fix recover from ledger when ledgerForceRecovery is true --- .../mledger/impl/ManagedCursorImpl.java | 8 +++--- .../mledger/impl/ManagedCursorTest.java | 28 +++++++++++++++++++ 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index f469b88cae8e6..7c0d13108b1c4 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -549,10 +549,10 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac if (log.isInfoEnabled()) { log.info("[{}] Opened ledger {} for cursor {}. rc={}", ledger.getName(), ledgerId, name, rc); } - if (isBkErrorNotRecoverable(rc) || ledgerForceRecovery) { + if (isBkErrorNotRecoverable(rc) || (rc != BKException.Code.OK && ledgerForceRecovery)) { log.error("[{}] Error opening metadata ledger {} for cursor {}: {}", ledger.getName(), ledgerId, name, BKException.getMessage(rc)); - // Rewind to oldest entry available + // Rewind to the oldest entry available initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback); return; } else if (rc != BKException.Code.OK) { @@ -577,10 +577,10 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac if (log.isDebugEnabled()) { log.debug("[{}} readComplete rc={} entryId={}", ledger.getName(), rc1, lh1.getLastAddConfirmed()); } - if (isBkErrorNotRecoverable(rc1) || ledgerForceRecovery) { + if (isBkErrorNotRecoverable(rc1) || (rc1 != BKException.Code.OK && ledgerForceRecovery)) { log.error("[{}] Error reading from metadata ledger {} for cursor {}: {}", ledger.getName(), ledgerId, name, BKException.getMessage(rc1)); - // Rewind to oldest entry available + // Rewind to the oldest entry available initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback); return; } else if (rc1 != BKException.Code.OK) { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 8ae5a04a507b1..587f87a7d1d38 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -1255,6 +1255,34 @@ void cursorPersistence() throws Exception { assertEquals(c2.getMarkDeletedPosition(), p2); } + @Test(timeOut = 20000) + public void cursorPersistenceWithLedgerForceRecovery() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setLedgerForceRecovery(true); + + ManagedLedger ledger = factory.open("my_test_ledger", config); + ManagedCursor c1 = ledger.openCursor("c1"); + ledger.addEntry("dummy-entry-1".getBytes(Encoding)); + ledger.addEntry("dummy-entry-2".getBytes(Encoding)); + ledger.addEntry("dummy-entry-3".getBytes(Encoding)); + + List entries = c1.readEntries(2); + Position p1 = entries.get(1).getPosition(); + c1.markDelete(p1); + entries.forEach(Entry::release); + + entries = c1.readEntries(1); + entries.forEach(Entry::release); + + // Reopen + @Cleanup("shutdown") + ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc); + ledger = factory2.open("my_test_ledger", config); + c1 = ledger.openCursor("c1"); + + assertEquals(c1.getMarkDeletedPosition(), p1); + } + @Test(timeOut = 20000) void cursorPersistence2() throws Exception { ManagedLedger ledger = factory.open("my_test_ledger",