Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
333fdcc
add abstract `getConsumerGroupsForCluster` and set up default behavio…
shouples Jan 27, 2026
7df5d3c
add get/set methods for consumer group storage
shouples Jan 27, 2026
c513dc1
add new event emitter for consumer group changes
shouples Jan 27, 2026
cadea74
add consumer group container and refresh/loading behavior to topics v…
shouples Jan 27, 2026
801a01c
cast to enum
shouples Feb 7, 2026
cd4c05c
add tests for set/getConsumerGroupsForCluster methods
shouples Feb 24, 2026
e9d12b3
use KafkaClusterResourceContainer for consumer groups and topics
shouples Feb 24, 2026
d3de234
update topic view provider tests for consumer group loading and conta…
shouples Feb 24, 2026
883f305
move Create Topic command from Topics view/title to view/item/context…
shouples Feb 24, 2026
5d761ce
update progress title for accuracy
shouples Feb 24, 2026
0105622
remove stale comment
shouples Feb 26, 2026
6db25cd
update TopicsView page object model to use inline command and lower a…
shouples Feb 26, 2026
b495467
fix container construction
shouples Feb 26, 2026
fecf8ca
use updated state methods; clear provider cache when refreshing conta…
shouples Feb 27, 2026
2717baf
update reveal() logic based on new containers and resource types
shouples Feb 27, 2026
001570a
update topic view provider tests for cache clearing and container con…
shouples Feb 27, 2026
5d52899
add tests for consumerGroupsChangedHandler()
shouples Feb 27, 2026
9792f40
add changelog entry
shouples Feb 27, 2026
6303222
use Promise.allSettled to prevent any failed container loading doesn'…
shouples Feb 27, 2026
db7ddae
push coordinator URL parsing logic into loaderUtils.ts and add tests
shouples Feb 27, 2026
108a86b
add try/catch for group members instead of allSettled
shouples Feb 27, 2026
27c744f
remove consumerGroupsChanged event emitter and handler since the exte…
shouples Feb 27, 2026
70f7295
add tests for ConsumerGroup and Consumer handling within TopicViewPro…
shouples Mar 11, 2026
63a1dd3
add ResourceLoader tests for fetching consumer groups and members
shouples Mar 11, 2026
066658e
fix bug where refreshing view after consumer groups start/stop show o…
shouples Mar 12, 2026
5fb18ba
use showErrorNotificationWithButtons instead of basic error notification
shouples Mar 13, 2026
77d39fb
use executeInWorkerPool to prevent overwhelming request count
shouples Mar 13, 2026
e8ebaa0
filter tree items by icon to avoid matching on consumer groups
shouples Mar 13, 2026
c6f90ee
use accessibilityInformation instead of tracking aria-levels for tree…
shouples Mar 13, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ All notable changes to this extension will be documented in this file.

## Unreleased

### Added

- Consumer groups and their members are now visible in the Topics view under a collapsible "Consumer
Groups" container
- Direct connections form now supports OAuth authentication for WarpStream connections.

## 2.2.2
Expand Down
10 changes: 5 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -1846,11 +1846,6 @@
"when": "view == confluent-topics && confluent.kafkaClusterSelected && confluent.topicSearchApplied",
"group": "navigation@1"
},
{
"command": "confluent.topics.create",
"when": "view == confluent-topics && confluent.kafkaClusterSelected",
"group": "navigation@2"
},
Comment on lines -1849 to -1853
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved from the view/title section to view/item/context section since this is now shown inline on the "Topics" container item

{
"command": "confluent.topics.kafka-cluster.select",
"when": "view == confluent-topics && (confluent.ccloudConnectionAvailable || confluent.localKafkaClusterAvailable || confluent.directKafkaClusterAvailable)",
Expand Down Expand Up @@ -2008,6 +2003,11 @@
"when": "viewItem == resources-ccloud-container-connected",
"group": "inline@2"
},
{
"command": "confluent.topics.create",
"when": "view == confluent-topics && viewItem == topics-container",
"group": "inline@1"
},
{
"command": "confluent.flinkdatabase.createRelation",
"when": "view == confluent-flink-database && viewItem == flink-database-relations-container",
Expand Down
89 changes: 87 additions & 2 deletions src/loaders/cachingResourceLoader.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,23 @@
import type { TopicData } from "../clients/kafkaRest";
import type { ConsumerGroupData, TopicData } from "../clients/kafkaRest";
import { Logger } from "../logging";
import type { ConsumerGroupState } from "../models/consumerGroup";
import { Consumer, ConsumerGroup } from "../models/consumerGroup";
import type { Environment, EnvironmentType } from "../models/environment";
import type { KafkaCluster, KafkaClusterType } from "../models/kafkaCluster";
import type { EnvironmentId } from "../models/resource";
import type { Subject } from "../models/schema";
import type { SchemaRegistry, SchemaRegistryType } from "../models/schemaRegistry";
import type { KafkaTopic } from "../models/topic";
import { getResourceManager } from "../storage/resourceManager";
import { executeInWorkerPool } from "../utils/workerPool";
import { ResourceLoader } from "./resourceLoader";
import { correlateTopicsWithSchemaSubjects, fetchTopics } from "./utils/loaderUtils";
import {
correlateTopicsWithSchemaSubjects,
fetchConsumerGroupMembers,
fetchConsumerGroups,
fetchTopics,
parseCoordinatorId,
} from "./utils/loaderUtils";

const logger = new Logger("cachingResourceLoader");

Expand Down Expand Up @@ -246,4 +255,80 @@ export abstract class CachingResourceLoader<

return topics;
}

/**
* Return the consumer groups for a given Kafka cluster.
*
* Caches the consumer groups for the cluster in the resource manager.
*/
public async getConsumerGroupsForCluster(
cluster: KCT,
forceDeepRefresh: boolean = false,
): Promise<ConsumerGroup[]> {
if (cluster.connectionId !== this.connectionId) {
throw new Error(
`Mismatched connectionId ${this.connectionId} for cluster ${JSON.stringify(cluster, null, 2)}`,
);
}

await this.ensureCoarseResourcesLoaded(forceDeepRefresh);

const resourceManager = getResourceManager();
const cachedConsumerGroups = await resourceManager.getConsumerGroupsForCluster(cluster);
if (cachedConsumerGroups !== undefined && !forceDeepRefresh) {
// Cache hit.
logger.debug(
`Returning ${cachedConsumerGroups.length} cached consumer groups for cluster ${cluster.id}`,
);
return cachedConsumerGroups;
}

// Deep fetch consumer groups from the API.
const responseConsumerGroups: ConsumerGroupData[] = await fetchConsumerGroups(cluster);

// Fetch members for each group with bounded concurrency to avoid overwhelming
// the sidecar/Kafka REST API on clusters with many consumer groups.
const memberResults = await executeInWorkerPool(
(data: ConsumerGroupData) => fetchConsumerGroupMembers(cluster, data.consumer_group_id),
responseConsumerGroups,
{ maxWorkers: 5, taskName: "fetchConsumerGroupMembers" },
);

// Convert API responses to ConsumerGroup models.
const consumerGroups: ConsumerGroup[] = responseConsumerGroups.map((data, index) => {
const memberResult = memberResults[index];
const members: Consumer[] =
memberResult?.result?.map(
(m) =>
new Consumer({
connectionId: cluster.connectionId,
connectionType: cluster.connectionType,
environmentId: cluster.environmentId,
clusterId: cluster.id,
consumerGroupId: data.consumer_group_id,
consumerId: m.consumer_id,
clientId: m.client_id,
instanceId: m.instance_id ?? null,
}),
) ?? [];

return new ConsumerGroup({
connectionId: cluster.connectionId,
connectionType: cluster.connectionType,
environmentId: cluster.environmentId,
clusterId: cluster.id,
consumerGroupId: data.consumer_group_id,
state: data.state as ConsumerGroupState,
isSimple: data.is_simple,
partitionAssignor: data.partition_assignor,
coordinatorId: parseCoordinatorId(data.coordinator?.related),
members,
});
});

// Cache the consumer groups for this cluster.
await resourceManager.setConsumerGroupsForCluster(cluster, consumerGroups);

return consumerGroups;
}
}
130 changes: 128 additions & 2 deletions src/loaders/resourceLoader.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import assert from "assert";
import * as sinon from "sinon";
import { getStubbedResourceManager } from "../../tests/stubs/extensionStorage";
import { getStubbedLocalResourceLoader } from "../../tests/stubs/resourceLoaders";
import { getSidecarStub } from "../../tests/stubs/sidecar";
import {
Expand All @@ -17,16 +18,17 @@ import {
TEST_LOCAL_SUBJECT_WITH_SCHEMAS,
} from "../../tests/unit/testResources";
import { createTestSubject, createTestTopicData } from "../../tests/unit/testUtils";
import type { TopicData } from "../clients/kafkaRest";
import type { ConsumerData, ConsumerGroupData, TopicData } from "../clients/kafkaRest";
import { SubjectsV1Api } from "../clients/schemaRegistryRest";
import { CCLOUD_CONNECTION_ID, LOCAL_CONNECTION_ID } from "../constants";
import * as errors from "../errors";
import { Consumer, ConsumerGroup, ConsumerGroupState } from "../models/consumerGroup";
import type { ConnectionId } from "../models/resource";
import type { Subject } from "../models/schema";
import { Schema } from "../models/schema";
import * as notifications from "../notifications";
import type { SidecarHandle } from "../sidecar";
import { getResourceManager } from "../storage/resourceManager";
import { getResourceManager, type ResourceManager } from "../storage/resourceManager";
import { clearWorkspaceState } from "../storage/utils";
import { CCloudResourceLoader } from "./ccloudResourceLoader";
import { DirectResourceLoader } from "./directResourceLoader";
Expand Down Expand Up @@ -469,6 +471,130 @@ describe("ResourceLoader::getTopicsForCluster()", () => {
});
});

describe("ResourceLoader::getConsumerGroupsForCluster()", () => {
let loaderInstance: LocalResourceLoader;
let sandbox: sinon.SinonSandbox;
let fetchConsumerGroupsStub: sinon.SinonStub;
let fetchConsumerGroupMembersStub: sinon.SinonStub;
let rmStub: sinon.SinonStubbedInstance<ResourceManager>;

// minimal ConsumerGroupData fixture matching the API response shape
const testConsumerGroupData: ConsumerGroupData = {
kind: "KafkaConsumerGroup",
metadata: { self: "", resource_name: "" },
cluster_id: TEST_LOCAL_KAFKA_CLUSTER.id,
consumer_group_id: "test-group-1",
is_simple: false,
partition_assignor: "range",
state: "STABLE",
coordinator: {
related: `http://localhost/kafka/v3/clusters/${TEST_LOCAL_KAFKA_CLUSTER.id}/brokers/0`,
},
lag_summary: { related: "" },
};

const testConsumerData: ConsumerData = {
kind: "KafkaConsumer",
metadata: { self: "", resource_name: "" },
cluster_id: TEST_LOCAL_KAFKA_CLUSTER.id,
consumer_group_id: "test-group-1",
consumer_id: "consumer-1",
client_id: "client-1",
assignments: { related: "" },
};

beforeEach(() => {
sandbox = sinon.createSandbox();

fetchConsumerGroupsStub = sandbox.stub(loaderUtils, "fetchConsumerGroups");
fetchConsumerGroupMembersStub = sandbox.stub(loaderUtils, "fetchConsumerGroupMembers");
loaderInstance = LocalResourceLoader.getInstance();
// bracket notation to stub protected method (same pattern as ccloudResourceLoader.test.ts)
loaderInstance["ensureCoarseResourcesLoaded"] = sandbox.stub().resolves();

rmStub = getStubbedResourceManager(sandbox);
rmStub.getConsumerGroupsForCluster.resolves(undefined);
rmStub.setConsumerGroupsForCluster.resolves();
});

afterEach(async () => {
await clearWorkspaceState();
sandbox.restore();
});

it("raises error for mismatched connectionId", async () => {
await assert.rejects(
loaderInstance.getConsumerGroupsForCluster(TEST_CCLOUD_KAFKA_CLUSTER),
(err) => {
return (err as Error).message.startsWith("Mismatched connectionId");
},
);
});

it("returns cached data if available", async () => {
const cachedGroups = [
new ConsumerGroup({
connectionId: TEST_LOCAL_KAFKA_CLUSTER.connectionId,
connectionType: TEST_LOCAL_KAFKA_CLUSTER.connectionType,
environmentId: TEST_LOCAL_KAFKA_CLUSTER.environmentId,
clusterId: TEST_LOCAL_KAFKA_CLUSTER.id,
consumerGroupId: "cached-group",
state: ConsumerGroupState.Stable,
isSimple: false,
partitionAssignor: "range",
coordinatorId: 0,
members: [],
}),
];
rmStub.getConsumerGroupsForCluster.resolves(cachedGroups);

const groups = await loaderInstance.getConsumerGroupsForCluster(TEST_LOCAL_KAFKA_CLUSTER);

assert.deepStrictEqual(groups, cachedGroups);
sinon.assert.notCalled(fetchConsumerGroupsStub);
sinon.assert.calledOnce(rmStub.getConsumerGroupsForCluster);
});

it("fetches consumer groups and members from the API on cache miss", async () => {
fetchConsumerGroupsStub.resolves([testConsumerGroupData]);
fetchConsumerGroupMembersStub.resolves([testConsumerData]);

const groups = await loaderInstance.getConsumerGroupsForCluster(TEST_LOCAL_KAFKA_CLUSTER);

assert.strictEqual(groups.length, 1);
assert.ok(groups[0] instanceof ConsumerGroup);
assert.strictEqual(groups[0].consumerGroupId, "test-group-1");
assert.strictEqual(groups[0].members.length, 1);
assert.ok(groups[0].members[0] instanceof Consumer);
assert.strictEqual(groups[0].members[0].consumerId, "consumer-1");
sinon.assert.calledOnce(fetchConsumerGroupsStub);
sinon.assert.calledOnce(fetchConsumerGroupMembersStub);
});

it("returns group with empty members when member fetch fails", async () => {
fetchConsumerGroupsStub.resolves([testConsumerGroupData]);
fetchConsumerGroupMembersStub.rejects(new Error("member fetch failed"));

const groups = await loaderInstance.getConsumerGroupsForCluster(TEST_LOCAL_KAFKA_CLUSTER);

assert.strictEqual(groups.length, 1);
assert.strictEqual(groups[0].members.length, 0);
});

it("bypasses cache when forceDeepRefresh is true", async () => {
// even though cache returns data, forceDeepRefresh should re-fetch
rmStub.getConsumerGroupsForCluster.resolves([]);

fetchConsumerGroupsStub.resolves([testConsumerGroupData]);
fetchConsumerGroupMembersStub.resolves([]);

const groups = await loaderInstance.getConsumerGroupsForCluster(TEST_LOCAL_KAFKA_CLUSTER, true);

assert.strictEqual(groups.length, 1);
sinon.assert.calledOnce(fetchConsumerGroupsStub);
});
});

describe("ResourceLoader::getSchemasForSubject()", () => {
let loaderInstance: ResourceLoader;
let sandbox: sinon.SinonSandbox;
Expand Down
12 changes: 12 additions & 0 deletions src/loaders/resourceLoader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import type { ConnectionId, EnvironmentId, IResourceBase } from "../models/resou
import type { Schema } from "../models/schema";
import { Subject, subjectMatchesTopicName } from "../models/schema";
import type { SchemaRegistry } from "../models/schemaRegistry";
import type { ConsumerGroup } from "../models/consumerGroup";
import type { KafkaTopic } from "../models/topic";
import { showWarningNotificationWithButtons } from "../notifications";
import { getSidecar } from "../sidecar";
Expand Down Expand Up @@ -133,6 +134,17 @@ export abstract class ResourceLoader extends DisposableCollection implements IRe
forceRefresh?: boolean,
): Promise<KafkaTopic[]>;

/**
* Return the consumer groups for a given Kafka cluster.
* @param cluster The Kafka cluster to fetch consumer groups for.
* @param forceRefresh If true, will ignore any cached consumer groups and fetch anew.
* @returns An array of consumer groups for the cluster.
*/
public abstract getConsumerGroupsForCluster(
cluster: KafkaCluster,
forceRefresh?: boolean,
): Promise<ConsumerGroup[]>;

// Schema registry methods.

/**
Expand Down
51 changes: 51 additions & 0 deletions src/loaders/utils/loaderUtils.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,14 @@ describe("loaderUtils.ts", () => {

assert.strictEqual(result.length, 0);
});

it("should propagate API errors from listKafkaConsumerGroups", async () => {
stubbedClient.listKafkaConsumerGroups.rejects(new Error("Connection refused"));

await assert.rejects(loaderUtils.fetchConsumerGroups(TEST_LOCAL_KAFKA_CLUSTER), {
message: "Connection refused",
});
});
});

describe("fetchConsumerGroupMembers()", () => {
Expand Down Expand Up @@ -475,6 +483,49 @@ describe("loaderUtils.ts", () => {
consumer_group_id: testGroupId,
});
});

it("should propagate API errors from listKafkaConsumers", async () => {
stubbedClient.listKafkaConsumers.rejects(new Error("Connection refused"));

await assert.rejects(
loaderUtils.fetchConsumerGroupMembers(TEST_LOCAL_KAFKA_CLUSTER, testGroupId),
{ message: "Connection refused" },
);
});
});

describe("parseCoordinatorId()", () => {
it("should parse broker ID from a full Kafka REST URL", () => {
const url = "http://localhost:26636/kafka/v3/clusters/lkc-abc123/brokers/2";
assert.strictEqual(loaderUtils.parseCoordinatorId(url), 2);
});

it("should parse broker ID from a CCloud-style URL", () => {
const url =
"https://pkc-abc123.us-east-1.aws.confluent.cloud/kafka/v3/clusters/lkc-5vmjd8/brokers/0";
assert.strictEqual(loaderUtils.parseCoordinatorId(url), 0);
});

it("should parse a plain numeric string", () => {
assert.strictEqual(loaderUtils.parseCoordinatorId("7"), 7);
});

it("should return null for undefined", () => {
assert.strictEqual(loaderUtils.parseCoordinatorId(undefined), null);
});

it("should return null for empty string", () => {
assert.strictEqual(loaderUtils.parseCoordinatorId(""), null);
});

it("should return null when the last segment is non-numeric", () => {
assert.strictEqual(
loaderUtils.parseCoordinatorId(
"http://localhost/kafka/v3/clusters/lkc-abc123/brokers/notanumber",
),
null,
);
});
});

describe("generateFlinkStatementKey()", () => {
Expand Down
Loading