diff --git a/CHANGELOG.md b/CHANGELOG.md index cb78e2c6cf..6351ff470b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ All notable changes to this extension will be documented in this file. ## Unreleased +### Added + +- Consumer groups and their members are now visible in the Topics view under a collapsible "Consumer + Groups" container - Direct connections form now supports OAuth authentication for WarpStream connections. ## 2.2.2 diff --git a/package.json b/package.json index 4fcd89da34..50adedd131 100644 --- a/package.json +++ b/package.json @@ -1846,11 +1846,6 @@ "when": "view == confluent-topics && confluent.kafkaClusterSelected && confluent.topicSearchApplied", "group": "navigation@1" }, - { - "command": "confluent.topics.create", - "when": "view == confluent-topics && confluent.kafkaClusterSelected", - "group": "navigation@2" - }, { "command": "confluent.topics.kafka-cluster.select", "when": "view == confluent-topics && (confluent.ccloudConnectionAvailable || confluent.localKafkaClusterAvailable || confluent.directKafkaClusterAvailable)", @@ -2008,6 +2003,11 @@ "when": "viewItem == resources-ccloud-container-connected", "group": "inline@2" }, + { + "command": "confluent.topics.create", + "when": "view == confluent-topics && viewItem == topics-container", + "group": "inline@1" + }, { "command": "confluent.flinkdatabase.createRelation", "when": "view == confluent-flink-database && viewItem == flink-database-relations-container", diff --git a/src/loaders/cachingResourceLoader.ts b/src/loaders/cachingResourceLoader.ts index 0cf042118a..7b06f1689c 100644 --- a/src/loaders/cachingResourceLoader.ts +++ b/src/loaders/cachingResourceLoader.ts @@ -1,5 +1,7 @@ -import type { TopicData } from "../clients/kafkaRest"; +import type { ConsumerGroupData, TopicData } from "../clients/kafkaRest"; import { Logger } from "../logging"; +import type { ConsumerGroupState } from "../models/consumerGroup"; +import { Consumer, ConsumerGroup } from "../models/consumerGroup"; import type { Environment, EnvironmentType } from "../models/environment"; import type { KafkaCluster, KafkaClusterType } from "../models/kafkaCluster"; import type { EnvironmentId } from "../models/resource"; @@ -7,8 +9,15 @@ import type { Subject } from "../models/schema"; import type { SchemaRegistry, SchemaRegistryType } from "../models/schemaRegistry"; import type { KafkaTopic } from "../models/topic"; import { getResourceManager } from "../storage/resourceManager"; +import { executeInWorkerPool } from "../utils/workerPool"; import { ResourceLoader } from "./resourceLoader"; -import { correlateTopicsWithSchemaSubjects, fetchTopics } from "./utils/loaderUtils"; +import { + correlateTopicsWithSchemaSubjects, + fetchConsumerGroupMembers, + fetchConsumerGroups, + fetchTopics, + parseCoordinatorId, +} from "./utils/loaderUtils"; const logger = new Logger("cachingResourceLoader"); @@ -246,4 +255,80 @@ export abstract class CachingResourceLoader< return topics; } + + /** + * Return the consumer groups for a given Kafka cluster. + * + * Caches the consumer groups for the cluster in the resource manager. + */ + public async getConsumerGroupsForCluster( + cluster: KCT, + forceDeepRefresh: boolean = false, + ): Promise { + if (cluster.connectionId !== this.connectionId) { + throw new Error( + `Mismatched connectionId ${this.connectionId} for cluster ${JSON.stringify(cluster, null, 2)}`, + ); + } + + await this.ensureCoarseResourcesLoaded(forceDeepRefresh); + + const resourceManager = getResourceManager(); + const cachedConsumerGroups = await resourceManager.getConsumerGroupsForCluster(cluster); + if (cachedConsumerGroups !== undefined && !forceDeepRefresh) { + // Cache hit. + logger.debug( + `Returning ${cachedConsumerGroups.length} cached consumer groups for cluster ${cluster.id}`, + ); + return cachedConsumerGroups; + } + + // Deep fetch consumer groups from the API. + const responseConsumerGroups: ConsumerGroupData[] = await fetchConsumerGroups(cluster); + + // Fetch members for each group with bounded concurrency to avoid overwhelming + // the sidecar/Kafka REST API on clusters with many consumer groups. + const memberResults = await executeInWorkerPool( + (data: ConsumerGroupData) => fetchConsumerGroupMembers(cluster, data.consumer_group_id), + responseConsumerGroups, + { maxWorkers: 5, taskName: "fetchConsumerGroupMembers" }, + ); + + // Convert API responses to ConsumerGroup models. + const consumerGroups: ConsumerGroup[] = responseConsumerGroups.map((data, index) => { + const memberResult = memberResults[index]; + const members: Consumer[] = + memberResult?.result?.map( + (m) => + new Consumer({ + connectionId: cluster.connectionId, + connectionType: cluster.connectionType, + environmentId: cluster.environmentId, + clusterId: cluster.id, + consumerGroupId: data.consumer_group_id, + consumerId: m.consumer_id, + clientId: m.client_id, + instanceId: m.instance_id ?? null, + }), + ) ?? []; + + return new ConsumerGroup({ + connectionId: cluster.connectionId, + connectionType: cluster.connectionType, + environmentId: cluster.environmentId, + clusterId: cluster.id, + consumerGroupId: data.consumer_group_id, + state: data.state as ConsumerGroupState, + isSimple: data.is_simple, + partitionAssignor: data.partition_assignor, + coordinatorId: parseCoordinatorId(data.coordinator?.related), + members, + }); + }); + + // Cache the consumer groups for this cluster. + await resourceManager.setConsumerGroupsForCluster(cluster, consumerGroups); + + return consumerGroups; + } } diff --git a/src/loaders/resourceLoader.test.ts b/src/loaders/resourceLoader.test.ts index 870fecd179..67a83c2507 100644 --- a/src/loaders/resourceLoader.test.ts +++ b/src/loaders/resourceLoader.test.ts @@ -1,5 +1,6 @@ import assert from "assert"; import * as sinon from "sinon"; +import { getStubbedResourceManager } from "../../tests/stubs/extensionStorage"; import { getStubbedLocalResourceLoader } from "../../tests/stubs/resourceLoaders"; import { getSidecarStub } from "../../tests/stubs/sidecar"; import { @@ -17,16 +18,17 @@ import { TEST_LOCAL_SUBJECT_WITH_SCHEMAS, } from "../../tests/unit/testResources"; import { createTestSubject, createTestTopicData } from "../../tests/unit/testUtils"; -import type { TopicData } from "../clients/kafkaRest"; +import type { ConsumerData, ConsumerGroupData, TopicData } from "../clients/kafkaRest"; import { SubjectsV1Api } from "../clients/schemaRegistryRest"; import { CCLOUD_CONNECTION_ID, LOCAL_CONNECTION_ID } from "../constants"; import * as errors from "../errors"; +import { Consumer, ConsumerGroup, ConsumerGroupState } from "../models/consumerGroup"; import type { ConnectionId } from "../models/resource"; import type { Subject } from "../models/schema"; import { Schema } from "../models/schema"; import * as notifications from "../notifications"; import type { SidecarHandle } from "../sidecar"; -import { getResourceManager } from "../storage/resourceManager"; +import { getResourceManager, type ResourceManager } from "../storage/resourceManager"; import { clearWorkspaceState } from "../storage/utils"; import { CCloudResourceLoader } from "./ccloudResourceLoader"; import { DirectResourceLoader } from "./directResourceLoader"; @@ -469,6 +471,130 @@ describe("ResourceLoader::getTopicsForCluster()", () => { }); }); +describe("ResourceLoader::getConsumerGroupsForCluster()", () => { + let loaderInstance: LocalResourceLoader; + let sandbox: sinon.SinonSandbox; + let fetchConsumerGroupsStub: sinon.SinonStub; + let fetchConsumerGroupMembersStub: sinon.SinonStub; + let rmStub: sinon.SinonStubbedInstance; + + // minimal ConsumerGroupData fixture matching the API response shape + const testConsumerGroupData: ConsumerGroupData = { + kind: "KafkaConsumerGroup", + metadata: { self: "", resource_name: "" }, + cluster_id: TEST_LOCAL_KAFKA_CLUSTER.id, + consumer_group_id: "test-group-1", + is_simple: false, + partition_assignor: "range", + state: "STABLE", + coordinator: { + related: `http://localhost/kafka/v3/clusters/${TEST_LOCAL_KAFKA_CLUSTER.id}/brokers/0`, + }, + lag_summary: { related: "" }, + }; + + const testConsumerData: ConsumerData = { + kind: "KafkaConsumer", + metadata: { self: "", resource_name: "" }, + cluster_id: TEST_LOCAL_KAFKA_CLUSTER.id, + consumer_group_id: "test-group-1", + consumer_id: "consumer-1", + client_id: "client-1", + assignments: { related: "" }, + }; + + beforeEach(() => { + sandbox = sinon.createSandbox(); + + fetchConsumerGroupsStub = sandbox.stub(loaderUtils, "fetchConsumerGroups"); + fetchConsumerGroupMembersStub = sandbox.stub(loaderUtils, "fetchConsumerGroupMembers"); + loaderInstance = LocalResourceLoader.getInstance(); + // bracket notation to stub protected method (same pattern as ccloudResourceLoader.test.ts) + loaderInstance["ensureCoarseResourcesLoaded"] = sandbox.stub().resolves(); + + rmStub = getStubbedResourceManager(sandbox); + rmStub.getConsumerGroupsForCluster.resolves(undefined); + rmStub.setConsumerGroupsForCluster.resolves(); + }); + + afterEach(async () => { + await clearWorkspaceState(); + sandbox.restore(); + }); + + it("raises error for mismatched connectionId", async () => { + await assert.rejects( + loaderInstance.getConsumerGroupsForCluster(TEST_CCLOUD_KAFKA_CLUSTER), + (err) => { + return (err as Error).message.startsWith("Mismatched connectionId"); + }, + ); + }); + + it("returns cached data if available", async () => { + const cachedGroups = [ + new ConsumerGroup({ + connectionId: TEST_LOCAL_KAFKA_CLUSTER.connectionId, + connectionType: TEST_LOCAL_KAFKA_CLUSTER.connectionType, + environmentId: TEST_LOCAL_KAFKA_CLUSTER.environmentId, + clusterId: TEST_LOCAL_KAFKA_CLUSTER.id, + consumerGroupId: "cached-group", + state: ConsumerGroupState.Stable, + isSimple: false, + partitionAssignor: "range", + coordinatorId: 0, + members: [], + }), + ]; + rmStub.getConsumerGroupsForCluster.resolves(cachedGroups); + + const groups = await loaderInstance.getConsumerGroupsForCluster(TEST_LOCAL_KAFKA_CLUSTER); + + assert.deepStrictEqual(groups, cachedGroups); + sinon.assert.notCalled(fetchConsumerGroupsStub); + sinon.assert.calledOnce(rmStub.getConsumerGroupsForCluster); + }); + + it("fetches consumer groups and members from the API on cache miss", async () => { + fetchConsumerGroupsStub.resolves([testConsumerGroupData]); + fetchConsumerGroupMembersStub.resolves([testConsumerData]); + + const groups = await loaderInstance.getConsumerGroupsForCluster(TEST_LOCAL_KAFKA_CLUSTER); + + assert.strictEqual(groups.length, 1); + assert.ok(groups[0] instanceof ConsumerGroup); + assert.strictEqual(groups[0].consumerGroupId, "test-group-1"); + assert.strictEqual(groups[0].members.length, 1); + assert.ok(groups[0].members[0] instanceof Consumer); + assert.strictEqual(groups[0].members[0].consumerId, "consumer-1"); + sinon.assert.calledOnce(fetchConsumerGroupsStub); + sinon.assert.calledOnce(fetchConsumerGroupMembersStub); + }); + + it("returns group with empty members when member fetch fails", async () => { + fetchConsumerGroupsStub.resolves([testConsumerGroupData]); + fetchConsumerGroupMembersStub.rejects(new Error("member fetch failed")); + + const groups = await loaderInstance.getConsumerGroupsForCluster(TEST_LOCAL_KAFKA_CLUSTER); + + assert.strictEqual(groups.length, 1); + assert.strictEqual(groups[0].members.length, 0); + }); + + it("bypasses cache when forceDeepRefresh is true", async () => { + // even though cache returns data, forceDeepRefresh should re-fetch + rmStub.getConsumerGroupsForCluster.resolves([]); + + fetchConsumerGroupsStub.resolves([testConsumerGroupData]); + fetchConsumerGroupMembersStub.resolves([]); + + const groups = await loaderInstance.getConsumerGroupsForCluster(TEST_LOCAL_KAFKA_CLUSTER, true); + + assert.strictEqual(groups.length, 1); + sinon.assert.calledOnce(fetchConsumerGroupsStub); + }); +}); + describe("ResourceLoader::getSchemasForSubject()", () => { let loaderInstance: ResourceLoader; let sandbox: sinon.SinonSandbox; diff --git a/src/loaders/resourceLoader.ts b/src/loaders/resourceLoader.ts index ee2f9d56d2..58123993d4 100644 --- a/src/loaders/resourceLoader.ts +++ b/src/loaders/resourceLoader.ts @@ -11,6 +11,7 @@ import type { ConnectionId, EnvironmentId, IResourceBase } from "../models/resou import type { Schema } from "../models/schema"; import { Subject, subjectMatchesTopicName } from "../models/schema"; import type { SchemaRegistry } from "../models/schemaRegistry"; +import type { ConsumerGroup } from "../models/consumerGroup"; import type { KafkaTopic } from "../models/topic"; import { showWarningNotificationWithButtons } from "../notifications"; import { getSidecar } from "../sidecar"; @@ -133,6 +134,17 @@ export abstract class ResourceLoader extends DisposableCollection implements IRe forceRefresh?: boolean, ): Promise; + /** + * Return the consumer groups for a given Kafka cluster. + * @param cluster The Kafka cluster to fetch consumer groups for. + * @param forceRefresh If true, will ignore any cached consumer groups and fetch anew. + * @returns An array of consumer groups for the cluster. + */ + public abstract getConsumerGroupsForCluster( + cluster: KafkaCluster, + forceRefresh?: boolean, + ): Promise; + // Schema registry methods. /** diff --git a/src/loaders/utils/loaderUtils.test.ts b/src/loaders/utils/loaderUtils.test.ts index 21b7465c55..bdaeac84ed 100644 --- a/src/loaders/utils/loaderUtils.test.ts +++ b/src/loaders/utils/loaderUtils.test.ts @@ -399,6 +399,14 @@ describe("loaderUtils.ts", () => { assert.strictEqual(result.length, 0); }); + + it("should propagate API errors from listKafkaConsumerGroups", async () => { + stubbedClient.listKafkaConsumerGroups.rejects(new Error("Connection refused")); + + await assert.rejects(loaderUtils.fetchConsumerGroups(TEST_LOCAL_KAFKA_CLUSTER), { + message: "Connection refused", + }); + }); }); describe("fetchConsumerGroupMembers()", () => { @@ -475,6 +483,49 @@ describe("loaderUtils.ts", () => { consumer_group_id: testGroupId, }); }); + + it("should propagate API errors from listKafkaConsumers", async () => { + stubbedClient.listKafkaConsumers.rejects(new Error("Connection refused")); + + await assert.rejects( + loaderUtils.fetchConsumerGroupMembers(TEST_LOCAL_KAFKA_CLUSTER, testGroupId), + { message: "Connection refused" }, + ); + }); + }); + + describe("parseCoordinatorId()", () => { + it("should parse broker ID from a full Kafka REST URL", () => { + const url = "http://localhost:26636/kafka/v3/clusters/lkc-abc123/brokers/2"; + assert.strictEqual(loaderUtils.parseCoordinatorId(url), 2); + }); + + it("should parse broker ID from a CCloud-style URL", () => { + const url = + "https://pkc-abc123.us-east-1.aws.confluent.cloud/kafka/v3/clusters/lkc-5vmjd8/brokers/0"; + assert.strictEqual(loaderUtils.parseCoordinatorId(url), 0); + }); + + it("should parse a plain numeric string", () => { + assert.strictEqual(loaderUtils.parseCoordinatorId("7"), 7); + }); + + it("should return null for undefined", () => { + assert.strictEqual(loaderUtils.parseCoordinatorId(undefined), null); + }); + + it("should return null for empty string", () => { + assert.strictEqual(loaderUtils.parseCoordinatorId(""), null); + }); + + it("should return null when the last segment is non-numeric", () => { + assert.strictEqual( + loaderUtils.parseCoordinatorId( + "http://localhost/kafka/v3/clusters/lkc-abc123/brokers/notanumber", + ), + null, + ); + }); }); describe("generateFlinkStatementKey()", () => { diff --git a/src/loaders/utils/loaderUtils.ts b/src/loaders/utils/loaderUtils.ts index 494fc46a1e..305160f667 100644 --- a/src/loaders/utils/loaderUtils.ts +++ b/src/loaders/utils/loaderUtils.ts @@ -315,3 +315,13 @@ export function generateFlinkStatementKey(params: IFlinkStatementSubmitParameter hasher.update(params.statement); return hasher.digest("hex"); } + +/** Parse a broker ID from a Kafka REST API relationship URL, returning null if non-numeric. + * e.g. "http://localhost:26636/kafka/v3/clusters/lkc-abc123/brokers/1" → 1 */ +export function parseCoordinatorId(related: string | undefined): number | null { + if (!related) return null; + const lastSegment = related.split("/").pop(); + if (!lastSegment) return null; + const parsed = parseInt(lastSegment, 10); + return Number.isNaN(parsed) ? null : parsed; +} diff --git a/src/models/consumerGroup.ts b/src/models/consumerGroup.ts index 492af9e5e6..6581a7a570 100644 --- a/src/models/consumerGroup.ts +++ b/src/models/consumerGroup.ts @@ -185,6 +185,10 @@ export class ConsumerGroupTreeItem extends vscode.TreeItem { ); this.tooltip = createConsumerGroupTooltip(resource); + + this.accessibilityInformation = { + label: `Consumer Group: ${resource.consumerGroupId}`, + }; } } @@ -239,6 +243,10 @@ export class ConsumerTreeItem extends vscode.TreeItem { this.iconPath = new vscode.ThemeIcon(resource.iconName); this.tooltip = createConsumerTooltip(resource); + + this.accessibilityInformation = { + label: `Consumer: ${resource.consumerId}`, + }; } } diff --git a/src/models/schema.ts b/src/models/schema.ts index ac83459bc3..0f709cbfbd 100644 --- a/src/models/schema.ts +++ b/src/models/schema.ts @@ -282,6 +282,10 @@ export class SubjectTreeItem extends vscode.TreeItem { } propertyParts.push("schema-subject"); this.contextValue = propertyParts.join("-"); + + this.accessibilityInformation = { + label: `Schema Subject: ${subject.name}`, + }; } } diff --git a/src/models/topic.ts b/src/models/topic.ts index 2dd9f24605..b124248b9b 100644 --- a/src/models/topic.ts +++ b/src/models/topic.ts @@ -134,6 +134,10 @@ export class KafkaTopicTreeItem extends vscode.TreeItem { const missingAuthz: KafkaTopicOperation[] = this.checkMissingAuthorizedOperations(resource); this.tooltip = createKafkaTopicTooltip(this.resource, missingAuthz); + + this.accessibilityInformation = { + label: `Kafka Topic: ${resource.name}`, + }; } checkMissingAuthorizedOperations(resource: KafkaTopic): KafkaTopicOperation[] { diff --git a/src/storage/resourceManager.test.ts b/src/storage/resourceManager.test.ts index bfdf839851..ca6daf9c6c 100644 --- a/src/storage/resourceManager.test.ts +++ b/src/storage/resourceManager.test.ts @@ -16,6 +16,10 @@ import { TEST_LOCAL_KAFKA_TOPIC, TEST_LOCAL_SCHEMA_REGISTRY, } from "../../tests/unit/testResources"; +import { + TEST_CCLOUD_CONSUMER_GROUP, + TEST_LOCAL_CONSUMER_GROUP, +} from "../../tests/unit/testResources/consumerGroup"; import { TEST_DIRECT_CONNECTION_FORM_SPEC, TEST_DIRECT_CONNECTION_ID, @@ -34,6 +38,7 @@ import { KafkaClusterConfigToJSON, } from "../clients/sidecar"; import { CCLOUD_CONNECTION_ID, LOCAL_CONNECTION_ID } from "../constants"; +import { Consumer, ConsumerGroup } from "../models/consumerGroup"; import { CCloudEnvironment } from "../models/environment"; import { FlinkAIAgent } from "../models/flinkAiAgent"; import { FlinkAIConnection } from "../models/flinkAiConnection"; @@ -395,6 +400,66 @@ describe("storage/resourceManager", () => { }); }); + describe("ResourceManager setConsumerGroupsForCluster() / getConsumerGroupsForCluster()", function () { + it("setConsumerGroupsForCluster() should throw an error if given mixed cluster IDs", async () => { + const mixedGroups = [TEST_CCLOUD_CONSUMER_GROUP, TEST_LOCAL_CONSUMER_GROUP]; + + await assert.rejects( + getResourceManager().setConsumerGroupsForCluster(TEST_CCLOUD_KAFKA_CLUSTER, mixedGroups), + (err) => { + return ( + err instanceof Error && err.message.includes("Cluster ID mismatch in consumer groups") + ); + }, + "Expected error when setting mixed cluster IDs", + ); + }); + + it("getConsumerGroupsForCluster() should return undefined if no cached consumer groups", async () => { + const groups = + await getResourceManager().getConsumerGroupsForCluster(TEST_CCLOUD_KAFKA_CLUSTER); + assert.deepStrictEqual(groups, undefined); + }); + + it("getConsumerGroupsForCluster() should return empty array if empty array is set", async () => { + const manager = getResourceManager(); + await manager.setConsumerGroupsForCluster(TEST_CCLOUD_KAFKA_CLUSTER, []); + const groups = await manager.getConsumerGroupsForCluster(TEST_CCLOUD_KAFKA_CLUSTER); + assert.deepStrictEqual(groups, []); + }); + + it("should round-trip consumer groups through set/get with correct types", async () => { + const manager = getResourceManager(); + await manager.setConsumerGroupsForCluster(TEST_CCLOUD_KAFKA_CLUSTER, [ + TEST_CCLOUD_CONSUMER_GROUP, + ]); + + const groups = await manager.getConsumerGroupsForCluster(TEST_CCLOUD_KAFKA_CLUSTER); + + assert.ok(groups); + assert.strictEqual(groups.length, 1); + assert.ok(groups[0] instanceof ConsumerGroup); + assert.strictEqual(groups[0].consumerGroupId, TEST_CCLOUD_CONSUMER_GROUP.consumerGroupId); + }); + + it("should rehydrate Consumer members when reading from storage", async () => { + const manager = getResourceManager(); + await manager.setConsumerGroupsForCluster(TEST_CCLOUD_KAFKA_CLUSTER, [ + TEST_CCLOUD_CONSUMER_GROUP, + ]); + + const groups = await manager.getConsumerGroupsForCluster(TEST_CCLOUD_KAFKA_CLUSTER); + + assert.ok(groups); + assert.ok(groups[0].members.length > 0); + assert.ok(groups[0].members[0] instanceof Consumer); + assert.strictEqual( + groups[0].members[0].consumerId, + TEST_CCLOUD_CONSUMER_GROUP.members[0].consumerId, + ); + }); + }); + describe("ResourceManager Schema Registry methods", function () { it("setSchemaRegistries() error when given mixed connection id array", async () => { const ccloudSchemaRegistry = CCloudSchemaRegistry.create(TEST_CCLOUD_SCHEMA_REGISTRY); diff --git a/src/storage/resourceManager.ts b/src/storage/resourceManager.ts index 7780d80a9b..761b9eca5c 100644 --- a/src/storage/resourceManager.ts +++ b/src/storage/resourceManager.ts @@ -7,6 +7,7 @@ import { getExtensionContext } from "../context/extension"; import type { FormConnectionType } from "../directConnections/types"; import { ExtensionContextNotSetError } from "../errors"; import { Logger } from "../logging"; +import { Consumer, ConsumerGroup } from "../models/consumerGroup"; import type { Environment, EnvironmentType } from "../models/environment"; import { getEnvironmentClass } from "../models/environment"; import { FlinkAIAgent } from "../models/flinkAiAgent"; @@ -71,6 +72,7 @@ export enum GeneratedKeyResourceType { SCHEMA_REGISTRIES = "schemaRegistries", TOPICS = "topics", SUBJECTS = "subjects", + CONSUMER_GROUPS = "consumerGroups", FLINK_ARTIFACTS = "flinkArtifacts", } @@ -518,6 +520,99 @@ export class ResourceManager { }); } + /** + * Store this (possibly empty) list of consumer groups for a cluster in workspace state. + * If it is known that a given cluster has no consumer groups, then should call with empty array. + * + * Raises an error if the cluster ID of any consumer group does not match the given cluster's ID. + */ + async setConsumerGroupsForCluster( + cluster: KafkaClusterType, + consumerGroups: ConsumerGroup[], + ): Promise { + // Ensure that all consumer groups have the correct cluster ID. + if (consumerGroups.some((group) => group.clusterId !== cluster.id)) { + logger.warn("Cluster ID mismatch in consumer groups", cluster, consumerGroups); + throw new Error("Cluster ID mismatch in consumer groups"); + } + + // Will be a per-connection-id key for storing consumer groups by cluster ID. + const key = this.generateWorkspaceStorageKey( + cluster.connectionId, + GeneratedKeyResourceType.CONSUMER_GROUPS, + ); + + await this.runWithMutex(key, async () => { + // Get the JSON-stringified map from storage + const consumerGroupsByClusterString: string | undefined = this.workspaceState.get(key); + const consumerGroupsByCluster: Map = consumerGroupsByClusterString + ? stringToMap(consumerGroupsByClusterString) + : new Map(); + + // Set the new consumer groups for the cluster + consumerGroupsByCluster.set(cluster.id, consumerGroups); + + // Now save the updated cluster consumer groups into the proper key'd storage. + await this.workspaceState.update(key, mapToString(consumerGroupsByCluster)); + }); + } + + /** + * Get consumer groups given a cluster, be it local, ccloud, or direct. + * + * @returns ConsumerGroup[] (possibly empty) if known, else undefined + * indicating nothing at all known about this cluster (and should be deep probed). + */ + async getConsumerGroupsForCluster( + cluster: KafkaClusterType, + ): Promise { + const key = this.generateWorkspaceStorageKey( + cluster.connectionId, + GeneratedKeyResourceType.CONSUMER_GROUPS, + ); + + // Get the JSON-stringified map from storage + const consumerGroupsByClusterString: string | undefined = this.workspaceState.get(key); + const consumerGroupsByCluster: Map = consumerGroupsByClusterString + ? stringToMap(consumerGroupsByClusterString) + : new Map(); + + // Will either be undefined or an array of plain json objects since + // just deserialized from storage. + const vanillaJSONConsumerGroups: object[] | undefined = consumerGroupsByCluster.get(cluster.id); + if (vanillaJSONConsumerGroups === undefined) { + return undefined; + } + + // Promote each member to be an instance of ConsumerGroup, return. + // (Empty list will be returned as is, indicating that we know there are + // no consumer groups in this cluster.) + return vanillaJSONConsumerGroups.map((group) => { + const cgObj = group as ConsumerGroup; + // also rehydrate Consumer members before recreating the ConsumerGroup instance + let members: Consumer[] = []; + if (cgObj.members && cgObj.members.length > 0) { + members = cgObj.members.map((member: Consumer | object) => { + if (member instanceof Consumer) { + return member; + } + const memberObj = member as Consumer; + return new Consumer({ + connectionId: memberObj.connectionId, + connectionType: memberObj.connectionType, + environmentId: memberObj.environmentId, + clusterId: memberObj.clusterId, + consumerGroupId: memberObj.consumerGroupId, + consumerId: memberObj.consumerId, + clientId: memberObj.clientId, + instanceId: memberObj.instanceId, + }); + }); + } + return new ConsumerGroup({ ...cgObj, members }); + }); + } + // Flink Artifacts /** diff --git a/src/viewProviders/topics.test.ts b/src/viewProviders/topics.test.ts index 5eb2566495..3181416045 100644 --- a/src/viewProviders/topics.test.ts +++ b/src/viewProviders/topics.test.ts @@ -15,9 +15,24 @@ import { TEST_LOCAL_ENVIRONMENT_ID, TEST_LOCAL_KAFKA_CLUSTER, } from "../../tests/unit/testResources"; +import { + createConsumerGroup, + TEST_CCLOUD_CONSUMER, + TEST_CCLOUD_CONSUMER_GROUP, +} from "../../tests/unit/testResources/consumerGroup"; import type { EventChangeType, SubjectChangeEvent } from "../emitters"; import type { CCloudResourceLoader } from "../loaders"; import { TopicFetchError } from "../loaders/utils/loaderUtils"; +import { ConnectionType } from "../clients/sidecar"; +import { CCLOUD_CONNECTION_ID } from "../constants"; +import { + Consumer, + type ConsumerGroup, + ConsumerGroupTreeItem, + ConsumerTreeItem, +} from "../models/consumerGroup"; +import type { CustomMarkdownString } from "../models/main"; +import { KafkaClusterResourceContainer } from "../models/containers/kafkaClusterResourceContainer"; import { SchemaTreeItem, Subject, SubjectTreeItem } from "../models/schema"; import { KafkaTopic, KafkaTopicTreeItem } from "../models/topic"; import { TopicViewProvider } from "./topics"; @@ -48,6 +63,7 @@ describe("viewProviders/topics.ts", () => { stubbedLoader = getStubbedCCloudResourceLoader(sandbox); stubbedLoader.getTopicsForCluster.resolves([]); + stubbedLoader.getConsumerGroupsForCluster.resolves([]); }); afterEach(() => { @@ -64,7 +80,8 @@ describe("viewProviders/topics.ts", () => { it("no-arg refresh() when focused on a cluster should call onDidChangeTreeData.fire()", async () => { provider.kafkaCluster = TEST_CCLOUD_KAFKA_CLUSTER; await provider.refresh(); - sinon.assert.calledOnce(onDidChangeTreeDataFireStub); + // twice for topics (loading start/end) + twice for consumer groups (loading start/end) + assert.strictEqual(onDidChangeTreeDataFireStub.callCount, 4); }); it("no-arg refresh() when no cluster is set should call onDidChangeTreeData.fire() once to clear (disconnect scenario)", async () => { @@ -104,7 +121,41 @@ describe("viewProviders/topics.ts", () => { it("onlyIfMatching a kafka cluster when the cluster matches should call onDidChangeTreeData.fire()", async () => { provider.kafkaCluster = TEST_CCLOUD_KAFKA_CLUSTER; await provider.refresh(false, TEST_CCLOUD_KAFKA_CLUSTER); - sinon.assert.calledOnce(onDidChangeTreeDataFireStub); + // twice for topics (loading start/end) + twice for consumer groups (loading start/end) + sinon.assert.callCount(onDidChangeTreeDataFireStub, 4); + }); + + it("refresh() reuses existing container instances so targeted fires match VS Code's references", async () => { + provider.kafkaCluster = TEST_CCLOUD_KAFKA_CLUSTER; + await provider.refresh(); + + const topicsContainerAfterFirst = provider["topicsContainer"]; + const consumerGroupsContainerAfterFirst = provider["consumerGroupsContainer"]; + + // second refresh on the same cluster should reuse the same container instances + onDidChangeTreeDataFireStub.resetHistory(); + await provider.refresh(true); + + assert.strictEqual( + provider["topicsContainer"], + topicsContainerAfterFirst, + "topicsContainer should be the same instance after refresh", + ); + assert.strictEqual( + provider["consumerGroupsContainer"], + consumerGroupsContainerAfterFirst, + "consumerGroupsContainer should be the same instance after refresh", + ); + // targeted fires should reference the same container instances + const firedElements = onDidChangeTreeDataFireStub.args.map((args: unknown[]) => args[0]); + assert.ok( + firedElements.includes(topicsContainerAfterFirst), + "should fire with the reused topicsContainer", + ); + assert.ok( + firedElements.includes(consumerGroupsContainerAfterFirst), + "should fire with the reused consumerGroupsContainer", + ); }); it("onlyIfMatching a contained Kafka topic when the cluster doesn't match should do nothing", async () => { @@ -116,13 +167,23 @@ describe("viewProviders/topics.ts", () => { it("onlyIfMatching a contained Kafka topic when the cluster matches should call onDidChangeTreeData.fire()", async () => { provider.kafkaCluster = TEST_CCLOUD_KAFKA_CLUSTER; await provider.refresh(false, TEST_CCLOUD_KAFKA_TOPIC); - sinon.assert.calledOnce(onDidChangeTreeDataFireStub); + // twice for topics (loading start/end) + twice for consumer groups (loading start/end) + sinon.assert.callCount(onDidChangeTreeDataFireStub, 4); }); }); describe("refreshTopics()", () => { + let onDidChangeTreeDataFireStub: sinon.SinonStub; + beforeEach(() => { provider.kafkaCluster = TEST_CCLOUD_KAFKA_CLUSTER; + // set up the container like refresh() does + provider["topicsContainer"] = new KafkaClusterResourceContainer( + TEST_CCLOUD_KAFKA_CLUSTER.connectionId, + TEST_CCLOUD_KAFKA_CLUSTER.connectionType, + "Topics", + ); + onDidChangeTreeDataFireStub = sandbox.stub(provider["_onDidChangeTreeData"], "fire"); }); it("should populate topicsInTreeView from loader results", async () => { @@ -137,6 +198,14 @@ describe("viewProviders/topics.ts", () => { ); }); + it("should set container children from loader results", async () => { + stubbedLoader.getTopicsForCluster.resolves([TEST_CCLOUD_KAFKA_TOPIC]); + + await provider.refreshTopics(TEST_CCLOUD_KAFKA_CLUSTER, false); + + assert.deepStrictEqual(provider["topicsContainer"]!.children, [TEST_CCLOUD_KAFKA_TOPIC]); + }); + it("should populate subjectsInTreeView and subjectToTopicMap when topics have associated Subjects", async () => { stubbedLoader.getTopicsForCluster.resolves([testTopicWithSchema]); @@ -150,15 +219,145 @@ describe("viewProviders/topics.ts", () => { ); }); - it("should call showErrorMessage when loader raises a TopicFetchError", async () => { + it("should set hasError on the container when loader raises a TopicFetchError", async () => { const showErrorMessageStub = sandbox.stub(window, "showErrorMessage"); stubbedLoader.getTopicsForCluster.rejects(new TopicFetchError("Test error")); await provider.refreshTopics(TEST_CCLOUD_KAFKA_CLUSTER, false); - assert.strictEqual(provider["topicsInTreeView"].size, 0); + assert.strictEqual(provider["topicsContainer"]!.hasError, true); + assert.deepStrictEqual(provider["topicsContainer"]!.children, []); + assert.ok( + (provider["topicsContainer"]!.tooltip as CustomMarkdownString).value.includes( + "Test error", + ), + "tooltip should include the error message", + ); sinon.assert.calledOnce(showErrorMessageStub); }); + + it("should fire tree data change events for loading start and end", async () => { + stubbedLoader.getTopicsForCluster.resolves([]); + + await provider.refreshTopics(TEST_CCLOUD_KAFKA_CLUSTER, false); + + // fired twice: once for loading start, once for loading end + assert.strictEqual(onDidChangeTreeDataFireStub.callCount, 2); + }); + + it("should return early if topicsContainer is null", async () => { + provider["topicsContainer"] = null; + + await provider.refreshTopics(TEST_CCLOUD_KAFKA_CLUSTER, false); + + sinon.assert.notCalled(stubbedLoader.getTopicsForCluster); + sinon.assert.notCalled(onDidChangeTreeDataFireStub); + }); + + it("should clear stale topic entries before repopulating", async () => { + // pre-populate a stale entry that won't be returned by the loader + const staleTopic = new KafkaTopic({ ...TEST_CCLOUD_KAFKA_TOPIC, name: "stale-topic" }); + provider["topicsInTreeView"].set(staleTopic.name, staleTopic); + stubbedLoader.getTopicsForCluster.resolves([TEST_CCLOUD_KAFKA_TOPIC]); + + await provider.refreshTopics(TEST_CCLOUD_KAFKA_CLUSTER, false); + + assert.strictEqual(provider["topicsInTreeView"].has("stale-topic"), false); + assert.strictEqual(provider["topicsInTreeView"].has(TEST_CCLOUD_KAFKA_TOPIC.name), true); + assert.strictEqual(provider["topicsInTreeView"].size, 1); + }); + }); + + describe("refreshConsumerGroups()", () => { + let onDidChangeTreeDataFireStub: sinon.SinonStub; + + beforeEach(() => { + provider.kafkaCluster = TEST_CCLOUD_KAFKA_CLUSTER; + // set up the container like refresh() does + provider["consumerGroupsContainer"] = new KafkaClusterResourceContainer( + TEST_CCLOUD_KAFKA_CLUSTER.connectionId, + TEST_CCLOUD_KAFKA_CLUSTER.connectionType, + "Consumer Groups", + ); + onDidChangeTreeDataFireStub = sandbox.stub(provider["_onDidChangeTreeData"], "fire"); + }); + + it("should populate consumerGroupsInTreeView from loader results", async () => { + stubbedLoader.getConsumerGroupsForCluster.resolves([TEST_CCLOUD_CONSUMER_GROUP]); + + await provider.refreshConsumerGroups(TEST_CCLOUD_KAFKA_CLUSTER, false); + + assert.strictEqual(provider["consumerGroupsInTreeView"].size, 1); + assert.deepStrictEqual( + provider["consumerGroupsInTreeView"].get(TEST_CCLOUD_CONSUMER_GROUP.consumerGroupId), + TEST_CCLOUD_CONSUMER_GROUP, + ); + }); + + it("should set container children from loader results", async () => { + stubbedLoader.getConsumerGroupsForCluster.resolves([TEST_CCLOUD_CONSUMER_GROUP]); + + await provider.refreshConsumerGroups(TEST_CCLOUD_KAFKA_CLUSTER, false); + + assert.deepStrictEqual(provider["consumerGroupsContainer"]!.children, [ + TEST_CCLOUD_CONSUMER_GROUP, + ]); + }); + + it("should set hasError on the container when loader throws", async () => { + stubbedLoader.getConsumerGroupsForCluster.rejects(new Error("API error")); + + await provider.refreshConsumerGroups(TEST_CCLOUD_KAFKA_CLUSTER, false); + + assert.strictEqual(provider["consumerGroupsContainer"]!.hasError, true); + assert.deepStrictEqual(provider["consumerGroupsContainer"]!.children, []); + assert.ok( + (provider["consumerGroupsContainer"]!.tooltip as CustomMarkdownString).value.includes( + "API error", + ), + "tooltip should include the error message", + ); + }); + + it("should fire tree data change events for loading start and end", async () => { + stubbedLoader.getConsumerGroupsForCluster.resolves([]); + + await provider.refreshConsumerGroups(TEST_CCLOUD_KAFKA_CLUSTER, false); + + // fired twice: once for loading start, once for loading end + assert.strictEqual(onDidChangeTreeDataFireStub.callCount, 2); + }); + + it("should return early if consumerGroupsContainer is null", async () => { + provider["consumerGroupsContainer"] = null; + + await provider.refreshConsumerGroups(TEST_CCLOUD_KAFKA_CLUSTER, false); + + sinon.assert.notCalled(stubbedLoader.getConsumerGroupsForCluster); + sinon.assert.notCalled(onDidChangeTreeDataFireStub); + }); + + it("should clear stale consumer group entries before repopulating", async () => { + // pre-populate a stale entry that won't be returned by the loader + const staleGroup = createConsumerGroup({ + connectionId: CCLOUD_CONNECTION_ID, + connectionType: ConnectionType.Ccloud, + environmentId: TEST_CCLOUD_KAFKA_CLUSTER.environmentId, + clusterId: TEST_CCLOUD_KAFKA_CLUSTER.id, + consumerGroupId: "stale-group", + }); + provider["consumerGroupsInTreeView"].set(staleGroup.consumerGroupId, staleGroup); + stubbedLoader.getConsumerGroupsForCluster.resolves([TEST_CCLOUD_CONSUMER_GROUP]); + + await provider.refreshConsumerGroups(TEST_CCLOUD_KAFKA_CLUSTER, false); + + assert.strictEqual(provider["consumerGroupsInTreeView"].has("stale-group"), false); + assert.strictEqual( + provider["consumerGroupsInTreeView"].has(TEST_CCLOUD_CONSUMER_GROUP.consumerGroupId), + true, + ); + assert.strictEqual(provider["consumerGroupsInTreeView"].size, 1); + }); }); describe("getTreeItem()", () => { @@ -176,6 +375,26 @@ describe("viewProviders/topics.ts", () => { const treeItem = provider.getTreeItem(TEST_CCLOUD_SUBJECT_WITH_SCHEMAS); assert.ok(treeItem instanceof SubjectTreeItem); }); + + it("should return a ConsumerGroupTreeItem for a ConsumerGroup instance", () => { + const treeItem = provider.getTreeItem(TEST_CCLOUD_CONSUMER_GROUP); + assert.ok(treeItem instanceof ConsumerGroupTreeItem); + }); + + it("should return a ConsumerTreeItem for a Consumer instance", () => { + const treeItem = provider.getTreeItem(TEST_CCLOUD_CONSUMER); + assert.ok(treeItem instanceof ConsumerTreeItem); + }); + + it("should return the container itself for a KafkaClusterResourceContainer", () => { + const container = new KafkaClusterResourceContainer( + TEST_CCLOUD_KAFKA_CLUSTER.connectionId, + TEST_CCLOUD_KAFKA_CLUSTER.connectionType, + "Topics", + ); + const treeItem = provider.getTreeItem(container); + assert.strictEqual(treeItem, container); + }); }); describe("isFocusedOnCCloud()", () => { @@ -209,13 +428,26 @@ describe("viewProviders/topics.ts", () => { assert.strictEqual(children.length, 0); }); - it("should return topics from topicsInTreeView at the root level", () => { - provider["topicsInTreeView"].set(TEST_CCLOUD_KAFKA_TOPIC.name, TEST_CCLOUD_KAFKA_TOPIC); + it("should return both containers at the root level when both are set", () => { + provider["topicsContainer"] = new KafkaClusterResourceContainer( + TEST_CCLOUD_KAFKA_CLUSTER.connectionId, + TEST_CCLOUD_KAFKA_CLUSTER.connectionType, + "Topics", + ); + provider["consumerGroupsContainer"] = new KafkaClusterResourceContainer( + TEST_CCLOUD_KAFKA_CLUSTER.connectionId, + TEST_CCLOUD_KAFKA_CLUSTER.connectionType, + "Consumer Groups", + ); const children = provider.getChildren(); - assert.strictEqual(children.length, 1); - assert.deepStrictEqual(children[0], TEST_CCLOUD_KAFKA_TOPIC); + assert.strictEqual(children.length, 2); + // consumer groups container should be first, topics second + assert.ok(children[0] instanceof KafkaClusterResourceContainer); + assert.strictEqual(children[0].label, "Consumer Groups"); + assert.ok(children[1] instanceof KafkaClusterResourceContainer); + assert.strictEqual(children[1].label, "Topics"); }); it("should return subjects from topic.children when expanding a topic", () => { @@ -256,13 +488,45 @@ describe("viewProviders/topics.ts", () => { assert.strictEqual(children.length, 0); }); + + it("should return members when expanding a ConsumerGroup in the cache", () => { + provider["consumerGroupsInTreeView"].set( + TEST_CCLOUD_CONSUMER_GROUP.consumerGroupId, + TEST_CCLOUD_CONSUMER_GROUP, + ); + + const children = provider.getChildren(TEST_CCLOUD_CONSUMER_GROUP); + + assert.strictEqual(children.length, TEST_CCLOUD_CONSUMER_GROUP.members.length); + assert.ok(children[0] instanceof Consumer); + }); + + it("should return an empty array when expanding a ConsumerGroup not in the cache", () => { + const otherGroup = createConsumerGroup({ + connectionId: CCLOUD_CONNECTION_ID, + connectionType: ConnectionType.Ccloud, + environmentId: TEST_CCLOUD_KAFKA_CLUSTER.environmentId, + clusterId: TEST_CCLOUD_KAFKA_CLUSTER.id, + consumerGroupId: "not-in-cache", + }); + + const children = provider.getChildren(otherGroup); + + assert.strictEqual(children.length, 0); + }); }); describe("getParent()", () => { - it("should return undefined for a KafkaTopic (root-level item)", () => { + it("should return the topics container for a KafkaTopic", () => { + provider["topicsContainer"] = new KafkaClusterResourceContainer( + TEST_CCLOUD_KAFKA_CLUSTER.connectionId, + TEST_CCLOUD_KAFKA_CLUSTER.connectionType, + "Topics", + ); + const parent = provider.getParent(TEST_CCLOUD_KAFKA_TOPIC); - assert.strictEqual(parent, undefined); + assert.strictEqual(parent, provider["topicsContainer"]); }); it("should return the parent topic for a Subject", () => { @@ -298,6 +562,41 @@ describe("viewProviders/topics.ts", () => { assert.strictEqual(parent, undefined); }); + + it("should return the consumer groups container for a ConsumerGroup", () => { + provider["consumerGroupsContainer"] = new KafkaClusterResourceContainer( + TEST_CCLOUD_KAFKA_CLUSTER.connectionId, + TEST_CCLOUD_KAFKA_CLUSTER.connectionType, + "Consumer Groups", + ); + + const parent = provider.getParent(TEST_CCLOUD_CONSUMER_GROUP); + + assert.strictEqual(parent, provider["consumerGroupsContainer"]); + }); + + it("should return the parent ConsumerGroup for a Consumer", () => { + provider["consumerGroupsInTreeView"].set( + TEST_CCLOUD_CONSUMER_GROUP.consumerGroupId, + TEST_CCLOUD_CONSUMER_GROUP, + ); + + const parent = provider.getParent(TEST_CCLOUD_CONSUMER); + + assert.strictEqual(parent, TEST_CCLOUD_CONSUMER_GROUP); + }); + + it("should return undefined for a KafkaClusterResourceContainer", () => { + const container = new KafkaClusterResourceContainer( + TEST_CCLOUD_KAFKA_CLUSTER.connectionId, + TEST_CCLOUD_KAFKA_CLUSTER.connectionType, + "Topics", + ); + + const parent = provider.getParent(container); + + assert.strictEqual(parent, undefined); + }); }); describe("updateSubjectSchemas()", () => { @@ -388,6 +687,81 @@ describe("viewProviders/topics.ts", () => { sinon.assert.notCalled(treeViewRevealStub); }); + + it("should reveal a KafkaClusterResourceContainer (topics container)", async () => { + // set up the container so reveal can match against it + provider["topicsContainer"] = new KafkaClusterResourceContainer( + TEST_CCLOUD_KAFKA_CLUSTER.connectionId, + TEST_CCLOUD_KAFKA_CLUSTER.connectionType, + "Topics", + ); + // pass a different instance with the same id to verify lookup by id + const lookupContainer = new KafkaClusterResourceContainer( + CCLOUD_CONNECTION_ID, + ConnectionType.Ccloud, + "Topics", + ); + + await provider.reveal(lookupContainer, { select: true }); + + sinon.assert.calledOnceWithExactly(treeViewRevealStub, provider["topicsContainer"], { + select: true, + }); + }); + + it("should reveal a ConsumerGroup from the cache", async () => { + provider["consumerGroupsInTreeView"].set( + TEST_CCLOUD_CONSUMER_GROUP.consumerGroupId, + TEST_CCLOUD_CONSUMER_GROUP, + ); + + await provider.reveal(TEST_CCLOUD_CONSUMER_GROUP, { select: true }); + + sinon.assert.calledOnceWithExactly(treeViewRevealStub, TEST_CCLOUD_CONSUMER_GROUP, { + select: true, + }); + }); + + it("should reveal a Consumer within its parent ConsumerGroup", async () => { + provider["consumerGroupsInTreeView"].set( + TEST_CCLOUD_CONSUMER_GROUP.consumerGroupId, + TEST_CCLOUD_CONSUMER_GROUP, + ); + + await provider.reveal(TEST_CCLOUD_CONSUMER, { focus: true }); + + sinon.assert.calledOnce(treeViewRevealStub); + const revealedItem = treeViewRevealStub.firstCall.args[0]; + assert.ok(revealedItem instanceof Consumer); + assert.strictEqual(revealedItem.consumerId, TEST_CCLOUD_CONSUMER.consumerId); + }); + + it("should not reveal a ConsumerGroup when not in the cache", async () => { + await provider.reveal(TEST_CCLOUD_CONSUMER_GROUP); + + sinon.assert.notCalled(treeViewRevealStub); + }); + + it("should reveal a KafkaClusterResourceContainer (consumer groups container)", async () => { + provider["consumerGroupsContainer"] = new KafkaClusterResourceContainer( + TEST_CCLOUD_KAFKA_CLUSTER.connectionId, + TEST_CCLOUD_KAFKA_CLUSTER.connectionType, + "Consumer Groups", + ); + const lookupContainer = new KafkaClusterResourceContainer( + CCLOUD_CONNECTION_ID, + ConnectionType.Ccloud, + "Consumer Groups", + ); + + await provider.reveal(lookupContainer, { focus: true }); + + sinon.assert.calledOnceWithExactly( + treeViewRevealStub, + provider["consumerGroupsContainer"], + { focus: true }, + ); + }); }); describe("event handlers", () => { diff --git a/src/viewProviders/topics.ts b/src/viewProviders/topics.ts index 080ad95350..dda7fe38bf 100644 --- a/src/viewProviders/topics.ts +++ b/src/viewProviders/topics.ts @@ -1,5 +1,5 @@ import type { Disposable, TreeItem } from "vscode"; -import { window } from "vscode"; +import { ThemeIcon, TreeItemCollapsibleState } from "vscode"; import { ContextValues } from "../context/values"; import type { EnvironmentChangeEvent, @@ -16,19 +16,39 @@ import { topicSearchSet, topicsViewResourceChanged, } from "../emitters"; +import { IconNames } from "../icons"; import { ResourceLoader } from "../loaders"; import { TopicFetchError } from "../loaders/utils/loaderUtils"; +import { + Consumer, + ConsumerGroup, + ConsumerGroupTreeItem, + ConsumerTreeItem, +} from "../models/consumerGroup"; +import { + KafkaClusterContainerLabel, + KafkaClusterResourceContainer, +} from "../models/containers/kafkaClusterResourceContainer"; import { KafkaCluster } from "../models/kafkaCluster"; +import { CustomMarkdownString } from "../models/main"; import { isCCloud, isLocal } from "../models/resource"; import { Schema, SchemaTreeItem, Subject, SubjectTreeItem } from "../models/schema"; import { KafkaTopic, KafkaTopicTreeItem } from "../models/topic"; +import { showErrorNotificationWithButtons } from "../notifications"; import { ParentedBaseViewProvider } from "./baseModels/parentedBase"; /** * The types managed by the {@link TopicViewProvider}, which are converted to their appropriate tree item * type via the {@link TopicViewProvider provider's} {@linkcode TopicViewProvider.getTreeItem() .getTreeItem()} method. */ -type TopicViewProviderData = KafkaTopic | Subject | Schema; +type TopicViewProviderData = + | KafkaClusterResourceContainer + | KafkaClusterResourceContainer + | ConsumerGroup + | Consumer + | KafkaTopic + | Subject + | Schema; /** * Provider for the "Topics" view resources. @@ -48,6 +68,8 @@ export class TopicViewProvider extends ParentedBaseViewProvider< searchContextValue = ContextValues.topicSearchApplied; searchChangedEmitter = topicSearchSet; + /** Container for topics in this cluster (expanded by default). */ + private topicsContainer: KafkaClusterResourceContainer | null = null; /** Map of topic name -> {@link KafkaTopic} instance currently in the tree view. */ private topicsInTreeView: Map = new Map(); /** Map of subject name -> {@link Subject} instance currently in the tree view. */ @@ -55,10 +77,16 @@ export class TopicViewProvider extends ParentedBaseViewProvider< /** Map of subject name -> parent {@link KafkaTopic} for easy parent lookup. */ private subjectToTopicMap: Map = new Map(); + /** Container for consumer groups in this cluster. */ + private consumerGroupsContainer: KafkaClusterResourceContainer | null = null; + /** Map of consumer group ID -> {@link ConsumerGroup} instance currently in the tree view. */ + private consumerGroupsInTreeView: Map = new Map(); + private clearCaches(): void { this.topicsInTreeView.clear(); this.subjectsInTreeView.clear(); this.subjectToTopicMap.clear(); + this.consumerGroupsInTreeView.clear(); } get kafkaCluster(): KafkaCluster | null { @@ -77,8 +105,20 @@ export class TopicViewProvider extends ParentedBaseViewProvider< let children: TopicViewProviderData[] = []; if (!element) { - // top-level: show topics - children = Array.from(this.topicsInTreeView.values()); + // top-level: show consumer groups first, then topics + const containers: TopicViewProviderData[] = []; + if (this.consumerGroupsContainer) containers.push(this.consumerGroupsContainer); + if (this.topicsContainer) containers.push(this.topicsContainer); + children = containers; + } else if (element instanceof KafkaClusterResourceContainer) { + // expanding a container to show its children (topics or consumer groups) + children = element.children; + } else if (element instanceof ConsumerGroup) { + // expanding a consumer group to show its members + const cachedGroup = this.consumerGroupsInTreeView.get(element.consumerGroupId); + if (cachedGroup) { + children = cachedGroup.members; + } } else if (element instanceof KafkaTopic) { // expanding a topic to show its subject(s) const cachedTopic: KafkaTopic | undefined = this.topicsInTreeView.get(element.name); @@ -107,7 +147,13 @@ export class TopicViewProvider extends ParentedBaseViewProvider< getTreeItem(element: TopicViewProviderData): TreeItem { let treeItem: TreeItem; - if (element instanceof KafkaTopic) { + if (element instanceof KafkaClusterResourceContainer) { + treeItem = element; + } else if (element instanceof ConsumerGroup) { + treeItem = new ConsumerGroupTreeItem(element); + } else if (element instanceof Consumer) { + treeItem = new ConsumerTreeItem(element); + } else if (element instanceof KafkaTopic) { treeItem = new KafkaTopicTreeItem(element); } else if (element instanceof Subject) { treeItem = new SubjectTreeItem(element); @@ -147,22 +193,63 @@ export class TopicViewProvider extends ParentedBaseViewProvider< this.clearCaches(); if (!this.kafkaCluster) { // nothing focused; return to empty state + this.topicsContainer = null; + this.consumerGroupsContainer = null; this._onDidChangeTreeData.fire(); return; } const cluster: KafkaCluster = this.kafkaCluster; - await this.withProgress("Loading topics...", async () => { - await this.refreshTopics(cluster, forceDeepRefresh); + await this.withProgress("Loading topics and consumer groups...", async () => { + // reuse existing containers so VS Code can match targeted tree-data-change fires + // against the same object references it already tracks; only create when absent + // (e.g. first load after reset or cluster change) + if (!this.topicsContainer) { + this.topicsContainer = new KafkaClusterResourceContainer( + cluster.connectionId, + cluster.connectionType, + KafkaClusterContainerLabel.TOPICS, + [], + "topics-container", + new ThemeIcon(IconNames.TOPIC), + ); + this.topicsContainer.collapsibleState = TreeItemCollapsibleState.Expanded; + } + + if (!this.consumerGroupsContainer) { + this.consumerGroupsContainer = new KafkaClusterResourceContainer( + cluster.connectionId, + cluster.connectionType, + KafkaClusterContainerLabel.CONSUMER_GROUPS, + [], + undefined, // no context value for now since no commands are needed yet for this container + new ThemeIcon(IconNames.CONSUMER_GROUP), + ); + } + + await Promise.allSettled([ + this.refreshTopics(cluster, forceDeepRefresh), + this.refreshConsumerGroups(cluster, forceDeepRefresh), + ]); }); } async refreshTopics(cluster: KafkaCluster, forceDeepRefresh: boolean): Promise { + if (!this.topicsContainer) { + return; + } + this.topicsContainer.setLoading(); + this._onDidChangeTreeData.fire(this.topicsContainer); + + // clear stale entries before repopulating + this.topicsInTreeView.clear(); + this.subjectsInTreeView.clear(); + this.subjectToTopicMap.clear(); + const loader = ResourceLoader.getInstance(cluster.connectionId); try { const topics = await loader.getTopicsForCluster(cluster, forceDeepRefresh); topics.forEach((topic) => { - // update topic and subject caches before firing change event this.topicsInTreeView.set(topic.name, topic); if (topic.children && topic.children.length > 0) { topic.children.forEach((subject) => { @@ -171,16 +258,60 @@ export class TopicViewProvider extends ParentedBaseViewProvider< }); } }); + this.topicsContainer.setLoaded(topics); } catch (err) { this.logger.error("Error fetching topics for cluster", cluster, err); + const message = err instanceof Error ? err.message : String(err); + this.topicsContainer.setError( + new CustomMarkdownString() + .addWarning(`Failed to load topics for **${cluster.name}**:`) + .addCodeBlock(message), + ); if (err instanceof TopicFetchError) { - window.showErrorMessage( + void showErrorNotificationWithButtons( `Failed to list topics for cluster "${cluster.name}": ${err.message}`, ); } } - this._onDidChangeTreeData.fire(); + this._onDidChangeTreeData.fire(this.topicsContainer); + } + + /** Fetch and cache consumer groups for the focused cluster. */ + async refreshConsumerGroups( + cluster: KafkaCluster, + forceDeepRefresh: boolean = false, + ): Promise { + if (!this.consumerGroupsContainer) { + return; + } + this.consumerGroupsContainer.setLoading(); + this._onDidChangeTreeData.fire(this.consumerGroupsContainer); + + // clear stale entries before repopulating + this.consumerGroupsInTreeView.clear(); + + const loader = ResourceLoader.getInstance(cluster.connectionId); + try { + const consumerGroups = await loader.getConsumerGroupsForCluster(cluster, forceDeepRefresh); + consumerGroups.forEach((group) => { + this.consumerGroupsInTreeView.set(group.consumerGroupId, group); + }); + this.consumerGroupsContainer.setLoaded(consumerGroups); + } catch (err) { + this.logger.error("Error fetching consumer groups for cluster", cluster, err); + const message = err instanceof Error ? err.message : String(err); + this.consumerGroupsContainer.setError( + new CustomMarkdownString() + .addWarning(`Failed to load consumer groups for **${cluster.name}**:`) + .addCodeBlock(message), + ); + void showErrorNotificationWithButtons( + `Failed to load consumer groups for cluster "${cluster.name}": ${message}`, + ); + } + + this._onDidChangeTreeData.fire(this.consumerGroupsContainer); } /** Fetch and cache {@link Schema schemas} for a specific {@link Subject subject}. */ @@ -202,10 +333,19 @@ export class TopicViewProvider extends ParentedBaseViewProvider< /** Get the parent of the given element, or `undefined` if it's a root-level item. */ getParent(element: TopicViewProviderData): TopicViewProviderData | undefined { - if (element instanceof KafkaTopic) { + if (element instanceof KafkaClusterResourceContainer) { // root-level item return; } + if (element instanceof ConsumerGroup) { + return this.consumerGroupsContainer ?? undefined; + } + if (element instanceof Consumer) { + return this.consumerGroupsInTreeView.get(element.consumerGroupId); + } + if (element instanceof KafkaTopic) { + return this.topicsContainer ?? undefined; + } if (element instanceof Subject) { return this.subjectToTopicMap.get(element.name); } @@ -220,10 +360,24 @@ export class TopicViewProvider extends ParentedBaseViewProvider< options?: { select?: boolean; focus?: boolean }, ): Promise { // callers likely won't have the exact instance in the provider's cache(s), so we need to - // find the instance (originally returned by getChildren()) by name + // find the instance (originally returned by getChildren()) by name/id let itemToReveal: TopicViewProviderData | undefined; - if (item instanceof KafkaTopic) { + if (item instanceof KafkaClusterResourceContainer) { + // match by tree item id against the known container instances + if (this.topicsContainer?.id === item.id) { + itemToReveal = this.topicsContainer; + } else if (this.consumerGroupsContainer?.id === item.id) { + itemToReveal = this.consumerGroupsContainer; + } + } else if (item instanceof ConsumerGroup) { + itemToReveal = this.consumerGroupsInTreeView.get(item.consumerGroupId); + } else if (item instanceof Consumer) { + const parentGroup = this.consumerGroupsInTreeView.get(item.consumerGroupId); + if (parentGroup) { + itemToReveal = parentGroup.members.find((member) => member.consumerId === item.consumerId); + } + } else if (item instanceof KafkaTopic) { itemToReveal = this.topicsInTreeView.get(item.name); } else if (item instanceof Subject) { itemToReveal = this.subjectsInTreeView.get(item.name); @@ -244,6 +398,8 @@ export class TopicViewProvider extends ParentedBaseViewProvider< } async reset(): Promise { + this.topicsContainer = null; + this.consumerGroupsContainer = null; this.clearCaches(); await super.reset(); } diff --git a/tests/e2e/objects/views/TopicsView.ts b/tests/e2e/objects/views/TopicsView.ts index ec30b578bb..6f12df1547 100644 --- a/tests/e2e/objects/views/TopicsView.ts +++ b/tests/e2e/objects/views/TopicsView.ts @@ -6,6 +6,7 @@ import { Quickpick } from "../quickInputs/Quickpick"; import { ResourcesView } from "./ResourcesView"; import { SearchableView } from "./View"; import { TopicItem } from "./viewItems/TopicItem"; +import { ViewItem } from "./viewItems/ViewItem"; export enum SelectKafkaCluster { FromResourcesView = "Kafka cluster action from the Resources view", @@ -29,9 +30,10 @@ export class TopicsView extends SearchableView { await this.clickNavAction("Search"); } - /** Click the "Create Topic" nav action in the view title area. */ + /** Click the "Create Topic" inline action on the Topics container item. */ async clickCreateTopic(): Promise { - await this.clickNavAction("Create Topic"); + const containerItem = new ViewItem(this.page, this.topicsContainer); + await containerItem.clickInlineAction("Create Topic"); } /** @@ -49,9 +51,14 @@ export class TopicsView extends SearchableView { await expect(this.progressIndicator).toBeHidden(); } - /** Get all (root-level) topic items in the view. */ + /** Get the "Topics" container at the root level of the tree. */ + get topicsContainer(): Locator { + return this.body.locator("[role='treeitem'][aria-level='1']").filter({ hasText: "Topics" }); + } + + /** Get all topic items in the view (filtered by aria-label to exclude consumer groups). */ get topics(): Locator { - return this.body.locator("[role='treeitem'][aria-level='1']"); + return this.body.locator("[role='treeitem'][aria-label^='Kafka Topic:']"); } /** Get a topic item by its label/name. */ @@ -77,8 +84,7 @@ export class TopicsView extends SearchableView { * (One level below {@link topicsWithSchemas topic items with schemas}.) */ get subjects(): Locator { - // we don't use `this.topicsWithSchemas` because these are sibling elements to topics in the DOM - return this.body.locator("[role='treeitem'][aria-level='2']"); + return this.body.locator("[role='treeitem'][aria-label^='Schema Subject:']"); } /** @@ -87,7 +93,7 @@ export class TopicsView extends SearchableView { */ get schemaVersions(): Locator { // we don't use `this.subjects` because these are sibling elements to subjects in the DOM - return this.body.locator("[role='treeitem'][aria-level='3']"); + return this.body.locator("[role='treeitem'][aria-level='4']"); } /** @@ -130,8 +136,8 @@ export class TopicsView extends SearchableView { } /** - * Create a new topic using the "Create Topic" nav action in the view title area, filling out the - * required inputs in the subsequent input boxes. + * Create a new topic using the "Create Topic" inline action on the Topics container, filling out + * the required inputs in the subsequent input boxes. * * @param topicName The name of the new topic to create. * @param numPartitions (Optional) The number of partitions for the new topic. If not provided,