2 changes: 1 addition & 1 deletion Pipfile
@@ -8,7 +8,7 @@ attrs = "*"
 boto3 = "*"
 duckdb = "*"
 duckdb-engine = "*"
-pandas = "*"
+pandas = "<3.0.0"
 pyarrow = "*"
 sqlalchemy = "==2.0.44"
 
1,267 changes: 654 additions & 613 deletions Pipfile.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -26,7 +26,7 @@ dependencies = [
     "boto3",
     "duckdb",
     "duckdb_engine",
-    "pandas",
+    "pandas<3.0.0",
     "pyarrow",
     "sqlalchemy==2.0.44"
 ]
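
Both manifests now carry the same upper bound on pandas. A quick environment sanity check of the pin (illustrative; assumes the third-party packaging distribution is installed, which pip itself vendors):

# Illustrative check that the installed pandas honors the new "<3.0.0" pin.
import pandas
from packaging.version import Version

assert Version(pandas.__version__) < Version("3.0.0"), pandas.__version__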
9 changes: 2 additions & 7 deletions tests/test_dataset.py
@@ -248,17 +248,12 @@ def test_dataset_duckdb_context_created_on_init(timdex_dataset):
 
 
 def test_dataset_duckdb_context_creates_data_schema(timdex_dataset):
-    assert (
-        timdex_dataset.conn.query(
-            """
+    assert timdex_dataset.conn.query("""
            select count(*)
            from information_schema.schemata
            where catalog_name = 'memory'
            and schema_name = 'data';
-            """
-        ).fetchone()[0]
-        == 1
-    )
+    """).fetchone()[0] == 1
 
 
 def test_dataset_preload_current_records_default_false(timdex_dataset):
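
As an aside, the schemata check above reproduces cleanly against a bare in-memory DuckDB connection. A minimal sketch (illustrative; a plain connection ships only with the main schema, so the data schema is created by hand here):

import duckdb

conn = duckdb.connect()  # in-memory database; its catalog is named "memory"
conn.execute("create schema data;")
assert conn.query("""
    select count(*)
    from information_schema.schemata
    where catalog_name = 'memory'
    and schema_name = 'data';
""").fetchone()[0] == 1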
81 changes: 22 additions & 59 deletions tests/test_metadata.py
@@ -55,17 +55,12 @@ def test_tdm_init_metadata_file_found_success(timdex_metadata):
 
 
 def test_tdm_duckdb_context_creates_metadata_schema(timdex_metadata):
-    assert (
-        timdex_metadata.conn.query(
-            """
+    assert timdex_metadata.conn.query("""
            select count(*)
            from information_schema.schemata
            where catalog_name = 'memory'
            and schema_name = 'metadata';
-            """
-        ).fetchone()[0]
-        == 1
-    )
+    """).fetchone()[0] == 1
 
 
 def test_tdm_connection_has_static_database_attached(timdex_metadata):
@@ -241,38 +236,32 @@ def test_tdm_current_records_with_deltas_logic(timdex_metadata_with_deltas):
 
 def test_tdm_current_records_most_recent_version(timdex_metadata_with_deltas):
     # check that for records with multiple versions, only the most recent is returned
-    multi_version_records = timdex_metadata_with_deltas.conn.query(
-        """
+    multi_version_records = timdex_metadata_with_deltas.conn.query("""
         select timdex_record_id, count(*) as version_count
         from metadata.records
         group by timdex_record_id
         having count(*) > 1
         limit 1;
-        """
-    ).to_df()
+    """).to_df()
 
     if len(multi_version_records) > 0:
         record_id = multi_version_records.iloc[0]["timdex_record_id"]
 
         # get most recent timestamp for this record
-        most_recent = timdex_metadata_with_deltas.conn.query(
-            f"""
+        most_recent = timdex_metadata_with_deltas.conn.query(f"""
             select run_timestamp, run_id
             from metadata.records
             where timdex_record_id = '{record_id}'
             order by run_timestamp desc
             limit 1;
-            """
-        ).to_df()
+        """).to_df()
 
         # verify current_records contains this version
-        current_version = timdex_metadata_with_deltas.conn.query(
-            f"""
+        current_version = timdex_metadata_with_deltas.conn.query(f"""
             select run_timestamp, run_id
             from metadata.current_records
             where timdex_record_id = '{record_id}';
-            """
-        ).to_df()
+        """).to_df()
 
         assert len(current_version) == 1
         assert (
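
The expectation encoded in this test, exactly one surviving row per timdex_record_id carrying the latest run_timestamp, is a last-write-wins dedup. A minimal sketch of a query with those semantics (illustrative only; the actual metadata.current_records view definition is not part of this diff):

# Illustrative sketch of the "current records" semantics, not the library's view.
latest = timdex_metadata_with_deltas.conn.query("""
    with ranked as (
        select
            *,
            row_number() over (
                partition by timdex_record_id
                order by run_timestamp desc
            ) as rn
        from metadata.records
    )
    select * exclude (rn) from ranked where rn = 1;
""").to_df()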
@@ -294,21 +283,17 @@ def test_tdm_merge_append_deltas_static_counts_match_records_count_before_merge(
 def test_tdm_merge_append_deltas_adds_records_to_static_db(
     timdex_metadata_with_deltas, timdex_metadata_merged_deltas
 ):
-    append_deltas = timdex_metadata_with_deltas.conn.query(
-        f"""
+    append_deltas = timdex_metadata_with_deltas.conn.query(f"""
         select
             {','.join(ORDERED_METADATA_COLUMN_NAMES)}
         from metadata.append_deltas
-        """
-    ).to_df()
+    """).to_df()
 
-    merged_static_db = timdex_metadata_merged_deltas.conn.query(
-        f"""
+    merged_static_db = timdex_metadata_merged_deltas.conn.query(f"""
         select
             {','.join(ORDERED_METADATA_COLUMN_NAMES)}
         from static_db.records
-        """
-    ).to_df()
+    """).to_df()
 
     assert set(map(tuple, append_deltas.to_numpy())).issubset(
         set(map(tuple, merged_static_db.to_numpy()))
@@ -332,18 +317,12 @@ def test_td_prepare_duckdb_secret_and_extensions_home_env_var_set_and_valid(
     monkeypatch.setenv("HOME", str(preset_home))
 
     td = TIMDEXDataset(timdex_dataset_with_runs.location)
-    df = (
-        td.conn.query(
-            """
+    df = td.conn.query("""
            select
                current_setting('secret_directory') as secret_directory,
                current_setting('extension_directory') as extension_directory
            ;
-            """
-        )
-        .to_df()
-        .iloc[0]
-    )
+    """).to_df().iloc[0]
     assert "my-account" in df.secret_directory
     assert df.extension_directory == ""  # expected and okay when HOME set
 
@@ -355,18 +334,12 @@ def test_td_prepare_duckdb_secret_and_extensions_home_env_var_unset(
 
     td = TIMDEXDataset(timdex_dataset_with_runs.location)
 
-    df = (
-        td.conn.query(
-            """
+    df = td.conn.query("""
            select
                current_setting('secret_directory') as secret_directory,
                current_setting('extension_directory') as extension_directory
            ;
-            """
-        )
-        .to_df()
-        .iloc[0]
-    )
+    """).to_df().iloc[0]
     assert df.secret_directory == "/tmp/.duckdb/secrets"
     assert df.extension_directory == "/tmp/.duckdb/extensions"
 
@@ -378,18 +351,12 @@ def test_td_prepare_duckdb_secret_and_extensions_home_env_var_set_but_empty(
 
     td = TIMDEXDataset(timdex_dataset_with_runs.location)
 
-    df = (
-        td.conn.query(
-            """
+    df = td.conn.query("""
            select
                current_setting('secret_directory') as secret_directory,
                current_setting('extension_directory') as extension_directory
            ;
-            """
-        )
-        .to_df()
-        .iloc[0]
-    )
+    """).to_df().iloc[0]
     assert df.secret_directory == "/tmp/.duckdb/secrets"
     assert df.extension_directory == "/tmp/.duckdb/extensions"
 
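All three variants read back two ordinary DuckDB settings. For reference, a minimal sketch of setting and reading them on a bare connection (illustrative; how TIMDEXDataset actually derives the paths from HOME is outside this diff):

import duckdb

conn = duckdb.connect()
conn.execute("set secret_directory = '/tmp/.duckdb/secrets';")
conn.execute("set extension_directory = '/tmp/.duckdb/extensions';")
# Readable back via current_setting(), exactly as the tests above do.
assert conn.query(
    "select current_setting('secret_directory');"
).fetchone()[0] == "/tmp/.duckdb/secrets"
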
@@ -411,16 +378,14 @@ def test_tdm_preload_false_no_temp_table(timdex_dataset_with_runs):
     td = TIMDEXDataset(timdex_dataset_with_runs.location)
 
     # assert that materialized, temporary table "temp.current_records" does not exist
-    temp_table_count = td.metadata.conn.query(
-        """
+    temp_table_count = td.metadata.conn.query("""
        select count(*)
        from information_schema.tables
        where table_catalog = 'temp'
        and table_name = 'current_records'
        and table_type = 'LOCAL TEMPORARY'
        ;
-        """
-    ).fetchone()[0]
+    """).fetchone()[0]
 
     assert temp_table_count == 0
 
@@ -430,15 +395,13 @@ def test_tdm_preload_true_has_temp_table(timdex_dataset_with_runs):
     td = TIMDEXDataset(timdex_dataset_with_runs.location, preload_current_records=True)
 
     # assert that materialized, temporary table "temp.current_records" does exist
-    temp_table_count = td.metadata.conn.query(
-        """
+    temp_table_count = td.metadata.conn.query("""
        select count(*)
        from information_schema.tables
        where table_catalog = 'temp'
        and table_name = 'current_records'
        and table_type = 'LOCAL TEMPORARY'
        ;
-        """
-    ).fetchone()[0]
+    """).fetchone()[0]
 
     assert temp_table_count == 1
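
The only observable difference between the two preload modes is this temp table. A plausible sketch of the materialization step (illustrative, assuming `td` as constructed above; the real implementation lives in timdex_dataset_api and is not shown in this diff):

# Illustrative: snapshot the view into a session-local temporary table, which
# then appears in information_schema.tables with table_catalog='temp' and
# table_type='LOCAL TEMPORARY'.
td.metadata.conn.execute("""
    create or replace temporary table current_records as
    select * from metadata.current_records;
""")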
6 changes: 2 additions & 4 deletions tests/utils.py
@@ -105,14 +105,12 @@ def generate_sample_embeddings_for_run(
     embedding_dimensions: int = 3,
 ) -> Iterator[DatasetEmbedding]:
     """Generate sample DatasetEmbeddings for a given ETL run."""
-    records_metadata = timdex_dataset.conn.query(
-        f"""
+    records_metadata = timdex_dataset.conn.query(f"""
         select
             *
         from metadata.records
         where run_id = '{run_id}';
-        """
-    ).to_df()
+    """).to_df()
 
     if not embedding_timestamp:
         embedding_timestamp = records_metadata.iloc[0].run_timestamp.isoformat()
2 changes: 1 addition & 1 deletion timdex_dataset_api/__init__.py
@@ -5,7 +5,7 @@
 from timdex_dataset_api.metadata import TIMDEXDatasetMetadata
 from timdex_dataset_api.record import DatasetRecord
 
-__version__ = "3.10.0"
+__version__ = "3.11.0"
 
 __all__ = [
     "DatasetEmbedding",
18 changes: 6 additions & 12 deletions timdex_dataset_api/embeddings.py
@@ -181,8 +181,7 @@ def _create_embeddings_view(self, conn: DuckDBPyConnection) -> None:
         """Create a view that projects over embeddings parquet files."""
         logger.debug("creating view data.embeddings")
 
-        conn.execute(
-            f"""
+        conn.execute(f"""
         create or replace view data.embeddings as
         (
             select *
@@ -192,8 +191,7 @@ def _create_embeddings_view(self, conn: DuckDBPyConnection) -> None:
                 filename=true
             )
         );
-        """
-        )
+        """)
 
     def _create_current_embeddings_view(self, conn: DuckDBPyConnection) -> None:
         """Create a view of current embedding records.
@@ -205,8 +203,7 @@ def _create_current_embeddings_view(self, conn: DuckDBPyConnection) -> None:
         logger.debug("creating view data.current_embeddings")
 
         # SQL for the current records logic (CTEs)
-        conn.execute(
-            """
+        conn.execute("""
         create or replace view data.current_embeddings as
         (
             with
@@ -229,8 +226,7 @@ def _create_current_embeddings_view(self, conn: DuckDBPyConnection) -> None:
             from ce_ranked_embeddings
             where rn = 1
         );
-        """
-        )
+        """)
 
     def _create_current_run_embeddings_view(self, conn: DuckDBPyConnection) -> None:
         """Create a view of current embedding records per run.
@@ -242,8 +238,7 @@ def _create_current_run_embeddings_view(self, conn: DuckDBPyConnection) -> None:
         logger.debug("creating view data.current_run_embeddings")
 
         # SQL for the current records logic (CTEs)
-        conn.execute(
-            """
+        conn.execute("""
        create or replace view data.current_run_embeddings as
        (
            with
@@ -267,8 +262,7 @@ def _create_current_run_embeddings_view(self, conn: DuckDBPyConnection) -> None:
            from ce_ranked_embeddings
            where rn = 1
        );
-        """
-        )
+        """)
 
     def write(
         self,
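
Both collapsed view bodies above end the same way (from ce_ranked_embeddings where rn = 1): the same ranked-dedup shape as the metadata "current records" logic. A hedged sketch of that shape for the per-run variant (the collapsed CTE internals, partition keys, and ordering column are assumptions, not code from this diff):

# Illustrative shape only; assumes a DuckDBPyConnection `conn` with the
# data.embeddings view already attached.
conn.execute("""
    create or replace view data.current_run_embeddings as
    (
        with ce_ranked_embeddings as (
            select
                *,
                row_number() over (
                    partition by timdex_record_id, run_id  -- assumed keys
                    order by embedding_timestamp desc      -- assumed ordering
                ) as rn
            from data.embeddings
        )
        select * exclude (rn)
        from ce_ranked_embeddings
        where rn = 1
    );
""")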