Skip to content

Commit e1fb74d

Browse files
authored
move parquets to the same transformed directory (#450)
* move parquets to the same transformed directory * make sure output_path is a path object * add acceptance test for running the pipeline command * replace input paths with input directory for create command * change cli and test so that one argument is expected * fix error and ensure its tested
1 parent 56070aa commit e1fb74d

5 files changed

Lines changed: 227 additions & 32 deletions

File tree

digital_land/cli.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -199,11 +199,11 @@ def convert_cmd(input_path, output_path):
199199
default="collection/resource.csv",
200200
help="link to where the resource list is stored",
201201
)
202-
@click.argument("input-paths", nargs=-1, type=click.Path(exists=True))
202+
@click.argument("input-dir", nargs=1, type=click.Path(exists=True))
203203
@click.pass_context
204204
def dataset_create_cmd(
205205
ctx,
206-
input_paths,
206+
input_dir,
207207
output_path,
208208
organisation_path,
209209
column_field_dir,
@@ -213,7 +213,7 @@ def dataset_create_cmd(
213213
resource_path,
214214
):
215215
return dataset_create(
216-
input_paths=input_paths,
216+
input_dir=input_dir,
217217
output_path=output_path,
218218
organisation_path=organisation_path,
219219
pipeline=ctx.obj["PIPELINE"],

digital_land/command_arguments.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
# Custom decorators for common command arguments
22
import functools
3+
from pathlib import Path
34

45
import click
56

67

78
def input_output_path(f):
89
arguments = [
910
click.argument("input-path", type=click.Path(exists=True)),
10-
click.argument("output-path", type=click.Path(), default=""),
11+
click.argument("output-path", type=click.Path(path_type=Path), default=""),
1112
]
1213
return functools.reduce(lambda x, arg: arg(x), reversed(arguments), f)
1314

digital_land/commands.py

Lines changed: 37 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ def pipeline_run(
218218
pipeline,
219219
specification,
220220
input_path,
221-
output_path,
221+
output_path: Path,
222222
collection_dir, # TBD: remove, replaced by endpoints, organisations and entry_date
223223
null_path=None, # TBD: remove this
224224
issue_dir=None,
@@ -398,9 +398,9 @@ def pipeline_run(
398398
column_field_log.save(os.path.join(column_field_dir, resource + ".csv"))
399399
dataset_resource_log.save(os.path.join(dataset_resource_dir, resource + ".csv"))
400400
converted_resource_log.save(os.path.join(converted_resource_dir, resource + ".csv"))
401-
# create converted parquet in the var directory
402-
cache_dir = Path(organisation_path).parent
403-
transformed_parquet_dir = cache_dir / "transformed_parquet" / dataset
401+
# create converted parquet in the var directory
402+
# TODO test without the output_path conversion above to make sure we have a test that would've failed
403+
transformed_parquet_dir = output_path.parent
404404
transformed_parquet_dir.mkdir(exist_ok=True, parents=True)
405405
convert_tranformed_csv_to_pq(
406406
input_path=output_path,
@@ -412,7 +412,7 @@ def pipeline_run(
412412
# build dataset from processed resources
413413
#
414414
def dataset_create(
415-
input_paths,
415+
input_dir,
416416
output_path,
417417
organisation_path,
418418
pipeline,
@@ -424,19 +424,39 @@ def dataset_create(
424424
cache_dir="var/cache",
425425
resource_path="collection/resource.csv",
426426
):
427+
"""
428+
Create a dataset package from transformed parquet files.
429+
430+
Builds both SQLite and Parquet dataset packages from transformed resources,
431+
loading facts, entities, issues, and provenance information.
432+
433+
Args:
434+
input_dir: Directory containing transformed parquet files
435+
output_path: Path for the output SQLite database
436+
organisation_path: Path to organisation.csv file
437+
pipeline: Pipeline object containing configuration
438+
dataset: Name of the dataset to create
439+
specification: Specification object defining the dataset schema
440+
issue_dir: Directory containing issue logs (default: "issue")
441+
column_field_dir: Directory for column-field mappings (default: "var/column-field")
442+
dataset_resource_dir: Directory for dataset-resource mappings (default: "var/dataset-resource")
443+
cache_dir: Directory for caching intermediate files (default: "var/cache")
444+
resource_path: Path to resource.csv file (default: "collection/resource.csv")
445+
"""
427446
# set level for logging to see what's going on
428447
logger.setLevel(logging.INFO)
429448
logging.getLogger("digital_land.package.dataset_parquet").setLevel(logging.INFO)
430449

431-
# chek all paths are paths
450+
# check all paths are paths
432451
issue_dir = Path(issue_dir)
433452
column_field_dir = Path(column_field_dir)
434453
dataset_resource_dir = Path(dataset_resource_dir)
435454
cache_dir = Path(cache_dir)
436455
resource_path = Path(resource_path)
456+
input_dir = Path(input_dir)
437457

438458
# get the transformed files from the cache directory this is assumed right now but we may want to be stricter in the future
439-
transformed_parquet_dir = cache_dir / "transformed_parquet" / dataset
459+
# input_dir
440460

441461
# create directory for dataset_parquet_package, will create a general provenance one for now
442462
dataset_parquet_path = cache_dir / "provenance"
@@ -460,13 +480,12 @@ def dataset_create(
460480
# don't use create as we don't want to create the indexes
461481
package.create_database()
462482
package.disconnect()
463-
for path in input_paths:
464-
path_obj = Path(path)
483+
for path in input_dir.glob("*.parquet"):
465484
logging.info(f"loading column field log into {output_path}")
466-
package.load_column_fields(column_field_dir / dataset / f"{path_obj.stem}.csv")
485+
package.load_column_fields(column_field_dir / dataset / f"{path.stem}.csv")
467486
logging.info(f"loading dataset resource log into {output_path}")
468487
package.load_dataset_resource(
469-
dataset_resource_dir / dataset / f"{path_obj.stem}.csv"
488+
dataset_resource_dir / dataset / f"{path.stem}.csv"
470489
)
471490
logger.info(f"loading old entities into {output_path}")
472491
old_entity_path = Path(pipeline.path) / "old-entity.csv"
@@ -476,8 +495,8 @@ def dataset_create(
476495
logger.info(f"loading issues into {output_path}")
477496
issue_paths = issue_dir / dataset
478497
if issue_paths.exists():
479-
for issue_path in os.listdir(issue_paths):
480-
package.load_issues(os.path.join(issue_paths, issue_path))
498+
for issue_path in issue_paths.glob("*.csv"):
499+
package.load_issues(issue_path)
481500
else:
482501
logger.warning("No directory for this dataset in the provided issue_directory")
483502

@@ -491,7 +510,7 @@ def dataset_create(
491510
path=dataset_parquet_path,
492511
specification_dir=None, # TBD: package should use this specification object
493512
duckdb_path=cache_dir / "overflow.duckdb",
494-
transformed_parquet_dir=transformed_parquet_dir,
513+
transformed_parquet_dir=input_dir,
495514
)
496515
# To find facts we have a complex SQL window function that can cause memory issues. To aid the allocation of memory
497516
# we decide on a parquet strategy, based on how many parquet files we have, the overall size of these
@@ -503,10 +522,10 @@ def dataset_create(
503522

504523
# Group parquet files into approx 256MB batches (if needed)
505524
if pqpackage.strategy != "direct":
506-
pqpackage.group_parquet_files(transformed_parquet_dir, target_mb=256)
507-
pqpackage.load_facts(transformed_parquet_dir)
508-
pqpackage.load_fact_resource(transformed_parquet_dir)
509-
pqpackage.load_entities(transformed_parquet_dir, resource_path, organisation_path)
525+
pqpackage.group_parquet_files(input_dir, target_mb=256)
526+
pqpackage.load_facts(input_dir)
527+
pqpackage.load_fact_resource(input_dir)
528+
pqpackage.load_entities(input_dir, resource_path, organisation_path)
510529

511530
logger.info("loading fact,fact_resource and entity into {output_path}")
512531
pqpackage.load_to_sqlite(output_path)

tests/acceptance/test_dataset_create.py

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616

1717
from digital_land.cli import cli
1818

19-
test_collection = "conservation-area"
20-
test_dataset = "conservation-area"
19+
TEST_COLLECTION = "conservation-area"
20+
TEST_DATASET = "conservation-area"
2121

2222

2323
@pytest.fixture(scope="session")
@@ -135,7 +135,7 @@ def cache_path(tmp_path):
135135

136136

137137
@pytest.fixture
138-
def input_paths(cache_path):
138+
def input_dir(cache_path):
139139
data_dicts = {"resource_1": transformed_1_data}
140140
input_paths = []
141141
directory = cache_path / "transformed_parquet" / "conservation-area"
@@ -148,7 +148,7 @@ def input_paths(cache_path):
148148
logging.error(str(input_path))
149149
input_paths.append(str(input_path))
150150

151-
return input_paths
151+
return directory
152152

153153

154154
@pytest.fixture
@@ -222,6 +222,32 @@ def dataset_dir(session_tmp_path):
222222
def issue_dir(session_tmp_path):
223223
issue_dir = session_tmp_path / "issue"
224224
os.makedirs(issue_dir, exist_ok=True)
225+
226+
# Create test issue files for each dataset
227+
dataset = TEST_DATASET
228+
dataset_issue_dir = issue_dir / dataset
229+
os.makedirs(dataset_issue_dir, exist_ok=True)
230+
231+
# Create a sample issue CSV file
232+
issue_file = dataset_issue_dir / "test-resource.csv"
233+
issue_data = {
234+
"dataset": [dataset, dataset, dataset],
235+
"resource": ["test-resource", "test-resource", "test-resource"],
236+
"line-number": [2, 3, 4],
237+
"entry-number": [1, 2, 3],
238+
"field": ["name", "reference", "start-date"],
239+
"entity": ["", "12345", ""],
240+
"issue-type": ["missing value", "invalid format", "invalid date"],
241+
"value": ["", "INVALID-REF", "2023-13-45"],
242+
"message": [
243+
"name field is required",
244+
"reference format is invalid",
245+
"date must be in format YYYY-MM-DD",
246+
],
247+
}
248+
df = pd.DataFrame(issue_data)
249+
df.to_csv(issue_file, index=False)
250+
225251
return issue_dir
226252

227253

@@ -237,24 +263,24 @@ def resource_path(session_tmp_path):
237263
def test_acceptance_dataset_create(
238264
session_tmp_path,
239265
organisation_path,
240-
input_paths,
266+
input_dir,
241267
issue_dir,
242268
cache_path,
243269
dataset_dir,
244270
resource_path,
245271
column_field_path,
246272
dataset_resource_path,
247273
):
248-
output_path = dataset_dir / f"{test_dataset}.sqlite3"
274+
output_path = dataset_dir / f"{TEST_DATASET}.sqlite3"
249275

250276
runner = CliRunner()
251277
result = runner.invoke(
252278
cli,
253279
[
254280
"--dataset",
255-
str(test_dataset),
281+
str(TEST_DATASET),
256282
"--pipeline-dir",
257-
str(f"tests/data/{test_collection}/pipeline"),
283+
str(f"tests/data/{TEST_COLLECTION}/pipeline"),
258284
"dataset-create",
259285
"--output-path",
260286
str(output_path),
@@ -270,8 +296,8 @@ def test_acceptance_dataset_create(
270296
str(cache_path),
271297
"--resource-path",
272298
str(resource_path),
273-
]
274-
+ input_paths,
299+
str(input_dir),
300+
],
275301
catch_exceptions=False,
276302
)
277303

0 commit comments

Comments
 (0)