diff --git a/digital_land/package/dataset_parquet.py b/digital_land/package/dataset_parquet.py
index 69c238df..9b509e40 100644
--- a/digital_land/package/dataset_parquet.py
+++ b/digital_land/package/dataset_parquet.py
@@ -244,6 +244,20 @@ def group_parquet_files(
         mem = process.memory_info().rss / 1024**2  # Memory in MB
         logger.info(f"[Memory usage] After grouping: {mem:.2f} MB")
 
+        # temporary diagnostic: verify row count is preserved across batching
+        source_count = self.conn.execute(
+            f"SELECT COUNT(*) FROM read_parquet('{str(transformed_parquet_dir)}/*.parquet')"
+        ).fetchone()[0]
+        batch_count = self.conn.execute(
+            f"SELECT COUNT(*) FROM read_parquet('{str(transformed_parquet_dir)}/batch/batch_*.parquet')"
+        ).fetchone()[0]
+        logger.info(f"row count in source parquet files: {source_count}")
+        logger.info(f"row count in batch files after grouping: {batch_count}")
+        if source_count != batch_count:
+            logger.warning(
+                f"row count mismatch after grouping: {source_count} source rows vs {batch_count} batch rows"
+            )
+
     def load_facts(self, transformed_parquet_dir):
         """
         This method loads facts into a fact table from a directory containing all transformed files as parquet files
@@ -342,7 +356,7 @@ def load_facts(transformed_parquet_dir):
                 COPY (
                     SELECT *
                     FROM read_parquet('{f}')
-                    WHERE MOD(HASH(fact), {n_buckets}) = {i}
+                    WHERE CAST(HASH(fact) AS UBIGINT) % {n_buckets} = {i}
                 ) TO '{bucket_paths[i]}' (FORMAT PARQUET, APPEND TRUE);
                 """
             )
@@ -350,6 +364,24 @@
         logger.info(
            f"Have {len(list(bucket_dir.glob('bucket_*.parquet')))} bucket files"
         )
+
+        # temporary diagnostic: total rows across all buckets should equal
+        # total rows across all batch files (deduplication happens later)
+        bucket_total = self.conn.execute(
+            f"SELECT COUNT(*) FROM read_parquet('{str(bucket_dir)}/bucket_*.parquet')"
+        ).fetchone()[0]
+        batch_total = self.conn.execute(
+            f"SELECT COUNT(*) FROM read_parquet('{str(transformed_parquet_dir)}/batch/batch_*.parquet')"
+        ).fetchone()[0]
+        logger.info(f"row count in batch files: {batch_total}")
+        logger.info(
+            f"row count in bucket files after assignment: {bucket_total}"
+        )
+        if bucket_total != batch_total:
+            logger.warning(
+                f"row count mismatch after bucketing: {batch_total} batch rows vs {bucket_total} bucket rows"
+            )
+
         process = psutil.Process(os.getpid())
         mem = process.memory_info().rss / 1024**2  # Memory in MB
         logger.info(f"[Memory usage] After 'bucketing': {mem:.2f} MB")
diff --git a/tests/integration/package/test_dataset_parquet.py b/tests/integration/package/test_dataset_parquet.py
index 037d5d8a..82a0e430 100644
--- a/tests/integration/package/test_dataset_parquet.py
+++ b/tests/integration/package/test_dataset_parquet.py
@@ -980,3 +980,69 @@ def test_load_pq_to_sqlite_basic(
         ), "Some json object have underscores in their 'keys'"
 
     cnx.close()
+
+
+def test_multi_bucket_load_facts_no_fact_loss(tmp_path):
+    """
+    Exercises the multi-bucket path in load_facts end-to-end and asserts that
+    every unique fact in the input appears exactly once in fact.parquet.
+
+    The multi-bucket path is forced by:
+    - creating one small parquet file per fact so group_parquet_files produces
+      many batch files (one per source file with target_mb=0.001)
+    - overriding parquet_dir_details to simulate a large dataset relative to
+      available memory, pushing n_buckets above 1
+
+    If any facts are silently dropped during bucket assignment or the final merge
+    this test will fail.
+    """
+    n_facts = 50
+    unique_facts = [f"{'a' * 63}{i}" for i in range(n_facts)]
+
+    transformed_dir = tmp_path / "transformed"
+    transformed_dir.mkdir()
+
+    for i, fact_hash in enumerate(unique_facts):
+        pd.DataFrame(
+            {
+                "end_date": [""],
+                "entity": [i + 1],
+                "entry_date": ["2023-01-01"],
+                "entry_number": ["1"],
+                "fact": [fact_hash],
+                "field": ["name"],
+                "priority": ["1"],
+                "reference_entity": [""],
+                "resource": ["resource_abc"],
+                "start_date": [""],
+                "value": [f"value_{i}"],
+            }
+        ).to_parquet(transformed_dir / f"resource_{i}.parquet", index=False)
+
+    package = DatasetParquetPackage(
+        dataset="conservation-area",
+        path=tmp_path / "output",
+        specification_dir=None,
+        transformed_parquet_dir=transformed_dir,
+    )
+
+    # One source file per batch, giving n_facts batch files
+    package.group_parquet_files(transformed_dir, target_mb=0.001)
+
+    # Simulate large-dataset conditions so n_buckets > 1 is calculated
+    package.parquet_dir_details["total_size_mb"] = 100.0
+    package.parquet_dir_details["memory_available"] = 1.0
+
+    package.load_facts(transformed_dir)
+
+    output_file = (
+        tmp_path / "output" / "fact" / "dataset=conservation-area" / "fact.parquet"
+    )
+    assert output_file.exists(), "fact.parquet was not created"
+
+    df_result = pd.read_parquet(output_file)
+
+    assert len(df_result) == n_facts, (
+        f"Expected {n_facts} facts but got {len(df_result)}. "
+        "The multi-bucket path silently dropped some facts."
+    )