diff --git a/quest/api/datasets.py b/quest/api/datasets.py index 074380ea..3a0c6376 100644 --- a/quest/api/datasets.py +++ b/quest/api/datasets.py @@ -11,6 +11,7 @@ from .. import static from ..plugins import load_providers, load_plugins, list_plugins from ..database.database import get_db, db_session, select_datasets +import json @add_async @@ -374,6 +375,14 @@ def open_dataset(dataset, fmt=None, **kwargs): m = get_metadata(dataset).get(dataset) file_format = m.get('file_format') path = m.get('file_path') + intake_plugin = m.get('intake_plugin') + required = [] + args = m.get('intake_args') + if len(args.strip()): + # Get args from json if available + required = json.loads(m.get('intake_args')) + else: + raise ValueError('No intake plugin found') if path is None: raise ValueError('No dataset file found') @@ -381,9 +390,15 @@ def open_dataset(dataset, fmt=None, **kwargs): if file_format not in list_plugins(static.PluginType.IO): raise ValueError('No reader available for: %s' % file_format) - io = load_plugins(static.PluginType.IO, file_format)[file_format] - return io.open(path, fmt=fmt, **kwargs) + # Use intake plugin to open + if intake_plugin: + # New code, with 'intake_plugin' added to the local .db + plugin_name = 'open_' + intake_plugin + module = __import__('intake') + func = getattr(module, plugin_name) + source = func(*required, **kwargs) + return source.read() @add_async def visualize_dataset(dataset, update_cache=False, **kwargs): diff --git a/quest/database/database.py b/quest/database/database.py index c5065aeb..9dae6b48 100644 --- a/quest/database/database.py +++ b/quest/database/database.py @@ -46,6 +46,8 @@ class Dataset(db.Entity): message = orm.Optional(str) file_path = orm.Optional(str, nullable=True) visualization_path = orm.Optional(str) + intake_plugin = orm.Optional(str) + intake_args = orm.Optional(str) # setup relationships collection = orm.Required(Collection) diff --git a/quest_provider_plugins/noaa_coastwatch.py b/quest_provider_plugins/noaa_coastwatch.py index 9cf56fce..4e805685 100644 --- a/quest_provider_plugins/noaa_coastwatch.py +++ b/quest_provider_plugins/noaa_coastwatch.py @@ -3,6 +3,7 @@ import param import pandas as pd from urllib.error import HTTPError +import json from urllib.parse import quote, urlencode from quest.util.log import logger @@ -38,7 +39,7 @@ def parameters(self): def search_catalog(self, **kwargs): raise NotImplementedError() # TODO drop duplicates? - + @property def catalog_id(self): return self._catalog_id @@ -91,6 +92,8 @@ def download(self, catalog_id, file_path, dataset, **kwargs): 'file_path': file_path, 'file_format': 'timeseries-hdf5', 'datatype': DataType.TIMESERIES, + 'intake_plugin': 'quest_timeseries_hdf5', + 'intake_args': json.dumps([file_path]), 'parameter': p.parameter, 'unit': units[self.parameter_code], 'service_id': 'svc://noaa:{}/{}'.format(self.service_name, catalog_id) diff --git a/quest_provider_plugins/noaa_ncdc.py b/quest_provider_plugins/noaa_ncdc.py index f0091522..749ea39f 100644 --- a/quest_provider_plugins/noaa_ncdc.py +++ b/quest_provider_plugins/noaa_ncdc.py @@ -1,4 +1,5 @@ import os +import json import param import pandas as pd @@ -7,7 +8,6 @@ from quest.static import ServiceType, GeomType, DataType from quest.plugins import ProviderBase, TimePeriodServiceBase, load_plugins - BASE_PATH = 'ncdc' @@ -26,7 +26,7 @@ def metadata(self): 'geographical_areas': self.geographical_areas, 'bounding_boxes': self.bounding_boxes } - + @property def parameters(self): return { @@ -95,6 +95,8 @@ def download(self, catalog_id, file_path, dataset, **kwargs): 'file_path': file_path, 'file_format': 'timeseries-hdf5', 'datatype': DataType.TIMESERIES, + 'intake_plugin': 'quest_timeseries_hdf5', + 'intake_args': json.dumps([file_path]), 'parameter': self.parameter, 'unit': self._unit_map[self.parameter], 'service_id': 'svc://ncdc:{}/{}'.format(self.service_name, catalog_id) diff --git a/quest_provider_plugins/usgs_ned.py b/quest_provider_plugins/usgs_ned.py index a7ccf1af..8cd649f9 100644 --- a/quest_provider_plugins/usgs_ned.py +++ b/quest_provider_plugins/usgs_ned.py @@ -1,4 +1,12 @@ import os +import json + +from ulmo.usgs import ned + +from quest import util +from quest.static import ServiceType, DataType, GeomType +from quest.plugins import ProviderBase, SingleFileServiceBase + from ulmo.usgs import ned @@ -22,6 +30,12 @@ class UsgsNedServiceBase(SingleFileServiceBase): 'elevation': 'elevation' } + def download(self, catalog_id, file_path, dataset, **kwargs): + # Call the base to download, but then update the dictionary with intake info + metadata = super().download(catalog_id, file_path, dataset, **kwargs) + metadata.update({'intake_plugin': 'rasterio', 'intake_args': json.dumps([file_path, {}])}) + return metadata + def search_catalog(self, **kwargs): service = self._description catalog_entries = util.to_geodataframe( diff --git a/quest_provider_plugins/usgs_nlcd.py b/quest_provider_plugins/usgs_nlcd.py index f85140c2..81a32488 100644 --- a/quest_provider_plugins/usgs_nlcd.py +++ b/quest_provider_plugins/usgs_nlcd.py @@ -1,4 +1,5 @@ import requests +import json import pandas as pd from quest import util @@ -17,6 +18,12 @@ class UsgsNlcdServiceBase(SingleFileServiceBase): 'landcover': 'landcover' } + def download(self, catalog_id, file_path, dataset, **kwargs): + # Call the base to download, but then update the dictionary with intake info + metadata = super().download(catalog_id, file_path, dataset, **kwargs) + metadata.update({'intake_plugin': 'rasterio', 'intake_args': json.dumps([file_path, {}])}) + return metadata + def search_catalog(self, **kwargs): base_url = 'https://www.sciencebase.gov/catalog/items' params = [ diff --git a/quest_provider_plugins/usgs_nwis.py b/quest_provider_plugins/usgs_nwis.py index 72ee5e85..3071e2f6 100644 --- a/quest_provider_plugins/usgs_nwis.py +++ b/quest_provider_plugins/usgs_nwis.py @@ -8,6 +8,7 @@ from quest import util from quest.static import ServiceType, GeomType, DataType +import json from quest.plugins import ProviderBase, TimePeriodServiceBase, load_plugins @@ -70,6 +71,8 @@ def download(self, catalog_id, file_path, dataset, **kwargs): 'name': dataset, 'metadata': data, 'file_path': file_path, + 'intake_plugin': 'quest_timeseries_hdf5', + 'intake_args': json.dumps([file_path]), 'file_format': 'timeseries-hdf5', 'datatype': DataType.TIMESERIES, 'parameter': parameter, diff --git a/quest_provider_plugins/wmts_imagery.py b/quest_provider_plugins/wmts_imagery.py index af12fe46..62664fdf 100644 --- a/quest_provider_plugins/wmts_imagery.py +++ b/quest_provider_plugins/wmts_imagery.py @@ -10,6 +10,7 @@ import param import requests import rasterio +import json from imageio import imread from quest.static import ServiceType from requests.packages.urllib3.util.retry import Retry @@ -92,6 +93,8 @@ def download(self, catalog_id, file_path, dataset, **kwargs): 'metadata': {'bbox': adjusted_bbox}, 'file_path': file_path, 'file_format': 'raster-gdal', + 'intake_plugin': 'rasterio', + 'intake_args': json.dumps([file_path, {}]), 'datatype': 'image', } diff --git a/quest_tool_plugins/raster/rst_base.py b/quest_tool_plugins/raster/rst_base.py index 7d65f8df..9761b061 100644 --- a/quest_tool_plugins/raster/rst_base.py +++ b/quest_tool_plugins/raster/rst_base.py @@ -1,8 +1,9 @@ +import json import rasterio from quest import util from quest.plugins import ToolBase -from quest.api import get_metadata +from quest.api import get_metadata, update_metadata from quest.static import DataType, UriType @@ -37,18 +38,20 @@ def _run_tool(self): "width": out_image.shape[1], "transform": None}) + new_dset, file_path, catalog_entry = self._create_new_dataset( + old_dataset=dataset, + ext='.tif' + ) + new_metadata = { 'parameter': orig_metadata['parameter'], 'datatype': orig_metadata['datatype'], 'file_format': orig_metadata['file_format'], + 'intake_plugin': orig_metadata['intake_plugin'], + 'intake_args': json.dumps([file_path, {}]), 'unit': orig_metadata['unit'] } - - new_dset, file_path, catalog_entry = self._create_new_dataset( - old_dataset=dataset, - ext='.tif', - dataset_metadata=new_metadata, - ) + update_metadata(new_dset, quest_metadata=new_metadata) with rasterio.open(file_path, "w", **out_meta) as dest: dest.write(out_image) diff --git a/quest_tool_plugins/raster/rst_merge.py b/quest_tool_plugins/raster/rst_merge.py index e1158799..2608c1e7 100644 --- a/quest_tool_plugins/raster/rst_merge.py +++ b/quest_tool_plugins/raster/rst_merge.py @@ -1,3 +1,4 @@ +import json import param import rasterio import rasterio.mask @@ -42,18 +43,20 @@ def _run_tool(self): if get_metadata(dataset)[dataset]['unit'] != orig_metadata['unit']: raise ValueError('Units must match for all datasets') + new_dset, file_path, catalog_entry = self._create_new_dataset( + old_dataset=datasets[0], + ext='.tif' + ) + new_metadata = { 'parameter': orig_metadata['parameter'], 'datatype': orig_metadata['datatype'], 'file_format': orig_metadata['file_format'], + 'intake_plugin': orig_metadata['intake_plugin'], + 'intake_args': json.dumps([file_path, {}]), 'unit': orig_metadata['unit'], } - - new_dset, file_path, catalog_entry = self._create_new_dataset( - old_dataset=datasets[0], - ext='.tif', - dataset_metadata=new_metadata, - ) + update_metadata(new_dset, quest_metadata=new_metadata) open_datasets = [rasterio.open(d) for d in raster_files] profile = open_datasets[0].profile diff --git a/quest_tool_plugins/raster/rst_reprojection.py b/quest_tool_plugins/raster/rst_reprojection.py index e3a69434..a82cd580 100644 --- a/quest_tool_plugins/raster/rst_reprojection.py +++ b/quest_tool_plugins/raster/rst_reprojection.py @@ -1,3 +1,4 @@ +import json import param import rasterio import subprocess @@ -35,17 +36,19 @@ def _run_tool(self): dst_crs = self.new_crs + new_dset, file_path, catalog_entry = self._create_new_dataset( + old_dataset=dataset, + ext='.tif' + ) + new_metadata = { 'parameter': orig_metadata['parameter'], 'datatype': orig_metadata['datatype'], 'file_format': orig_metadata['file_format'], + 'intake_plugin': orig_metadata['intake_plugin'], + 'intake_args': json.dumps([file_path, {}]), } - - new_dset, file_path, catalog_entry = self._create_new_dataset( - old_dataset=dataset, - ext='.tif', - dataset_metadata=new_metadata, - ) + update_metadata(new_dset, quest_metadata=new_metadata) # run filter with rasterio.open(src_path) as src: diff --git a/quest_tool_plugins/timeseries/ts_base.py b/quest_tool_plugins/timeseries/ts_base.py index 6fd463c9..b40e673e 100644 --- a/quest_tool_plugins/timeseries/ts_base.py +++ b/quest_tool_plugins/timeseries/ts_base.py @@ -1,6 +1,7 @@ +import json from quest import util from quest.plugins import ToolBase -from quest.api import get_metadata +from quest.api import get_metadata, update_metadata from quest.plugins import load_plugins from quest.static import UriType, DataType @@ -35,18 +36,20 @@ def _run_tool(self): # setup new dataset + new_dset, file_path, catalog_entry = self._create_new_dataset( + old_dataset=dataset, + ext='.h5' + ) + new_metadata = { 'parameter': new_df.metadata.get('parameter'), 'unit': new_df.metadata.get('unit'), 'datatype': orig_metadata['datatype'], 'file_format': orig_metadata['file_format'], + 'intake_plugin': orig_metadata['intake_plugin'], + 'intake_args': json.dumps([file_path]), } - - new_dset, file_path, catalog_entry = self._create_new_dataset( - old_dataset=dataset, - ext='.h5', - dataset_metadata=new_metadata, - ) + update_metadata(new_dset, quest_metadata=new_metadata) # save dataframe io.write(file_path, new_df, new_metadata) diff --git a/quest_tool_plugins/timeseries/ts_flow_duration.py b/quest_tool_plugins/timeseries/ts_flow_duration.py index 081d498c..ac77e3ef 100644 --- a/quest_tool_plugins/timeseries/ts_flow_duration.py +++ b/quest_tool_plugins/timeseries/ts_flow_duration.py @@ -1,6 +1,7 @@ +import json from quest import util from quest.plugins import ToolBase -from quest.api import get_metadata +from quest.api import get_metadata, update_metadata from quest.static import DataType, UriType from quest.plugins import load_plugins from quest.util import setattr_on_dataframe @@ -43,19 +44,22 @@ def _run_tool(self): setattr_on_dataframe(df, 'metadata', metadata) new_df = df # setup new dataset + + new_dset, file_path, catalog_entry = self._create_new_dataset( + old_dataset=dataset, + ext='.h5', + ) + new_metadata = { 'parameter': new_df.metadata.get('parameter'), 'datatype': orig_metadata['datatype'], 'options': self.set_options, 'file_format': orig_metadata['file_format'], 'unit': new_df.metadata.get('unit'), + 'intake_plugin': orig_metadata['intake_plugin'], + 'intake_args': json.dumps([file_path]), } - - new_dset, file_path, catalog_entry = self._create_new_dataset( - old_dataset=dataset, - ext='.h5', - dataset_metadata=new_metadata, - ) + update_metadata(new_dset, quest_metadata=new_metadata) # save dataframe output = load_plugins('io', 'xy-hdf5')['xy-hdf5'] diff --git a/quest_tool_plugins/whitebox/whitebox_watershed.py b/quest_tool_plugins/whitebox/whitebox_watershed.py index c5d7336f..cb66558e 100644 --- a/quest_tool_plugins/whitebox/whitebox_watershed.py +++ b/quest_tool_plugins/whitebox/whitebox_watershed.py @@ -1,4 +1,5 @@ import param +import json import numpy as np from quest import util @@ -44,6 +45,8 @@ def _run_tool(self): 'parameter': 'streams', 'datatype': orig_metadata['datatype'], 'file_format': orig_metadata['file_format'], + 'intake_plugin': orig_metadata['intake_plugin'], + 'intake_args': json.dumps([file_path, {}]), } update_metadata(new_dset, quest_metadata=quest_metadata) @@ -108,14 +111,18 @@ def _run_tool(self): new_dset, file_path, catalog_entry = self._create_new_dataset( old_dataset=dataset, - ext='.tif', - dataset_metadata={ - 'parameter': 'streams', - 'datatype': orig_metadata['datatype'], - 'file_format': orig_metadata['file_format'], - } + ext='.tif' ) + quest_metadata = { + 'parameter': 'streams', + 'datatype': orig_metadata['datatype'], + 'file_format': orig_metadata['file_format'], + 'intake_plugin': orig_metadata['intake_plugin'], + 'intake_args': json.dumps([file_path, {}]), + } + update_metadata(new_dset, quest_metadata=quest_metadata) + fa = self.flow_accumulation if self.flow_accumulation is not None else wbt.d_inf_flow_accumulation(elev_file) # fa = wbt.d8_flow_accumulation(fill) wbt.extract_streams( @@ -189,13 +196,18 @@ def _run_tool(self): new_dset, file_path, catalog_entry = self._create_new_dataset( old_dataset=dataset, ext='.tif', - dataset_metadata={ - 'parameter': 'watershed_boundary', - 'datatype': orig_metadata['datatype'], - 'file_format': orig_metadata['file_format'], - } ) + quest_metadata = { + 'parameter': 'watershed_boundary', + 'datatype': orig_metadata['datatype'], + 'file_format': orig_metadata['file_format'], + 'intake_plugin': orig_metadata['intake_plugin'], + 'intake_args': json.dumps([file_path, {}]), + } + + update_metadata(new_dset, quest_metadata=quest_metadata) + d8 = wbt.d8_pointer(elev_file) point_shp = points_to_shp(original_outlets) @@ -238,6 +250,8 @@ def _run_tool(self): 'parameter': 'streams', 'datatype': orig_metadata['datatype'], 'file_format': orig_metadata['file_format'], + 'intake_plugin': orig_metadata['intake_plugin'], + 'intake_args': json.dumps([file_path, {}]), 'file_path': file_path, } diff --git a/test/files/projects_template/default/metadata.db b/test/files/projects_template/default/metadata.db index 7b447320..d7b7f768 100644 Binary files a/test/files/projects_template/default/metadata.db and b/test/files/projects_template/default/metadata.db differ diff --git a/test/files/projects_template/project1/metadata.db b/test/files/projects_template/project1/metadata.db index f740db31..e4920cff 100644 Binary files a/test/files/projects_template/project1/metadata.db and b/test/files/projects_template/project1/metadata.db differ diff --git a/test/files/projects_template/project2/metadata.db b/test/files/projects_template/project2/metadata.db index 1a786ed9..16a59d64 100644 Binary files a/test/files/projects_template/project2/metadata.db and b/test/files/projects_template/project2/metadata.db differ diff --git a/test/files/projects_template/test_data/metadata.db b/test/files/projects_template/test_data/metadata.db index 7b3c72d2..132524c4 100644 Binary files a/test/files/projects_template/test_data/metadata.db and b/test/files/projects_template/test_data/metadata.db differ