From 333fdccc95abc0df2b536f6c3cdb363447c363a5 Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Tue, 27 Jan 2026 16:58:02 -0500 Subject: [PATCH 01/29] add abstract `getConsumerGroupsForCluster` and set up default behavior in CachingResourceLoader --- src/loaders/cachingResourceLoader.ts | 80 +++++++++++++++++++++++++++- src/loaders/resourceLoader.ts | 12 +++++ 2 files changed, 90 insertions(+), 2 deletions(-) diff --git a/src/loaders/cachingResourceLoader.ts b/src/loaders/cachingResourceLoader.ts index 0cf042118a..3ddca7e31d 100644 --- a/src/loaders/cachingResourceLoader.ts +++ b/src/loaders/cachingResourceLoader.ts @@ -1,5 +1,6 @@ -import type { TopicData } from "../clients/kafkaRest"; +import type { ConsumerGroupData, TopicData } from "../clients/kafkaRest"; import { Logger } from "../logging"; +import { Consumer, ConsumerGroup, parseConsumerGroupState } from "../models/consumerGroup"; import type { Environment, EnvironmentType } from "../models/environment"; import type { KafkaCluster, KafkaClusterType } from "../models/kafkaCluster"; import type { EnvironmentId } from "../models/resource"; @@ -8,7 +9,12 @@ import type { SchemaRegistry, SchemaRegistryType } from "../models/schemaRegistr import type { KafkaTopic } from "../models/topic"; import { getResourceManager } from "../storage/resourceManager"; import { ResourceLoader } from "./resourceLoader"; -import { correlateTopicsWithSchemaSubjects, fetchTopics } from "./utils/loaderUtils"; +import { + correlateTopicsWithSchemaSubjects, + fetchConsumerGroupMembers, + fetchConsumerGroups, + fetchTopics, +} from "./utils/loaderUtils"; const logger = new Logger("cachingResourceLoader"); @@ -246,4 +252,74 @@ export abstract class CachingResourceLoader< return topics; } + + /** + * Return the consumer groups for a given Kafka cluster. + * + * Caches the consumer groups for the cluster in the resource manager. + */ + public async getConsumerGroupsForCluster( + cluster: KCT, + forceDeepRefresh: boolean = false, + ): Promise { + if (cluster.connectionId !== this.connectionId) { + throw new Error( + `Mismatched connectionId ${this.connectionId} for cluster ${JSON.stringify(cluster, null, 2)}`, + ); + } + + await this.ensureCoarseResourcesLoaded(forceDeepRefresh); + + const resourceManager = getResourceManager(); + const cachedConsumerGroups = await resourceManager.getConsumerGroupsForCluster(cluster); + if (cachedConsumerGroups !== undefined && !forceDeepRefresh) { + // Cache hit. + logger.debug( + `Returning ${cachedConsumerGroups.length} cached consumer groups for cluster ${cluster.id}`, + ); + return cachedConsumerGroups; + } + + // Deep fetch consumer groups from the API. + const responseConsumerGroups: ConsumerGroupData[] = await fetchConsumerGroups(cluster); + + // Convert API response to ConsumerGroup models, fetching members for each group. + const consumerGroups: ConsumerGroup[] = await Promise.all( + responseConsumerGroups.map(async (data) => { + // Fetch members for this consumer group + const memberData = await fetchConsumerGroupMembers(cluster, data.consumer_group_id); + const members: Consumer[] = memberData.map( + (m) => + new Consumer({ + connectionId: cluster.connectionId, + connectionType: cluster.connectionType, + environmentId: cluster.environmentId, + clusterId: cluster.id, + consumerGroupId: data.consumer_group_id, + consumerId: m.consumer_id, + clientId: m.client_id, + instanceId: m.instance_id ?? null, + }), + ); + + return new ConsumerGroup({ + connectionId: cluster.connectionId, + connectionType: cluster.connectionType, + environmentId: cluster.environmentId, + clusterId: cluster.id, + consumerGroupId: data.consumer_group_id, + state: parseConsumerGroupState(data.state), + isSimple: data.is_simple, + partitionAssignor: data.partition_assignor, + coordinatorId: data.coordinator?.related ? parseInt(data.coordinator.related, 10) : null, + members, + }); + }), + ); + + // Cache the consumer groups for this cluster. + await resourceManager.setConsumerGroupsForCluster(cluster, consumerGroups); + + return consumerGroups; + } } diff --git a/src/loaders/resourceLoader.ts b/src/loaders/resourceLoader.ts index ee2f9d56d2..58123993d4 100644 --- a/src/loaders/resourceLoader.ts +++ b/src/loaders/resourceLoader.ts @@ -11,6 +11,7 @@ import type { ConnectionId, EnvironmentId, IResourceBase } from "../models/resou import type { Schema } from "../models/schema"; import { Subject, subjectMatchesTopicName } from "../models/schema"; import type { SchemaRegistry } from "../models/schemaRegistry"; +import type { ConsumerGroup } from "../models/consumerGroup"; import type { KafkaTopic } from "../models/topic"; import { showWarningNotificationWithButtons } from "../notifications"; import { getSidecar } from "../sidecar"; @@ -133,6 +134,17 @@ export abstract class ResourceLoader extends DisposableCollection implements IRe forceRefresh?: boolean, ): Promise; + /** + * Return the consumer groups for a given Kafka cluster. + * @param cluster The Kafka cluster to fetch consumer groups for. + * @param forceRefresh If true, will ignore any cached consumer groups and fetch anew. + * @returns An array of consumer groups for the cluster. + */ + public abstract getConsumerGroupsForCluster( + cluster: KafkaCluster, + forceRefresh?: boolean, + ): Promise; + // Schema registry methods. /** From 7df5d3c7168eafaea0c926dfb01fac4375caec36 Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Tue, 27 Jan 2026 16:58:30 -0500 Subject: [PATCH 02/29] add get/set methods for consumer group storage --- src/storage/resourceManager.ts | 95 ++++++++++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) diff --git a/src/storage/resourceManager.ts b/src/storage/resourceManager.ts index 7780d80a9b..761b9eca5c 100644 --- a/src/storage/resourceManager.ts +++ b/src/storage/resourceManager.ts @@ -7,6 +7,7 @@ import { getExtensionContext } from "../context/extension"; import type { FormConnectionType } from "../directConnections/types"; import { ExtensionContextNotSetError } from "../errors"; import { Logger } from "../logging"; +import { Consumer, ConsumerGroup } from "../models/consumerGroup"; import type { Environment, EnvironmentType } from "../models/environment"; import { getEnvironmentClass } from "../models/environment"; import { FlinkAIAgent } from "../models/flinkAiAgent"; @@ -71,6 +72,7 @@ export enum GeneratedKeyResourceType { SCHEMA_REGISTRIES = "schemaRegistries", TOPICS = "topics", SUBJECTS = "subjects", + CONSUMER_GROUPS = "consumerGroups", FLINK_ARTIFACTS = "flinkArtifacts", } @@ -518,6 +520,99 @@ export class ResourceManager { }); } + /** + * Store this (possibly empty) list of consumer groups for a cluster in workspace state. + * If it is known that a given cluster has no consumer groups, then should call with empty array. + * + * Raises an error if the cluster ID of any consumer group does not match the given cluster's ID. + */ + async setConsumerGroupsForCluster( + cluster: KafkaClusterType, + consumerGroups: ConsumerGroup[], + ): Promise { + // Ensure that all consumer groups have the correct cluster ID. + if (consumerGroups.some((group) => group.clusterId !== cluster.id)) { + logger.warn("Cluster ID mismatch in consumer groups", cluster, consumerGroups); + throw new Error("Cluster ID mismatch in consumer groups"); + } + + // Will be a per-connection-id key for storing consumer groups by cluster ID. + const key = this.generateWorkspaceStorageKey( + cluster.connectionId, + GeneratedKeyResourceType.CONSUMER_GROUPS, + ); + + await this.runWithMutex(key, async () => { + // Get the JSON-stringified map from storage + const consumerGroupsByClusterString: string | undefined = this.workspaceState.get(key); + const consumerGroupsByCluster: Map = consumerGroupsByClusterString + ? stringToMap(consumerGroupsByClusterString) + : new Map(); + + // Set the new consumer groups for the cluster + consumerGroupsByCluster.set(cluster.id, consumerGroups); + + // Now save the updated cluster consumer groups into the proper key'd storage. + await this.workspaceState.update(key, mapToString(consumerGroupsByCluster)); + }); + } + + /** + * Get consumer groups given a cluster, be it local, ccloud, or direct. + * + * @returns ConsumerGroup[] (possibly empty) if known, else undefined + * indicating nothing at all known about this cluster (and should be deep probed). + */ + async getConsumerGroupsForCluster( + cluster: KafkaClusterType, + ): Promise { + const key = this.generateWorkspaceStorageKey( + cluster.connectionId, + GeneratedKeyResourceType.CONSUMER_GROUPS, + ); + + // Get the JSON-stringified map from storage + const consumerGroupsByClusterString: string | undefined = this.workspaceState.get(key); + const consumerGroupsByCluster: Map = consumerGroupsByClusterString + ? stringToMap(consumerGroupsByClusterString) + : new Map(); + + // Will either be undefined or an array of plain json objects since + // just deserialized from storage. + const vanillaJSONConsumerGroups: object[] | undefined = consumerGroupsByCluster.get(cluster.id); + if (vanillaJSONConsumerGroups === undefined) { + return undefined; + } + + // Promote each member to be an instance of ConsumerGroup, return. + // (Empty list will be returned as is, indicating that we know there are + // no consumer groups in this cluster.) + return vanillaJSONConsumerGroups.map((group) => { + const cgObj = group as ConsumerGroup; + // also rehydrate Consumer members before recreating the ConsumerGroup instance + let members: Consumer[] = []; + if (cgObj.members && cgObj.members.length > 0) { + members = cgObj.members.map((member: Consumer | object) => { + if (member instanceof Consumer) { + return member; + } + const memberObj = member as Consumer; + return new Consumer({ + connectionId: memberObj.connectionId, + connectionType: memberObj.connectionType, + environmentId: memberObj.environmentId, + clusterId: memberObj.clusterId, + consumerGroupId: memberObj.consumerGroupId, + consumerId: memberObj.consumerId, + clientId: memberObj.clientId, + instanceId: memberObj.instanceId, + }); + }); + } + return new ConsumerGroup({ ...cgObj, members }); + }); + } + // Flink Artifacts /** From c513dc16cc493a517a171fc26b5721966d4785dd Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Tue, 27 Jan 2026 16:59:50 -0500 Subject: [PATCH 03/29] add new event emitter for consumer group changes --- src/emitters.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/emitters.ts b/src/emitters.ts index 6e03f7f5ec..1ebe35babd 100644 --- a/src/emitters.ts +++ b/src/emitters.ts @@ -163,6 +163,8 @@ export type TopicChangeEvent = { change: EventChangeType; cluster: KafkaCluster; }; - /** Fires when a topic has been created or deleted in a Kafka cluster. */ export const topicChanged = new vscode.EventEmitter(); + +/** Fires when consumer groups for a Kafka cluster have changed (removed or updated, direct addition not supported). */ +export const consumerGroupsChanged = new vscode.EventEmitter(); From cadea74bc39324b419b745bc3ca6486f73718182 Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Tue, 27 Jan 2026 17:06:09 -0500 Subject: [PATCH 04/29] add consumer group container and refresh/loading behavior to topics view provider; update tests --- src/viewProviders/topics.test.ts | 11 ++- src/viewProviders/topics.ts | 115 +++++++++++++++++++++++++++++-- 2 files changed, 118 insertions(+), 8 deletions(-) diff --git a/src/viewProviders/topics.test.ts b/src/viewProviders/topics.test.ts index 5eb2566495..1bd1fa126d 100644 --- a/src/viewProviders/topics.test.ts +++ b/src/viewProviders/topics.test.ts @@ -48,6 +48,7 @@ describe("viewProviders/topics.ts", () => { stubbedLoader = getStubbedCCloudResourceLoader(sandbox); stubbedLoader.getTopicsForCluster.resolves([]); + stubbedLoader.getConsumerGroupsForCluster.resolves([]); }); afterEach(() => { @@ -64,7 +65,8 @@ describe("viewProviders/topics.ts", () => { it("no-arg refresh() when focused on a cluster should call onDidChangeTreeData.fire()", async () => { provider.kafkaCluster = TEST_CCLOUD_KAFKA_CLUSTER; await provider.refresh(); - sinon.assert.calledOnce(onDidChangeTreeDataFireStub); + // called once for topics, twice for consumer groups (loading start/end) + sinon.assert.called(onDidChangeTreeDataFireStub); }); it("no-arg refresh() when no cluster is set should call onDidChangeTreeData.fire() once to clear (disconnect scenario)", async () => { @@ -104,7 +106,8 @@ describe("viewProviders/topics.ts", () => { it("onlyIfMatching a kafka cluster when the cluster matches should call onDidChangeTreeData.fire()", async () => { provider.kafkaCluster = TEST_CCLOUD_KAFKA_CLUSTER; await provider.refresh(false, TEST_CCLOUD_KAFKA_CLUSTER); - sinon.assert.calledOnce(onDidChangeTreeDataFireStub); + // called once for topics, twice for consumer groups (loading start/end) + sinon.assert.called(onDidChangeTreeDataFireStub); }); it("onlyIfMatching a contained Kafka topic when the cluster doesn't match should do nothing", async () => { @@ -116,7 +119,8 @@ describe("viewProviders/topics.ts", () => { it("onlyIfMatching a contained Kafka topic when the cluster matches should call onDidChangeTreeData.fire()", async () => { provider.kafkaCluster = TEST_CCLOUD_KAFKA_CLUSTER; await provider.refresh(false, TEST_CCLOUD_KAFKA_TOPIC); - sinon.assert.calledOnce(onDidChangeTreeDataFireStub); + // called once for topics, twice for consumer groups (loading start/end) + sinon.assert.called(onDidChangeTreeDataFireStub); }); }); @@ -565,6 +569,7 @@ describe("viewProviders/topics.ts", () => { ["schemaSubjectChanged", "subjectChangeHandler"], ["schemaVersionsChanged", "subjectChangeHandler"], ["topicChanged", "topicChangedHandler"], + ["consumerGroupsChanged", "consumerGroupsChangedHandler"], ]; it("should return the expected number of listeners", () => { diff --git a/src/viewProviders/topics.ts b/src/viewProviders/topics.ts index 080ad95350..3f9bacfd58 100644 --- a/src/viewProviders/topics.ts +++ b/src/viewProviders/topics.ts @@ -8,6 +8,7 @@ import type { TopicChangeEvent, } from "../emitters"; import { + consumerGroupsChanged, environmentChanged, localKafkaConnected, schemaSubjectChanged, @@ -18,6 +19,13 @@ import { } from "../emitters"; import { ResourceLoader } from "../loaders"; import { TopicFetchError } from "../loaders/utils/loaderUtils"; +import { + Consumer, + ConsumerGroup, + ConsumerGroupContainer, + ConsumerGroupTreeItem, + ConsumerTreeItem, +} from "../models/consumerGroup"; import { KafkaCluster } from "../models/kafkaCluster"; import { isCCloud, isLocal } from "../models/resource"; import { Schema, SchemaTreeItem, Subject, SubjectTreeItem } from "../models/schema"; @@ -28,7 +36,13 @@ import { ParentedBaseViewProvider } from "./baseModels/parentedBase"; * The types managed by the {@link TopicViewProvider}, which are converted to their appropriate tree item * type via the {@link TopicViewProvider provider's} {@linkcode TopicViewProvider.getTreeItem() .getTreeItem()} method. */ -type TopicViewProviderData = KafkaTopic | Subject | Schema; +type TopicViewProviderData = + | ConsumerGroupContainer + | ConsumerGroup + | Consumer + | KafkaTopic + | Subject + | Schema; /** * Provider for the "Topics" view resources. @@ -55,10 +69,17 @@ export class TopicViewProvider extends ParentedBaseViewProvider< /** Map of subject name -> parent {@link KafkaTopic} for easy parent lookup. */ private subjectToTopicMap: Map = new Map(); + /** Container for consumer groups in this cluster. */ + private consumerGroupsContainer: ConsumerGroupContainer | null = null; + /** Map of consumer group ID -> {@link ConsumerGroup} instance currently in the tree view. */ + private consumerGroupsInTreeView: Map = new Map(); + private clearCaches(): void { this.topicsInTreeView.clear(); this.subjectsInTreeView.clear(); this.subjectToTopicMap.clear(); + this.consumerGroupsInTreeView.clear(); + this.consumerGroupsContainer = null; } get kafkaCluster(): KafkaCluster | null { @@ -77,8 +98,22 @@ export class TopicViewProvider extends ParentedBaseViewProvider< let children: TopicViewProviderData[] = []; if (!element) { - // top-level: show topics - children = Array.from(this.topicsInTreeView.values()); + // top-level: show consumer groups container + topics + const topics = Array.from(this.topicsInTreeView.values()); + if (this.consumerGroupsContainer) { + children = [this.consumerGroupsContainer, ...topics]; + } else { + children = topics; + } + } else if (element instanceof ConsumerGroupContainer) { + // expanding the consumer groups container to show consumer groups + children = element.children; + } else if (element instanceof ConsumerGroup) { + // expanding a consumer group to show its members + const cachedGroup = this.consumerGroupsInTreeView.get(element.consumerGroupId); + if (cachedGroup) { + children = cachedGroup.members; + } } else if (element instanceof KafkaTopic) { // expanding a topic to show its subject(s) const cachedTopic: KafkaTopic | undefined = this.topicsInTreeView.get(element.name); @@ -107,7 +142,13 @@ export class TopicViewProvider extends ParentedBaseViewProvider< getTreeItem(element: TopicViewProviderData): TreeItem { let treeItem: TreeItem; - if (element instanceof KafkaTopic) { + if (element instanceof ConsumerGroupContainer) { + treeItem = element; + } else if (element instanceof ConsumerGroup) { + treeItem = new ConsumerGroupTreeItem(element); + } else if (element instanceof Consumer) { + treeItem = new ConsumerTreeItem(element); + } else if (element instanceof KafkaTopic) { treeItem = new KafkaTopicTreeItem(element); } else if (element instanceof Subject) { treeItem = new SubjectTreeItem(element); @@ -153,7 +194,18 @@ export class TopicViewProvider extends ParentedBaseViewProvider< const cluster: KafkaCluster = this.kafkaCluster; await this.withProgress("Loading topics...", async () => { - await this.refreshTopics(cluster, forceDeepRefresh); + // set up consumer group container before loading + this.consumerGroupsContainer = new ConsumerGroupContainer( + cluster.connectionId, + cluster.connectionType, + cluster.id, + cluster.environmentId, + ); + + await Promise.all([ + this.refreshTopics(cluster, forceDeepRefresh), + this.refreshConsumerGroups(cluster, forceDeepRefresh), + ]); }); } @@ -183,6 +235,38 @@ export class TopicViewProvider extends ParentedBaseViewProvider< this._onDidChangeTreeData.fire(); } + // similar to the Flink Database provider's refreshResourceContainer method: + /** Fetch and cache consumer groups for the focused cluster. */ + async refreshConsumerGroups( + cluster: KafkaCluster, + forceDeepRefresh: boolean = false, + ): Promise { + if (!this.consumerGroupsContainer) { + return; + } + // set initial loading state + this.consumerGroupsContainer.isLoading = true; + this._onDidChangeTreeData.fire(this.consumerGroupsContainer); + + const loader = ResourceLoader.getInstance(cluster.connectionId); + try { + const consumerGroups = await loader.getConsumerGroupsForCluster(cluster, forceDeepRefresh); + consumerGroups.forEach((group) => { + this.consumerGroupsInTreeView.set(group.consumerGroupId, group); + }); + this.consumerGroupsContainer.children = consumerGroups; + // loading/error state is cleared + } catch (err) { + this.logger.error("Error fetching consumer groups for cluster", cluster, err); + // clear loading/error states + this.consumerGroupsContainer.hasError = true; + this.consumerGroupsContainer.children = []; + // TODO: show error in tooltip? + } + + this._onDidChangeTreeData.fire(this.consumerGroupsContainer); + } + /** Fetch and cache {@link Schema schemas} for a specific {@link Subject subject}. */ private async updateSubjectSchemas(subject: Subject): Promise { if (!this.kafkaCluster) { @@ -202,6 +286,16 @@ export class TopicViewProvider extends ParentedBaseViewProvider< /** Get the parent of the given element, or `undefined` if it's a root-level item. */ getParent(element: TopicViewProviderData): TopicViewProviderData | undefined { + if (element instanceof ConsumerGroupContainer) { + // root-level item + return; + } + if (element instanceof ConsumerGroup) { + return this.consumerGroupsContainer ?? undefined; + } + if (element instanceof Consumer) { + return this.consumerGroupsInTreeView.get(element.consumerGroupId); + } if (element instanceof KafkaTopic) { // root-level item return; @@ -255,6 +349,7 @@ export class TopicViewProvider extends ParentedBaseViewProvider< schemaSubjectChanged.event(this.subjectChangeHandler.bind(this)), schemaVersionsChanged.event(this.subjectChangeHandler.bind(this)), topicChanged.event(this.topicChangedHandler.bind(this)), + consumerGroupsChanged.event(this.consumerGroupsChangedHandler.bind(this)), ]; } @@ -268,6 +363,16 @@ export class TopicViewProvider extends ParentedBaseViewProvider< } } + async consumerGroupsChangedHandler(cluster: KafkaCluster): Promise { + if (this.kafkaCluster && this.kafkaCluster.equals(cluster)) { + this.logger.debug( + "consumerGroupsChanged event fired for the focused cluster, refreshing consumer groups", + { clusterId: cluster.id }, + ); + await this.refreshConsumerGroups(cluster, true); + } + } + async environmentChangedHandler(envEvent: EnvironmentChangeEvent): Promise { if (this.kafkaCluster && this.kafkaCluster.environmentId === envEvent.id) { if (!envEvent.wasDeleted) { From 801a01ceb1b165e0d1b749fb44e9f26f0101e5b8 Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Fri, 6 Feb 2026 21:27:10 -0500 Subject: [PATCH 05/29] cast to enum --- src/loaders/cachingResourceLoader.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/loaders/cachingResourceLoader.ts b/src/loaders/cachingResourceLoader.ts index 3ddca7e31d..51d78c220c 100644 --- a/src/loaders/cachingResourceLoader.ts +++ b/src/loaders/cachingResourceLoader.ts @@ -1,6 +1,7 @@ import type { ConsumerGroupData, TopicData } from "../clients/kafkaRest"; import { Logger } from "../logging"; -import { Consumer, ConsumerGroup, parseConsumerGroupState } from "../models/consumerGroup"; +import type { ConsumerGroupState } from "../models/consumerGroup"; +import { Consumer, ConsumerGroup } from "../models/consumerGroup"; import type { Environment, EnvironmentType } from "../models/environment"; import type { KafkaCluster, KafkaClusterType } from "../models/kafkaCluster"; import type { EnvironmentId } from "../models/resource"; @@ -308,7 +309,7 @@ export abstract class CachingResourceLoader< environmentId: cluster.environmentId, clusterId: cluster.id, consumerGroupId: data.consumer_group_id, - state: parseConsumerGroupState(data.state), + state: data.state as ConsumerGroupState, isSimple: data.is_simple, partitionAssignor: data.partition_assignor, coordinatorId: data.coordinator?.related ? parseInt(data.coordinator.related, 10) : null, From cd4c05cb30a6dbfc961c075da10c6da07f741a65 Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Tue, 24 Feb 2026 16:51:41 -0500 Subject: [PATCH 06/29] add tests for set/getConsumerGroupsForCluster methods --- src/storage/resourceManager.test.ts | 65 +++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/src/storage/resourceManager.test.ts b/src/storage/resourceManager.test.ts index bfdf839851..ca6daf9c6c 100644 --- a/src/storage/resourceManager.test.ts +++ b/src/storage/resourceManager.test.ts @@ -16,6 +16,10 @@ import { TEST_LOCAL_KAFKA_TOPIC, TEST_LOCAL_SCHEMA_REGISTRY, } from "../../tests/unit/testResources"; +import { + TEST_CCLOUD_CONSUMER_GROUP, + TEST_LOCAL_CONSUMER_GROUP, +} from "../../tests/unit/testResources/consumerGroup"; import { TEST_DIRECT_CONNECTION_FORM_SPEC, TEST_DIRECT_CONNECTION_ID, @@ -34,6 +38,7 @@ import { KafkaClusterConfigToJSON, } from "../clients/sidecar"; import { CCLOUD_CONNECTION_ID, LOCAL_CONNECTION_ID } from "../constants"; +import { Consumer, ConsumerGroup } from "../models/consumerGroup"; import { CCloudEnvironment } from "../models/environment"; import { FlinkAIAgent } from "../models/flinkAiAgent"; import { FlinkAIConnection } from "../models/flinkAiConnection"; @@ -395,6 +400,66 @@ describe("storage/resourceManager", () => { }); }); + describe("ResourceManager setConsumerGroupsForCluster() / getConsumerGroupsForCluster()", function () { + it("setConsumerGroupsForCluster() should throw an error if given mixed cluster IDs", async () => { + const mixedGroups = [TEST_CCLOUD_CONSUMER_GROUP, TEST_LOCAL_CONSUMER_GROUP]; + + await assert.rejects( + getResourceManager().setConsumerGroupsForCluster(TEST_CCLOUD_KAFKA_CLUSTER, mixedGroups), + (err) => { + return ( + err instanceof Error && err.message.includes("Cluster ID mismatch in consumer groups") + ); + }, + "Expected error when setting mixed cluster IDs", + ); + }); + + it("getConsumerGroupsForCluster() should return undefined if no cached consumer groups", async () => { + const groups = + await getResourceManager().getConsumerGroupsForCluster(TEST_CCLOUD_KAFKA_CLUSTER); + assert.deepStrictEqual(groups, undefined); + }); + + it("getConsumerGroupsForCluster() should return empty array if empty array is set", async () => { + const manager = getResourceManager(); + await manager.setConsumerGroupsForCluster(TEST_CCLOUD_KAFKA_CLUSTER, []); + const groups = await manager.getConsumerGroupsForCluster(TEST_CCLOUD_KAFKA_CLUSTER); + assert.deepStrictEqual(groups, []); + }); + + it("should round-trip consumer groups through set/get with correct types", async () => { + const manager = getResourceManager(); + await manager.setConsumerGroupsForCluster(TEST_CCLOUD_KAFKA_CLUSTER, [ + TEST_CCLOUD_CONSUMER_GROUP, + ]); + + const groups = await manager.getConsumerGroupsForCluster(TEST_CCLOUD_KAFKA_CLUSTER); + + assert.ok(groups); + assert.strictEqual(groups.length, 1); + assert.ok(groups[0] instanceof ConsumerGroup); + assert.strictEqual(groups[0].consumerGroupId, TEST_CCLOUD_CONSUMER_GROUP.consumerGroupId); + }); + + it("should rehydrate Consumer members when reading from storage", async () => { + const manager = getResourceManager(); + await manager.setConsumerGroupsForCluster(TEST_CCLOUD_KAFKA_CLUSTER, [ + TEST_CCLOUD_CONSUMER_GROUP, + ]); + + const groups = await manager.getConsumerGroupsForCluster(TEST_CCLOUD_KAFKA_CLUSTER); + + assert.ok(groups); + assert.ok(groups[0].members.length > 0); + assert.ok(groups[0].members[0] instanceof Consumer); + assert.strictEqual( + groups[0].members[0].consumerId, + TEST_CCLOUD_CONSUMER_GROUP.members[0].consumerId, + ); + }); + }); + describe("ResourceManager Schema Registry methods", function () { it("setSchemaRegistries() error when given mixed connection id array", async () => { const ccloudSchemaRegistry = CCloudSchemaRegistry.create(TEST_CCLOUD_SCHEMA_REGISTRY); From e9d12b3411ae02dbad7f53b518d849dc98dfee65 Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Tue, 24 Feb 2026 16:51:59 -0500 Subject: [PATCH 07/29] use KafkaClusterResourceContainer for consumer groups and topics --- src/viewProviders/topics.ts | 75 +++++++++++++++++++++++++------------ 1 file changed, 52 insertions(+), 23 deletions(-) diff --git a/src/viewProviders/topics.ts b/src/viewProviders/topics.ts index 3f9bacfd58..f54e525609 100644 --- a/src/viewProviders/topics.ts +++ b/src/viewProviders/topics.ts @@ -1,5 +1,5 @@ import type { Disposable, TreeItem } from "vscode"; -import { window } from "vscode"; +import { ThemeIcon, TreeItemCollapsibleState, window } from "vscode"; import { ContextValues } from "../context/values"; import type { EnvironmentChangeEvent, @@ -17,15 +17,16 @@ import { topicSearchSet, topicsViewResourceChanged, } from "../emitters"; +import { IconNames } from "../icons"; import { ResourceLoader } from "../loaders"; import { TopicFetchError } from "../loaders/utils/loaderUtils"; import { Consumer, ConsumerGroup, - ConsumerGroupContainer, ConsumerGroupTreeItem, ConsumerTreeItem, } from "../models/consumerGroup"; +import { KafkaClusterResourceContainer } from "../models/containers/kafkaClusterResourceContainer"; import { KafkaCluster } from "../models/kafkaCluster"; import { isCCloud, isLocal } from "../models/resource"; import { Schema, SchemaTreeItem, Subject, SubjectTreeItem } from "../models/schema"; @@ -37,7 +38,8 @@ import { ParentedBaseViewProvider } from "./baseModels/parentedBase"; * type via the {@link TopicViewProvider provider's} {@linkcode TopicViewProvider.getTreeItem() .getTreeItem()} method. */ type TopicViewProviderData = - | ConsumerGroupContainer + | KafkaClusterResourceContainer + | KafkaClusterResourceContainer | ConsumerGroup | Consumer | KafkaTopic @@ -62,6 +64,8 @@ export class TopicViewProvider extends ParentedBaseViewProvider< searchContextValue = ContextValues.topicSearchApplied; searchChangedEmitter = topicSearchSet; + /** Container for topics in this cluster (expanded by default). */ + private topicsContainer: KafkaClusterResourceContainer | null = null; /** Map of topic name -> {@link KafkaTopic} instance currently in the tree view. */ private topicsInTreeView: Map = new Map(); /** Map of subject name -> {@link Subject} instance currently in the tree view. */ @@ -70,16 +74,17 @@ export class TopicViewProvider extends ParentedBaseViewProvider< private subjectToTopicMap: Map = new Map(); /** Container for consumer groups in this cluster. */ - private consumerGroupsContainer: ConsumerGroupContainer | null = null; + private consumerGroupsContainer: KafkaClusterResourceContainer | null = null; /** Map of consumer group ID -> {@link ConsumerGroup} instance currently in the tree view. */ private consumerGroupsInTreeView: Map = new Map(); private clearCaches(): void { + this.topicsContainer = null; this.topicsInTreeView.clear(); this.subjectsInTreeView.clear(); this.subjectToTopicMap.clear(); - this.consumerGroupsInTreeView.clear(); this.consumerGroupsContainer = null; + this.consumerGroupsInTreeView.clear(); } get kafkaCluster(): KafkaCluster | null { @@ -98,15 +103,13 @@ export class TopicViewProvider extends ParentedBaseViewProvider< let children: TopicViewProviderData[] = []; if (!element) { - // top-level: show consumer groups container + topics - const topics = Array.from(this.topicsInTreeView.values()); - if (this.consumerGroupsContainer) { - children = [this.consumerGroupsContainer, ...topics]; - } else { - children = topics; - } - } else if (element instanceof ConsumerGroupContainer) { - // expanding the consumer groups container to show consumer groups + // top-level: show consumer groups first, then topics + const containers: TopicViewProviderData[] = []; + if (this.consumerGroupsContainer) containers.push(this.consumerGroupsContainer); + if (this.topicsContainer) containers.push(this.topicsContainer); + children = containers; + } else if (element instanceof KafkaClusterResourceContainer) { + // expanding a container to show its children (topics or consumer groups) children = element.children; } else if (element instanceof ConsumerGroup) { // expanding a consumer group to show its members @@ -142,7 +145,7 @@ export class TopicViewProvider extends ParentedBaseViewProvider< getTreeItem(element: TopicViewProviderData): TreeItem { let treeItem: TreeItem; - if (element instanceof ConsumerGroupContainer) { + if (element instanceof KafkaClusterResourceContainer) { treeItem = element; } else if (element instanceof ConsumerGroup) { treeItem = new ConsumerGroupTreeItem(element); @@ -194,12 +197,28 @@ export class TopicViewProvider extends ParentedBaseViewProvider< const cluster: KafkaCluster = this.kafkaCluster; await this.withProgress("Loading topics...", async () => { - // set up consumer group container before loading - this.consumerGroupsContainer = new ConsumerGroupContainer( + // set up containers before loading + this.topicsContainer = new KafkaClusterResourceContainer( + cluster.connectionId, + cluster.connectionType, + cluster.id, + cluster.environmentId, + "Topics", + [], + "topics-container", + new ThemeIcon(IconNames.TOPIC), + ); + this.topicsContainer.collapsibleState = TreeItemCollapsibleState.Expanded; + + this.consumerGroupsContainer = new KafkaClusterResourceContainer( cluster.connectionId, cluster.connectionType, cluster.id, cluster.environmentId, + "Consumer Groups", + [], + undefined, + new ThemeIcon(IconNames.CONSUMER_GROUP), ); await Promise.all([ @@ -210,11 +229,17 @@ export class TopicViewProvider extends ParentedBaseViewProvider< } async refreshTopics(cluster: KafkaCluster, forceDeepRefresh: boolean): Promise { + if (!this.topicsContainer) { + return; + } + // set initial loading state + this.topicsContainer.isLoading = true; + this._onDidChangeTreeData.fire(this.topicsContainer); + const loader = ResourceLoader.getInstance(cluster.connectionId); try { const topics = await loader.getTopicsForCluster(cluster, forceDeepRefresh); topics.forEach((topic) => { - // update topic and subject caches before firing change event this.topicsInTreeView.set(topic.name, topic); if (topic.children && topic.children.length > 0) { topic.children.forEach((subject) => { @@ -223,8 +248,13 @@ export class TopicViewProvider extends ParentedBaseViewProvider< }); } }); + this.topicsContainer.children = topics; + // loading/error state is cleared by the children setter } catch (err) { this.logger.error("Error fetching topics for cluster", cluster, err); + // signal error state so the container shows an error indicator + this.topicsContainer.hasError = true; + this.topicsContainer.children = []; if (err instanceof TopicFetchError) { window.showErrorMessage( `Failed to list topics for cluster "${cluster.name}": ${err.message}`, @@ -232,7 +262,7 @@ export class TopicViewProvider extends ParentedBaseViewProvider< } } - this._onDidChangeTreeData.fire(); + this._onDidChangeTreeData.fire(this.topicsContainer); } // similar to the Flink Database provider's refreshResourceContainer method: @@ -258,7 +288,7 @@ export class TopicViewProvider extends ParentedBaseViewProvider< // loading/error state is cleared } catch (err) { this.logger.error("Error fetching consumer groups for cluster", cluster, err); - // clear loading/error states + // signal error state so the container shows an error indicator this.consumerGroupsContainer.hasError = true; this.consumerGroupsContainer.children = []; // TODO: show error in tooltip? @@ -286,7 +316,7 @@ export class TopicViewProvider extends ParentedBaseViewProvider< /** Get the parent of the given element, or `undefined` if it's a root-level item. */ getParent(element: TopicViewProviderData): TopicViewProviderData | undefined { - if (element instanceof ConsumerGroupContainer) { + if (element instanceof KafkaClusterResourceContainer) { // root-level item return; } @@ -297,8 +327,7 @@ export class TopicViewProvider extends ParentedBaseViewProvider< return this.consumerGroupsInTreeView.get(element.consumerGroupId); } if (element instanceof KafkaTopic) { - // root-level item - return; + return this.topicsContainer ?? undefined; } if (element instanceof Subject) { return this.subjectToTopicMap.get(element.name); From d3de23462c6f047c555ab6919178b61d9ab7a3c4 Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Tue, 24 Feb 2026 16:54:18 -0500 Subject: [PATCH 08/29] update topic view provider tests for consumer group loading and container handling --- src/viewProviders/topics.test.ts | 157 ++++++++++++++++++++++++++++--- 1 file changed, 144 insertions(+), 13 deletions(-) diff --git a/src/viewProviders/topics.test.ts b/src/viewProviders/topics.test.ts index 1bd1fa126d..41a72e95c8 100644 --- a/src/viewProviders/topics.test.ts +++ b/src/viewProviders/topics.test.ts @@ -1,6 +1,6 @@ import * as assert from "assert"; import * as sinon from "sinon"; -import { window } from "vscode"; +import { ThemeIcon, window } from "vscode"; import type { StubbedEventEmitters } from "../../tests/stubs/emitters"; import { eventEmitterStubs } from "../../tests/stubs/emitters"; import { getStubbedCCloudResourceLoader } from "../../tests/stubs/resourceLoaders"; @@ -15,9 +15,13 @@ import { TEST_LOCAL_ENVIRONMENT_ID, TEST_LOCAL_KAFKA_CLUSTER, } from "../../tests/unit/testResources"; +import { TEST_CCLOUD_CONSUMER_GROUP } from "../../tests/unit/testResources/consumerGroup"; import type { EventChangeType, SubjectChangeEvent } from "../emitters"; +import { IconNames } from "../icons"; import type { CCloudResourceLoader } from "../loaders"; import { TopicFetchError } from "../loaders/utils/loaderUtils"; +import type { ConsumerGroup } from "../models/consumerGroup"; +import { KafkaClusterResourceContainer } from "../models/containers/kafkaClusterResourceContainer"; import { SchemaTreeItem, Subject, SubjectTreeItem } from "../models/schema"; import { KafkaTopic, KafkaTopicTreeItem } from "../models/topic"; import { TopicViewProvider } from "./topics"; @@ -65,8 +69,8 @@ describe("viewProviders/topics.ts", () => { it("no-arg refresh() when focused on a cluster should call onDidChangeTreeData.fire()", async () => { provider.kafkaCluster = TEST_CCLOUD_KAFKA_CLUSTER; await provider.refresh(); - // called once for topics, twice for consumer groups (loading start/end) - sinon.assert.called(onDidChangeTreeDataFireStub); + // twice for topics (loading start/end) + twice for consumer groups (loading start/end) + assert.strictEqual(onDidChangeTreeDataFireStub.callCount, 4); }); it("no-arg refresh() when no cluster is set should call onDidChangeTreeData.fire() once to clear (disconnect scenario)", async () => { @@ -106,8 +110,8 @@ describe("viewProviders/topics.ts", () => { it("onlyIfMatching a kafka cluster when the cluster matches should call onDidChangeTreeData.fire()", async () => { provider.kafkaCluster = TEST_CCLOUD_KAFKA_CLUSTER; await provider.refresh(false, TEST_CCLOUD_KAFKA_CLUSTER); - // called once for topics, twice for consumer groups (loading start/end) - sinon.assert.called(onDidChangeTreeDataFireStub); + // twice for topics (loading start/end) + twice for consumer groups (loading start/end) + sinon.assert.callCount(onDidChangeTreeDataFireStub, 4); }); it("onlyIfMatching a contained Kafka topic when the cluster doesn't match should do nothing", async () => { @@ -119,14 +123,25 @@ describe("viewProviders/topics.ts", () => { it("onlyIfMatching a contained Kafka topic when the cluster matches should call onDidChangeTreeData.fire()", async () => { provider.kafkaCluster = TEST_CCLOUD_KAFKA_CLUSTER; await provider.refresh(false, TEST_CCLOUD_KAFKA_TOPIC); - // called once for topics, twice for consumer groups (loading start/end) - sinon.assert.called(onDidChangeTreeDataFireStub); + // twice for topics (loading start/end) + twice for consumer groups (loading start/end) + sinon.assert.callCount(onDidChangeTreeDataFireStub, 4); }); }); describe("refreshTopics()", () => { + let onDidChangeTreeDataFireStub: sinon.SinonStub; + beforeEach(() => { provider.kafkaCluster = TEST_CCLOUD_KAFKA_CLUSTER; + // set up the container like refresh() does + provider["topicsContainer"] = new KafkaClusterResourceContainer( + TEST_CCLOUD_KAFKA_CLUSTER.connectionId, + TEST_CCLOUD_KAFKA_CLUSTER.connectionType, + TEST_CCLOUD_KAFKA_CLUSTER.id, + TEST_CCLOUD_KAFKA_CLUSTER.environmentId, + "Topics", + ); + onDidChangeTreeDataFireStub = sandbox.stub(provider["_onDidChangeTreeData"], "fire"); }); it("should populate topicsInTreeView from loader results", async () => { @@ -141,6 +156,14 @@ describe("viewProviders/topics.ts", () => { ); }); + it("should set container children from loader results", async () => { + stubbedLoader.getTopicsForCluster.resolves([TEST_CCLOUD_KAFKA_TOPIC]); + + await provider.refreshTopics(TEST_CCLOUD_KAFKA_CLUSTER, false); + + assert.deepStrictEqual(provider["topicsContainer"]!.children, [TEST_CCLOUD_KAFKA_TOPIC]); + }); + it("should populate subjectsInTreeView and subjectToTopicMap when topics have associated Subjects", async () => { stubbedLoader.getTopicsForCluster.resolves([testTopicWithSchema]); @@ -154,15 +177,100 @@ describe("viewProviders/topics.ts", () => { ); }); - it("should call showErrorMessage when loader raises a TopicFetchError", async () => { + it("should set hasError on the container when loader raises a TopicFetchError", async () => { const showErrorMessageStub = sandbox.stub(window, "showErrorMessage"); stubbedLoader.getTopicsForCluster.rejects(new TopicFetchError("Test error")); await provider.refreshTopics(TEST_CCLOUD_KAFKA_CLUSTER, false); - assert.strictEqual(provider["topicsInTreeView"].size, 0); + assert.strictEqual(provider["topicsContainer"]!.hasError, true); + assert.deepStrictEqual(provider["topicsContainer"]!.children, []); sinon.assert.calledOnce(showErrorMessageStub); }); + + it("should fire tree data change events for loading start and end", async () => { + stubbedLoader.getTopicsForCluster.resolves([]); + + await provider.refreshTopics(TEST_CCLOUD_KAFKA_CLUSTER, false); + + // fired twice: once for loading start, once for loading end + assert.strictEqual(onDidChangeTreeDataFireStub.callCount, 2); + }); + + it("should return early if topicsContainer is null", async () => { + provider["topicsContainer"] = null; + + await provider.refreshTopics(TEST_CCLOUD_KAFKA_CLUSTER, false); + + sinon.assert.notCalled(stubbedLoader.getTopicsForCluster); + sinon.assert.notCalled(onDidChangeTreeDataFireStub); + }); + }); + + describe("refreshConsumerGroups()", () => { + let onDidChangeTreeDataFireStub: sinon.SinonStub; + + beforeEach(() => { + provider.kafkaCluster = TEST_CCLOUD_KAFKA_CLUSTER; + // set up the container like refresh() does + provider["consumerGroupsContainer"] = new KafkaClusterResourceContainer( + TEST_CCLOUD_KAFKA_CLUSTER.connectionId, + TEST_CCLOUD_KAFKA_CLUSTER.connectionType, + TEST_CCLOUD_KAFKA_CLUSTER.id, + TEST_CCLOUD_KAFKA_CLUSTER.environmentId, + "Consumer Groups", + ); + onDidChangeTreeDataFireStub = sandbox.stub(provider["_onDidChangeTreeData"], "fire"); + }); + + it("should populate consumerGroupsInTreeView from loader results", async () => { + stubbedLoader.getConsumerGroupsForCluster.resolves([TEST_CCLOUD_CONSUMER_GROUP]); + + await provider.refreshConsumerGroups(TEST_CCLOUD_KAFKA_CLUSTER, false); + + assert.strictEqual(provider["consumerGroupsInTreeView"].size, 1); + assert.deepStrictEqual( + provider["consumerGroupsInTreeView"].get(TEST_CCLOUD_CONSUMER_GROUP.consumerGroupId), + TEST_CCLOUD_CONSUMER_GROUP, + ); + }); + + it("should set container children from loader results", async () => { + stubbedLoader.getConsumerGroupsForCluster.resolves([TEST_CCLOUD_CONSUMER_GROUP]); + + await provider.refreshConsumerGroups(TEST_CCLOUD_KAFKA_CLUSTER, false); + + assert.deepStrictEqual(provider["consumerGroupsContainer"]!.children, [ + TEST_CCLOUD_CONSUMER_GROUP, + ]); + }); + + it("should set hasError on the container when loader throws", async () => { + stubbedLoader.getConsumerGroupsForCluster.rejects(new Error("API error")); + + await provider.refreshConsumerGroups(TEST_CCLOUD_KAFKA_CLUSTER, false); + + assert.strictEqual(provider["consumerGroupsContainer"]!.hasError, true); + assert.deepStrictEqual(provider["consumerGroupsContainer"]!.children, []); + }); + + it("should fire tree data change events for loading start and end", async () => { + stubbedLoader.getConsumerGroupsForCluster.resolves([]); + + await provider.refreshConsumerGroups(TEST_CCLOUD_KAFKA_CLUSTER, false); + + // fired twice: once for loading start, once for loading end + assert.strictEqual(onDidChangeTreeDataFireStub.callCount, 2); + }); + + it("should return early if consumerGroupsContainer is null", async () => { + provider["consumerGroupsContainer"] = null; + + await provider.refreshConsumerGroups(TEST_CCLOUD_KAFKA_CLUSTER, false); + + sinon.assert.notCalled(stubbedLoader.getConsumerGroupsForCluster); + sinon.assert.notCalled(onDidChangeTreeDataFireStub); + }); }); describe("getTreeItem()", () => { @@ -213,13 +321,36 @@ describe("viewProviders/topics.ts", () => { assert.strictEqual(children.length, 0); }); - it("should return topics from topicsInTreeView at the root level", () => { - provider["topicsInTreeView"].set(TEST_CCLOUD_KAFKA_TOPIC.name, TEST_CCLOUD_KAFKA_TOPIC); + it("should return containers at the root level when both are set", () => { + provider["topicsContainer"] = new KafkaClusterResourceContainer( + TEST_CCLOUD_KAFKA_CLUSTER.connectionId, + TEST_CCLOUD_KAFKA_CLUSTER.connectionType, + TEST_CCLOUD_KAFKA_CLUSTER.id, + TEST_CCLOUD_KAFKA_CLUSTER.environmentId, + "Topics", + [], + "topics-container", + new ThemeIcon(IconNames.TOPIC), + ); + provider["consumerGroupsContainer"] = new KafkaClusterResourceContainer( + TEST_CCLOUD_KAFKA_CLUSTER.connectionId, + TEST_CCLOUD_KAFKA_CLUSTER.connectionType, + TEST_CCLOUD_KAFKA_CLUSTER.id, + TEST_CCLOUD_KAFKA_CLUSTER.environmentId, + "Consumer Groups", + [], + undefined, + new ThemeIcon(IconNames.CONSUMER_GROUP), + ); const children = provider.getChildren(); - assert.strictEqual(children.length, 1); - assert.deepStrictEqual(children[0], TEST_CCLOUD_KAFKA_TOPIC); + assert.strictEqual(children.length, 2); + // consumer groups container should be first, topics second + assert.ok(children[0] instanceof KafkaClusterResourceContainer); + assert.strictEqual(children[0].label, "Consumer Groups"); + assert.ok(children[1] instanceof KafkaClusterResourceContainer); + assert.strictEqual(children[1].label, "Topics"); }); it("should return subjects from topic.children when expanding a topic", () => { From 883f3054e11d4bbda1310d5975f65598ffb19f64 Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Tue, 24 Feb 2026 16:54:51 -0500 Subject: [PATCH 09/29] move Create Topic command from Topics view/title to view/item/context for container --- package.json | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/package.json b/package.json index 4fcd89da34..50adedd131 100644 --- a/package.json +++ b/package.json @@ -1846,11 +1846,6 @@ "when": "view == confluent-topics && confluent.kafkaClusterSelected && confluent.topicSearchApplied", "group": "navigation@1" }, - { - "command": "confluent.topics.create", - "when": "view == confluent-topics && confluent.kafkaClusterSelected", - "group": "navigation@2" - }, { "command": "confluent.topics.kafka-cluster.select", "when": "view == confluent-topics && (confluent.ccloudConnectionAvailable || confluent.localKafkaClusterAvailable || confluent.directKafkaClusterAvailable)", @@ -2008,6 +2003,11 @@ "when": "viewItem == resources-ccloud-container-connected", "group": "inline@2" }, + { + "command": "confluent.topics.create", + "when": "view == confluent-topics && viewItem == topics-container", + "group": "inline@1" + }, { "command": "confluent.flinkdatabase.createRelation", "when": "view == confluent-flink-database && viewItem == flink-database-relations-container", From 5d761ce224df8a4ee14c96929221a88089861e5a Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Tue, 24 Feb 2026 17:25:03 -0500 Subject: [PATCH 10/29] update progress title for accuracy --- src/viewProviders/topics.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/viewProviders/topics.ts b/src/viewProviders/topics.ts index f54e525609..82b7fac3a1 100644 --- a/src/viewProviders/topics.ts +++ b/src/viewProviders/topics.ts @@ -196,7 +196,7 @@ export class TopicViewProvider extends ParentedBaseViewProvider< } const cluster: KafkaCluster = this.kafkaCluster; - await this.withProgress("Loading topics...", async () => { + await this.withProgress("Loading topics and consumer groups...", async () => { // set up containers before loading this.topicsContainer = new KafkaClusterResourceContainer( cluster.connectionId, From 0105622523a7cb7a20c5221b7d849379841f4173 Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Thu, 26 Feb 2026 11:59:18 -0500 Subject: [PATCH 11/29] remove stale comment --- src/viewProviders/topics.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/viewProviders/topics.ts b/src/viewProviders/topics.ts index 82b7fac3a1..b86288ef4e 100644 --- a/src/viewProviders/topics.ts +++ b/src/viewProviders/topics.ts @@ -265,7 +265,6 @@ export class TopicViewProvider extends ParentedBaseViewProvider< this._onDidChangeTreeData.fire(this.topicsContainer); } - // similar to the Flink Database provider's refreshResourceContainer method: /** Fetch and cache consumer groups for the focused cluster. */ async refreshConsumerGroups( cluster: KafkaCluster, From 6db25cdb2828c5dccc726f33c05d3cb4d930ac08 Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Thu, 26 Feb 2026 13:31:25 -0500 Subject: [PATCH 12/29] update TopicsView page object model to use inline command and lower aria-level values --- tests/e2e/objects/views/TopicsView.ts | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/tests/e2e/objects/views/TopicsView.ts b/tests/e2e/objects/views/TopicsView.ts index ec30b578bb..b23acad0d6 100644 --- a/tests/e2e/objects/views/TopicsView.ts +++ b/tests/e2e/objects/views/TopicsView.ts @@ -6,6 +6,7 @@ import { Quickpick } from "../quickInputs/Quickpick"; import { ResourcesView } from "./ResourcesView"; import { SearchableView } from "./View"; import { TopicItem } from "./viewItems/TopicItem"; +import { ViewItem } from "./viewItems/ViewItem"; export enum SelectKafkaCluster { FromResourcesView = "Kafka cluster action from the Resources view", @@ -29,9 +30,10 @@ export class TopicsView extends SearchableView { await this.clickNavAction("Search"); } - /** Click the "Create Topic" nav action in the view title area. */ + /** Click the "Create Topic" inline action on the Topics container item. */ async clickCreateTopic(): Promise { - await this.clickNavAction("Create Topic"); + const containerItem = new ViewItem(this.page, this.topicsContainer); + await containerItem.clickInlineAction("Create Topic"); } /** @@ -49,9 +51,14 @@ export class TopicsView extends SearchableView { await expect(this.progressIndicator).toBeHidden(); } - /** Get all (root-level) topic items in the view. */ + /** Get the "Topics" container at the root level of the tree. */ + get topicsContainer(): Locator { + return this.body.locator("[role='treeitem'][aria-level='1']").filter({ hasText: "Topics" }); + } + + /** Get all topic items in the view (nested under the Topics container). */ get topics(): Locator { - return this.body.locator("[role='treeitem'][aria-level='1']"); + return this.body.locator("[role='treeitem'][aria-level='2']"); } /** Get a topic item by its label/name. */ @@ -78,7 +85,7 @@ export class TopicsView extends SearchableView { */ get subjects(): Locator { // we don't use `this.topicsWithSchemas` because these are sibling elements to topics in the DOM - return this.body.locator("[role='treeitem'][aria-level='2']"); + return this.body.locator("[role='treeitem'][aria-level='3']"); } /** @@ -87,7 +94,7 @@ export class TopicsView extends SearchableView { */ get schemaVersions(): Locator { // we don't use `this.subjects` because these are sibling elements to subjects in the DOM - return this.body.locator("[role='treeitem'][aria-level='3']"); + return this.body.locator("[role='treeitem'][aria-level='4']"); } /** @@ -130,8 +137,8 @@ export class TopicsView extends SearchableView { } /** - * Create a new topic using the "Create Topic" nav action in the view title area, filling out the - * required inputs in the subsequent input boxes. + * Create a new topic using the "Create Topic" inline action on the Topics container, filling out + * the required inputs in the subsequent input boxes. * * @param topicName The name of the new topic to create. * @param numPartitions (Optional) The number of partitions for the new topic. If not provided, From b4954675eb795210c6ec5754139ba8db4268e40c Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Thu, 26 Feb 2026 17:23:33 -0500 Subject: [PATCH 13/29] fix container construction --- src/viewProviders/topics.ts | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/src/viewProviders/topics.ts b/src/viewProviders/topics.ts index b86288ef4e..4846ffb8a6 100644 --- a/src/viewProviders/topics.ts +++ b/src/viewProviders/topics.ts @@ -197,13 +197,11 @@ export class TopicViewProvider extends ParentedBaseViewProvider< const cluster: KafkaCluster = this.kafkaCluster; await this.withProgress("Loading topics and consumer groups...", async () => { - // set up containers before loading + // set up containers with the focused cluster's connection info this.topicsContainer = new KafkaClusterResourceContainer( cluster.connectionId, cluster.connectionType, - cluster.id, - cluster.environmentId, - "Topics", + KafkaClusterContainerLabel.TOPICS, [], "topics-container", new ThemeIcon(IconNames.TOPIC), @@ -213,11 +211,9 @@ export class TopicViewProvider extends ParentedBaseViewProvider< this.consumerGroupsContainer = new KafkaClusterResourceContainer( cluster.connectionId, cluster.connectionType, - cluster.id, - cluster.environmentId, - "Consumer Groups", + KafkaClusterContainerLabel.CONSUMER_GROUPS, [], - undefined, + undefined, // no context value for now since no commands are needed yet for this container new ThemeIcon(IconNames.CONSUMER_GROUP), ); From fecf8cab74b82e8afcc23b87b65a6152f2659d5a Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Fri, 27 Feb 2026 09:39:11 -0500 Subject: [PATCH 14/29] use updated state methods; clear provider cache when refreshing containers --- src/viewProviders/topics.ts | 40 +++++++++++++++++++++++-------------- 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/src/viewProviders/topics.ts b/src/viewProviders/topics.ts index 4846ffb8a6..03e6e0a25f 100644 --- a/src/viewProviders/topics.ts +++ b/src/viewProviders/topics.ts @@ -28,6 +28,7 @@ import { } from "../models/consumerGroup"; import { KafkaClusterResourceContainer } from "../models/containers/kafkaClusterResourceContainer"; import { KafkaCluster } from "../models/kafkaCluster"; +import { CustomMarkdownString } from "../models/main"; import { isCCloud, isLocal } from "../models/resource"; import { Schema, SchemaTreeItem, Subject, SubjectTreeItem } from "../models/schema"; import { KafkaTopic, KafkaTopicTreeItem } from "../models/topic"; @@ -228,10 +229,14 @@ export class TopicViewProvider extends ParentedBaseViewProvider< if (!this.topicsContainer) { return; } - // set initial loading state - this.topicsContainer.isLoading = true; + this.topicsContainer.setLoading(); this._onDidChangeTreeData.fire(this.topicsContainer); + // clear stale entries before repopulating + this.topicsInTreeView.clear(); + this.subjectsInTreeView.clear(); + this.subjectToTopicMap.clear(); + const loader = ResourceLoader.getInstance(cluster.connectionId); try { const topics = await loader.getTopicsForCluster(cluster, forceDeepRefresh); @@ -244,13 +249,15 @@ export class TopicViewProvider extends ParentedBaseViewProvider< }); } }); - this.topicsContainer.children = topics; - // loading/error state is cleared by the children setter + this.topicsContainer.setLoaded(topics); } catch (err) { this.logger.error("Error fetching topics for cluster", cluster, err); - // signal error state so the container shows an error indicator - this.topicsContainer.hasError = true; - this.topicsContainer.children = []; + const message = err instanceof Error ? err.message : String(err); + this.topicsContainer.setError( + new CustomMarkdownString() + .addWarning(`Failed to load topics for **${cluster.name}**:`) + .addCodeBlock(message), + ); if (err instanceof TopicFetchError) { window.showErrorMessage( `Failed to list topics for cluster "${cluster.name}": ${err.message}`, @@ -269,24 +276,27 @@ export class TopicViewProvider extends ParentedBaseViewProvider< if (!this.consumerGroupsContainer) { return; } - // set initial loading state - this.consumerGroupsContainer.isLoading = true; + this.consumerGroupsContainer.setLoading(); this._onDidChangeTreeData.fire(this.consumerGroupsContainer); + // clear stale entries before repopulating + this.consumerGroupsInTreeView.clear(); + const loader = ResourceLoader.getInstance(cluster.connectionId); try { const consumerGroups = await loader.getConsumerGroupsForCluster(cluster, forceDeepRefresh); consumerGroups.forEach((group) => { this.consumerGroupsInTreeView.set(group.consumerGroupId, group); }); - this.consumerGroupsContainer.children = consumerGroups; - // loading/error state is cleared + this.consumerGroupsContainer.setLoaded(consumerGroups); } catch (err) { this.logger.error("Error fetching consumer groups for cluster", cluster, err); - // signal error state so the container shows an error indicator - this.consumerGroupsContainer.hasError = true; - this.consumerGroupsContainer.children = []; - // TODO: show error in tooltip? + const message = err instanceof Error ? err.message : String(err); + this.consumerGroupsContainer.setError( + new CustomMarkdownString() + .addWarning(`Failed to load consumer groups for **${cluster.name}**:`) + .addCodeBlock(message), + ); } this._onDidChangeTreeData.fire(this.consumerGroupsContainer); From 2717baf0f9b813fce74c50f74efd3dbcf79d12fd Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Fri, 27 Feb 2026 09:39:43 -0500 Subject: [PATCH 15/29] update reveal() logic based on new containers and resource types --- src/viewProviders/topics.ts | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/src/viewProviders/topics.ts b/src/viewProviders/topics.ts index 03e6e0a25f..5aa88e7107 100644 --- a/src/viewProviders/topics.ts +++ b/src/viewProviders/topics.ts @@ -26,7 +26,10 @@ import { ConsumerGroupTreeItem, ConsumerTreeItem, } from "../models/consumerGroup"; -import { KafkaClusterResourceContainer } from "../models/containers/kafkaClusterResourceContainer"; +import { + KafkaClusterContainerLabel, + KafkaClusterResourceContainer, +} from "../models/containers/kafkaClusterResourceContainer"; import { KafkaCluster } from "../models/kafkaCluster"; import { CustomMarkdownString } from "../models/main"; import { isCCloud, isLocal } from "../models/resource"; @@ -348,10 +351,24 @@ export class TopicViewProvider extends ParentedBaseViewProvider< options?: { select?: boolean; focus?: boolean }, ): Promise { // callers likely won't have the exact instance in the provider's cache(s), so we need to - // find the instance (originally returned by getChildren()) by name + // find the instance (originally returned by getChildren()) by name/id let itemToReveal: TopicViewProviderData | undefined; - if (item instanceof KafkaTopic) { + if (item instanceof KafkaClusterResourceContainer) { + // match by tree item id against the known container instances + if (this.topicsContainer?.id === item.id) { + itemToReveal = this.topicsContainer; + } else if (this.consumerGroupsContainer?.id === item.id) { + itemToReveal = this.consumerGroupsContainer; + } + } else if (item instanceof ConsumerGroup) { + itemToReveal = this.consumerGroupsInTreeView.get(item.consumerGroupId); + } else if (item instanceof Consumer) { + const parentGroup = this.consumerGroupsInTreeView.get(item.consumerGroupId); + if (parentGroup) { + itemToReveal = parentGroup.members.find((member) => member.consumerId === item.consumerId); + } + } else if (item instanceof KafkaTopic) { itemToReveal = this.topicsInTreeView.get(item.name); } else if (item instanceof Subject) { itemToReveal = this.subjectsInTreeView.get(item.name); From 001570a58319be900ab5d9c8fab078f68f1ae7a9 Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Fri, 27 Feb 2026 09:40:46 -0500 Subject: [PATCH 16/29] update topic view provider tests for cache clearing and container constructor changes --- src/viewProviders/topics.test.ts | 141 ++++++++++++++++++++++++++----- 1 file changed, 120 insertions(+), 21 deletions(-) diff --git a/src/viewProviders/topics.test.ts b/src/viewProviders/topics.test.ts index 41a72e95c8..870c31c6b6 100644 --- a/src/viewProviders/topics.test.ts +++ b/src/viewProviders/topics.test.ts @@ -1,6 +1,6 @@ import * as assert from "assert"; import * as sinon from "sinon"; -import { ThemeIcon, window } from "vscode"; +import { window } from "vscode"; import type { StubbedEventEmitters } from "../../tests/stubs/emitters"; import { eventEmitterStubs } from "../../tests/stubs/emitters"; import { getStubbedCCloudResourceLoader } from "../../tests/stubs/resourceLoaders"; @@ -15,12 +15,18 @@ import { TEST_LOCAL_ENVIRONMENT_ID, TEST_LOCAL_KAFKA_CLUSTER, } from "../../tests/unit/testResources"; -import { TEST_CCLOUD_CONSUMER_GROUP } from "../../tests/unit/testResources/consumerGroup"; +import { + createConsumerGroup, + TEST_CCLOUD_CONSUMER, + TEST_CCLOUD_CONSUMER_GROUP, +} from "../../tests/unit/testResources/consumerGroup"; import type { EventChangeType, SubjectChangeEvent } from "../emitters"; -import { IconNames } from "../icons"; import type { CCloudResourceLoader } from "../loaders"; import { TopicFetchError } from "../loaders/utils/loaderUtils"; -import type { ConsumerGroup } from "../models/consumerGroup"; +import { ConnectionType } from "../clients/sidecar"; +import { CCLOUD_CONNECTION_ID } from "../constants"; +import { Consumer, type ConsumerGroup } from "../models/consumerGroup"; +import type { CustomMarkdownString } from "../models/main"; import { KafkaClusterResourceContainer } from "../models/containers/kafkaClusterResourceContainer"; import { SchemaTreeItem, Subject, SubjectTreeItem } from "../models/schema"; import { KafkaTopic, KafkaTopicTreeItem } from "../models/topic"; @@ -137,8 +143,6 @@ describe("viewProviders/topics.ts", () => { provider["topicsContainer"] = new KafkaClusterResourceContainer( TEST_CCLOUD_KAFKA_CLUSTER.connectionId, TEST_CCLOUD_KAFKA_CLUSTER.connectionType, - TEST_CCLOUD_KAFKA_CLUSTER.id, - TEST_CCLOUD_KAFKA_CLUSTER.environmentId, "Topics", ); onDidChangeTreeDataFireStub = sandbox.stub(provider["_onDidChangeTreeData"], "fire"); @@ -185,6 +189,12 @@ describe("viewProviders/topics.ts", () => { assert.strictEqual(provider["topicsContainer"]!.hasError, true); assert.deepStrictEqual(provider["topicsContainer"]!.children, []); + assert.ok( + (provider["topicsContainer"]!.tooltip as CustomMarkdownString).value.includes( + "Test error", + ), + "tooltip should include the error message", + ); sinon.assert.calledOnce(showErrorMessageStub); }); @@ -205,6 +215,19 @@ describe("viewProviders/topics.ts", () => { sinon.assert.notCalled(stubbedLoader.getTopicsForCluster); sinon.assert.notCalled(onDidChangeTreeDataFireStub); }); + + it("should clear stale topic entries before repopulating", async () => { + // pre-populate a stale entry that won't be returned by the loader + const staleTopic = new KafkaTopic({ ...TEST_CCLOUD_KAFKA_TOPIC, name: "stale-topic" }); + provider["topicsInTreeView"].set(staleTopic.name, staleTopic); + stubbedLoader.getTopicsForCluster.resolves([TEST_CCLOUD_KAFKA_TOPIC]); + + await provider.refreshTopics(TEST_CCLOUD_KAFKA_CLUSTER, false); + + assert.strictEqual(provider["topicsInTreeView"].has("stale-topic"), false); + assert.strictEqual(provider["topicsInTreeView"].has(TEST_CCLOUD_KAFKA_TOPIC.name), true); + assert.strictEqual(provider["topicsInTreeView"].size, 1); + }); }); describe("refreshConsumerGroups()", () => { @@ -216,8 +239,6 @@ describe("viewProviders/topics.ts", () => { provider["consumerGroupsContainer"] = new KafkaClusterResourceContainer( TEST_CCLOUD_KAFKA_CLUSTER.connectionId, TEST_CCLOUD_KAFKA_CLUSTER.connectionType, - TEST_CCLOUD_KAFKA_CLUSTER.id, - TEST_CCLOUD_KAFKA_CLUSTER.environmentId, "Consumer Groups", ); onDidChangeTreeDataFireStub = sandbox.stub(provider["_onDidChangeTreeData"], "fire"); @@ -252,6 +273,12 @@ describe("viewProviders/topics.ts", () => { assert.strictEqual(provider["consumerGroupsContainer"]!.hasError, true); assert.deepStrictEqual(provider["consumerGroupsContainer"]!.children, []); + assert.ok( + (provider["consumerGroupsContainer"]!.tooltip as CustomMarkdownString).value.includes( + "API error", + ), + "tooltip should include the error message", + ); }); it("should fire tree data change events for loading start and end", async () => { @@ -271,6 +298,28 @@ describe("viewProviders/topics.ts", () => { sinon.assert.notCalled(stubbedLoader.getConsumerGroupsForCluster); sinon.assert.notCalled(onDidChangeTreeDataFireStub); }); + + it("should clear stale consumer group entries before repopulating", async () => { + // pre-populate a stale entry that won't be returned by the loader + const staleGroup = createConsumerGroup({ + connectionId: CCLOUD_CONNECTION_ID, + connectionType: ConnectionType.Ccloud, + environmentId: TEST_CCLOUD_KAFKA_CLUSTER.environmentId, + clusterId: TEST_CCLOUD_KAFKA_CLUSTER.id, + consumerGroupId: "stale-group", + }); + provider["consumerGroupsInTreeView"].set(staleGroup.consumerGroupId, staleGroup); + stubbedLoader.getConsumerGroupsForCluster.resolves([TEST_CCLOUD_CONSUMER_GROUP]); + + await provider.refreshConsumerGroups(TEST_CCLOUD_KAFKA_CLUSTER, false); + + assert.strictEqual(provider["consumerGroupsInTreeView"].has("stale-group"), false); + assert.strictEqual( + provider["consumerGroupsInTreeView"].has(TEST_CCLOUD_CONSUMER_GROUP.consumerGroupId), + true, + ); + assert.strictEqual(provider["consumerGroupsInTreeView"].size, 1); + }); }); describe("getTreeItem()", () => { @@ -321,26 +370,16 @@ describe("viewProviders/topics.ts", () => { assert.strictEqual(children.length, 0); }); - it("should return containers at the root level when both are set", () => { + it("should return both containers at the root level when both are set", () => { provider["topicsContainer"] = new KafkaClusterResourceContainer( TEST_CCLOUD_KAFKA_CLUSTER.connectionId, TEST_CCLOUD_KAFKA_CLUSTER.connectionType, - TEST_CCLOUD_KAFKA_CLUSTER.id, - TEST_CCLOUD_KAFKA_CLUSTER.environmentId, "Topics", - [], - "topics-container", - new ThemeIcon(IconNames.TOPIC), ); provider["consumerGroupsContainer"] = new KafkaClusterResourceContainer( TEST_CCLOUD_KAFKA_CLUSTER.connectionId, TEST_CCLOUD_KAFKA_CLUSTER.connectionType, - TEST_CCLOUD_KAFKA_CLUSTER.id, - TEST_CCLOUD_KAFKA_CLUSTER.environmentId, "Consumer Groups", - [], - undefined, - new ThemeIcon(IconNames.CONSUMER_GROUP), ); const children = provider.getChildren(); @@ -394,10 +433,16 @@ describe("viewProviders/topics.ts", () => { }); describe("getParent()", () => { - it("should return undefined for a KafkaTopic (root-level item)", () => { + it("should return the topics container for a KafkaTopic", () => { + provider["topicsContainer"] = new KafkaClusterResourceContainer( + TEST_CCLOUD_KAFKA_CLUSTER.connectionId, + TEST_CCLOUD_KAFKA_CLUSTER.connectionType, + "Topics", + ); + const parent = provider.getParent(TEST_CCLOUD_KAFKA_TOPIC); - assert.strictEqual(parent, undefined); + assert.strictEqual(parent, provider["topicsContainer"]); }); it("should return the parent topic for a Subject", () => { @@ -523,6 +568,60 @@ describe("viewProviders/topics.ts", () => { sinon.assert.notCalled(treeViewRevealStub); }); + + it("should reveal a KafkaClusterResourceContainer (topics container)", async () => { + // set up the container so reveal can match against it + provider["topicsContainer"] = new KafkaClusterResourceContainer( + TEST_CCLOUD_KAFKA_CLUSTER.connectionId, + TEST_CCLOUD_KAFKA_CLUSTER.connectionType, + "Topics", + ); + // pass a different instance with the same id to verify lookup by id + const lookupContainer = new KafkaClusterResourceContainer( + CCLOUD_CONNECTION_ID, + ConnectionType.Ccloud, + "Topics", + ); + + await provider.reveal(lookupContainer, { select: true }); + + sinon.assert.calledOnceWithExactly(treeViewRevealStub, provider["topicsContainer"], { + select: true, + }); + }); + + it("should reveal a ConsumerGroup from the cache", async () => { + provider["consumerGroupsInTreeView"].set( + TEST_CCLOUD_CONSUMER_GROUP.consumerGroupId, + TEST_CCLOUD_CONSUMER_GROUP, + ); + + await provider.reveal(TEST_CCLOUD_CONSUMER_GROUP, { select: true }); + + sinon.assert.calledOnceWithExactly(treeViewRevealStub, TEST_CCLOUD_CONSUMER_GROUP, { + select: true, + }); + }); + + it("should reveal a Consumer within its parent ConsumerGroup", async () => { + provider["consumerGroupsInTreeView"].set( + TEST_CCLOUD_CONSUMER_GROUP.consumerGroupId, + TEST_CCLOUD_CONSUMER_GROUP, + ); + + await provider.reveal(TEST_CCLOUD_CONSUMER, { focus: true }); + + sinon.assert.calledOnce(treeViewRevealStub); + const revealedItem = treeViewRevealStub.firstCall.args[0]; + assert.ok(revealedItem instanceof Consumer); + assert.strictEqual(revealedItem.consumerId, TEST_CCLOUD_CONSUMER.consumerId); + }); + + it("should not reveal a ConsumerGroup when not in the cache", async () => { + await provider.reveal(TEST_CCLOUD_CONSUMER_GROUP); + + sinon.assert.notCalled(treeViewRevealStub); + }); }); describe("event handlers", () => { From 5d528992afac81428f199df9582865a7e346711d Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Fri, 27 Feb 2026 10:52:36 -0500 Subject: [PATCH 17/29] add tests for consumerGroupsChangedHandler() --- src/viewProviders/topics.test.ts | 33 ++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/src/viewProviders/topics.test.ts b/src/viewProviders/topics.test.ts index 870c31c6b6..c4561eb034 100644 --- a/src/viewProviders/topics.test.ts +++ b/src/viewProviders/topics.test.ts @@ -781,6 +781,39 @@ describe("viewProviders/topics.ts", () => { }); } }); + + describe("consumerGroupsChangedHandler", () => { + let refreshConsumerGroupsStub: sinon.SinonStub; + + beforeEach(() => { + refreshConsumerGroupsStub = sandbox.stub(provider, "refreshConsumerGroups"); + }); + + it("should do nothing when no cluster is focused", async () => { + provider.kafkaCluster = null; + + await provider.consumerGroupsChangedHandler(TEST_CCLOUD_KAFKA_CLUSTER); + + sinon.assert.notCalled(refreshConsumerGroupsStub); + }); + + it("should do nothing when the event cluster does not match the focused cluster", async () => { + provider.kafkaCluster = TEST_LOCAL_KAFKA_CLUSTER; + + await provider.consumerGroupsChangedHandler(TEST_CCLOUD_KAFKA_CLUSTER); + + sinon.assert.notCalled(refreshConsumerGroupsStub); + }); + + it("should call refreshConsumerGroups when the event cluster matches the focused cluster", async () => { + provider.kafkaCluster = TEST_CCLOUD_KAFKA_CLUSTER; + + await provider.consumerGroupsChangedHandler(TEST_CCLOUD_KAFKA_CLUSTER); + + sinon.assert.calledOnce(refreshConsumerGroupsStub); + sinon.assert.calledWith(refreshConsumerGroupsStub, TEST_CCLOUD_KAFKA_CLUSTER, true); + }); + }); }); describe("setCustomEventListeners()", () => { From 9792f403c61b65f0163989931288cde4aa7ee7bb Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Fri, 27 Feb 2026 10:52:43 -0500 Subject: [PATCH 18/29] add changelog entry --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index cb78e2c6cf..6351ff470b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ All notable changes to this extension will be documented in this file. ## Unreleased +### Added + +- Consumer groups and their members are now visible in the Topics view under a collapsible "Consumer + Groups" container - Direct connections form now supports OAuth authentication for WarpStream connections. ## 2.2.2 From 6303222ef2f730779f9b582e2713b4ddb86e1e51 Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Fri, 27 Feb 2026 11:52:36 -0500 Subject: [PATCH 19/29] use Promise.allSettled to prevent any failed container loading doesn't prevent other resources from being refreshed --- src/viewProviders/topics.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/viewProviders/topics.ts b/src/viewProviders/topics.ts index 5aa88e7107..5796389b03 100644 --- a/src/viewProviders/topics.ts +++ b/src/viewProviders/topics.ts @@ -221,7 +221,7 @@ export class TopicViewProvider extends ParentedBaseViewProvider< new ThemeIcon(IconNames.CONSUMER_GROUP), ); - await Promise.all([ + await Promise.allSettled([ this.refreshTopics(cluster, forceDeepRefresh), this.refreshConsumerGroups(cluster, forceDeepRefresh), ]); From db7ddae5f25f5d9c15d4a4ae5da63274a341a7bb Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Fri, 27 Feb 2026 11:54:50 -0500 Subject: [PATCH 20/29] push coordinator URL parsing logic into loaderUtils.ts and add tests --- src/loaders/cachingResourceLoader.ts | 4 ++-- src/loaders/utils/loaderUtils.test.ts | 34 +++++++++++++++++++++++++++ src/loaders/utils/loaderUtils.ts | 10 ++++++++ 3 files changed, 46 insertions(+), 2 deletions(-) diff --git a/src/loaders/cachingResourceLoader.ts b/src/loaders/cachingResourceLoader.ts index 51d78c220c..be5f073308 100644 --- a/src/loaders/cachingResourceLoader.ts +++ b/src/loaders/cachingResourceLoader.ts @@ -15,6 +15,7 @@ import { fetchConsumerGroupMembers, fetchConsumerGroups, fetchTopics, + parseCoordinatorId, } from "./utils/loaderUtils"; const logger = new Logger("cachingResourceLoader"); @@ -287,7 +288,6 @@ export abstract class CachingResourceLoader< // Convert API response to ConsumerGroup models, fetching members for each group. const consumerGroups: ConsumerGroup[] = await Promise.all( responseConsumerGroups.map(async (data) => { - // Fetch members for this consumer group const memberData = await fetchConsumerGroupMembers(cluster, data.consumer_group_id); const members: Consumer[] = memberData.map( (m) => @@ -312,7 +312,7 @@ export abstract class CachingResourceLoader< state: data.state as ConsumerGroupState, isSimple: data.is_simple, partitionAssignor: data.partition_assignor, - coordinatorId: data.coordinator?.related ? parseInt(data.coordinator.related, 10) : null, + coordinatorId: parseCoordinatorId(data.coordinator?.related), members, }); }), diff --git a/src/loaders/utils/loaderUtils.test.ts b/src/loaders/utils/loaderUtils.test.ts index 21b7465c55..66ac33cedd 100644 --- a/src/loaders/utils/loaderUtils.test.ts +++ b/src/loaders/utils/loaderUtils.test.ts @@ -477,6 +477,40 @@ describe("loaderUtils.ts", () => { }); }); + describe("parseCoordinatorId()", () => { + it("should parse broker ID from a full Kafka REST URL", () => { + const url = "http://localhost:26636/kafka/v3/clusters/lkc-abc123/brokers/2"; + assert.strictEqual(loaderUtils.parseCoordinatorId(url), 2); + }); + + it("should parse broker ID from a CCloud-style URL", () => { + const url = + "https://pkc-abc123.us-east-1.aws.confluent.cloud/kafka/v3/clusters/lkc-5vmjd8/brokers/0"; + assert.strictEqual(loaderUtils.parseCoordinatorId(url), 0); + }); + + it("should parse a plain numeric string", () => { + assert.strictEqual(loaderUtils.parseCoordinatorId("7"), 7); + }); + + it("should return null for undefined", () => { + assert.strictEqual(loaderUtils.parseCoordinatorId(undefined), null); + }); + + it("should return null for empty string", () => { + assert.strictEqual(loaderUtils.parseCoordinatorId(""), null); + }); + + it("should return null when the last segment is non-numeric", () => { + assert.strictEqual( + loaderUtils.parseCoordinatorId( + "http://localhost/kafka/v3/clusters/lkc-abc123/brokers/notanumber", + ), + null, + ); + }); + }); + describe("generateFlinkStatementKey()", () => { const mainStatementParams: IFlinkStatementSubmitParameters = { statement: "SHOW USER FUNCTIONS", diff --git a/src/loaders/utils/loaderUtils.ts b/src/loaders/utils/loaderUtils.ts index 494fc46a1e..305160f667 100644 --- a/src/loaders/utils/loaderUtils.ts +++ b/src/loaders/utils/loaderUtils.ts @@ -315,3 +315,13 @@ export function generateFlinkStatementKey(params: IFlinkStatementSubmitParameter hasher.update(params.statement); return hasher.digest("hex"); } + +/** Parse a broker ID from a Kafka REST API relationship URL, returning null if non-numeric. + * e.g. "http://localhost:26636/kafka/v3/clusters/lkc-abc123/brokers/1" → 1 */ +export function parseCoordinatorId(related: string | undefined): number | null { + if (!related) return null; + const lastSegment = related.split("/").pop(); + if (!lastSegment) return null; + const parsed = parseInt(lastSegment, 10); + return Number.isNaN(parsed) ? null : parsed; +} From 108a86b04f4d21a48ecf0c0bf8e7cb4a8b6bfa7e Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Fri, 27 Feb 2026 17:36:12 -0500 Subject: [PATCH 21/29] add try/catch for group members instead of allSettled --- src/loaders/cachingResourceLoader.ts | 34 ++++++++++++++++----------- src/loaders/utils/loaderUtils.test.ts | 17 ++++++++++++++ 2 files changed, 37 insertions(+), 14 deletions(-) diff --git a/src/loaders/cachingResourceLoader.ts b/src/loaders/cachingResourceLoader.ts index be5f073308..2920e41ab6 100644 --- a/src/loaders/cachingResourceLoader.ts +++ b/src/loaders/cachingResourceLoader.ts @@ -1,4 +1,5 @@ import type { ConsumerGroupData, TopicData } from "../clients/kafkaRest"; +import { logError } from "../errors"; import { Logger } from "../logging"; import type { ConsumerGroupState } from "../models/consumerGroup"; import { Consumer, ConsumerGroup } from "../models/consumerGroup"; @@ -288,20 +289,25 @@ export abstract class CachingResourceLoader< // Convert API response to ConsumerGroup models, fetching members for each group. const consumerGroups: ConsumerGroup[] = await Promise.all( responseConsumerGroups.map(async (data) => { - const memberData = await fetchConsumerGroupMembers(cluster, data.consumer_group_id); - const members: Consumer[] = memberData.map( - (m) => - new Consumer({ - connectionId: cluster.connectionId, - connectionType: cluster.connectionType, - environmentId: cluster.environmentId, - clusterId: cluster.id, - consumerGroupId: data.consumer_group_id, - consumerId: m.consumer_id, - clientId: m.client_id, - instanceId: m.instance_id ?? null, - }), - ); + let members: Consumer[] = []; + try { + const memberData = await fetchConsumerGroupMembers(cluster, data.consumer_group_id); + members = memberData.map( + (m) => + new Consumer({ + connectionId: cluster.connectionId, + connectionType: cluster.connectionType, + environmentId: cluster.environmentId, + clusterId: cluster.id, + consumerGroupId: data.consumer_group_id, + consumerId: m.consumer_id, + clientId: m.client_id, + instanceId: m.instance_id ?? null, + }), + ); + } catch (error) { + logError(error, `fetching members for consumer group ${data.consumer_group_id}`); + } return new ConsumerGroup({ connectionId: cluster.connectionId, diff --git a/src/loaders/utils/loaderUtils.test.ts b/src/loaders/utils/loaderUtils.test.ts index 66ac33cedd..bdaeac84ed 100644 --- a/src/loaders/utils/loaderUtils.test.ts +++ b/src/loaders/utils/loaderUtils.test.ts @@ -399,6 +399,14 @@ describe("loaderUtils.ts", () => { assert.strictEqual(result.length, 0); }); + + it("should propagate API errors from listKafkaConsumerGroups", async () => { + stubbedClient.listKafkaConsumerGroups.rejects(new Error("Connection refused")); + + await assert.rejects(loaderUtils.fetchConsumerGroups(TEST_LOCAL_KAFKA_CLUSTER), { + message: "Connection refused", + }); + }); }); describe("fetchConsumerGroupMembers()", () => { @@ -475,6 +483,15 @@ describe("loaderUtils.ts", () => { consumer_group_id: testGroupId, }); }); + + it("should propagate API errors from listKafkaConsumers", async () => { + stubbedClient.listKafkaConsumers.rejects(new Error("Connection refused")); + + await assert.rejects( + loaderUtils.fetchConsumerGroupMembers(TEST_LOCAL_KAFKA_CLUSTER, testGroupId), + { message: "Connection refused" }, + ); + }); }); describe("parseCoordinatorId()", () => { From 27c744ff4d633013ade4a1b61cd849f30c6e50d9 Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Fri, 27 Feb 2026 18:48:18 -0500 Subject: [PATCH 22/29] remove consumerGroupsChanged event emitter and handler since the extension doesn't support consumer group management --- src/emitters.ts | 4 +--- src/viewProviders/topics.test.ts | 34 -------------------------------- src/viewProviders/topics.ts | 12 ----------- 3 files changed, 1 insertion(+), 49 deletions(-) diff --git a/src/emitters.ts b/src/emitters.ts index 1ebe35babd..6e03f7f5ec 100644 --- a/src/emitters.ts +++ b/src/emitters.ts @@ -163,8 +163,6 @@ export type TopicChangeEvent = { change: EventChangeType; cluster: KafkaCluster; }; + /** Fires when a topic has been created or deleted in a Kafka cluster. */ export const topicChanged = new vscode.EventEmitter(); - -/** Fires when consumer groups for a Kafka cluster have changed (removed or updated, direct addition not supported). */ -export const consumerGroupsChanged = new vscode.EventEmitter(); diff --git a/src/viewProviders/topics.test.ts b/src/viewProviders/topics.test.ts index c4561eb034..401f00b745 100644 --- a/src/viewProviders/topics.test.ts +++ b/src/viewProviders/topics.test.ts @@ -781,39 +781,6 @@ describe("viewProviders/topics.ts", () => { }); } }); - - describe("consumerGroupsChangedHandler", () => { - let refreshConsumerGroupsStub: sinon.SinonStub; - - beforeEach(() => { - refreshConsumerGroupsStub = sandbox.stub(provider, "refreshConsumerGroups"); - }); - - it("should do nothing when no cluster is focused", async () => { - provider.kafkaCluster = null; - - await provider.consumerGroupsChangedHandler(TEST_CCLOUD_KAFKA_CLUSTER); - - sinon.assert.notCalled(refreshConsumerGroupsStub); - }); - - it("should do nothing when the event cluster does not match the focused cluster", async () => { - provider.kafkaCluster = TEST_LOCAL_KAFKA_CLUSTER; - - await provider.consumerGroupsChangedHandler(TEST_CCLOUD_KAFKA_CLUSTER); - - sinon.assert.notCalled(refreshConsumerGroupsStub); - }); - - it("should call refreshConsumerGroups when the event cluster matches the focused cluster", async () => { - provider.kafkaCluster = TEST_CCLOUD_KAFKA_CLUSTER; - - await provider.consumerGroupsChangedHandler(TEST_CCLOUD_KAFKA_CLUSTER); - - sinon.assert.calledOnce(refreshConsumerGroupsStub); - sinon.assert.calledWith(refreshConsumerGroupsStub, TEST_CCLOUD_KAFKA_CLUSTER, true); - }); - }); }); describe("setCustomEventListeners()", () => { @@ -832,7 +799,6 @@ describe("viewProviders/topics.ts", () => { ["schemaSubjectChanged", "subjectChangeHandler"], ["schemaVersionsChanged", "subjectChangeHandler"], ["topicChanged", "topicChangedHandler"], - ["consumerGroupsChanged", "consumerGroupsChangedHandler"], ]; it("should return the expected number of listeners", () => { diff --git a/src/viewProviders/topics.ts b/src/viewProviders/topics.ts index 5796389b03..cb43195d74 100644 --- a/src/viewProviders/topics.ts +++ b/src/viewProviders/topics.ts @@ -8,7 +8,6 @@ import type { TopicChangeEvent, } from "../emitters"; import { - consumerGroupsChanged, environmentChanged, localKafkaConnected, schemaSubjectChanged, @@ -400,7 +399,6 @@ export class TopicViewProvider extends ParentedBaseViewProvider< schemaSubjectChanged.event(this.subjectChangeHandler.bind(this)), schemaVersionsChanged.event(this.subjectChangeHandler.bind(this)), topicChanged.event(this.topicChangedHandler.bind(this)), - consumerGroupsChanged.event(this.consumerGroupsChangedHandler.bind(this)), ]; } @@ -414,16 +412,6 @@ export class TopicViewProvider extends ParentedBaseViewProvider< } } - async consumerGroupsChangedHandler(cluster: KafkaCluster): Promise { - if (this.kafkaCluster && this.kafkaCluster.equals(cluster)) { - this.logger.debug( - "consumerGroupsChanged event fired for the focused cluster, refreshing consumer groups", - { clusterId: cluster.id }, - ); - await this.refreshConsumerGroups(cluster, true); - } - } - async environmentChangedHandler(envEvent: EnvironmentChangeEvent): Promise { if (this.kafkaCluster && this.kafkaCluster.environmentId === envEvent.id) { if (!envEvent.wasDeleted) { From 70f72954722f620ec70fd82638b822e4743c97e9 Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Wed, 11 Mar 2026 17:59:18 -0400 Subject: [PATCH 23/29] add tests for ConsumerGroup and Consumer handling within TopicViewProvider --- src/viewProviders/topics.test.ts | 109 ++++++++++++++++++++++++++++++- 1 file changed, 108 insertions(+), 1 deletion(-) diff --git a/src/viewProviders/topics.test.ts b/src/viewProviders/topics.test.ts index 401f00b745..72249d7b34 100644 --- a/src/viewProviders/topics.test.ts +++ b/src/viewProviders/topics.test.ts @@ -25,7 +25,12 @@ import type { CCloudResourceLoader } from "../loaders"; import { TopicFetchError } from "../loaders/utils/loaderUtils"; import { ConnectionType } from "../clients/sidecar"; import { CCLOUD_CONNECTION_ID } from "../constants"; -import { Consumer, type ConsumerGroup } from "../models/consumerGroup"; +import { + Consumer, + type ConsumerGroup, + ConsumerGroupTreeItem, + ConsumerTreeItem, +} from "../models/consumerGroup"; import type { CustomMarkdownString } from "../models/main"; import { KafkaClusterResourceContainer } from "../models/containers/kafkaClusterResourceContainer"; import { SchemaTreeItem, Subject, SubjectTreeItem } from "../models/schema"; @@ -337,6 +342,26 @@ describe("viewProviders/topics.ts", () => { const treeItem = provider.getTreeItem(TEST_CCLOUD_SUBJECT_WITH_SCHEMAS); assert.ok(treeItem instanceof SubjectTreeItem); }); + + it("should return a ConsumerGroupTreeItem for a ConsumerGroup instance", () => { + const treeItem = provider.getTreeItem(TEST_CCLOUD_CONSUMER_GROUP); + assert.ok(treeItem instanceof ConsumerGroupTreeItem); + }); + + it("should return a ConsumerTreeItem for a Consumer instance", () => { + const treeItem = provider.getTreeItem(TEST_CCLOUD_CONSUMER); + assert.ok(treeItem instanceof ConsumerTreeItem); + }); + + it("should return the container itself for a KafkaClusterResourceContainer", () => { + const container = new KafkaClusterResourceContainer( + TEST_CCLOUD_KAFKA_CLUSTER.connectionId, + TEST_CCLOUD_KAFKA_CLUSTER.connectionType, + "Topics", + ); + const treeItem = provider.getTreeItem(container); + assert.strictEqual(treeItem, container); + }); }); describe("isFocusedOnCCloud()", () => { @@ -430,6 +455,32 @@ describe("viewProviders/topics.ts", () => { assert.strictEqual(children.length, 0); }); + + it("should return members when expanding a ConsumerGroup in the cache", () => { + provider["consumerGroupsInTreeView"].set( + TEST_CCLOUD_CONSUMER_GROUP.consumerGroupId, + TEST_CCLOUD_CONSUMER_GROUP, + ); + + const children = provider.getChildren(TEST_CCLOUD_CONSUMER_GROUP); + + assert.strictEqual(children.length, TEST_CCLOUD_CONSUMER_GROUP.members.length); + assert.ok(children[0] instanceof Consumer); + }); + + it("should return an empty array when expanding a ConsumerGroup not in the cache", () => { + const otherGroup = createConsumerGroup({ + connectionId: CCLOUD_CONNECTION_ID, + connectionType: ConnectionType.Ccloud, + environmentId: TEST_CCLOUD_KAFKA_CLUSTER.environmentId, + clusterId: TEST_CCLOUD_KAFKA_CLUSTER.id, + consumerGroupId: "not-in-cache", + }); + + const children = provider.getChildren(otherGroup); + + assert.strictEqual(children.length, 0); + }); }); describe("getParent()", () => { @@ -478,6 +529,41 @@ describe("viewProviders/topics.ts", () => { assert.strictEqual(parent, undefined); }); + + it("should return the consumer groups container for a ConsumerGroup", () => { + provider["consumerGroupsContainer"] = new KafkaClusterResourceContainer( + TEST_CCLOUD_KAFKA_CLUSTER.connectionId, + TEST_CCLOUD_KAFKA_CLUSTER.connectionType, + "Consumer Groups", + ); + + const parent = provider.getParent(TEST_CCLOUD_CONSUMER_GROUP); + + assert.strictEqual(parent, provider["consumerGroupsContainer"]); + }); + + it("should return the parent ConsumerGroup for a Consumer", () => { + provider["consumerGroupsInTreeView"].set( + TEST_CCLOUD_CONSUMER_GROUP.consumerGroupId, + TEST_CCLOUD_CONSUMER_GROUP, + ); + + const parent = provider.getParent(TEST_CCLOUD_CONSUMER); + + assert.strictEqual(parent, TEST_CCLOUD_CONSUMER_GROUP); + }); + + it("should return undefined for a KafkaClusterResourceContainer", () => { + const container = new KafkaClusterResourceContainer( + TEST_CCLOUD_KAFKA_CLUSTER.connectionId, + TEST_CCLOUD_KAFKA_CLUSTER.connectionType, + "Topics", + ); + + const parent = provider.getParent(container); + + assert.strictEqual(parent, undefined); + }); }); describe("updateSubjectSchemas()", () => { @@ -622,6 +708,27 @@ describe("viewProviders/topics.ts", () => { sinon.assert.notCalled(treeViewRevealStub); }); + + it("should reveal a KafkaClusterResourceContainer (consumer groups container)", async () => { + provider["consumerGroupsContainer"] = new KafkaClusterResourceContainer( + TEST_CCLOUD_KAFKA_CLUSTER.connectionId, + TEST_CCLOUD_KAFKA_CLUSTER.connectionType, + "Consumer Groups", + ); + const lookupContainer = new KafkaClusterResourceContainer( + CCLOUD_CONNECTION_ID, + ConnectionType.Ccloud, + "Consumer Groups", + ); + + await provider.reveal(lookupContainer, { focus: true }); + + sinon.assert.calledOnceWithExactly( + treeViewRevealStub, + provider["consumerGroupsContainer"], + { focus: true }, + ); + }); }); describe("event handlers", () => { From 63a1dd3df46b448fa46c24bfc042db395f9056c1 Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Wed, 11 Mar 2026 18:00:01 -0400 Subject: [PATCH 24/29] add ResourceLoader tests for fetching consumer groups and members --- src/loaders/resourceLoader.test.ts | 130 ++++++++++++++++++++++++++++- 1 file changed, 128 insertions(+), 2 deletions(-) diff --git a/src/loaders/resourceLoader.test.ts b/src/loaders/resourceLoader.test.ts index 870fecd179..67a83c2507 100644 --- a/src/loaders/resourceLoader.test.ts +++ b/src/loaders/resourceLoader.test.ts @@ -1,5 +1,6 @@ import assert from "assert"; import * as sinon from "sinon"; +import { getStubbedResourceManager } from "../../tests/stubs/extensionStorage"; import { getStubbedLocalResourceLoader } from "../../tests/stubs/resourceLoaders"; import { getSidecarStub } from "../../tests/stubs/sidecar"; import { @@ -17,16 +18,17 @@ import { TEST_LOCAL_SUBJECT_WITH_SCHEMAS, } from "../../tests/unit/testResources"; import { createTestSubject, createTestTopicData } from "../../tests/unit/testUtils"; -import type { TopicData } from "../clients/kafkaRest"; +import type { ConsumerData, ConsumerGroupData, TopicData } from "../clients/kafkaRest"; import { SubjectsV1Api } from "../clients/schemaRegistryRest"; import { CCLOUD_CONNECTION_ID, LOCAL_CONNECTION_ID } from "../constants"; import * as errors from "../errors"; +import { Consumer, ConsumerGroup, ConsumerGroupState } from "../models/consumerGroup"; import type { ConnectionId } from "../models/resource"; import type { Subject } from "../models/schema"; import { Schema } from "../models/schema"; import * as notifications from "../notifications"; import type { SidecarHandle } from "../sidecar"; -import { getResourceManager } from "../storage/resourceManager"; +import { getResourceManager, type ResourceManager } from "../storage/resourceManager"; import { clearWorkspaceState } from "../storage/utils"; import { CCloudResourceLoader } from "./ccloudResourceLoader"; import { DirectResourceLoader } from "./directResourceLoader"; @@ -469,6 +471,130 @@ describe("ResourceLoader::getTopicsForCluster()", () => { }); }); +describe("ResourceLoader::getConsumerGroupsForCluster()", () => { + let loaderInstance: LocalResourceLoader; + let sandbox: sinon.SinonSandbox; + let fetchConsumerGroupsStub: sinon.SinonStub; + let fetchConsumerGroupMembersStub: sinon.SinonStub; + let rmStub: sinon.SinonStubbedInstance; + + // minimal ConsumerGroupData fixture matching the API response shape + const testConsumerGroupData: ConsumerGroupData = { + kind: "KafkaConsumerGroup", + metadata: { self: "", resource_name: "" }, + cluster_id: TEST_LOCAL_KAFKA_CLUSTER.id, + consumer_group_id: "test-group-1", + is_simple: false, + partition_assignor: "range", + state: "STABLE", + coordinator: { + related: `http://localhost/kafka/v3/clusters/${TEST_LOCAL_KAFKA_CLUSTER.id}/brokers/0`, + }, + lag_summary: { related: "" }, + }; + + const testConsumerData: ConsumerData = { + kind: "KafkaConsumer", + metadata: { self: "", resource_name: "" }, + cluster_id: TEST_LOCAL_KAFKA_CLUSTER.id, + consumer_group_id: "test-group-1", + consumer_id: "consumer-1", + client_id: "client-1", + assignments: { related: "" }, + }; + + beforeEach(() => { + sandbox = sinon.createSandbox(); + + fetchConsumerGroupsStub = sandbox.stub(loaderUtils, "fetchConsumerGroups"); + fetchConsumerGroupMembersStub = sandbox.stub(loaderUtils, "fetchConsumerGroupMembers"); + loaderInstance = LocalResourceLoader.getInstance(); + // bracket notation to stub protected method (same pattern as ccloudResourceLoader.test.ts) + loaderInstance["ensureCoarseResourcesLoaded"] = sandbox.stub().resolves(); + + rmStub = getStubbedResourceManager(sandbox); + rmStub.getConsumerGroupsForCluster.resolves(undefined); + rmStub.setConsumerGroupsForCluster.resolves(); + }); + + afterEach(async () => { + await clearWorkspaceState(); + sandbox.restore(); + }); + + it("raises error for mismatched connectionId", async () => { + await assert.rejects( + loaderInstance.getConsumerGroupsForCluster(TEST_CCLOUD_KAFKA_CLUSTER), + (err) => { + return (err as Error).message.startsWith("Mismatched connectionId"); + }, + ); + }); + + it("returns cached data if available", async () => { + const cachedGroups = [ + new ConsumerGroup({ + connectionId: TEST_LOCAL_KAFKA_CLUSTER.connectionId, + connectionType: TEST_LOCAL_KAFKA_CLUSTER.connectionType, + environmentId: TEST_LOCAL_KAFKA_CLUSTER.environmentId, + clusterId: TEST_LOCAL_KAFKA_CLUSTER.id, + consumerGroupId: "cached-group", + state: ConsumerGroupState.Stable, + isSimple: false, + partitionAssignor: "range", + coordinatorId: 0, + members: [], + }), + ]; + rmStub.getConsumerGroupsForCluster.resolves(cachedGroups); + + const groups = await loaderInstance.getConsumerGroupsForCluster(TEST_LOCAL_KAFKA_CLUSTER); + + assert.deepStrictEqual(groups, cachedGroups); + sinon.assert.notCalled(fetchConsumerGroupsStub); + sinon.assert.calledOnce(rmStub.getConsumerGroupsForCluster); + }); + + it("fetches consumer groups and members from the API on cache miss", async () => { + fetchConsumerGroupsStub.resolves([testConsumerGroupData]); + fetchConsumerGroupMembersStub.resolves([testConsumerData]); + + const groups = await loaderInstance.getConsumerGroupsForCluster(TEST_LOCAL_KAFKA_CLUSTER); + + assert.strictEqual(groups.length, 1); + assert.ok(groups[0] instanceof ConsumerGroup); + assert.strictEqual(groups[0].consumerGroupId, "test-group-1"); + assert.strictEqual(groups[0].members.length, 1); + assert.ok(groups[0].members[0] instanceof Consumer); + assert.strictEqual(groups[0].members[0].consumerId, "consumer-1"); + sinon.assert.calledOnce(fetchConsumerGroupsStub); + sinon.assert.calledOnce(fetchConsumerGroupMembersStub); + }); + + it("returns group with empty members when member fetch fails", async () => { + fetchConsumerGroupsStub.resolves([testConsumerGroupData]); + fetchConsumerGroupMembersStub.rejects(new Error("member fetch failed")); + + const groups = await loaderInstance.getConsumerGroupsForCluster(TEST_LOCAL_KAFKA_CLUSTER); + + assert.strictEqual(groups.length, 1); + assert.strictEqual(groups[0].members.length, 0); + }); + + it("bypasses cache when forceDeepRefresh is true", async () => { + // even though cache returns data, forceDeepRefresh should re-fetch + rmStub.getConsumerGroupsForCluster.resolves([]); + + fetchConsumerGroupsStub.resolves([testConsumerGroupData]); + fetchConsumerGroupMembersStub.resolves([]); + + const groups = await loaderInstance.getConsumerGroupsForCluster(TEST_LOCAL_KAFKA_CLUSTER, true); + + assert.strictEqual(groups.length, 1); + sinon.assert.calledOnce(fetchConsumerGroupsStub); + }); +}); + describe("ResourceLoader::getSchemasForSubject()", () => { let loaderInstance: ResourceLoader; let sandbox: sinon.SinonSandbox; From 066658e1be03e2c2a8ddf81292f9f3e18f376fe2 Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Thu, 12 Mar 2026 11:32:11 -0400 Subject: [PATCH 25/29] fix bug where refreshing view after consumer groups start/stop show old group state; add tests --- src/viewProviders/topics.test.ts | 33 +++++++++++++++++++++ src/viewProviders/topics.ts | 50 ++++++++++++++++++-------------- 2 files changed, 62 insertions(+), 21 deletions(-) diff --git a/src/viewProviders/topics.test.ts b/src/viewProviders/topics.test.ts index 72249d7b34..3181416045 100644 --- a/src/viewProviders/topics.test.ts +++ b/src/viewProviders/topics.test.ts @@ -125,6 +125,39 @@ describe("viewProviders/topics.ts", () => { sinon.assert.callCount(onDidChangeTreeDataFireStub, 4); }); + it("refresh() reuses existing container instances so targeted fires match VS Code's references", async () => { + provider.kafkaCluster = TEST_CCLOUD_KAFKA_CLUSTER; + await provider.refresh(); + + const topicsContainerAfterFirst = provider["topicsContainer"]; + const consumerGroupsContainerAfterFirst = provider["consumerGroupsContainer"]; + + // second refresh on the same cluster should reuse the same container instances + onDidChangeTreeDataFireStub.resetHistory(); + await provider.refresh(true); + + assert.strictEqual( + provider["topicsContainer"], + topicsContainerAfterFirst, + "topicsContainer should be the same instance after refresh", + ); + assert.strictEqual( + provider["consumerGroupsContainer"], + consumerGroupsContainerAfterFirst, + "consumerGroupsContainer should be the same instance after refresh", + ); + // targeted fires should reference the same container instances + const firedElements = onDidChangeTreeDataFireStub.args.map((args: unknown[]) => args[0]); + assert.ok( + firedElements.includes(topicsContainerAfterFirst), + "should fire with the reused topicsContainer", + ); + assert.ok( + firedElements.includes(consumerGroupsContainerAfterFirst), + "should fire with the reused consumerGroupsContainer", + ); + }); + it("onlyIfMatching a contained Kafka topic when the cluster doesn't match should do nothing", async () => { provider.kafkaCluster = TEST_LOCAL_KAFKA_CLUSTER; await provider.refresh(false, TEST_CCLOUD_KAFKA_TOPIC); diff --git a/src/viewProviders/topics.ts b/src/viewProviders/topics.ts index cb43195d74..620e6f3373 100644 --- a/src/viewProviders/topics.ts +++ b/src/viewProviders/topics.ts @@ -82,11 +82,9 @@ export class TopicViewProvider extends ParentedBaseViewProvider< private consumerGroupsInTreeView: Map = new Map(); private clearCaches(): void { - this.topicsContainer = null; this.topicsInTreeView.clear(); this.subjectsInTreeView.clear(); this.subjectToTopicMap.clear(); - this.consumerGroupsContainer = null; this.consumerGroupsInTreeView.clear(); } @@ -194,31 +192,39 @@ export class TopicViewProvider extends ParentedBaseViewProvider< this.clearCaches(); if (!this.kafkaCluster) { // nothing focused; return to empty state + this.topicsContainer = null; + this.consumerGroupsContainer = null; this._onDidChangeTreeData.fire(); return; } const cluster: KafkaCluster = this.kafkaCluster; await this.withProgress("Loading topics and consumer groups...", async () => { - // set up containers with the focused cluster's connection info - this.topicsContainer = new KafkaClusterResourceContainer( - cluster.connectionId, - cluster.connectionType, - KafkaClusterContainerLabel.TOPICS, - [], - "topics-container", - new ThemeIcon(IconNames.TOPIC), - ); - this.topicsContainer.collapsibleState = TreeItemCollapsibleState.Expanded; - - this.consumerGroupsContainer = new KafkaClusterResourceContainer( - cluster.connectionId, - cluster.connectionType, - KafkaClusterContainerLabel.CONSUMER_GROUPS, - [], - undefined, // no context value for now since no commands are needed yet for this container - new ThemeIcon(IconNames.CONSUMER_GROUP), - ); + // reuse existing containers so VS Code can match targeted tree-data-change fires + // against the same object references it already tracks; only create when absent + // (e.g. first load after reset or cluster change) + if (!this.topicsContainer) { + this.topicsContainer = new KafkaClusterResourceContainer( + cluster.connectionId, + cluster.connectionType, + KafkaClusterContainerLabel.TOPICS, + [], + "topics-container", + new ThemeIcon(IconNames.TOPIC), + ); + this.topicsContainer.collapsibleState = TreeItemCollapsibleState.Expanded; + } + + if (!this.consumerGroupsContainer) { + this.consumerGroupsContainer = new KafkaClusterResourceContainer( + cluster.connectionId, + cluster.connectionType, + KafkaClusterContainerLabel.CONSUMER_GROUPS, + [], + undefined, // no context value for now since no commands are needed yet for this container + new ThemeIcon(IconNames.CONSUMER_GROUP), + ); + } await Promise.allSettled([ this.refreshTopics(cluster, forceDeepRefresh), @@ -388,6 +394,8 @@ export class TopicViewProvider extends ParentedBaseViewProvider< } async reset(): Promise { + this.topicsContainer = null; + this.consumerGroupsContainer = null; this.clearCaches(); await super.reset(); } From 5fb18ba54e9a2d0285ce9a01420468051128d0d5 Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Thu, 12 Mar 2026 22:23:19 -0400 Subject: [PATCH 26/29] use showErrorNotificationWithButtons instead of basic error notification --- src/viewProviders/topics.ts | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/viewProviders/topics.ts b/src/viewProviders/topics.ts index 620e6f3373..dda7fe38bf 100644 --- a/src/viewProviders/topics.ts +++ b/src/viewProviders/topics.ts @@ -1,5 +1,5 @@ import type { Disposable, TreeItem } from "vscode"; -import { ThemeIcon, TreeItemCollapsibleState, window } from "vscode"; +import { ThemeIcon, TreeItemCollapsibleState } from "vscode"; import { ContextValues } from "../context/values"; import type { EnvironmentChangeEvent, @@ -34,6 +34,7 @@ import { CustomMarkdownString } from "../models/main"; import { isCCloud, isLocal } from "../models/resource"; import { Schema, SchemaTreeItem, Subject, SubjectTreeItem } from "../models/schema"; import { KafkaTopic, KafkaTopicTreeItem } from "../models/topic"; +import { showErrorNotificationWithButtons } from "../notifications"; import { ParentedBaseViewProvider } from "./baseModels/parentedBase"; /** @@ -267,7 +268,7 @@ export class TopicViewProvider extends ParentedBaseViewProvider< .addCodeBlock(message), ); if (err instanceof TopicFetchError) { - window.showErrorMessage( + void showErrorNotificationWithButtons( `Failed to list topics for cluster "${cluster.name}": ${err.message}`, ); } @@ -305,6 +306,9 @@ export class TopicViewProvider extends ParentedBaseViewProvider< .addWarning(`Failed to load consumer groups for **${cluster.name}**:`) .addCodeBlock(message), ); + void showErrorNotificationWithButtons( + `Failed to load consumer groups for cluster "${cluster.name}": ${message}`, + ); } this._onDidChangeTreeData.fire(this.consumerGroupsContainer); From 77d39fb55ddf564afbdbeb25cdfea1e2aca36864 Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Thu, 12 Mar 2026 22:23:43 -0400 Subject: [PATCH 27/29] use executeInWorkerPool to prevent overwhelming request count --- src/loaders/cachingResourceLoader.ts | 76 ++++++++++++++-------------- 1 file changed, 39 insertions(+), 37 deletions(-) diff --git a/src/loaders/cachingResourceLoader.ts b/src/loaders/cachingResourceLoader.ts index 2920e41ab6..7b06f1689c 100644 --- a/src/loaders/cachingResourceLoader.ts +++ b/src/loaders/cachingResourceLoader.ts @@ -1,5 +1,4 @@ import type { ConsumerGroupData, TopicData } from "../clients/kafkaRest"; -import { logError } from "../errors"; import { Logger } from "../logging"; import type { ConsumerGroupState } from "../models/consumerGroup"; import { Consumer, ConsumerGroup } from "../models/consumerGroup"; @@ -10,6 +9,7 @@ import type { Subject } from "../models/schema"; import type { SchemaRegistry, SchemaRegistryType } from "../models/schemaRegistry"; import type { KafkaTopic } from "../models/topic"; import { getResourceManager } from "../storage/resourceManager"; +import { executeInWorkerPool } from "../utils/workerPool"; import { ResourceLoader } from "./resourceLoader"; import { correlateTopicsWithSchemaSubjects, @@ -286,44 +286,46 @@ export abstract class CachingResourceLoader< // Deep fetch consumer groups from the API. const responseConsumerGroups: ConsumerGroupData[] = await fetchConsumerGroups(cluster); - // Convert API response to ConsumerGroup models, fetching members for each group. - const consumerGroups: ConsumerGroup[] = await Promise.all( - responseConsumerGroups.map(async (data) => { - let members: Consumer[] = []; - try { - const memberData = await fetchConsumerGroupMembers(cluster, data.consumer_group_id); - members = memberData.map( - (m) => - new Consumer({ - connectionId: cluster.connectionId, - connectionType: cluster.connectionType, - environmentId: cluster.environmentId, - clusterId: cluster.id, - consumerGroupId: data.consumer_group_id, - consumerId: m.consumer_id, - clientId: m.client_id, - instanceId: m.instance_id ?? null, - }), - ); - } catch (error) { - logError(error, `fetching members for consumer group ${data.consumer_group_id}`); - } - - return new ConsumerGroup({ - connectionId: cluster.connectionId, - connectionType: cluster.connectionType, - environmentId: cluster.environmentId, - clusterId: cluster.id, - consumerGroupId: data.consumer_group_id, - state: data.state as ConsumerGroupState, - isSimple: data.is_simple, - partitionAssignor: data.partition_assignor, - coordinatorId: parseCoordinatorId(data.coordinator?.related), - members, - }); - }), + // Fetch members for each group with bounded concurrency to avoid overwhelming + // the sidecar/Kafka REST API on clusters with many consumer groups. + const memberResults = await executeInWorkerPool( + (data: ConsumerGroupData) => fetchConsumerGroupMembers(cluster, data.consumer_group_id), + responseConsumerGroups, + { maxWorkers: 5, taskName: "fetchConsumerGroupMembers" }, ); + // Convert API responses to ConsumerGroup models. + const consumerGroups: ConsumerGroup[] = responseConsumerGroups.map((data, index) => { + const memberResult = memberResults[index]; + const members: Consumer[] = + memberResult?.result?.map( + (m) => + new Consumer({ + connectionId: cluster.connectionId, + connectionType: cluster.connectionType, + environmentId: cluster.environmentId, + clusterId: cluster.id, + consumerGroupId: data.consumer_group_id, + consumerId: m.consumer_id, + clientId: m.client_id, + instanceId: m.instance_id ?? null, + }), + ) ?? []; + + return new ConsumerGroup({ + connectionId: cluster.connectionId, + connectionType: cluster.connectionType, + environmentId: cluster.environmentId, + clusterId: cluster.id, + consumerGroupId: data.consumer_group_id, + state: data.state as ConsumerGroupState, + isSimple: data.is_simple, + partitionAssignor: data.partition_assignor, + coordinatorId: parseCoordinatorId(data.coordinator?.related), + members, + }); + }); + // Cache the consumer groups for this cluster. await resourceManager.setConsumerGroupsForCluster(cluster, consumerGroups); From e8ebaa03548003b57cf34b608bd4a5f7b26b1370 Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Thu, 12 Mar 2026 22:23:57 -0400 Subject: [PATCH 28/29] filter tree items by icon to avoid matching on consumer groups --- tests/e2e/objects/views/TopicsView.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/e2e/objects/views/TopicsView.ts b/tests/e2e/objects/views/TopicsView.ts index b23acad0d6..87097a7d9b 100644 --- a/tests/e2e/objects/views/TopicsView.ts +++ b/tests/e2e/objects/views/TopicsView.ts @@ -56,9 +56,11 @@ export class TopicsView extends SearchableView { return this.body.locator("[role='treeitem'][aria-level='1']").filter({ hasText: "Topics" }); } - /** Get all topic items in the view (nested under the Topics container). */ + /** Get all topic items in the view (filtered by topic icon classes to exclude consumer groups). */ get topics(): Locator { - return this.body.locator("[role='treeitem'][aria-level='2']"); + return this.body.locator("[role='treeitem'][aria-level='2']").filter({ + has: this.page.locator(".codicon-confluent-topic, .codicon-confluent-topic-without-schema"), + }); } /** Get a topic item by its label/name. */ From c6f90ee2e24356c1c73417d589ad419e8f043aac Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Thu, 12 Mar 2026 23:49:54 -0400 Subject: [PATCH 29/29] use accessibilityInformation instead of tracking aria-levels for tree item locators in the Topics view --- src/models/consumerGroup.ts | 8 ++++++++ src/models/schema.ts | 4 ++++ src/models/topic.ts | 4 ++++ tests/e2e/objects/views/TopicsView.ts | 9 +++------ 4 files changed, 19 insertions(+), 6 deletions(-) diff --git a/src/models/consumerGroup.ts b/src/models/consumerGroup.ts index 492af9e5e6..6581a7a570 100644 --- a/src/models/consumerGroup.ts +++ b/src/models/consumerGroup.ts @@ -185,6 +185,10 @@ export class ConsumerGroupTreeItem extends vscode.TreeItem { ); this.tooltip = createConsumerGroupTooltip(resource); + + this.accessibilityInformation = { + label: `Consumer Group: ${resource.consumerGroupId}`, + }; } } @@ -239,6 +243,10 @@ export class ConsumerTreeItem extends vscode.TreeItem { this.iconPath = new vscode.ThemeIcon(resource.iconName); this.tooltip = createConsumerTooltip(resource); + + this.accessibilityInformation = { + label: `Consumer: ${resource.consumerId}`, + }; } } diff --git a/src/models/schema.ts b/src/models/schema.ts index ac83459bc3..0f709cbfbd 100644 --- a/src/models/schema.ts +++ b/src/models/schema.ts @@ -282,6 +282,10 @@ export class SubjectTreeItem extends vscode.TreeItem { } propertyParts.push("schema-subject"); this.contextValue = propertyParts.join("-"); + + this.accessibilityInformation = { + label: `Schema Subject: ${subject.name}`, + }; } } diff --git a/src/models/topic.ts b/src/models/topic.ts index 2dd9f24605..b124248b9b 100644 --- a/src/models/topic.ts +++ b/src/models/topic.ts @@ -134,6 +134,10 @@ export class KafkaTopicTreeItem extends vscode.TreeItem { const missingAuthz: KafkaTopicOperation[] = this.checkMissingAuthorizedOperations(resource); this.tooltip = createKafkaTopicTooltip(this.resource, missingAuthz); + + this.accessibilityInformation = { + label: `Kafka Topic: ${resource.name}`, + }; } checkMissingAuthorizedOperations(resource: KafkaTopic): KafkaTopicOperation[] { diff --git a/tests/e2e/objects/views/TopicsView.ts b/tests/e2e/objects/views/TopicsView.ts index 87097a7d9b..6f12df1547 100644 --- a/tests/e2e/objects/views/TopicsView.ts +++ b/tests/e2e/objects/views/TopicsView.ts @@ -56,11 +56,9 @@ export class TopicsView extends SearchableView { return this.body.locator("[role='treeitem'][aria-level='1']").filter({ hasText: "Topics" }); } - /** Get all topic items in the view (filtered by topic icon classes to exclude consumer groups). */ + /** Get all topic items in the view (filtered by aria-label to exclude consumer groups). */ get topics(): Locator { - return this.body.locator("[role='treeitem'][aria-level='2']").filter({ - has: this.page.locator(".codicon-confluent-topic, .codicon-confluent-topic-without-schema"), - }); + return this.body.locator("[role='treeitem'][aria-label^='Kafka Topic:']"); } /** Get a topic item by its label/name. */ @@ -86,8 +84,7 @@ export class TopicsView extends SearchableView { * (One level below {@link topicsWithSchemas topic items with schemas}.) */ get subjects(): Locator { - // we don't use `this.topicsWithSchemas` because these are sibling elements to topics in the DOM - return this.body.locator("[role='treeitem'][aria-level='3']"); + return this.body.locator("[role='treeitem'][aria-label^='Schema Subject:']"); } /**