diff --git a/.gitignore b/.gitignore index e91b8b1e..98cb8de4 100644 --- a/.gitignore +++ b/.gitignore @@ -114,4 +114,5 @@ examples/demo/data *node_modules* data -tmp \ No newline at end of file +tmp +env.env \ No newline at end of file diff --git a/dtran/argtype.py b/dtran/argtype.py index 3bf0f07b..2392b1b7 100644 --- a/dtran/argtype.py +++ b/dtran/argtype.py @@ -22,6 +22,7 @@ class ArgType(object): Callable[[Any, bool, str, str], 'ArgType']] = dataset OrderedDict: 'ArgType' = None ListString: 'ArgType' = None + ListOrOneString: 'ArgType' = None String: 'ArgType' = None Number: 'ArgType' = None Boolean: 'ArgType' = None @@ -75,6 +76,7 @@ def type_cast(self, val: str): from_str=lambda val: str(Path(val))) ArgType.OrderedDict = ArgType("ordered_dict", validate=lambda val: isinstance(val, dict)) ArgType.ListString = ArgType("list_string", validate=lambda val: isinstance(val, list) and all(isinstance(x, str) for x in val)) +ArgType.ListOrOneString = ArgType("list_or_one_string", validate=lambda val: isinstance(val, str) or (isinstance(val, list) and all(isinstance(x, str) for x in val))) ArgType.String = ArgType("string", validate=lambda val: isinstance(val, str)) ArgType.Number = ArgType("number", validate=lambda val: isinstance(val, (int, float)), from_str=lambda val: ('.' in val and float(val)) or int(val)) diff --git a/dtran/config_parser.py b/dtran/config_parser.py index 85fa7943..ccf586a5 100644 --- a/dtran/config_parser.py +++ b/dtran/config_parser.py @@ -36,10 +36,6 @@ class InputSchema(Schema): class PipelineSchema(Schema): - def __init__(self, cli_inputs, **kwargs): - super().__init__(**kwargs) - self.cli_inputs = cli_inputs - version = fields.Str(required=True) description = fields.Str() inputs = OrderedDictField(validate=validate.Length(min=1), @@ -53,6 +49,10 @@ def __init__(self, cli_inputs, **kwargs): class Meta: ordered = True + def __init__(self, cli_inputs, **kwargs): + super().__init__(**kwargs) + self.cli_inputs: Dict[Union[Tuple[str, str], str], str] = cli_inputs + @staticmethod def process_input(val, data): # processing for root-level pipeline inputs recursively @@ -96,17 +96,27 @@ def construct_pipeline(self, data, **kwargs): mappings[name] = (cls, adapter_count[adapter['adapter']]) # validating cli_inputs - for name, input in self.cli_inputs: - if name not in mappings: - raise ValidationError( - ['cli_inputs exception', f"invalid adapter name {name}. not found in config file"]) - if input not in mappings[name][0].inputs: - raise ValidationError(['cli_inputs exception', - f"invalid input {input} in {data['adapters'][name]['adapter']} for {name}"]) - # cli_inputs has higher priority and overwrites config_file data - if 'inputs' not in data['adapters'][name]: - data['adapters'][name]['inputs'] = OrderedDict() - data['adapters'][name]['inputs'][input] = self.cli_inputs[(name, input)] + for cli_input in self.cli_inputs: + if isinstance(cli_input, (tuple,list)): + name, arg = cli_input + + if name not in mappings: + raise ValidationError( + ['cli_inputs exception', f"invalid adapter name {name}. 
not found in config file"]) + if arg not in mappings[name][0].inputs: + raise ValidationError(['cli_inputs exception', + f"invalid input {arg} in {data['adapters'][name]['adapter']} for {name}"]) + # cli_inputs has higher priority and overwrites config_file data + if 'inputs' not in data['adapters'][name]: + data['adapters'][name]['inputs'] = OrderedDict() + data['adapters'][name]['inputs'][arg] = self.cli_inputs[(name, arg)] + else: + name = cli_input + if name in data['inputs']: + if isinstance(data['inputs'][name], (dict, OrderedDict)): + data['inputs'][name]['value'] = self.cli_inputs[name] + else: + data['inputs'][name] = self.cli_inputs[name] inputs = {} wired = [] diff --git a/dtran/dcat/api.py b/dtran/dcat/api.py index 3a5bba82..0de324aa 100644 --- a/dtran/dcat/api.py +++ b/dtran/dcat/api.py @@ -280,7 +280,6 @@ def handle_api_response(response: requests.Response): """ This is a convenience method to handle api responses :param response: - :param print_response: :return: """ parsed_response = response.json() diff --git a/dtran/dcat/scripts/delete.json b/dtran/dcat/scripts/delete.json index 0637a088..e171d1f6 100644 --- a/dtran/dcat/scripts/delete.json +++ b/dtran/dcat/scripts/delete.json @@ -1 +1 @@ -[] \ No newline at end of file +["b3e79dc2-8fa1-4203-ac82-b5267925191f"] \ No newline at end of file diff --git a/dtran/dcat/scripts/register_datasets.py b/dtran/dcat/scripts/register_datasets.py index bbe80447..9ec9c7ef 100644 --- a/dtran/dcat/scripts/register_datasets.py +++ b/dtran/dcat/scripts/register_datasets.py @@ -14,8 +14,8 @@ def cli(): ignore_unknown_options=True, allow_extra_args=False, )) -@click.option("--name", help="DCAT dataset name", default="test-dataset") -@click.option("--description", help="DCAT dataset description", default="test-description") +@click.option("--name", help="DCAT dataset name", prompt="Dataset name") +@click.option("--description", help="DCAT dataset description", prompt="Dataset description") @click.option("--metadata_path", help="DCAT dataset metadata file path", default=None) @click.option("--resource_path", help="DCAT dataset resources json path, should be a file name-url dict", default=None) @click.option("--resource_type", help="DCAT dataset resource type", default="zip") @@ -23,11 +23,11 @@ def cli(): def register_dataset(name, description, metadata_path, resource_path, variable_path, resource_type): """ Registers DCAT dataset with multiple resources. - Example: PYTHONPATH=$(pwd):$(pwd):$PYTHONPATH python dtran/dcat/scripts/register_datasets.py register_dataset + Example: PYTHONPATH=$(pwd):$PYTHONPATH python dtran/dcat/scripts/register_datasets.py register_dataset --resource_path=./uploaded.json --variable_path=variables.json """ - dcat = DCatAPI.get_instance("https://api.mint-data-catalog.org") + dcat = DCatAPI.get_instance() if metadata_path is None: metadata = {} @@ -85,7 +85,7 @@ def delete_dataset(dcatid, json_path): Delete specified datasets. 
Example: PYTHONPATH=$(pwd):$(pwd):$PYTHONPATH python dtran/dcat/scripts/register_datasets.py delete_dataset --dcatid=c4fedf48-f888-4de1-b60f-c6ac5cb1615b
     """
-    dcat = DCatAPI.get_instance("https://api.mint-data-catalog.org")
+    dcat = DCatAPI.get_instance()
 
     if dcatid is None and json_path is None:
         raise ValueError("Please enter dataset ids to delete!")
diff --git a/dtran/dcat/scripts/resource.json b/dtran/dcat/scripts/resource.json
new file mode 100644
index 00000000..1905ddc7
--- /dev/null
+++ b/dtran/dcat/scripts/resource.json
@@ -0,0 +1,3 @@
+{
+  "awash.tif": "https://data.mint.isi.edu/files/hand-dem/GIS-Oromia/Awash/Awash-border_DEM_buffer.tif"
+}
\ No newline at end of file
diff --git a/dtran/dcat/scripts/upload_files_in_batch.py b/dtran/dcat/scripts/upload_files_in_batch.py
index 53005406..97e8991a 100644
--- a/dtran/dcat/scripts/upload_files_in_batch.py
+++ b/dtran/dcat/scripts/upload_files_in_batch.py
@@ -8,14 +8,20 @@
 
 def setup_owncloud(upload_dir):
     oc = owncloud.Client('https://files.mint.isi.edu/')
-    oc.login('datacatalog', 'sVMIryVWEx3Ec2')
-    oc.mkdir(upload_dir)
+    oc.login(os.environ['USERNAME'], os.environ['PASSWORD'])
+    try:
+        # pyocclient (https://github.com/owncloud/pyocclient/blob/master/owncloud/owncloud.py)
+        # does not document a way to check whether a directory already exists, so assume that a
+        # failed mkdir means the directory is already there.
+        oc.mkdir(upload_dir)
+    except:
+        pass
     return oc
 
 
 def upload_to_mint_server(target_dir, target_filename, upload_url):
     upload_output = subprocess.check_output(
-        f"curl -sD - --user upload:HVmyqAPWDNuk5SmkLOK2 --upload-file {target_dir}/{target_filename} {upload_url}",
+        f"curl -sD - --user {os.environ['USERNAME']}:{os.environ['PASSWORD']} --upload-file {target_dir}/{target_filename} {upload_url}",
         shell=True,
     )
     uploaded_url = f'https://{upload_output.decode("utf-8").split("https://")[-1]}'
diff --git a/dtran/dcat/scripts/variables.json b/dtran/dcat/scripts/variables.json
index 9896ab96..dedcf17d 100644
--- a/dtran/dcat/scripts/variables.json
+++ b/dtran/dcat/scripts/variables.json
@@ -1,10 +1,10 @@
 {
   "standard_variables": [
     {
-      "ontology": "ScientificVariablesOntology",
-      "name": "precipitation_leg_volume_flux",
-      "uri": "http://www.geoscienceontology.org/svo/svl/variable/1.0.0/#atmosphere%40role%7Esource_water%40role%7Emain_precipitation__precipitation_leq_volume_flux"
+      "ontology": "MINT Ontology",
+      "name": "var_0",
+      "uri": "http://mint.isi.edu/var_0"
     }
   ],
-  "variable_names": ["test-var"]
+  "variable_names": ["var_0"]
 }
\ No newline at end of file
diff --git a/dtran/main.py b/dtran/main.py
index b69bf1cb..27f90aaf 100644
--- a/dtran/main.py
+++ b/dtran/main.py
@@ -19,7 +19,7 @@ def exec_pipeline(ctx, config=None):
     Creates a pipeline and execute it based on given config and input(optional).
     To specify the input to pipeline, use (listed in ascending priority):
     1) config file option: --config path_to_file
-    2) arg params: e.g. --FuncName.Attr=value
+    2) arg params: e.g. 
--FuncName.Attr=value, --InputName=value """ # Accept user-specified inputs: expect format of --key=value @@ -27,10 +27,13 @@ def exec_pipeline(ctx, config=None): for arg in ctx.args: try: key, value = arg[2:].split("=") - func_name, attr_name = key.split(".") - user_inputs[(func_name, attr_name)] = value + if key.find(".") != -1: + func_name, attr_name = key.split(".") + user_inputs[(func_name, attr_name)] = value + else: + user_inputs[key] = value except ValueError: - print(f"user input: '{arg}' should have format '--FuncName.Attr=value'") + print(f"user input: '{arg}' should have format '--FuncName.Attr=value' or --InputName=value") return parser = ConfigParser(user_inputs) diff --git a/environment.yml b/environment.yml index f41dc5b4..53e51a34 100644 --- a/environment.yml +++ b/environment.yml @@ -1,34 +1,185 @@ -# https://docs.conda.io/projects/conda-build/en/latest/resources/package-spec.html#package-match-specifications name: mintdt channels: - conda-forge - defaults dependencies: - - python=3.8 - - pip - - gdal=3.0.4 - - scipy - - pydap=3.2.2 - - numpy - - shapely=1.7 - - dask=2.11.0 + - _libgcc_mutex=0.1=conda_forge + - _openmp_mutex=4.5=1_llvm + - affine=2.3.0=py_0 + - attrs=19.3.0=py_0 + - beautifulsoup4=4.9.1=py38h32f6830_0 + - bokeh=2.1.1=py38h32f6830_0 + - boost-cpp=1.72.0=h8e57a91_0 + - bottleneck=1.3.2=py38h8790de6_1 + - brotlipy=0.7.0=py38h1e0a361_1000 + - bzip2=1.0.8=h516909a_2 + - ca-certificates=2020.6.20=hecda079_0 + - cairo=1.16.0=hcf35c78_1003 + - certifi=2020.6.20=py38h32f6830_0 + - cffi=1.14.0=py38hd463f26_0 + - cfitsio=3.470=h3eac812_5 + - cftime=1.2.0=py38h8790de6_1 + - chardet=3.0.4=py38h32f6830_1006 + - click-plugins=1.1.1=py_0 + - cloudpickle=1.5.0=py_0 + - cryptography=2.9.2=py38h766eaa4_0 + - curl=7.71.1=he644dc0_1 + - cytoolz=0.10.1=py38h516909a_0 + - dask=2.11.0=py_0 + - dask-core=2.11.0=py_0 + - distributed=2.20.0=py38h32f6830_0 + - docopt=0.6.2=py_1 + - expat=2.2.9=he1b5a44_2 + - fontconfig=2.13.1=h86ecdb6_1001 + - freetype=2.10.2=he06d7ca_0 + - freexl=1.0.5=h14c3975_1002 + - fsspec=0.7.4=py_0 + - gdal=3.0.4=py38h172510d_10 + - geos=3.8.1=he1b5a44_0 + - geotiff=1.6.0=h05acad5_0 + - gettext=0.19.8.1=hc5be6a0_1002 + - giflib=5.2.1=h516909a_2 + - glib=2.65.0=h6f030ca_0 + - hdf4=4.2.13=hf30be14_1003 + - hdf5=1.10.6=nompi_h3c11f04_100 + - heapdict=1.0.1=py_0 + - icu=64.2=he1b5a44_1 + - idna=2.10=pyh9f0ad1d_0 + - jinja2=2.11.2=pyh9f0ad1d_0 + - jpeg=9d=h516909a_0 + - json-c=0.13.1=hbfbb72e_1002 + - kealib=1.4.13=h33137a7_1 + - krb5=1.17.1=hfafb76e_1 + - lcms2=2.11=hbd6801e_0 + - ld_impl_linux-64=2.34=h53a641e_7 + - libblas=3.8.0=17_openblas + - libcblas=3.8.0=17_openblas + - libcurl=7.71.1=hcdd3856_1 + - libdap4=3.20.6=h1d1bd15_0 + - libedit=3.1.20191231=h46ee950_1 + - libffi=3.2.1=he1b5a44_1007 + - libgcc-ng=9.2.0=h24d8f2e_2 + - libgdal=3.0.4=he6a97d6_10 + - libgfortran-ng=7.5.0=hdf63c60_6 + - libiconv=1.15=h516909a_1006 + - libkml=1.3.0=hb574062_1011 + - liblapack=3.8.0=17_openblas + - libnetcdf=4.7.4=nompi_h84807e1_104 + - libopenblas=0.3.10=h5ec1e0e_0 + - libpng=1.6.37=hed695b0_1 + - libpq=12.2=h5513abc_1 + - libspatialite=4.3.0a=h2482549_1038 + - libssh2=1.9.0=hab1572f_3 + - libstdcxx-ng=9.2.0=hdf63c60_2 + - libtiff=4.1.0=hc7e4089_6 + - libuuid=2.32.1=h14c3975_1000 + - libwebp-base=1.1.0=h516909a_3 + - libxcb=1.13=h14c3975_1002 + - libxml2=2.9.10=hee79883_0 + - libxslt=1.1.33=h31b3aaa_0 + - llvm-openmp=10.0.0=hc9558a2_0 + - locket=0.2.0=py_2 + - lxml=4.5.2=py38hbb43d70_0 + - lz4-c=1.9.2=he1b5a44_1 + - markupsafe=1.1.1=py38h1e0a361_1 + - 
mechanicalsoup=0.12.0=py_0
+  - msgpack-python=1.0.0=py38hbf85e49_1
+  - ncurses=6.1=hf484d3e_1002
+  - netcdf4=1.5.3=nompi_py38hfd55d45_105
+  - numpy=1.18.5=py38h8854b6b_0
+  - olefile=0.46=py_0
+  - openjpeg=2.3.1=h981e76c_3
+  - openssl=1.1.1g=h516909a_0
+  - packaging=20.4=pyh9f0ad1d_0
+  - pandas=1.0.5=py38hcb8c335_0
+  - partd=1.1.0=py_0
+  - pcre=8.44=he1b5a44_0
+  - pillow=7.2.0=py38h9776b28_1
+  - pip=20.1.1=py_1
+  - pixman=0.38.0=h516909a_1003
+  - poppler=0.87.0=h4190859_1
+  - poppler-data=0.4.9=1
+  - postgresql=12.2=h8573dbc_1
+  - proj=7.0.0=h966b41f_4
+  - psutil=5.7.0=py38h1e0a361_1
+  - pthread-stubs=0.4=h14c3975_1001
+  - pycparser=2.20=pyh9f0ad1d_2
+  - pydap=3.2.2=py38_1000
+  - pyopenssl=19.1.0=py_1
+  - pyparsing=2.4.7=pyh9f0ad1d_0
+  - pyproj=2.6.1.post1=py38h7521cb9_0
+  - pysocks=1.7.1=py38h32f6830_1
+  - python=3.8.3=cpython_he5300dc_0
+  - python_abi=3.8=1_cp38
+  - pytz=2020.1=pyh9f0ad1d_0
+  - pyyaml=5.3.1=py38h1e0a361_0
+  - rasterio=1.1.5=py38h033e0f6_0
+  - readline=8.0=h46ee950_1
+  - requests=2.24.0=pyh9f0ad1d_0
+  - rioxarray=0.0.31=py_0
+  - scipy=1.5.0=py38h18bccfc_0
+  - setuptools=49.2.0=py38h32f6830_0
+  - shapely=1.7.0=py38hd168ffb_3
+  - six=1.15.0=pyh9f0ad1d_0
+  - snuggs=1.4.7=py_0
+  - sortedcontainers=2.2.2=pyh9f0ad1d_0
+  - soupsieve=2.0.1=py38h32f6830_0
+  - sqlite=3.32.3=hcee41ef_1
+  - tbb=2020.1=hc9558a2_0
+  - tblib=1.6.0=py_0
+  - tiledb=1.7.7=h8efa9f0_3
+  - tk=8.6.10=hed695b0_0
+  - toolz=0.10.0=py_0
+  - tornado=6.0.4=py38h1e0a361_1
+  - typing_extensions=3.7.4.2=py_0
+  - tzcode=2020a=h516909a_0
+  - urllib3=1.25.9=py_0
+  - webob=1.8.6=py_0
+  - wheel=0.34.2=py_1
+  - xerces-c=3.2.2=h8412b87_1004
+  - xorg-kbproto=1.0.7=h14c3975_1002
+  - xorg-libice=1.0.10=h516909a_0
+  - xorg-libsm=1.2.3=h84519dc_1000
+  - xorg-libx11=1.6.9=h516909a_0
+  - xorg-libxau=1.0.9=h14c3975_0
+  - xorg-libxdmcp=1.1.3=h516909a_0
+  - xorg-libxext=1.3.4=h516909a_0
+  - xorg-libxrender=0.9.10=h516909a_1002
+  - xorg-renderproto=0.11.1=h14c3975_1002
+  - xorg-xextproto=7.3.0=h14c3975_1002
+  - xorg-xproto=7.0.31=h14c3975_1007
+  - xz=5.2.5=h516909a_1
+  - yaml=0.2.5=h516909a_0
+  - zict=2.0.0=py_0
+  - zlib=1.2.11=h516909a_1006
+  - zstd=1.4.4=h6597ccf_3
   - pip:
-    - drepr>=2.9.2
-    - ujson
-    - netCDF4>=1.5.3
-    - arpeggio
-    - rdflib
-    - flask
-    - flask_cors
-    - requests
-    - tqdm
-    - marshmallow==3.5
-    - xarray==0.15
-    - networkx==2.4
-    - python-dateutil==2.8
-    - click==7.0
-    - ccut
-    - matplotlib
-    - python-dotenv==0.12
-    - peewee==3.13
-    - pyocclient
+    - arpeggio==1.9.2
+    - ccut==1.0.0
+    - click==7.0
+    - cligj==0.5.0
+    - cycler==0.10.0
+    - decorator==4.4.2
+    - drepr==2.9.10
+    - fiona==1.8.13.post1
+    - flask==1.1.2
+    - flask-cors==3.0.8
+    - isodate==0.6.0
+    - itsdangerous==1.1.0
+    - kiwisolver==1.2.0
+    - marshmallow==3.5.0
+    - matplotlib==3.2.2
+    - munch==2.5.0
+    - networkx==2.4
+    - peewee==3.13.0
+    - pyocclient==0.4
+    - python-dateutil==2.8.0
+    - python-dotenv==0.12.0
+    - rdflib==5.0.0
+    - ruamel-yaml==0.16.10
+    - ruamel-yaml-clib==0.2.0
+    - tqdm==4.47.0
+    - ujson==3.0.0
+    - werkzeug==1.0.1
+    - xarray==0.15.0
+    - xmltodict==0.12.0
diff --git a/examples/dame/filled/dem_cropping.yml b/examples/dame/filled/dem_cropping.yml
new file mode 100644
index 00000000..af5d10b9
--- /dev/null
+++ b/examples/dame/filled/dem_cropping.yml
@@ -0,0 +1,37 @@
+version: "1.0"
+description: Data transformation to crop a DEM raster file to a target bounding box
+inputs:
+  dataset_id:
+    comment: DataCatalog Dataset ID for raster file
+    value: 3c5aa587-2e3d-49ef-ad60-532370941e87
+  crop_region_xmin:
+    comment: Target region bounding box 
xmin coordinate
+    value: 32.75418
+  crop_region_ymin:
+    comment: Target region bounding box ymin coordinate
+    value: 8.22206
+  crop_region_xmax:
+    comment: Target region bounding box xmax coordinate
+    value: 47.98942
+  crop_region_ymax:
+    comment: Target region bounding box ymax coordinate
+    value: 15.15943
+  output_file:
+    comment: Path to output file
+    value: output.tiff
+adapters:
+  read_func:
+    comment: My geotiff read func adapter
+    adapter: funcs.DcatReadNoReprFunc
+    inputs:
+      dataset_id: $$.dataset_id
+  my_crop_wrapper:
+    comment: My cropping func wrapper adapter
+    adapter: funcs.dem.DEMCropFunc
+    inputs:
+      input_file: $.read_func.data_path
+      output_file: $$.output_file
+      xmin: $$.crop_region_xmin
+      ymin: $$.crop_region_ymin
+      xmax: $$.crop_region_xmax
+      ymax: $$.crop_region_ymax
diff --git a/examples/dame/filled/gldas_cycles.yml b/examples/dame/filled/gldas_cycles.yml
new file mode 100644
index 00000000..6c0e11f7
--- /dev/null
+++ b/examples/dame/filled/gldas_cycles.yml
@@ -0,0 +1,48 @@
+version: "1"
+description: Data transformation to generate Cycles-ready weather input files from GLDAS data sources
+inputs:
+  start_date:
+    comment: Start time to filter Resources for DataCatalog GLDAS/GPM Dataset ("YYYY-MM-DD")
+    value: "2000-01-01"
+  end_date:
+    comment: End time to filter Resources for DataCatalog GLDAS/GPM Dataset ("YYYY-MM-DD")
+    value: "2000-01-01"
+  dataset_id:
+    comment: Dataset ID
+    value: 5babae3f-c468-4e01-862e-8b201468e3b5
+  output_file:
+    comment: "Path to the output directory"
+    value: "/tmp/demo/output/GLDAS/"
+  output_prefix:
+    comment: "Prefix to be added to the output file names"
+    value: "cycles"
+  latitude:
+    comment: "Latitude to extract data (use coord_file when lat = -1 or long = -1)"
+    value: -1 # use coord_file when lat = -1 or long = -1
+  longitude:
+    comment: "Longitude to extract data (use coord_file when lat = -1 or long = -1)"
+    value: -1 # use coord_file when lat = -1 or long = -1
+  coord_file:
+    comment: "File path that contains lat/long of the extracting locations"
+    value: "/tmp/demo/input/oromia.csv"
+adapters:
+  dcat_read_func:
+    comment: |
+      Weather dataset
+    adapter: funcs.DcatReadFunc
+    inputs:
+      dataset_id: $$.dataset_id
+      start_time: $$.start_date
+      end_time: $$.end_date
+  gldas2cycles_func:
+    comment: GLDAS2Cycles adapter
+    adapter: funcs.Gldas2CyclesFunc
+    inputs:
+      start_date: $$.start_date
+      end_date: $$.end_date
+      gldas_path: $.dcat_read_func.data_path # assumes DcatReadFunc exposes the downloaded data location as data_path
+      output_path: $$.output_file
+      output_prefix: $$.output_prefix
+      latitude: $$.latitude
+      longitude: $$.longitude
+      coord_file: $$.coord_file
diff --git a/examples/dame/filled/topoflow_gldas.yml b/examples/dame/filled/topoflow_gldas.yml
new file mode 100644
index 00000000..25c4989c
--- /dev/null
+++ b/examples/dame/filled/topoflow_gldas.yml
@@ -0,0 +1,26 @@
+adapters:
+  weather_data:
+    comment: |
+      Weather dataset
+    adapter: funcs.ReadFunc
+    inputs:
+      resource_path: /home/rook/workspace/mint/MINT-Transformation/examples/topoflow4/dev/gpm/download/*.nc4
+      repr_file: /home/rook/workspace/mint/MINT-Transformation/examples/topoflow4/dev/gpm.yml
+  geotiff_writer:
+    adapter: funcs.GeoTiffWriteFunc
+    inputs:
+      dataset: $.weather_data.data
+      variable_name: atmosphere_water__precipitation_mass_flux
+      output_dir: /home/rook/workspace/mint/MINT-Transformation/examples/topoflow4/dev/gpm/geotiff
+  tf_trans:
+    adapter: funcs.topoflow.topoflow_climate.Topoflow4ClimateWriteFunc
+    inputs:
+      geotiff_files: $.geotiff_writer.output_files
+      cropped_geotiff_dir: /home/rook/workspace/mint/MINT-Transformation/examples/topoflow4/dev/gpm/geotiff_crop
+      
output_file: /home/rook/workspace/mint/MINT-Transformation/examples/topoflow4/dev/gpm/output.rts
+      bounds: "34.221249999999, 7.353749999999, 36.45458333333234, 9.503749999999"
+      xres_arcsecs: 60
+      yres_arcsecs: 60
+      # unit multiplier 1 for GPM, 3600 for GLDAS
+      unit_multiplier: 1
+#      unit_multiplier: 3600
\ No newline at end of file
diff --git a/examples/dame/filled/topoflow_climate.yml b/examples/dame/filled/topoflow_gpm.yml
similarity index 100%
rename from examples/dame/filled/topoflow_climate.yml
rename to examples/dame/filled/topoflow_gpm.yml
diff --git a/examples/dame/scripts/dem_cropping.sh b/examples/dame/scripts/dem_cropping.sh
new file mode 100755
index 00000000..e06baaf5
--- /dev/null
+++ b/examples/dame/scripts/dem_cropping.sh
@@ -0,0 +1,3 @@
+#!/usr/bin/env bash
+
+./run --config examples/dame/templates/dem_cropping.yml.template -i1 3c5aa587-2e3d-49ef-ad60-532370941e87 -p1 "32.75418" -p2 "8.22206" -p3 "47.98942" -p4 "15.15943" -o1 "output.tiff"
diff --git a/examples/dame/templates/dem_cropping.yml.template b/examples/dame/templates/dem_cropping.yml.template
new file mode 100644
index 00000000..a62a5289
--- /dev/null
+++ b/examples/dame/templates/dem_cropping.yml.template
@@ -0,0 +1,37 @@
+version: "1.0"
+description: Data transformation to crop a DEM raster file to a target bounding box
+inputs:
+  dataset_id:
+    comment: DataCatalog Dataset ID for raster file
+    value: {INPUTS1}
+  crop_region_xmin:
+    comment: Target region bounding box xmin coordinate
+    value: {PARAMS1}
+  crop_region_ymin:
+    comment: Target region bounding box ymin coordinate
+    value: {PARAMS2}
+  crop_region_xmax:
+    comment: Target region bounding box xmax coordinate
+    value: {PARAMS3}
+  crop_region_ymax:
+    comment: Target region bounding box ymax coordinate
+    value: {PARAMS4}
+  output_file:
+    comment: Path to output file
+    value: {OUTPUTS1}
+adapters:
+  read_func:
+    comment: My geotiff read func adapter
+    adapter: funcs.DcatReadNoReprFunc
+    inputs:
+      dataset_id: $$.dataset_id
+  my_crop_wrapper:
+    comment: My cropping func wrapper adapter
+    adapter: funcs.dem.DEMCropFunc
+    inputs:
+      input_file: $.read_func.data_path
+      output_file: $$.output_file
+      xmin: $$.crop_region_xmin
+      ymin: $$.crop_region_ymin
+      xmax: $$.crop_region_xmax
+      ymax: $$.crop_region_ymax
diff --git a/examples/to-be-fixed/cycles2econ/price/demo_pipeline.py b/examples/to-be-fixed/cycles2econ/price/demo_pipeline.py
index 857f7753..c8fad1df 100644
--- a/examples/to-be-fixed/cycles2econ/price/demo_pipeline.py
+++ b/examples/to-be-fixed/cycles2econ/price/demo_pipeline.py
@@ -46,14 +46,14 @@
     UnitTransFunc.I.unit_desired: "$/kg",
     CSVWriteFunc.I.main_class: "qb:Observation",
     CSVWriteFunc.I.mapped_columns: {},
-    CSVWriteFunc.I.output_file: wdir / "price.csv",
+    CSVWriteFunc.I.output_path: wdir / "price.csv",
     VisJsonWriteFunc.I.filter: "@type = 'qb:Observation' and "
     "sdmx-attribute:refArea.contains('Aweil (Town)') and "
     "sdmx-dimension:refPeriod = '2016-10-15' and "
     f"dcat-dimension:thing in {str(set(crop_names.keys()))}",
     VisJsonWriteFunc.I.main_class: "qb:Observation",
     VisJsonWriteFunc.I.mapped_columns: {},
-    VisJsonWriteFunc.I.output_file: wdir / "visualization.json",
+    VisJsonWriteFunc.I.output_path: wdir / "visualization.json",
 }
 outputs = pipeline.exec(inputs)
diff --git a/examples/to-be-fixed/cycles2econ/price/pipeline.py b/examples/to-be-fixed/cycles2econ/price/pipeline.py
index 8bc28ad1..2a890c02 100644
--- a/examples/to-be-fixed/cycles2econ/price/pipeline.py
+++ b/examples/to-be-fixed/cycles2econ/price/pipeline.py
@@ -34,14 +34,14 
@@ UnitTransFunc.I.unit_desired: "$/kg",
     CSVWriteFunc.I.main_class: "qb:Observation",
     CSVWriteFunc.I.mapped_columns: {},
-    CSVWriteFunc.I.output_file: wdir / "price.csv",
+    CSVWriteFunc.I.output_path: wdir / "price.csv",
     VisJsonWriteFunc.I.filter: "@type = 'qb:Observation' and "
     "sdmx-attribute:refArea.contains('Aweil (Town)') and "
     "sdmx-dimension:refPeriod = '2016-10-15' and "
     f"dcat-dimension:thing in {str(set(crop_names.keys()))}",
     VisJsonWriteFunc.I.main_class: "qb:Observation",
     VisJsonWriteFunc.I.mapped_columns: {},
-    VisJsonWriteFunc.I.output_file: wdir / "visualization.json",
+    VisJsonWriteFunc.I.output_path: wdir / "visualization.json",
 }
 outputs = pipeline.exec(inputs)
diff --git a/examples/to-be-fixed/cycles2econ/yield/pipeline.py b/examples/to-be-fixed/cycles2econ/yield/pipeline.py
index 1766b461..884d4373 100644
--- a/examples/to-be-fixed/cycles2econ/yield/pipeline.py
+++ b/examples/to-be-fixed/cycles2econ/yield/pipeline.py
@@ -79,7 +79,7 @@ def exec(self) -> dict:
     TransWrapperFunc.I._2.filter: "@type = 'qb:Observation' and sdmx-dimension:refPeriod = '2016-10-12'",
     TransWrapperFunc.I.code: wdir / "cycles-to-crop.py",
     CSVWriteFunc.I.main_class: "qb:Observation",
-    CSVWriteFunc.I.output_file: wdir / "output.csv",
+    CSVWriteFunc.I.output_path: wdir / "output.csv",
     CSVWriteFunc.I.mapped_columns: {},
 }
diff --git a/examples/to-be-fixed/pihm2netcdf/geotiff_pipeline.py b/examples/to-be-fixed/pihm2netcdf/geotiff_pipeline.py
index 6f4e54e1..99ce056b 100644
--- a/examples/to-be-fixed/pihm2netcdf/geotiff_pipeline.py
+++ b/examples/to-be-fixed/pihm2netcdf/geotiff_pipeline.py
@@ -37,7 +37,7 @@
         "2017-12-31 23:59:59", "%Y-%m-%d %H:%M:%S"
     ),
     # MintGeoTiffWriteFunc.I.output_file: wdir / "MONTHLY_GRIDDED_SURFACE_INUNDATION_2017.tif",
-    MintGeoTiffWriteFunc.I.output_file: wdir / "MONTHLY_GRIDDED_SURFACE_INUNDATION_2017",
+    MintGeoTiffWriteFunc.I.output_path: wdir / "MONTHLY_GRIDDED_SURFACE_INUNDATION_2017",
     MintGeoTiffWriteFunc.I.is_multiple_files: True,
 }
diff --git a/examples/to-be-fixed/pihm2netcdf/netcdf_pipeline.py b/examples/to-be-fixed/pihm2netcdf/netcdf_pipeline.py
index c3272b2e..f8a6641e 100644
--- a/examples/to-be-fixed/pihm2netcdf/netcdf_pipeline.py
+++ b/examples/to-be-fixed/pihm2netcdf/netcdf_pipeline.py
@@ -30,7 +30,7 @@
     PihmMonthlyAverageFloodingFunc.I.mean_space: 0.05,
     PihmMonthlyAverageFloodingFunc.I.start_time: datetime.strptime("2017-01-01 00:00:00", '%Y-%m-%d %H:%M:%S'),
     PihmMonthlyAverageFloodingFunc.I.end_time: datetime.strptime("2017-12-31 23:59:59", '%Y-%m-%d %H:%M:%S'),
-    MintNetCDFWriteFunc.I.output_file: wdir / "MONTHLY_GRIDDED_SURFACE_INUNDATION_2017.nc",
+    MintNetCDFWriteFunc.I.output_path: wdir / "MONTHLY_GRIDDED_SURFACE_INUNDATION_2017.nc",
     MintNetCDFWriteFunc.I.title: "Monthly gridded surface inundation for Pongo River in 2017",
     MintNetCDFWriteFunc.I.comment: "Outputs generated from the workflow",
     MintNetCDFWriteFunc.I.naming_authority: "edu.isi.workflow",
diff --git a/examples/topoflow4/dev/run_gldas.sh b/examples/topoflow4/dev/run_gldas.sh
new file mode 100644
index 00000000..d5b76507
--- /dev/null
+++ b/examples/topoflow4/dev/run_gldas.sh
@@ -0,0 +1,49 @@
+#!/bin/bash
+
+# This bash script runs the transformations that produce all GLDAS datasets. Run it with dotenv.
+
+if [[ -z "${HOME_DIR}" ]]; then
+    echo "Home directory is not defined. 
Exit" + exit -1 +fi + +# change the working directory to the current project +cd ${HOME_DIR} + +# define parameters +awash="37.829583333333, 6.654583333333, 39.934583333333, 9.374583333333" +baro="34.221249999999, 7.362083333332, 36.450416666666, 9.503749999999" +shebelle="38.159583333333, 6.319583333333, 43.559583333333, 9.899583333333" +ganale="39.174583333333, 5.527916666666, 41.124583333333, 7.098749999999" +guder="37.149583333333, 8.596250000000, 38.266250000000, 9.904583333333" +muger="37.807916666667, 8.929583333333, 39.032916666667, 10.112916666667" +beko="35.241249999999, 6.967916666666, 35.992916666666, 7.502916666666" +alwero="34.206249999999, 7.415416666666, 35.249583333333, 8.143749999999" +AREAS=("$awash" "$baro" "$shebelle" "$ganale" "$guder" "$muger" "$beko" "$alwero") +AREA_NAMES=("awash" "baro" "shebelle" "ganale" "guder" "muger" "beko" "alwero") + +# START_TIMES +# end_year=2019 + +ARCSECS=(30 60) + +# create a basic command +CONFIG_FILE=$HOME_DIR/examples/topoflow4/dev/tf_climate.gldas_remote.yml +RUN_DIR=$HOME_DIR/data/tf_gldas +CMD="python -m dtran.main exec_pipeline --config $CONFIG_FILE" + +for ((i=0;i<${#AREAS[@]};++i)); do + area=${AREAS[i]} + area_name=${AREA_NAMES[i]} + + for arcsec in "${ARCSECS}"; do + year=2008 + echo --GLDAS.start_time $year-01-01T00:00:00 \ + --GLDAS.end_time $year-01-01T00:00:00 \ + --geotiff_writer.output_dir $RUN_DIR/$year/geotiff \ + --topoflow.cropped_geotiff_dir $RUN_DIR/$year/${area_name}_geotiff \ + --topoflow.xres_arcsecs $arcsec \ + --topoflow.xres_arcsecs $arcsec \ + --topoflow.bounds "\"$area\"" + done +done diff --git a/examples/topoflow4/dev/s00_download_gpm.py b/examples/topoflow4/dev/s00_download_gpm.py new file mode 100644 index 00000000..94b56f04 --- /dev/null +++ b/examples/topoflow4/dev/s00_download_gpm.py @@ -0,0 +1,37 @@ +import subprocess, glob, os, shutil +from tqdm.auto import tqdm +from pathlib import Path + +files = [ + "https://files.mint.isi.edu/s/3RZwyxbi5PpcqeV/download", + "https://files.mint.isi.edu/s/cvY8E1GPC3v8Fi6/download", + "https://files.mint.isi.edu/s/RzERxVVIh2M7Qzc/download", + "https://files.mint.isi.edu/s/DswKIJadJHkwiPo/download", +] +download_dir = "/workspace/mint/MINT-Transformation/data/gpm_download" +dest_dir = "/workspace/mint/MINT-Transformation/data/GPM" + +cmds = [] + +# # download the files +# for i, file in enumerate(files): +# cmds.append(f"wget {file} -O {download_dir}/file_{i}.tar.gz") +# for cmd in tqdm(cmds): +# subprocess.check_call(cmd, shell=True) + +# extract files +# cwd = download_dir +# for file in glob.glob(download_dir + "/*.tar.gz"): +# cmds.append(f"tar -xzf {file}") +# print(cmds[-1]) +# for cmd in tqdm(cmds): +# subprocess.check_call(cmd, shell=True) + +# copy files +cwd = download_dir +for year in tqdm(range(2008, 2021)): + (Path(dest_dir) / str(year)).mkdir(exist_ok=True, parents=True) + download_files = glob.glob(download_dir + f"/*/*3IMERG.{year}*") + print(year, len(download_files)) + for file in download_files: + shutil.move(file, dest_dir + f'/{year}/') \ No newline at end of file diff --git a/examples/topoflow4/dev/s10_run_gldas.py b/examples/topoflow4/dev/s10_run_gldas.py new file mode 100644 index 00000000..83e92bbd --- /dev/null +++ b/examples/topoflow4/dev/s10_run_gldas.py @@ -0,0 +1,86 @@ +import subprocess +import os +from pathlib import Path +from dateutil import parser +from datetime import timedelta, datetime +from dataclasses import dataclass +from tqdm.auto import tqdm + + +HOME_DIR = Path(os.path.abspath(os.environ['HOME_DIR'])) +DOWNLOAD_DIR = 
Path(os.path.abspath(os.environ['DATA_CATALOG_DOWNLOAD_DIR'])) + +arcsecs = [30, 60] +areas = { + "awash": "37.829583333333, 6.654583333333, 39.934583333333, 9.374583333333", + "baro": "34.221249999999, 7.362083333332, 36.450416666666, 9.503749999999", + "shebelle": "38.159583333333, 6.319583333333, 43.559583333333, 9.899583333333", + "ganale": "39.174583333333, 5.527916666666, 41.124583333333, 7.098749999999", + "guder": "37.149583333333, 8.596250000000, 38.266250000000, 9.904583333333", + "muger": "37.807916666667, 8.929583333333, 39.032916666667, 10.112916666667", + "beko": "35.241249999999, 6.967916666666, 35.992916666666, 7.502916666666", + "alwero": "34.206249999999, 7.415416666666, 35.249583333333, 8.143749999999" +} + +run_dir = HOME_DIR / "data" / "tf_gldas" +commands = [] + +def add_geotif_command(commands, start_time, end_time, geotiff_dir): + writegeotiff_config_file = str(HOME_DIR / "examples" / "topoflow4" / "dev" / "tf_climate_writegeotiff.gldas.yml") + cmd = f"""dotenv run python -m dtran.main exec_pipeline --config {writegeotiff_config_file} \ + --dataset.start_time={start_time} \ + --dataset.end_time={end_time} \ + --geotiff_writer.output_dir={geotiff_dir} \ + --geotiff_writer.skip_on_exist=true \ + """.strip() + commands.append(cmd) + + +def add_rts_command(commands, start_time, end_time, geotiff_dir, geotiff_dir_crop, output_file, arcsec, area): + makerts_config_file = str(HOME_DIR / "examples" / "topoflow4" / "dev" / "tf_climate_make_rts.yml") + cmd = f"""dotenv run python -m dtran.main exec_pipeline --config {makerts_config_file} \ + --topoflow.geotiff_files='{geotiff_dir}*/*.tif' \ + --topoflow.cropped_geotiff_dir={geotiff_dir_crop} \ + --topoflow.output_file={output_file} \ + --topoflow.skip_crop_on_exist=true \ + --topoflow.xres_arcsecs={arcsec} \ + --topoflow.yres_arcsecs={arcsec} \ + --topoflow.bounds="{area}" \ + --topoflow.unit_multiplier=3600 + """.strip() + commands.append(cmd) + + +for year in range(2016, 2020): + for month in range(1, 13): + s0 = parser.parse(f"{year}-{month:02d}-01T00:00:00") + if month == 12: + s1 = s0.replace(day=31, hour=23, minute=59, second=59) + else: + s1 = s0.replace(month=s0.month + 1) - timedelta(seconds=1) + + start_time = s0.isoformat() + end_time = s1.isoformat() + geotiff_dir = str(run_dir / str(year) / f"data_m{month:02d}") + add_geotif_command(commands, start_time, end_time, geotiff_dir) + + for area_name, area in areas.items(): + for arcsec in arcsecs: + geotiff_dir_crop = str(run_dir / str(year) / area_name / f"geotiff_crop_{arcsec}") + output_file = str(run_dir / str(year) / area_name / f"output_r{arcsec}_m{month:02d}.rts") + add_rts_command(commands, start_time, end_time, geotiff_dir, geotiff_dir_crop, output_file, arcsec, area) + + start_time = f"{year}-01-01T00:00:00" + end_time = f"{year}-12-31T23:59:59" + geotiff_dir = str(run_dir / str(year) / f"data_m") + + for area_name, area in areas.items(): + for arcsec in arcsecs: + geotiff_dir_crop = str(run_dir / str(year) / area_name / f"geotiff_crop_{arcsec}") + output_file = str(run_dir / str(year) / area_name / f"output_r{arcsec}.rts") + add_rts_command(commands, start_time, end_time, geotiff_dir, geotiff_dir_crop, output_file, arcsec, area) + + +commands = commands[:] +for cmd in tqdm(commands, desc="run commands"): + output = subprocess.check_output(cmd, shell=True, cwd=str(HOME_DIR)) \ No newline at end of file diff --git a/examples/topoflow4/dev/s11_run_gpm.py b/examples/topoflow4/dev/s11_run_gpm.py new file mode 100644 index 00000000..54b4e982 --- /dev/null +++ 
b/examples/topoflow4/dev/s11_run_gpm.py @@ -0,0 +1,101 @@ +import subprocess +import os +from pathlib import Path +from dateutil import parser +from datetime import timedelta, datetime +from dataclasses import dataclass +from tqdm.auto import tqdm + + +HOME_DIR = Path(os.path.abspath(os.environ['HOME_DIR'])) +DOWNLOAD_DIR = HOME_DIR / "data/GPM" + +arcsecs = [30, 60] +areas = { + "awash": "37.829583333333, 6.654583333333, 39.934583333333, 9.374583333333", + "baro": "34.221249999999, 7.362083333332, 36.450416666666, 9.503749999999", + "shebelle": "38.159583333333, 6.319583333333, 43.559583333333, 9.899583333333", + "ganale": "39.174583333333, 5.527916666666, 41.124583333333, 7.098749999999", + "guder": "37.149583333333, 8.596250000000, 38.266250000000, 9.904583333333", + "muger": "37.807916666667, 8.929583333333, 39.032916666667, 10.112916666667", + "beko": "35.241249999999, 6.967916666666, 35.992916666666, 7.502916666666", + "alwero": "34.206249999999, 7.415416666666, 35.249583333333, 8.143749999999" +} + +run_dir = HOME_DIR / "data" / "tf_gpm" + +def add_geotif_command(commands, year, month, geotiff_dir): + writegeotiff_config_file = str(HOME_DIR / "examples" / "topoflow4" / "dev" / "tf_climate_writegeotiff.gpm.yml") + resource_path = str(DOWNLOAD_DIR / str(year) / f"*3IMERG.{year}{month:02d}*") + gpm_file = str(HOME_DIR / "examples/topoflow4/dev/gpm.yml") + + cmd = f"""dotenv run python -m dtran.main exec_pipeline --config {writegeotiff_config_file} \ + --dataset.resource_path={resource_path} \ + --dataset.repr_file={gpm_file} \ + --geotiff_writer.output_dir={geotiff_dir} \ + --geotiff_writer.skip_on_exist=true \ + """.strip() + commands.append(cmd) + + +def add_rts_command(commands, geotiff_dir, geotiff_dir_crop, output_file, arcsec, area): + makerts_config_file = str(HOME_DIR / "examples" / "topoflow4" / "dev" / "tf_climate_make_rts.yml") + cmd = f"""dotenv run python -m dtran.main exec_pipeline --config {makerts_config_file} \ + --topoflow.geotiff_files='{geotiff_dir}*/*.tif' \ + --topoflow.cropped_geotiff_dir={geotiff_dir_crop} \ + --topoflow.output_file={output_file} \ + --topoflow.skip_crop_on_exist=true \ + --topoflow.xres_arcsecs={arcsec} \ + --topoflow.yres_arcsecs={arcsec} \ + --topoflow.bounds="{area}" \ + --topoflow.unit_multiplier=1 + """.strip() + commands.append(cmd) + + +for year in range(2010, 2020): + commands = [] + (run_dir / str(year)).mkdir(exist_ok=True, parents=True) + for month in range(1, 13): + s0 = parser.parse(f"{year}-{month:02d}-01T00:00:00") + if month == 12: + s1 = s0.replace(day=31, hour=23, minute=59, second=59) + else: + s1 = s0.replace(month=s0.month + 1) - timedelta(seconds=1) + + start_time = s0.isoformat() + end_time = s1.isoformat() + geotiff_dir = str(run_dir / str(year) / f"data_m{month:02d}") + add_geotif_command(commands, year, month, geotiff_dir) + + for area_name, area in areas.items(): + for arcsec in arcsecs: + geotiff_dir_crop = str(run_dir / str(year) / area_name / f"geotiff_crop_{arcsec}") + output_file = str(run_dir / str(year) / area_name / f"output_r{arcsec}_m{month:02d}.rts") + add_rts_command(commands, geotiff_dir, geotiff_dir_crop, output_file, arcsec, area) + + start_time = f"{year}-01-01T00:00:00" + end_time = f"{year}-12-31T23:59:59" + geotiff_dir = str(run_dir / str(year) / f"data_m") + + for area_name, area in areas.items(): + for arcsec in arcsecs: + geotiff_dir_crop = str(run_dir / str(year) / area_name / f"geotiff_crop_{arcsec}") + output_file = str(run_dir / str(year) / area_name / f"output_r{arcsec}.rts") + 
add_rts_command(commands, geotiff_dir, geotiff_dir_crop, output_file, arcsec, area)
+
+    # compress the data
+    for area_name in areas.keys():
+        input_dir = run_dir / str(year) / area_name
+        commands.append(f"cd {input_dir.parent} && tar -czf {input_dir.name}.tar.gz {input_dir.name}")
+
+    # upload the file
+    commands.append(f"""
+    dotenv run python dtran/dcat/scripts/upload_files_in_batch.py upload_files \
+        --server=OWNCLOUD --dir={run_dir / str(year)} \
+        --ext=tar.gz --upload_dir=Topoflow/GPM_version_2/{year}
+    """)
+    commands.append(f"rm -rf {run_dir / str(year)}")
+
+    for cmd in tqdm(commands, desc=f"run commands for year {year}"):
+        output = subprocess.check_output(cmd, shell=True, cwd=str(HOME_DIR))
\ No newline at end of file
diff --git a/examples/topoflow4/dev/s20_verify_result.py b/examples/topoflow4/dev/s20_verify_result.py
new file mode 100644
index 00000000..c4da8bf3
--- /dev/null
+++ b/examples/topoflow4/dev/s20_verify_result.py
@@ -0,0 +1,57 @@
+import subprocess
+import os
+from pathlib import Path
+from dateutil import parser
+from datetime import timedelta, datetime
+from dataclasses import dataclass
+from tqdm.auto import tqdm
+import numpy as np
+
+HOME_DIR = Path(os.path.abspath(os.environ['HOME_DIR']))
+DOWNLOAD_DIR = Path(os.path.abspath(os.environ['DATA_CATALOG_DOWNLOAD_DIR']))
+
+# gldas is 8, gpm is 48
+n_files_per_day = 48
+run_dir = HOME_DIR / "data" / "tf_gpm"
+
+
+def get_size(infile):
+    data = np.fromfile(infile, dtype=np.float32)
+    return data.shape[0]
+
+n_month_days = [
+    (parser.parse(f'2000-{i+1:02}-01T00:00') - parser.parse(f'2000-{i:02}-01T00:00')).days
+    for i in range(1, 12)
+]
+n_month_days.append(31)
+
+for year in range(2008, 2009):
+    n_year_days = (parser.parse(f'{year+1}-01-01T00:00') - parser.parse(f'{year}-01-01T00:00')).days
+    n_month_days[1] = (parser.parse(f'{year}-03-01T00:00') - parser.parse(f'{year}-02-01T00:00')).days
+
+    for area_dir in (run_dir / str(year)).iterdir():
+        area_name = area_dir.name
+        if area_name.startswith("data_m"):
+            continue
+        print(f"check {year} area {area_name}")
+
+        for arcsec in [30, 60]:
+            with open(area_dir / f"output_r{arcsec}.rti", "r") as f:
+                lines = f.readlines()
+                n_cols = -1
+                n_rows = -1
+                for line in lines:
+                    if line.startswith("Number of columns:"):
+                        n_cols = int(line.replace("Number of columns:", "").strip())
+                    if line.startswith("Number of rows:"):
+                        n_rows = int(line.replace("Number of rows:", "").strip())
+                assert n_cols != -1 and n_rows != -1
+
+            infile = area_dir / f"output_r{arcsec}.rts"
+            size = get_size(infile)
+            assert size == n_rows * n_cols * n_files_per_day * n_year_days, f"Incorrect {infile}"
+
+            for month in range(1, 13):
+                infile = area_dir / f"output_r{arcsec}_m{month:02}.rts"
+                size = get_size(infile)
+                assert size == n_rows * n_cols * n_files_per_day * n_month_days[month-1]
\ No newline at end of file
diff --git a/examples/topoflow4/dev/s30_upload.py b/examples/topoflow4/dev/s30_upload.py
new file mode 100644
index 00000000..294ad7b0
--- /dev/null
+++ b/examples/topoflow4/dev/s30_upload.py
@@ -0,0 +1,2 @@
+import os
+os.system("dotenv run python dtran/dcat/scripts/upload_files_in_batch.py upload_files --server=OWNCLOUD --dir=data/tf_gldas --ext=tar.gz --upload_dir=Topoflow/GLDAS_version_2")
\ No newline at end of file
diff --git a/examples/topoflow4/dev/tf_climate.gldas_remote.yml b/examples/topoflow4/dev/tf_climate.gldas_remote.yml
index e965072d..1477ca1e 100644
--- a/examples/topoflow4/dev/tf_climate.gldas_remote.yml
+++ b/examples/topoflow4/dev/tf_climate.gldas_remote.yml
@@ -1,23 +1,22 @@
 version: "1" 
adapters: - weather_data: + dataset: comment: | Weather dataset - adapter: funcs.DcatReadFunc + adapter: funcs.readers.dcat_read_func.DcatReadStreamFunc inputs: # gldas dataset_id: 5babae3f-c468-4e01-862e-8b201468e3b5 start_time: 2014-08-01 00:00:00 end_time: 2014-09-01 00:00:00 - lazy_load_enabled: false # override_drepr: /home/rook/workspace/mint/MINT-Transformation/examples/topoflow4/dev/gldas.yml geotiff_writer: adapter: funcs.GeoTiffWriteFunc inputs: - dataset: $.weather_data.data + dataset: $.dataset.data variable_name: atmosphere_water__rainfall_mass_flux output_dir: /home/rook/workspace/mint/MINT-Transformation/examples/topoflow4/dev/data/geotiff - tf_trans: + topoflow: adapter: funcs.topoflow.topoflow_climate.Topoflow4ClimateWriteFunc inputs: geotiff_files: $.geotiff_writer.output_files diff --git a/examples/topoflow4/dev/tf_climate_make_rts.yml b/examples/topoflow4/dev/tf_climate_make_rts.yml new file mode 100644 index 00000000..0289d12d --- /dev/null +++ b/examples/topoflow4/dev/tf_climate_make_rts.yml @@ -0,0 +1,13 @@ +version: "1" +adapters: + topoflow: + adapter: funcs.topoflow.topoflow_climate.Topoflow4ClimateWriteFunc + inputs: + geotiff_files: /home/rook/workspace/mint/MINT-Transformation/examples/topoflow4/dev/data/geotiff/*.tif + cropped_geotiff_dir: /home/rook/workspace/mint/MINT-Transformation/examples/topoflow4/dev/data/geotiff_crop + output_file: /home/rook/workspace/mint/MINT-Transformation/examples/topoflow4/dev/data/output.rts + bounds: "34.221249999999, 7.353749999999, 36.45458333333234, 9.503749999999" + xres_arcsecs: 60 + yres_arcsecs: 60 + # required for GLDAS (unit transformation = 3600) + unit_multiplier: 1 \ No newline at end of file diff --git a/examples/topoflow4/dev/tf_climate_writegeotiff.gldas.yml b/examples/topoflow4/dev/tf_climate_writegeotiff.gldas.yml new file mode 100644 index 00000000..905ed43c --- /dev/null +++ b/examples/topoflow4/dev/tf_climate_writegeotiff.gldas.yml @@ -0,0 +1,16 @@ +version: "1" +adapters: + dataset: + comment: | + Weather dataset + adapter: funcs.readers.dcat_read_func.DcatReadStreamFunc + inputs: + dataset_id: 5babae3f-c468-4e01-862e-8b201468e3b5 + start_time: 2014-08-01 00:00:00 + end_time: 2014-09-01 00:00:00 + geotiff_writer: + adapter: funcs.GeoTiffWriteFunc + inputs: + dataset: $.dataset.data + variable_name: atmosphere_water__rainfall_mass_flux + output_dir: /home/rook/workspace/mint/MINT-Transformation/examples/topoflow4/dev/data/geotiff \ No newline at end of file diff --git a/examples/topoflow4/dev/tf_climate_writegeotiff.gpm.yml b/examples/topoflow4/dev/tf_climate_writegeotiff.gpm.yml new file mode 100644 index 00000000..cf3b03e4 --- /dev/null +++ b/examples/topoflow4/dev/tf_climate_writegeotiff.gpm.yml @@ -0,0 +1,15 @@ +version: "1" +adapters: + dataset: + comment: | + Weather dataset + adapter: funcs.ReadFunc + inputs: + resource_path: /home/rook/workspace/mint/MINT-Transformation/examples/topoflow4/dev/gpm/download/*.nc4 + repr_file: /home/rook/workspace/mint/MINT-Transformation/examples/topoflow4/dev/gpm.yml + geotiff_writer: + adapter: funcs.GeoTiffWriteFunc + inputs: + dataset: $.dataset.data + variable_name: atmosphere_water__precipitation_mass_flux + output_dir: /home/rook/workspace/mint/MINT-Transformation/examples/topoflow4/dev/gpm/geotiff diff --git a/funcs/cycles/gldas2cycles.py b/funcs/cycles/gldas2cycles.py index 7522577b..be8cc6ad 100644 --- a/funcs/cycles/gldas2cycles.py +++ b/funcs/cycles/gldas2cycles.py @@ -1,4 +1,3 @@ -import argparse import math import os import shutil @@ -8,6 +7,7 @@ import 
numpy as np
 from netCDF4 import Dataset
+
 from dtran import IFunc, ArgType
 from dtran.ifunc import IFuncType
 from dtran.metadata import Metadata
@@ -84,10 +84,10 @@ def change_metadata(
     return metadata
 
 
-def Closest(lat, lon, path):
+def closest(lat, lon, path):
     elevation_fp = path + "/GLDASp4_elevation_025d.nc4"
 
     nc = Dataset(elevation_fp, "r")
 
     best_y = (np.abs(nc.variables["lat"][:] - lat)).argmin()
     best_x = (np.abs(nc.variables["lon"][:] - lon)).argmin()
@@ -101,7 +101,7 @@ def Closest(lat, lon, path):
     )
 
 
-def ReadVar(y, x, nc_name):
+def read_var(y, x, nc_name):
     with Dataset(nc_name, "r") as nc:
         _prcp = nc["Rainf_f_tavg"][0, y, x]
         _temp = nc["Tair_f_inst"][0, y, x]
@@ -148,7 +148,7 @@
     for nc_name in os.listdir(nc_path):
         if nc_name.endswith(".nc4"):
-            (_prcp, _temp, _wind, _solar, _rh) = ReadVar(
+            (_prcp, _temp, _wind, _solar, _rh) = read_var(
                 y, x, os.path.join(nc_path, nc_name)
             )
@@ -235,7 +235,7 @@
     for lat, lon, fname in coords:
         print("Processing data for {0}, {1}".format(lat, lon))
-        (y, x, grid_lat, grid_lon, elevation) = Closest(lat, lon, data_path)
+        (y, x, grid_lat, grid_lon, elevation) = closest(lat, lon, data_path)
         if grid_lat < 0.0:
             lat_str = "%.2fS" % (abs(grid_lat))
@@ -256,8 +256,8 @@
         Path(output_path).mkdir(parents=True, exist_ok=True)
         # fname = "met" + lat_str + "x" + lon_str + ".weather"
         outfp = open(os.path.join(output_path, fname), "w")
-        outfp.write("LATITUDE %.2f\n" % (grid_lat))
-        outfp.write("ALTITUDE %.2f\n" % (elevation))
+        outfp.write("LATITUDE %.2f\n" % grid_lat)
+        outfp.write("ALTITUDE %.2f\n" % elevation)
         outfp.write("SCREENING_HEIGHT 2\n")
         outfp.write(
             "YEAR DOY PP TX TN SOLAR RHX RHN WIND\n"
diff --git a/funcs/dcat_write_func.py b/funcs/dcat_write_func.py
index 5ce46331..10d0a5bb 100644
--- a/funcs/dcat_write_func.py
+++ b/funcs/dcat_write_func.py
@@ -2,7 +2,7 @@
 # -*- coding: utf-8 -*-
 import subprocess
 from pathlib import Path
-from typing import Union
+from typing import Union, Optional, Dict
 import os
 import ujson as json
@@ -10,6 +10,7 @@
 from dtran.argtype import ArgType
 from dtran.dcat.api import DCatAPI
 from dtran.ifunc import IFunc, IFuncType
+from dtran.metadata import Metadata
 
 
 class DcatWriteFunc(IFunc):
@@ -24,10 +25,9 @@ class DcatWriteFunc(IFunc):
     }
     outputs = {"data": ArgType.String}
     friendly_name: str = "Data Catalog Writer"
-    func_type = IFuncType.WRITER
     example = {
         "resource_path": "$.my_graph_write_func.output_file",
-        "metadata": '[{"name": "WFP Food Prices - South Sudan", "description": "Food price dataset for South Sudan (2012-2019)"}]'
+        "metadata": '[{"name": "WFP Food Prices - South Sudan", "description": "Food price dataset for South Sudan (2012-2019)"}]',
     }
     PROVENANCE_ID = "b3e79dc2-8fa1-4203-ac82-b5267925191f"
@@ -59,28 +59,28 @@ def exec(self) -> dict:
     def validate(self) -> bool:
         return True
 
+    def change_metadata(
+        self, metadata: Optional[Dict[str, Metadata]]
+    ) -> Dict[str, Metadata]:
+        pass
+
 
 class DcatWriteMetadataFunc(IFunc):
     id = "dcat_write_metadata_func"
     description = """ A data catalog metadata writer adapter. 
""" func_type = IFuncType.WRITER - inputs = { - "metadata": ArgType.String, - "dataset_id": ArgType.String - } + inputs = {"metadata": ArgType.String, "dataset_id": ArgType.String} outputs = {"data": ArgType.String} friendly_name: str = "Data Catalog Metadata Writer" example = { "metadata": '[{"name": "WFP Food Prices - South Sudan", "description": "Food price dataset for South Sudan (2012-2019)"}]', - "dataset_id": "ea0e86f3-9470-4e7e-a581-df85b4a7075d" + "dataset_id": "ea0e86f3-9470-4e7e-a581-df85b4a7075d", } PROVENANCE_ID = "b3e79dc2-8fa1-4203-ac82-b5267925191f" - def __init__( - self, metadata: str, dataset_id: str - ): + def __init__(self, metadata: str, dataset_id: str): self.metadata = json.loads(metadata) self.dataset_id = dataset_id self.dcat = DCatAPI.get_instance() @@ -93,3 +93,8 @@ def exec(self) -> dict: def validate(self) -> bool: return True + + def change_metadata( + self, metadata: Optional[Dict[str, Metadata]] + ) -> Dict[str, Metadata]: + pass diff --git a/funcs/dem/__init__.py b/funcs/dem/__init__.py new file mode 100644 index 00000000..45569351 --- /dev/null +++ b/funcs/dem/__init__.py @@ -0,0 +1 @@ +from .crop_func import DEMCropFunc \ No newline at end of file diff --git a/funcs/dem/crop_func.py b/funcs/dem/crop_func.py new file mode 100644 index 00000000..fec93dc5 --- /dev/null +++ b/funcs/dem/crop_func.py @@ -0,0 +1,51 @@ +from typing import Optional, Dict + +import rioxarray + +from dtran import IFunc, ArgType +from dtran.ifunc import IFuncType +from dtran.metadata import Metadata + + +class DEMCropFunc(IFunc): + id = "dem_crop_func" + description = """ A reader-transformation-writer multi-adapter. + Crop a raster file by bounding box. + """ + inputs = { + "input_file": ArgType.String, + "output_file": ArgType.String, + "xmin": ArgType.Number, + "ymin": ArgType.Number, + "xmax": ArgType.Number, + "ymax": ArgType.Number, + } + outputs = {"output_file": ArgType.String} + friendly_name: str = "DEMCrop" + func_type = IFuncType.MODEL_TRANS + + def __init__(self, input_file, output_file, xmin, ymin, xmax, ymax): + self.input_file = input_file + self.output_file = output_file + self.xmin = xmin + self.ymin = ymin + self.xmax = xmax + self.ymax = ymax + + def validate(self) -> bool: + return True + + def exec(self) -> dict: + ds = rioxarray.open_rasterio(self.input_file) + mask_lon = (ds.x >= self.xmin) & (ds.x <= self.xmax) + mask_lat = (ds.y >= self.ymin) & (ds.y <= self.ymax) + + cropped_ds = ds.where(mask_lon & mask_lat, drop=True) + cropped_ds.rio.to_raster(self.output_file) + + return {"output_file": self.output_file} + + def change_metadata( + self, metadata: Optional[Dict[str, Metadata]] + ) -> Dict[str, Metadata]: + return metadata diff --git a/funcs/readers/dcat_read_func.py b/funcs/readers/dcat_read_func.py index 3adeb2a5..c715c225 100644 --- a/funcs/readers/dcat_read_func.py +++ b/funcs/readers/dcat_read_func.py @@ -14,13 +14,13 @@ from functools import partial from playhouse.kv import KeyValue from peewee import SqliteDatabase, Model, UUIDField, IntegerField, BooleanField, BigIntegerField, DoesNotExist - from drepr import DRepr from drepr.outputs import ArrayBackend, GraphBackend from dtran.argtype import ArgType from dtran.backend import ShardedBackend, ShardedClassID, LazyLoadBackend from dtran.dcat.api import DCatAPI from dtran.ifunc import IFunc, IFuncType +from tqdm.auto import tqdm DATA_CATALOG_DOWNLOAD_DIR = os.path.abspath(os.environ["DATA_CATALOG_DOWNLOAD_DIR"]) if os.environ['NO_CHECK_CERTIFICATE'].lower().strip() == 'true': @@ -54,8 +54,8 @@ class 
ResourceManager:
     instance = None
 
     def __init__(self):
-        self.max_capacity = 200 * UNITS_MAPPING['MB']
-        self.max_clear_size = 100 * UNITS_MAPPING['MB']
+        self.max_capacity = 32 * UNITS_MAPPING['GB']
+        self.max_clear_size = 20 * UNITS_MAPPING['GB']
         assert self.max_capacity >= self.max_clear_size, "max_capacity cannot be less than max_clear_size"
         self.poll_interval = 10
         self.compressed_resource_types = {".zip", ".tar.gz", ".tar"}
@@ -143,14 +143,14 @@ def download(self, resource_id: str, resource_metadata: Dict[str, str], should_r
         DcatReadFunc.logger.debug(f"Downloading resource {resource_id} ...")
         if is_compressed:
             temp_path = path + resource_metadata['resource_type']
-            subprocess.check_call(f"wget -q \"{resource_metadata['resource_data_url']}\" -O {temp_path}", shell=True, close_fds=False)
+            subprocess.check_call(f"wget \"{resource_metadata['resource_data_url']}\" -O {temp_path}", shell=True)
             self.uncompress(resource_metadata['resource_type'], path)
             # adjust required_size when the resource is compressed
             required_size = -resource.size
             required_size += sum(f.stat().st_size for f in Path(path).rglob('*')) + Path(path).stat().st_size
             Path(temp_path).unlink()
         else:
-            subprocess.check_call(f"wget -q \"{resource_metadata['resource_data_url']}\" -O {path}", shell=True, close_fds=False)
+            subprocess.check_call(f"wget \"{resource_metadata['resource_data_url']}\" -O {path}", shell=True)
             required_size = 0
 
         with self.db.atomic('EXCLUSIVE'):
@@ -195,7 +195,7 @@ def clear(self) -> int:
         return size
 
     def uncompress(self, resource_type: str, path: Union[Path, str]):
-        subprocess.check_call(f"unzip {path + resource_type} -d {path}", shell=True, close_fds=False)
+        subprocess.check_call(f"unzip {path + resource_type} -d {path}", shell=True)
         # flatten the structure (max two levels)
         for fpath in Path(path).iterdir():
             if fpath.is_dir():
@@ -340,3 +340,98 @@ def __del__(self):
 
     def validate(self) -> bool:
         return True
+
+
+# TODO: this is a temporary class; it should be removed before moving on with the existing pipeline
+class DcatReadStreamFunc(IFunc):
+    id = "dcat_read_stream_func"
+    description = """ An entry point in the pipeline.
+    Fetches a dataset and its metadata from the MINT Data-Catalog. 
+ """ + func_type = IFuncType.READER + friendly_name: str = "Data Catalog Reader" + inputs = { + "dataset_id": ArgType.String, + "start_time": ArgType.DateTime(optional=True), + "end_time": ArgType.DateTime(optional=True), + "should_redownload": ArgType.Boolean(optional=True), + "override_drepr": ArgType.String(optional=True), + } + outputs = {"data": ArgType.DataSet(None), "data_path": ArgType.ListString(optional=True)} + example = { + "dataset_id": "ea0e86f3-9470-4e7e-a581-df85b4a7075d", + "start_time": "2020-03-02T12:30:55", + "end_time": "2020-03-02T12:30:55", + "should_redownload": "False", + "override_drepr": "/tmp/model.yml" + } + logger = logging.getLogger(__name__) + + def __init__(self, + dataset_id: str, + start_time: datetime = None, + end_time: datetime = None, + should_redownload: bool = False, + override_drepr: str = None + ): + self.dataset_id = dataset_id + self.should_redownload = should_redownload + self.resource_manager = ResourceManager.get_instance() + dataset = DCatAPI.get_instance().find_dataset_by_id(dataset_id) + + assert ('resource_repr' in dataset['metadata']) or ('dataset_repr' in dataset['metadata']), \ + "Dataset is missing both 'resource_repr' and 'dataset_repr'" + assert not (('resource_repr' in dataset['metadata']) and ('dataset_repr' in dataset['metadata'])), \ + "Dataset has both 'resource_repr' and 'dataset_repr'" + + resources = DCatAPI.get_instance().find_resources_by_dataset_id(dataset_id, start_time, end_time) + + self.resources = OrderedDict() + if 'resource_repr' in dataset['metadata']: + if override_drepr is not None: + self.drepr = DRepr.parse_from_file(override_drepr) + else: + self.drepr = DRepr.parse(dataset['metadata']['resource_repr']) + for resource in resources: + self.resources[resource['resource_id']] = {key: resource[key] for key in + {'resource_data_url', 'resource_type'}} + self.repr_type = 'resource_repr' + else: + # TODO: fix me!! + assert len(resources) == 1 + self.resources[resources[0]['resource_id']] = {key: resources[0][key] for key in + {'resource_data_url', 'resource_type'}} + if override_drepr is not None: + self.drepr = DRepr.parse_from_file(override_drepr) + else: + self.drepr = DRepr.parse(dataset['metadata']['dataset_repr']) + self.repr_type = 'dataset_repr' + + self.logger.debug(f"Found key '{self.repr_type}'") + + async def exec(self) -> dict: + # TODO: fix me! 
incorrect way to choose backend + if self.get_preference("data") is None or self.get_preference("data") == 'array': + backend = ArrayBackend + else: + backend = GraphBackend + + if self.repr_type == 'dataset_repr': + resource_id, resource_metadata = list(self.resources.items())[0] + resource_file = self.resource_manager.download(resource_id, resource_metadata, self.should_redownload) + yield {"data": backend.from_drepr(self.drepr, resource_file), "data_path": [resource_file]} + else: + # data_path is location of the resources in disk, for pipeline that wants to download the file + for resource_id, resource_metadata in tqdm(self.resources.items(), total=len(self.resources), desc='dcat_read'): + resource_file = self.resource_manager.download(resource_id, resource_metadata, self.should_redownload) + yield { + "data": backend.from_drepr(self.drepr, resource_file), + "data_path": [resource_file] + } + + def __del__(self): + for resource_id, resource_metadata in self.resources.items(): + self.resource_manager.unlink(resource_id) + + def validate(self) -> bool: + return True diff --git a/funcs/readers/dcat_read_no_repr.py b/funcs/readers/dcat_read_no_repr.py index 855fbe11..60373a72 100644 --- a/funcs/readers/dcat_read_no_repr.py +++ b/funcs/readers/dcat_read_no_repr.py @@ -3,10 +3,21 @@ import subprocess from pathlib import Path +from typing import Dict, Optional from dtran.argtype import ArgType from dtran.ifunc import IFunc, IFuncType -from funcs.readers.dcat_read_func import DCatAPI +from dtran.metadata import Metadata +from funcs.readers.dcat_read_func import DCatAPI, ResourceManager +import os + +DATA_CATALOG_DOWNLOAD_DIR = os.path.abspath(os.environ["DATA_CATALOG_DOWNLOAD_DIR"]) +if os.environ["NO_CHECK_CERTIFICATE"].lower().strip() == "true": + DOWNLOAD_CMD = "wget --no-check-certificate" +else: + DOWNLOAD_CMD = "wget" + +Path(DATA_CATALOG_DOWNLOAD_DIR).mkdir(exist_ok=True, parents=True) class DcatReadNoReprFunc(IFunc): @@ -17,45 +28,34 @@ class DcatReadNoReprFunc(IFunc): func_type = IFuncType.READER friendly_name: str = " Data Catalog Reader Without repr File" inputs = {"dataset_id": ArgType.String} - outputs = {"data": ArgType.String} - example = { - "dataset_id": "05c43c58-ed42-4830-9b1f-f01059c4b96f" - } + outputs = {"data_path": ArgType.String} + example = {"dataset_id": "05c43c58-ed42-4830-9b1f-f01059c4b96f"} def __init__(self, dataset_id: str): - # TODO: move to a diff arch (pointer to Data-Catalog URL) - DCAT_URL = "https://api.mint-data-catalog.org" - self.dataset_id = dataset_id + self.resource = [] - resource_results = DCatAPI.get_instance(DCAT_URL).find_resources_by_dataset_id( - dataset_id - ) - # TODO: fix me!! 
diff --git a/funcs/readers/dcat_read_no_repr.py b/funcs/readers/dcat_read_no_repr.py
index 855fbe11..60373a72 100644
--- a/funcs/readers/dcat_read_no_repr.py
+++ b/funcs/readers/dcat_read_no_repr.py
@@ -3,10 +3,21 @@
 import subprocess
 from pathlib import Path
+from typing import Dict, Optional
 
 from dtran.argtype import ArgType
 from dtran.ifunc import IFunc, IFuncType
-from funcs.readers.dcat_read_func import DCatAPI
+from dtran.metadata import Metadata
+from funcs.readers.dcat_read_func import DCatAPI, ResourceManager
+import os
+
+DATA_CATALOG_DOWNLOAD_DIR = os.path.abspath(os.environ["DATA_CATALOG_DOWNLOAD_DIR"])
+if os.environ["NO_CHECK_CERTIFICATE"].lower().strip() == "true":
+    DOWNLOAD_CMD = "wget --no-check-certificate"
+else:
+    DOWNLOAD_CMD = "wget"
+
+Path(DATA_CATALOG_DOWNLOAD_DIR).mkdir(exist_ok=True, parents=True)
 
 
 class DcatReadNoReprFunc(IFunc):
@@ -17,45 +28,34 @@ class DcatReadNoReprFunc(IFunc):
     func_type = IFuncType.READER
     friendly_name: str = " Data Catalog Reader Without repr File"
     inputs = {"dataset_id": ArgType.String}
-    outputs = {"data": ArgType.String}
-    example = {
-        "dataset_id": "05c43c58-ed42-4830-9b1f-f01059c4b96f"
-    }
+    outputs = {"data_path": ArgType.String}
+    example = {"dataset_id": "05c43c58-ed42-4830-9b1f-f01059c4b96f"}
 
     def __init__(self, dataset_id: str):
-        # TODO: move to a diff arch (pointer to Data-Catalog URL)
-        DCAT_URL = "https://api.mint-data-catalog.org"
-
         self.dataset_id = dataset_id
+        self.resource = []
 
-        resource_results = DCatAPI.get_instance(DCAT_URL).find_resources_by_dataset_id(
-            dataset_id
-        )
-        # TODO: fix me!!
-        assert len(resource_results) == 1
-        resource_ids = {"default": resource_results[0]["resource_data_url"]}
-        Path("/tmp/dcat_read_func").mkdir(exist_ok=True, parents=True)
-
-        self.resources = {}
-        for resource_id, resource_url in resource_ids.items():
-            file_full_path = f"/tmp/dcat_read_func/{resource_id}.dat"
-            subprocess.check_call(
-                f"wget {resource_url} -O {file_full_path}", shell=True
-            )
-            self.resources[resource_id] = file_full_path
+        resources = DCatAPI.get_instance().find_resources_by_dataset_id(dataset_id)
+
+        self.resource_manager = ResourceManager.get_instance()
+
+        assert len(resources) == 1
+
+        self.resource_id = resources[0]["resource_id"]
+        self.resource_metadata = {
+            key: resources[0][key] for key in {"resource_data_url", "resource_type"}
+        }
 
     def exec(self) -> dict:
-        input_dir_full_path = f"/data/{self.dataset_id}"
-        for resource in self.resources.values():
-            if not Path(input_dir_full_path).exists():
-                print("Not exists")
-                Path(input_dir_full_path).mkdir(parents=True)
-            else:
-                subprocess.check_output(f"rm -rf {input_dir_full_path}/*", shell=True)
-            subprocess.check_call(
-                f"tar -xvzf {resource} -C {input_dir_full_path}/", shell=True
-            )
-        return {"data": input_dir_full_path}
+        data_path = self.resource_manager.download(
+            self.resource_id, self.resource_metadata, should_redownload=False
+        )
+        return {"data_path": data_path}
 
     def validate(self) -> bool:
         return True
+
+    def change_metadata(
+        self, metadata: Optional[Dict[str, Metadata]]
+    ) -> Dict[str, Metadata]:
+        return metadata
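
Editorial note: the module now reads `DATA_CATALOG_DOWNLOAD_DIR` and `NO_CHECK_CERTIFICATE` via `os.environ[...]` at import time, so both must be set before the import or a `KeyError` is raised. A minimal sketch (the paths are placeholders, not values from the patch):

```python
# Hedged sketch only: satisfy the import-time environment lookups above.
import os

os.environ.setdefault("DATA_CATALOG_DOWNLOAD_DIR", "/tmp/dcat_downloads")  # placeholder path
os.environ.setdefault("NO_CHECK_CERTIFICATE", "false")

from funcs.readers.dcat_read_no_repr import DcatReadNoReprFunc

func = DcatReadNoReprFunc(dataset_id="05c43c58-ed42-4830-9b1f-f01059c4b96f")
print(func.exec()["data_path"])
```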
diff --git a/funcs/readers/read_func.py b/funcs/readers/read_func.py
index c96b50e8..21d6acbb 100644
--- a/funcs/readers/read_func.py
+++ b/funcs/readers/read_func.py
@@ -56,3 +56,48 @@ def exec(self) -> dict:
 
     def validate(self) -> bool:
         return True
+
+
+class ReadStreamFunc(IFunc):
+    id = "read_func"
+    description = """ An entry point in the pipeline.
+    Reads an input file (or multiple files) and a yml file describing the D-REPR layout of each file.
+    Returns a Dataset object.
+    """
+    friendly_name: str = "Local File Reader"
+    func_type = IFuncType.READER
+    inputs = {"repr_file": ArgType.FilePath, "resource_path": ArgType.FilePath}
+    outputs = {"data": ArgType.DataSet(None)}
+    example = {
+        "repr_file": "./wfp_food_prices_south-sudan.repr.yml",
+        "resource_path": "./wfp_food_prices_south-sudan.csv",
+    }
+
+    def __init__(self, repr_file: Union[str, Path], resource_path: Union[str, Path]):
+        resource_path = str(resource_path)
+
+        self.repr = DRepr.parse_from_file(str(repr_file))
+        self.resources = glob.glob(resource_path)
+
+        assert len(self.resources) > 0
+
+    def exec(self) -> dict:
+        if self.get_preference("data") is None or self.get_preference("data") == "array":
+            backend = ArrayBackend
+        else:
+            backend = GraphBackend
+
+        if len(self.resources) == 1:
+            return {
+                "data": backend.from_drepr(self.repr, self.resources[0])
+            }
+        else:
+            dataset = ShardedBackend(len(self.resources))
+            for resource in self.resources:
+                dataset.add(
+                    backend.from_drepr(self.repr, resource, dataset.inject_class_id)
+                )
+            return {"data": dataset}
+
+    def validate(self) -> bool:
+        return True
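
Usage sketch (editorial note): `resource_path` is passed through `glob.glob`, so a single pattern can fan out to many files; one match yields a plain dataset, while several matches are combined into a `ShardedBackend`. The file names here are hypothetical:

```python
# Hedged sketch only: one repr file applied to every file matching a glob.
from funcs.readers.read_func import ReadStreamFunc

func = ReadStreamFunc(
    repr_file="./wfp_food_prices_south-sudan.repr.yml",
    resource_path="./prices/*.csv",  # hypothetical glob; must match at least one file
)
dataset = func.exec()["data"]  # ShardedBackend if the glob matched several files
```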
diff --git a/funcs/topoflow/topoflow/__init__.py b/funcs/topoflow/topoflow/__init__.py
index 85299cc8..cd326d2f 100644
--- a/funcs/topoflow/topoflow/__init__.py
+++ b/funcs/topoflow/topoflow/__init__.py
@@ -1,5 +1,5 @@
-SILENT = False
+SILENT = True
 
 if not(SILENT):
     print('Importing TopoFlow 3.6 packages:')
     print('   topoflow.utils')
diff --git a/funcs/topoflow/topoflow/components/tests/__init__.py b/funcs/topoflow/topoflow/components/tests/__init__.py
index 85299cc8..cd326d2f 100644
--- a/funcs/topoflow/topoflow/components/tests/__init__.py
+++ b/funcs/topoflow/topoflow/components/tests/__init__.py
@@ -1,5 +1,5 @@
-SILENT = False
+SILENT = True
 
 if not(SILENT):
     print('Importing TopoFlow 3.6 packages:')
     print('   topoflow.utils')
diff --git a/funcs/topoflow/topoflow/framework/emeli.py b/funcs/topoflow/topoflow/framework/emeli.py
index fb6f5b8d..0158a442 100644
--- a/funcs/topoflow/topoflow/framework/emeli.py
+++ b/funcs/topoflow/topoflow/framework/emeli.py
@@ -187,7 +187,7 @@
 parent_dir   = parent_dir + os.sep
 examples_dir = examples_dir + os.sep
 
-SILENT = False
+SILENT = True
 if not(SILENT):
     # print ' '
     print('Paths for this package:')
diff --git a/funcs/topoflow/topoflow/framework/emeli_with_cfunits.py b/funcs/topoflow/topoflow/framework/emeli_with_cfunits.py
index ea281b4a..9be17881 100644
--- a/funcs/topoflow/topoflow/framework/emeli_with_cfunits.py
+++ b/funcs/topoflow/topoflow/framework/emeli_with_cfunits.py
@@ -183,7 +183,7 @@
 parent_dir   = parent_dir + os.sep
 examples_dir = examples_dir + os.sep
 
-SILENT = False
+SILENT = True
 if not(SILENT):
     # print ' '
     print('Paths for this package:')
diff --git a/funcs/topoflow/topoflow_climate.py b/funcs/topoflow/topoflow_climate.py
index 2698d231..21328e74 100644
--- a/funcs/topoflow/topoflow_climate.py
+++ b/funcs/topoflow/topoflow_climate.py
@@ -24,13 +24,14 @@ class Topoflow4ClimateWriteFunc(IFunc):
     description = '''A model-specific transformation. Prepare the topoflow RTS & RTI files.
     '''
     inputs = {
-        "geotiff_files": ArgType.ListString,
+        "geotiff_files": ArgType.ListOrOneString,
         "cropped_geotiff_dir": ArgType.String,
         "output_file": ArgType.String,
         "bounds": ArgType.String,
         "xres_arcsecs": ArgType.Number,
         "yres_arcsecs": ArgType.Number,
-        "unit_multiplier": ArgType.Number(optional=True)
+        "unit_multiplier": ArgType.Number(optional=True),
+        "skip_crop_on_exist": ArgType.Boolean
     }
     outputs = {"output_file": ArgType.String}
     friendly_name: str = "Topoflow Climate"
@@ -45,7 +46,7 @@ class Topoflow4ClimateWriteFunc(IFunc):
         "unit_multiplier": 1
     }
 
-    def __init__(self, geotiff_files: List[str], cropped_geotiff_dir: str, output_file: str, bounds: str, xres_arcsecs: int, yres_arcsecs: int, unit_multiplier: float=1):
+    def __init__(self, geotiff_files: Union[str, List[str]], cropped_geotiff_dir: str, output_file: str, bounds: str, xres_arcsecs: int, yres_arcsecs: int, unit_multiplier: float=1, skip_crop_on_exist: bool=False):
         x_min, y_min, x_max, y_max = [float(x.strip()) for x in bounds.split(",")]
         assert x_max > x_min and y_min < y_max
         self.bounding_box = BoundingBox(x_min, y_min, x_max, y_max)
@@ -54,9 +55,10 @@ def __init__(self, geotiff_files: List[str], cropped_geotiff_dir: str, output_fi
         self.yres_arcsecs = yres_arcsecs
         self.output_file = os.path.abspath(output_file)
         if isinstance(geotiff_files, str):
-            geotiff_files = glob.glob(self.geotiff_files)
+            geotiff_files = glob.glob(geotiff_files)
         self.geotiff_files = geotiff_files
         self.cropped_geotiff_dir = os.path.abspath(cropped_geotiff_dir)
+        self.skip_crop_on_exist = skip_crop_on_exist
 
         if not os.path.exists(self.cropped_geotiff_dir):
             Path(self.cropped_geotiff_dir).mkdir(exist_ok=True, parents=True)
@@ -65,7 +67,7 @@ def __init__(self, geotiff_files: List[str], cropped_geotiff_dir: str, output_fi
             Path(self.cropped_geotiff_dir).mkdir(exist_ok=True, parents=True)
         Path(output_file).parent.mkdir(exist_ok=True, parents=True)
 
-        assert not os.path.exists(output_file)
+        assert not os.path.exists(output_file), output_file
 
     def exec(self) -> dict:
         if self.output_file.endswith(".zip"):
@@ -74,18 +76,23 @@ def exec(self) -> dict:
         else:
             rts_file = self.output_file
 
-        create_rts_rti(self.geotiff_files, rts_file, self.cropped_geotiff_dir, self.bounding_box, self.xres_arcsecs, self.yres_arcsecs, self.unit_multiplier)
+        create_rts_rti(self.geotiff_files, rts_file, self.cropped_geotiff_dir, self.bounding_box, self.xres_arcsecs, self.yres_arcsecs, self.unit_multiplier, self.skip_crop_on_exist)
 
         if self.output_file.endswith(".zip"):
             # compress the outfile
             with ZipFile(self.output_file, 'w') as z:
                 z.write(rts_file, os.path.basename(rts_file))
                 z.write(rti_file, os.path.basename(rti_file))
+            os.remove(rts_file)
+            os.remove(rti_file)
 
         return {"output_file": self.output_file}
 
     def validate(self) -> bool:
         return True
 
+    def change_metadata(self, metadata: Optional[Dict[str, Metadata]]) -> Dict[str, Metadata]:
+        return metadata
+
 
 class Topoflow4ClimateWriteWrapperFunc(IFunc):
     func_cls = Topoflow4ClimateWriteFunc
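
Usage sketch (editorial note): with `ListOrOneString`, `geotiff_files` may now be a single glob string instead of a list, and `skip_crop_on_exist=True` lets an interrupted run resume without re-cropping. The paths and bounds below are made up:

```python
# Hedged sketch only: resume-friendly RTS/RTI generation.
from funcs.topoflow.topoflow_climate import Topoflow4ClimateWriteFunc

func = Topoflow4ClimateWriteFunc(
    geotiff_files="/data/gpm/*.tif",       # single glob string, expanded internally
    cropped_geotiff_dir="/data/gpm_cropped",
    output_file="/data/baro/climate.zip",  # must not exist yet; .rts/.rti are zipped then removed
    bounds="34.22, 7.36, 36.45, 9.50",     # x_min, y_min, x_max, y_max
    xres_arcsecs=30,
    yres_arcsecs=30,
    skip_crop_on_exist=True,               # reuse crops left by a previous run
)
func.exec()
```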
"""Create RTS file from TIF files. Names of TIF files must be sorted by time""" assert out_file.endswith(".rts") and len(out_file.split(".rts")) == 2 assert len(tif_files) > 0 @@ -40,6 +40,8 @@ def create_rts_rti(tif_files, out_file, crop_dir: str, out_bounds: BoundingBox, # ])) # print(res) for tif_file, out_crop_file in tqdm(zip(tif_files, out_crop_files)): + if skip_crop_on_exist and os.path.exists(out_crop_file): + continue args = (tif_file, out_crop_file, out_bounds, out_xres_sec, out_yres_sec) assert crop_geotiff(args) @@ -62,7 +64,7 @@ def create_rts_rti(tif_files, out_file, crop_dir: str, out_bounds: BoundingBox, def create_rti(tif_file, out_file): """Create RTI from the TIF file""" - import_grid.read_from_geotiff(tif_file, REPORT=True, rti_file=out_file) + import_grid.read_from_geotiff(tif_file, REPORT=False, rti_file=out_file) if __name__ == '__main__': diff --git a/funcs/topoflow/write_topoflow4_climate_func.py b/funcs/topoflow/write_topoflow4_climate_func.py index 9b968836..3b75e536 100644 --- a/funcs/topoflow/write_topoflow4_climate_func.py +++ b/funcs/topoflow/write_topoflow4_climate_func.py @@ -71,6 +71,9 @@ def exec(self) -> dict: def validate(self) -> bool: return True + def change_metadata(self, metadata: Optional[Dict[str, Metadata]]) -> Dict[str, Metadata]: + return metadata + class Topoflow4ClimateWritePerMonthFunc(IFunc): id = "topoflow4_climate_write_per_month_func" @@ -87,7 +90,7 @@ class Topoflow4ClimateWritePerMonthFunc(IFunc): func_type = IFuncType.MODEL_TRANS example = { "grid_dir": f"/data/mint/gpm_grid_baro", - "date_regex": '3B-HHR-E.MS.MRG.3IMERG.(?P\d{4})(?P\d{2})(?P\d{2})', + "date_regex": r'3B-HHR-E.MS.MRG.3IMERG.(?P\d{4})(?P\d{2})(?P\d{2})', "output_file": f"/data/mint/baro/climate.rts", } diff --git a/funcs/writers/geotiff_write_func.py b/funcs/writers/geotiff_write_func.py index be72241b..770ec0fe 100644 --- a/funcs/writers/geotiff_write_func.py +++ b/funcs/writers/geotiff_write_func.py @@ -17,15 +17,17 @@ class GeoTiffWriteFunc(IFunc): "dataset": ArgType.DataSet(None), "variable_name": ArgType.String, "output_dir": ArgType.String, + "skip_on_exist": ArgType.Boolean, } outputs = { "output_files": ArgType.ListString } - def __init__(self, dataset: BaseOutputSM, variable_name: str, output_dir: Union[str, Path]): + def __init__(self, dataset: BaseOutputSM, variable_name: str, output_dir: Union[str, Path], skip_on_exist: bool=False): self.dataset = dataset self.variable_name = variable_name self.output_dir = os.path.abspath(str(output_dir)) + self.skip_on_exist = skip_on_exist if not os.path.exists(self.output_dir): Path(self.output_dir).mkdir(exist_ok=True, parents=True) @@ -34,11 +36,13 @@ def exec(self): rasters = CroppingTransFunc.extract_raster(self.dataset, self.variable_name) rasters = sorted(rasters, key=lambda x: x['timestamp']) outfiles = [ - os.path.join(self.output_dir, - datetime.fromtimestamp(raster['timestamp'], tz=timezone.utc).strftime(f"%Y%m%d%H%M%S.{i}.tif")) + os.path.join(self.output_dir, datetime.fromtimestamp(raster['timestamp'], tz=timezone.utc).strftime(f"%Y%m%d%H%M%S.{i}.tif")) for i, raster in enumerate(rasters) ] + for outfile, raster in zip(outfiles, rasters): + if self.skip_on_exist and os.path.exists(outfile): + continue raster['raster'].to_geotiff(outfile) return {"output_files": outfiles} @@ -47,4 +51,4 @@ def validate(self) -> bool: return True def change_metadata(self, metadata: Optional[Dict[str, Metadata]]) -> Dict[str, Metadata]: - return metadata + return metadata \ No newline at end of file diff --git a/run b/run 
index 0c9b76b9..ec9f5bd9 100755
--- a/run
+++ b/run
@@ -4,7 +4,7 @@ env_file=$(readlink -f ./env.env)
 BASEDIR=`dirname $0`
 pushd /ws
 echo "Running DAME execution"
-dotenv -f env.env run python -m dtran.dame.exec "$@"
+dotenv -f $env_file run python -m dtran.dame.exec "$@"
 
 while [[ "$#" -gt 0 ]]
 do
diff --git a/uploaded.json b/uploaded.json
new file mode 100644
index 00000000..527b4d72
--- /dev/null
+++ b/uploaded.json
@@ -0,0 +1,10 @@
+{
+    "shebelle.tar.gz": "http://files.mint.isi.edu/s/KTDn3YQ347yNqBd/download",
+    "awash.tar.gz": "http://files.mint.isi.edu/s/abr6goUK5Gxfysu/download",
+    "baro.tar.gz": "http://files.mint.isi.edu/s/ZD9ssfwt57yikFW/download",
+    "beko.tar.gz": "http://files.mint.isi.edu/s/9ToC9VlSqM8SdHs/download",
+    "muger.tar.gz": "http://files.mint.isi.edu/s/pBHNGXtYeA5ZLcE/download",
+    "alwero.tar.gz": "http://files.mint.isi.edu/s/38r6gMtnE6yKYkr/download",
+    "ganale.tar.gz": "http://files.mint.isi.edu/s/3jOjvSpVHaito5M/download",
+    "guder.tar.gz": "http://files.mint.isi.edu/s/xfOnnl66mPPFVWC/download"
+}
\ No newline at end of file
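
Editorial note: the patched `run` script resolves `env.env` to an absolute path before `pushd /ws`, so `dotenv` keeps working from the new working directory. A sketch of what that file might contain, using the environment variables introduced in `dcat_read_no_repr.py` above (values are placeholders):

```sh
# env.env (hypothetical contents, loaded via `dotenv -f $env_file run ...`)
DATA_CATALOG_DOWNLOAD_DIR=/data/dcat_cache
NO_CHECK_CERTIFICATE=false
```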