diff --git a/py2_conda_environment.yml b/py2_conda_environment.yml index 67ae4cb0..0abaade2 100644 --- a/py2_conda_environment.yml +++ b/py2_conda_environment.yml @@ -40,4 +40,5 @@ dependencies: - sphinx_rtd_theme - hs_restclient - jupyter - - girder-client \ No newline at end of file + - girder-client + - ckanapi \ No newline at end of file diff --git a/py3_conda_environment.yml b/py3_conda_environment.yml index 07a0f9f7..419c2d0c 100644 --- a/py3_conda_environment.yml +++ b/py3_conda_environment.yml @@ -41,4 +41,5 @@ dependencies: - sphinx_rtd_theme - hs_restclient - jupyter - - girder-client \ No newline at end of file + - girder-client + - ckanapi \ No newline at end of file diff --git a/quest/api/datasets.py b/quest/api/datasets.py index 2619fd2b..0b7e47a3 100644 --- a/quest/api/datasets.py +++ b/quest/api/datasets.py @@ -60,10 +60,10 @@ def download(feature, file_path, dataset=None, **kwargs): @add_async -def publish(publisher_uri, options=None): +def publish(publisher_uri, **kwargs): provider, publisher, feature = util.parse_service_uri(publisher_uri) driver = util.load_providers()[provider] - data = driver.publish(publisher=publisher, options=options) + data = driver.publish(publisher=publisher, **kwargs) return data @add_async diff --git a/quest/api/services.py b/quest/api/services.py index 2c225f3c..0fd2b26e 100644 --- a/quest/api/services.py +++ b/quest/api/services.py @@ -78,8 +78,6 @@ def get_publishers(expand=None, publisher_type=None): Returns: providers (list or dict,Default=list): list of all available providers - - """ providers = util.load_providers() publishers = {} @@ -127,6 +125,7 @@ def add_provider(uri): util.update_settings({'USER_SERVICES': user_services}) util.save_settings() msg = 'service added' + util.load_providers(update_cache=True) else: msg = 'service already present' else: @@ -159,6 +158,7 @@ def delete_provider(uri): util.update_settings({'USER_SERVICES': user_services}) util.save_settings() msg = 'service removed' + 
util.load_providers(update_cache=True) else: msg = 'service not found' diff --git a/quest/services/base/provider_base.py b/quest/services/base/provider_base.py index 7bc08b45..30ffc21d 100644 --- a/quest/services/base/provider_base.py +++ b/quest/services/base/provider_base.py @@ -60,7 +60,6 @@ def publishers(self): if self._publishers is None: publishers_list = self.publisher_base_class.__subclasses__() or [self.publisher_base_class] self._publishers = {p.publisher_name: p(name=p.publisher_name, provider=self) for p in publishers_list} - return self._publishers @property @@ -283,8 +282,8 @@ def download_options(self, service, fmt): """ return self.services[service].download_options(fmt) - def publish(self, publisher, options): - return self.publishers[publisher].publish(options) + def publish(self, publisher, **kwargs): + return self.publishers[publisher].publish(**kwargs) def publish_options(self, publisher, fmt): return self.publishers[publisher].publish_options(fmt) @@ -299,4 +298,4 @@ def unauthenticate_me(self): if p is None: raise ValueError('Provider does not exist in the database.') else: - p.delete() \ No newline at end of file + p.delete() diff --git a/quest/services/base/publish_base.py b/quest/services/base/publish_base.py index 0e6a80cb..a19ca1fb 100644 --- a/quest/services/base/publish_base.py +++ b/quest/services/base/publish_base.py @@ -38,5 +38,5 @@ def publish_options(self, fmt): return schema - def publish(self, options): + def publish(self, **kwargs): raise NotImplementedError() diff --git a/quest/services/base/service_base.py b/quest/services/base/service_base.py index 5dae3ace..27195b4d 100644 --- a/quest/services/base/service_base.py +++ b/quest/services/base/service_base.py @@ -92,7 +92,7 @@ def download_options(self, fmt): return schema - def download(self, feature, file_path, dataset, **params): + def download(self, feature, file_path, dataset, **kwargs): raise NotImplementedError() def get_features(self, **kwargs): @@ -138,7 +138,7 @@ 
class SingleFileServiceBase(ServiceBase): """Base file for datasets that are a single file download eg elevation raster etc """ - def download(self, feature, file_path, dataset, **params): + def download(self, feature, file_path, dataset, **kwargs): feature_id = util.construct_service_uri(self.provider.name, self.name, feature) feature = self.provider.get_features(self.name).loc[feature_id] reserved = feature.get('reserved') diff --git a/quest/services/ckan.py b/quest/services/ckan.py new file mode 100644 index 00000000..91aecc47 --- /dev/null +++ b/quest/services/ckan.py @@ -0,0 +1,192 @@ +from .base import ProviderBase, ServiceBase, PublishBase +from ..api.metadata import get_metadata +from ..api.database import get_db, db_session +from shapely.geometry import shape +from ckanapi import RemoteCKAN +from ..util import param_util +from getpass import getpass +import pandas as pd +import datetime +import geojson +import param +import os + + +class CKANServiceBase(ServiceBase): + + @property + def demo(self): + return self.provider.get_demo() + + def get_features(self, **kwargs): + raise NotImplementedError() + + def get_data(self, **kwargs): + """ + How this works is by grabbing the first 1000 rows of datasets from the ckan application. + The package_search() call will return a dictionary that holds the total number of datasets + that is being returned. By default the CKAN application only returns a max of 1000 rows at + a time. Once I grab the first amount of datasets, I check to see if the count is greater + than 1000, and if so I set a counter to 1000. I do this because the first call returned offsets 0 - 999, + so the next call starts at 1000, then 2000, and so on. I save the results to a list, then proceed + to loop. I then grab the next group of datasets, increment my counter, and then add it to + the list of other datasets.
+ """ + results = self.demo.action.package_search(**kwargs, start=0, rows=1000) + + list_of_datasets = results['results'] + if results['count'] > 1000: + counter = 1000 + total_datasets = results['count'] + while counter < total_datasets: + results = self.demo.action.package_search(**kwargs, start=counter, rows=1000) + counter += 1000 + list_of_datasets.extend(results['results']) + + return list_of_datasets + +class CKANGeoService(CKANServiceBase): + service_name = "ckan_geo_service" + display_name = "CKAN Geo Service" + description = 'To grab geo specific packages from a CKAN repository.' + service_type = 'geo-discrete' + unmapped_parameters_available = True + geom_type = 'Point' + datatype = 'timeseries' + geographical_areas = ['Worldwide'] + bounding_boxes = [ + [-180, -90, 180, 90], + ] + _parameter_map = {} + + def get_features(self, **kwargs): + list_of_datasets = self.get_data(extras={"ext_bbox": "-180,-90,180,90"}) + features = pd.DataFrame(list_of_datasets) + features['extras'] = features['extras'].apply(lambda row: {i['key']: i['value'] for i in row}) + features['geometry'] = features['extras'].apply(lambda r: shape(geojson.loads(r['spatial']))) + features['service_id'] = features['id'].apply(str) + features.index = features['service_id'] + features.rename(columns={ + 'title': 'display_name', + }, inplace=True) + + return features + + +class CKANNormService(CKANServiceBase): + service_name = "ckan_norm" + display_name = "CKAN Normal Service" + description = 'To grab non-geo specific packages from a CKAN repository.' 
+ service_type = "norm-discrete" + unmapped_parameters_available = True + _parameter_map = {} + + def get_features(self, **kwargs): + list_of_datasets = self.get_data() + features = pd.DataFrame(list_of_datasets) + features['service_id'] = features['id'].apply(str) + features.index = features['service_id'] + features.rename(columns={ + 'title': 'display_name', + }, inplace=True) + + return features + + +class CKANPublishBase(PublishBase): + publisher_name = "ckan_pub" + display_name = "CKAN Publisher" + description = "To be able to push to the CKAN repository.." + + title = param.String(default="", doc="", precedence=1) + dataset_name = param.String(default="", doc="", precedence=2) + author = param.String(default="", doc="", precedence=3) + author_email = param.String(default="", doc="", precedence=4) + availability = param.ObjectSelector(default=None, doc="", objects=[True,False], precedence=5) + description = param.String(default="", doc="", precedence=6) + type = param.String(default="", doc="Data type", precedence=7) + dataset = param_util.DatasetListSelector(default=(), filters={'status': 'downloaded'}, precedence=8, + doc="dataset to publish to ckan") + @property + def demo(self): + return self.provider.get_demo() + + def publish(self, options=None): + try: + p = param.ParamOverrides(self, options) + params = {"name": p.dataset_name, + "title": p.title, + "private": p.availability, + "author": p.author, + "author_email": p.author_email, + "maintainer": p.author, + "license_id": "None", + "notes": p.description, + "type": p.type + } + + the_package = self.demo.action.package_create(**params) + + for dataset in p.dataset: + dataset_metadata = get_metadata(dataset)[dataset] + fpath = dataset_metadata['file_path'] + filename, file_extension = os.path.splitext(fpath) + now = datetime.datetime.now() + params2 = {"package_id": the_package['id'], + "format": file_extension, + "name": filename, + "size": os.path.getsize(fpath), + "created": str(now)[:10], + "upload": 
fpath + } + self.demo.action.resource_create(**params2) + return the_package['id'] + except Exception as e: + raise e + + +class CKANProvider(ProviderBase): + service_base_class = CKANServiceBase + publisher_base_class = CKANPublishBase + display_name = 'CKAN Provider' + description = 'Services avaliable through the CKAN applications.' + organization_name = 'CKAN' + organization_abbr = 'CKAN' + hostname = 'https://demo.ckan.org' + + def authenticate_me(self, **kwargs): + + api_key = getpass("Enter CKAN API key: ") + + db = get_db() + with db_session: + p = db.Providers.select().filter(provider=self.name).first() + + provider_metadata = { + 'provider': self.name, + 'username': 'placeholder', + 'password': api_key, + } + + if p is None: + db.Providers(**provider_metadata) + else: + p.set(**provider_metadata) + + return True + + def get_demo(self): + api_key = None + try: + api_key = self.credentials['password'] + except ValueError: + pass # ignore error if api_key has not been stored + + demo = RemoteCKAN(address=self.hostname, apikey=api_key) + demo.action.package_search(rows=1) + return demo + + def get_ckan_status(self): + demo = self.get_demo() + status = demo.action.status_show() + # check if status contains spatial_query_extension. 
diff --git a/quest/services/cuahsi_hs.py b/quest/services/cuahsi_hs.py index 2557440d..100caf9e 100644 --- a/quest/services/cuahsi_hs.py +++ b/quest/services/cuahsi_hs.py @@ -9,6 +9,7 @@ import param import os + class HSServiceBase(SingleFileServiceBase): @property @@ -35,7 +36,7 @@ class HSGeoService(HSServiceBase): def get_features(self, **kwargs): - results = list(self.hs.resources()) + results = list(self.hs.resources(coverage_type="box", north="90", south="-90", east="180", west="-180")) if len(results) == 0: raise ValueError('No resource available from HydroShare.') @@ -126,21 +127,18 @@ class HSPublisher(PublishBase): 'Time Series': 'TimeSeriesResource' } - title = param.String(default="example title", doc="Title of resource", precedence=2) - abstract = param.String(default="example abstract", precedence=3, - doc="An description of the resource to be added to HydroShare.") - keywords = param.List(default=[], precedence=4, doc="list of keyword strings to describe the resource") - dataset = param_util.DatasetListSelector(default=(), filters={'status': 'downloaded'}, precedence=5, - doc="dataset to publish to HydroShare") - resource_type = param.ObjectSelector(doc='parameter', precedence=1, objects=sorted(_resource_type_map.keys())) + resource_type = param.ObjectSelector(default=None, doc="", precedence=1, objects=sorted(_resource_type_map.keys())) + title = param.String(default="", doc="", precedence=2) + abstract = param.String(default="", doc="", precedence=3) + keywords = param.List(default=[], doc="", precedence=4) + dataset = param_util.DatasetListSelector(default=(), filters={'status': 'downloaded'}, doc="", precedence=5) @property def hs(self): return self.provider.get_hs() - def publish(self, options=None): - - p = param.ParamOverrides(self, options) + def publish(self, **kwargs): + p = param.ParamOverrides(self, kwargs) valid_file_paths = [] valid_extensions = [] diff --git a/quest/services/kitware_girder.py b/quest/services/kitware_girder.py index 
9a4bf34e..ba7bd674 100644 --- a/quest/services/kitware_girder.py +++ b/quest/services/kitware_girder.py @@ -34,9 +34,9 @@ class GirderPublisher(PublishBase): def gc(self): return self.provider.get_gc() - def publish(self, options=None): + def publish(self, **kwargs): try: - p = param.ParamOverrides(self, options) + p = param.ParamOverrides(self, kwargs) params = {'name': p.title, 'description': p.collection_description} resource_information_dict = self.gc.createResource(path='collection', params=params) folder_creation_dict = self.gc.createFolder(parentId=resource_information_dict['_id'], diff --git a/quest/services/noaa_coastwatch.py b/quest/services/noaa_coastwatch.py index bb79ba92..6e082007 100644 --- a/quest/services/noaa_coastwatch.py +++ b/quest/services/noaa_coastwatch.py @@ -65,8 +65,8 @@ def parameter_map(self, invert=False): return pmap - def download(self, feature, file_path, dataset, **params): - p = param.ParamOverrides(self, params) + def download(self, feature, file_path, dataset, **kwargs): + p = param.ParamOverrides(self, kwargs) self.parameter = p.parameter self.end = pd.to_datetime(p.end) self.start = pd.to_datetime(p.start) diff --git a/quest/services/noaa_ncdc.py b/quest/services/noaa_ncdc.py index 5bd26238..6bbe1d5f 100644 --- a/quest/services/noaa_ncdc.py +++ b/quest/services/noaa_ncdc.py @@ -74,8 +74,8 @@ def parameter_map(self, invert=False): return pmap - def download(self, feature, file_path, dataset, **params): - p = param.ParamOverrides(self, params) + def download(self, feature, file_path, dataset, **kwargs): + p = param.ParamOverrides(self, kwargs) self.parameter = p.parameter self.end = pd.to_datetime(p.end) self.start = pd.to_datetime(p.start) diff --git a/quest/services/template_service.py b/quest/services/template_service.py index 4d0cd8cf..b4e4ba6f 100644 --- a/quest/services/template_service.py +++ b/quest/services/template_service.py @@ -26,7 +26,7 @@ class ExampleServiceBase(ServiceBase): smtk_template = None _parameter_map = 
dict() - def download(self, feature, file_path, dataset, **params): + def download(self, feature, file_path, dataset, **kwwargs): metadata = {} # get metadata from service data = None # data structure containing downloaded data diff --git a/quest/services/user_provider.py b/quest/services/user_provider.py index c6647215..4e525127 100644 --- a/quest/services/user_provider.py +++ b/quest/services/user_provider.py @@ -51,7 +51,7 @@ def instance(cls, service_name, service_data, provider, uri, is_remote): return self - def download(self, feature, file_path, dataset, **params): + def download(self, feature, file_path, dataset, **kwargs): if self.datasets_mapping is not None: fnames = self.datasets_mapping if isinstance(dict, self.datasets_mapping): diff --git a/quest/services/usgs_nwis.py b/quest/services/usgs_nwis.py index 08233a39..cc81b72b 100644 --- a/quest/services/usgs_nwis.py +++ b/quest/services/usgs_nwis.py @@ -18,8 +18,8 @@ class NwisServiceBase(TimePeriodServiceBase): period = param.String(default='P365D', precedence=4, doc='time period (e.g. P365D = 365 days or P4W = 4 weeks)') smtk_template = 'start_end_or_period.sbt' - def download(self, feature, file_path, dataset, **params): - p = param.ParamOverrides(self, params) + def download(self, feature, file_path, dataset, **kwargs): + p = param.ParamOverrides(self, kwargs) parameter = p.parameter start = p.start diff --git a/quest/util/misc.py b/quest/util/misc.py index fac84738..5f03d405 100644 --- a/quest/util/misc.py +++ b/quest/util/misc.py @@ -19,6 +19,7 @@ from uuid import uuid4, UUID import quest +the_providers = None def generate_cache(update=False): """Downloads features for all services and caches results. 
@@ -238,22 +239,27 @@ def load_drivers(namespace, names=None): return {name: driver.DriverManager(namespace, name, invoke_on_load='True') for name in names} -def load_providers(): - settings = get_settings() +def load_providers(update_cache=False): + global the_providers - # add web services + settings = get_settings() web_services = list_drivers('services') web_services.remove('user') - providers = {name: driver.DriverManager('quest.services', name, invoke_on_load=True, invoke_kwds={'name': name}).driver for name in web_services} + if update_cache or the_providers is None: + providers = {name: driver.DriverManager('quest.services', name, invoke_on_load=True, invoke_kwds={'name': name}).driver for name in web_services} - if len(settings.get('USER_SERVICES', [])) > 0: - for uri in settings.get('USER_SERVICES', []): - try: - drv = driver.DriverManager('quest.services', 'user', invoke_on_load=True, invoke_kwds={'uri': uri}).driver - providers['user-' + drv.name] = drv - except Exception as e: - logger.error('Failed to load local service from %s, with exception: %s' % (uri, str(e))) + if len(settings.get('USER_SERVICES', [])) > 0: + for uri in settings.get('USER_SERVICES', []): + try: + drv = driver.DriverManager('quest.services', 'user', invoke_on_load=True, invoke_kwds={'uri': uri}).driver + providers['user-' + drv.name] = drv + except Exception as e: + logger.error('Failed to load local service from %s, with exception: %s' % (uri, str(e))) + + the_providers = providers + else: + providers = the_providers return providers diff --git a/setup.cfg b/setup.cfg index 97e56815..63c7164e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -37,6 +37,7 @@ quest.services = usgs-nlcd = quest.services.usgs_nlcd:UsgsNlcdProvider cuahsi-hydroshare = quest.services.cuahsi_hs:HSProvider kitware-girder = quest.services.kitware_girder:GirderProvider + ckan = quest.services.ckan:CKANProvider # nasa = quest.services.nasa:NasaProvider quest.filters = diff --git a/test/data.py b/test/data.py index 
01bc2e35..9766c6dd 100644 --- a/test/data.py +++ b/test/data.py @@ -203,6 +203,8 @@ 'title': 'NWIS Instantaneous Values Service Download Options'}, 'svc://cuahsi-hydroshare:hs_geo': {}, 'svc://cuahsi-hydroshare:hs_norm': {}, + 'svc://ckan:ckan_norm': {}, + 'svc://ckan:ckan_geo_service': {} }