From 5b87da6f56c4f636c0c55a27cad29bf7a3fef8cf Mon Sep 17 00:00:00 2001
From: Cole Kendrick
Date: Thu, 12 May 2022 14:49:05 -0400
Subject: [PATCH 1/3] Add initial buffered version of compile function

---
 acc/run_script.py        | 136 ++++++++++++++++++++++++++++++++++++++-
 tests/test_run_script.py |  11 +++-
 2 files changed, 144 insertions(+), 3 deletions(-)

diff --git a/acc/run_script.py b/acc/run_script.py
index c44b771d..4c466195 100644
--- a/acc/run_script.py
+++ b/acc/run_script.py
@@ -8,7 +8,8 @@
 
 
 def run(script, workdir, ncount, overwrite_datafiles=True,
-        ntotalthreads=None, threads_per_block=None,
+        ntotalthreads=int(1e6), threads_per_block=512,
+        use_buffer=False,
         **kwds):
     """run a mcvine.acc simulation script on one node.
     The script must define the instrument.
@@ -24,7 +25,13 @@ def run(script, workdir, ncount,
         os.makedirs(workdir)
     curdir = os.path.abspath(os.curdir)
     compiled_script_path = os.path.join(workdir, 'compiled_mcvine_acc_instrument.py')
-    compiled_script = compile(script, compiled_script=compiled_script_path, **kwds)
+    if use_buffer:
+        compiled_script = compile_buffered(script,
+                                           compiled_script=compiled_script_path,
+                                           **kwds)
+    else:
+        compiled_script = compile(script, compiled_script=compiled_script_path,
+                                  **kwds)
     m = imp.load_source('mcvinesim', compiled_script)
     os.chdir(workdir)
     try:
@@ -79,6 +86,48 @@ def compile(script, compiled_script=None, **kwds):
         stream.write(text)
     return compiled_script
 
+
+def compile_buffered(script, compiled_script=None, **kwds):
+    script = os.path.abspath(script)
+    instrument = loadInstrument(script, **kwds)
+    comps = instrument.components
+    modules = []
+    body = []
+    for i, comp in enumerate(comps):
+        modules.append((comp.__module__, comp.__class__.__name__))
+        prefix = (
+            "rng_states, "
+            if isinstance(comp, StochasticComponentBase)
+            else ""
+        )
+        if i>0:
+            body.append("transform_kernel[nblocks, tpb](neutrons_d, n_neutrons_per_thread, rotmats[{}], offsets[{}])".format(i-1, i-1))
+        body.append("process_kernel{}[nblocks, tpb]({} neutrons_d, n_neutrons_per_thread, args[{}])".format(i, prefix, i))
+        continue
+    module_imports = ['from {} import {} as comp{}'.format(m, c, i) for i, (m, c) in enumerate(modules)]
+    module_imports = '\n'.join(module_imports)
+    propagate_defs = ['process_kernel{} = comp{}.process_kernel'.format(i, i) for i in range(len(comps))]
+    propagate_defs = '\n'.join(propagate_defs)
+    args = [f'args{i}' for i in range(len(comps))]
+    args = ', '.join(args)
+    indent = 4*' '
+    body = '\n'.join([indent+line for line in body])
+    text = compiled_script_template_buffered.format(
+        script = script,
+        module_imports = module_imports,
+        propagate_definitions = propagate_defs,
+        args=args, instrument_body=body
+    )
+    if compiled_script is None:
+        f, ext = os.path.splitext(script)
+        kwds_str = str(kwds)
+        uid = hashlib.sha224(kwds_str.encode("UTF-8")).hexdigest()[:8]
+        compiled_script = f + "_compiled_" + uid + ext
+    with open(compiled_script, 'wt') as stream:
+        stream.write(text)
+    return compiled_script
+
+
 def calcTransformations(instrument):
     """given a mcni.Instrument instance, calculate transformation matrices
     and offset vectors from one component to the next
@@ -152,6 +201,89 @@ def run(ncount, ntotalthreads=None, threads_per_block=None, **kwds):
     saveMonitorOutputs(instrument, scale_factor=1.0/ncount)
 """
 
+compiled_script_template_buffered = """#!/usr/bin/env python
+
+script = {script!r}
+from mcvine.acc.run_script import loadInstrument, calcTransformations, saveMonitorOutputs
+
+from numba import cuda
+import numba as nb
+import numpy as np
+import math
+from numba.cuda.random import create_xoroshiro128p_states
+from mcvine.acc.neutron import abs2rel
+from mcvine.acc.config import get_numba_floattype, get_numpy_floattype
+NB_FLOAT = get_numba_floattype()
+
+{module_imports}
+
+{propagate_definitions}
+
+
+@cuda.jit()
+def transform_kernel(neutrons, n_neutrons_per_thread, rotmat, offset):
+    '''
+    Kernel to adjust neutrons between two components
+    '''
+    N = len(neutrons)
+    thread_index = cuda.grid(1)
+    start_index = thread_index*n_neutrons_per_thread
+    end_index = min(start_index+n_neutrons_per_thread, N)
+    r = cuda.local.array(3, dtype=NB_FLOAT)
+    v = cuda.local.array(3, dtype=NB_FLOAT)
+    for i in range(start_index, end_index):
+        neutron = neutrons[i]
+        abs2rel(neutron[:3], neutron[3:6], rotmat, offset, r, v)
+
+
+def instrument_kernel(rng_states, N, n_neutrons_per_thread, nblocks, tpb, args):
+    '''
+    Driver function to run all kernels needed for the instrument
+    '''
+
+    {args}, offsets, rotmats = args
+
+    # Create neutron device buffer
+    neutrons = np.zeros((N, 10), dtype=np.float64)
+    neutrons_d = cuda.to_device(neutrons)
+
+{instrument_body}
+
+    cuda.synchronize()
+
+    #neutrons = neutrons_d.copy_to_host()
+
+
+from mcvine.acc.components.sources.SourceBase import SourceBase
+class InstrumentBase(SourceBase):
+    def __init__(self, instrument):
+        offsets, rotmats = calcTransformations(instrument)
+        self.propagate_params = tuple(c.propagate_params for c in instrument.components)
+        self.propagate_params += (offsets, rotmats)
+        return
+    def propagate(self):
+        pass
+InstrumentBase.process_kernel_no_buffer = instrument_kernel
+
+def run(ncount, ntotalthreads=int(1e6), threads_per_block=512, **kwds):
+    instrument = loadInstrument(script, **kwds)
+
+    ntotthreads = min(ncount, int(ntotalthreads))
+    nblocks = math.ceil(ntotthreads / threads_per_block)
+    actual_nthreads = threads_per_block * nblocks
+    n_neutrons_per_thread = math.ceil(ncount / actual_nthreads)
+    print("%s blocks, %s threads, %s neutrons per thread" % (nblocks, threads_per_block, n_neutrons_per_thread))
+    rng_states = create_xoroshiro128p_states(actual_nthreads, seed=1)
+
+    base = InstrumentBase(instrument)
+
+    instrument_kernel(rng_states, ncount, n_neutrons_per_thread, nblocks, threads_per_block, base.propagate_params)
+    cuda.synchronize()
+
+    saveMonitorOutputs(instrument, scale_factor=1.0/ncount)
+"""
+
+
 def saveMonitorOutputs(instrument, scale_factor=1.0):
     for comp in instrument.components:
         from .components.monitors.MonitorBase import MonitorBase
diff --git a/tests/test_run_script.py b/tests/test_run_script.py
index 0d8c6f35..93df400f 100755
--- a/tests/test_run_script.py
+++ b/tests/test_run_script.py
@@ -15,15 +15,24 @@ def test_compile():
     run_script.compile(script)
     return
 
+@pytest.mark.skipif(not test.USE_CUDA, reason='No CUDA')
+def test_compile_buffered():
+    run_script.compile_buffered(script)
+    return
+
 @pytest.mark.skipif(not test.USE_CUDA, reason='No CUDA')
 def test_run():
     run_script.run(script, workdir, ncount=ncount)
     return
 
+@pytest.mark.skipif(not test.USE_CUDA, reason='No CUDA')
+def test_run_buffered():
+    run_script.run(script, workdir, ncount=int(1e6), use_buffer=True)
+    return
 def main():
     test_run()
-    os.system("plothist --min=0 --max=1e-6 {}/divpos.h5".format(workdir))
+    os.system("plothist --min=0 --max=1e-6 {}/posdiv.h5".format(workdir))
     return
 
 if __name__ == '__main__':
     main()
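For reference, the {instrument_body} placeholder in the template above expands to one process_kernel launch per component, with a transform_kernel launch between consecutive components that moves neutrons into the next component's local frame. A sketch of the generated body for a hypothetical three-component instrument (a stochastic source followed by two deterministic components; the components themselves are illustrative, but the line shapes, including the spacing around neutrons_d, come straight from the format strings in compile_buffered):

    process_kernel0[nblocks, tpb](rng_states,  neutrons_d, n_neutrons_per_thread, args[0])
    transform_kernel[nblocks, tpb](neutrons_d, n_neutrons_per_thread, rotmats[0], offsets[0])
    process_kernel1[nblocks, tpb]( neutrons_d, n_neutrons_per_thread, args[1])
    transform_kernel[nblocks, tpb](neutrons_d, n_neutrons_per_thread, rotmats[1], offsets[1])
    process_kernel2[nblocks, tpb]( neutrons_d, n_neutrons_per_thread, args[2])

Each line carries the 4-space indent set by compile_buffered so that it drops into the body of instrument_kernel in the generated script.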
From afab738234629f575abd9e18a115404643273aab Mon Sep 17 00:00:00 2001
From: Cole Kendrick
Date: Thu, 16 Jun 2022 14:25:13 -0400
Subject: [PATCH 2/3] Adjust buffered run to process ncount in buffer-size blocks

---
 acc/run_script.py                     | 38 ++++++++++++++++++++-------
 tests/instruments/VERDI/test_verdi.py | 10 +++++++
 2 files changed, 39 insertions(+), 9 deletions(-)

diff --git a/acc/run_script.py b/acc/run_script.py
index 4c466195..52a0e9e4 100644
--- a/acc/run_script.py
+++ b/acc/run_script.py
@@ -1,3 +1,4 @@
+#!/usr/bin/env python
 #
 # Jiao Lin
 #
@@ -10,6 +11,7 @@
 def run(script, workdir, ncount, overwrite_datafiles=True,
         ntotalthreads=int(1e6), threads_per_block=512,
         use_buffer=False,
+        buffer_size=int(1e8),
         **kwds):
     """run a mcvine.acc simulation script on one node.
     The script must define the instrument.
@@ -35,7 +37,7 @@ def run(script, workdir, ncount,
     m = imp.load_source('mcvinesim', compiled_script)
     os.chdir(workdir)
     try:
-        m.run(ncount, ntotalthreads=ntotalthreads, threads_per_block=threads_per_block, **kwds)
+        m.run(ncount, ntotalthreads=ntotalthreads, threads_per_block=threads_per_block, buffer_size=buffer_size, **kwds)
     finally:
         os.chdir(curdir)
     return
@@ -110,7 +112,7 @@ def compile_buffered(script, compiled_script=None, **kwds):
     propagate_defs = '\n'.join(propagate_defs)
     args = [f'args{i}' for i in range(len(comps))]
     args = ', '.join(args)
-    indent = 4*' '
+    indent = 8*' '
     body = '\n'.join([indent+line for line in body])
     text = compiled_script_template_buffered.format(
         script = script,
@@ -236,20 +238,38 @@ def transform_kernel(neutrons, n_neutrons_per_thread, rotmat, offset):
         abs2rel(neutron[:3], neutron[3:6], rotmat, offset, r, v)
 
 
-def instrument_kernel(rng_states, N, n_neutrons_per_thread, nblocks, tpb, args):
+def instrument_kernel(rng_states, N, n_neutrons_per_thread, nblocks, tpb, buffer_size, args):
     '''
     Driver function to run all kernels needed for the instrument
     '''
 
     {args}, offsets, rotmats = args
 
-    # Create neutron device buffer
-    neutrons = np.zeros((N, 10), dtype=np.float64)
-    neutrons_d = cuda.to_device(neutrons)
+    buffer_size = int(buffer_size)
+
+    iters = math.ceil(N / buffer_size)
+    max_blocks = math.floor( buffer_size / (n_neutrons_per_thread * tpb))
+    print(" Total N split into %s iterations of buffer size %s" % (iters, buffer_size))
+
+    blocks_left = nblocks
+
+    i = 0
+    while blocks_left > 0:
+
+        # Create neutron device buffer
+        neutrons = np.zeros((buffer_size, 10), dtype=np.float64)
+        neutrons_d = cuda.to_device(neutrons)
+
+        # adjust launch config for this iter
+        nblocks = min(blocks_left, max_blocks)
+        print(" iter %s - nblocks = %s" % (i+1, nblocks))
 
 {instrument_body}
 
-    cuda.synchronize()
+        cuda.synchronize()
+
+        blocks_left -= nblocks
+        i += 1
 
     #neutrons = neutrons_d.copy_to_host()
 
@@ -265,7 +285,7 @@ def propagate(self):
         pass
 InstrumentBase.process_kernel_no_buffer = instrument_kernel
 
-def run(ncount, ntotalthreads=int(1e6), threads_per_block=512, **kwds):
+def run(ncount, ntotalthreads=int(1e6), threads_per_block=512, buffer_size=int(1e6), **kwds):
     instrument = loadInstrument(script, **kwds)
 
     ntotthreads = min(ncount, int(ntotalthreads))
@@ -277,7 +297,7 @@ def run(ncount, ntotalthreads=int(1e6), threads_per_block=512, **kwds):
 
     base = InstrumentBase(instrument)
 
-    instrument_kernel(rng_states, ncount, n_neutrons_per_thread, nblocks, threads_per_block, base.propagate_params)
+    instrument_kernel(rng_states, ncount, n_neutrons_per_thread, nblocks, threads_per_block, buffer_size, base.propagate_params)
     cuda.synchronize()
 
     saveMonitorOutputs(instrument, scale_factor=1.0/ncount)
diff --git a/tests/instruments/VERDI/test_verdi.py b/tests/instruments/VERDI/test_verdi.py
index a81a95e0..4e38b50f 100755
--- a/tests/instruments/VERDI/test_verdi.py
+++ b/tests/instruments/VERDI/test_verdi.py
@@ -22,8 +22,18 @@ def test_run():
         ntotalthreads=int(1e6/512*256), threads_per_block=256)
     return
 
+@pytest.mark.skipif(not test.USE_CUDA, reason='No CUDA')
+def test_run_buffered():
+    run_script.run(
+        script, workdir=workdir, ncount=ncount,
+        ntotalthreads=int(1e8), threads_per_block=512,
+        use_buffer=True)
+    return
+
+
 def main():
     test_run()
+    # test_run_buffered()
     # test_compile()
     return
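To make the new launch bookkeeping concrete, here is a small standalone sketch that mirrors the arithmetic performed by the generated run() and instrument_kernel() above; the values are illustrative (roughly 1e6 neutrons, 512 threads per block, a 2**19-neutron buffer), not defaults of the module.

import math

# Illustrative configuration, not module defaults
ncount, ntotalthreads, threads_per_block, buffer_size = 2**20, 2**20, 512, 2**19

# run(): overall launch configuration
ntotthreads = min(ncount, int(ntotalthreads))                # 1048576 threads
nblocks = math.ceil(ntotthreads / threads_per_block)         # 2048 blocks in total
actual_nthreads = threads_per_block * nblocks                # 1048576
n_neutrons_per_thread = math.ceil(ncount / actual_nthreads)  # 1 neutron per thread

# instrument_kernel(): splitting the work into buffer-sized iterations
iters = math.ceil(ncount / buffer_size)                                             # 2 iterations
max_blocks = math.floor(buffer_size / (n_neutrons_per_thread * threads_per_block))  # 1024 blocks per iteration

# The while loop then launches min(blocks_left, max_blocks) blocks per pass:
# 2048 total blocks become two passes of 1024 blocks, each filling one 2**19-neutron device buffer.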
From 679267a65019c7bbaf733cff136b0c0558ca0327 Mon Sep 17 00:00:00 2001
From: Cole Kendrick
Date: Fri, 17 Jun 2022 09:38:05 -0400
Subject: [PATCH 3/3] Expand buffered run script test

---
 acc/run_script.py        |  4 +++-
 acc/test/utils.py        | 43 ++++++++++++++++++++++++++++++++++++++++
 tests/test_run_script.py | 19 ++++++++++++++++--
 3 files changed, 63 insertions(+), 3 deletions(-)
 create mode 100644 acc/test/utils.py

diff --git a/acc/run_script.py b/acc/run_script.py
index 52a0e9e4..b57b4632 100644
--- a/acc/run_script.py
+++ b/acc/run_script.py
@@ -245,7 +245,9 @@ def instrument_kernel(rng_states, N, n_neutrons_per_thread, nblocks, tpb, buffer
     {args}, offsets, rotmats = args
 
-    buffer_size = int(buffer_size)
+    # buffer size should be a power of 2, so round it up to the next one
+    assert buffer_size > tpb
+    buffer_size = int(2**math.ceil(math.log2(buffer_size)))
 
     iters = math.ceil(N / buffer_size)
     max_blocks = math.floor( buffer_size / (n_neutrons_per_thread * tpb))
diff --git a/acc/test/utils.py b/acc/test/utils.py
new file mode 100644
index 00000000..1e72de7e
--- /dev/null
+++ b/acc/test/utils.py
@@ -0,0 +1,43 @@
+#!/usr/bin/env python
+
+import os
+import histogram.hdf as hh
+import numpy as np
+
+
+def check_histogram_match(histfile_a, histfile_b, tolerance=1e-8, interactive=False):
+    """
+    Test helper to load two histogram hdf files and compare them within a tolerance
+
+    Parameters
+    ----------
+    histfile_a : str
+        path of first file to compare
+    histfile_b : str
+        path of second file to compare
+    tolerance : float
+        tolerance for float comparisons
+    interactive: bool
+        Whether to plot each histogram and the relative difference
+
+    Returns
+    -------
+    bool
+        True if data of both histogram files are within tolerance, False otherwise
+    """
+
+    assert os.path.exists(histfile_a)
+    assert os.path.exists(histfile_b)
+
+    hist_a = hh.load(histfile_a)
+    hist_b = hh.load(histfile_b)
+
+    if interactive:
+        from histogram import plot as plotHist
+        plotHist(hist_a)
+        plotHist(hist_b)
+        plotHist((hist_a - hist_b) / hist_a)
+
+    assert hist_a.shape() == hist_b.shape()
+    return np.allclose(hist_a.data().storage(), hist_b.data().storage(),
+                       atol=tolerance)
diff --git a/tests/test_run_script.py b/tests/test_run_script.py
index 93df400f..a0360104 100755
--- a/tests/test_run_script.py
+++ b/tests/test_run_script.py
@@ -3,6 +3,7 @@
 import os, pytest
 thisdir = os.path.abspath(os.path.dirname(__file__))
 from mcvine.acc import test
+from mcvine.acc.test.utils import check_histogram_match
 
 script = os.path.join(thisdir, 'acc_sgm_instrument.py')
 workdir = 'out.acc_sgm'
@@ -27,8 +28,22 @@ def test_run():
 
 @pytest.mark.skipif(not test.USE_CUDA, reason='No CUDA')
 def test_run_buffered():
-    run_script.run(script, workdir, ncount=int(1e6), use_buffer=True)
-    return
+    # run with ~1e6 neutrons, 2 neutrons per thread - for this test, this needs 2 neutrons per
+    # thread to ensure the same RNG states for each neutron when run with two iterations below
+    run_script.run(script, workdir, ncount=int(2 ** 20),
+                   ntotalthreads=int(2 ** 19), use_buffer=True)
+    # run again with a smaller buffer size to force multiple iterations
+    # ~1e6 neutrons, 1 neutron per thread, but 2 iterations of 1024 blocks to work within the max buffer size
+    workdir_small_buffer = os.path.join(workdir, 'small_buffer')
+    run_script.run(script, workdir_small_buffer, ncount=int(2 ** 20),
+                   ntotalthreads=int(2 ** 20), use_buffer=True,
+                   buffer_size=int(2 ** 19))
+
+    # ensure both results match
+    assert check_histogram_match(os.path.join(workdir, "posdiv.h5"),
+                                 os.path.join(workdir_small_buffer, "posdiv.h5"),
+                                 interactive=True)
+
 def main():
     test_run()