From 0875232c20c74262c88eefa90122b834fbb3299b Mon Sep 17 00:00:00 2001
From: bennahugo
Date: Fri, 7 Mar 2025 12:14:39 +0200
Subject: [PATCH 1/2] Update benchmark to include chunking options

Fixes and enhances:
- Assumptions of bytes per row
- Add rechunking options to enlarge chunk size per worker for tuning

Review fixes folded in:
- dask rechunk() returns a new array and is not in-place, so assign the
  result back instead of discarding it (the bare calls were no-ops and the
  new --blperchunk/--chperchunk options had no effect)
- compute the flags bytes-per-element from the flags dataset, not the
  weights dataset (copy-paste error)
- final summary log now reports MiB, matching the value actually computed
  (it was computed in MiB but labelled 'bytes' and truncated with %d)
---
 scripts/mvf_read_benchmark.py | 29 +++++++++++++++++++++++++----
 1 file changed, 25 insertions(+), 4 deletions(-)

diff --git a/scripts/mvf_read_benchmark.py b/scripts/mvf_read_benchmark.py
index 4c007ca8..b4bbd64d 100755
--- a/scripts/mvf_read_benchmark.py
+++ b/scripts/mvf_read_benchmark.py
@@ -24,6 +24,7 @@
 import numpy as np
 
 import katdal
+from ipdb import set_trace
 from katdal.lazy_indexer import DaskLazyIndexer
 
 parser = argparse.ArgumentParser()
@@ -34,6 +35,9 @@
 parser.add_argument('--joint', action='store_true', help='Load vis, weights, flags together')
 parser.add_argument('--applycal', help='Calibration solutions to apply')
 parser.add_argument('--workers', type=int, help='Number of dask workers')
+parser.add_argument('--blperchunk', type=int, help='Adjust chunks to specified number of baselines per chunk')
+parser.add_argument('--chperchunk', type=int, help='Adjust chunks to specified number of channels per chunk')
+
 args = parser.parse_args()
 
 logging.basicConfig(level='INFO', format='%(asctime)s [%(levelname)s] %(message)s')
@@ -51,7 +55,24 @@
 f.select(dumps=np.s_[:args.dumps])
 # Trigger creation of the dask graphs, population of sensor cache for applycal etc
 _ = (f.vis[0, 0, 0], f.weights[0, 0, 0], f.flags[0, 0, 0])
+nblc = min(args.blperchunk if args.blperchunk else f.vis.dataset.chunksize[0],
+           f.vis.dataset.shape[0])
+nch = min(args.chperchunk if args.chperchunk else f.vis.dataset.chunksize[1],
+          f.vis.dataset.shape[1])
+cs = f.vis.dataset.chunksize
+cs = tuple([nblc, nch, cs[2]])
+f.vis.dataset = f.vis.dataset.rechunk(cs)
+f.weights.dataset = f.weights.dataset.rechunk(cs)
+f.flags.dataset = f.flags.dataset.rechunk(cs)
+csMB = np.prod(tuple([*cs[0:2], args.time])) * (f.vis.dataset.nbytes // f.vis.dataset.size) / 1024.0**2
+visshpGB = f.vis.dataset.nbytes / 1024.0**3
+logging.info(f'Selected visibility chunk size {csMB:.2f} MiB of '
+             f'total selection size {visshpGB:.2f} GiB')
 logging.info('Selection complete')
+chunk_sizeB = f.vis.dataset.nbytes // f.vis.dataset.size + \
+              f.weights.dataset.nbytes // f.weights.dataset.size + \
+              f.flags.dataset.nbytes // f.flags.dataset.size
+
 start = time.time()
 last_time = start
 for st in range(0, f.shape[0], args.time):
@@ -65,8 +86,8 @@
     current_time = time.time()
     elapsed = current_time - last_time
     last_time = current_time
-    size = np.prod(vis.shape) * 10
-    logging.info('Loaded %d dumps (%.3f MB/s)', vis.shape[0], size / elapsed / 1e6)
-size = np.prod(f.shape) * 10
+    size = np.prod(vis.shape) * chunk_sizeB / 1024.**2
+    logging.info('Loaded %d dumps (%.3f MiB/s)', vis.shape[0], size / elapsed)
+size = np.prod(f.shape) * chunk_sizeB / 1024.**2
 elapsed = time.time() - start
-logging.info('Loaded %d bytes in %.3f s (%.3f MB/s)', size, elapsed, size / elapsed / 1e6)
+logging.info('Loaded %.3f MiB in %.3f s (%.3f MiB/s)', size, elapsed, size / elapsed)

From 08eff6cc04beeba933bd7120a1893b25b06f2035 Mon Sep 17 00:00:00 2001
From: Benjamin Hugo
Date: Fri, 7 Mar 2025 12:31:08 +0200
Subject: [PATCH 2/2] Remove debug tracer import

Remove ipdb
---
 scripts/mvf_read_benchmark.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/scripts/mvf_read_benchmark.py b/scripts/mvf_read_benchmark.py
index b4bbd64d..021763ab 100755
--- a/scripts/mvf_read_benchmark.py
+++ b/scripts/mvf_read_benchmark.py
@@ -24,7 +24,6 @@
 import numpy as np
 
 import katdal
-from ipdb import set_trace
 from katdal.lazy_indexer import DaskLazyIndexer
 
 parser = argparse.ArgumentParser()