From c6e7dc86e59097327e3f95e77f4b15166cc9f51c Mon Sep 17 00:00:00 2001
From: Drew Banin
Date: Thu, 11 Apr 2019 15:48:46 -0400
Subject: [PATCH 1/2] support custom delimiters and gzipped files

---
 README.md                 |  6 ++++++
 tap_s3_csv/config.py      |  2 ++
 tap_s3_csv/csv_handler.py | 34 ++++++++++++++++++++++++++++++++--
 3 files changed, 40 insertions(+), 2 deletions(-)

diff --git a/README.md b/README.md
index 8392529..cd9278a 100644
--- a/README.md
+++ b/README.md
@@ -130,6 +130,12 @@ See below for an exhaustive list of configuration fields:
     // format, either "csv" or "excel"
     "format": "csv",
 
+    // record delimiter (optional), defaults to ","
+    "delimiter": "|",
+
+    // if true, unzip the file before reading it as a csv
+    "unzip": true,
+
     // if the files don't have a header row, you can specify the field names
     "field_names": ["id", "first_name", "last_name"],
 
diff --git a/tap_s3_csv/config.py b/tap_s3_csv/config.py
index 9becc52..f884a47 100644
--- a/tap_s3_csv/config.py
+++ b/tap_s3_csv/config.py
@@ -13,6 +13,8 @@
         Required('pattern'): str,
         Required('key_properties'): [str],
         Required('format'): Any('csv', 'excel'),
+        Optional('unzip'): bool,
+        Optional('delimiter'): str,
         Optional('search_prefix'): str,
         Optional('field_names'): [str],
         Optional('worksheet_name'): str,
diff --git a/tap_s3_csv/csv_handler.py b/tap_s3_csv/csv_handler.py
index 491165a..cead343 100644
--- a/tap_s3_csv/csv_handler.py
+++ b/tap_s3_csv/csv_handler.py
@@ -2,6 +2,9 @@
 import csv
 import re
 
+import zlib
+import io
+
 
 def generator_wrapper(reader):
     to_return = {}
@@ -24,18 +27,45 @@ def generator_wrapper(reader):
         yield to_return
 
 
+def gunzip(stream):
+    dec = zlib.decompressobj(32 + zlib.MAX_WBITS)
+    for chunk in stream:
+        rv = dec.decompress(chunk)
+        if rv:
+            yield rv
+
+
+def iter_lines(stream):
+    buf = ""
+    for chunk in stream:
+        for byte in chunk:
+            char = chr(byte)
+            if char == '\n':
+                yield buf.encode('utf-8')
+                buf = ""
+            else:
+                buf += char
+
+
 def get_row_iterator(table_spec, file_handle):
+
+    if table_spec.get('unzip'):
+        raw_stream = iter_lines(gunzip(file_handle._raw_stream))
+    else:
+        raw_stream = file_handle._raw_stream
+
     # we use a protected member of the s3 object, _raw_stream, here to create
     # a generator for data from the s3 file.
     # pylint: disable=protected-access
     file_stream = codecs.iterdecode(
-        file_handle._raw_stream, encoding='utf-8')
+        raw_stream, encoding='utf-8')
 
     field_names = None
     if 'field_names' in table_spec:
         field_names = table_spec['field_names']
 
-    reader = csv.DictReader(file_stream, fieldnames=field_names)
+    delimiter = table_spec.get('delimiter', ',')
+    reader = csv.DictReader(file_stream, delimiter=delimiter, fieldnames=field_names)
 
     return generator_wrapper(reader)
 

From b5c13ffad1b73b5ea0168ca5668c747ad1a00a92 Mon Sep 17 00:00:00 2001
From: Drew Banin
Date: Thu, 11 Apr 2019 18:02:10 -0400
Subject: [PATCH 2/2] handle use case in which no fields are quoted

---
 README.md                 |  4 ++++
 tap_s3_csv/config.py      |  1 +
 tap_s3_csv/csv_handler.py | 33 +++++++--------------------------
 3 files changed, 12 insertions(+), 26 deletions(-)

diff --git a/README.md b/README.md
index cd9278a..ef85f42 100644
--- a/README.md
+++ b/README.md
@@ -136,6 +136,10 @@ See below for an exhaustive list of configuration fields:
     // if true, unzip the file before reading it as a csv
     "unzip": true,
 
+    // if specified, override the default CSV quoting config
+    // More info: https://docs.python.org/3/library/csv.html#csv.QUOTE_ALL
+    "quoting": "QUOTE_NONE",
+
     // if the files don't have a header row, you can specify the field names
     "field_names": ["id", "first_name", "last_name"],
 
diff --git a/tap_s3_csv/config.py b/tap_s3_csv/config.py
index f884a47..5703470 100644
--- a/tap_s3_csv/config.py
+++ b/tap_s3_csv/config.py
@@ -15,6 +15,7 @@
         Required('format'): Any('csv', 'excel'),
         Optional('unzip'): bool,
         Optional('delimiter'): str,
+        Optional('quoting'): Any('QUOTE_MINIMAL', 'QUOTE_ALL', 'QUOTE_NONNUMERIC', 'QUOTE_NONE'),
         Optional('search_prefix'): str,
         Optional('field_names'): [str],
         Optional('worksheet_name'): str,
diff --git a/tap_s3_csv/csv_handler.py b/tap_s3_csv/csv_handler.py
index cead343..0aa1c61 100644
--- a/tap_s3_csv/csv_handler.py
+++ b/tap_s3_csv/csv_handler.py
@@ -1,9 +1,7 @@
 import codecs
 import csv
 import re
-
-import zlib
-import io
+import gzip
 
 
 def generator_wrapper(reader):
@@ -27,30 +25,9 @@ def generator_wrapper(reader):
         yield to_return
 
 
-def gunzip(stream):
-    dec = zlib.decompressobj(32 + zlib.MAX_WBITS)
-    for chunk in stream:
-        rv = dec.decompress(chunk)
-        if rv:
-            yield rv
-
-
-def iter_lines(stream):
-    buf = ""
-    for chunk in stream:
-        for byte in chunk:
-            char = chr(byte)
-            if char == '\n':
-                yield buf.encode('utf-8')
-                buf = ""
-            else:
-                buf += char
-
-
 def get_row_iterator(table_spec, file_handle):
-
     if table_spec.get('unzip'):
-        raw_stream = iter_lines(gunzip(file_handle._raw_stream))
+        raw_stream = gzip.GzipFile(fileobj=file_handle._raw_stream)
     else:
         raw_stream = file_handle._raw_stream
 
@@ -66,6 +43,10 @@ def get_row_iterator(table_spec, file_handle):
         field_names = table_spec['field_names']
 
     delimiter = table_spec.get('delimiter', ',')
-    reader = csv.DictReader(file_stream, delimiter=delimiter, fieldnames=field_names)
+
+    quote_config = table_spec.get('quoting', 'QUOTE_MINIMAL')
+    quoting = getattr(csv, quote_config)
+
+    reader = csv.DictReader(file_stream, quoting=quoting, delimiter=delimiter, fieldnames=field_names)
 
     return generator_wrapper(reader)