Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 92 additions & 54 deletions digital_land/package/dataset_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,70 @@ def group_parquet_files(
mem = process.memory_info().rss / 1024**2 # Memory in MB
logger.info(f"[Memory usage] After grouping: {mem:.2f} MB")

# temporary diagnostic: verify row count is preserved across batching
source_count = self.conn.execute(
f"SELECT COUNT(*) FROM read_parquet('{str(transformed_parquet_dir)}/*.parquet')"
).fetchone()[0]
batch_count = self.conn.execute(
f"SELECT COUNT(*) FROM read_parquet('{str(transformed_parquet_dir)}/batch/batch_*.parquet')"
).fetchone()[0]
logger.info(f"row count in source parquet files: {source_count}")
logger.info(f"row count in batch files after grouping: {batch_count}")
if source_count != batch_count:
logger.warning(
f"row count mismatch after grouping: {source_count} source rows vs {batch_count} batch rows"
)

def _assign_to_buckets(self, batch_dir, bucket_dir, n_buckets):
"""
Distribute rows from all batch files into n_buckets bucket files based
on HASH(fact) % n_buckets. Each unique fact hash always maps to the same
bucket, so per-bucket deduplication is equivalent to global deduplication.

Returns the list of bucket file paths.
"""
digits = max(2, len(str(n_buckets - 1)))
bucket_paths = [
bucket_dir / f"bucket_{i:0{digits}}.parquet" for i in range(n_buckets)
]
for i in range(n_buckets):
self.conn.execute(
f"""
COPY (
SELECT *
FROM read_parquet('{str(batch_dir)}/batch_*.parquet')
WHERE CAST(HASH(fact) AS UBIGINT) % {n_buckets} = {i}
) TO '{bucket_paths[i]}' (FORMAT PARQUET);
"""
)
return bucket_paths

def _deduplicate_buckets(self, bucket_paths, result_dir):
"""
Apply per-fact deduplication to each bucket file, keeping the highest
priority and most recent entry for each unique fact hash.

Returns the list of result file paths.
"""
digits = max(2, len(str(len(bucket_paths) - 1)))
result_paths = [
result_dir / f"result_{i:0{digits}}.parquet"
for i in range(len(bucket_paths))
]
for i, bucket_path in enumerate(bucket_paths):
self.conn.execute(
f"""
COPY (
SELECT *
FROM read_parquet('{bucket_path}')
QUALIFY ROW_NUMBER() OVER (
PARTITION BY fact ORDER BY priority, entry_date DESC, entry_number DESC
) = 1
) TO '{result_paths[i]}' (FORMAT PARQUET);
"""
)
return result_paths

def load_facts(self, transformed_parquet_dir):
"""
This method loads facts into a fact table from a directory containing all transformed files as parquet files
Expand Down Expand Up @@ -322,80 +386,54 @@ def load_facts(self, transformed_parquet_dir):
logger.info(
"Need to use multiple buckets in windowed function for facts"
)
bucket_dir = transformed_parquet_dir / "bucket"
bucket_dir.mkdir(parents=True, exist_ok=True)
digits = max(2, len(str(n_buckets - 1)))
bucket_paths = [
bucket_dir / f"bucket_{i:0{digits}}.parquet"
for i in range(n_buckets)
]
batch_dir = transformed_parquet_dir / "batch"
logger.info(
f"Have {len(list(transformed_parquet_dir.glob('batch/batch_*.parquet')))} batch files"
f"Have {len(list(batch_dir.glob('batch_*.parquet')))} batch files"
)

# Loop over each batch file and assign to a bucket file
bucket_dir = transformed_parquet_dir / "bucket"
bucket_dir.mkdir(parents=True, exist_ok=True)
logger.info(f"Assigning to {n_buckets} buckets")
for f in transformed_parquet_dir.glob("batch/batch_*.parquet"):
for i in range(n_buckets):
self.conn.execute(
f"""
COPY (
SELECT *
FROM read_parquet('{f}')
WHERE MOD(HASH(fact), {n_buckets}) = {i}
) TO '{bucket_paths[i]}' (FORMAT PARQUET, APPEND TRUE);
"""
)

bucket_paths = self._assign_to_buckets(batch_dir, bucket_dir, n_buckets)
logger.info(f"Have {len(bucket_paths)} bucket files")

batch_total = self.conn.execute(
f"SELECT COUNT(*) FROM read_parquet('{str(batch_dir)}/batch_*.parquet')"
).fetchone()[0]
bucket_total = self.conn.execute(
f"SELECT COUNT(*) FROM read_parquet('{str(bucket_dir)}/bucket_*.parquet')"
).fetchone()[0]
logger.info(f"row count in batch files: {batch_total}")
logger.info(
f"Have {len(list(bucket_dir.glob('bucket_*.parquet')))} bucket files"
f"row count in bucket files after assignment: {bucket_total}"
)
if bucket_total != batch_total:
logger.warning(
f"row count mismatch after bucketing: {batch_total} batch rows vs {bucket_total} bucket rows"
)

process = psutil.Process(os.getpid())
mem = process.memory_info().rss / 1024**2 # Memory in MB
mem = process.memory_info().rss / 1024**2
logger.info(f"[Memory usage] After 'bucketing': {mem:.2f} MB")

result_dir = transformed_parquet_dir / "result"
result_dir.mkdir(parents=True, exist_ok=True)
result_paths = [
result_dir / f"result_{i:0{digits}}.parquet"
for i in range(n_buckets)
]
for i in range(n_buckets):
bucket_path = bucket_dir / f"bucket_{i:0{digits}}.parquet"
self.conn.execute(
f"""
COPY (
SELECT *
FROM read_parquet('{bucket_path}')
QUALIFY ROW_NUMBER() OVER (
PARTITION BY fact ORDER BY priority, entry_date DESC, entry_number DESC
) = 1
) TO '{result_paths[i]}' (FORMAT PARQUET);
"""
)
logger.info(
f"Have {len(list(transformed_parquet_dir.glob('result_*.parquet')))} result files"
)

# for path in bucket_paths:
# path.unlink(missing_ok=True)
result_paths = self._deduplicate_buckets(bucket_paths, result_dir)
logger.info(f"Have {len(result_paths)} result files")

process = psutil.Process(os.getpid())
mem = process.memory_info().rss / 1024**2 # Memory in MB
mem = process.memory_info().rss / 1024**2
logger.info(f"[Memory usage] After 'result': {mem:.2f} MB")

self.conn.execute(
f"""
COPY(
SELECT * FROM
read_parquet('{str(result_dir)}/result_*.parquet')
) TO '{str(output_path)}' (FORMAT PARQUET);
"""
COPY (
SELECT * FROM read_parquet('{str(result_dir)}/result_*.parquet')
) TO '{str(output_path)}' (FORMAT PARQUET);
"""
)
logger.info(f"Facts saved to output_path: {output_path}")
logger.info(f"output_path exists: {os.path.exists(output_path)}")
# for path in result_paths:
# path.unlink(missing_ok=True)

process = psutil.Process(os.getpid())
mem = process.memory_info().rss / 1024**2 # Memory in MB
Expand Down
198 changes: 198 additions & 0 deletions tests/integration/package/test_dataset_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -980,3 +980,201 @@ def test_load_pq_to_sqlite_basic(
), "Some json object have underscores in their 'keys'"

cnx.close()


def test_multi_bucket_load_facts_no_fact_loss(tmp_path):
    """
    End-to-end check of the multi-bucket path in load_facts: every unique
    fact written to the transformed directory must appear exactly once in
    the final fact.parquet.

    The multi-bucket path is triggered by:
    - writing one tiny parquet file per fact, so group_parquet_files with
      target_mb=0.001 emits one batch file per source file
    - overriding parquet_dir_details to look like a dataset far larger than
      available memory, which forces n_buckets above 1

    If facts are silently dropped during bucket assignment or the final
    merge, this test fails.
    """
    fact_count = 50
    facts = ["a" * 63 + str(i) for i in range(fact_count)]

    transformed_dir = tmp_path / "transformed"
    transformed_dir.mkdir()

    for idx, fact in enumerate(facts):
        row = {
            "end_date": [""],
            "entity": [idx + 1],
            "entry_date": ["2023-01-01"],
            "entry_number": ["1"],
            "fact": [fact],
            "field": ["name"],
            "priority": ["1"],
            "reference_entity": [""],
            "resource": ["resource_abc"],
            "start_date": [""],
            "value": [f"value_{idx}"],
        }
        pd.DataFrame(row).to_parquet(
            transformed_dir / f"resource_{idx}.parquet", index=False
        )

    package = DatasetParquetPackage(
        dataset="conservation-area",
        path=tmp_path / "output",
        specification_dir=None,
        transformed_parquet_dir=transformed_dir,
    )

    # One batch file per source file, giving fact_count batch files.
    package.group_parquet_files(transformed_dir, target_mb=0.001)

    # Pretend the dataset dwarfs available memory so n_buckets > 1.
    package.parquet_dir_details["total_size_mb"] = 100.0
    package.parquet_dir_details["memory_available"] = 1.0

    package.load_facts(transformed_dir)

    fact_file = (
        tmp_path / "output" / "fact" / "dataset=conservation-area" / "fact.parquet"
    )
    assert fact_file.exists(), "fact.parquet was not created"

    loaded = pd.read_parquet(fact_file)
    assert len(loaded) == fact_count, (
        f"Expected {fact_count} facts but got {len(loaded)}. "
        "The multi-bucket path silently dropped some facts."
    )


def test_assign_to_buckets_creates_correct_files(tmp_path):
    """
    Tests that _assign_to_buckets creates exactly n_buckets files in
    bucket_dir, that no rows are dropped or duplicated during assignment,
    and that every fact hash lands in exactly one bucket.
    """
    fact_count = 30
    n_buckets = 3

    batch_dir = tmp_path / "batch"
    batch_dir.mkdir()
    for idx in range(fact_count):
        row = {
            "end_date": [""],
            "entity": [idx + 1],
            "entry_date": ["2023-01-01"],
            "entry_number": ["1"],
            "fact": ["a" * 63 + str(idx)],
            "field": ["name"],
            "priority": ["1"],
            "reference_entity": [""],
            "resource": ["resource_abc"],
            "start_date": [""],
            "value": [f"value_{idx}"],
        }
        pd.DataFrame(row).to_parquet(batch_dir / f"batch_{idx:02}.parquet", index=False)

    bucket_dir = tmp_path / "bucket"
    bucket_dir.mkdir()

    package = DatasetParquetPackage(
        dataset="conservation-area",
        path=tmp_path / "output",
        specification_dir=None,
    )
    bucket_paths = package._assign_to_buckets(batch_dir, bucket_dir, n_buckets)

    assert (
        len(bucket_paths) == n_buckets
    ), f"Expected {n_buckets} bucket files, got {len(bucket_paths)}"
    assert all(p.exists() for p in bucket_paths), "Not all bucket files were created"

    total_rows = sum(len(pd.read_parquet(p)) for p in bucket_paths)
    assert total_rows == fact_count, (
        f"Expected {fact_count} total rows across buckets, got {total_rows}. "
        "Rows were dropped or duplicated during bucket assignment."
    )

    # Each fact hash must appear in exactly one bucket.
    seen = set()
    duplicated = False
    for p in bucket_paths:
        for fact in pd.read_parquet(p)["fact"]:
            if fact in seen:
                duplicated = True
            seen.add(fact)
    assert not duplicated, "Some fact hashes appear in more than one bucket"


def test_deduplicate_buckets_creates_correct_files(tmp_path):
    """
    Tests that _deduplicate_buckets creates exactly one result file per
    bucket and that duplicate fact hashes within a bucket collapse to a
    single row, keeping the row with the latest entry_date.
    """
    n_buckets = 2

    bucket_dir = tmp_path / "bucket"
    bucket_dir.mkdir()

    # bucket_00: two rows for the same fact — only the later entry_date survives.
    pd.DataFrame(
        [
            {
                "end_date": "",
                "entity": 1,
                "entry_date": "2023-01-01",
                "entry_number": "1",
                "fact": "fact_aaa",
                "field": "name",
                "priority": "1",
                "reference_entity": "",
                "resource": "res_a",
                "start_date": "",
                "value": "old_value",
            },
            {
                "end_date": "",
                "entity": 1,
                "entry_date": "2023-06-01",
                "entry_number": "2",
                "fact": "fact_aaa",
                "field": "name",
                "priority": "1",
                "reference_entity": "",
                "resource": "res_b",
                "start_date": "",
                "value": "new_value",
            },
        ]
    ).to_parquet(bucket_dir / "bucket_00.parquet", index=False)

    # bucket_01: two distinct facts, no duplicates — both must survive.
    pd.DataFrame(
        [
            {
                "end_date": "",
                "entity": 2,
                "entry_date": "2023-01-01",
                "entry_number": "1",
                "fact": "fact_bbb",
                "field": "name",
                "priority": "1",
                "reference_entity": "",
                "resource": "res_a",
                "start_date": "",
                "value": "value_b",
            },
            {
                "end_date": "",
                "entity": 3,
                "entry_date": "2023-01-01",
                "entry_number": "1",
                "fact": "fact_ccc",
                "field": "name",
                "priority": "1",
                "reference_entity": "",
                "resource": "res_a",
                "start_date": "",
                "value": "value_c",
            },
        ]
    ).to_parquet(bucket_dir / "bucket_01.parquet", index=False)

    result_dir = tmp_path / "result"
    result_dir.mkdir()

    package = DatasetParquetPackage(
        dataset="conservation-area",
        path=tmp_path / "output",
        specification_dir=None,
    )
    bucket_paths = [bucket_dir / "bucket_00.parquet", bucket_dir / "bucket_01.parquet"]
    result_paths = package._deduplicate_buckets(bucket_paths, result_dir)

    assert (
        len(result_paths) == n_buckets
    ), f"Expected {n_buckets} result files, got {len(result_paths)}"
    assert all(p.exists() for p in result_paths), "Not all result files were created"

    deduped = pd.read_parquet(result_paths[0])
    assert (
        len(deduped) == 1
    ), "Duplicate fact in bucket_00 should be deduplicated to 1 row"
    assert (
        deduped.iloc[0]["value"] == "new_value"
    ), "The row with the latest entry_date should be kept"

    untouched = pd.read_parquet(result_paths[1])
    assert len(untouched) == 2, "bucket_01 has 2 distinct facts, both should be kept"