-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhelpers.py
More file actions
227 lines (182 loc) · 6.55 KB
/
helpers.py
File metadata and controls
227 lines (182 loc) · 6.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
import multiprocessing
import os
import sys
import traceback
from concurrent.futures import ProcessPoolExecutor, as_completed
from multiprocessing import Manager, Process
from pathlib import Path
import psutil
class PredictionProcessingError(Exception):
    """
    Raised when prediction processing did not complete successfully.

    Attributes
    ----------
    predictions
        The predictions affected by the failure, exactly as passed in by the
        caller (in this module: the "pk" values of the failed predictions).
    """

    def __init__(
        self,
        predictions,
        message="One or more errors occurred during prediction processing.",
    ):
        super().__init__(message)
        self.predictions = predictions
def display_processing_report(succeeded, canceled, failed):
    """
    Print a summary of the outcome of prediction processing to stdout.

    The sections are printed in the order Succeeded, Failed, Canceled. Each
    section has a header with its count out of the total, followed by one
    tab-indented line per entry, or a single "-" when the section is empty.

    Parameters
    ----------
    succeeded, canceled, failed : sized iterable
        The prediction identifiers in each outcome category.
    """
    sections = (
        ("Succeeded", succeeded),
        ("Failed", failed),
        ("Canceled", canceled),
    )
    grand_total = sum(len(entries) for _, entries in sections)

    print("PROCESSING REPORT")
    for title, entries in sections:
        print(f"{title} ({len(entries)}/{grand_total}):")
        # Iterating the string "-" yields a single "-" placeholder line
        for entry in entries or "-":
            print(f"\t{entry}")
    print("")
def get_max_workers():
    """
    Returns the maximum number of concurrent workers.

    The optimal number of workers ultimately depends on how many resources
    each process will call upon.

    To limit this, update the Dockerfile GRAND_CHALLENGE_MAX_WORKERS.

    When GRAND_CHALLENGE_MAX_WORKERS is unset or empty, the CPU count is used.
    Otherwise its integer value is used, capped at the CPU count and raised to
    at least 1.

    Raises
    ------
    ValueError
        If GRAND_CHALLENGE_MAX_WORKERS is set to a non-integer value.
    """
    cpu_count = multiprocessing.cpu_count()
    environ_cpu_limit = os.getenv("GRAND_CHALLENGE_MAX_WORKERS")
    requested = int(environ_cpu_limit or cpu_count)
    # ProcessPoolExecutor rejects max_workers <= 0. That error would surface
    # inside the pool-worker process and make every prediction look
    # "canceled", so never hand out fewer than one worker.
    return max(1, min(requested, cpu_count))
def run_prediction_processing(*, fn, predictions):
    """
    Processes predictions in a separate process.

    This takes child processes into account:
      - if any child process is terminated, all prediction processing will abort
      - after prediction processing is done, all child processes are terminated

    Note that the results are returned in completing order.

    Parameters
    ----------
    fn : function
        Function to execute that will process each prediction
    predictions : list
        List of predictions. Each prediction must have a "pk" key.

    Returns
    -------
    A list of results

    Raises
    ------
    PredictionProcessingError
        If any prediction failed (``predictions`` on the exception holds the
        pks of the failed ones), or if some predictions were never processed
        although no failure was recorded, e.g. because the pool-worker process
        itself died (``predictions`` then holds the pks of the unprocessed ones).
    """
    with Manager() as manager:
        # Shared dicts the pool-worker process writes into, keyed by prediction pk
        results = manager.dict()
        errors = manager.dict()

        pool_worker = _start_pool_worker(
            fn=fn,
            predictions=predictions,
            max_workers=get_max_workers(),
            results=results,
            errors=errors,
        )
        try:
            pool_worker.join()
        finally:
            pool_worker.terminate()

        failed = set(errors.keys())
        succeeded = set(results.keys())
        canceled = {p["pk"] for p in predictions} - (failed | succeeded)

        display_processing_report(succeeded, canceled, failed)

        if errors:
            for prediction_pk, tb_str in errors.items():
                print(
                    f"Error in prediction: {prediction_pk}\n{tb_str}",
                    file=sys.stderr,
                )
            raise PredictionProcessingError(errors.keys())

        if canceled:
            # Nothing reported an error, yet some predictions were never
            # processed. The pool worker must have stopped early, for example
            # killed, or raised before recording a per-prediction error.
            # Returning the partial results would silently drop predictions.
            print(
                "Prediction processing aborted without reporting an error "
                f"(pool worker exit code: {pool_worker.exitcode})",
                file=sys.stderr,
            )
            raise PredictionProcessingError(list(canceled))

        return list(results.values())
def _start_pool_worker(fn, predictions, max_workers, results, errors):
    """
    Start a process named "PredictionProcessing" that runs ``_pool_worker``.

    All arguments are forwarded to ``_pool_worker`` as keyword arguments.

    Returns
    -------
    multiprocessing.Process
        The already-started process; the caller is responsible for joining it.
    """
    worker_kwargs = {
        "fn": fn,
        "predictions": predictions,
        "max_workers": max_workers,
        "results": results,
        "errors": errors,
    }
    worker = Process(
        target=_pool_worker,
        name="PredictionProcessing",
        kwargs=worker_kwargs,
    )
    worker.start()
    return worker
def _pool_worker(*, fn, predictions, max_workers, results, errors):
    """
    Run ``fn`` over every prediction in a process pool.

    Each successful result is stored in ``results`` under the prediction's
    "pk". Collection stops at the first future whose ``result()`` raises.
    Afterwards, always:
      - pending work is canceled without waiting,
      - failures are written to ``errors`` via ``_collect_errors``,
      - all child processes are terminated.
    """
    pool = ProcessPoolExecutor(max_workers=max_workers)
    try:
        # Remember which prediction each submitted future belongs to
        futures = {}
        for prediction in predictions:
            futures[pool.submit(fn, prediction)] = prediction

        for done in as_completed(futures):
            try:
                outcome = done.result()
            except Exception:
                # Abort on the first failure; the rest is canceled below
                break
            results[futures[done]["pk"]] = outcome
    finally:
        pool.shutdown(
            wait=False,  # Do not wait for any resources to free themselves
            cancel_futures=True,
        )
        _collect_errors(futures, errors)
        # Aggressively terminate any child processes
        _terminate_child_processes()
def _collect_errors(future_to_predictions, errors):
# Collect any failures that occurred during processing
# Workaround for https://github.com/python/cpython/issues/136655
# Which prevents us relying solely on as_completed to catch exceptions
def failed_futures():
for f, p in future_to_predictions.items():
if f.done():
exc = f.exception()
if exc is not None:
yield p, exc
for prediction, exc in failed_futures():
tb_exception = traceback.TracebackException.from_exception(exc)
# Cannot pickle a stack trace, so we render it here
formatted_tb = "".join(tb_exception.format())
prediction_pk = prediction["pk"]
errors[prediction_pk] = formatted_tb
def _terminate_child_processes(timeout=5):
    """
    Terminate all descendants of the current process and reap its children.

    Every descendant first receives psutil's ``terminate()``. Any still alive
    after ``timeout`` seconds receive ``kill()`` and are waited on again.
    Finally, every direct child that has exited is reaped, so no zombie
    processes are left behind.

    Parameters
    ----------
    timeout : float, optional
        Seconds to wait after terminating, and again after killing.
    """
    parent = psutil.Process(os.getpid())
    descendants = parent.children(recursive=True)

    for child in descendants:
        try:
            child.terminate()
        except psutil.NoSuchProcess:
            pass  # Not a problem

    # Wait for processes to terminate
    _, still_alive = psutil.wait_procs(descendants, timeout=timeout)

    # Forcefully kill any remaining processes...
    for child in still_alive:
        try:
            child.kill()
        except psutil.NoSuchProcess:
            pass  # That is fine
    # ...and wait for those as well, so killed children are collected too
    psutil.wait_procs(still_alive, timeout=timeout)

    # Finally, prevent zombies by reaping every child that has exited.
    # WNOHANG: a child that is somehow still running must not block us forever.
    while True:
        try:
            pid, _ = os.waitpid(-1, os.WNOHANG)
        except ChildProcessError:
            break  # No child processes, that is fine
        if pid == 0:
            break  # Remaining children have not exited yet
def tree(dir_path: Path, prefix: str = ""):
"""A recursive generator, given a directory Path object
will yield a visual tree structure line by line
with each line prefixed by the same characters
"""
space = " "
branch = "│ "
# pointers:
tee = "├── "
last = "└── "
contents = list(dir_path.iterdir())
# contents each get pointers that are ├── with a final └── :
pointers = [tee] * (len(contents) - 1) + [last]
for pointer, path in zip(pointers, contents, strict=True):
yield prefix + pointer + path.name
if path.is_dir(): # extend the prefix and recurse:
extension = branch if pointer == tee else space
# i.e. space because last, └── , above so no more |
yield from tree(path, prefix=prefix + extension)