diff --git a/docs/cookbook.md b/docs/cookbook.md index c8e8816..e8f1bcf 100644 --- a/docs/cookbook.md +++ b/docs/cookbook.md @@ -672,8 +672,15 @@ define a [`filter_context()`][labtech.types.Task.filter_context] in order to only pass necessary parts of the context to each task. If you are running a Lab with with `runner_backend='fork'` (the -default on Linux), then you can rely on Labtech to share results and -context between task processes using shared memory. +default on Linux), then you can rely on Labtech to share the context +between task processes using shared memory. Furthermore, +`runner_backend='fork-per-task'` will also share task results between +processes using shared memory, but at the cost of forking a new +subprocess for each task - `runner_backend='fork-per-task'` is best +used when dependency task results are large (so time will be saved +through memory sharing) compared to the overall number of tasks (for +large numbers of tasks, forking a separate process for each may be a +substantial overhead). ### How can I see when a task was run and how long it took to execute? @@ -688,9 +695,9 @@ print(f'The task execution took: {aggregation_task.result_meta.duration}') ### How can I access the results of intermediate/dependency tasks? -To conserve memory, labtech's default behaviour is to unload the -results of intermediate/dependency tasks once their directly dependent -tasks have finished executing. +To conserve memory, labtech unloads the results of +intermediate/dependency tasks once their directly dependent tasks have +finished executing. 
A simple approach to access the results of an intermediate task may simply be to include it's results as part of the result of the task diff --git a/docs/runners.md b/docs/runners.md index 68af181..0cdaa71 100644 --- a/docs/runners.md +++ b/docs/runners.md @@ -4,12 +4,17 @@ You can control how tasks are executed in parallel by specifying an instance of one of the following Runner Backend classes for the `runner_backend` argument of your [`Lab`][labtech.Lab]: -::: labtech.runners.ForkRunnerBackend +::: labtech.runners.SpawnPoolRunnerBackend options: heading_level: 3 show_source: False -::: labtech.runners.SpawnRunnerBackend +::: labtech.runners.ForkPoolRunnerBackend + options: + heading_level: 3 + show_source: False + +::: labtech.runners.ForkPerTaskRunnerBackend options: heading_level: 3 show_source: False diff --git a/examples/cookbook.ipynb b/examples/cookbook.ipynb index cccb062..c4ad341 100644 --- a/examples/cookbook.ipynb +++ b/examples/cookbook.ipynb @@ -11,7 +11,7 @@ "You can also run this cookbook as an [interactive\n", "notebook](https://mybinder.org/v2/gh/ben-denham/labtech/main?filepath=examples/cookbook.ipynb)." ], - "id": "364635a1-54b3-4bba-b443-2ad252916653" + "id": "018a57d7-11f4-490b-b726-e6c183758c1b" }, { "cell_type": "code", @@ -21,7 +21,7 @@ "source": [ "%pip install labtech fsspec mlflow pandas scikit-learn setuptools" ], - "id": "5d00f3c1-3365-4599-8655-6bcd77ee7f27" + "id": "03016d94-70c8-4dde-890e-20d7fd2aea33" }, { "cell_type": "code", @@ -31,7 +31,7 @@ "source": [ "!mkdir storage" ], - "id": "fd946354-cf8f-4039-b903-522d4c796a4b" + "id": "b63bc804-ff16-4f9a-aa57-adfa0477f713" }, { "cell_type": "code", @@ -50,7 +50,7 @@ "digits_X, digits_y = datasets.load_digits(return_X_y=True)\n", "digits_X = StandardScaler().fit_transform(digits_X)" ], - "id": "a2a4329d-0788-4188-b39f-3e01d36727b3" + "id": "63c832d8-2ba5-4cbd-8cd8-6cdc8dd5ae41" }, { "cell_type": "markdown", @@ -64,7 +64,7 @@ "is sent to `STDOUT` (e.g. 
calls to `print()`) or `STDERR` (e.g. uncaught\n", "exceptions) will also be captured and logged:" ], - "id": "8304d056-7765-439b-bca7-68704d528a5f" + "id": "9690c1c2-ba16-4ea7-aa57-36796b01dec2" }, { "cell_type": "code", @@ -91,7 +91,7 @@ "lab = labtech.Lab(storage=None)\n", "results = lab.run_tasks(experiments)" ], - "id": "cbe87a89-a25e-49e4-8dcd-83bc108198bd" + "id": "7503db7d-0335-4502-836a-2150e48b1927" }, { "cell_type": "markdown", @@ -130,7 +130,7 @@ "learning model (like `LRClassifierTask` below), and then make a task of\n", "that type a parameter for your primary experiment task:" ], - "id": "d4983b56-b663-4ac6-ae08-03a615f60936" + "id": "2df30640-45bf-4d43-9528-33e2ca826db8" }, { "cell_type": "code", @@ -171,7 +171,7 @@ "lab = labtech.Lab(storage=None)\n", "results = lab.run_tasks([experiment])" ], - "id": "1ae81ffa-08e9-4c35-a8d6-d1364775ff80" + "id": "37132698-d3f7-49b4-b17b-77727d4ae493" }, { "cell_type": "markdown", @@ -182,7 +182,7 @@ "[Protocol](https://docs.python.org/3/library/typing.html#typing.Protocol)\n", "that defines their common result type:" ], - "id": "475d760e-3c26-4002-a3e0-442fcd909d82" + "id": "18373985-4a11-4a4a-9102-f38894034e07" }, { "cell_type": "code", @@ -242,7 +242,7 @@ "lab = labtech.Lab(storage=None)\n", "results = lab.run_tasks(experiments)" ], - "id": "023ad2b6-5ef6-4020-b248-6a6c6727f29b" + "id": "5e68d591-4290-4688-9b09-30ed6f26341e" }, { "cell_type": "markdown", @@ -262,7 +262,7 @@ "> `Enum` must support equality between identical (but distinct) object\n", "> instances." 
], - "id": "ebe47933-08da-4389-9311-84c4e8f961cd" + "id": "439c38de-bf0f-4e55-9002-f94645754724" }, { "cell_type": "code", @@ -310,7 +310,7 @@ "lab = labtech.Lab(storage=None)\n", "results = lab.run_tasks(experiments)" ], - "id": "4e346a5a-5d55-4652-8145-4b4f0ade903f" + "id": "99a44858-f1fd-4afe-8a7b-78249dc99ccd" }, { "cell_type": "markdown", @@ -333,7 +333,7 @@ "The following example demonstrates specifying a `dataset_key` parameter\n", "to a task that is used to look up a dataset from the lab context:" ], - "id": "36c5721d-82c8-41a2-a84a-d4f700602dbd" + "id": "49b5a378-0716-486b-98c0-7f83100c4d74" }, { "cell_type": "code", @@ -370,7 +370,7 @@ ")\n", "results = lab.run_tasks(experiments)" ], - "id": "aef913b8-bda7-4581-8968-917d8dff2872" + "id": "0f312541-c489-40d9-90fc-6125b3b47591" }, { "cell_type": "markdown", @@ -388,7 +388,7 @@ "cross-validation within the task using a number of workers specified in\n", "the lab context as `within_task_workers`:" ], - "id": "6a4317f8-c586-44e9-94b2-266f4cfa2e3a" + "id": "a1e3b0d3-183a-4dc6-a78a-47385fd17180" }, { "cell_type": "code", @@ -429,7 +429,7 @@ ")\n", "results = lab.run_tasks(experiments)" ], - "id": "09babc37-e996-4eea-80b3-779304159437" + "id": "0f769852-2804-4c92-8160-1dc9f8ce11ce" }, { "cell_type": "markdown", @@ -456,7 +456,7 @@ "raised during the execution of a task will be logged, but the execution\n", "of other tasks will continue:" ], - "id": "e44e3be7-7c52-4fb5-b6f3-a58666c91a50" + "id": "094a2009-e173-4e0e-b3c9-7baf023641e3" }, { "cell_type": "code", @@ -469,7 +469,7 @@ " continue_on_failure=True,\n", ")" ], - "id": "b7e09493-a45c-4898-889d-298c640a5234" + "id": "a62c7ae5-91e4-46b4-a35d-56ab1db69d29" }, { "cell_type": "markdown", @@ -488,7 +488,7 @@ "sub-class for that extension so that you can continue using caches for\n", "the base class:" ], - "id": "704ea6e1-ff9a-4a27-baeb-056dc97a119a" + "id": "f3f9b209-6fbd-4e5c-afcd-612ea0653d93" }, { "cell_type": "code", @@ -512,7 +512,7 @@ " base_result = 
super().run()\n", " return base_result * self.multiplier" ], - "id": "a66736a2-0583-4ab3-be9e-4175fa5aedef" + "id": "0c8dbd57-b80c-4346-bda7-537877e92c71" }, { "cell_type": "markdown", @@ -524,7 +524,7 @@ "all cached task instances for a list of task types. You can then “run”\n", "the tasks to load their cached results:" ], - "id": "ebddf631-7c9d-40c7-a48a-345638749367" + "id": "94c8f311-8ded-46ac-a693-d696b9c9fb25" }, { "cell_type": "code", @@ -535,7 +535,7 @@ "cached_cvexperiment_tasks = lab.cached_tasks([CVExperiment])\n", "results = lab.run_tasks(cached_cvexperiment_tasks)" ], - "id": "97da569d-3d96-41df-bd53-8b2dea62beb8" + "id": "90c62001-0620-4f58-95c9-b95f31cc38bd" }, { "cell_type": "markdown", @@ -546,7 +546,7 @@ "You can clear the cache for a list of tasks using the `uncache_tasks()`\n", "method of a `Lab` instance:" ], - "id": "566dd0b8-1f88-498f-bc0c-6e549f976724" + "id": "96458c81-c7dc-4bba-ab39-849571597207" }, { "cell_type": "code", @@ -556,7 +556,7 @@ "source": [ "lab.uncache_tasks(cached_cvexperiment_tasks)" ], - "id": "bbd00906-d7a7-4329-821f-c360bae7dba5" + "id": "1eaa11f0-de72-42f7-a157-221256acfaf8" }, { "cell_type": "markdown", @@ -565,7 +565,7 @@ "You can also ignore all previously cached results when running a list of\n", "tasks by passing the `bust_cache` option to `run_tasks()`:" ], - "id": "d553cb51-9b41-4282-b3a3-f3b308fa2bf3" + "id": "f7ad7ac6-e3ef-4850-acb5-8a8ec24ec6e4" }, { "cell_type": "code", @@ -575,7 +575,7 @@ "source": [ "lab.run_tasks(cached_cvexperiment_tasks, bust_cache=True)" ], - "id": "7b53d28b-42d0-4468-b0aa-118bb69798a7" + "id": "d9bc2ae9-6d7b-4424-83a1-b70c1b744b6d" }, { "cell_type": "markdown", @@ -587,7 +587,7 @@ "(i.e. most changes to the `run()` method or the code it depends on) you\n", "should add or updated the `code_version` in `@task`. 
For example:" ], - "id": "09f964d7-3377-40d9-a722-c5964a51d7f4" + "id": "15457df7-41dd-4a07-a7d9-bb794f323b67" }, { "cell_type": "code", @@ -599,7 +599,7 @@ "class Experiment:\n", " ..." ], - "id": "315bb5b2-6c0e-4d61-b5a5-b804e349cd95" + "id": "77d7e97f-d244-45ff-8f35-5fb6d8a34013" }, { "cell_type": "markdown", @@ -614,7 +614,7 @@ "results where the `code_version` does not match the\n", "`current_code_version`:" ], - "id": "dbb8113e-d1e1-4233-a806-a5a086c8d23b" + "id": "558730bd-60d8-4016-b81c-278f3270ebcd" }, { "cell_type": "code", @@ -632,7 +632,7 @@ "]\n", "lab.uncache_tasks(stale_cached_tasks)" ], - "id": "4f097252-6644-43dc-bbb6-332c20b1ca11" + "id": "1bee9543-e6c8-4d12-aaba-f0af650a1134" }, { "cell_type": "markdown", @@ -656,7 +656,7 @@ "consider using a\n", "[`TypeDict`](https://docs.python.org/3/library/typing.html#typing.TypedDict):" ], - "id": "23739b98-86a3-44e0-9d19-cd97d5549ab8" + "id": "af017217-bd24-47a4-b577-a338aecacaf3" }, { "cell_type": "code", @@ -683,7 +683,7 @@ " model_weights=np.array([self.seed, self.seed ** 2]),\n", " )" ], - "id": "9823a688-dffe-4993-8121-0a2207bbf374" + "id": "72cb728b-20d3-4e47-b1fa-a00f7fd6f7c5" }, { "cell_type": "markdown", @@ -701,7 +701,7 @@ "The following example demonstrates defining and using a custom cache\n", "type to store Pandas DataFrames as parquet files:" ], - "id": "dca9209d-15fe-434a-9612-009449b4eeb1" + "id": "66b47a9a-3deb-495c-84d1-7e7dac8aab7d" }, { "cell_type": "code", @@ -743,7 +743,7 @@ "lab = labtech.Lab(storage='storage/parquet_example')\n", "lab.run_tasks([TabularTask()])" ], - "id": "71d1a759-e719-4342-903e-e0806d4e50d6" + "id": "e5ec6c85-34ba-4df5-8936-191c96688b0a" }, { "cell_type": "markdown", @@ -765,7 +765,7 @@ "storage providers like [Azure Blob\n", "Storage](https://github.com/fsspec/adlfs)." 
], - "id": "1a91866a-a191-4115-bd2f-c524c6a13444" + "id": "5c19dc03-4ded-4569-904a-54e58d796ef7" }, { "cell_type": "code", @@ -775,7 +775,7 @@ "source": [ "%pip install s3fs" ], - "id": "8913a01c-01d9-43e4-86c4-ca568e639c80" + "id": "3e71090e-6555-4fa8-a2e9-edca1ce6e193" }, { "cell_type": "code", @@ -819,7 +819,7 @@ ")\n", "results = lab.run_tasks(experiments)" ], - "id": "47d2a888-c751-47dd-b2c8-863d01a4f635" + "id": "fabb8e24-cd23-4073-a340-7421d6e3517d" }, { "cell_type": "markdown", @@ -850,7 +850,7 @@ "`AggregationTask` to aggregate the results from many individual tasks to\n", "create an aggregated cache that can be loaded more efficiently:" ], - "id": "031466a8-d4db-4179-ab94-9c8b92421965" + "id": "dd941f48-da9b-4299-88d9-2871c8425c78" }, { "cell_type": "code", @@ -891,7 +891,7 @@ "lab = labtech.Lab(storage='storage/aggregation_lab')\n", "result = lab.run_task(aggregation_task)" ], - "id": "0c85100b-1c2e-41b6-a2e0-97c1865d0b33" + "id": "72ba856c-2b12-491c-bd02-220fd5da8012" }, { "cell_type": "markdown", @@ -910,8 +910,15 @@ "only pass necessary parts of the context to each task.\n", "\n", "If you are running a Lab with with `runner_backend='fork'` (the default\n", - "on Linux), then you can rely on Labtech to share results and context\n", - "between task processes using shared memory.\n", + "on Linux), then you can rely on Labtech to share the context between\n", + "task processes using shared memory. 
Furthermore,\n", + "`runner_backend='fork-per-task'` will also share task results between\n", + "processes using shared memory, but at the cost of forking a new\n", + "subprocess for each task - `runner_backend='fork-per-task'` is best used\n", + "when dependency task results are large (so time will be saved through\n", + "memory sharing) compared to the overall number of tasks (for large\n", + "numbers of tasks, forking a separate process for each may be a\n", + "substantial overhead).\n", "\n", "### How can I see when a task was run and how long it took to execute?\n", "\n", @@ -919,7 +926,7 @@ "it was originally executed and how long it took to execute from the\n", "task’s `.result_meta` attribute:" ], - "id": "d7006021-4f19-40da-8084-a39a51083a99" + "id": "3a70d99f-5623-4a28-8c5d-7638b433266c" }, { "cell_type": "code", @@ -930,7 +937,7 @@ "print(f'The task was executed at: {aggregation_task.result_meta.start}')\n", "print(f'The task execution took: {aggregation_task.result_meta.duration}')" ], - "id": "264e1538-7e09-4206-9724-6dde000f67af" + "id": "57ab3724-e692-4dd3-a6a5-79c8d5764864" }, { "cell_type": "markdown", @@ -938,9 +945,9 @@ "source": [ "### How can I access the results of intermediate/dependency tasks?\n", "\n", - "To conserve memory, labtech’s default behaviour is to unload the results\n", - "of intermediate/dependency tasks once their directly dependent tasks\n", - "have finished executing.\n", + "To conserve memory, labtech unloads the results of\n", + "intermediate/dependency tasks once their directly dependent tasks have\n", + "finished executing.\n", "\n", "A simple approach to access the results of an intermediate task may\n", "simply be to include it’s results as part of the result of the task that\n", @@ -950,7 +957,7 @@ "Another approach is to include all of the intermediate tasks for which\n", "you wish to access the results for in the call to `run_tasks()`:" ], - "id": "d24b1665-5e74-43da-9aac-61d28c6d6d6a" + "id": 
"1645b811-e319-43b6-bd7d-b0cf94fb2ed5" }, { "cell_type": "code", @@ -978,7 +985,7 @@ " for experiment in experiments\n", "])" ], - "id": "48a86572-37e4-4bf5-a35f-ae2008af8b31" + "id": "a5a73bed-a845-415b-bbc4-ec2e2a0cf2c4" }, { "cell_type": "markdown", @@ -994,7 +1001,7 @@ "This is modeled in labtech by defining a task type for each step, and\n", "having each step depend on the result from the previous step:" ], - "id": "1ec8595a-148e-48db-acc2-e84090200d2c" + "id": "707ce19f-7485-4af4-89c8-5573d61e3f9d" }, { "cell_type": "code", @@ -1044,7 +1051,7 @@ "result = lab.run_task(task_c)\n", "print(result)" ], - "id": "592413ea-cf8f-4cd2-8cb8-89e77e1e5629" + "id": "f2b1a32d-3202-436f-b2f3-cd7cf963ee74" }, { "cell_type": "markdown", @@ -1056,7 +1063,7 @@ "[Mermaid diagram](https://mermaid.js.org/syntax/classDiagram.html) of\n", "task types for a given list of tasks:" ], - "id": "82bf7154-b81d-4cb4-8792-af0f1b0e180b" + "id": "00f1c855-33a3-446c-abec-33ad9114bc87" }, { "cell_type": "code", @@ -1071,7 +1078,7 @@ " direction='RL',\n", ")" ], - "id": "002c3981-5afc-4641-a0a9-2cc1490463ea" + "id": "15d93d7b-2a49-455a-a8c0-e5f78c5c1bab" }, { "cell_type": "markdown", @@ -1095,7 +1102,7 @@ "additional tracking calls (such as `mlflow.log_metric()` or\n", "`mlflow.log_model()`) in the body of your task’s `run()` method:" ], - "id": "1dff695d-201b-425e-9dee-5714b01aad81" + "id": "3309dd84-6557-496b-adac-9a8072e754e6" }, { "cell_type": "code", @@ -1142,7 +1149,7 @@ "lab = labtech.Lab(storage=None)\n", "results = lab.run_tasks(runs)" ], - "id": "ebf1abbd-92c3-4308-b4be-4b92e1645960" + "id": "6ea43d30-4ec6-4487-baec-57c89f552923" }, { "cell_type": "markdown", @@ -1191,7 +1198,7 @@ "non-definition code for a Python script in a `main()` function, and then\n", "guard the call to `main()` with `__name__ == '__main__'`:" ], - "id": "99ffcf6d-0ef9-4d30-b2f2-90daec2e1578" + "id": "e955e54a-e479-4467-9d5a-14975d39ff24" }, { "cell_type": "code", @@ -1222,7 +1229,7 @@ "if __name__ == 
'__main__':\n", " main()" ], - "id": "2978a1cc-a541-4df8-adf4-4deaca7ee629" + "id": "5be23e12-734c-4e38-bcfa-86f1e0a9624a" }, { "cell_type": "markdown", @@ -1231,25 +1238,31 @@ "For details, see [Safe importing of main\n", "module](https://docs.python.org/3/library/multiprocessing.html#multiprocessing-safe-main-import).\n", "\n", - "### Why do I see the following error: `AttributeError: Can't get attribute 'YOUR_TASK_CLASS' on `?\n", + "### Why do I see the following error: `RunnerError: Unable to submit YourTaskType tasks to SpawnProcessRunner because the task type is defined in the __main__ module from an interactive Python session`?\n", "\n", - "You will see this error (as part of a very long stack trace) when\n", - "running a Lab with `runner_backend='spawn'` (the default on macOS and\n", - "Windows) from an interactive Python shell.\n", + "You may see this error when running a Lab with `runner_backend='spawn'`\n", + "(the default on macOS and Windows) from an interactive Python shell\n", + "(e.g. a Jupyter notebook session or a Python script).\n", "\n", - "The solution to this error is to define all of your labtech `Task` types\n", - "in a separate `.py` Python module file which you can import into your\n", - "interactive shell session (e.g. `from my_module import MyTask`).\n", + "The solution to this error is to define all of the classes you are using\n", + "from your labtech context and tasks (including task types) in a separate\n", + "`.py` Python module file which you can import into your interactive\n", + "shell session (e.g. `from my_module import MyClass`).\n", "\n", "The reason for this error is that “spawned” task subprocesses will not\n", "receive a copy the current state of your `__main__` module (which\n", "contains the variables you declare interactively in the Python shell,\n", - "including task definitions). This error does not occur with\n", + "including class definitions). 
This error does not occur with\n", "`runner_backend='fork'` (the default on Linux) because forked\n", "subprocesses *do* receive the current state of all modules (including\n", - "`__main__`) from the parent process." + "`__main__`) from the parent process.\n", + "\n", + "### Why do I see the following error: `AttributeError: Can't get attribute 'YOUR_CLASS' on `?\n", + "\n", + "[See the answer to the question directly\n", + "above.](#spawn-interactive-main)" ], - "id": "63ad8a10-fe70-461d-8087-56a3e300531c" + "id": "7f15e42b-1c86-4c68-87b6-c162559c5347" } ], "nbformat": 4, diff --git a/examples/tutorial.ipynb b/examples/tutorial.ipynb index a125298..9c3edd3 100644 --- a/examples/tutorial.ipynb +++ b/examples/tutorial.ipynb @@ -15,7 +15,7 @@ "Before we begin, let’s install `labtech` along with some other\n", "dependencies we will use in this tutorial:" ], - "id": "2d2a9b34-b2b2-4048-abaa-f6d0f45e9288" + "id": "c02059a1-5983-4a6b-a1c8-cf30fa5dc97a" }, { "cell_type": "code", @@ -25,7 +25,7 @@ "source": [ "%pip install labtech mlflow scikit-learn" ], - "id": "711274c8-add0-4e88-9bce-3db22e7f1174" + "id": "55a83c61-7be7-412e-9752-119bc993d356" }, { "cell_type": "markdown", @@ -34,7 +34,7 @@ "Let’s also clear any caches that were created by previous runs of this\n", "tutorial:" ], - "id": "d11d3ff3-5fe0-420f-81b5-6339ae76e028" + "id": "2cc1deac-2f64-4611-9525-5b27e852c8dd" }, { "cell_type": "code", @@ -45,7 +45,7 @@ "!rm -rf storage/tutorial/\n", "!mkdir -p storage/tutorial/" ], - "id": "58b8ed8d-9ce1-4c54-b6e0-c05789dc762c" + "id": "80d4c40c-a541-474c-9104-eb3a8f29b29c" }, { "cell_type": "markdown", @@ -56,7 +56,7 @@ "To get started, we’ll take the following simple machine learning\n", "experiment code and convert it to be run with labtech." 
], - "id": "7fd5137c-c633-4fd3-8319-bed9a9a3620d" + "id": "3409c920-4c4d-4af5-802e-29ecbbd86886" }, { "cell_type": "code", @@ -81,7 +81,7 @@ "\n", "print(f'{log_loss(digits_y, prob_y) = :.3}')" ], - "id": "6560b421-93c5-499b-99b9-3c6fe02e9533" + "id": "472d811c-e437-474e-b50a-dfac4ad4f684" }, { "cell_type": "markdown", @@ -119,7 +119,7 @@ "method that performs the experiment and returns its result (the\n", "predicted probabilities):" ], - "id": "378bf464-0012-403f-9b62-445b1d67cdcb" + "id": "1fea2cc0-c772-4ffb-9244-5fcbbbe7d609" }, { "cell_type": "code", @@ -144,7 +144,7 @@ " prob_y = clf.predict_proba(digits_X)\n", " return prob_y" ], - "id": "8a375c74-8dd1-4838-94ff-eeb5a3c1df63" + "id": "6a17aed9-a8fe-4202-b470-e8e3cb6a989f" }, { "cell_type": "markdown", @@ -155,7 +155,7 @@ "`storage/tutorial/classification_lab_1` and to display notebook-friendly\n", "progress bars:" ], - "id": "b79cb5be-18e9-49cf-b10a-ebd2d9307ea9" + "id": "f7970afd-7549-473d-a933-3c8fb7af293e" }, { "cell_type": "code", @@ -165,7 +165,7 @@ "source": [ "lab = labtech.Lab(storage='storage/tutorial/classification_lab_1')" ], - "id": "b3ba5acf-f1b0-43e5-b513-5a0844c39bf4" + "id": "76ffe679-ad49-4656-abdf-d1a75e7e1a6e" }, { "cell_type": "markdown", @@ -176,7 +176,7 @@ "probabilities returned by the task’s `run()` method, so we can calculate\n", "the loss from them as before:" ], - "id": "28e799ad-b37f-485a-9205-53ac39f14525" + "id": "7b400a63-52bd-42e4-bcd9-91e401ead91a" }, { "cell_type": "code", @@ -188,7 +188,7 @@ "prob_y = lab.run_task(classifier_experiment)\n", "print(f'{log_loss(digits_y, prob_y) = :.3}')" ], - "id": "2404f4bb-1448-44b7-925f-a290c99dba4c" + "id": "02ab72c7-e8fe-441a-afdf-4b37abd3be71" }, { "cell_type": "markdown", @@ -199,7 +199,7 @@ "calls to run the same experiment (even after restarting Python) will\n", "load the result from the cache:" ], - "id": "8f357f93-98ab-4110-bff3-41f8aa20cc9c" + "id": "da5bbe14-be2d-4c3a-b348-3361998c48e2" }, { "cell_type": "code", @@ -210,7 +210,7 
@@ "prob_y = lab.run_task(classifier_experiment)\n", "print(f'{log_loss(digits_y, prob_y) = :.3}')" ], - "id": "15975fa5-7b33-4359-be2c-35e6b6702479" + "id": "feb26567-f71b-42e9-8228-598673b4c0e7" }, { "cell_type": "markdown", @@ -227,7 +227,7 @@ "(or we could pass a list of tasks to `lab.run_tasks()`, as we will see\n", "in the next section of this tutorial)." ], - "id": "4bd3abc6-8c1a-4fd9-9d49-16336c552565" + "id": "f2df1d77-ac18-434e-8009-0bd97d23591a" }, { "cell_type": "code", @@ -239,7 +239,7 @@ " ClassifierExperiment,\n", "])" ], - "id": "d047c769-0310-4578-b7b6-5c92ba4c6718" + "id": "2da17b35-add0-4d3d-be2b-f0163d318d3a" }, { "cell_type": "markdown", @@ -257,7 +257,7 @@ "You may like to save storage space by clearing up old cached results\n", "with `lab.uncache_tasks()`:" ], - "id": "e2a2ef05-6298-4df7-b951-f7dde9a3f599" + "id": "af684fc9-ccce-42de-94d4-7f3761cf8bff" }, { "cell_type": "code", @@ -269,7 +269,7 @@ " classifier_experiment,\n", "])" ], - "id": "ef7da846-0ed5-4db7-8b6b-1451f66bb657" + "id": "b451fc76-ecb3-4531-9069-d3cbc38b3ea5" }, { "cell_type": "markdown", @@ -286,7 +286,7 @@ "the same way as\n", "[dataclass](https://docs.python.org/3/library/dataclasses.html) fields:" ], - "id": "b8d8545a-751d-4302-b8c9-f20cbfba3de2" + "id": "20e53d43-a028-4dba-8527-530b6209cbf9" }, { "cell_type": "code", @@ -312,7 +312,7 @@ " prob_y = clf.predict_proba(digits_X)\n", " return prob_y" ], - "id": "eca46278-fa75-4574-9a10-30499c3b00f1" + "id": "a650f30a-4747-44cb-86d9-b42a4e70bad7" }, { "cell_type": "markdown", @@ -321,7 +321,7 @@ "Now we’ll use a list comprehension to construct a list of\n", "`ClassifierExperiment` tasks with different `n_estimators` values:" ], - "id": "67ceaa46-a471-43ef-8cf9-a205a0fe4a14" + "id": "3c670ff2-5805-4cd1-b0fe-b6cf0c8c79ff" }, { "cell_type": "code", @@ -336,7 +336,7 @@ " for n_estimators in range(1, 11)\n", "]" ], - "id": "ee86c550-e398-4d86-a051-a901852e2938" + "id": "e1721efc-15d1-440d-8cfd-fe7594d66b69" }, { "cell_type": 
"markdown", @@ -350,7 +350,7 @@ "caches for the new definition separate by constructing a new lab that\n", "uses a different storage directory:" ], - "id": "6ae023f0-8598-461f-a2f0-d05e4d332865" + "id": "8171c8fa-41e0-49b1-ab20-807dd40b618d" }, { "cell_type": "code", @@ -361,7 +361,7 @@ "lab = labtech.Lab(storage='storage/tutorial/classification_lab_2')\n", "results = lab.run_tasks(classifier_experiments)" ], - "id": "cbdeb082-8d17-4022-ae14-876bca5fc75a" + "id": "da9e0411-4bc4-428d-8b62-4c1cc0e2a1d4" }, { "cell_type": "markdown", @@ -371,7 +371,7 @@ "result it returned, which we can loop over to print loss metrics for\n", "each experiment:" ], - "id": "e2416e76-1c18-49ea-8686-95aec7254916" + "id": "9b13c6ca-d6e5-4c31-b808-032fdc93797a" }, { "cell_type": "code", @@ -382,7 +382,7 @@ "for experiment, prob_y in results.items():\n", " print(f'{experiment}: {log_loss(digits_y, prob_y) = :.3}')" ], - "id": "9ab73459-3b30-4c4c-a128-13c52488f37f" + "id": "6da4888c-f5c3-4ff6-b844-116476de93ab" }, { "cell_type": "markdown", @@ -408,7 +408,7 @@ "allowing us to access the result from the `.result` attribute of the\n", "task parameter (i.e. 
`self.classifier_experiment.result`):" ], - "id": "2abae8a6-2b3e-45b9-b91f-9b11ca358cb4" + "id": "99375805-d882-4fd3-8a29-3c017c0354cc" }, { "cell_type": "code", @@ -431,7 +431,7 @@ " min_max_prob_y[np.arange(len(prob_y)), prob_y.argmax(axis=1)] = 1\n", " return min_max_prob_y" ], - "id": "c886b332-faa8-4fa8-9047-1a31fb6be2f4" + "id": "d0658ca3-b867-4f97-a2b1-3bbf4f7327a6" }, { "cell_type": "markdown", @@ -444,7 +444,7 @@ "`MinMaxProbabilityExperiment` is run, re-using results depended on by\n", "multiple tasks and loading previously cached results wherever possible:" ], - "id": "1bf7347f-ea9b-4023-9f00-af93670d353f" + "id": "65fbbd0f-b452-46b1-99ed-7e474633ba29" }, { "cell_type": "code", @@ -463,7 +463,7 @@ "for experiment, prob_y in results.items():\n", " print(f'{experiment}: {log_loss(digits_y, prob_y) = :.3}')" ], - "id": "f51b8877-dfb8-4f4f-a58c-390c6d139858" + "id": "9bfd9f10-82bb-4b4a-ab7b-7d10d3482d87" }, { "cell_type": "markdown", @@ -506,7 +506,7 @@ " `ClassifierExperiment` tasks, the `run()` method first creates\n", " its own copy of the classifier with `clone()`." 
], - "id": "ca1788e2-1eb6-4bb0-b764-de76009ecbc6" + "id": "640591ed-88d2-4604-a636-d15238c3178b" }, { "cell_type": "code", @@ -560,7 +560,7 @@ " prob_y = clf.predict_proba(digits_X)\n", " return prob_y" ], - "id": "9f3934b7-738c-4e16-917d-ec5d446e3136" + "id": "e1044da1-abb7-444f-b0ed-7e25b5686ac5" }, { "cell_type": "markdown", @@ -571,7 +571,7 @@ "for each of these `RFClassifierTask` tasks as well as an\n", "`LRClassifierTask` task:" ], - "id": "6c49872c-a7bb-40df-a7fd-41a721ff3a1c" + "id": "06834351-1466-40ea-9d8e-c9bc9c8b0131" }, { "cell_type": "code", @@ -601,7 +601,7 @@ "for experiment, prob_y in results.items():\n", " print(f'{experiment}: {log_loss(digits_y, prob_y) = :.3}')" ], - "id": "f59b8d72-aaa9-4f09-8fd3-20c06344ce49" + "id": "b918c6e3-d7da-421a-88ae-e585ccfe4e06" }, { "cell_type": "markdown", @@ -615,7 +615,7 @@ "our experiments) outside of any task, allowing us to inspect these\n", "datasets before and after the tasks have been run:" ], - "id": "58a0099a-18aa-46b0-a12d-45b2aa027535" + "id": "eafb68f6-c38b-4d6f-b21d-5b7de5e9f59e" }, { "cell_type": "code", @@ -631,7 +631,7 @@ " 'iris': {'X': iris_X, 'y': iris_y},\n", "}" ], - "id": "71970b53-23a0-40c6-affb-f45476ec9092" + "id": "b8b8baf8-3895-4550-bbc1-cc7233335dae" }, { "cell_type": "markdown", @@ -648,7 +648,7 @@ "3. Alter the task generation and evaluation code to handle multiple\n", " datasets." ], - "id": "83eb7eda-06d5-4a6d-8c44-59d9041c6005" + "id": "b108e730-4cc9-433b-a30e-38140bff7093" }, { "cell_type": "code", @@ -694,7 +694,7 @@ " dataset_y = DATASETS[experiment.dataset_key][\"y\"]\n", " print(f'{experiment}: {log_loss(dataset_y, prob_y) = :.3}')" ], - "id": "717aa712-f655-437f-96fd-6523dab6dd53" + "id": "99a0e7d5-5dec-44d9-bd91-2f3b654e4ce3" }, { "cell_type": "markdown", @@ -740,7 +740,7 @@ " `mlflow.set_experiment('example_labtech_experiment')` before the\n", " tasks are run." 
], - "id": "cc698fd0-98f7-4bfd-962b-3daed974515f" + "id": "aabb242f-232b-42c5-9c9b-a4a9774cb76d" }, { "cell_type": "code", @@ -912,7 +912,7 @@ "for experiment, result in evaluation_result.items():\n", " print(f'{experiment}: log_loss = {result[\"log_loss\"]:.3}')" ], - "id": "99aa111c-1a86-467b-b9ab-2d5dd81ec678" + "id": "fa57fec5-dde8-4ce9-8a14-43a559c2c23a" }, { "cell_type": "markdown", @@ -923,7 +923,7 @@ "Finally, we can use Labtech to generate a diagram of a list of tasks\n", "that shows all of the task types, parameters, and dependencies:" ], - "id": "14838576-79ea-41f6-b179-974d89cb65d2" + "id": "1189903a-b819-4e30-8a0f-f8dd9c4f901c" }, { "cell_type": "code", @@ -937,7 +937,7 @@ " evaluation_task,\n", "], direction='BT')" ], - "id": "de344d04-26e3-4e8c-bc0e-2a6807f4e23d" + "id": "7a6fd33d-011b-4c22-9377-1142f847d422" }, { "cell_type": "markdown", @@ -999,7 +999,7 @@ "- [More\n", " examples](https://github.com/ben-denham/labtech/tree/main/examples)" ], - "id": "59d2db01-5ae0-4626-b0f6-7a327d957992" + "id": "f97c3e93-0930-4939-b2fd-b66a63146860" } ], "nbformat": 4, diff --git a/labtech/lab.py b/labtech/lab.py index 22bf8f3..b078626 100644 --- a/labtech/lab.py +++ b/labtech/lab.py @@ -12,7 +12,7 @@ from .exceptions import LabError, TaskNotFound from .monitor import TaskMonitor -from .runners import ForkRunnerBackend, SerialRunnerBackend, SpawnRunnerBackend, ThreadRunnerBackend +from .runners import ForkPerTaskRunnerBackend, ForkPoolRunnerBackend, SerialRunnerBackend, SpawnPoolRunnerBackend, ThreadRunnerBackend from .storage import LocalStorage, NullStorage from .tasks import get_direct_dependencies from .types import ResultMeta, is_task, is_task_type @@ -343,21 +343,29 @@ def __init__(self, *, runner_backend: Controls how tasks are run in parallel. It can optionally be set to one of the following options: - * `'fork'`: Uses the - [`ForkRunnerBackend`][labtech.runners.ForkRunnerBackend] - to run each task in a forked subprocess. 
Memory use - is reduced by sharing the context and dependency task - results between tasks with memory inherited from the - parent process. The default on platforms that support - forked Python subprocesses when `max_workers > 1`: Linux - and other POSIX systems, but not macOS or Windows. * `'spawn'`: Uses the - [`SpawnRunnerBackend`][labtech.runners.SpawnRunnerBackend] - to run each task in a spawned subprocess. The + [`SpawnPoolRunnerBackend`][labtech.runners.SpawnPoolRunnerBackend] + to run tasks on a pool of spawned subprocesses. The context and dependency task results are copied/duplicated into the memory of each subprocess. The default on macOS and Windows when `max_workers > 1`. + * `'fork'`: Uses the + [`ForkPoolRunnerBackend`][labtech.runners.ForkPoolRunnerBackend] + to run tasks on a pool of forked subprocesses. + Memory use is reduced by sharing the context between + tasks with memory inherited from the parent process. + The default on platforms that support forked Python + subprocesses when `max_workers > 1`: Linux and other + POSIX systems, but not macOS or Windows. + * `'fork-per-task'`: Uses the + [`ForkPerTaskRunnerBackend`][labtech.runners.ForkPerTaskRunnerBackend] + to run each task in a forked subprocess. Shares + dependency task results as well as the context in + memory shared between subprocesses but at the cost + of forking a new subprocess for each task. Best used + when dependency task results are large compared to + the overall number of tasks. * `'thread'`: Uses the [`ThreadRunnerBackend`][labtech.runners.ThreadRunnerBackend] to run each task in a separate Python thread. 
Because @@ -402,18 +410,20 @@ def __init__(self, *, if self.max_workers == 1: runner_backend = ThreadRunnerBackend() elif 'fork' in start_methods: - runner_backend = ForkRunnerBackend() + runner_backend = ForkPoolRunnerBackend() elif 'spawn' in start_methods: - runner_backend = SpawnRunnerBackend() + runner_backend = SpawnPoolRunnerBackend() else: raise LabError(('Default \'fork\' and \'spawn\' multiprocessing runner ' 'backends are not supported on your system.' 'Please specify a system-compatible runner_backend.')) elif isinstance(runner_backend, str): if runner_backend == 'fork': - runner_backend = ForkRunnerBackend() + runner_backend = ForkPoolRunnerBackend() + elif runner_backend == 'fork-per-task': + runner_backend = ForkPerTaskRunnerBackend() elif runner_backend == 'spawn': - runner_backend = SpawnRunnerBackend() + runner_backend = SpawnPoolRunnerBackend() elif runner_backend == 'serial': runner_backend = SerialRunnerBackend() elif runner_backend == 'thread': diff --git a/labtech/runners/__init__.py b/labtech/runners/__init__.py index 8b7fa77..6efa9bf 100644 --- a/labtech/runners/__init__.py +++ b/labtech/runners/__init__.py @@ -1,10 +1,15 @@ -from .process import ForkRunnerBackend, SpawnRunnerBackend +from .process import ( + ForkPerTaskRunnerBackend, + ForkPoolRunnerBackend, + SpawnPoolRunnerBackend, +) from .serial import SerialRunnerBackend from .thread import ThreadRunnerBackend __all__ = [ - 'ForkRunnerBackend', - 'SpawnRunnerBackend', + 'ForkPoolRunnerBackend', + 'SpawnPoolRunnerBackend', + 'ForkPerTaskRunnerBackend', 'SerialRunnerBackend', 'ThreadRunnerBackend', ] diff --git a/labtech/runners/_process_executor.py b/labtech/runners/_process_executor.py new file mode 100644 index 0000000..a2c79bb --- /dev/null +++ b/labtech/runners/_process_executor.py @@ -0,0 +1,220 @@ +from __future__ import annotations + +import functools +import multiprocessing +import os +from dataclasses import dataclass, field +from enum import StrEnum, auto +from itertools 
import count +from queue import Empty +from threading import Thread +from typing import TYPE_CHECKING + +from labtech.exceptions import TaskDiedError + +if TYPE_CHECKING: + from collections.abc import Callable, Sequence + from multiprocessing.context import BaseContext + from queue import Queue + from typing import Any + + +class FutureStateError(Exception): + pass + + +class FutureState(StrEnum): + PENDING = auto() + CANCELLED = auto() + FINISHED = auto() + + +@dataclass +class ExecutorFuture: + """Representation of a result to be returned in the future by a runner. + + An ExecutorFuture's state transitions between states according to + the following finite state machine: + + * An ExecutorFuture starts in a PENDING state + * A PENDING ExecutorFuture can be transitioned to FINISHED by calling + set_result() or set_exception() + * Any ExecutorFuture can be transitioned to CANCELLED by calling + cancel() + * result() can only be called on a FINISHED ExecutorFuture, and it will + either return the result set by set_result() or raise the exception + set by set_exception() + + """ + # Auto-incrementing ID (does not need to be process-safe because + # all futures are generated in the main process): + id: int = field(default_factory=count().__next__, init=False) + _state: FutureState = FutureState.PENDING + _ex: BaseException | None = None + _result: Any | None = None + + def __eq__(self, other: object) -> bool: + if not isinstance(other, self.__class__): + return False + return id(self.id) == id(other.id) + + def __hash__(self) -> int: + return hash(self.id) + + def done(self) -> bool: + return self._state in {FutureState.FINISHED, FutureState.CANCELLED} + + def cancelled(self) -> bool: + return self._state == FutureState.CANCELLED + + def set_result(self, result: Any): + if self.done(): + raise FutureStateError(f'Attempted to set a result on a {self._state} future.') + self._result = result + self._state = FutureState.FINISHED + + def set_exception(self, ex: 
BaseException): + if self.done(): + raise FutureStateError(f'Attempted to set an exception on a {self._state} future.') + self._ex = ex + self._state = FutureState.FINISHED + + def cancel(self): + self._state = FutureState.CANCELLED + + def result(self) -> Any: + if self._state != FutureState.FINISHED: + raise FutureStateError(f'Attempted to get result from a {self._state} future.') + if self._ex is not None: + raise self._ex + return self._result + + +def split_done_futures(futures: Sequence[ExecutorFuture]) -> tuple[list[ExecutorFuture], list[ExecutorFuture]]: + done_futures = [] + not_done_futures = [] + for future in futures: + if future.done(): + done_futures.append(future) + else: + not_done_futures.append(future) + return (done_futures, not_done_futures) + + +def _subprocess_target(*, future_id: int, thunk: Callable[[], Any], result_queue: Queue) -> None: + try: + result = thunk() + except BaseException as ex: + result_queue.put((future_id, ex)) + else: + result_queue.put((future_id, result)) + + +class ProcessExecutor: + + def __init__(self, mp_context: BaseContext, max_workers: int | None): + self.mp_context = mp_context + self.max_workers = (os.cpu_count() or 1) if max_workers is None else max_workers + self._pending_future_to_thunk: dict[ExecutorFuture, Callable[[], Any]] = {} + self._running_id_to_future_and_process: dict[int, tuple[ExecutorFuture, multiprocessing.Process]] = {} + # Use a Manager().Queue() to be able to share with subprocesses + self._result_queue: Queue = multiprocessing.Manager().Queue(-1) + + def _start_processes(self): + """Start processes for the oldest pending futures to bring + running process count up to max_workers.""" + start_count = max(0, self.max_workers - len(self._running_id_to_future_and_process)) + futures_to_start = list(self._pending_future_to_thunk.keys())[:start_count] + for future in futures_to_start: + thunk = self._pending_future_to_thunk[future] + del self._pending_future_to_thunk[future] + process = 
self.mp_context.Process( + target=_subprocess_target, + kwargs=dict( + future_id=future.id, + thunk=thunk, + result_queue=self._result_queue, + ), + ) + self._running_id_to_future_and_process[future.id] = (future, process) + process.start() + + def submit(self, fn: Callable, /, *args, **kwargs) -> ExecutorFuture: + """Schedule the given fn to be called with the given *args and + **kwargs, and return an ExecutorFuture that will be updated + with the outcome of the function call.""" + future = ExecutorFuture() + self._pending_future_to_thunk[future] = functools.partial(fn, *args, **kwargs) + self._start_processes() + return future + + def cancel(self) -> None: + """Cancel all pending futures.""" + pending_futures = list(self._pending_future_to_thunk.keys()) + for future in pending_futures: + future.cancel() + del self._pending_future_to_thunk[future] + + def stop(self) -> None: + """Cancel all running futures and immediately terminate their execution.""" + future_process_pairs = list(self._running_id_to_future_and_process.values()) + for future, process in future_process_pairs: + process.terminate() + future.cancel() + del self._running_id_to_future_and_process[future.id] + + def _consume_result_queue(self, *, timeout_seconds: float | None): + # Avoid race condition of a process finishing after we have + # consumed the result_queue by fetching process statuses + # before checking for process completion. 
+ dead_process_futures = [ + future for future, process in self._running_id_to_future_and_process.values() + if not process.is_alive() + ] + + def _consume(): + inner_timeout_seconds = timeout_seconds + while True: + try: + future_id, result_or_ex = self._result_queue.get(True, timeout=inner_timeout_seconds) + except Empty: + break + + # Don't wait for the timeout on subsequent calls to + # self._result_queue.get() + inner_timeout_seconds = 0 + + future, _ = self._running_id_to_future_and_process[future_id] + del self._running_id_to_future_and_process[future_id] + if not future.done(): + if isinstance(result_or_ex, BaseException): + future.set_exception(result_or_ex) + else: + future.set_result(result_or_ex) + + # Consume the result queue in a thread so that it is not + # interrupted by KeyboardInterrupt, which can result in us not + # fully processing a completed result. Despite the fact we are + # using subprocesses, starting a thread at this point should + # be safe because we will not start any subprocesses while + # this is running? + consumer_thread = Thread(target=_consume) + consumer_thread.start() + consumer_thread.join() + + # If any processes have died without the future being + # cancelled or finished, then set an exception for it. 
+ for future in dead_process_futures: + if future.done(): + continue + future.set_exception(TaskDiedError()) + del self._running_id_to_future_and_process[future.id] + + def wait(self, futures: Sequence[ExecutorFuture], *, timeout_seconds: float | None) -> tuple[list[ExecutorFuture], list[ExecutorFuture]]: + """Wait up to timeout_seconds or until at least one of the + given futures is done, then return a list of futures in a done + state and a list of futures in all other states.""" + self._consume_result_queue(timeout_seconds=timeout_seconds) + # Having consumed completed results, start new processes + self._start_processes() + return split_done_futures(futures) diff --git a/labtech/runners/process.py b/labtech/runners/process.py index 7c461e4..4add445 100644 --- a/labtech/runners/process.py +++ b/labtech/runners/process.py @@ -1,289 +1,79 @@ from __future__ import annotations -import functools import logging import multiprocessing -import os import signal import sys from abc import ABC, abstractmethod -from dataclasses import dataclass, field -from enum import StrEnum, auto -from itertools import count +from concurrent.futures import FIRST_COMPLETED, ProcessPoolExecutor +from concurrent.futures import Future as ConcurrentFuture +from concurrent.futures import wait as wait_futures +from dataclasses import dataclass from logging.handlers import QueueHandler from queue import Empty -from threading import Thread -from typing import TYPE_CHECKING, cast +from typing import TYPE_CHECKING, Generic, TypeVar, cast from uuid import uuid4 import psutil -from labtech.exceptions import RunnerError, TaskDiedError +from labtech.exceptions import RunnerError from labtech.monitor import get_process_info from labtech.tasks import get_direct_dependencies from labtech.types import Runner, RunnerBackend from labtech.utils import LoggerFileProxy, get_supported_start_methods, is_interactive, logger +from ._process_executor import ExecutorFuture, ProcessExecutor from .base import 
run_or_load_task if TYPE_CHECKING: - from collections.abc import Callable, Iterator, Sequence - from multiprocessing.context import BaseContext, SpawnContext + from collections.abc import Iterator, Sequence + from multiprocessing.context import BaseContext from queue import Queue - from typing import Any from uuid import UUID from labtech.types import LabContext, ResultMeta, ResultsMap, Storage, Task, TaskMonitorInfo, TaskResult - if sys.platform != 'win32': - from multiprocessing.context import ForkContext - else: - ForkContext = BaseContext - - -class FutureStateError(Exception): - pass - - -class FutureState(StrEnum): - PENDING = auto() - CANCELLED = auto() - FINISHED = auto() - - -@dataclass -class Future: - """Representation of a result to be returned in the future by a runner. - - A Future's state transitions between states according to the following - finite state machine: - - * A Future starts in a PENDING state - * A PENDING Future can be transitioned to FINISHED by calling - set_result() or set_exception() - * Any Future can be transitioned to CANCELLED by calling cancel() - * result() can only be called on a FINISHED Future, and it will either - return the result set by set_result() or raise the exception set by - set_exception() - - """ - # Auto-incrementing ID (does not need to be process-safe because - # all futures are generated in the main process): - id: int = field(default_factory=count().__next__, init=False) - _state: FutureState = FutureState.PENDING - _ex: BaseException | None = None - _result: Any | None = None - - def __eq__(self, other: object) -> bool: - if not isinstance(other, self.__class__): - return False - return id(self.id) == id(other.id) - - def __hash__(self) -> int: - return hash(self.id) - - @property - def done(self) -> bool: - return self._state in {FutureState.FINISHED, FutureState.CANCELLED} - - @property - def cancelled(self) -> bool: - return self._state == FutureState.CANCELLED - - def set_result(self, result: Any): - if 
self.done: - raise FutureStateError(f'Attempted to set a result on a {self._state} future.') - self._result = result - self._state = FutureState.FINISHED - - def set_exception(self, ex: BaseException): - if self.done: - raise FutureStateError(f'Attempted to set an exception on a {self._state} future.') - self._ex = ex - self._state = FutureState.FINISHED - - def cancel(self): - self._state = FutureState.CANCELLED - - def result(self) -> Any: - if self._state != FutureState.FINISHED: - raise FutureStateError(f'Attempted to get result from a {self._state} future.') - if self._ex is not None: - raise self._ex - return self._result - - -def split_done_futures(futures: Sequence[Future]) -> tuple[list[Future], list[Future]]: - done_futures = [] - not_done_futures = [] - for future in futures: - if future.done: - done_futures.append(future) - else: - not_done_futures.append(future) - return (done_futures, not_done_futures) - - -def _subprocess_target(*, future_id: int, thunk: Callable[[], Any], result_queue: Queue) -> None: - try: - result = thunk() - except BaseException as ex: - result_queue.put((future_id, ex)) - else: - result_queue.put((future_id, result)) - - -class ProcessExecutor: - - def __init__(self, mp_context: BaseContext, max_workers: int | None): - self.mp_context = mp_context - self.max_workers = (os.cpu_count() or 1) if max_workers is None else max_workers - self._pending_future_to_thunk: dict[Future, Callable[[], Any]] = {} - self._running_id_to_future_and_process: dict[int, tuple[Future, multiprocessing.Process]] = {} - # Use a Manager().Queue() to be able to share with subprocesses - self._result_queue: Queue = multiprocessing.Manager().Queue(-1) - - def _start_processes(self): - """Start processes for the oldest pending futures to bring - running process count up to max_workers.""" - start_count = max(0, self.max_workers - len(self._running_id_to_future_and_process)) - futures_to_start = list(self._pending_future_to_thunk.keys())[:start_count] - for 
future in futures_to_start: - thunk = self._pending_future_to_thunk[future] - del self._pending_future_to_thunk[future] - process = self.mp_context.Process( - target=_subprocess_target, - kwargs=dict( - future_id=future.id, - thunk=thunk, - result_queue=self._result_queue, - ), - ) - self._running_id_to_future_and_process[future.id] = (future, process) - process.start() - - def submit(self, fn: Callable, /, *args, **kwargs) -> Future: - """Schedule the given fn to be called with the given *args and - **kwargs, and return a Future that will be updated with the - outcome of function call.""" - future = Future() - self._pending_future_to_thunk[future] = functools.partial(fn, *args, **kwargs) - self._start_processes() - return future - - def cancel(self) -> None: - """Cancel all pending futures.""" - pending_futures = list(self._pending_future_to_thunk.keys()) - for future in pending_futures: - future.cancel() - del self._pending_future_to_thunk[future] - - def stop(self) -> None: - """Cancel all running futures and immediately terminate their execution.""" - future_process_pairs = list(self._running_id_to_future_and_process.values()) - for future, process in future_process_pairs: - process.terminate() - future.cancel() - del self._running_id_to_future_and_process[future.id] - - def _consume_result_queue(self, *, timeout_seconds: float | None): - # Avoid race condition of a process finishing after we have - # consumed the result_queue by fetching process statuses - # before checking for process completion. 
- dead_process_futures = [ - future for future, process in self._running_id_to_future_and_process.values() - if not process.is_alive() - ] - - def _consume(): - inner_timeout_seconds = timeout_seconds - while True: - try: - future_id, result_or_ex = self._result_queue.get(True, timeout=inner_timeout_seconds) - except Empty: - break - - # Don't wait for the timeout on subsequent calls to - # self._result_queue.get() - inner_timeout_seconds = 0 - - future, _ = self._running_id_to_future_and_process[future_id] - del self._running_id_to_future_and_process[future_id] - if not future.done: - if isinstance(result_or_ex, BaseException): - future.set_exception(result_or_ex) - else: - future.set_result(result_or_ex) - - # Consume the result queue in a thread so that it is not - # interrupt by KeyboardInterrupt, which can result in us not - # fully processing a completed result. Despite the fact we are - # using subprocesses, starting a thread at this point should - # be safe because we will not start any subprocesses while - # this is running? - consumer_thread = Thread(target=_consume) - consumer_thread.start() - consumer_thread.join() - - # If any processes have died without the future being - # cancelled or finished, then set an exception for it. 
- for future in dead_process_futures: - if future.done: - continue - future.set_exception(TaskDiedError()) - del self._running_id_to_future_and_process[future.id] - - def wait(self, futures: Sequence[Future], *, timeout_seconds: float | None) -> tuple[list[Future], list[Future]]: - """Wait up to timeout_seconds or until at least one of the - given futures is done, then return a list of futures in a done - state and a list of futures in all other states.""" - self._consume_result_queue(timeout_seconds=timeout_seconds) - # Having consumed completed results, start new processes - self._start_processes() - return split_done_futures(futures) - - -class ProcessEvent: - pass +Future = ConcurrentFuture | ExecutorFuture +FutureT = TypeVar('FutureT', bound=Future, covariant=True) @dataclass(frozen=True) -class ProcessStartEvent(ProcessEvent): +class TaskStartEvent: task_name: str pid: int use_cache: bool @dataclass(frozen=True) -class ProcessEndEvent(ProcessEvent): +class TaskEndEvent: task_name: str class ProcessMonitor: - def __init__(self, *, process_event_queue: Queue): - self.process_event_queue = process_event_queue - self.active_process_events: dict[str, ProcessStartEvent] = {} + def __init__(self, *, task_event_queue: Queue): + self.task_event_queue = task_event_queue + self.active_task_events: dict[str, TaskStartEvent] = {} self.active_processes_and_children: dict[str, tuple[psutil.Process, dict[int, psutil.Process]]] = {} def _consume_monitor_queue(self): while True: try: - event = self.process_event_queue.get_nowait() + event = self.task_event_queue.get_nowait() except Empty: break - if isinstance(event, ProcessStartEvent): - self.active_process_events[event.task_name] = event - elif isinstance(event, ProcessEndEvent): - if event.task_name in self.active_process_events: - del self.active_process_events[event.task_name] + if isinstance(event, TaskStartEvent): + self.active_task_events[event.task_name] = event + elif isinstance(event, TaskEndEvent): + if 
event.task_name in self.active_task_events: + del self.active_task_events[event.task_name] if event.task_name in self.active_processes_and_children: del self.active_processes_and_children[event.task_name] else: - raise RunnerError(f'Unexpected process event: {event}') + raise RunnerError(f'Unexpected task event: {event}') - def _get_process_info(self, start_event: ProcessStartEvent) -> TaskMonitorInfo | None: + def _get_process_info(self, start_event: TaskStartEvent) -> TaskMonitorInfo | None: pid = start_event.pid try: if start_event.task_name not in self.active_processes_and_children: @@ -304,28 +94,70 @@ def _get_process_info(self, start_event: ProcessStartEvent) -> TaskMonitorInfo | def get_process_infos(self) -> list[TaskMonitorInfo]: self._consume_monitor_queue() process_infos: list[TaskMonitorInfo] = [] - for start_event in self.active_process_events.values(): + for start_event in self.active_task_events.values(): process_info = self._get_process_info(start_event) if process_info is not None: process_infos.append(process_info) return process_infos -class ProcessRunner(Runner, ABC): - """Base class for Runner's based on Python multiprocessing.""" +def _task_subprocess_func(*, task: Task, task_name: str, use_cache: bool, + results_map: ResultsMap, + filtered_context: LabContext, storage: Storage, + task_event_queue: Queue, log_queue: Queue) -> TaskResult: + signal.signal(signal.SIGINT, signal.SIG_IGN) + # Subprocesses should log onto the queue in order to be printed + # in serial by the main process. 
+ logger.handlers = [] + logger.addHandler(QueueHandler(log_queue)) + orig_stdout = sys.stdout + orig_stderr = sys.stderr + # Ignore type errors for type of value used to override stdout and stderr + sys.stdout = LoggerFileProxy(logger.info, 'Captured STDOUT:\n') # type: ignore[assignment] + sys.stderr = LoggerFileProxy(logger.error, 'Captured STDERR:\n') # type: ignore[assignment] - def __init__(self, *, context: LabContext, storage: Storage, max_workers: int | None): - mp_context = self._get_mp_context() - self.process_event_queue = mp_context.Manager().Queue(-1) - self.process_monitor = ProcessMonitor(process_event_queue = self.process_event_queue) + try: + current_process = multiprocessing.current_process() + task_event_queue.put(TaskStartEvent( + task_name=task_name, + pid=cast('int', current_process.pid), + use_cache=use_cache, + )) + + for dependency_task in get_direct_dependencies(task, all_identities=True): + dependency_task._set_results_map(results_map) + + orig_process_name = current_process.name + try: + current_process.name = task_name + return run_or_load_task( + task=task, + use_cache=use_cache, + filtered_context=filtered_context, + storage=storage + ) + finally: + current_process.name = orig_process_name + finally: + task_event_queue.put(TaskEndEvent( + task_name=task_name, + )) + sys.stdout.flush() + sys.stderr.flush() + sys.stdout = orig_stdout + sys.stderr = orig_stderr + + +class ProcessRunner(Runner, Generic[FutureT], ABC): + """Runner based on Python multiprocessing.""" + + def __init__(self) -> None: self.log_queue = multiprocessing.Manager().Queue(-1) - self.executor = ProcessExecutor( - mp_context=mp_context, - max_workers=max_workers, - ) + self.task_event_queue = multiprocessing.Manager().Queue(-1) + self.process_monitor = ProcessMonitor(task_event_queue = self.task_event_queue) self.results_map: dict[Task, TaskResult] = {} - self.future_to_task: dict[Future, Task] = {} + self.future_to_task: dict[FutureT, Task] = {} def 
_consume_log_queue(self): # See: https://docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes @@ -337,70 +169,23 @@ def _consume_log_queue(self): logger = logging.getLogger(record.name) logger.handle(record) - @staticmethod - def _subprocess_func(*, task: Task, task_name: str, use_cache: bool, - results_map: ResultsMap, filtered_context: LabContext, - storage: Storage, process_event_queue: Queue, - log_queue: Queue) -> TaskResult: - signal.signal(signal.SIGINT, signal.SIG_IGN) - # Subprocesses should log onto the queue in order to printed - # in serial by the main process. - logger.handlers = [] - logger.addHandler(QueueHandler(log_queue)) - orig_stdout = sys.stdout - orig_stderr = sys.stderr - # Ignore type errors for type of value used to override stdout and stderr - sys.stdout = LoggerFileProxy(logger.info, 'Captured STDOUT:\n') # type: ignore[assignment] - sys.stderr = LoggerFileProxy(logger.error, 'Captured STDERR:\n') # type: ignore[assignment] - - try: - current_process = multiprocessing.current_process() - process_event_queue.put(ProcessStartEvent( - task_name=task_name, - pid=cast('int', current_process.pid), - use_cache=use_cache, - )) - - for dependency_task in get_direct_dependencies(task, all_identities=True): - dependency_task._set_results_map(results_map) - - orig_process_name = current_process.name - try: - current_process.name = task_name - return run_or_load_task( - task=task, - use_cache=use_cache, - filtered_context=filtered_context, - storage=storage - ) - finally: - current_process.name = orig_process_name - finally: - process_event_queue.put(ProcessEndEvent( - task_name=task_name, - )) - sys.stdout.flush() - sys.stderr.flush() - sys.stdout = orig_stdout - sys.stderr = orig_stderr - def submit_task(self, task: Task, task_name: str, use_cache: bool) -> None: - future = self._submit_task( - executor=self.executor, + future = self._schedule_subprocess( task=task, task_name=task_name, use_cache=use_cache, 
- process_event_queue=self.process_event_queue, - log_queue=self.log_queue, ) self.future_to_task[future] = task def wait(self, *, timeout_seconds: float | None) -> Iterator[tuple[Task, ResultMeta | BaseException]]: self._consume_log_queue() - done, _ = self.executor.wait(list(self.future_to_task.keys()), timeout_seconds=timeout_seconds) + done = self._get_completed_futures( + futures=list(self.future_to_task.keys()), + timeout_seconds=timeout_seconds, + ) for future in done: task = self.future_to_task[future] - if future.cancelled: + if future.cancelled(): continue try: task_result = future.result() @@ -415,18 +200,16 @@ def wait(self, *, timeout_seconds: float | None) -> Iterator[tuple[Task, ResultM if future not in done } - def cancel(self) -> None: - self.executor.cancel() - - def stop(self) -> None: - self.executor.stop() - def close(self) -> None: self._consume_log_queue() + self._close_executor() def pending_task_count(self) -> int: return len(self.future_to_task) + def get_task_infos(self) -> list[TaskMonitorInfo]: + return self.process_monitor.get_process_infos() + def get_result(self, task: Task) -> TaskResult: return self.results_map[task] @@ -437,46 +220,100 @@ def remove_results(self, tasks: Sequence[Task]) -> None: logger.debug(f"Removing result from in-memory cache for task: '{task}'") del self.results_map[task] - def get_task_infos(self) -> list[TaskMonitorInfo]: - return self.process_monitor.get_process_infos() + @abstractmethod + def _schedule_subprocess(self, *, task: Task, task_name: str, use_cache: bool) -> FutureT: + """Should submit the execution of _task_subprocess_func() + for the given task in a subprocess and return the resulting Future. + + The implementation of this method can load or otherwise prepare + context or dependency results for the task. 
+ + """ @abstractmethod - def _get_mp_context(self) -> BaseContext: - """Return a multiprocessing context from which to start subprocesses.""" + def _get_completed_futures(self, futures: list[FutureT], timeout_seconds: float | None) -> list[FutureT]: + """Return a sub-sequence of the given futures that have been completed.""" @abstractmethod - def _submit_task(self, executor: ProcessExecutor, task: Task, task_name: str, - use_cache: bool, process_event_queue: Queue, log_queue: Queue) -> Future: - """Should submit the execution of self._subprocess_func() on the given - task to the given executor and return the resulting Future. + def cancel(self) -> None: + pass - Sub-classes can use the implementation of this method to load - or otherwise prepare context or dependency results for the task. + @abstractmethod + def stop(self) -> None: + pass - """ + @abstractmethod + def _close_executor(self) -> None: + """Stop all currently running subprocesses.""" + + +def _spawn_start_method_check() -> None: + if 'spawn' not in get_supported_start_methods(): + raise RunnerError( + ("The 'spawn' start method for processes is not supported by your operating system. " + "Please specify a system-compatible runner_backend.") + ) + + +def _spawn_interactive_main_check(cls: type, task: Task) -> None: + if is_interactive() and task.__class__.__module__ == '__main__': + raise RunnerError( + (f'Unable to submit {task.__class__.__qualname__} tasks to ' + f'{cls.__qualname__} because the task type is defined in the ' + '__main__ module from an interactive Python session. ' + 'Please define your task types in a separate `.py` Python ' + 'module file. For details, see: ' + 'https://ben-denham.github.io/labtech/cookbook/#spawn-interactive-main') + ) + + +def _fork_start_method_check() -> None: + if 'fork' not in get_supported_start_methods(): + raise RunnerError( + ("The 'fork' start method for processes is not supported by your operating system. 
" + "Try switching to runner_backend='spawn' or specify another system-compatible runner_backend.") + ) + + +# === Subprocess Pools === + +class PoolProcessRunner(ProcessRunner[ConcurrentFuture], ABC): + + def __init__(self, *, mp_context: BaseContext, max_workers: int | None) -> None: + super().__init__() + self.concurrent_executor = ProcessPoolExecutor( + mp_context=mp_context, + max_workers=max_workers, + ) + + def _get_completed_futures(self, futures: list[ConcurrentFuture], timeout_seconds: float | None) -> list[ConcurrentFuture]: + done, _ = wait_futures(futures, timeout=timeout_seconds, return_when=FIRST_COMPLETED) + return list(done) + + def cancel(self) -> None: + self.concurrent_executor.shutdown(wait=True, cancel_futures=True) + + def stop(self) -> None: + self.concurrent_executor.shutdown(wait=True, cancel_futures=True) + for process in self.concurrent_executor._processes.values(): + process.terminate() + + def _close_executor(self) -> None: + self.concurrent_executor.shutdown(wait=True) -class SpawnProcessRunner(ProcessRunner): +class SpawnPoolProcessRunner(PoolProcessRunner): - def __init__(self, *, context: LabContext, storage: Storage, max_workers: int | None): - super().__init__(context=context, storage=storage, max_workers=max_workers) + def __init__(self, *, context: LabContext, storage: Storage, max_workers: int | None) -> None: + super().__init__( + mp_context=multiprocessing.get_context('spawn'), + max_workers=max_workers, + ) self.context = context self.storage = storage - def _get_mp_context(self) -> SpawnContext: - return multiprocessing.get_context('spawn') - - def _submit_task(self, executor: ProcessExecutor, task: Task, task_name: str, - use_cache: bool, process_event_queue: Queue, log_queue: Queue) -> Future: - if is_interactive() and task.__class__.__module__ == '__main__': - raise RunnerError( - (f'Unable to submit {task.__class__.__qualname__} tasks to ' - 'SpawnProcessRunner because the task type is defined in the ' - '__main__ 
module from an interactive Python session. ' - 'Please define your task types in a separate `.py` Python ' - 'module file. For details, see: ' - 'https://ben-denham.github.io/labtech/cookbook/#spawn-interactive-main') - ) + def _schedule_subprocess(self, *, task: Task, task_name: str, use_cache: bool) -> ConcurrentFuture: + _spawn_interactive_main_check(self.__class__, task) filtered_context: LabContext = {} results_map: dict[Task, TaskResult] = {} @@ -490,121 +327,231 @@ def _submit_task(self, executor: ProcessExecutor, task: Task, task_name: str, dependency_task: self.results_map[dependency_task] for dependency_task in get_direct_dependencies(task, all_identities=False) } - return executor.submit( - self._subprocess_func, + + return self.concurrent_executor.submit( + _task_subprocess_func, task=task, task_name=task_name, use_cache=use_cache, results_map=results_map, filtered_context=filtered_context, storage=self.storage, - process_event_queue=process_event_queue, - log_queue=log_queue, + task_event_queue=self.task_event_queue, + log_queue=self.log_queue, ) -class SpawnRunnerBackend(RunnerBackend): +class SpawnPoolRunnerBackend(RunnerBackend): """ - Runner Backend that runs each task in a spawned subprocess. + Runner Backend that runs tasks on a pool of spawned subprocesses. The required context and dependency task results are copied/duplicated into the memory of each subprocess. """ - def build_runner(self, *, context: LabContext, storage: Storage, max_workers: int | None) -> SpawnProcessRunner: - if 'spawn' not in get_supported_start_methods(): - raise RunnerError( - ("The 'spawn' start method for processes is not supported by your operating system. 
" - "Please specify a system-compatible runner_backend.") - ) + def build_runner(self, *, context: LabContext, storage: Storage, max_workers: int | None) -> SpawnPoolProcessRunner: + _spawn_start_method_check() + return SpawnPoolProcessRunner( + context=context, + storage=storage, + max_workers=max_workers, + ) + + +@dataclass +class PoolRunnerMemory: + context: LabContext + storage: Storage + + +_RUNNER_FORK_POOL_MEMORY: dict[UUID, PoolRunnerMemory] = {} + + +class ForkPoolProcessRunner(PoolProcessRunner): + + def __init__(self, *, context: LabContext, storage: Storage, max_workers: int | None) -> None: + super().__init__( + mp_context=multiprocessing.get_context('fork'), + max_workers=max_workers, + ) + self.uuid = uuid4() + _RUNNER_FORK_POOL_MEMORY[self.uuid] = PoolRunnerMemory( + context=context, + storage=storage, + ) + + @staticmethod + def _fork_task_subprocess_func(*, task: Task, task_name: str, use_cache: bool, + results_map: ResultsMap, task_event_queue: Queue, + log_queue: Queue, uuid: UUID) -> TaskResult: + runner_memory = _RUNNER_FORK_POOL_MEMORY[uuid] + return _task_subprocess_func( + task=task, + task_name=task_name, + use_cache=use_cache, + filtered_context=task.filter_context(runner_memory.context), + storage=runner_memory.storage, + results_map=results_map, + task_event_queue=task_event_queue, + log_queue=log_queue, + ) + + def _schedule_subprocess(self, *, task: Task, task_name: str, use_cache: bool) -> ConcurrentFuture: + results_map: dict[Task, TaskResult] = {} + if not use_cache: + # In order to minimise memory use, only transfer results + # to the subprocess if we are going to run the task (and + # not just load its result from cache). 
+ results_map = { + dependency_task: self.results_map[dependency_task] + for dependency_task in get_direct_dependencies(task, all_identities=False) + } + return self.concurrent_executor.submit( + self._fork_task_subprocess_func, + task=task, + task_name=task_name, + use_cache=use_cache, + results_map=results_map, + task_event_queue=self.task_event_queue, + log_queue=self.log_queue, + uuid=self.uuid, + ) - return SpawnProcessRunner( + def _close_executor(self) -> None: + super()._close_executor() + try: + del _RUNNER_FORK_POOL_MEMORY[self.uuid] + except KeyError: + # uuid not may be found if _close_executor() is called twice. + pass + + +class ForkPoolRunnerBackend(RunnerBackend): + """Runner Backend that runs tasks on a pool of forked subprocesses. + + The context is shared in-memory between each subprocess. Dependency task + results are copied/duplicated into the memory of each subprocess. + + Because process forking is more efficient than spawning, and + because of the added benefit of not duplicating the context for + each task, this runner backend is recommended for any system that + supports process forking. 
+ + """ + + def build_runner(self, *, context: LabContext, storage: Storage, max_workers: int | None) -> ForkPoolProcessRunner: + _fork_start_method_check() + return ForkPoolProcessRunner( context=context, storage=storage, max_workers=max_workers, ) +# === Subprocesses Per-Task === + +class PerTaskProcessRunner(ProcessRunner[ExecutorFuture], ABC): + + def __init__(self, *, mp_context: BaseContext, max_workers: int | None) -> None: + super().__init__() + self.executor = ProcessExecutor( + mp_context=mp_context, + max_workers=max_workers, + ) + + def _get_completed_futures(self, futures: list[ExecutorFuture], timeout_seconds: float | None) -> list[ExecutorFuture]: + done, _ = self.executor.wait(futures, timeout_seconds=timeout_seconds) + return done + + def cancel(self) -> None: + self.executor.cancel() + + def stop(self) -> None: + self.executor.stop() + + def _close_executor(self) -> None: + pass + + @dataclass -class RunnerMemory: +class PerTaskRunnerMemory: context: LabContext storage: Storage results_map: ResultsMap -_RUNNER_FORK_MEMORY: dict[UUID, RunnerMemory] = {} +_RUNNER_FORK_PER_TASK_MEMORY: dict[UUID, PerTaskRunnerMemory] = {} -class ForkProcessRunner(ProcessRunner): +class ForkPerTaskProcessRunner(PerTaskProcessRunner): - def __init__(self, *, context: LabContext, storage: Storage, max_workers: int | None): - super().__init__(context=context, storage=storage, max_workers=max_workers) + def __init__(self, *, context: LabContext, storage: Storage, max_workers: int | None) -> None: + super().__init__( + mp_context=multiprocessing.get_context('fork'), + max_workers=max_workers, + ) self.uuid = uuid4() - _RUNNER_FORK_MEMORY[self.uuid] = RunnerMemory( + _RUNNER_FORK_PER_TASK_MEMORY[self.uuid] = PerTaskRunnerMemory( context=context, storage=storage, results_map=self.results_map, ) - def _get_mp_context(self) -> ForkContext: - return multiprocessing.get_context('fork') - @staticmethod - def _fork_subprocess_func(*, _subprocess_func: Callable, task: Task, 
task_name: str, - use_cache: bool, process_event_queue: Queue, log_queue: Queue, - uuid: UUID) -> TaskResult: - runner_memory = _RUNNER_FORK_MEMORY[uuid] - return _subprocess_func( + def _fork_task_subprocess_func(*, task: Task, task_name: str, use_cache: bool, + task_event_queue: Queue, log_queue: Queue, + uuid: UUID) -> TaskResult: + runner_memory = _RUNNER_FORK_PER_TASK_MEMORY[uuid] + return _task_subprocess_func( task=task, task_name=task_name, use_cache=use_cache, filtered_context=task.filter_context(runner_memory.context), storage=runner_memory.storage, results_map=runner_memory.results_map, - process_event_queue=process_event_queue, + task_event_queue=task_event_queue, log_queue=log_queue, ) - def _submit_task(self, executor: ProcessExecutor, task: Task, task_name: str, - use_cache: bool, process_event_queue: Queue, log_queue: Queue) -> Future: - return executor.submit( - self._fork_subprocess_func, - _subprocess_func=self._subprocess_func, + def _schedule_subprocess(self, *, task: Task, task_name: str, use_cache: bool) -> ExecutorFuture: + return self.executor.submit( + self._fork_task_subprocess_func, task=task, task_name=task_name, use_cache=use_cache, - process_event_queue=process_event_queue, - log_queue=log_queue, + task_event_queue=self.task_event_queue, + log_queue=self.log_queue, uuid=self.uuid, ) - def close(self) -> None: - super().close() + def _close_executor(self) -> None: + super()._close_executor() try: - del _RUNNER_FORK_MEMORY[self.uuid] + del _RUNNER_FORK_PER_TASK_MEMORY[self.uuid] except KeyError: - # uuid not may be found if close() is called twice. + # uuid not may be found if _close_executor() is called twice. pass -class ForkRunnerBackend(RunnerBackend): - """ - Runner Backend that runs each task in a forked subprocess. +class ForkPerTaskRunnerBackend(RunnerBackend): + """Runner Backend that runs each task in a separate forked + subprocess. The context and dependency task results are shared in-memory - between each subprocess. 
+ between each subprocess but at the cost of forking a new + subprocess for each task. - """ + This runner backend is best used when dependency task results are + large (so time will be saved through memory sharing) compared to + the overall number of tasks (for large numbers of tasks, forking a + separate process for each may be a substantial overhead). - def build_runner(self, *, context: LabContext, storage: Storage, max_workers: int | None) -> ForkProcessRunner: - if 'fork' not in get_supported_start_methods(): - raise RunnerError( - ("The 'fork' start method for processes is not supported by your operating system. " - "Try switching to runner_backend='spawn' or specify another system-compatible runner_backend.") - ) + """ - return ForkProcessRunner( + def build_runner(self, *, context: LabContext, storage: Storage, max_workers: int | None) -> ForkPerTaskProcessRunner: + _fork_start_method_check() + return ForkPerTaskProcessRunner( context=context, storage=storage, max_workers=max_workers, diff --git a/tests/integration/test_e2e.py b/tests/integration/test_e2e.py index 24a972a..613bad7 100644 --- a/tests/integration/test_e2e.py +++ b/tests/integration/test_e2e.py @@ -181,13 +181,13 @@ def evaluations(context: dict[str, Any]) -> dict[str, Evaluation]: class TestE2E: @pytest.mark.parametrize('max_workers', [1, 4, None]) - @pytest.mark.parametrize('runner_backend', ['serial', 'fork', 'spawn', 'thread']) + @pytest.mark.parametrize('runner_backend', ['serial', 'fork', 'fork-per-task', 'spawn', 'thread']) @pytest.mark.parametrize('evaluation_key', active_evaluation_keys) def test_e2e(self, max_workers: int, runner_backend: str, evaluation_key: str, context: dict[str, Any], evaluations: dict[str, Evaluation]) -> None: evaluation = evaluations[evaluation_key] # macOS and Windows don't support fork, so test graceful failure: - if runner_backend == 'fork' and platform.system() in {'Darwin', 'Windows'}: + if runner_backend in {'fork', 'fork-per-task'} and 
platform.system() in {'Darwin', 'Windows'}: lab = labtech.Lab( storage=None, context=context,