sessionTopicPartitions() {
+ return sessionPartitions.keySet();
+ }
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index fdacc09db8c8a..1d469a66436a6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -36,6 +36,7 @@
import java.time.Duration;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -919,12 +920,21 @@ default ListConsumerGroupsResult listConsumerGroups() {
* @param options The options to use when listing the consumer group offsets.
* @return The ListGroupOffsetsResult
*/
- ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options);
+ default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options) {
+ @SuppressWarnings("deprecation")
+ ListConsumerGroupOffsetsSpec groupSpec = new ListConsumerGroupOffsetsSpec()
+ .topicPartitions(options.topicPartitions());
+
+ // We can use the provided options with the batched API, which uses topic partitions from
+ // the group spec and ignores any topic partitions set in the options.
+ return listConsumerGroupOffsets(Collections.singletonMap(groupId, groupSpec), options);
+ }
/**
* List the consumer group offsets available in the cluster with the default options.
*
- * This is a convenience method for {@link #listConsumerGroupOffsets(String, ListConsumerGroupOffsetsOptions)} with default options.
+ * This is a convenience method for {@link #listConsumerGroupOffsets(Map, ListConsumerGroupOffsetsOptions)}
+ * to list offsets of all partitions of one group with default options.
*
* @return The ListGroupOffsetsResult.
*/
@@ -932,6 +942,29 @@ default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId)
return listConsumerGroupOffsets(groupId, new ListConsumerGroupOffsetsOptions());
}
+ /**
+ * List the consumer group offsets available in the cluster for the specified consumer groups.
+ *
+ * @param groupSpecs Map of consumer group ids to a spec that specifies the topic partitions of the group to list offsets for.
+ *
+ * @param options The options to use when listing the consumer group offsets.
+ * @return The ListConsumerGroupOffsetsResult
+ */
+ ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map groupSpecs, ListConsumerGroupOffsetsOptions options);
+
+ /**
+ * List the consumer group offsets available in the cluster for the specified groups with the default options.
+ *
+ * This is a convenience method for
+ * {@link #listConsumerGroupOffsets(Map, ListConsumerGroupOffsetsOptions)} with default options.
+ *
+ * @param groupSpecs Map of consumer group ids to a spec that specifies the topic partitions of the group to list offsets for.
+ * @return The ListConsumerGroupOffsetsResult.
+ */
+ default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map groupSpecs) {
+ return listConsumerGroupOffsets(groupSpecs, new ListConsumerGroupOffsetsOptions());
+ }
+
/**
* Delete consumer groups from the cluster.
*
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/FinalizedVersionRange.java b/clients/src/main/java/org/apache/kafka/clients/admin/FinalizedVersionRange.java
index aa0401a8a86eb..1442de58513f9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/FinalizedVersionRange.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/FinalizedVersionRange.java
@@ -36,10 +36,10 @@ public class FinalizedVersionRange {
* @throws IllegalArgumentException Raised when the condition described above is not met.
*/
FinalizedVersionRange(final short minVersionLevel, final short maxVersionLevel) {
- if (minVersionLevel < 1 || maxVersionLevel < 1 || maxVersionLevel < minVersionLevel) {
+ if (minVersionLevel < 0 || maxVersionLevel < 0 || maxVersionLevel < minVersionLevel) {
throw new IllegalArgumentException(
String.format(
- "Expected minVersionLevel >= 1, maxVersionLevel >= 1 and" +
+ "Expected minVersionLevel >= 0, maxVersionLevel >= 0 and" +
" maxVersionLevel >= minVersionLevel, but received" +
" minVersionLevel: %d, maxVersionLevel: %d", minVersionLevel, maxVersionLevel));
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index aad2610c94a45..89bc011a4d504 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -218,6 +218,7 @@
import org.apache.kafka.common.requests.ExpireDelegationTokenResponse;
import org.apache.kafka.common.requests.IncrementalAlterConfigsRequest;
import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
+import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.ListGroupsRequest;
import org.apache.kafka.common.requests.ListGroupsResponse;
import org.apache.kafka.common.requests.ListOffsetsRequest;
@@ -273,8 +274,8 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static org.apache.kafka.common.internals.Topic.METADATA_TOPIC_NAME;
-import static org.apache.kafka.common.internals.Topic.METADATA_TOPIC_PARTITION;
+import static org.apache.kafka.common.internals.Topic.CLUSTER_METADATA_TOPIC_NAME;
+import static org.apache.kafka.common.internals.Topic.CLUSTER_METADATA_TOPIC_PARTITION;
import static org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignablePartition;
import static org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse;
import static org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignableTopicResponse;
@@ -3400,13 +3401,14 @@ void handleFailure(Throwable throwable) {
}
@Override
- public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final String groupId,
- final ListConsumerGroupOffsetsOptions options) {
+ public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map groupSpecs,
+ ListConsumerGroupOffsetsOptions options) {
SimpleAdminApiFuture> future =
- ListConsumerGroupOffsetsHandler.newFuture(groupId);
- ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(groupId, options.topicPartitions(), options.requireStable(), logContext);
+ ListConsumerGroupOffsetsHandler.newFuture(groupSpecs.keySet());
+ ListConsumerGroupOffsetsHandler handler =
+ new ListConsumerGroupOffsetsHandler(groupSpecs, options.requireStable(), logContext);
invokeDriver(handler, future, options.timeoutMs);
- return new ListConsumerGroupOffsetsResult(future.get(CoordinatorKey.byGroupId(groupId)));
+ return new ListConsumerGroupOffsetsResult(future.all());
}
@Override
@@ -3756,7 +3758,7 @@ private List getMembersFromGroup(String groupId, String reason)
public RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(String groupId,
RemoveMembersFromConsumerGroupOptions options) {
String reason = options.reason() == null || options.reason().isEmpty() ?
- DEFAULT_LEAVE_GROUP_REASON : options.reason();
+ DEFAULT_LEAVE_GROUP_REASON : JoinGroupRequest.maybeTruncateReason(options.reason());
List members;
if (options.removeAll()) {
@@ -4355,16 +4357,27 @@ private QuorumInfo.ReplicaState translateReplicaState(DescribeQuorumResponseData
}
private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.PartitionData partition) {
+ List voters = partition.currentVoters().stream()
+ .map(this::translateReplicaState)
+ .collect(Collectors.toList());
+
+ List observers = partition.observers().stream()
+ .map(this::translateReplicaState)
+ .collect(Collectors.toList());
+
return new QuorumInfo(
- partition.leaderId(),
- partition.currentVoters().stream().map(v -> translateReplicaState(v)).collect(Collectors.toList()),
- partition.observers().stream().map(o -> translateReplicaState(o)).collect(Collectors.toList()));
+ partition.leaderId(),
+ partition.leaderEpoch(),
+ partition.highWatermark(),
+ voters,
+ observers
+ );
}
@Override
DescribeQuorumRequest.Builder createRequest(int timeoutMs) {
return new Builder(DescribeQuorumRequest.singletonRequest(
- new TopicPartition(METADATA_TOPIC_NAME, METADATA_TOPIC_PARTITION.partition())));
+ new TopicPartition(CLUSTER_METADATA_TOPIC_NAME, CLUSTER_METADATA_TOPIC_PARTITION.partition())));
}
@Override
@@ -4380,9 +4393,9 @@ void handleResponse(AbstractResponse response) {
throw new UnknownServerException(msg);
}
DescribeQuorumResponseData.TopicData topic = quorumResponse.data().topics().get(0);
- if (!topic.topicName().equals(METADATA_TOPIC_NAME)) {
+ if (!topic.topicName().equals(CLUSTER_METADATA_TOPIC_NAME)) {
String msg = String.format("DescribeMetadataQuorum received a topic with name %s when %s was expected",
- topic.topicName(), METADATA_TOPIC_NAME);
+ topic.topicName(), CLUSTER_METADATA_TOPIC_NAME);
log.debug(msg);
throw new UnknownServerException(msg);
}
@@ -4393,9 +4406,9 @@ void handleResponse(AbstractResponse response) {
throw new UnknownServerException(msg);
}
DescribeQuorumResponseData.PartitionData partition = topic.partitions().get(0);
- if (partition.partitionIndex() != METADATA_TOPIC_PARTITION.partition()) {
+ if (partition.partitionIndex() != CLUSTER_METADATA_TOPIC_PARTITION.partition()) {
String msg = String.format("DescribeMetadataQuorum received a single partition with index %d when %d was expected",
- partition.partitionIndex(), METADATA_TOPIC_PARTITION.partition());
+ partition.partitionIndex(), CLUSTER_METADATA_TOPIC_PARTITION.partition());
log.debug(msg);
throw new UnknownServerException(msg);
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java
index 292a47ef393c3..44d3a407327e1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java
@@ -23,23 +23,28 @@
import java.util.List;
/**
- * Options for {@link Admin#listConsumerGroupOffsets(String)}.
+ * Options for {@link Admin#listConsumerGroupOffsets(java.util.Map)} and {@link Admin#listConsumerGroupOffsets(String)}.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class ListConsumerGroupOffsetsOptions extends AbstractOptions {
- private List topicPartitions = null;
+ private List topicPartitions;
private boolean requireStable = false;
/**
* Set the topic partitions to list as part of the result.
* {@code null} includes all topic partitions.
+ *
+ * @deprecated Since 3.3.
+ * Use {@link Admin#listConsumerGroupOffsets(java.util.Map, ListConsumerGroupOffsetsOptions)}
+ * to specify topic partitions.
*
* @param topicPartitions List of topic partitions to include
* @return This ListGroupOffsetsOptions
*/
+ @Deprecated
public ListConsumerGroupOffsetsOptions topicPartitions(List topicPartitions) {
this.topicPartitions = topicPartitions;
return this;
@@ -55,7 +60,12 @@ public ListConsumerGroupOffsetsOptions requireStable(final boolean requireStable
/**
* Returns a list of topic partitions to add as part of the result.
+ *
+ * @deprecated Since 3.3.
+ * Use {@link Admin#listConsumerGroupOffsets(java.util.Map, ListConsumerGroupOffsetsOptions)}
+ * to specify topic partitions.
*/
+ @Deprecated
public List topicPartitions() {
return topicPartitions;
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java
index 48f4531418110..2136e33a401e1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java
@@ -17,25 +17,32 @@
package org.apache.kafka.clients.admin;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.clients.admin.internals.CoordinatorKey;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
-import java.util.Map;
-
/**
- * The result of the {@link Admin#listConsumerGroupOffsets(String)} call.
+ * The result of the {@link Admin#listConsumerGroupOffsets(Map)} and
+ * {@link Admin#listConsumerGroupOffsets(String)} call.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class ListConsumerGroupOffsetsResult {
- final KafkaFuture