Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 67 additions & 61 deletions digital_land/package/dataset_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,56 @@ def group_parquet_files(
f"row count mismatch after grouping: {source_count} source rows vs {batch_count} batch rows"
)

def _assign_to_buckets(self, batch_dir, bucket_dir, n_buckets):
"""
Distribute rows from all batch files into n_buckets bucket files based
on HASH(fact) % n_buckets. Each unique fact hash always maps to the same
bucket, so per-bucket deduplication is equivalent to global deduplication.

Returns the list of bucket file paths.
"""
digits = max(2, len(str(n_buckets - 1)))
bucket_paths = [
bucket_dir / f"bucket_{i:0{digits}}.parquet" for i in range(n_buckets)
]
for i in range(n_buckets):
self.conn.execute(
f"""
COPY (
SELECT *
FROM read_parquet('{str(batch_dir)}/batch_*.parquet')
WHERE CAST(HASH(fact) AS UBIGINT) % {n_buckets} = {i}
) TO '{bucket_paths[i]}' (FORMAT PARQUET);
"""
)
return bucket_paths

def _deduplicate_buckets(self, bucket_paths, result_dir):
"""
Apply per-fact deduplication to each bucket file, keeping the highest
priority and most recent entry for each unique fact hash.

Returns the list of result file paths.
"""
digits = max(2, len(str(len(bucket_paths) - 1)))
result_paths = [
result_dir / f"result_{i:0{digits}}.parquet"
for i in range(len(bucket_paths))
]
for i, bucket_path in enumerate(bucket_paths):
self.conn.execute(
f"""
COPY (
SELECT *
FROM read_parquet('{bucket_path}')
QUALIFY ROW_NUMBER() OVER (
PARTITION BY fact ORDER BY priority, entry_date DESC, entry_number DESC
) = 1
) TO '{result_paths[i]}' (FORMAT PARQUET);
"""
)
return result_paths

def load_facts(self, transformed_parquet_dir):
"""
This method loads facts into a fact table from a directory containing all transformed files as parquet files
Expand Down Expand Up @@ -336,43 +386,23 @@ def load_facts(self, transformed_parquet_dir):
logger.info(
"Need to use multiple buckets in windowed function for facts"
)
bucket_dir = transformed_parquet_dir / "bucket"
bucket_dir.mkdir(parents=True, exist_ok=True)
digits = max(2, len(str(n_buckets - 1)))
bucket_paths = [
bucket_dir / f"bucket_{i:0{digits}}.parquet"
for i in range(n_buckets)
]
batch_dir = transformed_parquet_dir / "batch"
logger.info(
f"Have {len(list(transformed_parquet_dir.glob('batch/batch_*.parquet')))} batch files"
f"Have {len(list(batch_dir.glob('batch_*.parquet')))} batch files"
)

# Loop over each batch file and assign to a bucket file
bucket_dir = transformed_parquet_dir / "bucket"
bucket_dir.mkdir(parents=True, exist_ok=True)
logger.info(f"Assigning to {n_buckets} buckets")
for f in transformed_parquet_dir.glob("batch/batch_*.parquet"):
for i in range(n_buckets):
self.conn.execute(
f"""
COPY (
SELECT *
FROM read_parquet('{f}')
WHERE CAST(HASH(fact) AS UBIGINT) % {n_buckets} = {i}
) TO '{bucket_paths[i]}' (FORMAT PARQUET, APPEND TRUE);
"""
)

logger.info(
f"Have {len(list(bucket_dir.glob('bucket_*.parquet')))} bucket files"
)
bucket_paths = self._assign_to_buckets(batch_dir, bucket_dir, n_buckets)
logger.info(f"Have {len(bucket_paths)} bucket files")

# temporary diagnostic: total rows across all buckets should equal
# total rows across all batch files (deduplication happens later)
batch_total = self.conn.execute(
f"SELECT COUNT(*) FROM read_parquet('{str(batch_dir)}/batch_*.parquet')"
).fetchone()[0]
bucket_total = self.conn.execute(
f"SELECT COUNT(*) FROM read_parquet('{str(bucket_dir)}/bucket_*.parquet')"
).fetchone()[0]
batch_total = self.conn.execute(
f"SELECT COUNT(*) FROM read_parquet('{str(transformed_parquet_dir)}/batch/batch_*.parquet')"
).fetchone()[0]
logger.info(f"row count in batch files: {batch_total}")
logger.info(
f"row count in bucket files after assignment: {bucket_total}"
Expand All @@ -383,51 +413,27 @@ def load_facts(self, transformed_parquet_dir):
)

process = psutil.Process(os.getpid())
mem = process.memory_info().rss / 1024**2 # Memory in MB
mem = process.memory_info().rss / 1024**2
logger.info(f"[Memory usage] After 'bucketing': {mem:.2f} MB")

result_dir = transformed_parquet_dir / "result"
result_dir.mkdir(parents=True, exist_ok=True)
result_paths = [
result_dir / f"result_{i:0{digits}}.parquet"
for i in range(n_buckets)
]
for i in range(n_buckets):
bucket_path = bucket_dir / f"bucket_{i:0{digits}}.parquet"
self.conn.execute(
f"""
COPY (
SELECT *
FROM read_parquet('{bucket_path}')
QUALIFY ROW_NUMBER() OVER (
PARTITION BY fact ORDER BY priority, entry_date DESC, entry_number DESC
) = 1
) TO '{result_paths[i]}' (FORMAT PARQUET);
"""
)
logger.info(
f"Have {len(list(transformed_parquet_dir.glob('result_*.parquet')))} result files"
)

# for path in bucket_paths:
# path.unlink(missing_ok=True)
result_paths = self._deduplicate_buckets(bucket_paths, result_dir)
logger.info(f"Have {len(result_paths)} result files")

process = psutil.Process(os.getpid())
mem = process.memory_info().rss / 1024**2 # Memory in MB
mem = process.memory_info().rss / 1024**2
logger.info(f"[Memory usage] After 'result': {mem:.2f} MB")

self.conn.execute(
f"""
COPY(
SELECT * FROM
read_parquet('{str(result_dir)}/result_*.parquet')
) TO '{str(output_path)}' (FORMAT PARQUET);
"""
COPY (
SELECT * FROM read_parquet('{str(result_dir)}/result_*.parquet')
) TO '{str(output_path)}' (FORMAT PARQUET);
"""
)
logger.info(f"Facts saved to output_path: {output_path}")
logger.info(f"output_path exists: {os.path.exists(output_path)}")
# for path in result_paths:
# path.unlink(missing_ok=True)

process = psutil.Process(os.getpid())
mem = process.memory_info().rss / 1024**2 # Memory in MB
Expand Down
132 changes: 132 additions & 0 deletions tests/integration/package/test_dataset_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -1046,3 +1046,135 @@ def test_multi_bucket_load_facts_no_fact_loss(tmp_path):
f"Expected {n_facts} facts but got {len(df_result)}. "
"The multi-bucket path silently dropped some facts."
)


def test_assign_to_buckets_creates_correct_files(tmp_path):
    """
    _assign_to_buckets must produce exactly n_buckets files in bucket_dir,
    preserve the total row count from the batch files (nothing dropped or
    duplicated), and send every fact hash to exactly one bucket.
    """
    n_facts = 30
    n_buckets = 3

    batch_dir = tmp_path / "batch"
    batch_dir.mkdir()
    for idx in range(n_facts):
        columns = {
            "end_date": [""],
            "entity": [idx + 1],
            "entry_date": ["2023-01-01"],
            "entry_number": ["1"],
            "fact": [f"{'a' * 63}{idx}"],
            "field": ["name"],
            "priority": ["1"],
            "reference_entity": [""],
            "resource": ["resource_abc"],
            "start_date": [""],
            "value": [f"value_{idx}"],
        }
        pd.DataFrame(columns).to_parquet(
            batch_dir / f"batch_{idx:02}.parquet", index=False
        )

    bucket_dir = tmp_path / "bucket"
    bucket_dir.mkdir()

    package = DatasetParquetPackage(
        dataset="conservation-area",
        path=tmp_path / "output",
        specification_dir=None,
    )
    bucket_paths = package._assign_to_buckets(batch_dir, bucket_dir, n_buckets)

    assert (
        len(bucket_paths) == n_buckets
    ), f"Expected {n_buckets} bucket files, got {len(bucket_paths)}"
    missing = [p for p in bucket_paths if not p.exists()]
    assert not missing, "Not all bucket files were created"

    all_facts = []
    for path in bucket_paths:
        all_facts.extend(pd.read_parquet(path)["fact"].tolist())
    assert len(all_facts) == n_facts, (
        f"Expected {n_facts} total rows across buckets, got {len(all_facts)}. "
        "Rows were dropped or duplicated during bucket assignment."
    )

    # each fact hash should appear in exactly one bucket: total count with
    # no duplicates anywhere is equivalent to every per-fact count being 1
    assert len(set(all_facts)) == len(
        all_facts
    ), "Some fact hashes appear in more than one bucket"


def test_deduplicate_buckets_creates_correct_files(tmp_path):
    """
    _deduplicate_buckets must emit exactly one result file per bucket file
    and collapse duplicate fact hashes within a bucket to a single row,
    keeping the row with the latest entry_date.
    """
    n_buckets = 2

    bucket_dir = tmp_path / "bucket"
    bucket_dir.mkdir()

    def write_bucket(name, columns):
        # columns maps column name -> list of row values
        pd.DataFrame(columns).to_parquet(bucket_dir / name, index=False)

    # bucket_00: two rows for the same fact — only the later entry_date should survive
    write_bucket(
        "bucket_00.parquet",
        {
            "end_date": ["", ""],
            "entity": [1, 1],
            "entry_date": ["2023-01-01", "2023-06-01"],
            "entry_number": ["1", "2"],
            "fact": ["fact_aaa", "fact_aaa"],
            "field": ["name", "name"],
            "priority": ["1", "1"],
            "reference_entity": ["", ""],
            "resource": ["res_a", "res_b"],
            "start_date": ["", ""],
            "value": ["old_value", "new_value"],
        },
    )

    # bucket_01: two distinct facts, no duplicates
    write_bucket(
        "bucket_01.parquet",
        {
            "end_date": ["", ""],
            "entity": [2, 3],
            "entry_date": ["2023-01-01", "2023-01-01"],
            "entry_number": ["1", "1"],
            "fact": ["fact_bbb", "fact_ccc"],
            "field": ["name", "name"],
            "priority": ["1", "1"],
            "reference_entity": ["", ""],
            "resource": ["res_a", "res_a"],
            "start_date": ["", ""],
            "value": ["value_b", "value_c"],
        },
    )

    result_dir = tmp_path / "result"
    result_dir.mkdir()

    package = DatasetParquetPackage(
        dataset="conservation-area",
        path=tmp_path / "output",
        specification_dir=None,
    )
    bucket_paths = [bucket_dir / "bucket_00.parquet", bucket_dir / "bucket_01.parquet"]
    result_paths = package._deduplicate_buckets(bucket_paths, result_dir)

    assert (
        len(result_paths) == n_buckets
    ), f"Expected {n_buckets} result files, got {len(result_paths)}"
    assert all(p.exists() for p in result_paths), "Not all result files were created"

    deduped = pd.read_parquet(result_paths[0])
    assert (
        len(deduped) == 1
    ), "Duplicate fact in bucket_00 should be deduplicated to 1 row"
    assert (
        deduped.iloc[0]["value"] == "new_value"
    ), "The row with the latest entry_date should be kept"

    untouched = pd.read_parquet(result_paths[1])
    assert len(untouched) == 2, "bucket_01 has 2 distinct facts, both should be kept"
Loading