diff --git a/README.md b/README.md index aad5e04..e0797d0 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,7 @@ When you're ready to enable automated dbt runs on PRs, pushes to main, or a sche - **[Getting Started](docs/getting-started.md)** - Initial setup for new developers - **[Development Workflow](docs/development-workflow.md)** - How to develop models - **[dbt Best Practices](docs/dbt-best-practices.md)** - Patterns and configurations +- **[Dune Datashares](docs/dune-datashares.md)** - Sync tables to external warehouses - **[Testing](docs/testing.md)** - Test requirements - **[CI/CD](docs/cicd.md)** - GitHub Actions workflows - **[Troubleshooting](docs/troubleshooting.md)** - Common issues @@ -149,9 +150,16 @@ select * from dune.dune__tmp_.dbt_template_view_model | Incremental (Merge) | `dbt_template_merge_incremental_model.sql` | Efficient updates via merge | | Incremental (Delete+Insert) | `dbt_template_delete_insert_incremental_model.sql` | Efficient updates via delete+insert | | Incremental (Append) | `dbt_template_append_incremental_model.sql` | Append-only with deduplication | +| Incremental (Datashare) | `dbt_template_datashare_incremental_model.sql` | Merge model with datashare sync | All templates are in `models/templates/`. +## Datashares + +This template includes an opt-in datashare post-hook for `table` and `incremental` models. To enable it on a model, set `meta.datashare.enabled: true` and provide the sync window fields in the model config. + +See [docs/dune-datashares.md](docs/dune-datashares.md) for the full setup, `run-operation` examples, monitoring queries, and cleanup commands. + ## Table Visibility By default, all tables are **private** — only your team can query them. 
To make a table publicly accessible (visible and queryable by anyone on Dune), set `meta.dune.public: true` in the model config: diff --git a/dbt_project.yml b/dbt_project.yml index a6c5e84..16477ce 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -40,3 +40,5 @@ models: transaction: true - sql: "{{ vacuum_table(this, model.config.materialized) }}" transaction: true + - sql: "{{ datashare_trigger_sync() }}" + transaction: true diff --git a/docs/dune-datashares.md b/docs/dune-datashares.md new file mode 100644 index 0000000..398bea5 --- /dev/null +++ b/docs/dune-datashares.md @@ -0,0 +1,175 @@ +# Dune Datashares + +Datashares sync Dune tables to external data warehouses such as Snowflake and BigQuery so downstream consumers can query the data outside Dune. + +## Prerequisites + +Datashare is an enterprise feature that requires setup before any SQL statements will work: + +1. Contract and feature enablement with Dune. +2. Target warehouse configuration in Dune backoffice. +3. A Dune API key with Data Transformations access. + +If datashare is not enabled for your team, the SQL statements below will fail with an authorization error. + +Datashare syncs are billed based on bytes transferred and byte-months of storage for the synced table. + +## What This Template Includes + +This template ships with datashare support already wired in: + +- `macros/dune_dbt_overrides/datashare_table_sync_post_hook.sql` +- a global post-hook in `dbt_project.yml` that calls `datashare_trigger_sync()` +- an opt-in example model at `models/templates/dbt_template_datashare_incremental_model.sql` + +Models without `meta.datashare` are unchanged. The hook skips them. + +The built-in post-hook only executes on the `prod` target, so local `dev` runs and CI temp schemas do not create datashare syncs by default. + +## Supported Models + +Datashare sync is only applied to `table` and `incremental` models. + +Views are skipped. 
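To illustrate, a view model that carries `meta.datashare` anyway is simply skipped by the post-hook; it logs a skip message rather than failing the run. A hypothetical example (model name invented here):

```sql
-- Hypothetical view model: the datashare post-hook skips it and logs
-- that materialization "view" is not incremental/table.
{{ config(
    alias = 'my_view_model'
    , materialized = 'view'
    , meta = { "datashare": { "enabled": true, "time_column": "block_date" } }
) }}

select ...
```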
+ +## Enable Datashare On A Model + +Add `meta.datashare` to a `table` or `incremental` model: + +```sql +{% set time_start = "current_date - interval '1' day" if is_incremental() else "current_date - interval '2' day" %} +{% set time_end = "current_date + interval '1' day" %} + +{{ config( + alias = 'my_datashared_model' + , materialized = 'incremental' + , incremental_strategy = 'merge' + , unique_key = ['block_number', 'block_date'] + , meta = { + "datashare": { + "enabled": true, + "time_column": "block_date", + "time_start": time_start, + "time_end": time_end + } + } +) }} + +select ... +``` + +The included example model in this repo follows this pattern. + +## Configuration Reference + +All datashare config lives under `meta.datashare` in the model `config()` block. + +| Property | Required | Type | Description | +| --- | --- | --- | --- | +| `enabled` | Yes | `boolean` | Must be `true` to trigger sync. | +| `time_column` | Yes | `string` | Column used to define the sync window. | +| `time_start` | Yes | `string` | SQL expression for the start of the sync window. | +| `time_end` | No | `string` | SQL expression for the end of the sync window. Defaults to `now()`. | +| `unique_key_columns` | No | `list[string]` | Row identity columns. Falls back to the model `unique_key` if omitted. | + +`time_start` and `time_end` are SQL expressions, not literal timestamps. The macro wraps them in `CAST(... AS VARCHAR)` before calling the table procedure. + +Keep the sync window aligned with the `time_column` granularity. For example, if `time_column` is a `date`, use date-based expressions like `current_date - interval '1' day`, not hour-based timestamp windows. 
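For instance, here is a sketch (hypothetical model name; columns borrowed from the example above) that declares `unique_key_columns` explicitly instead of relying on the `unique_key` fallback, and keeps both window expressions at daily grain to match a `date`-typed `block_date`:

```sql
{{ config(
    alias = 'my_daily_model'
    , materialized = 'incremental'
    , incremental_strategy = 'merge'
    , unique_key = ['block_number', 'block_date']
    , meta = {
        "datashare": {
            "enabled": true,
            "time_column": "block_date",
            "time_start": "current_date - interval '7' day",
            "time_end": "current_date + interval '1' day",
            "unique_key_columns": ["block_number", "block_date"]
        }
    }
) }}

select ...
```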
+ ## Full Refresh Behavior + + The macro determines `full_refresh` automatically: + + | Context | `full_refresh` | + | --- | --- | + | Incremental post-hook on a normal incremental run | `false` | + | Incremental post-hook on first run or `--full-refresh` | `true` | + | Table materialization post-hook | `true` | + | `run-operation` | `false` unless overridden | + + ## Generated SQL + + The post-hook generates this Trino statement: + + ```sql + ALTER TABLE dune.<schema>.<table> EXECUTE datashare( + time_column => '<time_column>', + unique_key_columns => ARRAY['col1', 'col2'], + time_start => CAST(<time_start expression> AS VARCHAR), + time_end => CAST(<time_end expression> AS VARCHAR), + full_refresh => true|false + ) + ``` + + ## Manual Syncs + + Use `run-operation` when you want to trigger a sync outside `dbt run`. + + Preview the generated SQL only: + + ```bash + uv run dbt run-operation datashare_trigger_sync_operation --args ' + model_selector: dbt_template_datashare_incremental_model + dry_run: true + ' + ``` + + Execute a sync: + + ```bash + uv run dbt run-operation datashare_trigger_sync_operation --args ' + model_selector: dbt_template_datashare_incremental_model + time_start: "current_date - interval '\''7'\'' day" + time_end: "current_date + interval '\''1'\'' day" + ' + ``` + + Force a full refresh sync: + + ```bash + uv run dbt run-operation datashare_trigger_sync_operation --args ' + model_selector: dbt_template_datashare_incremental_model + full_refresh: true + ' + ``` + + `model_selector` accepts the model name, alias, fully qualified name, or dbt `unique_id`. + + ## Monitoring + + Check the datashare system tables after a run: + + ```sql + SELECT * + FROM dune.datashare.table_syncs + WHERE source_schema = '<your schema>'; + + SELECT * + FROM dune.datashare.table_sync_runs + WHERE source_schema = '<your schema>' + ORDER BY created_at DESC; + ``` + + `table_syncs` shows the registered share and its latest status. + + `table_sync_runs` shows individual sync attempts, including the time window and whether the run was a full refresh. 
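Putting the pieces together for the template model in this repo (`dbt_template_datashare_incremental_model`, with `time_column` of `block_date` and `unique_key` of `['block_number', 'block_date']`), a normal incremental run would emit roughly the following, where `my_schema` is a placeholder for your team's schema:

```sql
ALTER TABLE dune.my_schema.dbt_template_datashare_incremental_model EXECUTE datashare(
    time_column => 'block_date',
    unique_key_columns => ARRAY['block_number', 'block_date'],
    time_start => CAST(current_date - interval '1' day AS VARCHAR),
    time_end => CAST(current_date + interval '1' day AS VARCHAR),
    full_refresh => false
)
```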
+ ## Cleanup + + Remove a table from datashare with: + + ```sql + ALTER TABLE dune.<schema>.<table>
EXECUTE delete_datashare +``` + +## Example Workflow + +1. Configure a model with `meta.datashare`. +2. Run it with `uv run dbt run --select my_model --target prod`. +3. Confirm the datashare registration in `dune.datashare.table_syncs`. +4. Inspect run history in `dune.datashare.table_sync_runs`. + +## Further Reading + +- [Supported SQL Operations](https://docs.dune.com/api-reference/connectors/sql-operations) +- [dbt connector overview](https://docs.dune.com/api-reference/connectors/dbt/overview) diff --git a/macros/dune_dbt_overrides/datashare_table_sync_post_hook.sql b/macros/dune_dbt_overrides/datashare_table_sync_post_hook.sql new file mode 100644 index 0000000..b3845f6 --- /dev/null +++ b/macros/dune_dbt_overrides/datashare_table_sync_post_hook.sql @@ -0,0 +1,134 @@ +{% macro _datashare_sql_string(value) %} + {{ return("'" ~ (value | string | replace("'", "''")) ~ "'") }} +{%- endmacro -%} + +{% macro _datashare_unique_key_columns_sql(unique_key_columns) %} + {%- if unique_key_columns is string -%} + {%- set unique_key_columns = [unique_key_columns] -%} + {%- elif unique_key_columns is not iterable or unique_key_columns is mapping -%} + {{ return("CAST(ARRAY[] AS ARRAY(VARCHAR))") }} + {%- endif -%} + {%- set quoted = [] -%} + {%- for col in unique_key_columns -%} + {%- do quoted.append(_datashare_sql_string(col)) -%} + {%- endfor -%} + {{ return("CAST(ARRAY[] AS ARRAY(VARCHAR))" if quoted | length == 0 else "ARRAY[" ~ quoted | join(', ') ~ "]") }} +{%- endmacro -%} + +{% macro _datashare_optional_time_sql(value) %} + {{ return('NULL' if value is none else 'CAST(' ~ value ~ ' AS VARCHAR)') }} +{%- endmacro -%} + +{# + Datashare sync macro - generates ALTER TABLE ... EXECUTE datashare() SQL. 
+ Config reference and usage: docs/dune-datashares.md +#} +{% macro _datashare_table_sync_sql( + schema_name + , table_name + , meta + , materialized + , unique_key=None + , time_start=None + , time_end=None + , full_refresh=False + , catalog_name=target.database +) %} + {%- set model_ref = schema_name ~ '.' ~ table_name -%} + {%- if meta is not mapping or meta.get('datashare') is none or meta.get('datashare') is not mapping -%} + {{ log('Skipping datashare sync for ' ~ model_ref ~ ': meta.datashare is not configured.', info=True) }} + {{ return(none) }} + {%- endif -%} + {%- set datashare = meta.get('datashare') -%} + {%- if datashare.get('enabled') is not sameas true -%} + {{ log('Skipping datashare sync for ' ~ model_ref ~ ': meta.datashare.enabled is not true.', info=True) }} + {{ return(none) }} + {%- endif -%} + {%- if materialized not in ['incremental', 'table'] -%} + {{ log('Skipping datashare sync for ' ~ model_ref ~ ': materialization "' ~ materialized ~ '" is not incremental/table.') }} + {{ return(none) }} + {%- endif -%} + {%- set time_column = datashare.get('time_column') -%} + {%- set resolved_time_start = time_start if time_start is not none else datashare.get('time_start') -%} + {%- set resolved_time_end = time_end if time_end is not none else datashare.get('time_end', 'now()') -%} + + {%- set sql -%} +ALTER TABLE {{ catalog_name }}.{{ schema_name }}.{{ table_name }} EXECUTE datashare( + time_column => {{ _datashare_sql_string(time_column | default('', true)) }}, + unique_key_columns => {{ _datashare_unique_key_columns_sql(datashare.get('unique_key_columns', unique_key)) }}, + time_start => {{ _datashare_optional_time_sql(resolved_time_start) }}, + time_end => {{ _datashare_optional_time_sql(resolved_time_end) }}, + full_refresh => {{ 'true' if full_refresh else 'false' }} +) + {%- endset -%} + {{ log('datashare sync preview for ' ~ model_ref ~ ':\n' ~ sql, info=True) }} + {{ return(sql) }} +{%- endmacro -%} + +{% macro datashare_trigger_sync() %} 
+ {%- if target.name != 'prod' -%} + {{ log('Skipping datashare sync for ' ~ this.schema ~ '.' ~ this.identifier ~ ': datashare post-hook only runs on the prod target.', info=True) }} + {{ return('') }} + {%- endif -%} + {{ return(_datashare_table_sync_sql( + schema_name=this.schema, + table_name=this.identifier, + meta=model.config.get('meta', {}), + materialized=model.config.materialized, + unique_key=model.config.get('unique_key'), + full_refresh=(not is_incremental()) + ) or '') }} +{%- endmacro -%} + +{% macro _datashare_resolve_model_node(model_selector) %} + {%- set matches = [] -%} + {%- for node in graph.nodes.values() -%} + {%- if node.resource_type == 'model' -%} + {%- set fqn_name = node.fqn | join('.') -%} + {%- if node.unique_id == model_selector or node.name == model_selector or node.alias == model_selector or fqn_name == model_selector -%} + {%- do matches.append(node) -%} + {%- endif -%} + {%- endif -%} + {%- endfor -%} + + {%- if matches | length == 0 -%} + {{ exceptions.raise_compiler_error("No model found for selector '" ~ model_selector ~ "'. Use model name, alias, fqn, or unique_id.") }} + {%- endif -%} + + {%- if matches | length > 1 -%} + {{ exceptions.raise_compiler_error("Model selector '" ~ model_selector ~ "' is ambiguous. 
Matches: " ~ (matches | map(attribute='unique_id') | join(', '))) }} + {%- endif -%} + + {{ return(matches[0]) }} +{%- endmacro -%} + +{% macro datashare_trigger_sync_operation(model_selector, time_start=None, time_end=None, dry_run=False, full_refresh=False) %} + {%- set node = _datashare_resolve_model_node(model_selector) -%} + {%- set node_config = node.config if node.config is mapping else {} -%} + {%- set materialized = node_config.get('materialized', 'view') -%} + {%- set table_name = node.alias if node.alias is not none else node.name -%} + {%- set is_full_refresh = materialized == 'table' or full_refresh is sameas true -%} + + {%- set sql = _datashare_table_sync_sql( + schema_name=node.schema, + table_name=table_name, + meta=node_config.get('meta', {}), + materialized=materialized, + unique_key=node_config.get('unique_key'), + time_start=time_start, + time_end=time_end, + full_refresh=is_full_refresh, + catalog_name=node.database or target.database + ) -%} + + {%- if sql is none -%} + {{ exceptions.raise_compiler_error("Cannot sync " ~ node.schema ~ "." 
~ table_name ~ ": model must be incremental or table with meta.datashare.enabled = true.") }} + {%- endif -%} + + {%- set is_dry_run = dry_run is sameas true or (dry_run is string and dry_run | lower in ['true', '1', 'yes', 'y']) -%} + {%- if not is_dry_run -%} + {% do run_query(sql) %} + {{ log('Executed datashare sync for selector ' ~ model_selector, info=True) }} + {%- endif -%} + {{ return(sql) }} +{%- endmacro -%} diff --git a/models/templates/_schema.yml b/models/templates/_schema.yml index 08896fe..7b4c35b 100644 --- a/models/templates/_schema.yml +++ b/models/templates/_schema.yml @@ -64,3 +64,18 @@ models: description: "The date of the block" - name: total_tx_per_block description: "The total number of transactions per block" + - name: dbt_template_datashare_incremental_model + description: "A starter dbt incremental model using merge strategy with datashare sync enabled" + data_tests: + - dbt_utils.unique_combination_of_columns: + arguments: + combination_of_columns: + - block_number + - block_date + columns: + - name: block_number + description: "The unique block number in the sync window" + - name: block_date + description: "The date used for datashare sync windows" + - name: total_tx_per_block + description: "The total number of transactions per block" diff --git a/models/templates/dbt_template_datashare_incremental_model.sql b/models/templates/dbt_template_datashare_incremental_model.sql new file mode 100644 index 0000000..fc783d2 --- /dev/null +++ b/models/templates/dbt_template_datashare_incremental_model.sql @@ -0,0 +1,33 @@ +{% set time_start = "current_date - interval '1' day" if is_incremental() else "current_date - interval '2' day" %} +{% set time_end = "current_date + interval '1' day" %} + +{{ config( + alias = 'dbt_template_datashare_incremental_model' + , materialized = 'incremental' + , incremental_strategy = 'merge' + , unique_key = ['block_number', 'block_date'] + , incremental_predicates = ["DBT_INTERNAL_DEST.block_date >= current_date 
- interval '1' day"] + , meta = { + "dune": { + "public": false + }, + "datashare": { + "enabled": true, + "time_column": "block_date", + "time_start": time_start, + "time_end": time_end + } + } + , properties = { + "partitioned_by": "ARRAY['block_date']" + } +) }} + +select + block_number + , block_date + , count(*) as total_tx_per_block +from {{ source('ethereum', 'transactions') }} +where block_date >= {{ time_start }} + and block_date < {{ time_end }} +group by 1, 2