diff --git a/cms-2016-simulated-datasets/code/eos_store.py b/cms-2016-simulated-datasets/code/eos_store.py index 0708d7aca..2b853ea30 100644 --- a/cms-2016-simulated-datasets/code/eos_store.py +++ b/cms-2016-simulated-datasets/code/eos_store.py @@ -15,22 +15,22 @@ import json import os -import sys import re import subprocess -from utils import get_dataset_name, \ - get_dataset_runperiod, \ - get_dataset_version, \ - get_dataset_format, \ - get_dataset_year +import sys + +from xrootdpyfs import XRootDPyFS -XROOTD_URI_BASE = 'root://eospublic.cern.ch/' +from utils import (get_dataset_format, get_dataset_name, get_dataset_runperiod, + get_dataset_version, get_dataset_year) -XROOTD_DIR_BASE = '/eos/opendata/' +XROOTD_URI_BASE = "root://eospublic.cern.ch/" -MCDIR_BASE = 'mc' +XROOTD_DIR_BASE = "/eos/opendata/" -EXPERIMENT = 'cms' +MCDIR_BASE = "mc" + +EXPERIMENT = "cms" DEBUG = True @@ -41,9 +41,13 @@ def check_datasets_in_eos_dir(datasets, eos_dir): dataset_full_names = [] for dataset in datasets: dataset_index_file_base = get_dataset_index_file_base(dataset) - if subprocess.call('ls ' + eos_dir + ' | grep -q ' + dataset_index_file_base, shell=True): - print('[ERROR] Missing EOS information, ignoring dataset ' + dataset, - file=sys.stderr) + if subprocess.call( + "ls " + eos_dir + " | grep -q " + dataset_index_file_base, shell=True + ): + print( + "[ERROR] Missing EOS information, ignoring dataset " + dataset, + file=sys.stderr, + ) else: dataset_full_names.append(dataset) @@ -52,106 +56,162 @@ def check_datasets_in_eos_dir(datasets, eos_dir): def get_dataset_index_file_base(dataset): "Return index file base for given dataset." 
- filebase = EXPERIMENT.upper() + '_' + \ - MCDIR_BASE + '_' + \ - get_dataset_runperiod(dataset) + '_' + \ - get_dataset_name(dataset) + '_' + \ - get_dataset_format(dataset) + '_' + \ - get_dataset_version(dataset) + filebase = ( + EXPERIMENT.upper() + + "_" + + MCDIR_BASE + + "_" + + get_dataset_runperiod(dataset) + + "_" + + get_dataset_name(dataset) + + "_" + + get_dataset_format(dataset) + + "_" + + get_dataset_version(dataset) + ) return filebase + def get_dataset_location(dataset): "Return EOS location of the dataset." - return XROOTD_DIR_BASE + \ - EXPERIMENT + '/' + \ - MCDIR_BASE + '/' + \ - get_dataset_runperiod(dataset) + '/' + \ - get_dataset_name(dataset) + '/' + \ - get_dataset_format(dataset) + '/' + \ - get_dataset_version(dataset) - - -def get_dataset_volumes(dataset): + return ( + XROOTD_DIR_BASE + + EXPERIMENT + + "/" + + MCDIR_BASE + + "/" + + get_dataset_runperiod(dataset) + + "/" + + get_dataset_name(dataset) + + "/" + + get_dataset_format(dataset) + + "/" + + get_dataset_version(dataset) + ) + + +def get_dataset_volumes(fs, dataset): "Return list of volumes for the given dataset." - volumes = [] dataset_location = get_dataset_location(dataset) - try: - output = subprocess.check_output('eos ls -1 ' + dataset_location, shell=True) - except subprocess.CalledProcessError: - return [] - output = str(output.decode("utf-8")) - for line in output.split('\n'): - if line and line != 'file-indexes': - volumes.append(line) + + volumes = [] + for entry in fs.listdir(dataset_location, dirs_only=True): + if entry != "file-indexes": + volumes.append(entry) return volumes -def get_dataset_volume_files(dataset, volume): +def get_dataset_volume_files(fs, dataset, volume): "Return file list with information about name, size, location for the given dataset and volume." 
- files = [] dataset_location = get_dataset_location(dataset) - output = subprocess.check_output('eos oldfind --size --checksum ' + dataset_location + '/' + volume, shell=True) - output = str(output.decode("utf-8")) - for line in output.split('\n'): - if line and line != 'file-indexes': - match = re.match(r'^path=(.*) size=(.*) checksum=(.*)$', line) - if match: - path, size, checksum = match.groups() - files.append({'filename': os.path.basename(path), - 'size': int(size), - 'checksum': 'adler32:' + checksum, - 'uri': XROOTD_URI_BASE + path}) - return files - - -def create_index_file(dataset, volume, files, eos_dir, style='txt'): + + all_dirs = [f"{dataset_location}/{volume}"] + all_files = [] + + for my_dir in all_dirs: + all_dirs += fs.listdir(my_dir, dirs_only=True, absolute=True) + all_files += fs.listdir(my_dir, files_only=True, absolute=True) + + all_entries = [] + for entry in all_files: + status, stat = fs._client.stat(entry) + checksum = ":".join(fs.xrd_checksum(entry)) + all_entries.append( + { + "filename": os.path.basename(entry), + "size": stat.size, + "checksum": checksum, + "uri": XROOTD_URI_BASE + entry, + } + ) + + return all_entries + + +def create_index_file(filebase, files, eos_dir, style, volume_dir): "Create index file in the given style format (text, json)." - filebase = get_dataset_index_file_base(dataset) + '_' + \ - volume + '_' + \ - 'file_index' - filename = filebase + '.' + style - fdesc = open(eos_dir + '/' + filename, 'w') - if style == 'txt': - for afile in files: - fdesc.write(afile['uri']) - fdesc.write('\n') - elif style == 'json': - fdesc.write(json.dumps(files, indent=2, sort_keys=True)) - fdesc.write('\n') - fdesc.close() + + filename = filebase + "." 
+ style + try: + fdesc = open(f"{eos_dir}/{str(volume_dir)}/{filename}", "w") + if style == "txt": + for afile in files: + fdesc.write(afile["uri"]) + fdesc.write("\n") + elif style == "json": + fdesc.write(json.dumps(files, indent=2, sort_keys=True)) + fdesc.write("\n") + fdesc.close() + except Exception as exc: + print("Error doing the file '", filename, "': ", exc) + return None return filename -def copy_index_file(dataset, volume, filename, eos_dir): +def copy_index_file(dataset, volume, filename, eos_dir, volume_dir): "Copy index file filename to its final destination on EOS." dataset_location = get_dataset_location(dataset) - cmd = 'eos cp ' + eos_dir + '/' + filename + ' ' + dataset_location + '/file-indexes/' + filename + cmd = ( + "eos cp " + + eos_dir + + "/" + + str(volume_dir) + + "/" + + filename + + " " + + dataset_location + + "/file-indexes/" + + filename + ) if DEBUG: print(cmd) else: os.system(cmd) -def create_index_files(dataset, volume, eos_dir): +def create_index_files(fs, dataset, volume, eos_dir, volume_dir): "Create index files for the given dataset and volumes." - files = get_dataset_volume_files(dataset, volume) - filename = create_index_file(dataset, volume, files, eos_dir, 'txt') - copy_index_file(dataset, volume, filename, eos_dir) - filename = create_index_file(dataset, volume, files, eos_dir, 'json') - copy_index_file(dataset, volume, filename, eos_dir) + files = get_dataset_volume_files(fs, dataset, volume) + filebase = get_dataset_index_file_base(dataset) + "_" + volume + "_" + "file_index" + + for output_type in ["txt", "json"]: + filename = create_index_file(filebase, files, eos_dir, output_type, volume_dir) + if filename: + copy_index_file(dataset, volume, filename, eos_dir, volume_dir) -def main(datasets = [], eos_dir = './inputs/eos-file-indexes/'): +def main(datasets=[], eos_dir="./inputs/eos-file-indexes/"): "Do the job." 
- if not os.path.exists(eos_dir): - os.makedirs(eos_dir) + volume_dir = 0 + volume_counter = 0 + if not os.path.isdir(f"{eos_dir}/{str(volume_dir)}"): + os.makedirs(f"{eos_dir}/{str(volume_dir)}") + try: + fs = XRootDPyFS(f"{XROOTD_URI_BASE}/") + except Exception as my_exc: + print("We can't get the xrootdpyfs instance:", my_exc) + return -1 + + dataset_counter = 1 for dataset in datasets: - volumes = get_dataset_volumes(dataset) + print(f"Doing {dataset} ({dataset_counter}/{len(datasets)})") + dataset_counter += 1 + volumes = get_dataset_volumes(fs, dataset) + if not volumes: + print(f"Error with the dataset '{dataset}'!") + return -1 for volume in volumes: - create_index_files(dataset, volume, eos_dir) + create_index_files(fs, dataset, volume, eos_dir, volume_dir) + volume_counter += 1 + if volume_counter > 999: + volume_counter = 0 + volume_dir += 1 + + if not os.path.isdir(f"{eos_dir}/{str(volume_dir)}"): + os.makedirs(f"{eos_dir}/{str(volume_dir)}") -if __name__ == '__main__': +if __name__ == "__main__": main()