From 756fb4a45aa686a6a3dd6d8bfb75981b2d509d49 Mon Sep 17 00:00:00 2001 From: Nicola Inchingolo Date: Thu, 23 Dec 2021 15:58:02 +0100 Subject: [PATCH 1/2] replace ckan url with callback url 'everywhere' --- README.md | 2 +- README.md.bak | 245 +++++++++++ datapusher/jobs.py | 78 +++- datapusher/jobs.py.bak | 559 ++++++++++++++++++++++++++ deployment/datapusher_settings.py | 4 +- deployment/datapusher_settings.py.bak | 31 ++ 6 files changed, 896 insertions(+), 23 deletions(-) create mode 100644 README.md.bak create mode 100644 datapusher/jobs.py.bak create mode 100644 deployment/datapusher_settings.py.bak diff --git a/README.md b/README.md index 4fbea96..9ffb643 100644 --- a/README.md +++ b/README.md @@ -181,7 +181,7 @@ Here's a summary of the options available. | CHUNK_SIZE | '16384' | Chunk size when processing the data file | | CHUNK_INSERT_ROWS | '250' | Number of records to send a request to datastore | | DOWNLOAD_TIMEOUT | '30' | Download timeout for requesting the file | -| SSL_VERIFY | False | Do not validate SSL certificates when requesting the data file (*Warning*: Do not use this setting in production) | +| DATAPUSHER_SSL_VERIFY | True | Do not validate SSL certificates when requesting the data file (*Warning*: Do not use this setting in production). Was used a different name from ckan SSL_VERIFY to prevent overlapping with the value set in the ckan imports | | TYPES | [messytables.StringType, messytables.DecimalType, messytables.IntegerType, messytables.DateUtilType] | [Messytables][] types used internally, can be modified to customize the type guessing | | TYPE_MAPPING | {'String': 'text', 'Integer': 'numeric', 'Decimal': 'numeric', 'DateUtil': 'timestamp'} | Internal Messytables type mapping | diff --git a/README.md.bak b/README.md.bak new file mode 100644 index 0000000..d7f690b --- /dev/null +++ b/README.md.bak @@ -0,0 +1,245 @@ +[![Build Status](https://travis-ci.org/ckan/datapusher.png?branch=master)](https://travis-ci.org/ckan/datapusher) +[![Coverage Status](https://coveralls.io/repos/ckan/datapusher/badge.png?branch=master)](https://coveralls.io/r/ckan/datapusher?branch=master) +[![Latest Version](https://img.shields.io/pypi/v/datapusher.svg)](https://pypi.python.org/pypi/datapusher/) +[![Downloads](https://img.shields.io/pypi/dm/datapusher.svg)](https://pypi.python.org/pypi/datapusher/) +[![Supported Python versions](https://img.shields.io/pypi/pyversions/datapusher.svg)](https://pypi.python.org/pypi/datapusher/) +[![License](https://img.shields.io/badge/license-GPL-blue.svg)](https://pypi.python.org/pypi/datapusher/) + +[CKAN Service Provider]: https://github.com/ckan/ckan-service-provider +[Messytables]: https://github.com/okfn/messytables + + +# DataPusher + +DataPusher is a standalone web service that automatically downloads any tabular +data files like CSV or Excel from a CKAN site's resources when they are added to the +CKAN site, parses them to pull out the actual data, then uses the DataStore API +to push the data into the CKAN site's DataStore. + +This makes the data from the resource files available via CKAN's DataStore API. +In particular, many of CKAN's data preview and visualization plugins will only +work (or will work much better) with files whose contents are in the DataStore. + +To get it working you have to: + +1. Deploy a DataPusher instance to a server (or use an existing DataPusher + instance) +2. Enable and configure the `datastore` plugin on your CKAN site. +3. Enable and configure the `datapusher` plugin on your CKAN site. 
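As a rough sketch, steps 2 and 3 usually come down to a few lines in the CKAN
configuration file (the option names are the standard CKAN ones; the database
credentials and URLs below are placeholders you must adapt):

    ckan.plugins = datastore datapusher
    ckan.datastore.write_url = postgresql://ckan_default:PASSWORD@localhost/datastore_default
    ckan.datastore.read_url = postgresql://datastore_default:PASSWORD@localhost/datastore_default
    ckan.datapusher.url = http://127.0.0.1:8800/

See the [Configuring](#configuring) section below for the full set of options.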
+ +Note that if you installed CKAN using the _package install_ option then a +DataPusher instance should be automatically installed and configured to work +with your CKAN site. + +DataPusher is built using [CKAN Service Provider][] and [Messytables][]. + +The original author of DataPusher was +Dominik Moritz . For the current list of contributors +see [github.com/ckan/datapusher/contributors](https://github.com/ckan/datapusher/contributors) + +## Development installation + +Install the required packages:: + + sudo apt-get install python-dev python-virtualenv build-essential libxslt1-dev libxml2-dev zlib1g-dev git libffi-dev + +Get the code:: + + git clone https://github.com/ckan/datapusher + cd datapusher + +Install the dependencies:: + + pip install -r requirements.txt + pip install -r requirements-dev.txt + pip install -e . + +Run the DataPusher:: + + python datapusher/main.py deployment/datapusher_settings.py + +By default DataPusher should be running at the following port: + + http://localhost:8800/ + +If you need to change the host or port, copy `deployment/datapusher_settings.py` to +`deployment/datapusher_local_settings.py` and modify the file to suit your needs. Also if running a production setup, make sure that the host and port matcht the `http` settings in the uWSGI configuration. + +To run the tests: + + nosetests + +## Production deployment + +*Note*: If you installed CKAN via a [package install](http://docs.ckan.org/en/latest/install-from-package.html), the DataPusher has already been installed and deployed for you. You can skip directly to the [Configuring](#configuring) section. + + +Thes instructions assume you already have CKAN installed on this server in the default +location described in the CKAN install documentation +(`/usr/lib/ckan/default`). If this is correct you should be able to run the +following commands directly, if not you will need to adapt the previous path to +your needs. + +These instructions set up the DataPusher web service on [uWSGI](https://uwsgi-docs.readthedocs.io/en/latest/) running on port 8800, but can be easily adapted to other WSGI servers like Gunicorn. You'll +probably need to set up Nginx as a reverse proxy in front of it and something like +Supervisor to keep the process up. 
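For example, a minimal Supervisor program entry (illustrative only; it assumes
the default install paths and the `www-data` user created in the commands below,
and the log file locations are placeholders) could look like:

    [program:datapusher]
    command=/usr/lib/ckan/datapusher/bin/uwsgi -i /usr/lib/ckan/datapusher/src/datapusher/deployment/datapusher-uwsgi.ini
    user=www-data
    autostart=true
    autorestart=true
    stderr_logfile=/var/log/datapusher.err.log
    stdout_logfile=/var/log/datapusher.out.log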
+ + + # Install requirements for the DataPusher + sudo apt install python3-venv python3-dev build-essential + sudo apt-get install python-dev python-virtualenv build-essential libxslt1-dev libxml2-dev git libffi-dev + + # Create a virtualenv for datapusher + sudo python3 -m venv /usr/lib/ckan/datapusher + + # Create a source directory and switch to it + sudo mkdir /usr/lib/ckan/datapusher/src + cd /usr/lib/ckan/datapusher/src + + # Clone the source (you should target the latest tagged version) + sudo git clone -b 0.0.17 https://github.com/ckan/datapusher.git + + # Install the DataPusher and its requirements + cd datapusher + sudo /usr/lib/ckan/datapusher/bin/pip install -r requirements.txt + sudo /usr/lib/ckan/datapusher/bin/python setup.py develop + + # Create a user to run the web service (if necessary) + sudo addgroup www-data + sudo adduser -G www-data www-data + + # Install uWSGI + sudo /usr/lib/ckan/datapusher/bin/pip install uwsgi + +At this point you can run DataPusher with the following command: + + /usr/lib/ckan/datapusher/bin/uwsgi -i /usr/lib/ckan/datapusher/src/datapusher/deployment/datapusher-uwsgi.ini + + +*Note*: If you are installing the DataPusher on a different location than the default +one you need to adapt the relevant paths in the `datapusher-uwsgi.ini` to the ones you are using. Also you might need to change the `uid` and `guid` settings when using a different user. + + +### High Availability Setup + +The default DataPusher configuration uses SQLite as the backend for the jobs database and a single uWSGI thread. To increase performance and concurrency you can configure DataPusher in the following way: + +1. Use Postgres as database backend, which will allow concurrent writes (and provide a more reliable backend anyway). To use Postgres, create a user and a database and update the `SQLALCHEMY_DATABASE_URI` settting accordingly: + + ``` + # This assumes DataPusher is already installed + sudo apt-get install postgresql libpq-dev + sudo -u postgres createuser -S -D -R -P datapusher_jobs + sudo -u postgres createdb -O datapusher_jobs datapusher_jobs -E utf-8 + + # Run this in the virtualenv where DataPusher is installed + pip install psycopg2 + + # Edit SQLALCHEMY_DATABASE_URI in datapusher_settings.py accordingly + # eg SQLALCHEMY_DATABASE_URI=postgresql://datapusher_jobs:YOURPASSWORD@localhost/datapusher_jobs + ``` + +2. Start more uWSGI threads. On the `deployment/datapusher-uwsgi.ini` file, set `workers` and `threads` to a value that suits your needs, and add the `lazy-apps=true` setting to avoid concurrency issues with SQLAlchemy, eg: + + ``` + # ... 
rest of datapusher-uwsgi.ini + workers = 3 + threads = 3 + lazy-apps = true + ``` + +## Configuring + + +### CKAN Configuration + +Add `datapusher` to the plugins in your CKAN configuration file +(generally located at `/etc/ckan/default/production.ini` or `/etc/ckan/default/ckan.ini`): + + ckan.plugins = datapusher + +In order to tell CKAN where this webservice is located, the following must be +added to the `[app:main]` section of your CKAN configuration file : + + ckan.datapusher.url = http://127.0.0.1:8800/ + +and with a published url of ckan different from the internal url of ckan visible to datapusher (as in a reverse proxy configuration): + + ckan.datapusher.callback_url_base = http://ckan:5000 + +Must be also configured the authentication using jwt tokens, modifying the following CKAN configurations: + + api_token.jwt.encode.secret = string:samerandomstring + api_token.jwt.decode.secret = string:samerandomstring + +Then generating a new JWT token in the Administration section for the user default (the user with the same name of site_id) +which is used by datapusher and configuring the generated token with this configuration in the CKAN ini: + + ckan.datapusher.token = adsadasdsads.sdsad____ + + +There are other CKAN configuration options that allow to customize the CKAN - DataPusher +integation. Please refer to the [DataPusher Settings](https://docs.ckan.org/en/latest/maintaining/configuration.html#datapusher-settings) section in the CKAN documentation for more details. + + +### DataPusher Configuration + +The DataPusher instance is configured in the `deployment/datapusher_settings.py` file. +Here's a summary of the options available. + +| Name | Default | Description | +| -- | -- | -- | +| HOST | '0.0.0.0' | Web server host | +| PORT | 8800 | Web server port | +| SQLALCHEMY_DATABASE_URI | 'sqlite:////tmp/job_store.db' | SQLAlchemy Database URL. See note about database backend below. | +| MAX_CONTENT_LENGTH | '1024000' | Max size of files to process in bytes | +| CHUNK_SIZE | '16384' | Chunk size when processing the data file | +| CHUNK_INSERT_ROWS | '250' | Number of records to send a request to datastore | +| DOWNLOAD_TIMEOUT | '30' | Download timeout for requesting the file | +| DATAPUSHER_SSL_VERIFY | True | Do not validate SSL certificates when requesting the data file (*Warning*: Do not use this setting in production). Was used a different name from ckan SSL_VERIFY to prevent overlapping with the value set in the ckan imports | +| TYPES | [messytables.StringType, messytables.DecimalType, messytables.IntegerType, messytables.DateUtilType] | [Messytables][] types used internally, can be modified to customize the type guessing | +| TYPE_MAPPING | {'String': 'text', 'Integer': 'numeric', 'Decimal': 'numeric', 'DateUtil': 'timestamp'} | Internal Messytables type mapping | + + +Most of the configuration options above can be also provided as environment variables prepending the name with `DATAPUSHER_`, eg `DATAPUSHER_SQLALCHEMY_DATABASE_URI`, `DATAPUSHER_PORT`, etc. + + +By default DataPusher uses SQLite as the database backend for the jobs information. This is fine for local development and sites with low activity, but for sites that need more performance should use Postgres as the backend for the jobs database (eg `SQLALCHEMY_DATABASE_URI=postgresql://datapusher_jobs:YOURPASSWORD@localhost/datapusher_jobs`. See also [High Availability Setup](#high-availability-setup). If SQLite is used, is probably a good idea to store the database in a location other than `/tmp`. 
This will prevent the database being dropped, causing out of sync errors in the CKAN side. A good place to store it is the CKAN storage folder (if DataPusher is installed in the same server), generally in `/var/lib/ckan/`. + + +## Usage + +Any file that has one of the supported formats (defined in [`ckan.datapusher.formats`](https://docs.ckan.org/en/latest/maintaining/configuration.html#ckan-datapusher-formats)) will be attempted to be loaded +into the DataStore. + +You can also manually trigger resources to be resubmitted. When editing a resource in CKAN (clicking the "Manage" button on a resource page), a new tab named "DataStore" will appear. This will contain a log of the last attempted upload and a button to retry the upload. + +![DataPusher UI](images/ui.png) + +### Command line + +Run the following command to submit all resources to datapusher, although it will skip files whose hash of the data file has not changed: + + ckan -c /etc/ckan/default/ckan.ini datapusher resubmit + +On CKAN<=2.8: + + paster --plugin=ckan datapusher resubmit -c /etc/ckan/default/ckan.ini + +To Resubmit a specific resource, whether or not the hash of the data file has changed:: + + ckan -c /etc/ckan/default/ckan.ini datapusher submit {dataset_id} + +On CKAN<=2.8: + + paster --plugin=ckan datapusher submit -c /etc/ckan/default/ckan.ini + + +## License + +This material is copyright (c) 2020 Open Knowledge Foundation and other contributors + +It is open and licensed under the GNU Affero General Public License (AGPL) v3.0 +whose full text may be found at: + +[http://www.fsf.org/licensing/licenses/agpl-3.0.html]() diff --git a/datapusher/jobs.py b/datapusher/jobs.py index 76828b7..38a9ff3 100644 --- a/datapusher/jobs.py +++ b/datapusher/jobs.py @@ -29,6 +29,7 @@ else: locale.setlocale(locale.LC_ALL, '') +DATAPUSHER_SSL_VERIFY = web.app.config.get('DATAPUSHER_SSL_VERIFY', True) MAX_CONTENT_LENGTH = web.app.config.get('MAX_CONTENT_LENGTH') or 10485760 CHUNK_SIZE = web.app.config.get('CHUNK_SIZE') or 16384 CHUNK_INSERT_ROWS = web.app.config.get('CHUNK_INSERT_ROWS') or 250 @@ -37,12 +38,12 @@ if USE_PROXY: DOWNLOAD_PROXY = web.app.config.get('DOWNLOAD_PROXY') -if web.app.config.get('SSL_VERIFY') in ['False', 'FALSE', '0', False, 0]: - SSL_VERIFY = False +if DATAPUSHER_SSL_VERIFY in ['False', 'FALSE', '0', False, 0]: + DATAPUSHER_SSL_VERIFY = False else: - SSL_VERIFY = True + DATAPUSHER_SSL_VERIFY = True -if not SSL_VERIFY: +if not DATAPUSHER_SSL_VERIFY: requests.packages.urllib3.disable_warnings() _TYPE_MAPPING = { @@ -202,7 +203,7 @@ def delete_datastore_resource(resource_id, api_key, ckan_url): try: delete_url = get_url('datastore_delete', ckan_url) response = requests.post(delete_url, - verify=SSL_VERIFY, + verify=DATAPUSHER_SSL_VERIFY, data=json.dumps({'id': resource_id, 'force': True}), headers={'Content-Type': 'application/json', @@ -218,7 +219,7 @@ def datastore_resource_exists(resource_id, api_key, ckan_url): try: search_url = get_url('datastore_search', ckan_url) response = requests.post(search_url, - verify=SSL_VERIFY, + verify=DATAPUSHER_SSL_VERIFY, data=json.dumps({'id': resource_id, 'limit': 0}), headers={'Content-Type': 'application/json', @@ -251,7 +252,7 @@ def send_resource_to_datastore(resource, headers, records, url = get_url('datastore_create', ckan_url) r = requests.post(url, - verify=SSL_VERIFY, + verify=DATAPUSHER_SSL_VERIFY, data=json.dumps(request, cls=DatastoreEncoder), headers={'Content-Type': 'application/json', 'Authorization': api_key} @@ -269,7 +270,7 @@ def update_resource(resource, 
api_key, ckan_url): url = get_url('resource_update', ckan_url) r = requests.post( url, - verify=SSL_VERIFY, + verify=DATAPUSHER_SSL_VERIFY, data=json.dumps(resource), headers={'Content-Type': 'application/json', 'Authorization': api_key} @@ -284,7 +285,7 @@ def get_resource(resource_id, ckan_url, api_key): """ url = get_url('resource_show', ckan_url) r = requests.post(url, - verify=SSL_VERIFY, + verify=DATAPUSHER_SSL_VERIFY, data=json.dumps({'id': resource_id}), headers={'Content-Type': 'application/json', 'Authorization': api_key} @@ -323,19 +324,38 @@ def push_to_datastore(task_id, input, dry_run=False): :type dry_run: boolean ''' + logging.debug("Executing push_to_datastore job") + logging.debug("DATAPUSHER_SSL_VERIFY %s" % DATAPUSHER_SSL_VERIFY) + + # This response_logger is used to report job activity to ckan handler = util.StoringHandler(task_id, input) - logger = logging.getLogger(task_id) - logger.addHandler(handler) - logger.setLevel(logging.DEBUG) + response_logger = logging.getLogger(task_id) + response_logger.addHandler(handler) + response_logger.setLevel(logging.DEBUG) validate_input(input) data = input['metadata'] ckan_url = data['ckan_url'] + + # See ckan/ckanext/datapusher/logic/action.py + # See https://github.com/ckan/ckan-service-provider/blob/master/ckanserviceprovider/web.py + callback_url = input.get('result_url') + resource_id = data['resource_id'] + original_url = data.get('original_url', None) + original_base_url = data.get('original_base_url', '') + api_key = input.get('api_key') + logging.debug("callback_url: %s" % callback_url) + logging.debug("resource_id: %s" % resource_id) + logging.debug("api_key: %s............" % api_key[0:8]) + logging.debug("ckan_url: %s" % ckan_url) + logging.debug("original_url: %s" % original_url) + logging.debug("original_base_url: %s" % original_base_url) + try: resource = get_resource(resource_id, ckan_url, api_key) except util.JobError as e: @@ -345,11 +365,29 @@ def push_to_datastore(task_id, input, dry_run=False): # check if the resource url_type is a datastore if resource.get('url_type') == 'datastore': - logger.info('Dump files are managed with the Datastore API') + response_logger.info('Dump files are managed with the Datastore API') return # check scheme + # this url is the external url of the resource if behind reverse proxy + # we had to replace this url_base with callback_url_base url = resource.get('url') + + # here we replace original_base_url, with ckan_url in the url variable + # example: + # ckan_url: http://ckan:5000/ + # original_base_url: https://ckanportal.mydomain.com + # + # url: + # before: https://ckanportal.mydomain.com/dataset/515fa5c0-058e-4b53-b5ed-74ff38aca428/resource/203de699-c2fc-40f7-a740-0369a2ebaa78/download/test.csv + # after: http://ckan:5000/dataset/515fa5c0-058e-4b53-b5ed-74ff38aca428/resource/203de699-c2fc-40f7-a740-0369a2ebaa78/download/test.csv + if (original_base_url): + original_base_url_strip = original_base_url.rstrip("/") + ckan_url_strip = ckan_url.rstrip("/") + url = url.replace(original_base_url_strip, ckan_url_strip) + + logging.debug("Verifying resource from url: %s" % url) + scheme = urlsplit(url).scheme if scheme not in ('http', 'https', 'ftp'): raise util.JobError( @@ -357,7 +395,7 @@ def push_to_datastore(task_id, input, dry_run=False): ) # fetch the resource data - logger.info('Fetching from: {0}'.format(url)) + response_logger.info('Fetching from: {0}'.format(url)) headers = {} if resource.get('url_type') == 'upload': # If this is an uploaded file to CKAN, authenticate the 
request, @@ -365,7 +403,7 @@ def push_to_datastore(task_id, input, dry_run=False): headers['Authorization'] = api_key try: kwargs = {'headers': headers, 'timeout': DOWNLOAD_TIMEOUT, - 'verify': SSL_VERIFY, 'stream': True} + 'verify': DATAPUSHER_SSL_VERIFY, 'stream': True} if USE_PROXY: kwargs['proxies'] = {'http': DOWNLOAD_PROXY, 'https': DOWNLOAD_PROXY} response = requests.get(url, **kwargs) @@ -409,7 +447,7 @@ def push_to_datastore(task_id, input, dry_run=False): if (resource.get('hash') == file_hash and not data.get('ignore_hash')): - logger.info("The file hash hasn't changed: {hash}.".format( + response_logger.info("The file hash hasn't changed: {hash}.".format( hash=file_hash)) return @@ -481,7 +519,7 @@ def row_iterator(): the fields have significantly changed, it may also fail. ''' if existing: - logger.info('Deleting "{res_id}" from datastore.'.format( + response_logger.info('Deleting "{res_id}" from datastore.'.format( res_id=resource_id)) delete_datastore_resource(resource_id, api_key, ckan_url) @@ -498,7 +536,7 @@ def row_iterator(): if type_override in list(_TYPE_MAPPING.values()): h['type'] = type_override - logger.info('Determined headers and types: {headers}'.format( + response_logger.info('Determined headers and types: {headers}'.format( headers=headers_dicts)) if dry_run: @@ -508,12 +546,12 @@ def row_iterator(): for i, chunk in enumerate(chunky(result, CHUNK_INSERT_ROWS)): records, is_it_the_last_chunk = chunk count += len(records) - logger.info('Saving chunk {number} {is_last}'.format( + response_logger.info('Saving chunk {number} {is_last}'.format( number=i, is_last='(last)' if is_it_the_last_chunk else '')) send_resource_to_datastore(resource, headers_dicts, records, is_it_the_last_chunk, api_key, ckan_url) - logger.info('Successfully pushed {n} entries to "{res_id}".'.format( + response_logger.info('Successfully pushed {n} entries to "{res_id}".'.format( n=count, res_id=resource_id)) if data.get('set_url_type', False): diff --git a/datapusher/jobs.py.bak b/datapusher/jobs.py.bak new file mode 100644 index 0000000..3d2a92c --- /dev/null +++ b/datapusher/jobs.py.bak @@ -0,0 +1,559 @@ +# -*- coding: utf-8 -*- + + +import json +import requests +try: + from urllib.parse import urlsplit +except ImportError: + from urlparse import urlsplit + +import itertools +import datetime +import locale +import logging +import decimal +import hashlib +import time +import tempfile +import sys + +import messytables + +import ckanserviceprovider.job as job +import ckanserviceprovider.util as util +from ckanserviceprovider import web + +if locale.getdefaultlocale()[0]: + lang, encoding = locale.getdefaultlocale() + locale.setlocale(locale.LC_ALL, locale=(lang, encoding)) +else: + locale.setlocale(locale.LC_ALL, '') + +DATAPUSHER_SSL_VERIFY = web.app.config.get('DATAPUSHER_SSL_VERIFY', True) +MAX_CONTENT_LENGTH = web.app.config.get('MAX_CONTENT_LENGTH') or 10485760 +CHUNK_SIZE = web.app.config.get('CHUNK_SIZE') or 16384 +CHUNK_INSERT_ROWS = web.app.config.get('CHUNK_INSERT_ROWS') or 250 +DOWNLOAD_TIMEOUT = web.app.config.get('DOWNLOAD_TIMEOUT') or 30 +USE_PROXY = 'DOWNLOAD_PROXY' in web.app.config +if USE_PROXY: + DOWNLOAD_PROXY = web.app.config.get('DOWNLOAD_PROXY') + +if DATAPUSHER_SSL_VERIFY in ['False', 'FALSE', '0', False, 0]: + DATAPUSHER_SSL_VERIFY = False +else: + DATAPUSHER_SSL_VERIFY = True + +if not DATAPUSHER_SSL_VERIFY: + requests.packages.urllib3.disable_warnings() + +_TYPE_MAPPING = { + 'String': 'text', + # 'int' may not be big enough, + # and type detection may not realize it 
needs to be big + 'Integer': 'numeric', + 'Decimal': 'numeric', + 'DateUtil': 'timestamp' +} + +_TYPES = [messytables.StringType, messytables.DecimalType, + messytables.IntegerType, messytables.DateUtilType] + +TYPE_MAPPING = web.app.config.get('TYPE_MAPPING', _TYPE_MAPPING) +TYPES = web.app.config.get('TYPES', _TYPES) + +DATASTORE_URLS = { + 'datastore_delete': '{ckan_url}/api/action/datastore_delete', + 'resource_update': '{ckan_url}/api/action/resource_update' +} + + +class HTTPError(util.JobError): + """Exception that's raised if a job fails due to an HTTP problem.""" + + def __init__(self, message, status_code, request_url, response): + """Initialise a new HTTPError. + + :param message: A human-readable error message + :type message: string + + :param status_code: The status code of the errored HTTP response, + e.g. 500 + :type status_code: int + + :param request_url: The URL that was requested + :type request_url: string + + :param response: The body of the errored HTTP response as unicode + (if you have a requests.Response object then response.text will + give you this) + :type response: unicode + + """ + super(HTTPError, self).__init__(message) + self.status_code = status_code + self.request_url = request_url + self.response = response + + def as_dict(self): + """Return a JSON-serializable dictionary representation of this error. + + Suitable for ckanserviceprovider to return to the client site as the + value for the "error" key in the job dict. + + """ + if self.response and len(self.response) > 200: + response = self.response[:200] + '...' + else: + response = self.response + return { + "message": self.message, + "HTTP status code": self.status_code, + "Requested URL": self.request_url, + "Response": response, + } + + def __str__(self): + return '{} status={} url={} response={}'.format( + self.message, self.status_code, self.request_url, self.response) \ + .encode('ascii', 'replace') + + +def get_url(action, ckan_url): + """ + Get url for ckan action + """ + if not urlsplit(ckan_url).scheme: + ckan_url = 'http://' + ckan_url.lstrip('/') + ckan_url = ckan_url.rstrip('/') + return '{ckan_url}/api/3/action/{action}'.format( + ckan_url=ckan_url, action=action) + + +def check_response(response, request_url, who, good_status=(201, 200), ignore_no_success=False): + """ + Checks the response and raises exceptions if something went terribly wrong + + :param who: A short name that indicated where the error occurred + (for example "CKAN") + :param good_status: Status codes that should not raise an exception + + """ + if not response.status_code: + raise HTTPError( + 'DataPusher received an HTTP response with no status code', + status_code=None, request_url=request_url, response=response.text) + + message = '{who} bad response. Status code: {code} {reason}. At: {url}.' 
+ try: + if response.status_code not in good_status: + json_response = response.json() + if not ignore_no_success or json_response.get('success'): + try: + message = json_response["error"]["message"] + except Exception: + message = message.format( + who=who, code=response.status_code, + reason=response.reason, url=request_url) + raise HTTPError( + message, status_code=response.status_code, + request_url=request_url, response=response.text) + except ValueError: + message = message.format( + who=who, code=response.status_code, reason=response.reason, + url=request_url, resp=response.text[:200]) + raise HTTPError( + message, status_code=response.status_code, request_url=request_url, + response=response.text) + + +def chunky(items, num_items_per_chunk): + """ + Breaks up a list of items into chunks - multiple smaller lists of items. + The last chunk is flagged up. + + :param items: Size of each chunks + :type items: iterable + :param num_items_per_chunk: Size of each chunks + :type num_items_per_chunk: int + + :returns: multiple tuples: (chunk, is_it_the_last_chunk) + :rtype: generator of (list, bool) + """ + items_ = iter(items) + chunk = list(itertools.islice(items_, num_items_per_chunk)) + while chunk: + next_chunk = list(itertools.islice(items_, num_items_per_chunk)) + chunk_is_the_last_one = not next_chunk + yield chunk, chunk_is_the_last_one + chunk = next_chunk + + +class DatastoreEncoder(json.JSONEncoder): + # Custon JSON encoder + def default(self, obj): + if isinstance(obj, datetime.datetime): + return obj.isoformat() + if isinstance(obj, decimal.Decimal): + return str(obj) + + return json.JSONEncoder.default(self, obj) + + +def delete_datastore_resource(resource_id, api_key, ckan_url): + try: + delete_url = get_url('datastore_delete', ckan_url) + response = requests.post(delete_url, + verify=DATAPUSHER_SSL_VERIFY, + data=json.dumps({'id': resource_id, + 'force': True}), + headers={'Content-Type': 'application/json', + 'Authorization': api_key} + ) + check_response(response, delete_url, 'CKAN', + good_status=(201, 200, 404), ignore_no_success=True) + except requests.exceptions.RequestException: + raise util.JobError('Deleting existing datastore failed.') + + +def datastore_resource_exists(resource_id, api_key, ckan_url): + try: + search_url = get_url('datastore_search', ckan_url) + response = requests.post(search_url, + verify=DATAPUSHER_SSL_VERIFY, + data=json.dumps({'id': resource_id, + 'limit': 0}), + headers={'Content-Type': 'application/json', + 'Authorization': api_key} + ) + if response.status_code == 404: + return False + elif response.status_code == 200: + return response.json().get('result', {'fields': []}) + else: + raise HTTPError( + 'Error getting datastore resource.', + response.status_code, search_url, response, + ) + except requests.exceptions.RequestException as e: + raise util.JobError( + 'Error getting datastore resource ({!s}).'.format(e)) + + +def send_resource_to_datastore(resource, headers, records, + is_it_the_last_chunk, api_key, ckan_url): + """ + Stores records in CKAN datastore + """ + request = {'resource_id': resource['id'], + 'fields': headers, + 'force': True, + 'records': records, + 'calculate_record_count': is_it_the_last_chunk} + + url = get_url('datastore_create', ckan_url) + r = requests.post(url, + verify=DATAPUSHER_SSL_VERIFY, + data=json.dumps(request, cls=DatastoreEncoder), + headers={'Content-Type': 'application/json', + 'Authorization': api_key} + ) + check_response(r, url, 'CKAN DataStore') + + +def update_resource(resource, api_key, 
ckan_url): + """ + Update webstore_url and webstore_last_updated in CKAN + """ + + resource['url_type'] = 'datapusher' + + url = get_url('resource_update', ckan_url) + r = requests.post( + url, + verify=DATAPUSHER_SSL_VERIFY, + data=json.dumps(resource), + headers={'Content-Type': 'application/json', + 'Authorization': api_key} + ) + + check_response(r, url, 'CKAN') + + +def get_resource(resource_id, ckan_url, api_key): + """ + Gets available information about the resource from CKAN + """ + url = get_url('resource_show', ckan_url) + r = requests.post(url, + verify=DATAPUSHER_SSL_VERIFY, + data=json.dumps({'id': resource_id}), + headers={'Content-Type': 'application/json', + 'Authorization': api_key} + ) + check_response(r, url, 'CKAN') + + return r.json()['result'] + + +def validate_input(input): + # Especially validate metdata which is provided by the user + if 'metadata' not in input: + raise util.JobError('Metadata missing') + + data = input['metadata'] + + if 'resource_id' not in data: + raise util.JobError('No id provided.') + if 'ckan_url' not in data: + raise util.JobError('No ckan_url provided.') + if not input.get('api_key'): + raise util.JobError('No CKAN API key provided') + + +@job.asynchronous +def push_to_datastore(task_id, input, dry_run=False): + '''Download and parse a resource push its data into CKAN's DataStore. + + An asynchronous job that gets a resource from CKAN, downloads the + resource's data file and, if the data file has changed since last time, + parses the data and posts it into CKAN's DataStore. + + :param dry_run: Fetch and parse the data file but don't actually post the + data to the DataStore, instead return the data headers and rows that + would have been posted. + :type dry_run: boolean + + ''' + logging.debug("Executing push_to_datastore job") + logging.debug("DATAPUSHER_SSL_VERIFY %s" % DATAPUSHER_SSL_VERIFY) + + # This response_logger is used to report job activity to ckan + handler = util.StoringHandler(task_id, input) + response_logger = logging.getLogger(task_id) + response_logger.addHandler(handler) + response_logger.setLevel(logging.DEBUG) + + validate_input(input) + + data = input['metadata'] + + ckan_url = data['ckan_url'] + + # See ckan/ckanext/datapusher/logic/action.py + # See https://github.com/ckan/ckan-service-provider/blob/master/ckanserviceprovider/web.py + callback_url = input.get('result_url') + + resource_id = data['resource_id'] + original_url = data.get('original_url', None) + original_base_url = data.get('original_base_url', '') + + api_key = input.get('api_key') + + logging.debug("callback_url: %s" % callback_url) + logging.debug("resource_id: %s" % resource_id) + logging.debug("api_key: %s............" 
% api_key[0:8]) + logging.debug("ckan_url: %s" % ckan_url) + logging.debug("original_url: %s" % original_url) + logging.debug("original_base_url: %s" % original_base_url) + + try: + resource = get_resource(resource_id, ckan_url, api_key) + except util.JobError as e: + # try again in 5 seconds just incase CKAN is slow at adding resource + time.sleep(5) + resource = get_resource(resource_id, ckan_url, api_key) + + # check if the resource url_type is a datastore + if resource.get('url_type') == 'datastore': + response_logger.info('Dump files are managed with the Datastore API') + return + + # check scheme + # this url is the external url of the resource if behind reverse proxy + # we had to replace this url_base with callback_url_base + url = resource.get('url') + + # here we replace original_base_url, with ckan_url in the url variable + # example: + # ckan_url: http://ckan:5000/ + # original_base_url: https://ckanportal.mydomain.com + # + # url: + # before: https://ckanportal.mydomain.com/dataset/515fa5c0-058e-4b53-b5ed-74ff38aca428/resource/203de699-c2fc-40f7-a740-0369a2ebaa78/download/test.csv + # after: http://ckan:5000/dataset/515fa5c0-058e-4b53-b5ed-74ff38aca428/resource/203de699-c2fc-40f7-a740-0369a2ebaa78/download/test.csv + if (original_base_url): + original_base_url_strip = original_base_url.rstrip("/") + ckan_url_strip = ckan_url.rstrip("/") + url = url.replace(original_base_url_strip, ckan_url_strip) + + logging.debug("Verifying resource from url: %s" % url) + + scheme = urlsplit(url).scheme + if scheme not in ('http', 'https', 'ftp'): + raise util.JobError( + 'Only http, https, and ftp resources may be fetched.' + ) + + # fetch the resource data + response_logger.info('Fetching from: {0}'.format(url)) + headers = {} + if resource.get('url_type') == 'upload': + # If this is an uploaded file to CKAN, authenticate the request, + # otherwise we won't get file from private resources + headers['Authorization'] = api_key + try: + kwargs = {'headers': headers, 'timeout': DOWNLOAD_TIMEOUT, + 'verify': DATAPUSHER_SSL_VERIFY, 'stream': True} + if USE_PROXY: + kwargs['proxies'] = {'http': DOWNLOAD_PROXY, 'https': DOWNLOAD_PROXY} + response = requests.get(url, **kwargs) + response.raise_for_status() + + cl = response.headers.get('content-length') + try: + if cl and int(cl) > MAX_CONTENT_LENGTH: + raise util.JobError( + 'Resource too large to download: {cl} > max ({max_cl}).' + .format(cl=cl, max_cl=MAX_CONTENT_LENGTH)) + except ValueError: + pass + + tmp = tempfile.TemporaryFile() + length = 0 + m = hashlib.md5() + for chunk in response.iter_content(CHUNK_SIZE): + length += len(chunk) + if length > MAX_CONTENT_LENGTH: + raise util.JobError( + 'Resource too large to process: {cl} > max ({max_cl}).' 
+ .format(cl=length, max_cl=MAX_CONTENT_LENGTH)) + tmp.write(chunk) + m.update(chunk) + + ct = response.headers.get('content-type', '').split(';', 1)[0] + + except requests.HTTPError as e: + raise HTTPError( + "DataPusher received a bad HTTP response when trying to download " + "the data file", status_code=e.response.status_code, + request_url=url, response=e.response.content) + except requests.RequestException as e: + raise HTTPError( + message=str(e), status_code=None, + request_url=url, response=None) + + file_hash = m.hexdigest() + tmp.seek(0) + + if (resource.get('hash') == file_hash + and not data.get('ignore_hash')): + response_logger.info("The file hash hasn't changed: {hash}.".format( + hash=file_hash)) + return + + resource['hash'] = file_hash + + try: + table_set = messytables.any_tableset(tmp, mimetype=ct, extension=ct) + except messytables.ReadError as e: + # try again with format + tmp.seek(0) + try: + format = resource.get('format') + table_set = messytables.any_tableset(tmp, mimetype=format, extension=format) + except: + raise util.JobError(e) + + get_row_set = web.app.config.get('GET_ROW_SET', + lambda table_set: table_set.tables.pop()) + row_set = get_row_set(table_set) + offset, headers = messytables.headers_guess(row_set.sample) + + existing = datastore_resource_exists(resource_id, api_key, ckan_url) + existing_info = None + if existing: + existing_info = dict((f['id'], f['info']) + for f in existing.get('fields', []) if 'info' in f) + + # Some headers might have been converted from strings to floats and such. + headers = [str(header) for header in headers] + + row_set.register_processor(messytables.headers_processor(headers)) + row_set.register_processor(messytables.offset_processor(offset + 1)) + types = messytables.type_guess(row_set.sample, types=TYPES, strict=True) + + # override with types user requested + if existing_info: + types = [{ + 'text': messytables.StringType(), + 'numeric': messytables.DecimalType(), + 'timestamp': messytables.DateUtilType(), + }.get(existing_info.get(h, {}).get('type_override'), t) + for t, h in zip(types, headers)] + + row_set.register_processor(messytables.types_processor(types)) + + headers = [header.strip() for header in headers if header.strip()] + headers_set = set(headers) + + def row_iterator(): + for row in row_set: + data_row = {} + for index, cell in enumerate(row): + column_name = cell.column.strip() + if column_name not in headers_set: + continue + if isinstance(cell.value, str): + try: + data_row[column_name] = cell.value.encode('latin-1').decode('utf-8') + except (UnicodeDecodeError, UnicodeEncodeError): + data_row[column_name] = cell.value + else: + data_row[column_name] = cell.value + yield data_row + result = row_iterator() + + ''' + Delete existing datstore resource before proceeding. Otherwise + 'datastore_create' will append to the existing datastore. And if + the fields have significantly changed, it may also fail. 
+ ''' + if existing: + response_logger.info('Deleting "{res_id}" from datastore.'.format( + res_id=resource_id)) + delete_datastore_resource(resource_id, api_key, ckan_url) + + headers_dicts = [dict(id=field[0], type=TYPE_MAPPING[str(field[1])]) + for field in zip(headers, types)] + + # Maintain data dictionaries from matching column names + if existing_info: + for h in headers_dicts: + if h['id'] in existing_info: + h['info'] = existing_info[h['id']] + # create columns with types user requested + type_override = existing_info[h['id']].get('type_override') + if type_override in list(_TYPE_MAPPING.values()): + h['type'] = type_override + + response_logger.info('Determined headers and types: {headers}'.format( + headers=headers_dicts)) + + if dry_run: + return headers_dicts, result + + count = 0 + for i, chunk in enumerate(chunky(result, CHUNK_INSERT_ROWS)): + records, is_it_the_last_chunk = chunk + count += len(records) + response_logger.info('Saving chunk {number} {is_last}'.format( + number=i, is_last='(last)' if is_it_the_last_chunk else '')) + send_resource_to_datastore(resource, headers_dicts, records, + is_it_the_last_chunk, api_key, ckan_url) + + response_logger.info('Successfully pushed {n} entries to "{res_id}".'.format( + n=count, res_id=resource_id)) + + if data.get('set_url_type', False): + update_resource(resource, api_key, ckan_url) diff --git a/deployment/datapusher_settings.py b/deployment/datapusher_settings.py index 0134a24..cdff12b 100644 --- a/deployment/datapusher_settings.py +++ b/deployment/datapusher_settings.py @@ -25,8 +25,8 @@ CHUNK_INSERT_ROWS = int(os.environ.get('DATAPUSHER_CHUNK_INSERT_ROWS', '250')) DOWNLOAD_TIMEOUT = int(os.environ.get('DATAPUSHER_DOWNLOAD_TIMEOUT', '30')) -# Verify SSL -SSL_VERIFY = os.environ.get('DATAPUSHER_SSL_VERIFY', True) +# Verify SSL (Prevent overlapping with CKAN SSL_VERIFY) +DATAPUSHER_SSL_VERIFY = os.environ.get('DATAPUSHER_SSL_VERIFY', True) # logging #LOG_FILE = '/tmp/ckan_service.log' diff --git a/deployment/datapusher_settings.py.bak b/deployment/datapusher_settings.py.bak new file mode 100644 index 0000000..e8e395b --- /dev/null +++ b/deployment/datapusher_settings.py.bak @@ -0,0 +1,31 @@ +import os +import uuid + +DEBUG = False +TESTING = False +SECRET_KEY = str(uuid.uuid4()) +USERNAME = str(uuid.uuid4()) +PASSWORD = str(uuid.uuid4()) + +NAME = 'datapusher' + +# Webserver host and port + +HOST = os.environ.get('DATAPUSHER_HOST', '0.0.0.0') +PORT = os.environ.get('DATAPUSHER_PORT', 8800) + +# Database + +SQLALCHEMY_DATABASE_URI = os.environ.get('DATAPUSHER_SQLALCHEMY_DATABASE_URI', 'sqlite:////tmp/job_store.db') + +# Download and streaming settings + +MAX_CONTENT_LENGTH = int(os.environ.get('DATAPUSHER_MAX_CONTENT_LENGTH', '1024000')) +CHUNK_SIZE = int(os.environ.get('DATAPUSHER_CHUNK_SIZE', '16384')) +CHUNK_INSERT_ROWS = int(os.environ.get('DATAPUSHER_CHUNK_INSERT_ROWS', '250')) +DOWNLOAD_TIMEOUT = int(os.environ.get('DATAPUSHER_DOWNLOAD_TIMEOUT', '30')) + +# Verify SSL (Prevent overlapping with CKAN SSL_VERIFY) +DATAPUSHER_SSL_VERIFY = os.environ.get('DATAPUSHER_SSL_VERIFY', True) +# logging +#LOG_FILE = '/tmp/ckan_service.log' From c40e50eba95b2e51c6b2f9cbf78b2963dc8e6a0a Mon Sep 17 00:00:00 2001 From: Nicola Inchingolo Date: Thu, 23 Dec 2021 16:13:23 +0100 Subject: [PATCH 2/2] removed winmerge bak files --- README.md.bak | 245 ----------- datapusher/jobs.py.bak | 559 -------------------------- deployment/datapusher_settings.py.bak | 31 -- 3 files changed, 835 deletions(-) delete mode 100644 README.md.bak delete mode 
100644 datapusher/jobs.py.bak delete mode 100644 deployment/datapusher_settings.py.bak diff --git a/README.md.bak b/README.md.bak deleted file mode 100644 index d7f690b..0000000 --- a/README.md.bak +++ /dev/null @@ -1,245 +0,0 @@ -[![Build Status](https://travis-ci.org/ckan/datapusher.png?branch=master)](https://travis-ci.org/ckan/datapusher) -[![Coverage Status](https://coveralls.io/repos/ckan/datapusher/badge.png?branch=master)](https://coveralls.io/r/ckan/datapusher?branch=master) -[![Latest Version](https://img.shields.io/pypi/v/datapusher.svg)](https://pypi.python.org/pypi/datapusher/) -[![Downloads](https://img.shields.io/pypi/dm/datapusher.svg)](https://pypi.python.org/pypi/datapusher/) -[![Supported Python versions](https://img.shields.io/pypi/pyversions/datapusher.svg)](https://pypi.python.org/pypi/datapusher/) -[![License](https://img.shields.io/badge/license-GPL-blue.svg)](https://pypi.python.org/pypi/datapusher/) - -[CKAN Service Provider]: https://github.com/ckan/ckan-service-provider -[Messytables]: https://github.com/okfn/messytables - - -# DataPusher - -DataPusher is a standalone web service that automatically downloads any tabular -data files like CSV or Excel from a CKAN site's resources when they are added to the -CKAN site, parses them to pull out the actual data, then uses the DataStore API -to push the data into the CKAN site's DataStore. - -This makes the data from the resource files available via CKAN's DataStore API. -In particular, many of CKAN's data preview and visualization plugins will only -work (or will work much better) with files whose contents are in the DataStore. - -To get it working you have to: - -1. Deploy a DataPusher instance to a server (or use an existing DataPusher - instance) -2. Enable and configure the `datastore` plugin on your CKAN site. -3. Enable and configure the `datapusher` plugin on your CKAN site. - -Note that if you installed CKAN using the _package install_ option then a -DataPusher instance should be automatically installed and configured to work -with your CKAN site. - -DataPusher is built using [CKAN Service Provider][] and [Messytables][]. - -The original author of DataPusher was -Dominik Moritz . For the current list of contributors -see [github.com/ckan/datapusher/contributors](https://github.com/ckan/datapusher/contributors) - -## Development installation - -Install the required packages:: - - sudo apt-get install python-dev python-virtualenv build-essential libxslt1-dev libxml2-dev zlib1g-dev git libffi-dev - -Get the code:: - - git clone https://github.com/ckan/datapusher - cd datapusher - -Install the dependencies:: - - pip install -r requirements.txt - pip install -r requirements-dev.txt - pip install -e . - -Run the DataPusher:: - - python datapusher/main.py deployment/datapusher_settings.py - -By default DataPusher should be running at the following port: - - http://localhost:8800/ - -If you need to change the host or port, copy `deployment/datapusher_settings.py` to -`deployment/datapusher_local_settings.py` and modify the file to suit your needs. Also if running a production setup, make sure that the host and port matcht the `http` settings in the uWSGI configuration. - -To run the tests: - - nosetests - -## Production deployment - -*Note*: If you installed CKAN via a [package install](http://docs.ckan.org/en/latest/install-from-package.html), the DataPusher has already been installed and deployed for you. You can skip directly to the [Configuring](#configuring) section. 
- - -Thes instructions assume you already have CKAN installed on this server in the default -location described in the CKAN install documentation -(`/usr/lib/ckan/default`). If this is correct you should be able to run the -following commands directly, if not you will need to adapt the previous path to -your needs. - -These instructions set up the DataPusher web service on [uWSGI](https://uwsgi-docs.readthedocs.io/en/latest/) running on port 8800, but can be easily adapted to other WSGI servers like Gunicorn. You'll -probably need to set up Nginx as a reverse proxy in front of it and something like -Supervisor to keep the process up. - - - # Install requirements for the DataPusher - sudo apt install python3-venv python3-dev build-essential - sudo apt-get install python-dev python-virtualenv build-essential libxslt1-dev libxml2-dev git libffi-dev - - # Create a virtualenv for datapusher - sudo python3 -m venv /usr/lib/ckan/datapusher - - # Create a source directory and switch to it - sudo mkdir /usr/lib/ckan/datapusher/src - cd /usr/lib/ckan/datapusher/src - - # Clone the source (you should target the latest tagged version) - sudo git clone -b 0.0.17 https://github.com/ckan/datapusher.git - - # Install the DataPusher and its requirements - cd datapusher - sudo /usr/lib/ckan/datapusher/bin/pip install -r requirements.txt - sudo /usr/lib/ckan/datapusher/bin/python setup.py develop - - # Create a user to run the web service (if necessary) - sudo addgroup www-data - sudo adduser -G www-data www-data - - # Install uWSGI - sudo /usr/lib/ckan/datapusher/bin/pip install uwsgi - -At this point you can run DataPusher with the following command: - - /usr/lib/ckan/datapusher/bin/uwsgi -i /usr/lib/ckan/datapusher/src/datapusher/deployment/datapusher-uwsgi.ini - - -*Note*: If you are installing the DataPusher on a different location than the default -one you need to adapt the relevant paths in the `datapusher-uwsgi.ini` to the ones you are using. Also you might need to change the `uid` and `guid` settings when using a different user. - - -### High Availability Setup - -The default DataPusher configuration uses SQLite as the backend for the jobs database and a single uWSGI thread. To increase performance and concurrency you can configure DataPusher in the following way: - -1. Use Postgres as database backend, which will allow concurrent writes (and provide a more reliable backend anyway). To use Postgres, create a user and a database and update the `SQLALCHEMY_DATABASE_URI` settting accordingly: - - ``` - # This assumes DataPusher is already installed - sudo apt-get install postgresql libpq-dev - sudo -u postgres createuser -S -D -R -P datapusher_jobs - sudo -u postgres createdb -O datapusher_jobs datapusher_jobs -E utf-8 - - # Run this in the virtualenv where DataPusher is installed - pip install psycopg2 - - # Edit SQLALCHEMY_DATABASE_URI in datapusher_settings.py accordingly - # eg SQLALCHEMY_DATABASE_URI=postgresql://datapusher_jobs:YOURPASSWORD@localhost/datapusher_jobs - ``` - -2. Start more uWSGI threads. On the `deployment/datapusher-uwsgi.ini` file, set `workers` and `threads` to a value that suits your needs, and add the `lazy-apps=true` setting to avoid concurrency issues with SQLAlchemy, eg: - - ``` - # ... 
rest of datapusher-uwsgi.ini - workers = 3 - threads = 3 - lazy-apps = true - ``` - -## Configuring - - -### CKAN Configuration - -Add `datapusher` to the plugins in your CKAN configuration file -(generally located at `/etc/ckan/default/production.ini` or `/etc/ckan/default/ckan.ini`): - - ckan.plugins = datapusher - -In order to tell CKAN where this webservice is located, the following must be -added to the `[app:main]` section of your CKAN configuration file : - - ckan.datapusher.url = http://127.0.0.1:8800/ - -and with a published url of ckan different from the internal url of ckan visible to datapusher (as in a reverse proxy configuration): - - ckan.datapusher.callback_url_base = http://ckan:5000 - -Must be also configured the authentication using jwt tokens, modifying the following CKAN configurations: - - api_token.jwt.encode.secret = string:samerandomstring - api_token.jwt.decode.secret = string:samerandomstring - -Then generating a new JWT token in the Administration section for the user default (the user with the same name of site_id) -which is used by datapusher and configuring the generated token with this configuration in the CKAN ini: - - ckan.datapusher.token = adsadasdsads.sdsad____ - - -There are other CKAN configuration options that allow to customize the CKAN - DataPusher -integation. Please refer to the [DataPusher Settings](https://docs.ckan.org/en/latest/maintaining/configuration.html#datapusher-settings) section in the CKAN documentation for more details. - - -### DataPusher Configuration - -The DataPusher instance is configured in the `deployment/datapusher_settings.py` file. -Here's a summary of the options available. - -| Name | Default | Description | -| -- | -- | -- | -| HOST | '0.0.0.0' | Web server host | -| PORT | 8800 | Web server port | -| SQLALCHEMY_DATABASE_URI | 'sqlite:////tmp/job_store.db' | SQLAlchemy Database URL. See note about database backend below. | -| MAX_CONTENT_LENGTH | '1024000' | Max size of files to process in bytes | -| CHUNK_SIZE | '16384' | Chunk size when processing the data file | -| CHUNK_INSERT_ROWS | '250' | Number of records to send a request to datastore | -| DOWNLOAD_TIMEOUT | '30' | Download timeout for requesting the file | -| DATAPUSHER_SSL_VERIFY | True | Do not validate SSL certificates when requesting the data file (*Warning*: Do not use this setting in production). Was used a different name from ckan SSL_VERIFY to prevent overlapping with the value set in the ckan imports | -| TYPES | [messytables.StringType, messytables.DecimalType, messytables.IntegerType, messytables.DateUtilType] | [Messytables][] types used internally, can be modified to customize the type guessing | -| TYPE_MAPPING | {'String': 'text', 'Integer': 'numeric', 'Decimal': 'numeric', 'DateUtil': 'timestamp'} | Internal Messytables type mapping | - - -Most of the configuration options above can be also provided as environment variables prepending the name with `DATAPUSHER_`, eg `DATAPUSHER_SQLALCHEMY_DATABASE_URI`, `DATAPUSHER_PORT`, etc. - - -By default DataPusher uses SQLite as the database backend for the jobs information. This is fine for local development and sites with low activity, but for sites that need more performance should use Postgres as the backend for the jobs database (eg `SQLALCHEMY_DATABASE_URI=postgresql://datapusher_jobs:YOURPASSWORD@localhost/datapusher_jobs`. See also [High Availability Setup](#high-availability-setup). If SQLite is used, is probably a good idea to store the database in a location other than `/tmp`. 
This will prevent the database being dropped, causing out of sync errors in the CKAN side. A good place to store it is the CKAN storage folder (if DataPusher is installed in the same server), generally in `/var/lib/ckan/`. - - -## Usage - -Any file that has one of the supported formats (defined in [`ckan.datapusher.formats`](https://docs.ckan.org/en/latest/maintaining/configuration.html#ckan-datapusher-formats)) will be attempted to be loaded -into the DataStore. - -You can also manually trigger resources to be resubmitted. When editing a resource in CKAN (clicking the "Manage" button on a resource page), a new tab named "DataStore" will appear. This will contain a log of the last attempted upload and a button to retry the upload. - -![DataPusher UI](images/ui.png) - -### Command line - -Run the following command to submit all resources to datapusher, although it will skip files whose hash of the data file has not changed: - - ckan -c /etc/ckan/default/ckan.ini datapusher resubmit - -On CKAN<=2.8: - - paster --plugin=ckan datapusher resubmit -c /etc/ckan/default/ckan.ini - -To Resubmit a specific resource, whether or not the hash of the data file has changed:: - - ckan -c /etc/ckan/default/ckan.ini datapusher submit {dataset_id} - -On CKAN<=2.8: - - paster --plugin=ckan datapusher submit -c /etc/ckan/default/ckan.ini - - -## License - -This material is copyright (c) 2020 Open Knowledge Foundation and other contributors - -It is open and licensed under the GNU Affero General Public License (AGPL) v3.0 -whose full text may be found at: - -[http://www.fsf.org/licensing/licenses/agpl-3.0.html]() diff --git a/datapusher/jobs.py.bak b/datapusher/jobs.py.bak deleted file mode 100644 index 3d2a92c..0000000 --- a/datapusher/jobs.py.bak +++ /dev/null @@ -1,559 +0,0 @@ -# -*- coding: utf-8 -*- - - -import json -import requests -try: - from urllib.parse import urlsplit -except ImportError: - from urlparse import urlsplit - -import itertools -import datetime -import locale -import logging -import decimal -import hashlib -import time -import tempfile -import sys - -import messytables - -import ckanserviceprovider.job as job -import ckanserviceprovider.util as util -from ckanserviceprovider import web - -if locale.getdefaultlocale()[0]: - lang, encoding = locale.getdefaultlocale() - locale.setlocale(locale.LC_ALL, locale=(lang, encoding)) -else: - locale.setlocale(locale.LC_ALL, '') - -DATAPUSHER_SSL_VERIFY = web.app.config.get('DATAPUSHER_SSL_VERIFY', True) -MAX_CONTENT_LENGTH = web.app.config.get('MAX_CONTENT_LENGTH') or 10485760 -CHUNK_SIZE = web.app.config.get('CHUNK_SIZE') or 16384 -CHUNK_INSERT_ROWS = web.app.config.get('CHUNK_INSERT_ROWS') or 250 -DOWNLOAD_TIMEOUT = web.app.config.get('DOWNLOAD_TIMEOUT') or 30 -USE_PROXY = 'DOWNLOAD_PROXY' in web.app.config -if USE_PROXY: - DOWNLOAD_PROXY = web.app.config.get('DOWNLOAD_PROXY') - -if DATAPUSHER_SSL_VERIFY in ['False', 'FALSE', '0', False, 0]: - DATAPUSHER_SSL_VERIFY = False -else: - DATAPUSHER_SSL_VERIFY = True - -if not DATAPUSHER_SSL_VERIFY: - requests.packages.urllib3.disable_warnings() - -_TYPE_MAPPING = { - 'String': 'text', - # 'int' may not be big enough, - # and type detection may not realize it needs to be big - 'Integer': 'numeric', - 'Decimal': 'numeric', - 'DateUtil': 'timestamp' -} - -_TYPES = [messytables.StringType, messytables.DecimalType, - messytables.IntegerType, messytables.DateUtilType] - -TYPE_MAPPING = web.app.config.get('TYPE_MAPPING', _TYPE_MAPPING) -TYPES = web.app.config.get('TYPES', _TYPES) - -DATASTORE_URLS = { - 
'datastore_delete': '{ckan_url}/api/action/datastore_delete', - 'resource_update': '{ckan_url}/api/action/resource_update' -} - - -class HTTPError(util.JobError): - """Exception that's raised if a job fails due to an HTTP problem.""" - - def __init__(self, message, status_code, request_url, response): - """Initialise a new HTTPError. - - :param message: A human-readable error message - :type message: string - - :param status_code: The status code of the errored HTTP response, - e.g. 500 - :type status_code: int - - :param request_url: The URL that was requested - :type request_url: string - - :param response: The body of the errored HTTP response as unicode - (if you have a requests.Response object then response.text will - give you this) - :type response: unicode - - """ - super(HTTPError, self).__init__(message) - self.status_code = status_code - self.request_url = request_url - self.response = response - - def as_dict(self): - """Return a JSON-serializable dictionary representation of this error. - - Suitable for ckanserviceprovider to return to the client site as the - value for the "error" key in the job dict. - - """ - if self.response and len(self.response) > 200: - response = self.response[:200] + '...' - else: - response = self.response - return { - "message": self.message, - "HTTP status code": self.status_code, - "Requested URL": self.request_url, - "Response": response, - } - - def __str__(self): - return '{} status={} url={} response={}'.format( - self.message, self.status_code, self.request_url, self.response) \ - .encode('ascii', 'replace') - - -def get_url(action, ckan_url): - """ - Get url for ckan action - """ - if not urlsplit(ckan_url).scheme: - ckan_url = 'http://' + ckan_url.lstrip('/') - ckan_url = ckan_url.rstrip('/') - return '{ckan_url}/api/3/action/{action}'.format( - ckan_url=ckan_url, action=action) - - -def check_response(response, request_url, who, good_status=(201, 200), ignore_no_success=False): - """ - Checks the response and raises exceptions if something went terribly wrong - - :param who: A short name that indicated where the error occurred - (for example "CKAN") - :param good_status: Status codes that should not raise an exception - - """ - if not response.status_code: - raise HTTPError( - 'DataPusher received an HTTP response with no status code', - status_code=None, request_url=request_url, response=response.text) - - message = '{who} bad response. Status code: {code} {reason}. At: {url}.' - try: - if response.status_code not in good_status: - json_response = response.json() - if not ignore_no_success or json_response.get('success'): - try: - message = json_response["error"]["message"] - except Exception: - message = message.format( - who=who, code=response.status_code, - reason=response.reason, url=request_url) - raise HTTPError( - message, status_code=response.status_code, - request_url=request_url, response=response.text) - except ValueError: - message = message.format( - who=who, code=response.status_code, reason=response.reason, - url=request_url, resp=response.text[:200]) - raise HTTPError( - message, status_code=response.status_code, request_url=request_url, - response=response.text) - - -def chunky(items, num_items_per_chunk): - """ - Breaks up a list of items into chunks - multiple smaller lists of items. - The last chunk is flagged up. 
-
-    :param items: Size of each chunks
-    :type items: iterable
-    :param num_items_per_chunk: Size of each chunks
-    :type num_items_per_chunk: int
-
-    :returns: multiple tuples: (chunk, is_it_the_last_chunk)
-    :rtype: generator of (list, bool)
-    """
-    items_ = iter(items)
-    chunk = list(itertools.islice(items_, num_items_per_chunk))
-    while chunk:
-        next_chunk = list(itertools.islice(items_, num_items_per_chunk))
-        chunk_is_the_last_one = not next_chunk
-        yield chunk, chunk_is_the_last_one
-        chunk = next_chunk
-
-
-class DatastoreEncoder(json.JSONEncoder):
-    # Custon JSON encoder
-    def default(self, obj):
-        if isinstance(obj, datetime.datetime):
-            return obj.isoformat()
-        if isinstance(obj, decimal.Decimal):
-            return str(obj)
-
-        return json.JSONEncoder.default(self, obj)
-
-
-def delete_datastore_resource(resource_id, api_key, ckan_url):
-    try:
-        delete_url = get_url('datastore_delete', ckan_url)
-        response = requests.post(delete_url,
-                                 verify=DATAPUSHER_SSL_VERIFY,
-                                 data=json.dumps({'id': resource_id,
-                                                  'force': True}),
-                                 headers={'Content-Type': 'application/json',
-                                          'Authorization': api_key}
-                                 )
-        check_response(response, delete_url, 'CKAN',
-                       good_status=(201, 200, 404), ignore_no_success=True)
-    except requests.exceptions.RequestException:
-        raise util.JobError('Deleting existing datastore failed.')
-
-
-def datastore_resource_exists(resource_id, api_key, ckan_url):
-    try:
-        search_url = get_url('datastore_search', ckan_url)
-        response = requests.post(search_url,
-                                 verify=DATAPUSHER_SSL_VERIFY,
-                                 data=json.dumps({'id': resource_id,
-                                                  'limit': 0}),
-                                 headers={'Content-Type': 'application/json',
-                                          'Authorization': api_key}
-                                 )
-        if response.status_code == 404:
-            return False
-        elif response.status_code == 200:
-            return response.json().get('result', {'fields': []})
-        else:
-            raise HTTPError(
-                'Error getting datastore resource.',
-                response.status_code, search_url, response,
-            )
-    except requests.exceptions.RequestException as e:
-        raise util.JobError(
-            'Error getting datastore resource ({!s}).'.format(e))
-
-
-def send_resource_to_datastore(resource, headers, records,
-                               is_it_the_last_chunk, api_key, ckan_url):
-    """
-    Stores records in CKAN datastore
-    """
-    request = {'resource_id': resource['id'],
-               'fields': headers,
-               'force': True,
-               'records': records,
-               'calculate_record_count': is_it_the_last_chunk}
-
-    url = get_url('datastore_create', ckan_url)
-    r = requests.post(url,
-                      verify=DATAPUSHER_SSL_VERIFY,
-                      data=json.dumps(request, cls=DatastoreEncoder),
-                      headers={'Content-Type': 'application/json',
-                               'Authorization': api_key}
-                      )
-    check_response(r, url, 'CKAN DataStore')
-
-
-def update_resource(resource, api_key, ckan_url):
-    """
-    Update webstore_url and webstore_last_updated in CKAN
-    """
-
-    resource['url_type'] = 'datapusher'
-
-    url = get_url('resource_update', ckan_url)
-    r = requests.post(
-        url,
-        verify=DATAPUSHER_SSL_VERIFY,
-        data=json.dumps(resource),
-        headers={'Content-Type': 'application/json',
-                 'Authorization': api_key}
-    )
-
-    check_response(r, url, 'CKAN')
-
-
-def get_resource(resource_id, ckan_url, api_key):
-    """
-    Gets available information about the resource from CKAN
-    """
-    url = get_url('resource_show', ckan_url)
-    r = requests.post(url,
-                      verify=DATAPUSHER_SSL_VERIFY,
-                      data=json.dumps({'id': resource_id}),
-                      headers={'Content-Type': 'application/json',
-                               'Authorization': api_key}
-                      )
-    check_response(r, url, 'CKAN')
-
-    return r.json()['result']
-
-
-def validate_input(input):
-    # Especially validate metdata which is provided by the user
-    if 'metadata' not in input:
-        raise util.JobError('Metadata missing')
-
-    data = input['metadata']
-
-    if 'resource_id' not in data:
-        raise util.JobError('No id provided.')
-    if 'ckan_url' not in data:
-        raise util.JobError('No ckan_url provided.')
-    if not input.get('api_key'):
-        raise util.JobError('No CKAN API key provided')
-
-
-@job.asynchronous
-def push_to_datastore(task_id, input, dry_run=False):
-    '''Download and parse a resource push its data into CKAN's DataStore.
-
-    An asynchronous job that gets a resource from CKAN, downloads the
-    resource's data file and, if the data file has changed since last time,
-    parses the data and posts it into CKAN's DataStore.
-
-    :param dry_run: Fetch and parse the data file but don't actually post the
-        data to the DataStore, instead return the data headers and rows that
-        would have been posted.
-    :type dry_run: boolean
-
-    '''
-    logging.debug("Executing push_to_datastore job")
-    logging.debug("DATAPUSHER_SSL_VERIFY %s" % DATAPUSHER_SSL_VERIFY)
-
-    # This response_logger is used to report job activity to ckan
-    handler = util.StoringHandler(task_id, input)
-    response_logger = logging.getLogger(task_id)
-    response_logger.addHandler(handler)
-    response_logger.setLevel(logging.DEBUG)
-
-    validate_input(input)
-
-    data = input['metadata']
-
-    ckan_url = data['ckan_url']
-
-    # See ckan/ckanext/datapusher/logic/action.py
-    # See https://github.com/ckan/ckan-service-provider/blob/master/ckanserviceprovider/web.py
-    callback_url = input.get('result_url')
-
-    resource_id = data['resource_id']
-    original_url = data.get('original_url', None)
-    original_base_url = data.get('original_base_url', '')
-
-    api_key = input.get('api_key')
-
-    logging.debug("callback_url: %s" % callback_url)
-    logging.debug("resource_id: %s" % resource_id)
-    logging.debug("api_key: %s............" % api_key[0:8])
-    logging.debug("ckan_url: %s" % ckan_url)
-    logging.debug("original_url: %s" % original_url)
-    logging.debug("original_base_url: %s" % original_base_url)
-
-    try:
-        resource = get_resource(resource_id, ckan_url, api_key)
-    except util.JobError as e:
-        # try again in 5 seconds just incase CKAN is slow at adding resource
-        time.sleep(5)
-        resource = get_resource(resource_id, ckan_url, api_key)
-
-    # check if the resource url_type is a datastore
-    if resource.get('url_type') == 'datastore':
-        response_logger.info('Dump files are managed with the Datastore API')
-        return
-
-    # check scheme
-    # this url is the external url of the resource if behind reverse proxy
-    # we had to replace this url_base with callback_url_base
-    url = resource.get('url')
-
-    # here we replace original_base_url, with ckan_url in the url variable
-    # example:
-    # ckan_url: http://ckan:5000/
-    # original_base_url: https://ckanportal.mydomain.com
-    #
-    # url:
-    # before: https://ckanportal.mydomain.com/dataset/515fa5c0-058e-4b53-b5ed-74ff38aca428/resource/203de699-c2fc-40f7-a740-0369a2ebaa78/download/test.csv
-    # after: http://ckan:5000/dataset/515fa5c0-058e-4b53-b5ed-74ff38aca428/resource/203de699-c2fc-40f7-a740-0369a2ebaa78/download/test.csv
-    if (original_base_url):
-        original_base_url_strip = original_base_url.rstrip("/")
-        ckan_url_strip = ckan_url.rstrip("/")
-        url = url.replace(original_base_url_strip, ckan_url_strip)
-
-    logging.debug("Verifying resource from url: %s" % url)
-
-    scheme = urlsplit(url).scheme
-    if scheme not in ('http', 'https', 'ftp'):
-        raise util.JobError(
-            'Only http, https, and ftp resources may be fetched.'
-        )
-
-    # fetch the resource data
-    response_logger.info('Fetching from: {0}'.format(url))
-    headers = {}
-    if resource.get('url_type') == 'upload':
-        # If this is an uploaded file to CKAN, authenticate the request,
-        # otherwise we won't get file from private resources
-        headers['Authorization'] = api_key
-    try:
-        kwargs = {'headers': headers, 'timeout': DOWNLOAD_TIMEOUT,
-                  'verify': DATAPUSHER_SSL_VERIFY, 'stream': True}
-        if USE_PROXY:
-            kwargs['proxies'] = {'http': DOWNLOAD_PROXY, 'https': DOWNLOAD_PROXY}
-        response = requests.get(url, **kwargs)
-        response.raise_for_status()
-
-        cl = response.headers.get('content-length')
-        try:
-            if cl and int(cl) > MAX_CONTENT_LENGTH:
-                raise util.JobError(
-                    'Resource too large to download: {cl} > max ({max_cl}).'
-                    .format(cl=cl, max_cl=MAX_CONTENT_LENGTH))
-        except ValueError:
-            pass
-
-        tmp = tempfile.TemporaryFile()
-        length = 0
-        m = hashlib.md5()
-        for chunk in response.iter_content(CHUNK_SIZE):
-            length += len(chunk)
-            if length > MAX_CONTENT_LENGTH:
-                raise util.JobError(
-                    'Resource too large to process: {cl} > max ({max_cl}).'
-                    .format(cl=length, max_cl=MAX_CONTENT_LENGTH))
-            tmp.write(chunk)
-            m.update(chunk)
-
-        ct = response.headers.get('content-type', '').split(';', 1)[0]
-
-    except requests.HTTPError as e:
-        raise HTTPError(
-            "DataPusher received a bad HTTP response when trying to download "
-            "the data file", status_code=e.response.status_code,
-            request_url=url, response=e.response.content)
-    except requests.RequestException as e:
-        raise HTTPError(
-            message=str(e), status_code=None,
-            request_url=url, response=None)
-
-    file_hash = m.hexdigest()
-    tmp.seek(0)
-
-    if (resource.get('hash') == file_hash
-            and not data.get('ignore_hash')):
-        response_logger.info("The file hash hasn't changed: {hash}.".format(
-            hash=file_hash))
-        return
-
-    resource['hash'] = file_hash
-
-    try:
-        table_set = messytables.any_tableset(tmp, mimetype=ct, extension=ct)
-    except messytables.ReadError as e:
-        # try again with format
-        tmp.seek(0)
-        try:
-            format = resource.get('format')
-            table_set = messytables.any_tableset(tmp, mimetype=format, extension=format)
-        except:
-            raise util.JobError(e)
-
-    get_row_set = web.app.config.get('GET_ROW_SET',
-                                     lambda table_set: table_set.tables.pop())
-    row_set = get_row_set(table_set)
-    offset, headers = messytables.headers_guess(row_set.sample)
-
-    existing = datastore_resource_exists(resource_id, api_key, ckan_url)
-    existing_info = None
-    if existing:
-        existing_info = dict((f['id'], f['info'])
-                             for f in existing.get('fields', []) if 'info' in f)
-
-    # Some headers might have been converted from strings to floats and such.
-    headers = [str(header) for header in headers]
-
-    row_set.register_processor(messytables.headers_processor(headers))
-    row_set.register_processor(messytables.offset_processor(offset + 1))
-    types = messytables.type_guess(row_set.sample, types=TYPES, strict=True)
-
-    # override with types user requested
-    if existing_info:
-        types = [{
-            'text': messytables.StringType(),
-            'numeric': messytables.DecimalType(),
-            'timestamp': messytables.DateUtilType(),
-            }.get(existing_info.get(h, {}).get('type_override'), t)
-            for t, h in zip(types, headers)]
-
-    row_set.register_processor(messytables.types_processor(types))
-
-    headers = [header.strip() for header in headers if header.strip()]
-    headers_set = set(headers)
-
-    def row_iterator():
-        for row in row_set:
-            data_row = {}
-            for index, cell in enumerate(row):
-                column_name = cell.column.strip()
-                if column_name not in headers_set:
-                    continue
-                if isinstance(cell.value, str):
-                    try:
-                        data_row[column_name] = cell.value.encode('latin-1').decode('utf-8')
-                    except (UnicodeDecodeError, UnicodeEncodeError):
-                        data_row[column_name] = cell.value
-                else:
-                    data_row[column_name] = cell.value
-            yield data_row
-    result = row_iterator()
-
-    '''
-    Delete existing datstore resource before proceeding. Otherwise
-    'datastore_create' will append to the existing datastore. And if
-    the fields have significantly changed, it may also fail.
-    '''
-    if existing:
-        response_logger.info('Deleting "{res_id}" from datastore.'.format(
-            res_id=resource_id))
-        delete_datastore_resource(resource_id, api_key, ckan_url)
-
-    headers_dicts = [dict(id=field[0], type=TYPE_MAPPING[str(field[1])])
-                     for field in zip(headers, types)]
-
-    # Maintain data dictionaries from matching column names
-    if existing_info:
-        for h in headers_dicts:
-            if h['id'] in existing_info:
-                h['info'] = existing_info[h['id']]
-                # create columns with types user requested
-                type_override = existing_info[h['id']].get('type_override')
-                if type_override in list(_TYPE_MAPPING.values()):
-                    h['type'] = type_override
-
-    response_logger.info('Determined headers and types: {headers}'.format(
-        headers=headers_dicts))
-
-    if dry_run:
-        return headers_dicts, result
-
-    count = 0
-    for i, chunk in enumerate(chunky(result, CHUNK_INSERT_ROWS)):
-        records, is_it_the_last_chunk = chunk
-        count += len(records)
-        response_logger.info('Saving chunk {number} {is_last}'.format(
-            number=i, is_last='(last)' if is_it_the_last_chunk else ''))
-        send_resource_to_datastore(resource, headers_dicts, records,
-                                   is_it_the_last_chunk, api_key, ckan_url)
-
-    response_logger.info('Successfully pushed {n} entries to "{res_id}".'.format(
-        n=count, res_id=resource_id))
-
-    if data.get('set_url_type', False):
-        update_resource(resource, api_key, ckan_url)
diff --git a/deployment/datapusher_settings.py.bak b/deployment/datapusher_settings.py.bak
deleted file mode 100644
index e8e395b..0000000
--- a/deployment/datapusher_settings.py.bak
+++ /dev/null
@@ -1,31 +0,0 @@
-import os
-import uuid
-
-DEBUG = False
-TESTING = False
-SECRET_KEY = str(uuid.uuid4())
-USERNAME = str(uuid.uuid4())
-PASSWORD = str(uuid.uuid4())
-
-NAME = 'datapusher'
-
-# Webserver host and port
-
-HOST = os.environ.get('DATAPUSHER_HOST', '0.0.0.0')
-PORT = os.environ.get('DATAPUSHER_PORT', 8800)
-
-# Database
-
-SQLALCHEMY_DATABASE_URI = os.environ.get('DATAPUSHER_SQLALCHEMY_DATABASE_URI', 'sqlite:////tmp/job_store.db')
-
-# Download and streaming settings
-
-MAX_CONTENT_LENGTH = int(os.environ.get('DATAPUSHER_MAX_CONTENT_LENGTH', '1024000'))
-CHUNK_SIZE = int(os.environ.get('DATAPUSHER_CHUNK_SIZE', '16384'))
-CHUNK_INSERT_ROWS = int(os.environ.get('DATAPUSHER_CHUNK_INSERT_ROWS', '250'))
-DOWNLOAD_TIMEOUT = int(os.environ.get('DATAPUSHER_DOWNLOAD_TIMEOUT', '30'))
-
-# Verify SSL (Prevent overlapping with CKAN SSL_VERIFY)
-DATAPUSHER_SSL_VERIFY = os.environ.get('DATAPUSHER_SSL_VERIFY', True)
-# logging
-#LOG_FILE = '/tmp/ckan_service.log'
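
The change this patch series revolves around is the rewrite, inside `push_to_datastore`, of the resource's public base URL (`original_base_url`) to the internal `ckan_url` before the file is downloaded. Below is a minimal Python sketch of that rewrite for reference; the host names are the same illustrative placeholders used in the code comments above, not real endpoints, and the dataset/resource path segments are made up:

    # Minimal sketch of the URL rewrite done in push_to_datastore (illustrative values only).
    ckan_url = 'http://ckan:5000/'                          # internal address DataPusher can reach
    original_base_url = 'https://ckanportal.mydomain.com'   # public address stored in the resource URL
    url = original_base_url + '/dataset/abc/resource/def/download/test.csv'

    if original_base_url:
        # Trailing slashes are stripped so the substitution does not leave a double slash.
        url = url.replace(original_base_url.rstrip('/'), ckan_url.rstrip('/'))

    print(url)  # http://ckan:5000/dataset/abc/resource/def/download/test.csv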