From c903499d6ff5853ee39f8cd33e139faeb5419d62 Mon Sep 17 00:00:00 2001 From: eveleighoj <35256612+eveleighoj@users.noreply.github.com> Date: Tue, 7 Apr 2026 17:16:33 +0100 Subject: [PATCH] add fix in --- digital_land/package/dataset_parquet.py | 128 +++++++++-------- .../package/test_dataset_parquet.py | 132 ++++++++++++++++++ 2 files changed, 199 insertions(+), 61 deletions(-) diff --git a/digital_land/package/dataset_parquet.py b/digital_land/package/dataset_parquet.py index 9b509e40..f2b983ca 100644 --- a/digital_land/package/dataset_parquet.py +++ b/digital_land/package/dataset_parquet.py @@ -258,6 +258,56 @@ def group_parquet_files( f"row count mismatch after grouping: {source_count} source rows vs {batch_count} batch rows" ) + def _assign_to_buckets(self, batch_dir, bucket_dir, n_buckets): + """ + Distribute rows from all batch files into n_buckets bucket files based + on HASH(fact) % n_buckets. Each unique fact hash always maps to the same + bucket, so per-bucket deduplication is equivalent to global deduplication. + + Returns the list of bucket file paths. + """ + digits = max(2, len(str(n_buckets - 1))) + bucket_paths = [ + bucket_dir / f"bucket_{i:0{digits}}.parquet" for i in range(n_buckets) + ] + for i in range(n_buckets): + self.conn.execute( + f""" + COPY ( + SELECT * + FROM read_parquet('{str(batch_dir)}/batch_*.parquet') + WHERE CAST(HASH(fact) AS UBIGINT) % {n_buckets} = {i} + ) TO '{bucket_paths[i]}' (FORMAT PARQUET); + """ + ) + return bucket_paths + + def _deduplicate_buckets(self, bucket_paths, result_dir): + """ + Apply per-fact deduplication to each bucket file, keeping the highest + priority and most recent entry for each unique fact hash. + + Returns the list of result file paths. + """ + digits = max(2, len(str(len(bucket_paths) - 1))) + result_paths = [ + result_dir / f"result_{i:0{digits}}.parquet" + for i in range(len(bucket_paths)) + ] + for i, bucket_path in enumerate(bucket_paths): + self.conn.execute( + f""" + COPY ( + SELECT * + FROM read_parquet('{bucket_path}') + QUALIFY ROW_NUMBER() OVER ( + PARTITION BY fact ORDER BY priority, entry_date DESC, entry_number DESC + ) = 1 + ) TO '{result_paths[i]}' (FORMAT PARQUET); + """ + ) + return result_paths + def load_facts(self, transformed_parquet_dir): """ This method loads facts into a fact table from a directory containing all transformed files as parquet files @@ -336,43 +386,23 @@ def load_facts(self, transformed_parquet_dir): logger.info( "Need to use multiple buckets in windowed function for facts" ) - bucket_dir = transformed_parquet_dir / "bucket" - bucket_dir.mkdir(parents=True, exist_ok=True) - digits = max(2, len(str(n_buckets - 1))) - bucket_paths = [ - bucket_dir / f"bucket_{i:0{digits}}.parquet" - for i in range(n_buckets) - ] + batch_dir = transformed_parquet_dir / "batch" logger.info( - f"Have {len(list(transformed_parquet_dir.glob('batch/batch_*.parquet')))} batch files" + f"Have {len(list(batch_dir.glob('batch_*.parquet')))} batch files" ) - # Loop over each batch file and assign to a bucket file + bucket_dir = transformed_parquet_dir / "bucket" + bucket_dir.mkdir(parents=True, exist_ok=True) logger.info(f"Assigning to {n_buckets} buckets") - for f in transformed_parquet_dir.glob("batch/batch_*.parquet"): - for i in range(n_buckets): - self.conn.execute( - f""" - COPY ( - SELECT * - FROM read_parquet('{f}') - WHERE CAST(HASH(fact) AS UBIGINT) % {n_buckets} = {i} - ) TO '{bucket_paths[i]}' (FORMAT PARQUET, APPEND TRUE); - """ - ) - - logger.info( - f"Have {len(list(bucket_dir.glob('bucket_*.parquet')))} bucket files" - ) + bucket_paths = self._assign_to_buckets(batch_dir, bucket_dir, n_buckets) + logger.info(f"Have {len(bucket_paths)} bucket files") - # temporary diagnostic: total rows across all buckets should equal - # total rows across all batch files (deduplication happens later) + batch_total = self.conn.execute( + f"SELECT COUNT(*) FROM read_parquet('{str(batch_dir)}/batch_*.parquet')" + ).fetchone()[0] bucket_total = self.conn.execute( f"SELECT COUNT(*) FROM read_parquet('{str(bucket_dir)}/bucket_*.parquet')" ).fetchone()[0] - batch_total = self.conn.execute( - f"SELECT COUNT(*) FROM read_parquet('{str(transformed_parquet_dir)}/batch/batch_*.parquet')" - ).fetchone()[0] logger.info(f"row count in batch files: {batch_total}") logger.info( f"row count in bucket files after assignment: {bucket_total}" @@ -383,51 +413,27 @@ def load_facts(self, transformed_parquet_dir): ) process = psutil.Process(os.getpid()) - mem = process.memory_info().rss / 1024**2 # Memory in MB + mem = process.memory_info().rss / 1024**2 logger.info(f"[Memory usage] After 'bucketing': {mem:.2f} MB") result_dir = transformed_parquet_dir / "result" result_dir.mkdir(parents=True, exist_ok=True) - result_paths = [ - result_dir / f"result_{i:0{digits}}.parquet" - for i in range(n_buckets) - ] - for i in range(n_buckets): - bucket_path = bucket_dir / f"bucket_{i:0{digits}}.parquet" - self.conn.execute( - f""" - COPY ( - SELECT * - FROM read_parquet('{bucket_path}') - QUALIFY ROW_NUMBER() OVER ( - PARTITION BY fact ORDER BY priority, entry_date DESC, entry_number DESC - ) = 1 - ) TO '{result_paths[i]}' (FORMAT PARQUET); - """ - ) - logger.info( - f"Have {len(list(transformed_parquet_dir.glob('result_*.parquet')))} result files" - ) - - # for path in bucket_paths: - # path.unlink(missing_ok=True) + result_paths = self._deduplicate_buckets(bucket_paths, result_dir) + logger.info(f"Have {len(result_paths)} result files") process = psutil.Process(os.getpid()) - mem = process.memory_info().rss / 1024**2 # Memory in MB + mem = process.memory_info().rss / 1024**2 logger.info(f"[Memory usage] After 'result': {mem:.2f} MB") self.conn.execute( f""" - COPY( - SELECT * FROM - read_parquet('{str(result_dir)}/result_*.parquet') - ) TO '{str(output_path)}' (FORMAT PARQUET); - """ + COPY ( + SELECT * FROM read_parquet('{str(result_dir)}/result_*.parquet') + ) TO '{str(output_path)}' (FORMAT PARQUET); + """ ) logger.info(f"Facts saved to output_path: {output_path}") logger.info(f"output_path exists: {os.path.exists(output_path)}") - # for path in result_paths: - # path.unlink(missing_ok=True) process = psutil.Process(os.getpid()) mem = process.memory_info().rss / 1024**2 # Memory in MB diff --git a/tests/integration/package/test_dataset_parquet.py b/tests/integration/package/test_dataset_parquet.py index 82a0e430..5a800a33 100644 --- a/tests/integration/package/test_dataset_parquet.py +++ b/tests/integration/package/test_dataset_parquet.py @@ -1046,3 +1046,135 @@ def test_multi_bucket_load_facts_no_fact_loss(tmp_path): f"Expected {n_facts} facts but got {len(df_result)}. " "The multi-bucket path silently dropped some facts." ) + + +def test_assign_to_buckets_creates_correct_files(tmp_path): + """ + Tests that _assign_to_buckets creates exactly n_buckets files in bucket_dir + and that the total row count across all bucket files equals the total row + count in the batch files (no rows dropped or duplicated during assignment). + """ + n_facts = 30 + n_buckets = 3 + + batch_dir = tmp_path / "batch" + batch_dir.mkdir() + for i in range(n_facts): + pd.DataFrame( + { + "end_date": [""], + "entity": [i + 1], + "entry_date": ["2023-01-01"], + "entry_number": ["1"], + "fact": [f"{'a' * 63}{i}"], + "field": ["name"], + "priority": ["1"], + "reference_entity": [""], + "resource": ["resource_abc"], + "start_date": [""], + "value": [f"value_{i}"], + } + ).to_parquet(batch_dir / f"batch_{i:02}.parquet", index=False) + + bucket_dir = tmp_path / "bucket" + bucket_dir.mkdir() + + package = DatasetParquetPackage( + dataset="conservation-area", + path=tmp_path / "output", + specification_dir=None, + ) + bucket_paths = package._assign_to_buckets(batch_dir, bucket_dir, n_buckets) + + assert ( + len(bucket_paths) == n_buckets + ), f"Expected {n_buckets} bucket files, got {len(bucket_paths)}" + assert all(p.exists() for p in bucket_paths), "Not all bucket files were created" + + bucket_total = sum(len(pd.read_parquet(p)) for p in bucket_paths) + assert bucket_total == n_facts, ( + f"Expected {n_facts} total rows across buckets, got {bucket_total}. " + "Rows were dropped or duplicated during bucket assignment." + ) + + # each fact hash should appear in exactly one bucket + fact_counts = {} + for p in bucket_paths: + for fact in pd.read_parquet(p)["fact"]: + fact_counts[fact] = fact_counts.get(fact, 0) + 1 + assert all( + v == 1 for v in fact_counts.values() + ), "Some fact hashes appear in more than one bucket" + + +def test_deduplicate_buckets_creates_correct_files(tmp_path): + """ + Tests that _deduplicate_buckets creates exactly one result file per bucket + and that duplicate fact hashes within a bucket are reduced to a single row, + keeping the row with the latest entry_date. + """ + n_buckets = 2 + + bucket_dir = tmp_path / "bucket" + bucket_dir.mkdir() + + # bucket_00: two rows for the same fact — only the later entry_date should survive + pd.DataFrame( + { + "end_date": ["", ""], + "entity": [1, 1], + "entry_date": ["2023-01-01", "2023-06-01"], + "entry_number": ["1", "2"], + "fact": ["fact_aaa", "fact_aaa"], + "field": ["name", "name"], + "priority": ["1", "1"], + "reference_entity": ["", ""], + "resource": ["res_a", "res_b"], + "start_date": ["", ""], + "value": ["old_value", "new_value"], + } + ).to_parquet(bucket_dir / "bucket_00.parquet", index=False) + + # bucket_01: two distinct facts, no duplicates + pd.DataFrame( + { + "end_date": ["", ""], + "entity": [2, 3], + "entry_date": ["2023-01-01", "2023-01-01"], + "entry_number": ["1", "1"], + "fact": ["fact_bbb", "fact_ccc"], + "field": ["name", "name"], + "priority": ["1", "1"], + "reference_entity": ["", ""], + "resource": ["res_a", "res_a"], + "start_date": ["", ""], + "value": ["value_b", "value_c"], + } + ).to_parquet(bucket_dir / "bucket_01.parquet", index=False) + + result_dir = tmp_path / "result" + result_dir.mkdir() + + package = DatasetParquetPackage( + dataset="conservation-area", + path=tmp_path / "output", + specification_dir=None, + ) + bucket_paths = [bucket_dir / "bucket_00.parquet", bucket_dir / "bucket_01.parquet"] + result_paths = package._deduplicate_buckets(bucket_paths, result_dir) + + assert ( + len(result_paths) == n_buckets + ), f"Expected {n_buckets} result files, got {len(result_paths)}" + assert all(p.exists() for p in result_paths), "Not all result files were created" + + df_00 = pd.read_parquet(result_paths[0]) + assert ( + len(df_00) == 1 + ), "Duplicate fact in bucket_00 should be deduplicated to 1 row" + assert ( + df_00.iloc[0]["value"] == "new_value" + ), "The row with the latest entry_date should be kept" + + df_01 = pd.read_parquet(result_paths[1]) + assert len(df_01) == 2, "bucket_01 has 2 distinct facts, both should be kept"