From fa4b0c01f005d021b99966219b5323829b9ca13b Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Mon, 26 Jan 2026 14:42:15 -0500 Subject: [PATCH 01/18] set up initial consumer group models --- src/models/consumerGroup.ts | 400 ++++++++++++++++++++++++++++++++++++ 1 file changed, 400 insertions(+) create mode 100644 src/models/consumerGroup.ts diff --git a/src/models/consumerGroup.ts b/src/models/consumerGroup.ts new file mode 100644 index 0000000000..6474ebba7e --- /dev/null +++ b/src/models/consumerGroup.ts @@ -0,0 +1,400 @@ +import * as vscode from "vscode"; +import { ConnectionType } from "../clients/sidecar"; +import { ERROR_ICON, IconNames } from "../icons"; +import { Logger } from "../logging"; +import type { IdItem } from "./main"; +import { CustomMarkdownString } from "./main"; +import type { ConnectionId, EnvironmentId, IResourceBase, ISearchable } from "./resource"; + +/** + * Consumer group states as returned by the Kafka REST API. + * @see https://kafka.apache.org/20/javadoc/org/apache/kafka/common/ConsumerGroupState.html + */ +export enum ConsumerGroupState { + Dead = "Dead", + Empty = "Empty", + PreparingRebalance = "PreparingRebalance", + CompletingRebalance = "CompletingRebalance", + Stable = "Stable", + Unknown = "Unknown", +} + +export function parseConsumerGroupState(state: string): ConsumerGroupState { + switch (state) { + case "Dead": + return ConsumerGroupState.Dead; + case "Empty": + return ConsumerGroupState.Empty; + case "PreparingRebalance": + return ConsumerGroupState.PreparingRebalance; + case "CompletingRebalance": + return ConsumerGroupState.CompletingRebalance; + case "Stable": + return ConsumerGroupState.Stable; + default: + return ConsumerGroupState.Unknown; + } +} + +/** Main class representing a Kafka consumer group. */ +export class ConsumerGroup implements IResourceBase, ISearchable, IdItem { + connectionId: ConnectionId; + connectionType: ConnectionType; + environmentId: EnvironmentId; + clusterId: string; + + /** The broker ID of the group coordinator. */ + coordinatorId: number | null; + /** The partition assignor strategy (e.g., "range", "roundrobin", "sticky"). */ + partitionAssignor: string; + + consumerGroupId: string; + state: ConsumerGroupState; + members: Consumer[] = []; + isSimple: boolean; + + // https://github.com/confluentinc/vscode/issues/3232 + iconName: IconNames = IconNames.PLACEHOLDER; + + constructor( + props: Pick< + ConsumerGroup, + | "connectionId" + | "connectionType" + | "environmentId" + | "clusterId" + | "coordinatorId" + | "partitionAssignor" + | "consumerGroupId" + | "state" + | "members" + | "isSimple" + >, + ) { + this.connectionId = props.connectionId; + this.connectionType = props.connectionType; + this.environmentId = props.environmentId; + this.clusterId = props.clusterId; + + this.coordinatorId = props.coordinatorId; + this.partitionAssignor = props.partitionAssignor; + + this.consumerGroupId = props.consumerGroupId; + this.state = props.state; + this.members = props.members ?? []; + this.isSimple = props.isSimple; + } + + get id(): string { + return `${this.clusterId}-${this.consumerGroupId}`; + } + + get hasMembers(): boolean { + return this.members.length > 0; + } + + /** Whether the consumer group is in a state that allows offset resets. */ + get canResetOffsets(): boolean { + const resettableStates = [ConsumerGroupState.Empty, ConsumerGroupState.Dead]; + return resettableStates.includes(this.state); + } + + searchableText(): string { + return this.consumerGroupId; + } + + ccloudUrl(): string { + if (this.connectionType !== ConnectionType.Ccloud) { + return ""; + } + return `https://confluent.cloud/environments/${this.environmentId}/clusters/${this.clusterId}/clients/consumer-lag/${this.consumerGroupId}`; + } +} + +/** A member (consumer instance) of a {@link ConsumerGroup}. */ +export class Consumer implements IResourceBase, ISearchable, IdItem { + connectionId: ConnectionId; + connectionType: ConnectionType; + environmentId: EnvironmentId; + clusterId: string; + consumerGroupId: string; + + consumerId: string; + clientId: string; + instanceId: string | null; + + // https://github.com/confluentinc/vscode/issues/3233 + iconName: IconNames = IconNames.PLACEHOLDER; + + constructor( + props: Pick< + Consumer, + | "connectionId" + | "connectionType" + | "environmentId" + | "clusterId" + | "consumerGroupId" + | "consumerId" + | "clientId" + | "instanceId" + >, + ) { + this.connectionId = props.connectionId; + this.connectionType = props.connectionType; + this.environmentId = props.environmentId; + this.clusterId = props.clusterId; + this.consumerGroupId = props.consumerGroupId; + + this.consumerId = props.consumerId; + this.clientId = props.clientId; + this.instanceId = props.instanceId ?? null; + } + + get id(): string { + return `${this.clusterId}-${this.consumerGroupId}-${this.consumerId}`; + } + + searchableText(): string { + return `${this.consumerId} ${this.clientId}`; + } + + ccloudUrl(): string { + if (this.connectionType !== ConnectionType.Ccloud) { + return ""; + } + return `https://confluent.cloud/environments/${this.environmentId}/clusters/${this.clusterId}/clients/consumers/${this.clientId}`; + } +} + +/** Tree item representation for a {@link ConsumerGroup}. */ +export class ConsumerGroupTreeItem extends vscode.TreeItem { + resource: ConsumerGroup; + + constructor(resource: ConsumerGroup) { + super(resource.consumerGroupId); + + this.id = resource.id; + this.resource = resource; + // includes state for conditional menu visibility, like: + // "ccloud-consumerGroup-Stable" or "local-consumerGroup-Empty" + this.contextValue = `${resource.connectionType.toLowerCase()}-consumerGroup-${resource.state}`; + + this.collapsibleState = vscode.TreeItemCollapsibleState.Collapsed; + this.description = resource.state; + this.iconPath = getConsumerGroupIcon(resource); + this.tooltip = createConsumerGroupTooltip(resource); + } +} + +function getConsumerGroupIcon(group: ConsumerGroup): vscode.ThemeIcon { + let stateColor: string | undefined; + switch (group.state) { + case ConsumerGroupState.Stable: + // Green for stable/healthy + stateColor = "testing.iconPassed"; + break; + case ConsumerGroupState.Empty: + // Yellow/warning for empty (no active consumers) + stateColor = "problemsWarningIcon.foreground"; + break; + case ConsumerGroupState.Dead: + // Red for dead + stateColor = "problemsErrorIcon.foreground"; + break; + case ConsumerGroupState.PreparingRebalance: + case ConsumerGroupState.CompletingRebalance: + stateColor = "notificationsInfoIcon.foreground"; + break; + } + return new vscode.ThemeIcon( + group.iconName, + stateColor ? new vscode.ThemeColor(stateColor) : undefined, + ); +} + +function createConsumerGroupTooltip(resource: ConsumerGroup): CustomMarkdownString { + const tooltip = new CustomMarkdownString() + .addHeader("Consumer Group", resource.iconName) + .addField("Group ID", resource.consumerGroupId) + .addField("State", resource.state) + .addField("Partition Assignor", resource.partitionAssignor) + .addField("Simple Consumer", resource.isSimple ? "Yes" : "No"); + + if (resource.coordinatorId !== null) { + tooltip.addField("Coordinator Broker", resource.coordinatorId.toString()); + } + + if (resource.hasMembers) { + tooltip.addField("Members", resource.members.length.toString()); + } + + // warnings for non-stable states + if (resource.state === ConsumerGroupState.Empty) { + tooltip.addWarning("No active consumers in this group."); + } else if (resource.state === ConsumerGroupState.Dead) { + tooltip.addWarning("Consumer group is dead and will be removed."); + } else if ( + resource.state === ConsumerGroupState.PreparingRebalance || + resource.state === ConsumerGroupState.CompletingRebalance + ) { + tooltip.addWarning("Consumer group is currently rebalancing."); + } + + return tooltip; +} + +/** Tree item representation for a {@link Consumer}. */ +export class ConsumerTreeItem extends vscode.TreeItem { + resource: Consumer; + + constructor(resource: Consumer) { + const label = resource.clientId + ? `${resource.consumerId} (client: ${resource.clientId})` + : resource.consumerId; + super(label); + + this.id = resource.id; + this.resource = resource; + this.contextValue = `${resource.connectionType.toLowerCase()}-consumerGroup-member`; + + this.collapsibleState = vscode.TreeItemCollapsibleState.None; + + this.iconPath = new vscode.ThemeIcon(resource.iconName); + this.tooltip = createConsumerGroupMemberTooltip(resource); + } +} + +/** + * Create a tooltip for a consumer group member. + * @param resource The consumer group member resource. + * @returns A CustomMarkdownString with formatted tooltip content. + */ +function createConsumerGroupMemberTooltip(resource: Consumer): CustomMarkdownString { + const tooltip = new CustomMarkdownString() + .addHeader("Consumer", IconNames.PLACEHOLDER) + .addField("Consumer ID", resource.consumerId) + .addField("Client ID", resource.clientId) + .addField("Group", resource.consumerGroupId); + + if (resource.instanceId) { + tooltip.addField("Instance ID", resource.instanceId); + } + + return tooltip; +} + +// TODO: merge with FlinkDatabaseResourceContainer? + +/** Poll interval to use when waiting for a container to finish loading. */ +const LOADING_POLL_INTERVAL_MS = 100; + +/** A container {@link TreeItem} for consumer groups in the Topics view. */ +export class ConsumerGroupContainer extends vscode.TreeItem implements ISearchable { + readonly connectionId: ConnectionId; + readonly connectionType: ConnectionType; + readonly clusterId: string; + readonly environmentId: EnvironmentId; + + // `id` is string|undefined in TreeItem, but we need it to be a string for IdItem + id: string; + + private _children: ConsumerGroup[]; + private _isLoading: boolean = false; + private _hasError: boolean = false; + private readonly _defaultContextValue: string = "consumerGroups-container"; + private readonly _defaultIcon: vscode.ThemeIcon; + + private logger: Logger; + + constructor( + connectionId: ConnectionId, + connectionType: ConnectionType, + clusterId: string, + environmentId: EnvironmentId, + children: ConsumerGroup[] = [], + ) { + super("Consumer Groups", vscode.TreeItemCollapsibleState.Collapsed); + + this.connectionId = connectionId; + this.connectionType = connectionType; + this.clusterId = clusterId; + this.environmentId = environmentId; + this._children = children; + + this.id = `${connectionId}-${clusterId}-consumer-groups`; + this.contextValue = this._defaultContextValue; + this._defaultIcon = new vscode.ThemeIcon("symbol-event"); + this.iconPath = this._defaultIcon; + + this.logger = new Logger("models.ConsumerGroupContainer"); + } + + /** + * Consumer groups belonging to this container. + * Setting this will clear the internal {@linkcode isLoading} state. + * If the children array has items, this will also set {@linkcode hasError} to `false`. + */ + get children(): ConsumerGroup[] { + return this._children; + } + + set children(children: ConsumerGroup[]) { + this._children = children; + this.isLoading = false; + this.description = `(${children.length})`; + + if (children.length > 0) { + this.hasError = false; + } + } + + get isLoading(): boolean { + return this._isLoading; + } + + set isLoading(loading: boolean) { + this._isLoading = loading; + this.iconPath = loading ? new vscode.ThemeIcon(IconNames.LOADING) : this._defaultIcon; + } + + get hasError(): boolean { + return this._hasError; + } + + /** Set or clear the error state for this container. */ + set hasError(error: boolean) { + this._hasError = error; + this.iconPath = error ? ERROR_ICON : this._defaultIcon; + + // Append or remove "-error" suffix to context value based on error state + this.contextValue = error ? `${this._defaultContextValue}-error` : this._defaultContextValue; + } + + searchableText(): string { + return "Consumer Groups"; + } + + /** Wait until the container is no longer in a loading state, or timeout after timeoutMs. */ + async ensureDoneLoading(timeoutMs: number = 10000): Promise { + const startTime = Date.now(); + + while (this.isLoading) { + if (Date.now() - startTime >= timeoutMs) { + throw new Error("Timeout waiting for container to finish loading"); + } + await new Promise((resolve) => setTimeout(resolve, LOADING_POLL_INTERVAL_MS)); + } + } + + /** Get the container's resources, waiting for loading to complete if necessary. */ + async gatherResources(timeoutMs: number = 10000): Promise { + let resources: ConsumerGroup[] = []; + try { + await this.ensureDoneLoading(timeoutMs); + resources = this.children; + } catch (error) { + this.logger.error(`Error getting resources: ${error}`); + } + return resources; + } +} From 6fd77ae1fef9c98353670f9b01197905c8d18212 Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Mon, 26 Jan 2026 14:42:47 -0500 Subject: [PATCH 02/18] add comments for consumer group context values --- src/extension.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/extension.ts b/src/extension.ts index 763bc19fd0..6a33b0e20b 100644 --- a/src/extension.ts +++ b/src/extension.ts @@ -390,6 +390,7 @@ async function setupContextValues() { "ccloud-flinkable-kafka-cluster", "ccloud-kafka-topic", "ccloud-kafka-topic-with-schema", + // consumer groups and consumers have dynamic context values based on state; can't include here "ccloud-schema-registry", "ccloud-flink-compute-pool", "ccloud-flink-statement", @@ -410,6 +411,7 @@ async function setupContextValues() { "flinkable-ccloud-environment", "ccloud-kafka-cluster", "ccloud-flinkable-kafka-cluster", + // consumer groups and consumers have dynamic context values based on state; can't include here "ccloud-schema-registry", // only ID, no name "ccloud-flink-compute-pool", "ccloud-flink-artifact", From 3a5863d5683ace8d6d81b21132bed77930b57625 Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Mon, 26 Jan 2026 14:46:22 -0500 Subject: [PATCH 03/18] set up consumer + group test helpers --- tests/unit/testResources/consumerGroup.ts | 117 ++++++++++++++++++++++ 1 file changed, 117 insertions(+) create mode 100644 tests/unit/testResources/consumerGroup.ts diff --git a/tests/unit/testResources/consumerGroup.ts b/tests/unit/testResources/consumerGroup.ts new file mode 100644 index 0000000000..ff61595f7d --- /dev/null +++ b/tests/unit/testResources/consumerGroup.ts @@ -0,0 +1,117 @@ +import { ConnectionType } from "../../../src/clients/sidecar"; +import { CCLOUD_CONNECTION_ID, LOCAL_CONNECTION_ID } from "../../../src/constants"; +import { Consumer, ConsumerGroup, ConsumerGroupState } from "../../../src/models/consumerGroup"; +import { TEST_DIRECT_CONNECTION_ID } from "./connection"; +import { + TEST_CCLOUD_KAFKA_CLUSTER, + TEST_DIRECT_KAFKA_CLUSTER, + TEST_LOCAL_KAFKA_CLUSTER, +} from "./kafkaCluster"; + +/** Create a {@link ConsumerGroup} for testing purposes. */ +export function createConsumerGroup( + args: { + connectionId: string; + connectionType: ConnectionType; + environmentId: string; + clusterId: string; + } & Partial, +): ConsumerGroup { + return new ConsumerGroup({ + connectionId: args.connectionId, + connectionType: args.connectionType, + environmentId: args.environmentId, + clusterId: args.clusterId, + consumerGroupId: args.consumerGroupId ?? "test-consumer-group", + state: args.state ?? ConsumerGroupState.Stable, + isSimple: args.isSimple ?? false, + partitionAssignor: args.partitionAssignor ?? "range", + coordinatorId: args.coordinatorId ?? 0, + members: args.members ?? [], + }); +} + +/** Create a {@link Consumer} for testing purposes. */ +export function createConsumerGroupMember( + args: { + connectionId: string; + connectionType: ConnectionType; + environmentId: string; + clusterId: string; + consumerGroupId: string; + } & Partial, +): Consumer { + return new Consumer({ + connectionId: args.connectionId, + connectionType: args.connectionType, + environmentId: args.environmentId, + clusterId: args.clusterId, + consumerGroupId: args.consumerGroupId, + consumerId: args.consumerId ?? "test-consumer-1", + clientId: args.clientId ?? "test-client", + instanceId: args.instanceId ?? null, + }); +} + +export const TEST_CCLOUD_CONSUMER_GROUP_ID = "test-ccloud-consumer-group"; +export const TEST_CCLOUD_CONSUMER_GROUP = createConsumerGroup({ + connectionId: CCLOUD_CONNECTION_ID, + connectionType: ConnectionType.Ccloud, + environmentId: TEST_CCLOUD_KAFKA_CLUSTER.environmentId, + clusterId: TEST_CCLOUD_KAFKA_CLUSTER.id, + consumerGroupId: TEST_CCLOUD_CONSUMER_GROUP_ID, + members: [ + createConsumerGroupMember({ + connectionId: CCLOUD_CONNECTION_ID, + connectionType: ConnectionType.Ccloud, + environmentId: TEST_CCLOUD_KAFKA_CLUSTER.environmentId, + clusterId: TEST_CCLOUD_KAFKA_CLUSTER.id, + consumerGroupId: TEST_CCLOUD_CONSUMER_GROUP_ID, + consumerId: "consumer-ccloud-1", + clientId: "my-ccloud-app", + }), + ], +}); +export const TEST_CCLOUD_CONSUMER = TEST_CCLOUD_CONSUMER_GROUP.members[0]; + +export const TEST_DIRECT_CONSUMER_GROUP_ID = "test-direct-consumer-group"; +export const TEST_DIRECT_CONSUMER_GROUP = createConsumerGroup({ + connectionId: TEST_DIRECT_CONNECTION_ID, + connectionType: ConnectionType.Direct, + environmentId: TEST_DIRECT_KAFKA_CLUSTER.environmentId, + clusterId: TEST_DIRECT_KAFKA_CLUSTER.id, + consumerGroupId: TEST_DIRECT_CONSUMER_GROUP_ID, + members: [ + createConsumerGroupMember({ + connectionId: TEST_DIRECT_CONNECTION_ID, + connectionType: ConnectionType.Direct, + environmentId: TEST_DIRECT_KAFKA_CLUSTER.environmentId, + clusterId: TEST_DIRECT_KAFKA_CLUSTER.id, + consumerGroupId: TEST_DIRECT_CONSUMER_GROUP_ID, + consumerId: "consumer-direct-1", + clientId: "my-direct-app", + }), + ], +}); +export const TEST_DIRECT_CONSUMER = TEST_DIRECT_CONSUMER_GROUP.members[0]; + +export const TEST_LOCAL_CONSUMER_GROUP_ID = "test-local-consumer-group"; +export const TEST_LOCAL_CONSUMER_GROUP = createConsumerGroup({ + connectionId: LOCAL_CONNECTION_ID, + connectionType: ConnectionType.Local, + environmentId: TEST_LOCAL_KAFKA_CLUSTER.environmentId, + clusterId: TEST_LOCAL_KAFKA_CLUSTER.id, + consumerGroupId: TEST_LOCAL_CONSUMER_GROUP_ID, + members: [ + createConsumerGroupMember({ + connectionId: LOCAL_CONNECTION_ID, + connectionType: ConnectionType.Local, + environmentId: TEST_LOCAL_KAFKA_CLUSTER.environmentId, + clusterId: TEST_LOCAL_KAFKA_CLUSTER.id, + consumerGroupId: TEST_LOCAL_CONSUMER_GROUP_ID, + consumerId: "consumer-local-1", + clientId: "my-local-app", + }), + ], +}); +export const TEST_LOCAL_CONSUMER = TEST_LOCAL_CONSUMER_GROUP.members[0]; From bb0219cd640dde5e6a5cddc3b53805b40aa6407b Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Tue, 27 Jan 2026 17:26:04 -0500 Subject: [PATCH 04/18] use custom consumer group icon --- src/models/consumerGroup.ts | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/models/consumerGroup.ts b/src/models/consumerGroup.ts index 6474ebba7e..184a863dda 100644 --- a/src/models/consumerGroup.ts +++ b/src/models/consumerGroup.ts @@ -53,8 +53,7 @@ export class ConsumerGroup implements IResourceBase, ISearchable, IdItem { members: Consumer[] = []; isSimple: boolean; - // https://github.com/confluentinc/vscode/issues/3232 - iconName: IconNames = IconNames.PLACEHOLDER; + iconName: IconNames = IconNames.CONSUMER_GROUP; constructor( props: Pick< @@ -323,7 +322,7 @@ export class ConsumerGroupContainer extends vscode.TreeItem implements ISearchab this.id = `${connectionId}-${clusterId}-consumer-groups`; this.contextValue = this._defaultContextValue; - this._defaultIcon = new vscode.ThemeIcon("symbol-event"); + this._defaultIcon = new vscode.ThemeIcon(IconNames.CONSUMER_GROUP); this.iconPath = this._defaultIcon; this.logger = new Logger("models.ConsumerGroupContainer"); From 583dee4f89d5b72dd5beee5974da9cfaa7e78b79 Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Fri, 6 Feb 2026 21:23:10 -0500 Subject: [PATCH 05/18] remove unnecessary parse function --- src/models/consumerGroup.ts | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/src/models/consumerGroup.ts b/src/models/consumerGroup.ts index 184a863dda..74eabae09a 100644 --- a/src/models/consumerGroup.ts +++ b/src/models/consumerGroup.ts @@ -19,23 +19,6 @@ export enum ConsumerGroupState { Unknown = "Unknown", } -export function parseConsumerGroupState(state: string): ConsumerGroupState { - switch (state) { - case "Dead": - return ConsumerGroupState.Dead; - case "Empty": - return ConsumerGroupState.Empty; - case "PreparingRebalance": - return ConsumerGroupState.PreparingRebalance; - case "CompletingRebalance": - return ConsumerGroupState.CompletingRebalance; - case "Stable": - return ConsumerGroupState.Stable; - default: - return ConsumerGroupState.Unknown; - } -} - /** Main class representing a Kafka consumer group. */ export class ConsumerGroup implements IResourceBase, ISearchable, IdItem { connectionId: ConnectionId; From 95899a105e633c6722a215fd5dcd07668af190b9 Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Sun, 22 Feb 2026 08:42:33 -0500 Subject: [PATCH 06/18] update naming for consistency --- src/models/consumerGroup.ts | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/src/models/consumerGroup.ts b/src/models/consumerGroup.ts index 74eabae09a..7f035c9879 100644 --- a/src/models/consumerGroup.ts +++ b/src/models/consumerGroup.ts @@ -242,16 +242,11 @@ export class ConsumerTreeItem extends vscode.TreeItem { this.collapsibleState = vscode.TreeItemCollapsibleState.None; this.iconPath = new vscode.ThemeIcon(resource.iconName); - this.tooltip = createConsumerGroupMemberTooltip(resource); + this.tooltip = createConsumerTooltip(resource); } } -/** - * Create a tooltip for a consumer group member. - * @param resource The consumer group member resource. - * @returns A CustomMarkdownString with formatted tooltip content. - */ -function createConsumerGroupMemberTooltip(resource: Consumer): CustomMarkdownString { +function createConsumerTooltip(resource: Consumer): CustomMarkdownString { const tooltip = new CustomMarkdownString() .addHeader("Consumer", IconNames.PLACEHOLDER) .addField("Consumer ID", resource.consumerId) From 2bf92107917462ac311eafe56da704b4f30060df Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Mon, 23 Feb 2026 16:48:15 -0500 Subject: [PATCH 07/18] link docs for less obvious properties --- src/models/consumerGroup.ts | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/models/consumerGroup.ts b/src/models/consumerGroup.ts index 7f035c9879..ba1f757ed2 100644 --- a/src/models/consumerGroup.ts +++ b/src/models/consumerGroup.ts @@ -34,6 +34,11 @@ export class ConsumerGroup implements IResourceBase, ISearchable, IdItem { consumerGroupId: string; state: ConsumerGroupState; members: Consumer[] = []; + /** + * Whether the group uses manual partition assignment (`assign()`) rather than dynamic + * group coordination (`subscribe()`). Simple groups only use Kafka for offset storage. + * @see https://kafka.apache.org/26/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html + */ isSimple: boolean; iconName: IconNames = IconNames.CONSUMER_GROUP; @@ -103,6 +108,10 @@ export class Consumer implements IResourceBase, ISearchable, IdItem { consumerId: string; clientId: string; + /** + * Static group membership identifier (`group.instance.id`), or null if not configured. + * @see https://kafka.apache.org/26/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html + */ instanceId: string | null; // https://github.com/confluentinc/vscode/issues/3233 From 458aa01482657ea000fbaf85475a817d63482cd5 Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Mon, 23 Feb 2026 16:53:33 -0500 Subject: [PATCH 08/18] add tests for consumer group related models --- src/models/consumerGroup.test.ts | 380 +++++++++++++++++++++++++++++++ 1 file changed, 380 insertions(+) create mode 100644 src/models/consumerGroup.test.ts diff --git a/src/models/consumerGroup.test.ts b/src/models/consumerGroup.test.ts new file mode 100644 index 0000000000..399b7af617 --- /dev/null +++ b/src/models/consumerGroup.test.ts @@ -0,0 +1,380 @@ +import * as assert from "assert"; +import type { MarkdownString, ThemeIcon } from "vscode"; +import { TreeItemCollapsibleState } from "vscode"; +import { + createConsumerGroup, + createConsumerGroupMember, + TEST_CCLOUD_CONSUMER, + TEST_CCLOUD_CONSUMER_GROUP, + TEST_CCLOUD_CONSUMER_GROUP_ID, + TEST_DIRECT_CONSUMER, + TEST_DIRECT_CONSUMER_GROUP, + TEST_LOCAL_CONSUMER, + TEST_LOCAL_CONSUMER_GROUP, +} from "../../tests/unit/testResources/consumerGroup"; +import { TEST_CCLOUD_KAFKA_CLUSTER } from "../../tests/unit/testResources/kafkaCluster"; +import { ConnectionType } from "../clients/sidecar"; +import { CCLOUD_CONNECTION_ID } from "../constants"; +import { IconNames } from "../icons"; +import { ConsumerGroupState, ConsumerGroupTreeItem, ConsumerTreeItem } from "./consumerGroup"; + +describe("models/consumerGroup.ts", () => { + describe("ConsumerGroup", () => { + describe("id", () => { + it("should return clusterId-consumerGroupId", () => { + assert.strictEqual( + TEST_CCLOUD_CONSUMER_GROUP.id, + `${TEST_CCLOUD_CONSUMER_GROUP.clusterId}-${TEST_CCLOUD_CONSUMER_GROUP_ID}`, + ); + }); + }); + + describe("hasMembers", () => { + it("should return true when members exist", () => { + assert.strictEqual(TEST_CCLOUD_CONSUMER_GROUP.hasMembers, true); + }); + + it("should return false when the members array is empty", () => { + const group = createConsumerGroup({ + connectionId: CCLOUD_CONNECTION_ID, + connectionType: ConnectionType.Ccloud, + environmentId: TEST_CCLOUD_KAFKA_CLUSTER.environmentId, + clusterId: TEST_CCLOUD_KAFKA_CLUSTER.id, + members: [], + }); + + assert.strictEqual(group.hasMembers, false); + }); + }); + + describe("canResetOffsets", () => { + const resettableStates = [ConsumerGroupState.Empty, ConsumerGroupState.Dead]; + const nonResettableStates = [ + ConsumerGroupState.Stable, + ConsumerGroupState.PreparingRebalance, + ConsumerGroupState.CompletingRebalance, + ConsumerGroupState.Unknown, + ]; + + for (const state of resettableStates) { + it(`should return true for ${state} state`, () => { + const group = createConsumerGroup({ + connectionId: CCLOUD_CONNECTION_ID, + connectionType: ConnectionType.Ccloud, + environmentId: TEST_CCLOUD_KAFKA_CLUSTER.environmentId, + clusterId: TEST_CCLOUD_KAFKA_CLUSTER.id, + state, + }); + + assert.strictEqual(group.canResetOffsets, true); + }); + } + + for (const state of nonResettableStates) { + it(`should return false for ${state} state`, () => { + const group = createConsumerGroup({ + connectionId: CCLOUD_CONNECTION_ID, + connectionType: ConnectionType.Ccloud, + environmentId: TEST_CCLOUD_KAFKA_CLUSTER.environmentId, + clusterId: TEST_CCLOUD_KAFKA_CLUSTER.id, + state, + }); + + assert.strictEqual(group.canResetOffsets, false); + }); + } + }); + + describe("searchableText", () => { + it("should return the consumerGroupId", () => { + assert.strictEqual( + TEST_CCLOUD_CONSUMER_GROUP.searchableText(), + TEST_CCLOUD_CONSUMER_GROUP_ID, + ); + }); + }); + + describe("ccloudUrl", () => { + it("should return the correct URL for CCloud groups", () => { + const group = TEST_CCLOUD_CONSUMER_GROUP; + const expected = `https://confluent.cloud/environments/${group.environmentId}/clusters/${group.clusterId}/clients/consumer-lag/${group.consumerGroupId}`; + + assert.strictEqual(group.ccloudUrl(), expected); + }); + + it("should return empty string for non-CCloud groups", () => { + assert.strictEqual(TEST_DIRECT_CONSUMER_GROUP.ccloudUrl(), ""); + assert.strictEqual(TEST_LOCAL_CONSUMER_GROUP.ccloudUrl(), ""); + }); + }); + }); + + describe("Consumer", () => { + describe("id", () => { + it("should return clusterId-consumerGroupId-consumerId", () => { + const consumer = TEST_CCLOUD_CONSUMER; + assert.strictEqual( + consumer.id, + `${consumer.clusterId}-${consumer.consumerGroupId}-${consumer.consumerId}`, + ); + }); + }); + + describe("searchableText", () => { + it("should return consumerId and clientId", () => { + const consumer = TEST_CCLOUD_CONSUMER; + assert.strictEqual( + consumer.searchableText(), + `${consumer.consumerId} ${consumer.clientId}`, + ); + }); + }); + + describe("ccloudUrl", () => { + it("should return the correct URL for CCloud consumers", () => { + const consumer = TEST_CCLOUD_CONSUMER; + const expected = `https://confluent.cloud/environments/${consumer.environmentId}/clusters/${consumer.clusterId}/clients/consumers/${consumer.clientId}`; + + assert.strictEqual(consumer.ccloudUrl(), expected); + }); + + it("should return empty string for non-CCloud consumers", () => { + assert.strictEqual(TEST_DIRECT_CONSUMER.ccloudUrl(), ""); + assert.strictEqual(TEST_LOCAL_CONSUMER.ccloudUrl(), ""); + }); + }); + + describe("instanceId", () => { + it("should default to null when not provided", () => { + const consumer = createConsumerGroupMember({ + connectionId: CCLOUD_CONNECTION_ID, + connectionType: ConnectionType.Ccloud, + environmentId: TEST_CCLOUD_KAFKA_CLUSTER.environmentId, + clusterId: TEST_CCLOUD_KAFKA_CLUSTER.id, + consumerGroupId: "group-1", + }); + + assert.strictEqual(consumer.instanceId, null); + }); + + it("should preserve instanceId when provided", () => { + const consumer = createConsumerGroupMember({ + connectionId: CCLOUD_CONNECTION_ID, + connectionType: ConnectionType.Ccloud, + environmentId: TEST_CCLOUD_KAFKA_CLUSTER.environmentId, + clusterId: TEST_CCLOUD_KAFKA_CLUSTER.id, + consumerGroupId: "group-1", + instanceId: "instance-42", + }); + + assert.strictEqual(consumer.instanceId, "instance-42"); + }); + }); + }); + + describe("ConsumerGroupTreeItem", () => { + it("should use consumerGroupId as label", () => { + const treeItem = new ConsumerGroupTreeItem(TEST_CCLOUD_CONSUMER_GROUP); + + assert.strictEqual(treeItem.label, TEST_CCLOUD_CONSUMER_GROUP.consumerGroupId); + }); + + it("should set id from the resource", () => { + const treeItem = new ConsumerGroupTreeItem(TEST_CCLOUD_CONSUMER_GROUP); + + assert.strictEqual(treeItem.id, TEST_CCLOUD_CONSUMER_GROUP.id); + }); + + it("should include connection type and state in contextValue", () => { + const treeItem = new ConsumerGroupTreeItem(TEST_CCLOUD_CONSUMER_GROUP); + + assert.strictEqual(treeItem.contextValue, "ccloud-consumerGroup-Stable"); + }); + + it("should set contextValue for local connection type", () => { + const treeItem = new ConsumerGroupTreeItem(TEST_LOCAL_CONSUMER_GROUP); + + assert.strictEqual(treeItem.contextValue, "local-consumerGroup-Stable"); + }); + + it("should set contextValue for direct connection type", () => { + const treeItem = new ConsumerGroupTreeItem(TEST_DIRECT_CONSUMER_GROUP); + + assert.strictEqual(treeItem.contextValue, "direct-consumerGroup-Stable"); + }); + + it("should always set collapsible state to Collapsed", () => { + const treeItem = new ConsumerGroupTreeItem(TEST_CCLOUD_CONSUMER_GROUP); + + assert.strictEqual(treeItem.collapsibleState, TreeItemCollapsibleState.Collapsed); + }); + + it("should set description to the group state", () => { + const treeItem = new ConsumerGroupTreeItem(TEST_CCLOUD_CONSUMER_GROUP); + + assert.strictEqual(treeItem.description, ConsumerGroupState.Stable); + }); + + it("should use the consumer group icon", () => { + const treeItem = new ConsumerGroupTreeItem(TEST_CCLOUD_CONSUMER_GROUP); + const icon = treeItem.iconPath as ThemeIcon; + + assert.strictEqual(icon.id, IconNames.CONSUMER_GROUP); + }); + + describe("icon color", () => { + const expectedColorByState: Record = { + [ConsumerGroupState.Stable]: "testing.iconPassed", + [ConsumerGroupState.Empty]: "problemsWarningIcon.foreground", + [ConsumerGroupState.Dead]: "problemsErrorIcon.foreground", + [ConsumerGroupState.PreparingRebalance]: "notificationsInfoIcon.foreground", + [ConsumerGroupState.CompletingRebalance]: "notificationsInfoIcon.foreground", + [ConsumerGroupState.Unknown]: undefined, + }; + + for (const [state, expectedColor] of Object.entries(expectedColorByState)) { + it(`should use color=${expectedColor} when state=${state}`, () => { + const group = createConsumerGroup({ + connectionId: CCLOUD_CONNECTION_ID, + connectionType: ConnectionType.Ccloud, + environmentId: TEST_CCLOUD_KAFKA_CLUSTER.environmentId, + clusterId: TEST_CCLOUD_KAFKA_CLUSTER.id, + state: state as ConsumerGroupState, + }); + const item = new ConsumerGroupTreeItem(group); + const icon = item.iconPath as ThemeIcon; + + assert.strictEqual(icon.color?.id, expectedColor); + }); + } + }); + + describe("tooltip", () => { + it("should include consumer group details", () => { + const treeItem = new ConsumerGroupTreeItem(TEST_CCLOUD_CONSUMER_GROUP); + const text = (treeItem.tooltip as MarkdownString).value; + + assert.ok(text.includes("Consumer Group")); + assert.ok(text.includes(TEST_CCLOUD_CONSUMER_GROUP.consumerGroupId)); + assert.ok(text.includes(ConsumerGroupState.Stable)); + assert.ok(text.includes("range")); + }); + + it("should include member count when members exist", () => { + const treeItem = new ConsumerGroupTreeItem(TEST_CCLOUD_CONSUMER_GROUP); + const text = (treeItem.tooltip as MarkdownString).value; + + assert.ok(text.includes("Members")); + }); + + it("should show warning for Empty state", () => { + const group = createConsumerGroup({ + connectionId: CCLOUD_CONNECTION_ID, + connectionType: ConnectionType.Ccloud, + environmentId: TEST_CCLOUD_KAFKA_CLUSTER.environmentId, + clusterId: TEST_CCLOUD_KAFKA_CLUSTER.id, + state: ConsumerGroupState.Empty, + }); + const text = (new ConsumerGroupTreeItem(group).tooltip as MarkdownString).value; + + assert.ok(text.includes("No active consumers")); + }); + + it("should show warning for Dead state", () => { + const group = createConsumerGroup({ + connectionId: CCLOUD_CONNECTION_ID, + connectionType: ConnectionType.Ccloud, + environmentId: TEST_CCLOUD_KAFKA_CLUSTER.environmentId, + clusterId: TEST_CCLOUD_KAFKA_CLUSTER.id, + state: ConsumerGroupState.Dead, + }); + const text = (new ConsumerGroupTreeItem(group).tooltip as MarkdownString).value; + + assert.ok(text.includes("dead and will be removed")); + }); + + it("should show warning for rebalancing states", () => { + const group = createConsumerGroup({ + connectionId: CCLOUD_CONNECTION_ID, + connectionType: ConnectionType.Ccloud, + environmentId: TEST_CCLOUD_KAFKA_CLUSTER.environmentId, + clusterId: TEST_CCLOUD_KAFKA_CLUSTER.id, + state: ConsumerGroupState.PreparingRebalance, + }); + const text = (new ConsumerGroupTreeItem(group).tooltip as MarkdownString).value; + + assert.ok(text.includes("currently rebalancing")); + }); + }); + }); + + describe("ConsumerTreeItem", () => { + it("should include consumerId and clientId in label", () => { + const treeItem = new ConsumerTreeItem(TEST_CCLOUD_CONSUMER); + assert.strictEqual( + treeItem.label, + `${TEST_CCLOUD_CONSUMER.consumerId} (client: ${TEST_CCLOUD_CONSUMER.clientId})`, + ); + }); + + it("should use only consumerId as label when clientId is empty", () => { + const consumer = createConsumerGroupMember({ + connectionId: CCLOUD_CONNECTION_ID, + connectionType: ConnectionType.Ccloud, + environmentId: TEST_CCLOUD_KAFKA_CLUSTER.environmentId, + clusterId: TEST_CCLOUD_KAFKA_CLUSTER.id, + consumerGroupId: "group-1", + clientId: "", + }); + const treeItem = new ConsumerTreeItem(consumer); + assert.strictEqual(treeItem.label, consumer.consumerId); + }); + + it("should set id from the resource", () => { + const treeItem = new ConsumerTreeItem(TEST_CCLOUD_CONSUMER); + assert.strictEqual(treeItem.id, TEST_CCLOUD_CONSUMER.id); + }); + + it("should include connection type in contextValue", () => { + const treeItem = new ConsumerTreeItem(TEST_CCLOUD_CONSUMER); + assert.strictEqual(treeItem.contextValue, "ccloud-consumerGroup-member"); + }); + + it("should set collapsible state to None", () => { + const treeItem = new ConsumerTreeItem(TEST_CCLOUD_CONSUMER); + assert.strictEqual(treeItem.collapsibleState, TreeItemCollapsibleState.None); + }); + + it("should use the placeholder icon", () => { + const treeItem = new ConsumerTreeItem(TEST_CCLOUD_CONSUMER); + const icon = treeItem.iconPath as ThemeIcon; + assert.strictEqual(icon.id, IconNames.PLACEHOLDER); + }); + + describe("tooltip", () => { + it("should include consumer details", () => { + const treeItem = new ConsumerTreeItem(TEST_CCLOUD_CONSUMER); + const text = (treeItem.tooltip as MarkdownString).value; + + assert.ok(text.includes("Consumer")); + assert.ok(text.includes(TEST_CCLOUD_CONSUMER.consumerId)); + assert.ok(text.includes(TEST_CCLOUD_CONSUMER.clientId)); + assert.ok(text.includes(TEST_CCLOUD_CONSUMER.consumerGroupId)); + }); + + it("should include instanceId when present", () => { + const consumer = createConsumerGroupMember({ + connectionId: CCLOUD_CONNECTION_ID, + connectionType: ConnectionType.Ccloud, + environmentId: TEST_CCLOUD_KAFKA_CLUSTER.environmentId, + clusterId: TEST_CCLOUD_KAFKA_CLUSTER.id, + consumerGroupId: "group-1", + instanceId: "instance-42", + }); + const text = (new ConsumerTreeItem(consumer).tooltip as MarkdownString).value; + + assert.ok(text.includes("instance-42")); + }); + }); + }); +}); From 5dc2e76559e0725092f187e2eb4e2b4846f9d629 Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Mon, 26 Jan 2026 14:42:35 -0500 Subject: [PATCH 09/18] move error icon const to icons.ts --- src/icons.ts | 5 +++++ src/models/flinkDatabaseResourceContainer.ts | 7 ++----- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/icons.ts b/src/icons.ts index 4d8e95cbb1..9b94576fc7 100644 --- a/src/icons.ts +++ b/src/icons.ts @@ -1,3 +1,5 @@ +import { ThemeColor, ThemeIcon } from "vscode"; + /** * Ids to use with ThemeIcons for different Confluent/Kafka resources * @see https://code.visualstudio.com/api/references/icons-in-labels @@ -41,3 +43,6 @@ export enum IconNames { /** General-purpose icon to use when we don't have a dedicated icon for a given resource. */ PLACEHOLDER = "symbol-misc", } + +/** Red "warning" icon to use when dealing with connectivity issues or failed resource fetching. */ +export const ERROR_ICON = new ThemeIcon("warning", new ThemeColor("problemsErrorIcon.foreground")); diff --git a/src/models/flinkDatabaseResourceContainer.ts b/src/models/flinkDatabaseResourceContainer.ts index 6611af94a4..686d550396 100644 --- a/src/models/flinkDatabaseResourceContainer.ts +++ b/src/models/flinkDatabaseResourceContainer.ts @@ -1,7 +1,7 @@ -import { ThemeColor, ThemeIcon, TreeItem, TreeItemCollapsibleState } from "vscode"; +import { ThemeIcon, TreeItem, TreeItemCollapsibleState } from "vscode"; import { ConnectionType } from "../clients/sidecar"; import { CCLOUD_CONNECTION_ID } from "../constants"; -import { IconNames } from "../icons"; +import { ERROR_ICON, IconNames } from "../icons"; import { Logger } from "../logging"; import type { FlinkArtifact } from "./flinkArtifact"; import type { FlinkDatabaseResource } from "./flinkDatabaseResource"; @@ -18,9 +18,6 @@ export enum FlinkDatabaseContainerLabel { AI_AGENTS = "AI Agents", } -/** Error icon to use for Flink Database resource containers items if fetching resources fails. */ -export const ERROR_ICON = new ThemeIcon("warning", new ThemeColor("problemsErrorIcon.foreground")); - /** Poll interval to use when waiting for a container to finish loading. */ export const LOADING_POLL_INTERVAL_MS = 100; From 9b4450dad65415a64ad03dd7c569521e649a9231 Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Tue, 27 Jan 2026 17:11:12 -0500 Subject: [PATCH 10/18] fix ERROR_ICON import --- src/models/flinkDatabaseResourceContainer.test.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/models/flinkDatabaseResourceContainer.test.ts b/src/models/flinkDatabaseResourceContainer.test.ts index 6e952540a1..6bbb450d07 100644 --- a/src/models/flinkDatabaseResourceContainer.test.ts +++ b/src/models/flinkDatabaseResourceContainer.test.ts @@ -4,10 +4,9 @@ import { ThemeIcon, TreeItemCollapsibleState } from "vscode"; import { createFakeFlinkDatabaseResource } from "../../tests/unit/testResources/flinkDatabaseResource"; import { ConnectionType } from "../clients/sidecar"; import { CCLOUD_CONNECTION_ID } from "../constants"; -import { IconNames } from "../icons"; +import { ERROR_ICON, IconNames } from "../icons"; import type { FlinkDatabaseResource } from "./flinkDatabaseResource"; import { - ERROR_ICON, FlinkDatabaseResourceContainer, LOADING_POLL_INTERVAL_MS, } from "./flinkDatabaseResourceContainer"; From eb6dc207f06defdbf8994b7a6ed517dcc1f1be8c Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Tue, 24 Feb 2026 12:52:02 -0500 Subject: [PATCH 11/18] remove ConsumerGroupContainer since container refactoring happens in downstream branch --- src/models/consumerGroup.ts | 119 +----------------------------------- 1 file changed, 1 insertion(+), 118 deletions(-) diff --git a/src/models/consumerGroup.ts b/src/models/consumerGroup.ts index ba1f757ed2..6d90d705f5 100644 --- a/src/models/consumerGroup.ts +++ b/src/models/consumerGroup.ts @@ -1,7 +1,6 @@ import * as vscode from "vscode"; import { ConnectionType } from "../clients/sidecar"; -import { ERROR_ICON, IconNames } from "../icons"; -import { Logger } from "../logging"; +import { IconNames } from "../icons"; import type { IdItem } from "./main"; import { CustomMarkdownString } from "./main"; import type { ConnectionId, EnvironmentId, IResourceBase, ISearchable } from "./resource"; @@ -268,119 +267,3 @@ function createConsumerTooltip(resource: Consumer): CustomMarkdownString { return tooltip; } - -// TODO: merge with FlinkDatabaseResourceContainer? - -/** Poll interval to use when waiting for a container to finish loading. */ -const LOADING_POLL_INTERVAL_MS = 100; - -/** A container {@link TreeItem} for consumer groups in the Topics view. */ -export class ConsumerGroupContainer extends vscode.TreeItem implements ISearchable { - readonly connectionId: ConnectionId; - readonly connectionType: ConnectionType; - readonly clusterId: string; - readonly environmentId: EnvironmentId; - - // `id` is string|undefined in TreeItem, but we need it to be a string for IdItem - id: string; - - private _children: ConsumerGroup[]; - private _isLoading: boolean = false; - private _hasError: boolean = false; - private readonly _defaultContextValue: string = "consumerGroups-container"; - private readonly _defaultIcon: vscode.ThemeIcon; - - private logger: Logger; - - constructor( - connectionId: ConnectionId, - connectionType: ConnectionType, - clusterId: string, - environmentId: EnvironmentId, - children: ConsumerGroup[] = [], - ) { - super("Consumer Groups", vscode.TreeItemCollapsibleState.Collapsed); - - this.connectionId = connectionId; - this.connectionType = connectionType; - this.clusterId = clusterId; - this.environmentId = environmentId; - this._children = children; - - this.id = `${connectionId}-${clusterId}-consumer-groups`; - this.contextValue = this._defaultContextValue; - this._defaultIcon = new vscode.ThemeIcon(IconNames.CONSUMER_GROUP); - this.iconPath = this._defaultIcon; - - this.logger = new Logger("models.ConsumerGroupContainer"); - } - - /** - * Consumer groups belonging to this container. - * Setting this will clear the internal {@linkcode isLoading} state. - * If the children array has items, this will also set {@linkcode hasError} to `false`. - */ - get children(): ConsumerGroup[] { - return this._children; - } - - set children(children: ConsumerGroup[]) { - this._children = children; - this.isLoading = false; - this.description = `(${children.length})`; - - if (children.length > 0) { - this.hasError = false; - } - } - - get isLoading(): boolean { - return this._isLoading; - } - - set isLoading(loading: boolean) { - this._isLoading = loading; - this.iconPath = loading ? new vscode.ThemeIcon(IconNames.LOADING) : this._defaultIcon; - } - - get hasError(): boolean { - return this._hasError; - } - - /** Set or clear the error state for this container. */ - set hasError(error: boolean) { - this._hasError = error; - this.iconPath = error ? ERROR_ICON : this._defaultIcon; - - // Append or remove "-error" suffix to context value based on error state - this.contextValue = error ? `${this._defaultContextValue}-error` : this._defaultContextValue; - } - - searchableText(): string { - return "Consumer Groups"; - } - - /** Wait until the container is no longer in a loading state, or timeout after timeoutMs. */ - async ensureDoneLoading(timeoutMs: number = 10000): Promise { - const startTime = Date.now(); - - while (this.isLoading) { - if (Date.now() - startTime >= timeoutMs) { - throw new Error("Timeout waiting for container to finish loading"); - } - await new Promise((resolve) => setTimeout(resolve, LOADING_POLL_INTERVAL_MS)); - } - } - - /** Get the container's resources, waiting for loading to complete if necessary. */ - async gatherResources(timeoutMs: number = 10000): Promise { - let resources: ConsumerGroup[] = []; - try { - await this.ensureDoneLoading(timeoutMs); - resources = this.children; - } catch (error) { - this.logger.error(`Error getting resources: ${error}`); - } - return resources; - } -} From 71b27b3188bb45711f09e1a98ec6e66bb074cb59 Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Tue, 24 Feb 2026 13:04:13 -0500 Subject: [PATCH 12/18] simplify consumer group icon color; update test state:color map --- src/models/consumerGroup.test.ts | 9 ++++---- src/models/consumerGroup.ts | 35 ++++++++------------------------ 2 files changed, 13 insertions(+), 31 deletions(-) diff --git a/src/models/consumerGroup.test.ts b/src/models/consumerGroup.test.ts index 399b7af617..d9236e6d23 100644 --- a/src/models/consumerGroup.test.ts +++ b/src/models/consumerGroup.test.ts @@ -223,12 +223,13 @@ describe("models/consumerGroup.ts", () => { }); describe("icon color", () => { + // inactive states (Empty/Dead) get a warning color, all others use the default const expectedColorByState: Record = { - [ConsumerGroupState.Stable]: "testing.iconPassed", + [ConsumerGroupState.Stable]: undefined, [ConsumerGroupState.Empty]: "problemsWarningIcon.foreground", - [ConsumerGroupState.Dead]: "problemsErrorIcon.foreground", - [ConsumerGroupState.PreparingRebalance]: "notificationsInfoIcon.foreground", - [ConsumerGroupState.CompletingRebalance]: "notificationsInfoIcon.foreground", + [ConsumerGroupState.Dead]: "problemsWarningIcon.foreground", + [ConsumerGroupState.PreparingRebalance]: undefined, + [ConsumerGroupState.CompletingRebalance]: undefined, [ConsumerGroupState.Unknown]: undefined, }; diff --git a/src/models/consumerGroup.ts b/src/models/consumerGroup.ts index 6d90d705f5..706f093d7d 100644 --- a/src/models/consumerGroup.ts +++ b/src/models/consumerGroup.ts @@ -171,35 +171,16 @@ export class ConsumerGroupTreeItem extends vscode.TreeItem { this.collapsibleState = vscode.TreeItemCollapsibleState.Collapsed; this.description = resource.state; - this.iconPath = getConsumerGroupIcon(resource); - this.tooltip = createConsumerGroupTooltip(resource); - } -} -function getConsumerGroupIcon(group: ConsumerGroup): vscode.ThemeIcon { - let stateColor: string | undefined; - switch (group.state) { - case ConsumerGroupState.Stable: - // Green for stable/healthy - stateColor = "testing.iconPassed"; - break; - case ConsumerGroupState.Empty: - // Yellow/warning for empty (no active consumers) - stateColor = "problemsWarningIcon.foreground"; - break; - case ConsumerGroupState.Dead: - // Red for dead - stateColor = "problemsErrorIcon.foreground"; - break; - case ConsumerGroupState.PreparingRebalance: - case ConsumerGroupState.CompletingRebalance: - stateColor = "notificationsInfoIcon.foreground"; - break; + // highlight inactive groups (Empty/Dead) with a warning color + const isInactive = [ConsumerGroupState.Empty, ConsumerGroupState.Dead].includes(resource.state); + this.iconPath = new vscode.ThemeIcon( + resource.iconName, + isInactive ? new vscode.ThemeColor("problemsWarningIcon.foreground") : undefined, + ); + + this.tooltip = createConsumerGroupTooltip(resource); } - return new vscode.ThemeIcon( - group.iconName, - stateColor ? new vscode.ThemeColor(stateColor) : undefined, - ); } function createConsumerGroupTooltip(resource: ConsumerGroup): CustomMarkdownString { From 1950e17a67e74f5a0dffca40f6d7bc772ed1be20 Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Tue, 24 Feb 2026 14:31:40 -0500 Subject: [PATCH 13/18] set INACTIVE_STATES const; convert ccloudUrl to getter method for consistency and update tests; add CCloud link section to tooltips --- src/models/consumerGroup.test.ts | 12 ++++++------ src/models/consumerGroup.ts | 20 ++++++++++++++------ 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/src/models/consumerGroup.test.ts b/src/models/consumerGroup.test.ts index d9236e6d23..30eb53221d 100644 --- a/src/models/consumerGroup.test.ts +++ b/src/models/consumerGroup.test.ts @@ -99,12 +99,12 @@ describe("models/consumerGroup.ts", () => { const group = TEST_CCLOUD_CONSUMER_GROUP; const expected = `https://confluent.cloud/environments/${group.environmentId}/clusters/${group.clusterId}/clients/consumer-lag/${group.consumerGroupId}`; - assert.strictEqual(group.ccloudUrl(), expected); + assert.strictEqual(group.ccloudUrl, expected); }); it("should return empty string for non-CCloud groups", () => { - assert.strictEqual(TEST_DIRECT_CONSUMER_GROUP.ccloudUrl(), ""); - assert.strictEqual(TEST_LOCAL_CONSUMER_GROUP.ccloudUrl(), ""); + assert.strictEqual(TEST_DIRECT_CONSUMER_GROUP.ccloudUrl, ""); + assert.strictEqual(TEST_LOCAL_CONSUMER_GROUP.ccloudUrl, ""); }); }); }); @@ -135,12 +135,12 @@ describe("models/consumerGroup.ts", () => { const consumer = TEST_CCLOUD_CONSUMER; const expected = `https://confluent.cloud/environments/${consumer.environmentId}/clusters/${consumer.clusterId}/clients/consumers/${consumer.clientId}`; - assert.strictEqual(consumer.ccloudUrl(), expected); + assert.strictEqual(consumer.ccloudUrl, expected); }); it("should return empty string for non-CCloud consumers", () => { - assert.strictEqual(TEST_DIRECT_CONSUMER.ccloudUrl(), ""); - assert.strictEqual(TEST_LOCAL_CONSUMER.ccloudUrl(), ""); + assert.strictEqual(TEST_DIRECT_CONSUMER.ccloudUrl, ""); + assert.strictEqual(TEST_LOCAL_CONSUMER.ccloudUrl, ""); }); }); diff --git a/src/models/consumerGroup.ts b/src/models/consumerGroup.ts index 706f093d7d..a520a21690 100644 --- a/src/models/consumerGroup.ts +++ b/src/models/consumerGroup.ts @@ -18,6 +18,12 @@ export enum ConsumerGroupState { Unknown = "Unknown", } +/** States where the consumer group has no active consumers and offsets can be reset. */ +const INACTIVE_STATES: readonly ConsumerGroupState[] = [ + ConsumerGroupState.Empty, + ConsumerGroupState.Dead, +]; + /** Main class representing a Kafka consumer group. */ export class ConsumerGroup implements IResourceBase, ISearchable, IdItem { connectionId: ConnectionId; @@ -81,15 +87,14 @@ export class ConsumerGroup implements IResourceBase, ISearchable, IdItem { /** Whether the consumer group is in a state that allows offset resets. */ get canResetOffsets(): boolean { - const resettableStates = [ConsumerGroupState.Empty, ConsumerGroupState.Dead]; - return resettableStates.includes(this.state); + return INACTIVE_STATES.includes(this.state); } searchableText(): string { return this.consumerGroupId; } - ccloudUrl(): string { + get ccloudUrl(): string { if (this.connectionType !== ConnectionType.Ccloud) { return ""; } @@ -148,7 +153,7 @@ export class Consumer implements IResourceBase, ISearchable, IdItem { return `${this.consumerId} ${this.clientId}`; } - ccloudUrl(): string { + get ccloudUrl(): string { if (this.connectionType !== ConnectionType.Ccloud) { return ""; } @@ -172,8 +177,7 @@ export class ConsumerGroupTreeItem extends vscode.TreeItem { this.collapsibleState = vscode.TreeItemCollapsibleState.Collapsed; this.description = resource.state; - // highlight inactive groups (Empty/Dead) with a warning color - const isInactive = [ConsumerGroupState.Empty, ConsumerGroupState.Dead].includes(resource.state); + const isInactive = INACTIVE_STATES.includes(resource.state); this.iconPath = new vscode.ThemeIcon( resource.iconName, isInactive ? new vscode.ThemeColor("problemsWarningIcon.foreground") : undefined, @@ -211,6 +215,8 @@ function createConsumerGroupTooltip(resource: ConsumerGroup): CustomMarkdownStri tooltip.addWarning("Consumer group is currently rebalancing."); } + tooltip.addCCloudLink(resource.ccloudUrl); + return tooltip; } @@ -246,5 +252,7 @@ function createConsumerTooltip(resource: Consumer): CustomMarkdownString { tooltip.addField("Instance ID", resource.instanceId); } + tooltip.addCCloudLink(resource.ccloudUrl); + return tooltip; } From b1c907a8311d37e66250a767541c2528d12d742a Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Tue, 24 Feb 2026 16:55:28 -0500 Subject: [PATCH 14/18] fix enum values based on response data --- src/models/consumerGroup.ts | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/models/consumerGroup.ts b/src/models/consumerGroup.ts index a520a21690..0b29d81add 100644 --- a/src/models/consumerGroup.ts +++ b/src/models/consumerGroup.ts @@ -10,12 +10,12 @@ import type { ConnectionId, EnvironmentId, IResourceBase, ISearchable } from "./ * @see https://kafka.apache.org/20/javadoc/org/apache/kafka/common/ConsumerGroupState.html */ export enum ConsumerGroupState { - Dead = "Dead", - Empty = "Empty", - PreparingRebalance = "PreparingRebalance", - CompletingRebalance = "CompletingRebalance", - Stable = "Stable", - Unknown = "Unknown", + Dead = "DEAD", + Empty = "EMPTY", + PreparingRebalance = "PREPARING_REBALANCE", + CompletingRebalance = "COMPLETING_REBALANCE", + Stable = "STABLE", + Unknown = "UNKNOWN", } /** States where the consumer group has no active consumers and offsets can be reset. */ From df8dace93ac17a6e04f44ea91135bd4383f2c0b1 Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Tue, 24 Feb 2026 16:58:24 -0500 Subject: [PATCH 15/18] use enum values in assertions --- src/models/consumerGroup.test.ts | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/models/consumerGroup.test.ts b/src/models/consumerGroup.test.ts index 30eb53221d..2df643936b 100644 --- a/src/models/consumerGroup.test.ts +++ b/src/models/consumerGroup.test.ts @@ -188,19 +188,25 @@ describe("models/consumerGroup.ts", () => { it("should include connection type and state in contextValue", () => { const treeItem = new ConsumerGroupTreeItem(TEST_CCLOUD_CONSUMER_GROUP); - assert.strictEqual(treeItem.contextValue, "ccloud-consumerGroup-Stable"); + assert.strictEqual( + treeItem.contextValue, + `ccloud-consumerGroup-${ConsumerGroupState.Stable}`, + ); }); it("should set contextValue for local connection type", () => { const treeItem = new ConsumerGroupTreeItem(TEST_LOCAL_CONSUMER_GROUP); - assert.strictEqual(treeItem.contextValue, "local-consumerGroup-Stable"); + assert.strictEqual(treeItem.contextValue, `local-consumerGroup-${ConsumerGroupState.Stable}`); }); it("should set contextValue for direct connection type", () => { const treeItem = new ConsumerGroupTreeItem(TEST_DIRECT_CONSUMER_GROUP); - assert.strictEqual(treeItem.contextValue, "direct-consumerGroup-Stable"); + assert.strictEqual( + treeItem.contextValue, + `direct-consumerGroup-${ConsumerGroupState.Stable}`, + ); }); it("should always set collapsible state to Collapsed", () => { From fbcae6f43748c9dcc1eacdfc385b7c46d3534fc3 Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Tue, 24 Feb 2026 17:22:39 -0500 Subject: [PATCH 16/18] update context value for state accuracy --- src/models/consumerGroup.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/models/consumerGroup.ts b/src/models/consumerGroup.ts index 0b29d81add..888b22037f 100644 --- a/src/models/consumerGroup.ts +++ b/src/models/consumerGroup.ts @@ -171,7 +171,7 @@ export class ConsumerGroupTreeItem extends vscode.TreeItem { this.id = resource.id; this.resource = resource; // includes state for conditional menu visibility, like: - // "ccloud-consumerGroup-Stable" or "local-consumerGroup-Empty" + // "ccloud-consumerGroup-STABLE" or "local-consumerGroup-EMPTY" this.contextValue = `${resource.connectionType.toLowerCase()}-consumerGroup-${resource.state}`; this.collapsibleState = vscode.TreeItemCollapsibleState.Collapsed; From 8f458fa87f09f9ac9fb73040a09c9e709e429736 Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Thu, 26 Feb 2026 11:46:01 -0500 Subject: [PATCH 17/18] update context value to use kebab case for consistency; update tests --- src/models/consumerGroup.test.ts | 11 +++++++---- src/models/consumerGroup.ts | 6 +++--- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/src/models/consumerGroup.test.ts b/src/models/consumerGroup.test.ts index 2df643936b..94f0c1859a 100644 --- a/src/models/consumerGroup.test.ts +++ b/src/models/consumerGroup.test.ts @@ -190,14 +190,17 @@ describe("models/consumerGroup.ts", () => { assert.strictEqual( treeItem.contextValue, - `ccloud-consumerGroup-${ConsumerGroupState.Stable}`, + `ccloud-consumer-group-${ConsumerGroupState.Stable}`, ); }); it("should set contextValue for local connection type", () => { const treeItem = new ConsumerGroupTreeItem(TEST_LOCAL_CONSUMER_GROUP); - assert.strictEqual(treeItem.contextValue, `local-consumerGroup-${ConsumerGroupState.Stable}`); + assert.strictEqual( + treeItem.contextValue, + `local-consumer-group-${ConsumerGroupState.Stable}`, + ); }); it("should set contextValue for direct connection type", () => { @@ -205,7 +208,7 @@ describe("models/consumerGroup.ts", () => { assert.strictEqual( treeItem.contextValue, - `direct-consumerGroup-${ConsumerGroupState.Stable}`, + `direct-consumer-group-${ConsumerGroupState.Stable}`, ); }); @@ -344,7 +347,7 @@ describe("models/consumerGroup.ts", () => { it("should include connection type in contextValue", () => { const treeItem = new ConsumerTreeItem(TEST_CCLOUD_CONSUMER); - assert.strictEqual(treeItem.contextValue, "ccloud-consumerGroup-member"); + assert.strictEqual(treeItem.contextValue, "ccloud-consumer-group-member"); }); it("should set collapsible state to None", () => { diff --git a/src/models/consumerGroup.ts b/src/models/consumerGroup.ts index 888b22037f..a22a84d985 100644 --- a/src/models/consumerGroup.ts +++ b/src/models/consumerGroup.ts @@ -171,8 +171,8 @@ export class ConsumerGroupTreeItem extends vscode.TreeItem { this.id = resource.id; this.resource = resource; // includes state for conditional menu visibility, like: - // "ccloud-consumerGroup-STABLE" or "local-consumerGroup-EMPTY" - this.contextValue = `${resource.connectionType.toLowerCase()}-consumerGroup-${resource.state}`; + // "ccloud-consumer-group-STABLE" or "local-consumer-group-EMPTY" + this.contextValue = `${resource.connectionType.toLowerCase()}-consumer-group-${resource.state}`; this.collapsibleState = vscode.TreeItemCollapsibleState.Collapsed; this.description = resource.state; @@ -232,7 +232,7 @@ export class ConsumerTreeItem extends vscode.TreeItem { this.id = resource.id; this.resource = resource; - this.contextValue = `${resource.connectionType.toLowerCase()}-consumerGroup-member`; + this.contextValue = `${resource.connectionType.toLowerCase()}-consumer-group-member`; this.collapsibleState = vscode.TreeItemCollapsibleState.None; From a453935c21a1f0f03a08f9b276f733645870c8c9 Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Thu, 26 Feb 2026 12:02:24 -0500 Subject: [PATCH 18/18] use CCLOUD_BASE_PATH const; update tests --- src/models/consumerGroup.test.ts | 6 +++--- src/models/consumerGroup.ts | 5 +++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/models/consumerGroup.test.ts b/src/models/consumerGroup.test.ts index 94f0c1859a..664bb9778a 100644 --- a/src/models/consumerGroup.test.ts +++ b/src/models/consumerGroup.test.ts @@ -14,7 +14,7 @@ import { } from "../../tests/unit/testResources/consumerGroup"; import { TEST_CCLOUD_KAFKA_CLUSTER } from "../../tests/unit/testResources/kafkaCluster"; import { ConnectionType } from "../clients/sidecar"; -import { CCLOUD_CONNECTION_ID } from "../constants"; +import { CCLOUD_BASE_PATH, CCLOUD_CONNECTION_ID } from "../constants"; import { IconNames } from "../icons"; import { ConsumerGroupState, ConsumerGroupTreeItem, ConsumerTreeItem } from "./consumerGroup"; @@ -97,7 +97,7 @@ describe("models/consumerGroup.ts", () => { describe("ccloudUrl", () => { it("should return the correct URL for CCloud groups", () => { const group = TEST_CCLOUD_CONSUMER_GROUP; - const expected = `https://confluent.cloud/environments/${group.environmentId}/clusters/${group.clusterId}/clients/consumer-lag/${group.consumerGroupId}`; + const expected = `https://${CCLOUD_BASE_PATH}/environments/${group.environmentId}/clusters/${group.clusterId}/clients/consumer-lag/${group.consumerGroupId}`; assert.strictEqual(group.ccloudUrl, expected); }); @@ -133,7 +133,7 @@ describe("models/consumerGroup.ts", () => { describe("ccloudUrl", () => { it("should return the correct URL for CCloud consumers", () => { const consumer = TEST_CCLOUD_CONSUMER; - const expected = `https://confluent.cloud/environments/${consumer.environmentId}/clusters/${consumer.clusterId}/clients/consumers/${consumer.clientId}`; + const expected = `https://${CCLOUD_BASE_PATH}/environments/${consumer.environmentId}/clusters/${consumer.clusterId}/clients/consumers/${consumer.clientId}`; assert.strictEqual(consumer.ccloudUrl, expected); }); diff --git a/src/models/consumerGroup.ts b/src/models/consumerGroup.ts index a22a84d985..492af9e5e6 100644 --- a/src/models/consumerGroup.ts +++ b/src/models/consumerGroup.ts @@ -1,5 +1,6 @@ import * as vscode from "vscode"; import { ConnectionType } from "../clients/sidecar"; +import { CCLOUD_BASE_PATH } from "../constants"; import { IconNames } from "../icons"; import type { IdItem } from "./main"; import { CustomMarkdownString } from "./main"; @@ -98,7 +99,7 @@ export class ConsumerGroup implements IResourceBase, ISearchable, IdItem { if (this.connectionType !== ConnectionType.Ccloud) { return ""; } - return `https://confluent.cloud/environments/${this.environmentId}/clusters/${this.clusterId}/clients/consumer-lag/${this.consumerGroupId}`; + return `https://${CCLOUD_BASE_PATH}/environments/${this.environmentId}/clusters/${this.clusterId}/clients/consumer-lag/${this.consumerGroupId}`; } } @@ -157,7 +158,7 @@ export class Consumer implements IResourceBase, ISearchable, IdItem { if (this.connectionType !== ConnectionType.Ccloud) { return ""; } - return `https://confluent.cloud/environments/${this.environmentId}/clusters/${this.clusterId}/clients/consumers/${this.clientId}`; + return `https://${CCLOUD_BASE_PATH}/environments/${this.environmentId}/clusters/${this.clusterId}/clients/consumers/${this.clientId}`; } }