From 62674f1b81389249693c1d13f3914f3e0428cc5b Mon Sep 17 00:00:00 2001 From: ruihongzhou Date: Wed, 26 Jun 2024 15:53:50 +0800 Subject: [PATCH] [improve][txn] Add admin api getOwnedTransactions --- .../broker/admin/impl/TransactionsBase.java | 88 +++++++++++++++++++ .../pulsar/broker/admin/v3/Transactions.java | 20 +++++ .../admin/v3/AdminApiTransactionTest.java | 33 +++++++ .../pulsar/client/admin/Transactions.java | 32 +++++++ .../admin/internal/TransactionsImpl.java | 27 ++++++ .../pulsar/admin/cli/PulsarAdminToolTest.java | 8 ++ .../pulsar/admin/cli/CmdTransactions.java | 20 ++++- .../coordinator/TransactionMetadataStore.java | 7 ++ .../impl/InMemTransactionMetadataStore.java | 5 ++ .../impl/MLTransactionMetadataStore.java | 11 +++ 10 files changed, 250 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java index 55767136f8151..e06b09c2a0a2e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java @@ -577,4 +577,92 @@ protected CompletableFuture internalAbortTransaction(boolean authoritative .thenCompose(__ -> pulsar().getTransactionMetadataStoreService() .endTransaction(new TxnID(mostSigBits, leastSigBits), TxnAction.ABORT_VALUE, false)); } + + protected void internalGetOwnedTransactions(AsyncResponse asyncResponse, + boolean authoritative, Integer coordinatorId, String owner) { + try { + if (coordinatorId != null) { + validateTopicOwnership(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId), + authoritative); + TransactionMetadataStore transactionMetadataStore = pulsar().getTransactionMetadataStoreService() + .getStores().get(TransactionCoordinatorID.get(coordinatorId)); + if (transactionMetadataStore == null) { + asyncResponse.resume(new RestException(NOT_FOUND, + "Transaction coordinator not found! coordinator id : " + coordinatorId)); + return; + } + List transactions = transactionMetadataStore.getOwnedTransactions(owner); + List> completableFutures = new ArrayList<>(); + for (TxnMeta txnMeta : transactions) { + CompletableFuture completableFuture = new CompletableFuture<>(); + getTransactionMetadata(txnMeta, completableFuture); + completableFutures.add(completableFuture); + } + + FutureUtil.waitForAll(completableFutures).whenComplete((v, e) -> { + if (e != null) { + asyncResponse.resume(new RestException(e.getCause())); + return; + } + + Map transactionMetadata = new HashMap<>(); + for (CompletableFuture future : completableFutures) { + try { + transactionMetadata.put(future.get().txnId, future.get()); + } catch (Exception exception) { + asyncResponse.resume(new RestException(exception.getCause())); + return; + } + } + asyncResponse.resume(transactionMetadata); + }); + } else { + getPartitionedTopicMetadataAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, + false, false).thenAccept(partitionMetadata -> { + if (partitionMetadata.partitions == 0) { + asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, + "Transaction coordinator not found")); + return; + } + List>> completableFutures = + new ArrayList<>(); + for (int i = 0; i < partitionMetadata.partitions; i++) { + try { + completableFutures + .add(pulsar().getAdminClient().transactions() + .getOwnedTransactionsByCoordinatorIdAsync(i, owner)); + } catch (PulsarServerException e) { + asyncResponse.resume(new RestException(e)); + return; + } + } + Map transactionMetadataMaps = new HashMap<>(); + FutureUtil.waitForAll(completableFutures).whenComplete((result, e) -> { + if (e != null) { + asyncResponse.resume(new RestException(e)); + return; + } + + for (CompletableFuture> transactionMetadataMap + : completableFutures) { + try { + transactionMetadataMaps.putAll(transactionMetadataMap.get()); + } catch (Exception exception) { + asyncResponse.resume(new RestException(exception.getCause())); + return; + } + } + asyncResponse.resume(transactionMetadataMaps); + }); + }).exceptionally(ex -> { + log.error("[{}] Failed to get transaction coordinator state.", clientAppId(), ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + + } + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + } + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java index 089ec53069287..584970ecd0155 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java @@ -484,4 +484,24 @@ public void abortTransaction(@Suspended final AsyncResponse asyncResponse, resumeAsyncResponseExceptionally(asyncResponse, e); } } + + @GET + @Path("/ownedTransactions/{owner}") + @ApiOperation(value = "Get owned transactions.", response = TransactionMetadata.class, responseContainer = "Map") + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic " + + "or coordinator or transaction doesn't exist"), + @ApiResponse(code = 503, message = "This Broker is not configured " + + "with transactionCoordinatorEnabled=true."), + @ApiResponse(code = 307, message = "Topic don't owner by this broker!"), + @ApiResponse(code = 400, message = "Topic is not a persistent topic!"), + @ApiResponse(code = 409, message = "Concurrent modification")}) + public void getOwnedTransactions(@Suspended final AsyncResponse asyncResponse, + @QueryParam("authoritative") + @DefaultValue("false") boolean authoritative, + @PathParam("owner") String owner, + @QueryParam("coordinatorId") Integer coordinatorId) { + checkTransactionCoordinatorEnabled(); + internalGetOwnedTransactions(asyncResponse, authoritative, coordinatorId, owner); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java index 1cc20b04c2137..fd2ef6f472319 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java @@ -81,6 +81,7 @@ import org.apache.pulsar.common.policies.data.TransactionPendingAckStats; import org.apache.pulsar.common.stats.PositionInPendingAckStats; import org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider; +import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; import org.apache.pulsar.transaction.coordinator.TxnMeta; import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException; import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl; @@ -1044,6 +1045,38 @@ public void testPeekMessageForShowAllMessages() throws Exception { } } + public void testGetOwnedTransactions() throws Exception { + initTransaction(2); + TxnID txnID1 = pulsar.getTransactionMetadataStoreService() + .newTransaction(new TransactionCoordinatorID(0), 10000, "user1").get(); + TxnID txnID2 = pulsar.getTransactionMetadataStoreService() + .newTransaction(new TransactionCoordinatorID(0), 20000, "user2").get(); + TxnID txnID3 = pulsar.getTransactionMetadataStoreService() + .newTransaction(new TransactionCoordinatorID(1), 30000, "user1").get(); + + Map transactionMetadataMap = admin + .transactions().getOwnedTransactionsByCoordinatorId(1, "user1"); + assertEquals(transactionMetadataMap.size(), 1); + TransactionMetadata transactionMetadata = transactionMetadataMap.get(txnID3.toString()); + assertNotNull(transactionMetadata); + assertEquals(transactionMetadata.timeoutAt, 30000); + + transactionMetadataMap = admin.transactions().getOwnedTransactions("user1"); + assertEquals(transactionMetadataMap.size(), 2); + transactionMetadata = transactionMetadataMap.get(txnID1.toString()); + assertNotNull(transactionMetadata); + assertEquals(transactionMetadata.timeoutAt, 10000); + transactionMetadata = transactionMetadataMap.get(txnID3.toString()); + assertNotNull(transactionMetadata); + assertEquals(transactionMetadata.timeoutAt, 30000); + + transactionMetadataMap = admin.transactions().getOwnedTransactions("user2"); + assertEquals(transactionMetadataMap.size(), 1); + transactionMetadata = transactionMetadataMap.get(txnID2.toString()); + assertNotNull(transactionMetadata); + assertEquals(transactionMetadata.timeoutAt, 20000); + } + private static void verifyCoordinatorStats(String state, long sequenceId, long lowWaterMark) { assertEquals(state, "Ready"); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java index 8fadabdfba235..a4b7cb3c85fdd 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java @@ -414,4 +414,36 @@ CompletableFuture getPositionStatsInPendingAckAsync(S * @param txnID the txnId */ CompletableFuture abortTransactionAsync(TxnID txnID); + + /** + * Get owned transactions by coordinator id. + * @param coordinatorId the coordinator id + * @param owner the owner + * @return the metadata of slow transactions. + */ + CompletableFuture> getOwnedTransactionsByCoordinatorIdAsync(Integer coordinatorId, + String owner); + + /** + * Get owned transactions by coordinator id. + * @param coordinatorId the coordinator id + * @param owner the owner + * @return the metadata of slow transactions. + */ + Map getOwnedTransactionsByCoordinatorId(Integer coordinatorId, + String owner) throws PulsarAdminException; + + /** + * Get owned transactions. + * @param owner the owner + * @return the metadata of slow transactions. + */ + CompletableFuture> getOwnedTransactionsAsync(String owner); + + /** + * Get owned transactions. + * @param owner the owner + * @return the metadata of slow transactions. + */ + Map getOwnedTransactions(String owner) throws PulsarAdminException; } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java index 460478787eb10..f9dba14a9a60e 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java @@ -296,4 +296,31 @@ public CompletableFuture abortTransactionAsync(TxnID txnID) { public void abortTransaction(TxnID txnID) throws PulsarAdminException { sync(() -> abortTransactionAsync(txnID)); } + + @Override + public CompletableFuture> getOwnedTransactionsByCoordinatorIdAsync( + Integer coordinatorId, String owner) { + WebTarget path = adminV3Transactions.path("ownedTransactions"); + path = path.path(owner); + if (coordinatorId != null) { + path = path.queryParam("coordinatorId", coordinatorId); + } + return asyncGetRequest(path, new FutureCallback>(){}); + } + + @Override + public Map getOwnedTransactionsByCoordinatorId(Integer coordinatorId, String owner) + throws PulsarAdminException { + return sync(() -> getOwnedTransactionsByCoordinatorIdAsync(coordinatorId, owner)); + } + + @Override + public CompletableFuture> getOwnedTransactionsAsync(String owner) { + return getOwnedTransactionsByCoordinatorIdAsync(null, owner); + } + + @Override + public Map getOwnedTransactions(String owner) throws PulsarAdminException { + return sync(() -> getOwnedTransactionsAsync(owner)); + } } diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index a3b1fa075cffc..fce05fd229eac 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -2438,6 +2438,14 @@ void transactions() throws Exception { cmdTransactions = new CmdTransactions(() -> admin); cmdTransactions.run(split("abort-transaction -m 1 -l 2")); verify(transactions).abortTransaction(new TxnID(1, 2)); + + cmdTransactions = new CmdTransactions(() -> admin); + cmdTransactions.run(split("owned-transactions -c 1 -o user")); + verify(transactions).getOwnedTransactionsByCoordinatorId(1, "user"); + + cmdTransactions = new CmdTransactions(() -> admin); + cmdTransactions.run(split("owned-transactions -o user")); + verify(transactions).getOwnedTransactions("user"); } @Test diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java index 63c729263cbe6..f4b5ad55347cf 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java @@ -264,6 +264,24 @@ void run() throws Exception { } } + @Command(description = "Get owned transactions.") + private class GetOwnedTransactions extends CliCommand { + @Option(names = {"-c", "--coordinator-id"}, description = "The coordinator id", required = false) + private Integer coordinatorId; + + @Option(names = {"-o", "--owner"}, description = "The owner", required = true) + private String owner; + + @Override + void run() throws Exception { + if (coordinatorId != null) { + print(getAdmin().transactions().getOwnedTransactionsByCoordinatorId(coordinatorId, owner)); + } else { + print(getAdmin().transactions().getOwnedTransactions(owner)); + } + } + } + public CmdTransactions(Supplier admin) { super("transactions", admin); addCommand("coordinator-internal-stats", new GetCoordinatorInternalStats()); @@ -280,6 +298,6 @@ public CmdTransactions(Supplier admin) { addCommand("position-stats-in-pending-ack", new GetPositionStatsInPendingAck()); addCommand("coordinators-list", new ListTransactionCoordinators()); addCommand("abort-transaction", new AbortTransaction()); - + addCommand("owned-transactions", new GetOwnedTransactions()); } } diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java index ff5adb4d409c7..638c3d82c6689 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java @@ -139,4 +139,11 @@ default long getLowWaterMark() { * @return {@link TxnMeta} the txnMetas of slow transactions */ List getSlowTransactions(long timeout); + + /** + * Get the transactions owned by the given owner + * + * @return {@link TxnMeta} the txnMetas of transactions + */ + List getOwnedTransactions(String owner); } diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java index 0f3c5e42d7a69..b11e5efbc0721 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java @@ -165,4 +165,9 @@ public TransactionMetadataStoreStats getMetadataStoreStats() { public List getSlowTransactions(long timeout) { return null; } + + @Override + public List getOwnedTransactions(String owner) { + return null; + } } diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java index b6eaad2e3e38f..b9380646f6445 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java @@ -521,6 +521,17 @@ public List getSlowTransactions(long timeout) { return txnMetas; } + @Override + public List getOwnedTransactions(String owner) { + List txnMetas = new ArrayList<>(); + txnMetaMap.forEach((k, v) -> { + if (v.getLeft().getOwner() != null && v.getLeft().getOwner().equals(owner)) { + txnMetas.add(v.getLeft()); + } + }); + return txnMetas; + } + public static List txnSubscriptionToSubscription(List tnxSubscriptions) { List subscriptions = new ArrayList<>(tnxSubscriptions.size()); for (TransactionSubscription transactionSubscription : tnxSubscriptions) {