From 264bb2c6fe9ed2c301ea5a27f88b2fdde04380ee Mon Sep 17 00:00:00 2001
From: Pablo Saiz
Date: Wed, 11 Sep 2024 19:46:21 +0200
Subject: [PATCH 1/3] eos: limit the number of index_files to 2k per directory

---
 cms-2016-simulated-datasets/code/eos_store.py | 60 ++++++++++++-------
 1 file changed, 37 insertions(+), 23 deletions(-)

diff --git a/cms-2016-simulated-datasets/code/eos_store.py b/cms-2016-simulated-datasets/code/eos_store.py
index 0708d7aca..5ec1f235f 100644
--- a/cms-2016-simulated-datasets/code/eos_store.py
+++ b/cms-2016-simulated-datasets/code/eos_store.py
@@ -104,54 +104,68 @@ def get_dataset_volume_files(dataset, volume):
     return files
 
 
-def create_index_file(dataset, volume, files, eos_dir, style='txt'):
+def create_index_file(filebase, files, eos_dir, style, volume_dir):
     "Create index file in the given style format (text, json)."
-    filebase = get_dataset_index_file_base(dataset) + '_' + \
-               volume + '_' + \
-               'file_index'
+
     filename = filebase + '.' + style
-    fdesc = open(eos_dir + '/' + filename, 'w')
-    if style == 'txt':
-        for afile in files:
-            fdesc.write(afile['uri'])
+    try:
+        fdesc = open(f"{eos_dir}/{str(volume_dir)}/{filename}", 'w')
+        if style == 'txt':
+            for afile in files:
+                fdesc.write(afile['uri'])
+                fdesc.write('\n')
+        elif style == 'json':
+            fdesc.write(json.dumps(files, indent=2, sort_keys=True))
             fdesc.write('\n')
-    elif style == 'json':
-        fdesc.write(json.dumps(files, indent=2, sort_keys=True))
-        fdesc.write('\n')
-    fdesc.close()
+        fdesc.close()
+    except Exception as exc:
+        print("Error doing the file '", filename, "': ", exc)
+        return None
     return filename
 
 
-def copy_index_file(dataset, volume, filename, eos_dir):
+
+def copy_index_file(dataset, volume, filename, eos_dir, volume_dir):
     "Copy index file filename to its final destination on EOS."
     dataset_location = get_dataset_location(dataset)
-    cmd = 'eos cp ' + eos_dir + '/' + filename + ' ' + dataset_location + '/file-indexes/' + filename
+    cmd = 'eos cp ' + eos_dir + '/' + str(volume_dir) + '/' + filename + ' ' + dataset_location + '/file-indexes/' + filename
     if DEBUG:
         print(cmd)
     else:
         os.system(cmd)
 
 
-def create_index_files(dataset, volume, eos_dir):
+def create_index_files(dataset, volume, eos_dir, volume_dir):
     "Create index files for the given dataset and volumes."
     files = get_dataset_volume_files(dataset, volume)
-    filename = create_index_file(dataset, volume, files, eos_dir, 'txt')
-    copy_index_file(dataset, volume, filename, eos_dir)
-    filename = create_index_file(dataset, volume, files, eos_dir, 'json')
-    copy_index_file(dataset, volume, filename, eos_dir)
+    filebase = get_dataset_index_file_base(dataset) + '_' + \
+               volume + '_' + 'file_index'
+
+    for output_type in ['txt', 'json']:
+        filename = create_index_file(filebase, files, eos_dir, output_type, volume_dir)
+        if filename:
+            copy_index_file(dataset, volume, filename, eos_dir, volume_dir)
 
 
 def main(datasets = [], eos_dir = './inputs/eos-file-indexes/'):
     "Do the job."
 
-    if not os.path.exists(eos_dir):
-        os.makedirs(eos_dir)
+    volume_dir = 0
+    volume_counter=0
+    if not os.path.isdir(f"{eos_dir}/{str(volume_dir)}"):
+        os.makedirs(f"{eos_dir}/{str(volume_dir)}")
 
     for dataset in datasets:
         volumes = get_dataset_volumes(dataset)
         for volume in volumes:
-            create_index_files(dataset, volume, eos_dir)
-
+            create_index_files(dataset, volume, eos_dir, volume_dir)
+            volume_counter+=1
+            if volume_counter >999:
+                volume_counter =0
+                volume_dir +=1
+
+                if not os.path.isdir(f"{eos_dir}/{str(volume_dir)}"):
+                    os.makedirs(f"{eos_dir}/{str(volume_dir)}")
 
 if __name__ == '__main__':
     main()
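Patch 1 exists because every volume yields two index files (one .txt and one .json), so rotating main() to a new numbered staging directory after 1000 volumes keeps each directory at roughly 2000 files. A minimal standalone sketch of that bucketing follows; the helper name and the per_dir default are illustrative and not part of the patch:

    import os

    def staging_dir(volume_counter, eos_dir="./inputs/eos-file-indexes/", per_dir=1000):
        "Map the running volume counter to a numbered staging subdirectory."
        volume_dir = volume_counter // per_dir        # 0, 1, 2, ... advances every per_dir volumes
        target = os.path.join(eos_dir, str(volume_dir))
        os.makedirs(target, exist_ok=True)            # same effect as the patch's isdir()/makedirs() pair
        return target

The patch itself keeps an explicit volume_counter that wraps above 999 inside the dataset loop, which amounts to the same integer division.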
From 2472779296dbe6bfa16310bcbfcaee37f2818fd9 Mon Sep 17 00:00:00 2001
From: Pablo Saiz
Date: Wed, 11 Sep 2024 19:47:11 +0200
Subject: [PATCH 2/3] eos: black and isort the file

---
 cms-2016-simulated-datasets/code/eos_store.py | 145 +++++++++++-------
 1 file changed, 89 insertions(+), 56 deletions(-)

diff --git a/cms-2016-simulated-datasets/code/eos_store.py b/cms-2016-simulated-datasets/code/eos_store.py
index 5ec1f235f..677daffa9 100644
--- a/cms-2016-simulated-datasets/code/eos_store.py
+++ b/cms-2016-simulated-datasets/code/eos_store.py
@@ -15,22 +15,20 @@
 
 import json
 import os
-import sys
 import re
 import subprocess
-from utils import get_dataset_name, \
-                  get_dataset_runperiod, \
-                  get_dataset_version, \
-                  get_dataset_format, \
-                  get_dataset_year
+import sys
+
+from utils import (get_dataset_format, get_dataset_name, get_dataset_runperiod,
+                   get_dataset_version, get_dataset_year)
 
-XROOTD_URI_BASE = 'root://eospublic.cern.ch/'
+XROOTD_URI_BASE = "root://eospublic.cern.ch/"
 
-XROOTD_DIR_BASE = '/eos/opendata/'
+XROOTD_DIR_BASE = "/eos/opendata/"
 
-MCDIR_BASE = 'mc'
+MCDIR_BASE = "mc"
 
-EXPERIMENT = 'cms'
+EXPERIMENT = "cms"
 
 DEBUG = True
 
@@ -41,9 +39,13 @@ def check_datasets_in_eos_dir(datasets, eos_dir):
     dataset_full_names = []
     for dataset in datasets:
         dataset_index_file_base = get_dataset_index_file_base(dataset)
-        if subprocess.call('ls ' + eos_dir + ' | grep -q ' + dataset_index_file_base, shell=True):
-            print('[ERROR] Missing EOS information, ignoring dataset ' + dataset,
-                  file=sys.stderr)
+        if subprocess.call(
+            "ls " + eos_dir + " | grep -q " + dataset_index_file_base, shell=True
+        ):
+            print(
+                "[ERROR] Missing EOS information, ignoring dataset " + dataset,
+                file=sys.stderr,
+            )
         else:
             dataset_full_names.append(dataset)
 
@@ -52,23 +54,38 @@ def get_dataset_index_file_base(dataset):
     "Return index file base for given dataset."
-    filebase = EXPERIMENT.upper() + '_' + \
-               MCDIR_BASE + '_' + \
-               get_dataset_runperiod(dataset) + '_' + \
-               get_dataset_name(dataset) + '_' + \
-               get_dataset_format(dataset) + '_' + \
-               get_dataset_version(dataset)
+    filebase = (
+        EXPERIMENT.upper()
+        + "_"
+        + MCDIR_BASE
+        + "_"
+        + get_dataset_runperiod(dataset)
+        + "_"
+        + get_dataset_name(dataset)
+        + "_"
+        + get_dataset_format(dataset)
+        + "_"
+        + get_dataset_version(dataset)
+    )
     return filebase
 
+
 def get_dataset_location(dataset):
     "Return EOS location of the dataset."
-    return XROOTD_DIR_BASE + \
-           EXPERIMENT + '/' + \
-           MCDIR_BASE + '/' + \
-           get_dataset_runperiod(dataset) + '/' + \
-           get_dataset_name(dataset) + '/' + \
-           get_dataset_format(dataset) + '/' + \
-           get_dataset_version(dataset)
+    return (
+        XROOTD_DIR_BASE
+        + EXPERIMENT
+        + "/"
+        + MCDIR_BASE
+        + "/"
+        + get_dataset_runperiod(dataset)
+        + "/"
+        + get_dataset_name(dataset)
+        + "/"
+        + get_dataset_format(dataset)
+        + "/"
+        + get_dataset_version(dataset)
+    )
 
 
@@ -76,12 +93,12 @@ def get_dataset_volumes(dataset):
     "Return list of volumes for the given dataset."
     volumes = []
     dataset_location = get_dataset_location(dataset)
     try:
-        output = subprocess.check_output('eos ls -1 ' + dataset_location, shell=True)
+        output = subprocess.check_output("eos ls -1 " + dataset_location, shell=True)
     except subprocess.CalledProcessError:
         return []
     output = str(output.decode("utf-8"))
-    for line in output.split('\n'):
-        if line and line != 'file-indexes':
+    for line in output.split("\n"):
+        if line and line != "file-indexes":
             volumes.append(line)
     return volumes
 
@@ -90,33 +107,39 @@ def get_dataset_volume_files(dataset, volume):
     "Return file list with information about name, size, location for the given dataset and volume."
     files = []
     dataset_location = get_dataset_location(dataset)
-    output = subprocess.check_output('eos oldfind --size --checksum ' + dataset_location + '/' + volume, shell=True)
+    output = subprocess.check_output(
+        "eos oldfind --size --checksum " + dataset_location + "/" + volume, shell=True
+    )
     output = str(output.decode("utf-8"))
-    for line in output.split('\n'):
-        if line and line != 'file-indexes':
-            match = re.match(r'^path=(.*) size=(.*) checksum=(.*)$', line)
+    for line in output.split("\n"):
+        if line and line != "file-indexes":
+            match = re.match(r"^path=(.*) size=(.*) checksum=(.*)$", line)
             if match:
                 path, size, checksum = match.groups()
-                files.append({'filename': os.path.basename(path),
-                              'size': int(size),
-                              'checksum': 'adler32:' + checksum,
-                              'uri': XROOTD_URI_BASE + path})
+                files.append(
+                    {
+                        "filename": os.path.basename(path),
+                        "size": int(size),
+                        "checksum": "adler32:" + checksum,
+                        "uri": XROOTD_URI_BASE + path,
+                    }
+                )
     return files
 
 
 def create_index_file(filebase, files, eos_dir, style, volume_dir):
     "Create index file in the given style format (text, json)."
 
-    filename = filebase + '.' + style
+    filename = filebase + "." + style
     try:
-        fdesc = open(f"{eos_dir}/{str(volume_dir)}/{filename}", 'w')
-        if style == 'txt':
+        fdesc = open(f"{eos_dir}/{str(volume_dir)}/{filename}", "w")
+        if style == "txt":
             for afile in files:
-                fdesc.write(afile['uri'])
-                fdesc.write('\n')
-        elif style == 'json':
+                fdesc.write(afile["uri"])
+                fdesc.write("\n")
+        elif style == "json":
             fdesc.write(json.dumps(files, indent=2, sort_keys=True))
-            fdesc.write('\n')
+            fdesc.write("\n")
         fdesc.close()
     except Exception as exc:
         print("Error doing the file '", filename, "': ", exc)
@@ -124,11 +147,21 @@ def create_index_file(filebase, files, eos_dir, style, volume_dir):
     return filename
 
-
 def copy_index_file(dataset, volume, filename, eos_dir, volume_dir):
     "Copy index file filename to its final destination on EOS."
     dataset_location = get_dataset_location(dataset)
-    cmd = 'eos cp ' + eos_dir + '/' + str(volume_dir) + '/' + filename + ' ' + dataset_location + '/file-indexes/' + filename
+    cmd = (
+        "eos cp "
+        + eos_dir
+        + "/"
+        + str(volume_dir)
+        + "/"
+        + filename
+        + " "
+        + dataset_location
+        + "/file-indexes/"
+        + filename
+    )
     if DEBUG:
         print(cmd)
     else:
         os.system(cmd)
 
@@ -138,20 +171,19 @@ def create_index_files(dataset, volume, eos_dir, volume_dir):
     "Create index files for the given dataset and volumes."
     files = get_dataset_volume_files(dataset, volume)
-    filebase = get_dataset_index_file_base(dataset) + '_' + \
-               volume + '_' + 'file_index'
+    filebase = get_dataset_index_file_base(dataset) + "_" + volume + "_" + "file_index"
 
-    for output_type in ['txt', 'json']:
+    for output_type in ["txt", "json"]:
         filename = create_index_file(filebase, files, eos_dir, output_type, volume_dir)
         if filename:
             copy_index_file(dataset, volume, filename, eos_dir, volume_dir)
 
 
-def main(datasets = [], eos_dir = './inputs/eos-file-indexes/'):
+def main(datasets=[], eos_dir="./inputs/eos-file-indexes/"):
     "Do the job."
 
     volume_dir = 0
-    volume_counter=0
+    volume_counter = 0
     if not os.path.isdir(f"{eos_dir}/{str(volume_dir)}"):
         os.makedirs(f"{eos_dir}/{str(volume_dir)}")
 
@@ -159,13 +191,14 @@ def main(datasets=[], eos_dir="./inputs/eos-file-indexes/"):
     for dataset in datasets:
         volumes = get_dataset_volumes(dataset)
         for volume in volumes:
             create_index_files(dataset, volume, eos_dir, volume_dir)
-            volume_counter+=1
-            if volume_counter >999:
-                volume_counter =0
-                volume_dir +=1
+            volume_counter += 1
+            if volume_counter > 999:
+                volume_counter = 0
+                volume_dir += 1
 
                 if not os.path.isdir(f"{eos_dir}/{str(volume_dir)}"):
                     os.makedirs(f"{eos_dir}/{str(volume_dir)}")
 
-if __name__ == '__main__':
+
+if __name__ == "__main__":
     main()
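Patch 2 is purely cosmetic (black plus isort); the long string concatenations in get_dataset_index_file_base() and get_dataset_location() are the main beneficiaries of the reformatting. For orientation, this is roughly what those helpers together with create_index_files() and copy_index_file() end up composing; the run period, dataset name, format, version and volume below are placeholders, not a real CMS dataset:

    runperiod, name, fmt, version = "RunIISummer20UL16", "SomeSample", "NANOAODSIM", "v1"
    volume, volume_dir, eos_dir = "0000", 0, "./inputs/eos-file-indexes"

    # EXPERIMENT.upper() + "_" + MCDIR_BASE + "_" + ... + "_" + volume + "_file_index"
    filebase = f"CMS_mc_{runperiod}_{name}_{fmt}_{version}_{volume}_file_index"

    staging = f"{eos_dir}/{volume_dir}/{filebase}.txt"                    # written locally first
    destination = (f"/eos/opendata/cms/mc/{runperiod}/{name}/{fmt}/{version}"
                   f"/file-indexes/{filebase}.txt")                       # then copied with 'eos cp'
    print(staging)
    print(destination)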
From d6e967ea311811df895cfc356bbac865d28ba653 Mon Sep 17 00:00:00 2001
From: Pablo Saiz
Date: Wed, 11 Sep 2024 20:00:31 +0200
Subject: [PATCH 3/3] eos: replace system calls by xrootdpyfs

---
 cms-2016-simulated-datasets/code/eos_store.py | 81 +++++++++++--------
 1 file changed, 47 insertions(+), 34 deletions(-)

diff --git a/cms-2016-simulated-datasets/code/eos_store.py b/cms-2016-simulated-datasets/code/eos_store.py
index 677daffa9..2b853ea30 100644
--- a/cms-2016-simulated-datasets/code/eos_store.py
+++ b/cms-2016-simulated-datasets/code/eos_store.py
@@ -19,6 +19,8 @@
 import subprocess
 import sys
 
+from xrootdpyfs import XRootDPyFS
+
 from utils import (get_dataset_format, get_dataset_name, get_dataset_runperiod,
                    get_dataset_version, get_dataset_year)
 
@@ -88,43 +90,42 @@ def get_dataset_location(dataset):
     )
 
 
-def get_dataset_volumes(dataset):
+def get_dataset_volumes(fs, dataset):
     "Return list of volumes for the given dataset."
-    volumes = []
     dataset_location = get_dataset_location(dataset)
-    try:
-        output = subprocess.check_output("eos ls -1 " + dataset_location, shell=True)
-    except subprocess.CalledProcessError:
-        return []
-    output = str(output.decode("utf-8"))
-    for line in output.split("\n"):
-        if line and line != "file-indexes":
-            volumes.append(line)
+
+    volumes = []
+    for entry in fs.listdir(dataset_location, dirs_only=True):
+        if entry != "file-indexes":
+            volumes.append(entry)
     return volumes
 
 
-def get_dataset_volume_files(dataset, volume):
+def get_dataset_volume_files(fs, dataset, volume):
     "Return file list with information about name, size, location for the given dataset and volume."
-    files = []
     dataset_location = get_dataset_location(dataset)
-    output = subprocess.check_output(
-        "eos oldfind --size --checksum " + dataset_location + "/" + volume, shell=True
-    )
-    output = str(output.decode("utf-8"))
-    for line in output.split("\n"):
-        if line and line != "file-indexes":
-            match = re.match(r"^path=(.*) size=(.*) checksum=(.*)$", line)
-            if match:
-                path, size, checksum = match.groups()
-                files.append(
-                    {
-                        "filename": os.path.basename(path),
-                        "size": int(size),
-                        "checksum": "adler32:" + checksum,
-                        "uri": XROOTD_URI_BASE + path,
-                    }
-                )
-    return files
+
+    all_dirs = [f"{dataset_location}/{volume}"]
+    all_files = []
+
+    for my_dir in all_dirs:
+        all_dirs += fs.listdir(my_dir, dirs_only=True, absolute=True)
+        all_files += fs.listdir(my_dir, files_only=True, absolute=True)
+
+    all_entries = []
+    for entry in all_files:
+        status, stat = fs._client.stat(entry)
+        checksum = ":".join(fs.xrd_checksum(entry))
+        all_entries.append(
+            {
+                "filename": os.path.basename(entry),
+                "size": stat.size,
+                "checksum": checksum,
+                "uri": XROOTD_URI_BASE + entry,
+            }
+        )
+
+    return all_entries
 
 
 def create_index_file(filebase, files, eos_dir, style, volume_dir):
@@ -168,9 +169,9 @@ def copy_index_file(dataset, volume, filename, eos_dir, volume_dir):
     os.system(cmd)
 
 
-def create_index_files(dataset, volume, eos_dir, volume_dir):
+def create_index_files(fs, dataset, volume, eos_dir, volume_dir):
     "Create index files for the given dataset and volumes."
-    files = get_dataset_volume_files(dataset, volume)
+    files = get_dataset_volume_files(fs, dataset, volume)
     filebase = get_dataset_index_file_base(dataset) + "_" + volume + "_" + "file_index"
 
     for output_type in ["txt", "json"]:
@@ -187,10 +188,22 @@ def main(datasets=[], eos_dir="./inputs/eos-file-indexes/"):
     if not os.path.isdir(f"{eos_dir}/{str(volume_dir)}"):
         os.makedirs(f"{eos_dir}/{str(volume_dir)}")
 
+    try:
+        fs = XRootDPyFS(f"{XROOTD_URI_BASE}/")
+    except Exception as my_exc:
+        print("We can't get the xrootdpyfs instance:", my_exc)
+        return -1
+
+    dataset_counter = 1
     for dataset in datasets:
-        volumes = get_dataset_volumes(dataset)
+        print(f"Doing {dataset} ({dataset_counter}/{len(datasets)})")
+        dataset_counter += 1
+        volumes = get_dataset_volumes(fs, dataset)
+        if not volumes:
+            print(f"Error with the dataset '{dataset}'!")
+            return -1
         for volume in volumes:
-            create_index_files(dataset, volume, eos_dir, volume_dir)
+            create_index_files(fs, dataset, volume, eos_dir, volume_dir)
             volume_counter += 1
             if volume_counter > 999:
                 volume_counter = 0
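Patch 3 replaces the `eos ls` / `eos oldfind` shell-outs with xrootdpyfs. A condensed sketch of the access pattern the new get_dataset_volume_files() relies on, using only the calls that appear in the diff (XRootDPyFS, listdir with dirs_only/files_only/absolute, the underlying _client.stat, and xrd_checksum); the starting directory is a placeholder:

    from xrootdpyfs import XRootDPyFS

    XROOTD_URI_BASE = "root://eospublic.cern.ch/"
    fs = XRootDPyFS(f"{XROOTD_URI_BASE}/")

    all_dirs = ["/eos/opendata/cms/mc"]       # hypothetical starting point
    for my_dir in all_dirs:
        # Appending while iterating turns this into a breadth-first walk:
        # subdirectories discovered here are visited later in the same loop.
        all_dirs += fs.listdir(my_dir, dirs_only=True, absolute=True)
        for entry in fs.listdir(my_dir, files_only=True, absolute=True):
            status, stat = fs._client.stat(entry)          # low-level XRootD stat, provides the size
            checksum = ":".join(fs.xrd_checksum(entry))    # checksum type and value, e.g. "adler32:..."
            print(entry, stat.size, checksum)

One behavioural difference worth noting: the old code hard-coded the "adler32:" prefix, while the new code keeps whatever checksum type xrd_checksum() reports.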