Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ All notable changes to this extension will be documented in this file.
- Submitted Flink statements now allow you to choose your own statement name prefix (or to leave
generic). Set your prefix using preference:
[`confluent.flink.statementPrefix`](vscode://settings/confluent.flink.statementPrefix).
- Clicking a statement from the Flink Statements view will now open an editable document instead of
a read-only preview document. This allows users to more easily edit and resubmit statements.

### Fixed

Expand Down
124 changes: 73 additions & 51 deletions src/commands/flinkStatements.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import assert from "assert";
import sinon from "sinon";
import * as vscode from "vscode";

import { TextDocument } from "vscode-json-languageservice";
import { TextDocument } from "vscode";
import { eventEmitterStubs } from "../../tests/stubs/emitters";
import { getStubbedCCloudResourceLoader } from "../../tests/stubs/resourceLoaders";
import {
Expand All @@ -13,7 +12,7 @@ import {
import { TEST_CCLOUD_FLINK_COMPUTE_POOL } from "../../tests/unit/testResources/flinkComputePool";
import { createFlinkStatement } from "../../tests/unit/testResources/flinkStatement";
import * as flinkCodeLens from "../codelens/flinkSqlProvider";
import { FlinkStatementDocumentProvider } from "../documentProviders/flinkStatement";
import { FLINK_SQL_LANGUAGE_ID } from "../flinkSql/constants";
import * as statementUtils from "../flinkSql/statementUtils";
import { CCloudResourceLoader } from "../loaders";
import { CCloudEnvironment } from "../models/environment";
Expand Down Expand Up @@ -45,14 +44,42 @@ describe("commands/flinkStatements.ts", () => {

describe("viewStatementSqlCommand", () => {
let getCatalogDatabaseFromMetadataStub: sinon.SinonStub;
let workspaceTextDocumentsStub: sinon.SinonStub;
let openTextDocumentStub: sinon.SinonStub;
let showTextDocumentStub: sinon.SinonStub;
let setUriMetadataStub: sinon.SinonStub;

const testPool = TEST_CCLOUD_FLINK_COMPUTE_POOL;
const testEnv = new CCloudEnvironment({
...TEST_CCLOUD_ENVIRONMENT,
flinkComputePools: [testPool],
});

const testSqlStatement = "SELECT * FROM my_test_flink_statement_table";
const testStatement: FlinkStatement = createFlinkStatement({
sqlStatement: testSqlStatement,
});
const testDoc = {
languageId: FLINK_SQL_LANGUAGE_ID,
content: testSqlStatement,
uri: vscode.Uri.parse("untitled:SELECT1"),
getText: () => testSqlStatement,
} as unknown as TextDocument;

beforeEach(() => {
getCatalogDatabaseFromMetadataStub = sandbox.stub(
flinkCodeLens,
"getCatalogDatabaseFromMetadata",
);
stubbedLoader.getEnvironments.resolves([testEnv]);
stubbedLoader.getFlinkComputePool.resolves(testPool);

getCatalogDatabaseFromMetadataStub = sandbox
.stub(flinkCodeLens, "getCatalogDatabaseFromMetadata")
.resolves({
catalog: testEnv,
database: TEST_CCLOUD_KAFKA_CLUSTER,
});

// no open documents in the workspace by default
workspaceTextDocumentsStub = sandbox.stub(vscode.workspace, "textDocuments").get(() => []);
openTextDocumentStub = sandbox.stub(vscode.workspace, "openTextDocument").resolves(testDoc);
showTextDocumentStub = sandbox.stub(vscode.window, "showTextDocument");
setUriMetadataStub = sandbox.stub(ResourceManager.getInstance(), "setUriMetadata");
});
Expand All @@ -67,77 +94,72 @@ describe("commands/flinkStatements.ts", () => {
assert.strictEqual(result, undefined);
});

it("should open a read-only document for a FlinkStatement", async () => {
const testPool = TEST_CCLOUD_FLINK_COMPUTE_POOL;
const testEnv = new CCloudEnvironment({
...TEST_CCLOUD_ENVIRONMENT,
flinkComputePools: [testPool],
});
stubbedLoader.getEnvironments.resolves([testEnv]);
stubbedLoader.getFlinkComputePool.resolves(testPool);
getCatalogDatabaseFromMetadataStub.returns({
catalog: testEnv,
database: TEST_CCLOUD_KAFKA_CLUSTER,
});

const statement = createFlinkStatement({
sqlStatement: "SELECT * FROM my_test_flink_statement_table",
});
const uri = FlinkStatementDocumentProvider.getStatementDocumentUri(statement);
it("should create and open an untitled document for a FlinkStatement", async () => {
// no existing open documents by default

await viewStatementSqlCommand(statement);
await viewStatementSqlCommand(testStatement);

sinon.assert.calledOnce(stubbedLoader.getFlinkComputePool);
sinon.assert.calledWithExactly(stubbedLoader.getFlinkComputePool, statement.computePoolId!);
sinon.assert.calledWithExactly(
stubbedLoader.getFlinkComputePool,
testStatement.computePoolId!,
);

sinon.assert.calledOnce(getCatalogDatabaseFromMetadataStub);
sinon.assert.calledWithExactly(
getCatalogDatabaseFromMetadataStub,
{
[UriMetadataKeys.FLINK_COMPUTE_POOL_ID]: statement.computePoolId,
[UriMetadataKeys.FLINK_CATALOG_NAME]: statement.catalog,
[UriMetadataKeys.FLINK_DATABASE_NAME]: statement.database,
[UriMetadataKeys.FLINK_COMPUTE_POOL_ID]: testStatement.computePoolId,
[UriMetadataKeys.FLINK_CATALOG_NAME]: testStatement.catalog,
[UriMetadataKeys.FLINK_DATABASE_NAME]: testStatement.database,
},
testPool,
);

sinon.assert.calledOnce(openTextDocumentStub);
sinon.assert.calledOnceWithExactly(openTextDocumentStub, {
language: FLINK_SQL_LANGUAGE_ID,
content: testStatement.sqlStatement,
});

sinon.assert.calledOnce(setUriMetadataStub); // params tested in separate test

sinon.assert.calledOnce(showTextDocumentStub);
const document: TextDocument = showTextDocumentStub.firstCall.args[0];
assert.strictEqual(document.uri.toString(), uri.toString());
assert.strictEqual(document.uri.scheme, "untitled");
sinon.assert.calledWithExactly(showTextDocumentStub, document, { preview: false });
});

it("should set Uri metadata before opening the document", async () => {
const testPool = TEST_CCLOUD_FLINK_COMPUTE_POOL;
const testEnv = new CCloudEnvironment({
...TEST_CCLOUD_ENVIRONMENT,
flinkComputePools: [testPool],
});
stubbedLoader.getEnvironments.resolves([testEnv]);
stubbedLoader.getFlinkComputePool.resolves(testPool);
getCatalogDatabaseFromMetadataStub.returns({
catalog: testEnv,
database: TEST_CCLOUD_KAFKA_CLUSTER,
});
it("should show an existing untitled document for a FlinkStatement", async () => {
// simulate an existing open document with the same SQL as the statement
workspaceTextDocumentsStub.get(() => [testDoc]);

const statement = createFlinkStatement({
sqlStatement: "SELECT * FROM my_test_flink_statement_table",
});
const uri = FlinkStatementDocumentProvider.getStatementDocumentUri(statement);
await viewStatementSqlCommand(testStatement);

// same assertions as the previous test, except we don't create a new document or set metadata
sinon.assert.notCalled(openTextDocumentStub);
sinon.assert.notCalled(setUriMetadataStub);
sinon.assert.calledOnce(showTextDocumentStub);
const document: TextDocument = showTextDocumentStub.firstCall.args[0];
assert.strictEqual(document.uri.scheme, "untitled");
sinon.assert.calledWithExactly(showTextDocumentStub, document, { preview: false });
});

await viewStatementSqlCommand(statement);
it("should set Uri metadata before showing the document", async () => {
await viewStatementSqlCommand(testStatement);

sinon.assert.calledOnce(setUriMetadataStub);
const callArgs = setUriMetadataStub.firstCall.args;
assert.strictEqual(callArgs.length, 2);
assert.strictEqual(callArgs[0].toString(), uri.toString());
assert.strictEqual(callArgs[0].toString(), testDoc.uri.toString());
assert.deepStrictEqual(callArgs[1], {
[UriMetadataKeys.FLINK_COMPUTE_POOL_ID]: statement.computePoolId,
[UriMetadataKeys.FLINK_COMPUTE_POOL_ID]: testStatement.computePoolId,
[UriMetadataKeys.FLINK_CATALOG_ID]: TEST_CCLOUD_ENVIRONMENT.id,
[UriMetadataKeys.FLINK_CATALOG_NAME]: statement.catalog,
[UriMetadataKeys.FLINK_CATALOG_NAME]: testStatement.catalog,
[UriMetadataKeys.FLINK_DATABASE_ID]: TEST_CCLOUD_KAFKA_CLUSTER.id,
[UriMetadataKeys.FLINK_DATABASE_NAME]: statement.database,
[UriMetadataKeys.FLINK_DATABASE_NAME]: testStatement.database,
});
sinon.assert.callOrder(openTextDocumentStub, setUriMetadataStub, showTextDocumentStub);
});
});

Expand Down
42 changes: 20 additions & 22 deletions src/commands/flinkStatements.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
import * as vscode from "vscode";
import { registerCommandWithLogging } from ".";
import { getCatalogDatabaseFromMetadata } from "../codelens/flinkSqlProvider";
import {
FLINKSTATEMENT_URI_SCHEME,
FlinkStatementDocumentProvider,
} from "../documentProviders/flinkStatement";
import { udfsChanged } from "../emitters";
import { extractResponseBody, isResponseError, logError } from "../errors";
import { FLINK_SQL_FILE_EXTENSIONS, FLINK_SQL_LANGUAGE_ID } from "../flinkSql/constants";
Expand Down Expand Up @@ -36,25 +32,15 @@ const logger = new Logger("commands.flinkStatements");

/** View the SQL statement portion of a FlinkStatement in an editable (untitled) document. */
export async function viewStatementSqlCommand(statement: FlinkStatement): Promise<void> {
if (!statement) {
logger.error("viewStatementSqlCommand", "statement is undefined");
return;
}

if (!(statement instanceof FlinkStatement)) {
logger.error("viewStatementSqlCommand", "statement is not an instance of FlinkStatement");
return;
}

if (!statement.computePoolId) {
logger.error("viewStatementSqlCommand", "statement has no computePoolId");
logger.error("statement is not an instance of FlinkStatement");
return;
}

const uri = FlinkStatementDocumentProvider.getStatementDocumentUri(statement);

const loader = CCloudResourceLoader.getInstance();
const pool = await loader.getFlinkComputePool(statement.computePoolId);
// if we're opening a statement from the Flink Statements view, it was submitted to a pool and
// we'll have the computePoolId
const pool = await loader.getFlinkComputePool(statement.computePoolId!);
if (!pool) {
logger.error(
"viewStatementSqlCommand",
Expand All @@ -78,10 +64,22 @@ export async function viewStatementSqlCommand(statement: FlinkStatement): Promis
updatedMetadata[UriMetadataKeys.FLINK_DATABASE_ID] = database.id;
}
const rm = ResourceManager.getInstance();
await rm.setUriMetadata(uri, updatedMetadata);

const doc = await vscode.workspace.openTextDocument(uri);
vscode.languages.setTextDocumentLanguage(doc, "flinksql");
// check if any existing document is already open with this statement's SQL content
// (exact match for now, but if we wanted to add header comments to new untitled documents and/or
// check for "document contains statement SQL" we'll need to update this logic)
let doc: vscode.TextDocument | undefined = vscode.workspace.textDocuments.find(
(doc) => doc.languageId === FLINK_SQL_LANGUAGE_ID && doc.getText() === statement.sqlStatement,
);
if (!doc) {
// create a new untitled doc, set its language and content, then update its metadata before
// showing it to the user
doc = await vscode.workspace.openTextDocument({
language: FLINK_SQL_LANGUAGE_ID,
content: statement.sqlStatement,
});
await rm.setUriMetadata(doc.uri, updatedMetadata);
}
await vscode.window.showTextDocument(doc, { preview: false });
}
/**
Expand All @@ -107,7 +105,7 @@ export async function submitFlinkStatementCommand(
const funcLogger = logger.withCallpoint("submitFlinkStatementCommand");

// 1. Choose the document with the SQL to submit
const uriSchemes = ["file", "untitled", FLINKSTATEMENT_URI_SCHEME];
const uriSchemes = ["file", "untitled"];
const languageIds = ["plaintext", FLINK_SQL_LANGUAGE_ID, "sql"];
const fileFilters = {
"FlinkSQL files": [...FLINK_SQL_FILE_EXTENSIONS, ".sql"],
Expand Down
Loading