diff --git a/tests/basic_zip_in_out_test.py b/tests/basic_zip_in_out_test.py index 20422aa..93d61b7 100644 --- a/tests/basic_zip_in_out_test.py +++ b/tests/basic_zip_in_out_test.py @@ -24,15 +24,18 @@ def test_no_hang(self, process_mock): tells our test class that it actually cleaned up workers. ''' test_passes = {} - processes = vimap.pool.fork(worker_proc.init_args(init=i) for i in [1, 2, 3]) - processes.finish_workers = lambda: test_passes.setdefault('result', True) + processes = vimap.pool.fork( + worker_proc.init_args(init=i) for i in [1, 2, 3]) + processes.finish_workers = lambda: test_passes.setdefault('result', + True) del processes # will happen if it falls out of scope # gc.collect() -- doesn't seem necessary T.assert_dicts_equal(test_passes, {'result': True}) def test_basic(self): - processes = vimap.pool.fork(worker_proc.init_args(init=i) for i in [1, 2, 3]) + processes = vimap.pool.fork( + worker_proc.init_args(init=i) for i in [1, 2, 3]) list(processes.zip_in_out()) def test_reuse_pool(self): @@ -40,20 +43,25 @@ def test_reuse_pool(self): Test that process pools can be re-used. This is important for avoiding forking costs. ''' - processes = vimap.pool.fork(worker_proc.init_args(init=i) for i in [1, 2, 3]) + processes = vimap.pool.fork( + worker_proc.init_args(init=i) for i in [1, 2, 3]) - results = list(processes.imap([4, 4, 4]).zip_in_out(close_if_done=False)) + results = list( + processes.imap([4, 4, 4]).zip_in_out(close_if_done=False)) assert set(results) == set([(4, 5), (4, 6), (4, 7)]) - results = list(processes.imap([4, 4, 4]).zip_in_out(close_if_done=True)) + results = list( + processes.imap([4, 4, 4]).zip_in_out(close_if_done=True)) assert set(results) == set([(4, 5), (4, 6), (4, 7)]) def test_really_parallel(self): - '''Make sure things run in parallel: Determine that different threads are - handling different inputs (via time.sleep stuff). This could fail if the - sleep values are too small to compensate for the forking overhead. + '''Make sure things run in parallel: Determine that different + threads are handling different inputs (via time.sleep stuff). + This could fail if the sleep values are too small to + compensate for the forking overhead. ''' - processes = vimap.pool.fork(worker_proc.init_args(init=i) for i in [1, 2, 3]) + processes = vimap.pool.fork( + worker_proc.init_args(init=i) for i in [1, 2, 3]) results = [] for input, output in processes.imap([4, 4, 4] * 3).zip_in_out(): results.append((input, output)) diff --git a/tests/chunking_test.py b/tests/chunking_test.py index 3341157..f977613 100644 --- a/tests/chunking_test.py +++ b/tests/chunking_test.py @@ -8,7 +8,6 @@ import vimap.pool import vimap.worker_process - basic_worker = vimap.worker_process.worker(lambda inputs: inputs) diff --git a/tests/closes_fds_test.py b/tests/closes_fds_test.py index 5d43bdf..87bec2c 100644 --- a/tests/closes_fds_test.py +++ b/tests/closes_fds_test.py @@ -21,7 +21,6 @@ from vimap.testing import repeat_test_to_catch_flakiness import vimap.worker_process - # decrypt POSIX stuff readable_mode_strings = { 'directory': stat.S_ISDIR, @@ -32,7 +31,6 @@ 'symlink': stat.S_ISLNK, 'socket': stat.S_ISSOCK} - FDInfo = namedtuple("FDInfo", ["modes", "symlink"]) @@ -41,13 +39,15 @@ def current_proc_fd_dir(*subpaths): def fd_type_if_open(fd_number): - """For a given open file descriptor, return information about that file descriptor. + """For a given open file descriptor, + return information about that file descriptor. 'modes' are a list of human-readable strings describing the file type; 'symlink' is the target of the file descriptor (often a pipe name) """ fd_stat = os.fstat(fd_number) - modes = [k for k, v in readable_mode_strings.items() if v(fd_stat.st_mode)] + modes = [k for k, v in readable_mode_strings.items() if + v(fd_stat.st_mode)] if os.path.isdir(current_proc_fd_dir()): return FDInfo( modes=modes, @@ -61,7 +61,8 @@ def list_fds_linux(): fds = [ (int(i), current_proc_fd_dir(str(i))) for i in os.listdir(current_proc_fd_dir())] - # NOTE: Sometimes, an FD is used to list the above directory. Hence, we should + # NOTE: Sometimes, an FD is used to list the above directory. + # Hence, we should # re-check whether the FD still exists (via os.path.exists) return [i for (i, path) in fds if (i >= 3 and os.path.exists(path))] @@ -69,13 +70,16 @@ def list_fds_linux(): def list_fds_other(): """A method to list open FDs that doesn't need /proc/{pid}.""" max_fds_soft, _ = resource.getrlimit(resource.RLIMIT_NOFILE) - if max_fds_soft == resource.RLIM_INFINITY or not (3 < max_fds_soft < 4096): + if max_fds_soft == resource.RLIM_INFINITY or not ( + 3 < max_fds_soft < 4096): logging.warning( - "max_fds_soft invalid ({0}), assuming 4096 is a sufficient upper bound" + "max_fds_soft invalid ({0}), " + "assuming 4096 is a sufficient upper bound" .format(max_fds_soft)) max_fds_soft = 4096 - # The first three FDs are stdin, stdout, and stderr. We're interested in + # The first three FDs are stdin, stdout, and stderr. + # We're interested in # everything after. for i in xrange(3, max_fds_soft): try: @@ -113,12 +117,17 @@ def difference_open_fds(before, after): those FDs which were opened (present in `after` but not `before`) and closed. """ + # "a - b" for dicts -- remove anything in 'a' that has a key in b def dict_diff(a, b): return dict((k, a[k]) for k in (frozenset(a) - frozenset(b))) + for k in (frozenset(after) & frozenset(before)): if before[k] != after[k]: - print("WARNING: FD {0} changed from {1} to {2}".format(k, before[k], after[k])) + print( + "WARNING: FD {0} changed from {1} to {2}".format(k, + before[k], + after[k])) return { 'closed': dict_diff(before, after), 'opened': dict_diff(after, before)} @@ -128,6 +137,7 @@ class TestOpenFdsMethods(T.TestCase): """ Tests that we can detect open file descriptors. """ + def test_open_fds(self): first = get_open_fds() fd = open('vimap/pool.py', 'r') @@ -136,10 +146,14 @@ def test_open_fds(self): fd.close() third = get_open_fds() fd2.close() - T.assert_equal(len(difference_open_fds(first, second)['opened']), 2) - T.assert_equal(len(difference_open_fds(first, second)['closed']), 0) - T.assert_equal(len(difference_open_fds(second, third)['closed']), 1) - T.assert_equal(len(difference_open_fds(second, third)['opened']), 0) + T.assert_equal(len(difference_open_fds(first, second)['opened']), + 2) + T.assert_equal(len(difference_open_fds(first, second)['closed']), + 0) + T.assert_equal(len(difference_open_fds(second, third)['closed']), + 1) + T.assert_equal(len(difference_open_fds(second, third)['opened']), + 0) @vimap.worker_process.worker @@ -160,6 +174,7 @@ def instrumented_init(*args, **kwargs): self.queue_fds = difference_open_fds( self.before_queue_manager_init, self.after_queue_manager_init)['opened'] + with mock.patch.object( vimap.queue_manager.VimapQueueManager, '__init__', @@ -175,22 +190,29 @@ def test_all_fds_cleaned_up(self): after_finish_open_fds = get_open_fds() # Check that some FDs were opened after forking - after_fork = difference_open_fds(initial_open_fds, after_fork_open_fds) + after_fork = difference_open_fds(initial_open_fds, + after_fork_open_fds) # T.assert_equal(after_fork['closed'], []) - T.assert_gte(len(after_fork['opened']), 2) # should have at least 3 open fds + T.assert_gte(len(after_fork['opened']), + 2) # should have at least 3 open fds # All opened files should be FIFOs - if not all(info.modes == ['fifo'] for info in after_fork['opened'].values()): + if not all(info.modes == ['fifo'] for info in + after_fork['opened'].values()): print("Infos: {0}".format(after_fork['opened'])) T.assert_not_reached("Some infos are not FIFOs") - after_cleanup = difference_open_fds(after_fork_open_fds, after_finish_open_fds) + after_cleanup = difference_open_fds(after_fork_open_fds, + after_finish_open_fds) T.assert_gte(len(after_cleanup['closed']), 2) - left_around = difference_open_fds(initial_open_fds, after_finish_open_fds) + left_around = difference_open_fds(initial_open_fds, + after_finish_open_fds) if len(left_around['opened']) != 0: queue_fds_left_around = dict( - item for item in self.queue_fds.items() if item[0] in left_around['opened']) - print("Queue FDs left around: {0}".format(queue_fds_left_around)) + item for item in self.queue_fds.items() if + item[0] in left_around['opened']) + print( + "Queue FDs left around: {0}".format(queue_fds_left_around)) T.assert_equal(len(left_around['opened']), 0) diff --git a/tests/exception_handling_test.py b/tests/exception_handling_test.py index dbede99..ea838de 100644 --- a/tests/exception_handling_test.py +++ b/tests/exception_handling_test.py @@ -33,7 +33,8 @@ def test_formatted_traceback(self): testify.assert_in("in get_ec", ec.formatted_traceback) def test_formatted_exception(self): - testify.assert_equal("ValueError: hi", self.get_ec().formatted_exception) + testify.assert_equal("ValueError: hi", + self.get_ec().formatted_exception) def test_reraise(self): testify.assert_raises_and_contains( @@ -51,7 +52,8 @@ def test_unpickleable(self): cPickle.dumps(ec) # make sure it can indeed be serialized testify.assert_isinstance(ec, exception_handling.ExceptionContext) testify.assert_isinstance(ec.value, Exception) - testify.assert_equal(str(ec.value), "UNPICKLEABLE AND UNSERIALIZABLE MESSAGE") + testify.assert_equal(str(ec.value), + "UNPICKLEABLE AND UNSERIALIZABLE MESSAGE") def test_unpickleable_with_uninitializable_exception(self): """ @@ -59,6 +61,7 @@ def test_unpickleable_with_uninitializable_exception(self): not being in global variables) and one that can't be reinitialized with a single string argument. """ + class CustomException(Exception): def __init__(self, a, b): self.a, self.b = a, b @@ -72,5 +75,6 @@ def __str__(self): except: ec = exception_handling.ExceptionContext.current() cPickle.dumps(ec) # make sure it can indeed be serialized - testify.assert_isinstance(ec.value, exception_handling.UnpickleableException) + testify.assert_isinstance(ec.value, + exception_handling.UnpickleableException) testify.assert_equal(str(ec.value), "CustomException: 3, 4") diff --git a/tests/exceptions_test.py b/tests/exceptions_test.py index 9225b97..82d0d04 100644 --- a/tests/exceptions_test.py +++ b/tests/exceptions_test.py @@ -65,7 +65,8 @@ def run_test_pool(self, worker_f, inputs=None): (worker_f.init_args(init=i) for i in range(3)), - timeouts_config=vimap.config.TimeoutConfig(general_timeout=0.5), + timeouts_config=vimap.config.TimeoutConfig( + general_timeout=0.5), debug=False, # NOTE: Uncomment this to debug ) # Give every worker something to chew on. @@ -80,18 +81,25 @@ def check_died_prematurely_warning(self, print_warning_mock): for (args, kwargs) in print_warning_mock.call_args_list: T.assert_equal(args, ('All processes died prematurely!',)) - def check_printed_exceptions(self, print_exc_mock, expected_exception, count): - # From a mocked call to print_exception(), extract ExceptionContext.value + def check_printed_exceptions(self, print_exc_mock, expected_exception, + count): + # From a mocked call to print_exception(), + # extract ExceptionContext.value def get_ec_value((args, kwargs)): return (kwargs.get('ec') or args[0]).value + T.assert_equal( - [serialize_error(get_ec_value(call)) for call in print_exc_mock.call_args_list], + [serialize_error(get_ec_value(call)) for call in + print_exc_mock.call_args_list], [serialize_error(expected_exception)] * count) @vimap.testing.queue_feed_ignore_ioerrors_mock - @mock.patch.object(vimap.exception_handling, 'print_warning', autospec=True) - @mock.patch.object(vimap.exception_handling, 'print_exception', autospec=True) - def test_exception_before_iteration(self, print_exc_mock, print_warning_mock): + @mock.patch.object(vimap.exception_handling, 'print_warning', + autospec=True) + @mock.patch.object(vimap.exception_handling, 'print_exception', + autospec=True) + def test_exception_before_iteration(self, print_exc_mock, + print_warning_mock): """For workers that raise exceptions before they even look at their input generators, make sure we return exceptions appropriately. @@ -104,47 +112,60 @@ def test_exception_before_iteration(self, print_exc_mock, print_warning_mock): (inp, serialize_error(ec.value), typ) for inp, ec, typ in self.run_test_pool(worker_raise_exc_immediately) - ] + ] # Each worker will stop processing once an exception makes it to # the top so we only get that number of exceptions back out. expected_res_to_compare = [ - (vimap.pool.NO_INPUT, serialize_error(ValueError("hello")), 'exception'), - ] * self.chunk_adjusted_num_output_exceptions(3) + (vimap.pool.NO_INPUT, + serialize_error( + ValueError("hello")), + 'exception'), + ] * self.\ + chunk_adjusted_num_output_exceptions( + 3) T.assert_sorted_equal(res_to_compare, expected_res_to_compare) self.check_died_prematurely_warning(print_warning_mock) # No matter how many exceptions are returned (3 in the default case, 1 # in the chunked case), there should always be an exception printed for # each worker before the pool is shut down. - self.check_printed_exceptions(print_exc_mock, ValueError("hello"), 3) + self.check_printed_exceptions(print_exc_mock, ValueError("hello"), + 3) @vimap.testing.queue_feed_ignore_ioerrors_mock - @mock.patch.object(vimap.exception_handling, 'print_warning', autospec=True) - @mock.patch.object(vimap.exception_handling, 'print_exception', autospec=True) - def test_exception_before_iteration_doesnt_freeze(self, print_exc_mock, print_warning_mock): + @mock.patch.object(vimap.exception_handling, 'print_warning', + autospec=True) + @mock.patch.object(vimap.exception_handling, 'print_exception', + autospec=True) + def test_exception_before_iteration_doesnt_freeze(self, print_exc_mock, + print_warning_mock): """This test checks that if exceptions are raised and a lot of input is queued up, the system will stop normally. """ - for inp, ec, typ in self.run_test_pool(worker_raise_exc_immediately, inputs=range(1, 101)): + for inp, ec, typ in self.run_test_pool( + worker_raise_exc_immediately, + inputs=range(1, 101)): pass - @mock.patch.object(vimap.exception_handling, 'print_exception', autospec=True) + @mock.patch.object(vimap.exception_handling, 'print_exception', + autospec=True) def test_exception_after_iteration_not_returned(self, print_exc_mock): res_to_compare = [ (inp, out, typ) for inp, out, typ in self.run_test_pool(worker_raise_exc_after_iteration) - ] + ] # The pool notices that all output has been returned, so doesn't # wait for any more responses. We shouldn't see exceptions. expected_res_to_compare = [ (inp, inp, 'output') for inp in range(1, 10) - ] + ] T.assert_sorted_equal(res_to_compare, expected_res_to_compare) @vimap.testing.queue_feed_ignore_ioerrors_mock - @mock.patch.object(vimap.exception_handling, 'clean_print', autospec=True) + @mock.patch.object(vimap.exception_handling, 'clean_print', + autospec=True) def test_exception_formatting(self, clean_print_mock): '''Test the formatting of exceptions (they should include the error in red, and the start of a traceback). @@ -157,9 +178,12 @@ def test_exception_formatting(self, clean_print_mock): T.assert_starts_with(args[0], expected_message) @vimap.testing.queue_feed_ignore_ioerrors_mock - @mock.patch.object(vimap.exception_handling, 'print_warning', autospec=True) - @mock.patch.object(vimap.exception_handling, 'print_exception', autospec=True) - def test_exception_with_curleys(self, print_exc_mock, print_warning_mock): + @mock.patch.object(vimap.exception_handling, 'print_warning', + autospec=True) + @mock.patch.object(vimap.exception_handling, 'print_exception', + autospec=True) + def test_exception_with_curleys(self, print_exc_mock, + print_warning_mock): '''Dumb test ... I aim to write tests for most every bug that had existed, but this is kinda 1-off ... (.format() got a curley brace). ''' @@ -167,59 +191,80 @@ def test_exception_with_curleys(self, print_exc_mock, print_warning_mock): (serialize_error(ec.value), typ) for _, ec, typ in self.run_test_pool(worker_raise_exc_with_curleys) - ] + ] # Each worker will stop processing once an exception makes it to # the top so we only get that number of exceptions back out. expected_res_to_compare = [ - # We're not sure which inputs will get picked, but all - # should return this exception. - (serialize_error(ValueError("{0} curley braces!")), 'exception'), - ] * self.chunk_adjusted_num_output_exceptions(3) + # We're not sure which inputs + # will get picked, but all + # should return this exception. + (serialize_error( + ValueError( + "{0} curley braces!")), + 'exception'), + ] * self.\ + chunk_adjusted_num_output_exceptions( + 3) T.assert_sorted_equal(res_to_compare, expected_res_to_compare) self.check_died_prematurely_warning(print_warning_mock) - @mock.patch.object(vimap.exception_handling, 'print_exception', autospec=True) + @mock.patch.object(vimap.exception_handling, 'print_exception', + autospec=True) def test_unconsumed_exceptions(self, print_exc_mock): '''Unconsumed exceptions should only be printed. ''' processes = self.fork_pool( - worker_raise_exc_immediately.init_args(init=i) for i in [1, 1, 1]) + worker_raise_exc_immediately.init_args(init=i) for i in + [1, 1, 1]) del processes - self.check_printed_exceptions(print_exc_mock, ValueError("hello"), 3) + self.check_printed_exceptions(print_exc_mock, ValueError("hello"), + 3) - @mock.patch.object(vimap.exception_handling, 'print_exception', autospec=True) + @mock.patch.object(vimap.exception_handling, 'print_exception', + autospec=True) def test_a_few_error(self, print_exc_mock): processes = self.fork_pool( - (worker_raise_exc_with_curleys.init_args(init=i) for i in xrange(2)), + (worker_raise_exc_with_curleys.init_args(init=i) for i in + xrange(2)), max_real_in_flight_factor=2) with T.assert_raises(WorkerException): processes.imap([1]).block_ignore_output() del processes calls = print_exc_mock.call_args_list - errors = [serialize_error(call_args[0].value) for call_args, _ in calls] - T.assert_equal(errors, [serialize_error(ValueError("{0} curley braces!"))]) + errors = [serialize_error(call_args[0].value) for call_args, _ in + calls] + T.assert_equal(errors, + [serialize_error(ValueError("{0} curley braces!"))]) - @mock.patch.object(vimap.exception_handling, 'print_exception', autospec=True) + @mock.patch.object(vimap.exception_handling, 'print_exception', + autospec=True) def test_a_few_error_with_zip_in_out(self, print_exc_mock): - """Same as the above, but use zip_in_out() instead of block_ignore_output().""" + """Same as the above, but use zip_in_out() + instead of block_ignore_output().""" processes = self.fork_pool( - (worker_raise_exc_with_curleys.init_args(init=i) for i in xrange(2)), + (worker_raise_exc_with_curleys.init_args(init=i) for i in + xrange(2)), max_real_in_flight_factor=2) with T.assert_raises(WorkerException): tuple(processes.imap([1]).zip_in_out()) del processes calls = print_exc_mock.call_args_list - errors = [serialize_error(call_args[0].value) for call_args, _ in calls] - T.assert_equal(errors, [serialize_error(ValueError("{0} curley braces!"))]) + errors = [serialize_error(call_args[0].value) for call_args, _ in + calls] + T.assert_equal(errors, + [serialize_error(ValueError("{0} curley braces!"))]) @repeat_test_to_catch_flakiness(5) - @mock.patch.object(vimap.exception_handling, 'print_warning', autospec=True) - @mock.patch.object(vimap.exception_handling, 'print_exception', autospec=True) + @mock.patch.object(vimap.exception_handling, 'print_warning', + autospec=True) + @mock.patch.object(vimap.exception_handling, 'print_exception', + autospec=True) def test_fail_after_a_while(self, print_exc_mock, print_warning_mock): processes = self.fork_pool( - (worker_raise_exc_with_curleys.init_args(init=i) for i in xrange(100)), + (worker_raise_exc_with_curleys.init_args(init=i) for i in + xrange(100)), max_real_in_flight_factor=2 ) processes.imap([-1] * 3000 + list(range(50))) @@ -228,42 +273,54 @@ def test_fail_after_a_while(self, print_exc_mock, print_warning_mock): res_to_compare = [] for inp, out, typ in processes.zip_in_out_typ(): if typ == 'exception': - res_to_compare.append((inp, serialize_error(out.value), typ)) + res_to_compare.append( + (inp, serialize_error(out.value), typ)) else: res_to_compare.append((inp, out, typ)) # All the -1s will produce None output. expected_res_to_compare = [ - (-1, None, 'output') - ] * 3000 + (-1, None, 'output') + ] * 3000 # Once we get to the positive numbers, we start causing 50 of # the 100 workers to throw exceptions. - expected_res_to_compare.extend([ - (i, serialize_error(ValueError("{0} curley braces!")), 'exception') - for i in range(self.chunk_adjusted_num_output_exceptions(50)) - ]) + expected_res_to_compare\ + .extend([ + (i, serialize_error(ValueError( + "{0} curley braces!")), + 'exception') + for i in range( + self + .chunk_adjusted_num_output_exceptions(50)) + ]) T.assert_sorted_equal(res_to_compare, expected_res_to_compare) # Check out exception logging. calls = print_exc_mock.call_args_list - errors = [serialize_error(call_args[0].value) for call_args, _ in calls] + errors = [serialize_error(call_args[0].value) for call_args, _ in + calls] T.assert_equal(errors, ( [serialize_error(ValueError("{0} curley braces!"))] * self.chunk_adjusted_num_output_exceptions(50))) - # NOTE: Sometimes, the weakref in the pool is deleted, so 'has_exceptions' is - # not set, and the pool prints warnings we don't actually care about. Make + # NOTE: Sometimes, the weakref in the pool is deleted, + # so 'has_exceptions' is + # not set, and the pool prints warnings + # we don't actually care about. Make # sure that this is the only warning printed. if print_warning_mock.call_args_list: T.assert_equal(len(print_warning_mock.call_args_list), 1) [warning] = print_warning_mock.call_args_list - T.assert_in('Pool disposed before input was consumed', warning[0][0]) + T.assert_in('Pool disposed before input was consumed', + warning[0][0]) class ChunkedExceptionsTest(ExceptionsTest): chunk_size = vimap.pool._DEFAULT_DEFAULT_CHUNK_SIZE def fork_pool(self, *args, **kwargs): - return vimap.pool.fork_chunked(*args, default_chunk_size=self.chunk_size, **kwargs) + return vimap.pool.fork_chunked(*args, + default_chunk_size=self.chunk_size, + **kwargs) def chunk_adjusted_num_output_exceptions(self, n): return int(math.ceil(float(n) / self.chunk_size)) @@ -281,7 +338,8 @@ def test_no_exception_currently(self): def test_formats_correctly(self): expected_formatted_tb = ['Traceback:\n', 'stuff\n', 'more stuff\n'] expected_ex = ValueError('test') - expected_formatted_tb_str = """Traceback:\nstuff\nmore stuff\nValueError('test',)""" + expected_formatted_tb_str =\ + """Traceback:\nstuff\nmore stuff\nValueError('test',)""" expected_ec = ExceptionContext( value=expected_ex, formatted_traceback=expected_formatted_tb_str) @@ -289,16 +347,16 @@ def test_formats_correctly(self): mock_exc_info = (None, expected_ex, expected_tb) with mock.patch.object( - vimap.exception_handling.sys, - 'exc_info', - autospec=True, - return_value=mock_exc_info, + vimap.exception_handling.sys, + 'exc_info', + autospec=True, + return_value=mock_exc_info, ): with mock.patch.object( - vimap.exception_handling.traceback, - 'format_tb', - autospec=True, - return_value=expected_formatted_tb, + vimap.exception_handling.traceback, + 'format_tb', + autospec=True, + return_value=expected_formatted_tb, ): found_ec = ExceptionContext.current() T.assert_equal(found_ec, expected_ec) diff --git a/tests/ext/sugar_test.py b/tests/ext/sugar_test.py index 7780b8f..ace969a 100644 --- a/tests/ext/sugar_test.py +++ b/tests/ext/sugar_test.py @@ -15,7 +15,8 @@ def run_exception_test(imap_ordered_or_unordered): """ - Checks that exceptions are re-thrown, for either imap_unordered or imap_ordered. + Checks that exceptions are re-thrown, + for either imap_unordered or imap_ordered. :param imap_ordered_or_unordered: either vimap.ext.sugar.imap_unordered or ...imap_ordered @@ -82,15 +83,19 @@ def test_streaming(self): T.assert_gt( len(unspooled_input), 9000, - message="Most inputs should not be processed (too much spooling / " - "not lazy). Only {0} remained.".format(len(unspooled_input)) + message="Most inputs should not be processed " + "(too much spooling / " + "not lazy). Only {0} remained." + .format(len(unspooled_input)) ) - assert num_processed + len(unspooled_input) == 10000, "Something got dropped" + assert num_processed + len(unspooled_input) == 10000,\ + "Something got dropped" T.assert_equal( consumed + rest, tuple(2 * i for i in xrange(num_processed)), - message="Processed inputs weren't the first in the stream, or are out of order." + message="Processed inputs weren't the first " + "in the stream, or are out of order." ) def test_exceptions(self): @@ -115,8 +120,10 @@ def test_chunking(self): # itself along with the process pid so that we know which process gets # which input value. def input_with_pid(input): - # Delay a bit here to prevent the test being flaky when one worker - # finishing with one chunk of input and grabbing next before we assign + # Delay a bit here to prevent the test + # being flaky when one worker + # finishing with one chunk of input + # and grabbing next before we assign # that next chunk to another worker time.sleep(0.1) return (input, multiprocessing.current_process().pid) @@ -127,12 +134,15 @@ def input_with_pid(input): chunk_size=3 )) - # By grouping return values by pid, we could get all the input values for - # each worker process. Make sure the input value groups are the same as + # By grouping return values by pid, we could + # get all the input values for + # each worker process. Make sure the input + # value groups are the same as # the result of chunking (expected_input_chunks). expected_input_chunks = [(0, 1, 2), (3, 4, 5), (6, 7)] actual_input_chunks = [] - for pid, group in itertools.groupby(input_with_pids, key=lambda (x, pid): pid): + for pid, group in itertools.groupby(input_with_pids, + key=lambda (x, pid): pid): input_chunk, pids = zip(*group) actual_input_chunks.append(input_chunk) diff --git a/tests/fuzz_test.py b/tests/fuzz_test.py index a40c728..ccedf94 100644 --- a/tests/fuzz_test.py +++ b/tests/fuzz_test.py @@ -21,12 +21,15 @@ def fork_pool(self, *args, **kwargs): def test_fuzz(self): for n in xrange(1, 100): - processes = self.fork_pool(worker_proc.init_args(init=i) for i in [1, 1, 1]) + processes = self.fork_pool( + worker_proc.init_args(init=i) for i in [1, 1, 1]) res = list(processes.imap(list(range(1, n))).zip_in_out()) - T.assert_sets_equal(set(out for in_, out in res), set(range(2, n+1))) + T.assert_sets_equal(set(out for in_, out in res), + set(range(2, n + 1))) def test_try_overwhelm_output_queue(self): - processes = self.fork_pool(worker_proc.init_args(init=i) for i in [1, 1, 1]) + processes = self.fork_pool( + worker_proc.init_args(init=i) for i in [1, 1, 1]) processes.imap(xrange(10000)).block_ignore_output() def test_slow_consumer(self): diff --git a/tests/instancemethod_worker_test.py b/tests/instancemethod_worker_test.py index 8f688f0..b89d6e1 100644 --- a/tests/instancemethod_worker_test.py +++ b/tests/instancemethod_worker_test.py @@ -22,9 +22,11 @@ def worker(self, iterable, init_arg): class InstancemethodWorkerTest(T.TestCase): def test_basic(self): ti = TestInstance(unpickleable) - pool = vimap.pool.fork(ti.worker.init_args(init_arg=4) for _ in xrange(3)) + pool = vimap.pool.fork( + ti.worker.init_args(init_arg=4) for _ in xrange(3)) result = list(pool.imap([2100, 2200, 2300]).zip_in_out()) - T.assert_equal(set(result), set([(2300, 2307), (2100, 2107), (2200, 2207)])) + T.assert_equal(set(result), + set([(2300, 2307), (2100, 2107), (2200, 2207)])) def test_unpickleable(self): T.assert_raises( diff --git a/tests/performance_test.py b/tests/performance_test.py index c0106a9..4099f6d 100644 --- a/tests/performance_test.py +++ b/tests/performance_test.py @@ -26,14 +26,19 @@ def assert_memory_use(name, lower, upper): def rss(): return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss + gc.collect() before = rss() yield gc.collect() mem_mb = (rss() - before) / 1024.0 - print("For {0}, memory use increased by {1:.2f} MB".format(name, mem_mb)) - assert lower <= mem_mb, "Memory usage expected to increase by at least {0} MB".format(lower) - assert mem_mb < upper, "Memory usage expected to increase by at most {0} MB".format(upper) + print( + "For {0}, memory use increased by {1:.2f} MB".format(name, mem_mb)) + assert lower <= mem_mb, \ + "Memory usage expected to increase by at least {0} MB".format( + lower) + assert mem_mb < upper, \ + "Memory usage expected to increase by at most {0} MB".format(upper) def factorial(n): @@ -74,23 +79,29 @@ def _retry_test(test_fcn): """To avoid flakiness inherent in performance tests, we allow the test function to fail once, so long as it succeeds a second time. """ + @functools.wraps(test_fcn) def inner(*args, **kwargs): try: return test_fcn(*args, **kwargs) except Exception as e: logging.warning( - "Warning: performance test {0} failed with exception {0}: {1}, retrying" + "Warning: performance test {0} " + "failed with exception {0}: {1}, retrying" .format(test_fcn.__name__, type(e), e)) return test_fcn(*args, **kwargs) + return inner -@T.suite("performance", "These tests may flake if your testing server is overloaded.") +@T.suite("performance", + "These tests may flake if your testing server is overloaded.") class PerformanceTest(T.TestCase): def get_speedup_factor(self, baseline_fcn, optimized_fcn, num_tests): - baseline_performance = timeit.timeit(baseline_fcn, number=num_tests) - optimized_performance = timeit.timeit(optimized_fcn, number=num_tests) + baseline_performance = timeit.timeit(baseline_fcn, + number=num_tests) + optimized_performance = timeit.timeit(optimized_fcn, + number=num_tests) _message = "Performance test too fast, susceptible to overhead" T.assert_gt(baseline_performance, 0.005, _message) T.assert_gt(optimized_performance, 0.005, _message) @@ -98,11 +109,14 @@ def get_speedup_factor(self, baseline_fcn, optimized_fcn, num_tests): def test_retry_raises_on_second_failure(self): """Dumb paranoia test to check our _retry_test decorator.""" + @_retry_test def always_fails(): raise ValueError() + with T.assert_raises(ValueError): - with mock.patch.object(logging, 'warning'): # suppress console spam + with mock.patch.object(logging, + 'warning'): # suppress console spam always_fails() @_retry_test @@ -110,7 +124,8 @@ def test_performance(self): # NOTE: Avoid hyperthreading, which doesn't help performance # in our test case. num_workers = min(4, multiprocessing.cpu_count() / 2) - T.assert_gt(num_workers, 1, "Too few cores to run performance test.") + T.assert_gt(num_workers, 1, + "Too few cores to run performance test.") start = 15000 num_inputs = 2 * num_workers @@ -120,15 +135,19 @@ def factor_sequential(): for i in inputs: factorial(i) - pool = vimap.pool.fork_identical(factorial_worker, num_workers=num_workers) + pool = vimap.pool.fork_identical(factorial_worker, + num_workers=num_workers) def factor_parallel(): pool.imap(inputs).block_ignore_output(close_if_done=False) - speedup_ratio = self.get_speedup_factor(factor_sequential, factor_parallel, 4) + speedup_ratio = self.get_speedup_factor(factor_sequential, + factor_parallel, 4) efficiency = speedup_ratio / num_workers - print("Easy performance test efficiency: {0:.1f}% ({1:.1f}x speedup)".format( - efficiency * 100., speedup_ratio)) + print( + "Easy performance test efficiency: " + "{0:.1f}% ({1:.1f}x speedup)".format( + efficiency * 100., speedup_ratio)) T.assert_gt(efficiency, 0.65, "Failed performance test!!") @_retry_test @@ -137,43 +156,56 @@ def test_chunking_really_is_faster(self): communication overhead is the biggest factor). """ inputs = tuple(xrange(1, 10)) * 1000 - normal_pool = vimap.pool.fork_identical(factorial_worker, num_workers=2) - chunked_pool = vimap.pool.fork_identical_chunked(factorial_worker, num_workers=2) + normal_pool = vimap.pool.fork_identical(factorial_worker, + num_workers=2) + chunked_pool = vimap.pool.fork_identical_chunked(factorial_worker, + num_workers=2) def factor_normal(): - normal_pool.imap(inputs).block_ignore_output(close_if_done=False) + normal_pool.imap(inputs).block_ignore_output( + close_if_done=False) def factor_chunked(): - chunked_pool.imap(inputs).block_ignore_output(close_if_done=False) - - speedup_ratio = self.get_speedup_factor(factor_normal, factor_chunked, 2) - print("Chunked performance test: {0:.1f}x speedup".format(speedup_ratio)) + chunked_pool.imap(inputs).block_ignore_output( + close_if_done=False) + + speedup_ratio = self.get_speedup_factor(factor_normal, + factor_chunked, + 2) + print( + "Chunked performance test: {0:.1f}x speedup".format( + speedup_ratio)) T.assert_gt(speedup_ratio, 5) - def run_big_fork_test(self, time_sleep_s, num_workers, num_inputs, num_test_iterations): + def run_big_fork_test(self, time_sleep_s, num_workers, num_inputs, + num_test_iterations): """Common setup for the big fork test; see usage in the two tests below. :returns: The amount of time the test took, in seconds. """ - pool = vimap.pool.fork_identical(simple_sleep_worker, num_workers=num_workers) + pool = vimap.pool.fork_identical(simple_sleep_worker, + num_workers=num_workers) def sleep_in_parallel(): pool.imap(time_sleep_s for _ in xrange(num_inputs)) pool.block_ignore_output(close_if_done=False) - return timeit.timeit(sleep_in_parallel, number=num_test_iterations) / num_test_iterations + return timeit.timeit(sleep_in_parallel, + number=num_test_iterations) / num_test_iterations @_retry_test def test_big_fork(self): """Tests that we can fork a large number of processes, each of which will wait for a few milliseconds, and return. - NOTE: currently fails if you bump 70 up to 200. We're going to fix this very soon. + NOTE: currently fails if you bump 70 up to 200. + We're going to fix this very soon. """ time_sleep_s = 0.2 test_time = self.run_big_fork_test(time_sleep_s, 70, 70, 3) - print("Big fork performance test: {0:.2f} s (nominal: {1:.2f} s)".format( - test_time, time_sleep_s)) + print( + "Big fork performance test: {0:.2f} s (nominal: {1:.2f} s)".format( + test_time, time_sleep_s)) T.assert_lt(test_time, time_sleep_s * 2) def test_big_fork_test(self): @@ -186,17 +218,22 @@ def test_big_fork_test(self): def test_memory_consumption_reading_input(self): """Tests that memory usage increases when expected.""" - with assert_memory_use("test-should-increase-read-urandom-streaming", 0, 0.5): + with assert_memory_use( + "test-should-increase-read-urandom-streaming", + 0, 0.5): for _ in urandom_stream(size_kb=2048): pass - with assert_memory_use("test-should-increase-read-urandom", 0.5, 11): + with assert_memory_use("test-should-increase-read-urandom", 0.5, + 11): buf = list(urandom_stream(size_kb=10 * 1024)) buf.pop(0) def test_memory_consumption_stream_processing(self): - """Tests that memory usage is constant when processing a large stream.""" + """Tests that memory usage is constant when + processing a large stream.""" pool = vimap.pool.fork_identical(string_length_worker) with assert_memory_use("test-stream-40-mb", 0, 3): - pool.imap(urandom_stream(size_kb=40 * 1024)).block_ignore_output() + pool.imap( + urandom_stream(size_kb=40 * 1024)).block_ignore_output() diff --git a/tests/queue_manager_test.py b/tests/queue_manager_test.py index dcb0515..712b357 100644 --- a/tests/queue_manager_test.py +++ b/tests/queue_manager_test.py @@ -55,11 +55,11 @@ def get_nowait(self): def get_qm( - input_queue, - output_queue, - max_real_in_flight=10, - max_total_in_flight=20, - timeouts_config=vimap.config.TimeoutConfig.default_config() + input_queue, + output_queue, + max_real_in_flight=10, + max_total_in_flight=20, + timeouts_config=vimap.config.TimeoutConfig.default_config() ): qm = vimap.queue_manager.VimapQueueManager( max_real_in_flight=max_real_in_flight, diff --git a/tests/streaming_test.py b/tests/streaming_test.py index 81c8456..3df1383 100644 --- a/tests/streaming_test.py +++ b/tests/streaming_test.py @@ -39,7 +39,8 @@ def test_streaming(self): def input_generator(): for i in sorted(inputs_which_must_be_processed): yield i - while not already_processed.issuperset(inputs_which_must_be_processed): + while not already_processed.issuperset( + inputs_which_must_be_processed): yield None pool = self.fork_pool() @@ -51,7 +52,8 @@ def input_generator(): # input_generator(). It can be greater than zero, when the worker # hasn't finished processing the first 100 numerical inputs, but our # main thread wants to enqueue more inputs (to keep the workers busy). - streaming_lookahead = num_elements_total - len(inputs_which_must_be_processed) + streaming_lookahead = num_elements_total - len( + inputs_which_must_be_processed) T.assert_gte( streaming_lookahead, 0, @@ -76,8 +78,10 @@ class StrictInflightStreamingTest(StreamingTest): """Checks that max_total_in_flight acts as a hard upper bound on the number of inputs spooled. """ + def fork_pool(self): - pool = vimap.pool.fork([do_nothing_worker.init_args()], max_total_in_flight_factor=2) + pool = vimap.pool.fork([do_nothing_worker.init_args()], + max_total_in_flight_factor=2) # This assert checks that the max_total_in_flight argument is properly # propagated to the queue manager, and makes it directly obvious that @@ -95,6 +99,7 @@ def __init__(self, x): class InputRefsReleasedTest(T.TestCase): """Checks that references to input elements are released.""" + def test(self): weakrefs = [] inputs = (MyFancyClass(x) for x in xrange(100)) diff --git a/tests/testing_test.py b/tests/testing_test.py index 8a8b4dc..64c2fd2 100644 --- a/tests/testing_test.py +++ b/tests/testing_test.py @@ -13,11 +13,11 @@ def worker_proc(seq, init): class DebugPoolTest(T.TestCase): - @vimap.testing.mock_debug_pool() def test_debug_pool(self): T.assert_equal(vimap.pool.VimapPool, vimap.testing.DebugPool) - processes = vimap.pool.fork(worker_proc.init_args(init=i) for i in [1, 1, 1]) + processes = vimap.pool.fork( + worker_proc.init_args(init=i) for i in [1, 1, 1]) processes.imap([1, 2, 3]).block_ignore_output() for i in [1, 2, 3]: @@ -25,7 +25,6 @@ def test_debug_pool(self): class SerialPoolTest(T.TestCase): - @T.setup_teardown def mock_pool(self): with vimap.testing.mock_serial_pool(): @@ -34,7 +33,8 @@ def mock_pool(self): @mock.patch('multiprocessing.Process.start') def test_serial_pool_doesnt_fork(self, start): - processes = vimap.pool.fork(worker_proc.init_args(init=i) for i in [1, 1, 1]) + processes = vimap.pool.fork( + worker_proc.init_args(init=i) for i in [1, 1, 1]) processes.imap([1, 2, 3]) processes.block_ignore_output() @@ -44,34 +44,39 @@ def test_serial_pool_doesnt_fork(self, start): assert not start.called def test_zip_in_out(self): - processes = vimap.pool.fork(worker_proc.init_args(init=i) for i in [1, 2, 3]) + processes = vimap.pool.fork( + worker_proc.init_args(init=i) for i in [1, 2, 3]) - results = list(processes.imap([4, 4, 4]).zip_in_out(close_if_done=False)) + results = list( + processes.imap([4, 4, 4]).zip_in_out(close_if_done=False)) T.assert_sets_equal(set(results), set([(4, 5), (4, 6), (4, 7)])) - results = list(processes.imap([4, 4, 4]).zip_in_out(close_if_done=True)) + results = list( + processes.imap([4, 4, 4]).zip_in_out(close_if_done=True)) T.assert_sets_equal(set(results), set([(4, 5), (4, 6), (4, 7)])) def test_zip_in_out_lots_of_input(self): - processes = vimap.pool.fork(worker_proc.init_args(init=i) for i in [1, 2, 3]) + processes = vimap.pool.fork( + worker_proc.init_args(init=i) for i in [1, 2, 3]) results = list(processes.imap([4, 4, 4] * 3).zip_in_out()) T.assert_equal(set(results[0:3]), set([(4, 5), (4, 6), (4, 7)])) T.assert_equal(set(results[3:6]), set([(4, 5), (4, 6), (4, 7)])) T.assert_equal(set(results[6:9]), set([(4, 5), (4, 6), (4, 7)])) def test_zip_in_out_more_workers_than_input(self): - processes = vimap.pool.fork(worker_proc.init_args(init=i) for i in [1, -1, 0]) + processes = vimap.pool.fork( + worker_proc.init_args(init=i) for i in [1, -1, 0]) results = list(processes.imap([4, 4]).zip_in_out()) T.assert_equal(set(results), set([(4, 5), (4, 3)])) def test_zip_in_out_no_input(self): - processes = vimap.pool.fork(worker_proc.init_args(init=i) for i in [1, -1, 0]) + processes = vimap.pool.fork( + worker_proc.init_args(init=i) for i in [1, -1, 0]) results = processes.imap([]).zip_in_out() T.assert_equal(set(results), set([])) class NoWarningsTest(T.TestCase): - def test_no_warnings(self): with mock.patch('sys.stderr') as stderr: vimap.exception_handling.print_warning('') @@ -82,5 +87,6 @@ def test_no_warnings(self): with vimap.testing.no_warnings(): vimap.exception_handling.print_warning('') + if __name__ == '__main__': T.run() diff --git a/tox.ini b/tox.ini index c19c4d1..a5fc5ea 100644 --- a/tox.ini +++ b/tox.ini @@ -14,7 +14,6 @@ commands = testify {posargs:tests --exclude-suite=disabled --summary} [testenv:lint] deps = {[testenv:py]deps} commands = - flake8 vimap flake8 tests # somehow broken: pylint -E vimap diff --git a/vimap/pool.py b/vimap/pool.py index e5564ec..f1e26de 100644 --- a/vimap/pool.py +++ b/vimap/pool.py @@ -8,7 +8,8 @@ for in_, out in pool.imap(my_input).zip_in_out(): ... -You can also use it in a more "async" manner, e.g. when your input sequences are +You can also use it in a more "async" manner, e.g. +when your input sequences are relatively small and/or calculated ahead of time, you can write, processes.map(seq1) @@ -32,10 +33,8 @@ import vimap.chunked_real_worker_routine import vimap.util - NO_INPUT = 'NO_INPUT' - _DEFAULT_DEFAULT_CHUNK_SIZE = 100 @@ -88,15 +87,22 @@ def __init__( def check_output_for_error(item): uid, typ, output = item if typ == 'exception': - # NOTE(gatoatigrado|2015-02-23): While we may eventually want to remove - # this, since zip_in_out() rethrows exceptions and such, it's probably - # okay for now since there are a few conditions, like exceptions being - # raised on worker startup, or the main process not finishing - # iterating through zip_in_out(), where only this would print errors. + # NOTE(gatoatigrado|2015-02-23): While we may + # eventually want to remove + # this, since zip_in_out() rethrows exceptions and such, + # it's probably + # okay for now since there are a few conditions, + # like exceptions being + # raised on worker startup, or the main process + # not finishing + # iterating through zip_in_out(), where only this + # would print errors. # c.f. ExceptionTest.test_unconsumed_exceptions. - vimap.exception_handling.print_exception(output, None, None) + vimap.exception_handling.print_exception(output, None, + None) if self_ref(): self_ref().has_exceptions = True + self.qm.add_output_hook(check_output_for_error) self.processes = [] @@ -126,7 +132,9 @@ def print_output_progress(item): state['output_counter'] += 1 if time.time() - state['last_printed'] > print_interval_s: state['last_printed'] = time.time() - print_fcn("Processed {0} {1}".format(state['output_counter'], item_type)) + print_fcn("Processed {0} {1}" + .format(state['output_counter'], item_type)) + self.qm.add_output_hook(print_output_progress) return self @@ -134,7 +142,8 @@ def fork(self, debug=None): debug = self.debug if debug is None else debug for i, worker in enumerate(self.worker_sequence): routine = self.worker_routine_class( - worker.fcn, worker.args, worker.kwargs, index=i, debug=debug) + worker.fcn, worker.args, worker.kwargs, index=i, + debug=debug) process = self.process_class( target=routine.run, args=(self.qm.input_queue, self.qm.output_queue)) @@ -177,14 +186,16 @@ def join_and_consume_output(self): # queue as processes die, or else other processes may not # be able to enqueue their final items to the output queue # (since it's full). - while not self.all_processes_died(exception_check_optimization=False): + while not self.all_processes_died( + exception_check_optimization=False): self.qm.feed_out_to_tmp(max_time_s=None) time.sleep(0.001) self.qm.feed_out_to_tmp(max_time_s=None) for process in self.processes: process.join() - # NOTE: Not only prevents future erroneous accesses, 'del' is actually + # NOTE: Not only prevents future erroneous accesses, + # 'del' is actually # necessary to clean up / close the pipes used by the process. del self.processes @@ -192,10 +203,12 @@ def join_and_consume_output(self): @vimap.util.instancemethod_runonce() def finish_workers(self): - '''Sends stop tokens to subprocesses, then joins them. There may still be + '''Sends stop tokens to subprocesses, then joins them. + There may still be unconsumed output. - This method is called when you call zip_in_out() with finish_workers=True + This method is called when you call zip_in_out() with + finish_workers=True (the default), as well as when the GC reclaims the pool. ''' if self.debug: @@ -231,20 +244,25 @@ def all_input_serialized(self): '''Input from all calls to imap; downside of this approach is that it keeps around dead iterators. ''' + def get_serialized((x, xser)): uid = self.input_uid_ctr self.input_uid_ctr += 1 self.input_uid_to_input[uid] = x return (uid, xser) - return (get_serialized(x) for seq in self.input_sequences for x in seq) + + return (get_serialized(x) for seq in + self.input_sequences for x in seq) def spool_input(self, close_if_done=False): '''Put input on the queue. If `close_if_done` and we reach the end of the input stream, send stop tokens. ''' - if self.qm.spool_input(self.all_input_serialized) and close_if_done: + if self.qm.spool_input( + self.all_input_serialized) and close_if_done: # reached the end of the stream self.send_stop_tokens() + # ------ def get_corresponding_input(self, uid, output): @@ -290,7 +308,8 @@ def zip_in_out_typ(self, close_if_done=True): raise if close_if_done: self.finish_workers() - # Return when input given is exhausted, or workers die from exceptions + # Return when input given is exhausted, or workers + # die from exceptions def zip_in_out(self, *args, **kwargs): '''Yield (input, output) tuples for each input item processed. @@ -298,14 +317,17 @@ def zip_in_out(self, *args, **kwargs): they will be re-raised on the main process. :param *args: args (currently, close_if_done) passed to zip_in_out_typ - :param **kwargs: kwargs (currently, close_if_done) passed to zip_in_out_typ + :param **kwargs: kwargs (currently, close_if_done) + passed to zip_in_out_typ ''' for inp, output, typ in self.zip_in_out_typ(*args, **kwargs): if typ == 'output': yield inp, output elif typ == 'exception': - assert isinstance(output, vimap.exception_handling.ExceptionContext) + assert isinstance(output, + vimap.exception_handling.ExceptionContext) output.reraise() + # ------ def block_ignore_output(self, *args, **kwargs): @@ -314,7 +336,8 @@ def block_ignore_output(self, *args, **kwargs): class ChunkedPool(VimapPool): - worker_routine_class = vimap.chunked_real_worker_routine.ChunkedWorkerRoutine + worker_routine_class = vimap.chunked_real_worker_routine \ + .ChunkedWorkerRoutine def __init__(self, *args, **kwargs): self.default_chunk_size = kwargs.pop( @@ -332,26 +355,32 @@ def imap(self, input_sequence, chunk_size=None): """By default, using the regular vimap API (in this case, imap) will automatically chunk input. """ - chunk_size = (self.default_chunk_size if chunk_size is None else chunk_size) + chunk_size = (self.default_chunk_size + if chunk_size is None else chunk_size) self.check_chunk_size(chunk_size) - return self.imap_chunks(vimap.util.chunk(input_sequence, chunk_size)) + return self.imap_chunks( + vimap.util.chunk(input_sequence, chunk_size)) def zip_in_out_typ(self, *args, **kwargs): """By default, the regular vimap API will un-chunk output. So, as we get back output from the base API, we yield an input-output pair for everything in the chunk. """ - for inp, output, typ in self.zip_in_out_typ_chunks(*args, **kwargs): + for inp, output, typ in self.zip_in_out_typ_chunks(*args, + **kwargs): if typ == 'output': assert len(inp) == len(output) for in_elt, out_elt in zip(inp, output): yield in_elt, out_elt, typ else: - # To have a consistent API, we untuple the input and say the exception + # To have a consistent API, we untuple the + # input and say the exception # happened on the first element (which may not be true); if the # input is malformed in any way (or the NO_INPUT token), we # pass it through unprocessed. - inp = (inp[0] if (isinstance(inp, (list, tuple)) and inp) else inp) + inp = (inp[0] if (isinstance(inp, + (list, + tuple)) and inp) else inp) yield inp, output, typ # Chunked API -- provides a lower-level API to directly enqueue or consume @@ -374,7 +403,8 @@ def zip_in_out_chunks(self, *args, **kwargs): """To provide chunked APIs for all methods, we also add a zip_in_out_chunks() which is like zip_in_out_typ_chunks but filters on typ == 'output'. """ - for inp, output, typ in self.zip_in_out_typ_chunks(*args, **kwargs): + for inp, output, typ in self.zip_in_out_typ_chunks(*args, + **kwargs): if typ == 'output': yield inp, output @@ -392,7 +422,8 @@ def _fork_identical_base(fork_method, worker_fcn, *args, **kwargs): arguments. ''' num_workers = kwargs.pop('num_workers', multiprocessing.cpu_count()) - return fork_method(worker_fcn.init_args(*args, **kwargs) for _ in range(num_workers)) + return fork_method(worker_fcn.init_args(*args, **kwargs) + for _ in range(num_workers)) def fork_identical(*args, **kwargs):