diff --git a/noxfile.py b/noxfile.py index 6f1427f..cbbcd74 100644 --- a/noxfile.py +++ b/noxfile.py @@ -2,23 +2,27 @@ python_versions = ["3.11", "3.10"] + @nox.session(python=python_versions) def test(session): session.install(".") session.install("pytest") session.run("pytest") + @nox.session(python=python_versions) def lint(session): session.install("flake8") session.run("flake8", "src", "tests", "noxfile.py") + @nox.session(python=python_versions[0]) def format(session): session.install("black", "isort") session.run("black", "src", "tests") session.run("isort", "src", "tests") + @nox.session(python=python_versions[0]) def types(session): session.install(".") diff --git a/src/digarch_scripts/lint/lint_ft.py b/src/digarch_scripts/lint/lint_ft.py index 258adbc..21420e9 100644 --- a/src/digarch_scripts/lint/lint_ft.py +++ b/src/digarch_scripts/lint/lint_ft.py @@ -7,6 +7,7 @@ LOGGER = logging.getLogger(__name__) + def _configure_logging(log_folder: Path): log_fn = datetime.now().strftime("lint_%Y_%m_%d_%H_%M.log") log_fpath = log_folder / log_fn @@ -21,15 +22,14 @@ def _configure_logging(log_folder: Path): encoding="utf-8", ) + def parse_args() -> argparse.Namespace: """Validate and return command-line args""" def extant_dir(p): path = Path(p) if not path.is_dir(): - raise argparse.ArgumentTypeError( - f'{path} does not exist' - ) + raise argparse.ArgumentTypeError(f"{path} does not exist") return path def list_of_paths(p): @@ -43,28 +43,21 @@ def list_of_paths(p): parser = argparse.ArgumentParser() parser.add_argument( - '--package', - type=extant_dir, - nargs='+', - dest='packages', - action='extend' + "--package", type=extant_dir, nargs="+", dest="packages", action="extend" ) parser.add_argument( - '--directory', - type=list_of_paths, - dest='packages', - action='extend' + "--directory", type=list_of_paths, dest="packages", action="extend" ) parser.add_argument( - '--log_folder', - help='''Optional. Designate where to save the log file, - or it will be saved in current directory''', - default='.' + "--log_folder", + help="""Optional. 
Designate where to save the log file,
+            or it will be saved in current directory""",
+        default=".",
     )
-    return parser.parse_args()
+    return parser.parse_args()
+
 
 def package_has_valid_name(package: Path) -> bool:
     """Top level folder name has to conform to ACQ_####_######"""
     folder_name = package.name
@@ -76,15 +69,17 @@ def package_has_valid_name(package: Path) -> bool:
         LOGGER.error(f"{folder_name} does not conform to ACQ_####_######")
         return False
 
+
 def package_has_two_subfolders(package: Path) -> bool:
     """There must be two subfolders in the package"""
-    pkg_folders = [ x for x in package.iterdir() if x.is_dir() ]
+    pkg_folders = [x for x in package.iterdir() if x.is_dir()]
     if len(pkg_folders) == 2:
         return True
     else:
         LOGGER.error(f"{package} does not have exactly two subfolders")
         return False
 
+
 def package_has_valid_subfolder_names(package: Path) -> bool:
     """Second level folders must be objects and metadata folder"""
     expected = set(["objects", "metadata"])
@@ -98,6 +93,7 @@ def package_has_valid_subfolder_names(package: Path) -> bool:
         )
         return False
 
+
 def package_has_no_hidden_file(package: Path) -> bool:
     """The package should not have any hidden file"""
     hidden_ls = [
@@ -111,10 +107,11 @@ def package_has_no_hidden_file(package: Path) -> bool:
     else:
         return True
 
+
 def package_has_no_zero_bytes_file(package: Path) -> bool:
     """The package should not have any zero bytes file"""
-    all_file = [ f for f in package.rglob("*") if f.is_file() ]
-    zero_bytes_ls = [ f for f in all_file if f.stat().st_size == 0 ]
+    all_file = [f for f in package.rglob("*") if f.is_file()]
+    zero_bytes_ls = [f for f in all_file if f.stat().st_size == 0]
 
     if zero_bytes_ls:
         LOGGER.warning(f"{package.name} has zero bytes file {zero_bytes_ls}")
@@ -122,6 +119,7 @@ def package_has_no_zero_bytes_file(package: Path) -> bool:
     else:
         return True
 
+
 def metadata_folder_is_flat(package: Path) -> bool:
     """The metadata folder should not have folder structure"""
     metadata_path = package / "metadata"
@@ -132,40 +130,49 @@ def metadata_folder_is_flat(package: Path) -> bool:
     else:
         return True
 
+
 def metadata_folder_has_files(package: Path) -> bool:
     """The metadata folder should have one or more file"""
     metadata_path = package / "metadata"
-    md_files_ls = [ x for x in metadata_path.rglob("*") if x.is_file() ]
+    md_files_ls = [x for x in metadata_path.rglob("*") if x.is_file()]
     if md_files_ls:
         return True
     else:
         LOGGER.warning(f"{package.name} metadata folder does not have any files")
         return False
 
+
 def metadata_has_correct_naming_convention(package: Path) -> bool:
     """The metadata file name should be in the accepted list"""
     metadata_path = package / "metadata"
     accepted_fn = ["rclone.log"]
-    md_files_ls = [ x for x in metadata_path.rglob("*") if x.is_file() ]
+    md_files_ls = [x for x in metadata_path.rglob("*") if x.is_file()]
     nonconforming = []
     for file in md_files_ls:
-        if not file.name in accepted_fn:
+        if file.name not in accepted_fn:
            nonconforming.append(file)
 
     if nonconforming:
-        LOGGER.error(f"""{package.name} has nonconforming metadata file(s):
-                     {nonconforming}""")
+        LOGGER.error(
+            f"""{package.name} has nonconforming metadata file(s):
+            {nonconforming}"""
+        )
         return False
     else:
         return True
 
+
 def objects_folder_correct_structure(package: Path) -> bool:
     """objects folder should have a data folder, which includes four files:
     bag-info.txt, bagit.txt, manifest-md5.txt and tagmanifest-md5.txt"""
     expected_paths = []
-    expected_files = ["bag-info.txt", "bagit.txt",
-                      "manifest-md5.txt", "tagmanifest-md5.txt"]
+    expected_files = [
+        "bag-info.txt",
+        "bagit.txt",
+        
"manifest-md5.txt", + "tagmanifest-md5.txt", + ] missing = [] data_folder = package / "objects" / "data" @@ -180,16 +187,19 @@ def objects_folder_correct_structure(package: Path) -> bool: missing.append(fp.name) if missing: - LOGGER.error(f"""{package.name} has incorrect structure. - missing {missing}""") + LOGGER.error( + f"""{package.name} has incorrect structure. + missing {missing}""" + ) return False else: return True + def objects_folder_has_no_empty_folder(package: Path) -> bool: """The objects folder should not have any empty folders""" objects_path = package / "objects" - folder_in_obj = [ x for x in objects_path.rglob("*") if x.is_dir() ] + folder_in_obj = [x for x in objects_path.rglob("*") if x.is_dir()] empty = [] for folder in folder_in_obj: @@ -202,6 +212,7 @@ def objects_folder_has_no_empty_folder(package: Path) -> bool: else: return True + def lint_package(package: Path) -> Literal["valide", "invalide", "needs review"]: """Run all linting tests against a package""" result = "valid" @@ -209,7 +220,7 @@ def lint_package(package: Path) -> Literal["valide", "invalide", "needs review"] less_strict_tests = [ package_has_no_hidden_file, package_has_no_zero_bytes_file, - metadata_folder_has_files + metadata_folder_has_files, ] for test in less_strict_tests: @@ -223,7 +234,7 @@ def lint_package(package: Path) -> Literal["valide", "invalide", "needs review"] metadata_folder_is_flat, metadata_has_correct_naming_convention, objects_folder_correct_structure, - objects_folder_has_no_empty_folder + objects_folder_has_no_empty_folder, ] for test in strict_tests: @@ -232,6 +243,7 @@ def lint_package(package: Path) -> Literal["valide", "invalide", "needs review"] return result + def main(): args = parse_args() _configure_logging(args.log_folder) @@ -266,7 +278,9 @@ def main(): print( f""" The following {len(needs_review)} packages need review. 
- They may be passed without change after review: {needs_review}""") + They may be passed without change after review: {needs_review}""" + ) + if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/src/digarch_scripts/package/package_cloud.py b/src/digarch_scripts/package/package_cloud.py index 23e033a..0618fc5 100644 --- a/src/digarch_scripts/package/package_cloud.py +++ b/src/digarch_scripts/package/package_cloud.py @@ -1,9 +1,9 @@ import argparse -from datetime import date import logging import os -from pathlib import Path import re +from datetime import date +from pathlib import Path import bagit @@ -35,6 +35,7 @@ def digital_carrier_label(id: str) -> Path: return parser.parse_args() + def create_base_dir(dest: Path, id: str) -> Path: acq_id = id.rsplit("_", 1)[0] package_base = dest / acq_id / id @@ -49,6 +50,7 @@ def create_base_dir(dest: Path, id: str) -> Path: raise PermissionError(f"{dest} is not writable") return package_base + def move_metadata_file(md_path: Path, pkg_dir: Path) -> None: md_dir = pkg_dir / "metadata" if not md_dir.exists(): @@ -61,6 +63,7 @@ def move_metadata_file(md_path: Path, pkg_dir: Path) -> None: md_path.rename(new_md_path) return None + def create_bag_in_objects(payload_path: Path, md5_path: Path, pkg_dir: Path) -> None: bag_dir = pkg_dir / "objects" bag_dir.mkdir() @@ -70,10 +73,11 @@ def create_bag_in_objects(payload_path: Path, md5_path: Path, pkg_dir: Path) -> create_bag_tag_files(bag_dir) return None + def move_payload(payload_path: Path, bag_dir: Path) -> None: - #instantiate a var for objects dir + # instantiate a var for objects dir payload_dir = bag_dir / "data" - #if the object folder does not exist create it + # if the object folder does not exist create it if not payload_dir.exists(): payload_dir.mkdir(parents=True) else: @@ -81,15 +85,16 @@ def move_payload(payload_path: Path, bag_dir: Path) -> None: for a_file in payload_path.iterdir(): new_ob_path = payload_dir / a_file.name - #if a payload file is already in the object directory do not move, raise error + # if a payload file is already in the object directory do not move, raise error if new_ob_path.exists(): - raise FileExistsError(f"{new_ob_path} already exists. Not moving.") + raise FileExistsError(f"{new_ob_path} already exists. 
Not moving.") a_file.rename(new_ob_path) return None + def convert_to_bagit_manifest(md5_path: Path, bag_dir: Path) -> None: - #check for manifest + # check for manifest new_md5_path = bag_dir / "manifest-md5.txt" if new_md5_path.exists(): raise FileExistsError("manifest-md5.txt already exists, review package") @@ -97,17 +102,16 @@ def convert_to_bagit_manifest(md5_path: Path, bag_dir: Path) -> None: with open(md5_path, "r") as f: manifest_data = f.readlines() - updated_manifest = [ - line.replace(" ", " data/") for line in manifest_data - ] - #re-writes the manifest lines + updated_manifest = [line.replace(" ", " data/") for line in manifest_data] + # re-writes the manifest lines with open(md5_path, "w") as f: f.writelines(updated_manifest) - #move md5 file to manifest-md5.txt in bag + # move md5 file to manifest-md5.txt in bag md5_path.rename(new_md5_path) return None + def create_bag_tag_files(bag_dir: Path): txt = """BagIt-Version: 0.97\nTag-File-Character-Encoding: UTF-8\n""" with open(bag_dir / "bagit.txt", "w") as bagit_file: @@ -125,7 +129,7 @@ def get_oxum(payload_dir: Path) -> (int, int): total_bytes = 0 total_files = 0 - for payload_file in payload_dir.rglob('*'): + for payload_file in payload_dir.rglob("*"): if payload_file.is_file(): total_files += 1 total_bytes += os.stat(payload_file).st_size @@ -152,5 +156,6 @@ def main(): create_bag_in_objects(args.payload, args.md5, base_dir) validate_bag_in_payload(base_dir) + if __name__ == "__main__": main() diff --git a/src/digarch_scripts/report/report_ftk_extents.py b/src/digarch_scripts/report/report_ftk_extents.py index 3401650..04ba198 100644 --- a/src/digarch_scripts/report/report_ftk_extents.py +++ b/src/digarch_scripts/report/report_ftk_extents.py @@ -1,34 +1,33 @@ -from lxml import etree -import json -import re import argparse +import json +import logging import os import pathlib -import logging +import re + +from lxml import etree LOGGER = logging.getLogger(__name__) # Namespace for the FTK output XML -FO_NAMESPACE = {'fo': 'http://www.w3.org/1999/XSL/Format'} +FO_NAMESPACE = {"fo": "http://www.w3.org/1999/XSL/Format"} def _make_parser(): def validate_file_input(f) -> pathlib.Path: - ''' + """ Ensure the input file exists - ''' + """ path = pathlib.Path(f) if not path.exists(): - raise argparse.ArgumentTypeError( - f'Directory or file does not exist: {f}' - ) + raise argparse.ArgumentTypeError(f"Directory or file does not exist: {f}") - if not path.suffix.lower() in ['.xml', '.fo']: + if not path.suffix.lower() in [".xml", ".fo"]: raise argparse.ArgumentTypeError( - 'Not a valid file type. Expect .xml or .fo' + "Not a valid file type. 
Expect .xml or .fo" ) return path @@ -38,47 +37,42 @@ def validate_output_dir(f) -> pathlib.Path: path = pathlib.Path(f) if not path.exists(): - raise argparse.ArgumentTypeError( - f'Output directory does not exist: {f}' - ) + raise argparse.ArgumentTypeError(f"Output directory does not exist: {f}") return path - parser = argparse.ArgumentParser( - description='Create a JSON report from XML' - ) + parser = argparse.ArgumentParser(description="Create a JSON report from XML") parser.add_argument( - '-f', '--file', + "-f", + "--file", help="path to FTK XML report", type=validate_file_input, - required=True + required=True, ) parser.add_argument( - '-o', '--output', + "-o", + "--output", help="destination directory", type=validate_output_dir, - required=True + required=True, ) return parser.parse_args() -def create_er_list( - tree: etree.ElementTree -) -> list[list[str, str]]: - - ''' +def create_er_list(tree: etree.ElementTree) -> list[list[str, str]]: + """ This transforms the table of contents into a list of lists where each list item has the hierarchy of titles and a reference-id. This list is the intermediate data structure used to build the nested dict. The function returns the entire list. - ''' + """ tree = tree.xpath( '/fo:root/fo:page-sequence[@master-reference="TOC"]/fo:flow', - namespaces=FO_NAMESPACE + namespaces=FO_NAMESPACE, )[0] ers = [] @@ -89,27 +83,23 @@ def create_er_list( continue indent = int(child.get("start-indent").split(sep="pt")[0]) - level = (indent//12) - 2 + level = (indent // 12) - 2 if level >= 0: # build a list of parents based on level if level <= len(hierarchy) - 1: hierarchy = hierarchy[:level] elif level > len(hierarchy) + 1: - raise ValueError( - f'Unexpected jump in hierarchy at {child.text}' - ) + raise ValueError(f"Unexpected jump in hierarchy at {child.text}") hierarchy.append(child.text) # only record if entry is an ER possible_ref = child.xpath( - 'fo:basic-link/fo:page-number-citation', namespaces=FO_NAMESPACE + "fo:basic-link/fo:page-number-citation", namespaces=FO_NAMESPACE ) - if possible_ref and hierarchy[-1].startswith('ER'): - refid = possible_ref[0].get('ref-id') - ers.append( - ['/'.join(hierarchy.copy()), refid, hierarchy[-1]] - ) + if possible_ref and hierarchy[-1].startswith("ER"): + refid = possible_ref[0].get("ref-id") + ers.append(["/".join(hierarchy.copy()), refid, hierarchy[-1]]) audit_ers(ers) @@ -119,7 +109,7 @@ def create_er_list( def audit_ers(ers: list[list[str, str, str]]) -> None: er_numbers_used = {} for er in ers: - number = re.match(r'ER (\d+):', er[2]) + number = re.match(r"ER (\d+):", er[2]) er_number = int(number[1]) if er_number not in er_numbers_used.keys(): er_numbers_used[er_number] = [er[2]] @@ -132,79 +122,92 @@ def audit_ers(ers: list[list[str, str, str]]) -> None: for i in range(er_min, er_max): if i not in er_numbers_used.keys(): LOGGER.warning( - f'Collection uses ER {er_min} to ER {er_max}. ER {i} is skipped. Review the ERs with the processing archivist' + ( + f"Collection uses ER {er_min} to ER {er_max}. ER {i} is skipped. " + "Review the ERs with the processing archivist" + ) ) # test for duplicate ers for er_number, er_names in er_numbers_used.items(): if len(er_names) > 1: LOGGER.warning( - f'ER {er_number} is used multiple times: {", ".join(er_names)}. Review the ERs with the processing archivist' + ( + f'ER {er_number} is used multiple times: {", ".join(er_names)}. 
' + "Review the ERs with the processing archivist" + ) ) return None -def transform_bookmark_tables( - tree: etree.ElementTree -) -> list[dict]: - - ''' +def transform_bookmark_tables(tree: etree.ElementTree) -> list[dict]: + """ transforms each row in the 'bookmarksPage' table into a string. this string contains all the extent information that will be summarized later. the return is a list of lists where the first item is the id with the prefix bk and the second item is a string serialized from the XML. - ''' + """ extent_tree = tree.xpath( - '/fo:root/fo:page-sequence[@master-reference="bookmarksPage"]/fo:flow/fo:table[@id]', - namespaces=FO_NAMESPACE + ( + '/fo:root/fo:page-sequence[@master-reference="bookmarksPage"]/fo:flow/' + "fo:table[@id]" + ), + namespaces=FO_NAMESPACE, ) bookmark_contents = [] for row in extent_tree: # row is an /fo:row in /fo:table[@id] file_table = row.xpath( - './fo:table-body/fo:table-row/fo:table-cell/fo:block', - namespaces=FO_NAMESPACE + "./fo:table-body/fo:table-row/fo:table-cell/fo:block", + namespaces=FO_NAMESPACE, ) file_dict = { file_table[i].text: file_table[i + 1].text for i in range(0, len(file_table), 2) } - file_dict['file_id'] = row.get('id') - file_dict['bookmark_id'] = row.get('id').split('_')[0] + file_dict["file_id"] = row.get("id") + file_dict["bookmark_id"] = row.get("id").split("_")[0] bookmark_contents.append(file_dict) return bookmark_contents def add_extents_to_ers( - er_list: list[list[str, str]], - bookmark_tables: list[dict] + er_list: list[list[str, str]], bookmark_tables: list[dict] ) -> list[list[str, int, int]]: - - ''' + """ summarizes the extent for each ER by correlating the table of contents with the bookmark tables. Returns list of lists with hierarchal ER string, file size, and file count. - ''' + """ ers_with_extents = [] for er in er_list: bookmark_id = er[1] - er_name = er[0].split('/')[-1] + er_name = er[0].split("/")[-1] size, count = get_er_report(bookmark_tables, bookmark_id, er_name) if count == 0: LOGGER.warning( - f'{er_name} does not contain any files. It will be omitted from the report.') + ( + f"{er_name} does not contain any files. " + "It will be omitted from the report." + ) + ) continue if size == 0: LOGGER.warning( - f'{er_name} contains no files with bytes. This ER is omitted from report. Review this ER with the processing archivist.') + ( + f"{er_name} contains no files with bytes. " + "This ER is omitted from report. " + "Review this ER with the processing archivist." + ) + ) continue ers_with_extents.append([er[0], size, count]) @@ -213,34 +216,34 @@ def add_extents_to_ers( def get_er_report( - er_files: list[dict], - bookmark_id: str, - er_name: str + er_files: list[dict], bookmark_id: str, er_name: str ) -> tuple([int, int]): - - ''' + """ extract the total file size and file count for a given bookmark ID Returns a tuple with the file size and file count. 
- ''' + """ size = 0 count = 0 - prefix = bookmark_id.replace('k', 'f') + prefix = bookmark_id.replace("k", "f") for entry in er_files: - if entry['bookmark_id'] == prefix: + if entry["bookmark_id"] == prefix: - byte_string = entry['Logical Size'] - bytes = re.findall(r'(\d+)\sB', byte_string) + byte_string = entry["Logical Size"] + bytes = re.findall(r"(\d+)\sB", byte_string) if bytes: count += 1 file_size = int(bytes[0]) if file_size == 0: - file_name = entry['Name'] - #extract file name, might have to parse file table better + file_name = entry["Name"] + # extract file name, might have to parse file table better LOGGER.warning( - f'{er_name} contains the following 0-byte file: {file_name}. Review this file with the processing archivist.') + (f"{er_name} contains the following 0-byte file: {file_name}. " + "Review this file with the processing archivist." + ) + ) size += file_size else: @@ -249,93 +252,87 @@ def get_er_report( return size, count -def create_report( - input: list[str, int, int], - report: dict -) -> dict: - - ''' +def create_report(input: list[str, int, int], report: dict) -> dict: + """ recursive function to insert a given bookmark into a nested dictionary based on the hierarchy of component titles. Returns a nested dictionary - ''' - - if not '/' in input[0]: - number, name = input[0].split(':', maxsplit=1) - report['children'].append({ - 'title': input[0], - 'er_number': number, - 'er_name': name.strip(), - 'file_size': input[1], - 'file_count': input[2] - }) + """ + + if "/" not in input[0]: + number, name = input[0].split(":", maxsplit=1) + report["children"].append( + { + "title": input[0], + "er_number": number, + "er_name": name.strip(), + "file_size": input[1], + "file_count": input[2], + } + ) else: - parent, child = input[0].split('/', maxsplit=1) + parent, child = input[0].split("/", maxsplit=1) input[0] = child - for item in report['children']: - if item['title'] == parent: + for item in report["children"]: + if item["title"] == parent: item = create_report(input, item) return report - report['children'].append( - create_report(input, {'title': parent, 'children': []}) + report["children"].append( + create_report(input, {"title": parent, "children": []}) ) return report -def extract_collection_title( - tree: etree.ElementTree - ) -> str: + +def extract_collection_title(tree: etree.ElementTree) -> str: case_info = tree.xpath( - '/fo:root/fo:page-sequence[@master-reference="caseInfoPage"]/fo:flow/fo:table'\ - '/fo:table-body/fo:table-row/fo:table-cell/fo:block/text()', - namespaces=FO_NAMESPACE + '/fo:root/fo:page-sequence[@master-reference="caseInfoPage"]/fo:flow/fo:table' + "/fo:table-body/fo:table-row/fo:table-cell/fo:block/text()", + namespaces=FO_NAMESPACE, ) for i, txt in enumerate(case_info): if txt == "Case Name": - collname = case_info[i+1] + collname = case_info[i + 1] return collname -def make_json( - destination: pathlib.Path, - report: dict, - collname -) -> None: - ''' +def make_json(destination: pathlib.Path, report: dict, collname) -> None: + """ creates a json file with the name of the collection as the file name destination is the file path from args parse and report is the collection style dict - ''' + """ name = collname name = name.replace(" ", "_") - with open(os.path.join(destination, f'{name}.json'), 'w') as file: + with open(os.path.join(destination, f"{name}.json"), "w") as file: json.dump(report, file) def main() -> None: args = _make_parser() - print('Parsing XML ...') + print("Parsing XML ...") tree = etree.parse(args.file) - 
print('Creating report ...')
+    print("Creating report ...")
     ers = create_er_list(tree)
     bookmark_tables = transform_bookmark_tables(tree)
     ers_with_extents = add_extents_to_ers(ers, bookmark_tables)
 
     colltitle = extract_collection_title(tree)
-    dct = {'title': colltitle, 'children': []}
+    dct = {"title": colltitle, "children": []}
     for er in ers_with_extents:
         dct = create_report(er, dct)
 
     print("Writing report ...")
     make_json(args.output, dct, colltitle)
 
-if __name__ == '__main__':
+
+if __name__ == "__main__":
     main()
diff --git a/src/digarch_scripts/report/report_hdd_extents.py b/src/digarch_scripts/report/report_hdd_extents.py
index 7916089..88c3b6d 100644
--- a/src/digarch_scripts/report/report_hdd_extents.py
+++ b/src/digarch_scripts/report/report_hdd_extents.py
@@ -1,26 +1,22 @@
 import argparse
-import os
 import json
-import pathlib
 import logging
+import os
+import pathlib
 import re
 
+
 LOGGER = logging.getLogger(__name__)
 
+
 def parse_args():
     parser = argparse.ArgumentParser()
 
-    def validate_dir(
-        d: str
-    ) -> pathlib.Path:
+    def validate_dir(d: str) -> pathlib.Path:
         path = pathlib.Path(d)
         if not path.exists():
-            raise argparse.ArgumentTypeError(
-                f'Specified directory does not exist: {d}'
-            )
+            raise argparse.ArgumentTypeError(f"Specified directory does not exist: {d}")
         if not path.is_dir():
-            raise argparse.ArgumentTypeError(
-                f'Specified path is not a directory: {d}'
-            )
+            raise argparse.ArgumentTypeError(f"Specified path is not a directory: {d}")
 
         return path
 
@@ -29,35 +25,33 @@ def validate_output_dir(f) -> pathlib.Path:
         path = pathlib.Path(f)
 
         if not path.exists():
-            raise argparse.ArgumentTypeError(
-                f'Output directory does not exist: {f}'
-            )
+            raise argparse.ArgumentTypeError(f"Output directory does not exist: {f}")
 
         return path
 
     parser.add_argument(
-        "-d", "--dir",
+        "-d",
+        "--dir",
         type=validate_dir,
         help="Path to the parent directory, e.g. M###_FAComponents",
-        required = True
+        required=True,
     )
 
     parser.add_argument(
-        '-o', '--output',
+        "-o",
+        "--output",
         help="report destination directory",
         type=validate_output_dir,
-        required=True
+        required=True,
     )
 
     return parser.parse_args()
 
 
-def get_ers(
-    facomponent_dir: pathlib.Path
-) -> list[str, int, int, str]:
+def get_ers(facomponent_dir: pathlib.Path) -> list[str, int, int, str]:
     ers = []
-    for possible_er in facomponent_dir.glob('**/ER *'):
-        objects_dir = possible_er.joinpath('objects')
+    for possible_er in facomponent_dir.glob("**/ER *"):
+        objects_dir = possible_er.joinpath("objects")
         if possible_er.is_dir():
             if objects_dir.is_dir():
                 er = possible_er.relative_to(facomponent_dir)
@@ -69,39 +63,63 @@
                     fp = os.path.join(path, f)
                     if os.path.getsize(fp) == 0:
                         LOGGER.warning(
-                            f'{possible_er.name} contains the following 0-byte file: {f}. Review this file with the processing archivist.')
+                            (
+                                f"{possible_er.name} "
+                                f"contains the following 0-byte file: {f}. "
+                                "Review this file with the processing archivist."
+                            )
+                        )
                     size += os.path.getsize(fp)
             else:
                 LOGGER.warning(
-                    f'{possible_er.name} does not contain an object folder. It will be omitted from the report.')
+                    (
+                        f"{possible_er.name} does not contain an object folder. "
+                        "It will be omitted from the report."
+                    )
+                )
                 continue
 
         if count == 0:
             LOGGER.warning(
-                f'{possible_er.name} does not contain any files. It will be omitted from the report.')
+                (
+                    f"{possible_er.name} does not contain any files. "
+                    "It will be omitted from the report."
+                )
+            )
             continue
 
         if size == 0:
             LOGGER.warning(
-                f'{possible_er.name} contains no files with bytes. 
This ER is omitted from report. Review this ER with the processing archivist.') + ( + f"{possible_er.name} contains no files with bytes. " + "This ER is omitted from report. " + "Review this ER with the processing archivist." + ) + ) continue ers.append([str(er), size, count, possible_er.name]) return ers + def extract_collection_title(hdd_dir: pathlib.Path) -> str: for item in hdd_dir.iterdir(): - if re.match(r'M\d+\_FAcomponents', item.name): + if re.match(r"M\d+\_FAcomponents", item.name): return item.name else: LOGGER.warning( - 'Cannot find CollectionID_FAcomponents directory. Please use CollectionID_FAcomponents naming convention for the directory containing all ERs.' + ( + "Cannot find CollectionID_FAcomponents directory. " + "Please use CollectionID_FAcomponents naming convention " + "for the directory containing all ERs." + ) ) + def audit_ers(ers: list[list[str, str, str]]) -> None: er_numbers_used = {} for er in ers: - number = re.match(r'ER (\d+)', er[3]) + number = re.match(r"ER (\d+)", er[3]) er_number = int(number[1]) - if not er_number in er_numbers_used.keys(): + if er_number not in er_numbers_used.keys(): er_numbers_used[er_number] = [er[3]] else: er_numbers_used[er_number].append(er[3]) @@ -112,79 +130,79 @@ def audit_ers(ers: list[list[str, str, str]]) -> None: for i in range(er_min, er_max): if i not in er_numbers_used.keys(): LOGGER.warning( - f'Collection uses ER {er_min} to ER {er_max}. ER {i} is skipped. Review the ERs with the processing archivist' + ( + f"Collection uses ER {er_min} to ER {er_max}. ER {i} is skipped. " + "Review the ERs with the processing archivist" + ) ) # test for duplicate ers for er_number, er_names in er_numbers_used.items(): if len(er_names) > 1: LOGGER.warning( - f'ER {er_number} is used multiple times: {", ".join(er_names)}. Review the ERs with the processing archivist' + ( + f'ER {er_number} is used multiple times: {", ".join(er_names)}. 
' + "Review the ERs with the processing archivist" + ) ) return None -def create_report( - input: list[list[str, int, int]], - report: dict -) -> dict: +def create_report(input: list[list[str, int, int]], report: dict) -> dict: for er in input: report = process_item(er, report) return report -def process_item( - input: list[str, int, int], - report: dict -) -> dict: - if not '/' in input[0]: - parts = re.match(r'(ER \d+)\s(.*)', input[0]) - report['children'].append({ - 'title': input[0], - 'er_number': parts.group(1), - 'er_name': parts.group(2), - 'file_size': input[1], - 'file_count': input[2] - }) +def process_item(input: list[str, int, int], report: dict) -> dict: + if "/" not in input[0]: + parts = re.match(r"(ER \d+)\s(.*)", input[0]) + report["children"].append( + { + "title": input[0], + "er_number": parts.group(1), + "er_name": parts.group(2), + "file_size": input[1], + "file_count": input[2], + } + ) else: - parent, child = input[0].split('/', maxsplit=1) + parent, child = input[0].split("/", maxsplit=1) input[0] = child - for item in report['children']: - if item['title'] == parent: + for item in report["children"]: + if item["title"] == parent: item = process_item(input, item) return report - report['children'].append( - process_item(input, {'title': parent, 'children': []}) + report["children"].append( + process_item(input, {"title": parent, "children": []}) ) return report -def write_report( - report: dict, - dest: pathlib.Path -) -> None: - with open(dest, 'w') as f: + +def write_report(report: dict, dest: pathlib.Path) -> None: + with open(dest, "w") as f: json.dump(report, f) + def main(): args = parse_args() - LOGGER.info('retrieving ER folder paths') + LOGGER.info("retrieving ER folder paths") ers = get_ers(args.dir) - LOGGER.info('creating report') + LOGGER.info("creating report") colltitle = extract_collection_title(args.dir) - stub_report = {'title': colltitle, 'children': []} + stub_report = {"title": colltitle, "children": []} full_report = create_report(ers, stub_report) - - LOGGER.info('writing report') - report_file = args.output.joinpath(f'{colltitle}.json') + LOGGER.info("writing report") + report_file = args.output.joinpath(f"{colltitle}.json") write_report(full_report, report_file) -if __name__=="__main__": +if __name__ == "__main__": main() diff --git a/tests/test_lint_ft.py b/tests/test_lint_ft.py index 8d9eb9b..0b536e2 100644 --- a/tests/test_lint_ft.py +++ b/tests/test_lint_ft.py @@ -4,6 +4,7 @@ import digarch_scripts.lint.lint_ft as lint_ft + # Unit tests # Argument tests def test_package_argument(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): @@ -30,6 +31,7 @@ def test_directory_argument(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): assert child_dir in args.packages + # linting tests @pytest.fixture def good_package(tmp_path: Path): @@ -37,8 +39,7 @@ def good_package(tmp_path: Path): f_object_data = pkg / "objects" / "data" f_object_data.mkdir(parents=True) - bag_files = ["bag-info.txt", "bagit.txt", - "manifest-md5.txt", "tagmanifest-md5.txt"] + bag_files = ["bag-info.txt", "bagit.txt", "manifest-md5.txt", "tagmanifest-md5.txt"] for f in bag_files: filepath = pkg / "objects" / f filepath.touch() @@ -64,6 +65,7 @@ def good_package(tmp_path: Path): return pkg + def test_top_folder_valid_name(good_package): """Top level folder name has to conform to ACQ_####_######""" result = lint_ft.package_has_valid_name(good_package) @@ -81,12 +83,14 @@ def test_top_folder_invalid_name(good_package): assert not result + def 
test_package_has_two_subfolders(good_package): """Second level folders must be two""" result = lint_ft.package_has_two_subfolders(good_package) assert result + def test_package_does_not_have_two_subfolders(good_package): """Test that package fails function when second level folders are not the correct number, i.e. 2""" @@ -98,6 +102,7 @@ def test_package_does_not_have_two_subfolders(good_package): assert not result + def test_sec_level_folder_valid_names(good_package): """Second level folders must only have objects and metadata folder""" result = lint_ft.package_has_valid_subfolder_names(good_package) @@ -116,12 +121,14 @@ def test_sec_level_folder_invalid_names(good_package): assert not result + def test_package_has_no_hidden_file(good_package): """The package should not have any hidden file""" result = lint_ft.package_has_no_hidden_file(good_package) assert result + def test_package_has_hidden_file(good_package): """Test that package fails function when there is any hidden file""" bad_package = good_package @@ -134,12 +141,14 @@ def test_package_has_hidden_file(good_package): assert not result + def test_package_has_no_zero_bytes_file(good_package): """The package should not have any zero bytes file""" result = lint_ft.package_has_no_zero_bytes_file(good_package) assert result + def test_package_has_zero_bytes_file(good_package): """Test that package fails function when there is any zero bytes file""" bad_package = good_package @@ -150,6 +159,7 @@ def test_package_has_zero_bytes_file(good_package): assert not result + def test_metadata_folder_is_flat(good_package): """The metadata folder should not have folder structure""" result = lint_ft.metadata_folder_is_flat(good_package) @@ -168,12 +178,14 @@ def test_metadata_folder_has_random_folder(good_package): assert not result + def test_metadata_folder_has_files(good_package): """The metadata folder should have one or more file""" result = lint_ft.metadata_folder_has_files(good_package) assert result + def test_metadata_folder_empty(good_package): """Test that package fails function when the metadata does not have any files""" @@ -185,12 +197,14 @@ def test_metadata_folder_empty(good_package): assert not result + def test_metadata_has_correct_naming_convention(good_package): """The metadata file name should be in the accepted list""" result = lint_ft.metadata_has_correct_naming_convention(good_package) assert result + def test_metadata_has_incorrect_naming_convention(good_package): """Test that package fails function when metadata file(s) has incorrect naming conventions""" @@ -202,6 +216,7 @@ def test_metadata_has_incorrect_naming_convention(good_package): assert not result + def test_objects_folder_correct_structure(good_package): """objects folder should have a data folder, which includes four files: bag-info.txt, bagit.txt, manifest-md5.txt and tagmanifest-md5.txt""" @@ -209,6 +224,7 @@ def test_objects_folder_correct_structure(good_package): assert result + def test_objects_folder_incorrect_structure(good_package): """Test that package fails function if it does not have the data folder, or missing any of the four files: bag-info.txt, bagit.txt, manifest-md5.txt @@ -221,12 +237,14 @@ def test_objects_folder_incorrect_structure(good_package): assert not result + def test_objects_folder_has_no_empty_folder(good_package): """The objects folder should not have any empty folders""" result = lint_ft.objects_folder_has_no_empty_folder(good_package) assert result + def test_objects_folder_has_empty_folder(good_package): """Test that package 
fails function if its objects folder has empty folder(s)""" bad_package = good_package @@ -238,12 +256,14 @@ def test_objects_folder_has_empty_folder(good_package): assert not result + def test_valid_package(good_package): """Test that package returns 'valid' when all tests are passed""" result = lint_ft.lint_package(good_package) assert result == "valid" + def test_invalid_package(good_package): """Test that package returns 'invalid' when failing some tests""" bad_package = good_package @@ -255,6 +275,7 @@ def test_invalid_package(good_package): assert result == "invalid" + def test_unclear_package(good_package): """Test that package returns 'needs review' when failing some tests""" bad_package = good_package diff --git a/tests/test_package_cloud.py b/tests/test_package_cloud.py index 55e5561..5af5211 100644 --- a/tests/test_package_cloud.py +++ b/tests/test_package_cloud.py @@ -1,12 +1,11 @@ -import digarch_scripts.package.package_cloud as pc - -import argparse import os -from pathlib import Path -import pytest import shutil +from pathlib import Path import bagit +import pytest + +import digarch_scripts.package.package_cloud as pc @pytest.fixture @@ -83,7 +82,7 @@ def test_id_arg_must_match_pattern( stderr = capsys.readouterr().err - assert f"bad_id does not match" in stderr + assert "bad_id does not match" in stderr def test_create_package_basedir_exc_on_readonly(tmp_path: Path, args: list): @@ -166,6 +165,7 @@ def test_do_not_overwrite_metadata(transfer_files: Path, package_base_dir: Path) assert source_log.exists() assert f"{rclone_log} already exists. Not moving." in str(exc.value) + def test_move_payload(transfer_files: Path, package_base_dir: Path): """Test that entirety of payload is moved and hierarchy is preserved""" @@ -203,6 +203,7 @@ def test_do_not_overwrite_payload(transfer_files: Path, package_base_dir: Path): assert source_contents == [file for file in source_payload.rglob("*")] assert f"{bag_payload} already exists. Not moving files." in str(exc.value) + @pytest.fixture def bag_payload(transfer_files: Path, package_base_dir: Path): pc.move_payload(transfer_files / "rclone_files", package_base_dir) @@ -210,6 +211,7 @@ def bag_payload(transfer_files: Path, package_base_dir: Path): return bag_payload + def test_convert_md5(bag_payload: Path, transfer_files: Path): rclone_md5 = transfer_files / "rclone.md5" pc.convert_to_bagit_manifest(rclone_md5, bag_payload.parent) @@ -218,10 +220,10 @@ def test_convert_md5(bag_payload: Path, transfer_files: Path): # Get path to correct payload in data # read md5 and extract filepaths with open(bag_md5) as m: - md5_paths = [line.strip().split(' ')[-1] for line in m.readlines()] + md5_paths = [line.strip().split(" ")[-1] for line in m.readlines()] payload_files = [ - str(path.relative_to(bag_payload.parent)) for path in bag_payload.rglob('*') + str(path.relative_to(bag_payload.parent)) for path in bag_payload.rglob("*") ] for a_file in md5_paths: assert a_file in payload_files @@ -273,11 +275,12 @@ def test_validate_invalid_bag(transfer_files, caplog): test_bag = bagit.make_bag(object_dir) print(list(Path(test_bag.path).iterdir())) - (Path(test_bag.path) / 'bag-info.txt').unlink() + (Path(test_bag.path) / "bag-info.txt").unlink() pc.validate_bag_in_payload(transfer_files) - - assert f"{test_bag.path} is not valid. Check the bag manifest and oxum." in caplog.text + assert ( + f"{test_bag.path} is not valid. Check the bag manifest and oxum." 
in caplog.text
+    )
 
 
 def test_full_run(
@@ -290,6 +293,6 @@ def test_full_run(
     pkg_dir = Path(args[-3]) / args[-1][:-7] / args[-1]
 
     assert pkg_dir.exists()
-    assert bagit.Bag(str(pkg_dir / 'objects')).validate()
+    assert bagit.Bag(str(pkg_dir / "objects")).validate()
 
-    assert 'rclone.log' in [x.name for x in (pkg_dir / 'metadata').iterdir()]
+    assert "rclone.log" in [x.name for x in (pkg_dir / "metadata").iterdir()]
diff --git a/tests/test_report_ftk_extents.py b/tests/test_report_ftk_extents.py
index 6b0ff00..94f6856 100644
--- a/tests/test_report_ftk_extents.py
+++ b/tests/test_report_ftk_extents.py
@@ -1,6 +1,9 @@
-import digarch_scripts.report.report_ftk_extents as rfe
-import pytest
 import json
+
+import pytest
+
+import digarch_scripts.report.report_ftk_extents as rfe
+
 try:
     from lxml import etree
 except ImportError:
@@ -9,17 +12,19 @@
 
 @pytest.fixture
 def parsed_report():
-    return etree.parse('tests/fixtures/report/Report.xml')
+    return etree.parse("tests/fixtures/report/Report.xml")
+
 
 def test_identify_all_ers(parsed_report):
     """Function should list every bookmark starting with ER"""
     ers = rfe.create_er_list(parsed_report)
 
-    just_ers = [er[0].split('/')[-1].split(':')[0] for er in ers]
+    just_ers = [er[0].split("/")[-1].split(":")[0] for er in ers]
 
     for i in range(1, 12):
-        assert f'ER {i}' in just_ers
-    assert 'ER 23' in just_ers
+        assert f"ER {i}" in just_ers
+    assert "ER 23" in just_ers
+
 
 def test_hierarchy_nests_down_correctly(parsed_report):
     """Function should include organization hierarchy.
@@ -27,45 +32,71 @@ def test_hierarchy_nests_down_correctly(parsed_report):
     ers = rfe.create_er_list(parsed_report)
     just_titles = [er[0] for er in ers]
 
-    assert 'Extents Test papers/Series 1/Subseries(1)/ER 1: Text, 2023' in just_titles
-    assert 'Extents Test papers/Series 1/Subseries(1)/Subsubseries(2)/ER 2: File 15, 2023' in just_titles
+    assert "Extents Test papers/Series 1/Subseries(1)/ER 1: Text, 2023" in just_titles
+    assert (
+        "Extents Test papers/Series 1/Subseries(1)/Subsubseries(2)/ER 2: File 15, 2023"
+        in just_titles
+    )
+
 
 def test_hierarchy_nests_empty_subseries(parsed_report):
     """Function should include organization hierarchy including empty levels"""
     ers = rfe.create_er_list(parsed_report)
     just_titles = [er[0] for er in ers]
 
-    assert 'Extents Test papers/Series 1/Subseries(1)/Subsubseries(2)/Subsubsubseries(3)/Subsubsubsubseries(4)/ER 10: Folder 2, 2023' in just_titles
+    assert (
+        "Extents Test papers/Series 1/Subseries(1)/Subsubseries(2)/Subsubsubseries(3)"
+        "/Subsubsubsubseries(4)/ER 10: Folder 2, 2023"
+        in just_titles
+    )
+
 
 def test_hierarchy_nests_up_correctly(parsed_report):
     """Function should be able to step down in hierarchy"""
     ers = rfe.create_er_list(parsed_report)
     just_titles = [er[0] for er in ers]
 
-    assert 'Extents Test papers/Series 1/Subseries(1)/Subsubseries(2) the second/ER 23: File 17, 2023' in just_titles
-    assert 'Extents Test papers/Series 1/Subseries(1) the second/ER 4: File 18, 2023' in just_titles
+    assert (
+        "Extents Test papers/Series 1/Subseries(1)/Subsubseries(2) the second/ER 23: "
+        "File 17, 2023"
+        in just_titles
+    )
+    assert (
+        "Extents Test papers/Series 1/Subseries(1) the second/ER 4: File 18, 2023"
+        in just_titles
+    )
+
 
 def test_hierarchy_nests_reverse_order_bookmarks(parsed_report):
     """Function should parse bottom-up hierarchy"""
     ers = rfe.create_er_list(parsed_report)
     just_titles = [er[0] for er in ers]
 
-    assert 'Extents Test papers/Series 2/ER 9: File 20,2023' in just_titles
-    assert 'Extents Test papers/Series 2/Subseries(1) of 
Series 2/ER 8: File 2, 2023' in just_titles
-    assert 'Extents Test papers/Series 2/Subseries(1) of Series 2/Subsubseries(2) of Series 2/ER 7: File 19, 2023' in just_titles
+    assert "Extents Test papers/Series 2/ER 9: File 20,2023" in just_titles
+    assert (
+        "Extents Test papers/Series 2/Subseries(1) of Series 2/ER 8: File 2, 2023"
+        in just_titles
+    )
+    assert (
+        "Extents Test papers/Series 2/Subseries(1) of Series 2/Subsubseries(2) of "
+        "Series 2/ER 7: File 19, 2023"
+        in just_titles
+    )
+
 
 def test_er_outside_of_series(parsed_report):
     """Function should include capture ERs even if they're not in a series"""
     ers = rfe.create_er_list(parsed_report)
     just_titles = [er[0] for er in ers]
 
-    assert 'Extents Test papers/ER 10: File 21,2023' in just_titles
+    assert "Extents Test papers/ER 10: File 21,2023" in just_titles
+
 
 def test_correct_report_many_files(parsed_report):
     """Test if file count and byte count is completed correctly"""
     bookmark_tables = rfe.transform_bookmark_tables(parsed_report)
 
-    er_with_many_files = [['ER 1', 'bk6001']]
+    er_with_many_files = [["ER 1", "bk6001"]]
     extents = rfe.add_extents_to_ers(er_with_many_files, bookmark_tables)
 
     # bytes
@@ -73,12 +104,13 @@
     # files
     assert extents[0][2] == 7
 
+
 def test_correct_report_on_er_with_folder_bookmarked(parsed_report):
     """Test if file count and byte count is completed correctly
     when bookmark includes a folder that is bookmarked"""
     bookmark_tables = rfe.transform_bookmark_tables(parsed_report)
 
-    er_with_folder = [['ER 10', 'bk12001']]
+    er_with_folder = [["ER 10", "bk12001"]]
     extents = rfe.add_extents_to_ers(er_with_folder, bookmark_tables)
 
     # bytes
@@ -86,12 +118,13 @@
     # files
     assert extents[0][2] == 5
 
+
 def test_correct_report_on_er_with_folder_not_bookmarked(parsed_report):
     """Test if file count and byte count is completed correctly
     when bookmark includes a folder that isn't bookmarked"""
     bookmark_tables = rfe.transform_bookmark_tables(parsed_report)
 
-    er_with_folder = [['ER 3', 'bk11001']]
+    er_with_folder = [["ER 3", "bk11001"]]
     extents = rfe.add_extents_to_ers(er_with_folder, bookmark_tables)
 
     # bytes
@@ -99,11 +132,12 @@
     # files
     assert extents[0][2] == 5
 
+
 def test_correct_report_1_file(parsed_report):
     """Test if file count and byte count is completed correctly for one file"""
     bookmark_tables = rfe.transform_bookmark_tables(parsed_report)
 
-    er_with_one_file = [['ER 2', 'bk9001']]
+    er_with_one_file = [["ER 2", "bk9001"]]
     extents = rfe.add_extents_to_ers(er_with_one_file, bookmark_tables)
 
     # bytes
@@ -111,38 +145,45 @@
     # files
     assert extents[0][2] == 1
 
+
 def test_warn_on_no_files_in_er(parsed_report, caplog):
     """Test if warning is logged for empty bookmarks and ER is omitted from report"""
     bookmark_tables = rfe.transform_bookmark_tables(parsed_report)
 
-    er_with_no_files = [['ER 5: No Files, 2023', 'bk27001']]
+    er_with_no_files = [["ER 5: No Files, 2023", "bk27001"]]
     extents = rfe.add_extents_to_ers(er_with_no_files, bookmark_tables)
 
     assert extents == []
 
-    log_msg = f'{er_with_no_files[0][0]} does not contain any files. It will be omitted from the report.'
+    log_msg = (
+        f"{er_with_no_files[0][0]} does not contain any files. It will be "
+        "omitted from the report."
+    )
 
     assert log_msg in caplog.text
 
+
 def test_warn_on_a_no_byte_file_in_er(parsed_report, caplog):
     """Test if warning is logged for empty files in an ER"""
     bookmark_tables = rfe.transform_bookmark_tables(parsed_report)
 
-    er_with_no_bytes = [['ER 6: Zero Length, 2023', 'bk28001']]
+    er_with_no_bytes = [["ER 6: Zero Length, 2023", "bk28001"]]
     rfe.add_extents_to_ers(er_with_no_bytes, bookmark_tables)
 
-    log_msg = f'{er_with_no_bytes[0][0]} contains the following 0-byte file: file00.txt. Review this file with the processing archivist.'
+    log_msg = (
+        f"{er_with_no_bytes[0][0]} contains the following 0-byte file: "
+        "file00.txt. Review this file with the processing archivist."
+    )
 
     assert log_msg in caplog.text
 
+
 def test_warn_on_no_bytes_in_er(parsed_report, caplog):
-    """Test if warning is logged for bookmarks with 0 bytes total and ER is omitted from report"""
+    """Test if warning is logged for bookmarks with 0 bytes total and ER is omitted
+    from report"""
     bookmark_tables = rfe.transform_bookmark_tables(parsed_report)
 
-    er_with_no_bytes = [['ER 6: Zero Length, 2023', 'bk28001']]
+    er_with_no_bytes = [["ER 6: Zero Length, 2023", "bk28001"]]
     extents = rfe.add_extents_to_ers(er_with_no_bytes, bookmark_tables)
 
     assert extents == []
 
-    log_msg = f'{er_with_no_bytes[0][0]} contains no files with bytes. This ER is omitted from report. Review this ER with the processing archivist.'
+    log_msg = (
+        f"{er_with_no_bytes[0][0]} contains no files with bytes. This ER is "
+        "omitted from report. Review this ER with the processing archivist."
+    )
 
     assert log_msg in caplog.text
 
@@ -150,7 +191,8 @@ def test_extract_collection_name_from_report(parsed_report):
     """Test if collection name is taken from XML"""
     coll_name = rfe.extract_collection_title(parsed_report)
 
-    assert coll_name == 'M12345 Extents Test'
+    assert coll_name == "M12345 Extents Test"
+
 
 @pytest.fixture
 def ers_with_extents_list(parsed_report):
@@ -160,40 +202,47 @@
 
     return ers_with_extents
 
+
 def test_json_objects_contains_expected_fields(ers_with_extents_list):
     """Test if final report aligns with expectations for ASpace import"""
-    full_dict = {'title': 'slug', 'children': []}
+    full_dict = {"title": "slug", "children": []}
     for er in ers_with_extents_list:
         rfe.create_report(er, full_dict)
 
     def recursive_validator(er_dict):
         for key, value in er_dict.items():
-            if key == 'title':
+            if key == "title":
                 assert type(value) is str
-            elif key == 'children':
+            elif key == "children":
                 assert type(value) is list
                 for child in value:
                     recursive_validator(child)
-            elif key == 'er_number':
+            elif key == "er_number":
                 assert type(value) is str
-            elif key == 'er_name':
+            elif key == "er_name":
                 assert type(value) is str
-            elif key == 'file_size':
+            elif key == "file_size":
                 assert type(value) is int
-            elif key == 'file_count':
+            elif key == "file_count":
                 assert type(value) is int
             else:
                 assert False
 
     recursive_validator(full_dict)
 
+
 def test_skipped_ER_number_behavior(parsed_report, caplog):
     """Test if script flags when ER numbering is not sequential"""
     ers = rfe.create_er_list(parsed_report)
 
     for i in range(13, 23):
-        assert f'Collection uses ER 1 to ER 23. ER {i} is skipped. Review the ERs with the processing archivist' in caplog.text
+        assert (
+            f"Collection uses ER 1 to ER 23. ER {i} is skipped. Review the ERs with "
+            "the processing archivist"
+            in caplog.text
+        )
+
 
 def test_repeated_ER_number_behavior(parsed_report, caplog):
     """Test if script flags when ER number is reused"""
@@ -201,18 +250,22 @@
 
     rfe.audit_ers(ers)
 
-    log_msg = f'ER 10 is used multiple times: ER 10: File 21,2023, ER 10: Folder 2, 2023. Review the ERs with the processing archivist'
+    log_msg = (
+        "ER 10 is used multiple times: ER 10: File 21,2023, "
+        "ER 10: Folder 2, 2023. Review the ERs with the processing archivist"
+    )
 
     assert log_msg in caplog.text
 
+
 @pytest.fixture
 def expected_json():
-    with open('tests/fixtures/report/report.json') as f:
+    with open("tests/fixtures/report/report.json") as f:
         report = json.load(f)
 
     return report
 
+
 def test_create_correct_json(ers_with_extents_list, expected_json):
     """Test that final report matches total expectations"""
-    dct = {'title': 'coll', 'children': []}
+    dct = {"title": "coll", "children": []}
     for er in ers_with_extents_list:
         dct = rfe.create_report(er, dct)
 
diff --git a/tests/test_report_hdd_extents.py b/tests/test_report_hdd_extents.py
index ca17165..f36a48b 100644
--- a/tests/test_report_hdd_extents.py
+++ b/tests/test_report_hdd_extents.py
@@ -1,27 +1,32 @@
-import digarch_scripts.report.report_hdd_extents as rhe
-import pytest
-import shutil
-import re
-import pathlib
 import json
+import pathlib
+import re
+import shutil
+
+import pytest
+
+import digarch_scripts.report.report_hdd_extents as rhe
+
 
 @pytest.fixture()
 def arranged_collection(tmp_path: pathlib.Path):
-    path = tmp_path.joinpath('hdd')
-    shutil.copytree('tests/fixtures/report', path)
+    path = tmp_path.joinpath("hdd")
+    shutil.copytree("tests/fixtures/report", path)
 
     return path
 
+
 def test_identify_all_ers(arranged_collection):
     """Function should list every folder starting with ER"""
     ers = rhe.get_ers(arranged_collection)
     print(ers)
-    just_ers = [re.search(r'ER\s\d+', er[0]).group() for er in ers]
+    just_ers = [re.search(r"ER\s\d+", er[0]).group() for er in ers]
 
     for i in range(1, 4):
-        assert f'ER {i}' in just_ers
+        assert f"ER {i}" in just_ers
     for i in range(7, 12):
-        assert f'ER {i}' in just_ers
-    assert 'ER 23' in just_ers
+        assert f"ER {i}" in just_ers
+    assert "ER 23" in just_ers
+
 
 def test_hierarchy_nests_down_correctly(arranged_collection):
     """Function should include organization hierarchy. 
@@ -30,28 +35,38 @@ def test_hierarchy_nests_down_correctly(arranged_collection): just_titles = [er[0] for er in ers] print(just_titles) - assert 'M12345_FAcomponents/Series 1/Subseries(1)/ER 1 Text, 2023' in just_titles - assert 'M12345_FAcomponents/Series 1/Subseries(1)/Subsubseries(2)/ER 2 File 15, 2023' in just_titles + assert "M12345_FAcomponents/Series 1/Subseries(1)/ER 1 Text, 2023" in just_titles + assert ( + "M12345_FAcomponents/Series 1/Subseries(1)/Subsubseries(2)/ER 2 File 15, 2023" + in just_titles + ) + def test_hierarchy_nests_empty_subseries(arranged_collection): """Function should include organization hierarchy including empty levels""" ers = rhe.get_ers(arranged_collection) just_titles = [er[0] for er in ers] - assert 'M12345_FAcomponents/Series 1/Subseries(1)/Subsubseries(2)/Subsubsubseries(3)/Subsubsubsubseries(4)/ER 10 Folder 2, 2023' in just_titles + assert ( + "M12345_FAcomponents/Series 1/Subseries(1)/Subsubseries(2)/Subsubsubseries(3)" + "/Subsubsubsubseries(4)/ER 10 Folder 2, 2023" + in just_titles + ) + def test_er_outside_of_series(arranged_collection): """Function should include capture ERs even if they're not in a series""" ers = rhe.get_ers(arranged_collection) just_titles = [er[0] for er in ers] - assert 'M12345_FAcomponents/ER 10 File 21,2023' in just_titles + assert "M12345_FAcomponents/ER 10 File 21,2023" in just_titles + def test_correct_report_many_files(arranged_collection): """Test if file count and byte count is completed correctly""" ers = rhe.get_ers(arranged_collection) - er_with_many_files = 'ER 1 Text, 2023' + er_with_many_files = "ER 1 Text, 2023" for er in ers: if er[3] == er_with_many_files: bytes, files = er[1:3] @@ -62,12 +77,13 @@ def test_correct_report_many_files(arranged_collection): # files assert files == 7 + def test_correct_report_on_er_with_folder_included(arranged_collection): """Test if file count and byte count is completed correctly when bookmark includes a folder that is bookmarked""" ers = rhe.get_ers(arranged_collection) - er_with_folder = 'ER 10 Folder 2, 2023' + er_with_folder = "ER 10 Folder 2, 2023" for er in ers: if er[3] == er_with_folder: bytes, files = er[1:3] @@ -82,7 +98,7 @@ def test_correct_report_1_file(arranged_collection): """Test if file count and byte count is completed correctly for one file""" ers = rhe.get_ers(arranged_collection) - er_with_one_file = 'ER 2 File 15, 2023' + er_with_one_file = "ER 2 File 15, 2023" for er in ers: if er[3] == er_with_one_file: bytes, files = er[1:3] @@ -97,9 +113,12 @@ def test_warn_on_no_files_in_er(arranged_collection, caplog): """Test if warning is logged for empty bookmarks and ER is omitted from report""" ers = rhe.get_ers(arranged_collection) - er_with_no_files = 'ER 5 No Files, 2023' + er_with_no_files = "ER 5 No Files, 2023" - log_msg = f'{er_with_no_files} does not contain any files. It will be omitted from the report.' + log_msg = ( + f"{er_with_no_files} does not contain any files. It will be omitted from " + "the report." + ) assert log_msg in caplog.text @@ -107,23 +126,27 @@ def test_warn_on_a_no_byte_file_in_er(arranged_collection, caplog): """Test if warning is logged for empty files in an ER""" ers = rhe.get_ers(arranged_collection) - er_with_no_bytes = 'ER 6 Zero Length, 2023' + er_with_no_bytes = "ER 6 Zero Length, 2023" # rfe.add_extents_to_ers(er_with_no_bytes, bookmark_tables) # log warning, script should continue running # 'ER xxx: Title contain zero byte files.' - log_msg = f'{er_with_no_bytes} contains the following 0-byte file: file00.txt. 
Review this file with the processing archivist.'
+    log_msg = (
+        f"{er_with_no_bytes} contains the following 0-byte file: file00.txt. "
+        "Review this file with the processing archivist."
+    )
 
     assert log_msg in caplog.text
 
 
 def test_warn_on_no_bytes_in_er(arranged_collection, caplog):
-    """Test if warning is logged for bookmarks with 0 bytes total and ER is omitted from report"""
+    """Test if warning is logged for bookmarks with 0 bytes total and ER is omitted
+    from report"""
+
     ers = rhe.get_ers(arranged_collection)
-    er_with_no_bytes = 'ER 6 Zero Length, 2023'
+    er_with_no_bytes = "ER 6 Zero Length, 2023"
 
     # rfe.add_extents_to_ers(er_with_no_bytes, bookmark_tables)
     # log warning, script should continue running
     # 'ER xxx: Title does not contain any bytes. It will be omitted from the report'
-    log_msg = f'{er_with_no_bytes} contains no files with bytes. This ER is omitted from report. Review this ER with the processing archivist.'
+    log_msg = (
+        f"{er_with_no_bytes} contains no files with bytes. This ER is omitted "
+        "from report. Review this ER with the processing archivist."
+    )
 
     assert log_msg in caplog.text
 
@@ -131,9 +154,11 @@ def test_warn_on_no_objects_in_er(arranged_collection, caplog):
     """Test if warning is logged for empty bookmarks and ER is omitted from report"""
     ers = rhe.get_ers(arranged_collection)
 
-    er_with_no_files = 'ER 13 No objects, 2023'
+    er_with_no_files = "ER 13 No objects, 2023"
 
-    log_msg = f'{er_with_no_files} does not contain an object folder. It will be omitted from the report.'
+    log_msg = (
+        f"{er_with_no_files} does not contain an object folder. It will be "
+        "omitted from the report."
+    )
 
     assert log_msg in caplog.text
 
@@ -141,58 +166,69 @@ def test_extract_collection_name(arranged_collection):
     """Test if collection name is taken from XML"""
     coll_name = rhe.extract_collection_title(arranged_collection)
 
-    assert coll_name == 'M12345_FAcomponents'
+    assert coll_name == "M12345_FAcomponents"
+
 
 def test_warn_on_bad_collection_name(arranged_collection, caplog):
     """Test if collection name is taken from XML"""
-    coll_name_folder = arranged_collection / 'M12345_FAcomponents'
-    coll_name_folder.rename(arranged_collection / 'Test_Coll')
-    coll_name = rhe.extract_collection_title(arranged_collection)
-    log_msg = 'Cannot find CollectionID_FAcomponents directory. Please use CollectionID_FAcomponents naming convention for the directory containing all ERs.'
+    coll_name_folder = arranged_collection / "M12345_FAcomponents"
+    coll_name_folder.rename(arranged_collection / "Test_Coll")
+    rhe.extract_collection_title(arranged_collection)
+    log_msg = (
+        "Cannot find CollectionID_FAcomponents directory. Please use "
+        "CollectionID_FAcomponents naming convention for the directory "
+        "containing all ERs."
+    )
 
     assert log_msg in caplog.text
 
+
 def test_skipped_ER_number_behavior(arranged_collection, caplog):
     ers = rhe.get_ers(arranged_collection)
 
     rhe.audit_ers(ers)
 
     # log warning, but continue operation
-    for number in range(13,22):
-        log_msg = f'Collection uses ER 1 to ER 23. ER {number} is skipped. Review the ERs with the processing archivist'
+    for number in range(13, 22):
+        log_msg = (
+            f"Collection uses ER 1 to ER 23. ER {number} is skipped. "
+            "Review the ERs with the processing archivist"
+        )
         assert log_msg in caplog.text
 
+
 def test_repeated_ER_number_behavior(arranged_collection, caplog):
     ers = rhe.get_ers(arranged_collection)
 
     rhe.audit_ers(ers)
 
-    log_msg = 'ER 10 is used multiple times'
+    log_msg = "ER 10 is used multiple times"
 
     assert log_msg in caplog.text
 
+
 @pytest.fixture
 def extracted_ers(arranged_collection):
     return rhe.get_ers(arranged_collection)
 
+
 def test_json_objects_contains_expected_fields(extracted_ers):
     """Test if final report aligns with expectations for ASpace import"""
-    full_dict = rhe.create_report(extracted_ers, {'title': 'test', 'children': []})
+    full_dict = rhe.create_report(extracted_ers, {"title": "test", "children": []})
 
     def recursive_validator(er_dict):
         for key, value in er_dict.items():
-            if key == 'title':
+            if key == "title":
                 assert type(value) is str
-            elif key == 'children':
+            elif key == "children":
                 assert type(value) is list
                 for child in value:
                     recursive_validator(child)
-            elif key == 'er_number':
+            elif key == "er_number":
                 assert type(value) is str
-            elif key == 'er_name':
+            elif key == "er_name":
                 assert type(value) is str
-            elif key == 'file_size':
+            elif key == "file_size":
                 assert type(value) is int
-            elif key == 'file_count':
+            elif key == "file_count":
                 assert type(value) is int
             else:
                 assert False
@@ -202,19 +238,19 @@ def recursive_validator(er_dict):
 
 @pytest.fixture
 def expected_json():
-    with open('tests/fixtures/report/report.json') as f:
+    with open("tests/fixtures/report/report.json") as f:
         raw = f.read()
 
-    #adjust fixture for hdd conventions
+    # adjust fixture for hdd conventions
     colons_removed = re.sub(r"(ER \d+):", r"\1", raw)
     report = json.loads(colons_removed)
-    report['children'][0]['title'] = 'M12345_FAcomponents'
-
+    report["children"][0]["title"] = "M12345_FAcomponents"
     return report
 
+
 def test_create_correct_json(extracted_ers, expected_json):
     """Test that final report matches total expectations"""
-    dct = rhe.create_report(extracted_ers, {'title': 'coll', 'children': []})
+    dct = rhe.create_report(extracted_ers, {"title": "coll", "children": []})
 
     assert dct == expected_json