233 changes: 175 additions & 58 deletions datapusher/jobs.py
@@ -8,6 +8,7 @@
except ImportError:
    from urlparse import urlsplit

from dateutil.parser import ParserError
import itertools
import datetime
import locale
@@ -18,6 +19,10 @@
import tempfile

import messytables
import numpy as np
from numpy.random import default_rng
import pandas as pd
import re

import ckanserviceprovider.job as job
import ckanserviceprovider.util as util
@@ -29,6 +34,7 @@
else:
    locale.setlocale(locale.LC_ALL, '')

MAX_DESCRIPTION_LENGTH = web.app.config.get('MAX_DESCRIPTION_LENGTH') or 255
MAX_CONTENT_LENGTH = web.app.config.get('MAX_CONTENT_LENGTH') or 10485760
CHUNK_SIZE = web.app.config.get('CHUNK_SIZE') or 16384
CHUNK_INSERT_ROWS = web.app.config.get('CHUNK_INSERT_ROWS') or 250
@@ -51,7 +57,14 @@
    # and type detection may not realize it needs to be big
    'Integer': 'numeric',
    'Decimal': 'numeric',
    'DateUtil': 'timestamp',
    'int4': 'int4',  # PostgreSQL types are mapped 1:1
    'int8': 'int8',
    'bigint': 'bigint',
    'numeric': 'numeric',
    'float': 'float',
    'timestamp': 'timestamp',
    'text': 'text'
}
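# str() of a messytables type object yields the keys above (e.g. str(messytables.DecimalType()) == 'Decimal');
# the pandas sniffer emits PostgreSQL type names directly, so those entries map to themselves.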

_TYPES = [messytables.StringType, messytables.DecimalType,
@@ -315,6 +328,149 @@ def validate_input(input):
    if not input.get('api_key'):
        raise util.JobError('No CKAN API key provided')

def old_sniff_algorithm(tmp, ct, resource, resource_id, api_key, ckan_url, existing_info=None):
    """Guess headers and column types with messytables (the legacy behaviour)."""
    try:
        table_set = messytables.any_tableset(tmp, mimetype=ct, extension=ct)
    except messytables.ReadError as e:
        # try again with format
        tmp.seek(0)
        try:
            format = resource.get('format')
            table_set = messytables.any_tableset(tmp, mimetype=format, extension=format)
        except Exception:
            raise util.JobError(e)

    get_row_set = web.app.config.get('GET_ROW_SET',
                                     lambda table_set: table_set.tables.pop())
    row_set = get_row_set(table_set)
    offset, headers = messytables.headers_guess(row_set.sample)

    # Some headers might have been converted from strings to floats and such.
    headers = [str(header) for header in headers]

    row_set.register_processor(messytables.headers_processor(headers))
    row_set.register_processor(messytables.offset_processor(offset + 1))
    types = messytables.type_guess(row_set.sample, types=TYPES, strict=True)

    # override with types user requested
    if existing_info:
        types = [{
            'text': messytables.StringType(),
            'numeric': messytables.DecimalType(),
            'timestamp': messytables.DateUtilType(),
        }.get(existing_info.get(h, {}).get('type_override'), t)
            for t, h in zip(types, headers)]

    row_set.register_processor(messytables.types_processor(types))

    headers = [header.strip() for header in headers if header.strip()]
    headers_set = set(headers)

    def row_iterator():
        for row in row_set:
            data_row = {}
            for index, cell in enumerate(row):
                column_name = cell.column.strip()
                if column_name not in headers_set:
                    continue
                if isinstance(cell.value, str):
                    try:
                        data_row[column_name] = cell.value.encode('latin-1').decode('utf-8')
                    except (UnicodeDecodeError, UnicodeEncodeError):
                        data_row[column_name] = cell.value
                else:
                    data_row[column_name] = cell.value
            yield data_row
    result = row_iterator()

    return result, headers, types

def sanitize_column_name(name):
    """Clean up column names for insertion into CKAN."""
    # leading underscores denote reserved DataStore columns, so strip them
    string = str(name).strip().lstrip('_')
    if '%' in string:
        string = f'[{string}]'
    return string
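# e.g. sanitize_column_name('_% change') -> '[% change]'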

def reformat_pandas_description(series):
    """Compress pandas' describe() output into a short note for the data dictionary."""
    clipped = str(series.describe(datetime_is_numeric=True)).split('\n')[:-1]
    compressed = [re.sub(r'^([\w\%]+)[ \t]+', r'**\1**: ', x) for x in clipped]
    return ';\n'.join(compressed)
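# e.g. a numeric column comes back as something like "**count**: 100;\n**mean**: 49.5;\n**std**: 29.0; ..."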

def pandas_sniff_algorithm(tmp, ct, resource_id, api_key, ckan_url, ext='CSV'):
    """Guess column types with pandas, generating a descriptive note per column."""
    try:
        if ext == 'TSV':
            dataset = pd.read_csv(tmp, sep='\t')
        else:
            dataset = pd.read_csv(tmp)  # fall back to something more permissive

    except Exception as e:
        raise util.JobError(e)

    # handle odd column names
    if dataset.columns[0] == 'Unnamed: 0':
        dataset.drop(['Unnamed: 0'], axis=1, inplace=True)  # almost certainly an index column, thus redundant

    dataset.columns = [sanitize_column_name(x) for x in dataset.columns]  # rename the source!
    headers = dataset.columns
    types = []
    notes = []

    rng = default_rng()  # for random sampling (currently unused)
    for i, header in enumerate(headers):

        # pandas has done the heavy lifting for us
        if (dataset.dtypes[i] == np.int64) or (dataset.dtypes[i] == np.int32):
            # PostgreSQL int4 holds -2**31 .. 2**31 - 1 and int8 holds -2**63 .. 2**63 - 1
            if (dataset[header].max() < (1 << 31)) and (dataset[header].min() >= -(1 << 31)):
                types.append('int4')
            elif (dataset[header].max() < (1 << 63)) and (dataset[header].min() >= -(1 << 63)):
                types.append('int8')
            else:
                types.append('numeric')
            notes.append(reformat_pandas_description(dataset[header]))
            continue

        elif dataset.dtypes[i] == np.float64:
            types.append('float')
            notes.append(reformat_pandas_description(dataset[header]))
            continue

        # datetimes should be converted to something PostgreSQL understands
        elif str(dataset.dtypes[i]).startswith('datetime64'):  # TODO: fix this ugly hack
            types.append('timestamp')
            notes.append(reformat_pandas_description(dataset[header]))
            dataset[header] = dataset[header].dt.strftime('%Y-%m-%d %H:%M:%S %z')
            continue

        # let pandas handle the sniffing for us
        try:
            target = pd.to_datetime(dataset[header])
            types.append('timestamp')
            notes.append(reformat_pandas_description(target))
            dataset[header] = target.dt.strftime('%Y-%m-%d %H:%M:%S %z')

        except (TypeError, ValueError, ParserError):
            types.append('text')
            cats = pd.Categorical(dataset[header])
            if len(cats.categories) > 1:
                proposed = 'One of "' + '", "'.join(map(str, cats.categories[:-1])) + f'", or "{cats.categories[-1]}".'
            else:
                proposed = reformat_pandas_description(dataset[header])

            if len(proposed) > MAX_DESCRIPTION_LENGTH:
                notes.append(reformat_pandas_description(dataset[header]))
            else:
                notes.append(proposed)

    def iterator():
        # keep every column; map NaN to None so each row stays JSON-serializable
        for idx, row in dataset.iterrows():
            yield {x: None if (isinstance(row[x], float) and np.isnan(row[x])) else row[x]
                   for x in row.keys()}

    return iterator(), headers, types, notes
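
# Both sniffers return (row_iterator, headers, types[, notes]); push_to_datastore
# fills notes with None placeholders when the legacy sniffer is used.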

@job.asynchronous
def push_to_datastore(task_id, input, dry_run=False):
Expand Down Expand Up @@ -429,65 +585,25 @@ def push_to_datastore(task_id, input, dry_run=False):

    resource['hash'] = file_hash

    # grab any existing info
    existing = datastore_resource_exists(resource_id, api_key, ckan_url)
    existing_info = None
    if existing:
        existing_info = dict((f['id'], f['info'])
                             for f in existing.get('fields', []) if 'info' in f)

    # branch depending on format: pandas handles CSV/TSV, messytables everything else
    ext = url.split('?', 1)[0][-4:].lower()  # drop any query string, keep the extension
    if ext == '.csv':
        iterator, headers, types, notes = pandas_sniff_algorithm(tmp, ct, resource_id, api_key, ckan_url)
    elif ext == '.tsv':
        iterator, headers, types, notes = pandas_sniff_algorithm(tmp, ct, resource_id, api_key, ckan_url, ext='TSV')
    else:
        iterator, headers, types = old_sniff_algorithm(tmp, ct, resource, resource_id, api_key,
                                                       ckan_url, existing_info)
        notes = [None] * len(headers)  # fill in some dummy info

    '''
    Delete existing datastore resource before proceeding. Otherwise
@@ -499,8 +615,9 @@ def row_iterator():
            res_id=resource_id))
        delete_datastore_resource(resource_id, api_key, ckan_url)

    headers_dicts = [dict(id=field[0], type=TYPE_MAPPING[str(field[1])],
                          info=dict(notes=field[2]))
                     for field in zip(headers, types, notes)]
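    # each entry looks like {'id': 'price', 'type': 'float', 'info': {'notes': '**count**: ...'}}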

    # Maintain data dictionaries from matching column names
    if existing_info:
@@ -516,10 +633,10 @@ def row_iterator():
        headers=headers_dicts))

    if dry_run:
        return headers_dicts, iterator

    count = 0
    for i, chunk in enumerate(chunky(iterator, CHUNK_INSERT_ROWS)):
        records, is_it_the_last_chunk = chunk
        count += len(records)
        logger.info('Saving chunk {number} {is_last}'.format(
1 change: 1 addition & 0 deletions requirements.txt
@@ -4,3 +4,4 @@ html5lib==1.0.1
messytables==0.15.2
certifi
requests[security]==2.27.1
pandas
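# pandas pulls in numpy and python-dateutil, both of which jobs.py now imports directly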