190 changes: 190 additions & 0 deletions src/born_digital_docs_scripts/lint_bdami.py
@@ -0,0 +1,190 @@
import argparse
import logging
import pathlib
import re
from pathlib import Path

import bagit

LOGGER = logging.getLogger(__name__)


def parse_args():
    # validate and return paths for main directory and subdirs
    def main_dir(arg):
        path = Path(arg)
        if not path.is_dir():
            raise argparse.ArgumentTypeError(f"{path} is not a directory")
        return path

    def dir_of_dirs(arg):
        path = main_dir(arg)
        subdirs = []
        for child in path.iterdir():
            if child.is_dir():
                subdirs.append(child)
        return subdirs

    parser = argparse.ArgumentParser(description="takes package directory")
    # argument for single package to be validated
    parser.add_argument(
        "-p",
        "--package",
        type=main_dir,
        help="input path to an ami package",
        # required=True,
        dest="packages",
        action="append",
    )
    parser.add_argument(
        "-d",
        "--directory",
        type=dir_of_dirs,
        help="input path to a directory of ami packages",
        # required=True,
        dest="packages",
        action="extend",
    )

    return parser.parse_args()
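
# A usage sketch for the CLI above (paths are hypothetical); -p and -d both
# feed the same `packages` list, so they can be combined:
#   python lint_bdami.py -p /path/to/ami_package
#   python lint_bdami.py -d /path/to/directory_of_ami_packages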


def is_valid_bag(package: pathlib.Path) -> bool:
    bag = bagit.Bag(str(package))
    return bag.validate()


# get_structure would have to change to incorporate a list of packages;
# it is kept active because main() and the tests call it
def get_structure(package: pathlib.Path) -> list:
    # payload directories live under data/; everything else at the top level
    # is treated as bag metadata
    contents = []
    meta = []
    for item in package.iterdir():
        if item.is_dir() and item.name == "data":
            for subdir in item.iterdir():
                contents.append(subdir)
        else:
            meta.append(item.name)
    return contents


def valid_structure(package: Path) -> bool:
    expected = {
        "ArchiveOriginals",
        "EditMasters",
        "ServiceCopies",
        "Images",
        "Transcripts",
        "Captions",
        "Releases",
        "Project Files",
    }

    found = {x.name for x in package.iterdir() if x.is_dir()}

    if found <= expected:
        return True
    LOGGER.error(
        f"{package.name} top-level folders should only be {', '.join(expected)}, found {found}"
    )
    return False
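
# A quick sketch of the intent (hypothetical package): a package whose only
# top-level folders are, say, ArchiveOriginals, EditMasters, and ServiceCopies
# passes; one that also contains an unexpected "Scratch" folder is logged and fails.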



# used by main() and validate_folder_content_types()
def get_files(package: pathlib.Path) -> list:
    # collect every file in the package as a dict of name, string path, and Path
    all_files = [x for x in Path(package).rglob("*") if x.is_file()]
    files_dict = []
    for file in all_files:
        entry = {"name": file.name, "strpath": str(file.absolute()), "pospath": file}
        files_dict.append(entry)
    return files_dict


# check that every file in a given folder carries the expected filename suffix
def files_named_correctly(package: Path, dir_name: str, ending: str) -> bool:
    dir_path = package / dir_name
    if not dir_path.exists():
        LOGGER.error(f"{package.name} could not be checked: {dir_name} does not exist")
        return False

    expected = True
    for item in dir_path.rglob("*"):
        if item.is_file() and not item.stem.endswith(ending):
            LOGGER.error(
                f"{package.name} has item in {dir_name} missing expected {ending}, found {item.name}"
            )
            expected = False

    return expected

def edits_named_correctly(package) -> bool:
    return files_named_correctly(package, "EditMasters", "_em")


def service_named_correctly(package) -> bool:
    return files_named_correctly(package, "ServiceCopies", "_sc")


def ao_named_correctly(package) -> bool:
    return files_named_correctly(package, "ArchiveOriginals", "_ao")

# map filename suffixes to the folder each kind of file belongs in
types = {
    "_ao": "ArchiveOriginals",
    "_em": "EditMasters",
    "_sc": "ServiceCopies",
    "_pm": "PreservationMasters",
}


# check that files are in the appropriate folders
def validate_folder_content_types(files_dict: list) -> list:
    # collect anything whose filename suffix does not match the folder it sits in
    inspect = []
    for item in files_dict:
        matched = False
        for key, folder in types.items():
            if re.search(key, item["name"]) and re.search(folder, item["strpath"]):
                print(f'{item["name"]} is in {folder} as expected')
                matched = True
                break
        if not matched:
            inspect.append(item)
    return inspect
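
# validate_folder_content_types expects entries shaped like the ones get_files
# builds, e.g. (hypothetical filename and path):
#   {"name": "sample_1_em.mov",
#    "strpath": "/packages/pkg_1/data/EditMasters/sample_1_em.mov",
#    "pospath": Path("/packages/pkg_1/data/EditMasters/sample_1_em.mov")}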


def main():
    args = parse_args()
    print(args)
    # packages is None when neither -p nor -d is given
    for source in args.packages or []:
        folders = get_structure(source)
        files = get_files(source)
        validate_folder_content_types(files)


if __name__ == "__main__":
    main()
135 changes: 135 additions & 0 deletions src/born_digital_docs_scripts/make_sc.py
@@ -0,0 +1,135 @@
from pathlib import Path
import logging
import subprocess
import argparse
import re

# Accept a directory (could hardcode or argparse)
def parse_args() -> argparse.Namespace:
    def extant_path(p: str) -> Path:
        path = Path(p)
        if not path.exists():
            raise argparse.ArgumentTypeError(f"{path} does not exist")
        return path

    # def rclone_remote(p: str) -> str:
    #     if not re.match(r'.+:', p):
    #         raise argparse.ArgumentTypeError(f"{p} doesn't look like an rclone remote")
    #     return p

    parser = argparse.ArgumentParser(description="path to a directory of born digital ami")
    parser.add_argument("--source", "-s", required=True, type=extant_path)
    parser.add_argument("--dest", "-d", required=True, type=str)
    return parser.parse_args()
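
# A usage sketch (the source directory and rclone remote below are hypothetical):
#   python make_sc.py --source /path/to/born_digital_ami --dest remote:ami/ServiceCopies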


# Function takes a directory (or staged excel), finds all EM files, returns a list
def get_em(path: Path) -> list[Path]:
    source = path
    ems = []
    for x in source.rglob("*_em.*"):
        if not str(x).endswith('mov'):
            print(f"is this okay?: {x}")
        else:
            ems.append(x)

    return ems


# Function takes a single EM file, asks mediainfo (via --Inform) whether it is
# interlaced, and returns (path, scan type)
def find_interlace(path: Path) -> tuple[Path, str]:
    interlacing = subprocess.check_output(
        ['mediainfo', "--Inform=Video;%ScanType%", str(path)],
        encoding='utf-8',
    ).strip()

    return path, interlacing
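
# For reference, `mediainfo --Inform="Video;%ScanType%" <file>` prints the scan
# type of the video stream, e.g. "Interlaced" or "Progressive" (or an empty
# string when no scan type is reported); the exact value depends on the file.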


# Function takes (path, scan type), builds the ffmpeg command for that file
# (adding deinterlacing and the service copy output path as needed), and returns
# the command; the ServiceCopies folder is created first because ffmpeg will not
# create missing output directories


# Overwrite servicecopies, may need to add flag to ffmpeg to do this (-y?)
def make_commands(file: tuple[Path, str]) -> list[str]:
    em_path = file[0]
    em_path_str = str(em_path)
    # e.g. PosixPath('test_ems/dir_3/data/EditMaster/sample_dig_3_em.mov')
    base = em_path.parent.parent
    # ffmpeg did not like the output directory not already existing, so make
    # sure ServiceCopies is there before the command runs
    dest = base / 'ServiceCopies'
    dest.mkdir(exist_ok=True)
    sc_path_str = str(dest / em_path.name.replace("em.mov", "sc.mp4"))
    # mediainfo reports "Interlaced"/"Progressive", so compare case-insensitively
    if file[1].lower() == 'interlaced':
        cmd = ['ffmpeg', '-i', em_path_str, '-map', '0:v', '-map', '0:a', '-c:v', 'libx264', '-movflags', '+faststart', '-crf', '20', '-maxrate', '7.5M', '-bufsize', '7.5M', '-vf', 'yadif', '-c:a', 'aac', '-b:a', '320000', '-ar', '48000', sc_path_str]
    else:
        cmd = ['ffmpeg', '-i', em_path_str, '-map', '0:v', '-map', '0:a', '-c:v', 'libx264', '-movflags', '+faststart', '-crf', '20', '-maxrate', '7.5M', '-bufsize', '7.5M', '-c:a', 'aac', '-b:a', '320000', '-ar', '48000', sc_path_str]

    return cmd
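
# For a hypothetical interlaced sample_dig_3_em.mov, the command above comes out
# roughly as:
#   ffmpeg -i .../EditMasters/sample_dig_3_em.mov -map 0:v -map 0:a -c:v libx264 \
#     -movflags +faststart -crf 20 -maxrate 7.5M -bufsize 7.5M -vf yadif \
#     -c:a aac -b:a 320000 -ar 48000 .../ServiceCopies/sample_dig_3_sc.mp4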



# Function takes an ffmpeg command, runs it, and returns the service copy path
def make_sc(command: list[str]) -> str:
    logging.debug(f"Running this command: {command}")
    subprocess.run(command)
    sc = command[-1]
    logging.info(f"{command[-1]} created")

    return sc

# Function takes a service copy path and a destination, returns the rclone command
def make_rclone(file: str, dest: str) -> list[str]:
    fn = Path(file).name
    rc = ['rclone', 'copyto', file, f'{dest}/{fn}', '-P']

    return rc
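
# e.g. make_rclone('/pkg/ServiceCopies/sample_sc.mp4', 'remote:ami/sc') returns
#   ['rclone', 'copyto', '/pkg/ServiceCopies/sample_sc.mp4', 'remote:ami/sc/sample_sc.mp4', '-P']
# (the paths and remote name here are hypothetical)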

# Function takes an rclone command, runs it, returns None
def run_rclone(command: list[str]) -> None:
    logging.info(f"transferring {command[2]}")
    subprocess.run(command)
    logging.info(f"{command[2]} has been transferred")

def main():
    # surface the logging.info/debug messages used throughout
    logging.basicConfig(level=logging.INFO)
    args = parse_args()
    ems = get_em(args.source)
    for em in ems:
        em_path = find_interlace(em)
        ff_cmds = make_commands(em_path)
        # skip files whose service copy already exists
        if Path(ff_cmds[-1]).exists():
            continue
        print(em, ff_cmds[-1])
        # only process files whose names start with 'myd'
        if not str(em.name).startswith('myd'):
            continue
        sc = make_sc(ff_cmds)
        rc_cmds = make_rclone(sc, args.dest)
        run_rclone(rc_cmds)





if __name__ == "__main__":
    main()
10 changes: 5 additions & 5 deletions tests/test_bd_validator.py
@@ -2,7 +2,7 @@

import pytest

import born_digital_docs_scripts.bd_validator as bv
import born_digital_docs_scripts.lint_bdami as bd


@pytest.fixture
@@ -12,22 +12,22 @@ def good_package():

@pytest.fixture
def good_structure(good_package):
return bv.get_structure(good_package)
return bd.get_structure(good_package)


def test_is_package_bag(good_package):
result = bv.is_valid_bag(good_package)
result = bd.is_valid_bag(good_package)
assert result is True


def test_expected_folders_present(good_structure):
result = bv.valid_structure(good_structure)
result = bd.valid_structure(good_structure)
assert result


def test_warning_unexpected_folder(good_structure):
good_structure.append(Path("unknown_folder")) # not sure if this is correct
result = bv.valid_structure(good_structure)
result = bd.valid_structure(good_structure)
assert not result

