diff --git a/src/extension.ts b/src/extension.ts index 763bc19fd0..6a33b0e20b 100644 --- a/src/extension.ts +++ b/src/extension.ts @@ -390,6 +390,7 @@ async function setupContextValues() { "ccloud-flinkable-kafka-cluster", "ccloud-kafka-topic", "ccloud-kafka-topic-with-schema", + // consumer groups and consumers have dynamic context values based on state; can't include here "ccloud-schema-registry", "ccloud-flink-compute-pool", "ccloud-flink-statement", @@ -410,6 +411,7 @@ async function setupContextValues() { "flinkable-ccloud-environment", "ccloud-kafka-cluster", "ccloud-flinkable-kafka-cluster", + // consumer groups and consumers have dynamic context values based on state; can't include here "ccloud-schema-registry", // only ID, no name "ccloud-flink-compute-pool", "ccloud-flink-artifact", diff --git a/src/icons.ts b/src/icons.ts index 4d8e95cbb1..9b94576fc7 100644 --- a/src/icons.ts +++ b/src/icons.ts @@ -1,3 +1,5 @@ +import { ThemeColor, ThemeIcon } from "vscode"; + /** * Ids to use with ThemeIcons for different Confluent/Kafka resources * @see https://code.visualstudio.com/api/references/icons-in-labels @@ -41,3 +43,6 @@ export enum IconNames { /** General-purpose icon to use when we don't have a dedicated icon for a given resource. */ PLACEHOLDER = "symbol-misc", } + +/** Red "warning" icon to use when dealing with connectivity issues or failed resource fetching. */ +export const ERROR_ICON = new ThemeIcon("warning", new ThemeColor("problemsErrorIcon.foreground")); diff --git a/src/models/consumerGroup.test.ts b/src/models/consumerGroup.test.ts new file mode 100644 index 0000000000..664bb9778a --- /dev/null +++ b/src/models/consumerGroup.test.ts @@ -0,0 +1,390 @@ +import * as assert from "assert"; +import type { MarkdownString, ThemeIcon } from "vscode"; +import { TreeItemCollapsibleState } from "vscode"; +import { + createConsumerGroup, + createConsumerGroupMember, + TEST_CCLOUD_CONSUMER, + TEST_CCLOUD_CONSUMER_GROUP, + TEST_CCLOUD_CONSUMER_GROUP_ID, + TEST_DIRECT_CONSUMER, + TEST_DIRECT_CONSUMER_GROUP, + TEST_LOCAL_CONSUMER, + TEST_LOCAL_CONSUMER_GROUP, +} from "../../tests/unit/testResources/consumerGroup"; +import { TEST_CCLOUD_KAFKA_CLUSTER } from "../../tests/unit/testResources/kafkaCluster"; +import { ConnectionType } from "../clients/sidecar"; +import { CCLOUD_BASE_PATH, CCLOUD_CONNECTION_ID } from "../constants"; +import { IconNames } from "../icons"; +import { ConsumerGroupState, ConsumerGroupTreeItem, ConsumerTreeItem } from "./consumerGroup"; + +describe("models/consumerGroup.ts", () => { + describe("ConsumerGroup", () => { + describe("id", () => { + it("should return clusterId-consumerGroupId", () => { + assert.strictEqual( + TEST_CCLOUD_CONSUMER_GROUP.id, + `${TEST_CCLOUD_CONSUMER_GROUP.clusterId}-${TEST_CCLOUD_CONSUMER_GROUP_ID}`, + ); + }); + }); + + describe("hasMembers", () => { + it("should return true when members exist", () => { + assert.strictEqual(TEST_CCLOUD_CONSUMER_GROUP.hasMembers, true); + }); + + it("should return false when the members array is empty", () => { + const group = createConsumerGroup({ + connectionId: CCLOUD_CONNECTION_ID, + connectionType: ConnectionType.Ccloud, + environmentId: TEST_CCLOUD_KAFKA_CLUSTER.environmentId, + clusterId: TEST_CCLOUD_KAFKA_CLUSTER.id, + members: [], + }); + + assert.strictEqual(group.hasMembers, false); + }); + }); + + describe("canResetOffsets", () => { + const resettableStates = [ConsumerGroupState.Empty, ConsumerGroupState.Dead]; + const nonResettableStates = [ + ConsumerGroupState.Stable, + ConsumerGroupState.PreparingRebalance, + ConsumerGroupState.CompletingRebalance, + ConsumerGroupState.Unknown, + ]; + + for (const state of resettableStates) { + it(`should return true for ${state} state`, () => { + const group = createConsumerGroup({ + connectionId: CCLOUD_CONNECTION_ID, + connectionType: ConnectionType.Ccloud, + environmentId: TEST_CCLOUD_KAFKA_CLUSTER.environmentId, + clusterId: TEST_CCLOUD_KAFKA_CLUSTER.id, + state, + }); + + assert.strictEqual(group.canResetOffsets, true); + }); + } + + for (const state of nonResettableStates) { + it(`should return false for ${state} state`, () => { + const group = createConsumerGroup({ + connectionId: CCLOUD_CONNECTION_ID, + connectionType: ConnectionType.Ccloud, + environmentId: TEST_CCLOUD_KAFKA_CLUSTER.environmentId, + clusterId: TEST_CCLOUD_KAFKA_CLUSTER.id, + state, + }); + + assert.strictEqual(group.canResetOffsets, false); + }); + } + }); + + describe("searchableText", () => { + it("should return the consumerGroupId", () => { + assert.strictEqual( + TEST_CCLOUD_CONSUMER_GROUP.searchableText(), + TEST_CCLOUD_CONSUMER_GROUP_ID, + ); + }); + }); + + describe("ccloudUrl", () => { + it("should return the correct URL for CCloud groups", () => { + const group = TEST_CCLOUD_CONSUMER_GROUP; + const expected = `https://${CCLOUD_BASE_PATH}/environments/${group.environmentId}/clusters/${group.clusterId}/clients/consumer-lag/${group.consumerGroupId}`; + + assert.strictEqual(group.ccloudUrl, expected); + }); + + it("should return empty string for non-CCloud groups", () => { + assert.strictEqual(TEST_DIRECT_CONSUMER_GROUP.ccloudUrl, ""); + assert.strictEqual(TEST_LOCAL_CONSUMER_GROUP.ccloudUrl, ""); + }); + }); + }); + + describe("Consumer", () => { + describe("id", () => { + it("should return clusterId-consumerGroupId-consumerId", () => { + const consumer = TEST_CCLOUD_CONSUMER; + assert.strictEqual( + consumer.id, + `${consumer.clusterId}-${consumer.consumerGroupId}-${consumer.consumerId}`, + ); + }); + }); + + describe("searchableText", () => { + it("should return consumerId and clientId", () => { + const consumer = TEST_CCLOUD_CONSUMER; + assert.strictEqual( + consumer.searchableText(), + `${consumer.consumerId} ${consumer.clientId}`, + ); + }); + }); + + describe("ccloudUrl", () => { + it("should return the correct URL for CCloud consumers", () => { + const consumer = TEST_CCLOUD_CONSUMER; + const expected = `https://${CCLOUD_BASE_PATH}/environments/${consumer.environmentId}/clusters/${consumer.clusterId}/clients/consumers/${consumer.clientId}`; + + assert.strictEqual(consumer.ccloudUrl, expected); + }); + + it("should return empty string for non-CCloud consumers", () => { + assert.strictEqual(TEST_DIRECT_CONSUMER.ccloudUrl, ""); + assert.strictEqual(TEST_LOCAL_CONSUMER.ccloudUrl, ""); + }); + }); + + describe("instanceId", () => { + it("should default to null when not provided", () => { + const consumer = createConsumerGroupMember({ + connectionId: CCLOUD_CONNECTION_ID, + connectionType: ConnectionType.Ccloud, + environmentId: TEST_CCLOUD_KAFKA_CLUSTER.environmentId, + clusterId: TEST_CCLOUD_KAFKA_CLUSTER.id, + consumerGroupId: "group-1", + }); + + assert.strictEqual(consumer.instanceId, null); + }); + + it("should preserve instanceId when provided", () => { + const consumer = createConsumerGroupMember({ + connectionId: CCLOUD_CONNECTION_ID, + connectionType: ConnectionType.Ccloud, + environmentId: TEST_CCLOUD_KAFKA_CLUSTER.environmentId, + clusterId: TEST_CCLOUD_KAFKA_CLUSTER.id, + consumerGroupId: "group-1", + instanceId: "instance-42", + }); + + assert.strictEqual(consumer.instanceId, "instance-42"); + }); + }); + }); + + describe("ConsumerGroupTreeItem", () => { + it("should use consumerGroupId as label", () => { + const treeItem = new ConsumerGroupTreeItem(TEST_CCLOUD_CONSUMER_GROUP); + + assert.strictEqual(treeItem.label, TEST_CCLOUD_CONSUMER_GROUP.consumerGroupId); + }); + + it("should set id from the resource", () => { + const treeItem = new ConsumerGroupTreeItem(TEST_CCLOUD_CONSUMER_GROUP); + + assert.strictEqual(treeItem.id, TEST_CCLOUD_CONSUMER_GROUP.id); + }); + + it("should include connection type and state in contextValue", () => { + const treeItem = new ConsumerGroupTreeItem(TEST_CCLOUD_CONSUMER_GROUP); + + assert.strictEqual( + treeItem.contextValue, + `ccloud-consumer-group-${ConsumerGroupState.Stable}`, + ); + }); + + it("should set contextValue for local connection type", () => { + const treeItem = new ConsumerGroupTreeItem(TEST_LOCAL_CONSUMER_GROUP); + + assert.strictEqual( + treeItem.contextValue, + `local-consumer-group-${ConsumerGroupState.Stable}`, + ); + }); + + it("should set contextValue for direct connection type", () => { + const treeItem = new ConsumerGroupTreeItem(TEST_DIRECT_CONSUMER_GROUP); + + assert.strictEqual( + treeItem.contextValue, + `direct-consumer-group-${ConsumerGroupState.Stable}`, + ); + }); + + it("should always set collapsible state to Collapsed", () => { + const treeItem = new ConsumerGroupTreeItem(TEST_CCLOUD_CONSUMER_GROUP); + + assert.strictEqual(treeItem.collapsibleState, TreeItemCollapsibleState.Collapsed); + }); + + it("should set description to the group state", () => { + const treeItem = new ConsumerGroupTreeItem(TEST_CCLOUD_CONSUMER_GROUP); + + assert.strictEqual(treeItem.description, ConsumerGroupState.Stable); + }); + + it("should use the consumer group icon", () => { + const treeItem = new ConsumerGroupTreeItem(TEST_CCLOUD_CONSUMER_GROUP); + const icon = treeItem.iconPath as ThemeIcon; + + assert.strictEqual(icon.id, IconNames.CONSUMER_GROUP); + }); + + describe("icon color", () => { + // inactive states (Empty/Dead) get a warning color, all others use the default + const expectedColorByState: Record = { + [ConsumerGroupState.Stable]: undefined, + [ConsumerGroupState.Empty]: "problemsWarningIcon.foreground", + [ConsumerGroupState.Dead]: "problemsWarningIcon.foreground", + [ConsumerGroupState.PreparingRebalance]: undefined, + [ConsumerGroupState.CompletingRebalance]: undefined, + [ConsumerGroupState.Unknown]: undefined, + }; + + for (const [state, expectedColor] of Object.entries(expectedColorByState)) { + it(`should use color=${expectedColor} when state=${state}`, () => { + const group = createConsumerGroup({ + connectionId: CCLOUD_CONNECTION_ID, + connectionType: ConnectionType.Ccloud, + environmentId: TEST_CCLOUD_KAFKA_CLUSTER.environmentId, + clusterId: TEST_CCLOUD_KAFKA_CLUSTER.id, + state: state as ConsumerGroupState, + }); + const item = new ConsumerGroupTreeItem(group); + const icon = item.iconPath as ThemeIcon; + + assert.strictEqual(icon.color?.id, expectedColor); + }); + } + }); + + describe("tooltip", () => { + it("should include consumer group details", () => { + const treeItem = new ConsumerGroupTreeItem(TEST_CCLOUD_CONSUMER_GROUP); + const text = (treeItem.tooltip as MarkdownString).value; + + assert.ok(text.includes("Consumer Group")); + assert.ok(text.includes(TEST_CCLOUD_CONSUMER_GROUP.consumerGroupId)); + assert.ok(text.includes(ConsumerGroupState.Stable)); + assert.ok(text.includes("range")); + }); + + it("should include member count when members exist", () => { + const treeItem = new ConsumerGroupTreeItem(TEST_CCLOUD_CONSUMER_GROUP); + const text = (treeItem.tooltip as MarkdownString).value; + + assert.ok(text.includes("Members")); + }); + + it("should show warning for Empty state", () => { + const group = createConsumerGroup({ + connectionId: CCLOUD_CONNECTION_ID, + connectionType: ConnectionType.Ccloud, + environmentId: TEST_CCLOUD_KAFKA_CLUSTER.environmentId, + clusterId: TEST_CCLOUD_KAFKA_CLUSTER.id, + state: ConsumerGroupState.Empty, + }); + const text = (new ConsumerGroupTreeItem(group).tooltip as MarkdownString).value; + + assert.ok(text.includes("No active consumers")); + }); + + it("should show warning for Dead state", () => { + const group = createConsumerGroup({ + connectionId: CCLOUD_CONNECTION_ID, + connectionType: ConnectionType.Ccloud, + environmentId: TEST_CCLOUD_KAFKA_CLUSTER.environmentId, + clusterId: TEST_CCLOUD_KAFKA_CLUSTER.id, + state: ConsumerGroupState.Dead, + }); + const text = (new ConsumerGroupTreeItem(group).tooltip as MarkdownString).value; + + assert.ok(text.includes("dead and will be removed")); + }); + + it("should show warning for rebalancing states", () => { + const group = createConsumerGroup({ + connectionId: CCLOUD_CONNECTION_ID, + connectionType: ConnectionType.Ccloud, + environmentId: TEST_CCLOUD_KAFKA_CLUSTER.environmentId, + clusterId: TEST_CCLOUD_KAFKA_CLUSTER.id, + state: ConsumerGroupState.PreparingRebalance, + }); + const text = (new ConsumerGroupTreeItem(group).tooltip as MarkdownString).value; + + assert.ok(text.includes("currently rebalancing")); + }); + }); + }); + + describe("ConsumerTreeItem", () => { + it("should include consumerId and clientId in label", () => { + const treeItem = new ConsumerTreeItem(TEST_CCLOUD_CONSUMER); + assert.strictEqual( + treeItem.label, + `${TEST_CCLOUD_CONSUMER.consumerId} (client: ${TEST_CCLOUD_CONSUMER.clientId})`, + ); + }); + + it("should use only consumerId as label when clientId is empty", () => { + const consumer = createConsumerGroupMember({ + connectionId: CCLOUD_CONNECTION_ID, + connectionType: ConnectionType.Ccloud, + environmentId: TEST_CCLOUD_KAFKA_CLUSTER.environmentId, + clusterId: TEST_CCLOUD_KAFKA_CLUSTER.id, + consumerGroupId: "group-1", + clientId: "", + }); + const treeItem = new ConsumerTreeItem(consumer); + assert.strictEqual(treeItem.label, consumer.consumerId); + }); + + it("should set id from the resource", () => { + const treeItem = new ConsumerTreeItem(TEST_CCLOUD_CONSUMER); + assert.strictEqual(treeItem.id, TEST_CCLOUD_CONSUMER.id); + }); + + it("should include connection type in contextValue", () => { + const treeItem = new ConsumerTreeItem(TEST_CCLOUD_CONSUMER); + assert.strictEqual(treeItem.contextValue, "ccloud-consumer-group-member"); + }); + + it("should set collapsible state to None", () => { + const treeItem = new ConsumerTreeItem(TEST_CCLOUD_CONSUMER); + assert.strictEqual(treeItem.collapsibleState, TreeItemCollapsibleState.None); + }); + + it("should use the placeholder icon", () => { + const treeItem = new ConsumerTreeItem(TEST_CCLOUD_CONSUMER); + const icon = treeItem.iconPath as ThemeIcon; + assert.strictEqual(icon.id, IconNames.PLACEHOLDER); + }); + + describe("tooltip", () => { + it("should include consumer details", () => { + const treeItem = new ConsumerTreeItem(TEST_CCLOUD_CONSUMER); + const text = (treeItem.tooltip as MarkdownString).value; + + assert.ok(text.includes("Consumer")); + assert.ok(text.includes(TEST_CCLOUD_CONSUMER.consumerId)); + assert.ok(text.includes(TEST_CCLOUD_CONSUMER.clientId)); + assert.ok(text.includes(TEST_CCLOUD_CONSUMER.consumerGroupId)); + }); + + it("should include instanceId when present", () => { + const consumer = createConsumerGroupMember({ + connectionId: CCLOUD_CONNECTION_ID, + connectionType: ConnectionType.Ccloud, + environmentId: TEST_CCLOUD_KAFKA_CLUSTER.environmentId, + clusterId: TEST_CCLOUD_KAFKA_CLUSTER.id, + consumerGroupId: "group-1", + instanceId: "instance-42", + }); + const text = (new ConsumerTreeItem(consumer).tooltip as MarkdownString).value; + + assert.ok(text.includes("instance-42")); + }); + }); + }); +}); diff --git a/src/models/consumerGroup.ts b/src/models/consumerGroup.ts new file mode 100644 index 0000000000..492af9e5e6 --- /dev/null +++ b/src/models/consumerGroup.ts @@ -0,0 +1,259 @@ +import * as vscode from "vscode"; +import { ConnectionType } from "../clients/sidecar"; +import { CCLOUD_BASE_PATH } from "../constants"; +import { IconNames } from "../icons"; +import type { IdItem } from "./main"; +import { CustomMarkdownString } from "./main"; +import type { ConnectionId, EnvironmentId, IResourceBase, ISearchable } from "./resource"; + +/** + * Consumer group states as returned by the Kafka REST API. + * @see https://kafka.apache.org/20/javadoc/org/apache/kafka/common/ConsumerGroupState.html + */ +export enum ConsumerGroupState { + Dead = "DEAD", + Empty = "EMPTY", + PreparingRebalance = "PREPARING_REBALANCE", + CompletingRebalance = "COMPLETING_REBALANCE", + Stable = "STABLE", + Unknown = "UNKNOWN", +} + +/** States where the consumer group has no active consumers and offsets can be reset. */ +const INACTIVE_STATES: readonly ConsumerGroupState[] = [ + ConsumerGroupState.Empty, + ConsumerGroupState.Dead, +]; + +/** Main class representing a Kafka consumer group. */ +export class ConsumerGroup implements IResourceBase, ISearchable, IdItem { + connectionId: ConnectionId; + connectionType: ConnectionType; + environmentId: EnvironmentId; + clusterId: string; + + /** The broker ID of the group coordinator. */ + coordinatorId: number | null; + /** The partition assignor strategy (e.g., "range", "roundrobin", "sticky"). */ + partitionAssignor: string; + + consumerGroupId: string; + state: ConsumerGroupState; + members: Consumer[] = []; + /** + * Whether the group uses manual partition assignment (`assign()`) rather than dynamic + * group coordination (`subscribe()`). Simple groups only use Kafka for offset storage. + * @see https://kafka.apache.org/26/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html + */ + isSimple: boolean; + + iconName: IconNames = IconNames.CONSUMER_GROUP; + + constructor( + props: Pick< + ConsumerGroup, + | "connectionId" + | "connectionType" + | "environmentId" + | "clusterId" + | "coordinatorId" + | "partitionAssignor" + | "consumerGroupId" + | "state" + | "members" + | "isSimple" + >, + ) { + this.connectionId = props.connectionId; + this.connectionType = props.connectionType; + this.environmentId = props.environmentId; + this.clusterId = props.clusterId; + + this.coordinatorId = props.coordinatorId; + this.partitionAssignor = props.partitionAssignor; + + this.consumerGroupId = props.consumerGroupId; + this.state = props.state; + this.members = props.members ?? []; + this.isSimple = props.isSimple; + } + + get id(): string { + return `${this.clusterId}-${this.consumerGroupId}`; + } + + get hasMembers(): boolean { + return this.members.length > 0; + } + + /** Whether the consumer group is in a state that allows offset resets. */ + get canResetOffsets(): boolean { + return INACTIVE_STATES.includes(this.state); + } + + searchableText(): string { + return this.consumerGroupId; + } + + get ccloudUrl(): string { + if (this.connectionType !== ConnectionType.Ccloud) { + return ""; + } + return `https://${CCLOUD_BASE_PATH}/environments/${this.environmentId}/clusters/${this.clusterId}/clients/consumer-lag/${this.consumerGroupId}`; + } +} + +/** A member (consumer instance) of a {@link ConsumerGroup}. */ +export class Consumer implements IResourceBase, ISearchable, IdItem { + connectionId: ConnectionId; + connectionType: ConnectionType; + environmentId: EnvironmentId; + clusterId: string; + consumerGroupId: string; + + consumerId: string; + clientId: string; + /** + * Static group membership identifier (`group.instance.id`), or null if not configured. + * @see https://kafka.apache.org/26/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html + */ + instanceId: string | null; + + // https://github.com/confluentinc/vscode/issues/3233 + iconName: IconNames = IconNames.PLACEHOLDER; + + constructor( + props: Pick< + Consumer, + | "connectionId" + | "connectionType" + | "environmentId" + | "clusterId" + | "consumerGroupId" + | "consumerId" + | "clientId" + | "instanceId" + >, + ) { + this.connectionId = props.connectionId; + this.connectionType = props.connectionType; + this.environmentId = props.environmentId; + this.clusterId = props.clusterId; + this.consumerGroupId = props.consumerGroupId; + + this.consumerId = props.consumerId; + this.clientId = props.clientId; + this.instanceId = props.instanceId ?? null; + } + + get id(): string { + return `${this.clusterId}-${this.consumerGroupId}-${this.consumerId}`; + } + + searchableText(): string { + return `${this.consumerId} ${this.clientId}`; + } + + get ccloudUrl(): string { + if (this.connectionType !== ConnectionType.Ccloud) { + return ""; + } + return `https://${CCLOUD_BASE_PATH}/environments/${this.environmentId}/clusters/${this.clusterId}/clients/consumers/${this.clientId}`; + } +} + +/** Tree item representation for a {@link ConsumerGroup}. */ +export class ConsumerGroupTreeItem extends vscode.TreeItem { + resource: ConsumerGroup; + + constructor(resource: ConsumerGroup) { + super(resource.consumerGroupId); + + this.id = resource.id; + this.resource = resource; + // includes state for conditional menu visibility, like: + // "ccloud-consumer-group-STABLE" or "local-consumer-group-EMPTY" + this.contextValue = `${resource.connectionType.toLowerCase()}-consumer-group-${resource.state}`; + + this.collapsibleState = vscode.TreeItemCollapsibleState.Collapsed; + this.description = resource.state; + + const isInactive = INACTIVE_STATES.includes(resource.state); + this.iconPath = new vscode.ThemeIcon( + resource.iconName, + isInactive ? new vscode.ThemeColor("problemsWarningIcon.foreground") : undefined, + ); + + this.tooltip = createConsumerGroupTooltip(resource); + } +} + +function createConsumerGroupTooltip(resource: ConsumerGroup): CustomMarkdownString { + const tooltip = new CustomMarkdownString() + .addHeader("Consumer Group", resource.iconName) + .addField("Group ID", resource.consumerGroupId) + .addField("State", resource.state) + .addField("Partition Assignor", resource.partitionAssignor) + .addField("Simple Consumer", resource.isSimple ? "Yes" : "No"); + + if (resource.coordinatorId !== null) { + tooltip.addField("Coordinator Broker", resource.coordinatorId.toString()); + } + + if (resource.hasMembers) { + tooltip.addField("Members", resource.members.length.toString()); + } + + // warnings for non-stable states + if (resource.state === ConsumerGroupState.Empty) { + tooltip.addWarning("No active consumers in this group."); + } else if (resource.state === ConsumerGroupState.Dead) { + tooltip.addWarning("Consumer group is dead and will be removed."); + } else if ( + resource.state === ConsumerGroupState.PreparingRebalance || + resource.state === ConsumerGroupState.CompletingRebalance + ) { + tooltip.addWarning("Consumer group is currently rebalancing."); + } + + tooltip.addCCloudLink(resource.ccloudUrl); + + return tooltip; +} + +/** Tree item representation for a {@link Consumer}. */ +export class ConsumerTreeItem extends vscode.TreeItem { + resource: Consumer; + + constructor(resource: Consumer) { + const label = resource.clientId + ? `${resource.consumerId} (client: ${resource.clientId})` + : resource.consumerId; + super(label); + + this.id = resource.id; + this.resource = resource; + this.contextValue = `${resource.connectionType.toLowerCase()}-consumer-group-member`; + + this.collapsibleState = vscode.TreeItemCollapsibleState.None; + + this.iconPath = new vscode.ThemeIcon(resource.iconName); + this.tooltip = createConsumerTooltip(resource); + } +} + +function createConsumerTooltip(resource: Consumer): CustomMarkdownString { + const tooltip = new CustomMarkdownString() + .addHeader("Consumer", IconNames.PLACEHOLDER) + .addField("Consumer ID", resource.consumerId) + .addField("Client ID", resource.clientId) + .addField("Group", resource.consumerGroupId); + + if (resource.instanceId) { + tooltip.addField("Instance ID", resource.instanceId); + } + + tooltip.addCCloudLink(resource.ccloudUrl); + + return tooltip; +} diff --git a/src/models/flinkDatabaseResourceContainer.test.ts b/src/models/flinkDatabaseResourceContainer.test.ts index 6e952540a1..6bbb450d07 100644 --- a/src/models/flinkDatabaseResourceContainer.test.ts +++ b/src/models/flinkDatabaseResourceContainer.test.ts @@ -4,10 +4,9 @@ import { ThemeIcon, TreeItemCollapsibleState } from "vscode"; import { createFakeFlinkDatabaseResource } from "../../tests/unit/testResources/flinkDatabaseResource"; import { ConnectionType } from "../clients/sidecar"; import { CCLOUD_CONNECTION_ID } from "../constants"; -import { IconNames } from "../icons"; +import { ERROR_ICON, IconNames } from "../icons"; import type { FlinkDatabaseResource } from "./flinkDatabaseResource"; import { - ERROR_ICON, FlinkDatabaseResourceContainer, LOADING_POLL_INTERVAL_MS, } from "./flinkDatabaseResourceContainer"; diff --git a/src/models/flinkDatabaseResourceContainer.ts b/src/models/flinkDatabaseResourceContainer.ts index 6611af94a4..686d550396 100644 --- a/src/models/flinkDatabaseResourceContainer.ts +++ b/src/models/flinkDatabaseResourceContainer.ts @@ -1,7 +1,7 @@ -import { ThemeColor, ThemeIcon, TreeItem, TreeItemCollapsibleState } from "vscode"; +import { ThemeIcon, TreeItem, TreeItemCollapsibleState } from "vscode"; import { ConnectionType } from "../clients/sidecar"; import { CCLOUD_CONNECTION_ID } from "../constants"; -import { IconNames } from "../icons"; +import { ERROR_ICON, IconNames } from "../icons"; import { Logger } from "../logging"; import type { FlinkArtifact } from "./flinkArtifact"; import type { FlinkDatabaseResource } from "./flinkDatabaseResource"; @@ -18,9 +18,6 @@ export enum FlinkDatabaseContainerLabel { AI_AGENTS = "AI Agents", } -/** Error icon to use for Flink Database resource containers items if fetching resources fails. */ -export const ERROR_ICON = new ThemeIcon("warning", new ThemeColor("problemsErrorIcon.foreground")); - /** Poll interval to use when waiting for a container to finish loading. */ export const LOADING_POLL_INTERVAL_MS = 100; diff --git a/tests/unit/testResources/consumerGroup.ts b/tests/unit/testResources/consumerGroup.ts new file mode 100644 index 0000000000..ff61595f7d --- /dev/null +++ b/tests/unit/testResources/consumerGroup.ts @@ -0,0 +1,117 @@ +import { ConnectionType } from "../../../src/clients/sidecar"; +import { CCLOUD_CONNECTION_ID, LOCAL_CONNECTION_ID } from "../../../src/constants"; +import { Consumer, ConsumerGroup, ConsumerGroupState } from "../../../src/models/consumerGroup"; +import { TEST_DIRECT_CONNECTION_ID } from "./connection"; +import { + TEST_CCLOUD_KAFKA_CLUSTER, + TEST_DIRECT_KAFKA_CLUSTER, + TEST_LOCAL_KAFKA_CLUSTER, +} from "./kafkaCluster"; + +/** Create a {@link ConsumerGroup} for testing purposes. */ +export function createConsumerGroup( + args: { + connectionId: string; + connectionType: ConnectionType; + environmentId: string; + clusterId: string; + } & Partial, +): ConsumerGroup { + return new ConsumerGroup({ + connectionId: args.connectionId, + connectionType: args.connectionType, + environmentId: args.environmentId, + clusterId: args.clusterId, + consumerGroupId: args.consumerGroupId ?? "test-consumer-group", + state: args.state ?? ConsumerGroupState.Stable, + isSimple: args.isSimple ?? false, + partitionAssignor: args.partitionAssignor ?? "range", + coordinatorId: args.coordinatorId ?? 0, + members: args.members ?? [], + }); +} + +/** Create a {@link Consumer} for testing purposes. */ +export function createConsumerGroupMember( + args: { + connectionId: string; + connectionType: ConnectionType; + environmentId: string; + clusterId: string; + consumerGroupId: string; + } & Partial, +): Consumer { + return new Consumer({ + connectionId: args.connectionId, + connectionType: args.connectionType, + environmentId: args.environmentId, + clusterId: args.clusterId, + consumerGroupId: args.consumerGroupId, + consumerId: args.consumerId ?? "test-consumer-1", + clientId: args.clientId ?? "test-client", + instanceId: args.instanceId ?? null, + }); +} + +export const TEST_CCLOUD_CONSUMER_GROUP_ID = "test-ccloud-consumer-group"; +export const TEST_CCLOUD_CONSUMER_GROUP = createConsumerGroup({ + connectionId: CCLOUD_CONNECTION_ID, + connectionType: ConnectionType.Ccloud, + environmentId: TEST_CCLOUD_KAFKA_CLUSTER.environmentId, + clusterId: TEST_CCLOUD_KAFKA_CLUSTER.id, + consumerGroupId: TEST_CCLOUD_CONSUMER_GROUP_ID, + members: [ + createConsumerGroupMember({ + connectionId: CCLOUD_CONNECTION_ID, + connectionType: ConnectionType.Ccloud, + environmentId: TEST_CCLOUD_KAFKA_CLUSTER.environmentId, + clusterId: TEST_CCLOUD_KAFKA_CLUSTER.id, + consumerGroupId: TEST_CCLOUD_CONSUMER_GROUP_ID, + consumerId: "consumer-ccloud-1", + clientId: "my-ccloud-app", + }), + ], +}); +export const TEST_CCLOUD_CONSUMER = TEST_CCLOUD_CONSUMER_GROUP.members[0]; + +export const TEST_DIRECT_CONSUMER_GROUP_ID = "test-direct-consumer-group"; +export const TEST_DIRECT_CONSUMER_GROUP = createConsumerGroup({ + connectionId: TEST_DIRECT_CONNECTION_ID, + connectionType: ConnectionType.Direct, + environmentId: TEST_DIRECT_KAFKA_CLUSTER.environmentId, + clusterId: TEST_DIRECT_KAFKA_CLUSTER.id, + consumerGroupId: TEST_DIRECT_CONSUMER_GROUP_ID, + members: [ + createConsumerGroupMember({ + connectionId: TEST_DIRECT_CONNECTION_ID, + connectionType: ConnectionType.Direct, + environmentId: TEST_DIRECT_KAFKA_CLUSTER.environmentId, + clusterId: TEST_DIRECT_KAFKA_CLUSTER.id, + consumerGroupId: TEST_DIRECT_CONSUMER_GROUP_ID, + consumerId: "consumer-direct-1", + clientId: "my-direct-app", + }), + ], +}); +export const TEST_DIRECT_CONSUMER = TEST_DIRECT_CONSUMER_GROUP.members[0]; + +export const TEST_LOCAL_CONSUMER_GROUP_ID = "test-local-consumer-group"; +export const TEST_LOCAL_CONSUMER_GROUP = createConsumerGroup({ + connectionId: LOCAL_CONNECTION_ID, + connectionType: ConnectionType.Local, + environmentId: TEST_LOCAL_KAFKA_CLUSTER.environmentId, + clusterId: TEST_LOCAL_KAFKA_CLUSTER.id, + consumerGroupId: TEST_LOCAL_CONSUMER_GROUP_ID, + members: [ + createConsumerGroupMember({ + connectionId: LOCAL_CONNECTION_ID, + connectionType: ConnectionType.Local, + environmentId: TEST_LOCAL_KAFKA_CLUSTER.environmentId, + clusterId: TEST_LOCAL_KAFKA_CLUSTER.id, + consumerGroupId: TEST_LOCAL_CONSUMER_GROUP_ID, + consumerId: "consumer-local-1", + clientId: "my-local-app", + }), + ], +}); +export const TEST_LOCAL_CONSUMER = TEST_LOCAL_CONSUMER_GROUP.members[0];