Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions src/agent/prompts/default_system_prompt.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,32 @@
- If they're asking about the status of a job, provide the job status but don't suggest checking for tables or schemas to indicate the job progress.

IMPORTANT WORKFLOWS:
1. BROWSING DATA: To help users browse data, use these tools in sequence:
- list_catalogs -> set_catalog -> list_schemas -> set_schema -> list_tables -> get_table_info

1. CATALOGS: To work with catalogs:
- If user asks "what catalogs do I have?" or wants to see catalogs: use list_catalogs with display=true (shows full table)
- If user asks to "use X catalog" or "switch to X catalog": DIRECTLY use select_catalog with catalog parameter (accepts name, has built-in fuzzy matching). DO NOT call list_catalogs first - select_catalog has built-in fuzzy matching and will find the catalog.
- If you need catalog info for internal processing: use list_catalogs (defaults to no table display)

2. PII and/or Customer data DETECTION: To help with PII and/or customer data scanning:
- For single table: navigate to the right catalog/schema, then use tag_pii_columns
- For bulk scanning: navigate to the right catalog/schema, then use scan_schema_for_pii

3. STITCH INTEGRATION: To set up data pipelines:
- Navigate to the right catalog/schema, then use setup_stitch
3. STITCH INTEGRATION: To set up identity graph or customer 360 with Stitch:
- If the catalog and schema are not already selected - have the user select them first. Stitch requires a catalog and schema to be selected.
- If user asks about setting up Stitch: use setup_stitch

4. SCHEMAS: To work with schemas:
- If user asks "what schemas do I have?" or wants to see schemas: use list_schemas with display=true (shows full table)
- If user asks to "use X schema" or "switch to X schema": use select_schema with schema parameter (accepts name, has built-in fuzzy matching). DO NOT call list_schemas first - select_schema has built-in fuzzy matching and will find the schema.
- If you need schema info for internal processing: use list_schemas (defaults to no table display)

5. TABLES: To work with tables:
- If user asks "what tables do I have?" or wants to see tables: use list_tables with display=true (shows full table)
- If you need table info for internal processing: use list_tables (defaults to no table display)

4. SQL WAREHOUSES: To work with SQL warehouses:
6. SQL WAREHOUSES: To work with SQL warehouses:
- If user asks "what warehouses do I have?" or wants to see warehouses: use list_warehouses with display=true (shows full table)
- If user asks to "use X warehouse" or "switch to X warehouse": use select_warehouse with warehouse parameter (accepts ID or name, has built-in fuzzy matching)
- If user asks to "use X warehouse" or "switch to X warehouse": use select_warehouse with warehouse parameter (accepts ID or name, has built-in fuzzy matching). DO NOT call list_warehouses first - select_warehouse has built-in fuzzy matching and will find the warehouse.
- If you need warehouse info for internal processing: use list_warehouses (defaults to no table display)

Some of the tools you can use require the user to select a catalog and/or schema first. If the user hasn't selected one YOU MUST ask them if they want help selecting a catalog and schema. DO NO OTHER ACTION
Expand Down
41 changes: 31 additions & 10 deletions src/command_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,22 +170,43 @@ def _display_catalogs(data: Dict[str, Any], console: Console) -> None:
console.print(f"[{WARNING}]No catalogs found.[/{WARNING}]")
return

# Define a style map for conditional formatting
def style_name(row):
if row.get("name") == current_catalog:
return f"[{SUCCESS_STYLE}]{row.get('name')}[/{SUCCESS_STYLE}]"
return row.get("name")
# Transform data for display
display_data = []
for catalog in catalogs:
display_data.append(
{
"name": catalog.get("name", ""),
"type": catalog.get("type", ""),
"comment": catalog.get("comment", ""),
"owner": catalog.get("owner", ""),
}
)

# Define styling functions
def name_style(value):
if value == current_catalog:
return "bold green"
return None

def type_style(value):
if value.lower() == "managed":
return "green"
elif value.lower() == "external":
return "blue"
else:
return "yellow"

style_map = {
"name": style_name,
"name": name_style,
"type": type_style,
}

# Display the catalogs table
display_table(
console=console,
data=catalogs,
columns=["name", "type", "comment"],
headers=["Name", "Type", "Comment"],
data=display_data,
columns=["name", "type", "comment", "owner"],
headers=["Name", "Type", "Comment", "Owner"],
title="Available Catalogs",
style_map=style_map,
title_style=TABLE_TITLE_STYLE,
Expand All @@ -195,7 +216,7 @@ def style_name(row):
# Display current catalog if set
if current_catalog:
console.print(
f"\nCurrent catalog: [{SUCCESS_STYLE}]{current_catalog}[/{SUCCESS_STYLE}]"
f"\nCurrent catalog: [bold green]{current_catalog}[/bold green]"
)

@staticmethod
Expand Down
2 changes: 1 addition & 1 deletion src/command_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
by both the user interface and LLM agent tools, reducing code duplication.
"""

from typing import Dict, Any, Callable, List, Optional, Union
from typing import Dict, Any, Callable, List, Optional
from dataclasses import dataclass, field


Expand Down
147 changes: 123 additions & 24 deletions src/commands/catalog_selection.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,64 +7,163 @@

import logging
from typing import Optional
from difflib import SequenceMatcher

from src.clients.databricks import DatabricksAPIClient
from src.command_registry import CommandDefinition
from src.config import set_active_catalog
from .base import CommandResult


def _similarity_score(name1: str, name2: str) -> float:
"""Calculate similarity score between two strings (0.0 to 1.0)."""
return SequenceMatcher(None, name1.lower().strip(), name2.lower().strip()).ratio()


def _find_best_catalog_match(target_name: str, catalogs: list) -> Optional[dict]:
"""Find the best matching catalog by name using fuzzy matching."""
best_match = None
best_score = 0.0
target_lower = target_name.lower().strip()

for catalog in catalogs:
catalog_name = catalog.get("name", "")
if not catalog_name:
continue

catalog_lower = catalog_name.lower().strip()

# Check for exact match first (case insensitive)
if catalog_lower == target_lower:
return catalog

# Check if target is a substring of catalog name
if target_lower in catalog_lower or catalog_lower.startswith(target_lower):
return catalog

# Calculate similarity score for fuzzy matching
score = _similarity_score(target_name, catalog_name)
if score > best_score and score >= 0.4: # Threshold for fuzzy matching
best_score = score
best_match = catalog

return best_match


def _report_step(message: str, tool_output_callback=None):
"""Report a step in the catalog selection process."""
if tool_output_callback:
tool_output_callback("select-catalog", {"step": message})


def handle_command(client: Optional[DatabricksAPIClient], **kwargs) -> CommandResult:
    """
    Set the active catalog by name, with fuzzy matching as a fallback.

    The name is first looked up directly. If that lookup fails or returns
    nothing, all catalogs are listed and the closest name match is selected.

    Args:
        client: API client instance
        **kwargs: catalog (str) - catalog name, tool_output_callback (optional).
            The legacy ``catalog_name`` key is still accepted when ``catalog``
            is not supplied.

    Returns:
        CommandResult: successful with ``catalog_name``, ``catalog_type``,
        ``owner`` and ``step`` in ``data`` when a catalog was selected;
        unsuccessful with an explanatory message otherwise.
    """
    # Accept the pre-rename parameter name so existing callers keep working.
    catalog: str = kwargs.get("catalog") or kwargs.get("catalog_name")
    tool_output_callback = kwargs.get("tool_output_callback")

    if not catalog:
        return CommandResult(
            False,
            message="catalog parameter is required.",
        )

    identifier = catalog

    if not client:
        return CommandResult(
            False,
            message="No API client available to verify catalog.",
        )

    try:
        target_catalog = None

        # Try to get catalog directly first
        try:
            from src.catalogs import get_catalog

            catalog_obj = get_catalog(client, identifier)
            if catalog_obj:
                target_catalog = catalog_obj
        except Exception as lookup_error:
            # Direct lookup failed - fall back to name matching. This is
            # deliberately best-effort, but leave a trace for debugging
            # instead of discarding the error silently.
            logging.debug(
                "Direct lookup of catalog '%s' failed: %s", identifier, lookup_error
            )

        # If not found directly, search by name
        if not target_catalog:
            _report_step(
                f"Looking for catalog matching '{identifier}'", tool_output_callback
            )

            # Get all catalogs
            from src.catalogs import list_catalogs

            catalogs_result = list_catalogs(client)
            catalogs = catalogs_result.get("catalogs", [])
            if not catalogs:
                return CommandResult(False, message="No catalogs found in workspace.")

            # Find best match by name
            target_catalog = _find_best_catalog_match(identifier, catalogs)

            if not target_catalog:
                return CommandResult(
                    False,
                    message=f"No catalog found matching '{identifier}'. Available catalogs: {', '.join([c.get('name', 'Unknown') for c in catalogs])}",
                )

        # Report the selection; call out when the chosen name differs from
        # what the caller typed (i.e. a fuzzy/partial match was used).
        selected_name = target_catalog.get("name", "Unknown")
        if selected_name.lower().strip() != identifier.lower().strip():
            _report_step(f"Selecting '{selected_name}'", tool_output_callback)
        else:
            _report_step(f"Found catalog '{selected_name}'", tool_output_callback)

        # Set the active catalog
        catalog_name_to_set = target_catalog.get("name")
        catalog_type = target_catalog.get("type", "Unknown")
        catalog_owner = target_catalog.get("owner", "Unknown")

        set_active_catalog(catalog_name_to_set)

        return CommandResult(
            True,
            message=f"Active catalog is now set to '{catalog_name_to_set}' (Type: {catalog_type}, Owner: {catalog_owner}).",
            data={
                "catalog_name": catalog_name_to_set,
                "catalog_type": catalog_type,
                "owner": catalog_owner,
                "step": f"Catalog set - Name: {catalog_name_to_set}",
            },
        )

    except Exception as e:
        logging.error(f"Failed to set catalog: {e}", exc_info=True)
        return CommandResult(False, error=e, message=str(e))


# Registry entry for the catalog-selection command. Each keyword argument is
# given exactly once; the parameter is named "catalog" to match what
# handle_command reads from kwargs.
DEFINITION = CommandDefinition(
    name="select-catalog",
    description="Set the active catalog for operations by name with fuzzy matching",
    handler=handle_command,
    parameters={
        "catalog": {
            "type": "string",
            "description": "Catalog name to select",
        }
    },
    required_params=["catalog"],
    tui_aliases=["/select-catalog", "/use-catalog"],
    needs_api_client=True,
    visible_to_user=True,
    visible_to_agent=True,
    agent_display="condensed",
    condensed_action="Setting catalog:",
    usage_hint="Usage: /select-catalog <catalog_name>",
)
38 changes: 33 additions & 5 deletions src/commands/list_catalogs.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ def handle_command(
Args:
client: DatabricksAPIClient instance for API calls
**kwargs: Command parameters
- display: bool, whether to display the table (default: False)
- include_browse: Whether to include catalogs with selective metadata access (optional)
- max_results: Maximum number of catalogs to return (optional)
- page_token: Opaque pagination token to go to next page (optional)
Expand All @@ -32,6 +33,14 @@ def handle_command(
message="No Databricks client available. Please set up your workspace first.",
)

# Check if display should be shown (default to False for agent calls)
display = kwargs.get("display", False)

# Get current catalog for highlighting
from src.config import get_active_catalog

current_catalog = get_active_catalog()

# Extract parameters
include_browse = kwargs.get("include_browse", False)
max_results = kwargs.get("max_results")
Expand All @@ -50,7 +59,16 @@ def handle_command(
next_page_token = result.get("next_page_token")

if not catalogs:
return CommandResult(True, message="No catalogs found.")
return CommandResult(
True,
message="No catalogs found in this workspace.",
data={
"catalogs": [],
"total_count": 0,
"display": display,
"current_catalog": current_catalog,
},
)

# Format catalog information for display
formatted_catalogs = []
Expand All @@ -72,6 +90,8 @@ def handle_command(
"catalogs": formatted_catalogs,
"total_count": len(formatted_catalogs),
"next_page_token": next_page_token,
"display": display, # Pass through to display logic
"current_catalog": current_catalog,
},
message=f"Found {len(formatted_catalogs)} catalog(s)."
+ (
Expand All @@ -89,9 +109,13 @@ def handle_command(

DEFINITION = CommandDefinition(
name="list-catalogs",
description="List catalogs in Unity Catalog. Only useful for listing catalogs, not schemas, not tables nor anything else.",
description="Lists all catalogs in the current workspace. By default returns data without showing table. Use display=true when user asks to see catalogs.",
handler=handle_command,
parameters={
"display": {
"type": "boolean",
"description": "Whether to display the catalog table to the user (default: false). Set to true when user asks to see catalogs.",
},
"include_browse": {
"type": "boolean",
"description": "Whether to include catalogs with selective metadata access.",
Expand All @@ -107,10 +131,14 @@ def handle_command(
},
},
required_params=[],
tui_aliases=["/catalogs"],
tui_aliases=["/list-catalogs", "/catalogs"],
needs_api_client=True,
visible_to_user=True,
visible_to_agent=True,
agent_display="full", # Show full catalog list to agents
usage_hint="Usage: /list-catalogs [--include_browse true|false] [--max_results <number>] [--page_token <token>]",
agent_display="conditional", # Use conditional display based on display parameter
display_condition=lambda result: result.get(
"display", False
), # Show full table only when display=True
condensed_action="Listing catalogs", # Friendly name for condensed display
usage_hint="Usage: /list-catalogs [--display true|false] [--include_browse true|false] [--max_results <number>] [--page_token <token>]",
)
Loading