diff --git a/datapusher/jobs.py b/datapusher/jobs.py
index 8504a90..db25270 100644
--- a/datapusher/jobs.py
+++ b/datapusher/jobs.py
@@ -8,6 +8,7 @@
 except ImportError:
     from urlparse import urlsplit
 
+from dateutil.parser import ParserError
 import itertools
 import datetime
 import locale
@@ -18,6 +19,10 @@
 import tempfile
 
 import messytables
+import numpy as np
+from numpy.random import default_rng
+import pandas as pd
+import re
 
 import ckanserviceprovider.job as job
 import ckanserviceprovider.util as util
@@ -29,6 +34,7 @@
 else:
     locale.setlocale(locale.LC_ALL, '')
 
+MAX_DESCRIPTION_LENGTH = web.app.config.get('MAX_DESCRIPTION_LENGTH') or 255
 MAX_CONTENT_LENGTH = web.app.config.get('MAX_CONTENT_LENGTH') or 10485760
 CHUNK_SIZE = web.app.config.get('CHUNK_SIZE') or 16384
 CHUNK_INSERT_ROWS = web.app.config.get('CHUNK_INSERT_ROWS') or 250
@@ -51,7 +57,14 @@
     # and type detection may not realize it needs to be big
     'Integer': 'numeric',
     'Decimal': 'numeric',
-    'DateUtil': 'timestamp'
+    'DateUtil': 'timestamp',
+    'int4': 'int4',  # PostgreSQL types from the pandas sniffer are mapped 1:1
+    'int8': 'int8',
+    'bigint': 'bigint',
+    'numeric': 'numeric',
+    'float': 'float',
+    'timestamp': 'timestamp',
+    'text': 'text'
 }
 
 _TYPES = [messytables.StringType, messytables.DecimalType,
@@ -315,6 +328,149 @@ def validate_input(input):
     if not input.get('api_key'):
         raise util.JobError('No CKAN API key provided')
 
+def old_sniff_algorithm(tmp, ct, resource, api_key, ckan_url, existing_info=None):
+    """Sniff headers and column types with messytables (the original code path)."""
+    try:
+        table_set = messytables.any_tableset(tmp, mimetype=ct, extension=ct)
+    except messytables.ReadError as e:
+        # try again with format
+        tmp.seek(0)
+        try:
+            format = resource.get('format')
+            table_set = messytables.any_tableset(tmp, mimetype=format, extension=format)
+        except Exception:
+            raise util.JobError(e)
+
+    get_row_set = web.app.config.get('GET_ROW_SET',
+                                     lambda table_set: table_set.tables.pop())
+    row_set = get_row_set(table_set)
+    offset, headers = messytables.headers_guess(row_set.sample)
+
+    # Some headers might have been converted from strings to floats and such.
+    headers = [str(header) for header in headers]
+
+    row_set.register_processor(messytables.headers_processor(headers))
+    row_set.register_processor(messytables.offset_processor(offset + 1))
+    types = messytables.type_guess(row_set.sample, types=TYPES, strict=True)
+
+    # override with types user requested
+    if existing_info:
+        types = [{
+            'text': messytables.StringType(),
+            'numeric': messytables.DecimalType(),
+            'timestamp': messytables.DateUtilType(),
+        }.get(existing_info.get(h, {}).get('type_override'), t)
+            for t, h in zip(types, headers)]
+
+    row_set.register_processor(messytables.types_processor(types))
+
+    headers = [header.strip() for header in headers if header.strip()]
+    headers_set = set(headers)
+
+    def row_iterator():
+        for row in row_set:
+            data_row = {}
+            for index, cell in enumerate(row):
+                column_name = cell.column.strip()
+                if column_name not in headers_set:
+                    continue
+                if isinstance(cell.value, str):
+                    try:
+                        data_row[column_name] = cell.value.encode('latin-1').decode('utf-8')
+                    except (UnicodeDecodeError, UnicodeEncodeError):
+                        data_row[column_name] = cell.value
+                else:
+                    data_row[column_name] = cell.value
+            yield data_row
+    result = row_iterator()
+
+    return result, headers, types
+
+def sanitize_column_name(name):
+    """Clean up column names for insertion into CKAN."""
+    string = str(name).strip()
+    while string and string[0] == '_':
+        string = string[1:]
+
+    if '%' in string:
+        string = f'[{string}]'
+    return string
+
+def reformat_pandas_description(series):
+    """Condense a pandas describe() summary into a short per-column note."""
+    clipped = str(series.describe(datetime_is_numeric=True)).split('\n')[:-1]
+    compressed = [re.sub(r'^([\w\%]+)[ \t]+', r'**\1**: ', x) for x in clipped]
+    return ';\n'.join(compressed)
+
+def pandas_sniff_algorithm(tmp, ct, resource_id, api_key, ckan_url, ext='CSV'):
+    """Sniff headers, PostgreSQL column types and data dictionary notes with pandas."""
+    try:
+        if ext == 'TSV':
+            dataset = pd.read_csv(tmp, sep='\t')
+        else:
+            dataset = pd.read_csv(tmp)  # fall back to something more permissive
+
+    except Exception as e:
+        raise util.JobError(e)
+
+    # handle odd column names
+    if dataset.columns[0] == 'Unnamed: 0':
+        dataset.drop(['Unnamed: 0'], axis=1, inplace=True)  # almost certainly an index column, thus redundant
+
+    dataset.columns = [sanitize_column_name(x) for x in dataset.columns]  # rename the source columns in place
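+
+    # The loop below maps each pandas dtype onto a PostgreSQL column type
+    # ('int4', 'int8', 'float', 'timestamp', 'numeric' or 'text') and builds a
+    # short human-readable note for the column's data dictionary entry.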
+    headers = dataset.columns
+    types = list()
+    notes = list()
+
+    rng = default_rng()  # for random sampling (not used yet)
+    for i, header in enumerate(headers):
+
+        # pandas has done the heavy lifting for us
+        if (dataset.dtypes[i] == np.int64) or (dataset.dtypes[i] == np.int32):
+            if (dataset[header].max() < (1 << 31)) and (dataset[header].min() >= -(1 << 31)):
+                types.append('int4')  # value range fits PostgreSQL's 4-byte integer
+            elif (dataset[header].max() < (1 << 63)) and (dataset[header].min() >= -(1 << 63)):
+                types.append('int8')
+            else:
+                types.append('numeric')
+            notes.append(reformat_pandas_description(dataset[header]))
+            continue
+
+        elif dataset.dtypes[i] == np.float64:
+            types.append('float')
+            notes.append(reformat_pandas_description(dataset[header]))
+            continue
+
+        # datetimes should be converted to something PostgreSQL understands
+        elif str(dataset.dtypes[i]).startswith('datetime64'):  # TODO: fix this ugly hack
+            types.append('timestamp')
+            notes.append(reformat_pandas_description(dataset[header]))
+            dataset[header] = dataset[header].dt.strftime('%Y-%m-%d %H:%M:%S %z')
+            continue
+
+        # let pandas handle the sniffing for us
+        try:
+            target = pd.to_datetime(dataset[header])
+            types.append('timestamp')
+            notes.append(reformat_pandas_description(target))
+            dataset[header] = target.dt.strftime('%Y-%m-%d %H:%M:%S %z')
+
+        except (TypeError, ParserError) as e:
+            types.append('text')
+            cats = pd.Categorical(dataset[header])
+            proposed = 'One of "' + '", "'.join(cats.categories[:-1]) + f'", or "{cats.categories[-1]}".'
+
+            if len(proposed) > MAX_DESCRIPTION_LENGTH:
+                notes.append(reformat_pandas_description(dataset[header]))
+            else:
+                notes.append(proposed)
+
+    def iterator():
+        for idx, row in dataset.iterrows():
+            output = {x: None if (type(row[x]) is float) and np.isnan(row[x]) else row[x] for x in row.keys()}
+            yield output  # keep every column; NaN becomes None so each row stays JSON-serializable
+
+    return iterator(), headers, types, notes
 
 @job.asynchronous
 def push_to_datastore(task_id, input, dry_run=False):
@@ -429,65 +585,25 @@ def push_to_datastore(task_id, input, dry_run=False):
 
     resource['hash'] = file_hash
 
-    try:
-        table_set = messytables.any_tableset(tmp, mimetype=ct, extension=ct)
-    except messytables.ReadError as e:
-        # try again with format
-        tmp.seek(0)
-        try:
-            format = resource.get('format')
-            table_set = messytables.any_tableset(tmp, mimetype=format, extension=format)
-        except:
-            raise util.JobError(e)
-
-    get_row_set = web.app.config.get('GET_ROW_SET',
-                                     lambda table_set: table_set.tables.pop())
-    row_set = get_row_set(table_set)
-    offset, headers = messytables.headers_guess(row_set.sample)
-
+    # grab any existing info
     existing = datastore_resource_exists(resource_id, api_key, ckan_url)
     existing_info = None
     if existing:
         existing_info = dict((f['id'], f['info'])
                              for f in existing.get('fields', []) if 'info' in f)
 
-    # Some headers might have been converted from strings to floats and such.
-    headers = [str(header) for header in headers]
-
-    row_set.register_processor(messytables.headers_processor(headers))
-    row_set.register_processor(messytables.offset_processor(offset + 1))
-    types = messytables.type_guess(row_set.sample, types=TYPES, strict=True)
-
-    # override with types user requested
-    if existing_info:
-        types = [{
-            'text': messytables.StringType(),
-            'numeric': messytables.DecimalType(),
-            'timestamp': messytables.DateUtilType(),
-        }.get(existing_info.get(h, {}).get('type_override'), t)
-            for t, h in zip(types, headers)]
-
-    row_set.register_processor(messytables.types_processor(types))
-
-    headers = [header.strip() for header in headers if header.strip()]
-    headers_set = set(headers)
-
-    def row_iterator():
-        for row in row_set:
-            data_row = {}
-            for index, cell in enumerate(row):
-                column_name = cell.column.strip()
-                if column_name not in headers_set:
-                    continue
-                if isinstance(cell.value, str):
-                    try:
-                        data_row[column_name] = cell.value.encode('latin-1').decode('utf-8')
-                    except (UnicodeDecodeError, UnicodeEncodeError):
-                        data_row[column_name] = cell.value
-                else:
-                    data_row[column_name] = cell.value
-            yield data_row
-    result = row_iterator()
+    # branch depending on format: CSV and TSV go through pandas, anything else uses the old sniffer
+    if '?' in url:
+        ext = url[:url.index('?')][-4:].lower()  # drop the query string before checking the extension
+    else:
+        ext = url[-4:].lower()
+    if ext == '.csv':
+        iterator, headers, types, notes = pandas_sniff_algorithm(tmp, ct, resource_id, api_key, ckan_url)
+    elif ext == '.tsv':
+        iterator, headers, types, notes = pandas_sniff_algorithm(tmp, ct, resource_id, api_key, ckan_url, ext='TSV')
+    else:
+        iterator, headers, types = old_sniff_algorithm(tmp, ct, resource, api_key, ckan_url, existing_info)
+        notes = [None] * len(headers)  # the messytables path produces no per-column notes
 
     '''
     Delete existing datstore resource before proceeding. Otherwise
@@ -499,8 +615,9 @@ def row_iterator():
             res_id=resource_id))
         delete_datastore_resource(resource_id, api_key, ckan_url)
 
-    headers_dicts = [dict(id=field[0], type=TYPE_MAPPING[str(field[1])])
-                     for field in zip(headers, types)]
+    headers_dicts = [dict(id=field[0], type=TYPE_MAPPING[str(field[1])],
+                          info=dict(notes=field[2]))
+                     for field in zip(headers, types, notes)]
 
     # Maintain data dictionaries from matching column names
     if existing_info:
@@ -516,10 +633,10 @@
             headers=headers_dicts))
 
     if dry_run:
-        return headers_dicts, result
+        return headers_dicts, iterator
 
     count = 0
-    for i, chunk in enumerate(chunky(result, CHUNK_INSERT_ROWS)):
+    for i, chunk in enumerate(chunky(iterator, CHUNK_INSERT_ROWS)):
         records, is_it_the_last_chunk = chunk
         count += len(records)
         logger.info('Saving chunk {number} {is_last}'.format(
diff --git a/requirements.txt b/requirements.txt
index 14164be..9445644 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -4,3 +4,4 @@ html5lib==1.0.1
 messytables==0.15.2
 certifi
 requests[security]==2.27.1
+pandas>=1.1,<2.0  # describe(datetime_is_numeric=True) needs the pandas 1.x API