Commit 3a76e9a

add fix in (#519)
1 parent 5ef2085 commit 3a76e9a

2 files changed

Lines changed: 199 additions & 61 deletions

File tree

digital_land/package/dataset_parquet.py
tests/integration/package/test_dataset_parquet.py

digital_land/package/dataset_parquet.py

Lines changed: 67 additions & 61 deletions
@@ -258,6 +258,56 @@ def group_parquet_files(
                 f"row count mismatch after grouping: {source_count} source rows vs {batch_count} batch rows"
             )
 
+    def _assign_to_buckets(self, batch_dir, bucket_dir, n_buckets):
+        """
+        Distribute rows from all batch files into n_buckets bucket files based
+        on HASH(fact) % n_buckets. Each unique fact hash always maps to the same
+        bucket, so per-bucket deduplication is equivalent to global deduplication.
+
+        Returns the list of bucket file paths.
+        """
+        digits = max(2, len(str(n_buckets - 1)))
+        bucket_paths = [
+            bucket_dir / f"bucket_{i:0{digits}}.parquet" for i in range(n_buckets)
+        ]
+        for i in range(n_buckets):
+            self.conn.execute(
+                f"""
+                COPY (
+                    SELECT *
+                    FROM read_parquet('{str(batch_dir)}/batch_*.parquet')
+                    WHERE CAST(HASH(fact) AS UBIGINT) % {n_buckets} = {i}
+                ) TO '{bucket_paths[i]}' (FORMAT PARQUET);
+                """
+            )
+        return bucket_paths
+
+    def _deduplicate_buckets(self, bucket_paths, result_dir):
+        """
+        Apply per-fact deduplication to each bucket file, keeping the highest
+        priority and most recent entry for each unique fact hash.
+
+        Returns the list of result file paths.
+        """
+        digits = max(2, len(str(len(bucket_paths) - 1)))
+        result_paths = [
+            result_dir / f"result_{i:0{digits}}.parquet"
+            for i in range(len(bucket_paths))
+        ]
+        for i, bucket_path in enumerate(bucket_paths):
+            self.conn.execute(
+                f"""
+                COPY (
+                    SELECT *
+                    FROM read_parquet('{bucket_path}')
+                    QUALIFY ROW_NUMBER() OVER (
+                        PARTITION BY fact ORDER BY priority, entry_date DESC, entry_number DESC
+                    ) = 1
+                ) TO '{result_paths[i]}' (FORMAT PARQUET);
+                """
+            )
+        return result_paths
+
     def load_facts(self, transformed_parquet_dir):
         """
         This method loads facts into a fact table from a directory containing all transformed files as parquet files

@@ -336,43 +386,23 @@ def load_facts(self, transformed_parquet_dir):
             logger.info(
                 "Need to use multiple buckets in windowed function for facts"
             )
-            bucket_dir = transformed_parquet_dir / "bucket"
-            bucket_dir.mkdir(parents=True, exist_ok=True)
-            digits = max(2, len(str(n_buckets - 1)))
-            bucket_paths = [
-                bucket_dir / f"bucket_{i:0{digits}}.parquet"
-                for i in range(n_buckets)
-            ]
+            batch_dir = transformed_parquet_dir / "batch"
             logger.info(
-                f"Have {len(list(transformed_parquet_dir.glob('batch/batch_*.parquet')))} batch files"
+                f"Have {len(list(batch_dir.glob('batch_*.parquet')))} batch files"
             )
 
-            # Loop over each batch file and assign to a bucket file
+            bucket_dir = transformed_parquet_dir / "bucket"
+            bucket_dir.mkdir(parents=True, exist_ok=True)
             logger.info(f"Assigning to {n_buckets} buckets")
-            for f in transformed_parquet_dir.glob("batch/batch_*.parquet"):
-                for i in range(n_buckets):
-                    self.conn.execute(
-                        f"""
-                        COPY (
-                            SELECT *
-                            FROM read_parquet('{f}')
-                            WHERE CAST(HASH(fact) AS UBIGINT) % {n_buckets} = {i}
-                        ) TO '{bucket_paths[i]}' (FORMAT PARQUET, APPEND TRUE);
-                        """
-                    )
-
-            logger.info(
-                f"Have {len(list(bucket_dir.glob('bucket_*.parquet')))} bucket files"
-            )
+            bucket_paths = self._assign_to_buckets(batch_dir, bucket_dir, n_buckets)
+            logger.info(f"Have {len(bucket_paths)} bucket files")
 
-            # temporary diagnostic: total rows across all buckets should equal
-            # total rows across all batch files (deduplication happens later)
+            batch_total = self.conn.execute(
+                f"SELECT COUNT(*) FROM read_parquet('{str(batch_dir)}/batch_*.parquet')"
+            ).fetchone()[0]
             bucket_total = self.conn.execute(
                 f"SELECT COUNT(*) FROM read_parquet('{str(bucket_dir)}/bucket_*.parquet')"
            ).fetchone()[0]
-            batch_total = self.conn.execute(
-                f"SELECT COUNT(*) FROM read_parquet('{str(transformed_parquet_dir)}/batch/batch_*.parquet')"
-            ).fetchone()[0]
             logger.info(f"row count in batch files: {batch_total}")
             logger.info(
                 f"row count in bucket files after assignment: {bucket_total}"

@@ -383,51 +413,27 @@ def load_facts(self, transformed_parquet_dir):
             )
 
             process = psutil.Process(os.getpid())
-            mem = process.memory_info().rss / 1024**2  # Memory in MB
+            mem = process.memory_info().rss / 1024**2
             logger.info(f"[Memory usage] After 'bucketing': {mem:.2f} MB")
 
             result_dir = transformed_parquet_dir / "result"
             result_dir.mkdir(parents=True, exist_ok=True)
-            result_paths = [
-                result_dir / f"result_{i:0{digits}}.parquet"
-                for i in range(n_buckets)
-            ]
-            for i in range(n_buckets):
-                bucket_path = bucket_dir / f"bucket_{i:0{digits}}.parquet"
-                self.conn.execute(
-                    f"""
-                    COPY (
-                        SELECT *
-                        FROM read_parquet('{bucket_path}')
-                        QUALIFY ROW_NUMBER() OVER (
-                            PARTITION BY fact ORDER BY priority, entry_date DESC, entry_number DESC
-                        ) = 1
-                    ) TO '{result_paths[i]}' (FORMAT PARQUET);
-                    """
-                )
-            logger.info(
-                f"Have {len(list(transformed_parquet_dir.glob('result_*.parquet')))} result files"
-            )
-
-            # for path in bucket_paths:
-            #     path.unlink(missing_ok=True)
+            result_paths = self._deduplicate_buckets(bucket_paths, result_dir)
+            logger.info(f"Have {len(result_paths)} result files")
 
             process = psutil.Process(os.getpid())
-            mem = process.memory_info().rss / 1024**2  # Memory in MB
+            mem = process.memory_info().rss / 1024**2
             logger.info(f"[Memory usage] After 'result': {mem:.2f} MB")
 
             self.conn.execute(
                 f"""
-                    COPY(
-                        SELECT * FROM
-                        read_parquet('{str(result_dir)}/result_*.parquet')
-                    ) TO '{str(output_path)}' (FORMAT PARQUET);
-                    """
+                COPY (
+                    SELECT * FROM read_parquet('{str(result_dir)}/result_*.parquet')
+                ) TO '{str(output_path)}' (FORMAT PARQUET);
+                """
             )
             logger.info(f"Facts saved to output_path: {output_path}")
             logger.info(f"output_path exists: {os.path.exists(output_path)}")
-            # for path in result_paths:
-            #     path.unlink(missing_ok=True)
 
             process = psutil.Process(os.getpid())
             mem = process.memory_info().rss / 1024**2  # Memory in MB
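
The refactor relies on the property spelled out in the _assign_to_buckets docstring: HASH(fact) % n_buckets routes every row for a given fact to the same bucket, so deduplicating each bucket independently yields the same rows as a single global pass. Below is a minimal standalone sketch of that idea, assuming the duckdb Python package and an in-memory table with illustrative values (not pipeline data).

import duckdb

con = duckdb.connect()
con.execute(
    """
    CREATE TABLE fact_rows AS
    SELECT * FROM (VALUES
        ('fact_aaa', '1', DATE '2023-01-01', 1, 'old_value'),
        ('fact_aaa', '1', DATE '2023-06-01', 2, 'new_value'),
        ('fact_bbb', '1', DATE '2023-01-01', 1, 'value_b')
    ) AS t(fact, priority, entry_date, entry_number, value)
    """
)

n_buckets = 2
deduped_rows = 0
for i in range(n_buckets):
    # all rows for a given fact share a hash, so they land in the same bucket;
    # per-bucket deduplication therefore keeps exactly one row per distinct fact
    deduped_rows += con.execute(
        f"""
        SELECT COUNT(*) FROM (
            SELECT * FROM fact_rows
            WHERE CAST(HASH(fact) AS UBIGINT) % {n_buckets} = {i}
            QUALIFY ROW_NUMBER() OVER (
                PARTITION BY fact ORDER BY priority, entry_date DESC, entry_number DESC
            ) = 1
        ) AS per_bucket
        """
    ).fetchone()[0]

print(deduped_rows)  # 2 -- one row per distinct fact, same as a single global pass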

tests/integration/package/test_dataset_parquet.py

Lines changed: 132 additions & 0 deletions
@@ -1046,3 +1046,135 @@ def test_multi_bucket_load_facts_no_fact_loss(tmp_path):
         f"Expected {n_facts} facts but got {len(df_result)}. "
         "The multi-bucket path silently dropped some facts."
     )
+
+
+def test_assign_to_buckets_creates_correct_files(tmp_path):
+    """
+    Tests that _assign_to_buckets creates exactly n_buckets files in bucket_dir
+    and that the total row count across all bucket files equals the total row
+    count in the batch files (no rows dropped or duplicated during assignment).
+    """
+    n_facts = 30
+    n_buckets = 3
+
+    batch_dir = tmp_path / "batch"
+    batch_dir.mkdir()
+    for i in range(n_facts):
+        pd.DataFrame(
+            {
+                "end_date": [""],
+                "entity": [i + 1],
+                "entry_date": ["2023-01-01"],
+                "entry_number": ["1"],
+                "fact": [f"{'a' * 63}{i}"],
+                "field": ["name"],
+                "priority": ["1"],
+                "reference_entity": [""],
+                "resource": ["resource_abc"],
+                "start_date": [""],
+                "value": [f"value_{i}"],
+            }
+        ).to_parquet(batch_dir / f"batch_{i:02}.parquet", index=False)
+
+    bucket_dir = tmp_path / "bucket"
+    bucket_dir.mkdir()
+
+    package = DatasetParquetPackage(
+        dataset="conservation-area",
+        path=tmp_path / "output",
+        specification_dir=None,
+    )
+    bucket_paths = package._assign_to_buckets(batch_dir, bucket_dir, n_buckets)
+
+    assert (
+        len(bucket_paths) == n_buckets
+    ), f"Expected {n_buckets} bucket files, got {len(bucket_paths)}"
+    assert all(p.exists() for p in bucket_paths), "Not all bucket files were created"
+
+    bucket_total = sum(len(pd.read_parquet(p)) for p in bucket_paths)
+    assert bucket_total == n_facts, (
+        f"Expected {n_facts} total rows across buckets, got {bucket_total}. "
+        "Rows were dropped or duplicated during bucket assignment."
+    )
+
+    # each fact hash should appear in exactly one bucket
+    fact_counts = {}
+    for p in bucket_paths:
+        for fact in pd.read_parquet(p)["fact"]:
+            fact_counts[fact] = fact_counts.get(fact, 0) + 1
+    assert all(
+        v == 1 for v in fact_counts.values()
+    ), "Some fact hashes appear in more than one bucket"
+
+
+def test_deduplicate_buckets_creates_correct_files(tmp_path):
+    """
+    Tests that _deduplicate_buckets creates exactly one result file per bucket
+    and that duplicate fact hashes within a bucket are reduced to a single row,
+    keeping the row with the latest entry_date.
+    """
+    n_buckets = 2
+
+    bucket_dir = tmp_path / "bucket"
+    bucket_dir.mkdir()
+
+    # bucket_00: two rows for the same fact -- only the later entry_date should survive
+    pd.DataFrame(
+        {
+            "end_date": ["", ""],
+            "entity": [1, 1],
+            "entry_date": ["2023-01-01", "2023-06-01"],
+            "entry_number": ["1", "2"],
+            "fact": ["fact_aaa", "fact_aaa"],
+            "field": ["name", "name"],
+            "priority": ["1", "1"],
+            "reference_entity": ["", ""],
+            "resource": ["res_a", "res_b"],
+            "start_date": ["", ""],
+            "value": ["old_value", "new_value"],
+        }
+    ).to_parquet(bucket_dir / "bucket_00.parquet", index=False)
+
+    # bucket_01: two distinct facts, no duplicates
+    pd.DataFrame(
+        {
+            "end_date": ["", ""],
+            "entity": [2, 3],
+            "entry_date": ["2023-01-01", "2023-01-01"],
+            "entry_number": ["1", "1"],
+            "fact": ["fact_bbb", "fact_ccc"],
+            "field": ["name", "name"],
+            "priority": ["1", "1"],
+            "reference_entity": ["", ""],
+            "resource": ["res_a", "res_a"],
+            "start_date": ["", ""],
+            "value": ["value_b", "value_c"],
+        }
+    ).to_parquet(bucket_dir / "bucket_01.parquet", index=False)
+
+    result_dir = tmp_path / "result"
+    result_dir.mkdir()
+
+    package = DatasetParquetPackage(
+        dataset="conservation-area",
+        path=tmp_path / "output",
+        specification_dir=None,
+    )
+    bucket_paths = [bucket_dir / "bucket_00.parquet", bucket_dir / "bucket_01.parquet"]
+    result_paths = package._deduplicate_buckets(bucket_paths, result_dir)
+
+    assert (
+        len(result_paths) == n_buckets
+    ), f"Expected {n_buckets} result files, got {len(result_paths)}"
+    assert all(p.exists() for p in result_paths), "Not all result files were created"
+
+    df_00 = pd.read_parquet(result_paths[0])
+    assert (
+        len(df_00) == 1
+    ), "Duplicate fact in bucket_00 should be deduplicated to 1 row"
+    assert (
+        df_00.iloc[0]["value"] == "new_value"
+    ), "The row with the latest entry_date should be kept"
+
+    df_01 = pd.read_parquet(result_paths[1])
+    assert len(df_01) == 2, "bucket_01 has 2 distinct facts, both should be kept"
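
For orientation, here is a hypothetical usage of the two new helpers outside the test harness, mirroring the refactored load_facts flow in the first file. The import path is inferred from the module path; the directory layout and the n_buckets value are assumptions, and only the class, constructor arguments, and method names come from the diff above.

from pathlib import Path

from digital_land.package.dataset_parquet import DatasetParquetPackage

# assumed layout: batch_*.parquet files already written under <dir>/batch
transformed_parquet_dir = Path("var/cache/transformed")  # illustrative path
batch_dir = transformed_parquet_dir / "batch"
bucket_dir = transformed_parquet_dir / "bucket"
result_dir = transformed_parquet_dir / "result"
bucket_dir.mkdir(parents=True, exist_ok=True)
result_dir.mkdir(parents=True, exist_ok=True)

package = DatasetParquetPackage(
    dataset="conservation-area",
    path=transformed_parquet_dir / "output",
    specification_dir=None,
)

# hash-partition the batch rows, then deduplicate each bucket independently
bucket_paths = package._assign_to_buckets(batch_dir, bucket_dir, n_buckets=4)
result_paths = package._deduplicate_buckets(bucket_paths, result_dir)
print(f"{len(bucket_paths)} buckets -> {len(result_paths)} deduplicated result files")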
