diff --git a/digital_land/package/dataset_parquet.py b/digital_land/package/dataset_parquet.py
index 69c238df..f2b983ca 100644
--- a/digital_land/package/dataset_parquet.py
+++ b/digital_land/package/dataset_parquet.py
@@ -244,6 +244,92 @@ def group_parquet_files(
         mem = process.memory_info().rss / 1024**2  # Memory in MB
         logger.info(f"[Memory usage] After grouping: {mem:.2f} MB")
 
+        # temporary diagnostic: verify row count is preserved across batching
+        source_count = self._count_parquet_rows(
+            f"{transformed_parquet_dir}/*.parquet"
+        )
+        batch_count = self._count_parquet_rows(
+            f"{transformed_parquet_dir}/batch/batch_*.parquet"
+        )
+        logger.info(f"row count in source parquet files: {source_count}")
+        logger.info(f"row count in batch files after grouping: {batch_count}")
+        if source_count != batch_count:
+            logger.warning(
+                f"row count mismatch after grouping: {source_count} source rows vs {batch_count} batch rows"
+            )
+
+    def _count_parquet_rows(self, parquet_glob):
+        """
+        Count the rows held in all parquet files matching parquet_glob, a path or
+        glob pattern that is passed unchanged to read_parquet.
+
+        Returns the total row count.
+        """
+        return self.conn.execute(
+            f"SELECT COUNT(*) FROM read_parquet('{parquet_glob}')"
+        ).fetchone()[0]
+
+    def _assign_to_buckets(self, batch_dir, bucket_dir, n_buckets):
+        """
+        Distribute rows from all batch files into n_buckets bucket files based
+        on HASH(fact) % n_buckets. Each unique fact hash always maps to the same
+        bucket, so per-bucket deduplication is equivalent to global deduplication.
+
+        Each bucket file is written by its own COPY statement that filters every
+        batch file in batch_dir, so the batch files are read once per bucket.
+
+        Raises ValueError if n_buckets is less than 1.
+
+        Returns the list of bucket file paths.
+        """
+        if n_buckets < 1:
+            raise ValueError(f"n_buckets must be at least 1, got {n_buckets}")
+
+        digits = max(2, len(str(n_buckets - 1)))
+        bucket_paths = [
+            bucket_dir / f"bucket_{i:0{digits}}.parquet" for i in range(n_buckets)
+        ]
+        for i, bucket_path in enumerate(bucket_paths):
+            self.conn.execute(
+                f"""
+                COPY (
+                    SELECT *
+                    FROM read_parquet('{batch_dir}/batch_*.parquet')
+                    WHERE CAST(HASH(fact) AS UBIGINT) % {n_buckets} = {i}
+                ) TO '{bucket_path}' (FORMAT PARQUET);
+                """
+            )
+        return bucket_paths
+
+    def _deduplicate_buckets(self, bucket_paths, result_dir):
+        """
+        Apply per-fact deduplication to each bucket file, keeping the highest
+        priority and most recent entry for each unique fact hash.
+
+        Within each fact, rows are ordered by priority ascending, then entry_date
+        and entry_number descending, and only the first row is written out.
+
+        Returns the list of result file paths, one per bucket and in the same order
+        as bucket_paths.
+        """
+        digits = max(2, len(str(len(bucket_paths) - 1)))
+        result_paths = []
+        for i, bucket_path in enumerate(bucket_paths):
+            result_path = result_dir / f"result_{i:0{digits}}.parquet"
+            self.conn.execute(
+                f"""
+                COPY (
+                    SELECT *
+                    FROM read_parquet('{bucket_path}')
+                    QUALIFY ROW_NUMBER() OVER (
+                        PARTITION BY fact ORDER BY priority, entry_date DESC, entry_number DESC
+                    ) = 1
+                ) TO '{result_path}' (FORMAT PARQUET);
+                """
+            )
+            result_paths.append(result_path)
+        return result_paths
+
     def load_facts(self, transformed_parquet_dir):
         """
         This method loads facts into a fact table from a directory containing all transformed files as parquet files
@@ -322,80 +408,54 @@ def load_facts(self, transformed_parquet_dir):
                 logger.info(
                     "Need to use multiple buckets in windowed function for facts"
                 )
-                bucket_dir = transformed_parquet_dir / "bucket"
-                bucket_dir.mkdir(parents=True, exist_ok=True)
-                digits = max(2, len(str(n_buckets - 1)))
-                bucket_paths = [
-                    bucket_dir / f"bucket_{i:0{digits}}.parquet"
-                    for i in range(n_buckets)
-                ]
+                batch_dir = transformed_parquet_dir / "batch"
                 logger.info(
-                    f"Have {len(list(transformed_parquet_dir.glob('batch/batch_*.parquet')))} batch files"
+                    f"Have {len(list(batch_dir.glob('batch_*.parquet')))} batch files"
                 )
 
-                # Loop over each batch file and assign to a bucket file
+                bucket_dir = transformed_parquet_dir / "bucket"
+                bucket_dir.mkdir(parents=True, exist_ok=True)
                 logger.info(f"Assigning to {n_buckets} buckets")
-                for f in transformed_parquet_dir.glob("batch/batch_*.parquet"):
-                    for i in range(n_buckets):
-                        self.conn.execute(
-                            f"""
-                            COPY (
-                                SELECT *
-                                FROM read_parquet('{f}')
-                                WHERE MOD(HASH(fact), {n_buckets}) = {i}
-                            ) TO '{bucket_paths[i]}' (FORMAT PARQUET, APPEND TRUE);
-                            """
-                        )
-
+                bucket_paths = self._assign_to_buckets(batch_dir, bucket_dir, n_buckets)
+                logger.info(f"Have {len(bucket_paths)} bucket files")
+
+                # temporary diagnostic: verify row count is preserved across bucketing
+                batch_total = self._count_parquet_rows(f"{batch_dir}/batch_*.parquet")
+                bucket_total = self._count_parquet_rows(
+                    f"{bucket_dir}/bucket_*.parquet"
+                )
+                logger.info(f"row count in batch files: {batch_total}")
                 logger.info(
-                    f"Have {len(list(bucket_dir.glob('bucket_*.parquet')))} bucket files"
+                    f"row count in bucket files after assignment: {bucket_total}"
                 )
+                if bucket_total != batch_total:
+                    logger.warning(
+                        f"row count mismatch after bucketing: {batch_total} batch rows vs {bucket_total} bucket rows"
+                    )
+
                 process = psutil.Process(os.getpid())
                 mem = process.memory_info().rss / 1024**2  # Memory in MB
                 logger.info(f"[Memory usage] After 'bucketing': {mem:.2f} MB")
 
                 result_dir = transformed_parquet_dir / "result"
                 result_dir.mkdir(parents=True, exist_ok=True)
-                result_paths = [
-                    result_dir / f"result_{i:0{digits}}.parquet"
-                    for i in range(n_buckets)
-                ]
-                for i in range(n_buckets):
-                    bucket_path = bucket_dir / f"bucket_{i:0{digits}}.parquet"
-                    self.conn.execute(
-                        f"""
-                        COPY (
-                            SELECT *
-                            FROM read_parquet('{bucket_path}')
-                            QUALIFY ROW_NUMBER() OVER (
-                                PARTITION BY fact ORDER BY priority, entry_date DESC, entry_number DESC
-                            ) = 1
-                        ) TO '{result_paths[i]}' (FORMAT PARQUET);
-                        """
-                    )
-                logger.info(
-                    f"Have {len(list(transformed_parquet_dir.glob('result_*.parquet')))} result files"
-                )
-
-                # for path in bucket_paths:
-                #     path.unlink(missing_ok=True)
+                result_paths = self._deduplicate_buckets(bucket_paths, result_dir)
+                logger.info(f"Have {len(result_paths)} result files")
 
                 process = psutil.Process(os.getpid())
                 mem = process.memory_info().rss / 1024**2  # Memory in MB
                 logger.info(f"[Memory usage] After 'result': {mem:.2f} MB")
 
                 self.conn.execute(
                     f"""
-                        COPY(
-                        SELECT * FROM
-                        read_parquet('{str(result_dir)}/result_*.parquet')
-                        ) TO '{str(output_path)}' (FORMAT PARQUET);
-                        """
+                    COPY (
+                        SELECT * FROM
+                        read_parquet('{str(result_dir)}/result_*.parquet')
+                    ) TO '{str(output_path)}' (FORMAT PARQUET);
+                    """
                 )
                 logger.info(f"Facts saved to output_path: {output_path}")
                 logger.info(f"output_path exists: {os.path.exists(output_path)}")
-                # for path in result_paths:
-                #     path.unlink(missing_ok=True)
 
                 process = psutil.Process(os.getpid())
                 mem = process.memory_info().rss / 1024**2  # Memory in MB
diff --git a/tests/integration/package/test_dataset_parquet.py b/tests/integration/package/test_dataset_parquet.py
index 037d5d8a..5a800a33 100644
--- a/tests/integration/package/test_dataset_parquet.py
+++ b/tests/integration/package/test_dataset_parquet.py
@@ -980,3 +980,200 @@ def test_load_pq_to_sqlite_basic(
     ), "Some json object have underscores in their 'keys'"
 
     cnx.close()
+
+
+def test_multi_bucket_load_facts_no_fact_loss(tmp_path):
+    """
+    Exercises the multi-bucket path in load_facts end-to-end and asserts that
+    every unique fact in the input appears exactly once in fact.parquet.
+
+    The multi-bucket path is forced by:
+    - creating one small parquet file per fact so group_parquet_files produces
+      many batch files (one per source file with target_mb=0.001)
+    - overriding parquet_dir_details to simulate a large dataset relative to
+      available memory, pushing n_buckets above 1
+
+    If any facts are silently dropped during bucket assignment or the final merge
+    this test will fail.
+    """
+    n_facts = 50
+    unique_facts = [f"{'a' * 63}{i}" for i in range(n_facts)]
+
+    transformed_dir = tmp_path / "transformed"
+    transformed_dir.mkdir()
+
+    for i, fact_hash in enumerate(unique_facts):
+        pd.DataFrame(
+            {
+                "end_date": [""],
+                "entity": [i + 1],
+                "entry_date": ["2023-01-01"],
+                "entry_number": ["1"],
+                "fact": [fact_hash],
+                "field": ["name"],
+                "priority": ["1"],
+                "reference_entity": [""],
+                "resource": ["resource_abc"],
+                "start_date": [""],
+                "value": [f"value_{i}"],
+            }
+        ).to_parquet(transformed_dir / f"resource_{i}.parquet", index=False)
+
+    package = DatasetParquetPackage(
+        dataset="conservation-area",
+        path=tmp_path / "output",
+        specification_dir=None,
+        transformed_parquet_dir=transformed_dir,
+    )
+
+    # One source file per batch, giving n_facts batch files
+    package.group_parquet_files(transformed_dir, target_mb=0.001)
+
+    # Simulate large-dataset conditions so n_buckets > 1 is calculated
+    package.parquet_dir_details["total_size_mb"] = 100.0
+    package.parquet_dir_details["memory_available"] = 1.0
+
+    package.load_facts(transformed_dir)
+
+    output_file = (
+        tmp_path / "output" / "fact" / "dataset=conservation-area" / "fact.parquet"
+    )
+    assert output_file.exists(), "fact.parquet was not created"
+
+    df_result = pd.read_parquet(output_file)
+
+    assert len(df_result) == n_facts, (
+        f"Expected {n_facts} facts but got {len(df_result)}. "
+        "The multi-bucket path silently dropped some facts."
+    )
+    assert set(df_result["fact"]) == set(unique_facts), (
+        "fact.parquet does not contain exactly the input fact hashes."
+    )
+
+
+def test_assign_to_buckets_creates_correct_files(tmp_path):
+    """
+    Tests that _assign_to_buckets creates exactly n_buckets files in bucket_dir
+    and that the total row count across all bucket files equals the total row
+    count in the batch files (no rows dropped or duplicated during assignment).
+    """
+    n_facts = 30
+    n_buckets = 3
+
+    batch_dir = tmp_path / "batch"
+    batch_dir.mkdir()
+    for i in range(n_facts):
+        pd.DataFrame(
+            {
+                "end_date": [""],
+                "entity": [i + 1],
+                "entry_date": ["2023-01-01"],
+                "entry_number": ["1"],
+                "fact": [f"{'a' * 63}{i}"],
+                "field": ["name"],
+                "priority": ["1"],
+                "reference_entity": [""],
+                "resource": ["resource_abc"],
+                "start_date": [""],
+                "value": [f"value_{i}"],
+            }
+        ).to_parquet(batch_dir / f"batch_{i:02}.parquet", index=False)
+
+    bucket_dir = tmp_path / "bucket"
+    bucket_dir.mkdir()
+
+    package = DatasetParquetPackage(
+        dataset="conservation-area",
+        path=tmp_path / "output",
+        specification_dir=None,
+    )
+    bucket_paths = package._assign_to_buckets(batch_dir, bucket_dir, n_buckets)
+
+    assert (
+        len(bucket_paths) == n_buckets
+    ), f"Expected {n_buckets} bucket files, got {len(bucket_paths)}"
+    assert all(p.exists() for p in bucket_paths), "Not all bucket files were created"
+
+    bucket_frames = [pd.read_parquet(p) for p in bucket_paths]
+    bucket_total = sum(len(frame) for frame in bucket_frames)
+    assert bucket_total == n_facts, (
+        f"Expected {n_facts} total rows across buckets, got {bucket_total}. "
+        "Rows were dropped or duplicated during bucket assignment."
+    )
+
+    # each fact hash should appear in exactly one bucket
+    all_facts = pd.concat(bucket_frames)["fact"]
+    assert all_facts.is_unique, "Some fact hashes appear in more than one bucket"
+
+
+def test_deduplicate_buckets_creates_correct_files(tmp_path):
+    """
+    Tests that _deduplicate_buckets creates exactly one result file per bucket
+    and that duplicate fact hashes within a bucket are reduced to a single row,
+    keeping the row with the latest entry_date.
+    """
+    n_buckets = 2
+
+    bucket_dir = tmp_path / "bucket"
+    bucket_dir.mkdir()
+
+    # bucket_00: two rows for the same fact — only the later entry_date should survive
+    pd.DataFrame(
+        {
+            "end_date": ["", ""],
+            "entity": [1, 1],
+            "entry_date": ["2023-01-01", "2023-06-01"],
+            "entry_number": ["1", "2"],
+            "fact": ["fact_aaa", "fact_aaa"],
+            "field": ["name", "name"],
+            "priority": ["1", "1"],
+            "reference_entity": ["", ""],
+            "resource": ["res_a", "res_b"],
+            "start_date": ["", ""],
+            "value": ["old_value", "new_value"],
+        }
+    ).to_parquet(bucket_dir / "bucket_00.parquet", index=False)
+
+    # bucket_01: two distinct facts, no duplicates
+    pd.DataFrame(
+        {
+            "end_date": ["", ""],
+            "entity": [2, 3],
+            "entry_date": ["2023-01-01", "2023-01-01"],
+            "entry_number": ["1", "1"],
+            "fact": ["fact_bbb", "fact_ccc"],
+            "field": ["name", "name"],
+            "priority": ["1", "1"],
+            "reference_entity": ["", ""],
+            "resource": ["res_a", "res_a"],
+            "start_date": ["", ""],
+            "value": ["value_b", "value_c"],
+        }
+    ).to_parquet(bucket_dir / "bucket_01.parquet", index=False)
+
+    result_dir = tmp_path / "result"
+    result_dir.mkdir()
+
+    package = DatasetParquetPackage(
+        dataset="conservation-area",
+        path=tmp_path / "output",
+        specification_dir=None,
+    )
+    bucket_paths = [bucket_dir / "bucket_00.parquet", bucket_dir / "bucket_01.parquet"]
+    result_paths = package._deduplicate_buckets(bucket_paths, result_dir)
+
+    assert (
+        len(result_paths) == n_buckets
+    ), f"Expected {n_buckets} result files, got {len(result_paths)}"
+    assert all(p.exists() for p in result_paths), "Not all result files were created"
+
+    df_00 = pd.read_parquet(result_paths[0])
+    assert (
+        len(df_00) == 1
+    ), "Duplicate fact in bucket_00 should be deduplicated to 1 row"
+    assert (
+        df_00.iloc[0]["value"] == "new_value"
+    ), "The row with the latest entry_date should be kept"
+
+    df_01 = pd.read_parquet(result_paths[1])
+    assert len(df_01) == 2, "bucket_01 has 2 distinct facts, both should be kept"