Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,12 @@
"title": "Refresh",
"category": "Confluent: Topics"
},
{
"command": "confluent.topics.refreshResourceContainer",
"icon": "$(sync)",
"title": "Refresh Group",
"category": "Confluent: Topics View"
},
{
"command": "confluent.topics.search",
"icon": "$(search)",
Expand Down Expand Up @@ -1722,6 +1728,10 @@
"command": "confluent.topics.refresh",
"when": "true"
},
{
"command": "confluent.topics.refreshResourceContainer",
"when": "false"
},
{
"command": "confluent.topics.search",
"when": "true"
Expand Down Expand Up @@ -2192,6 +2202,21 @@
"when": "view == confluent-topics && viewItem =~ /ccloud-kafka-topic.*-flinkable.*/",
"group": "z_openInCloud"
},
{
"command": "confluent.topics.refreshResourceContainer",
"when": "view == confluent-topics && viewItem =~ /.*-container/",
"group": "inline@2"
},
{
"command": "confluent.copyResourceId",
"when": "view == confluent-topics && viewItem =~ /.*-consumer-group-.*/",
"group": "2_copy@1"
},
{
"command": "confluent.openCCloudLink",
"when": "view == confluent-topics && viewItem =~ /ccloud-consumer-group-.*/",
"group": "z_openInCloud"
},
{
"command": "confluent.artifacts.createUdfRegistrationDocument",
"when": "view == confluent-flink-database && viewItem =~ /.*-flink-artifact/",
Expand Down
135 changes: 135 additions & 0 deletions src/commands/topicsView.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import * as assert from "assert";
import * as sinon from "sinon";

import * as indexModule from ".";

import { refreshResourceContainerCommand, registerTopicsViewCommands } from "./topicsView";

import { ConnectionType } from "../clients/sidecar";
import {
KafkaClusterContainerLabel,
KafkaClusterResourceContainer,
} from "../models/containers/kafkaClusterResourceContainer";
import type { ConsumerGroup } from "../models/consumerGroup";
import type { KafkaTopic } from "../models/topic";
import { TopicViewProvider } from "../viewProviders/topics";
import { TEST_CCLOUD_KAFKA_CLUSTER } from "../../tests/unit/testResources/kafkaCluster";

describe("commands/topicsView.ts", () => {
let sandbox: sinon.SinonSandbox;

beforeEach(() => {
sandbox = sinon.createSandbox();
});

afterEach(() => {
sandbox.restore();
});

describe("registerTopicsViewCommands", () => {
let registerCommandWithLoggingStub: sinon.SinonStub;

beforeEach(() => {
registerCommandWithLoggingStub = sandbox.stub(indexModule, "registerCommandWithLogging");
});

it("should register the expected commands", () => {
registerTopicsViewCommands();

assert.strictEqual(registerCommandWithLoggingStub.callCount, 1);

sinon.assert.calledWithExactly(
registerCommandWithLoggingStub,
"confluent.topics.refreshResourceContainer",
refreshResourceContainerCommand,
);
});
});

describe("refreshResourceContainerCommand", () => {
let provider: TopicViewProvider;
let refreshTopicsStub: sinon.SinonStub;
let refreshConsumerGroupsStub: sinon.SinonStub;

beforeEach(() => {
provider = TopicViewProvider.getInstance();
provider["resource"] = TEST_CCLOUD_KAFKA_CLUSTER;

refreshTopicsStub = sandbox.stub(provider, "refreshTopics").resolves();
refreshConsumerGroupsStub = sandbox.stub(provider, "refreshConsumerGroups").resolves();
});

afterEach(() => {
provider.dispose();
TopicViewProvider["instanceMap"].clear();
});

it("should bail early if no container is provided", async () => {
await refreshResourceContainerCommand(undefined as any);

sinon.assert.notCalled(refreshTopicsStub);
sinon.assert.notCalled(refreshConsumerGroupsStub);
});

it("should bail early if no Kafka cluster is selected", async () => {
provider["resource"] = null;
const container = new KafkaClusterResourceContainer<ConsumerGroup>(
TEST_CCLOUD_KAFKA_CLUSTER.connectionId,
ConnectionType.Ccloud,
KafkaClusterContainerLabel.CONSUMER_GROUPS,
[],
);

await refreshResourceContainerCommand(container);

sinon.assert.notCalled(refreshTopicsStub);
sinon.assert.notCalled(refreshConsumerGroupsStub);
});

it("should call refreshTopics when the Topics container is provided", async () => {
const container = new KafkaClusterResourceContainer<KafkaTopic>(
TEST_CCLOUD_KAFKA_CLUSTER.connectionId,
ConnectionType.Ccloud,
KafkaClusterContainerLabel.TOPICS,
[],
);

await refreshResourceContainerCommand(container);

sinon.assert.calledOnceWithExactly(refreshTopicsStub, TEST_CCLOUD_KAFKA_CLUSTER, true);
sinon.assert.notCalled(refreshConsumerGroupsStub);
});

it("should call refreshConsumerGroups when the Consumer Groups container is provided", async () => {
const container = new KafkaClusterResourceContainer<ConsumerGroup>(
TEST_CCLOUD_KAFKA_CLUSTER.connectionId,
ConnectionType.Ccloud,
KafkaClusterContainerLabel.CONSUMER_GROUPS,
[],
);

await refreshResourceContainerCommand(container);

sinon.assert.notCalled(refreshTopicsStub);
sinon.assert.calledOnceWithExactly(
refreshConsumerGroupsStub,
TEST_CCLOUD_KAFKA_CLUSTER,
true,
);
});

it("should log an error for an unknown container label", async () => {
const container = new KafkaClusterResourceContainer(
TEST_CCLOUD_KAFKA_CLUSTER.connectionId,
ConnectionType.Ccloud,
"Unknown Label" as any,
[],
);

await refreshResourceContainerCommand(container);

sinon.assert.notCalled(refreshTopicsStub);
sinon.assert.notCalled(refreshConsumerGroupsStub);
});
});
});
54 changes: 54 additions & 0 deletions src/commands/topicsView.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import type { Disposable } from "vscode";
import { registerCommandWithLogging } from ".";
import { Logger } from "../logging";
import type { ISearchable } from "../models/resource";
import {
KafkaClusterContainerLabel,
type KafkaClusterResourceContainer,
} from "../models/containers/kafkaClusterResourceContainer";
import { TopicViewProvider } from "../viewProviders/topics";

const logger = new Logger("commands.topicsView");

/**
* Refresh a resource container (Topics or Consumer Groups) in the Topics view.
* @param container The {@link KafkaClusterResourceContainer} tree item that was clicked.
*/
export async function refreshResourceContainerCommand(
container: KafkaClusterResourceContainer<ISearchable>,
): Promise<void> {
if (!container) {
logger.error("No container provided to refreshResourceContainerCommand");
return;
}

const provider = TopicViewProvider.getInstance();
const cluster = provider.kafkaCluster;
if (!cluster) {
logger.error("No Kafka cluster selected when attempting to refresh resource container.");
return;
}

switch (container.label) {
case KafkaClusterContainerLabel.TOPICS:
await provider.refreshTopics(cluster, true);
break;
case KafkaClusterContainerLabel.CONSUMER_GROUPS:
await provider.refreshConsumerGroups(cluster, true);
break;
default:
logger.error(
`Unknown container label "${container.label}" in refreshResourceContainerCommand`,
);
}
}

/** Register commands for the Topics view's container-level actions. */
export function registerTopicsViewCommands(): Disposable[] {
return [
registerCommandWithLogging(
"confluent.topics.refreshResourceContainer",
refreshResourceContainerCommand,
),
];
}
2 changes: 2 additions & 0 deletions src/extension.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import { registerChatTools } from "./chat/tools/registration";
import { FlinkSqlCodelensProvider } from "./codelens/flinkSqlProvider";
import { registerCommandWithLogging } from "./commands";
import { registerConnectionCommands } from "./commands/connections";
import { registerTopicsViewCommands } from "./commands/topicsView";
import { registerDebugCommands } from "./commands/debugtools";
import { registerDiffCommands } from "./commands/diffs";
import { registerDockerCommands } from "./commands/docker";
Expand Down Expand Up @@ -263,6 +264,7 @@ async function _activateExtension(
...registerSchemaRegistryCommands(),
...registerSchemaCommands(),
...registerTopicCommands(),
...registerTopicsViewCommands(),
...registerDiffCommands(),
...registerExtraCommands(),
...registerDockerCommands(),
Expand Down
69 changes: 53 additions & 16 deletions src/models/consumerGroup.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,18 @@ import { TEST_CCLOUD_KAFKA_CLUSTER } from "../../tests/unit/testResources/kafkaC
import { ConnectionType } from "../clients/sidecar";
import { CCLOUD_BASE_PATH, CCLOUD_CONNECTION_ID } from "../constants";
import { IconNames } from "../icons";
import { ConsumerGroupState, ConsumerGroupTreeItem, ConsumerTreeItem } from "./consumerGroup";
import {
ConsumerGroup,
ConsumerGroupState,
ConsumerGroupTreeItem,
ConsumerTreeItem,
} from "./consumerGroup";

describe("models/consumerGroup.ts", () => {
describe("ConsumerGroup", () => {
describe("id", () => {
it("should return clusterId-consumerGroupId", () => {
assert.strictEqual(
TEST_CCLOUD_CONSUMER_GROUP.id,
`${TEST_CCLOUD_CONSUMER_GROUP.clusterId}-${TEST_CCLOUD_CONSUMER_GROUP_ID}`,
);
it("should return consumerGroupId", () => {
assert.strictEqual(TEST_CCLOUD_CONSUMER_GROUP.id, TEST_CCLOUD_CONSUMER_GROUP_ID);
});
});

Expand Down Expand Up @@ -111,12 +113,8 @@ describe("models/consumerGroup.ts", () => {

describe("Consumer", () => {
describe("id", () => {
it("should return clusterId-consumerGroupId-consumerId", () => {
const consumer = TEST_CCLOUD_CONSUMER;
assert.strictEqual(
consumer.id,
`${consumer.clusterId}-${consumer.consumerGroupId}-${consumer.consumerId}`,
);
it("should return consumerId", () => {
assert.strictEqual(TEST_CCLOUD_CONSUMER.id, TEST_CCLOUD_CONSUMER.consumerId);
});
});

Expand Down Expand Up @@ -179,10 +177,11 @@ describe("models/consumerGroup.ts", () => {
assert.strictEqual(treeItem.label, TEST_CCLOUD_CONSUMER_GROUP.consumerGroupId);
});

it("should set id from the resource", () => {
it("should set id as clusterId-consumerGroupId", () => {
const treeItem = new ConsumerGroupTreeItem(TEST_CCLOUD_CONSUMER_GROUP);
const group = TEST_CCLOUD_CONSUMER_GROUP;

assert.strictEqual(treeItem.id, TEST_CCLOUD_CONSUMER_GROUP.id);
assert.strictEqual(treeItem.id, `${group.clusterId}-${group.consumerGroupId}`);
});

it("should include connection type and state in contextValue", () => {
Expand Down Expand Up @@ -315,6 +314,39 @@ describe("models/consumerGroup.ts", () => {

assert.ok(text.includes("currently rebalancing"));
});

it("should show 'Yes' for simple consumer groups", () => {
const group = createConsumerGroup({
connectionId: CCLOUD_CONNECTION_ID,
connectionType: ConnectionType.Ccloud,
environmentId: TEST_CCLOUD_KAFKA_CLUSTER.environmentId,
clusterId: TEST_CCLOUD_KAFKA_CLUSTER.id,
isSimple: true,
});
const item = new ConsumerGroupTreeItem(group);
const tooltip = (item.tooltip as MarkdownString).value;

assert.ok(tooltip.includes("Yes"));
});

it("should omit coordinator when coordinatorId is null", () => {
const group = new ConsumerGroup({
connectionId: CCLOUD_CONNECTION_ID,
connectionType: ConnectionType.Ccloud,
environmentId: TEST_CCLOUD_KAFKA_CLUSTER.environmentId,
clusterId: TEST_CCLOUD_KAFKA_CLUSTER.id,
consumerGroupId: "test-group",
state: ConsumerGroupState.Stable,
isSimple: false,
partitionAssignor: "range",
coordinatorId: null,
members: [],
});
const item = new ConsumerGroupTreeItem(group);
const tooltip = (item.tooltip as MarkdownString).value;

assert.ok(!tooltip.includes("Coordinator Broker"));
});
});
});

Expand All @@ -340,9 +372,14 @@ describe("models/consumerGroup.ts", () => {
assert.strictEqual(treeItem.label, consumer.consumerId);
});

it("should set id from the resource", () => {
it("should set id as clusterId-clientId-consumerId", () => {
const treeItem = new ConsumerTreeItem(TEST_CCLOUD_CONSUMER);
assert.strictEqual(treeItem.id, TEST_CCLOUD_CONSUMER.id);
const consumer = TEST_CCLOUD_CONSUMER;

assert.strictEqual(
treeItem.id,
`${consumer.clusterId}-${consumer.clientId}-${consumer.consumerId}`,
);
});

it("should include connection type in contextValue", () => {
Expand Down
Loading
Loading