From 307edb6f86882ef7685508a55203e3f029578af8 Mon Sep 17 00:00:00 2001 From: fanjianye Date: Mon, 10 Apr 2023 20:42:54 +0800 Subject: [PATCH] impl respond to client when txn become committing --- .../TransactionMetadataStoreService.java | 47 ++++++++++++++---- ...AggregatedTransactionCoordinatorStats.java | 3 ++ .../prometheus/TransactionAggregator.java | 4 ++ .../TransactionMetadataStoreServiceTest.java | 7 ++- .../broker/stats/TransactionMetricsTest.java | 4 ++ .../broker/transaction/TransactionTest.java | 34 ++++++++++++- .../client/impl/TransactionEndToEndTest.java | 49 ++++++++++++++++++- .../coordinator/TransactionMetadataStore.java | 2 + .../impl/InMemTransactionMetadataStore.java | 8 ++- .../impl/MLTransactionMetadataStore.java | 8 +++ .../impl/TransactionMetadataStoreStats.java | 3 ++ 11 files changed, 155 insertions(+), 14 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java index 3e3b044ec51b8..df51e4a69f173 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java @@ -338,25 +338,50 @@ public void endTransaction(TxnID txnID, int txnAction, boolean isTimeout, getTxnMeta(txnID) .thenCompose(txnMeta -> { if (txnMeta.status() == TxnStatus.OPEN) { - return updateTxnStatus(txnID, newStatus, TxnStatus.OPEN, isTimeout) - .thenCompose(__ -> endTxnInTransactionBuffer(txnID, txnAction)); + return updateTxnStatus(txnID, newStatus, TxnStatus.OPEN, isTimeout); } - return fakeAsyncCheckTxnStatus(txnMeta.status(), txnAction, txnID, newStatus) - .thenCompose(__ -> endTxnInTransactionBuffer(txnID, txnAction)); + return fakeAsyncCheckTxnStatus(txnMeta.status(), txnAction, txnID, newStatus); }).whenComplete((__, ex)-> { if (ex == null) { future.complete(null); + endTxnInTransactionBuffer(txnID, txnAction) + .whenComplete((__2, ex2)-> { + if (ex2 == null) { + return; + } + Throwable realCause = FutureUtil.unwrapCompletionException(ex2); + if (!isRetryableException(realCause)) { + LOG.error("End transaction EndTxnInTransactionBuffer fail! TxnId : {}, " + + "TxnAction : {}", txnID, txnAction, realCause); + // error for committing -> committed, this case can not occur + // or this txn would stay in committing until TC restart. + // So add a metric for reminding this case. + TransactionMetadataStore store = getStore(txnID); + if (store != null) { + store.incrementNonRetryableCount(); + } + return; + } + if (LOG.isDebugEnabled()) { + LOG.debug("End transaction EndTxnInTransactionBuffer retry! " + + "TxnId : {}, TxnAction : {}", txnID, txnAction, realCause); + } + transactionOpRetryTimer.newTimeout(timeout -> + endTransaction(txnID, txnAction, isTimeout, future), + endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS); + }); return; } - if (!isRetryableException(ex)) { - LOG.error("End transaction fail! TxnId : {}, " - + "TxnAction : {}", txnID, txnAction, ex); + Throwable realCause = FutureUtil.unwrapCompletionException(ex); + if (!isRetryableException(realCause)) { + LOG.error("End transaction UpdateTxnStatus fail! TxnId : {}, " + + "TxnAction : {}", txnID, txnAction, realCause); future.completeExceptionally(ex); return; } if (LOG.isDebugEnabled()) { - LOG.debug("EndTxnInTransactionBuffer retry! TxnId : {}, " - + "TxnAction : {}", txnID, txnAction, ex); + LOG.debug("End transaction UpdateTxnStatus retry! TxnId : {}, " + + "TxnAction : {}", txnID, txnAction, realCause); } transactionOpRetryTimer.newTimeout(timeout -> endTransaction(txnID, txnAction, isTimeout, future), @@ -484,6 +509,10 @@ public Map getStores() { return Collections.unmodifiableMap(stores); } + private TransactionMetadataStore getStore(TxnID txnID) { + return stores.get(new TransactionCoordinatorID(txnID.getMostSigBits())); + } + public CompletableFuture verifyTxnOwnership(TxnID txnID, String checkOwner) { return getTxnMeta(txnID) .thenCompose(meta -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedTransactionCoordinatorStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedTransactionCoordinatorStats.java index f417715ffd619..c0cfe386dc575 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedTransactionCoordinatorStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedTransactionCoordinatorStats.java @@ -32,6 +32,8 @@ public class AggregatedTransactionCoordinatorStats { public long appendLogCount; + public long nonRetryableCount; + public long[] executionLatency; public void reset() { @@ -41,6 +43,7 @@ public void reset() { createdCount = 0; timeoutCount = 0; appendLogCount = 0; + nonRetryableCount = 0; executionLatency = null; } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java index 3da061f6ffef2..d92fb27e9b360 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java @@ -96,6 +96,8 @@ public static void generate(PulsarService pulsar, PrometheusMetricStreams stream transactionMetadataStoreStats.getTimeoutCount(); transactionCoordinatorStats.appendLogCount = transactionMetadataStoreStats.getAppendLogCount(); + transactionCoordinatorStats.nonRetryableCount = + transactionMetadataStoreStats.getNonRetryableCount(); transactionMetadataStoreStats.executionLatencyBuckets.refresh(); transactionCoordinatorStats.executionLatency = transactionMetadataStoreStats.executionLatencyBuckets.getBuckets(); @@ -251,6 +253,8 @@ static void printTransactionCoordinatorStats(PrometheusMetricStreams stream, Str coordinatorId); writeMetric(stream, "pulsar_txn_append_log_total", stats.appendLogCount, cluster, coordinatorId); + writeMetric(stream, "pulsar_txn_non_retryable_total", stats.nonRetryableCount, cluster, + coordinatorId); long[] latencyBuckets = stats.executionLatency; writeMetric(stream, "pulsar_txn_execution_latency_le_10", latencyBuckets[0], cluster, coordinatorId); writeMetric(stream, "pulsar_txn_execution_latency_le_20", latencyBuckets[1], cluster, coordinatorId); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java index 5cd3ed9f90454..b657494116d36 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java @@ -362,12 +362,15 @@ public void testEndTransactionOpRetry(TxnStatus txnStatus) throws Exception { field.set(transactionMetadataStore, TransactionMetadataStoreState.State.None); CompletableFuture completableFuture = null; try { + // since server can respond to client when txn become committing, testcase:committing would not fail completableFuture = pulsar.getTransactionMetadataStoreService().endTransaction(txnID, TxnAction.COMMIT.getValue(), false); completableFuture.get(5, TimeUnit.SECONDS); - fail(); + if (txnStatus != TxnStatus.COMMITTING) { + fail(); + } } catch (Exception e) { - if (txnStatus == TxnStatus.OPEN || txnStatus == TxnStatus.COMMITTING) { + if (txnStatus == TxnStatus.OPEN) { assertTrue(e instanceof TimeoutException); } else if (txnStatus == TxnStatus.ABORTING) { assertTrue(e.getCause() instanceof CoordinatorException.InvalidTxnStatusException); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java index 4d38f5fad5141..5834801fdba42 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java @@ -227,6 +227,10 @@ public void testTransactionCoordinatorRateMetrics() throws Exception { assertEquals(metric.size(), 1); metric.forEach(item -> assertEquals(item.value, txnCount * 4 + 3)); + metric = metrics.get("pulsar_txn_non_retryable_total"); + assertEquals(metric.size(), 1); + metric.forEach(item -> assertEquals(item.value, 0)); + metric = metrics.get("pulsar_txn_execution_latency_le_5000"); assertEquals(metric.size(), 1); metric.forEach(item -> assertEquals(item.value, 1)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index c3533e70cf8be..4767660c7aaad 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -127,6 +127,7 @@ import org.apache.pulsar.client.util.ExecutorProvider; import org.apache.pulsar.common.api.proto.CommandSubscribe; import org.apache.pulsar.client.impl.transaction.TransactionImpl; +import org.apache.pulsar.common.api.proto.TxnAction; import org.apache.pulsar.common.events.EventType; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.SystemTopicNames; @@ -143,6 +144,7 @@ import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState; import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker; import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker; +import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException; import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl; import org.apache.pulsar.transaction.coordinator.impl.MLTransactionSequenceIdGenerator; import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore; @@ -649,6 +651,17 @@ public void testMaxReadPositionForNormalPublish() throws Exception { Assert.assertEquals(position2.getLedgerId(), messageId.getLedgerId()); Assert.assertEquals(position2.getEntryId(), messageId.getEntryId()); transaction.commit().get(); + // open->committing after client finish commit(), need to wait for server committing->committed + Transaction finishTxn = transaction; + try { + Awaitility.await().until(() -> pulsarServiceList.get(0) + .getTransactionMetadataStoreService().getTxnMeta(finishTxn.getTxnID()).get() == null); + } catch (Exception e) { + if (!(e.getCause() instanceof CoordinatorException.TransactionNotFoundException)) { + fail(); + } + } + PositionImpl position3 = topicTransactionBuffer.getMaxReadPosition(); Assert.assertEquals(position3.getLedgerId(), messageId2.getLedgerId()); @@ -1409,6 +1422,16 @@ public void testPendingAckBatchMessageCommit() throws Exception { // abort the txn2 txn2.abort().get(); + Transaction finishTxn = txn2; + try { + Awaitility.await().until(() -> pulsarServiceList.get(0) + .getTransactionMetadataStoreService().getTxnMeta(finishTxn.getTxnID()).get() == null); + } catch (Exception e) { + if (!(e.getCause() instanceof CoordinatorException.TransactionNotFoundException)) { + fail(); + } + } + Transaction txn3 = pulsarClient.newTransaction() .withTransactionTimeout(10, TimeUnit.MINUTES).build().get(); // repeat ack the second message, can ack successful @@ -1601,7 +1624,16 @@ public void testGetTxnState() throws Exception { transaction = pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.SECONDS) .build().get(); pulsarServiceList.get(0).getTransactionMetadataStoreService() - .endTransaction(transaction.getTxnID(), 0, false); + .endTransaction(transaction.getTxnID(), TxnAction.COMMIT_VALUE, false).get(); + Transaction finishTxn = transaction; + try { + Awaitility.await().until(() -> pulsarServiceList.get(0) + .getTransactionMetadataStoreService().getTxnMeta(finishTxn.getTxnID()).get() == null); + } catch (Exception e) { + if (!(e.getCause() instanceof CoordinatorException.TransactionNotFoundException)) { + fail(); + } + } transaction.commit(); Transaction errorTxn = transaction; Awaitility.await().until(() -> errorTxn.getState() == Transaction.State.ERROR); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java index 83feaa3ac1158..c360d395b8bcf 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java @@ -78,6 +78,7 @@ import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; import org.apache.pulsar.transaction.coordinator.TransactionSubscription; +import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterClass; @@ -702,6 +703,43 @@ public void txnMessageAckTest() throws Exception { log.info("receive transaction messages count: {}", receiveCnt); } + @Test + public void txnTestCommitTwice() throws Exception { + Transaction commitTxn = getTxn(); + + commitTxn.commit().get(); + Field field = TransactionImpl.class.getDeclaredField("state"); + field.setAccessible(true); + field.set(commitTxn, TransactionImpl.State.OPEN); + // since first commit not completely finish in server, second commit can succeed + try { + commitTxn.commit().get(); + } catch (Exception reCommitError) { + fail("recommit one not-finished transaction should not be failed."); + } + + // since first commit completely finish transaction in server, second commit would throw txnNotFoundException + Transaction finishTxn = commitTxn; + try { + Awaitility.await().until(() -> pulsarServiceList.get(0) + .getTransactionMetadataStoreService().getTxnMeta(finishTxn.getTxnID()).get() == null); + } catch (Exception e) { + if (!(e.getCause() instanceof CoordinatorException.TransactionNotFoundException)) { + fail(); + } + } + field.setAccessible(true); + field.set(commitTxn, TransactionImpl.State.OPEN); + try { + commitTxn.commit().get(); + fail("recommit one finished transaction should be failed."); + } catch (Exception reCommitError) { + log.info("expected exception for recommit one transaction."); + Assert.assertNotNull(reCommitError); + Assert.assertTrue(reCommitError.getCause() instanceof TransactionNotFoundException); + } + } + @Test public void txnAckTestBatchAndCumulativeSub() throws Exception { txnCumulativeAckTest(true, 200, SubscriptionType.Failover); @@ -785,12 +823,21 @@ private void txnCumulativeAckTest(boolean batchEnable, int maxBatchSize, Subscri } commitTxn.commit().get(); + Transaction finishTxn = commitTxn; + try { + Awaitility.await().until(() -> pulsarServiceList.get(0) + .getTransactionMetadataStoreService().getTxnMeta(finishTxn.getTxnID()).get() == null); + } catch (Exception e) { + if (!(e.getCause() instanceof CoordinatorException.TransactionNotFoundException)) { + fail(); + } + } Field field = TransactionImpl.class.getDeclaredField("state"); field.setAccessible(true); field.set(commitTxn, TransactionImpl.State.OPEN); try { commitTxn.commit().get(); - fail("recommit one transaction should be failed."); + fail("recommit one finished transaction should be failed."); } catch (Exception reCommitError) { // recommit one transaction should be failed log.info("expected exception for recommit one transaction."); diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java index ff5adb4d409c7..66beacd7c6d17 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java @@ -139,4 +139,6 @@ default long getLowWaterMark() { * @return {@link TxnMeta} the txnMetas of slow transactions */ List getSlowTransactions(long timeout); + + void incrementNonRetryableCount(); } diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java index 0f3c5e42d7a69..e8eec702fc4a5 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java @@ -48,6 +48,7 @@ class InMemTransactionMetadataStore implements TransactionMetadataStore { private final LongAdder commitTransactionCount; private final LongAdder abortTransactionCount; private final LongAdder transactionTimeoutCount; + private final LongAdder nonRetryableCount; InMemTransactionMetadataStore(TransactionCoordinatorID tcID) { this.tcID = tcID; @@ -58,7 +59,7 @@ class InMemTransactionMetadataStore implements TransactionMetadataStore { this.commitTransactionCount = new LongAdder(); this.abortTransactionCount = new LongAdder(); this.transactionTimeoutCount = new LongAdder(); - + this.nonRetryableCount = new LongAdder(); } @Override @@ -165,4 +166,9 @@ public TransactionMetadataStoreStats getMetadataStoreStats() { public List getSlowTransactions(long timeout) { return null; } + + @Override + public void incrementNonRetryableCount() { + nonRetryableCount.increment(); + } } diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java index b6eaad2e3e38f..ce7d1d3535195 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java @@ -78,6 +78,7 @@ public class MLTransactionMetadataStore private final LongAdder abortedTransactionCount; private final LongAdder transactionTimeoutCount; private final LongAdder appendLogCount; + private final LongAdder nonRetryableCount; private final MLTransactionSequenceIdGenerator sequenceIdGenerator; private final ExecutorService internalPinnedExecutor; public final RecoverTimeRecord recoverTime = new RecoverTimeRecord(); @@ -100,6 +101,7 @@ public MLTransactionMetadataStore(TransactionCoordinatorID tcID, this.committedTransactionCount = new LongAdder(); this.abortedTransactionCount = new LongAdder(); this.transactionTimeoutCount = new LongAdder(); + this.nonRetryableCount = new LongAdder(); this.appendLogCount = new LongAdder(); DefaultThreadFactory threadFactory = new DefaultThreadFactory("transaction_coordinator_" + tcID.toString() + "thread_factory"); @@ -465,6 +467,11 @@ public TransactionCoordinatorStats getCoordinatorStats() { return transactionCoordinatorstats; } + @Override + public void incrementNonRetryableCount() { + nonRetryableCount.increment(); + } + private CompletableFuture>> getTxnPositionPair(TxnID txnID) { CompletableFuture>> completableFuture = new CompletableFuture<>(); Pair> txnMetaListPair = txnMetaMap.get(txnID.getLeastSigBits()); @@ -507,6 +514,7 @@ public TransactionMetadataStoreStats getMetadataStoreStats() { this.transactionMetadataStoreStats.setAbortedCount(this.abortedTransactionCount.longValue()); this.transactionMetadataStoreStats.setTimeoutCount(this.transactionTimeoutCount.longValue()); this.transactionMetadataStoreStats.setAppendLogCount(this.appendLogCount.longValue()); + this.transactionMetadataStoreStats.setNonRetryableCount(this.nonRetryableCount.longValue()); return transactionMetadataStoreStats; } diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TransactionMetadataStoreStats.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TransactionMetadataStoreStats.java index 8b8be92ae1691..53099ac0263e2 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TransactionMetadataStoreStats.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TransactionMetadataStoreStats.java @@ -51,6 +51,9 @@ public class TransactionMetadataStoreStats { /** The timeout out transaction count of this transaction coordinator. */ public long timeoutCount; + /** The non retryable exception count of this transaction coordinator. */ + public long nonRetryableCount; + /** The transaction execution latency. */ public StatsBuckets executionLatencyBuckets = new StatsBuckets(TRANSACTION_EXECUTION_BUCKETS);