Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -338,25 +338,50 @@ public void endTransaction(TxnID txnID, int txnAction, boolean isTimeout,
getTxnMeta(txnID)
.thenCompose(txnMeta -> {
if (txnMeta.status() == TxnStatus.OPEN) {
return updateTxnStatus(txnID, newStatus, TxnStatus.OPEN, isTimeout)
.thenCompose(__ -> endTxnInTransactionBuffer(txnID, txnAction));
return updateTxnStatus(txnID, newStatus, TxnStatus.OPEN, isTimeout);
}
return fakeAsyncCheckTxnStatus(txnMeta.status(), txnAction, txnID, newStatus)
.thenCompose(__ -> endTxnInTransactionBuffer(txnID, txnAction));
return fakeAsyncCheckTxnStatus(txnMeta.status(), txnAction, txnID, newStatus);
}).whenComplete((__, ex)-> {
if (ex == null) {
future.complete(null);
endTxnInTransactionBuffer(txnID, txnAction)
.whenComplete((__2, ex2)-> {
if (ex2 == null) {
return;
}
Throwable realCause = FutureUtil.unwrapCompletionException(ex2);
if (!isRetryableException(realCause)) {
LOG.error("End transaction EndTxnInTransactionBuffer fail! TxnId : {}, "
+ "TxnAction : {}", txnID, txnAction, realCause);
// A non-retryable error while moving committing -> committed should never occur;
// if it does, this txn stays in committing until the TC restarts.
// Increment a metric so that this case gets noticed.
TransactionMetadataStore store = getStore(txnID);
if (store != null) {
store.incrementNonRetryableCount();
}
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("End transaction EndTxnInTransactionBuffer retry! " +
"TxnId : {}, TxnAction : {}", txnID, txnAction, realCause);
}
transactionOpRetryTimer.newTimeout(timeout ->
endTransaction(txnID, txnAction, isTimeout, future),
endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS);
});
return;
}
if (!isRetryableException(ex)) {
LOG.error("End transaction fail! TxnId : {}, "
+ "TxnAction : {}", txnID, txnAction, ex);
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
if (!isRetryableException(realCause)) {
LOG.error("End transaction UpdateTxnStatus fail! TxnId : {}, "
+ "TxnAction : {}", txnID, txnAction, realCause);
future.completeExceptionally(ex);
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("EndTxnInTransactionBuffer retry! TxnId : {}, "
+ "TxnAction : {}", txnID, txnAction, ex);
LOG.debug("End transaction UpdateTxnStatus retry! TxnId : {}, "
+ "TxnAction : {}", txnID, txnAction, realCause);
}
transactionOpRetryTimer.newTimeout(timeout ->
endTransaction(txnID, txnAction, isTimeout, future),
Expand Down Expand Up @@ -484,6 +509,10 @@ public Map<TransactionCoordinatorID, TransactionMetadataStore> getStores() {
return Collections.unmodifiableMap(stores);
}

/**
 * Looks up the metadata store registered under the coordinator id built from the
 * given transaction id.
 *
 * @param txnID transaction id; its most-significant bits are used to build the
 *              {@link TransactionCoordinatorID} key
 * @return the entry held in {@code stores} for that key, as returned by {@code stores.get(...)}
 *         (callers in this file null-check the result)
 */
private TransactionMetadataStore getStore(TxnID txnID) {
    final TransactionCoordinatorID coordinatorId = new TransactionCoordinatorID(txnID.getMostSigBits());
    return stores.get(coordinatorId);
}

public CompletableFuture<Boolean> verifyTxnOwnership(TxnID txnID, String checkOwner) {
return getTxnMeta(txnID)
.thenCompose(meta -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public class AggregatedTransactionCoordinatorStats {

public long appendLogCount;

public long nonRetryableCount;

public long[] executionLatency;

public void reset() {
Expand All @@ -41,6 +43,7 @@ public void reset() {
createdCount = 0;
timeoutCount = 0;
appendLogCount = 0;
nonRetryableCount = 0;
executionLatency = null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ public static void generate(PulsarService pulsar, PrometheusMetricStreams stream
transactionMetadataStoreStats.getTimeoutCount();
transactionCoordinatorStats.appendLogCount =
transactionMetadataStoreStats.getAppendLogCount();
transactionCoordinatorStats.nonRetryableCount =
transactionMetadataStoreStats.getNonRetryableCount();
transactionMetadataStoreStats.executionLatencyBuckets.refresh();
transactionCoordinatorStats.executionLatency =
transactionMetadataStoreStats.executionLatencyBuckets.getBuckets();
Expand Down Expand Up @@ -251,6 +253,8 @@ static void printTransactionCoordinatorStats(PrometheusMetricStreams stream, Str
coordinatorId);
writeMetric(stream, "pulsar_txn_append_log_total", stats.appendLogCount, cluster,
coordinatorId);
writeMetric(stream, "pulsar_txn_non_retryable_total", stats.nonRetryableCount, cluster,
coordinatorId);
long[] latencyBuckets = stats.executionLatency;
writeMetric(stream, "pulsar_txn_execution_latency_le_10", latencyBuckets[0], cluster, coordinatorId);
writeMetric(stream, "pulsar_txn_execution_latency_le_20", latencyBuckets[1], cluster, coordinatorId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,12 +362,15 @@ public void testEndTransactionOpRetry(TxnStatus txnStatus) throws Exception {
field.set(transactionMetadataStore, TransactionMetadataStoreState.State.None);
CompletableFuture<Void> completableFuture = null;
try {
// The server can respond to the client once the txn becomes COMMITTING, so the COMMITTING test case is not expected to fail.
completableFuture = pulsar.getTransactionMetadataStoreService().endTransaction(txnID, TxnAction.COMMIT.getValue(),
false);
completableFuture.get(5, TimeUnit.SECONDS);
fail();
if (txnStatus != TxnStatus.COMMITTING) {
fail();
}
} catch (Exception e) {
if (txnStatus == TxnStatus.OPEN || txnStatus == TxnStatus.COMMITTING) {
if (txnStatus == TxnStatus.OPEN) {
assertTrue(e instanceof TimeoutException);
} else if (txnStatus == TxnStatus.ABORTING) {
assertTrue(e.getCause() instanceof CoordinatorException.InvalidTxnStatusException);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,10 @@ public void testTransactionCoordinatorRateMetrics() throws Exception {
assertEquals(metric.size(), 1);
metric.forEach(item -> assertEquals(item.value, txnCount * 4 + 3));

metric = metrics.get("pulsar_txn_non_retryable_total");
assertEquals(metric.size(), 1);
metric.forEach(item -> assertEquals(item.value, 0));

metric = metrics.get("pulsar_txn_execution_latency_le_5000");
assertEquals(metric.size(), 1);
metric.forEach(item -> assertEquals(item.value, 1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
Expand All @@ -143,6 +144,7 @@
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionSequenceIdGenerator;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
Expand Down Expand Up @@ -649,6 +651,17 @@ public void testMaxReadPositionForNormalPublish() throws Exception {
Assert.assertEquals(position2.getLedgerId(), messageId.getLedgerId());
Assert.assertEquals(position2.getEntryId(), messageId.getEntryId());
transaction.commit().get();
// The client's commit() completes once the txn has moved open -> committing; wait for the server to finish committing -> committed.
Transaction finishTxn = transaction;
try {
Awaitility.await().until(() -> pulsarServiceList.get(0)
.getTransactionMetadataStoreService().getTxnMeta(finishTxn.getTxnID()).get() == null);
} catch (Exception e) {
if (!(e.getCause() instanceof CoordinatorException.TransactionNotFoundException)) {
fail();
}
}

PositionImpl position3 = topicTransactionBuffer.getMaxReadPosition();

Assert.assertEquals(position3.getLedgerId(), messageId2.getLedgerId());
Expand Down Expand Up @@ -1409,6 +1422,16 @@ public void testPendingAckBatchMessageCommit() throws Exception {
// abort the txn2
txn2.abort().get();

Transaction finishTxn = txn2;
try {
Awaitility.await().until(() -> pulsarServiceList.get(0)
.getTransactionMetadataStoreService().getTxnMeta(finishTxn.getTxnID()).get() == null);
} catch (Exception e) {
if (!(e.getCause() instanceof CoordinatorException.TransactionNotFoundException)) {
fail();
}
}

Transaction txn3 = pulsarClient.newTransaction()
.withTransactionTimeout(10, TimeUnit.MINUTES).build().get();
// repeat ack the second message, can ack successful
Expand Down Expand Up @@ -1601,7 +1624,16 @@ public void testGetTxnState() throws Exception {
transaction = pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.SECONDS)
.build().get();
pulsarServiceList.get(0).getTransactionMetadataStoreService()
.endTransaction(transaction.getTxnID(), 0, false);
.endTransaction(transaction.getTxnID(), TxnAction.COMMIT_VALUE, false).get();
Transaction finishTxn = transaction;
try {
Awaitility.await().until(() -> pulsarServiceList.get(0)
.getTransactionMetadataStoreService().getTxnMeta(finishTxn.getTxnID()).get() == null);
} catch (Exception e) {
if (!(e.getCause() instanceof CoordinatorException.TransactionNotFoundException)) {
fail();
}
}
transaction.commit();
Transaction errorTxn = transaction;
Awaitility.await().until(() -> errorTxn.getState() == Transaction.State.ERROR);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -702,6 +703,43 @@ public void txnMessageAckTest() throws Exception {
log.info("receive transaction messages count: {}", receiveCnt);
}

/**
 * Commits the same transaction repeatedly, resetting the client-side {@code state} field
 * to OPEN via reflection before each re-commit:
 * <ol>
 *   <li>the re-commit issued right after the first commit must not throw;</li>
 *   <li>after the coordinator no longer returns metadata for the txn, a further re-commit
 *       must fail with a {@code TransactionNotFoundException} cause.</li>
 * </ol>
 */
@Test
public void txnTestCommitTwice() throws Exception {
Transaction commitTxn = getTxn();

commitTxn.commit().get();
// Force the client-side TransactionImpl state back to OPEN through reflection
// (presumably so the client does not reject the second commit() locally -- TODO confirm).
Field field = TransactionImpl.class.getDeclaredField("state");
field.setAccessible(true);
field.set(commitTxn, TransactionImpl.State.OPEN);
// The first commit is assumed not to have fully finished on the server yet,
// so this second commit is expected to succeed.
try {
commitTxn.commit().get();
} catch (Exception reCommitError) {
fail("recommit one not-finished transaction should not be failed.");
}

// Wait until the server has completely finished the transaction: getTxnMeta either
// yields null or fails with TransactionNotFoundException. After that, another commit
// is expected to fail with TransactionNotFoundException.
Transaction finishTxn = commitTxn;
try {
Awaitility.await().until(() -> pulsarServiceList.get(0)
.getTransactionMetadataStoreService().getTxnMeta(finishTxn.getTxnID()).get() == null);
} catch (Exception e) {
// Only a TransactionNotFoundException cause is tolerated here; anything else fails the test.
if (!(e.getCause() instanceof CoordinatorException.TransactionNotFoundException)) {
fail();
}
}
// Reset the client-side state to OPEN again before the final re-commit.
field.setAccessible(true);
field.set(commitTxn, TransactionImpl.State.OPEN);
try {
commitTxn.commit().get();
fail("recommit one finished transaction should be failed.");
} catch (Exception reCommitError) {
log.info("expected exception for recommit one transaction.");
Assert.assertNotNull(reCommitError);
// NOTE(review): this uses the unqualified TransactionNotFoundException, whose import is
// not visible here -- confirm it is the intended type (vs. CoordinatorException's).
Assert.assertTrue(reCommitError.getCause() instanceof TransactionNotFoundException);
}
}

@Test
public void txnAckTestBatchAndCumulativeSub() throws Exception {
txnCumulativeAckTest(true, 200, SubscriptionType.Failover);
Expand Down Expand Up @@ -785,12 +823,21 @@ private void txnCumulativeAckTest(boolean batchEnable, int maxBatchSize, Subscri
}

commitTxn.commit().get();
Transaction finishTxn = commitTxn;
try {
Awaitility.await().until(() -> pulsarServiceList.get(0)
.getTransactionMetadataStoreService().getTxnMeta(finishTxn.getTxnID()).get() == null);
} catch (Exception e) {
if (!(e.getCause() instanceof CoordinatorException.TransactionNotFoundException)) {
fail();
}
}
Field field = TransactionImpl.class.getDeclaredField("state");
field.setAccessible(true);
field.set(commitTxn, TransactionImpl.State.OPEN);
try {
commitTxn.commit().get();
fail("recommit one transaction should be failed.");
fail("recommit one finished transaction should be failed.");
} catch (Exception reCommitError) {
// recommit one transaction should be failed
log.info("expected exception for recommit one transaction.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,6 @@ default long getLowWaterMark() {
* @return {@link TxnMeta} the txnMetas of slow transactions
*/
List<TxnMeta> getSlowTransactions(long timeout);

/**
 * Increment the count of non-retryable exceptions recorded by this metadata store.
 */
void incrementNonRetryableCount();
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class InMemTransactionMetadataStore implements TransactionMetadataStore {
private final LongAdder commitTransactionCount;
private final LongAdder abortTransactionCount;
private final LongAdder transactionTimeoutCount;
private final LongAdder nonRetryableCount;

InMemTransactionMetadataStore(TransactionCoordinatorID tcID) {
this.tcID = tcID;
Expand All @@ -58,7 +59,7 @@ class InMemTransactionMetadataStore implements TransactionMetadataStore {
this.commitTransactionCount = new LongAdder();
this.abortTransactionCount = new LongAdder();
this.transactionTimeoutCount = new LongAdder();

this.nonRetryableCount = new LongAdder();
}

@Override
Expand Down Expand Up @@ -165,4 +166,9 @@ public TransactionMetadataStoreStats getMetadataStoreStats() {
public List<TxnMeta> getSlowTransactions(long timeout) {
return null;
}

/** Adds one to this store's {@code nonRetryableCount} adder. */
@Override
public void incrementNonRetryableCount() {
nonRetryableCount.increment();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public class MLTransactionMetadataStore
private final LongAdder abortedTransactionCount;
private final LongAdder transactionTimeoutCount;
private final LongAdder appendLogCount;
private final LongAdder nonRetryableCount;
private final MLTransactionSequenceIdGenerator sequenceIdGenerator;
private final ExecutorService internalPinnedExecutor;
public final RecoverTimeRecord recoverTime = new RecoverTimeRecord();
Expand All @@ -100,6 +101,7 @@ public MLTransactionMetadataStore(TransactionCoordinatorID tcID,
this.committedTransactionCount = new LongAdder();
this.abortedTransactionCount = new LongAdder();
this.transactionTimeoutCount = new LongAdder();
this.nonRetryableCount = new LongAdder();
this.appendLogCount = new LongAdder();
DefaultThreadFactory threadFactory = new DefaultThreadFactory("transaction_coordinator_"
+ tcID.toString() + "thread_factory");
Expand Down Expand Up @@ -465,6 +467,11 @@ public TransactionCoordinatorStats getCoordinatorStats() {
return transactionCoordinatorstats;
}

/**
 * Adds one to this store's {@code nonRetryableCount} adder; the accumulated value is
 * copied into the stats object by {@code getMetadataStoreStats()}.
 */
@Override
public void incrementNonRetryableCount() {
nonRetryableCount.increment();
}

private CompletableFuture<Pair<TxnMeta, List<Position>>> getTxnPositionPair(TxnID txnID) {
CompletableFuture<Pair<TxnMeta, List<Position>>> completableFuture = new CompletableFuture<>();
Pair<TxnMeta, List<Position>> txnMetaListPair = txnMetaMap.get(txnID.getLeastSigBits());
Expand Down Expand Up @@ -507,6 +514,7 @@ public TransactionMetadataStoreStats getMetadataStoreStats() {
this.transactionMetadataStoreStats.setAbortedCount(this.abortedTransactionCount.longValue());
this.transactionMetadataStoreStats.setTimeoutCount(this.transactionTimeoutCount.longValue());
this.transactionMetadataStoreStats.setAppendLogCount(this.appendLogCount.longValue());
this.transactionMetadataStoreStats.setNonRetryableCount(this.nonRetryableCount.longValue());
return transactionMetadataStoreStats;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ public class TransactionMetadataStoreStats {
/** The timed-out transaction count of this transaction coordinator. */
public long timeoutCount;

/** The non retryable exception count of this transaction coordinator. */
public long nonRetryableCount;

/** The transaction execution latency. */
public StatsBuckets executionLatencyBuckets = new StatsBuckets(TRANSACTION_EXECUTION_BUCKETS);

Expand Down