From 32f506842a2333b3289e667c8a6152a7106ef9c4 Mon Sep 17 00:00:00 2001 From: Sam Sternberg Date: Sat, 16 Aug 2025 07:35:58 -0400 Subject: [PATCH] Restructure project with SQL organization and Unity Catalog integration - Implement Unity Catalog secrets, connections and column masking - Remove legacy detokenization function and notebook - Add comprehensive cleanup scripts with create, destroy, and recreate actions - Update documentation for Unity Catalog architecture - Organize SQL files into setup/, destroy/, and verify/ directories - Add environment configuration with .env.local.example template --- .env.local.example | 13 + .gitignore | 26 + README.md | 480 +++++++---- config.sh | 38 +- .../customer_insights_dashboard.lvdash.json | 14 +- notebooks/notebook_call_tokenize_table.ipynb | 31 - notebooks/notebook_tokenize_table.ipynb | 124 +-- setup.sh | 763 +++++++++++++++--- sql/create_detokenize_function.sql | 136 ---- sql/destroy/cleanup_catalog.sql | 2 + sql/destroy/drop_functions.sql | 5 + sql/destroy/drop_table.sql | 2 + sql/destroy/remove_column_masks.sql | 8 + sql/setup/apply_column_masks.sql | 4 + sql/setup/create_catalog.sql | 3 + sql/{ => setup}/create_sample_table.sql | 45 +- sql/setup/create_uc_connections.sql | 20 + sql/setup/setup_uc_connections_api.sql | 94 +++ sql/verify/check_functions_exist.sql | 4 + sql/verify/check_table_exists.sql | 3 + sql/verify/verify_functions.sql | 3 + sql/verify/verify_table.sql | 2 + 22 files changed, 1226 insertions(+), 594 deletions(-) create mode 100644 .env.local.example create mode 100644 .gitignore delete mode 100644 notebooks/notebook_call_tokenize_table.ipynb delete mode 100644 sql/create_detokenize_function.sql create mode 100644 sql/destroy/cleanup_catalog.sql create mode 100644 sql/destroy/drop_functions.sql create mode 100644 sql/destroy/drop_table.sql create mode 100644 sql/destroy/remove_column_masks.sql create mode 100644 sql/setup/apply_column_masks.sql create mode 100644 sql/setup/create_catalog.sql rename sql/{ => setup}/create_sample_table.sql (70%) create mode 100644 sql/setup/create_uc_connections.sql create mode 100644 sql/setup/setup_uc_connections_api.sql create mode 100644 sql/verify/check_functions_exist.sql create mode 100644 sql/verify/check_table_exists.sql create mode 100644 sql/verify/verify_functions.sql create mode 100644 sql/verify/verify_table.sql diff --git a/.env.local.example b/.env.local.example new file mode 100644 index 0000000..cb16be3 --- /dev/null +++ b/.env.local.example @@ -0,0 +1,13 @@ +# Databricks Configuration +DATABRICKS_SERVER_HOSTNAME=your-workspace.cloud.databricks.com +DATABRICKS_HTTP_PATH=/sql/1.0/warehouses/your-warehouse-id +DATABRICKS_PAT_TOKEN=dapi123...your-pat-token +DATABRICKS_METASTORE_REGION=us-west-1 + +# Skyflow Configuration +SKYFLOW_ACCOUNT_ID=your-account-id +SKYFLOW_VAULT_URL=https://your-vault.vault.skyflowapis.com +SKYFLOW_VAULT_ID=your-vault-id +SKYFLOW_PAT_TOKEN=eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...your-pat-token +SKYFLOW_TABLE=pii +SKYFLOW_BATCH_SIZE=25 \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5cc2148 --- /dev/null +++ b/.gitignore @@ -0,0 +1,26 @@ +# Environment files with credentials +.env.local + +# OS generated files +.DS_Store +.DS_Store? 
+._* +.Spotlight-V100 +.Trashes +ehthumbs.db +Thumbs.db + +# IDE files +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# Logs +*.log +logs/ + +# Temporary files +tmp/ +temp/ \ No newline at end of file diff --git a/README.md b/README.md index c2ff947..a3bdd3d 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,10 @@ -# Skyflow for Databricks: PII & Sensitive Data Protection +# Skyflow for Databricks: Unity Catalog PII & Sensitive Data Protection -This solution provides secure data tokenization and detokenization capabilities in Databricks to protect PII and other sensitive data using Skyflow's Data Privacy Vault services. By implementing user-defined functions (UDFs) that integrate with Skyflow's API, organizations can efficiently protect sensitive data through tokenization and retrieve original PII data while maintaining security through role-based access control. +This solution provides secure data tokenization and detokenization capabilities in Databricks Unity Catalog to protect PII and other sensitive data using Skyflow's Data Privacy Vault services. Built with pure SQL UDFs using Unity Catalog HTTP connections for maximum performance and seamless integration with column-level security. ## Table of Contents + +- [Quick Start](#quick-start) - [Key Benefits](#key-benefits) - [Architecture](#architecture) - [Flow Diagrams](#flow-diagrams) @@ -10,16 +12,63 @@ This solution provides secure data tokenization and detokenization capabilities - [Detokenization Flow](#detokenization-flow) - [Features](#features) - [Prerequisites](#prerequisites) -- [Setup Instructions](#setup-instructions) - [Usage Examples](#usage-examples) - [Project Structure](#project-structure) -- [Error Handling](#error-handling) +- [Configuration](#configuration) - [Development Guide](#development-guide) - [Cleanup](#cleanup) - [Dashboard Integration](#dashboard-integration) - [Support](#support) - [License](#license) +## Quick Start + +1. **Clone and Configure**: + + ```bash + git clone + cd databricks-skyflow-integration + cp .env.local.example .env.local + ``` + +2. **Set Environment Variables**: + + Edit `.env.local` with your credentials: + ```bash + # Databricks Configuration + DATABRICKS_SERVER_HOSTNAME=your-workspace.cloud.databricks.com + DATABRICKS_PAT_TOKEN=dapi123...your-pat-token + DATABRICKS_HTTP_PATH=/sql/1.0/warehouses/your-warehouse-id + + # Skyflow Configuration + SKYFLOW_VAULT_URL=https://your-vault.vault.skyflowapis.com + SKYFLOW_PAT_TOKEN=eyJhbGci...your-pat-token + SKYFLOW_ACCOUNT_ID=your-account-id + SKYFLOW_VAULT_ID=your-vault-id + ``` + +3. **Deploy Everything**: + + ```bash + ./setup.sh create demo + ``` + + This creates: + - ✅ Unity Catalog: `demo_catalog` + - ✅ Sample table with 25,000 tokenized records + - ✅ UC connections and secrets + - ✅ Pure SQL detokenization functions + - ✅ Column masks (first_name only) + - ✅ Customer insights dashboard + +4. **Test Access**: + + ```sql + -- Auditor group members see real names + -- Others see tokens + SELECT first_name FROM demo_catalog.default.demo_customer_data; + ``` + ## Demo A demonstration of this solution was featured in the 'From PII to GenAI: Architecting for Data Privacy & Security in 2025' webinar. 
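The role-based result shown in the Test Access step comes from a Unity Catalog column mask that `setup.sh` applies to `first_name` (see `sql/setup/apply_column_masks.sql`). A minimal sketch of that statement, assuming the `demo` prefix used in this Quick Start and the mask UDF name generated by setup:

```sql
-- Sketch only: setup.sh applies this automatically during `./setup.sh create demo`.
-- The mask UDF returns detokenized values for auditors and the raw token for everyone else.
ALTER TABLE demo_catalog.default.demo_customer_data
  ALTER COLUMN first_name SET MASK demo_skyflow_mask_detokenize;
```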
@@ -28,28 +77,32 @@ A demonstration of this solution was featured in the 'From PII to GenAI: Archite ## Key Benefits -- **Comprehensive Data Protection**: Both tokenization and detokenization capabilities -- **Efficient Processing**: Bulk operations with multi-threaded processing -- **Role-Based Access**: Automatic redaction based on user group membership -- **High Performance**: Processes data in configurable chunks of 25 records -- **Seamless Integration**: Native Databricks UDFs for easy implementation -- **Secure**: Comprehensive error handling and role-based access control +- **🚀 Pure SQL Performance**: Unity Catalog HTTP connections with zero Python overhead +- **🔒 Column-Level Security**: Automatic role-based data masking via Unity Catalog column masks +- **⚡ Serverless Optimized**: Designed for Databricks serverless compute environments +- **🎯 Simplified Architecture**: Single row-by-row processing - no complex batching required +- **🔧 Easy Integration**: Native Unity Catalog functions work with any SQL client (ODBC, JDBC, notebooks) +- **📊 Real-time Access Control**: Instant role-based access via `is_account_group_member()` +- **🛡️ Graceful Error Handling**: Returns tokens on API failures to ensure data availability ## Architecture -The solution consists of several components: +The solution leverages Unity Catalog's native capabilities for maximum performance and security: + +### Implementation Overview -1. **Databricks UDFs**: Python-based user-defined functions that: - - Handle bulk tokenization and detokenization requests - - Implement role-based access control via Databricks SCIM API - - Manage concurrent processing with ThreadPoolExecutor - - Interface with Skyflow's API - - Support multiple redaction levels for detokenization +1. **Unity Catalog HTTP Connections**: Direct API integration with bearer token authentication +2. **Pure SQL UDFs**: Zero Python overhead, native Spark SQL execution +3. **Column Masks**: Automatic role-based data protection at the table level +4. **UC Secrets**: Secure credential storage with `secret()` function references +5. **Account Groups**: Enterprise-grade role management via `is_account_group_member()` -2. 
**Integration Points**: - - Databricks SCIM API for user group management - - Skyflow API for secure tokenization and detokenization - - Native SQL interface for querying data +### Key Components + +- **Tokenization Connection**: `skyflow_tokenize_conn` → `/v1/vaults/{vault_id}/{table}` +- **Detokenization Connection**: `skyflow_detokenize_conn` → `/v1/vaults/{vault_id}/detokenize` +- **Role-based UDF**: `sam_skyflow_conditional_detokenize()` - only auditors see real data +- **Column Mask UDF**: `sam_skyflow_mask_detokenize()` - applied at table schema level ## Flow Diagrams @@ -57,230 +110,301 @@ The solution consists of several components: ```mermaid sequenceDiagram - participant SQL as SQL Query - participant UDF as Tokenize UDF + participant Setup as Setup Process + participant Notebook as Tokenization Notebook participant SF as Skyflow API - - SQL->>UDF: Call bulk_tokenize function + participant UC as Unity Catalog + + Setup->>Notebook: Run serverless tokenization + Notebook->>UC: Get secrets via dbutils - loop For each batch of 25 records - UDF->>SF: Request tokenization - SF-->>UDF: Return token values + loop For each PII value + Notebook->>SF: POST /v1/vaults/{vault}/table + SF-->>Notebook: Return token + Notebook->>UC: UPDATE table SET column = token end - UDF-->>SQL: Return combined results + Notebook-->>Setup: Tokenization complete ``` ### Detokenization Flow ```mermaid sequenceDiagram - participant SQL as SQL Query + participant Client as SQL Client + participant UC as Unity Catalog participant UDF as Detokenize UDF - participant SCIM as Databricks SCIM API participant SF as Skyflow API - - SQL->>UDF: Call bulk_detokenize function - UDF->>SCIM: Get user group memberships - SCIM-->>UDF: Return user groups - rect rgb(200, 220, 250) - Note over UDF: Determine redaction level - UDF->>UDF: Map groups to redaction style - end + Client->>UC: SELECT first_name FROM customer_data + UC->>UC: Check column mask policy + UC->>UDF: Call detokenization function + UDF->>UDF: Check is_account_group_member('auditor') - loop For each batch of 25 tokens - UDF->>SF: Request detokenization - SF-->>UDF: Return original values + alt User is auditor + UDF->>SF: POST /detokenize via UC connection + SF-->>UDF: Return plain text value + UDF-->>UC: Return detokenized data + else User is not auditor + UDF-->>UC: Return token (no API call) end - UDF-->>SQL: Return combined results + UC-->>Client: Return appropriate data ``` ## Features -- **Data Protection**: - - Tokenization of sensitive data - - Detokenization with role-based access - - Multi-threaded batch processing - - Configurable batch sizes (default: 25 records) - - Concurrent request handling - - Automatic batch management - -- **Security**: - - Role-based access control (RBAC) via Databricks groups - - Multiple redaction levels for detokenization: - - PLAIN_TEXT: Full data access - - MASKED: Partially redacted data - - REDACTED: Fully redacted data - - Automatic user group mapping - - Default to most restrictive access - -- **Flexibility**: - - Support for multiple PII columns - - Custom redaction mapping - - Real-time processing - - Bulk operations for both tokenization and detokenization +### Data Protection + +- **Row-by-row processing**: Simple, reliable tokenization/detokenization +- **Column masks**: Automatic application at table schema level +- **Unity Catalog integration**: Native secrets and connections management +- **Role-based access**: Account group membership determines data visibility + +### Security + +- **Account-level groups**: 
Enterprise `is_account_group_member()` integration +- **UC-backed secrets**: Secure credential storage via `secret()` function +- **Bearer token authentication**: Automatic token injection via UC connections +- **Column-level security**: Masks applied at metadata level, not query level + +### Performance + +- **Pure SQL execution**: Zero Python UDF overhead +- **Native Spark SQL**: Full catalyst optimizer integration +- **Serverless optimized**: No cluster management required +- **Connection pooling**: UC manages HTTP connection lifecycle + +### Operational + +- **Organized SQL structure**: Clean separation of setup/destroy/verify operations +- **Graceful error handling**: API failures return tokens to maintain data access +- **ODBC/JDBC compatible**: Works with any SQL client ## Prerequisites -1. **Databricks Environment** with: - - Python-wrapped SQL function execution capability - - SCIM API access token - - Configured user groups +1. **Databricks Unity Catalog** with: + - Unity Catalog enabled workspace + - Account-level groups configured + - Serverless or cluster-based compute + - HTTP connections support 2. **Skyflow Account** with: - - Valid API credentials - - Configured vault and schema + - Valid PAT token + - Configured vault and table schema - API access enabled -## Setup Instructions +## Usage Examples -1. **Quick Setup**: - ```bash - ./setup.sh create - ``` - This automatically: - - Creates the tokenization and detokenization functions - - Sets up a sample customer table - - Deploys example notebooks - - Installs a customer insights dashboard - -2. **Manual Setup**: - - Copy and configure settings: - ```bash - cp config.sh.example config.sh - ``` - - Edit config.sh with your: - - Databricks credentials - - Skyflow vault details - - Group mappings +### Basic Detokenization Query -## Usage Examples +```sql +-- Works with any SQL client (ODBC, JDBC, notebooks, SQL editor) +SELECT + customer_id, + first_name, -- Detokenized for auditors, token for others + last_name, -- Plain text (no column mask) + email, -- Plain text (no column mask) + total_purchases +FROM demo_catalog.default.demo_customer_data +LIMIT 10; +``` + +### Column Mask Behavior -### Tokenization ```sql -USE hive_metastore.default; - -WITH grouped_data AS ( - SELECT - 1 AS group_id, - COLLECT_LIST(first_name) AS first_names, - COLLECT_LIST(last_name) AS last_names, - COLLECT_LIST(email) AS emails - FROM raw_customer_data - GROUP BY group_id -) -SELECT - skyflow_bulk_tokenize(first_names) AS tokenized_first_names, - skyflow_bulk_tokenize(last_names) AS tokenized_last_names, - skyflow_bulk_tokenize(emails) AS tokenized_emails -FROM grouped_data; +-- Same query, different results based on role: + +-- Auditor group member sees: +-- customer_id | first_name | last_name | email +-- CUST00001 | Jonathan | Anderson | jonathan.anderson@example.com + +-- Non-auditor sees: +-- customer_id | first_name | last_name | email +-- CUST00001 | 4532-8765-9abc... 
| Anderson | jonathan.anderson@example.com ``` -### Detokenization +### Direct Function Calls + ```sql -USE hive_metastore.default; - -WITH grouped_data AS ( - SELECT - 1 AS group_id, - COLLECT_LIST(first_name) AS first_names, - COLLECT_LIST(last_name) AS last_names, - COLLECT_LIST(email) AS emails - FROM customer_data - GROUP BY group_id -), -detokenized_batches AS ( - SELECT - skyflow_bulk_detokenize(first_names, current_user()) AS detokenized_first_names, - skyflow_bulk_detokenize(last_names, current_user()) AS detokenized_last_names, - skyflow_bulk_detokenize(emails, current_user()) AS detokenized_emails - FROM grouped_data -) -SELECT * FROM detokenized_batches; +-- Call detokenization function directly +SELECT demo_skyflow_uc_detokenize('4532-8765-9abc-def0') AS detokenized_value; + +-- Conditional detokenization (respects role) +SELECT demo_skyflow_conditional_detokenize('4532-8765-9abc-def0') AS role_based_value; ``` -## Project Structure +### Role Propagation and Demo Testing + +**Important for demos**: After changing user roles or group membership, Databricks may take 1-2 minutes to propagate the changes. If role-based redaction isn't working as expected, check your current group membership: +```sql +-- Check your current user and group membership +SELECT + current_user() AS user, + is_account_group_member('auditor') AS is_auditor, + is_account_group_member('customer_service') AS is_customer_service, + is_account_group_member('marketing') AS is_marketing; + +-- Alternative check using is_member() for workspace groups +SELECT + current_user() AS user, + is_member('auditor') AS is_auditor, + is_member('customer_service') AS is_customer_service, + is_member('marketing') AS is_marketing; ``` + +If you recently changed roles and the detokenization isn't reflecting the new permissions, wait 1-2 minutes and re-run the query. The functions use both `is_account_group_member()` (for account-level groups) and `is_member()` (for workspace-level groups) to maximize compatibility. + +## Project Structure + +```text . 
-├── config.sh # Configuration settings -├── setup.sh # Deployment script -├── dashboards/ # Pre-built dashboards -├── notebooks/ # Example notebooks -│ ├── notebook_tokenize_table.ipynb # Tokenization examples -│ └── notebook_call_tokenize_table.ipynb # Tokenization usage -├── python/ # Python source code -└── sql/ # SQL definitions +├── README.md # This file +├── .env.local.example # Environment configuration template +├── config.sh # Configuration loader script +├── setup.sh # Main deployment script +├── sql/ # Organized SQL definitions +│ ├── setup/ # Setup-related SQL files +│ │ ├── create_catalog.sql +│ │ ├── create_sample_table.sql +│ │ ├── create_uc_connections.sql +│ │ ├── setup_uc_connections_api.sql +│ │ └── apply_column_masks.sql +│ ├── destroy/ # Cleanup SQL files +│ │ ├── cleanup_catalog.sql +│ │ ├── drop_functions.sql +│ │ ├── drop_table.sql +│ │ └── remove_column_masks.sql +│ └── verify/ # Verification SQL files +│ ├── verify_functions.sql +│ ├── verify_table.sql +│ ├── check_functions_exist.sql +│ └── check_table_exists.sql +├── notebooks/ # Serverless tokenization +│ └── notebook_tokenize_table.ipynb +└── dashboards/ # Pre-built analytics + └── customer_insights_dashboard.lvdash.json ``` -## Error Handling +## Configuration -The UDFs implement comprehensive error handling: +### Environment Variables (.env.local) -- **Input Validation**: - - Data format verification - - Token format verification - - User authentication checks - - Group membership validation - -- **Service Errors**: - - API failures - - Network timeouts - - Authentication issues +```bash +# Databricks Connection +DATABRICKS_HOST=https://your-workspace.cloud.databricks.com +DATABRICKS_TOKEN=dapi123... +WAREHOUSE_ID=abc123... + +# Skyflow Integration +SKYFLOW_VAULT_URL=https://vault.skyflow.com +SKYFLOW_VAULT_ID=abc123... +SKYFLOW_ACCOUNT_ID=acc123... +SKYFLOW_PAT_TOKEN=sky123... +SKYFLOW_TABLE=customer_data + +# Role Mappings (optional) +PLAIN_TEXT_GROUPS=auditor # See real data +MASKED_GROUPS=customer_service # See masked data +REDACTED_GROUPS=marketing # See redacted data +``` -- **Recovery Mechanisms**: - - Default to most restrictive access - - Batch failure isolation - - Detailed error reporting +### Unity Catalog Setup + +The solution creates these UC resources: + +- **Secrets Scope**: `skyflow-secrets` (UC-backed) +- **HTTP Connections**: `skyflow_tokenize_conn`, `skyflow_detokenize_conn` +- **Catalog**: `{prefix}_catalog` with default schema +- **Functions**: Pure SQL UDFs for tokenization/detokenization +- **Column Masks**: Applied to sensitive columns only ## Development Guide -1. **Local Testing**: - ```python - # Test the UDFs locally - python python/test_tokenize.py - python python/test_detokenize.py +### Adding New PII Columns + +1. **Update tokenization**: + + ```bash + # Edit setup.sh line ~726 + local pii_columns="first_name,last_name,email" ``` -2. **Deployment**: +2. **Add column masks**: + + ```sql + -- Edit sql/setup/apply_column_masks.sql + ALTER TABLE ${PREFIX}_customer_data ALTER COLUMN email SET MASK ${PREFIX}_skyflow_mask_detokenize; + ``` + +3. 
**Redeploy**: + ```bash - # Deploy changes - ./setup.sh recreate + ./setup.sh recreate demo ``` +### Testing Changes + +```bash +# Test individual SQL components +python3 -c " +import os +from setup import execute_sql +execute_sql('sql/verify/verify_functions.sql') +" + +# Full integration test +./setup.sh recreate test +``` + +### Performance Optimization Ideas + +For high-volume scenarios, consider: + +- **Bulk processing**: 25-token API batches (requires complex result mapping) +- **Connection pooling**: Multiple UC connections for load distribution +- **Caching layer**: Frequently-accessed token caching with TTL +- **Async processing**: Queue-based bulk operations + ## Cleanup -Remove all created resources: +Remove all resources: + ```bash -./setup.sh destroy +./setup.sh destroy demo ``` +This removes: + +- Catalog and all objects +- UC connections and secrets +- Functions and column masks +- Notebooks and dashboards + ## Dashboard Integration -The repository includes a pre-built customer insights dashboard that demonstrates the detokenization function in action: +The included dashboard demonstrates real-time role-based data access: ![databricks_dashboard](https://github.com/user-attachments/assets/f81227c5-fbbf-481c-b7dc-516f64ad6114) -![image](https://github.com/user-attachments/assets/e789da4e-e4b7-4c9a-8c94-fbcbdd2937dd) +**Features:** -Features: -- Customer overview with detokenized PII -- Spending analysis -- Language preferences -- Consent metrics -- Acquisition trends +- **Customer Overview**: Shows first_name detokenization based on user role +- **Analytics**: Purchase patterns, loyalty analysis, consent tracking +- **Real-time**: Updates automatically as data changes +- **Role-aware**: Same dashboard, different data visibility per user -Access at: -``` -https:///sql/dashboards/v3/ -``` +**Access**: Dashboard URL provided after setup completion ## Support -For issues and feature requests, please contact your Skyflow representative or visit docs.skyflow.com. +For issues and feature requests: + +- **Skyflow Documentation**: [docs.skyflow.com](https://docs.skyflow.com) +- **Databricks Unity Catalog**: [docs.databricks.com/unity-catalog](https://docs.databricks.com/unity-catalog/) +- **GitHub Issues**: Please use the repository issue tracker ## License -This project is provided as sample code for demonstration purposes. Not recommended for production deployment without further review, testing, and hardening. +This project is provided as sample code for demonstration purposes. Not recommended for production deployment without further review, testing, and hardening. \ No newline at end of file diff --git a/config.sh b/config.sh index 54e8072..d7939af 100644 --- a/config.sh +++ b/config.sh @@ -1,15 +1,32 @@ #!/bin/bash -# Databricks settings -DEFAULT_DATABRICKS_HOST="" -DEFAULT_DATABRICKS_TOKEN="" -DEFAULT_WAREHOUSE_ID="" +# Load .env.local if it exists +if [[ -f "$(dirname "$0")/.env.local" ]]; then + echo "Loading configuration from .env.local..." 
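+    # Export every non-comment KEY=value pair from .env.local into the environment.
+    # Note: this simple grep/xargs approach assumes values contain no spaces or quotes.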
+ export $(grep -v '^#' "$(dirname "$0")/.env.local" | xargs) +fi -# Skyflow settings -DEFAULT_SKYFLOW_VAULT_URL="" -DEFAULT_SKYFLOW_VAULT_ID="" -DEFAULT_SKYFLOW_ACCOUNT_ID="" -DEFAULT_SKYFLOW_BEARER_TOKEN="" +# Map .env.local variables to our config format (no hardcoded defaults) +if [[ -n "$DATABRICKS_SERVER_HOSTNAME" ]]; then + DEFAULT_DATABRICKS_HOST="https://$DATABRICKS_SERVER_HOSTNAME" +else + DEFAULT_DATABRICKS_HOST="" +fi + +DEFAULT_DATABRICKS_TOKEN="$DATABRICKS_PAT_TOKEN" +# Extract warehouse ID from HTTP path (format: /sql/1.0/warehouses/warehouse-id) +if [[ -n "$DATABRICKS_HTTP_PATH" ]]; then + DEFAULT_WAREHOUSE_ID=$(echo "$DATABRICKS_HTTP_PATH" | sed 's/.*warehouses\///') +else + DEFAULT_WAREHOUSE_ID="" +fi + +# Skyflow settings from .env.local (no hardcoded defaults) +DEFAULT_SKYFLOW_VAULT_URL="$SKYFLOW_VAULT_URL" +DEFAULT_SKYFLOW_VAULT_ID="$SKYFLOW_VAULT_ID" +DEFAULT_SKYFLOW_ACCOUNT_ID="$SKYFLOW_ACCOUNT_ID" +DEFAULT_SKYFLOW_PAT_TOKEN="$SKYFLOW_PAT_TOKEN" +DEFAULT_SKYFLOW_TABLE="$SKYFLOW_TABLE" # Group mappings for detokenization DEFAULT_PLAIN_TEXT_GROUPS="auditor" @@ -24,7 +41,8 @@ export WAREHOUSE_ID=${WAREHOUSE_ID:-$DEFAULT_WAREHOUSE_ID} export SKYFLOW_VAULT_URL=${SKYFLOW_VAULT_URL:-$DEFAULT_SKYFLOW_VAULT_URL} export SKYFLOW_VAULT_ID=${SKYFLOW_VAULT_ID:-$DEFAULT_SKYFLOW_VAULT_ID} export SKYFLOW_ACCOUNT_ID=${SKYFLOW_ACCOUNT_ID:-$DEFAULT_SKYFLOW_ACCOUNT_ID} -export SKYFLOW_BEARER_TOKEN=${SKYFLOW_BEARER_TOKEN:-$DEFAULT_SKYFLOW_BEARER_TOKEN} +export SKYFLOW_PAT_TOKEN=${SKYFLOW_PAT_TOKEN:-$DEFAULT_SKYFLOW_PAT_TOKEN} +export SKYFLOW_TABLE=${SKYFLOW_TABLE:-$DEFAULT_SKYFLOW_TABLE} export PLAIN_TEXT_GROUPS=${PLAIN_TEXT_GROUPS:-$DEFAULT_PLAIN_TEXT_GROUPS} export MASKED_GROUPS=${MASKED_GROUPS:-$DEFAULT_MASKED_GROUPS} diff --git a/dashboards/customer_insights_dashboard.lvdash.json b/dashboards/customer_insights_dashboard.lvdash.json index d2f6d99..584562e 100644 --- a/dashboards/customer_insights_dashboard.lvdash.json +++ b/dashboards/customer_insights_dashboard.lvdash.json @@ -1,9 +1,9 @@ { "datasets": [ { - "name": "_customer_dataset", + "name": "${PREFIX}_customer_dataset", "displayName": "Customer Data", - "query": "USE hive_metastore.default;\n\nWITH grouped_data AS (\n SELECT\n 1 AS group_id,\n COLLECT_LIST(first_name) AS first_names,\n COLLECT_LIST(last_name) AS last_names,\n COLLECT_LIST(email) AS emails,\n COLLECT_LIST(phone_number) AS phones,\n COLLECT_LIST(address) AS addresses,\n COLLECT_LIST(date_of_birth) AS dobs\n FROM _customer_data\n GROUP BY group_id\n),\ndetokenized_batches AS (\n SELECT\n _skyflow_bulk_detokenize(first_names, current_user()) AS detokenized_first_names,\n _skyflow_bulk_detokenize(last_names, current_user()) AS detokenized_last_names,\n _skyflow_bulk_detokenize(emails, current_user()) AS detokenized_emails,\n _skyflow_bulk_detokenize(phones, current_user()) AS detokenized_phones,\n _skyflow_bulk_detokenize(addresses, current_user()) AS detokenized_addresses,\n _skyflow_bulk_detokenize(dobs, current_user()) AS detokenized_dobs\n FROM grouped_data\n),\nexploded_data AS (\n SELECT\n pos AS idx,\n detokenized_first_names[pos] AS first_name,\n detokenized_last_names[pos] AS last_name,\n detokenized_emails[pos] AS email,\n detokenized_phones[pos] AS phone_number,\n detokenized_addresses[pos] AS address,\n detokenized_dobs[pos] AS date_of_birth\n FROM detokenized_batches\n LATERAL VIEW POSEXPLODE(detokenized_first_names) AS pos, val\n)\nSELECT\n c.customer_id,\n e.first_name,\n e.last_name,\n e.email,\n e.phone_number,\n e.address,\n e.date_of_birth,\n 
c.signup_date,\n c.last_login,\n c.total_purchases,\n c.total_spent,\n c.loyalty_status,\n c.preferred_language,\n c.consent_marketing,\n c.consent_data_sharing\nFROM _customer_data c\nJOIN exploded_data e ON e.idx = CAST(REGEXP_EXTRACT(c.customer_id, '(\\\\d+)', 0) AS INT) - 1;" + "query": "SELECT\n customer_id,\n first_name,\n last_name,\n email,\n phone_number,\n address,\n date_of_birth,\n signup_date,\n last_login,\n total_purchases,\n total_spent,\n loyalty_status,\n preferred_language,\n consent_marketing,\n consent_data_sharing\nFROM ${PREFIX}_catalog.default.${PREFIX}_customer_data;" } ], "pages": [ @@ -30,7 +30,7 @@ { "name": "main_query", "query": { - "datasetName": "_customer_dataset", + "datasetName": "${PREFIX}_customer_dataset", "fields": [ { "name": "customer_id", @@ -164,7 +164,7 @@ { "name": "main_query", "query": { - "datasetName": "_customer_dataset", + "datasetName": "${PREFIX}_customer_dataset", "fields": [ { "name": "loyalty_status", @@ -221,7 +221,7 @@ { "name": "main_query", "query": { - "datasetName": "_customer_dataset", + "datasetName": "${PREFIX}_customer_dataset", "fields": [ { "name": "avg(total_purchases)", @@ -278,7 +278,7 @@ { "name": "main_query", "query": { - "datasetName": "_customer_dataset", + "datasetName": "${PREFIX}_customer_dataset", "fields": [ { "name": "avg(total_purchases)", @@ -335,7 +335,7 @@ { "name": "main_query", "query": { - "datasetName": "_customer_dataset", + "datasetName": "${PREFIX}_customer_dataset", "fields": [ { "name": "avg(total_purchases)", diff --git a/notebooks/notebook_call_tokenize_table.ipynb b/notebooks/notebook_call_tokenize_table.ipynb deleted file mode 100644 index 19ff9aa..0000000 --- a/notebooks/notebook_call_tokenize_table.ipynb +++ /dev/null @@ -1,31 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Call the tokenize_table notebook with PII fields\n", - "result = dbutils.notebook.run(\n", - " \"/Workspace/Shared/_tokenize_table\", # Notebook path\n", - " timeout_seconds=600, # Set timeout for execution\n", - " arguments={\n", - " \"table_name\": \"_customer_data\",\n", - " \"pii_columns\": \"first_name,last_name,email,phone_number,address,date_of_birth\"\n", - " }\n", - ")\n", - "\n", - "# Print the result returned from the notebook\n", - "print(result)" - ] - } - ], - "metadata": { - "language_info": { - "name": "python" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -} diff --git a/notebooks/notebook_tokenize_table.ipynb b/notebooks/notebook_tokenize_table.ipynb index 22cc375..62ba30a 100644 --- a/notebooks/notebook_tokenize_table.ipynb +++ b/notebooks/notebook_tokenize_table.ipynb @@ -1,107 +1,19 @@ { - "cells": [ - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Import necessary libraries\n", - "import requests\n", - "from pyspark.sql import SparkSession\n", - "from pyspark.dbutils import DBUtils\n", - "\n", - "# Initialize Spark session and Databricks utilities\n", - "spark = SparkSession.builder.appName(\"SkyflowTokenization\").getOrCreate()\n", - "dbutils = DBUtils(spark)\n", - "\n", - "# Define widgets to receive input parameters\n", - "dbutils.widgets.text(\"table_name\", \"\")\n", - "dbutils.widgets.text(\"pii_columns\", \"\")\n", - "\n", - "# Read widget values\n", - "table_name = dbutils.widgets.get(\"table_name\")\n", - "pii_columns = dbutils.widgets.get(\"pii_columns\").split(\",\")\n", - "\n", - "if not table_name or not pii_columns:\n", - " raise 
ValueError(\"Both 'table_name' and 'pii_columns' must be provided.\")\n", - "\n", - "# Skyflow API details\n", - "SKYFLOW_API_URL = \"/v1/vaults//pii\"\n", - "SKYFLOW_ACCOUNT_ID = \"\"\n", - "SKYFLOW_BEARER_TOKEN = \"\"\n", - "\n", - "def tokenize_batch(values):\n", - " \"\"\"\n", - " Function to tokenize a batch of PII values via Skyflow API.\n", - " All values are already strings from the table schema.\n", - " \"\"\"\n", - " headers = {\n", - " \"Content-Type\": \"application/json\",\n", - " \"Accept\": \"application/json\",\n", - " \"X-SKYFLOW-ACCOUNT-ID\": SKYFLOW_ACCOUNT_ID,\n", - " \"Authorization\": f\"Bearer {SKYFLOW_BEARER_TOKEN}\"\n", - " }\n", - "\n", - " # Format records exactly like the successful example\n", - " records = [{\n", - " \"fields\": {\n", - " \"pii\": value\n", - " }\n", - " } for value in values if value is not None]\n", - "\n", - " payload = {\n", - " \"records\": records,\n", - " \"tokenization\": True\n", - " }\n", - "\n", - " try:\n", - " response = requests.post(SKYFLOW_API_URL, headers=headers, json=payload)\n", - " response.raise_for_status()\n", - " return [record[\"tokens\"][\"pii\"] for record in response.json()[\"records\"]]\n", - " except requests.exceptions.RequestException as e:\n", - " print(f\"Error tokenizing batch: {e}\")\n", - " if hasattr(e.response, 'text'):\n", - " print(f\"Response content: {e.response.text}\")\n", - " return [\"ERROR\" for _ in values]\n", - "\n", - "for column in pii_columns:\n", - " # Read distinct non-null PII values\n", - " query = f\"SELECT DISTINCT `{column}` FROM `{table_name}` WHERE `{column}` IS NOT NULL\"\n", - " df = spark.sql(query)\n", - " values = [row[column] for row in df.collect()]\n", - "\n", - " if not values:\n", - " print(f\"No PII values found for column: {column}\")\n", - " continue\n", - "\n", - " # Tokenize data in batches\n", - " batch_size = 25\n", - " tokenized_values = []\n", - " for i in range(0, len(values), batch_size):\n", - " batch = values[i:i + batch_size]\n", - " tokenized_values.extend(tokenize_batch(batch))\n", - "\n", - " # Generate and execute update statements\n", - " update_statements = [\n", - " f\"UPDATE `{table_name}` SET `{column}` = '{token}' WHERE `{column}` = '{value}'\"\n", - " for value, token in zip(values, tokenized_values)\n", - " ]\n", - "\n", - " for stmt in update_statements:\n", - " spark.sql(stmt)\n", - "\n", - " print(f\"Successfully tokenized column: {column}\")\n", - "\n", - "dbutils.notebook.exit(f\"Tokenization completed for table `{table_name}` with columns {', '.join(pii_columns)}.\")" - ] - } - ], - "metadata": { - "language_info": { - "name": "python" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -} + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "cell-0", + "metadata": {}, + "outputs": [], + "source": "# Unity Catalog-aware serverless tokenization notebook \n# Optimized version with chunked processing and MERGE operations for maximum performance\nimport requests\nimport os\nfrom pyspark.sql import SparkSession\nfrom pyspark.dbutils import DBUtils\n\n# Initialize Spark session optimized for serverless compute\nspark = SparkSession.builder \\\n .appName(\"SkyflowTokenization\") \\\n .config(\"spark.databricks.cluster.profile\", \"serverless\") \\\n .config(\"spark.databricks.delta.autoCompact.enabled\", \"true\") \\\n .config(\"spark.sql.adaptive.enabled\", \"true\") \\\n .config(\"spark.sql.adaptive.coalescePartitions.enabled\", \"true\") \\\n .getOrCreate()\n \ndbutils = DBUtils(spark)\n\nprint(f\"✓ Running on Databricks 
serverless compute\")\nprint(f\"✓ Spark version: {spark.version}\")\n\n# Performance configuration\nMAX_MERGE_BATCH_SIZE = 10000 # Maximum records per MERGE operation\nCOLLECT_BATCH_SIZE = 1000 # Maximum records to collect() from Spark at once\n\n# Define widgets to receive input parameters\ndbutils.widgets.text(\"table_name\", \"\")\ndbutils.widgets.text(\"pii_columns\", \"\")\ndbutils.widgets.text(\"batch_size\", \"25\") # Skyflow API batch size\n\n# Read widget values\ntable_name = dbutils.widgets.get(\"table_name\")\npii_columns = dbutils.widgets.get(\"pii_columns\").split(\",\")\nSKYFLOW_BATCH_SIZE = int(dbutils.widgets.get(\"batch_size\"))\n\nif not table_name or not pii_columns:\n raise ValueError(\"Both 'table_name' and 'pii_columns' must be provided.\")\n\nprint(f\"Tokenizing table: {table_name}\")\nprint(f\"PII columns: {', '.join(pii_columns)}\")\nprint(f\"Skyflow API batch size: {SKYFLOW_BATCH_SIZE}\")\nprint(f\"MERGE batch size limit: {MAX_MERGE_BATCH_SIZE:,} records\")\nprint(f\"Collect batch size: {COLLECT_BATCH_SIZE:,} records\")\n\n# Extract catalog and schema from table name if fully qualified\nif '.' in table_name:\n parts = table_name.split('.')\n if len(parts) == 3: # catalog.schema.table\n catalog_name = parts[0]\n schema_name = parts[1]\n table_name_only = parts[2]\n \n # Set the catalog and schema context for this session\n print(f\"Setting catalog context to: {catalog_name}\")\n spark.sql(f\"USE CATALOG {catalog_name}\")\n spark.sql(f\"USE SCHEMA {schema_name}\")\n \n # Use the simple table name for queries since context is set\n table_name = table_name_only\n print(f\"✓ Catalog context set, using table name: {table_name}\")\n\n# Get Skyflow credentials from UC secrets (serverless-compatible)\ntry:\n SKYFLOW_VAULT_URL = dbutils.secrets.get(scope=\"skyflow-secrets\", key=\"skyflow_vault_url\")\n SKYFLOW_VAULT_ID = dbutils.secrets.get(scope=\"skyflow-secrets\", key=\"skyflow_vault_id\")\n SKYFLOW_ACCOUNT_ID = dbutils.secrets.get(scope=\"skyflow-secrets\", key=\"skyflow_account_id\")\n SKYFLOW_PAT_TOKEN = dbutils.secrets.get(scope=\"skyflow-secrets\", key=\"skyflow_pat_token\")\n SKYFLOW_TABLE = dbutils.secrets.get(scope=\"skyflow-secrets\", key=\"skyflow_table\")\n \n print(\"✓ Successfully retrieved credentials from UC secrets\")\nexcept Exception as e:\n print(f\"Error retrieving UC secrets: {e}\")\n raise ValueError(\"Could not retrieve Skyflow credentials from UC secrets\")\n\n# Build API URL\nSKYFLOW_API_URL = f\"{SKYFLOW_VAULT_URL}/v1/vaults/{SKYFLOW_VAULT_ID}/{SKYFLOW_TABLE}\"\n\ndef tokenize_column_values(column_name, values):\n \"\"\"\n Tokenize a list of PII values for a specific column via Skyflow API.\n Simplified - no deduplication, direct 1:1 mapping.\n Returns list of tokens in same order as input values.\n \"\"\"\n if not values:\n return []\n \n headers = {\n \"Content-Type\": \"application/json\",\n \"Accept\": \"application/json\", \n \"X-SKYFLOW-ACCOUNT-ID\": SKYFLOW_ACCOUNT_ID,\n \"Authorization\": f\"Bearer {SKYFLOW_PAT_TOKEN}\"\n }\n\n # Create records for each value (no deduplication)\n skyflow_records = [{\n \"fields\": {\"pii_values\": str(value)}\n } for value in values if value is not None]\n\n payload = {\n \"records\": skyflow_records,\n \"tokenization\": True\n }\n\n try:\n print(f\" Tokenizing {len(skyflow_records)} values for {column_name}\")\n response = requests.post(SKYFLOW_API_URL, headers=headers, json=payload, timeout=30)\n response.raise_for_status()\n result = response.json()\n \n # Extract tokens in order (1:1 mapping)\n 
tokens = []\n for i, record in enumerate(result.get(\"records\", [])):\n if \"tokens\" in record and \"pii_values\" in record[\"tokens\"]:\n token = record[\"tokens\"][\"pii_values\"]\n tokens.append(token)\n else:\n print(f\" Value {i+1}: failed to tokenize, keeping original\")\n tokens.append(values[i] if i < len(values) and values[i] is not None else None)\n \n return tokens\n \n except requests.exceptions.RequestException as e:\n print(f\" ❌ ERROR tokenizing {column_name}: {e}\")\n \n # Show detailed API error response for troubleshooting\n if hasattr(e, 'response') and e.response:\n try:\n error_details = e.response.json()\n print(f\" API Error Details: {error_details}\")\n except:\n print(f\" API Error Response: {e.response.text}\")\n print(f\" Status Code: {e.response.status_code}\")\n print(f\" Headers: {dict(e.response.headers)}\")\n \n # Return original values on error\n return [str(val) if val is not None else None for val in values]\n except Exception as e:\n print(f\" ❌ UNEXPECTED ERROR tokenizing {column_name}: {e}\")\n return [str(val) if val is not None else None for val in values]\n\ndef perform_chunked_merge(table_name, column, update_data):\n \"\"\"\n Perform MERGE operations in chunks to avoid memory/timeout issues.\n Returns total number of rows updated.\n \"\"\"\n if not update_data:\n return 0\n \n total_updated = 0\n chunk_size = MAX_MERGE_BATCH_SIZE\n total_chunks = (len(update_data) + chunk_size - 1) // chunk_size\n \n print(f\" Splitting {len(update_data):,} updates into {total_chunks} MERGE operations (max {chunk_size:,} per chunk)\")\n \n for chunk_idx in range(0, len(update_data), chunk_size):\n chunk_data = update_data[chunk_idx:chunk_idx + chunk_size]\n chunk_num = (chunk_idx // chunk_size) + 1\n \n try:\n # Create temporary view for this chunk\n temp_df = spark.createDataFrame(chunk_data, [\"customer_id\", f\"new_{column}\"])\n temp_view_name = f\"temp_{column}_chunk_{chunk_num}_{hash(column) % 1000}\"\n temp_df.createOrReplaceTempView(temp_view_name)\n \n # Perform MERGE operation for this chunk\n merge_sql = f\"\"\"\n MERGE INTO `{table_name}` AS target\n USING {temp_view_name} AS source\n ON target.customer_id = source.customer_id\n WHEN MATCHED THEN \n UPDATE SET `{column}` = source.new_{column}\n \"\"\"\n \n spark.sql(merge_sql)\n chunk_updated = len(chunk_data)\n total_updated += chunk_updated\n \n print(f\" Chunk {chunk_num}/{total_chunks}: Updated {chunk_updated:,} rows\")\n \n # Clean up temp view\n spark.catalog.dropTempView(temp_view_name)\n \n except Exception as e:\n print(f\" Error in chunk {chunk_num}: {e}\")\n print(f\" Falling back to row-by-row for this chunk...\")\n \n # Fallback to row-by-row for this chunk only\n chunk_fallback_count = 0\n for customer_id, token in chunk_data:\n try:\n spark.sql(f\"\"\"\n UPDATE `{table_name}` \n SET `{column}` = '{token}' \n WHERE customer_id = '{customer_id}'\n \"\"\")\n chunk_fallback_count += 1\n except Exception as row_e:\n print(f\" Error updating customer_id {customer_id}: {row_e}\")\n \n total_updated += chunk_fallback_count\n print(f\" Chunk {chunk_num} fallback: Updated {chunk_fallback_count} rows\")\n \n return total_updated\n\n# Process each column individually (streaming approach)\nprint(\"Starting column-by-column tokenization with streaming chunked processing...\")\n\nfor column in pii_columns:\n print(f\"\\nProcessing column: {column}\")\n \n # Get total count first for progress tracking\n total_count = spark.sql(f\"\"\"\n SELECT COUNT(*) as count \n FROM `{table_name}` \n WHERE 
`{column}` IS NOT NULL\n \"\"\").collect()[0]['count']\n \n if total_count == 0:\n print(f\" No data found in column {column}\")\n continue\n \n print(f\" Found {total_count:,} total values to tokenize\")\n \n # Process in streaming chunks to avoid memory issues\n all_update_data = [] # Collect all updates before final MERGE\n processed_count = 0\n \n for offset in range(0, total_count, COLLECT_BATCH_SIZE):\n chunk_size = min(COLLECT_BATCH_SIZE, total_count - offset)\n print(f\" Processing chunk {offset//COLLECT_BATCH_SIZE + 1} ({chunk_size:,} records, offset {offset:,})...\")\n \n # Get chunk of data from Spark\n chunk_df = spark.sql(f\"\"\"\n SELECT customer_id, `{column}` \n FROM `{table_name}` \n WHERE `{column}` IS NOT NULL \n ORDER BY customer_id\n LIMIT {chunk_size} OFFSET {offset}\n \"\"\")\n \n chunk_rows = chunk_df.collect()\n if not chunk_rows:\n continue\n \n # Extract customer IDs and values for this chunk\n chunk_customer_ids = [row['customer_id'] for row in chunk_rows]\n chunk_column_values = [row[column] for row in chunk_rows]\n \n # Tokenize this chunk's values in Skyflow API batches\n chunk_tokens = []\n if len(chunk_column_values) <= SKYFLOW_BATCH_SIZE: # Single API batch\n chunk_tokens = tokenize_column_values(f\"{column}_chunk_{offset//COLLECT_BATCH_SIZE + 1}\", chunk_column_values)\n else: # Multiple API batches within this chunk\n for i in range(0, len(chunk_column_values), SKYFLOW_BATCH_SIZE):\n api_batch_values = chunk_column_values[i:i + SKYFLOW_BATCH_SIZE]\n api_batch_tokens = tokenize_column_values(f\"{column}_chunk_{offset//COLLECT_BATCH_SIZE + 1}_api_{i//SKYFLOW_BATCH_SIZE + 1}\", api_batch_values)\n chunk_tokens.extend(api_batch_tokens)\n \n if len(chunk_tokens) != len(chunk_customer_ids):\n print(f\" Warning: Token count ({len(chunk_tokens):,}) doesn't match chunk row count ({len(chunk_customer_ids):,})\")\n continue\n \n # Collect update data for rows that changed in this chunk\n chunk_original_map = {chunk_customer_ids[i]: chunk_column_values[i] for i in range(len(chunk_customer_ids))}\n \n for i, (customer_id, token) in enumerate(zip(chunk_customer_ids, chunk_tokens)):\n if token and str(token) != str(chunk_original_map[customer_id]):\n all_update_data.append((customer_id, token))\n \n processed_count += len(chunk_rows)\n print(f\" Processed {processed_count:,}/{total_count:,} records ({(processed_count/total_count)*100:.1f}%)\")\n \n # Perform final chunked MERGE operations for all collected updates\n if all_update_data:\n print(f\" Performing final chunked MERGE of {len(all_update_data):,} changed rows...\")\n total_updated = perform_chunked_merge(table_name, column, all_update_data)\n print(f\" ✓ Successfully updated {total_updated:,} rows in column {column}\")\n else:\n print(f\" No updates needed - all tokens match original values\")\n\nprint(\"\\nOptimized streaming tokenization completed!\")\n\n# Verify results\nprint(\"\\nFinal verification:\")\nfor column in pii_columns:\n sample_df = spark.sql(f\"\"\"\n SELECT `{column}`, COUNT(*) as count \n FROM `{table_name}` \n GROUP BY `{column}` \n LIMIT 3\n \"\"\")\n print(f\"\\nSample values in {column}:\")\n sample_df.show(truncate=False)\n\ntotal_rows = spark.sql(f\"SELECT COUNT(*) as count FROM `{table_name}`\").collect()[0][\"count\"]\nprint(f\"\\nTable size: {total_rows:,} total rows\")\n\ndbutils.notebook.exit(f\"Optimized streaming tokenization completed for {len(pii_columns)} columns\")" + } + ], + "metadata": { + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 
2 +} \ No newline at end of file diff --git a/setup.sh b/setup.sh index 9f7e22d..ae69012 100755 --- a/setup.sh +++ b/setup.sh @@ -21,97 +21,207 @@ if [[ -n "$2" ]]; then echo "Using prefix: $PREFIX" fi -# Function to prompt for configuration values -prompt_for_config() { - # Source config.sh to get default values +# Function to load configuration values automatically from .env.local +load_config() { + # Source config.sh to get default values and load from .env.local source "$(dirname "$0")/config.sh" echo - echo "Enter values for configuration (press Enter to use default values):" - # Databricks settings - echo -e "\nDatabricks Configuration:" - if [[ -z "$DEFAULT_DATABRICKS_HOST" ]]; then - echo "Format example: https://dbc-xxxxxxxx-xxxx.cloud.databricks.com" + # Set variables from .env.local or defaults, and report what was loaded + export DATABRICKS_HOST=${DEFAULT_DATABRICKS_HOST} + if [[ -n "$DATABRICKS_HOST" ]]; then + echo "✓ DATABRICKS_HOST loaded from .env.local" fi - read -p "Databricks Host URL [${DEFAULT_DATABRICKS_HOST}]: " input - export DATABRICKS_HOST=${input:-$DEFAULT_DATABRICKS_HOST} - if [[ -z "$DEFAULT_DATABRICKS_TOKEN" ]]; then - echo "Format example: dapixxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + export DATABRICKS_TOKEN=${DEFAULT_DATABRICKS_TOKEN} + if [[ -n "$DATABRICKS_TOKEN" ]]; then + echo "✓ DATABRICKS_TOKEN loaded from .env.local" fi - read -p "Databricks Access Token [${DEFAULT_DATABRICKS_TOKEN}]: " input - export DATABRICKS_TOKEN=${input:-$DEFAULT_DATABRICKS_TOKEN} - if [[ -z "$DEFAULT_WAREHOUSE_ID" ]]; then - echo "Format example: xxxxxxxxxxxxxxxx" + export WAREHOUSE_ID=${DEFAULT_WAREHOUSE_ID} + if [[ -n "$WAREHOUSE_ID" ]]; then + echo "✓ WAREHOUSE_ID loaded from .env.local" fi - read -p "SQL Warehouse ID [${DEFAULT_WAREHOUSE_ID}]: " input - export WAREHOUSE_ID=${input:-$DEFAULT_WAREHOUSE_ID} - # Skyflow settings - echo -e "\nSkyflow Configuration:" - if [[ -z "$DEFAULT_SKYFLOW_VAULT_URL" ]]; then - echo "Format example: https://xxxxxxxxxxxx.vault.skyflowapis.com" + export SKYFLOW_VAULT_URL=${DEFAULT_SKYFLOW_VAULT_URL} + if [[ -n "$SKYFLOW_VAULT_URL" ]]; then + echo "✓ SKYFLOW_VAULT_URL loaded from .env.local" fi - read -p "Skyflow Vault URL [${DEFAULT_SKYFLOW_VAULT_URL}]: " input - export SKYFLOW_VAULT_URL=${input:-$DEFAULT_SKYFLOW_VAULT_URL} - if [[ -z "$DEFAULT_SKYFLOW_VAULT_ID" ]]; then - echo "Format example: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + export SKYFLOW_VAULT_ID=${DEFAULT_SKYFLOW_VAULT_ID} + if [[ -n "$SKYFLOW_VAULT_ID" ]]; then + echo "✓ SKYFLOW_VAULT_ID loaded from .env.local" fi - read -p "Skyflow Vault ID [${DEFAULT_SKYFLOW_VAULT_ID}]: " input - export SKYFLOW_VAULT_ID=${input:-$DEFAULT_SKYFLOW_VAULT_ID} - if [[ -z "$DEFAULT_SKYFLOW_ACCOUNT_ID" ]]; then - echo "Format example: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + export SKYFLOW_ACCOUNT_ID=${DEFAULT_SKYFLOW_ACCOUNT_ID} + if [[ -n "$SKYFLOW_ACCOUNT_ID" ]]; then + echo "✓ SKYFLOW_ACCOUNT_ID loaded from .env.local" fi - read -p "Skyflow Account ID [${DEFAULT_SKYFLOW_ACCOUNT_ID}]: " input - export SKYFLOW_ACCOUNT_ID=${input:-$DEFAULT_SKYFLOW_ACCOUNT_ID} - if [[ -z "$DEFAULT_SKYFLOW_BEARER_TOKEN" ]]; then - echo "Format example: sky-xxxxx-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + export SKYFLOW_PAT_TOKEN=${DEFAULT_SKYFLOW_PAT_TOKEN} + if [[ -n "$SKYFLOW_PAT_TOKEN" ]]; then + echo "✓ SKYFLOW_PAT_TOKEN loaded from .env.local" fi - read -p "Skyflow Bearer Token [${DEFAULT_SKYFLOW_BEARER_TOKEN}]: " input - export SKYFLOW_BEARER_TOKEN=${input:-$DEFAULT_SKYFLOW_BEARER_TOKEN} - # Group mappings - echo -e "\nGroup Mappings 
Configuration:" - read -p "PLAIN_TEXT Groups (comma-separated) [${DEFAULT_PLAIN_TEXT_GROUPS}]: " input - export PLAIN_TEXT_GROUPS=${input:-$DEFAULT_PLAIN_TEXT_GROUPS} - - read -p "MASKED Groups (comma-separated) [${DEFAULT_MASKED_GROUPS}]: " input - export MASKED_GROUPS=${input:-$DEFAULT_MASKED_GROUPS} + export SKYFLOW_TABLE=${DEFAULT_SKYFLOW_TABLE} + if [[ -n "$SKYFLOW_TABLE" ]]; then + echo "✓ SKYFLOW_TABLE loaded from .env.local" + fi - read -p "REDACTED Groups (comma-separated) [${DEFAULT_REDACTED_GROUPS}]: " input - export REDACTED_GROUPS=${input:-$DEFAULT_REDACTED_GROUPS} + # Group mappings with defaults + export PLAIN_TEXT_GROUPS=${DEFAULT_PLAIN_TEXT_GROUPS:-"auditor"} + export MASKED_GROUPS=${DEFAULT_MASKED_GROUPS:-"customer_service"} + export REDACTED_GROUPS=${DEFAULT_REDACTED_GROUPS:-"marketing"} + echo "✓ Group mappings set: PLAIN_TEXT=${PLAIN_TEXT_GROUPS}, MASKED=${MASKED_GROUPS}, REDACTED=${REDACTED_GROUPS}" - echo -e "\nConfiguration values set." - - # Source config.sh again to apply any other dependent variables - source "$(dirname "$0")/config.sh" + echo -e "\nConfiguration loaded successfully." } -# Function to replace placeholders in content -replace_placeholders() { +# Function to substitute environment variables in content +substitute_variables() { local content=$1 - echo "$content" | sed \ - -e "s||${SKYFLOW_ACCOUNT_ID}|g" \ - -e "s||${SKYFLOW_VAULT_URL}|g" \ - -e "s||${SKYFLOW_VAULT_ID}|g" \ - -e "s||${SKYFLOW_BEARER_TOKEN}|g" \ - -e "s||${DATABRICKS_HOST}|g" \ - -e "s||${DATABRICKS_TOKEN}|g" \ - -e "s||${PLAIN_TEXT_GROUPS}|g" \ - -e "s||${PLAIN_TEXT_GROUPS}|g" \ - -e "s||${PLAIN_TEXT_GROUPS}|g" \ - -e "s||${MASKED_GROUPS}|g" \ - -e "s||${MASKED_GROUPS}|g" \ - -e "s||${MASKED_GROUPS}|g" \ - -e "s||${REDACTED_GROUPS}|g" \ - -e "s||${REDACTED_GROUPS}|g" \ - -e "s||${REDACTED_GROUPS}|g" \ - -e "s||${PREFIX}|g" + echo "$content" | envsubst +} + +# Function to setup Unity Catalog connections via SQL file +setup_uc_connections() { + echo "Creating Unity Catalog connections via SQL..." + + # Read SQL file and process each connection separately (direct API call to avoid catalog context) + local sql_content=$(cat "sql/setup/create_uc_connections.sql") + local processed_content=$(substitute_variables "$sql_content") + + # Split into tokenization and detokenization connection statements + local tokenize_sql=$(echo "$processed_content" | sed -n '/CREATE CONNECTION.*skyflow_tokenize_conn/,/);/p') + local detokenize_sql=$(echo "$processed_content" | sed -n '/CREATE CONNECTION.*skyflow_detokenize_conn/,/);$/p') + + # Execute tokenization connection creation with detailed logging (direct API call to avoid catalog context) + echo "Executing tokenization connection SQL without catalog context..." 
+ local tokenize_response=$(curl -s -X POST "${DATABRICKS_HOST}/api/2.0/sql/statements" \ + -H "Authorization: Bearer ${DATABRICKS_TOKEN}" \ + -H "Content-Type: application/json" \ + -d "{\"statement\":$(echo "$tokenize_sql" | python3 -c 'import json,sys; print(json.dumps(sys.stdin.read()))'),\"warehouse_id\":\"${WAREHOUSE_ID}\"}") + + if echo "$tokenize_response" | grep -q '"state":"SUCCEEDED"'; then + echo "✓ Created UC tokenization connection: skyflow_tokenize_conn" + local tokenize_success=true + else + echo "❌ ERROR: Tokenization connection creation failed" + echo "SQL statement was: $tokenize_sql" + echo "Response: $tokenize_response" + local tokenize_success=false + fi + + # Execute detokenization connection creation with detailed logging (direct API call to avoid catalog context) + echo "Executing detokenization connection SQL without catalog context..." + local detokenize_response=$(curl -s -X POST "${DATABRICKS_HOST}/api/2.0/sql/statements" \ + -H "Authorization: Bearer ${DATABRICKS_TOKEN}" \ + -H "Content-Type: application/json" \ + -d "{\"statement\":$(echo "$detokenize_sql" | python3 -c 'import json,sys; print(json.dumps(sys.stdin.read()))'),\"warehouse_id\":\"${WAREHOUSE_ID}\"}") + + if echo "$detokenize_response" | grep -q '"state":"SUCCEEDED"'; then + echo "✓ Created UC detokenization connection: skyflow_detokenize_conn" + local detokenize_success=true + else + echo "❌ ERROR: Detokenization connection creation failed" + echo "SQL statement was: $detokenize_sql" + echo "Response: $detokenize_response" + local detokenize_success=false + fi + + # Verify connections actually exist after creation + echo "Verifying UC connections were actually created..." + local actual_connections=$(curl -s -H "Authorization: Bearer ${DATABRICKS_TOKEN}" \ + "${DATABRICKS_HOST}/api/2.1/unity-catalog/connections" | \ + python3 -c " +import json, sys +try: + data = json.load(sys.stdin) + skyflow_conns = [c['name'] for c in data.get('connections', []) if 'skyflow' in c['name'].lower()] + print(' '.join(skyflow_conns)) +except: + print('') +") + + if echo "$actual_connections" | grep -q "skyflow_tokenize_conn"; then + echo "✓ Verified skyflow_tokenize_conn exists" + else + echo "❌ skyflow_tokenize_conn NOT FOUND after creation" + tokenize_success=false + fi + + if echo "$actual_connections" | grep -q "skyflow_detokenize_conn"; then + echo "✓ Verified skyflow_detokenize_conn exists" + else + echo "❌ skyflow_detokenize_conn NOT FOUND after creation" + detokenize_success=false + fi + + # Return success only if both connections succeeded + if [ "$tokenize_success" = true ] && [ "$detokenize_success" = true ]; then + echo "✓ Both UC connections created successfully via SQL" + return 0 + else + echo "❌ Failed to create required UC connections" + echo "Both connections must be created successfully for setup to proceed" + exit 1 + fi +} + +# Function to setup Unity Catalog secrets via Databricks REST API +setup_uc_secrets() { + echo "Creating Unity Catalog secrets scope..." 
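+    # The scope is created through the Secrets API (POST /api/2.0/secrets/scopes/create);
+    # an existing scope is not treated as an error, so re-running setup does not fail here.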
+ + # Create UC-backed secrets scope + local scope_response=$(curl -s -X POST "${DATABRICKS_HOST}/api/2.0/secrets/scopes/create" \ + -H "Authorization: Bearer ${DATABRICKS_TOKEN}" \ + -H "Content-Type: application/json" \ + -d '{ + "scope": "skyflow-secrets", + "scope_backend_type": "UC" + }') + + # Check if scope creation failed (ignore if already exists) + if echo "$scope_response" | grep -q '"error_code":"RESOURCE_ALREADY_EXISTS"'; then + echo "✓ Secrets scope already exists" + elif echo "$scope_response" | grep -q '"error_code"'; then + echo "Error creating secrets scope: $scope_response" + return 1 + else + echo "✓ Created secrets scope successfully" + fi + + # Create individual secrets + local secrets=( + "skyflow_pat_token:${SKYFLOW_PAT_TOKEN}" + "skyflow_account_id:${SKYFLOW_ACCOUNT_ID}" + "skyflow_vault_url:${SKYFLOW_VAULT_URL}" + "skyflow_vault_id:${SKYFLOW_VAULT_ID}" + "skyflow_table:${SKYFLOW_TABLE}" + ) + + for secret_pair in "${secrets[@]}"; do + IFS=':' read -r key value <<< "$secret_pair" + echo "Creating secret: $key" + + local secret_response=$(curl -s -X POST "${DATABRICKS_HOST}/api/2.0/secrets/put" \ + -H "Authorization: Bearer ${DATABRICKS_TOKEN}" \ + -H "Content-Type: application/json" \ + -d "{ + \"scope\": \"skyflow-secrets\", + \"key\": \"$key\", + \"string_value\": \"$value\" + }") + + if echo "$secret_response" | grep -q '"error_code"'; then + echo "Warning: Error creating secret $key: $secret_response" + else + echo "✓ Created secret: $key" + fi + done + + return 0 } # Function to create a notebook using Databricks REST API @@ -125,9 +235,9 @@ create_notebook() { return 1 fi - # Read file content and replace placeholders + # Read file content and substitute variables local content=$(cat "$source_file") - local processed_content=$(replace_placeholders "$content") + local processed_content=$(substitute_variables "$content") # Base64 encode the processed content if [[ "$OSTYPE" == "darwin"* ]]; then @@ -177,7 +287,7 @@ execute_sql() { # Read and process SQL file local sql_content=$(cat "$sql_file") - local processed_content=$(replace_placeholders "$sql_content") + local processed_content=$(substitute_variables "$sql_content") local current_statement="" # Split into statements, handling multi-line SQL properly @@ -221,20 +331,137 @@ print(json.dumps(content)) ') rm "$temp_file" + # Use dedicated catalog if available, otherwise main + local catalog_context="${CATALOG_NAME:-main}" local response=$(curl -s -X POST "${DATABRICKS_HOST}/api/2.0/sql/statements" \ -H "Authorization: Bearer ${DATABRICKS_TOKEN}" \ -H "Content-Type: application/json" \ - -d "{\"statement\":${json_statement},\"catalog\":\"hive_metastore\",\"schema\":\"default\",\"warehouse_id\":\"${WAREHOUSE_ID}\"}") + -d "{\"statement\":${json_statement},\"catalog\":\"${catalog_context}\",\"schema\":\"default\",\"warehouse_id\":\"${WAREHOUSE_ID}\"}") # Check for errors in response if echo "$response" | grep -q "error"; then echo "Error executing SQL:" - echo "$response" + echo "Statement was: $statement" + echo "Response: $response" return 1 fi done } +# Function to run a notebook using Databricks Runs API +run_notebook() { + local notebook_path=$1 + local table_name=$2 + local pii_columns=$3 + local batch_size=${4:-${SKYFLOW_BATCH_SIZE}} # Use provided batch size or env default + + echo "Running notebook: ${notebook_path}" + echo "Batch size: ${batch_size}" + + # Create job run with notebook parameters using serverless compute + # Note: For serverless, we must use multi-task format with tasks array + local 
run_response=$(curl -s -X POST "${DATABRICKS_HOST}/api/2.1/jobs/runs/submit" \ + -H "Authorization: Bearer ${DATABRICKS_TOKEN}" \ + -H "Content-Type: application/json" \ + -d "{ + \"run_name\": \"Serverless_Tokenize_${table_name}_$(date +%s)\", + \"tasks\": [ + { + \"task_key\": \"tokenize_task\", + \"notebook_task\": { + \"notebook_path\": \"${notebook_path}\", + \"source\": \"WORKSPACE\", + \"base_parameters\": { + \"table_name\": \"${table_name}\", + \"pii_columns\": \"${pii_columns}\", + \"batch_size\": \"${batch_size}\" + } + }, + \"timeout_seconds\": 1800 + } + ] + }") + + # Extract run ID + local run_id=$(echo "$run_response" | python3 -c " +import json, sys +try: + data = json.load(sys.stdin) + if 'run_id' in data: + print(data['run_id']) + else: + print('ERROR: ' + str(data)) + sys.exit(1) +except Exception as e: + print('ERROR: ' + str(e)) + sys.exit(1) +") + + if [[ "$run_id" == ERROR* ]]; then + echo "Failed to start notebook run: $run_id" + return 1 + fi + + echo "Started notebook run with ID: $run_id" + + # Extract workspace ID from hostname for task URL + local workspace_id=$(echo "$DATABRICKS_HOST" | sed 's/https:\/\/dbc-//' | sed 's/-.*\.cloud\.databricks\.com.*//') + echo "View live logs: ${DATABRICKS_HOST}/jobs/runs/${run_id}?o=${workspace_id}" + + # Wait for run to complete + echo "Waiting for tokenization to complete..." + local max_wait=300 # 5 minutes + local wait_time=0 + + while [[ $wait_time -lt $max_wait ]]; do + local status_response=$(curl -s -X GET "${DATABRICKS_HOST}/api/2.1/jobs/runs/get?run_id=${run_id}" \ + -H "Authorization: Bearer ${DATABRICKS_TOKEN}") + + local run_state=$(echo "$status_response" | python3 -c " +import json, sys +try: + data = json.load(sys.stdin) + state = data.get('state', {}).get('life_cycle_state', 'UNKNOWN') + print(state) +except: + print('UNKNOWN') +") + + case $run_state in + "TERMINATED") + local result_state=$(echo "$status_response" | python3 -c " +import json, sys +try: + data = json.load(sys.stdin) + result = data.get('state', {}).get('result_state', 'UNKNOWN') + print(result) +except: + print('UNKNOWN') +") + if [[ "$result_state" == "SUCCESS" ]]; then + echo "✅ Tokenization completed successfully" + return 0 + else + echo "❌ Tokenization failed with result: $result_state" + return 1 + fi + ;; + "INTERNAL_ERROR"|"FAILED"|"TIMEDOUT"|"CANCELED"|"SKIPPED") + echo "❌ Tokenization run failed with state: $run_state" + return 1 + ;; + *) + echo "Tokenization in progress... 
(state: $run_state)" + sleep 30 + wait_time=$((wait_time + 30)) + ;; + esac + done + + echo "❌ Tokenization timed out after ${max_wait} seconds" + return 1 +} + # Function to import dashboard using Databricks Lakeview API create_dashboard() { local path=$1 @@ -246,11 +473,11 @@ create_dashboard() { return 1 fi - # Read file content and replace placeholders + # Read file content and substitute variables local content=$(cat "$source_file") - # Replace placeholders and stringify dashboard content - local processed_content=$(replace_placeholders "$content" | python3 -c 'import json,sys; print(json.dumps(json.dumps(json.load(sys.stdin))))') + # Substitute variables and stringify dashboard content + local processed_content=$(substitute_variables "$content" | python3 -c 'import json,sys; print(json.dumps(json.dumps(json.load(sys.stdin))))') # Check if dashboard already exists and delete it dashboards=$(curl -s -X GET "${DATABRICKS_HOST}/api/2.0/lakeview/dashboards" \ @@ -331,9 +558,100 @@ print('true' if ids else 'false') echo "$dashboard_id" } +# Function to create metastore +create_metastore() { + echo "Creating dedicated metastore..." + local metastore_name="${PREFIX}_metastore" + + # Create metastore with only required fields (no S3 bucket or IAM role) + local metastore_response=$(curl -s -X POST "${DATABRICKS_HOST}/api/2.1/unity-catalog/metastores" \ + -H "Authorization: Bearer ${DATABRICKS_TOKEN}" \ + -H "Content-Type: application/json" \ + -d "{ + \"name\": \"${metastore_name}\", + \"region\": \"${DATABRICKS_METASTORE_REGION:-us-west-1}\" + }") + + # Extract metastore ID + local metastore_id=$(echo "$metastore_response" | python3 -c " +import json, sys +try: + data = json.load(sys.stdin) + if 'metastore_id' in data: + print(data['metastore_id']) + else: + print('ERROR: ' + str(data)) + sys.exit(1) +except Exception as e: + print('ERROR: ' + str(e)) + sys.exit(1) +") + + if [[ "$metastore_id" == ERROR* ]]; then + echo "Failed to create metastore: $metastore_id" + return 1 + fi + + echo "Metastore created with ID: $metastore_id" + export METASTORE_ID="$metastore_id" + + # Assign metastore to current workspace + echo "Assigning metastore to workspace..." + local assignment_response=$(curl -s -X PUT "${DATABRICKS_HOST}/api/2.1/unity-catalog/current-metastore-assignment" \ + -H "Authorization: Bearer ${DATABRICKS_TOKEN}" \ + -H "Content-Type: application/json" \ + -d "{\"metastore_id\": \"${metastore_id}\"}") + + # Wait for assignment to propagate + echo "Waiting for metastore assignment..." + sleep 10 + + return 0 +} + +# Function to destroy metastore +destroy_metastore() { + echo "Finding and destroying metastore..." + local metastore_name="${PREFIX}_metastore" + + # Get metastore ID by name + local metastores=$(curl -s -H "Authorization: Bearer ${DATABRICKS_TOKEN}" \ + "${DATABRICKS_HOST}/api/2.1/unity-catalog/metastores") + + local metastore_id=$(echo "$metastores" | python3 -c " +import json, sys +try: + data = json.load(sys.stdin) + for ms in data.get('metastores', []): + if ms.get('name') == '${metastore_name}': + print(ms.get('metastore_id', '')) + break + else: + print('') +except: + print('') +") + + if [[ -n "$metastore_id" ]]; then + echo "Found metastore ID: $metastore_id" + + # Delete metastore + echo "Deleting metastore..." 
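+        # The {"force": true} payload requests deletion even if the metastore is not empty.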
+        curl -s -X DELETE "${DATABRICKS_HOST}/api/2.1/unity-catalog/metastores/${metastore_id}" \
+            -H "Authorization: Bearer ${DATABRICKS_TOKEN}" \
+            -d '{"force": true}'
+
+        echo "Metastore deletion initiated"
+        return 0
+    else
+        echo "Metastore ${metastore_name} not found"
+        return 0
+    fi
+}
+
 # Function to check directory existence
 check_directories() {
-    local dirs=("notebooks" "sql" "dashboards")
+    local dirs=("notebooks" "sql" "sql/setup" "sql/destroy" "sql/verify" "dashboards")
     local missing=0
 
     for dir in "${dirs[@]}"; do
@@ -355,6 +673,13 @@ create_components() {
     # Verify required directories exist
     check_directories || exit 1
 
+    # Create dedicated catalog for this instance (instead of metastore)
+    echo "Creating dedicated catalog: ${PREFIX}_catalog"
+    execute_sql "sql/setup/create_catalog.sql" || exit 1
+
+    # Use our dedicated catalog instead of main
+    export CATALOG_NAME="${PREFIX}_catalog"
+
     # Create Shared directory if it doesn't exist
     echo "Creating Shared directory..."
     curl -s -X POST "${DATABRICKS_HOST}/api/2.0/workspace/mkdirs" \
@@ -362,24 +687,52 @@ create_components() {
         -H "Content-Type: application/json" \
         -d "{\"path\": \"/Workspace/Shared\"}"
 
+    # Catalog and schema setup complete
+
     # Create sample table
     echo "Creating sample table..."
-    execute_sql "sql/create_sample_table.sql" || exit 1
+    execute_sql "sql/setup/create_sample_table.sql" || exit 1
 
-    # Create notebooks
-    echo "Creating notebooks..."
-    for notebook in tokenize_table call_tokenize_table; do
-        echo "Creating ${notebook} notebook..."
-        create_notebook "/Workspace/Shared/${PREFIX}_${notebook}" "notebooks/notebook_${notebook}.ipynb" || exit 1
-    done
+    # Create tokenization notebook
+    echo "Creating tokenization notebook..."
+    create_notebook "/Workspace/Shared/${PREFIX}_tokenize_table" "notebooks/notebook_tokenize_table.ipynb" || exit 1
 
-    # Create detokenization function
-    echo "Creating detokenization function..."
-    execute_sql "sql/create_detokenize_function.sql" || exit 1
+    # Setup Unity Catalog secrets via REST API
+    echo "Setting up Unity Catalog secrets via Databricks API..."
+    setup_uc_secrets || exit 1
+
+    # Create UC connections (required)
+    echo "Creating Unity Catalog connections..."
+    setup_uc_connections || exit 1
+
+    echo "Creating Pure SQL detokenization functions with UC connections..."
+    execute_sql "sql/setup/setup_uc_connections_api.sql" || exit 1
+    echo "✓ Using UC connections approach (pure SQL, highest performance)"
+
+    # Brief pause to ensure functions are fully created
+    echo "Waiting for function creation to complete..."
+    sleep 5
+
+    # Verify Unity Catalog functions are created
+    echo "Verifying Unity Catalog detokenization functions..."
+    execute_sql "sql/verify/verify_functions.sql" || exit 1
+
+    # Check if table exists before applying masks
+    echo "Verifying table exists..."
+    execute_sql "sql/verify/verify_table.sql" || exit 1
 
-    # Verify function creation
-    # echo "Verifying function creation..."
-    # execute_sql "DESCRIBE FUNCTION EXTENDED ${PREFIX}_skyflow_bulk_detokenize" || exit 1
+    # The conditional detokenization function is already created by sql/setup/setup_uc_connections_api.sql
+    echo "✓ Unity Catalog conditional detokenization functions created"
+
+    # Tokenize the sample data FIRST (before applying masks)
+    echo "Tokenizing PII data in sample table..."
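+    # Only first_name is tokenized and masked in this setup; to cover additional PII
+    # columns, extend pii_columns below and sql/setup/apply_column_masks.sql.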
+    local pii_columns="first_name"
+    local table_name="${PREFIX}_catalog.default.${PREFIX}_customer_data"
+    run_notebook "/Workspace/Shared/${PREFIX}_tokenize_table" "${table_name}" "${pii_columns}" "${SKYFLOW_BATCH_SIZE}" || exit 1
+
+    # Apply column masks to PII columns AFTER tokenization
+    echo "Applying column masks to tokenized PII columns..."
+    execute_sql "sql/setup/apply_column_masks.sql" || exit 1
 
     # Create dashboard
     echo "Creating dashboard..."
@@ -390,19 +743,31 @@ create_components() {
     echo "Setup complete! Created resources with prefix: ${PREFIX}"
     echo "
 Resources created:
-1. Sample table: ${PREFIX}_customer_data
-2. Tokenization notebooks:
-   - /Workspace/Shared/${PREFIX}_tokenize_table
-   - /Workspace/Shared/${PREFIX}_call_tokenize_table
-3. Detokenization function: ${PREFIX}_skyflow_bulk_detokenize
-4. Dashboard: ${PREFIX}_customer_insights
+1. Dedicated Unity Catalog: ${PREFIX}_catalog
+2. Sample table: ${PREFIX}_catalog.default.${PREFIX}_customer_data (with tokenized PII and column masks applied)
+3. Tokenization notebook:
+   - /Workspace/Shared/${PREFIX}_tokenize_table (serverless-optimized)
+4. Unity Catalog Infrastructure:
+   - SQL-created HTTP connections: skyflow_tokenize_conn, skyflow_detokenize_conn
+   - UC-backed secrets scope: skyflow-secrets (contains PAT token, account ID, vault details)
+   - Bearer token authentication with proper secret() references
+5. Pure SQL Functions:
+   - ${PREFIX}_catalog.default.${PREFIX}_skyflow_uc_detokenize (direct Skyflow API via UC connections)
+   - ${PREFIX}_catalog.default.${PREFIX}_skyflow_conditional_detokenize (role-based access control)
+   - ${PREFIX}_catalog.default.${PREFIX}_skyflow_mask_detokenize (column mask wrapper)
+6. Column mask applied to first_name: 'auditor' members see plain text, 'customer_service' members see masked values, all other users see tokens
+7. Dashboard: ${PREFIX}_customer_insights (catalog-qualified queries)
+8. ✅ PII data automatically tokenized during setup
 
 Usage:
-1. To tokenize data:
-   Open and run the /Workspace/Shared/${PREFIX}_call_tokenize_table notebook
+1. Data is already tokenized and ready to use!
+
+2. To query data with automatic detokenization:
+   Run: SELECT * FROM ${PREFIX}_catalog.default.${PREFIX}_customer_data
+   (PII columns automatically detokenized based on user role)
 
-2. To detokenize data:
-   Run: SELECT ${PREFIX}_skyflow_bulk_detokenize(array('token1', 'token2'), current_user())
+   To detokenize a token directly with an explicit redaction level:
+   Run: SELECT ${PREFIX}_catalog.default.${PREFIX}_skyflow_uc_detokenize('token1', 'PLAIN_TEXT')
 
 3. To view data:
    Dashboard URL: ${DATABRICKS_HOST}/sql/dashboardsv3/${dashboard_id:-""}
@@ -414,29 +779,96 @@ destroy_components() {
     local failed_deletions=()
     local successful_deletions=()
 
-    # Drop the function
-    echo "Dropping detokenization function..."
-    execute_sql "DROP FUNCTION IF EXISTS ${PREFIX}_skyflow_bulk_detokenize"
-    # Verify function deletion
-    if execute_sql "DESCRIBE FUNCTION ${PREFIX}_skyflow_bulk_detokenize" &>/dev/null; then
-        failed_deletions+=("Function: ${PREFIX}_skyflow_bulk_detokenize")
+    # Drop Unity Catalog functions
+    echo "Dropping Unity Catalog detokenization functions..."
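+    # drop_functions.sql uses catalog-qualified names, so the drops are only
+    # attempted when the dedicated ${PREFIX}_catalog still exists (checked below).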
+ + # Set catalog context to dedicated catalog if it exists, otherwise skip function drops + local catalog_name="${PREFIX}_catalog" + local catalog_check_response=$(curl -s -X GET "${DATABRICKS_HOST}/api/2.1/unity-catalog/catalogs/${catalog_name}" \ + -H "Authorization: Bearer ${DATABRICKS_TOKEN}") + + if echo "$catalog_check_response" | grep -q '"name"'; then + echo "Using catalog: ${catalog_name}" + CATALOG_NAME="${catalog_name}" execute_sql "sql/destroy/drop_functions.sql" else - successful_deletions+=("Function: ${PREFIX}_skyflow_bulk_detokenize") + echo "Dedicated catalog ${catalog_name} not found, skipping function drops" fi + + # Clean up UC secrets scope + echo "Cleaning up Unity Catalog secrets..." + local delete_scope_response=$(curl -s -X POST "${DATABRICKS_HOST}/api/2.0/secrets/scopes/delete" \ + -H "Authorization: Bearer ${DATABRICKS_TOKEN}" \ + -H "Content-Type: application/json" \ + -d '{"scope": "skyflow-secrets"}') + + if echo "$delete_scope_response" | grep -q '"error_code"'; then + echo "Warning: Could not delete secrets scope: $delete_scope_response" + else + echo "✓ Deleted secrets scope: skyflow-secrets" + fi + + # Drop Unity Catalog connections (both SQL-created and REST-created) + echo "Cleaning up Unity Catalog connections..." + local connections_response=$(curl -s -X GET "${DATABRICKS_HOST}/api/2.1/unity-catalog/connections" \ + -H "Authorization: Bearer ${DATABRICKS_TOKEN}") + + # Extract connection names that are Skyflow-related + local connection_names=$(echo "$connections_response" | python3 -c " +import json, sys +try: + data = json.load(sys.stdin) + names = [conn['name'] for conn in data.get('connections', []) + if conn['name'] in ['skyflow_tokenize_conn', 'skyflow_detokenize_conn']] + print('\n'.join(names)) +except Exception as e: + print(f'Error extracting names: {e}') + print('') +") + + # Delete each matching connection + if [[ -n "$connection_names" ]]; then + while IFS= read -r conn_name; do + if [[ -n "$conn_name" ]]; then + echo "Deleting UC connection: ${conn_name}" + local delete_response=$(curl -s -X DELETE "${DATABRICKS_HOST}/api/2.1/unity-catalog/connections/${conn_name}" \ + -H "Authorization: Bearer ${DATABRICKS_TOKEN}") + if echo "$delete_response" | grep -q '"error_code"'; then + echo " Warning: Error deleting $conn_name: $delete_response" + else + echo " ✓ Deleted connection: $conn_name" + fi + fi + done <<< "$connection_names" + else + echo "No connections found to delete" + fi + + # Verify UC function deletions + if execute_sql "sql/verify/check_functions_exist.sql" &>/dev/null; then + failed_deletions+=("Function: ${PREFIX}_skyflow_uc_detokenize") + failed_deletions+=("Function: ${PREFIX}_skyflow_mask_detokenize") + else + successful_deletions+=("Function: ${PREFIX}_skyflow_uc_detokenize") + successful_deletions+=("Function: ${PREFIX}_skyflow_mask_detokenize") + fi + + # Remove column masks before dropping table + echo "Removing column masks..." + execute_sql "sql/destroy/remove_column_masks.sql" &>/dev/null || true # Drop the table echo "Dropping sample table..." - execute_sql "DROP TABLE IF EXISTS ${PREFIX}_customer_data" + execute_sql "sql/destroy/drop_table.sql" # Verify table deletion - if execute_sql "DESCRIBE TABLE ${PREFIX}_customer_data" &>/dev/null; then + if execute_sql "sql/verify/check_table_exists.sql" &>/dev/null; then failed_deletions+=("Table: ${PREFIX}_customer_data") else successful_deletions+=("Table: ${PREFIX}_customer_data") fi - # Delete notebooks - echo "Deleting notebooks..." 
- local notebook_paths=("/Workspace/Shared/${PREFIX}_tokenize_table" "/Workspace/Shared/${PREFIX}_call_tokenize_table") + # Delete notebook + echo "Deleting tokenization notebook..." + local notebook_paths=("/Workspace/Shared/${PREFIX}_tokenize_table") for notebook_path in "${notebook_paths[@]}"; do echo "Deleting notebook: ${notebook_path}" local response=$(curl -s -X POST "${DATABRICKS_HOST}/api/2.0/workspace/delete" \ @@ -503,6 +935,10 @@ print('true' if exists else 'false') fi done <<< "$matching_ids" + # Destroy dedicated catalog + echo "Destroying dedicated catalog..." + execute_sql "sql/destroy/cleanup_catalog.sql" || echo "Failed to drop catalog, continuing..." + # Print summary echo -e "\nDestroy Summary:" if [[ ${#successful_deletions[@]} -gt 0 ]]; then @@ -520,10 +956,111 @@ print('true' if exists else 'false') fi } +# Function to check if resources already exist +check_existing_resources() { + local has_existing=false + local existing_resources=() + + echo "Checking for existing resources with prefix: ${PREFIX}" + + # Check for catalog + local catalog_check=$(curl -s -H "Authorization: Bearer ${DATABRICKS_TOKEN}" \ + "${DATABRICKS_HOST}/api/2.1/unity-catalog/catalogs" | \ + python3 -c " +import sys, json +try: + data = json.load(sys.stdin) + catalogs = [c['name'] for c in data.get('catalogs', [])] + exists = '${PREFIX}_catalog' in catalogs + print('true' if exists else 'false') +except: + print('false') +") + + if [[ "$catalog_check" == "true" ]]; then + has_existing=true + existing_resources+=("Catalog: ${PREFIX}_catalog") + fi + + # Check for secrets scope + local secrets_check=$(curl -s -H "Authorization: Bearer ${DATABRICKS_TOKEN}" \ + "${DATABRICKS_HOST}/api/2.0/secrets/scopes/list" | \ + python3 -c " +import sys, json +try: + data = json.load(sys.stdin) + scopes = [s['name'] for s in data.get('scopes', [])] + exists = 'skyflow-secrets' in scopes + print('true' if exists else 'false') +except: + print('false') +") + + if [[ "$secrets_check" == "true" ]]; then + has_existing=true + existing_resources+=("Secrets scope: skyflow-secrets") + fi + + # Check for UC connections + local connections_check=$(curl -s -H "Authorization: Bearer ${DATABRICKS_TOKEN}" \ + "${DATABRICKS_HOST}/api/2.1/unity-catalog/connections" | \ + python3 -c " +import sys, json +try: + data = json.load(sys.stdin) + connections = [c['name'] for c in data.get('connections', [])] + tokenize_exists = 'skyflow_tokenize_conn' in connections + detokenize_exists = 'skyflow_detokenize_conn' in connections + print('true' if (tokenize_exists or detokenize_exists) else 'false') +except: + print('false') +") + + if [[ "$connections_check" == "true" ]]; then + has_existing=true + existing_resources+=("UC connections: skyflow_*_conn") + fi + + # Check for notebooks + local notebook_path="/Workspace/Shared/${PREFIX}_tokenize_table" + local notebook_check=$(curl -s -H "Authorization: Bearer ${DATABRICKS_TOKEN}" \ + "${DATABRICKS_HOST}/api/2.0/workspace/get-status" \ + -d "{\"path\": \"${notebook_path}\"}" | \ + python3 -c " +import sys, json +try: + data = json.load(sys.stdin) + exists = 'path' in data + print('true' if exists else 'false') +except: + print('false') +") + + if [[ "$notebook_check" == "true" ]]; then + has_existing=true + existing_resources+=("Notebook: ${notebook_path}") + fi + + if [[ "$has_existing" == "true" ]]; then + echo "" + echo "❌ ERROR: Existing resources found that would conflict with setup:" + printf ' - %s\n' "${existing_resources[@]}" + echo "" + echo "Please run one of the following 
commands first:" + echo " ./setup.sh destroy # Remove existing resources" + echo " ./setup.sh recreate ${PREFIX} # Remove and recreate all resources" + echo "" + exit 1 + else + echo "✅ No conflicting resources found. Proceeding with setup..." + fi +} + # Main logic -prompt_for_config +load_config if [[ "$1" == "create" ]]; then + check_existing_resources create_components elif [[ "$1" == "destroy" ]]; then destroy_components diff --git a/sql/create_detokenize_function.sql b/sql/create_detokenize_function.sql deleted file mode 100644 index cf3950f..0000000 --- a/sql/create_detokenize_function.sql +++ /dev/null @@ -1,136 +0,0 @@ -CREATE OR REPLACE FUNCTION default._skyflow_bulk_detokenize(tokens ARRAY, user_email STRING) -RETURNS ARRAY -LANGUAGE PYTHON -AS $$ -import requests -import math -from concurrent.futures import ThreadPoolExecutor, as_completed - -# Skyflow API details -SKYFLOW_API_URL = "/v1/vaults//detokenize" -BEARER_TOKEN = "" - -# Databricks SCIM API details -DATABRICKS_INSTANCE = "" # e.g. xyz-abc12d34-567e -SCIM_API_URL = f"{DATABRICKS_INSTANCE}/api/2.0/preview/scim/v2/Users" -DATABRICKS_TOKEN = "" # e.g. dapi0123456789ab... - -# Mapping roles to redaction styles with multiple groups -ROLE_TO_REDACTION = { - "PLAIN_TEXT": [ - "", - "", - "" - ], - "MASKED": [ - "", - "", - "" - ], - "REDACTED": [ - "", - "", - "" - ] -} - -# Define the redaction priority order -REDACTION_PRIORITY = ["PLAIN_TEXT", "MASKED", "REDACTED"] # if user has multiple roles across redaction levels, the highest privileges will be used - -def get_redaction_style(user_email): - """ - Function to get the highest redaction style based on the user's Databricks SCIM groups. - Defaults to REDACTED if no relevant roles are found. - """ - headers = { - "Authorization": f"Bearer {DATABRICKS_TOKEN}", - "Content-Type": "application/json" - } - - try: - # API call to fetch user details - response = requests.get(f"{SCIM_API_URL}?filter=userName%20eq%20%22{user_email}%22", headers=headers) - response.raise_for_status() - user_info = response.json() - - if user_info["totalResults"] == 0: - print(f"No user found for email: {user_email}") - return "REDACTED" - - # Extract user's groups from the SCIM response - user_groups = [group.get('display').lower() for group in user_info["Resources"][0].get("groups", [])] - - found_redactions = [] - - # Determine redaction levels based on user groups - for redaction_level, groups in ROLE_TO_REDACTION.items(): - if any(group.lower() in user_groups for group in groups): - found_redactions.append(redaction_level) - - # If the user has no mapped groups, default to the lowest level (REDACTED) - if not found_redactions: - print(f"User {user_email} has no relevant roles, defaulting to REDACTED.") - return "REDACTED" - - # Determine the highest priority redaction level based on predefined order - for level in REDACTION_PRIORITY: - if level in found_redactions: - return level - - return "REDACTED" # Default fallback - - except requests.exceptions.RequestException as e: - print(f"Error fetching user role: {e}") - return "REDACTED" # Default on error - -def detokenize_chunk(chunk, redaction_style): - """ - Function to detokenize a chunk of tokens using the Skyflow API. 
- """ - headers = { - "Authorization": f"Bearer {BEARER_TOKEN}", - "Content-Type": "application/json" - } - - payload = { - "detokenizationParameters": [ - {"token": token, "redaction": redaction_style} for token in chunk - ] - } - - try: - response = requests.post(SKYFLOW_API_URL, json=payload, headers=headers) - response.raise_for_status() - result = response.json() - return [record["value"] for record in result["records"]] - except requests.exceptions.RequestException as e: - print(f"Error detokenizing chunk: {e}") - return [f"Error: {str(e)}" for _ in chunk] - -def bulk_detokenize(tokens, user_email): - """ - Multi-threaded bulk detokenization with user-specific redaction style. - """ - if not tokens: - return [] - - # Get the highest redaction style based on user's group memberships - redaction_style = get_redaction_style(user_email) - - MAX_TOKENS_PER_REQUEST = 25 - results = [] - - # Split tokens into chunks - chunks = [tokens[i:i + MAX_TOKENS_PER_REQUEST] for i in range(0, len(tokens), MAX_TOKENS_PER_REQUEST)] - - # Use ThreadPoolExecutor for multi-threading - with ThreadPoolExecutor(max_workers=math.ceil(len(tokens) / MAX_TOKENS_PER_REQUEST)) as executor: - future_to_chunk = {executor.submit(detokenize_chunk, chunk, redaction_style): chunk for chunk in chunks} - - for future in as_completed(future_to_chunk): - results.extend(future.result()) - - return results - -return bulk_detokenize(tokens, user_email) -$$; diff --git a/sql/destroy/cleanup_catalog.sql b/sql/destroy/cleanup_catalog.sql new file mode 100644 index 0000000..1c874a2 --- /dev/null +++ b/sql/destroy/cleanup_catalog.sql @@ -0,0 +1,2 @@ +-- Destroy dedicated catalog and all its contents +DROP CATALOG IF EXISTS ${PREFIX}_catalog CASCADE; \ No newline at end of file diff --git a/sql/destroy/drop_functions.sql b/sql/destroy/drop_functions.sql new file mode 100644 index 0000000..5adcf96 --- /dev/null +++ b/sql/destroy/drop_functions.sql @@ -0,0 +1,5 @@ +-- Drop Unity Catalog detokenization functions during cleanup +-- Note: Must be run with proper catalog context +DROP FUNCTION IF EXISTS ${PREFIX}_catalog.default.${PREFIX}_skyflow_uc_detokenize; +DROP FUNCTION IF EXISTS ${PREFIX}_catalog.default.${PREFIX}_skyflow_conditional_detokenize; +DROP FUNCTION IF EXISTS ${PREFIX}_catalog.default.${PREFIX}_skyflow_mask_detokenize; \ No newline at end of file diff --git a/sql/destroy/drop_table.sql b/sql/destroy/drop_table.sql new file mode 100644 index 0000000..2b10fba --- /dev/null +++ b/sql/destroy/drop_table.sql @@ -0,0 +1,2 @@ +-- Drop sample customer data table during cleanup +DROP TABLE IF EXISTS ${PREFIX}_customer_data; \ No newline at end of file diff --git a/sql/destroy/remove_column_masks.sql b/sql/destroy/remove_column_masks.sql new file mode 100644 index 0000000..d7d061d --- /dev/null +++ b/sql/destroy/remove_column_masks.sql @@ -0,0 +1,8 @@ +-- Remove column masks before dropping table during cleanup +-- Note: Only first_name should have a mask in current setup, but including all for safety +ALTER TABLE ${PREFIX}_customer_data ALTER COLUMN first_name DROP MASK; +ALTER TABLE ${PREFIX}_customer_data ALTER COLUMN last_name DROP MASK; +ALTER TABLE ${PREFIX}_customer_data ALTER COLUMN email DROP MASK; +ALTER TABLE ${PREFIX}_customer_data ALTER COLUMN phone_number DROP MASK; +ALTER TABLE ${PREFIX}_customer_data ALTER COLUMN address DROP MASK; +ALTER TABLE ${PREFIX}_customer_data ALTER COLUMN date_of_birth DROP MASK; \ No newline at end of file diff --git a/sql/setup/apply_column_masks.sql b/sql/setup/apply_column_masks.sql new 
file mode 100644 index 0000000..17f5d71 --- /dev/null +++ b/sql/setup/apply_column_masks.sql @@ -0,0 +1,4 @@ +-- Apply column masks to PII columns using Unity Catalog SQL-only functions +-- Pure SQL performance with UC connections - zero Python UDF overhead +-- Only auditors get detokenized data, others see raw tokens (optimized for performance) +ALTER TABLE ${PREFIX}_customer_data ALTER COLUMN first_name SET MASK ${PREFIX}_skyflow_mask_detokenize; \ No newline at end of file diff --git a/sql/setup/create_catalog.sql b/sql/setup/create_catalog.sql new file mode 100644 index 0000000..18c2e62 --- /dev/null +++ b/sql/setup/create_catalog.sql @@ -0,0 +1,3 @@ +-- Create dedicated catalog and schema for this instance +CREATE CATALOG IF NOT EXISTS ${PREFIX}_catalog; +CREATE SCHEMA IF NOT EXISTS ${PREFIX}_catalog.default; \ No newline at end of file diff --git a/sql/create_sample_table.sql b/sql/setup/create_sample_table.sql similarity index 70% rename from sql/create_sample_table.sql rename to sql/setup/create_sample_table.sql index c7d25e5..f916688 100644 --- a/sql/create_sample_table.sql +++ b/sql/setup/create_sample_table.sql @@ -1,4 +1,4 @@ -CREATE TABLE IF NOT EXISTS _customer_data ( +CREATE TABLE IF NOT EXISTS ${PREFIX}_customer_data ( customer_id STRING NOT NULL, first_name STRING, last_name STRING, @@ -18,7 +18,7 @@ CREATE TABLE IF NOT EXISTS _customer_data ( updated_at TIMESTAMP ); -INSERT INTO _customer_data ( +INSERT INTO ${PREFIX}_customer_data ( customer_id, first_name, last_name, @@ -39,7 +39,7 @@ INSERT INTO _customer_data ( ) WITH numbered_rows AS ( SELECT - posexplode(array_repeat(1, 10)) AS (id, _) + posexplode(array_repeat(1, 50)) AS (id, _) ), base_data AS ( SELECT @@ -72,16 +72,35 @@ base_data AS ( ) SELECT CONCAT('CUST', LPAD(CAST(id AS STRING), 5, '0')) AS customer_id, - first_name, - last_name, - LOWER(CONCAT(first_name, '.', last_name, CAST(id AS STRING), '@example.com')) AS email, - CONCAT('+1-', - LPAD(CAST(100 + MOD(id, 900) AS STRING), 3, '0'), '-', - LPAD(CAST(100 + MOD(id * 2, 900) AS STRING), 3, '0'), '-', - LPAD(CAST(1000 + MOD(id * 3, 9000) AS STRING), 4, '0') - ) AS phone_number, - CONCAT(CAST(100 + MOD(id * 5, 900) AS STRING), ' Main Street, ', city) AS address, - DATE_FORMAT(DATE_ADD('1950-01-01', id * 365), 'yyyy-MM-dd') AS date_of_birth, + -- Real PII data that will be tokenized + first_name AS first_name, + last_name AS last_name, + CONCAT(LOWER(first_name), '.', LOWER(last_name), '@example.com') AS email, + CASE MOD(id, 10) + WHEN 0 THEN '+1-555-0100' + WHEN 1 THEN '+1-555-0101' + WHEN 2 THEN '+1-555-0102' + WHEN 3 THEN '+1-555-0103' + WHEN 4 THEN '+1-555-0104' + WHEN 5 THEN '+1-555-0105' + WHEN 6 THEN '+1-555-0106' + WHEN 7 THEN '+1-555-0107' + WHEN 8 THEN '+1-555-0108' + WHEN 9 THEN '+1-555-0109' + END AS phone_number, + city AS address, + CASE MOD(id, 10) + WHEN 0 THEN '1985-03-15' + WHEN 1 THEN '1990-07-22' + WHEN 2 THEN '1988-11-08' + WHEN 3 THEN '1992-01-30' + WHEN 4 THEN '1987-09-14' + WHEN 5 THEN '1991-05-03' + WHEN 6 THEN '1989-12-18' + WHEN 7 THEN '1993-04-25' + WHEN 8 THEN '1986-08-11' + WHEN 9 THEN '1994-06-07' + END AS date_of_birth, DATE_ADD('2018-01-01', id - 1) AS signup_date, DATE_ADD('2023-01-01', id - 1) AS last_login, id * 5 AS total_purchases, diff --git a/sql/setup/create_uc_connections.sql b/sql/setup/create_uc_connections.sql new file mode 100644 index 0000000..700e4fd --- /dev/null +++ b/sql/setup/create_uc_connections.sql @@ -0,0 +1,20 @@ +-- Unity Catalog HTTP connections for Skyflow API integration +-- These must be created 
without catalog context (global metastore resources) + +-- Tokenization connection +CREATE CONNECTION IF NOT EXISTS skyflow_tokenize_conn TYPE HTTP +OPTIONS ( + host '${SKYFLOW_VAULT_URL}', + port 443, + base_path '/v1/vaults/${SKYFLOW_VAULT_ID}/${SKYFLOW_TABLE}', + bearer_token secret('skyflow-secrets', 'skyflow_pat_token') +); + +-- Detokenization connection +CREATE CONNECTION IF NOT EXISTS skyflow_detokenize_conn TYPE HTTP +OPTIONS ( + host '${SKYFLOW_VAULT_URL}', + port 443, + base_path '/v1/vaults/${SKYFLOW_VAULT_ID}', + bearer_token secret('skyflow-secrets', 'skyflow_pat_token') +); \ No newline at end of file diff --git a/sql/setup/setup_uc_connections_api.sql b/sql/setup/setup_uc_connections_api.sql new file mode 100644 index 0000000..39309e8 --- /dev/null +++ b/sql/setup/setup_uc_connections_api.sql @@ -0,0 +1,94 @@ +-- Unity Catalog Connections Setup via REST API +-- Complete pure SQL implementation matching Python UDF functionality +-- This provides zero Python overhead with native Spark SQL performance + +-- Connections are created via REST API in setup.sh: +-- +-- Tokenization connection: +-- { +-- "name": "skyflow_tokenize_conn", +-- "connection_type": "HTTP", +-- "options": { +-- "host": "${SKYFLOW_VAULT_URL}", +-- "port": "443", +-- "base_path": "/v1/vaults/${SKYFLOW_VAULT_ID}/${SKYFLOW_TABLE}", +-- "bearer_token": "{{secrets.skyflow-secrets.skyflow_pat_token}}" +-- } +-- } +-- +-- Detokenization connection: +-- { +-- "name": "skyflow_detokenize_conn", +-- "connection_type": "HTTP", +-- "options": { +-- "host": "${SKYFLOW_VAULT_URL}", +-- "port": "443", +-- "base_path": "/v1/vaults/${SKYFLOW_VAULT_ID}", +-- "bearer_token": "{{secrets.skyflow-secrets.skyflow_pat_token}}" +-- } +-- } + +-- Core detokenization function with configurable redaction level +CREATE OR REPLACE FUNCTION ${PREFIX}_skyflow_uc_detokenize(token STRING, redaction_level STRING) +RETURNS STRING +LANGUAGE SQL +DETERMINISTIC +RETURN + CASE + -- Handle null/empty tokens + WHEN token IS NULL OR trim(token) = '' THEN token + ELSE + COALESCE( + -- Extract detokenized value from Skyflow API response via UC connection + get_json_object( + get_json_object( + http_request( + conn => 'skyflow_detokenize_conn', + method => 'POST', + path => '/detokenize', + headers => map( + 'Content-Type', 'application/json', + 'Accept', 'application/json', + 'X-SKYFLOW-ACCOUNT-ID', '${SKYFLOW_ACCOUNT_ID}' + ), + json => concat( + '{"detokenizationParameters":[{"token":"', + token, + '","redaction":"', + redaction_level, + '"}]}' + ) + ).text, + '$.records[0]' + ), + '$.value' + ), + -- Fallback to token if API call fails + token + ) + END; + +-- Multi-level conditional detokenization function with role-based redaction +-- Supports PLAIN_TEXT, MASKED, and token-only based on user group membership +CREATE OR REPLACE FUNCTION ${PREFIX}_skyflow_conditional_detokenize(token STRING) +RETURNS STRING +LANGUAGE SQL +DETERMINISTIC +RETURN + CASE + -- Auditors get plain text (full detokenization) + WHEN is_account_group_member('auditor') OR is_member('auditor') THEN + ${PREFIX}_skyflow_uc_detokenize(token, 'PLAIN_TEXT') + -- Customer service gets masked data (partial redaction) + WHEN is_account_group_member('customer_service') OR is_member('customer_service') THEN + ${PREFIX}_skyflow_uc_detokenize(token, 'MASKED') + -- Marketing and all other users get tokens without API overhead + ELSE token + END; + +-- Convenience function for column masks (uses conditional logic) +CREATE OR REPLACE FUNCTION ${PREFIX}_skyflow_mask_detokenize(token STRING) 
+RETURNS STRING +LANGUAGE SQL +DETERMINISTIC +RETURN ${PREFIX}_skyflow_conditional_detokenize(token); \ No newline at end of file diff --git a/sql/verify/check_functions_exist.sql b/sql/verify/check_functions_exist.sql new file mode 100644 index 0000000..1531368 --- /dev/null +++ b/sql/verify/check_functions_exist.sql @@ -0,0 +1,4 @@ +-- Check if functions still exist (used for destroy verification) +-- These will error if functions don't exist, which is expected for verification +DESCRIBE FUNCTION ${PREFIX}_skyflow_uc_detokenize; +DESCRIBE FUNCTION ${PREFIX}_skyflow_mask_detokenize; \ No newline at end of file diff --git a/sql/verify/check_table_exists.sql b/sql/verify/check_table_exists.sql new file mode 100644 index 0000000..0ea004a --- /dev/null +++ b/sql/verify/check_table_exists.sql @@ -0,0 +1,3 @@ +-- Check if table still exists (used for destroy verification) +-- Will error if table doesn't exist, which is expected for verification +DESCRIBE TABLE ${PREFIX}_customer_data; \ No newline at end of file diff --git a/sql/verify/verify_functions.sql b/sql/verify/verify_functions.sql new file mode 100644 index 0000000..99d6f9f --- /dev/null +++ b/sql/verify/verify_functions.sql @@ -0,0 +1,3 @@ +-- Verify Unity Catalog detokenization functions exist +DESCRIBE FUNCTION ${PREFIX}_skyflow_uc_detokenize; +DESCRIBE FUNCTION ${PREFIX}_skyflow_mask_detokenize; \ No newline at end of file diff --git a/sql/verify/verify_table.sql b/sql/verify/verify_table.sql new file mode 100644 index 0000000..59dc990 --- /dev/null +++ b/sql/verify/verify_table.sql @@ -0,0 +1,2 @@ +-- Verify sample table exists before applying column masks +DESCRIBE TABLE ${PREFIX}_customer_data; \ No newline at end of file
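
A minimal usage sketch for the objects this patch creates, once `./setup.sh create <prefix>` has finished. The `${PREFIX}` placeholders follow the same substitution convention as the SQL files above; the group names and the function signature are taken from sql/setup/setup_uc_connections_api.sql:

```sql
-- Masked column: 'auditor' members see plain text, 'customer_service' members
-- see masked values, everyone else sees the stored Skyflow tokens.
SELECT first_name
FROM ${PREFIX}_catalog.default.${PREFIX}_customer_data
LIMIT 10;

-- Direct call to the core detokenization function with an explicit redaction level.
SELECT ${PREFIX}_catalog.default.${PREFIX}_skyflow_uc_detokenize(first_name, 'PLAIN_TEXT') AS first_name_plain
FROM ${PREFIX}_catalog.default.${PREFIX}_customer_data
LIMIT 10;
```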