From fe17faedb55613c01fea2a59fd0a5f17368a35ed Mon Sep 17 00:00:00 2001 From: AaronV77 Date: Wed, 4 Apr 2018 15:29:14 -0500 Subject: [PATCH 01/12] Dataset display name fix (More readable). --- quest/api/services.py | 2 - quest/services/ISEP_Girder.py | 90 +++++++++++++++++++++++++++++++++++ quest/services/noaa_ncep.py | 80 +++++++++++++++++++++++++++++++ 3 files changed, 170 insertions(+), 2 deletions(-) create mode 100644 quest/services/ISEP_Girder.py create mode 100644 quest/services/noaa_ncep.py diff --git a/quest/api/services.py b/quest/api/services.py index 2c225f3c..b2b78f0d 100644 --- a/quest/api/services.py +++ b/quest/api/services.py @@ -78,8 +78,6 @@ def get_publishers(expand=None, publisher_type=None): Returns: providers (list or dict,Default=list): list of all available providers - - """ providers = util.load_providers() publishers = {} diff --git a/quest/services/ISEP_Girder.py b/quest/services/ISEP_Girder.py new file mode 100644 index 00000000..e3cdfc47 --- /dev/null +++ b/quest/services/ISEP_Girder.py @@ -0,0 +1,90 @@ +from .base import ProviderBase, SingleFileServiceBase, PublishBase +from ..api.database import get_db, db_session +from shapely.geometry import Point, box +from ..api.metadata import get_metadata +from ..util import param_util +from getpass import getpass +import girder_client +import pandas as pd +import param +import os + + +class ISEPServiceBase(SingleFileServiceBase): + + token = param.String(default=None, doc="Token #", precedence=1) + + @property + def gc(self): + return self.provider.get_gc() + + def get_features(self, **kwargs): + raise NotImplementedError() + + +class ISEPPublisher(PublishBase): + publisher_name = "girder_pub" + display_name = "Girder Publisher" + description = "Girder is a repository for ERDC ERS." + + token = param.String(default=None, doc="Token #", precedence=1) + title = param.String(default="example title", doc="Title of resource", precedence=2) + collection_description = param.String(default="", doc="Description of resource", precedence=3) + folder_name = param.String(default="example folder title", doc="Folder Title", precedence=4) + folder_description = param.String(default="", doc="Folder Description", precedence=5) + # Have the option to make the resource public. 
+ dataset = param_util.DatasetListSelector(default=(), + filters={'status': 'downloaded'}, + precedence=6, + doc="dataset to publish to HydroShare") + + def __init__(self, provider, **kwargs): + super(ISEPPublisher, self).__init__(provider, **kwargs) + + @property + def gc(self): + return self.provider.get_gc() + + def publish(self, options=None): + try: + with self.gc.session() as session: + p = param.ParamOverrides(self, options) + session.verify = '/home/valoroso/DoD_Cert.pem' + self.gc.token = p.token + params = {'name': p.title, 'description': p.collection_description} + resource_information_dict = self.gc.createResource(path='collection', params=params) + folder_creation_dict = self.gc.createFolder(parentId=resource_information_dict['_id'], + name=p.folder_name, + description=p.folder_description, + parentType='collection') + for dataset in p.dataset: + dataset_metadata = get_metadata(dataset)[dataset] + fpath = dataset_metadata['file_path'] + self.gc.uploadFileToFolder(folder_creation_dict['_id'], fpath) + + except Exception as e: + raise e + + return resource_information_dict['_id'] + + +class GirderProvider(ProviderBase): + service_base_class = ISEPServiceBase + publishers_list = [ISEPPublisher] + display_name = 'Girder Services' + description = 'Services avaliable through the AFRL Girder Server.' + organization_name = 'U.S. Air Force Research Laboratory' + organization_abbr = 'AFRL' + + def get_gc(self): + connection_info = 'https://talonw4.afrl.hpc.mil/api/v1' + try: + gc = girder_client.GirderClient(apiUrl=connection_info) + except: + raise ValueError("Cannot connect to the ISEP Girder.") + + return gc + + def authenticate_me(self, **kwargs): + pass + diff --git a/quest/services/noaa_ncep.py b/quest/services/noaa_ncep.py new file mode 100644 index 00000000..284823f9 --- /dev/null +++ b/quest/services/noaa_ncep.py @@ -0,0 +1,80 @@ +from .base import ProviderBase, ServiceBase +from ncep_client import NCEP_Client +from shapely.geometry import box +import pandas as pd +import param + + +class NCEPServiceBase(ServiceBase): + + def get_features(self, **kwargs): + raise NotImplementedError() + + +class NCEP_GFS_Service(NCEPServiceBase): + ncep = NCEP_Client() + service_name = "ncep_gfs" + display_name = "NCEP GFS Service" + description = 'NCEP GFS is a noaa repository for global weather data.' + service_type = "norm-discrete" + geographical_areas = ['Worldwide'] + bounding_boxes = [ + [-180, -90, 180, 90], + ] + feature_id = "ncep" + # These are slowing down the api because it has to load the web page. 
+ _possible_types = ncep.get_provider_types("Global Forecast System") + _possible_products = ncep.get_provider_products("Global Forecast System") + _possible_formats = ncep.get_formats_of_a_product("Global Forecast System") + _parameter_map = {} + + date = param.String(default=None, doc="YYYYMMDD", precedence=1) + res = param.String(default=None, doc="Froecast Resolution", precedence=2) + cycle = param.String(default=None, doc="Forecast Cycle Runtime", precedence=3) + start = param.String(default=None, doc="Forecast start time (f###)", precedence=4) + end = param.String(default=None, doc="Forecast end time (f###)", precedence=5) + format = param.ObjectSelector(default=None, doc="Paramerter", objects=_possible_formats, precedence=6) + type = param.ObjectSelector(default=None, doc="Parameter2", objects=_possible_types, precedence=7) + product = param.ObjectSelector(default=None, doc="Parameter3", objects=_possible_products, precedence=8) + + def get_features(self, **kwargs): + the_feature = {"service_id": "ncep", "display_name": "ncep", "geometry": box(-180, -90, 180, 90)} + feature = pd.DataFrame(the_feature, index=[0]) + return feature + + def download(self, feature, file_path, dataset, **params): + ncep = NCEP_Client() + p = param.ParamOverrides(self, params) + + if p.product == "GFS" or p.product == "GDAS": + raise ValueError("Please specify a specific product not GFS or GDAS") + + results = ncep.get_ncep_product_data(ncep_provider="Global Forecast System", product_type=p.type, + product_date=p.date, resolution=p.res, cycle_runtime=p.cycle, + forecast_start=p.start, forecast_end=p.end, product_format=p.format, + name_of_product=p.product) + print(results) + if len(results) > 0: + ncep.download_data(file_path, results) + metadata = { + 'metadata': results, + 'file_path': file_path, + 'file_format': 'weather-specific', + 'datatype': 'weather', + 'parameter': "ncep_parameter", + 'unit': "unkown", + } + else: + raise ValueError("There is no data found on those parameters.") + + return metadata + + +class NCEPProvider(ProviderBase): + service_base_class = NCEPServiceBase + publishers_list = [] + display_name = 'NCEP GFS Provider' + description = 'Services avaliable through the NOAA NCEP Server.' + organization_name = 'National Centers for Environmental Prediction' + organization_abbr = 'NCEP' + From 8c282f5eeadebefe6414f259bd64f8c70cc21010 Mon Sep 17 00:00:00 2001 From: Aaron Valoroso Date: Mon, 9 Apr 2018 14:58:34 -0500 Subject: [PATCH 02/12] Ckan provider plugin added. 
--- py2_conda_environment.yml | 3 +- py3_conda_environment.yml | 3 +- quest/services/ckan.py | 112 ++++++++++++++++++++++++++++++++++++++ setup.cfg | 1 + 4 files changed, 117 insertions(+), 2 deletions(-) create mode 100644 quest/services/ckan.py diff --git a/py2_conda_environment.yml b/py2_conda_environment.yml index 67ae4cb0..0abaade2 100644 --- a/py2_conda_environment.yml +++ b/py2_conda_environment.yml @@ -40,4 +40,5 @@ dependencies: - sphinx_rtd_theme - hs_restclient - jupyter - - girder-client \ No newline at end of file + - girder-client + - ckanapi \ No newline at end of file diff --git a/py3_conda_environment.yml b/py3_conda_environment.yml index 07a0f9f7..419c2d0c 100644 --- a/py3_conda_environment.yml +++ b/py3_conda_environment.yml @@ -41,4 +41,5 @@ dependencies: - sphinx_rtd_theme - hs_restclient - jupyter - - girder-client \ No newline at end of file + - girder-client + - ckanapi \ No newline at end of file diff --git a/quest/services/ckan.py b/quest/services/ckan.py new file mode 100644 index 00000000..467cf425 --- /dev/null +++ b/quest/services/ckan.py @@ -0,0 +1,112 @@ +from .base import ProviderBase, ServiceBase, PublishBase +from ..api.metadata import get_metadata +from shapely.geometry import box +from ckanapi import RemoteCKAN +from ..util import param_util +import pandas as pd +import param + + +class CKANServiceBase(ServiceBase): + service_name = "ckan_service" + display_name = "CKAN Service" + description = 'To grab the single service from the CKAN repository.' + service_type = "norm-discrete" + geographical_areas = ['Worldwide'] + bounding_boxes = [ + [-180, -90, 180, 90], + ] + feature_id = "ckan" + _parameter_map = {} + + api_key = param.String(default=None, doc="user api key", precedence=1) + address = param.String(default=None, doc="Website URL", precedence=2) + + def get_features(self, **kwargs): + the_feature = {"service_id": "ckan", "display_name": "ckan", "geometry": box(-180, -90, 180, 90)} + feature = pd.DataFrame(the_feature, index=[0]) + return feature + + def download(self, feature, file_path, dataset, **params): + pass + # p = param.ParamOverrides(self, params) + # demo = RemoteCKAN(address=p.address, apikey=p.api_key) + + # Have to figure out what the user wants to grab and pull. + + # results = ncep.get_ncep_product_data(ncep_provider="Global Forecast System", product_type=p.type, + # product_date=p.date, resolution=p.res, cycle_runtime=p.cycle, + # forecast_start=p.start, forecast_end=p.end, product_format=p.format, + # name_of_product=p.product) + # print(results) + # if len(results) > 0: + # ncep.download_data(file_path, results) + # metadata = { + # 'metadata': results, + # 'file_path': file_path, + # 'file_format': 'unknown', + # 'datatype': 'unknown', + # 'parameter': "unknown", + # 'unit': "unknown", + # } + # else: + # raise ValueError("There is no data found on those parameters.") + # + # return metadata + + +class CKANPublishBase(PublishBase): + publisher_name = "ckan_pub" + display_name = "CKAN Publisher" + description = "To be able to push to the CKAN repository.." 
+ + api_key = param.String(default=None, doc="", precedence=1) + address = param.String(default=None, doc="", precedence=2) + name = param.String(default="", doc="", precedence=3) + title = param.String(default="", doc="", precedence=4) + author = param.String(defualt="", doc="", precedence=5) + author_email = param.String(default="", doc="", precedence=6) + availability = param.ObjectSelector(default=None, doc="", objects=[True,False], precedence=7) + description = param.String(default="", doc="", precedence=8) + type = param.String(default="", doc="Data type", precedence=9) + dataset = param_util.DatasetListSelector(default=(), filters={'status': 'downloaded'}, precedence=10, + doc="dataset to publish to ckan") + + # Do we even need this? + def __init__(self, provider, **kwargs): + super(CKANPublishBase, self).__init__(provider, **kwargs) + + def publish(self, options=None): + try: + p = param.ParamOverrides(self, options) + demo = RemoteCKAN(address=p.address, apikey=p.api_key) + params = {"name": p.name, + "title": p.title, + "private": p.availability, + "author": p.author, + "author_email": p.author_email, + "maintainer": p.author, + "license_id": "None", + "notes": p.description, + "type": p.type + } + + the_resource = demo.action.package_create(**params) + + for dataset in p.dataset: + dataset_metadata = get_metadata(dataset)[dataset] + fpath = dataset_metadata['file_path'] + # Need to push the data to the folder. + return the_resource['id'] + except Exception as e: + raise e + +class NCEPProvider(ProviderBase): + # Why is one in a list again and one is not? + service_base_class = CKANServiceBase + publishers_list = [CKANPublishBase] + display_name = 'CKAN Provider' + description = 'Services avaliable through the CKAN applications.' + organization_name = 'CKAN' + organization_abbr = 'CKAN' + diff --git a/setup.cfg b/setup.cfg index 97e56815..63c7164e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -37,6 +37,7 @@ quest.services = usgs-nlcd = quest.services.usgs_nlcd:UsgsNlcdProvider cuahsi-hydroshare = quest.services.cuahsi_hs:HSProvider kitware-girder = quest.services.kitware_girder:GirderProvider + ckan = quest.services.ckan:CKANProvider # nasa = quest.services.nasa:NasaProvider quest.filters = From 6823482121c7a0d6dbb8c03dbf900aa4e096ba49 Mon Sep 17 00:00:00 2001 From: AaronV77 Date: Wed, 11 Apr 2018 13:47:33 -0500 Subject: [PATCH 03/12] Ckan plugin updated. --- quest/services/ISEP_Girder.py | 90 ----------------- quest/services/ckan.py | 179 ++++++++++++++++++++++++---------- 2 files changed, 130 insertions(+), 139 deletions(-) delete mode 100644 quest/services/ISEP_Girder.py diff --git a/quest/services/ISEP_Girder.py b/quest/services/ISEP_Girder.py deleted file mode 100644 index e3cdfc47..00000000 --- a/quest/services/ISEP_Girder.py +++ /dev/null @@ -1,90 +0,0 @@ -from .base import ProviderBase, SingleFileServiceBase, PublishBase -from ..api.database import get_db, db_session -from shapely.geometry import Point, box -from ..api.metadata import get_metadata -from ..util import param_util -from getpass import getpass -import girder_client -import pandas as pd -import param -import os - - -class ISEPServiceBase(SingleFileServiceBase): - - token = param.String(default=None, doc="Token #", precedence=1) - - @property - def gc(self): - return self.provider.get_gc() - - def get_features(self, **kwargs): - raise NotImplementedError() - - -class ISEPPublisher(PublishBase): - publisher_name = "girder_pub" - display_name = "Girder Publisher" - description = "Girder is a repository for ERDC ERS." 
- - token = param.String(default=None, doc="Token #", precedence=1) - title = param.String(default="example title", doc="Title of resource", precedence=2) - collection_description = param.String(default="", doc="Description of resource", precedence=3) - folder_name = param.String(default="example folder title", doc="Folder Title", precedence=4) - folder_description = param.String(default="", doc="Folder Description", precedence=5) - # Have the option to make the resource public. - dataset = param_util.DatasetListSelector(default=(), - filters={'status': 'downloaded'}, - precedence=6, - doc="dataset to publish to HydroShare") - - def __init__(self, provider, **kwargs): - super(ISEPPublisher, self).__init__(provider, **kwargs) - - @property - def gc(self): - return self.provider.get_gc() - - def publish(self, options=None): - try: - with self.gc.session() as session: - p = param.ParamOverrides(self, options) - session.verify = '/home/valoroso/DoD_Cert.pem' - self.gc.token = p.token - params = {'name': p.title, 'description': p.collection_description} - resource_information_dict = self.gc.createResource(path='collection', params=params) - folder_creation_dict = self.gc.createFolder(parentId=resource_information_dict['_id'], - name=p.folder_name, - description=p.folder_description, - parentType='collection') - for dataset in p.dataset: - dataset_metadata = get_metadata(dataset)[dataset] - fpath = dataset_metadata['file_path'] - self.gc.uploadFileToFolder(folder_creation_dict['_id'], fpath) - - except Exception as e: - raise e - - return resource_information_dict['_id'] - - -class GirderProvider(ProviderBase): - service_base_class = ISEPServiceBase - publishers_list = [ISEPPublisher] - display_name = 'Girder Services' - description = 'Services avaliable through the AFRL Girder Server.' - organization_name = 'U.S. Air Force Research Laboratory' - organization_abbr = 'AFRL' - - def get_gc(self): - connection_info = 'https://talonw4.afrl.hpc.mil/api/v1' - try: - gc = girder_client.GirderClient(apiUrl=connection_info) - except: - raise ValueError("Cannot connect to the ISEP Girder.") - - return gc - - def authenticate_me(self, **kwargs): - pass - diff --git a/quest/services/ckan.py b/quest/services/ckan.py index 467cf425..ce37057f 100644 --- a/quest/services/ckan.py +++ b/quest/services/ckan.py @@ -1,58 +1,136 @@ from .base import ProviderBase, ServiceBase, PublishBase from ..api.metadata import get_metadata -from shapely.geometry import box +from shapely.geometry import Point, box from ckanapi import RemoteCKAN from ..util import param_util import pandas as pd +import datetime import param - +import os class CKANServiceBase(ServiceBase): - service_name = "ckan_service" - display_name = "CKAN Service" - description = 'To grab the single service from the CKAN repository.' - service_type = "norm-discrete" + + def get_features(self, **kwargs): + raise NotImplementedError() + + api_key = param.String(default=None, doc="user api key", precedence=1) + address = param.String(default=None, doc="Website URL", precedence=2) + +class CKANGeoService(CKANServiceBase): + service_name = "ckan_geo_service" + display_name = "CKAN Geo Service" + description = 'To grab geo specific packages from a CKAN repository.' 
+ service_type = 'geo-discrete' + unmapped_parameters_available = True + geom_type = 'Point' + datatype = 'timeseries' geographical_areas = ['Worldwide'] bounding_boxes = [ [-180, -90, 180, 90], ] - feature_id = "ckan" _parameter_map = {} - api_key = param.String(default=None, doc="user api key", precedence=1) - address = param.String(default=None, doc="Website URL", precedence=2) + def get_features(self, **kwargs): + p = param.ParamOverrides(self, kwargs['options']) + try: + if p.api_key is None: + demo = RemoteCKAN(address=p.address) + else: + demo = RemoteCKAN(address=p.address, apikey=p.api_key) + except: + raise ValueError("Not able to connect to the CKAN instance.") + + """ + How this works is by grabbing the first 1000 rows of datasets from the ckan application. + The package_search() call will return a dictionary that holds the total number of datasets + that is being returned. By default the CKAN application only returns a max of 1000 rows at + a time. Once I grab the first amount of datasets, I check to see if the count is greater + than 1000, and if so I set a counter to 1001. I do this because if I got from 0 - 1000, + then I want to grab from 1001 to 2001 and so on. I save the results to a list, then proceed + to loop. I then grab the next group of datasets, increment my counter, and then add it to + the list of other datasets. + """ + results = demo.action.package_search(start=0, rows=1000) + list_of_datasets = results['results'] + if results['count'] > 1000: + counter = 1001 + total_datasets = results['count'] + while counter < total_datasets: + results = demo.action.package_search(start=counter, rows=1000) + counter += 1001 + list_of_datasets.append(results['results']) + + features = pd.DataFrame(list_of_datasets) + features['geometry'] = features['extras'].apply(self.parse_coverages) + features['service_id'] = features['resource_id'].apply(str) + features.index = features['service_id'] + features.rename(columns={ + 'title': 'display_name', + }, inplace=True) + + return features + + def parse_coverages(self, resource_row): + geometry = None + for coverage in resource_row: + coverage_type = coverage.get('type') + if coverage_type == 'point': + geometry = Point(float(coverage.get('value').get('north')), float(coverage.get('value').get('east'))) + elif coverage_type == 'box': + geometry = box(float(coverage.get('value').get('westlimit')), + float(coverage.get('value').get('southlimit')), + float(coverage.get('value').get('eastlimit')), + float(coverage.get('value').get('northlimit'))) + + return geometry + + +class CKANNormService(CKANServiceBase): + service_name = "ckan_norm" + display_name = "CKAN Normal Service" + description = 'To grab non-geo specific packages from a CKAN repository.' + service_type = "norm-discrete" + unmapped_parameters_available = True + _parameter_map = {} def get_features(self, **kwargs): - the_feature = {"service_id": "ckan", "display_name": "ckan", "geometry": box(-180, -90, 180, 90)} - feature = pd.DataFrame(the_feature, index=[0]) - return feature - - def download(self, feature, file_path, dataset, **params): - pass - # p = param.ParamOverrides(self, params) - # demo = RemoteCKAN(address=p.address, apikey=p.api_key) - - # Have to figure out what the user wants to grab and pull. 
- - # results = ncep.get_ncep_product_data(ncep_provider="Global Forecast System", product_type=p.type, - # product_date=p.date, resolution=p.res, cycle_runtime=p.cycle, - # forecast_start=p.start, forecast_end=p.end, product_format=p.format, - # name_of_product=p.product) - # print(results) - # if len(results) > 0: - # ncep.download_data(file_path, results) - # metadata = { - # 'metadata': results, - # 'file_path': file_path, - # 'file_format': 'unknown', - # 'datatype': 'unknown', - # 'parameter': "unknown", - # 'unit': "unknown", - # } - # else: - # raise ValueError("There is no data found on those parameters.") - # - # return metadata + p = param.ParamOverrides(self, kwargs['options']) + try: + if p.api_key is None: + demo = RemoteCKAN(address=p.address) + else: + demo = RemoteCKAN(address=p.address, apikey=p.api_key) + except: + raise ValueError("Not able to connect to the CKAN instance.") + + """ + How this works is by grabbing the first 1000 rows of datasets from the ckan application. + The package_search() call will return a dictionary that holds the total number of datasets + that is being returned. By default the CKAN application only returns a max of 1000 rows at + a time. Once I grab the first amount of datasets, I check to see if the count is greater + than 1000, and if so I set a counter to 1001. I do this because if I got from 0 - 1000, + then I want to grab from 1001 to 2001 and so on. I save the results to a list, then proceed + to loop. I then grab the next group of datasets, increment my counter, and then add it to + the list of other datasets. + """ + results = demo.action.package_search(start=0, rows=1000) + list_of_datasets = results['results'] + if results['count'] > 1000: + counter = 1001 + total_datasets = results['count'] + while counter < total_datasets: + results = demo.action.package_search(start=counter, rows=1000) + counter += 1001 + list_of_datasets.append(results['results']) + + features = pd.DataFrame(list_of_datasets) + features['service_id'] = features['id'].apply(str) + features.index = features['service_id'] + features.rename(columns={ + 'title': 'display_name', + }, inplace=True) + + return features class CKANPublishBase(PublishBase): @@ -72,10 +150,6 @@ class CKANPublishBase(PublishBase): dataset = param_util.DatasetListSelector(default=(), filters={'status': 'downloaded'}, precedence=10, doc="dataset to publish to ckan") - # Do we even need this? - def __init__(self, provider, **kwargs): - super(CKANPublishBase, self).__init__(provider, **kwargs) - def publish(self, options=None): try: p = param.ParamOverrides(self, options) @@ -91,22 +165,29 @@ def publish(self, options=None): "type": p.type } - the_resource = demo.action.package_create(**params) + the_package = demo.action.package_create(**params) for dataset in p.dataset: dataset_metadata = get_metadata(dataset)[dataset] fpath = dataset_metadata['file_path'] - # Need to push the data to the folder. - return the_resource['id'] + filename, file_extension = os.path.splitext(fpath) + now = datetime.datetime.now() + params2 = {"package_id": the_package['id'], + "format": file_extension, + "name": filename, + "size": os.path.getsize(fpath), + "created": str(now)[:10] + } + demo.action.resource_create(**params2) + return the_package['id'] except Exception as e: raise e + class NCEPProvider(ProviderBase): - # Why is one in a list again and one is not? 
service_base_class = CKANServiceBase - publishers_list = [CKANPublishBase] + publishers_base_class = CKANPublishBase display_name = 'CKAN Provider' description = 'Services avaliable through the CKAN applications.' organization_name = 'CKAN' organization_abbr = 'CKAN' - From 5527b3c786d7cd2613426f5318a1b4b99b751299 Mon Sep 17 00:00:00 2001 From: AaronV77 Date: Thu, 12 Apr 2018 10:55:32 -0500 Subject: [PATCH 04/12] Updated the ckan provider plugin. The upload is not working correctly and there is not good working demo site inorder to test effectively. --- quest/services/base/provider_base.py | 3 +- quest/services/ckan.py | 187 +++++++++++++-------------- 2 files changed, 94 insertions(+), 96 deletions(-) diff --git a/quest/services/base/provider_base.py b/quest/services/base/provider_base.py index 7bc08b45..eba7bba2 100644 --- a/quest/services/base/provider_base.py +++ b/quest/services/base/provider_base.py @@ -60,7 +60,6 @@ def publishers(self): if self._publishers is None: publishers_list = self.publisher_base_class.__subclasses__() or [self.publisher_base_class] self._publishers = {p.publisher_name: p(name=p.publisher_name, provider=self) for p in publishers_list} - return self._publishers @property @@ -299,4 +298,4 @@ def unauthenticate_me(self): if p is None: raise ValueError('Provider does not exist in the database.') else: - p.delete() \ No newline at end of file + p.delete() diff --git a/quest/services/ckan.py b/quest/services/ckan.py index ce37057f..91aecc47 100644 --- a/quest/services/ckan.py +++ b/quest/services/ckan.py @@ -1,20 +1,49 @@ from .base import ProviderBase, ServiceBase, PublishBase from ..api.metadata import get_metadata -from shapely.geometry import Point, box +from ..api.database import get_db, db_session +from shapely.geometry import shape from ckanapi import RemoteCKAN from ..util import param_util +from getpass import getpass import pandas as pd import datetime +import geojson import param import os + class CKANServiceBase(ServiceBase): + @property + def demo(self): + return self.provider.get_demo() + def get_features(self, **kwargs): raise NotImplementedError() - api_key = param.String(default=None, doc="user api key", precedence=1) - address = param.String(default=None, doc="Website URL", precedence=2) + def get_data(self, **kwargs): + """ + How this works is by grabbing the first 1000 rows of datasets from the ckan application. + The package_search() call will return a dictionary that holds the total number of datasets + that is being returned. By default the CKAN application only returns a max of 1000 rows at + a time. Once I grab the first amount of datasets, I check to see if the count is greater + than 1000, and if so I set a counter to 1001. I do this because if I got from 0 - 1000, + then I want to grab from 1001 to 2001 and so on. I save the results to a list, then proceed + to loop. I then grab the next group of datasets, increment my counter, and then add it to + the list of other datasets. 
+ """ + results = self.demo.action.package_search(**kwargs, start=0, rows=1000) + + list_of_datasets = results['results'] + if results['count'] > 1000: + counter = 1000 + total_datasets = results['count'] + while counter < total_datasets: + results = self.demo.action.package_search(**kwargs, start=counter, rows=1000) + counter += 1000 + list_of_datasets.extend(results['results']) + + return list_of_datasets class CKANGeoService(CKANServiceBase): service_name = "ckan_geo_service" @@ -31,38 +60,11 @@ class CKANGeoService(CKANServiceBase): _parameter_map = {} def get_features(self, **kwargs): - p = param.ParamOverrides(self, kwargs['options']) - try: - if p.api_key is None: - demo = RemoteCKAN(address=p.address) - else: - demo = RemoteCKAN(address=p.address, apikey=p.api_key) - except: - raise ValueError("Not able to connect to the CKAN instance.") - - """ - How this works is by grabbing the first 1000 rows of datasets from the ckan application. - The package_search() call will return a dictionary that holds the total number of datasets - that is being returned. By default the CKAN application only returns a max of 1000 rows at - a time. Once I grab the first amount of datasets, I check to see if the count is greater - than 1000, and if so I set a counter to 1001. I do this because if I got from 0 - 1000, - then I want to grab from 1001 to 2001 and so on. I save the results to a list, then proceed - to loop. I then grab the next group of datasets, increment my counter, and then add it to - the list of other datasets. - """ - results = demo.action.package_search(start=0, rows=1000) - list_of_datasets = results['results'] - if results['count'] > 1000: - counter = 1001 - total_datasets = results['count'] - while counter < total_datasets: - results = demo.action.package_search(start=counter, rows=1000) - counter += 1001 - list_of_datasets.append(results['results']) - + list_of_datasets = self.get_data(extras={"ext_bbox": "-180,-90,180,90"}) features = pd.DataFrame(list_of_datasets) - features['geometry'] = features['extras'].apply(self.parse_coverages) - features['service_id'] = features['resource_id'].apply(str) + features['extras'] = features['extras'].apply(lambda row: {i['key']: i['value'] for i in row}) + features['geometry'] = features['extras'].apply(lambda r: shape(geojson.loads(r['spatial']))) + features['service_id'] = features['id'].apply(str) features.index = features['service_id'] features.rename(columns={ 'title': 'display_name', @@ -70,20 +72,6 @@ def get_features(self, **kwargs): return features - def parse_coverages(self, resource_row): - geometry = None - for coverage in resource_row: - coverage_type = coverage.get('type') - if coverage_type == 'point': - geometry = Point(float(coverage.get('value').get('north')), float(coverage.get('value').get('east'))) - elif coverage_type == 'box': - geometry = box(float(coverage.get('value').get('westlimit')), - float(coverage.get('value').get('southlimit')), - float(coverage.get('value').get('eastlimit')), - float(coverage.get('value').get('northlimit'))) - - return geometry - class CKANNormService(CKANServiceBase): service_name = "ckan_norm" @@ -94,35 +82,7 @@ class CKANNormService(CKANServiceBase): _parameter_map = {} def get_features(self, **kwargs): - p = param.ParamOverrides(self, kwargs['options']) - try: - if p.api_key is None: - demo = RemoteCKAN(address=p.address) - else: - demo = RemoteCKAN(address=p.address, apikey=p.api_key) - except: - raise ValueError("Not able to connect to the CKAN instance.") - - """ - How this works is by 
grabbing the first 1000 rows of datasets from the ckan application. - The package_search() call will return a dictionary that holds the total number of datasets - that is being returned. By default the CKAN application only returns a max of 1000 rows at - a time. Once I grab the first amount of datasets, I check to see if the count is greater - than 1000, and if so I set a counter to 1001. I do this because if I got from 0 - 1000, - then I want to grab from 1001 to 2001 and so on. I save the results to a list, then proceed - to loop. I then grab the next group of datasets, increment my counter, and then add it to - the list of other datasets. - """ - results = demo.action.package_search(start=0, rows=1000) - list_of_datasets = results['results'] - if results['count'] > 1000: - counter = 1001 - total_datasets = results['count'] - while counter < total_datasets: - results = demo.action.package_search(start=counter, rows=1000) - counter += 1001 - list_of_datasets.append(results['results']) - + list_of_datasets = self.get_data() features = pd.DataFrame(list_of_datasets) features['service_id'] = features['id'].apply(str) features.index = features['service_id'] @@ -138,23 +98,23 @@ class CKANPublishBase(PublishBase): display_name = "CKAN Publisher" description = "To be able to push to the CKAN repository.." - api_key = param.String(default=None, doc="", precedence=1) - address = param.String(default=None, doc="", precedence=2) - name = param.String(default="", doc="", precedence=3) - title = param.String(default="", doc="", precedence=4) - author = param.String(defualt="", doc="", precedence=5) - author_email = param.String(default="", doc="", precedence=6) - availability = param.ObjectSelector(default=None, doc="", objects=[True,False], precedence=7) - description = param.String(default="", doc="", precedence=8) - type = param.String(default="", doc="Data type", precedence=9) - dataset = param_util.DatasetListSelector(default=(), filters={'status': 'downloaded'}, precedence=10, + title = param.String(default="", doc="", precedence=1) + dataset_name = param.String(default="", doc="", precedence=2) + author = param.String(default="", doc="", precedence=3) + author_email = param.String(default="", doc="", precedence=4) + availability = param.ObjectSelector(default=None, doc="", objects=[True,False], precedence=5) + description = param.String(default="", doc="", precedence=6) + type = param.String(default="", doc="Data type", precedence=7) + dataset = param_util.DatasetListSelector(default=(), filters={'status': 'downloaded'}, precedence=8, doc="dataset to publish to ckan") + @property + def demo(self): + return self.provider.get_demo() def publish(self, options=None): try: p = param.ParamOverrides(self, options) - demo = RemoteCKAN(address=p.address, apikey=p.api_key) - params = {"name": p.name, + params = {"name": p.dataset_name, "title": p.title, "private": p.availability, "author": p.author, @@ -165,7 +125,7 @@ def publish(self, options=None): "type": p.type } - the_package = demo.action.package_create(**params) + the_package = self.demo.action.package_create(**params) for dataset in p.dataset: dataset_metadata = get_metadata(dataset)[dataset] @@ -176,18 +136,57 @@ def publish(self, options=None): "format": file_extension, "name": filename, "size": os.path.getsize(fpath), - "created": str(now)[:10] + "created": str(now)[:10], + "upload": fpath } - demo.action.resource_create(**params2) + self.demo.action.resource_create(**params2) return the_package['id'] except Exception as e: raise e -class 
NCEPProvider(ProviderBase): +class CKANProvider(ProviderBase): service_base_class = CKANServiceBase - publishers_base_class = CKANPublishBase + publisher_base_class = CKANPublishBase display_name = 'CKAN Provider' description = 'Services avaliable through the CKAN applications.' organization_name = 'CKAN' organization_abbr = 'CKAN' + hostname = 'https://demo.ckan.org' + + def authenticate_me(self, **kwargs): + + api_key = getpass("Enter CKAN API key: ") + + db = get_db() + with db_session: + p = db.Providers.select().filter(provider=self.name).first() + + provider_metadata = { + 'provider': self.name, + 'username': 'placeholder', + 'password': api_key, + } + + if p is None: + db.Providers(**provider_metadata) + else: + p.set(**provider_metadata) + + return True + + def get_demo(self): + api_key = None + try: + api_key = self.credentials['password'] + except ValueError: + pass # ignore error if api_key has not been stored + + demo = RemoteCKAN(address=self.hostname, apikey=api_key) + demo.action.package_search(rows=1) + return demo + + def get_ckan_status(self): + demo = self.get_demo() + status = demo.action.status_show() + # check if status contains spatial_query_extension. From c179f4d97ec4ba04b5e22e4a96a301cbf1c0cde0 Mon Sep 17 00:00:00 2001 From: AaronV77 Date: Thu, 12 Apr 2018 13:00:48 -0500 Subject: [PATCH 05/12] noaa_ncep stuff was in the commits and needed to delete it. --- quest/services/noaa_ncep.py | 80 ------------------------------------- 1 file changed, 80 deletions(-) delete mode 100644 quest/services/noaa_ncep.py diff --git a/quest/services/noaa_ncep.py b/quest/services/noaa_ncep.py deleted file mode 100644 index 284823f9..00000000 --- a/quest/services/noaa_ncep.py +++ /dev/null @@ -1,80 +0,0 @@ -from .base import ProviderBase, ServiceBase -from ncep_client import NCEP_Client -from shapely.geometry import box -import pandas as pd -import param - - -class NCEPServiceBase(ServiceBase): - - def get_features(self, **kwargs): - raise NotImplementedError() - - -class NCEP_GFS_Service(NCEPServiceBase): - ncep = NCEP_Client() - service_name = "ncep_gfs" - display_name = "NCEP GFS Service" - description = 'NCEP GFS is a noaa repository for global weather data.' - service_type = "norm-discrete" - geographical_areas = ['Worldwide'] - bounding_boxes = [ - [-180, -90, 180, 90], - ] - feature_id = "ncep" - # These are slowing down the api because it has to load the web page. 
- _possible_types = ncep.get_provider_types("Global Forecast System") - _possible_products = ncep.get_provider_products("Global Forecast System") - _possible_formats = ncep.get_formats_of_a_product("Global Forecast System") - _parameter_map = {} - - date = param.String(default=None, doc="YYYYMMDD", precedence=1) - res = param.String(default=None, doc="Froecast Resolution", precedence=2) - cycle = param.String(default=None, doc="Forecast Cycle Runtime", precedence=3) - start = param.String(default=None, doc="Forecast start time (f###)", precedence=4) - end = param.String(default=None, doc="Forecast end time (f###)", precedence=5) - format = param.ObjectSelector(default=None, doc="Paramerter", objects=_possible_formats, precedence=6) - type = param.ObjectSelector(default=None, doc="Parameter2", objects=_possible_types, precedence=7) - product = param.ObjectSelector(default=None, doc="Parameter3", objects=_possible_products, precedence=8) - - def get_features(self, **kwargs): - the_feature = {"service_id": "ncep", "display_name": "ncep", "geometry": box(-180, -90, 180, 90)} - feature = pd.DataFrame(the_feature, index=[0]) - return feature - - def download(self, feature, file_path, dataset, **params): - ncep = NCEP_Client() - p = param.ParamOverrides(self, params) - - if p.product == "GFS" or p.product == "GDAS": - raise ValueError("Please specify a specific product not GFS or GDAS") - - results = ncep.get_ncep_product_data(ncep_provider="Global Forecast System", product_type=p.type, - product_date=p.date, resolution=p.res, cycle_runtime=p.cycle, - forecast_start=p.start, forecast_end=p.end, product_format=p.format, - name_of_product=p.product) - print(results) - if len(results) > 0: - ncep.download_data(file_path, results) - metadata = { - 'metadata': results, - 'file_path': file_path, - 'file_format': 'weather-specific', - 'datatype': 'weather', - 'parameter': "ncep_parameter", - 'unit': "unkown", - } - else: - raise ValueError("There is no data found on those parameters.") - - return metadata - - -class NCEPProvider(ProviderBase): - service_base_class = NCEPServiceBase - publishers_list = [] - display_name = 'NCEP GFS Provider' - description = 'Services avaliable through the NOAA NCEP Server.' - organization_name = 'National Centers for Environmental Prediction' - organization_abbr = 'NCEP' - From e0cd550ade31d5f0af8ea2251a8a8a85e63aa63a Mon Sep 17 00:00:00 2001 From: Aaron Valoroso Date: Mon, 16 Apr 2018 11:45:42 -0500 Subject: [PATCH 06/12] Minor updates (#38) * Added the cuahsi coverage fix. The change was not getting all the resources and just getting all resources that had a coverage from the world. * Made kwargs the standard over using options, params, and kwargs. * Updated the load_providers to only update if update-cache is True or if the global is None. 
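
A minimal usage sketch of the new keyword-argument style (the publisher URI,
dataset id, and the quest.api import path below are illustrative placeholders,
not part of this change):

    import quest
    from quest import util

    # publish options are now passed as keyword arguments rather than an options dict
    quest.api.publish('provider:publisher:feature',
                      title='My dataset',
                      dataset=('example-dataset-id',))

    # providers are cached; pass update_cache=True to force a reload
    # (add_provider/delete_provider now trigger this automatically)
    util.load_providers(update_cache=True)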
--- quest/api/datasets.py | 4 ++-- quest/api/services.py | 2 ++ quest/services/base/provider_base.py | 4 ++-- quest/services/base/publish_base.py | 2 +- quest/services/base/service_base.py | 4 ++-- quest/services/cuahsi_hs.py | 20 +++++++++----------- quest/services/kitware_girder.py | 4 ++-- quest/services/noaa_coastwatch.py | 4 ++-- quest/services/noaa_ncdc.py | 4 ++-- quest/services/template_service.py | 2 +- quest/services/user_provider.py | 2 +- quest/services/usgs_nwis.py | 4 ++-- quest/util/misc.py | 28 +++++++++++++++++----------- 13 files changed, 45 insertions(+), 39 deletions(-) diff --git a/quest/api/datasets.py b/quest/api/datasets.py index 2619fd2b..0b7e47a3 100644 --- a/quest/api/datasets.py +++ b/quest/api/datasets.py @@ -60,10 +60,10 @@ def download(feature, file_path, dataset=None, **kwargs): @add_async -def publish(publisher_uri, options=None): +def publish(publisher_uri, **kwargs): provider, publisher, feature = util.parse_service_uri(publisher_uri) driver = util.load_providers()[provider] - data = driver.publish(publisher=publisher, options=options) + data = driver.publish(publisher=publisher, **kwargs) return data @add_async diff --git a/quest/api/services.py b/quest/api/services.py index 2c225f3c..1dd2a195 100644 --- a/quest/api/services.py +++ b/quest/api/services.py @@ -127,6 +127,7 @@ def add_provider(uri): util.update_settings({'USER_SERVICES': user_services}) util.save_settings() msg = 'service added' + util.load_providers(update_cache=True) else: msg = 'service already present' else: @@ -159,6 +160,7 @@ def delete_provider(uri): util.update_settings({'USER_SERVICES': user_services}) util.save_settings() msg = 'service removed' + util.load_providers(update_cache=True) else: msg = 'service not found' diff --git a/quest/services/base/provider_base.py b/quest/services/base/provider_base.py index 7bc08b45..7adcb4d3 100644 --- a/quest/services/base/provider_base.py +++ b/quest/services/base/provider_base.py @@ -283,8 +283,8 @@ def download_options(self, service, fmt): """ return self.services[service].download_options(fmt) - def publish(self, publisher, options): - return self.publishers[publisher].publish(options) + def publish(self, publisher, **kwargs): + return self.publishers[publisher].publish(**kwargs) def publish_options(self, publisher, fmt): return self.publishers[publisher].publish_options(fmt) diff --git a/quest/services/base/publish_base.py b/quest/services/base/publish_base.py index 0e6a80cb..a19ca1fb 100644 --- a/quest/services/base/publish_base.py +++ b/quest/services/base/publish_base.py @@ -38,5 +38,5 @@ def publish_options(self, fmt): return schema - def publish(self, options): + def publish(self, **kwargs): raise NotImplementedError() diff --git a/quest/services/base/service_base.py b/quest/services/base/service_base.py index 5dae3ace..27195b4d 100644 --- a/quest/services/base/service_base.py +++ b/quest/services/base/service_base.py @@ -92,7 +92,7 @@ def download_options(self, fmt): return schema - def download(self, feature, file_path, dataset, **params): + def download(self, feature, file_path, dataset, **kwargs): raise NotImplementedError() def get_features(self, **kwargs): @@ -138,7 +138,7 @@ class SingleFileServiceBase(ServiceBase): """Base file for datasets that are a single file download eg elevation raster etc """ - def download(self, feature, file_path, dataset, **params): + def download(self, feature, file_path, dataset, **kwargs): feature_id = util.construct_service_uri(self.provider.name, self.name, feature) feature = 
self.provider.get_features(self.name).loc[feature_id] reserved = feature.get('reserved') diff --git a/quest/services/cuahsi_hs.py b/quest/services/cuahsi_hs.py index 2557440d..100caf9e 100644 --- a/quest/services/cuahsi_hs.py +++ b/quest/services/cuahsi_hs.py @@ -9,6 +9,7 @@ import param import os + class HSServiceBase(SingleFileServiceBase): @property @@ -35,7 +36,7 @@ class HSGeoService(HSServiceBase): def get_features(self, **kwargs): - results = list(self.hs.resources()) + results = list(self.hs.resources(coverage_type="box", north="90", south="-90", east="180", west="-180")) if len(results) == 0: raise ValueError('No resource available from HydroShare.') @@ -126,21 +127,18 @@ class HSPublisher(PublishBase): 'Time Series': 'TimeSeriesResource' } - title = param.String(default="example title", doc="Title of resource", precedence=2) - abstract = param.String(default="example abstract", precedence=3, - doc="An description of the resource to be added to HydroShare.") - keywords = param.List(default=[], precedence=4, doc="list of keyword strings to describe the resource") - dataset = param_util.DatasetListSelector(default=(), filters={'status': 'downloaded'}, precedence=5, - doc="dataset to publish to HydroShare") - resource_type = param.ObjectSelector(doc='parameter', precedence=1, objects=sorted(_resource_type_map.keys())) + resource_type = param.ObjectSelector(default=None, doc="", precedence=1, objects=sorted(_resource_type_map.keys())) + title = param.String(default="", doc="", precedence=2) + abstract = param.String(default="", doc="", precedence=3) + keywords = param.List(default=[], doc="", precedence=4) + dataset = param_util.DatasetListSelector(default=(), filters={'status': 'downloaded'}, doc="", precedence=5) @property def hs(self): return self.provider.get_hs() - def publish(self, options=None): - - p = param.ParamOverrides(self, options) + def publish(self, **kwargs): + p = param.ParamOverrides(self, kwargs) valid_file_paths = [] valid_extensions = [] diff --git a/quest/services/kitware_girder.py b/quest/services/kitware_girder.py index 9a4bf34e..ba7bd674 100644 --- a/quest/services/kitware_girder.py +++ b/quest/services/kitware_girder.py @@ -34,9 +34,9 @@ class GirderPublisher(PublishBase): def gc(self): return self.provider.get_gc() - def publish(self, options=None): + def publish(self, **kwargs): try: - p = param.ParamOverrides(self, options) + p = param.ParamOverrides(self, kwargs) params = {'name': p.title, 'description': p.collection_description} resource_information_dict = self.gc.createResource(path='collection', params=params) folder_creation_dict = self.gc.createFolder(parentId=resource_information_dict['_id'], diff --git a/quest/services/noaa_coastwatch.py b/quest/services/noaa_coastwatch.py index bb79ba92..6e082007 100644 --- a/quest/services/noaa_coastwatch.py +++ b/quest/services/noaa_coastwatch.py @@ -65,8 +65,8 @@ def parameter_map(self, invert=False): return pmap - def download(self, feature, file_path, dataset, **params): - p = param.ParamOverrides(self, params) + def download(self, feature, file_path, dataset, **kwargs): + p = param.ParamOverrides(self, kwargs) self.parameter = p.parameter self.end = pd.to_datetime(p.end) self.start = pd.to_datetime(p.start) diff --git a/quest/services/noaa_ncdc.py b/quest/services/noaa_ncdc.py index 5bd26238..6bbe1d5f 100644 --- a/quest/services/noaa_ncdc.py +++ b/quest/services/noaa_ncdc.py @@ -74,8 +74,8 @@ def parameter_map(self, invert=False): return pmap - def download(self, feature, file_path, dataset, **params): - p 
= param.ParamOverrides(self, params) + def download(self, feature, file_path, dataset, **kwargs): + p = param.ParamOverrides(self, kwargs) self.parameter = p.parameter self.end = pd.to_datetime(p.end) self.start = pd.to_datetime(p.start) diff --git a/quest/services/template_service.py b/quest/services/template_service.py index 4d0cd8cf..b4e4ba6f 100644 --- a/quest/services/template_service.py +++ b/quest/services/template_service.py @@ -26,7 +26,7 @@ class ExampleServiceBase(ServiceBase): smtk_template = None _parameter_map = dict() - def download(self, feature, file_path, dataset, **params): + def download(self, feature, file_path, dataset, **kwwargs): metadata = {} # get metadata from service data = None # data structure containing downloaded data diff --git a/quest/services/user_provider.py b/quest/services/user_provider.py index c6647215..4e525127 100644 --- a/quest/services/user_provider.py +++ b/quest/services/user_provider.py @@ -51,7 +51,7 @@ def instance(cls, service_name, service_data, provider, uri, is_remote): return self - def download(self, feature, file_path, dataset, **params): + def download(self, feature, file_path, dataset, **kwargs): if self.datasets_mapping is not None: fnames = self.datasets_mapping if isinstance(dict, self.datasets_mapping): diff --git a/quest/services/usgs_nwis.py b/quest/services/usgs_nwis.py index 08233a39..cc81b72b 100644 --- a/quest/services/usgs_nwis.py +++ b/quest/services/usgs_nwis.py @@ -18,8 +18,8 @@ class NwisServiceBase(TimePeriodServiceBase): period = param.String(default='P365D', precedence=4, doc='time period (e.g. P365D = 365 days or P4W = 4 weeks)') smtk_template = 'start_end_or_period.sbt' - def download(self, feature, file_path, dataset, **params): - p = param.ParamOverrides(self, params) + def download(self, feature, file_path, dataset, **kwargs): + p = param.ParamOverrides(self, kwargs) parameter = p.parameter start = p.start diff --git a/quest/util/misc.py b/quest/util/misc.py index fac84738..5f03d405 100644 --- a/quest/util/misc.py +++ b/quest/util/misc.py @@ -19,6 +19,7 @@ from uuid import uuid4, UUID import quest +the_providers = None def generate_cache(update=False): """Downloads features for all services and caches results. 
@@ -238,22 +239,27 @@ def load_drivers(namespace, names=None): return {name: driver.DriverManager(namespace, name, invoke_on_load='True') for name in names} -def load_providers(): - settings = get_settings() +def load_providers(update_cache=False): + global the_providers - # add web services + settings = get_settings() web_services = list_drivers('services') web_services.remove('user') - providers = {name: driver.DriverManager('quest.services', name, invoke_on_load=True, invoke_kwds={'name': name}).driver for name in web_services} + if update_cache or the_providers is None: + providers = {name: driver.DriverManager('quest.services', name, invoke_on_load=True, invoke_kwds={'name': name}).driver for name in web_services} - if len(settings.get('USER_SERVICES', [])) > 0: - for uri in settings.get('USER_SERVICES', []): - try: - drv = driver.DriverManager('quest.services', 'user', invoke_on_load=True, invoke_kwds={'uri': uri}).driver - providers['user-' + drv.name] = drv - except Exception as e: - logger.error('Failed to load local service from %s, with exception: %s' % (uri, str(e))) + if len(settings.get('USER_SERVICES', [])) > 0: + for uri in settings.get('USER_SERVICES', []): + try: + drv = driver.DriverManager('quest.services', 'user', invoke_on_load=True, invoke_kwds={'uri': uri}).driver + providers['user-' + drv.name] = drv + except Exception as e: + logger.error('Failed to load local service from %s, with exception: %s' % (uri, str(e))) + + the_providers = providers + else: + providers = the_providers return providers From 7f211118c99306773b2ed52844ce555902604874 Mon Sep 17 00:00:00 2001 From: AaronV77 Date: Wed, 4 Apr 2018 15:29:14 -0500 Subject: [PATCH 07/12] Dataset display name fix (More readable). --- quest/api/services.py | 2 - quest/services/ISEP_Girder.py | 90 +++++++++++++++++++++++++++++++++++ quest/services/noaa_ncep.py | 80 +++++++++++++++++++++++++++++++ 3 files changed, 170 insertions(+), 2 deletions(-) create mode 100644 quest/services/ISEP_Girder.py create mode 100644 quest/services/noaa_ncep.py diff --git a/quest/api/services.py b/quest/api/services.py index 1dd2a195..0fd2b26e 100644 --- a/quest/api/services.py +++ b/quest/api/services.py @@ -78,8 +78,6 @@ def get_publishers(expand=None, publisher_type=None): Returns: providers (list or dict,Default=list): list of all available providers - - """ providers = util.load_providers() publishers = {} diff --git a/quest/services/ISEP_Girder.py b/quest/services/ISEP_Girder.py new file mode 100644 index 00000000..e3cdfc47 --- /dev/null +++ b/quest/services/ISEP_Girder.py @@ -0,0 +1,90 @@ +from .base import ProviderBase, SingleFileServiceBase, PublishBase +from ..api.database import get_db, db_session +from shapely.geometry import Point, box +from ..api.metadata import get_metadata +from ..util import param_util +from getpass import getpass +import girder_client +import pandas as pd +import param +import os + + +class ISEPServiceBase(SingleFileServiceBase): + + token = param.String(default=None, doc="Token #", precedence=1) + + @property + def gc(self): + return self.provider.get_gc() + + def get_features(self, **kwargs): + raise NotImplementedError() + + +class ISEPPublisher(PublishBase): + publisher_name = "girder_pub" + display_name = "Girder Publisher" + description = "Girder is a repository for ERDC ERS." 
+ + token = param.String(default=None, doc="Token #", precedence=1) + title = param.String(default="example title", doc="Title of resource", precedence=2) + collection_description = param.String(default="", doc="Description of resource", precedence=3) + folder_name = param.String(default="example folder title", doc="Folder Title", precedence=4) + folder_description = param.String(default="", doc="Folder Description", precedence=5) + # Have the option to make the resource public. + dataset = param_util.DatasetListSelector(default=(), + filters={'status': 'downloaded'}, + precedence=6, + doc="dataset to publish to HydroShare") + + def __init__(self, provider, **kwargs): + super(ISEPPublisher, self).__init__(provider, **kwargs) + + @property + def gc(self): + return self.provider.get_gc() + + def publish(self, options=None): + try: + with self.gc.session() as session: + p = param.ParamOverrides(self, options) + session.verify = '/home/valoroso/DoD_Cert.pem' + self.gc.token = p.token + params = {'name': p.title, 'description': p.collection_description} + resource_information_dict = self.gc.createResource(path='collection', params=params) + folder_creation_dict = self.gc.createFolder(parentId=resource_information_dict['_id'], + name=p.folder_name, + description=p.folder_description, + parentType='collection') + for dataset in p.dataset: + dataset_metadata = get_metadata(dataset)[dataset] + fpath = dataset_metadata['file_path'] + self.gc.uploadFileToFolder(folder_creation_dict['_id'], fpath) + + except Exception as e: + raise e + + return resource_information_dict['_id'] + + +class GirderProvider(ProviderBase): + service_base_class = ISEPServiceBase + publishers_list = [ISEPPublisher] + display_name = 'Girder Services' + description = 'Services avaliable through the AFRL Girder Server.' + organization_name = 'U.S. Air Force Research Laboratory' + organization_abbr = 'AFRL' + + def get_gc(self): + connection_info = 'https://talonw4.afrl.hpc.mil/api/v1' + try: + gc = girder_client.GirderClient(apiUrl=connection_info) + except: + raise ValueError("Cannot connect to the ISEP Girder.") + + return gc + + def authenticate_me(self, **kwargs): + pass + diff --git a/quest/services/noaa_ncep.py b/quest/services/noaa_ncep.py new file mode 100644 index 00000000..284823f9 --- /dev/null +++ b/quest/services/noaa_ncep.py @@ -0,0 +1,80 @@ +from .base import ProviderBase, ServiceBase +from ncep_client import NCEP_Client +from shapely.geometry import box +import pandas as pd +import param + + +class NCEPServiceBase(ServiceBase): + + def get_features(self, **kwargs): + raise NotImplementedError() + + +class NCEP_GFS_Service(NCEPServiceBase): + ncep = NCEP_Client() + service_name = "ncep_gfs" + display_name = "NCEP GFS Service" + description = 'NCEP GFS is a noaa repository for global weather data.' + service_type = "norm-discrete" + geographical_areas = ['Worldwide'] + bounding_boxes = [ + [-180, -90, 180, 90], + ] + feature_id = "ncep" + # These are slowing down the api because it has to load the web page. 
+ _possible_types = ncep.get_provider_types("Global Forecast System") + _possible_products = ncep.get_provider_products("Global Forecast System") + _possible_formats = ncep.get_formats_of_a_product("Global Forecast System") + _parameter_map = {} + + date = param.String(default=None, doc="YYYYMMDD", precedence=1) + res = param.String(default=None, doc="Froecast Resolution", precedence=2) + cycle = param.String(default=None, doc="Forecast Cycle Runtime", precedence=3) + start = param.String(default=None, doc="Forecast start time (f###)", precedence=4) + end = param.String(default=None, doc="Forecast end time (f###)", precedence=5) + format = param.ObjectSelector(default=None, doc="Paramerter", objects=_possible_formats, precedence=6) + type = param.ObjectSelector(default=None, doc="Parameter2", objects=_possible_types, precedence=7) + product = param.ObjectSelector(default=None, doc="Parameter3", objects=_possible_products, precedence=8) + + def get_features(self, **kwargs): + the_feature = {"service_id": "ncep", "display_name": "ncep", "geometry": box(-180, -90, 180, 90)} + feature = pd.DataFrame(the_feature, index=[0]) + return feature + + def download(self, feature, file_path, dataset, **params): + ncep = NCEP_Client() + p = param.ParamOverrides(self, params) + + if p.product == "GFS" or p.product == "GDAS": + raise ValueError("Please specify a specific product not GFS or GDAS") + + results = ncep.get_ncep_product_data(ncep_provider="Global Forecast System", product_type=p.type, + product_date=p.date, resolution=p.res, cycle_runtime=p.cycle, + forecast_start=p.start, forecast_end=p.end, product_format=p.format, + name_of_product=p.product) + print(results) + if len(results) > 0: + ncep.download_data(file_path, results) + metadata = { + 'metadata': results, + 'file_path': file_path, + 'file_format': 'weather-specific', + 'datatype': 'weather', + 'parameter': "ncep_parameter", + 'unit': "unkown", + } + else: + raise ValueError("There is no data found on those parameters.") + + return metadata + + +class NCEPProvider(ProviderBase): + service_base_class = NCEPServiceBase + publishers_list = [] + display_name = 'NCEP GFS Provider' + description = 'Services avaliable through the NOAA NCEP Server.' + organization_name = 'National Centers for Environmental Prediction' + organization_abbr = 'NCEP' + From 2e2124cfaec5e572bc567d3cdb9049c3cc4a63fb Mon Sep 17 00:00:00 2001 From: Aaron Valoroso Date: Mon, 9 Apr 2018 14:58:34 -0500 Subject: [PATCH 08/12] Ckan provider plugin added. 
--- py2_conda_environment.yml | 3 +- py3_conda_environment.yml | 3 +- quest/services/ckan.py | 112 ++++++++++++++++++++++++++++++++++++++ setup.cfg | 1 + 4 files changed, 117 insertions(+), 2 deletions(-) create mode 100644 quest/services/ckan.py diff --git a/py2_conda_environment.yml b/py2_conda_environment.yml index 67ae4cb0..0abaade2 100644 --- a/py2_conda_environment.yml +++ b/py2_conda_environment.yml @@ -40,4 +40,5 @@ dependencies: - sphinx_rtd_theme - hs_restclient - jupyter - - girder-client \ No newline at end of file + - girder-client + - ckanapi \ No newline at end of file diff --git a/py3_conda_environment.yml b/py3_conda_environment.yml index 07a0f9f7..419c2d0c 100644 --- a/py3_conda_environment.yml +++ b/py3_conda_environment.yml @@ -41,4 +41,5 @@ dependencies: - sphinx_rtd_theme - hs_restclient - jupyter - - girder-client \ No newline at end of file + - girder-client + - ckanapi \ No newline at end of file diff --git a/quest/services/ckan.py b/quest/services/ckan.py new file mode 100644 index 00000000..467cf425 --- /dev/null +++ b/quest/services/ckan.py @@ -0,0 +1,112 @@ +from .base import ProviderBase, ServiceBase, PublishBase +from ..api.metadata import get_metadata +from shapely.geometry import box +from ckanapi import RemoteCKAN +from ..util import param_util +import pandas as pd +import param + + +class CKANServiceBase(ServiceBase): + service_name = "ckan_service" + display_name = "CKAN Service" + description = 'To grab the single service from the CKAN repository.' + service_type = "norm-discrete" + geographical_areas = ['Worldwide'] + bounding_boxes = [ + [-180, -90, 180, 90], + ] + feature_id = "ckan" + _parameter_map = {} + + api_key = param.String(default=None, doc="user api key", precedence=1) + address = param.String(default=None, doc="Website URL", precedence=2) + + def get_features(self, **kwargs): + the_feature = {"service_id": "ckan", "display_name": "ckan", "geometry": box(-180, -90, 180, 90)} + feature = pd.DataFrame(the_feature, index=[0]) + return feature + + def download(self, feature, file_path, dataset, **params): + pass + # p = param.ParamOverrides(self, params) + # demo = RemoteCKAN(address=p.address, apikey=p.api_key) + + # Have to figure out what the user wants to grab and pull. + + # results = ncep.get_ncep_product_data(ncep_provider="Global Forecast System", product_type=p.type, + # product_date=p.date, resolution=p.res, cycle_runtime=p.cycle, + # forecast_start=p.start, forecast_end=p.end, product_format=p.format, + # name_of_product=p.product) + # print(results) + # if len(results) > 0: + # ncep.download_data(file_path, results) + # metadata = { + # 'metadata': results, + # 'file_path': file_path, + # 'file_format': 'unknown', + # 'datatype': 'unknown', + # 'parameter': "unknown", + # 'unit': "unknown", + # } + # else: + # raise ValueError("There is no data found on those parameters.") + # + # return metadata + + +class CKANPublishBase(PublishBase): + publisher_name = "ckan_pub" + display_name = "CKAN Publisher" + description = "To be able to push to the CKAN repository.." 
+ + api_key = param.String(default=None, doc="", precedence=1) + address = param.String(default=None, doc="", precedence=2) + name = param.String(default="", doc="", precedence=3) + title = param.String(default="", doc="", precedence=4) + author = param.String(defualt="", doc="", precedence=5) + author_email = param.String(default="", doc="", precedence=6) + availability = param.ObjectSelector(default=None, doc="", objects=[True,False], precedence=7) + description = param.String(default="", doc="", precedence=8) + type = param.String(default="", doc="Data type", precedence=9) + dataset = param_util.DatasetListSelector(default=(), filters={'status': 'downloaded'}, precedence=10, + doc="dataset to publish to ckan") + + # Do we even need this? + def __init__(self, provider, **kwargs): + super(CKANPublishBase, self).__init__(provider, **kwargs) + + def publish(self, options=None): + try: + p = param.ParamOverrides(self, options) + demo = RemoteCKAN(address=p.address, apikey=p.api_key) + params = {"name": p.name, + "title": p.title, + "private": p.availability, + "author": p.author, + "author_email": p.author_email, + "maintainer": p.author, + "license_id": "None", + "notes": p.description, + "type": p.type + } + + the_resource = demo.action.package_create(**params) + + for dataset in p.dataset: + dataset_metadata = get_metadata(dataset)[dataset] + fpath = dataset_metadata['file_path'] + # Need to push the data to the folder. + return the_resource['id'] + except Exception as e: + raise e + +class NCEPProvider(ProviderBase): + # Why is one in a list again and one is not? + service_base_class = CKANServiceBase + publishers_list = [CKANPublishBase] + display_name = 'CKAN Provider' + description = 'Services avaliable through the CKAN applications.' + organization_name = 'CKAN' + organization_abbr = 'CKAN' + diff --git a/setup.cfg b/setup.cfg index 97e56815..63c7164e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -37,6 +37,7 @@ quest.services = usgs-nlcd = quest.services.usgs_nlcd:UsgsNlcdProvider cuahsi-hydroshare = quest.services.cuahsi_hs:HSProvider kitware-girder = quest.services.kitware_girder:GirderProvider + ckan = quest.services.ckan:CKANProvider # nasa = quest.services.nasa:NasaProvider quest.filters = From 4c8a65f3a3fa34ef3c6aa2eb141151ee28074520 Mon Sep 17 00:00:00 2001 From: AaronV77 Date: Wed, 11 Apr 2018 13:47:33 -0500 Subject: [PATCH 09/12] Ckan plugin updated. --- quest/services/ISEP_Girder.py | 90 ----------------- quest/services/ckan.py | 179 ++++++++++++++++++++++++---------- 2 files changed, 130 insertions(+), 139 deletions(-) delete mode 100644 quest/services/ISEP_Girder.py diff --git a/quest/services/ISEP_Girder.py b/quest/services/ISEP_Girder.py deleted file mode 100644 index e3cdfc47..00000000 --- a/quest/services/ISEP_Girder.py +++ /dev/null @@ -1,90 +0,0 @@ -from .base import ProviderBase, SingleFileServiceBase, PublishBase -from ..api.database import get_db, db_session -from shapely.geometry import Point, box -from ..api.metadata import get_metadata -from ..util import param_util -from getpass import getpass -import girder_client -import pandas as pd -import param -import os - - -class ISEPServiceBase(SingleFileServiceBase): - - token = param.String(default=None, doc="Token #", precedence=1) - - @property - def gc(self): - return self.provider.get_gc() - - def get_features(self, **kwargs): - raise NotImplementedError() - - -class ISEPPublisher(PublishBase): - publisher_name = "girder_pub" - display_name = "Girder Publisher" - description = "Girder is a repository for ERDC ERS." 
- - token = param.String(default=None, doc="Token #", precedence=1) - title = param.String(default="example title", doc="Title of resource", precedence=2) - collection_description = param.String(default="", doc="Description of resource", precedence=3) - folder_name = param.String(default="example folder title", doc="Folder Title", precedence=4) - folder_description = param.String(default="", doc="Folder Description", precedence=5) - # Have the option to make the resource public. - dataset = param_util.DatasetListSelector(default=(), - filters={'status': 'downloaded'}, - precedence=6, - doc="dataset to publish to HydroShare") - - def __init__(self, provider, **kwargs): - super(ISEPPublisher, self).__init__(provider, **kwargs) - - @property - def gc(self): - return self.provider.get_gc() - - def publish(self, options=None): - try: - with self.gc.session() as session: - p = param.ParamOverrides(self, options) - session.verify = '/home/valoroso/DoD_Cert.pem' - self.gc.token = p.token - params = {'name': p.title, 'description': p.collection_description} - resource_information_dict = self.gc.createResource(path='collection', params=params) - folder_creation_dict = self.gc.createFolder(parentId=resource_information_dict['_id'], - name=p.folder_name, - description=p.folder_description, - parentType='collection') - for dataset in p.dataset: - dataset_metadata = get_metadata(dataset)[dataset] - fpath = dataset_metadata['file_path'] - self.gc.uploadFileToFolder(folder_creation_dict['_id'], fpath) - - except Exception as e: - raise e - - return resource_information_dict['_id'] - - -class GirderProvider(ProviderBase): - service_base_class = ISEPServiceBase - publishers_list = [ISEPPublisher] - display_name = 'Girder Services' - description = 'Services avaliable through the AFRL Girder Server.' - organization_name = 'U.S. Air Force Research Laboratory' - organization_abbr = 'AFRL' - - def get_gc(self): - connection_info = 'https://talonw4.afrl.hpc.mil/api/v1' - try: - gc = girder_client.GirderClient(apiUrl=connection_info) - except: - raise ValueError("Cannot connect to the ISEP Girder.") - - return gc - - def authenticate_me(self, **kwargs): - pass - diff --git a/quest/services/ckan.py b/quest/services/ckan.py index 467cf425..ce37057f 100644 --- a/quest/services/ckan.py +++ b/quest/services/ckan.py @@ -1,58 +1,136 @@ from .base import ProviderBase, ServiceBase, PublishBase from ..api.metadata import get_metadata -from shapely.geometry import box +from shapely.geometry import Point, box from ckanapi import RemoteCKAN from ..util import param_util import pandas as pd +import datetime import param - +import os class CKANServiceBase(ServiceBase): - service_name = "ckan_service" - display_name = "CKAN Service" - description = 'To grab the single service from the CKAN repository.' - service_type = "norm-discrete" + + def get_features(self, **kwargs): + raise NotImplementedError() + + api_key = param.String(default=None, doc="user api key", precedence=1) + address = param.String(default=None, doc="Website URL", precedence=2) + +class CKANGeoService(CKANServiceBase): + service_name = "ckan_geo_service" + display_name = "CKAN Geo Service" + description = 'To grab geo specific packages from a CKAN repository.' 
+ service_type = 'geo-discrete' + unmapped_parameters_available = True + geom_type = 'Point' + datatype = 'timeseries' geographical_areas = ['Worldwide'] bounding_boxes = [ [-180, -90, 180, 90], ] - feature_id = "ckan" _parameter_map = {} - api_key = param.String(default=None, doc="user api key", precedence=1) - address = param.String(default=None, doc="Website URL", precedence=2) + def get_features(self, **kwargs): + p = param.ParamOverrides(self, kwargs['options']) + try: + if p.api_key is None: + demo = RemoteCKAN(address=p.address) + else: + demo = RemoteCKAN(address=p.address, apikey=p.api_key) + except: + raise ValueError("Not able to connect to the CKAN instance.") + + """ + How this works is by grabbing the first 1000 rows of datasets from the ckan application. + The package_search() call will return a dictionary that holds the total number of datasets + that is being returned. By default the CKAN application only returns a max of 1000 rows at + a time. Once I grab the first amount of datasets, I check to see if the count is greater + than 1000, and if so I set a counter to 1001. I do this because if I got from 0 - 1000, + then I want to grab from 1001 to 2001 and so on. I save the results to a list, then proceed + to loop. I then grab the next group of datasets, increment my counter, and then add it to + the list of other datasets. + """ + results = demo.action.package_search(start=0, rows=1000) + list_of_datasets = results['results'] + if results['count'] > 1000: + counter = 1001 + total_datasets = results['count'] + while counter < total_datasets: + results = demo.action.package_search(start=counter, rows=1000) + counter += 1001 + list_of_datasets.append(results['results']) + + features = pd.DataFrame(list_of_datasets) + features['geometry'] = features['extras'].apply(self.parse_coverages) + features['service_id'] = features['resource_id'].apply(str) + features.index = features['service_id'] + features.rename(columns={ + 'title': 'display_name', + }, inplace=True) + + return features + + def parse_coverages(self, resource_row): + geometry = None + for coverage in resource_row: + coverage_type = coverage.get('type') + if coverage_type == 'point': + geometry = Point(float(coverage.get('value').get('north')), float(coverage.get('value').get('east'))) + elif coverage_type == 'box': + geometry = box(float(coverage.get('value').get('westlimit')), + float(coverage.get('value').get('southlimit')), + float(coverage.get('value').get('eastlimit')), + float(coverage.get('value').get('northlimit'))) + + return geometry + + +class CKANNormService(CKANServiceBase): + service_name = "ckan_norm" + display_name = "CKAN Normal Service" + description = 'To grab non-geo specific packages from a CKAN repository.' + service_type = "norm-discrete" + unmapped_parameters_available = True + _parameter_map = {} def get_features(self, **kwargs): - the_feature = {"service_id": "ckan", "display_name": "ckan", "geometry": box(-180, -90, 180, 90)} - feature = pd.DataFrame(the_feature, index=[0]) - return feature - - def download(self, feature, file_path, dataset, **params): - pass - # p = param.ParamOverrides(self, params) - # demo = RemoteCKAN(address=p.address, apikey=p.api_key) - - # Have to figure out what the user wants to grab and pull. 
- - # results = ncep.get_ncep_product_data(ncep_provider="Global Forecast System", product_type=p.type, - # product_date=p.date, resolution=p.res, cycle_runtime=p.cycle, - # forecast_start=p.start, forecast_end=p.end, product_format=p.format, - # name_of_product=p.product) - # print(results) - # if len(results) > 0: - # ncep.download_data(file_path, results) - # metadata = { - # 'metadata': results, - # 'file_path': file_path, - # 'file_format': 'unknown', - # 'datatype': 'unknown', - # 'parameter': "unknown", - # 'unit': "unknown", - # } - # else: - # raise ValueError("There is no data found on those parameters.") - # - # return metadata + p = param.ParamOverrides(self, kwargs['options']) + try: + if p.api_key is None: + demo = RemoteCKAN(address=p.address) + else: + demo = RemoteCKAN(address=p.address, apikey=p.api_key) + except: + raise ValueError("Not able to connect to the CKAN instance.") + + """ + How this works is by grabbing the first 1000 rows of datasets from the ckan application. + The package_search() call will return a dictionary that holds the total number of datasets + that is being returned. By default the CKAN application only returns a max of 1000 rows at + a time. Once I grab the first amount of datasets, I check to see if the count is greater + than 1000, and if so I set a counter to 1001. I do this because if I got from 0 - 1000, + then I want to grab from 1001 to 2001 and so on. I save the results to a list, then proceed + to loop. I then grab the next group of datasets, increment my counter, and then add it to + the list of other datasets. + """ + results = demo.action.package_search(start=0, rows=1000) + list_of_datasets = results['results'] + if results['count'] > 1000: + counter = 1001 + total_datasets = results['count'] + while counter < total_datasets: + results = demo.action.package_search(start=counter, rows=1000) + counter += 1001 + list_of_datasets.append(results['results']) + + features = pd.DataFrame(list_of_datasets) + features['service_id'] = features['id'].apply(str) + features.index = features['service_id'] + features.rename(columns={ + 'title': 'display_name', + }, inplace=True) + + return features class CKANPublishBase(PublishBase): @@ -72,10 +150,6 @@ class CKANPublishBase(PublishBase): dataset = param_util.DatasetListSelector(default=(), filters={'status': 'downloaded'}, precedence=10, doc="dataset to publish to ckan") - # Do we even need this? - def __init__(self, provider, **kwargs): - super(CKANPublishBase, self).__init__(provider, **kwargs) - def publish(self, options=None): try: p = param.ParamOverrides(self, options) @@ -91,22 +165,29 @@ def publish(self, options=None): "type": p.type } - the_resource = demo.action.package_create(**params) + the_package = demo.action.package_create(**params) for dataset in p.dataset: dataset_metadata = get_metadata(dataset)[dataset] fpath = dataset_metadata['file_path'] - # Need to push the data to the folder. - return the_resource['id'] + filename, file_extension = os.path.splitext(fpath) + now = datetime.datetime.now() + params2 = {"package_id": the_package['id'], + "format": file_extension, + "name": filename, + "size": os.path.getsize(fpath), + "created": str(now)[:10] + } + demo.action.resource_create(**params2) + return the_package['id'] except Exception as e: raise e + class NCEPProvider(ProviderBase): - # Why is one in a list again and one is not? 
service_base_class = CKANServiceBase - publishers_list = [CKANPublishBase] + publishers_base_class = CKANPublishBase display_name = 'CKAN Provider' description = 'Services avaliable through the CKAN applications.' organization_name = 'CKAN' organization_abbr = 'CKAN' - From d8dd4d24b9a1b6748fae71b30c3b1fac29cd4cb4 Mon Sep 17 00:00:00 2001 From: AaronV77 Date: Thu, 12 Apr 2018 10:55:32 -0500 Subject: [PATCH 10/12] Updated the ckan provider plugin. The upload is not working correctly and there is not good working demo site inorder to test effectively. --- quest/services/base/provider_base.py | 3 +- quest/services/ckan.py | 187 +++++++++++++-------------- 2 files changed, 94 insertions(+), 96 deletions(-) diff --git a/quest/services/base/provider_base.py b/quest/services/base/provider_base.py index 7adcb4d3..30ffc21d 100644 --- a/quest/services/base/provider_base.py +++ b/quest/services/base/provider_base.py @@ -60,7 +60,6 @@ def publishers(self): if self._publishers is None: publishers_list = self.publisher_base_class.__subclasses__() or [self.publisher_base_class] self._publishers = {p.publisher_name: p(name=p.publisher_name, provider=self) for p in publishers_list} - return self._publishers @property @@ -299,4 +298,4 @@ def unauthenticate_me(self): if p is None: raise ValueError('Provider does not exist in the database.') else: - p.delete() \ No newline at end of file + p.delete() diff --git a/quest/services/ckan.py b/quest/services/ckan.py index ce37057f..91aecc47 100644 --- a/quest/services/ckan.py +++ b/quest/services/ckan.py @@ -1,20 +1,49 @@ from .base import ProviderBase, ServiceBase, PublishBase from ..api.metadata import get_metadata -from shapely.geometry import Point, box +from ..api.database import get_db, db_session +from shapely.geometry import shape from ckanapi import RemoteCKAN from ..util import param_util +from getpass import getpass import pandas as pd import datetime +import geojson import param import os + class CKANServiceBase(ServiceBase): + @property + def demo(self): + return self.provider.get_demo() + def get_features(self, **kwargs): raise NotImplementedError() - api_key = param.String(default=None, doc="user api key", precedence=1) - address = param.String(default=None, doc="Website URL", precedence=2) + def get_data(self, **kwargs): + """ + How this works is by grabbing the first 1000 rows of datasets from the ckan application. + The package_search() call will return a dictionary that holds the total number of datasets + that is being returned. By default the CKAN application only returns a max of 1000 rows at + a time. Once I grab the first amount of datasets, I check to see if the count is greater + than 1000, and if so I set a counter to 1001. I do this because if I got from 0 - 1000, + then I want to grab from 1001 to 2001 and so on. I save the results to a list, then proceed + to loop. I then grab the next group of datasets, increment my counter, and then add it to + the list of other datasets. 
+ """ + results = self.demo.action.package_search(**kwargs, start=0, rows=1000) + + list_of_datasets = results['results'] + if results['count'] > 1000: + counter = 1000 + total_datasets = results['count'] + while counter < total_datasets: + results = self.demo.action.package_search(**kwargs, start=counter, rows=1000) + counter += 1000 + list_of_datasets.extend(results['results']) + + return list_of_datasets class CKANGeoService(CKANServiceBase): service_name = "ckan_geo_service" @@ -31,38 +60,11 @@ class CKANGeoService(CKANServiceBase): _parameter_map = {} def get_features(self, **kwargs): - p = param.ParamOverrides(self, kwargs['options']) - try: - if p.api_key is None: - demo = RemoteCKAN(address=p.address) - else: - demo = RemoteCKAN(address=p.address, apikey=p.api_key) - except: - raise ValueError("Not able to connect to the CKAN instance.") - - """ - How this works is by grabbing the first 1000 rows of datasets from the ckan application. - The package_search() call will return a dictionary that holds the total number of datasets - that is being returned. By default the CKAN application only returns a max of 1000 rows at - a time. Once I grab the first amount of datasets, I check to see if the count is greater - than 1000, and if so I set a counter to 1001. I do this because if I got from 0 - 1000, - then I want to grab from 1001 to 2001 and so on. I save the results to a list, then proceed - to loop. I then grab the next group of datasets, increment my counter, and then add it to - the list of other datasets. - """ - results = demo.action.package_search(start=0, rows=1000) - list_of_datasets = results['results'] - if results['count'] > 1000: - counter = 1001 - total_datasets = results['count'] - while counter < total_datasets: - results = demo.action.package_search(start=counter, rows=1000) - counter += 1001 - list_of_datasets.append(results['results']) - + list_of_datasets = self.get_data(extras={"ext_bbox": "-180,-90,180,90"}) features = pd.DataFrame(list_of_datasets) - features['geometry'] = features['extras'].apply(self.parse_coverages) - features['service_id'] = features['resource_id'].apply(str) + features['extras'] = features['extras'].apply(lambda row: {i['key']: i['value'] for i in row}) + features['geometry'] = features['extras'].apply(lambda r: shape(geojson.loads(r['spatial']))) + features['service_id'] = features['id'].apply(str) features.index = features['service_id'] features.rename(columns={ 'title': 'display_name', @@ -70,20 +72,6 @@ def get_features(self, **kwargs): return features - def parse_coverages(self, resource_row): - geometry = None - for coverage in resource_row: - coverage_type = coverage.get('type') - if coverage_type == 'point': - geometry = Point(float(coverage.get('value').get('north')), float(coverage.get('value').get('east'))) - elif coverage_type == 'box': - geometry = box(float(coverage.get('value').get('westlimit')), - float(coverage.get('value').get('southlimit')), - float(coverage.get('value').get('eastlimit')), - float(coverage.get('value').get('northlimit'))) - - return geometry - class CKANNormService(CKANServiceBase): service_name = "ckan_norm" @@ -94,35 +82,7 @@ class CKANNormService(CKANServiceBase): _parameter_map = {} def get_features(self, **kwargs): - p = param.ParamOverrides(self, kwargs['options']) - try: - if p.api_key is None: - demo = RemoteCKAN(address=p.address) - else: - demo = RemoteCKAN(address=p.address, apikey=p.api_key) - except: - raise ValueError("Not able to connect to the CKAN instance.") - - """ - How this works is by 
grabbing the first 1000 rows of datasets from the ckan application. - The package_search() call will return a dictionary that holds the total number of datasets - that is being returned. By default the CKAN application only returns a max of 1000 rows at - a time. Once I grab the first amount of datasets, I check to see if the count is greater - than 1000, and if so I set a counter to 1001. I do this because if I got from 0 - 1000, - then I want to grab from 1001 to 2001 and so on. I save the results to a list, then proceed - to loop. I then grab the next group of datasets, increment my counter, and then add it to - the list of other datasets. - """ - results = demo.action.package_search(start=0, rows=1000) - list_of_datasets = results['results'] - if results['count'] > 1000: - counter = 1001 - total_datasets = results['count'] - while counter < total_datasets: - results = demo.action.package_search(start=counter, rows=1000) - counter += 1001 - list_of_datasets.append(results['results']) - + list_of_datasets = self.get_data() features = pd.DataFrame(list_of_datasets) features['service_id'] = features['id'].apply(str) features.index = features['service_id'] @@ -138,23 +98,23 @@ class CKANPublishBase(PublishBase): display_name = "CKAN Publisher" description = "To be able to push to the CKAN repository.." - api_key = param.String(default=None, doc="", precedence=1) - address = param.String(default=None, doc="", precedence=2) - name = param.String(default="", doc="", precedence=3) - title = param.String(default="", doc="", precedence=4) - author = param.String(defualt="", doc="", precedence=5) - author_email = param.String(default="", doc="", precedence=6) - availability = param.ObjectSelector(default=None, doc="", objects=[True,False], precedence=7) - description = param.String(default="", doc="", precedence=8) - type = param.String(default="", doc="Data type", precedence=9) - dataset = param_util.DatasetListSelector(default=(), filters={'status': 'downloaded'}, precedence=10, + title = param.String(default="", doc="", precedence=1) + dataset_name = param.String(default="", doc="", precedence=2) + author = param.String(default="", doc="", precedence=3) + author_email = param.String(default="", doc="", precedence=4) + availability = param.ObjectSelector(default=None, doc="", objects=[True,False], precedence=5) + description = param.String(default="", doc="", precedence=6) + type = param.String(default="", doc="Data type", precedence=7) + dataset = param_util.DatasetListSelector(default=(), filters={'status': 'downloaded'}, precedence=8, doc="dataset to publish to ckan") + @property + def demo(self): + return self.provider.get_demo() def publish(self, options=None): try: p = param.ParamOverrides(self, options) - demo = RemoteCKAN(address=p.address, apikey=p.api_key) - params = {"name": p.name, + params = {"name": p.dataset_name, "title": p.title, "private": p.availability, "author": p.author, @@ -165,7 +125,7 @@ def publish(self, options=None): "type": p.type } - the_package = demo.action.package_create(**params) + the_package = self.demo.action.package_create(**params) for dataset in p.dataset: dataset_metadata = get_metadata(dataset)[dataset] @@ -176,18 +136,57 @@ def publish(self, options=None): "format": file_extension, "name": filename, "size": os.path.getsize(fpath), - "created": str(now)[:10] + "created": str(now)[:10], + "upload": fpath } - demo.action.resource_create(**params2) + self.demo.action.resource_create(**params2) return the_package['id'] except Exception as e: raise e -class 
NCEPProvider(ProviderBase): +class CKANProvider(ProviderBase): service_base_class = CKANServiceBase - publishers_base_class = CKANPublishBase + publisher_base_class = CKANPublishBase display_name = 'CKAN Provider' description = 'Services avaliable through the CKAN applications.' organization_name = 'CKAN' organization_abbr = 'CKAN' + hostname = 'https://demo.ckan.org' + + def authenticate_me(self, **kwargs): + + api_key = getpass("Enter CKAN API key: ") + + db = get_db() + with db_session: + p = db.Providers.select().filter(provider=self.name).first() + + provider_metadata = { + 'provider': self.name, + 'username': 'placeholder', + 'password': api_key, + } + + if p is None: + db.Providers(**provider_metadata) + else: + p.set(**provider_metadata) + + return True + + def get_demo(self): + api_key = None + try: + api_key = self.credentials['password'] + except ValueError: + pass # ignore error if api_key has not been stored + + demo = RemoteCKAN(address=self.hostname, apikey=api_key) + demo.action.package_search(rows=1) + return demo + + def get_ckan_status(self): + demo = self.get_demo() + status = demo.action.status_show() + # check if status contains spatial_query_extension. From 28600faae67cd1812cb09fa11683f13ca5988a2d Mon Sep 17 00:00:00 2001 From: AaronV77 Date: Thu, 12 Apr 2018 13:00:48 -0500 Subject: [PATCH 11/12] noaa_ncep stuff was in the commits and needed to delete it. --- quest/services/noaa_ncep.py | 80 ------------------------------------- 1 file changed, 80 deletions(-) delete mode 100644 quest/services/noaa_ncep.py diff --git a/quest/services/noaa_ncep.py b/quest/services/noaa_ncep.py deleted file mode 100644 index 284823f9..00000000 --- a/quest/services/noaa_ncep.py +++ /dev/null @@ -1,80 +0,0 @@ -from .base import ProviderBase, ServiceBase -from ncep_client import NCEP_Client -from shapely.geometry import box -import pandas as pd -import param - - -class NCEPServiceBase(ServiceBase): - - def get_features(self, **kwargs): - raise NotImplementedError() - - -class NCEP_GFS_Service(NCEPServiceBase): - ncep = NCEP_Client() - service_name = "ncep_gfs" - display_name = "NCEP GFS Service" - description = 'NCEP GFS is a noaa repository for global weather data.' - service_type = "norm-discrete" - geographical_areas = ['Worldwide'] - bounding_boxes = [ - [-180, -90, 180, 90], - ] - feature_id = "ncep" - # These are slowing down the api because it has to load the web page. 
- _possible_types = ncep.get_provider_types("Global Forecast System") - _possible_products = ncep.get_provider_products("Global Forecast System") - _possible_formats = ncep.get_formats_of_a_product("Global Forecast System") - _parameter_map = {} - - date = param.String(default=None, doc="YYYYMMDD", precedence=1) - res = param.String(default=None, doc="Froecast Resolution", precedence=2) - cycle = param.String(default=None, doc="Forecast Cycle Runtime", precedence=3) - start = param.String(default=None, doc="Forecast start time (f###)", precedence=4) - end = param.String(default=None, doc="Forecast end time (f###)", precedence=5) - format = param.ObjectSelector(default=None, doc="Paramerter", objects=_possible_formats, precedence=6) - type = param.ObjectSelector(default=None, doc="Parameter2", objects=_possible_types, precedence=7) - product = param.ObjectSelector(default=None, doc="Parameter3", objects=_possible_products, precedence=8) - - def get_features(self, **kwargs): - the_feature = {"service_id": "ncep", "display_name": "ncep", "geometry": box(-180, -90, 180, 90)} - feature = pd.DataFrame(the_feature, index=[0]) - return feature - - def download(self, feature, file_path, dataset, **params): - ncep = NCEP_Client() - p = param.ParamOverrides(self, params) - - if p.product == "GFS" or p.product == "GDAS": - raise ValueError("Please specify a specific product not GFS or GDAS") - - results = ncep.get_ncep_product_data(ncep_provider="Global Forecast System", product_type=p.type, - product_date=p.date, resolution=p.res, cycle_runtime=p.cycle, - forecast_start=p.start, forecast_end=p.end, product_format=p.format, - name_of_product=p.product) - print(results) - if len(results) > 0: - ncep.download_data(file_path, results) - metadata = { - 'metadata': results, - 'file_path': file_path, - 'file_format': 'weather-specific', - 'datatype': 'weather', - 'parameter': "ncep_parameter", - 'unit': "unkown", - } - else: - raise ValueError("There is no data found on those parameters.") - - return metadata - - -class NCEPProvider(ProviderBase): - service_base_class = NCEPServiceBase - publishers_list = [] - display_name = 'NCEP GFS Provider' - description = 'Services avaliable through the NOAA NCEP Server.' - organization_name = 'National Centers for Environmental Prediction' - organization_abbr = 'NCEP' - From c6801c6ffe1f915ac9a7d92cdb29387dc2dbf4eb Mon Sep 17 00:00:00 2001 From: AaronV77 Date: Mon, 16 Apr 2018 13:20:43 -0500 Subject: [PATCH 12/12] I never added the ckan download options to the tests in data.py. This fixes the tests. --- test/data.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/data.py b/test/data.py index 01bc2e35..9766c6dd 100644 --- a/test/data.py +++ b/test/data.py @@ -203,6 +203,8 @@ 'title': 'NWIS Instantaneous Values Service Download Options'}, 'svc://cuahsi-hydroshare:hs_geo': {}, 'svc://cuahsi-hydroshare:hs_norm': {}, + 'svc://ckan:ckan_norm': {}, + 'svc://ckan:ckan_geo_service': {} }
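
For reference, the revised get_features of the geo service (quest/services/ckan.py, PATCH 10) builds its geometry column from each package's 'spatial' extra, which CKAN's spatial extension stores as a GeoJSON string. A minimal, self-contained sketch of that conversion follows; the example package, its id, and its coordinates are made up for illustration.

    import geojson
    import pandas as pd
    from shapely.geometry import shape

    # One package as returned by package_search; 'extras' is a list of key/value dicts,
    # and the spatial extension keeps the footprint under the 'spatial' key as GeoJSON.
    packages = [{
        'id': 'abc123',
        'title': 'Example package',
        'extras': [{'key': 'spatial',
                    'value': '{"type": "Point", "coordinates": [-93.0, 42.0]}'}],
    }]

    features = pd.DataFrame(packages)
    # Flatten the extras list into a plain dict, then parse the GeoJSON into shapely geometry.
    features['extras'] = features['extras'].apply(lambda rows: {e['key']: e['value'] for e in rows})
    features['geometry'] = features['extras'].apply(lambda e: shape(geojson.loads(e['spatial'])))
    features['service_id'] = features['id'].apply(str)
    features.index = features['service_id']
    features.rename(columns={'title': 'display_name'}, inplace=True)
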