diff --git a/ansible/tasks/neovim.yml b/ansible/tasks/neovim.yml index 35bc060..0343bd3 100644 --- a/ansible/tasks/neovim.yml +++ b/ansible/tasks/neovim.yml @@ -15,13 +15,15 @@ - ruff - yamllint -- name: Remove existing ftplugin directory if it exists (to allow symlinking) +- name: Remove existing directories that need to be replaced with symlinks ansible.builtin.file: - path: "{{ ansible_env.HOME }}/.config/nvim/ftplugin" + path: "{{ ansible_env.HOME }}/.config/nvim/{{ item }}" state: absent + loop: + - ftplugin + - dbt - name: Link specific configuration files - ansible.builtin.file: src: "{{ playbook_dir }}/../{{ item.src }}" dest: "{{ ansible_env.HOME }}/{{ item.dest }}" @@ -30,6 +32,7 @@ - { src: "nvim/init.lua", dest: ".config/nvim/init.lua" } - { src: "nvim/lua", dest: ".config/nvim/lua" } - { src: "nvim/ftplugin", dest: ".config/nvim/ftplugin" } + - { src: "dbt", dest: ".config/nvim/dbt" } - name: Install jsonlint node.js package ansible.builtin.command: diff --git a/dbt/dbt_analyse.py b/dbt/dbt_analyse.py new file mode 100755 index 0000000..dd92fa3 --- /dev/null +++ b/dbt/dbt_analyse.py @@ -0,0 +1,228 @@ +#!/usr/bin/env python3 +# /// script +# requires-python = ">=3.10" +# dependencies = ["click", "pyyaml"] +# /// +""" +dbt_analyse: compile a dbt model, gather context, then launch an interactive +cursor-agent session. Designed to be called from a tmux window. 
+
+Usage:
+    dbt_analyse.py --model <model> --root <dbt_project_root> \
+        --filepath <source_sql_path> --prompt <prompt_template.md>
+"""
+
+import subprocess
+import sys
+import glob
+import os
+import re
+import tempfile
+
+import click
+import yaml
+
+
+def run(cmd, cwd=None, capture=False, check=True):
+    result = subprocess.run(
+        cmd,
+        cwd=cwd,
+        capture_output=capture,
+        text=True,
+    )
+    if check and result.returncode != 0:
+        stderr = result.stderr.strip() if result.stderr else ""
+        click.echo(f"ERROR: command failed (exit {result.returncode}): {' '.join(cmd)}", err=True)
+        if stderr:
+            click.echo(stderr, err=True)
+        sys.exit(result.returncode)
+    return result
+
+
+def get_lineage(model, root):
+    """Return a summary of immediate parents and children from dbt ls."""
+    lines = []
+    for direction, selector in [
+        ("parents", f"+{model},1+{model}"),
+        ("children", f"{model}+,{model}+1"),  # model+1 = first-degree children; model1+ would select a node named "<model>1"
+    ]:
+        result = subprocess.run(
+            ["uv", "run", "dbt", "ls", "-s", selector, "--output", "name", "--quiet"],
+            capture_output=True,
+            text=True,
+            cwd=root,
+        )
+        if result.returncode == 0:
+            names = [
+                n.strip()
+                for n in result.stdout.strip().splitlines()
+                if n.strip() and n.strip() != model
+            ]
+            if names:
+                lines.append(f"**{direction.title()}:** {', '.join(names)}")
+    return "\n".join(lines) if lines else ""
+
+
+def get_existing_tests(model, root):
+    """Extract test definitions for a model from schema.yml files."""
+    unique_files = glob.glob(os.path.join(root, "**", "*.yml"), recursive=True)
+
+    tests = []
+    for schema_path in unique_files:
+        try:
+            with open(schema_path) as f:
+                doc = yaml.safe_load(f)
+        except (yaml.YAMLError, OSError):
+            continue
+        if not isinstance(doc, dict):
+            continue
+        for m in doc.get("models", []):
+            if not isinstance(m, dict) or m.get("name") != model:
+                continue
+            # Model-level tests
+            for t in m.get("tests", []):
+                tests.append(f"- model-level: {t}")
+            # Column-level tests
+            for col in m.get("columns", []):
+                if not isinstance(col, dict):
+                    continue
+                col_name = col.get("name", "?")
+                for t in 
col.get("tests", []): + if isinstance(t, str): + tests.append(f"- {col_name}: {t}") + elif isinstance(t, dict): + tests.append(f"- {col_name}: {t}") + return "\n".join(tests) if tests else "" + + +def render_template(template, replacements): + """Replace template placeholders, handling conditional {{#if}}/{{^if}} blocks.""" + for key, value in replacements.items(): + # Handle {{#if key}}...{{/if}} blocks + if_pattern = re.compile( + r"\{\{#if " + re.escape(key) + r"\}\}(.*?)\{\{/if\}\}", + re.DOTALL, + ) + not_pattern = re.compile( + r"\{\{\^if " + re.escape(key) + r"\}\}(.*?)\{\{/if\}\}", + re.DOTALL, + ) + if value: + template = if_pattern.sub(r"\1", template) + template = not_pattern.sub("", template) + else: + template = if_pattern.sub("", template) + template = not_pattern.sub(r"\1", template) + # Replace the simple placeholder + template = template.replace("{{" + key + "}}", value) + return template + + +@click.command() +@click.option("--model", required=True, help="dbt model name (no extension)") +@click.option("--root", required=True, help="Path to dbt project root") +@click.option("--filepath", required=True, help="Absolute path to the source SQL file") +@click.option("--prompt", required=True, help="Path to the prompt template .md file") +@click.option("--limit", default=20, show_default=True, help="Row limit for dbt show") +@click.option( + "--model-flag", default="sonnet-4.6-thinking", show_default=True, help="cursor-agent model" +) +def main(model, root, filepath, prompt, limit, model_flag): + # --- 1. compile --- + click.echo(f"Compiling {model}...") + run(["uv", "run", "dbt", "compile", "-s", model, "--quiet"], cwd=root) + + # --- 2. 
find compiled SQL --- + pattern = os.path.join(root, "target", "compiled", "**", f"{model}.sql") + matches = glob.glob(pattern, recursive=True) + if not matches: + click.echo(f"ERROR: no compiled SQL found for {model} — did compile succeed?", err=True) + sys.exit(1) + with open(matches[0]) as f: + compiled_sql = f.read() + click.echo(f"Compiled SQL: {matches[0]}") + + # --- 3. sample rows --- + click.echo(f"Fetching sample rows (limit={limit})...") + result = run( + [ + "uv", + "run", + "dbt", + "show", + "-s", + model, + "--limit", + str(limit), + "--output", + "json", + "--log-format", + "json", + ], + cwd=root, + capture=True, + check=False, + ) + if result.returncode != 0: + click.echo( + f"WARNING: dbt show failed (exit {result.returncode}), continuing without sample rows", + err=True, + ) + sample_rows = "(dbt show failed)" + else: + sample_rows = result.stdout.strip() or "(no rows returned)" + + # --- 4. source SQL --- + if not os.path.exists(filepath): + click.echo(f"ERROR: source file not found: {filepath}", err=True) + sys.exit(1) + with open(filepath) as f: + source_sql = f.read() + + # --- 5. gather lineage and existing tests --- + click.echo("Gathering model lineage...") + lineage = get_lineage(model, root) + + click.echo("Scanning for existing dbt tests...") + existing_tests = get_existing_tests(model, root) + + # --- 6. build prompt --- + if not os.path.exists(prompt): + click.echo(f"ERROR: prompt template not found: {prompt}", err=True) + sys.exit(1) + with open(prompt) as f: + template = f.read() + + full_prompt = render_template( + template, + { + "compiled_sql": compiled_sql, + "sample_rows": sample_rows, + "existing_tests": existing_tests, + "lineage": lineage, + "data_profile": "", + }, + ) + full_prompt += f"\n\nSource SQL:\n{source_sql}" + + # --- 7. 
write context to a temp file & launch cursor-agent --- + ctx = tempfile.NamedTemporaryFile( + mode="w", + suffix=".md", + prefix=f"dbt_audit_{model}_", + delete=False, + ) + ctx.write(full_prompt) + ctx.close() + click.echo(f"Context written to {ctx.name}") + + click.echo(f"Launching cursor-agent ({model_flag})...") + agent_prompt = ( + f"Read the audit instructions and dbt model context from {ctx.name}. " + "Perform a thorough data quality audit of the dbt model as described." + ) + os.execlp("cursor-agent", "cursor-agent", "--model", model_flag, agent_prompt) + + +if __name__ == "__main__": + main() diff --git a/dbt/dbt_batch_audit.py b/dbt/dbt_batch_audit.py new file mode 100644 index 0000000..5bd0b67 --- /dev/null +++ b/dbt/dbt_batch_audit.py @@ -0,0 +1,542 @@ +#!/usr/bin/env python3 +# /// script +# requires-python = ">=3.10" +# dependencies = ["click", "mdformat", "pyyaml"] +# /// +""" +dbt_batch_audit: run dbt model audits across multiple models and LLMs in +parallel, then synthesize a final consolidated report. + +Accepts SQL file paths, directories, and shell globs. Model names are inferred +from filenames (e.g. models/int_orders.sql → int_orders). + +Usage: + # single files + dbt_batch_audit.py models/int_orders.sql models/stg_users.sql ... 
+ + # a whole directory + dbt_batch_audit.py models/intermediate/ + + # shell glob (expanded by the shell before the script sees it) + dbt_batch_audit.py models/int_*.sql + + # mix and match + dbt_batch_audit.py models/intermediate/ models/stg_special.sql +""" + +import re +import subprocess +import sys +import glob +import os +import time +from concurrent.futures import ThreadPoolExecutor, as_completed + +import click +import yaml + + +def run(cmd, cwd=None, capture=False, check=True): + result = subprocess.run(cmd, cwd=cwd, capture_output=capture, text=True) + if check and result.returncode != 0: + stderr = result.stderr.strip() if result.stderr else "" + click.echo( + f"ERROR: command failed (exit {result.returncode}): {' '.join(cmd)}", + err=True, + ) + if stderr: + click.echo(stderr, err=True) + sys.exit(result.returncode) + return result + + +def compile_model(model_name, root): + click.echo(f" Compiling {model_name}...") + run(["uv", "run", "dbt", "compile", "-s", model_name, "--quiet"], cwd=root) + pattern = os.path.join(root, "target", "compiled", "**", f"{model_name}.sql") + matches = glob.glob(pattern, recursive=True) + if not matches: + click.echo(f"ERROR: no compiled SQL found for {model_name}", err=True) + sys.exit(1) + with open(matches[0]) as f: + return f.read() + + +def get_sample_rows(model_name, root, limit): + click.echo(f" Fetching sample rows for {model_name} (limit={limit})...") + result = run( + [ + "uv", + "run", + "dbt", + "show", + "-s", + model_name, + "--limit", + str(limit), + "--output", + "json", + "--log-format", + "json", + ], + cwd=root, + capture=True, + check=False, + ) + if result.returncode != 0: + click.echo(f" WARNING: dbt show failed for {model_name}", err=True) + return "(dbt show failed)" + return result.stdout.strip() or "(no rows returned)" + + +def get_lineage(model_name, root): + """Return a summary of immediate parents and children from dbt ls.""" + lines = [] + for direction, selector in [ + ("parents", 
f"+{model_name},1+{model_name}"), + ("children", f"{model_name}+,{model_name}1+"), + ]: + result = subprocess.run( + ["uv", "run", "dbt", "ls", "-s", selector, "--output", "name", "--quiet"], + capture_output=True, + text=True, + cwd=root, + ) + if result.returncode == 0: + names = [ + n.strip() + for n in result.stdout.strip().splitlines() + if n.strip() and n.strip() != model_name + ] + if names: + lines.append(f"**{direction.title()}:** {', '.join(names)}") + return "\n".join(lines) if lines else "" + + +def get_existing_tests(model_name, root): + """Extract test definitions for a model from schema.yml files.""" + schema_files = glob.glob(os.path.join(root, "**", "*.yml"), recursive=True) + seen = set() + unique_files = [] + for f in schema_files: + if f not in seen: + seen.add(f) + unique_files.append(f) + + tests = [] + for schema_path in unique_files: + try: + with open(schema_path) as f: + doc = yaml.safe_load(f) + except (yaml.YAMLError, OSError): + continue + if not isinstance(doc, dict): + continue + for m in doc.get("models", []): + if not isinstance(m, dict) or m.get("name") != model_name: + continue + for t in m.get("tests", []): + tests.append(f"- model-level: {t}") + for col in m.get("columns", []): + if not isinstance(col, dict): + continue + col_name = col.get("name", "?") + for t in col.get("tests", []): + if isinstance(t, str): + tests.append(f"- {col_name}: {t}") + elif isinstance(t, dict): + tests.append(f"- {col_name}: {t}") + return "\n".join(tests) if tests else "" + + +def render_template(template, replacements): + """Replace template placeholders, handling conditional {{#if}}/{{^if}} blocks.""" + for key, value in replacements.items(): + if_pattern = re.compile( + r"\{\{#if " + re.escape(key) + r"\}\}(.*?)\{\{/if\}\}", + re.DOTALL, + ) + not_pattern = re.compile( + r"\{\{\^if " + re.escape(key) + r"\}\}(.*?)\{\{/if\}\}", + re.DOTALL, + ) + if value: + template = if_pattern.sub(r"\1", template) + template = not_pattern.sub("", template) + 
else: + template = if_pattern.sub("", template) + template = not_pattern.sub(r"\1", template) + template = template.replace("{{" + key + "}}", value) + return template + + +def write_context_file( + output_dir, + model_name, + template, + compiled_sql, + sample_rows, + source_sql, + lineage="", + existing_tests="", +): + """Write the full audit context to a file so cursor-agent can read it.""" + ctx_dir = os.path.join(output_dir, ".context") + os.makedirs(ctx_dir, exist_ok=True) + + content = render_template( + template, + { + "compiled_sql": compiled_sql, + "sample_rows": sample_rows, + "existing_tests": existing_tests, + "lineage": lineage, + "data_profile": "", + }, + ) + content += f"\n\nSource SQL:\n{source_sql}" + + ctx_path = os.path.join(ctx_dir, f"{model_name}__context.md") + with open(ctx_path, "w") as f: + f.write(content) + return os.path.abspath(ctx_path) + + +def get_available_models(): + """Return the set of valid model IDs from cursor-agent --list-models.""" + result = subprocess.run( + ["cursor-agent", "--list-models"], + capture_output=True, + text=True, + ) + # Strip ANSI escape codes, then extract the first token of each line that + # looks like a model ID (alphanumeric + hyphens/dots, before the " - " separator). 
+ ansi_escape = re.compile(r"\x1b\[[0-9;]*[A-Za-z]|\x1b\[[0-9]*[A-Za-z]") + clean = ansi_escape.sub("", result.stdout) + models = set() + for line in clean.splitlines(): + m = re.match(r"^([a-zA-Z0-9][a-zA-Z0-9._-]+)\s+-\s+", line.strip()) + if m: + models.add(m.group(1)) + return models + + +def validate_llms(llms): + """Fail fast if any requested LLM is not in cursor-agent's available models.""" + click.echo("Validating model names against cursor-agent...") + available = get_available_models() + if not available: + click.echo( + "WARNING: could not retrieve available models list; skipping validation", + err=True, + ) + return + invalid = [llm for llm in llms if llm not in available] + if invalid: + click.echo( + "ERROR: the following model(s) are not available in cursor-agent:\n" + + "\n".join(f" - {m}" for m in invalid) + + "\n\nAvailable models:\n " + + "\n ".join(sorted(available)), + err=True, + ) + sys.exit(1) + click.echo(f" All {len(llms)} model(s) validated OK.\n") + + +def run_audit(model_name, context_path, llm, output_dir, root): + click.echo(f" [{model_name} × {llm}] Starting audit...") + start = time.monotonic() + + prompt = ( + f"Read the audit instructions and dbt model context from {context_path}. " + "Perform a thorough data quality audit of the dbt model as described. " + "Output your complete findings as a well-structured markdown report." 
+ ) + + try: + result = subprocess.run( + ["cursor-agent", "--print", "--force", "--model", llm, prompt], + capture_output=True, + text=True, + cwd=root, + timeout=900, + ) + except subprocess.TimeoutExpired: + report = "(audit timed out after 900s)" + click.echo(f" [{model_name} × {llm}] Timed out", err=True) + safe_llm = llm.replace("/", "_").replace(" ", "_") + report_path = os.path.join(output_dir, f"{model_name}__{safe_llm}.md") + with open(report_path, "w") as f: + f.write(f"# Audit: {model_name} (LLM: {llm})\n\n{report}\n") + elapsed = time.monotonic() - start + click.echo(f" [{model_name} × {llm}] Done ({elapsed:.0f}s) → {report_path}") + return model_name, llm, report + + if result.returncode == 0: + report = result.stdout.strip() + else: + report = f"(audit failed: exit {result.returncode})\n{result.stderr}" + + safe_llm = llm.replace("/", "_").replace(" ", "_") + report_path = os.path.join(output_dir, f"{model_name}__{safe_llm}.md") + with open(report_path, "w") as f: + f.write(f"# Audit: {model_name} (LLM: {llm})\n\n{report}\n") + + elapsed = time.monotonic() - start + click.echo(f" [{model_name} × {llm}] Done ({elapsed:.0f}s) → {report_path}") + return model_name, llm, report + + +def synthesize_reports(reports, synthesis_model, output_dir, root): + click.echo("\nSynthesizing final report...") + start = time.monotonic() + + combined_parts = [] + for model_name, llm, report in reports: + combined_parts.append(f"---\n## Model: {model_name} | Reviewer: {llm}\n\n{report}\n") + combined_text = "\n".join(combined_parts) + + combined_path = os.path.join(output_dir, "all_individual_reports.md") + with open(combined_path, "w") as f: + f.write(f"# All Individual Audit Reports\n\n{combined_text}\n") + + # Write synthesis context to a file to avoid command-line length limits + ctx_dir = os.path.join(output_dir, ".context") + os.makedirs(ctx_dir, exist_ok=True) + synthesis_ctx_path = os.path.abspath(os.path.join(ctx_dir, "synthesis_context.md")) + + 
synthesis_instructions = f"""\ +You are a senior analytics engineer reviewing multiple dbt model audit reports. +Each report below was generated by a different LLM auditing a dbt model. + +Your job: +1. Cross-reference findings across models and reviewers +2. Identify the most critical issues that appear consistently +3. Flag any contradictions between reviewers +4. Prioritize recommendations by impact and effort +5. Produce a final consolidated report in markdown + +A key part of your analysis is **cross-model bug propagation**: for every significant finding in any +model, explicitly trace its downstream consequences through the model dependency chain. Ask: which +downstream models consume this model's output? If this bug is present in the data, what does that +mean for each downstream model's correctness? Does the bug amplify, get filtered out, or silently +corrupt aggregations further down the chain? Call out cases where a bug in an upstream model makes +a downstream model's output untrustworthy even if the downstream model itself has no defects. + +Generate a final synthesis report with: +- An executive summary +- Critical findings (agreed upon by multiple reviewers or clearly valid), each with an explicit + **Downstream impact** sub-section tracing the bug through the dependency chain +- A dependency propagation map: a table or diagram showing which bugs flow into which downstream + models and what the compounded effect is +- Model-specific recommendations ordered by severity +- Cross-model patterns or systemic issues +- A prioritized action plan table, where items that fix root-cause bugs affecting multiple + downstream models are ranked higher than equivalent-effort fixes that are locally scoped + +--- + +Individual audit reports: + +{combined_text}""" + + with open(synthesis_ctx_path, "w") as f: + f.write(synthesis_instructions) + + prompt = ( + f"Read the synthesis instructions and individual audit reports from " + f"{synthesis_ctx_path}. 
Follow the instructions to produce a final " + "consolidated synthesis report in markdown." + ) + + try: + result = subprocess.run( + ["cursor-agent", "--print", "--force", "--model", synthesis_model, prompt], + capture_output=True, + text=True, + cwd=root, + timeout=1200, + ) + except subprocess.TimeoutExpired: + synthesis = "(synthesis timed out after 1200s)" + click.echo("WARNING: synthesis timed out", err=True) + synthesis_path = os.path.join(output_dir, "final_synthesis.md") + with open(synthesis_path, "w") as f: + f.write(f"# dbt Model Audit — Final Synthesis\n\n{synthesis}\n") + return synthesis_path + + if result.returncode == 0: + synthesis = result.stdout.strip() + else: + synthesis = f"(synthesis failed: exit {result.returncode})\n{result.stderr}" + + synthesis_path = os.path.join(output_dir, "final_synthesis.md") + with open(synthesis_path, "w") as f: + f.write(f"# dbt Model Audit — Final Synthesis\n\n{synthesis}\n") + + elapsed = time.monotonic() - start + click.echo(f"Final synthesis ({elapsed:.0f}s) → {synthesis_path}") + click.echo(f"Combined reports → {combined_path}") + return synthesis_path + + +def resolve_sql_paths(paths): + """Expand directories and verify that every path is a .sql file.""" + resolved = [] + for p in paths: + p = os.path.abspath(p) + if os.path.isdir(p): + children = sorted(f for f in glob.glob(os.path.join(p, "**", "*.sql"), recursive=True)) + if not children: + click.echo(f"WARNING: no .sql files found in {p}", err=True) + resolved.extend(children) + elif os.path.isfile(p): + if not p.endswith(".sql"): + click.echo(f"ERROR: not a .sql file: {p}", err=True) + sys.exit(1) + resolved.append(p) + else: + click.echo(f"ERROR: path not found: {p}", err=True) + sys.exit(1) + return resolved + + +def model_name_from_path(filepath): + return os.path.splitext(os.path.basename(filepath))[0] + + +@click.command() +@click.argument("paths", nargs=-1, required=True) +@click.option( + "--llm", + "llms", + required=True, + multiple=True, + 
help="LLM model name for cursor-agent (repeatable)", +) +@click.option("--root", required=True, help="Path to dbt project root") +@click.option("--prompt", required=True, help="Path to the prompt template .md file") +@click.option( + "--output-dir", + default="./audit_reports", + show_default=True, + help="Directory for output reports", +) +@click.option("--limit", default=20, show_default=True, help="Row limit for dbt show") +@click.option( + "--synthesis-model", + default="sonnet-4.6-thinking", + show_default=True, + help="LLM for the final synthesis step", +) +@click.option( + "--concurrency", + default=3, + show_default=True, + help="Max parallel cursor-agent invocations", +) +def main(paths, llms, root, prompt, output_dir, limit, synthesis_model, concurrency): + """Run dbt model audits across multiple models and LLMs, then synthesize. + + PATHS are .sql files, directories containing .sql files, or shell globs. + Model names are inferred from filenames (int_orders.sql → int_orders). 
+ """ + validate_llms(llms + (synthesis_model,)) + + sql_files = resolve_sql_paths(paths) + if not sql_files: + click.echo("ERROR: no .sql files resolved from the given paths", err=True) + sys.exit(1) + + model_specs = [] + seen = set() + for filepath in sql_files: + name = model_name_from_path(filepath) + if name in seen: + click.echo( + f"ERROR: duplicate model name '{name}' from {filepath}", + err=True, + ) + sys.exit(1) + seen.add(name) + model_specs.append((name, filepath)) + + if not os.path.exists(prompt): + click.echo(f"ERROR: prompt template not found: {prompt}", err=True) + sys.exit(1) + with open(prompt) as f: + template = f.read() + + root = os.path.abspath(root) + output_dir = os.path.abspath(output_dir) + os.makedirs(output_dir, exist_ok=True) + + total = len(model_specs) * len(llms) + click.echo(f"Auditing {len(model_specs)} model(s) × {len(llms)} LLM(s) = {total} audit(s)") + click.echo(f"Concurrency: {concurrency} | Synthesis model: {synthesis_model}\n") + + context_paths = {} + for name, filepath in model_specs: + click.echo(f"Preparing {name}...") + compiled_sql = compile_model(name, root) + sample_rows = get_sample_rows(name, root, limit) + with open(filepath) as f: + source_sql = f.read() + click.echo(f" Gathering lineage for {name}...") + lineage = get_lineage(name, root) + click.echo(f" Scanning tests for {name}...") + existing_tests = get_existing_tests(name, root) + context_paths[name] = write_context_file( + output_dir, + name, + template, + compiled_sql, + sample_rows, + source_sql, + lineage=lineage, + existing_tests=existing_tests, + ) + + click.echo(f"\nAll models compiled. 
Launching {total} audit(s)...\n") + + reports = [] + with ThreadPoolExecutor(max_workers=concurrency) as pool: + futures = {} + for name, _ in model_specs: + for llm in llms: + fut = pool.submit( + run_audit, + name, + context_paths[name], + llm, + output_dir, + root, + ) + futures[fut] = (name, llm) + + for fut in as_completed(futures): + name, llm = futures[fut] + try: + reports.append(fut.result()) + except Exception as e: + click.echo(f" ERROR [{name} × {llm}]: {e}", err=True) + reports.append((name, llm, f"(error: {e})")) + + reports.sort(key=lambda r: (r[0], r[1])) + + synthesize_reports(reports, synthesis_model, output_dir, root) + + md_files = glob.glob(os.path.join(output_dir, "*.md")) + if md_files: + click.echo(f"\nFormatting {len(md_files)} markdown file(s)...") + run( + ["uvx", "mdformat", "--wrap", "100"] + md_files, + cwd=root, + ) + + click.echo(f"\nDone! {len(reports)} audit(s) completed. Reports in: {output_dir}") + + +if __name__ == "__main__": + main() diff --git a/dbt/dbt_deep_analysis.md b/dbt/dbt_deep_analysis.md new file mode 100644 index 0000000..fbc10d4 --- /dev/null +++ b/dbt/dbt_deep_analysis.md @@ -0,0 +1,98 @@ +You have access to a duckdb database. You are auditing a dbt model for data quality, correctness, and best practices. Interrogate the database to validate every claim you make — do not speculate without running a query first. + +## Audit checklist + +Work through each section. For every finding, run a query to confirm it. + +### 1. Schema & types +- Are column types appropriate (e.g. dates stored as DATE not VARCHAR, monetary values as DECIMAL not FLOAT)? +- Are there implicit casts in joins or WHERE clauses that could silently drop rows or change values? +- Do any columns contain mixed types or unexpected NULLs? + +### 2. Join correctness +- Is every join relationship correct (1:1, 1:N, M:N)? Run a query: does the join **fan out** (produce more rows than the driving table)? +- Are there orphaned rows (LEFT JOIN misses)? 
What fraction of rows have NULL foreign keys after the join?
+- Are join keys unique on the side that should be unique? Query `COUNT(*) vs COUNT(DISTINCT key)`.
+
+### 3. Filters & business logic
+- Are there WHERE / HAVING filters that could silently exclude valid records (e.g. filtering on a column that is sometimes NULL)?
+- Is there business logic (CASE statements, date arithmetic, aggregations) that could produce wrong results on edge cases?
+- Are date boundaries inclusive/exclusive as intended?
+
+### 4. Grain & uniqueness
+- What is the intended grain of this model? Verify with `COUNT(*) vs COUNT(DISTINCT <grain columns>)`.
+- Could the model produce duplicate rows under any upstream data condition?
+
+### 5. Data quality
+- What percentage of each column is NULL? Flag any column where the NULL rate is suspicious.
+- Are there unexpected duplicate values, negative numbers, future dates, or empty strings where there shouldn't be?
+- Do value distributions look reasonable (run MIN, MAX, AVG, percentiles for numeric columns)?
+
+### 6. Performance & best practices
+- Are there SELECT * or unnecessary columns being carried through?
+- Could CTEs be simplified or combined?
+- Are there window functions that could be replaced with simpler aggregations, or vice versa?
+- Is the model incremental where it should be, or full-refresh where incremental would be better?
+
+### 7. Test coverage gaps
+{{#if existing_tests}}
+The following dbt tests are already defined for this model:
+{{existing_tests}}
+
+Identify what is NOT covered by existing tests. Focus recommendations on gaps.
+{{/if}}
+{{^if existing_tests}}
+No dbt tests were found for this model. Recommend the most important tests to add.
+{{/if}}
+
+### 8. Upstream dependency risks
+{{#if lineage}}
+Model lineage (immediate upstream/downstream):
+{{lineage}}
+
+Consider: if an upstream model delivers late, delivers duplicates, or changes its grain, how does this model behave? Are there defensive checks? 
+{{/if}} + +## Context + +### Compiled SQL +{{compiled_sql}} + +### Sample rows +{{sample_rows}} + +### Data profile +{{#if data_profile}} +{{data_profile}} +{{/if}} + +## Output format + +Structure your report as follows: + +``` +## Executive summary +(2-3 sentences: overall health, most critical finding) + +## Critical findings +(Issues that could produce wrong numbers in production) + +| # | Finding | Severity | Evidence query | Affected columns | +|---|---------|----------|---------------|-----------------| +| 1 | ... | ... | ... | ... | + +### Finding 1: +**Query:** <the SQL you ran> +**Result:** <what you found> +**Impact:** <what goes wrong downstream> +**Fix:** <specific SQL or config change> + +## Warnings +(Issues that aren't wrong today but are fragile) + +## Recommendations +(Best-practice improvements, ordered by impact) + +## Suggested dbt tests +(Specific test YAML snippets to add) +``` diff --git a/dbt/dbt_quick_analysis.md b/dbt/dbt_quick_analysis.md new file mode 100644 index 0000000..6eb4511 --- /dev/null +++ b/dbt/dbt_quick_analysis.md @@ -0,0 +1,4 @@ +Output the complete SQL file with inline comments added as SQL comments (-- ). +Add brief comments suggesting improvements, potential issues, or best-practice violations. +Where appropriate, include a short explanation of why the suggestion matters. +Output ONLY the SQL with comments, no markdown fences, no preamble. 
diff --git a/nvim/lua/config/dbt.lua b/nvim/lua/config/dbt.lua new file mode 100644 index 0000000..962891b --- /dev/null +++ b/nvim/lua/config/dbt.lua @@ -0,0 +1,409 @@ +-- dbt keymaps and helpers +-- Loaded from keymaps.lua + +-- Extract model name from current file path (e.g., models/staging/stg_orders.sql -> stg_orders) +local function dbt_model_name() + local filepath = vim.fn.expand("%:t:r") + if filepath == "" then + vim.notify("No file open", vim.log.levels.WARN) + return nil + end + return filepath +end + +-- Format the current SQL file, then send a dbt command to the terminal +local function dbt_cmd(cmd_template) + local model = dbt_model_name() + if not model then + return + end + + -- Format with conform (sqlfmt), then save + require("conform").format({ async = false, lsp_fallback = true }) + vim.cmd("write") + + -- Build the command + local cmd = string.format(cmd_template, model) + + -- Send to toggleterm (terminal 1), then return focus to the code window + local prev_win = vim.api.nvim_get_current_win() + local term = require("toggleterm.terminal").get(1) + if not term then + term = require("toggleterm.terminal").Terminal:new({ id = 1 }) + end + if not term:is_open() then + term:toggle() + end + term:send(cmd) + vim.api.nvim_set_current_win(prev_win) +end + +-- Find the project root (directory containing dbt_project.yml) +local function dbt_project_root() + local path = vim.fn.findfile("dbt_project.yml", ".;") + if path == "" then + return nil + end + return vim.fn.fnamemodify(path, ":p:h") +end + +-- Send a raw command string to toggleterm (used by picker actions) +local function dbt_cmd_raw(cmd) + local term = require("toggleterm.terminal").get(1) + if not term then + term = require("toggleterm.terminal").Terminal:new({ id = 1 }) + end + if not term:is_open() then + term:toggle() + end + term:send(cmd) +end + +-- Read a prompt template and substitute {{key}} placeholders +local function dbt_load_prompt(name, vars) + local prompt_dir = 
vim.fn.stdpath("config") .. "/dbt/" + local path = prompt_dir .. name .. ".md" + local lines = vim.fn.readfile(path) + if #lines == 0 then + vim.notify("Prompt not found: " .. path, vim.log.levels.ERROR) + return nil + end + local prompt = table.concat(lines, "\n") + for key, value in pairs(vars or {}) do + prompt = prompt:gsub("{{" .. key .. "}}", value) + end + return prompt +end + +-- Jump to model under cursor from {{ ref('model_name') }} or {{ source('src', 'table') }} +vim.keymap.set("n", "<leader>dg", function() + local line = vim.api.nvim_get_current_line() + local col = vim.api.nvim_win_get_cursor(0)[2] + 1 + + -- Try to find ref('model') or ref("model") around cursor + local ref_model = nil + for start_pos, name, end_pos in line:gmatch("()ref%(['\"]([^'\"]+)['\"]%)()" ) do + if col >= start_pos and col <= end_pos then + ref_model = name + break + end + end + + -- Try source('source_name', 'table_name') if no ref found + local source_name, source_table = nil, nil + if not ref_model then + for start_pos, src, tbl, end_pos in line:gmatch("()source%(['\"]([^'\"]+)['\"]%s*,%s*['\"]([^'\"]+)['\"]%)()" ) do + if col >= start_pos and col <= end_pos then + source_name, source_table = src, tbl + break + end + end + end + + if not ref_model and not source_table then + vim.notify("No ref() or source() under cursor", vim.log.levels.WARN) + return + end + + local root = dbt_project_root() + if not root then + vim.notify("No dbt_project.yml found", vim.log.levels.WARN) + return + end + + -- Search for the model file + local search_name = ref_model or source_table + local matches = vim.fn.globpath(root, "**/" .. search_name .. ".sql", false, true) + if #matches == 0 then + -- Also try .yml for source definitions + matches = vim.fn.globpath(root, "**/" .. search_name .. 
".yml", false, true) + end + + if #matches == 1 then + vim.cmd.edit(matches[1]) + elseif #matches > 1 then + vim.ui.select(matches, { prompt = "Multiple matches:" }, function(choice) + if choice then + vim.cmd.edit(choice) + end + end) + else + vim.notify("No file found for: " .. search_name, vim.log.levels.WARN) + end +end, { desc = "[D]bt [G]o to ref/source" }) + +-- Fuzzy model picker — select a model then choose an action +vim.keymap.set("n", "<leader>df", function() + local root = dbt_project_root() + if not root then + vim.notify("No dbt_project.yml found", vim.log.levels.WARN) + return + end + + local actions = require("telescope.actions") + local action_state = require("telescope.actions.state") + require("telescope.builtin").find_files({ + prompt_title = "dbt model (enter=open, C-r=run, C-b=build, C-t=test)", + cwd = root, + search_dirs = { "models" }, + find_command = { "fd", "-e", "sql" }, + attach_mappings = function(prompt_bufnr, map) + local function get_model_name() + local entry = action_state.get_selected_entry() + if entry then + return entry[1]:match("([^/]+)%.sql$") + end + end + map("i", "<C-r>", function() + local name = get_model_name() + actions.close(prompt_bufnr) + if name then dbt_cmd_raw("uv run dbt run -s " .. name) end + end) + map("i", "<C-b>", function() + local name = get_model_name() + actions.close(prompt_bufnr) + if name then dbt_cmd_raw("uv run dbt build -s " .. name) end + end) + map("i", "<C-t>", function() + local name = get_model_name() + actions.close(prompt_bufnr) + if name then dbt_cmd_raw("uv run dbt test -s " .. 
name) end + end) + return true + end, + }) +end, { desc = "[D]bt [F]ind model" }) + +-- Open the compiled SQL for the current model in a split +vim.keymap.set("n", "<leader>do", function() + local model = dbt_model_name() + if not model then + return + end + local root = dbt_project_root() + if not root then + vim.notify("No dbt_project.yml found", vim.log.levels.WARN) + return + end + local compiled = vim.fn.globpath(root, "target/compiled/**/" .. model .. ".sql", false, true) + if #compiled == 0 then + vim.notify("No compiled SQL found — run dbt compile first", vim.log.levels.WARN) + return + end + vim.cmd("vsplit " .. compiled[1]) + vim.bo.readonly = true + vim.bo.modifiable = false +end, { desc = "[D]bt [O]pen compiled SQL" }) + +-- Grep across all models (search for column names, CTEs, etc.) +vim.keymap.set("n", "<leader>d/", function() + local root = dbt_project_root() + if not root then + vim.notify("No dbt_project.yml found", vim.log.levels.WARN) + return + end + require("telescope.builtin").live_grep({ prompt_title = "dbt grep", cwd = root .. 
"/models" }) +end, { desc = "[D]bt search models" }) + +vim.keymap.set("n", "<leader>dr", function() + dbt_cmd("uv run dbt run -s %s") +end, { desc = "[D]bt [R]un current model" }) + +vim.keymap.set("n", "<leader>dR", function() + dbt_cmd("uv run dbt run -s %s+") +end, { desc = "[D]bt [R]un model + downstream" }) + +vim.keymap.set("n", "<leader>db", function() + dbt_cmd("uv run dbt build -s %s") +end, { desc = "[D]bt [B]uild current model (run + test)" }) + +vim.keymap.set("n", "<leader>dc", function() + dbt_cmd("uv run dbt compile -s %s") +end, { desc = "[D]bt [C]ompile current model" }) + +vim.keymap.set("n", "<leader>dt", function() + dbt_cmd("uv run dbt test -s %s") +end, { desc = "[D]bt [T]est current model" }) + +vim.keymap.set("n", "<leader>ds", function() + dbt_cmd("uv run dbt show -s %s") +end, { desc = "[D]bt [S]how preview results" }) + +-- Preview sample rows in a horizontal split +vim.keymap.set("n", "<leader>dp", function() + local model = dbt_model_name() + if not model then + return + end + vim.notify("Fetching preview for " .. model .. "...", vim.log.levels.INFO) + local cmd = { "uv", "run", "dbt", "show", "-s", model, "--limit", "20" } + local output = {} + vim.fn.jobstart(cmd, { + cwd = dbt_project_root(), + stdout_buffered = true, + stderr_buffered = true, + on_stdout = function(_, data) + if data then + vim.list_extend(output, data) + end + end, + on_exit = function(_, exit_code) + vim.schedule(function() + if exit_code ~= 0 then + vim.notify("dbt show failed (exit " .. exit_code .. 
")", vim.log.levels.ERROR) + return + end + -- Trim trailing empty lines + while #output > 0 and output[#output] == "" do + table.remove(output) + end + if #output == 0 then + vim.notify("No rows returned", vim.log.levels.WARN) + return + end + -- Open a scratch buffer in a horizontal split + vim.cmd("botright new") + local buf = vim.api.nvim_get_current_buf() + vim.bo[buf].buftype = "nofile" + vim.bo[buf].bufhidden = "wipe" + vim.bo[buf].swapfile = false + vim.bo[buf].filetype = "sql" + vim.api.nvim_buf_set_name(buf, "dbt-preview://" .. model) + vim.api.nvim_buf_set_lines(buf, 0, -1, false, output) + vim.bo[buf].modifiable = false + -- Resize to fit content (max 20 lines) + local height = math.min(#output, 20) + vim.api.nvim_win_set_height(0, height) + vim.keymap.set("n", "q", "<cmd>close<cr>", { buffer = buf, silent = true }) + end) + end, + }) +end, { desc = "[D]bt [P]review model rows" }) + +-- Show model output as CSV piped into visidata +vim.keymap.set("n", "<leader>dv", function() + local model = dbt_model_name() + if not model then + return + end + local root = dbt_project_root() + if not root then + vim.notify("No dbt_project.yml found", vim.log.levels.WARN) + return + end + require("conform").format({ async = false, lsp_fallback = true }) + vim.cmd("write") + local prev_win = vim.api.nvim_get_current_win() + local json_to_csv = [[python3 -c " +import sys, json, csv +raw = sys.stdin.read() +for line in raw.splitlines(): + try: + obj = json.loads(line) + except json.JSONDecodeError: + continue + preview = None + if 'data' in obj and 'preview' in obj['data']: + preview = obj['data']['preview'] + elif 'results' in obj: + preview = obj['results'][0].get('preview') + elif 'preview' in obj: + preview = obj['preview'] + if preview: + if isinstance(preview, str): + preview = json.loads(preview) + w = csv.DictWriter(sys.stdout, fieldnames=preview[0].keys()) + w.writeheader() + w.writerows(preview) + break +"]] + local cmd = "cd " .. root .. 
" && uv run dbt show -s " .. model .. " --limit 500 --output json --log-format json | " .. json_to_csv .. " | vd -f csv" + require("toggleterm.terminal").Terminal + :new({ + cmd = cmd, + close_on_exit = true, + on_exit = function() + vim.schedule(function() + if vim.api.nvim_win_is_valid(prev_win) then + vim.api.nvim_set_current_win(prev_win) + end + end) + end, + }) + :toggle() +end, { desc = "[D]bt [V]isidata preview" }) + +-- Run cursor-agent on current model (quick analysis with sonnet) +vim.keymap.set("n", "<leader>da", function() + local filepath = vim.fn.expand("%:p") + if filepath == "" then + vim.notify("No file open", vim.log.levels.WARN) + return + end + local prompt = dbt_load_prompt("dbt_quick_analysis", {}) + if not prompt then + return + end + local bufnr = vim.api.nvim_get_current_buf() + + vim.notify("Running quick analysis...", vim.log.levels.INFO) + + local file_content = table.concat(vim.fn.readfile(filepath), "\n") + local full_prompt = prompt .. "\n\nFile: " .. filepath .. "\n```sql\n" .. file_content .. "\n```" + local cmd = { "cursor-agent", "--print", "--model", "sonnet-4.6", full_prompt } + local output = {} + vim.fn.jobstart(cmd, { + stdout_buffered = true, + on_stdout = function(_, data) + if data then + output = data + end + end, + on_exit = function(_, exit_code) + vim.schedule(function() + if exit_code ~= 0 then + vim.notify("cursor-agent exited with code " .. 
exit_code, vim.log.levels.ERROR) + return + end + -- Remove trailing empty strings from jobstart output + while #output > 0 and output[#output] == "" do + table.remove(output) + end + if #output == 0 then + vim.notify("No output from cursor-agent", vim.log.levels.WARN) + return + end + vim.api.nvim_buf_set_lines(bufnr, 0, -1, false, output) + vim.notify("Inline comments added — review and :w to save, or :u to undo", vim.log.levels.INFO) + end) + end, + }) +end, { desc = "[D]bt [A]nalyse model (quick)" }) + +-- Open interactive cursor-agent session in a new tmux window with compiled SQL + sample rows as context +vim.keymap.set("n", "<leader>dA", function() + local filepath = vim.fn.expand("%:p") + if filepath == "" then + vim.notify("No file open", vim.log.levels.WARN) + return + end + local model = dbt_model_name() + if not model then + return + end + local root = dbt_project_root() + if not root then + vim.notify("No dbt_project.yml found", vim.log.levels.WARN) + return + end + + -- Open a new tmux window running the standalone dbt_analyse.py script + local prompt_path = vim.fn.stdpath("config") .. "/dbt/dbt_deep_analysis.md" + local script_path = vim.fn.stdpath("config") .. "/dbt/dbt_analyse.py" + local shell_script = string.format( + [[cd '%s' && uv run python3 '%s' --model '%s' --root '%s' --filepath '%s' --prompt '%s' || (echo "Press enter to close..." && read)]], + root, script_path, model, root, filepath, prompt_path + ) + vim.fn.jobstart({ "tmux", "new-window", "-n", "dbt:" .. model, shell_script }, { detach = true }) + vim.notify("Opened interactive cursor-agent session in tmux window 'dbt:" .. model .. 
"'", vim.log.levels.INFO) +end, { desc = "[D]bt [A]nalyse model (interactive)" }) diff --git a/nvim/lua/config/keymaps.lua b/nvim/lua/config/keymaps.lua index 9880f21..6dab4bf 100644 --- a/nvim/lua/config/keymaps.lua +++ b/nvim/lua/config/keymaps.lua @@ -17,8 +17,7 @@ end, { desc = "[I]nsert [D]ate" }) -- Shortcut for searching the wiki vim.keymap.set("n", "<leader>ws", function() - local fzf = require("fzf-lua") - fzf.files({ prompt = "wiki  ", cwd = vim.g.wiki_root }) + require("telescope.builtin").find_files({ prompt_title = "Wiki", cwd = vim.g.wiki_root }) end, { desc = "[W]iki [S]earch" }) -- Create a new page in the wiki @@ -33,27 +32,25 @@ vim.keymap.set("n", "<leader>wn", function() end, { desc = "[W]iki [N]ew Page" }) vim.keymap.set("n", "<leader>wi", function() - local fzf = require("fzf-lua") - fzf.files({ - prompt = "Wiki Files", + local actions = require("telescope.actions") + local action_state = require("telescope.actions.state") + require("telescope.builtin").find_files({ + prompt_title = "Wiki Insert Link", cwd = vim.g.wiki_root, - actions = { - -- Override the default selection action - ["default"] = function(selected) - -- Get the selected file (should be just one) - if selected and #selected > 0 then - local full_path = selected[1] - - -- Extract just the filename from the path - local filename = full_path:match("([^/\\]+)$") - - -- Create a wiki link format [[filename]] without the file extension + attach_mappings = function(prompt_bufnr, map) + actions.select_default:replace(function() + local entry = action_state.get_selected_entry() + actions.close(prompt_bufnr) + if entry then + local filename = entry[1]:match("([^/\\]+)$") local link = string.format("[[%s]]", filename:gsub("%.%w+$", "")) - - -- Insert the wiki link at the current cursor position vim.api.nvim_put({ link }, "c", true, true) end - end, - }, + end) + return true + end, }) end, { noremap = true, silent = true, desc = "[W]iki [I]nsert Link" }) + +-- dbt keymaps (loaded from 
separate file) +require("config.dbt") diff --git a/nvim/lua/plugins/language.lua b/nvim/lua/plugins/language.lua index 946c320..3546b2d 100644 --- a/nvim/lua/plugins/language.lua +++ b/nvim/lua/plugins/language.lua @@ -33,6 +33,7 @@ return { "python", "query", "regex", + "sql", "vim", "yaml", },