190 changes: 190 additions & 0 deletions notebooks/integration_duckdb_example.ipynb
@@ -0,0 +1,190 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "1",
"metadata": {},
"outputs": [],
"source": [
"# Libraries\n",
"import duckdb\n",
"import pyarrow as pa\n",
"import pyarrow.compute as pc\n",
"import os\n",
"import tempfile\n",
"from pyiceberg.catalog.sql import SqlCatalog\n",
"\n",
"# Create temporary folders for the warehouse and catalog\n",
"warehouse_path = tempfile.mkdtemp(prefix='iceberg_warehouse_')\n",
"catalog_path = os.path.join(warehouse_path, 'catalog.db')\n",
"print('Temporary warehouse:', warehouse_path)\n",
"print('Temporary catalog:', catalog_path)\n",
"\n",
"# Create a temporary SQL catalog using SQLite\n",
"catalog = SqlCatalog(\n",
" name='tmp_sql_catalog',\n",
" uri=f'sqlite:///{catalog_path}',\n",
" warehouse=f'file://{warehouse_path}',\n",
" properties={}\n",
")\n",
"# Create the default namespace\n",
"catalog.create_namespace('default')"
]
},
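{
"cell_type": "markdown",
"id": "1a",
"metadata": {},
"source": [
"As a quick sanity check (a minimal sketch; `list_namespaces()` is part of the PyIceberg catalog API), we confirm the catalog is reachable and the `default` namespace exists before creating any tables."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "1b",
"metadata": {},
"outputs": [],
"source": [
"# Sanity check: the namespace created above should be listed\n",
"print('Namespaces:', catalog.list_namespaces())"
]
},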
{
"cell_type": "markdown",
"id": "2",
"metadata": {},
"source": [
"## First snapshot\n",
"We create the initial dataset and save it to an Iceberg table to create the first snapshot."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2",
"metadata": {},
"outputs": [],
"source": [
"# Initial dataset\n",
"data1 = {\n",
" 'vendor_id':[1,2,1,2,1],\n",
" 'trip_distance':[1.5,2.3,0.8,5.2,3.1],\n",
" 'fare_amount':[10.0,15.5,6.0,22.0,18.0],\n",
" 'tip_amount':[2.0,3.0,1.0,4.5,3.5],\n",
" 'passenger_count':[1,2,1,3,2]\n",
"}\n",
"df1 = pa.table(data1)\n",
"\n",
"# Create the Iceberg table and append initial data (first snapshot)\n",
"table = catalog.create_table('default.sample_trips', schema=df1.schema)\n",
"table.append(df1)\n",
"print('First snapshot rows:', len(table.scan().to_arrow()))"
]
},
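{
"cell_type": "markdown",
"id": "3a",
"metadata": {},
"source": [
"A brief aside: each append commits a new snapshot, and PyIceberg exposes the latest one through `table.current_snapshot()`. The sketch below prints its ID and summary to show what the first commit recorded."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3b",
"metadata": {},
"outputs": [],
"source": [
"# Inspect the snapshot produced by the first append\n",
"snap = table.current_snapshot()\n",
"if snap is not None:\n",
" print('Snapshot ID:', snap.snapshot_id)\n",
" print('Summary:', snap.summary)"
]
},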
{
"cell_type": "markdown",
"id": "3",
"metadata": {},
"source": [
"## Second snapshot\n",
"We add new data to the same table, creating a second snapshot."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3",
"metadata": {},
"outputs": [],
"source": [
"# New dataset for the second snapshot\n",
"data2 = {\n",
" 'vendor_id':[3,1],\n",
" 'trip_distance':[2.0,1.0],\n",
" 'fare_amount':[12.0,8.0],\n",
" 'tip_amount':[1.5,2.0],\n",
" 'passenger_count':[1,1]\n",
"}\n",
"df2 = pa.table(data2)\n",
"\n",
"# Append new data to the table (second snapshot)\n",
"table.append(df2)\n",
"print('Second snapshot total rows:', len(table.scan().to_arrow()))"
]
},
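{
"cell_type": "markdown",
"id": "5a",
"metadata": {},
"source": [
"With two appends committed, the snapshot log should hold two entries. As a small sketch, `table.history()` returns one log entry per snapshot; below we print each entry's snapshot ID and commit timestamp."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "5b",
"metadata": {},
"outputs": [],
"source": [
"# One log entry per append: snapshot ID and commit timestamp (ms since epoch)\n",
"for entry in table.history():\n",
" print(entry.snapshot_id, entry.timestamp_ms)"
]
},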
{
"cell_type": "markdown",
"id": "4",
"metadata": {},
"source": [
"## Compare snapshots using DuckDB\n",
"We load both snapshots into DuckDB as temporary tables to find added and removed rows."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4",
"metadata": {},
"outputs": [],
"source": [
"# Get snapshot IDs\n",
"snapshots = table.snapshots()\n",
"first_id = snapshots[0].snapshot_id\n",
"second_id = snapshots[-1].snapshot_id\n",
"print('Snapshot IDs:', first_id, second_id)\n",
"\n",
"# Load snapshots into PyArrow tables\n",
"arrow_first = table.scan(snapshot_id=first_id).to_arrow()\n",
"arrow_second = table.scan(snapshot_id=second_id).to_arrow()\n",
"\n",
"# Connect to DuckDB and register tables\n",
"con = duckdb.connect()\n",
"con.register('first_snap', arrow_first)\n",
"con.register('second_snap', arrow_second)\n",
"\n",
"# Find added rows in the second snapshot\n",
"added_rows = con.execute('''\n",
"SELECT * FROM second_snap\n",
"EXCEPT\n",
"SELECT * FROM first_snap\n",
"''').fetchall()\n",
"\n",
"# Find removed rows compared to the first snapshot\n",
"removed_rows = con.execute('''\n",
"SELECT * FROM first_snap\n",
"EXCEPT\n",
"SELECT * FROM second_snap\n",
"''').fetchall()\n",
"\n",
"print('=== ADDED ROWS ===')\n",
"for r in added_rows:\n",
" print(r)\n",
"\n",
"print('\\n=== REMOVED ROWS ===')\n",
"for r in removed_rows:\n",
" print(r)"
]
},
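{
"cell_type": "markdown",
"id": "7a",
"metadata": {},
"source": [
"One caveat: `EXCEPT` is a set operation, so duplicate rows collapse and a row whose count merely increases between snapshots would not appear as added. If multiplicity matters, DuckDB also supports `EXCEPT ALL`, sketched below."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "7b",
"metadata": {},
"outputs": [],
"source": [
"# EXCEPT ALL matches rows by multiplicity instead of collapsing duplicates\n",
"added_all = con.execute('''\n",
"SELECT * FROM second_snap\n",
"EXCEPT ALL\n",
"SELECT * FROM first_snap\n",
"''').fetchall()\n",
"print('Added rows (multiplicity-aware):')\n",
"for r in added_all:\n",
" print(r)"
]
},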
{
"cell_type": "markdown",
"id": "5",
"metadata": {},
"source": [
"## Filters and aggregations on the second snapshot\n",
"We add a computed column and perform filtering and aggregation using DuckDB."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "5",
"metadata": {},
"outputs": [],
"source": [
"# Add computed column 'tip_per_mile'\n",
"arrow_second = arrow_second.append_column('tip_per_mile', pc.divide(arrow_second['tip_amount'], arrow_second['trip_distance']))\n",
"con.register('second_snap', arrow_second)\n",
"\n",
"# Filter rows with tip_per_mile > 1.0\n",
"filtered_df = con.execute('SELECT * FROM second_snap WHERE tip_per_mile > 1.0').fetchdf()\n",
"print('Filtered rows (tip_per_mile > 1.0):')\n",
"print(filtered_df)\n",
"\n",
"# Aggregate total fare by vendor\n",
"agg_df = con.execute('SELECT vendor_id, SUM(fare_amount) AS total_fare FROM second_snap GROUP BY vendor_id').fetchdf()\n",
"print('Total fare per vendor:')\n",
"print(agg_df)"
]
},
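{
"cell_type": "markdown",
"id": "10",
"metadata": {},
"source": [
"## Cleanup\n",
"Finally, a minimal cleanup sketch: close the DuckDB connection and delete the temporary warehouse (this assumes nothing else in the session still needs the catalog)."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "11",
"metadata": {},
"outputs": [],
"source": [
"import shutil\n",
"\n",
"# Close DuckDB and remove the temporary warehouse directory\n",
"con.close()\n",
"shutil.rmtree(warehouse_path, ignore_errors=True)\n",
"print('Removed', warehouse_path)"
]
}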
],
"metadata": {
"kernelspec": {"display_name":"Python 3 (ipykernel)","language":"python","name":"python3"},
"language_info":{"name":"python","version":"3.12"}
},
"nbformat": 4,
"nbformat_minor": 5
}