diff --git a/.gitignore b/.gitignore index c5236de..4270fed 100644 --- a/.gitignore +++ b/.gitignore @@ -92,4 +92,7 @@ ENV/ # Rope project settings .ropeproject -.idea \ No newline at end of file +.idea + +# Vim swp files +*.swp diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..d002c7a --- /dev/null +++ b/.travis.yml @@ -0,0 +1,11 @@ +dist: trusty +sudo: false +language: python +python: + - "3.5" + - "3.6" + - "nightly" + +script: + - python -m unittest + diff --git a/__init__.py b/__init__.py index e69ab3c..16a5a1a 100644 --- a/__init__.py +++ b/__init__.py @@ -1,138 +1,9 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -import os, sys, csv, heapq, shutil from optparse import OptionParser - -class CsvSortError(Exception): - pass - - -def csvsort(input_filename, columns, output_filename='', max_size=100, has_header=True, delimiter=',', quoting=csv.QUOTE_MINIMAL, encoding='utf-8'): - """Sort the CSV file on disk rather than in memory - The merge sort algorithm is used to break the file into smaller sub files and - - :param input_filename: the CSV filename to sort - :param columns: a list of column to sort on (can be 0 based indices or header keys) - :param output_filename: optional filename for sorted file. If not given then input file will be overriden. - :param max_size: the maximum size (in MB) of CSV file to load in memory at once - :param has_header: whether the CSV contains a header to keep separated from sorting - :param delimiter: character used to separate fields, default ',' - :param quoting: type of quoting used in the output - :param encoding: file encoding used in input/output files - """ - tmp_dir = '.csvsorter.{}'.format(os.getpid()) - os.makedirs(tmp_dir, exist_ok=True) - - try: - with open(input_filename, 'r', encoding=encoding) as input_fp: - reader = csv.reader(input_fp, delimiter=delimiter) - if has_header: - header = next(reader) - else: - header = None - - columns = parse_columns(columns, header) - - filenames = csvsplit(reader, max_size, encoding, tmp_dir) - for filename in filenames: - memorysort(filename, columns, encoding) - sorted_filename = mergesort(filenames, columns, tmp_dir=tmp_dir, encoding=encoding) - - # XXX make more efficient by passing quoting, delimiter, and moving result - # generate the final output file - with open(output_filename or input_filename, 'w', newline='\n', encoding=encoding) as output_fp: - writer = csv.writer(output_fp, delimiter=delimiter, quoting=quoting) - if header: - writer.writerow(header) - with open(sorted_filename, 'r', encoding=encoding) as sorted_fp: - rows = csv.reader(sorted_fp) - writer.writerows(rows) - finally: - shutil.rmtree(tmp_dir, ignore_errors=True) - - -def parse_columns(columns, header): - """check the provided column headers - """ - for i, column in enumerate(columns): - if isinstance(column, int): - if (header and column >= len(header)): - raise CsvSortError('Column index is out of range: "{}"'.format(column)) - else: - # find index of column from header - if header: - if column in header: - columns[i] = header.index(column) - else: - raise CsvSortError('Column name is not found in header: "{}"'.format(column)) - else: - raise CsvSortError('CSV needs a header to find index of this column name: "{}"'.format(column)) - return columns - - -def csvsplit(reader, max_size, encoding, tmp_dir): - """Split into smaller CSV files of maximum size and return the list of filenames - """ - max_size = max_size * 1024 * 1024 # convert to bytes - writer = None - current_size = 0 - split_filenames = [] - - # break CSV file into smaller merge files - for row in reader: - if not writer: - filename = os.path.join(tmp_dir, 'split{}.csv'.format(len(split_filenames))) - writer = csv.writer(open(filename, 'w', newline='\n', encoding=encoding)) - split_filenames.append(filename) - - writer.writerow(row) - current_size += sys.getsizeof(row) - if current_size > max_size: - writer = None - current_size = 0 - return split_filenames - - -def memorysort(filename, columns, encoding): - """Sort this CSV file in memory on the given columns - """ - with open(filename, encoding=encoding) as input_fp: - rows = list(csv.reader(input_fp)) - rows.sort(key=lambda row: [row[column] for column in columns]) - with open(filename, 'w', newline='\n', encoding=encoding) as output_fp: - writer = csv.writer(output_fp) - writer.writerows(rows) - - -def yield_csv_rows(filename, columns, encoding): - """Iterator to sort CSV rows - """ - with open(filename, 'r', encoding=encoding) as fp: - for row in csv.reader(fp): - yield row - - -def mergesort(sorted_filenames, columns, nway=2, tmp_dir='', encoding='utf-8'): - """Merge these 2 sorted csv files into a single output file - """ - merge_n = 0 - while len(sorted_filenames) > 1: - merge_filenames, sorted_filenames = sorted_filenames[:nway], sorted_filenames[nway:] - - output_filename = os.path.join(tmp_dir, 'merge{}.csv'.format(merge_n)) - with open(output_filename, 'w', newline='\n', encoding=encoding) as output_fp: - writer = csv.writer(output_fp) - merge_n += 1 - rows = (yield_csv_rows(filename, columns, encoding) for filename in merge_filenames) - writer.writerows(heapq.merge(*rows)) - sorted_filenames.append(output_filename) - - for filename in merge_filenames: - os.remove(filename) - return sorted_filenames[0] - +from .csvsorter import csvsort def main(): parser = OptionParser() @@ -155,3 +26,5 @@ def main(): if __name__ == '__main__': main() + + diff --git a/csvsorter.py b/csvsorter.py new file mode 100644 index 0000000..83e94fc --- /dev/null +++ b/csvsorter.py @@ -0,0 +1,140 @@ + + +import os, sys, csv, heapq, shutil + +class CsvSortError(Exception): + pass + + +def csvsort(input_filename, columns, output_filename='', max_size=100, has_header=True, delimiter=',', quoting=csv.QUOTE_MINIMAL, encoding='utf-8'): + """Sort the CSV file on disk rather than in memory + The merge sort algorithm is used to break the file into smaller sub files and + + :param input_filename: the CSV filename to sort + :param columns: a list of column to sort on (can be 0 based indices or header keys) + :param output_filename: optional filename for sorted file. If not given then input file will be overriden. + :param max_size: the maximum size (in MB) of CSV file to load in memory at once + :param has_header: whether the CSV contains a header to keep separated from sorting + :param delimiter: character used to separate fields, default ',' + :param quoting: type of quoting used in the output + :param encoding: file encoding used in input/output files + """ + tmp_dir = '.csvsorter.{}'.format(os.getpid()) + os.makedirs(tmp_dir, exist_ok=True) + + try: + with open(input_filename, 'r', encoding=encoding) as input_fp: + reader = csv.reader(input_fp, delimiter=delimiter) + if has_header: + header = next(reader) + else: + header = None + + columns = parse_columns(columns, header) + + filenames = csvsplit(reader, max_size, encoding, tmp_dir) + for filename in filenames: + memorysort(filename, columns, encoding) + sorted_filename = mergesort(filenames, columns, tmp_dir=tmp_dir, encoding=encoding) + + # XXX make more efficient by passing quoting, delimiter, and moving result + # generate the final output file + with open(output_filename or input_filename, 'w', newline='\n', encoding=encoding) as output_fp: + writer = csv.writer(output_fp, delimiter=delimiter, quoting=quoting) + if header: + writer.writerow(header) + with open(sorted_filename, 'r', encoding=encoding) as sorted_fp: + rows = csv.reader(sorted_fp) + writer.writerows(rows) + finally: + shutil.rmtree(tmp_dir, ignore_errors=True) + + +def parse_columns(columns, header): + """check the provided column headers + """ + for i, column in enumerate(columns): + if isinstance(column, int): + if (header and column >= len(header)): + raise CsvSortError('Column index is out of range: "{}"'.format(column)) + else: + # find index of column from header + if header: + if column in header: + columns[i] = header.index(column) + else: + raise CsvSortError('Column name is not found in header: "{}"'.format(column)) + else: + raise CsvSortError('CSV needs a header to find index of this column name: "{}"'.format(column)) + return columns + + +def csvsplit(reader, max_size, encoding, tmp_dir): + """Split into smaller CSV files of maximum size and return the list of filenames + """ + max_size = max_size * 1024 * 1024 # convert to bytes + writer = None + current_size = 0 + split_filenames = [] + fout = None + + # break CSV file into smaller merge files + for row in reader: + if not writer: + filename = os.path.join(tmp_dir, 'split{}.csv'.format(len(split_filenames))) + fout = open(filename, 'w', newline='\n', encoding=encoding) + writer = csv.writer(fout) + split_filenames.append(filename) + + writer.writerow(row) + current_size += sys.getsizeof(row) + if current_size > max_size: + writer = None + fout.close() + current_size = 0 + + if not fout.closed: + fout.close() + return split_filenames + + +def memorysort(filename, columns, encoding): + """Sort this CSV file in memory on the given columns + """ + with open(filename, encoding=encoding) as input_fp: + rows = list(csv.reader(input_fp)) + rows.sort(key=lambda row: [row[column] for column in columns]) + with open(filename, 'w', newline='\n', encoding=encoding) as output_fp: + writer = csv.writer(output_fp) + writer.writerows(rows) + + +def yield_csv_rows(filename, columns, encoding): + """Iterator to sort CSV rows + """ + with open(filename, 'r', encoding=encoding) as fp: + for row in csv.reader(fp): + yield row + + +def mergesort(sorted_filenames, columns, nway=2, tmp_dir='', encoding='utf-8'): + """Merge these 2 sorted csv files into a single output file + """ + merge_n = 0 + while len(sorted_filenames) > 1: + merge_filenames, sorted_filenames = sorted_filenames[:nway], sorted_filenames[nway:] + + output_filename = os.path.join(tmp_dir, 'merge{}.csv'.format(merge_n)) + with open(output_filename, 'w', newline='\n', encoding=encoding) as output_fp: + writer = csv.writer(output_fp) + merge_n += 1 + rows = (yield_csv_rows(filename, columns, encoding) for filename in merge_filenames) + keyfunc = lambda row: [row[column] for column in columns] + writer.writerows(heapq.merge(*rows, key=keyfunc)) + sorted_filenames.append(output_filename) + + for filename in merge_filenames: + os.remove(filename) + return sorted_filenames[0] + + diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/context.py b/tests/context.py new file mode 100644 index 0000000..d86d6f3 --- /dev/null +++ b/tests/context.py @@ -0,0 +1,11 @@ + +# +# Inserts module at beginning of sys.path in order to test source code, not +# installed package. +# + +import os +import sys +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +import csvsorter diff --git a/tests/test.py b/tests/test.py new file mode 100644 index 0000000..2043116 --- /dev/null +++ b/tests/test.py @@ -0,0 +1,88 @@ + +import os +import uuid +import unittest + +from .context import csvsorter + +class TestCSVSorter(unittest.TestCase): + + def setUp(self): + # Create CSV file 5MB in size, reverse sorted + self.tmp_name = str(uuid.uuid4().hex) + '.csv' + self.num_lines = 200000 + with open(self.tmp_name, 'w') as fout: + for line in range(self.num_lines, 0, -1): + print('{},{},{},{}'.format(line+4, line+3, line+2, line+1), file=fout) + + def tearDown(self): + os.remove(self.tmp_name) + + + def check_file_sorted(self): + with open(self.tmp_name, 'r') as fin: + prev_line = fin.readline() + for line in fin: + self.assertTrue(prev_line < line) + prev_line = line + + def check_col_sorted(self, col, skip_header=False): + with open(self.tmp_name, 'r') as fin: + sorted_lines = fin.readlines() + if skip_header: + sorted_lines.pop(0) + gold = sorted(sorted_lines, key=lambda x : x.split(',')[col]) + + for x in range(len(sorted_lines)): + self.assertEqual(sorted_lines[x], gold[x]) + + + def test_memorysort_allcols(self): + csvsorter.memorysort(self.tmp_name, [0,1,2,3], 'utf-8') + self.check_file_sorted() + + def test_memorysort_onecol(self): + csvsorter.memorysort(self.tmp_name, [3], 'utf-8') + self.check_col_sorted(3) + + def test_csvsort(self): + # sort and force merges + csvsorter.csvsort(self.tmp_name, [0,1,2,3], max_size=1, has_header=False) + self.check_file_sorted() + + # make sure all the lines are here after merging + linecount = 0 + with open(self.tmp_name, 'r') as fin: + for line in fin: + linecount += 1 + self.assertEqual(self.num_lines, linecount) + + def test_csvsort_onecol(self): + # sort and force merges + csvsorter.csvsort(self.tmp_name, [3], max_size=1, has_header=False) + self.check_col_sorted(3) + + # make sure all the lines are here after merging + linecount = 0 + with open(self.tmp_name, 'r') as fin: + for line in fin: + linecount += 1 + self.assertEqual(self.num_lines, linecount) + + def test_header(self): + # sort and force merges + csvsorter.csvsort(self.tmp_name, [3], max_size=1, has_header=True) + self.check_col_sorted(3, skip_header=True) + + # make sure all the lines are present (header not missing) + with open(self.tmp_name, 'r') as fin: + header = fin.readline() + self.assertEqual(header, '{},{},{},{}\n'.format( + self.num_lines+4, self.num_lines+3, self.num_lines+2, + self.num_lines+1)) + linecount = 1 + for line in fin: + linecount += 1 + self.assertEqual(self.num_lines, linecount) + +