# Add datashare support to the template repo (#61)
Status: Merged
# Dune Datashares

Datashares sync Dune tables to external data warehouses such as Snowflake and BigQuery so downstream consumers can query the data outside Dune.
## Prerequisites

Datashare is an enterprise feature that requires setup before any SQL statements will work:

1. Contract and feature enablement with Dune.
2. Target warehouse configuration in the Dune backoffice.
3. A Dune API key with Data Transformations access.

If datashare is not enabled for your team, the SQL statements below will fail with an authorization error.

Datashare syncs are billed based on bytes transferred and byte-months of storage for the synced table.
## What This Template Includes

This template ships with datashare support already wired in:

- `macros/dune_dbt_overrides/datashare_table_sync_post_hook.sql`
- a global post-hook in `dbt_project.yml` that calls `datashare_trigger_sync()`
- an opt-in example model at `models/templates/dbt_template_datashare_incremental_model.sql`

Models without `meta.datashare` are unchanged; the hook skips them.

The built-in post-hook only executes on the `prod` target, so local `dev` runs and CI temp schemas do not create datashare syncs by default.
## Supported Models

Datashare sync is only applied to `table` and `incremental` models. Views are skipped.
## Enable Datashare On A Model

Add `meta.datashare` to a `table` or `incremental` model:

```sql
{% set time_start = "current_date - interval '1' day" if is_incremental() else "current_date - interval '2' day" %}
{% set time_end = "current_date + interval '1' day" %}

{{ config(
    alias = 'my_datashared_model'
    , materialized = 'incremental'
    , incremental_strategy = 'merge'
    , unique_key = ['block_number', 'block_date']
    , meta = {
        "datashare": {
            "enabled": true,
            "time_column": "block_date",
            "time_start": time_start,
            "time_end": time_end
        }
    }
) }}

select ...
```

The included example model in this repo follows this pattern.
## Configuration Reference

All datashare config lives under `meta.datashare` in the model `config()` block.

| Property | Required | Type | Description |
| --- | --- | --- | --- |
| `enabled` | Yes | `boolean` | Must be `true` to trigger a sync. |
| `time_column` | Yes | `string` | Column used to define the sync window. |
| `time_start` | Yes | `string` | SQL expression for the start of the sync window. |
| `time_end` | No | `string` | SQL expression for the end of the sync window. Defaults to `now()`. |
| `unique_key_columns` | No | `list[string]` | Row identity columns. Falls back to the model `unique_key` if omitted. |

`time_start` and `time_end` are SQL expressions, not literal timestamps. The macro wraps them in `CAST(... AS VARCHAR)` before calling the table procedure.

Keep the sync window aligned with the `time_column` granularity. For example, if `time_column` is a `date`, use date-based expressions such as `current_date - interval '1' day`, not hour-based timestamp windows.
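As an illustration of the documented wrapping behavior (a Python restatement of what the docs describe, not the macro's actual code; the function name `render_time_window` is hypothetical):

```python
def render_time_window(time_start, time_end=None):
    """Render the time window arguments as the docs describe:
    each SQL expression is wrapped in CAST(... AS VARCHAR), and a
    missing time_end falls back to the documented default now()."""
    if time_end is None:
        time_end = "now()"  # documented default
    return (
        f"CAST({time_start} AS VARCHAR)",
        f"CAST({time_end} AS VARCHAR)",
    )

start_sql, end_sql = render_time_window("current_date - interval '1' day")
print(start_sql)  # CAST(current_date - interval '1' day AS VARCHAR)
print(end_sql)    # CAST(now() AS VARCHAR)
```

Because the expressions are passed through verbatim, any SQL valid in the warehouse context can define the window.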
## Full Refresh Behavior

The macro determines `full_refresh` automatically:

| Context | `full_refresh` |
| --- | --- |
| Incremental post-hook on a normal incremental run | `false` |
| Incremental post-hook on the first run or with `--full-refresh` | `true` |
| Table materialization post-hook | `true` |
| `run-operation` | `false` unless overridden |
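The table above can be restated as a small decision function; this is a hypothetical Python sketch of the documented rules, not code taken from the macro:

```python
def resolve_full_refresh(context, is_first_run=False, full_refresh_flag=False):
    """Mirror the documented full_refresh resolution.

    context: 'incremental', 'table', or 'run-operation'.
    """
    if context == "table":
        # table materializations always full-refresh the share
        return True
    if context == "incremental":
        # first run or dbt --full-refresh forces a full sync
        return is_first_run or full_refresh_flag
    if context == "run-operation":
        # false unless explicitly overridden via args
        return full_refresh_flag
    raise ValueError(f"unknown context: {context}")
```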
## Generated SQL

The post-hook generates this Trino statement:

```sql
ALTER TABLE dune.<schema>.<table> EXECUTE datashare(
    time_column => '<column_name>',
    unique_key_columns => ARRAY['col1', 'col2'],
    time_start => CAST(<sql_expression> AS VARCHAR),
    time_end => CAST(<sql_expression> AS VARCHAR),
    full_refresh => true|false
)
```
## Manual Syncs

Use `run-operation` when you want to trigger a sync outside `dbt run`.

Preview the generated SQL without executing it:

```bash
uv run dbt run-operation datashare_trigger_sync_operation --args '
model_selector: dbt_template_datashare_incremental_model
dry_run: true
'
```

Execute a sync:

```bash
uv run dbt run-operation datashare_trigger_sync_operation --args '
model_selector: dbt_template_datashare_incremental_model
time_start: "current_date - interval '\''7'\'' day"
time_end: "current_date + interval '\''1'\'' day"
'
```

Force a full-refresh sync:

```bash
uv run dbt run-operation datashare_trigger_sync_operation --args '
model_selector: dbt_template_datashare_incremental_model
full_refresh: true
'
```

`model_selector` accepts the model name, alias, fully qualified name, or dbt `unique_id`.
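The selector matching can be pictured with a short Python sketch (hypothetical names, mirroring the behavior of `_datashare_resolve_model_node` in the macro file):

```python
def resolve_model(models, selector):
    """Match selector against name, alias, dotted fqn, or unique_id.

    models: list of dicts with keys 'name', 'alias', 'fqn', 'unique_id'.
    Raises if zero or more than one model matches, like the macro does.
    """
    matches = [
        m for m in models
        if selector in (m["name"], m["alias"], ".".join(m["fqn"]), m["unique_id"])
    ]
    if not matches:
        raise LookupError(f"No model found for selector '{selector}'")
    if len(matches) > 1:
        raise LookupError(f"Selector '{selector}' is ambiguous")
    return matches[0]

# made-up graph entry for illustration
models = [{
    "name": "dbt_template_datashare_incremental_model",
    "alias": "dbt_template_datashare_incremental_model",
    "fqn": ["my_project", "templates", "dbt_template_datashare_incremental_model"],
    "unique_id": "model.my_project.dbt_template_datashare_incremental_model",
}]
node = resolve_model(models, "dbt_template_datashare_incremental_model")
```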
## Monitoring

Check the datashare system tables after a run:

```sql
SELECT *
FROM dune.datashare.table_syncs
WHERE source_schema = '<your_schema>';

SELECT *
FROM dune.datashare.table_sync_runs
WHERE source_schema = '<your_schema>'
ORDER BY created_at DESC;
```

`table_syncs` shows the registered share and its latest status. `table_sync_runs` shows individual sync attempts, including the time window and whether the run was a full refresh.
## Cleanup

Remove a table from datashare with:

```sql
ALTER TABLE dune.<schema>.<table> EXECUTE delete_datashare
```
## Example Workflow

1. Configure a model with `meta.datashare`.
2. Run it with `uv run dbt run --select my_model --target prod`.
3. Confirm the datashare registration in `dune.datashare.table_syncs`.
4. Inspect run history in `dune.datashare.table_sync_runs`.
## Further Reading

- [Supported SQL Operations](https://docs.dune.com/api-reference/connectors/sql-operations)
- [dbt connector overview](https://docs.dune.com/api-reference/connectors/dbt/overview)
## macros/dune_dbt_overrides/datashare_table_sync_post_hook.sql (134 additions, 0 deletions)
```sql
{% macro _datashare_sql_string(value) %}
    {{ return("'" ~ (value | string | replace("'", "''")) ~ "'") }}
{%- endmacro -%}

{% macro _datashare_unique_key_columns_sql(unique_key_columns) %}
    {%- if unique_key_columns is string -%}
        {%- set unique_key_columns = [unique_key_columns] -%}
    {%- elif unique_key_columns is not iterable or unique_key_columns is mapping -%}
        {{ return("CAST(ARRAY[] AS ARRAY(VARCHAR))") }}
    {%- endif -%}
    {%- set quoted = [] -%}
    {%- for col in unique_key_columns -%}
        {%- do quoted.append(_datashare_sql_string(col)) -%}
    {%- endfor -%}
    {{ return("CAST(ARRAY[] AS ARRAY(VARCHAR))" if quoted | length == 0 else "ARRAY[" ~ quoted | join(', ') ~ "]") }}
{%- endmacro -%}

{% macro _datashare_optional_time_sql(value) %}
    {{ return('NULL' if value is none else 'CAST(' ~ value ~ ' AS VARCHAR)') }}
{%- endmacro -%}

{#
    Datashare sync macro - generates ALTER TABLE ... EXECUTE datashare() SQL.
    Config reference and usage: docs/dune-datashares.md
#}
{% macro _datashare_table_sync_sql(
    schema_name
    , table_name
    , meta
    , materialized
    , unique_key=None
    , time_start=None
    , time_end=None
    , full_refresh=False
    , catalog_name=target.database
) %}
    {%- set model_ref = schema_name ~ '.' ~ table_name -%}
    {%- if meta is not mapping or meta.get('datashare') is none or meta.get('datashare') is not mapping -%}
        {{ log('Skipping datashare sync for ' ~ model_ref ~ ': meta.datashare is not configured.', info=True) }}
        {{ return(none) }}
    {%- endif -%}
    {%- set datashare = meta.get('datashare') -%}
    {%- if datashare.get('enabled') is not sameas true -%}
        {{ log('Skipping datashare sync for ' ~ model_ref ~ ': meta.datashare.enabled is not true.', info=True) }}
        {{ return(none) }}
    {%- endif -%}
    {%- if materialized not in ['incremental', 'table'] -%}
        {{ log('Skipping datashare sync for ' ~ model_ref ~ ': materialization "' ~ materialized ~ '" is not incremental/table.') }}
        {{ return(none) }}
    {%- endif -%}
    {%- set time_column = datashare.get('time_column') -%}
    {%- set resolved_time_start = time_start if time_start is not none else datashare.get('time_start') -%}
    {%- set resolved_time_end = time_end if time_end is not none else datashare.get('time_end', 'now()') -%}

    {%- set sql -%}
        ALTER TABLE {{ catalog_name }}.{{ schema_name }}.{{ table_name }} EXECUTE datashare(
            time_column => {{ _datashare_sql_string(time_column | default('', true)) }},
            unique_key_columns => {{ _datashare_unique_key_columns_sql(datashare.get('unique_key_columns', unique_key)) }},
            time_start => {{ _datashare_optional_time_sql(resolved_time_start) }},
            time_end => {{ _datashare_optional_time_sql(resolved_time_end) }},
            full_refresh => {{ 'true' if full_refresh else 'false' }}
        )
    {%- endset -%}
    {{ log('datashare sync preview for ' ~ model_ref ~ ':\n' ~ sql, info=True) }}
    {{ return(sql) }}
{%- endmacro -%}

{% macro datashare_trigger_sync() %}
    {%- if target.name != 'prod' -%}
        {{ log('Skipping datashare sync for ' ~ this.schema ~ '.' ~ this.identifier ~ ': datashare post-hook only runs on the prod target.', info=True) }}
        {{ return('') }}
    {%- endif -%}
    {{ return(_datashare_table_sync_sql(
        schema_name=this.schema,
        table_name=this.identifier,
        meta=model.config.get('meta', {}),
        materialized=model.config.materialized,
        unique_key=model.config.get('unique_key'),
        full_refresh=(not is_incremental())
    ) or '') }}
{%- endmacro -%}

{% macro _datashare_resolve_model_node(model_selector) %}
    {%- set matches = [] -%}
    {%- for node in graph.nodes.values() -%}
        {%- if node.resource_type == 'model' -%}
            {%- set fqn_name = node.fqn | join('.') -%}
            {%- if node.unique_id == model_selector or node.name == model_selector or node.alias == model_selector or fqn_name == model_selector -%}
                {%- do matches.append(node) -%}
            {%- endif -%}
        {%- endif -%}
    {%- endfor -%}

    {%- if matches | length == 0 -%}
        {{ exceptions.raise_compiler_error("No model found for selector '" ~ model_selector ~ "'. Use model name, alias, fqn, or unique_id.") }}
    {%- endif -%}

    {%- if matches | length > 1 -%}
        {{ exceptions.raise_compiler_error("Model selector '" ~ model_selector ~ "' is ambiguous. Matches: " ~ (matches | map(attribute='unique_id') | join(', '))) }}
    {%- endif -%}

    {{ return(matches[0]) }}
{%- endmacro -%}

{% macro datashare_trigger_sync_operation(model_selector, time_start=None, time_end=None, dry_run=False, full_refresh=False) %}
    {%- set node = _datashare_resolve_model_node(model_selector) -%}
    {%- set node_config = node.config if node.config is mapping else {} -%}
    {%- set materialized = node_config.get('materialized', 'view') -%}
    {%- set table_name = node.alias if node.alias is not none else node.name -%}
    {%- set is_full_refresh = materialized == 'table' or full_refresh is sameas true -%}

    {%- set sql = _datashare_table_sync_sql(
        schema_name=node.schema,
        table_name=table_name,
        meta=node_config.get('meta', {}),
        materialized=materialized,
        unique_key=node_config.get('unique_key'),
        time_start=time_start,
        time_end=time_end,
        full_refresh=is_full_refresh,
        catalog_name=node.database or target.database
    ) -%}

    {%- if sql is none -%}
        {{ exceptions.raise_compiler_error("Cannot sync " ~ node.schema ~ "." ~ table_name ~ ": model must be incremental or table with meta.datashare.enabled = true.") }}
    {%- endif -%}

    {%- set is_dry_run = dry_run is sameas true or (dry_run is string and dry_run | lower in ['true', '1', 'yes', 'y']) -%}
    {%- if not is_dry_run -%}
        {% do run_query(sql) %}
        {{ log('Executed datashare sync for selector ' ~ model_selector, info=True) }}
    {%- endif -%}
    {{ return(sql) }}
{%- endmacro -%}
```
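The quoting and array-rendering helpers above follow standard SQL-literal rules (double any embedded single quote). A quick Python equivalent, with hypothetical function names, shows the same behavior:

```python
def sql_string(value):
    """Single-quote a value for SQL, doubling embedded quotes
    (the same rule _datashare_sql_string applies)."""
    return "'" + str(value).replace("'", "''") + "'"

def unique_key_columns_sql(columns):
    """Render a Trino ARRAY literal of column names; a lone string is
    treated as a one-element list, and an empty or missing list becomes
    a typed empty array, as in the macro."""
    if isinstance(columns, str):
        columns = [columns]
    if not columns:
        return "CAST(ARRAY[] AS ARRAY(VARCHAR))"
    return "ARRAY[" + ", ".join(sql_string(c) for c in columns) + "]"

print(sql_string("it's"))  # 'it''s'
print(unique_key_columns_sql(["block_number", "block_date"]))
# ARRAY['block_number', 'block_date']
```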
## models/templates/dbt_template_datashare_incremental_model.sql (33 additions, 0 deletions)
```sql
{% set time_start = "current_date - interval '1' day" if is_incremental() else "current_date - interval '2' day" %}
{% set time_end = "current_date + interval '1' day" %}

{{ config(
    alias = 'dbt_template_datashare_incremental_model'
    , materialized = 'incremental'
    , incremental_strategy = 'merge'
    , unique_key = ['block_number', 'block_date']
    , incremental_predicates = ["DBT_INTERNAL_DEST.block_date >= current_date - interval '1' day"]
    , meta = {
        "dune": {
            "public": false
        },
        "datashare": {
            "enabled": true,
            "time_column": "block_date",
            "time_start": time_start,
            "time_end": time_end
        }
    }
    , properties = {
        "partitioned_by": "ARRAY['block_date']"
    }
) }}

select
    block_number
    , block_date
    , count(*) as total_tx_per_block
from {{ source('ethereum', 'transactions') }}
where block_date >= {{ time_start }}
    and block_date < {{ time_end }}
group by 1, 2
```
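The aggregation this example model performs (count of transactions per block, keyed by block number and date) can be illustrated with a toy Python equivalent over made-up rows:

```python
from collections import Counter

# made-up transaction rows: (block_number, block_date)
rows = [
    (100, "2024-01-01"),
    (100, "2024-01-01"),
    (101, "2024-01-01"),
]

# group by (block_number, block_date), count(*) per group
counts = Counter(rows)
result = [
    {"block_number": b, "block_date": d, "total_tx_per_block": n}
    for (b, d), n in sorted(counts.items())
]
print(result)
```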