From bed6642be8f29d36a8340526dba807e414d8b7ea Mon Sep 17 00:00:00 2001 From: dimapihtar Date: Wed, 4 Mar 2026 07:16:12 -0800 Subject: [PATCH 1/8] find optimal number of workers Signed-off-by: dimapihtar --- tools/preprocess_data.py | 276 +++++++++++++++++++++++---------------- 1 file changed, 160 insertions(+), 116 deletions(-) diff --git a/tools/preprocess_data.py b/tools/preprocess_data.py index c1f19f6be31..70527d4299c 100644 --- a/tools/preprocess_data.py +++ b/tools/preprocess_data.py @@ -5,15 +5,15 @@ import math import json import os +import pickle import sys sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir))) import time import gzip import glob -import torch -import numpy as np import multiprocessing +import numpy as np try: import nltk from nltk.tokenize.punkt import PunktLanguageVars @@ -110,9 +110,10 @@ def encode(self, json_line): class Partition(object): - def __init__(self, args, workers): + def __init__(self, args, workers, performance): self.args = args self.workers = workers + self.performance = performance def print_processing_stats(self, count, proc_start, total_bytes_processed): if count % self.args.log_interval == 0: @@ -122,6 +123,7 @@ def print_processing_stats(self, count, proc_start, total_bytes_processed): print(f"Processed {count} documents", f"({count/elapsed} docs/s, {mbs} MB/s).", file=sys.stderr) + self.performance[self.workers].append(count/elapsed) def split_sentences(self, file_name): input_file_name, output_file_name = file_name @@ -185,6 +187,9 @@ def process_json_file(self, file_name): builders[key].add_document(doc[key], sentence_lens[key]) self.print_processing_stats(i, proc_start, total_bytes_processed) + with open("performance.pkl", "wb") as perf: + pickle.dump(self.performance, perf) + fin.close() builders[key].finalize(output_idx_files[key]) @@ -216,6 +221,19 @@ def get_args(): help=('Number of worker processes to launch.' 
'A good default for fast pre-processing ' 'is: (workers * partitions) = available CPU cores.')) + group.add_argument('--find-optimal-num-workers', action='store_true', + help=('Find optimal number of workers.' + 'Script will run few small jobs with ' + 'different number of workers to define ' + 'optimal number of workers in terms of performance.')) + group.add_argument('--workers-to-check', nargs='+', default=[16, 32, 64], + help=('list of workers to run data processing with ' + 'to find optimal number of workers. ' + 'Works only when --find-optimal-num-workers is enabled. ')) + group.add_argument('--max-documents', type=int, default=100_000, + help=('Maximum number of documents to preprocess ' + 'to find optimal number of workers.' + 'Works only when --find-optimal-num-workers is enabled. ')) group.add_argument('--partitions', type=int, default=1, help='Number of file partitions') group.add_argument('--log-interval', type=int, default=1000, @@ -260,88 +278,121 @@ def check_files_exist(in_ss_out_names, key, num_partitions): def main(): args = get_args() - if args.split_sentences: - if nltk_available: - nltk.download("punkt", quiet=True, download_dir=os.environ.get("NLTK_DATA")) - else: - raise Exception( - "nltk library required for sentence splitting is not available.") - - in_ss_out_names = [] - if args.partitions == 1: - file_name, extension = os.path.splitext(args.input) - sentence_split_file = file_name + "_ss" + extension - file_names = { - 'partition': args.input, - 'sentence_split': sentence_split_file, - 'output_prefix': args.output_prefix} - in_ss_out_names.append(file_names) + workers = args.workers_to_check if args.find_optimal_num_workers else [args.workers] + for num_workers in workers: + if num_workers % args.partitions != 0: + print( + f"Removing num_workers ({num_workers}) from workers list " + f"because it's not divisible by num_partitions ({args.partitions})" + ) + workers.remove(num_workers) + assert workers, "Please, provide valid number of 
workers which is divisible by number of partitions." + if args.find_optimal_num_workers: + args.log_interval = 1000 + performance = {num_workers: [] for num_workers in workers} else: - in_file_names = glob.glob(args.input) - - # Count total number of lines across .jsonl files - if args.keep_sequential_samples: - total_sample_count = 0 - for filename in in_file_names: - with open(filename, "r") as fin: - for fc, _ in enumerate(fin): - pass - total_sample_count += (fc + 1) - partition_size = math.ceil(total_sample_count / args.partitions) - - # create .jsonl parition files - for idx in range(args.partitions): - in_ss_out_name = get_file_name(args, idx) - in_ss_out_names.append(in_ss_out_name) - - # check to see if paritions were already created - partitions_present = check_files_exist(in_ss_out_names, 'partition', args.partitions) + performance = None - # check to see if paritions with split sentences already created - split_sentences_present = check_files_exist(in_ss_out_names, 'sentence_split', args.partitions) + for num_workers in workers: + print(f"Processing data with {num_workers} workers.") + if args.split_sentences: + if nltk_available: + nltk.download("punkt", quiet=True, download_dir=os.environ.get("NLTK_DATA")) + else: + raise Exception( + "nltk library required for sentence splitting is not available.") - if not partitions_present and not split_sentences_present: - # populate .jsonl partition files from parent files - partitioned_input_files = [] + in_ss_out_names = [] + if args.partitions == 1: + file_name, extension = os.path.splitext(args.input) + sentence_split_file = file_name + "_ss" + extension + file_names = { + 'partition': args.input, + 'sentence_split': sentence_split_file, + 'output_prefix': args.output_prefix} + in_ss_out_names.append(file_names) + else: + in_file_names = glob.glob(args.input) + + # Count total number of lines across .jsonl files + if args.keep_sequential_samples: + total_sample_count = 0 + for filename in in_file_names: + 
with open(filename, "r") as fin: + for fc, _ in enumerate(fin): + pass + total_sample_count += (fc + 1) + partition_size = math.ceil(total_sample_count / args.partitions) + + # create .jsonl parition files for idx in range(args.partitions): - partitioned_input_file = open(in_ss_out_names[idx]['partition'], 'w') - partitioned_input_files.append(partitioned_input_file) - - index = 0 - if args.keep_sequential_samples: line_count = 0 - for in_file_name in in_file_names: - # support for gzip files - if in_file_name.endswith(".gz"): - fin = gzip.open(in_file_name, 'rt') - else: - fin = open(in_file_name, 'r', encoding='utf-8') - - for line in fin: - partitioned_input_files[index].write(line) - if args.keep_sequential_samples: - line_count += 1 - if line_count % partition_size == 0: - index += 1 + in_ss_out_name = get_file_name(args, idx) + in_ss_out_names.append(in_ss_out_name) + + # check to see if paritions were already created + partitions_present = check_files_exist(in_ss_out_names, 'partition', args.partitions) + + # check to see if paritions with split sentences already created + split_sentences_present = check_files_exist(in_ss_out_names, 'sentence_split', args.partitions) + + if not partitions_present and not split_sentences_present: + # populate .jsonl partition files from parent files + partitioned_input_files = [] + for idx in range(args.partitions): + partitioned_input_file = open(in_ss_out_names[idx]['partition'], 'w') + partitioned_input_files.append(partitioned_input_file) + + index = 0 + if args.keep_sequential_samples: line_count = 0 + for in_file_name in in_file_names: + # support for gzip files + if in_file_name.endswith(".gz"): + fin = gzip.open(in_file_name, 'rt') else: - index = (index + 1)%args.partitions + fin = open(in_file_name, 'r', encoding='utf-8') - fin.close() + for line in fin: + partitioned_input_files[index].write(line) + if args.keep_sequential_samples: + line_count += 1 + if line_count % partition_size == 0: + index += 1 + else: + 
index = (index + 1)%args.partitions - for idx in range(args.partitions): - partitioned_input_files[idx].close() + fin.close() + + for idx in range(args.partitions): + partitioned_input_files[idx].close() + + assert num_workers % args.partitions == 0 + partition = Partition(args, num_workers//args.partitions, performance) + + # check to see if paritions with split sentences already created + split_sentences_present = check_files_exist(in_ss_out_names, 'sentence_split', args.partitions) + + # split sentences in partition files + if args.split_sentences and not split_sentences_present: + processes = [] + for name in in_ss_out_names: + p = multiprocessing.Process(target=partition.split_sentences, + args=((name['partition'], name['sentence_split']),)) + p.start() + processes.append(p) - assert args.workers % args.partitions == 0 - partition = Partition(args, args.workers//args.partitions) + for p in processes: + p.join() - # check to see if paritions with split sentences already created - split_sentences_present = check_files_exist(in_ss_out_names, 'sentence_split', args.partitions) + if args.partitions == 1: + continue - # split sentences in partition files - if args.split_sentences and not split_sentences_present: + + # encode partition files in parallel processes = [] + input_key = 'sentence_split' if args.split_sentences else 'partition' for name in in_ss_out_names: - p = multiprocessing.Process(target=partition.split_sentences, - args=((name['partition'], name['sentence_split']),)) + p = multiprocessing.Process(target=partition.process_json_file, + args=((name[input_key], name['output_prefix']),)) p.start() processes.append(p) @@ -349,51 +400,44 @@ def main(): p.join() if args.partitions == 1: - return - - - # encode partition files in parallel - processes = [] - input_key = 'sentence_split' if args.split_sentences else 'partition' - for name in in_ss_out_names: - p = multiprocessing.Process(target=partition.process_json_file, - args=((name[input_key], 
name['output_prefix']),)) - p.start() - processes.append(p) - - for p in processes: - p.join() - - if args.partitions == 1: - return - - # merge bin/idx partitions - level = "document" - if args.split_sentences: - level = "sentence" - - output_bin_files = {} - output_idx_files = {} - builders = {} - tokenizer = build_tokenizer(args) - - for key in args.json_keys: - output_bin_files[key] = "{}_{}_{}.bin".format(args.output_prefix, - key, level) - output_idx_files[key] = "{}_{}_{}.idx".format(args.output_prefix, - key, level) - builders[key] = indexed_dataset.IndexedDatasetBuilder( - output_bin_files[key], - dtype=indexed_dataset.DType.optimal_dtype(tokenizer.vocab_size), - ) + continue - for name in in_ss_out_names: - parition_output_prefix = name['output_prefix'] - full_partition_output_prefix = "{}_{}_{}".format(parition_output_prefix, - key, level) - builders[key].add_index(full_partition_output_prefix) - builders[key].finalize(output_idx_files[key]) + # merge bin/idx partitions + level = "document" + if args.split_sentences: + level = "sentence" + + output_bin_files = {} + output_idx_files = {} + builders = {} + tokenizer = build_tokenizer(args) + + for key in args.json_keys: + output_bin_files[key] = "{}_{}_{}.bin".format(args.output_prefix, + key, level) + output_idx_files[key] = "{}_{}_{}.idx".format(args.output_prefix, + key, level) + builders[key] = indexed_dataset.IndexedDatasetBuilder( + output_bin_files[key], + dtype=indexed_dataset.DType.optimal_dtype(tokenizer.vocab_size), + ) + + for name in in_ss_out_names: + parition_output_prefix = name['output_prefix'] + full_partition_output_prefix = "{}_{}_{}".format(parition_output_prefix, + key, level) + builders[key].add_index(full_partition_output_prefix) + builders[key].finalize(output_idx_files[key]) + + with open("performance.pkl", "rb") as perf: + performance = pickle.load(perf) + for key, value in performance.items(): + performance[key] = np.mean(value) + + max_key = max(performance, 
key=performance.get) + max_value = performance[max_key] + print(f"Optimal number of workers is {max_key} with avg. {max_value}.") if __name__ == '__main__': From 00736f4c796c3b9b4bc0609320064a0afdcd8edc Mon Sep 17 00:00:00 2001 From: dimapihtar Date: Wed, 4 Mar 2026 09:42:25 -0800 Subject: [PATCH 2/8] fix implementation Signed-off-by: dimapihtar --- tools/preprocess_data.py | 70 +++++++++++++++++++++++++++++----------- 1 file changed, 51 insertions(+), 19 deletions(-) diff --git a/tools/preprocess_data.py b/tools/preprocess_data.py index 70527d4299c..7ad3622563b 100644 --- a/tools/preprocess_data.py +++ b/tools/preprocess_data.py @@ -5,7 +5,6 @@ import math import json import os -import pickle import sys sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir))) @@ -110,10 +109,10 @@ def encode(self, json_line): class Partition(object): - def __init__(self, args, workers, performance): + def __init__(self, args, workers): self.args = args self.workers = workers - self.performance = performance + self.performance = {self.workers: []} def print_processing_stats(self, count, proc_start, total_bytes_processed): if count % self.args.log_interval == 0: @@ -187,8 +186,10 @@ def process_json_file(self, file_name): builders[key].add_document(doc[key], sentence_lens[key]) self.print_processing_stats(i, proc_start, total_bytes_processed) - with open("performance.pkl", "wb") as perf: - pickle.dump(self.performance, perf) + if self.args.find_optimal_num_workers: + perf_file_path = os.path.join(self.args.performance_dir, f"{self.workers}_workers.json") + with open(perf_file_path, "w") as perf_file: + json.dump(self.performance, perf_file) fin.close() builders[key].finalize(output_idx_files[key]) @@ -233,7 +234,10 @@ def get_args(): group.add_argument('--max-documents', type=int, default=100_000, help=('Maximum number of documents to preprocess ' 'to find optimal number of workers.' - 'Works only when --find-optimal-num-workers is enabled. 
')) + 'Works only when --find-optimal-num-workers is enabled.')) + group.add_argument('--performance-dir', type=str, default=None, + help=('Path where to save performance results. ' + 'Works only when --find-optimal-num-workers is enabled.')) group.add_argument('--partitions', type=int, default=1, help='Number of file partitions') group.add_argument('--log-interval', type=int, default=1000, @@ -275,6 +279,43 @@ def check_files_exist(in_ss_out_names, key, num_partitions): return True +def find_optimal_num_workers(directory: str): + """Parses saved .json files with perf. numbers and prints optimal number of workers""" + results = [] + + for filename in os.listdir(directory): + if not filename.endswith(".json"): + continue + + filepath = os.path.join(directory, filename) + + with open(filepath, "r") as f: + data = json.load(f) + + # each file assumed to contain a single {workers: [perf_list]} + for workers, perf_list in data.items(): + workers = int(workers) + avg_perf = np.mean(perf_list) + results.append((workers, avg_perf)) + + # sort by average performance (descending: fastest first) + results.sort(key=lambda x: x[1], reverse=True) + + print("\n-----------------------------------") + print("Performance results (fastest → slowest):") + for workers, avg_perf in results: + print(f"{workers} workers → avg docs/s: {avg_perf:.4f}") + + best_workers, best_perf = results[0] + + print("\n-----------------------------------") + print( + f"The most optimal num of workers is {best_workers} " + f"with avg. preprocessed docs/s {best_perf:.4f}." + ) + print("-----------------------------------") + + def main(): args = get_args() @@ -288,10 +329,9 @@ def main(): workers.remove(num_workers) assert workers, "Please, provide valid number of workers which is divisible by number of partitions." if args.find_optimal_num_workers: + assert args.performance_dir, "Directory where to save performance results should be specified." 
+ os.makedirs(args.performance_dir, exist_ok=True) args.log_interval = 1000 - performance = {num_workers: [] for num_workers in workers} - else: - performance = None for num_workers in workers: print(f"Processing data with {num_workers} workers.") @@ -366,7 +406,7 @@ def main(): partitioned_input_files[idx].close() assert num_workers % args.partitions == 0 - partition = Partition(args, num_workers//args.partitions, performance) + partition = Partition(args, num_workers//args.partitions) # check to see if paritions with split sentences already created split_sentences_present = check_files_exist(in_ss_out_names, 'sentence_split', args.partitions) @@ -429,15 +469,7 @@ def main(): builders[key].add_index(full_partition_output_prefix) builders[key].finalize(output_idx_files[key]) - with open("performance.pkl", "rb") as perf: - performance = pickle.load(perf) - for key, value in performance.items(): - performance[key] = np.mean(value) - - max_key = max(performance, key=performance.get) - max_value = performance[max_key] - - print(f"Optimal number of workers is {max_key} with avg. {max_value}.") + find_optimal_num_workers(args.performance_dir) if __name__ == '__main__': From de1cc036275d2c681d2baaf111517d1053cf89c6 Mon Sep 17 00:00:00 2001 From: dimapihtar Date: Wed, 4 Mar 2026 10:04:21 -0800 Subject: [PATCH 3/8] fix partitions Signed-off-by: dimapihtar --- tools/preprocess_data.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tools/preprocess_data.py b/tools/preprocess_data.py index 7ad3622563b..3ebb2bb6028 100644 --- a/tools/preprocess_data.py +++ b/tools/preprocess_data.py @@ -279,15 +279,15 @@ def check_files_exist(in_ss_out_names, key, num_partitions): return True -def find_optimal_num_workers(directory: str): +def find_optimal_num_workers(args): """Parses saved .json files with perf. 
numbers and prints optimal number of workers""" results = [] - for filename in os.listdir(directory): + for filename in os.listdir(args.performance_dir): if not filename.endswith(".json"): continue - filepath = os.path.join(directory, filename) + filepath = os.path.join(args.performance_dir, filename) with open(filepath, "r") as f: data = json.load(f) @@ -304,14 +304,14 @@ def find_optimal_num_workers(directory: str): print("\n-----------------------------------") print("Performance results (fastest → slowest):") for workers, avg_perf in results: - print(f"{workers} workers → avg docs/s: {avg_perf:.4f}") + print(f"{workers * args.partitions} workers → avg. docs/s: {avg_perf:.4f}") best_workers, best_perf = results[0] print("\n-----------------------------------") print( - f"The most optimal num of workers is {best_workers} " - f"with avg. preprocessed docs/s {best_perf:.4f}." + f"The most optimal num of workers is {best_workers * args.partitions} " + f"with avg. preprocessed docs/s: {best_perf:.4f}." 
) print("-----------------------------------") @@ -469,7 +469,7 @@ def main(): builders[key].add_index(full_partition_output_prefix) builders[key].finalize(output_idx_files[key]) - find_optimal_num_workers(args.performance_dir) + find_optimal_num_workers(args) if __name__ == '__main__': From a31f42b5cae171412db57a8ea8ab064dd7cea51c Mon Sep 17 00:00:00 2001 From: dimapihtar Date: Wed, 4 Mar 2026 10:11:27 -0800 Subject: [PATCH 4/8] fix comments Signed-off-by: dimapihtar --- tools/preprocess_data.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tools/preprocess_data.py b/tools/preprocess_data.py index 3ebb2bb6028..f2813e8a6a7 100644 --- a/tools/preprocess_data.py +++ b/tools/preprocess_data.py @@ -186,6 +186,7 @@ def process_json_file(self, file_name): builders[key].add_document(doc[key], sentence_lens[key]) self.print_processing_stats(i, proc_start, total_bytes_processed) + # Save performance data (preprocessed docs/s) if self.args.find_optimal_num_workers: perf_file_path = os.path.join(self.args.performance_dir, f"{self.workers}_workers.json") with open(perf_file_path, "w") as perf_file: @@ -405,7 +406,6 @@ def main(): for idx in range(args.partitions): partitioned_input_files[idx].close() - assert num_workers % args.partitions == 0 partition = Partition(args, num_workers//args.partitions) # check to see if paritions with split sentences already created @@ -469,6 +469,7 @@ def main(): builders[key].add_index(full_partition_output_prefix) builders[key].finalize(output_idx_files[key]) + # Find the most optimal number of workers find_optimal_num_workers(args) if __name__ == '__main__': From 3ca08810f3322171095bf1ccb396db6a0f207c82 Mon Sep 17 00:00:00 2001 From: dimapihtar Date: Wed, 4 Mar 2026 10:18:28 -0800 Subject: [PATCH 5/8] add max_documents Signed-off-by: dimapihtar --- tools/preprocess_data.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/tools/preprocess_data.py b/tools/preprocess_data.py index 
f2813e8a6a7..5b0ed0882ce 100644 --- a/tools/preprocess_data.py +++ b/tools/preprocess_data.py @@ -181,10 +181,13 @@ def process_json_file(self, file_name): total_bytes_processed = 0 print("Time to startup:", startup_end - startup_start) for i, (doc, sentence_lens, bytes_processed) in enumerate(encoded_docs, start=1): - total_bytes_processed += bytes_processed - for key in doc.keys(): - builders[key].add_document(doc[key], sentence_lens[key]) - self.print_processing_stats(i, proc_start, total_bytes_processed) + if self.args.find_optimal_num_workers and i > self.args.max_documents: + break + else: + total_bytes_processed += bytes_processed + for key in doc.keys(): + builders[key].add_document(doc[key], sentence_lens[key]) + self.print_processing_stats(i, proc_start, total_bytes_processed) # Save performance data (preprocessed docs/s) if self.args.find_optimal_num_workers: @@ -470,7 +473,8 @@ def main(): builders[key].finalize(output_idx_files[key]) # Find the most optimal number of workers - find_optimal_num_workers(args) + if args.find_optimal_num_workers: + find_optimal_num_workers(args) if __name__ == '__main__': From f2c8015003171a27d394de25f49d00c992fde0b2 Mon Sep 17 00:00:00 2001 From: dimapihtar Date: Wed, 4 Mar 2026 10:56:42 -0800 Subject: [PATCH 6/8] update documentation Signed-off-by: dimapihtar --- docs/user-guide/data-preparation.md | 27 +++++++++++++++++++++++++++ tools/preprocess_data.py | 6 +++--- 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/docs/user-guide/data-preparation.md b/docs/user-guide/data-preparation.md index 18da2d80fe1..27e9b18418c 100644 --- a/docs/user-guide/data-preparation.md +++ b/docs/user-guide/data-preparation.md @@ -46,6 +46,33 @@ python tools/preprocess_data.py \ | `--workers` | Number of parallel workers for processing | | `--append-eod` | Add end-of-document token | +## Finding Optimal Number of Workers + +Use the `--find-optimal-num-workers` flag to find number of workers which gives the best performance in 
terms of preprocessed documents per second. +The script will launch a few short data preprocessing runs with different numbers of workers to determine the fastest one based on the collected performance data. + +```bash +python tools/preprocess_data.py \ + --input data.jsonl \ + --output-prefix processed_data \ + --tokenizer-type HuggingFaceTokenizer \ + --tokenizer-model /path/to/tokenizer.model \ + --workers 8 \ + --find-optimal-num-workers \ + --workers-to-check 4 8 16 32 \ + --performance-dir /path/to/save/perf/results \ + --max-documents 50000 +``` + +**Required arguments** + +| Argument | Description | +|----------|-------------| +| `--find-optimal-num-workers` | Activates search of optimal number of workers | +| `--workers-to-check` | List of possible number of workers to run | +| `--performance-dir` | Directory where to save performance results | +| `--max-documents` | Number of documents to be preprocessed during each run | + ## Output Files The preprocessing tool generates two files: diff --git a/tools/preprocess_data.py b/tools/preprocess_data.py index 5b0ed0882ce..27ba3429116 100644 --- a/tools/preprocess_data.py +++ b/tools/preprocess_data.py @@ -231,7 +231,7 @@ def get_args(): 'Script will run few small jobs with ' 'different number of workers to define ' 'optimal number of workers in terms of performance.')) - group.add_argument('--workers-to-check', nargs='+', default=[16, 32, 64], + group.add_argument('--workers-to-check', nargs='+', type=int, default=[16, 32, 64], help=('list of workers to run data processing with ' 'to find optimal number of workers. ' 'Works only when --find-optimal-num-workers is enabled. ')) @@ -307,8 +307,8 @@ def find_optimal_num_workers(args): print("\n-----------------------------------") print("Performance results (fastest → slowest):") - for workers, avg_perf in results: - print(f"{workers * args.partitions} workers → avg. docs/s: {avg_perf:.4f}") + for i, (workers, avg_perf) in enumerate(results): + print(f"{i+1}. 
{workers * args.partitions} workers → avg. docs/s: {avg_perf:.4f}") best_workers, best_perf = results[0] From 77823ecea2dbe76ac666f6f64c9db623f32b058a Mon Sep 17 00:00:00 2001 From: dimapihtar Date: Wed, 4 Mar 2026 10:59:11 -0800 Subject: [PATCH 7/8] update docs Signed-off-by: dimapihtar --- docs/user-guide/data-preparation.md | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/docs/user-guide/data-preparation.md b/docs/user-guide/data-preparation.md index 27e9b18418c..45000390ff0 100644 --- a/docs/user-guide/data-preparation.md +++ b/docs/user-guide/data-preparation.md @@ -73,6 +73,21 @@ python tools/preprocess_data.py \ | `--performance-dir` | Directory where to save performance results | | `--max-documents` | Number of documents to be preprocessed during each run | +**Output example** + +```bash +----------------------------------- +Performance results (fastest → slowest): +1. 16 workers → avg. docs/s: 9606.6476 +2. 32 workers → avg. docs/s: 9275.3284 +3. 8 workers → avg. docs/s: 9151.9280 +4. 4 workers → avg. docs/s: 6391.3819 + +----------------------------------- +The most optimal num of workers is 16 with avg. preprocessed docs/s: 9606.6476. 
+----------------------------------- +``` + ## Output Files The preprocessing tool generates two files: From f1c263148846f2c0ce3c262076520bcb77665b4e Mon Sep 17 00:00:00 2001 From: dimapihtar Date: Wed, 4 Mar 2026 11:54:21 -0800 Subject: [PATCH 8/8] add unit test Signed-off-by: dimapihtar --- tests/unit_tests/data/test_preprocess_data.py | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/tests/unit_tests/data/test_preprocess_data.py b/tests/unit_tests/data/test_preprocess_data.py index e6922ec3748..618d873a079 100644 --- a/tests/unit_tests/data/test_preprocess_data.py +++ b/tests/unit_tests/data/test_preprocess_data.py @@ -2,6 +2,7 @@ import json import os +import runpy import sys import tempfile @@ -201,6 +202,42 @@ def test_preprocess_data_gpt(): do_test_preprocess_data(temp_dir, extra_args=gpt_args) +def test_preprocess_data_gpt_optimal_workers(): + with tempfile.TemporaryDirectory() as temp_dir: + + # gpt specific args + gpt_args = [ + "--input", + "/opt/data/datasets/dclm/dclm.jsonl", + "--output-prefix", + f"{temp_dir}/optimal_workers", + "--tokenizer-type", + "GPT2BPETokenizer", + "--vocab-file", + "/opt/data/tokenizers/megatron/gpt2-vocab.json", + "--merge-file", + "/opt/data/tokenizers/megatron/gpt2-merges.txt", + "--append-eod", + "--workers", + "2", + "--log-interval", + "1", + "--find-optimal-num-workers", + "--workers-to-check", + "2", + "4", + "8", + "--performance-dir", + f"{temp_dir}/perf", + "--max-documents", + "1002", + ] + sys.argv = ["/opt/megatron-lm/tools/preprocess_data.py"] + gpt_args + runpy.run_path("/opt/megatron-lm/tools/preprocess_data.py", run_name="__main__") + + assert os.path.exists(f"{temp_dir}/perf") + + def bert_vocab(odir): if os.path.exists(__LOCAL_BERT_VOCAB): return __LOCAL_BERT_VOCAB @@ -237,3 +274,4 @@ def test_preprocess_data_bert(): if __name__ == "__main__": test_preprocess_data_gpt() test_preprocess_data_bert() + test_preprocess_data_gpt_optimal_workers()