From fb64079a17495577ae87c4b65555caa3938e82e1 Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Fri, 27 Feb 2026 23:33:50 -0500 Subject: [PATCH 1/3] add custom block parser for flinksql documents --- src/documentParsing/flinkSql.test.ts | 593 +++++++++++++++++++++++++++ src/documentParsing/flinkSql.ts | 346 ++++++++++++++++ 2 files changed, 939 insertions(+) create mode 100644 src/documentParsing/flinkSql.test.ts create mode 100644 src/documentParsing/flinkSql.ts diff --git a/src/documentParsing/flinkSql.test.ts b/src/documentParsing/flinkSql.test.ts new file mode 100644 index 0000000000..ec38a76d51 --- /dev/null +++ b/src/documentParsing/flinkSql.test.ts @@ -0,0 +1,593 @@ +import * as assert from "assert"; +import * as sinon from "sinon"; +import { Position, Range, Uri } from "vscode"; +import * as fileUtils from "../utils/file"; +import type { ExecutableBlock, ParsedStatement } from "./flinkSql"; +import { + classifyStatementType, + getBlockAtLine, + groupStatementsIntoBlocks, + parseFlinkSqlDocument, + StatementType, +} from "./flinkSql"; + +describe("documentParsing/flinkSql.ts", () => { + let sandbox: sinon.SinonSandbox; + + beforeEach(() => { + sandbox = sinon.createSandbox(); + }); + + afterEach(() => { + sandbox.restore(); + }); + + describe("classifyStatementType()", () => { + it("should identify SET statements", () => { + assert.strictEqual(classifyStatementType("SET 'key' = 'value';"), StatementType.SET); + assert.strictEqual(classifyStatementType(" SET 'key' = 'value'"), StatementType.SET); + assert.strictEqual(classifyStatementType("set 'key' = 'value';"), StatementType.SET); + }); + + it("should identify USE statements", () => { + assert.strictEqual(classifyStatementType("USE CATALOG my_catalog;"), StatementType.USE); + assert.strictEqual(classifyStatementType("USE my_database;"), StatementType.USE); + assert.strictEqual(classifyStatementType("USE MODULES core;"), StatementType.USE); + assert.strictEqual(classifyStatementType(" use catalog test"), StatementType.USE); + }); + + it("should identify SELECT as EXECUTABLE", () => { + assert.strictEqual(classifyStatementType("SELECT * FROM table1;"), StatementType.EXECUTABLE); + assert.strictEqual( + classifyStatementType(" select id from users;"), + StatementType.EXECUTABLE, + ); + }); + + it("should identify INSERT as EXECUTABLE", () => { + assert.strictEqual( + classifyStatementType("INSERT INTO table2 SELECT * FROM table1;"), + StatementType.EXECUTABLE, + ); + }); + + it("should identify CREATE as EXECUTABLE", () => { + assert.strictEqual( + classifyStatementType("CREATE TABLE my_table (id INT);"), + StatementType.EXECUTABLE, + ); + assert.strictEqual( + classifyStatementType("CREATE FUNCTION my_func AS 'MyClass';"), + StatementType.EXECUTABLE, + ); + }); + + it("should identify DROP as EXECUTABLE", () => { + assert.strictEqual(classifyStatementType("DROP TABLE my_table;"), StatementType.EXECUTABLE); + }); + + it("should identify ALTER as EXECUTABLE", () => { + assert.strictEqual( + classifyStatementType("ALTER TABLE my_table ADD COLUMN name STRING;"), + StatementType.EXECUTABLE, + ); + }); + + it("should identify SHOW as EXECUTABLE", () => { + assert.strictEqual(classifyStatementType("SHOW TABLES;"), StatementType.EXECUTABLE); + assert.strictEqual(classifyStatementType("SHOW DATABASES;"), StatementType.EXECUTABLE); + }); + + it("should identify DESCRIBE as EXECUTABLE", () => { + assert.strictEqual(classifyStatementType("DESCRIBE my_table;"), StatementType.EXECUTABLE); + assert.strictEqual(classifyStatementType("DESC my_table;"), StatementType.EXECUTABLE); + }); + }); + + describe("groupStatementsIntoBlocks()", () => { + function createStatement( + text: string, + type: StatementType, + startLine: number, + endLine: number, + ): ParsedStatement { + return { + text, + type, + range: new Range(new Position(startLine, 0), new Position(endLine, text.length)), + }; + } + + it("should create one block for a single executable statement", () => { + const statements: ParsedStatement[] = [ + createStatement("SELECT * FROM table1;", StatementType.EXECUTABLE, 0, 0), + ]; + + const blocks = groupStatementsIntoBlocks(statements); + + assert.strictEqual(blocks.length, 1); + assert.strictEqual(blocks[0].statements.length, 1); + assert.strictEqual(blocks[0].hasConfigStatements, false); + assert.strictEqual(blocks[0].index, 0); + }); + + it("should group SET with next executable statement", () => { + const statements: ParsedStatement[] = [ + createStatement("SET 'key' = 'value';", StatementType.SET, 0, 0), + createStatement("SELECT * FROM table1;", StatementType.EXECUTABLE, 1, 1), + ]; + + const blocks = groupStatementsIntoBlocks(statements); + + assert.strictEqual(blocks.length, 1); + assert.strictEqual(blocks[0].statements.length, 2); + assert.strictEqual(blocks[0].hasConfigStatements, true); + assert.strictEqual(blocks[0].statements[0].type, StatementType.SET); + assert.strictEqual(blocks[0].statements[1].type, StatementType.EXECUTABLE); + }); + + it("should group USE with next executable statement", () => { + const statements: ParsedStatement[] = [ + createStatement("USE CATALOG my_catalog;", StatementType.USE, 0, 0), + createStatement("INSERT INTO t2 SELECT * FROM t1;", StatementType.EXECUTABLE, 1, 1), + ]; + + const blocks = groupStatementsIntoBlocks(statements); + + assert.strictEqual(blocks.length, 1); + assert.strictEqual(blocks[0].statements.length, 2); + assert.strictEqual(blocks[0].hasConfigStatements, true); + }); + + it("should group multiple SET/USE with next executable", () => { + const statements: ParsedStatement[] = [ + createStatement("SET 'key1' = 'value1';", StatementType.SET, 0, 0), + createStatement("SET 'key2' = 'value2';", StatementType.SET, 1, 1), + createStatement("USE CATALOG my_catalog;", StatementType.USE, 2, 2), + createStatement("SELECT * FROM table1;", StatementType.EXECUTABLE, 3, 3), + ]; + + const blocks = groupStatementsIntoBlocks(statements); + + assert.strictEqual(blocks.length, 1); + assert.strictEqual(blocks[0].statements.length, 4); + assert.strictEqual(blocks[0].hasConfigStatements, true); + assert.strictEqual(blocks[0].text.includes("SET 'key1'"), true); + assert.strictEqual(blocks[0].text.includes("SET 'key2'"), true); + assert.strictEqual(blocks[0].text.includes("USE CATALOG"), true); + assert.strictEqual(blocks[0].text.includes("SELECT"), true); + }); + + it("should create separate blocks for independent statements", () => { + const statements: ParsedStatement[] = [ + createStatement("SELECT * FROM table1;", StatementType.EXECUTABLE, 0, 0), + createStatement("INSERT INTO t2 SELECT * FROM t1;", StatementType.EXECUTABLE, 2, 2), + createStatement("CREATE TABLE t3 (id INT);", StatementType.EXECUTABLE, 4, 4), + ]; + + const blocks = groupStatementsIntoBlocks(statements); + + assert.strictEqual(blocks.length, 3); + assert.strictEqual(blocks[0].index, 0); + assert.strictEqual(blocks[1].index, 1); + assert.strictEqual(blocks[2].index, 2); + assert.strictEqual(blocks[0].hasConfigStatements, false); + assert.strictEqual(blocks[1].hasConfigStatements, false); + assert.strictEqual(blocks[2].hasConfigStatements, false); + }); + + it("should skip orphaned SET/USE at end of document", () => { + const statements: ParsedStatement[] = [ + createStatement("SELECT * FROM table1;", StatementType.EXECUTABLE, 0, 0), + createStatement("SET 'key' = 'value';", StatementType.SET, 2, 2), + createStatement("USE CATALOG my_catalog;", StatementType.USE, 3, 3), + ]; + + const blocks = groupStatementsIntoBlocks(statements); + + assert.strictEqual(blocks.length, 1); + assert.strictEqual(blocks[0].statements.length, 1); + assert.strictEqual(blocks[0].statements[0].type, StatementType.EXECUTABLE); + }); + + it("should handle mixed configuration and executable statements", () => { + const statements: ParsedStatement[] = [ + createStatement("SET 'key1' = 'value1';", StatementType.SET, 0, 0), + createStatement("SELECT * FROM table1;", StatementType.EXECUTABLE, 1, 1), + createStatement("SET 'key2' = 'value2';", StatementType.SET, 3, 3), + createStatement("USE CATALOG catalog2;", StatementType.USE, 4, 4), + createStatement("INSERT INTO t2 SELECT * FROM t1;", StatementType.EXECUTABLE, 5, 5), + createStatement("DROP TABLE t3;", StatementType.EXECUTABLE, 7, 7), + ]; + + const blocks = groupStatementsIntoBlocks(statements); + + assert.strictEqual(blocks.length, 3); + + // First block: SET + SELECT + assert.strictEqual(blocks[0].statements.length, 2); + assert.strictEqual(blocks[0].hasConfigStatements, true); + + // Second block: SET + USE + INSERT + assert.strictEqual(blocks[1].statements.length, 3); + assert.strictEqual(blocks[1].hasConfigStatements, true); + + // Third block: DROP (standalone) + assert.strictEqual(blocks[2].statements.length, 1); + assert.strictEqual(blocks[2].hasConfigStatements, false); + }); + + it("should handle empty statements array", () => { + const blocks = groupStatementsIntoBlocks([]); + assert.strictEqual(blocks.length, 0); + }); + + it("should handle only SET/USE statements (all orphaned)", () => { + const statements: ParsedStatement[] = [ + createStatement("SET 'key1' = 'value1';", StatementType.SET, 0, 0), + createStatement("USE CATALOG my_catalog;", StatementType.USE, 1, 1), + createStatement("SET 'key2' = 'value2';", StatementType.SET, 2, 2), + ]; + + const blocks = groupStatementsIntoBlocks(statements); + + assert.strictEqual(blocks.length, 0); + }); + }); + + describe("getBlockAtLine()", () => { + function createBlock(startLine: number, endLine: number, index: number): ExecutableBlock { + const statement: ParsedStatement = { + text: "SELECT * FROM table1;", + type: StatementType.EXECUTABLE, + range: new Range(new Position(startLine, 0), new Position(endLine, 20)), + }; + return { + statements: [statement], + range: statement.range, + text: statement.text, + hasConfigStatements: false, + index, + }; + } + + it("should find block containing given line", () => { + const blocks: ExecutableBlock[] = [ + createBlock(0, 2, 0), + createBlock(5, 7, 1), + createBlock(10, 12, 2), + ]; + + const block1 = getBlockAtLine(blocks, 1); + assert.strictEqual(block1?.index, 0); + + const block2 = getBlockAtLine(blocks, 6); + assert.strictEqual(block2?.index, 1); + + const block3 = getBlockAtLine(blocks, 11); + assert.strictEqual(block3?.index, 2); + }); + + it("should return undefined for line outside any block", () => { + const blocks: ExecutableBlock[] = [createBlock(0, 2, 0), createBlock(5, 7, 1)]; + + const result = getBlockAtLine(blocks, 4); + assert.strictEqual(result, undefined); + }); + + it("should handle line at block boundary", () => { + const blocks: ExecutableBlock[] = [createBlock(0, 5, 0), createBlock(6, 10, 1)]; + + const blockStart = getBlockAtLine(blocks, 0); + assert.strictEqual(blockStart?.index, 0); + + const blockEnd = getBlockAtLine(blocks, 5); + assert.strictEqual(blockEnd?.index, 0); + + const nextBlockStart = getBlockAtLine(blocks, 6); + assert.strictEqual(nextBlockStart?.index, 1); + }); + + it("should return undefined for empty blocks array", () => { + const result = getBlockAtLine([], 5); + assert.strictEqual(result, undefined); + }); + }); + + describe("parseFlinkSqlDocument()", () => { + let getEditorOrFileContentsStub: sinon.SinonStub; + const fakeFileUri = Uri.file("/path/to/fake.flinksql"); + + beforeEach(() => { + getEditorOrFileContentsStub = sandbox.stub(fileUtils, "getEditorOrFileContents"); + }); + + it("should return empty array for empty document", async () => { + getEditorOrFileContentsStub.resolves({ content: "" }); + + const blocks = await parseFlinkSqlDocument(fakeFileUri); + + assert.strictEqual(blocks.length, 0); + }); + + it("should return empty array for whitespace-only document", async () => { + getEditorOrFileContentsStub.resolves({ content: " \n\n \t \n" }); + + const blocks = await parseFlinkSqlDocument(fakeFileUri); + + assert.strictEqual(blocks.length, 0); + }); + + it("should parse single SELECT statement", async () => { + getEditorOrFileContentsStub.resolves({ + content: "SELECT * FROM table1;", + }); + + const blocks = await parseFlinkSqlDocument(fakeFileUri); + + assert.strictEqual(blocks.length, 1); + assert.strictEqual(blocks[0].statements.length, 1); + assert.strictEqual(blocks[0].statements[0].type, StatementType.EXECUTABLE); + assert.strictEqual(blocks[0].hasConfigStatements, false); + }); + + it("should parse and group SET with SELECT", async () => { + getEditorOrFileContentsStub.resolves({ + content: `SET 'sql-client.execution.mode' = 'table'; +SELECT * FROM table1;`, + }); + + const blocks = await parseFlinkSqlDocument(fakeFileUri); + + assert.strictEqual(blocks.length, 1); + assert.strictEqual(blocks[0].statements.length, 2); + assert.strictEqual(blocks[0].statements[0].type, StatementType.SET); + assert.strictEqual(blocks[0].statements[1].type, StatementType.EXECUTABLE); + assert.strictEqual(blocks[0].hasConfigStatements, true); + }); + + it("should parse multiple independent statements", async () => { + getEditorOrFileContentsStub.resolves({ + content: `SELECT * FROM table1; + +INSERT INTO table2 SELECT * FROM table1; + +SELECT COUNT(*) FROM table3;`, + }); + + const blocks = await parseFlinkSqlDocument(fakeFileUri); + + assert.strictEqual(blocks.length, 3); + assert.strictEqual(blocks[0].index, 0); + assert.strictEqual(blocks[1].index, 1); + assert.strictEqual(blocks[2].index, 2); + }); + + it("should handle document with only comments", async () => { + getEditorOrFileContentsStub.resolves({ + content: `-- This is a comment +/* This is a + multi-line comment */`, + }); + + const blocks = await parseFlinkSqlDocument(fakeFileUri); + + // Parser should return empty array for comment-only document + assert.strictEqual(blocks.length, 0, JSON.stringify(blocks, null, 2)); + }); + + it("should skip orphaned SET/USE statements", async () => { + getEditorOrFileContentsStub.resolves({ + content: `SELECT * FROM table1; +SET 'key' = 'value'; +USE CATALOG my_catalog;`, + }); + + const blocks = await parseFlinkSqlDocument(fakeFileUri); + + // Should only have one block (the SELECT), orphaned SET/USE are skipped + assert.strictEqual(blocks.length, 1); + assert.strictEqual(blocks[0].statements.length, 1); + assert.strictEqual(blocks[0].statements[0].type, StatementType.EXECUTABLE); + }); + + it("should handle multi-line statements", async () => { + getEditorOrFileContentsStub.resolves({ + content: `SELECT + id, + name, + email +FROM users +WHERE active = true;`, + }); + + const blocks = await parseFlinkSqlDocument(fakeFileUri); + + assert.strictEqual(blocks.length, 1); + assert.strictEqual(blocks[0].statements[0].range.start.line, 0); + assert.ok(blocks[0].statements[0].range.end.line > 0); + }); + + it("should return empty array when getEditorOrFileContents throws", async () => { + getEditorOrFileContentsStub.rejects(new Error("File not found")); + + const blocks = await parseFlinkSqlDocument(fakeFileUri); + + assert.strictEqual(blocks.length, 0); + }); + + // --- edge cases: semicolons and special characters inside strings/comments --- + + it("should not split on semicolons inside string literals", async () => { + getEditorOrFileContentsStub.resolves({ + content: "SELECT * FROM t WHERE name = 'foo;bar';", + }); + + const blocks = await parseFlinkSqlDocument(fakeFileUri); + + assert.strictEqual(blocks.length, 1); + assert.ok(blocks[0].text.includes("'foo;bar'")); + }); + + it("should not treat -- inside strings as comments", async () => { + getEditorOrFileContentsStub.resolves({ + content: "SELECT * FROM t WHERE name = '--not-a-comment';\nSELECT 1;", + }); + + const blocks = await parseFlinkSqlDocument(fakeFileUri); + + assert.strictEqual(blocks.length, 2); + assert.ok(blocks[0].text.includes("'--not-a-comment'")); + }); + + it("should not treat apostrophes inside single-line comments as string delimiters", async () => { + getEditorOrFileContentsStub.resolves({ + content: "-- Here's a comment\nSELECT 1;\nSELECT 2;", + }); + + const blocks = await parseFlinkSqlDocument(fakeFileUri); + + assert.strictEqual(blocks.length, 2); + }); + + it("should not treat apostrophes inside multi-line comments as string delimiters", async () => { + getEditorOrFileContentsStub.resolves({ + content: "/* it's a multi-line\n comment */\nSELECT 1;\nSELECT 2;", + }); + + const blocks = await parseFlinkSqlDocument(fakeFileUri); + + assert.strictEqual(blocks.length, 2); + }); + + it("should ignore semicolons inside single-line comments", async () => { + getEditorOrFileContentsStub.resolves({ + content: "SELECT * FROM t1; -- comment with ; semicolon\nSELECT * FROM t2;", + }); + + const blocks = await parseFlinkSqlDocument(fakeFileUri); + + assert.strictEqual(blocks.length, 2); + }); + + it("should ignore semicolons inside multi-line comments", async () => { + getEditorOrFileContentsStub.resolves({ + content: + "SELECT * FROM t1;\n/* comment spanning\n multiple lines; with semicolons */\nSELECT * FROM t2;", + }); + + const blocks = await parseFlinkSqlDocument(fakeFileUri); + + assert.strictEqual(blocks.length, 2); + }); + + it("should handle escaped quotes in strings", async () => { + getEditorOrFileContentsStub.resolves({ + content: "SELECT * FROM t WHERE name = 'it''s a value';\nSELECT 1;", + }); + + const blocks = await parseFlinkSqlDocument(fakeFileUri); + + assert.strictEqual(blocks.length, 2); + assert.ok(blocks[0].text.includes("it''s a value")); + }); + + it("should handle backtick identifiers containing apostrophes", async () => { + getEditorOrFileContentsStub.resolves({ + content: "SELECT * FROM `table's`;\nSELECT 1;", + }); + + const blocks = await parseFlinkSqlDocument(fakeFileUri); + + assert.strictEqual(blocks.length, 2); + }); + + it("should handle backtick identifiers containing -- comment markers", async () => { + getEditorOrFileContentsStub.resolves({ + content: "SELECT `col--name` FROM t;\nSELECT 1;", + }); + + const blocks = await parseFlinkSqlDocument(fakeFileUri); + + assert.strictEqual(blocks.length, 2); + }); + + it("should handle unclosed string gracefully", async () => { + getEditorOrFileContentsStub.resolves({ + content: "SELECT 'unclosed string FROM table1;", + }); + + const blocks = await parseFlinkSqlDocument(fakeFileUri); + + // unclosed string regex fails to match, so the ' is treated as code and the + // semicolon is found normally. the language server handles the syntax error. + assert.ok(blocks.length >= 0); + }); + + it("should handle unclosed multi-line comment gracefully", async () => { + getEditorOrFileContentsStub.resolves({ + content: "SELECT * FROM t1; /* unclosed comment\nSELECT * FROM t2;", + }); + + const blocks = await parseFlinkSqlDocument(fakeFileUri); + + // unclosed comment regex fails to match, so /* is treated as code and both + // semicolons are found. the language server handles the syntax error. + assert.ok(blocks.length >= 1); + }); + + it("should handle real-world Flink SQL with backticks and advanced syntax", async () => { + getEditorOrFileContentsStub.resolves({ + content: `SELECT + played_at, + \`track\`.name AS track_name, + \`track\`.album.name AS album_name, + \`track\`.artists[1].name AS artist_name, + genres +FROM \`realworld-data-env\`.\`realworld-data-cluster\`.\`spotify-listening-data\` +WHERE played_at IS NOT NULL +ORDER BY played_at DESC +LIMIT 10;`, + }); + + const blocks = await parseFlinkSqlDocument(fakeFileUri); + + assert.strictEqual(blocks.length, 1); + assert.ok(blocks[0].text.includes("artists[1]")); + assert.ok(blocks[0].text.includes("LIMIT 10")); + }); + + it("should handle mixed strings, comments, and code together", async () => { + getEditorOrFileContentsStub.resolves({ + content: "SELECT 'string; with; text' AS col1 FROM t1; -- comment; here\nSELECT * FROM t2;", + }); + + const blocks = await parseFlinkSqlDocument(fakeFileUri); + + assert.strictEqual(blocks.length, 2); + }); + }); + + describe("classifyStatementType() edge cases", () => { + it("should classify correctly when comments contain apostrophes", () => { + assert.strictEqual( + classifyStatementType("-- Here's a note\nSELECT * FROM t;"), + StatementType.EXECUTABLE, + ); + }); + + it("should not misclassify string content as SET keyword", () => { + assert.strictEqual( + classifyStatementType("SELECT 'SET value' FROM t;"), + StatementType.EXECUTABLE, + ); + }); + + it("should not misclassify string content as USE keyword", () => { + assert.strictEqual( + classifyStatementType("SELECT 'USE CATALOG foo' FROM t;"), + StatementType.EXECUTABLE, + ); + }); + }); +}); diff --git a/src/documentParsing/flinkSql.ts b/src/documentParsing/flinkSql.ts new file mode 100644 index 0000000000..cb3d5bcf89 --- /dev/null +++ b/src/documentParsing/flinkSql.ts @@ -0,0 +1,346 @@ +import type { Uri } from "vscode"; +import { Position, Range } from "vscode"; +import { Logger } from "../logging"; +import { getEditorOrFileContents } from "../utils/file"; + +const logger = new Logger("documentParsing.flinkSql"); + +// combined token pattern for Flink SQL lexical elements that can contain semicolons. +// alternation order provides precedence: backtick identifiers > strings > multi-line comments > +// single-line comments. each alternative consumes its full match, so special characters inside one +// construct (e.g. `--` inside a string) are never misidentified by a later alternative. +// +// references: +// - backtick identifiers: `identifier` with `` for escaping +// - single-quoted strings: 'text' with '' for escaping +// - multi-line comments: /* ... */ +// - single-line comments: -- to end of line +// +// see https://docs.confluent.io/cloud/current/flink/reference/sql-syntax.html +const NON_CODE_TOKEN_PATTERN = /`(?:``|[^`])*`|'(?:''|[^'])*'|\/\*[\s\S]*?\*\/|--[^\n]*/; + +/** Type of SQL statement based on execution semantics */ +export enum StatementType { + /** {@see https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/set/} */ + SET = "SET", + /** {@see https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/use/ */ + USE = "USE", + /** Any executable statement (SELECT, INSERT, CREATE, etc.) */ + EXECUTABLE = "EXECUTABLE", +} + +/** Represents a single parsed SQL statement with its document range */ +export interface ParsedStatement { + text: string; + range: Range; + type: StatementType; +} + +/** Represents an executable block (one or more statements that can be submitted together). */ +export interface ExecutableBlock { + statements: ParsedStatement[]; + range: Range; + text: string; + /** Whether this block includes SET/USE configuration statements */ + hasConfigStatements: boolean; + /** Index of this block in the document (0-based) */ + index: number; +} + +/** + * Parse a Flink SQL document and return {@link ExecutableBlock executable blocks}, associating any + * SET/USE statements with the subsequent executable portion of the statement document. + * + * NOTE: This uses semicolon-based statement splitting with proper handling of strings and comments. + * + * @param documentUri - URI of the document to parse + */ +export async function parseFlinkSqlDocument(documentUri: Uri): Promise { + try { + const { content } = await getEditorOrFileContents(documentUri); + if (!content || content.trim().length === 0) { + logger.debug("document is empty, returning no blocks"); + return []; + } + + logger.debug(`parsing document with ${content.length} characters`, { + uri: documentUri.toString(), + preview: content.substring(0, 100), + }); + + const statements: ParsedStatement[] = splitIntoStatements(content); + if (statements.length === 0) { + logger.debug("no statements found in document"); + return []; + } + logger.debug(`parsed ${statements.length} statement(s) from document`); + + const blocks = groupStatementsIntoBlocks(statements); + logger.debug(`grouped statements into ${blocks.length} executable blocks`); + return blocks; + } catch (error) { + logger.error("Error parsing Flink SQL document", error); + return []; + } +} + +/** + * Find all semicolon character positions that are not inside strings, comments, or backtick + * identifiers. Uses a single combined regex where alternation order provides correct precedence: + * earlier alternatives consume their matches before later ones can misidentify special characters. + * + * @param content - The SQL document content + * @returns Array of semicolon character positions in the original content + */ +function findValidSemicolons(content: string): number[] { + // append `|;` to the shared non-code pattern so semicolons are captured as the final alternative + const tokenRegex = new RegExp(`${NON_CODE_TOKEN_PATTERN.source}|;`, "g"); + const semicolons: number[] = []; + let match: RegExpExecArray | null; + while ((match = tokenRegex.exec(content)) !== null) { + if (match[0] === ";") { + semicolons.push(match.index); + } + } + return semicolons; +} + +/** + * Convert character position to line and column (0-based). + * + * @param content - The full document content + * @param position - Character position in the content + * @returns VS Code Position object + */ +function positionAt(content: string, position: number): Position { + const textUpToPosition = content.substring(0, position); + const lines = textUpToPosition.split("\n"); + const line = lines.length - 1; + const column = lines[line].length; + return new Position(line, column); +} + +/** Remove all comments, strings, and backtick identifiers from text, returning only SQL keywords. */ +function stripNonCodeTokens(text: string): string { + return text.replace(new RegExp(NON_CODE_TOKEN_PATTERN.source, "g"), "").trim(); +} + +/** + * Create a {@link ParsedStatement} from a text range if it contains executable SQL code. + * + * @param content - Full document content + * @param startPos - Start character position + * @param endPos - End character position + */ +function createStatementIfValid( + content: string, + startPos: number, + endPos: number, +): ParsedStatement | null { + const statementText: string = content.substring(startPos, endPos).trim(); + if (!statementText.length) { + return null; + } + + // confirm executable code exists (not just comments/whitespace) + const codeOnly: string = stripNonCodeTokens(statementText); + if (!codeOnly.length) { + return null; + } + + // find executable code start position, taking into account possible leading whitespace + const leadingWhitespace = content.substring(startPos, endPos).match(/^\s*/); + const actualStart = startPos + (leadingWhitespace ? leadingWhitespace[0].length : 0); + + return { + text: statementText, + type: classifyStatementType(statementText), + range: new Range(positionAt(content, actualStart), positionAt(content, endPos)), + }; +} + +/** + * Find the end of a trailing single-line comment after a position, if one exists. + * This ensures trailing comments stay with the statement they follow. + * + * @param content - The SQL document content + * @param position - Starting position (typically semicolon + 1) + * @returns Position after the trailing comment, or the original position if no comment found + */ +function findTrailingCommentEnd(content: string, position: number): number { + // Check if there's a single-line comment starting after any whitespace + const remainingText = content.substring(position); + const trailingCommentMatch = remainingText.match(/^[ \t]*--[^\n]*/); + + if (trailingCommentMatch) { + return position + trailingCommentMatch[0].length; + } + + return position; +} + +/** + * Extract statements by splitting on semicolons (excluding comments/strings). + * Includes trailing single-line comments with the statement they follow. + * + * @param content - Original SQL document content + * @param semicolonPositions - Positions of valid semicolons + * @returns Array of parsed statements with ranges + */ +function extractStatements(content: string, semicolonPositions: number[]): ParsedStatement[] { + const statements: ParsedStatement[] = []; + + let statementStart = 0; + + for (const semicolonPos of semicolonPositions) { + // Include any trailing single-line comment on the same line as the semicolon + const statementEnd = findTrailingCommentEnd(content, semicolonPos + 1); + + const statement: ParsedStatement | null = createStatementIfValid( + content, + statementStart, + statementEnd, + ); + if (statement) { + statements.push(statement); + } + statementStart = statementEnd; + } + + // orphaned content after last semicolon + if (statementStart < content.length) { + const statement: ParsedStatement | null = createStatementIfValid( + content, + statementStart, + content.length, + ); + if (statement) { + statements.push(statement); + } + } + + return statements; +} + +/** + * Split SQL content into individual statements using regex-based approach. + * Properly handles semicolons inside strings and comments by ignoring them. + * + * @param content - The SQL document content + * @returns Array of {@link ParsedStatement} objects + */ +function splitIntoStatements(content: string): ParsedStatement[] { + const semicolonPositions: number[] = findValidSemicolons(content); + return extractStatements(content, semicolonPositions); +} + +/** + * Group individual statements into {@link ExecutableBlock executable blocks} based on statement + * types: + * - SET/USE statements are attached to the next EXECUTABLE statement. + * - Orphaned SET/USE statements (with no following executable) are skipped. + * + * @param statements - Array of parsed statements + * @returns Array of executable blocks + */ +export function groupStatementsIntoBlocks(statements: ParsedStatement[]): ExecutableBlock[] { + const blocks: ExecutableBlock[] = []; + let partialStatementBlocks: ParsedStatement[] = []; + let blockIndex = 0; + + for (const statement of statements) { + if (statement.type === StatementType.SET || statement.type === StatementType.USE) { + // gather SET/USE statements until they can be paired with an executable statement + partialStatementBlocks.push(statement); + } else { + const blockStatements = [...partialStatementBlocks, statement]; + const block = createExecutableBlock(blockStatements, blockIndex); + blocks.push(block); + blockIndex++; + // reset partials since they've been grouped with an executable statement + partialStatementBlocks = []; + } + } + + if (partialStatementBlocks.length) { + logger.debug(`skipping ${partialStatementBlocks.length} partial statement configs`); + } + return blocks; +} + +/** + * Create an {@link ExecutableBlock} from an array of {@link ParsedStatement statements}. + * + * @param statements - Array of statements to include in the block + * @param index - Index of this block in the document + * @returns ExecutableBlock + */ +function createExecutableBlock(statements: ParsedStatement[], index: number): ExecutableBlock { + // Determine if this block has configuration statements + const hasConfigStatements = statements.some( + (s) => s.type === StatementType.SET || s.type === StatementType.USE, + ); + + // Calculate the combined range (from first statement start to last statement end) + const firstStatement = statements[0]; + const lastStatement = statements[statements.length - 1]; + const combinedRange = new Range(firstStatement.range.start, lastStatement.range.end); + + // Combine statement texts + const combinedText = statements.map((s) => s.text).join("\n"); + + return { + statements, + range: combinedRange, + text: combinedText, + hasConfigStatements, + index, + }; +} + +/** + * Determine the statement type from SQL text. + * Uses simple pattern matching on the first keyword, ignoring leading comments. + * + * @param statementText - The SQL statement text + * @returns Classification of the statement type + */ +export function classifyStatementType(statementText: string): StatementType { + // Strip comments to find the actual SQL keyword + // (statements can have leading comments like: -- comment\nSET 'key' = 'value';) + const withoutComments = stripNonCodeTokens(statementText); + const trimmed = withoutComments.trim().toUpperCase(); + + // Check for SET statement + if (trimmed.startsWith("SET ") || trimmed === "SET") { + return StatementType.SET; + } + + // Check for USE statement (catalog, database, or modules) + if ( + trimmed.startsWith("USE ") || + trimmed.startsWith("USE CATALOG ") || + trimmed.startsWith("USE MODULES ") || + trimmed === "USE" + ) { + return StatementType.USE; + } + + // Everything else is executable + return StatementType.EXECUTABLE; +} + +/** + * Find the executable block that contains the given line number. + * Useful for determining which block to submit when user clicks a codelens. + * + * @param blocks - Array of executable blocks + * @param line - 0-based line number + * @returns The block containing that line, or undefined + */ +export function getBlockAtLine( + blocks: ExecutableBlock[], + line: number, +): ExecutableBlock | undefined { + return blocks.find((block) => line >= block.range.start.line && line <= block.range.end.line); +} From 372140a3e793584a9195941f682d0c8b004d5e4d Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Fri, 27 Feb 2026 23:34:11 -0500 Subject: [PATCH 2/3] parse document to render block-level codelenses --- src/codelens/flinkSqlProvider.test.ts | 180 +++++++++++++++++++++++++- src/codelens/flinkSqlProvider.ts | 172 ++++++++++++++++++++++-- 2 files changed, 339 insertions(+), 13 deletions(-) diff --git a/src/codelens/flinkSqlProvider.test.ts b/src/codelens/flinkSqlProvider.test.ts index 0a8aac4bf3..9f019ce3cd 100644 --- a/src/codelens/flinkSqlProvider.test.ts +++ b/src/codelens/flinkSqlProvider.test.ts @@ -10,6 +10,9 @@ import { StubbedWorkspaceConfiguration } from "../../tests/stubs/workspaceConfig import { TEST_CCLOUD_ENVIRONMENT, TEST_CCLOUD_KAFKA_CLUSTER } from "../../tests/unit/testResources"; import { TEST_CCLOUD_FLINK_COMPUTE_POOL } from "../../tests/unit/testResources/flinkComputePool"; import { TEST_CCLOUD_ORGANIZATION } from "../../tests/unit/testResources/organization"; +import type { ExecutableBlock } from "../documentParsing/flinkSql"; +import * as flinkSqlParser from "../documentParsing/flinkSql"; +import { StatementType } from "../documentParsing/flinkSql"; import { FLINK_CONFIG_COMPUTE_POOL, FLINK_CONFIG_DATABASE } from "../extensionSettings/constants"; import type { CCloudResourceLoader } from "../loaders"; import { CCloudEnvironment } from "../models/environment"; @@ -63,6 +66,7 @@ describe("codelens/flinkSqlProvider.ts", () => { describe("FlinkSqlCodelensProvider", () => { let provider: FlinkSqlCodelensProvider; let resourceManagerStub: sinon.SinonStubbedInstance; + let parseFlinkSqlDocumentStub: sinon.SinonStub; // NOTE: setting up fake TextDocuments is tricky since we can't create them directly, so we're // only populating the fields needed for the test and associated codebase logic, then using the @@ -74,6 +78,11 @@ describe("codelens/flinkSqlProvider.ts", () => { await getResourceManager().deleteAllUriMetadata(); resourceManagerStub = getStubbedResourceManager(sandbox); + // Stub the parser to return empty array by default (fallback behavior) + parseFlinkSqlDocumentStub = sandbox + .stub(flinkSqlParser, "parseFlinkSqlDocument") + .resolves([]); + provider = FlinkSqlCodelensProvider.getInstance(); }); @@ -103,7 +112,8 @@ describe("codelens/flinkSqlProvider.ts", () => { it("setEventListeners() should return the expected number of listeners", () => { const listeners = provider["setEventListeners"](); - assert.strictEqual(listeners.length, handlerEmitterPairs.length); + // We now have 5 listeners: 2 for emitters + 3 for VS Code events (selection, active editor, document changes) + assert.strictEqual(listeners.length, 5); }); handlerEmitterPairs.forEach(([emitterName, handlerMethodName]) => { @@ -300,7 +310,12 @@ describe("codelens/flinkSqlProvider.ts", () => { assert.strictEqual(submitLens.command?.command, "confluent.statements.create"); assert.strictEqual(submitLens.command?.title, "▶️ Submit Statement"); - assert.deepStrictEqual(submitLens.command?.arguments, [fakeDocument.uri, pool, database]); + // When parser returns empty array (fallback), submit command should have 4 args with undefined block + assert.strictEqual(submitLens.command?.arguments?.length, 4); + assert.deepStrictEqual(submitLens.command?.arguments?.[0], fakeDocument.uri); + assert.deepStrictEqual(submitLens.command?.arguments?.[1], pool); + assert.deepStrictEqual(submitLens.command?.arguments?.[2], database); + assert.strictEqual(submitLens.command?.arguments?.[3], undefined); // block is undefined in fallback assert.strictEqual(dbLens.command?.command, "confluent.document.flinksql.setCCloudDatabase"); assert.strictEqual( @@ -323,6 +338,167 @@ describe("codelens/flinkSqlProvider.ts", () => { assert.strictEqual(resetLens.command?.title, "Clear Settings"); assert.deepStrictEqual(resetLens.command?.arguments, [fakeDocument.uri]); }); + + it("should create codelenses for each parsed executable block", async () => { + const pool: CCloudFlinkComputePool = TEST_CCLOUD_FLINK_COMPUTE_POOL; + const database: CCloudKafkaCluster = TEST_CCLOUD_KAFKA_CLUSTER; + + // Create two executable blocks + const block1: ExecutableBlock = { + statements: [ + { + text: "SELECT * FROM table1;", + range: new Range(0, 0, 0, 22), + type: StatementType.EXECUTABLE, + }, + ], + range: new Range(0, 0, 0, 22), + text: "SELECT * FROM table1;", + hasConfigStatements: false, + index: 0, + }; + + const block2: ExecutableBlock = { + statements: [ + { + text: "INSERT INTO table2 SELECT * FROM table1;", + range: new Range(2, 0, 2, 40), + type: StatementType.EXECUTABLE, + }, + ], + range: new Range(2, 0, 2, 40), + text: "INSERT INTO table2 SELECT * FROM table1;", + hasConfigStatements: false, + index: 1, + }; + + // Stub parser to return two blocks + parseFlinkSqlDocumentStub.resolves([block1, block2]); + + // simulate stored compute pool + database metadata + resourceManagerStub.getUriMetadata.resolves({ + [UriMetadataKeys.FLINK_COMPUTE_POOL_ID]: pool.id, + [UriMetadataKeys.FLINK_CATALOG_ID]: TEST_CCLOUD_ENVIRONMENT.id, + [UriMetadataKeys.FLINK_CATALOG_NAME]: TEST_CCLOUD_ENVIRONMENT.name, + [UriMetadataKeys.FLINK_DATABASE_ID]: database.id, + [UriMetadataKeys.FLINK_DATABASE_NAME]: database.name, + }); + ccloudLoaderStub.getEnvironments.resolves([testEnvWithPoolAndCluster]); + + const codeLenses: CodeLens[] = await provider.provideCodeLenses(fakeDocument); + + // Should have 4 codelenses per block (Submit, Set Pool, Set DB, Clear) = 8 total + assert.strictEqual(codeLenses.length, 8); + + // Verify first block's codelenses (at line 0) + const block1Lenses = codeLenses.slice(0, 4); + assert.strictEqual(block1Lenses[0].command?.command, "confluent.statements.create"); + assert.strictEqual(block1Lenses[0].command?.title, "▶️ Submit Statement"); + assert.strictEqual(block1Lenses[0].command?.arguments?.length, 4); + assert.deepStrictEqual(block1Lenses[0].command?.arguments?.[3], block1); // 4th arg is the block + + // Verify second block's codelenses (at line 2) + const block2Lenses = codeLenses.slice(4, 8); + assert.strictEqual(block2Lenses[0].command?.command, "confluent.statements.create"); + assert.strictEqual(block2Lenses[0].command?.title, "▶️ Submit Statement"); + assert.strictEqual(block2Lenses[0].command?.arguments?.length, 4); + assert.deepStrictEqual(block2Lenses[0].command?.arguments?.[3], block2); // 4th arg is the block + }); + + it("should create codelenses for blocks with SET/USE statements", async () => { + const pool: CCloudFlinkComputePool = TEST_CCLOUD_FLINK_COMPUTE_POOL; + const database: CCloudKafkaCluster = TEST_CCLOUD_KAFKA_CLUSTER; + + // Create a block with SET + USE + SELECT + const blockWithConfig: ExecutableBlock = { + statements: [ + { + text: "SET 'key' = 'value';", + range: new Range(0, 0, 0, 20), + type: StatementType.SET, + }, + { + text: "USE CATALOG my_catalog;", + range: new Range(1, 0, 1, 23), + type: StatementType.USE, + }, + { + text: "SELECT * FROM table1;", + range: new Range(2, 0, 2, 21), + type: StatementType.EXECUTABLE, + }, + ], + range: new Range(0, 0, 2, 21), + text: "SET 'key' = 'value';\nUSE CATALOG my_catalog;\nSELECT * FROM table1;", + hasConfigStatements: true, + index: 0, + }; + + // Stub parser to return one block with config statements + parseFlinkSqlDocumentStub.resolves([blockWithConfig]); + + // simulate stored compute pool + database metadata + resourceManagerStub.getUriMetadata.resolves({ + [UriMetadataKeys.FLINK_COMPUTE_POOL_ID]: pool.id, + [UriMetadataKeys.FLINK_CATALOG_ID]: TEST_CCLOUD_ENVIRONMENT.id, + [UriMetadataKeys.FLINK_CATALOG_NAME]: TEST_CCLOUD_ENVIRONMENT.name, + [UriMetadataKeys.FLINK_DATABASE_ID]: database.id, + [UriMetadataKeys.FLINK_DATABASE_NAME]: database.name, + }); + ccloudLoaderStub.getEnvironments.resolves([testEnvWithPoolAndCluster]); + + const codeLenses: CodeLens[] = await provider.provideCodeLenses(fakeDocument); + + // Should have 4 codelenses (Submit, Set Pool, Set DB, Clear) + assert.strictEqual(codeLenses.length, 4); + + // Verify submit command receives the full block (including SET/USE) + const submitLens = codeLenses[0]; + assert.strictEqual(submitLens.command?.command, "confluent.statements.create"); + assert.strictEqual(submitLens.command?.arguments?.length, 4); + const passedBlock = submitLens.command?.arguments?.[3] as ExecutableBlock; + assert.ok(passedBlock); + assert.strictEqual(passedBlock.hasConfigStatements, true); + assert.strictEqual(passedBlock.statements.length, 3); + assert.ok(passedBlock.text.includes("SET 'key' = 'value'")); + assert.ok(passedBlock.text.includes("USE CATALOG my_catalog")); + assert.ok(passedBlock.text.includes("SELECT * FROM table1")); + }); + + it("should show fallback codelenses at line 0 when parser returns empty array", async () => { + const pool: CCloudFlinkComputePool = TEST_CCLOUD_FLINK_COMPUTE_POOL; + const database: CCloudKafkaCluster = TEST_CCLOUD_KAFKA_CLUSTER; + + // Parser already stubbed to return empty array in beforeEach + parseFlinkSqlDocumentStub.resolves([]); + + // simulate stored compute pool + database metadata + resourceManagerStub.getUriMetadata.resolves({ + [UriMetadataKeys.FLINK_COMPUTE_POOL_ID]: pool.id, + [UriMetadataKeys.FLINK_CATALOG_ID]: TEST_CCLOUD_ENVIRONMENT.id, + [UriMetadataKeys.FLINK_CATALOG_NAME]: TEST_CCLOUD_ENVIRONMENT.name, + [UriMetadataKeys.FLINK_DATABASE_ID]: database.id, + [UriMetadataKeys.FLINK_DATABASE_NAME]: database.name, + }); + ccloudLoaderStub.getEnvironments.resolves([testEnvWithPoolAndCluster]); + + const codeLenses: CodeLens[] = await provider.provideCodeLenses(fakeDocument); + + // Should have 4 codelenses at line 0 (fallback behavior) + assert.strictEqual(codeLenses.length, 4); + + // All codelenses should be at line 0 + const expectedRange = new Range(new Position(0, 0), new Position(0, 0)); + for (const lens of codeLenses) { + assert.deepStrictEqual(lens.range, expectedRange); + } + + // Submit command should have 4th argument as undefined (fallback, no block) + const submitLens = codeLenses[0]; + assert.strictEqual(submitLens.command?.command, "confluent.statements.create"); + assert.strictEqual(submitLens.command?.arguments?.length, 4); + assert.strictEqual(submitLens.command?.arguments?.[3], undefined); // block is undefined in fallback + }); }); describe("getComputePoolFromMetadata()", () => { diff --git a/src/codelens/flinkSqlProvider.ts b/src/codelens/flinkSqlProvider.ts index 94d3f2cba1..dd788dbaa1 100644 --- a/src/codelens/flinkSqlProvider.ts +++ b/src/codelens/flinkSqlProvider.ts @@ -1,7 +1,24 @@ -import type { CodeLensProvider, Command, Disposable, Event, TextDocument } from "vscode"; -import { CodeLens, EventEmitter, Position, Range } from "vscode"; +import type { + CodeLensProvider, + Command, + Disposable, + Event, + TextDocument, + TextDocumentChangeEvent, + TextEditor, + TextEditorDecorationType, + TextEditorSelectionChangeEvent, + Uri, +} from "vscode"; +import { CodeLens, EventEmitter, Position, Range, window, workspace } from "vscode"; +import { + getBlockAtLine, + parseFlinkSqlDocument, + type ExecutableBlock, +} from "../documentParsing/flinkSql"; import { ccloudConnected, uriMetadataSet } from "../emitters"; import { FLINK_CONFIG_COMPUTE_POOL, FLINK_CONFIG_DATABASE } from "../extensionSettings/constants"; +import { FLINK_SQL_LANGUAGE_ID } from "../flinkSql/constants"; import { CCloudResourceLoader } from "../loaders"; import { Logger } from "../logging"; import type { CCloudEnvironment } from "../models/environment"; @@ -20,6 +37,11 @@ export class FlinkSqlCodelensProvider extends DisposableCollection implements Co private _onDidChangeCodeLenses: EventEmitter = new EventEmitter(); readonly onDidChangeCodeLenses: Event = this._onDidChangeCodeLenses.event; + /** Decoration type for visually separating executable blocks */ + private blockDecorationType: TextEditorDecorationType; + // cache of parsed blocks per document URI + private readonly documentBlocksCache = new Map(); + private static instance: FlinkSqlCodelensProvider | null = null; static getInstance(): FlinkSqlCodelensProvider { if (!FlinkSqlCodelensProvider.instance) { @@ -31,6 +53,19 @@ export class FlinkSqlCodelensProvider extends DisposableCollection implements Co private constructor() { super(); + // subtle background coloring for the statement block, to help the user visualize what will be + // sent when they click the "Submit Statement" codelens + this.blockDecorationType = window.createTextEditorDecorationType({ + isWholeLine: true, + light: { + backgroundColor: "rgba(0, 0, 0, 0.05)", + }, + dark: { + backgroundColor: "rgba(255, 255, 255, 0.05)", + }, + }); + this.disposables.push(this.blockDecorationType); + this.disposables.push(...this.setEventListeners()); } @@ -38,6 +73,15 @@ export class FlinkSqlCodelensProvider extends DisposableCollection implements Co return [ ccloudConnected.event(this.ccloudConnectedHandler.bind(this)), uriMetadataSet.event(this.uriMetadataSetHandler.bind(this)), + window.onDidChangeTextEditorSelection(this.selectionChangeHandler.bind(this)), + window.onDidChangeActiveTextEditor(this.activeEditorChangeHandler.bind(this)), + workspace.onDidChangeTextDocument((event: TextDocumentChangeEvent) => { + if (event.document.languageId === FLINK_SQL_LANGUAGE_ID) { + // invalidate the cached parsed blocks for this document + const uriKey = event.document.uri.toString(); + this.documentBlocksCache.delete(uriKey); + } + }), ]; } @@ -59,16 +103,54 @@ export class FlinkSqlCodelensProvider extends DisposableCollection implements Co this._onDidChangeCodeLenses.fire(); } + /** Handle text editor selection changes to update block decorations based on cursor position. */ + private selectionChangeHandler(event: TextEditorSelectionChangeEvent): void { + const editor = event.textEditor; + if (editor.document.languageId !== FLINK_SQL_LANGUAGE_ID) { + return; + } + void this.updateBlockDecorations(editor); + } + + /** Handle active editor changes to update block decorations. */ + private activeEditorChangeHandler(editor: TextEditor | undefined): void { + if (!editor || editor.document.languageId !== FLINK_SQL_LANGUAGE_ID) { + return; + } + void this.updateBlockDecorations(editor); + } + + /** Update decorations to highlight the executable block containing the cursor. */ + private async updateBlockDecorations(editor: TextEditor): Promise { + const document = editor.document; + const cursorLine = editor.selection.active.line; + + // look up existing blocks from the cache, or re-parse and cache + const uriKey = document.uri.toString(); + let blocks = this.documentBlocksCache.get(uriKey); + if (!blocks) { + blocks = await parseFlinkSqlDocument(document.uri); + this.documentBlocksCache.set(uriKey, blocks); + } + + const currentBlock: ExecutableBlock | undefined = getBlockAtLine(blocks, cursorLine); + if (currentBlock) { + editor.setDecorations(this.blockDecorationType, [currentBlock.range]); + } else { + editor.setDecorations(this.blockDecorationType, []); + } + } + async provideCodeLenses(document: TextDocument): Promise { const codeLenses: CodeLens[] = []; - // show codelenses at the top of the file - const range = new Range(new Position(0, 0), new Position(0, 0)); + // fallback range at the top of the file for when parsing fails or no CCloud auth + const topRange = new Range(new Position(0, 0), new Position(0, 0)); if (!hasCCloudAuthSession()) { // show single codelens to sign in to CCloud since we need to be able to list CCloud resources // in the other codelenses (via quickpicks) below - const signInLens = new CodeLens(range, { + const signInLens = new CodeLens(topRange, { title: "Sign in to Confluent Cloud", command: "confluent.connections.ccloud.signIn", tooltip: "Sign in to Confluent Cloud", @@ -88,15 +170,83 @@ export class FlinkSqlCodelensProvider extends DisposableCollection implements Co await getComputePoolFromMetadata(uriMetadata); const { catalog, database } = await getCatalogDatabaseFromMetadata(uriMetadata, computePool); - // codelens for selecting a compute pool, which we'll use to derive the rest of the properties - // needed for various Flink operations (env ID, provider/region, etc) + // parse the document to find executable blocks + const uriKey = document.uri.toString(); + let blocks = this.documentBlocksCache.get(uriKey); + if (!blocks) { + blocks = await parseFlinkSqlDocument(document.uri); + this.documentBlocksCache.set(uriKey, blocks); + } + + // show codelenses at the top of the document as fallback + if (blocks.length === 0) { + logger.debug("No executable blocks found, showing fallback codelenses at line 0"); + return this.createCodeLensesForRange( + topRange, + document.uri, + computePool, + catalog, + database, + undefined, + ); + } + + // create codelenses for each executable block + for (const block of blocks) { + const blockCodeLenses = this.createCodeLensesForRange( + block.range, + document.uri, + computePool, + catalog, + database, + block, + ); + codeLenses.push(...blockCodeLenses); + } + + return codeLenses; + } + + /** + * Create codelenses for a given range in the document. + * @param range The range to create codelenses for (either a block range or line 0) + * @param documentUri The URI of the document + * @param computePool The compute pool, if available + * @param catalog The catalog, if available + * @param database The database, if available + * @param block The executable block, if available (undefined for fallback/line 0 codelenses) + * @returns Array of CodeLens objects for the given range + */ + private createCodeLensesForRange( + range: Range, + documentUri: Uri, + computePool: CCloudFlinkComputePool | undefined, + catalog: CCloudEnvironment | undefined, + database: CCloudKafkaCluster | undefined, + block: ExecutableBlock | undefined, + ): CodeLens[] { + if (!hasCCloudAuthSession()) { + // show single codelens to sign in to CCloud since we need to be able to list CCloud resources + // in the other codelenses (via quickpicks) below + const signInLens = new CodeLens(range, { + title: "Sign in to Confluent Cloud", + command: "confluent.connections.ccloud.signIn", + tooltip: "Sign in to Confluent Cloud", + arguments: [], + } as Command); + return [signInLens]; + } + + const codeLenses: CodeLens[] = []; + + // codelens for selecting a compute pool const selectComputePoolCommand: Command = { title: computePool ? computePool.name : "Set Compute Pool", command: "confluent.document.flinksql.setCCloudComputePool", tooltip: computePool ? `Compute Pool: ${computePool.name}` : "Set CCloud Compute Pool for Flink Statement", - arguments: [document.uri, database], + arguments: [documentUri, database], }; const computePoolLens = new CodeLens(range, selectComputePoolCommand); @@ -108,7 +258,7 @@ export class FlinkSqlCodelensProvider extends DisposableCollection implements Co catalog && database ? `Catalog: ${catalog.name}, Database: ${database.name} (${database.provider} ${database.region})` : "Set Catalog & Database for Flink Statement", - arguments: [document.uri, computePool], + arguments: [documentUri, computePool], }; const databaseLens = new CodeLens(range, selectDatabaseCommand); @@ -117,7 +267,7 @@ export class FlinkSqlCodelensProvider extends DisposableCollection implements Co title: "Clear Settings", command: "confluent.document.flinksql.resetCCloudMetadata", tooltip: "Clear Selected CCloud Resources for Flink Statement", - arguments: [document.uri], + arguments: [documentUri], }; const resetLens = new CodeLens(range, resetCommand); @@ -126,7 +276,7 @@ export class FlinkSqlCodelensProvider extends DisposableCollection implements Co title: "▶️ Submit Statement", command: "confluent.statements.create", tooltip: "Submit Flink Statement to CCloud", - arguments: [document.uri, computePool, database], + arguments: [documentUri, computePool, database, block], }; const submitLens = new CodeLens(range, submitCommand); // show the "Submit Statement" | | codelenses From a892b782e0a3cf46785b0b3bd46c8620cee36f93 Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Fri, 27 Feb 2026 23:34:25 -0500 Subject: [PATCH 3/3] update statement submission command logic to use executable blocks --- src/commands/flinkStatements.ts | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/src/commands/flinkStatements.ts b/src/commands/flinkStatements.ts index f2e8931fa3..be34f9c171 100644 --- a/src/commands/flinkStatements.ts +++ b/src/commands/flinkStatements.ts @@ -1,6 +1,8 @@ import * as vscode from "vscode"; import { registerCommandWithLogging } from "."; import { getCatalogDatabaseFromMetadata } from "../codelens/flinkSqlProvider"; +import type { ExecutableBlock, ParsedStatement } from "../documentParsing/flinkSql"; +import { StatementType, classifyStatementType } from "../documentParsing/flinkSql"; import { FLINKSTATEMENT_URI_SCHEME, FlinkStatementDocumentProvider, @@ -107,6 +109,7 @@ export async function submitFlinkStatementCommand( uri?: vscode.Uri, pool?: CCloudFlinkComputePool, database?: CCloudKafkaCluster, + block?: ExecutableBlock, ): Promise { const funcLogger = logger.withCallpoint("submitFlinkStatementCommand"); @@ -125,8 +128,26 @@ export async function submitFlinkStatementCommand( return; } - const document = await getEditorOrFileContents(statementBodyUri); - const statement = document.content; + let statement: string; + // submit a statement "block" if it was provided, otherwise submit the whole document + if (block) { + // TODO: handle SET/USE statements through the language client in a follow-up PR + const executableStatements: ParsedStatement[] = block.statements.filter( + (statement) => classifyStatementType(statement.text) === StatementType.EXECUTABLE, + ); + if (executableStatements.length === 0) { + await showErrorNotificationWithButtons("No executable statements found in block."); + return; + } + + statement = executableStatements.map((statement) => statement.text).join("\n"); + funcLogger.debug( + `Submitting block ${block.index} with ${executableStatements.length} executable statement(s) (filtered from ${block.statements.length} total)`, + ); + } else { + const document = await getEditorOrFileContents(statementBodyUri); + statement = document.content; + } const uriMetadata = await ResourceManager.getInstance().getUriMetadata(statementBodyUri); // 2. Choose the statement name