# score.py — scores build test results parsed from install-*/test output files.
import glob
import re
import os
import json
import subprocess
import logging
from mpi4py import MPI
import click
def setup_logging(logfile=None):
    """Configure and return the shared 'score' logger.

    Args:
        logfile: Path of a log file; when None, log records go to stderr
            via a StreamHandler.

    Returns:
        logging.Logger: the 'score' logger with exactly one handler attached.
    """
    logger = logging.getLogger("score")
    logger.setLevel(logging.INFO)
    if logfile is None:
        handler = logging.StreamHandler()
    else:
        handler = logging.FileHandler(logfile)
    handler.setFormatter(
        logging.Formatter('[%(asctime)s] %(levelname)s: %(message)s'))
    # Replace (not append to) any handlers from a previous call so repeated
    # setup never duplicates output.
    logger.handlers = [handler]
    return logger
def parse_compare_output(filename):
    """Parse a diff output file and return PASSED/FAILED per suite and test.

    Test sections are delimited by header lines of the form::

        ***Diffing: <** suite : test_name **>

    Any non-blank line between two headers that is not itself a
    "***Diffing" line marks the preceding test as FAILED; otherwise the
    test is PASSED. Lines appearing before the first header are ignored.

    Args:
        filename (str): Path to the diff output file.

    Returns:
        dict: ``{suite: {test_name: 'PASSED' | 'FAILED', ...}, ...}``
    """
    header_re = re.compile(r"\*\*\*Diffing: <\*\* (\w+) : ([\w\-]+) \*\*>")
    results = {}
    suite = None
    test = None
    buffered = []

    def flush():
        # Record the test collected so far; only blank or "***Diffing"
        # lines in the buffer mean a clean (PASSED) diff.
        if suite is None or test is None:
            return
        failed = any(ln.strip() and not ln.startswith('***Diffing')
                     for ln in buffered)
        results.setdefault(suite, {})[test] = "FAILED" if failed else "PASSED"

    with open(filename, "r") as fh:
        for raw in fh:
            match = header_re.match(raw)
            if match:
                flush()
                suite, test = match.group(1), match.group(2)
                buffered = []
            elif suite and test:
                buffered.append(raw)
    # The final section has no trailing header to trigger its flush.
    flush()
    return results
def grep_output_file(path, pattern, status, negate):
    """Report *status* based on whether *pattern* occurs in the file at *path*.

    Replaces the previous per-pattern ``grep`` subprocess with a pure-Python
    scan: faster (no process spawn per pattern per file) and portable to
    systems without grep. Note the old code passed each pattern to grep as a
    basic regex; every pattern used by callers is a plain string with no
    regex metacharacters, so a substring test is equivalent.

    Args:
        path: File to scan (callers check existence beforehand).
        pattern: Fixed substring to look for.
        status: Value to return when the check triggers.
        negate: If True, report *status* when the pattern is absent from the
            whole file (the old ``grep -L`` behavior); otherwise report it
            when the pattern is present on any line.

    Returns:
        *status* when the check triggers, an error marker string if the file
        cannot be read, or None otherwise.
    """
    try:
        with open(path, "r", errors="replace") as fh:
            found = any(pattern in line for line in fh)
    except Exception:
        # Wording kept from the subprocess-based implementation so downstream
        # consumers of 'output_status' see an unchanged string.
        return "ERROR (grep subprocess failed)"
    if negate:
        return status if not found else None
    return status if found else None
def loop_grep(build_name, test_name):
path = f"install-{build_name}/test/output/{test_name}.out"
if not os.path.exists(path):
return "OUTPUT FILE NOT FOUND"
# pattern order matters for correct results
patterns = [
# bad patterns first
("TESTCASE RESULT: FAIL", "FAILED", False),
("ABNORMAL TERMINATION", "ABNORMAL TERMINATION", False),
("TERMINATION", "NO TERMINATION", True), # -L, i.e. NOT found
# good patterns next
("TESTCASE RESULT: PASS", "PASSED", False),
# ("Test Ok", "PASSED", False),
# meh patterns
("TESTCASE RESULT: SKIP", "SKIPPED", False),
("test not performed", "SKIPPED", False)
]
for pattern, status, negate in patterns:
grep_status = grep_output_file(path, pattern, status, negate)
if grep_status:
return grep_status
return None
def extract_elapsed_time(build_name, test_name):
path = f"install-{build_name}/test/output/{test_name}.out"
if not os.path.exists(path):
return None
try:
result = subprocess.run(["grep", "ELAPSED TIME:", path], stdout=subprocess.PIPE, text=True)
match = re.search(r"ELAPSED TIME:\s+([\d\.]+)\s+SECONDS", result.stdout)
if match:
return float(match.group(1))
except Exception:
pass
return None
def get_all_build_names():
    """Discover build names from ``install-*/test/output.rpt`` on disk.

    Returns:
        list[str]: the ``<name>`` part of every matching ``install-<name>``
        directory that contains a test report.
    """
    name_re = re.compile(r'install-(.*)/test/output\.rpt')
    candidates = (name_re.match(p)
                  for p in glob.glob('install-*/test/output.rpt'))
    return [m.group(1) for m in candidates if m]
# def analyze_diff(diff_buffer, ignore):
# """
# Removes lines from diff_buffer that match any pattern in ignore.
# Returns "BAD DIFF" if lines remain, else "PASSED".
# """
# filtered = []
#
# for line in diff_buffer:
# # If any ignore pattern matches, exclude this line
# if any(re.search(pattern, line) for pattern in ignore):
# continue
# filtered.append(line)
#
# return "BAD DIFF" if filtered else "PASSED"
def normalize_status(status):
    """Collapse a detailed status string into one of four canonical values.

    Args:
        status: Detailed status produced by scanning/diffing, e.g.
            "PASSED", "BAD DIFF", "ABNORMAL TERMINATION".

    Returns:
        str: "PASSED", "SKIPPED", "FAILED", or "ERROR". Anything not
        matching a known prefix (including the termination and missing-file
        markers) normalizes to "ERROR".
    """
    prefix_map = (
        ("PASSED", "PASSED"),
        ("SKIPPED", "SKIPPED"),
        ("FAILED", "FAILED"),
        ("BAD DIFF", "FAILED"),
    )
    for prefix, canonical in prefix_map:
        if status.startswith(prefix):
            return canonical
    return "ERROR"
def score_test(build_name, compare_results, current_test, diff_buf):
    """Determine a single test's status and assemble its result record.

    Args:
        build_name: Build the test belongs to.
        compare_results: Nested dict from parse_compare_output().
        current_test: Dict with 'suite', 'test_name', and 'timestamp' keys.
        diff_buf: Raw diff lines collected for this test from output.rpt.

    Returns:
        dict: one result record with build/suite/test identity, timestamps,
        the raw and normalized status, and the elapsed time (or None).
    """
    suite = current_test['suite']
    test_name = current_test['test_name']
    output_status = loop_grep(build_name, test_name)
    if not output_status and diff_buf:
        # No explicit marker in the output file: fall back to the diff
        # comparison verdict; anything other than a clean pass is a bad diff.
        if compare_results[suite][test_name] == 'PASSED':
            output_status = 'PASSED'
        else:
            output_status = 'BAD DIFF'
    record = {
        'build_name': build_name,
        'suite': suite,
        'test_name': test_name,
        'timestamp': current_test['timestamp'],
        'status': normalize_status(output_status),
        'output_status': output_status,
        'elapsed_time': extract_elapsed_time(build_name, test_name),
    }
    return record
@click.command()
@click.argument('build_names', nargs=-1)
@click.option('-o', '--output', type=click.Path(writable=True), help='File to write JSON output.')
@click.option('-p', '--prefix', type=click.Path(writable=True), help='File name prefix to write JSON output to each build test dir.')
@click.option('--log', type=click.Path(writable=True), help='File to write log output. If not given, logs go to stdout.')
def score(build_names, output, prefix, log):
    """
    score.py: Process build test logs using MPI and output JSON summary.
    Optionally specify build names as space-delimited arguments.
    If none are given, all builds found under install-*/test/output.rpt will be processed.
    -o/--output: Output JSON to file.
    -p/--prefix: Output JSON to file name prefix in each build test dir.
    --log: Write log messages to file instead of stdout.
    """
    # Fixed: honor --log unconditionally. The old code only used the log file
    # when -o/-p was also given, contradicting the documented behavior.
    # (setup_logging(None) already streams to stdout/stderr.)
    logger = setup_logging(log)
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()
    # -o and -p are mutually exclusive; -o wins.
    if output and prefix:
        prefix = False
    # All logging stays gated on `log`, preserving the original's
    # quiet-by-default behavior when --log is not given.
    if log:
        logger.info(f"Rank {rank}: Starting score.py with out file '{output or prefix or 'stdout'}' and log '{log or 'stdout'}'.")
    # If no build_names provided, discover all.
    if not build_names:
        build_names = get_all_build_names()
        if log:
            logger.info(f"Rank {rank}: No build names given, found {len(build_names)} builds to process.")
    else:
        build_names = sorted(set(build_names))
        if log:
            logger.info(f"Rank {rank}: Build names given: {build_names}")
    # Keep only builds that actually produced an output.rpt.
    build_names_actual = []
    for bn in build_names:
        rpt_file = f"install-{bn}/test/output.rpt"
        if os.path.exists(rpt_file):
            build_names_actual.append(bn)
        elif log:
            logger.warning(f"Rank {rank}: Build '{bn}' has no output.rpt, skipping.")
    # Round-robin partition of builds across MPI ranks.
    my_builds = build_names_actual[rank::size]
    if log:
        logger.info(f"Rank {rank}: Assigned builds: {my_builds}")
    header_regex = re.compile(r'<\*\*\s*(\w+)\s*:\s*([\w\d_]+)\s*\*\*>\s*(.*)')
    results = []
    for build_name in my_builds:
        compare_file = f"install-{build_name}/test/compare.out"
        if log:
            logger.info(f"Rank {rank}: Parsing '{compare_file}'")
        compare_results = parse_compare_output(compare_file)
        rpt_file = f"install-{build_name}/test/output.rpt"
        if log:
            logger.info(f"Rank {rank}: Parsing '{rpt_file}'")
        if not os.path.exists(rpt_file):
            continue
        with open(rpt_file, 'r') as f:
            lines = f.readlines()
        current_test = {}
        diff_buf = []
        for line in lines:
            header_match = header_regex.match(line)
            if header_match:
                if current_test:
                    test_result = score_test(build_name, compare_results,
                                             current_test, diff_buf)
                    results.append(test_result)
                    if log:
                        logger.info(f"Rank {rank}: Finished test '{current_test['test_name']}' in suite '{current_test['suite']}' with status '{test_result['status']}'.")
                # Fixed: reset the buffer on EVERY header, not only after
                # scoring a previous test, so preamble lines before the first
                # header are never attributed to the first test.
                diff_buf = []
                current_test = {
                    'suite': header_match.group(1),
                    'test_name': header_match.group(2),
                    'timestamp': header_match.group(3).strip(),
                }
            else:
                diff_buf.append(line)
        # Flush the final test section (no trailing header triggers it).
        if current_test:
            test_result = score_test(build_name, compare_results,
                                     current_test, diff_buf)
            results.append(test_result)
            if log:
                logger.info(f"Rank {rank}: Finished test '{current_test['test_name']}' in suite '{current_test['suite']}' with status '{test_result['status']}'.")
        if log:
            # Fixed: this previously logged 'bn', the leftover variable from
            # the discovery loop above, not the build actually scored.
            logger.info(f"Rank {rank}: Finished scoring build '{build_name}'.")
        if prefix:
            # NOTE(review): `results` accumulates across builds, so each
            # per-build file holds the cumulative list — preserved as-is;
            # confirm whether per-build-only output was intended.
            out_file = f"install-{build_name}/test/{prefix}.json"
            if log:
                logger.info(f"Rank {rank}: Writing scores to '{out_file}'.")
            with open(out_file, "w") as outjson:
                json.dump(results, outjson, indent=2)
            if log:
                logger.info(f"Rank {rank}: Finished writing scores to '{out_file}'.")
    if not prefix:
        # Collective call: every rank must reach this gather.
        all_results = comm.gather(results, root=0)
        if rank == 0:
            final_results = [item for sublist in all_results for item in sublist]
            final_results.sort(key=lambda test: (test['build_name'].lower(), test['suite'].lower(), test['test_name'].lower()))
            # Fixed: only open a file when -o was actually given; otherwise
            # print to stdout instead of calling open(None).
            if output:
                if log:
                    logger.info(f"Rank 0: Writing JSON output to '{output}'")
                with open(output, "w") as outjson:
                    json.dump(final_results, outjson, indent=2)
            else:
                print(json.dumps(final_results, indent=2))