diff --git a/README.md b/README.md
index 4fbea96..9ffb643 100644
--- a/README.md
+++ b/README.md
@@ -181,7 +181,7 @@ Here's a summary of the options available.
 | CHUNK_SIZE | '16384' | Chunk size when processing the data file |
 | CHUNK_INSERT_ROWS | '250' | Number of records to send a request to datastore |
 | DOWNLOAD_TIMEOUT | '30' | Download timeout for requesting the file |
-| SSL_VERIFY | False | Do not validate SSL certificates when requesting the data file (*Warning*: Do not use this setting in production) |
+| DATAPUSHER_SSL_VERIFY | True | Whether to validate SSL certificates when requesting the data file (*Warning*: do not disable validation in production). A name different from CKAN's SSL_VERIFY is used to prevent it from overlapping with the value imported from the CKAN config |
 | TYPES | [messytables.StringType, messytables.DecimalType, messytables.IntegerType, messytables.DateUtilType] | [Messytables][] types used internally, can be modified to customize the type guessing |
 | TYPE_MAPPING | {'String': 'text', 'Integer': 'numeric', 'Decimal': 'numeric', 'DateUtil': 'timestamp'} | Internal Messytables type mapping |
diff --git a/datapusher/jobs.py b/datapusher/jobs.py
index 76828b7..38a9ff3 100644
--- a/datapusher/jobs.py
+++ b/datapusher/jobs.py
@@ -29,6 +29,7 @@
 else:
     locale.setlocale(locale.LC_ALL, '')
 
+DATAPUSHER_SSL_VERIFY = web.app.config.get('DATAPUSHER_SSL_VERIFY', True)
 MAX_CONTENT_LENGTH = web.app.config.get('MAX_CONTENT_LENGTH') or 10485760
 CHUNK_SIZE = web.app.config.get('CHUNK_SIZE') or 16384
 CHUNK_INSERT_ROWS = web.app.config.get('CHUNK_INSERT_ROWS') or 250
@@ -37,12 +38,12 @@
 if USE_PROXY:
     DOWNLOAD_PROXY = web.app.config.get('DOWNLOAD_PROXY')
 
-if web.app.config.get('SSL_VERIFY') in ['False', 'FALSE', '0', False, 0]:
-    SSL_VERIFY = False
+if DATAPUSHER_SSL_VERIFY in ['False', 'FALSE', '0', False, 0]:
+    DATAPUSHER_SSL_VERIFY = False
 else:
-    SSL_VERIFY = True
+    DATAPUSHER_SSL_VERIFY = True
 
-if not SSL_VERIFY:
+if not DATAPUSHER_SSL_VERIFY:
     requests.packages.urllib3.disable_warnings()
 
 _TYPE_MAPPING = {
@@ -202,7 +203,7 @@ def delete_datastore_resource(resource_id, api_key, ckan_url):
     try:
         delete_url = get_url('datastore_delete', ckan_url)
         response = requests.post(delete_url,
-                                 verify=SSL_VERIFY,
+                                 verify=DATAPUSHER_SSL_VERIFY,
                                  data=json.dumps({'id': resource_id,
                                                   'force': True}),
                                  headers={'Content-Type': 'application/json',
@@ -218,7 +219,7 @@ def datastore_resource_exists(resource_id, api_key, ckan_url):
     try:
         search_url = get_url('datastore_search', ckan_url)
         response = requests.post(search_url,
-                                 verify=SSL_VERIFY,
+                                 verify=DATAPUSHER_SSL_VERIFY,
                                  data=json.dumps({'id': resource_id,
                                                   'limit': 0}),
                                  headers={'Content-Type': 'application/json',
@@ -251,7 +252,7 @@ def send_resource_to_datastore(resource, headers, records,
 
     url = get_url('datastore_create', ckan_url)
     r = requests.post(url,
-                      verify=SSL_VERIFY,
+                      verify=DATAPUSHER_SSL_VERIFY,
                       data=json.dumps(request, cls=DatastoreEncoder),
                       headers={'Content-Type': 'application/json',
                                'Authorization': api_key}
@@ -269,7 +270,7 @@ def update_resource(resource, api_key, ckan_url):
     url = get_url('resource_update', ckan_url)
     r = requests.post(
         url,
-        verify=SSL_VERIFY,
+        verify=DATAPUSHER_SSL_VERIFY,
         data=json.dumps(resource),
         headers={'Content-Type': 'application/json',
                  'Authorization': api_key}
@@ -284,7 +285,7 @@ def get_resource(resource_id, ckan_url, api_key):
     """
     url = get_url('resource_show', ckan_url)
     r = requests.post(url,
-                      verify=SSL_VERIFY,
+                      verify=DATAPUSHER_SSL_VERIFY,
                       data=json.dumps({'id': resource_id}),
                       headers={'Content-Type': 'application/json',
                                'Authorization': api_key}
@@ -323,19 +324,38 @@ def push_to_datastore(task_id, input, dry_run=False):
     :type dry_run: boolean
     '''
+    logging.debug("Executing push_to_datastore job")
+    logging.debug("DATAPUSHER_SSL_VERIFY %s" % DATAPUSHER_SSL_VERIFY)
+
+    # This response_logger is used to report job activity back to CKAN
     handler = util.StoringHandler(task_id, input)
-    logger = logging.getLogger(task_id)
-    logger.addHandler(handler)
-    logger.setLevel(logging.DEBUG)
+    response_logger = logging.getLogger(task_id)
+    response_logger.addHandler(handler)
+    response_logger.setLevel(logging.DEBUG)
 
     validate_input(input)
 
     data = input['metadata']
 
     ckan_url = data['ckan_url']
+
+    # See ckan/ckanext/datapusher/logic/action.py
+    # See https://github.com/ckan/ckan-service-provider/blob/master/ckanserviceprovider/web.py
+    callback_url = input.get('result_url')
+
     resource_id = data['resource_id']
+    original_url = data.get('original_url', None)
+    original_base_url = data.get('original_base_url', '')
     api_key = input.get('api_key')
 
+    logging.debug("callback_url: %s" % callback_url)
+    logging.debug("resource_id: %s" % resource_id)
+    logging.debug("api_key: %s............" % (api_key[0:8] if api_key else None))
+    logging.debug("ckan_url: %s" % ckan_url)
+    logging.debug("original_url: %s" % original_url)
+    logging.debug("original_base_url: %s" % original_base_url)
+
     try:
         resource = get_resource(resource_id, ckan_url, api_key)
     except util.JobError as e:
@@ -345,11 +365,29 @@ def push_to_datastore(task_id, input, dry_run=False):
 
     # check if the resource url_type is a datastore
     if resource.get('url_type') == 'datastore':
-        logger.info('Dump files are managed with the Datastore API')
+        response_logger.info('Dump files are managed with the Datastore API')
         return
 
     # check scheme
+    # the resource URL is the external URL; when CKAN sits behind a reverse
+    # proxy, its external base has to be replaced with the internal ckan_url
     url = resource.get('url')
+
+    # replace original_base_url with ckan_url in the url variable, e.g.:
+    #   ckan_url: http://ckan:5000/
+    #   original_base_url: https://ckanportal.mydomain.com
+    #
+    #   url:
+    #     before: https://ckanportal.mydomain.com/dataset/515fa5c0-058e-4b53-b5ed-74ff38aca428/resource/203de699-c2fc-40f7-a740-0369a2ebaa78/download/test.csv
+    #     after:  http://ckan:5000/dataset/515fa5c0-058e-4b53-b5ed-74ff38aca428/resource/203de699-c2fc-40f7-a740-0369a2ebaa78/download/test.csv
+    if original_base_url:
+        original_base_url_strip = original_base_url.rstrip("/")
+        ckan_url_strip = ckan_url.rstrip("/")
+        url = url.replace(original_base_url_strip, ckan_url_strip)
+
+    logging.debug("Verifying resource from url: %s" % url)
+
     scheme = urlsplit(url).scheme
     if scheme not in ('http', 'https', 'ftp'):
         raise util.JobError(
@@ -357,7 +395,7 @@ def push_to_datastore(task_id, input, dry_run=False):
         )
 
     # fetch the resource data
-    logger.info('Fetching from: {0}'.format(url))
+    response_logger.info('Fetching from: {0}'.format(url))
     headers = {}
     if resource.get('url_type') == 'upload':
         # If this is an uploaded file to CKAN, authenticate the request,
@@ -365,7 +403,7 @@ def push_to_datastore(task_id, input, dry_run=False):
         headers['Authorization'] = api_key
     try:
         kwargs = {'headers': headers, 'timeout': DOWNLOAD_TIMEOUT,
-                  'verify': SSL_VERIFY, 'stream': True}
+                  'verify': DATAPUSHER_SSL_VERIFY, 'stream': True}
         if USE_PROXY:
             kwargs['proxies'] = {'http': DOWNLOAD_PROXY, 'https': DOWNLOAD_PROXY}
         response = requests.get(url, **kwargs)
@@ -409,7 +447,7 @@ def push_to_datastore(task_id, input, dry_run=False):
 
     if (resource.get('hash') == file_hash
             and not data.get('ignore_hash')):
-        logger.info("The file hash hasn't changed: {hash}.".format(
+        response_logger.info("The file hash hasn't changed: {hash}.".format(
             hash=file_hash))
         return
 
@@ -481,7 +519,7 @@ def row_iterator():
         the fields have significantly changed, it may also fail.
         '''
         if existing:
-            logger.info('Deleting "{res_id}" from datastore.'.format(
+            response_logger.info('Deleting "{res_id}" from datastore.'.format(
                 res_id=resource_id))
             delete_datastore_resource(resource_id, api_key, ckan_url)
 
@@ -498,7 +536,7 @@ def row_iterator():
             if type_override in list(_TYPE_MAPPING.values()):
                 h['type'] = type_override
 
-    logger.info('Determined headers and types: {headers}'.format(
+    response_logger.info('Determined headers and types: {headers}'.format(
         headers=headers_dicts))
 
     if dry_run:
@@ -508,12 +546,12 @@ def row_iterator():
     for i, chunk in enumerate(chunky(result, CHUNK_INSERT_ROWS)):
         records, is_it_the_last_chunk = chunk
         count += len(records)
-        logger.info('Saving chunk {number} {is_last}'.format(
+        response_logger.info('Saving chunk {number} {is_last}'.format(
             number=i, is_last='(last)' if is_it_the_last_chunk else ''))
         send_resource_to_datastore(resource, headers_dicts, records,
                                    is_it_the_last_chunk, api_key, ckan_url)
 
-    logger.info('Successfully pushed {n} entries to "{res_id}".'.format(
+    response_logger.info('Successfully pushed {n} entries to "{res_id}".'.format(
         n=count, res_id=resource_id))
 
     if data.get('set_url_type', False):
diff --git a/deployment/datapusher_settings.py b/deployment/datapusher_settings.py
index 0134a24..cdff12b 100644
--- a/deployment/datapusher_settings.py
+++ b/deployment/datapusher_settings.py
@@ -25,8 +25,8 @@
 CHUNK_INSERT_ROWS = int(os.environ.get('DATAPUSHER_CHUNK_INSERT_ROWS', '250'))
 DOWNLOAD_TIMEOUT = int(os.environ.get('DATAPUSHER_DOWNLOAD_TIMEOUT', '30'))
 
-# Verify SSL
-SSL_VERIFY = os.environ.get('DATAPUSHER_SSL_VERIFY', True)
+# Verify SSL (named DATAPUSHER_SSL_VERIFY to avoid overlapping with CKAN's SSL_VERIFY)
+DATAPUSHER_SSL_VERIFY = os.environ.get('DATAPUSHER_SSL_VERIFY', True)
 
 # logging
 #LOG_FILE = '/tmp/ckan_service.log'
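
A note on the flag parsing in the datapusher/jobs.py hunk above: os.environ.get and the
service config deliver strings, so a deployment that exports DATAPUSHER_SSL_VERIFY=False
would otherwise pass the truthy string 'False' straight to requests' verify= argument.
The membership test against ['False', 'FALSE', '0', False, 0] normalises those spellings
to a real boolean. A minimal standalone sketch of the same normalisation (the
parse_ssl_verify helper is hypothetical, not part of this patch):

    import os

    def parse_ssl_verify(raw):
        # Treat common falsy spellings as False; everything else,
        # including the default True, stays truthy.
        return raw not in ['False', 'FALSE', '0', False, 0]

    # e.g. after `export DATAPUSHER_SSL_VERIFY=False` in the environment:
    DATAPUSHER_SSL_VERIFY = parse_ssl_verify(
        os.environ.get('DATAPUSHER_SSL_VERIFY', True))
    print(DATAPUSHER_SSL_VERIFY)  # False, not the truthy string 'False'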
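
The reverse-proxy URL rewrite added to push_to_datastore can be exercised on its own.
A minimal sketch under the same assumptions as the patch's comment block (the example
hostnames and resource path come from that comment; the rewrite_resource_url helper is
hypothetical):

    def rewrite_resource_url(url, original_base_url, ckan_url):
        # Strip trailing slashes so the replacement works whether or not
        # either base URL was configured with one.
        if original_base_url:
            url = url.replace(original_base_url.rstrip('/'),
                              ckan_url.rstrip('/'))
        return url

    url = ('https://ckanportal.mydomain.com/dataset/515fa5c0-058e-4b53-b5ed-74ff38aca428'
           '/resource/203de699-c2fc-40f7-a740-0369a2ebaa78/download/test.csv')
    print(rewrite_resource_url(url, 'https://ckanportal.mydomain.com',
                               'http://ckan:5000/'))
    # http://ckan:5000/dataset/515fa5c0-058e-4b53-b5ed-74ff38aca428/resource/203de699-c2fc-40f7-a740-0369a2ebaa78/download/test.csv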