From 48fa06e823c5eb5d864fa6a050531363f6de0a2c Mon Sep 17 00:00:00 2001 From: dreyjo <59994482+dreyjo@users.noreply.github.com> Date: Tue, 3 Sep 2024 18:01:58 -0400 Subject: [PATCH 1/5] added service copy script --- src/born_digital_docs_scripts/make_sc.py | 147 +++++++++++++++++++++++ 1 file changed, 147 insertions(+) create mode 100644 src/born_digital_docs_scripts/make_sc.py diff --git a/src/born_digital_docs_scripts/make_sc.py b/src/born_digital_docs_scripts/make_sc.py new file mode 100644 index 0000000..d37cdb8 --- /dev/null +++ b/src/born_digital_docs_scripts/make_sc.py @@ -0,0 +1,147 @@ +from pathlib import Path +import logging +import subprocess +import argparse +import re + +# Accept a directory (could hardcode or argparse) +def parse_args() -> argparse.Namespace: + def extant_path(p: str) -> Path: + path = Path(p) + if not path.exists(): + raise argparse.ArgumentTypeError(f"{path} does not exist") + return path + + # def rclone_remote(p: str) -> Path: + # if not re.match(r'*:*', p): + # raise argparse.ArgumentTypeError(f"{p} doesn't looke like an rclone remote") + # return p + + parser = argparse.ArgumentParser(description="path to a directory of born digital ami") + parser.add_argument("--source", "-s", required=True, type=extant_path) + parser.add_argument("--dest", "-d", required=True, type=str) + return parser.parse_args() + + +# Function take directory (or staged excel), Find all EM files, return list +def get_em(path: Path) -> list[Path]: + source = path + ems = [] + for x in source.rglob("*_em.*"): + if not str(x).endswith('mov'): + print(f"is this okay?: {x}") + else: + ems.append(x) + + # print(ems) + return ems + + +# Function takes list of EM files, find if interlaced or not, return list of [path, interlaced] → mediainfo (use Inform argument) +def find_interlace(paths: list[Path]) -> list[list[Path,str]]: + interlacing = [] + for path in paths: + # mediainfo results = subprocess. + # if mediainfo_result == ... + # mediainfo --Output="Video;%ScanType%" + mediainfo = subprocess.check_output(['mediainfo', + "--Inform=Video;%ScanType%", + path], + encoding='utf-8').strip() + + interlacing.append([path,mediainfo]) + + #below for testing purposes + # if len(str(path)) / 2 == 0: # just to create variation for now + # interlacing.append([path, 'interlaced']) + # else: + # interlacing.append([path, 'progressive']) + # print(interlacing) + return interlacing + + +# Function take list of of [path, interlaced], create ffmpeg commands (need to adjust interlacing and service file path per command) (may need to create servicecopy folder before ffmpeg runs), return list of commands + + +# Overwrite servicecopies, may need to add flag to ffmpeg to do this (-y?) +def make_commands(files: list[list[Path,str]]) -> list[list[str]]: + commands = [] + + for x in files: + em_path = x[0] + em_path_str = str(em_path) + # PosixPath('test_ems/dir_3/data/EditMaster/sample_dig_3_em.mov') + base = em_path.parent.parent + ''' + May be useful to include a check for service copy directory already existing. + added because ffmpeg did not like the directory not already existing + ''' + dest = base / 'ServiceCopies' + subprocess.run(['mkdir', f'{dest}']) + sc_path_str = str(base / 'ServiceCopies' / em_path.name.replace("em.mov", "sc.mp4")) + if x[1] == 'interlaced': + cmd = ['ffmpeg', '-i', em_path_str, '-map', '0:v', '-map', '0:a', '-c:v', 'libx264', '-movflags', '+faststart', '-crf', '20', '-maxrate', '7.5M', '-bufsize', '7.5M', '-vf', 'yadif', '-c:a', 'aac', '-b:a', '320000', '-ar', '48000', sc_path_str] + else: + cmd = ['ffmpeg', '-i', em_path_str, '-map', '0:v', '-map', '0:a', '-c:v', 'libx264', '-movflags', '+faststart', '-crf', '20', '-maxrate', '7.5M', '-bufsize', '7.5M', '-c:a', 'aac', '-b:a', '320000', '-ar', '48000', sc_path_str] + + commands.append(cmd) + + return commands + + + +# # Function take list of commands, run each command, return list of sc files +def make_sc(commands: list[list[str]]) -> list[str]: + sc = [] + for c in commands: + # logging.DEBUG(f"Running this command {c}") + subprocess.run(c) + sc.append(c[-1]) + logging.info(f"{c[-1]} created") + + return sc + +# # Function take list of sc files, make rclone command, return list of commands +def make_rclone(files: list[str], dest) -> list[list[str]]: + commands = [] + + for sc in files: + fn = Path(sc).name + print(fn) + rc = ['rclone', 'copyto', sc, f'{dest}/{fn}', '-P'] + commands.append(rc) + + # print(commands) + return commands + +# # Function take list of rclone commands, run each, return none +def run_rclone(commands: list[list[str]]) -> None: + for c in commands: + print(c) + logging.info(f"transferring {c[2]}") + subprocess.run(c) + logging.info(f"{c[2]} has been transferred") + + return None + +def main(): + source = parse_args().source + dest = parse_args().dest + ems = get_em(source) + em_paths = find_interlace(ems) + ff_cmds = make_commands(em_paths) + sc = make_sc(ff_cmds) + rc_cmds = make_rclone(sc,dest) + + run_rclone(rc_cmds) + # sc + + # for cmd in ff_cmds: + # print(cmd[-1]) + + + + + +if __name__ == "__main__": + main() \ No newline at end of file From 74131ee18c4ace3cc7b14a555419fc567b901cc2 Mon Sep 17 00:00:00 2001 From: Nick Krabbenhoeft Date: Thu, 16 Jan 2025 11:54:53 -0800 Subject: [PATCH 2/5] Update make_sc --- src/born_digital_docs_scripts/make_sc.py | 112 ++++++++++------------- 1 file changed, 50 insertions(+), 62 deletions(-) diff --git a/src/born_digital_docs_scripts/make_sc.py b/src/born_digital_docs_scripts/make_sc.py index d37cdb8..0a8d771 100644 --- a/src/born_digital_docs_scripts/make_sc.py +++ b/src/born_digital_docs_scripts/make_sc.py @@ -38,18 +38,13 @@ def get_em(path: Path) -> list[Path]: # Function takes list of EM files, find if interlaced or not, return list of [path, interlaced] → mediainfo (use Inform argument) -def find_interlace(paths: list[Path]) -> list[list[Path,str]]: - interlacing = [] - for path in paths: - # mediainfo results = subprocess. - # if mediainfo_result == ... - # mediainfo --Output="Video;%ScanType%" - mediainfo = subprocess.check_output(['mediainfo', - "--Inform=Video;%ScanType%", - path], - encoding='utf-8').strip() +def find_interlace(path: list[Path]) -> list[list[Path,str]]: + interlacing = subprocess.check_output(['mediainfo', + "--Inform=Video;%ScanType%", + path], + encoding='utf-8').strip() - interlacing.append([path,mediainfo]) + #below for testing purposes # if len(str(path)) / 2 == 0: # just to create variation for now @@ -57,70 +52,57 @@ def find_interlace(paths: list[Path]) -> list[list[Path,str]]: # else: # interlacing.append([path, 'progressive']) # print(interlacing) - return interlacing + return path, interlacing # Function take list of of [path, interlaced], create ffmpeg commands (need to adjust interlacing and service file path per command) (may need to create servicecopy folder before ffmpeg runs), return list of commands # Overwrite servicecopies, may need to add flag to ffmpeg to do this (-y?) -def make_commands(files: list[list[Path,str]]) -> list[list[str]]: - commands = [] - - for x in files: - em_path = x[0] - em_path_str = str(em_path) - # PosixPath('test_ems/dir_3/data/EditMaster/sample_dig_3_em.mov') - base = em_path.parent.parent - ''' - May be useful to include a check for service copy directory already existing. - added because ffmpeg did not like the directory not already existing - ''' - dest = base / 'ServiceCopies' - subprocess.run(['mkdir', f'{dest}']) - sc_path_str = str(base / 'ServiceCopies' / em_path.name.replace("em.mov", "sc.mp4")) - if x[1] == 'interlaced': - cmd = ['ffmpeg', '-i', em_path_str, '-map', '0:v', '-map', '0:a', '-c:v', 'libx264', '-movflags', '+faststart', '-crf', '20', '-maxrate', '7.5M', '-bufsize', '7.5M', '-vf', 'yadif', '-c:a', 'aac', '-b:a', '320000', '-ar', '48000', sc_path_str] - else: - cmd = ['ffmpeg', '-i', em_path_str, '-map', '0:v', '-map', '0:a', '-c:v', 'libx264', '-movflags', '+faststart', '-crf', '20', '-maxrate', '7.5M', '-bufsize', '7.5M', '-c:a', 'aac', '-b:a', '320000', '-ar', '48000', sc_path_str] +def make_commands(file: list[list[Path,str]]) -> list[list[str]]: + em_path = file[0] + em_path_str = str(em_path) + # PosixPath('test_ems/dir_3/data/EditMaster/sample_dig_3_em.mov') + base = em_path.parent.parent + ''' + May be useful to include a check for service copy directory already existing. + added because ffmpeg did not like the directory not already existing + ''' + dest = base / 'ServiceCopies' + subprocess.run(['mkdir', f'{dest}']) + sc_path_str = str(base / 'ServiceCopies' / em_path.name.replace("em.mov", "sc.mp4")) + if file[1] == 'interlaced': + cmd = ['ffmpeg', '-i', em_path_str, '-map', '0:v', '-map', '0:a', '-c:v', 'libx264', '-movflags', '+faststart', '-crf', '20', '-maxrate', '7.5M', '-bufsize', '7.5M', '-vf', 'yadif', '-c:a', 'aac', '-b:a', '320000', '-ar', '48000', sc_path_str] + else: + cmd = ['ffmpeg', '-i', em_path_str, '-map', '0:v', '-map', '0:a', '-c:v', 'libx264', '-movflags', '+faststart', '-crf', '20', '-maxrate', '7.5M', '-bufsize', '7.5M', '-c:a', 'aac', '-b:a', '320000', '-ar', '48000', sc_path_str] - commands.append(cmd) - return commands + return cmd # # Function take list of commands, run each command, return list of sc files -def make_sc(commands: list[list[str]]) -> list[str]: - sc = [] - for c in commands: - # logging.DEBUG(f"Running this command {c}") - subprocess.run(c) - sc.append(c[-1]) - logging.info(f"{c[-1]} created") +def make_sc(command: list[list[str]]) -> list[str]: + # logging.DEBUG(f"Running this command {c}") + subprocess.run(command) + sc = command[-1] + logging.info(f"{command[-1]} created") return sc # # Function take list of sc files, make rclone command, return list of commands -def make_rclone(files: list[str], dest) -> list[list[str]]: - commands = [] - - for sc in files: - fn = Path(sc).name - print(fn) - rc = ['rclone', 'copyto', sc, f'{dest}/{fn}', '-P'] - commands.append(rc) +def make_rclone(file: str, dest: str) -> list[list[str]]: + fn = Path(file).name + rc = ['rclone', 'copyto', file, f'{dest}/{fn}', '-P'] # print(commands) - return commands + return rc # # Function take list of rclone commands, run each, return none -def run_rclone(commands: list[list[str]]) -> None: - for c in commands: - print(c) - logging.info(f"transferring {c[2]}") - subprocess.run(c) - logging.info(f"{c[2]} has been transferred") +def run_rclone(command: list[str]) -> None: + logging.info(f"transferring {command[2]}") + subprocess.run(command) + logging.info(f"{command[2]} has been transferred") return None @@ -128,12 +110,18 @@ def main(): source = parse_args().source dest = parse_args().dest ems = get_em(source) - em_paths = find_interlace(ems) - ff_cmds = make_commands(em_paths) - sc = make_sc(ff_cmds) - rc_cmds = make_rclone(sc,dest) - - run_rclone(rc_cmds) + for em in ems: + em_path = find_interlace(em) + ff_cmds = make_commands(em_path) + if Path(ff_cmds[-1]).exists(): + continue + print(em, ff_cmds[-1]) + if not str(em.name).startswith('myd'): + continue + sc = make_sc(ff_cmds) + rc_cmds = make_rclone(ff_cmds[-1],dest) + + run_rclone(rc_cmds) # sc # for cmd in ff_cmds: @@ -144,4 +132,4 @@ def main(): if __name__ == "__main__": - main() \ No newline at end of file + main() From 35557bf71fa0d84721bc4557937eb61ba74744b1 Mon Sep 17 00:00:00 2001 From: dreyjo <59994482+dreyjo@users.noreply.github.com> Date: Mon, 3 Feb 2025 16:31:16 -0500 Subject: [PATCH 3/5] renamed to lint_bdami, changed references in test script --- src/born_digital_docs_scripts/lint_bdami.py | 188 ++++++++++++++++++++ tests/test_bd_validator.py | 10 +- 2 files changed, 193 insertions(+), 5 deletions(-) create mode 100644 src/born_digital_docs_scripts/lint_bdami.py diff --git a/src/born_digital_docs_scripts/lint_bdami.py b/src/born_digital_docs_scripts/lint_bdami.py new file mode 100644 index 0000000..0aada10 --- /dev/null +++ b/src/born_digital_docs_scripts/lint_bdami.py @@ -0,0 +1,188 @@ +import argparse +import logging +import pathlib +import re +from pathlib import Path + +import bagit + +LOGGER = logging.getLogger(__name__) + + +def parse_args(): + # validate and return paths for main directory and subdirs + def main_dir(arg): + path = Path(arg) + if not path.is_dir(): + raise argparse.ArgumentTypeError(f"{path} is not a directory") + return path + + def dir_of_dirs(arg): + path = main_dir(arg) + subdirs = [] + for child in path.iterdir(): + if child.is_dir(): + subdirs.append(child) + return subdirs + + parser = argparse.ArgumentParser(description="takes package directory") + # argument for single package to be validated + parser.add_argument( + "-p", + "--package", + type=main_dir, + help="input path to an ami package", + # required=True, + dest="packages", + action="append", + ) + parser.add_argument( + "-d", + "--directory", + type=dir_of_dirs, + help="input path to a directory of ami packages", + # required=True, + dest="packages", + action="extend", + ) + + return parser.parse_args() + + +def is_valid_bag(package: pathlib.Path) -> bool: + bag = bagit.Bag(str(package)) + return bag.validate() + + +# # get structure would have to change to incorporate a list of packages +# def get_structure(package: pathlib.Path) -> list: +# contents = [] +# meta = [] +# for item in package.iterdir(): +# if item.is_dir() and item.name == "data": +# for subdir in Path(item).iterdir(): +# contents.append(subdir) + +# else: +# meta.append(item.name) + +# # print(contents) +# # print(f'the following files are on the first level: {meta}') +# return contents + + +def valid_structure(package: Path) -> bool: + expected = set( + "ArchiveOriginals", + "EditMasters", + "ServiceCopies", + "Images", + "Transcripts", + "Captions", + "Releases", + "Project Files") + + found = set([x.name for x in (package).iterdir() if x.is_dir()]) + + if found <= expected: + return True + else: + LOGGER.error( + f"{package.name} top-level folders should only be {', '.join(expected)}, found {found}" + ) + return False + + + +# def get_files(package: pathlib.Path) -> list: +# all_items = Path(package).rglob("*") +# all_files = [x for x in all_items if x.is_file()] +# files_dict = [] +# for file in all_files: +# dict = {"name": file.name, "strpath": file.absolute(), "pospath": file} +# files_dict.append(dict) +# # print(test) +# return files_dict + + +# check to see the expected folders are in package based on file extension +def files_named_correctly(package, dir_name, ending) -> bool: + + # what happens if EditMasters doesn't exist + dir = (package / dir_name) + if dir.exists(): + contents = dir.rglob("*") + + expected = False + for item in contents: + if not item.stem.endswith(ending): + LOGGER.error(f"{package.name} has item in {dir_name} missing expected {ending}, found {item.name}") + expected = False + + if not expected: + return False + else: + return True + +def edits_named_correctly(package) -> bool: + return files_named_correctly(package, "EditMasters", "_em") + +def service_named_correctly(package) -> bool: + return files_named_correctly(package, "ServiceCopies", "_sc") + +def ao_named_correctly(package) -> bool: + return files_named_correctly(package, "ArchiveOriginals", "_ao") + + types = { + "_ao": "ArchiveOriginals", + "_em": "EditMasters", + "_sc": "ServiceCopies", + "_pm": "PreservationMasters", + } + + # dict ={'name': file.name, + # 'path':file} + + inspect = [] + + # for item in files_dict: + # for key in types: + # if re.search(key, files_dict['name']): + # print(f'{files_dict["name"]} is {types[key]}') + + for item in files_dict: + for key in types: + if re.search(key, item["name"]) and re.search(types[key], item["strpath"]): + print(f'{item["name"]} is in {types[key]} as expected') + else: + inspect.append(item) + + # for item in inspect: + # print(f'what is this?: {item}') + + +# if this works, try with not and result = true/false as written below +# result = True +# for item in contents: +# if not item.name in expected: +# result = False + +# return result + +# #check to see files are in appropriate folders: +# def validate_folders_file_match(): +# return True + + +def main(): + args = parse_args() + print(args) + # for loop for accessing namespace list of one or more + for source in args.packages: + folders = get_structure(source) + files = get_files(source) + validate_folder_content_types(files) + + +if __name__ == "__main__": + main() diff --git a/tests/test_bd_validator.py b/tests/test_bd_validator.py index 982b60e..24e3ffe 100644 --- a/tests/test_bd_validator.py +++ b/tests/test_bd_validator.py @@ -2,7 +2,7 @@ import pytest -import born_digital_docs_scripts.bd_validator as bv +import born_digital_docs_scripts.lint_bdami as bd @pytest.fixture @@ -12,22 +12,22 @@ def good_package(): @pytest.fixture def good_structure(good_package): - return bv.get_structure(good_package) + return bd.get_structure(good_package) def test_is_package_bag(good_package): - result = bv.is_valid_bag(good_package) + result = bd.is_valid_bag(good_package) assert result is True def test_expected_folders_present(good_structure): - result = bv.valid_structure(good_structure) + result = bd.valid_structure(good_structure) assert result def test_warning_unexpected_folder(good_structure): good_structure.append(Path("unknown_folder")) # not sure if this is correct - result = bv.valid_structure(good_structure) + result = bd.valid_structure(good_structure) assert not result From 8a1534a463abbd1a5a5275138a9907ccd0d23148 Mon Sep 17 00:00:00 2001 From: dreyjo <59994482+dreyjo@users.noreply.github.com> Date: Mon, 10 Mar 2025 14:20:57 -0400 Subject: [PATCH 4/5] changed test script name --- tests/test_lint_bdami.py | 56 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) create mode 100644 tests/test_lint_bdami.py diff --git a/tests/test_lint_bdami.py b/tests/test_lint_bdami.py new file mode 100644 index 0000000..24e3ffe --- /dev/null +++ b/tests/test_lint_bdami.py @@ -0,0 +1,56 @@ +from pathlib import Path + +import pytest + +import born_digital_docs_scripts.lint_bdami as bd + + +@pytest.fixture +def good_package(): + return Path("fixtures/simple_video_pk") + + +@pytest.fixture +def good_structure(good_package): + return bd.get_structure(good_package) + + +def test_is_package_bag(good_package): + result = bd.is_valid_bag(good_package) + assert result is True + + +def test_expected_folders_present(good_structure): + result = bd.valid_structure(good_structure) + assert result + + +def test_warning_unexpected_folder(good_structure): + good_structure.append(Path("unknown_folder")) # not sure if this is correct + result = bd.valid_structure(good_structure) + assert not result + + +def test_required_folders_present(good_structure): + # do we have these? + assert False + + +def test_warn_on_required_folders_missing(good_structure): + # do we have these? + assert False + + # def test_expected_folders_match_package_contents(good_package): + # present = bv.get_structure(good_package) + assert result + + +# filetypes = {'ArchiveOriginals':'ao', 'EditMasters':'em','ServiceCopies':'sc','Images':['.jpg','.JPEG','.tif','.tiff'],'Transcripts':['.pdf'],'Captions','Releases', 'Project Files'} + +# @pytest.parametrize(filetypes) +# def test_warn_on_folder_file_mismatch(good_package, filetypes): +# corrupt one folder at a time and get the right warning message + +# def arguments_capture_valid_package_path(good_package) + +# def arguments_capture_valid_directory_paths(good_package) From 626dee2f7358d107c5d0032b1af45ffe364d9dba Mon Sep 17 00:00:00 2001 From: dreyjo <59994482+dreyjo@users.noreply.github.com> Date: Tue, 20 May 2025 12:01:49 -0400 Subject: [PATCH 5/5] changed document names --- src/born_digital_docs_scripts/lint_bdami.py | 2 ++ tests/test_lint_bdami.py | 36 +++++++++++++++++++-- 2 files changed, 36 insertions(+), 2 deletions(-) diff --git a/src/born_digital_docs_scripts/lint_bdami.py b/src/born_digital_docs_scripts/lint_bdami.py index 0aada10..95b8980 100644 --- a/src/born_digital_docs_scripts/lint_bdami.py +++ b/src/born_digital_docs_scripts/lint_bdami.py @@ -112,6 +112,8 @@ def files_named_correctly(package, dir_name, ending) -> bool: dir = (package / dir_name) if dir.exists(): contents = dir.rglob("*") + #else: + #Logger Error could not check folder, folder does not exist expected = False for item in contents: diff --git a/tests/test_lint_bdami.py b/tests/test_lint_bdami.py index 24e3ffe..df11e54 100644 --- a/tests/test_lint_bdami.py +++ b/tests/test_lint_bdami.py @@ -6,8 +6,40 @@ @pytest.fixture -def good_package(): - return Path("fixtures/simple_video_pk") +def good_package(tm_path: Path): + pkg = tmp_path.joinpath("fixtures/simple_bdami_pk") + #pkg = tmp_path.joinpath("fixtures/ncov1234") + + ao_folder = pkg.joinpath("data/ArchiveOriginals") + ao_folder.mkdir(parents=True) + #here add a fake video but also a folder with a couple levels + + em_folder = pkg.joinpath("data/EditMasters") + em_folder.mkdir(parents=True) + + sc_folder = pkg.joinpath("data/ServiceCopies") + sc_folder.mkdir(parents=True) + + + ao_filepath = ao_folder.joinpath("myd_mgzidf123456_v01_ao.mp4") + ao_folderpath = ao_folder.joinpath("/myd_mgzidf123456_v01_ao/CLIPS") + ao_mxf = ao_folderpath.joinpath("myd_mgzidf123456_v01_ao.mp4") + ao_xml = ao_folderpath.joinpath("myd_mgzidf123456_v01_ao.xml") + ao_bpav = + + + em_filepath = em_folder.joinpath("myd_mgzidf123456_v01_em.mov") + sc_filepath = sc_folder.joinpath("myd_mgzidf123456_v01_sc.mp4") + + for file in [ + ao_filepath, + em_filepath, + sc_filepath, + (pkg/"bagit.txt"), + (pkg/"manifest-md5.txt") + ] + + return pkg @pytest.fixture