
Commit 5ef2085

add a test for duckdb when converting fact hashes to numbers (#517)

* add a test for duckdb when converting fact hashes to numbers
* small tweak
1 parent 83476ad

2 files changed: 99 additions & 1 deletion


digital_land/package/dataset_parquet.py

Lines changed: 33 additions & 1 deletion
@@ -244,6 +244,20 @@ def group_parquet_files(
         mem = process.memory_info().rss / 1024**2  # Memory in MB
         logger.info(f"[Memory usage] After grouping: {mem:.2f} MB")

+        # temporary diagnostic: verify row count is preserved across batching
+        source_count = self.conn.execute(
+            f"SELECT COUNT(*) FROM read_parquet('{str(transformed_parquet_dir)}/*.parquet')"
+        ).fetchone()[0]
+        batch_count = self.conn.execute(
+            f"SELECT COUNT(*) FROM read_parquet('{str(transformed_parquet_dir)}/batch/batch_*.parquet')"
+        ).fetchone()[0]
+        logger.info(f"row count in source parquet files: {source_count}")
+        logger.info(f"row count in batch files after grouping: {batch_count}")
+        if source_count != batch_count:
+            logger.warning(
+                f"row count mismatch after grouping: {source_count} source rows vs {batch_count} batch rows"
+            )
+
     def load_facts(self, transformed_parquet_dir):
         """
         This method loads facts into a fact table from a directory containing all transformed files as parquet files
@@ -342,14 +356,32 @@ def load_facts(self, transformed_parquet_dir):
                 COPY (
                     SELECT *
                     FROM read_parquet('{f}')
-                    WHERE MOD(HASH(fact), {n_buckets}) = {i}
+                    WHERE CAST(HASH(fact) AS UBIGINT) % {n_buckets} = {i}
                 ) TO '{bucket_paths[i]}' (FORMAT PARQUET, APPEND TRUE);
                 """
             )

         logger.info(
             f"Have {len(list(bucket_dir.glob('bucket_*.parquet')))} bucket files"
         )
+
+        # temporary diagnostic: total rows across all buckets should equal
+        # total rows across all batch files (deduplication happens later)
+        bucket_total = self.conn.execute(
+            f"SELECT COUNT(*) FROM read_parquet('{str(bucket_dir)}/bucket_*.parquet')"
+        ).fetchone()[0]
+        batch_total = self.conn.execute(
+            f"SELECT COUNT(*) FROM read_parquet('{str(transformed_parquet_dir)}/batch/batch_*.parquet')"
+        ).fetchone()[0]
+        logger.info(f"row count in batch files: {batch_total}")
+        logger.info(
+            f"row count in bucket files after assignment: {bucket_total}"
+        )
+        if bucket_total != batch_total:
+            logger.warning(
+                f"row count mismatch after bucketing: {batch_total} batch rows vs {bucket_total} bucket rows"
+            )
+
         process = psutil.Process(os.getpid())
         mem = process.memory_info().rss / 1024**2  # Memory in MB
         logger.info(f"[Memory usage] After 'bucketing': {mem:.2f} MB")

tests/integration/package/test_dataset_parquet.py

Lines changed: 66 additions & 0 deletions
@@ -980,3 +980,69 @@ def test_load_pq_to_sqlite_basic(
     ), "Some json object have underscores in their 'keys'"

     cnx.close()
+
+
+def test_multi_bucket_load_facts_no_fact_loss(tmp_path):
+    """
+    Exercises the multi-bucket path in load_facts end-to-end and asserts that
+    every unique fact in the input appears exactly once in fact.parquet.
+
+    The multi-bucket path is forced by:
+    - creating one small parquet file per fact so group_parquet_files produces
+      many batch files (one per source file with target_mb=0.001)
+    - overriding parquet_dir_details to simulate a large dataset relative to
+      available memory, pushing n_buckets above 1
+
+    If any facts are silently dropped during bucket assignment or the final merge,
+    this test will fail.
+    """
+    n_facts = 50
+    unique_facts = [f"{'a' * 63}{i}" for i in range(n_facts)]
+
+    transformed_dir = tmp_path / "transformed"
+    transformed_dir.mkdir()
+
+    for i, fact_hash in enumerate(unique_facts):
+        pd.DataFrame(
+            {
+                "end_date": [""],
+                "entity": [i + 1],
+                "entry_date": ["2023-01-01"],
+                "entry_number": ["1"],
+                "fact": [fact_hash],
+                "field": ["name"],
+                "priority": ["1"],
+                "reference_entity": [""],
+                "resource": ["resource_abc"],
+                "start_date": [""],
+                "value": [f"value_{i}"],
+            }
+        ).to_parquet(transformed_dir / f"resource_{i}.parquet", index=False)
+
+    package = DatasetParquetPackage(
+        dataset="conservation-area",
+        path=tmp_path / "output",
+        specification_dir=None,
+        transformed_parquet_dir=transformed_dir,
+    )
+
+    # One source file per batch, giving n_facts batch files
+    package.group_parquet_files(transformed_dir, target_mb=0.001)
+
+    # Simulate large-dataset conditions so n_buckets > 1 is calculated
+    package.parquet_dir_details["total_size_mb"] = 100.0
+    package.parquet_dir_details["memory_available"] = 1.0
+
+    package.load_facts(transformed_dir)
+
+    output_file = (
+        tmp_path / "output" / "fact" / "dataset=conservation-area" / "fact.parquet"
+    )
+    assert output_file.exists(), "fact.parquet was not created"
+
+    df_result = pd.read_parquet(output_file)
+
+    assert len(df_result) == n_facts, (
+        f"Expected {n_facts} facts but got {len(df_result)}. "
+        "The multi-bucket path silently dropped some facts."
+    )
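
As a companion to the test above, a quick way to convince yourself that the UBIGINT predicate partitions rows cleanly is to check that the per-bucket counts sum to the table's total, i.e. every row matches exactly one bucket filter. A throwaway sketch, assuming duckdb is installed (range(1000) and n = 4 are arbitrary choices, not values from the test):

import duckdb

con = duckdb.connect()
n = 4
# Apply the same predicate shape as load_facts to a sample table and count
# the rows each bucket captures
counts = [
    con.execute(
        f"SELECT COUNT(*) FROM range(1000) "
        f"WHERE CAST(HASH(range) AS UBIGINT) % {n} = {i}"
    ).fetchone()[0]
    for i in range(n)
]
assert sum(counts) == 1000  # every row in exactly one bucket, none dropped

The new test itself can be run in isolation with pytest, e.g. pytest tests/integration/package/test_dataset_parquet.py::test_multi_bucket_load_facts_no_fact_loss.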

0 commit comments
