diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 0fdb220..a3ff469 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -12,14 +12,14 @@ on: jobs: build: - runs-on: ubuntu-20.04 + runs-on: ubuntu-24.04 strategy: matrix: - python-version: ["3.7", "3.9", "3.11"] + python-version: ["3.8", "3.9", "3.11"] steps: - uses: actions/checkout@v2 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v2 + uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} - name: Install dependencies @@ -34,8 +34,8 @@ jobs: run: | python setup.py bdist_wheel - name: Archive artifact - uses: actions/upload-artifact@v2 + uses: actions/upload-artifact@v4 with: - name: dist + name: dist-${{ matrix.python-version }} path: | dist diff --git a/parallelpipe.py b/parallelpipe.py index db7a199..a805d2f 100644 --- a/parallelpipe.py +++ b/parallelpipe.py @@ -5,10 +5,8 @@ \ map / """ from multiprocessing import Process, Queue -from time import sleep -import inspect -import collections from collections.abc import Iterable +import time import dill @@ -87,6 +85,11 @@ def run(self): for i in range(self._num_followers): put_item(EXIT) self._que_err.put(EXIT) + while not self._que_out.empty(): + time.sleep(0.1) + while not self._que_err.empty(): + time.sleep(0.1) + class Stage(object): """Represent a pool of parallel tasks that perform the same type of action on the input.""" diff --git a/setup.py b/setup.py index 137c485..95c9a94 100644 --- a/setup.py +++ b/setup.py @@ -12,7 +12,7 @@ setup( name='parallelpipe', - version='0.3.0', + version='0.3.1', author='Giuseppe Tribulato', author_email='gtsystem@gmail.com', py_modules=['parallelpipe'], diff --git a/tests/test_parallelpipe.py b/tests/test_parallelpipe.py index 4deb1e1..be61776 100644 --- a/tests/test_parallelpipe.py +++ b/tests/test_parallelpipe.py @@ -1,5 +1,6 @@ import unittest from parallelpipe import Stage, TaskException, stage, map_stage +from time import sleep def t1(x, fail_at=None): """Produce values from the given input iterator. @@ -124,7 +125,7 @@ def test_exception_propagation(self): reducer = Stage(t3, sum).setup(workers=2, qsize=3) pipe = producer | mapper | reducer - with self.assertRaisesRegexp(TaskException, "failed at 200"): + with self.assertRaisesRegex(TaskException, "failed at 200"): for res in pipe.results(): pass @@ -132,7 +133,7 @@ def test_exception_propagation(self): producer = Stage(t1, range(1000), 10).setup(workers=2, qsize=10) pipe = producer | mapper | reducer - with self.assertRaisesRegexp(TaskException, "failed at 10"): + with self.assertRaisesRegex(TaskException, "failed at 10"): for res in pipe.results(): pass @@ -178,10 +179,37 @@ def consume(n): res = (range(1000) | fail2(5) | consume).execute() self.assertEqual(res, 10) - with self.assertRaisesRegexp(TaskException, "failure"): + with self.assertRaisesRegex(TaskException, "failure"): (range(1000) | fail1(5) | consume).execute() + + def test_slow_second_stage(self): + + @stage(workers=2) + def mapit(it): + for item in it: + yield item + 1 + @stage(workers=1) + def reduce(it): + sleep(3) # simulate a long startup time + tot = 0 + for item in it: + tot += item + sleep(2) + yield 5 + yield tot + + @stage(workers=2) + def write(it): + for item in it: + yield item + + res = list(([1] | mapit | reduce | write).results()) + self.assertEqual(res, [5, 2]) + if __name__ == '__main__': + import multiprocessing + multiprocessing.set_start_method('fork', force=False) unittest.main()