-
Notifications
You must be signed in to change notification settings - Fork 9
Change opening datasets to use Intake plugins #121
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
2a0b968
058179d
0b226c7
239f6bc
057a856
3cd9586
8ebfb06
f96c533
4954a99
e4743a9
cd37b93
b407e86
5fe24ba
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,6 +11,7 @@ | |
| from .. import static | ||
| from ..plugins import load_providers, load_plugins, list_plugins | ||
| from ..database.database import get_db, db_session, select_datasets | ||
| import json | ||
|
|
||
|
|
||
| @add_async | ||
|
|
@@ -374,16 +375,30 @@ def open_dataset(dataset, fmt=None, **kwargs): | |
| m = get_metadata(dataset).get(dataset) | ||
| file_format = m.get('file_format') | ||
| path = m.get('file_path') | ||
| intake_plugin = m.get('intake_plugin') | ||
| required = [] | ||
| args = m.get('intake_args') | ||
| if len(args.strip()): | ||
| # Get args from json if available | ||
| required = json.loads(m.get('intake_args')) | ||
| else: | ||
| raise ValueError('No intake plugin found') | ||
|
|
||
| if path is None: | ||
| raise ValueError('No dataset file found') | ||
|
|
||
| if file_format not in list_plugins(static.PluginType.IO): | ||
| raise ValueError('No reader available for: %s' % file_format) | ||
|
|
||
| io = load_plugins(static.PluginType.IO, file_format)[file_format] | ||
| return io.open(path, fmt=fmt, **kwargs) | ||
| # Use intake plugin to open | ||
| if intake_plugin: | ||
| # New code, with 'intake_plugin' added to the local .db | ||
| plugin_name = 'open_' + intake_plugin | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems somewhat fragile. Or, perhaps better, you could construct either the relevant YAML block or a Note that in the Intake world, the driver here could be something like "parquet", but it can also be the fully-qualified class name like "intake_parquet.ParquetSource". Of course, if you have additional constraints within Quest, that's fine. |
||
|
|
||
| module = __import__('intake') | ||
| func = getattr(module, plugin_name) | ||
| source = func(*required, **kwargs) | ||
| return source.read() | ||
|
|
||
| @add_async | ||
| def visualize_dataset(dataset, update_cache=False, **kwargs): | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -46,6 +46,8 @@ class Dataset(db.Entity): | |
| message = orm.Optional(str) | ||
| file_path = orm.Optional(str, nullable=True) | ||
| visualization_path = orm.Optional(str) | ||
| intake_plugin = orm.Optional(str) | ||
| intake_args = orm.Optional(str) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A JSON representation of arguments, correct? If these are stored as strings, would it make sense to use the same YAML spec used by Intake text-file catalogs? |
||
|
|
||
| # setup relationships | ||
| collection = orm.Required(Collection) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,6 +3,7 @@ | |
| import param | ||
| import pandas as pd | ||
| from urllib.error import HTTPError | ||
| import json | ||
| from urllib.parse import quote, urlencode | ||
|
|
||
| from quest.util.log import logger | ||
|
|
@@ -38,7 +39,7 @@ def parameters(self): | |
| def search_catalog(self, **kwargs): | ||
| raise NotImplementedError() | ||
| # TODO drop duplicates? | ||
|
|
||
| @property | ||
| def catalog_id(self): | ||
| return self._catalog_id | ||
|
|
@@ -91,6 +92,8 @@ def download(self, catalog_id, file_path, dataset, **kwargs): | |
| 'file_path': file_path, | ||
| 'file_format': 'timeseries-hdf5', | ||
| 'datatype': DataType.TIMESERIES, | ||
| 'intake_plugin': 'quest_timeseries_hdf5', | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. E241 multiple spaces after ':' |
||
| 'intake_args': json.dumps([file_path]), | ||
| 'parameter': p.parameter, | ||
| 'unit': units[self.parameter_code], | ||
| 'service_id': 'svc://noaa:{}/{}'.format(self.service_name, catalog_id) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,12 @@ | ||
| import os | ||
| import json | ||
|
|
||
| from ulmo.usgs import ned | ||
|
|
||
| from quest import util | ||
| from quest.static import ServiceType, DataType, GeomType | ||
| from quest.plugins import ProviderBase, SingleFileServiceBase | ||
|
|
||
|
|
||
| from ulmo.usgs import ned | ||
|
|
||
|
|
@@ -22,6 +30,12 @@ class UsgsNedServiceBase(SingleFileServiceBase): | |
| 'elevation': 'elevation' | ||
| } | ||
|
|
||
| def download(self, catalog_id, file_path, dataset, **kwargs): | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I do not know the context here, but you should be aware that Intake also has the ability to download source data files on first use https://intake.readthedocs.io/en/latest/catalog.html#caching-source-files-locally |
||
| # Call the base to download, but then update the dictionary with intake info | ||
| metadata = super().download(catalog_id, file_path, dataset, **kwargs) | ||
| metadata.update({'intake_plugin': 'rasterio', 'intake_args': json.dumps([file_path, {}])}) | ||
| return metadata | ||
|
|
||
| def search_catalog(self, **kwargs): | ||
| service = self._description | ||
| catalog_entries = util.to_geodataframe( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,8 +1,9 @@ | ||
| import json | ||
| import rasterio | ||
|
|
||
| from quest import util | ||
| from quest.plugins import ToolBase | ||
| from quest.api import get_metadata | ||
| from quest.api import get_metadata, update_metadata | ||
| from quest.static import DataType, UriType | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. F401 'quest.static.UriType' imported but unused |
||
|
|
||
|
|
||
|
|
@@ -37,18 +38,20 @@ def _run_tool(self): | |
| "width": out_image.shape[1], | ||
| "transform": None}) | ||
|
|
||
| new_dset, file_path, catalog_entry = self._create_new_dataset( | ||
| old_dataset=dataset, | ||
| ext='.tif' | ||
| ) | ||
|
|
||
| new_metadata = { | ||
| 'parameter': orig_metadata['parameter'], | ||
| 'datatype': orig_metadata['datatype'], | ||
| 'file_format': orig_metadata['file_format'], | ||
| 'intake_plugin': orig_metadata['intake_plugin'], | ||
| 'intake_args': json.dumps([file_path, {}]), | ||
| 'unit': orig_metadata['unit'] | ||
| } | ||
|
|
||
| new_dset, file_path, catalog_entry = self._create_new_dataset( | ||
| old_dataset=dataset, | ||
| ext='.tif', | ||
| dataset_metadata=new_metadata, | ||
| ) | ||
| update_metadata(new_dset, quest_metadata=new_metadata) | ||
|
|
||
| with rasterio.open(file_path, "w", **out_meta) as dest: | ||
| dest.write(out_image) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,7 @@ | ||
| import json | ||
| from quest import util | ||
| from quest.plugins import ToolBase | ||
| from quest.api import get_metadata | ||
| from quest.api import get_metadata, update_metadata | ||
| from quest.plugins import load_plugins | ||
| from quest.static import UriType, DataType | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. F401 'quest.static.UriType' imported but unused |
||
|
|
||
|
|
@@ -35,18 +36,20 @@ def _run_tool(self): | |
|
|
||
|
|
||
| # setup new dataset | ||
| new_dset, file_path, catalog_entry = self._create_new_dataset( | ||
| old_dataset=dataset, | ||
| ext='.h5' | ||
| ) | ||
|
|
||
| new_metadata = { | ||
| 'parameter': new_df.metadata.get('parameter'), | ||
| 'unit': new_df.metadata.get('unit'), | ||
| 'datatype': orig_metadata['datatype'], | ||
| 'file_format': orig_metadata['file_format'], | ||
| 'intake_plugin': orig_metadata['intake_plugin'], | ||
| 'intake_args': json.dumps([file_path]), | ||
| } | ||
|
|
||
| new_dset, file_path, catalog_entry = self._create_new_dataset( | ||
| old_dataset=dataset, | ||
| ext='.h5', | ||
| dataset_metadata=new_metadata, | ||
| ) | ||
| update_metadata(new_dset, quest_metadata=new_metadata) | ||
|
|
||
| # save dataframe | ||
| io.write(file_path, new_df, new_metadata) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,7 @@ | ||
| import json | ||
| from quest import util | ||
| from quest.plugins import ToolBase | ||
| from quest.api import get_metadata | ||
| from quest.api import get_metadata, update_metadata | ||
| from quest.static import DataType, UriType | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. F401 'quest.static.UriType' imported but unused |
||
| from quest.plugins import load_plugins | ||
| from quest.util import setattr_on_dataframe | ||
|
|
@@ -43,19 +44,22 @@ def _run_tool(self): | |
| setattr_on_dataframe(df, 'metadata', metadata) | ||
| new_df = df | ||
| # setup new dataset | ||
|
|
||
| new_dset, file_path, catalog_entry = self._create_new_dataset( | ||
| old_dataset=dataset, | ||
| ext='.h5', | ||
| ) | ||
|
|
||
| new_metadata = { | ||
| 'parameter': new_df.metadata.get('parameter'), | ||
| 'datatype': orig_metadata['datatype'], | ||
| 'options': self.set_options, | ||
| 'file_format': orig_metadata['file_format'], | ||
| 'unit': new_df.metadata.get('unit'), | ||
| 'intake_plugin': orig_metadata['intake_plugin'], | ||
| 'intake_args': json.dumps([file_path]), | ||
| } | ||
|
|
||
| new_dset, file_path, catalog_entry = self._create_new_dataset( | ||
| old_dataset=dataset, | ||
| ext='.h5', | ||
| dataset_metadata=new_metadata, | ||
| ) | ||
| update_metadata(new_dset, quest_metadata=new_metadata) | ||
|
|
||
| # save dataframe | ||
| output = load_plugins('io', 'xy-hdf5')['xy-hdf5'] | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that the class that loads data for intake is now usually called a "driver" - there are many types of plugins. https://intake.readthedocs.io/en/latest/glossary.html