Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion digital_land/package/dataset_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,20 @@ def group_parquet_files(
mem = process.memory_info().rss / 1024**2 # Memory in MB
logger.info(f"[Memory usage] After grouping: {mem:.2f} MB")

# temporary diagnostic: verify row count is preserved across batching
source_count = self.conn.execute(
f"SELECT COUNT(*) FROM read_parquet('{str(transformed_parquet_dir)}/*.parquet')"
).fetchone()[0]
batch_count = self.conn.execute(
f"SELECT COUNT(*) FROM read_parquet('{str(transformed_parquet_dir)}/batch/batch_*.parquet')"
).fetchone()[0]
logger.info(f"row count in source parquet files: {source_count}")
logger.info(f"row count in batch files after grouping: {batch_count}")
if source_count != batch_count:
logger.warning(
f"row count mismatch after grouping: {source_count} source rows vs {batch_count} batch rows"
)

def load_facts(self, transformed_parquet_dir):
"""
This method loads facts into a fact table from a directory containing all transformed files as parquet files
Expand Down Expand Up @@ -342,14 +356,32 @@ def load_facts(self, transformed_parquet_dir):
COPY (
SELECT *
FROM read_parquet('{f}')
WHERE MOD(HASH(fact), {n_buckets}) = {i}
WHERE CAST(HASH(fact) AS UBIGINT) % {n_buckets} = {i}
) TO '{bucket_paths[i]}' (FORMAT PARQUET, APPEND TRUE);
"""
)

logger.info(
f"Have {len(list(bucket_dir.glob('bucket_*.parquet')))} bucket files"
)

# temporary diagnostic: total rows across all buckets should equal
# total rows across all batch files (deduplication happens later)
bucket_total = self.conn.execute(
f"SELECT COUNT(*) FROM read_parquet('{str(bucket_dir)}/bucket_*.parquet')"
).fetchone()[0]
batch_total = self.conn.execute(
f"SELECT COUNT(*) FROM read_parquet('{str(transformed_parquet_dir)}/batch/batch_*.parquet')"
).fetchone()[0]
logger.info(f"row count in batch files: {batch_total}")
logger.info(
f"row count in bucket files after assignment: {bucket_total}"
)
if bucket_total != batch_total:
logger.warning(
f"row count mismatch after bucketing: {batch_total} batch rows vs {bucket_total} bucket rows"
)

process = psutil.Process(os.getpid())
mem = process.memory_info().rss / 1024**2 # Memory in MB
logger.info(f"[Memory usage] After 'bucketing': {mem:.2f} MB")
Expand Down
66 changes: 66 additions & 0 deletions tests/integration/package/test_dataset_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -980,3 +980,69 @@ def test_load_pq_to_sqlite_basic(
), "Some json object have underscores in their 'keys'"

cnx.close()


def test_multi_bucket_load_facts_no_fact_loss(tmp_path):
    """
    End-to-end check of load_facts' multi-bucket code path: every distinct
    fact written to the input directory must survive into fact.parquet
    exactly once.

    The multi-bucket route is triggered in two steps:
    - each fact gets its own tiny parquet file, so group_parquet_files with
      target_mb=0.001 emits one batch file per source file
    - parquet_dir_details is overridden to look like a dataset far larger
      than available memory, which pushes the calculated n_buckets above 1

    A silent drop of facts during bucket assignment or the final merge
    shows up here as a row-count mismatch.
    """
    fact_count = 50
    fact_hashes = [f"{'a' * 63}{idx}" for idx in range(fact_count)]

    source_dir = tmp_path / "transformed"
    source_dir.mkdir()

    # Write one single-row parquet file per fact so that grouping with a
    # tiny target size yields one batch file per source file.
    for idx, fact_hash in enumerate(fact_hashes):
        row = {
            "end_date": [""],
            "entity": [idx + 1],
            "entry_date": ["2023-01-01"],
            "entry_number": ["1"],
            "fact": [fact_hash],
            "field": ["name"],
            "priority": ["1"],
            "reference_entity": [""],
            "resource": ["resource_abc"],
            "start_date": [""],
            "value": [f"value_{idx}"],
        }
        pd.DataFrame(row).to_parquet(
            source_dir / f"resource_{idx}.parquet", index=False
        )

    package = DatasetParquetPackage(
        dataset="conservation-area",
        path=tmp_path / "output",
        specification_dir=None,
        transformed_parquet_dir=source_dir,
    )

    # One source file per batch, giving fact_count batch files.
    package.group_parquet_files(source_dir, target_mb=0.001)

    # Pretend the dataset dwarfs available memory so n_buckets > 1.
    package.parquet_dir_details["total_size_mb"] = 100.0
    package.parquet_dir_details["memory_available"] = 1.0

    package.load_facts(source_dir)

    fact_file = (
        tmp_path / "output" / "fact" / "dataset=conservation-area" / "fact.parquet"
    )
    assert fact_file.exists(), "fact.parquet was not created"

    loaded = pd.read_parquet(fact_file)

    assert len(loaded) == fact_count, (
        f"Expected {fact_count} facts but got {len(loaded)}. "
        "The multi-bucket path silently dropped some facts."
    )
Loading