Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -577,4 +577,92 @@ protected CompletableFuture<Void> internalAbortTransaction(boolean authoritative
.thenCompose(__ -> pulsar().getTransactionMetadataStoreService()
.endTransaction(new TxnID(mostSigBits, leastSigBits), TxnAction.ABORT_VALUE, false));
}

protected void internalGetOwnedTransactions(AsyncResponse asyncResponse,
boolean authoritative, Integer coordinatorId, String owner) {
try {
if (coordinatorId != null) {
validateTopicOwnership(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
authoritative);
TransactionMetadataStore transactionMetadataStore = pulsar().getTransactionMetadataStoreService()
.getStores().get(TransactionCoordinatorID.get(coordinatorId));
if (transactionMetadataStore == null) {
asyncResponse.resume(new RestException(NOT_FOUND,
"Transaction coordinator not found! coordinator id : " + coordinatorId));
return;
}
List<TxnMeta> transactions = transactionMetadataStore.getOwnedTransactions(owner);
List<CompletableFuture<TransactionMetadata>> completableFutures = new ArrayList<>();
for (TxnMeta txnMeta : transactions) {
CompletableFuture<TransactionMetadata> completableFuture = new CompletableFuture<>();
getTransactionMetadata(txnMeta, completableFuture);
completableFutures.add(completableFuture);
}

FutureUtil.waitForAll(completableFutures).whenComplete((v, e) -> {
if (e != null) {
asyncResponse.resume(new RestException(e.getCause()));
return;
}

Map<String, TransactionMetadata> transactionMetadata = new HashMap<>();
for (CompletableFuture<TransactionMetadata> future : completableFutures) {
try {
transactionMetadata.put(future.get().txnId, future.get());
} catch (Exception exception) {
asyncResponse.resume(new RestException(exception.getCause()));
return;
}
}
asyncResponse.resume(transactionMetadata);
});
} else {
getPartitionedTopicMetadataAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
false, false).thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions == 0) {
asyncResponse.resume(new RestException(Response.Status.NOT_FOUND,
"Transaction coordinator not found"));
return;
}
List<CompletableFuture<Map<String, TransactionMetadata>>> completableFutures =
new ArrayList<>();
for (int i = 0; i < partitionMetadata.partitions; i++) {
try {
completableFutures
.add(pulsar().getAdminClient().transactions()
.getOwnedTransactionsByCoordinatorIdAsync(i, owner));
} catch (PulsarServerException e) {
asyncResponse.resume(new RestException(e));
return;
}
}
Map<String, TransactionMetadata> transactionMetadataMaps = new HashMap<>();
FutureUtil.waitForAll(completableFutures).whenComplete((result, e) -> {
if (e != null) {
asyncResponse.resume(new RestException(e));
return;
}

for (CompletableFuture<Map<String, TransactionMetadata>> transactionMetadataMap
: completableFutures) {
try {
transactionMetadataMaps.putAll(transactionMetadataMap.get());
} catch (Exception exception) {
asyncResponse.resume(new RestException(exception.getCause()));
return;
}
}
asyncResponse.resume(transactionMetadataMaps);
});
}).exceptionally(ex -> {
log.error("[{}] Failed to get transaction coordinator state.", clientAppId(), ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});

}
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -484,4 +484,24 @@ public void abortTransaction(@Suspended final AsyncResponse asyncResponse,
resumeAsyncResponseExceptionally(asyncResponse, e);
}
}

@GET
@Path("/ownedTransactions/{owner}")
@ApiOperation(value = "Get owned transactions.", response = TransactionMetadata.class, responseContainer = "Map")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic "
+ "or coordinator or transaction doesn't exist"),
@ApiResponse(code = 503, message = "This Broker is not configured "
+ "with transactionCoordinatorEnabled=true."),
@ApiResponse(code = 307, message = "Topic don't owner by this broker!"),
@ApiResponse(code = 400, message = "Topic is not a persistent topic!"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void getOwnedTransactions(@Suspended final AsyncResponse asyncResponse,
@QueryParam("authoritative")
@DefaultValue("false") boolean authoritative,
@PathParam("owner") String owner,
@QueryParam("coordinatorId") Integer coordinatorId) {
checkTransactionCoordinatorEnabled();
internalGetOwnedTransactions(asyncResponse, authoritative, coordinatorId, owner);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
import org.apache.pulsar.common.stats.PositionInPendingAckStats;
import org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TxnMeta;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
Expand Down Expand Up @@ -1044,6 +1045,38 @@ public void testPeekMessageForShowAllMessages() throws Exception {
}
}

public void testGetOwnedTransactions() throws Exception {
initTransaction(2);
TxnID txnID1 = pulsar.getTransactionMetadataStoreService()
.newTransaction(new TransactionCoordinatorID(0), 10000, "user1").get();
TxnID txnID2 = pulsar.getTransactionMetadataStoreService()
.newTransaction(new TransactionCoordinatorID(0), 20000, "user2").get();
TxnID txnID3 = pulsar.getTransactionMetadataStoreService()
.newTransaction(new TransactionCoordinatorID(1), 30000, "user1").get();

Map<String, TransactionMetadata> transactionMetadataMap = admin
.transactions().getOwnedTransactionsByCoordinatorId(1, "user1");
assertEquals(transactionMetadataMap.size(), 1);
TransactionMetadata transactionMetadata = transactionMetadataMap.get(txnID3.toString());
assertNotNull(transactionMetadata);
assertEquals(transactionMetadata.timeoutAt, 30000);

transactionMetadataMap = admin.transactions().getOwnedTransactions("user1");
assertEquals(transactionMetadataMap.size(), 2);
transactionMetadata = transactionMetadataMap.get(txnID1.toString());
assertNotNull(transactionMetadata);
assertEquals(transactionMetadata.timeoutAt, 10000);
transactionMetadata = transactionMetadataMap.get(txnID3.toString());
assertNotNull(transactionMetadata);
assertEquals(transactionMetadata.timeoutAt, 30000);

transactionMetadataMap = admin.transactions().getOwnedTransactions("user2");
assertEquals(transactionMetadataMap.size(), 1);
transactionMetadata = transactionMetadataMap.get(txnID2.toString());
assertNotNull(transactionMetadata);
assertEquals(transactionMetadata.timeoutAt, 20000);
}

private static void verifyCoordinatorStats(String state,
long sequenceId, long lowWaterMark) {
assertEquals(state, "Ready");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,4 +414,36 @@ CompletableFuture<PositionInPendingAckStats> getPositionStatsInPendingAckAsync(S
* @param txnID the txnId
*/
CompletableFuture<Void> abortTransactionAsync(TxnID txnID);

/**
* Get owned transactions by coordinator id.
* @param coordinatorId the coordinator id
* @param owner the owner
* @return the metadata of slow transactions.
*/
CompletableFuture<Map<String, TransactionMetadata>> getOwnedTransactionsByCoordinatorIdAsync(Integer coordinatorId,
String owner);

/**
* Get owned transactions by coordinator id.
* @param coordinatorId the coordinator id
* @param owner the owner
* @return the metadata of slow transactions.
*/
Map<String, TransactionMetadata> getOwnedTransactionsByCoordinatorId(Integer coordinatorId,
String owner) throws PulsarAdminException;

/**
* Get owned transactions.
* @param owner the owner
* @return the metadata of slow transactions.
*/
CompletableFuture<Map<String, TransactionMetadata>> getOwnedTransactionsAsync(String owner);

/**
* Get owned transactions.
* @param owner the owner
* @return the metadata of slow transactions.
*/
Map<String, TransactionMetadata> getOwnedTransactions(String owner) throws PulsarAdminException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -296,4 +296,31 @@ public CompletableFuture<Void> abortTransactionAsync(TxnID txnID) {
public void abortTransaction(TxnID txnID) throws PulsarAdminException {
sync(() -> abortTransactionAsync(txnID));
}

@Override
public CompletableFuture<Map<String, TransactionMetadata>> getOwnedTransactionsByCoordinatorIdAsync(
Integer coordinatorId, String owner) {
WebTarget path = adminV3Transactions.path("ownedTransactions");
path = path.path(owner);
if (coordinatorId != null) {
path = path.queryParam("coordinatorId", coordinatorId);
}
return asyncGetRequest(path, new FutureCallback<Map<String, TransactionMetadata>>(){});
}

@Override
public Map<String, TransactionMetadata> getOwnedTransactionsByCoordinatorId(Integer coordinatorId, String owner)
throws PulsarAdminException {
return sync(() -> getOwnedTransactionsByCoordinatorIdAsync(coordinatorId, owner));
}

@Override
public CompletableFuture<Map<String, TransactionMetadata>> getOwnedTransactionsAsync(String owner) {
return getOwnedTransactionsByCoordinatorIdAsync(null, owner);
}

@Override
public Map<String, TransactionMetadata> getOwnedTransactions(String owner) throws PulsarAdminException {
return sync(() -> getOwnedTransactionsAsync(owner));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2438,6 +2438,14 @@ void transactions() throws Exception {
cmdTransactions = new CmdTransactions(() -> admin);
cmdTransactions.run(split("abort-transaction -m 1 -l 2"));
verify(transactions).abortTransaction(new TxnID(1, 2));

cmdTransactions = new CmdTransactions(() -> admin);
cmdTransactions.run(split("owned-transactions -c 1 -o user"));
verify(transactions).getOwnedTransactionsByCoordinatorId(1, "user");

cmdTransactions = new CmdTransactions(() -> admin);
cmdTransactions.run(split("owned-transactions -o user"));
verify(transactions).getOwnedTransactions("user");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,24 @@ void run() throws Exception {
}
}

@Command(description = "Get owned transactions.")
private class GetOwnedTransactions extends CliCommand {
@Option(names = {"-c", "--coordinator-id"}, description = "The coordinator id", required = false)
private Integer coordinatorId;

@Option(names = {"-o", "--owner"}, description = "The owner", required = true)
private String owner;

@Override
void run() throws Exception {
if (coordinatorId != null) {
print(getAdmin().transactions().getOwnedTransactionsByCoordinatorId(coordinatorId, owner));
} else {
print(getAdmin().transactions().getOwnedTransactions(owner));
}
}
}

public CmdTransactions(Supplier<PulsarAdmin> admin) {
super("transactions", admin);
addCommand("coordinator-internal-stats", new GetCoordinatorInternalStats());
Expand All @@ -280,6 +298,6 @@ public CmdTransactions(Supplier<PulsarAdmin> admin) {
addCommand("position-stats-in-pending-ack", new GetPositionStatsInPendingAck());
addCommand("coordinators-list", new ListTransactionCoordinators());
addCommand("abort-transaction", new AbortTransaction());

addCommand("owned-transactions", new GetOwnedTransactions());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,11 @@ default long getLowWaterMark() {
* @return {@link TxnMeta} the txnMetas of slow transactions
*/
List<TxnMeta> getSlowTransactions(long timeout);

/**
* Get the transactions owned by the given owner
*
* @return {@link TxnMeta} the txnMetas of transactions
*/
List<TxnMeta> getOwnedTransactions(String owner);
}
Original file line number Diff line number Diff line change
Expand Up @@ -165,4 +165,9 @@ public TransactionMetadataStoreStats getMetadataStoreStats() {
public List<TxnMeta> getSlowTransactions(long timeout) {
return null;
}

@Override
public List<TxnMeta> getOwnedTransactions(String owner) {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,17 @@ public List<TxnMeta> getSlowTransactions(long timeout) {
return txnMetas;
}

@Override
public List<TxnMeta> getOwnedTransactions(String owner) {
List<TxnMeta> txnMetas = new ArrayList<>();
txnMetaMap.forEach((k, v) -> {
if (v.getLeft().getOwner() != null && v.getLeft().getOwner().equals(owner)) {
txnMetas.add(v.getLeft());
}
});
return txnMetas;
}

public static List<Subscription> txnSubscriptionToSubscription(List<TransactionSubscription> tnxSubscriptions) {
List<Subscription> subscriptions = new ArrayList<>(tnxSubscriptions.size());
for (TransactionSubscription transactionSubscription : tnxSubscriptions) {
Expand Down