diff --git a/cleanroom/binarymanager.py b/cleanroom/binarymanager.py index aa661b4..7523af8 100644 --- a/cleanroom/binarymanager.py +++ b/cleanroom/binarymanager.py @@ -17,33 +17,36 @@ class Binaries(Enum): """Important binaries.""" + APT_GET = auto() BORG = auto() BTRFS = auto() - MKNOD = auto() - PACMAN = auto() - PACMAN_KEY = auto() - APT_GET = auto() - DPKG = auto() + CHROOT_HELPER = auto() + CPIO = auto() DEBOOTSTRAP = auto() - SBSIGN = auto() - OBJCOPY = auto() - MKSQUASHFS = auto() - VERITYSETUP = auto() - TAR = auto() - USERMOD = auto() - USERADD = auto() + DEPMOD = auto() + DPKG = auto() + FIND = auto() + FLOCK = auto() GROUPADD = auto() GROUPMOD = auto() - CHROOT_HELPER = auto() - SYSTEMCTL = auto() - SFDISK = auto() - FLOCK = auto() + MKFS_VFAT = auto() + MKNOD = auto() + MKSQUASHFS = auto() + MODPROBE = auto() + NBD_CLIENT = auto() + OBJCOPY = auto() + PACMAN = auto() + PACMAN_KEY = auto() QEMU_IMG = auto() QEMU_NBD = auto() - NBD_CLIENT = auto() - MKFS_VFAT = auto() + SBSIGN = auto() + SFDISK = auto() SYNC = auto() - MODPROBE = auto() + SYSTEMCTL = auto() + TAR = auto() + USERADD = auto() + USERMOD = auto() + VERITYSETUP = auto() def _check_for_binary(binary: str) -> str: @@ -67,25 +70,28 @@ def _find_binaries() -> typing.Dict[Binaries, str]: binaries = { Binaries.BORG: _check_for_binary("/usr/bin/borg"), Binaries.BTRFS: _check_for_binary("/usr/bin/btrfs"), - Binaries.SBSIGN: _check_for_binary("/usr/bin/sbsign"), - Binaries.OBJCOPY: _check_for_binary("/usr/bin/objcopy"), - Binaries.MKNOD: _check_for_binary("/usr/bin/mknod"), - Binaries.MKSQUASHFS: _check_for_binary("/usr/bin/mksquashfs"), - Binaries.TAR: _check_for_binary("/usr/bin/tar"), - Binaries.USERMOD: _check_for_binary("/usr/sbin/usermod"), - Binaries.USERADD: _check_for_binary("/usr/sbin/useradd"), - Binaries.GROUPMOD: _check_for_binary("/usr/sbin/groupmod"), - Binaries.GROUPADD: _check_for_binary("/usr/sbin/groupadd"), Binaries.CHROOT_HELPER: _check_for_binary("/usr/bin/arch-chroot"), - Binaries.SYSTEMCTL: _check_for_binary("/usr/bin/systemctl"), + Binaries.CPIO: _check_for_binary("/usr/bin/cpio"), + Binaries.DEPMOD: _check_for_binary("/usr/bin/depmod"), + Binaries.FIND: _check_for_binary("/usr/bin/find"), Binaries.FLOCK: _check_for_binary("/usr/bin/flock"), - Binaries.SFDISK: _check_for_binary("/usr/bin/sfdisk"), + Binaries.GROUPADD: _check_for_binary("/usr/sbin/groupadd"), + Binaries.GROUPMOD: _check_for_binary("/usr/sbin/groupmod"), + Binaries.MKFS_VFAT: _check_for_binary("/usr/bin/mkfs.vfat"), + Binaries.MKNOD: _check_for_binary("/usr/bin/mknod"), + Binaries.MKSQUASHFS: _check_for_binary("/usr/bin/mksquashfs"), + Binaries.MODPROBE: _check_for_binary("/usr/bin/modprobe"), + Binaries.NBD_CLIENT: _check_for_binary("/usr/bin/nbd-client"), + Binaries.OBJCOPY: _check_for_binary("/usr/bin/objcopy"), Binaries.QEMU_IMG: _check_for_binary("/usr/bin/qemu-img"), Binaries.QEMU_NBD: _check_for_binary("/usr/bin/qemu-nbd"), - Binaries.NBD_CLIENT: _check_for_binary("/usr/bin/nbd-client"), - Binaries.MKFS_VFAT: _check_for_binary("/usr/bin/mkfs.vfat"), + Binaries.SBSIGN: _check_for_binary("/usr/bin/sbsign"), + Binaries.SFDISK: _check_for_binary("/usr/bin/sfdisk"), Binaries.SYNC: _check_for_binary("/usr/bin/sync"), - Binaries.MODPROBE: _check_for_binary("/usr/bin/modprobe"), + Binaries.SYSTEMCTL: _check_for_binary("/usr/bin/systemctl"), + Binaries.TAR: _check_for_binary("/usr/bin/tar"), + Binaries.USERADD: _check_for_binary("/usr/sbin/useradd"), + Binaries.USERMOD: _check_for_binary("/usr/sbin/usermod"), } os_binaries: typing.Dict[Binaries, str] = {} distribution = _get_distribution() @@ -93,14 +99,14 @@ def _find_binaries() -> typing.Dict[Binaries, str]: if distribution == "debian": os_binaries = { Binaries.APT_GET: _check_for_binary("/usr/bin/apt-get"), - Binaries.DPKG: _check_for_binary("/usr/bin/dpkg"), Binaries.DEBOOTSTRAP: _check_for_binary("/usr/sbin/debootstrap"), + Binaries.DPKG: _check_for_binary("/usr/bin/dpkg"), Binaries.VERITYSETUP: _check_for_binary("/usr/sbin/veritysetup"), } elif distribution == "arch" or distribution == "archlinux": os_binaries = { - Binaries.PACMAN: _check_for_binary("/usr/bin/pacman"), Binaries.PACMAN_KEY: _check_for_binary("/usr/bin/pacman-key"), + Binaries.PACMAN: _check_for_binary("/usr/bin/pacman"), Binaries.VERITYSETUP: _check_for_binary("/usr/bin/veritysetup"), } else: @@ -116,6 +122,7 @@ class BinaryManager: def __init__(self) -> None: """Constructor.""" self._binaries = _find_binaries() + self._optionals: typing.List[Binaries] = [] def preflight_check(self) -> None: passed = True @@ -123,8 +130,11 @@ def preflight_check(self) -> None: if b[1]: debug("{} found: {}...".format(b[0], b[1])) else: - warn("{} not found.".format(b[0])) - passed = False + if b in self._optionals: + debug("[OPTIONAL] {} not found, ignoring.") + else: + warn("{} not found.".format(b[0])) + passed = False if not passed: raise PreflightError("Required binaries are not available.") diff --git a/cleanroom/command.py b/cleanroom/command.py index 0481ad9..eb97e5d 100644 --- a/cleanroom/command.py +++ b/cleanroom/command.py @@ -14,7 +14,7 @@ from .exceptions import GenerateError, ParseError from .execobject import ExecObject from .location import Location -from .printer import fail, h3, success, verbose +from .printer import debug, fail, h3, success, verbose from .systemcontext import SystemContext import os @@ -47,18 +47,23 @@ def __init__( file: str, syntax: str = "", help_string: str, - **services: typing.Any + **services: typing.Any, ) -> None: """Constructor.""" self._name = name self._syntax_string = syntax self._help_string = help_string helper_directory = os.path.join( - os.path.dirname(os.path.realpath(file)), "helper", self._name + os.path.dirname(os.path.realpath(file)), os.path.basename(file)[:-3], ) self.__helper_directory = ( helper_directory if os.path.isdir(helper_directory) else None ) + if self.__helper_directory is None: + debug(f"Checked {helper_directory} for helpers for command {name}: NONE") + else: + debug(f"Checked {helper_directory} for helpers for command {name}: FOUND") + self._services = services @property @@ -103,7 +108,7 @@ def __call__( location: Location, system_context: SystemContext, *args: typing.Any, - **kwargs: typing.Any + **kwargs: typing.Any, ) -> None: """Implement this! @@ -117,7 +122,7 @@ def _execute( system_context: SystemContext, command: str, *args: typing.Any, - **kwargs: typing.Any + **kwargs: typing.Any, ) -> None: command_info = self._service("command_manager").command(command) if not command_info: @@ -132,7 +137,7 @@ def _add_hook( hook_name: str, command: str, *args: typing.Any, - **kwargs: typing.Any + **kwargs: typing.Any, ) -> None: """Add a hook.""" command_info = self._service("command_manager").command(command) @@ -170,7 +175,7 @@ def _helper_directory(self) -> typing.Optional[str]: """Return the helper directory.""" return self.__helper_directory - def _config_directory(self, system_context) -> str: + def _config_directory(self, system_context: SystemContext) -> str: return os.path.join( system_context.systems_definition_directory, "config", self.name ) @@ -188,7 +193,7 @@ def _validate_arguments_exact( arg_count: int, message: str, *args: typing.Any, - **kwargs: typing.Any + **kwargs: typing.Any, ) -> None: self._validate_args_exact(location, arg_count, message, *args) self._validate_kwargs(location, (), **kwargs) @@ -199,7 +204,7 @@ def _validate_arguments_at_least( arg_count: int, message: str, *args: typing.Any, - **kwargs: typing.Any + **kwargs: typing.Any, ) -> None: self._validate_args_at_least(location, arg_count, message, *args) self._validate_kwargs(location, (), **kwargs) @@ -223,7 +228,7 @@ def _validate_kwargs( self, location: Location, known_kwargs: typing.Tuple[str, ...], - **kwargs: typing.Any + **kwargs: typing.Any, ) -> None: if not known_kwargs: if kwargs: @@ -244,7 +249,7 @@ def _require_kwargs( self, location: Location, required_kwargs: typing.Tuple[str, ...], - **kwargs: typing.Any + **kwargs: typing.Any, ) -> None: for key in required_kwargs: if key not in kwargs: diff --git a/cleanroom/commandmanager.py b/cleanroom/commandmanager.py index 02826eb..d7bf8a9 100644 --- a/cleanroom/commandmanager.py +++ b/cleanroom/commandmanager.py @@ -8,10 +8,9 @@ from .command import Command, stringify from .exceptions import PreflightError from .location import Location -from .printer import debug, error, h2, success, trace -from .systemcontext import SystemContext, _recursive_expand +from .printer import debug, h2, success, trace +from .systemcontext import SystemContext -import collections import importlib.util import inspect import os @@ -19,29 +18,40 @@ import typing -CommandInfo = collections.namedtuple( - "CommandInfo", - [ - "name", - "syntax_string", - "help_string", - "file_name", - "dependency_func", - "validate_func", - "execute_func", - "register_substitutions", - ], -) +class CommandInfo(typing.NamedTuple): + name: str + syntax_string: str + help_string: str + file_name: str + dependency_func: typing.Callable[ + [typing.Tuple[typing.Any, ...], typing.Dict[str, typing.Any]], + typing.Optional[str], + ] + validate_func: typing.Callable[ + [Location, typing.Tuple[typing.Any, ...], typing.Dict[str, typing.Any],], None, + ] + execute_func: typing.Callable[ + [ + Location, + SystemContext, + typing.Tuple[typing.Any, ...], + typing.Dict[str, typing.Any], + ], + None, + ] + register_substitutions: typing.Callable[ + [], typing.List[typing.Tuple[str, str, str]] + ] def _process_args(system_context: SystemContext, *args: typing.Any) -> typing.Any: - return tuple(map(lambda a: _recursive_expand(system_context, a), args)) + return tuple(map(lambda a: system_context.expand(a), args)) def _process_kwargs( system_context: SystemContext, **kwargs: typing.Any ) -> typing.Dict[str, typing.Any]: - return {k: _recursive_expand(system_context, v) for k, v in kwargs.items()} + return {k: system_context.expand(v) for k, v in kwargs.items()} def call_command( @@ -49,7 +59,7 @@ def call_command( system_context: SystemContext, command: Command, *args: typing.Any, - **kwargs: typing.Dict[str, typing.Any] + **kwargs: typing.Any ): _args = _process_args(system_context, *args) _kwargs = _process_kwargs(system_context, **kwargs) @@ -82,20 +92,33 @@ def print_commands(self) -> None: ) ) - def _collect_substitutions(self) -> typing.List[typing.Tuple[str, str, str, str]]: - result: typing.List[typing.Tuple[str, str, str, str]] = [] - duplications: typing.Dict[str, str] = {} + def _collect_substitutions( + self, + ) -> typing.List[typing.Tuple[str, str, str, typing.Tuple[str, ...]]]: + result: typing.Dict[ + str, typing.Tuple[str, str, str, typing.Tuple[str, ...]] + ] = {} + for cmd in self._commands.keys(): command_info = self.command(cmd) assert command_info name = command_info.name for (key, value, description) in command_info.register_substitutions(): - result.append((key, value, description, name)) - assert not key in duplications - duplications[key] = name - - return result + if not key in result: + result[key] = (key, value, description, (name,)) + else: + (old_key, old_value, old_description, old_names) = result[key] + assert ( + old_key == key + and old_value == value + and old_description == description + and not name in old_names + and old_names + ) + result[key] = (key, value, description, (*old_names, name)) + + return [v for v in result.values()] def print_substitutions(self) -> None: h2("Predefined Substitutions:") @@ -117,10 +140,10 @@ def print_substitutions(self) -> None: print(' {} ("{}"): {}\n {}\n'.format(key, value, name, description)) def setup_substitutions(self, system_context: SystemContext): - if system_context._base_context: + if system_context.base_context: debug( 'System Context inherited, using substitutions from "{}".'.format( - system_context._base_context.system_name + system_context.base_context.system_name ) ) return @@ -216,7 +239,7 @@ def _find_commands_in_directory(self, directory: str) -> None: assert spec and spec.loader spec.loader.exec_module(cmd_module) - def is_command(x): + def is_command(x: typing.Any) -> bool: return ( inspect.isclass(x) and x.__name__.endswith("Command") diff --git a/cleanroom/commands/_create_clrm_initrd_extra.py b/cleanroom/commands/_create_clrm_initrd_extra.py new file mode 100644 index 0000000..af9de00 --- /dev/null +++ b/cleanroom/commands/_create_clrm_initrd_extra.py @@ -0,0 +1,633 @@ +# -*- coding: utf-8 -*- +"""create_clrm_initrd_extra command. + +@author: Tobias Hunger +""" + +from cleanroom.exceptions import GenerateError +from cleanroom.binarymanager import Binaries +from cleanroom.command import Command +from cleanroom.helper.run import run +from cleanroom.location import Location +from cleanroom.systemcontext import SystemContext +from cleanroom.printer import debug, info, trace + +import os +from shutil import copyfile +from tempfile import TemporaryDirectory +import typing + + +_modules: typing.Set[str] = set() +_extra_modules: typing.List[str] = [] + + +def _populate_modules(fs_directory: str): + module_directory = os.path.join(fs_directory, "usr/lib/modules") + + for _, _, files in os.walk(module_directory): + for f in files: + module_name = os.path.basename(f) + + ko_pos = module_name.find(".ko") + if ko_pos >= 0: + module_name = module_name[:ko_pos] + + trace(f"Found a kernel module: {module_name}.") + _modules.add(module_name) + + +def _device_ify(device: str) -> str: + if not device: + return "" + if device.startswith("PARTLABEL="): + device = "/dev/disk/by-partlabel/" + device[10:] + elif device.startswith("LABEL="): + device = "/dev/disk/by-label/" + device[6:] + elif device.startswith("PARTUUID="): + device = "/dev/disk/by-partuuid/" + device[9:] + elif device.startswith("UUID="): + device = "/dev/disk/by-uuid/" + device[5:] + elif device.startswith("ID="): + device = "/dev/disk/by-id/" + device[3:] + elif device.startswith("PATH="): + device = "/dev/disk/by-path/" + device[5:] + assert device.startswith("/dev/") + return device + + +def _escape_device(device: str) -> str: + device = _device_ify(device) + + device = device.replace("-", "\\x2d") + device = device.replace("=", "\\x3d") + device = device.replace(";", "\\x3b") + device = device.replace("/", "-") + + return device[1:] + + +def _trim_empty_directories(root_dir: str): + for root, directories, files in os.walk(root_dir, topdown=False): + if not directories and not files: + os.removedirs(os.path.join(root_dir, root)) + + +def _tokenize_line(line: str) -> typing.List[str]: + token = "" + token_list: typing.List[str] = [] + while line: + current_char = line[0] + line = line[1:] + + if current_char == "#": + line = "" + continue + + if current_char.isspace(): + if token: + token_list.append(token) + token = "" + continue + + token += current_char + + if token: + token_list.append(token) + + return token_list + + +def _replace( + contents: str, replacements: typing.Dict[str, typing.Optional[str]] +) -> typing.Tuple[str, bool]: + replacement_failed = False + + did_replacement = False + + input_contents = contents + + for k, v in replacements.items(): + r = "{" + k + "}" + if r in contents: + did_replacement = True + old_contents = contents + if v is None: + debug(f' SKIPPING replacement of "{r}": value is None.') + replacement_failed = True + else: + debug(f' Replacing "{r}" with "{v}".') + contents = contents.replace(r, v) + assert old_contents != contents + + assert not did_replacement or replacement_failed or input_contents != contents + + return (contents, replacement_failed) + + +def _do_file( + is_optional: bool, + *args: str, + base_dir: str, + fs_dir: str, + system_fs_directory: str, + replacements: typing.Dict[str, typing.Optional[str]], +): + # validate the inputs: + assert len(args) >= 1 and len(args) <= 2 + src = args[0] + dest = "" + if len(args) == 2: + dest = args[1] + else: + assert os.path.isabs(src) + dest = src + assert src and dest + assert os.path.isabs(dest) + + (real_src, src_failed) = _replace(src, replacements) + (real_dest, dest_failed) = _replace(dest, replacements) + + if src_failed and not is_optional: + raise GenerateError(f"FILE failed: {src} failed to replace") + if dest_failed and not is_optional: + raise GenerateError(f"FILE failed: {dest} failed to replace.") + + replace_contents = False + if os.path.isabs(real_src): + trace(f"{real_src} is absolute, resolving relative to {system_fs_directory}.") + real_src = os.path.join(system_fs_directory, real_src[1:]) + else: + replace_contents = True + trace(f"{real_src} is not absolute, resolving relative to {base_dir}.") + real_src = os.path.join(base_dir, real_src) + + if not os.path.isfile(real_src): + trace(f"FILE: Source file {real_src} does not exist.") + if is_optional: + return + else: + raise GenerateError(f"FILE: Source file {src} does not exist.") + + real_dest = os.path.join(fs_dir, real_dest[1:]) + + if not os.path.exists(os.path.dirname(real_dest)): + os.makedirs(os.path.dirname(real_dest)) + + trace(f"FILE: Copying data from {real_src} -> {real_dest}.") + + if replace_contents: + with open(real_src, "r") as fd: + contents = fd.read() + + (contents, fail) = _replace(contents, replacements) + if fail: + if is_optional: + return + else: + raise GenerateError("FILE failed: contents failed to replace.") + + with open(real_dest, "w") as fd: + fd.write(contents) + else: + copyfile(real_src, real_dest) + + assert os.path.isfile(real_dest) + + debug( + f"FILE action {real_src} -> {real_dest} (optional={is_optional}, replace_contents={replace_contents}): SUCCESS" + ) + + +def _do_binary( + is_optional: bool, + *args: str, + base_dir: str, + fs_dir: str, + system_fs_directory: str, + replacements: typing.Dict[str, typing.Optional[str]], +): + assert len(args) >= 1 and len(args) <= 2 + src = args[0] + dest = "" + if len(args) == 2: + dest = args[1] + else: + assert os.path.isabs(src) + dest = src + assert src and dest + assert os.path.isabs(dest) + + (real_src, src_failed) = _replace( + os.path.join(system_fs_directory, src[1:]), replacements + ) + (real_dest, dest_failed) = _replace(os.path.join(fs_dir, dest[1:]), replacements) + + if src_failed and not is_optional: + raise GenerateError(f"BINARY failed: {src} failed to replace") + if dest_failed and not is_optional: + raise GenerateError(f"BINARY failed: {dest} failed to replace.") + + if os.path.isfile(real_src): + trace(f"BINARY: Copying data from {real_src} -> {real_dest}.") + + if not os.path.exists(os.path.dirname(real_dest)): + os.makedirs(os.path.dirname(real_dest)) + + copyfile(real_src, real_dest) + assert os.path.isfile(real_dest) + # Fix up permissions and ownership: + os.chmod(real_dest, 0o755) + os.chown(real_dest, 0, 0) + else: + if not is_optional: + raise GenerateError(f"BINARY does not exist at {real_src}.") + trace(f"Binary {src} not installed into extra initrd!") + + ### TODO: Handle dependencies! + + debug(f"BINARY action {src} -> {dest} (optional={is_optional}): SUCCESS") + + +def _do_link( + is_optional: bool, + *args: str, + base_dir: str, + fs_dir: str, + system_fs_directory: str, + replacements: typing.Dict[str, typing.Optional[str]], +): + assert len(args) == 2 + src = args[0] + assert os.path.isabs(src) + dest = args[1] + + (real_src, src_failed) = _replace(os.path.join(fs_dir, src[1:]), replacements) + (real_dest, dest_failed) = _replace(dest, replacements) + + if src_failed: + if is_optional: + return + else: + raise GenerateError(f"LINK failed: {src} failed to replace") + if dest_failed: + if is_optional: + return + else: + raise GenerateError(f"LINK failed: {dest} failed to replace.") + + if not os.path.isdir(os.path.dirname(real_src)): + os.makedirs(os.path.dirname(real_src)) + + if not real_dest.startswith("/dev/"): + if os.path.isabs(dest): + initrd_dest = os.path.join(fs_dir, real_dest[1:]) + else: + initrd_dest = os.path.join(os.path.dirname(real_src), real_dest) + + if not os.path.exists(initrd_dest): + trace( + f"Link target {dest} does not exist in extra initrd!\n full target path: {initrd_dest}...\n is_optional: {is_optional}...\n files: {os.listdir(os.path.dirname(initrd_dest))}..." + ) + if is_optional: + return + else: + raise GenerateError(f"LINK target {dest} does not exist.") + + trace(f"LINK: Creating symlink from {real_src} -> {real_dest}.") + os.symlink(real_dest, real_src) + + assert os.path.islink(real_src) + + debug(f"LINK action {src} -> {dest} (optional={is_optional}): SUCCESS") + + +def _do_module( + is_optional: bool, + *args: str, + base_dir: str, + fs_dir: str, + system_fs_directory: str, + replacements: typing.Dict[str, typing.Optional[str]], +): + assert len(args) == 1 + module = args[0] + + if not module in _modules: + if not is_optional: + raise GenerateError(f"MODULE {module} was not found.") + trace(f"Module {module} not installed into extra initrd!") + return + + debug(f"MODULE action {module} (optional={is_optional}): SUCCESS") + _extra_modules.append(module) + + module_file = os.path.basename(base_dir) + module_file_path = os.path.join(fs_dir, f"etc/modules-load.d/{module_file}.conf") + contents = "" + + if not os.path.exists(os.path.dirname(module_file_path)): + os.makedirs(os.path.dirname(module_file_path)) + + if os.path.exists(module_file_path): + with open(module_file_path, "r") as fd_in: + contents = fd_in.read() + + if not contents: + contents += f"# Load modules for {module_file}:\n" + contents += module + + with open(module_file_path, "w") as fd_out: + fd_out.write(contents) + + +def _do( + action: str, + is_optional: bool, + *args: str, + base_dir: str, + fs_dir: str, + system_fs_directory: str, + replacements: typing.Dict[str, typing.Optional[str]], +): + trace( + f"Do {action} {args} (is_optional={is_optional})\n base_dir: {base_dir}...\n fs_dir: {fs_dir}...\n system_fs_directory: {system_fs_directory}..." + ) + if action == "FILE": + _do_file( + is_optional, + *args, + base_dir=base_dir, + fs_dir=fs_dir, + system_fs_directory=system_fs_directory, + replacements=replacements, + ) + elif action == "BINARY": + _do_binary( + is_optional, + *args, + base_dir=base_dir, + fs_dir=fs_dir, + system_fs_directory=system_fs_directory, + replacements=replacements, + ) + elif action == "LINK": + _do_link( + is_optional, + *args, + base_dir=base_dir, + fs_dir=fs_dir, + system_fs_directory=system_fs_directory, + replacements=replacements, + ) + elif action == "MODULE": + _do_module( + is_optional, + *args, + base_dir=base_dir, + fs_dir=fs_dir, + system_fs_directory=system_fs_directory, + replacements=replacements, + ) + else: + raise GenerateError("Unknown keyword {action} in initrd contents file.") + + +def _parse_line( + line: str, + *, + base_dir: str, + fs_dir: str, + system_fs_directory: str, + replacements: typing.Dict[str, typing.Optional[str]], +): + trace(f'Parsing line "{line}".') + tokens = _tokenize_line(line) + + is_optional = False + action = "" + + while tokens: + current = tokens[0] + tokens = tokens[1:] + + if current == "OPTIONAL": + is_optional = True + if action: + _do( + action, + is_optional, + *tokens, + base_dir=base_dir, + fs_dir=fs_dir, + system_fs_directory=system_fs_directory, + replacements=replacements, + ) + break + continue + + if action: + _do( + action, + is_optional, + current, + *tokens, + base_dir=base_dir, + fs_dir=fs_dir, + system_fs_directory=system_fs_directory, + replacements=replacements, + ) + break + else: + action = current + + +class CreateClrmInitrdExtraCommand(Command): + """The create_clrm_initrd_extra command.""" + + def __init__(self, **services: typing.Any) -> None: + """Constructor.""" + super().__init__( + "create_clrm_initrd_extra", + syntax="", + help_string="Create CLRM-specific initrd extra parts.", + file=__file__, + **services, + ) + + self._vg = "" + self._image_fs = "" + self._image_device = "" + self._image_options = "" + self._full_name = "" + + def validate( + self, location: Location, *args: typing.Any, **kwargs: typing.Any + ) -> None: + """Validate the arguments.""" + self._validate_arguments_exact( + location, 1, '"{}" takes an initrd to create.', *args, **kwargs + ) + + def register_substitutions(self) -> typing.List[typing.Tuple[str, str, str]]: + return [ + ("IMAGE_FS", "ext2", "The filesystem type to load clrm-images from",), + ("IMAGE_DEVICE", "", "The device to load clrm-images from",), + ( + "IMAGE_OPTIONS", + "rw", + "The filesystem options to mount the IMAGE_DEVICE with", + ), + ( + "DEFAULT_VG", + "", + "The volume group to look for clrm rootfs/verity partitions on", + ), + ] + + def _process_contents_file( + self, + contents_file: str, + *, + fs_dir: str, + system_fs_directory: str, + replacements: typing.Dict[str, typing.Optional[str]], + ): + debug(f"Processing initrd setup file {contents_file}.") + base_dir = os.path.dirname(contents_file) + with open(contents_file, "r") as contents: + for line in contents: + _parse_line( + line, + base_dir=base_dir, + fs_dir=fs_dir, + system_fs_directory=system_fs_directory, + replacements=replacements, + ) + debug(f"Done with initrd setup file {contents_file}.") + + def _process_helper_folders( + self, + *, + helper_dir: str, + fs_dir: str, + system_fs_directory: str, + replacements: typing.Dict[str, typing.Optional[str]], + ): + for dir in os.listdir(helper_dir): + contents_file = os.path.join(helper_dir, dir, "contents") + if os.path.isfile(contents_file): + self._process_contents_file( + contents_file, + fs_dir=fs_dir, + system_fs_directory=system_fs_directory, + replacements=replacements, + ) + trace(f"All helper folders processed!") + + def _create_initrd( + self, + initrd: str, + *, + helper_dir: str, + system_fs_directory: str, + replacements: typing.Dict[str, typing.Optional[str]], + ): + with TemporaryDirectory(prefix="clrm_initrd_") as fs_dir: + self._process_helper_folders( + helper_dir=helper_dir, + fs_dir=fs_dir, + system_fs_directory=system_fs_directory, + replacements=replacements, + ) + + _trim_empty_directories(fs_dir) + + # Document the files and directories: + trace(f"Temporary directory: {fs_dir}...") + for root, _, files in os.walk(fs_dir): + trace(f"+ {root}") + for f in files: + trace(f"| + {f}") + + # Package up the initrd: + run( + "/usr/bin/sh", + "-c", + f'"{self._binary(Binaries.FIND)}" . | "{self._binary(Binaries.CPIO)}" -o -H newc | gzip > "{initrd}"', + work_directory=fs_dir, + returncode=0, + ) + + trace("Extra initrd created.") + + def __call__( + self, + location: Location, + system_context: SystemContext, + *args: typing.Any, + **kwargs: typing.Any, + ) -> None: + """Execute command.""" + + # scan for the modules! + if not _modules: + _populate_modules(system_context.fs_directory) + assert _modules + + if not os.path.exists(os.path.join(system_context.boot_directory, "vmlinuz")): + info("Skipping clrm initrd extra generation: No vmlinuz in boot directory.") + return + + self._vg = system_context.substitution_expanded("DEFAULT_VG", "") + self._image_fs = system_context.substitution_expanded("IMAGE_FS", "") + self._image_device = _device_ify( + system_context.substitution_expanded("IMAGE_DEVICE", "") + ) + self._image_options = system_context.substitution_expanded("IMAGE_OPTIONS", "") + + image_name = system_context.substitution_expanded("CLRM_IMAGE_FILENAME", "") + self._full_name = image_name + + initrd = args[0] + + helper_dir = self._helper_directory + assert helper_dir + + trace(f"Looking for clrm initrd configuration in {helper_dir}.") + + image_base_name = self._full_name + pos = image_base_name.find(".") + if pos >= 0: + image_base_name = image_base_name[:pos] + + replacements: typing.Dict[str, typing.Optional[str]] = { + "image_device": self._image_device if self._image_device else None, + "escaped_image_device": _escape_device(self._image_device) + if self._image_device + else None, + "image_fs": self._image_fs if self._image_fs else None, + "image_options": self._image_options, # These may be empty! + "volume_group": self._vg if self._vg else None, + "image_full_name": self._full_name, + "image_base_name": image_base_name, + } + + for k, v in replacements.items(): + trace(f'Set up replacement: {k} -> {"" if v is None else v}...') + + self._create_initrd( + initrd, + helper_dir=helper_dir, + system_fs_directory=system_context.fs_directory, + replacements=replacements, + ) + + modules = ( + system_context.substitution("INITRD_EXTRA_MODULES", "").split(" ") + + _extra_modules + ) + system_context.set_substitution("INITRD_EXTRA_MODULES", " ".join(modules)) + + trace("Done with extra initrd creation.") + assert os.path.isfile(initrd) diff --git a/cleanroom/commands/_create_clrm_initrd_extra/image_boot/contents b/cleanroom/commands/_create_clrm_initrd_extra/image_boot/contents new file mode 100644 index 0000000..382e562 --- /dev/null +++ b/cleanroom/commands/_create_clrm_initrd_extra/image_boot/contents @@ -0,0 +1,9 @@ +OPTIONAL FILE ./images.mount /usr/lib/systemd/system/images.mount +FILE ./initrd-find-image-partitions.service /usr/lib/systemd/system/initrd-find-image-partitions.service +OPTIONAL FILE ./initrd-find-root-lv-partitions.service /usr/lib/systemd/system/initrd-find-root-lv-partitions.service + +MODULE loop + +OPTIONAL LINK /usr/lib/systemd/system/dev-{volume_group}-{image_full_name}.device.wants/initrd-find-root-lv-partitions.service ../initrd-find-root-lv-partitions.service +OPTIONAL LINK /usr/lib/systemd/system/{escaped_image_device}.device.wants/images.mount ../images.mount +LINK /usr/lib/systemd/system/images.mount.wants/initrd-find-image-partitions.service ../initrd-find-image-partitions.service diff --git a/cleanroom/commands/_create_clrm_initrd_extra/image_boot/images.mount b/cleanroom/commands/_create_clrm_initrd_extra/image_boot/images.mount new file mode 100644 index 0000000..6165f14 --- /dev/null +++ b/cleanroom/commands/_create_clrm_initrd_extra/image_boot/images.mount @@ -0,0 +1,10 @@ +[Unit] +Description=Mount /images in initrd +DefaultDependencies=no +After=systemd-cryptsetup@main.service + +[Mount] +What={image_device} +Where=/images +Type={image_fs} +Options={image_options},nodev,noexec,nosuid,ro diff --git a/cleanroom/commands/_create_clrm_initrd_extra/image_boot/initrd-find-image-partitions.service b/cleanroom/commands/_create_clrm_initrd_extra/image_boot/initrd-find-image-partitions.service new file mode 100644 index 0000000..7a9ea6e --- /dev/null +++ b/cleanroom/commands/_create_clrm_initrd_extra/image_boot/initrd-find-image-partitions.service @@ -0,0 +1,17 @@ +[Unit] +Description=Find partitions in image files +DefaultDependencies=no +ConditionFileNotEmpty=/images/{image_full_name} +After=images.mount +BindsTo=images.mount +Requisite=images.mount + +[Service] +WorkingDirectory=/ +Type=oneshot +RemainAfterExit=yes +ExecStart=/usr/bin/losetup -rP /dev/loop7 /images/{image_full_name} +ExecStop=/usr/bin/losetup -d /dev/loop7 + +[Install] +WantedBy=images.mount diff --git a/cleanroom/commands/_create_clrm_initrd_extra/image_boot/initrd-find-root-lv-partitions.service b/cleanroom/commands/_create_clrm_initrd_extra/image_boot/initrd-find-root-lv-partitions.service new file mode 100644 index 0000000..1f55c7c --- /dev/null +++ b/cleanroom/commands/_create_clrm_initrd_extra/image_boot/initrd-find-root-lv-partitions.service @@ -0,0 +1,16 @@ +[Unit] +Description=Find partitions in root LV +DefaultDependencies=no +ConditionPathExists=/dev/{volume_group}/{image_base_name} +After=dev-{volume_group}-{image_base_name}.device +BindsTo=dev-{volume_group}-{image_base_name}.device +Requisite=dev-{volume_group}-{image_base_name}.device + +[Service] +WorkingDirectory=/ +Type=oneshot +RemainAfterExit=yes +ExecStart=/usr/bin/partprobe /dev/{volume_group}/{image_base_name} + +[Install] +WantedBy=dev-{volume_group}-{image_base_name}.device diff --git a/cleanroom/commands/_create_clrm_initrd_extra/root_passwd/contents b/cleanroom/commands/_create_clrm_initrd_extra/root_passwd/contents new file mode 100644 index 0000000..2630230 --- /dev/null +++ b/cleanroom/commands/_create_clrm_initrd_extra/root_passwd/contents @@ -0,0 +1 @@ +OPTIONAL FILE /etc/shadow.initramfs /etc/shadow diff --git a/cleanroom/commands/_create_clrm_initrd_extra/stateless/contents b/cleanroom/commands/_create_clrm_initrd_extra/stateless/contents new file mode 100644 index 0000000..d34cea0 --- /dev/null +++ b/cleanroom/commands/_create_clrm_initrd_extra/stateless/contents @@ -0,0 +1,8 @@ +OPTIONAL FILE /usr/lib/systemd/system/sysroot-var.mount +FILE ./initrd-sysroot-setup.service /usr/lib/systemd/system/initrd-sysroot-setup.service + +OPTIONAL LINK /usr/lib/systemd/system/initrd.target.wants ../sysroot-var.mount +LINK /usr/lib/systemd/system/initrd.target.wants/initrd-sysroot-setup.service ../initrd-sysroot-setup.service +OPTIONAL LINK /usr/lib/systemd/system/initrd-fs.target.wants/sysroot-var.mount ../sysroot-var.mount + +MODULE squashfs diff --git a/cleanroom/commands/_create_clrm_initrd_extra/stateless/initrd-sysroot-setup.service b/cleanroom/commands/_create_clrm_initrd_extra/stateless/initrd-sysroot-setup.service new file mode 100644 index 0000000..d593e2a --- /dev/null +++ b/cleanroom/commands/_create_clrm_initrd_extra/stateless/initrd-sysroot-setup.service @@ -0,0 +1,15 @@ +[Unit] +Description=Set up root fs in /sysroot +DefaultDependencies=no +ConditionPathExists=/sysroot/usr/lib/boot/root-fs.tar +Requires=sysroot.mount +After=sysroot.mount systemd-volatile-root.service +Before=initrd-root-fs.target shutdown.target +Conflicts=shutdown.target +AssertPathExists=/etc/initrd-release + +[Service] +Type=oneshot +RemainAfterExit=yes +ExecStart=/usr/bin/tar -C /sysroot -xf /sysroot/usr/lib/boot/root-fs.tar + diff --git a/cleanroom/commands/_create_clrm_initrd_extra/verity/contents b/cleanroom/commands/_create_clrm_initrd_extra/verity/contents new file mode 100644 index 0000000..a9bb9e3 --- /dev/null +++ b/cleanroom/commands/_create_clrm_initrd_extra/verity/contents @@ -0,0 +1,5 @@ +BINARY /usr/lib/systemd/systemd-veritysetup +BINARY /usr/lib/systemd/system-generators/systemd-veritysetup-generator + +MODULE dm-verity + diff --git a/cleanroom/commands/_create_clrm_initrd_extra/volatile/contents b/cleanroom/commands/_create_clrm_initrd_extra/volatile/contents new file mode 100644 index 0000000..0744ac0 --- /dev/null +++ b/cleanroom/commands/_create_clrm_initrd_extra/volatile/contents @@ -0,0 +1,5 @@ +BINARY /usr/lib/systemd/systemd-volatile-root + +FILE /usr/lib/systemd/system/systemd-volatile-root.service + +LINK /usr/lib/systemd/system/initrd.target.wants/systemd-volatile-root.service ../systemd-volatile-root.service diff --git a/cleanroom/commands/_depmod_all.py b/cleanroom/commands/_depmod_all.py new file mode 100644 index 0000000..ef56586 --- /dev/null +++ b/cleanroom/commands/_depmod_all.py @@ -0,0 +1,60 @@ +# -*- coding: utf-8 -*- +"""_depmod_all command. + +@author: Tobias Hunger +""" + + +from cleanroom.binarymanager import Binaries +from cleanroom.command import Command +from cleanroom.location import Location +from cleanroom.systemcontext import SystemContext + +import os +import typing + + +class DepmodAllCommand(Command): + """The depmod_all command.""" + + def __init__(self, **services: typing.Any) -> None: + """Constructor.""" + super().__init__( + "_depmod_all", + help_string="Make sure all module dependecies are up to date.", + file=__file__, + **services + ) + + def validate( + self, location: Location, *args: typing.Any, **kwargs: typing.Any + ) -> None: + """Validate the arguments.""" + self._validate_no_arguments(location, *args, **kwargs) + + def __call__( + self, + location: Location, + system_context: SystemContext, + *args: typing.Any, + **kwargs: typing.Any + ) -> None: + """Execute command.""" + modules = system_context.file_name("/usr/lib/modules") + if not os.path.isdir(modules): + return # No kernel installed, nothing to do. + + for kver in [ + f for f in os.listdir(modules) if os.path.isdir(os.path.join(modules, f)) + ]: + location.set_description("Run depmod for kernel version {}...".format(kver)) + self._execute( + location, + system_context, + "run", + self._binary(Binaries.DEPMOD), + "-a", + "-b", + system_context.fs_directory, + kver, + ) diff --git a/cleanroom/commands/_pacman_keyinit.py b/cleanroom/commands/_pacman_keyinit.py index 3b1ef21..9737d37 100644 --- a/cleanroom/commands/_pacman_keyinit.py +++ b/cleanroom/commands/_pacman_keyinit.py @@ -10,7 +10,6 @@ from cleanroom.systemcontext import SystemContext from cleanroom.helper.run import run -import os.path import typing @@ -28,7 +27,9 @@ def __init__(self, **services: typing.Any) -> None: **services ) - def validate(self, location: Location, *args: str, **kwargs: typing.Any) -> None: + def validate( + self, location: Location, *args: typing.Any, **kwargs: typing.Any + ) -> None: """Validate the arguments.""" self._validate_no_args(location, *args) self._validate_kwargs(location, ("pacman_key", "gpg_dir"), **kwargs) @@ -38,13 +39,13 @@ def __call__( self, location: Location, system_context: SystemContext, - *args: str, + *args: typing.Any, **kwargs: typing.Any ) -> None: """Execute command.""" - pacman_key_command = kwargs.get("pacman_key") - gpg_dir = kwargs.get("gpg_dir") + pacman_key_command = kwargs.get("pacman_key", "") + gpg_dir = kwargs.get("gpg_dir", "") run( pacman_key_command, diff --git a/cleanroom/commands/_restore.py b/cleanroom/commands/_restore.py index 50ce415..7c3c5ae 100644 --- a/cleanroom/commands/_restore.py +++ b/cleanroom/commands/_restore.py @@ -44,7 +44,10 @@ def __call__( """Execute command.""" base = args[0] - assert system_context.base_context.system_name == base + assert ( + system_context.base_context + and system_context.base_context.system_name == base + ) btrfs_helper = self._service("btrfs_helper") diff --git a/cleanroom/commands/_setup.py b/cleanroom/commands/_setup.py index 5323a03..6147c5f 100644 --- a/cleanroom/commands/_setup.py +++ b/cleanroom/commands/_setup.py @@ -5,7 +5,7 @@ """ -from cleanroom.binarymanager import BinaryManager, Binaries +from cleanroom.binarymanager import Binaries from cleanroom.command import Command from cleanroom.helper.btrfs import BtrfsHelper from cleanroom.helper.run import run @@ -49,7 +49,7 @@ def _setup_fs_directory(system_context: SystemContext, mknod_command: str) -> No ) -class _SetupCommand(Command): +class SetupCommand(Command): """The _setup Command.""" def __init__(self, **services: typing.Any) -> None: diff --git a/cleanroom/commands/_store.py b/cleanroom/commands/_store.py index 1648f4a..45c9c15 100644 --- a/cleanroom/commands/_store.py +++ b/cleanroom/commands/_store.py @@ -13,7 +13,7 @@ import typing -class _StoreCommand(Command): +class StoreCommand(Command): """The _store command.""" def __init__(self, **services: typing.Any) -> None: diff --git a/cleanroom/commands/_teardown.py b/cleanroom/commands/_teardown.py index 770a65f..4062a03 100644 --- a/cleanroom/commands/_teardown.py +++ b/cleanroom/commands/_teardown.py @@ -13,7 +13,7 @@ import typing -class _TeardownCommand(Command): +class TeardownCommand(Command): """The _teardown Command.""" def __init__(self, **services: typing.Any) -> None: diff --git a/cleanroom/commands/_test.py b/cleanroom/commands/_test.py index 2e3612c..c42ce2f 100644 --- a/cleanroom/commands/_test.py +++ b/cleanroom/commands/_test.py @@ -40,7 +40,7 @@ def _find_tests(system_context: SystemContext) -> typing.Generator[str, None, No yield test -class _TestCommand(Command): +class TestCommand(Command): """The _test Command.""" def __init__(self, **services: typing.Any) -> None: diff --git a/cleanroom/commands/_write_deploy_info.py b/cleanroom/commands/_write_deploy_info.py index 73fadca..b3b8ac9 100644 --- a/cleanroom/commands/_write_deploy_info.py +++ b/cleanroom/commands/_write_deploy_info.py @@ -7,7 +7,6 @@ from cleanroom.command import Command from cleanroom.exceptions import GenerateError -from cleanroom.helper.file import makedirs from cleanroom.location import Location from cleanroom.systemcontext import SystemContext diff --git a/cleanroom/commands/add_partition.py b/cleanroom/commands/add_partition.py index 58cfefe..5540fbe 100644 --- a/cleanroom/commands/add_partition.py +++ b/cleanroom/commands/add_partition.py @@ -11,7 +11,7 @@ from cleanroom.printer import trace from cleanroom.systemcontext import SystemContext -import os.path +import os import typing @@ -30,7 +30,9 @@ def __init__(self, **services: typing.Any) -> None: **services ) - def validate(self, location: Location, *args: str, **kwargs: typing.Any) -> None: + def validate( + self, location: Location, *args: typing.Any, **kwargs: typing.Any + ) -> None: """Validate the arguments.""" self._validate_args_exact( location, 1, '"{}" needs a name for the partition file.', *args @@ -93,7 +95,7 @@ def __call__( self, location: Location, system_context: SystemContext, - *args: str, + *args: typing.Any, **kwargs: typing.Any ) -> None: """Execute command.""" diff --git a/cleanroom/commands/based_on.py b/cleanroom/commands/based_on.py index d4534f6..0661ddb 100644 --- a/cleanroom/commands/based_on.py +++ b/cleanroom/commands/based_on.py @@ -9,7 +9,7 @@ from cleanroom.exceptions import ParseError from cleanroom.location import Location from cleanroom.systemcontext import SystemContext -from cleanroom.printer import trace, verbose +from cleanroom.printer import verbose import re import typing @@ -68,7 +68,10 @@ def __call__( self._add_hook(location, system_context, "testing", "_test") self._execute(location, system_context, "_setup") else: - assert system_context.base_context.system_name == base_system + assert ( + system_context.base_context + and system_context.base_context.system_name == base_system + ) verbose("Building on top of {}.".format(base_system)) self._execute(location, system_context, "_restore", base_system) diff --git a/cleanroom/commands/create.py b/cleanroom/commands/create.py index 99a383c..1f4db2e 100644 --- a/cleanroom/commands/create.py +++ b/cleanroom/commands/create.py @@ -16,7 +16,7 @@ class CreateCommand(Command): """The create command.""" - def __init__(self, **services) -> None: + def __init__(self, **services: typing.Any) -> None: """Constructor.""" super().__init__( "create", diff --git a/cleanroom/commands/create_efi_kernel.py b/cleanroom/commands/create_efi_kernel.py index 95eb3fa..7081691 100644 --- a/cleanroom/commands/create_efi_kernel.py +++ b/cleanroom/commands/create_efi_kernel.py @@ -11,10 +11,10 @@ from cleanroom.helper.run import run from cleanroom.location import Location from cleanroom.systemcontext import SystemContext -from cleanroom.printer import debug +from cleanroom.printer import debug, trace from glob import glob -import os.path +import os import tempfile import typing @@ -40,7 +40,7 @@ def _get_initrd_parts(location: Location, path: str) -> typing.List[str]: if not path: raise GenerateError("No initrd-parts directory.", location=location) - initrd_parts = [] # type: typing.List[str] + initrd_parts: typing.List[str] = [] for f in glob(os.path.join(path, "*")): if os.path.isfile(f): initrd_parts.append(f) @@ -49,6 +49,8 @@ def _get_initrd_parts(location: Location, path: str) -> typing.List[str]: 'No initrd-parts found in directory "{}".'.format(path), location=location ) initrd_parts.sort() + for ip in initrd_parts: + trace(f" Adding into initrd: {ip} ...") return initrd_parts @@ -64,7 +66,7 @@ def __init__(self, **services: typing.Any) -> None: "commandline=", help_string="Create a efi kernel with built-in initrd.", file=__file__, - **services + **services, ) def validate( @@ -94,7 +96,7 @@ def __call__( location: Location, system_context: SystemContext, *args: typing.Any, - **kwargs: typing.Any + **kwargs: typing.Any, ) -> None: """Execute command.""" output = args[0] @@ -115,7 +117,7 @@ def __call__( debug("{}: osrelease: {}.".format(self.name, osrelease_file)) debug("{}: efistub : {}.".format(self.name, efistub)) - self._validate_files(kernel, *initrd_files, osrelease_file, efistub) + self._validate_files(location, kernel, *initrd_files, osrelease_file, efistub) with tempfile.TemporaryDirectory() as tmp: initrd = _create_initrd(tmp, *initrd_files) cmdline = _create_cmdline_file(tmp, cmdline_input) diff --git a/cleanroom/commands/create_initrd.py b/cleanroom/commands/create_initrd.py index 0acf250..2fe7694 100644 --- a/cleanroom/commands/create_initrd.py +++ b/cleanroom/commands/create_initrd.py @@ -6,56 +6,17 @@ from cleanroom.binarymanager import Binaries from cleanroom.command import Command -from cleanroom.helper.file import chmod, copy, create_file, remove, move +from cleanroom.helper.file import copy, create_file, remove, move from cleanroom.helper.run import run from cleanroom.location import Location from cleanroom.systemcontext import SystemContext -from cleanroom.printer import info, debug, trace +from cleanroom.printer import info -import os.path +import os import textwrap import typing -def _deviceify(device: str) -> str: - if not device: - return "" - if device.startswith("PARTLABEL="): - device = "/dev/disk/by-partlabel/" + device[10:] - elif device.startswith("LABEL="): - device = "/dev/disk/by-label/" + device[6:] - elif device.startswith("PARTUUID="): - device = "/dev/disk/by-partuuid/" + device[9:] - elif device.startswith("UUID="): - device = "/dev/disk/by-uuid/" + device[5:] - elif device.startswith("ID="): - device = "/dev/disk/by-id/" + device[3:] - elif device.startswith("PATH="): - device = "/dev/disk/by-path/" + device[5:] - assert device.startswith("/dev/") - return device - - -def _escape_device(device: str) -> str: - device = _deviceify(device) - - device = device.replace("-", "\\x2d") - device = device.replace("=", "\\x3d") - device = device.replace(";", "\\x3b") - device = device.replace("/", "-") - - return device[1:] - - -def _create_install_hook( - location: Location, system_context: SystemContext, name: str, contents: str -) -> str: - location.set_description("install mkinitcpio install hook {}".format(name)) - path = os.path.join("/usr/lib/initcpio/install", name) - create_file(system_context, path, contents.encode("utf-8")) - return path - - def _cleanup_extra_files( location: Location, system_context: SystemContext, *files: str ) -> None: @@ -75,11 +36,6 @@ def __init__(self, **services: typing.Any) -> None: **services ) - self._vg = None # type: typing.Optional[str] - self._image_fs = None # type: typing.Optional[str] - self._image_device = None # type: typing.Optional[str] - self._image_options = None # type: typing.Optional[str] - def validate( self, location: Location, *args: typing.Any, **kwargs: typing.Any ) -> None: @@ -88,356 +44,17 @@ def validate( location, 1, '"{}" takes an initrd to create.', *args, **kwargs ) - def _create_systemd_units( - self, location: Location, system_context: SystemContext - ) -> typing.Sequence[str]: - location.set_description("Install extra systemd units") - to_clean_up = [ - "/usr/lib/systemd/system/initrd-check-bios.service", - "/usr/lib/systemd/system/initrd-sysroot-setup.service", - "/usr/lib/systemd/system/initrd-find-root-lv-partitions.service", - "/usr/lib/systemd/system/images.mount", - "/usr/lib/systemd/system/initrd-find-image-partitions.service", - ] - create_file( - system_context, - "/usr/lib/systemd/system/initrd-check-bios.service", - textwrap.dedent( - """\ - [Unit] - Description=Print TPM configuration - DefaultDependencies=no - Requires=sysroot.mount - After=sysroot.mount systemd-volatile-root.service - Before=initrd-root-fs.target shutdown.target - Conflicts=shutdown.target - - [Service] - Type=oneshot - RemainAfterExit=yes - ExecStart=/usr/bin/initrd-check-bios.sh - StandardOutput=journal+console - - [Install] - WantedBy=initrd-root-device.target - """ - ).encode("utf-8"), - mode=0o644, - ) - trace("Wrote initrd-check-bios.service") - - create_file( - system_context, - "/usr/lib/systemd/system/initrd-sysroot-setup.service", - textwrap.dedent( - """\ - [Unit] - Description=Set up root fs in /sysroot - DefaultDependencies=no - ConditionPathExists=/sysroot/usr/lib/boot/root-fs.tar - Requires=sysroot.mount - After=sysroot.mount systemd-volatile-root.service - Before=initrd-root-fs.target shutdown.target - Conflicts=shutdown.target - AssertPathExists=/etc/initrd-release - - [Service] - Type=oneshot - RemainAfterExit=yes - ExecStart=/usr/bin/tar -C /sysroot -xf /sysroot/usr/lib/boot/root-fs.tar - """ - ).encode("utf-8"), - mode=0o644, - ) - trace("Wrote initrd-sysroot-setup.service") - - if self._vg is not None: - device_name = "dev-{}-{}".format(self._vg, self._full_name) - create_file( - system_context, - "/usr/lib/systemd/system/initrd-find-root-lv-partitions.service", - textwrap.dedent( - """\ - [Unit] - Description=Find partitions in root LV - DefaultDependencies=no - ConditionPathExists=/dev/{1}/{2} - After={0}.device - BindsTo={0}.device - Requisite={0}.device - - [Service] - WorkingDirectory=/ - Type=oneshot - RemainAfterExit=yes - ExecStart=/usr/bin/partprobe /dev/{1}/{2} - - [Install] - WantedBy={0}.device - """ - ) - .format(device_name, self._vg, self._full_name) - .encode("utf-8"), - mode=0o644, - ) - trace("Wrote initrd-find-root-lv-partitions.service") - - if self._image_device: - create_file( - system_context, - "/usr/lib/systemd/system/images.mount", - textwrap.dedent( - """\ - [Unit] - Description=Mount /images in initrd - DefaultDependencies=no - After=systemd-cryptsetup@main.service - - [Mount] - What={} - Where=/images - Type={} - Options={},nodev,noexec,nosuid,ro - """ - ) - .format(self._image_device, self._image_fs, self._image_options) - .encode("utf-8"), - mode=0o644, - ) - trace( - "Wrote images.mount (Where={}, Type={}, Options={})".format( - self._image_device, self._image_fs, self._image_options - ) - ) - - create_file( - system_context, - "/usr/lib/systemd/system/initrd-find-image-partitions.service", - textwrap.dedent( - """\ - [Unit] - Description=Find partitions in image files - DefaultDependencies=no - ConditionFileNotEmpty=/images/{0} - After=images.mount - BindsTo=images.mount - Requisite=images.mount - - [Service] - WorkingDirectory=/ - Type=oneshot - RemainAfterExit=yes - ExecStart=/usr/bin/losetup --find --partscan /images/{0} - ExecStop=/usr/bin/losetup --detach-all - - [Install] - WantedBy=images.mount - """ - ) - .format(self._full_name) - .encode("utf-8"), - mode=0o644, - ) - trace( - "Wrote initrd-find-image-partitions (/images/{}".format(self._full_name) - ) - - return to_clean_up - - def _sd_boot_image_hook(self) -> str: - hook = textwrap.dedent( - """\ - #!/usr/bin/bash - - build() { - """ - ) - if self._vg is not None: - hook += """ # partprobe LV: - add_systemd_unit "initrd-find-root-lv-partitions.service" - """ - hook += ( - " add_symlink \"/usr/lib/systemd/system/dev-{0}-{1}'" - '.device.wants/initrd-find-root-lv-partitions.service" \\'.format( - self._vg, self._full_name - ) - ) - hook += """ - "../initrd-find-root-lv-partitions.service" - -""" - if self._image_device is not None: - escaped_device = _escape_device(self._image_device) - hook += """ - # losetup image files: - add_binary /usr/bin/losetup - - add_systemd_unit "images.mount" - add_symlink "/usr/lib/systemd/system/{}.device.wants/images.mount" \ - "../images.mount" - add_systemd_unit "initrd-find-image-partitions.service" - add_symlink "/usr/lib/systemd/system/images.mount.wants/initrd-find-image-partitions.service" \ - "../initrd-find-image-partitions.service" -""".format( - escaped_device - ) - hook += textwrap.dedent( - """\ - } - - help() { - cat < typing.Sequence[str]: - to_clean_up = [ - _create_install_hook( - location, - system_context, - "sd-check-bios", - textwrap.dedent( - """\ - #!/usr/bin/bash - - build() { - # Setup rescue target: - add_binary "/usr/bin/initrd-check-bios.sh" - add_binary "/usr/bin/initrd-mnencode" - add_binary "/usr/bin/md5sum" - - add_systemd_unit "initrd-check-bios.service" - add_symlink "/usr/lib/systemd/system/initrd-root-device.target.wants/initrd-check-bios.service" \ - "../initrd-check-bios.service" - add_module tpm_tis tpm_atmel tpm_nsc - } - - help() { - cat < typing.Sequence[str]: to_clean_up = ["/etc/mkinitcpio.d", "/etc/mkinitcpio.conf", "/boot/vmlinu*"] @@ -462,10 +79,7 @@ def _install_mkinitcpio( "sed", "/^HOOKS=/ " "cHOOKS=(base systemd keyboard sd-vconsole " - "sd-encrypt block sd-lvm2 filesystems btrfs " - "sd-check-bios sd-stateless sd-verity " - "sd-volatile sd-boot-image " - "sd-shutdown)", + "sd-encrypt block sd-lvm2 filesystems btrfs sd-shutdown)", "/etc/mkinitcpio.conf", ) @@ -487,21 +101,11 @@ def _install_mkinitcpio( PRESETS=('default') - #default_config="/etc/mkinitcpio.conf" default_image="/boot/initramfs.img" - #default_options="" """ ).encode("utf-8"), ) - self._execute( - location.next_line(), - system_context, - "sed", - "s%/initramfs-linux.*.img%/initrd%", - "/etc/mkinitcpio.d/cleanroom.preset", - ) - return to_clean_up def _remove_mkinitcpio( @@ -523,18 +127,6 @@ def _remove_mkinitcpio( def register_substitutions(self) -> typing.List[typing.Tuple[str, str, str]]: return [ - ("IMAGE_FS", "ext2", "The filesystem type to load clrm-images from",), - ("IMAGE_DEVICE", "", "The device to load clrm-images from",), - ( - "IMAGE_OPTIONS", - "rw", - "The filesystem options to mount the IMAGE_DEVICE with", - ), - ( - "DEFAULT_VG", - "", - "The volume group to look for clrm rootfs/verity partitions on", - ), ("MKINITCPIO_EXTRA_MODULES", "", "Extra modules to add to the initrd",), ("MKINITCPIO_EXTRA_HOOKS", "", "Extra hooks to add to the initrd",), ("MKINITCPIO_EXTRA_BINARIES", "", "Extra binaries to add to the initrd",), @@ -553,30 +145,11 @@ def __call__( info("Skipping initrd generation: No vmlinuz in boot directory.") return - self._vg = system_context.substitution_expanded("DEFAULT_VG", "") - if not self._vg: - self._vg = None - - self._image_fs = system_context.substitution_expanded("IMAGE_FS", "") - self._image_device = _deviceify( - system_context.substitution_expanded("IMAGE_DEVICE", "") - ) - self._image_options = system_context.substitution_expanded("IMAGE_OPTIONS", "") - - name_prefix = system_context.substitution_expanded("DISTRO_ID", "") - name_version = system_context.substitution_expanded( - "DISTRO_VERSION_ID", system_context.timestamp - ) - self._full_name = "{}_{}".format(name_prefix, name_version) - initrd = args[0] - to_clean_up = [] # type: typing.List[str] + to_clean_up: typing.List[str] = [] to_clean_up += "/boot/vmlinuz" - to_clean_up += self._install_extra_binaries(location, system_context) - to_clean_up += self._create_systemd_units(location, system_context) to_clean_up += self._install_mkinitcpio(location, system_context) - to_clean_up += self._install_mkinitcpio_hooks(location, system_context) copy( system_context, @@ -593,34 +166,10 @@ def __call__( chroot_helper=self._binary(Binaries.CHROOT_HELPER), ) - initrd_directory = os.path.dirname(initrd) - os.makedirs(initrd_directory, exist_ok=True) + os.makedirs(os.path.dirname(initrd), exist_ok=True) move(system_context, "/boot/initramfs.img", initrd, to_outside=True) - _cleanup_extra_files(location, system_context, *to_clean_up) - self._remove_mkinitcpio(location, system_context) + # _cleanup_extra_files(location, system_context, *to_clean_up) + # self._remove_mkinitcpio(location, system_context) assert os.path.isfile(initrd) - - def _install_extra_binaries( - self, location: Location, system_context: SystemContext - ) -> typing.Sequence[str]: - to_clean_up: typing.List[str] = [ - self._copy_extra_file(location, system_context, "initrd-check-bios.sh"), - self._copy_extra_file(location, system_context, "initrd-mnencode"), - ] - return to_clean_up - - def _copy_extra_file( - self, location: Location, system_context: SystemContext, extra_file: str - ) -> str: - location.set_description( - "Installing extra mkinitcpio file {}".format(extra_file) - ) - helper_directory = self._helper_directory - assert helper_directory - source_path = os.path.join(helper_directory, extra_file) - dest_path = os.path.join("/usr/bin", extra_file) - copy(system_context, source_path, dest_path, from_outside=True) - chmod(system_context, 0o755, dest_path) - return dest_path diff --git a/cleanroom/commands/create_os_release.py b/cleanroom/commands/create_os_release.py index 39a1eb0..22a5868 100644 --- a/cleanroom/commands/create_os_release.py +++ b/cleanroom/commands/create_os_release.py @@ -15,7 +15,7 @@ class CreateOsReleaseCommand(Command): """The create_os_release command.""" - def __init__(self, **services) -> None: + def __init__(self, **services: typing.Any) -> None: """Constructor.""" super().__init__( "create_os_release", diff --git a/cleanroom/commands/debootstrap.py b/cleanroom/commands/debootstrap.py index 3deb309..5de76b9 100644 --- a/cleanroom/commands/debootstrap.py +++ b/cleanroom/commands/debootstrap.py @@ -58,9 +58,9 @@ def __call__( suite=kwargs.get("suite", ""), target=system_context.fs_directory, mirror=kwargs.get("mirror", ""), - variant=kwargs.get("variant", None), - include=kwargs.get("include", None), - exclude=kwargs.get("exclude", None), + variant=kwargs.get("variant", ""), + include=kwargs.get("include", ""), + exclude=kwargs.get("exclude", ""), debootstrap_command=self._binary(Binaries.DEBOOTSTRAP), ) diff --git a/cleanroom/commands/ensure_depmod.py b/cleanroom/commands/ensure_depmod.py new file mode 100644 index 0000000..9e87574 --- /dev/null +++ b/cleanroom/commands/ensure_depmod.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- +"""ensure_depmod command. + +@author: Tobias Hunger +""" + + +from cleanroom.command import Command +from cleanroom.location import Location +from cleanroom.systemcontext import SystemContext + +import typing + + +class EnsureDepmodCommand(Command): + """The ensure_depmod command.""" + + def __init__(self, **services: typing.Any) -> None: + """Constructor.""" + super().__init__( + "ensure_depmod", + help_string="Ensure that depmod is run for all kernels.", + file=__file__, + **services + ) + + def validate( + self, location: Location, *args: typing.Any, **kwargs: typing.Any + ) -> None: + """Validate the arguments.""" + self._validate_no_arguments(location, *args, **kwargs) + + def __call__( + self, + location: Location, + system_context: SystemContext, + *args: typing.Any, + **kwargs: typing.Any + ) -> None: + """Execute command.""" + location.set_description("Run ldconfig") + self._add_hook( + location, system_context, "export", "_depmod_all", + ) diff --git a/cleanroom/commands/export.py b/cleanroom/commands/export.py index 572e06e..7a43733 100644 --- a/cleanroom/commands/export.py +++ b/cleanroom/commands/export.py @@ -12,23 +12,22 @@ from cleanroom.helper.file import exists from cleanroom.helper.run import run from cleanroom.systemcontext import SystemContext -import cleanroom.helper.disk as disk -from cleanroom.imager import ExtraPartition, create_image +from cleanroom.imager import create_image from cleanroom.printer import debug, h2, info, verbose -import os.path +import os import shutil import typing -def _kernel_name(system_context: SystemContext) -> str: +def _kernel_name(system_context: SystemContext, *, postfix: str = "") -> str: boot_data = system_context.boot_directory assert boot_data return os.path.join( boot_data, - "linux_{}.efi".format( - system_context.substitution_expanded("DISTRO_VERSION_ID", "") + "{}{}_{}.efi".format( + system_context.pretty_system_name, postfix, system_context.timestamp ), ) @@ -49,15 +48,15 @@ def _create_dmverity( squashfs_file: str, *, vrty_label: str, - veritysetup_command: str + veritysetup_command: str, ) -> typing.Tuple[str, str, str]: verity_file = os.path.join(target_directory, vrty_label) result = run(veritysetup_command, "format", squashfs_file, verity_file) _size_extend(verity_file) - root_hash = None - uuid = None + root_hash: typing.Optional[str] = None + uuid: typing.Optional[str] = None for line in result.stdout.split("\n"): if line.startswith("Root hash:"): root_hash = line[10:].strip() @@ -131,7 +130,7 @@ def __init__(self, **services: typing.Any) -> None: "[usr_only=True]", help_string="Export a filesystem image.", file=__file__, - **services + **services, ) def validate( @@ -153,7 +152,7 @@ def validate( "skip_validation", "usr_only", ), - **kwargs + **kwargs, ) if "key" in kwargs: @@ -209,7 +208,7 @@ def validate( location=location, ) - def _setup(self, *args, **kwargs): + def _setup(self, *args: typing.Any, **kwargs: typing.Any): self._key = kwargs.get("efi_key", "") self._cert = kwargs.get("efi_cert", "") self._image_format = kwargs.get("image_format", "raw") @@ -244,7 +243,7 @@ def register_substitutions(self) -> typing.List[typing.Tuple[str, str, str]]: ), ( "CLRM_IMAGE_FILENAME", - "${DISTRO_ID}_${DISTRO_VERSION_ID}", + "${PRETTY_SYSTEM_NAME}_${DISTRO_VERSION_ID}.img", "File name for the clrm image file", ), ] @@ -254,7 +253,7 @@ def __call__( location: Location, system_context: SystemContext, *args: typing.Any, - **kwargs: typing.Any + **kwargs: typing.Any, ) -> None: """Execute command.""" self._setup(*args, **kwargs) @@ -357,7 +356,7 @@ def _create_cache_data( ) vrty_label = system_context.substitution_expanded("VRTYFS_PARTLABEL", "") assert vrty_label - (verity_file, verity_uuid, root_hash) = _create_dmverity( + (verity_file, _, root_hash) = _create_dmverity( system_context.cache_directory, squashfs_file, vrty_label=vrty_label, @@ -468,9 +467,17 @@ def _sign_efi_kernel( def _create_initramfs( self, location: Location, system_context: SystemContext ) -> bool: - location.set_description("Create initrd") initrd_parts = os.path.join(system_context.boot_directory, "initrd-parts") + location.set_description("Create EXTRA initrd part") os.makedirs(initrd_parts, exist_ok=True) + self._execute( + location.next_line(), + system_context, + "_create_clrm_initrd_extra", + os.path.join(initrd_parts, "99-clrm-extra"), + ) + + location.set_description("Create initrd") self._execute( location.next_line(), system_context, @@ -506,7 +513,7 @@ def _create_squashfs( "-noX", "-processors", "1", - work_directory=system_context.fs_directory + work_directory=system_context.fs_directory, ) _size_extend(squash_file) return squash_file diff --git a/cleanroom/commands/helper/create_initrd/initrd-check-bios.sh b/cleanroom/commands/helper/create_initrd/initrd-check-bios.sh deleted file mode 100755 index e960c3c..0000000 --- a/cleanroom/commands/helper/create_initrd/initrd-check-bios.sh +++ /dev/null @@ -1,31 +0,0 @@ -#!/usr/bin/sh - -MNENCODE="/usr/bin/initrd-mnencode" -HASH="/usr/bin/md5sum" - -HAVE_TPM="no" -PCRS="" -for i in 'tpm0' 'tpm1' 'tpm2' 'tpm3' ; do - P="/sys/class/tpm/${i}/pcrs" - if test -r "${P}" ; then - NEXT=`head -n 17 "${P}"` - PCRS="${PCRS}${P}:\n${NEXT}" - HAVE_TPM='yes' - fi -done - -if test "x${HAVE_TPM}" != "xyes" ; then - echo "**** NO TPM CHIP FOUND ****" - exit 0 -fi - -if test -z "${PCRS}" ; then - echo "**** NO PCRS REGISTERS FOUND ****" - exit 0 -fi - -HRESULT=`echo "${PCRS}" | "${HASH}" | cut -d' ' -f1` -RESULT=`printf "${HRESULT}" | "${MNENCODE}"` -echo "${HRESULT}" -echo "${RESULT}" - diff --git a/cleanroom/commands/helper/create_initrd/initrd-mnencode b/cleanroom/commands/helper/create_initrd/initrd-mnencode deleted file mode 100755 index 0be75bf..0000000 Binary files a/cleanroom/commands/helper/create_initrd/initrd-mnencode and /dev/null differ diff --git a/cleanroom/commands/net_firewall_open_port.py b/cleanroom/commands/net_firewall_open_port.py index 63f7602..fb50055 100644 --- a/cleanroom/commands/net_firewall_open_port.py +++ b/cleanroom/commands/net_firewall_open_port.py @@ -50,5 +50,5 @@ def __call__( ) -> None: """Execute command.""" protocol = kwargs.get("protocol", "tcp") - comment = kwargs.get("comment", None) + comment = kwargs.get("comment", "") open_port(system_context, args[0], protocol=protocol, comment=comment) diff --git a/cleanroom/commands/pacman.py b/cleanroom/commands/pacman.py index f9d0446..5e47c85 100644 --- a/cleanroom/commands/pacman.py +++ b/cleanroom/commands/pacman.py @@ -55,7 +55,7 @@ def __call__( *args, remove=kwargs.get("remove", False), overwrite=kwargs.get("overwrite", ""), - assume_installed=kwargs.get("assume_installed", None), + assume_installed=kwargs.get("assume_installed", ""), pacman_command=self._binary(Binaries.PACMAN), chroot_helper=self._binary(Binaries.CHROOT_HELPER) ) diff --git a/cleanroom/commands/pacstrap.py b/cleanroom/commands/pacstrap.py index 4902f06..10c87d8 100644 --- a/cleanroom/commands/pacstrap.py +++ b/cleanroom/commands/pacstrap.py @@ -36,7 +36,9 @@ def __init__(self, **services: typing.Any) -> None: **services ) - def validate(self, location: Location, *args: str, **kwargs: typing.Any) -> None: + def validate( + self, location: Location, *args: typing.Any, **kwargs: typing.Any + ) -> None: """Validate the arguments.""" self._validate_args_at_least( location, @@ -51,11 +53,11 @@ def __call__( self, location: Location, system_context: SystemContext, - *args: str, + *args: typing.Any, **kwargs: typing.Any ) -> None: """Execute command.""" - pacman_setup(system_context, kwargs.get("config")) + pacman_setup(system_context, kwargs.get("config", "")) pacman_key_command = self._binary(Binaries.PACMAN_KEY) pacman_keyinit(system_context, pacman_key_command=pacman_key_command) diff --git a/cleanroom/commands/pkg_amd_cpu.py b/cleanroom/commands/pkg_amd_cpu.py index 97969c4..d1e638b 100644 --- a/cleanroom/commands/pkg_amd_cpu.py +++ b/cleanroom/commands/pkg_amd_cpu.py @@ -9,7 +9,7 @@ from cleanroom.location import Location from cleanroom.systemcontext import SystemContext -import os.path +import os import typing diff --git a/cleanroom/commands/pkg_intel_cpu.py b/cleanroom/commands/pkg_intel_cpu.py index 7d7baa9..ce2fcdf 100644 --- a/cleanroom/commands/pkg_intel_cpu.py +++ b/cleanroom/commands/pkg_intel_cpu.py @@ -8,7 +8,7 @@ from cleanroom.location import Location from cleanroom.systemcontext import SystemContext -import os.path +import os import typing diff --git a/cleanroom/commands/pkg_kernel.py b/cleanroom/commands/pkg_kernel.py index 33776f6..f91769b 100644 --- a/cleanroom/commands/pkg_kernel.py +++ b/cleanroom/commands/pkg_kernel.py @@ -8,7 +8,6 @@ from cleanroom.location import Location from cleanroom.systemcontext import SystemContext -import textwrap import typing import os.path diff --git a/cleanroom/commands/pkg_nvidia_gpu.py b/cleanroom/commands/pkg_nvidia_gpu.py new file mode 100644 index 0000000..6be47b7 --- /dev/null +++ b/cleanroom/commands/pkg_nvidia_gpu.py @@ -0,0 +1,68 @@ +"""pkg_nvidia_gpu command. + +@author: Paul Hunnisett +""" + +from cleanroom.command import Command +from cleanroom.location import Location +from cleanroom.systemcontext import SystemContext + +import typing + + +class PkgNvidiaGpuCommand(Command): + """The pkg_nvidia_gpu command.""" + + def __init__(self, **services: typing.Any) -> None: + """Constructor.""" + super().__init__( + "pkg_nvidia_gpu", + help_string="Set up NVidia GPU.", + file=__file__, + **services + ) + + def validate( + self, location: Location, *args: typing.Any, **kwargs: typing.Any + ) -> None: + """Validate the arguments.""" + self._validate_no_arguments(location, *args, **kwargs) + + def __call__( + self, + location: Location, + system_context: SystemContext, + *args: typing.Any, + **kwargs: typing.Any + ) -> None: + """Execute command.""" + + # Set some kernel parameters for: + system_context.set_or_append_substitution( + "KERNEL_CMDLINE", "nvidia-drm.modeset=1 nouveau.blacklist=1" + ) + + self._execute( + location, + system_context, + "pacman", + "nvidia", + "nvidia-settings", + "nvidia-utils", + "opencl-nvidia", + "libvdpau", + "lib32-libvdpau", + "lib32-nvidia-utils", + "lib32-opencl-nvidia", + "vdpauinfo", + "mesa", + "mesa-demos", + ) + + self._execute( + location.next_line(), + system_context, + "create", + "/etc/modprobe.d/nouveau-blacklist.conf", + "blacklist nouveau", + ) diff --git a/cleanroom/commands/pkg_sshd.py b/cleanroom/commands/pkg_sshd.py index d560f5f..876c845 100644 --- a/cleanroom/commands/pkg_sshd.py +++ b/cleanroom/commands/pkg_sshd.py @@ -69,20 +69,24 @@ def _install_openssh( ) -> None: self._execute(location, system_context, "pacman", "openssh") - def _yes_or_no(self, arg: str, **kwargs) -> str: + def _yes_or_no(self, arg: str, **kwargs: typing.Any) -> str: if kwargs.get(arg, False): return "yes" return "no" def _set_sshd_config_yes_or_no( - self, location: Location, system_context: SystemContext, arg: str, **kwargs + self, + location: Location, + system_context: SystemContext, + arg: str, + **kwargs: typing.Any ) -> None: self._set_sshd_config( location, system_context, arg, self._yes_or_no(arg, **kwargs) ) def _harden_sshd( - self, location: Location, system_context: SystemContext, **kwargs + self, location: Location, system_context: SystemContext, **kwargs: typing.Any ) -> None: # Install custom moduli moduli = os.path.join(self._config_directory(system_context), "moduli") diff --git a/cleanroom/commands/pkg_systemd_homed.py b/cleanroom/commands/pkg_systemd_homed.py index 2b5f711..be38ddd 100644 --- a/cleanroom/commands/pkg_systemd_homed.py +++ b/cleanroom/commands/pkg_systemd_homed.py @@ -7,13 +7,12 @@ from cleanroom.command import Command from cleanroom.exceptions import GenerateError -from cleanroom.helper.file import chmod, chown, copy, create_file, makedirs +from cleanroom.helper.file import chmod, chown, create_file, makedirs from cleanroom.location import Location from cleanroom.systemcontext import SystemContext import textwrap import typing -import os class PkgSystemdHomedCommand(Command): diff --git a/cleanroom/commands/pkg_usbguard.py b/cleanroom/commands/pkg_usbguard.py index 975b4db..f6dfdb3 100644 --- a/cleanroom/commands/pkg_usbguard.py +++ b/cleanroom/commands/pkg_usbguard.py @@ -95,7 +95,7 @@ def __call__( system_context, "/usr/share/factory/var/etc/usbguard/rules.conf", b"", - mode=0o755, + mode=0o600, ) remove( diff --git a/cleanroom/commands/remove.py b/cleanroom/commands/remove.py index 0bc6870..4c99e36 100644 --- a/cleanroom/commands/remove.py +++ b/cleanroom/commands/remove.py @@ -26,7 +26,9 @@ def __init__(self, **services: typing.Any) -> None: **services ) - def validate(self, location: Location, *args: str, **kwargs: typing.Any) -> None: + def validate( + self, location: Location, *args: typing.Any, **kwargs: typing.Any + ) -> None: """Validate the arguments.""" self._validate_args_at_least( location, @@ -40,7 +42,7 @@ def __call__( self, location: Location, system_context: SystemContext, - *args: str, + *args: typing.Any, **kwargs: typing.Any ) -> None: """Execute command.""" diff --git a/cleanroom/commands/sshd_set_hostkeys.py b/cleanroom/commands/sshd_set_hostkeys.py index 4b2bb75..e12490e 100644 --- a/cleanroom/commands/sshd_set_hostkeys.py +++ b/cleanroom/commands/sshd_set_hostkeys.py @@ -12,7 +12,7 @@ from cleanroom.systemcontext import SystemContext import glob -import os.path +import os import typing diff --git a/cleanroom/commands/strip_development_files.py b/cleanroom/commands/strip_development_files.py index 6dc04d2..ac9336a 100644 --- a/cleanroom/commands/strip_development_files.py +++ b/cleanroom/commands/strip_development_files.py @@ -11,7 +11,7 @@ import typing -import os.path +import os class StripDevelopmentFilesCommand(Command): diff --git a/cleanroom/commands/systemd_cleanup.py b/cleanroom/commands/systemd_cleanup.py index ae2c8a2..f050b49 100644 --- a/cleanroom/commands/systemd_cleanup.py +++ b/cleanroom/commands/systemd_cleanup.py @@ -11,7 +11,7 @@ from cleanroom.printer import trace from cleanroom.systemcontext import SystemContext -import os.path +import os import shutil import typing @@ -38,8 +38,8 @@ def _map_target_link( link_directory = os.path.dirname(link) - (link, _old_link) = _map_base(old_base, new_base, link) - (link_target, _old_link_target) = _map_base( + (link, _) = _map_base(old_base, new_base, link) + (link_target, _) = _map_base( old_base, new_base, os.path.join(link_directory, link_target) ) @@ -50,7 +50,9 @@ def _map_target_link( return link, link_target -def _map_host_link(root_directory, old_base, new_base, link, link_target): +def _map_host_link( + root_directory: str, old_base: str, new_base: str, link: str, link_target: str +): assert root_directory.endswith("/") assert old_base.startswith(root_directory) assert new_base.startswith(root_directory) @@ -75,7 +77,13 @@ def _map_host_link(root_directory, old_base, new_base, link, link_target): return os.path.join(root_directory, host_link[1:]), link_target -def _move_symlink(location, system_context, old_base, new_base, link): +def _move_symlink( + location: Location, + system_context: SystemContext, + old_base: str, + new_base: str, + link: str, +): """Move a symlink.""" root_directory = system_context.fs_directory + "/" link_target = os.readlink(link) @@ -125,7 +133,7 @@ def _move_symlink(location, system_context, old_base, new_base, link): os.unlink(link) -def _move_file(location, old_base, new_base, path): +def _move_file(location: Location, old_base: str, new_base: str, path: str): """Move a file.""" path_dir = os.path.dirname(path) path_name = os.path.basename(path) @@ -183,7 +191,7 @@ def __call__( trace("walking:", old_base) - for root, _dirs, files in os.walk(old_base): + for root, _, files in os.walk(old_base): for f in files: full_path = os.path.join(root, f) trace("Checking", full_path) diff --git a/cleanroom/exceptions.py b/cleanroom/exceptions.py index b93da4a..ea71e16 100644 --- a/cleanroom/exceptions.py +++ b/cleanroom/exceptions.py @@ -16,7 +16,7 @@ class CleanRoomError(Exception): def __init__( self, *args: typing.Any, - location: Location = None, + location: typing.Optional[Location] = None, original_exception: typing.Optional[Exception] = None ) -> None: """Constructor.""" diff --git a/cleanroom/execobject.py b/cleanroom/execobject.py index d74e831..346b732 100644 --- a/cleanroom/execobject.py +++ b/cleanroom/execobject.py @@ -5,9 +5,14 @@ """ -import collections +from cleanroom.location import Location -ExecObject = collections.namedtuple( - "ExecObject", ["location", "command", "args", "kwargs"] -) +import typing + + +class ExecObject(typing.NamedTuple): + location: Location + command: str + args: typing.Tuple[typing.Any, ...] + kwargs: typing.Dict[str, typing.Any] diff --git a/cleanroom/firestarter/containerfsinstalltarget.py b/cleanroom/firestarter/containerfsinstalltarget.py index fa5306d..b4203f5 100755 --- a/cleanroom/firestarter/containerfsinstalltarget.py +++ b/cleanroom/firestarter/containerfsinstalltarget.py @@ -14,29 +14,31 @@ import typing -def _extract_into_snapshot(_, rootfs: str, *, import_snapshot: str) -> None: +def _extract_into_snapshot(_, rootfs: str, *, import_snapshot: str) -> int: # Extract data - tool.run( + return tool.run( "/usr/bin/bash", "-c", '( cd "{}" ; tar -cf - . ) | ( cd "{}" ; tar -xf - )'.format( rootfs, import_snapshot ), - ) + ).returncode class ContainerFilesystemInstallTarget(InstallTarget): def __init__(self) -> None: super().__init__("container_fs", "Install a container filesystem.") - def __call__(self, parse_result: typing.Any) -> None: + def __call__( + self, *, parse_result: typing.Any, tmp_dir: str, image_file: str + ) -> int: container_name = parse_result.override_system_name if not container_name: container_name = parse_result.system_name if container_name.startswith("system-"): container_name = container_name[7:] - container_dir = os.path.join("/var/lib/machines", container_name) + container_dir = os.path.join(parse_result.machines_dir, container_name) import_dir = container_dir + "_import" try: @@ -44,11 +46,10 @@ def __call__(self, parse_result: typing.Any) -> None: btrfs.create_subvolume(import_dir) # Mount filessystems and copy the rootfs into import_dir: - tool.execute_with_system_mounted( + result = tool.execute_with_system_mounted( lambda e, r: _extract_into_snapshot(e, r, import_snapshot=import_dir), - repository=parse_result.repository, - system_name=parse_result.system_name, - system_version=parse_result.system_version, + image_file=image_file, + tmp_dir=tmp_dir, ) # Delete *old* container-name: @@ -61,14 +62,22 @@ def __call__(self, parse_result: typing.Any) -> None: finally: btrfs.delete_subvolume(import_dir) - def setup_subparser(self, parser: typing.Any) -> None: - parser.add_argument( + return result + + def setup_subparser(self, subparser: typing.Any) -> None: + subparser.add_argument( "--container-name", dest="override_system_name", action="store", nargs="?", default="", help="Container name to use " - "[default: system-name without " - '"system-" prefix]', + '[default: system-name without "system-" prefix]', + ) + subparser.add_argument( + "--machines-dir", + dest="machines_dir", + action="store", + default="/var/lib/machines", + help="Machines directory " "[default: /var/lib/machines]", ) diff --git a/cleanroom/firestarter/copyinstalltarget.py b/cleanroom/firestarter/copyinstalltarget.py new file mode 100755 index 0000000..df740ad --- /dev/null +++ b/cleanroom/firestarter/copyinstalltarget.py @@ -0,0 +1,31 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +"""Copy Install Target + +@author: Tobias Hunger +""" + + +from cleanroom.firestarter.installtarget import InstallTarget + +from shutil import copy +import typing + + +class CopyInstallTarget(InstallTarget): + def __init__(self) -> None: + super().__init__("copy", "copy the image to a directory, device or file") + + def setup_subparser(self, subparser: typing.Any) -> None: + subparser.add_argument( + dest="target", action="store", help="The target to copy into.", + ) + + def __call__( + self, *, parse_result: typing.Any, tmp_dir: str, image_file: str + ) -> int: + assert parse_result.target + + copy(image_file, parse_result.target) + + return 0 diff --git a/cleanroom/firestarter/deploytarget.py b/cleanroom/firestarter/deploytarget.py index 006af87..1f3f35a 100755 --- a/cleanroom/firestarter/deploytarget.py +++ b/cleanroom/firestarter/deploytarget.py @@ -8,7 +8,6 @@ from cleanroom.firestarter.installtarget import InstallTarget -import os import typing @@ -18,8 +17,10 @@ def __init__(self) -> None: "deploy", "Deploy the machine as specified in its deployment information" ) - def setup_subparser(self, parser: typing.Any) -> None: + def setup_subparser(self, subparser: typing.Any) -> None: pass - def __call__(self, parse_result: typing.Any) -> None: - pass + def __call__( + self, *, parse_result: typing.Any, tmp_dir: str, image_file: str + ) -> int: + return 0 diff --git a/cleanroom/firestarter/directoryinstalltarget.py b/cleanroom/firestarter/directoryinstalltarget.py deleted file mode 100755 index 1dad519..0000000 --- a/cleanroom/firestarter/directoryinstalltarget.py +++ /dev/null @@ -1,45 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -"""Main CleanRoom binary. - -@author: Tobias Hunger -""" - - -from cleanroom.firestarter.installtarget import InstallTarget -import cleanroom.firestarter.tools as tool - -import typing - - -class DirectoryInstallTarget(InstallTarget): - def __init__(self) -> None: - super().__init__("directory", "export image into directory") - - def setup_subparser(self, parser: typing.Any) -> None: - parser.add_argument( - dest="target_directory", - action="store", - help="The directory to export into.", - ) - parser.add_argument( - "--mode", action="store", default=0, type=int, help="mode of exported file." - ) - parser.add_argument("--owner", action="store", help="owner of exported file") - parser.add_argument("--group", action="store", help="group of exported file") - - parser.add_argument( - "--create-target-directory", dest="create_directory", action="store_true" - ) - - def __call__(self, parse_result: typing.Any) -> None: - tool.export_into_directory( - parse_result.system_name, - parse_result.target_directory, - version=parse_result.system_version, - repository=parse_result.repository, - create_directory=parse_result.create_directory, - owner=parse_result.owner, - group=parse_result.group, - mode=parse_result.mode, - ) diff --git a/cleanroom/firestarter/imagepartitioninstalltarget.py b/cleanroom/firestarter/imagepartitioninstalltarget.py index fe152bc..cc0fabd 100755 --- a/cleanroom/firestarter/imagepartitioninstalltarget.py +++ b/cleanroom/firestarter/imagepartitioninstalltarget.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- -"""Main CleanRoom binary. +"""image_partition install target @author: Tobias Hunger """ @@ -9,42 +9,122 @@ from cleanroom.firestarter.installtarget import InstallTarget import cleanroom.firestarter.tools as tool import cleanroom.helper.mount as mount +from cleanroom.printer import debug, trace -import os.path -from tempfile import TemporaryDirectory +import os +from shutil import copy2, copytree +from sys import exit import typing +def _copy_file(src: str, dest: str, overwrite: bool): + file = os.path.basename(src) + if not os.path.exists(os.path.join(dest, file)) or overwrite: + debug( + "Copying {} into {}{}".format(src, dest, " [FORCE]." if overwrite else ".") + ) + copy2(src, dest) + else: + debug("Skipped copy of {} into {}.".format(src, dest)) + + +def _copy_efi( + src: str, dest: str, *, include_bootloader: bool = False, overwrite: bool = False +) -> int: + try: + efi_path = os.path.join(dest, "EFI") + os.makedirs(efi_path, exist_ok=True) + + linux_src_path = os.path.join(src, "EFI/Linux") + kernels = [ + f + for f in os.listdir(linux_src_path) + if os.path.isfile(os.path.join(linux_src_path, f)) + ] + debug('Found kernel(s): "{}".'.format('", "'.join(kernels))) + assert len(kernels) == 1 + kernel = kernels[0] + + _copy_file( + os.path.join(linux_src_path, kernel), + os.path.join(dest, "EFI/Linux"), + overwrite=overwrite, + ) + + if include_bootloader: + trace("Copying bootloader.") + + efi_src_path = os.path.join(src, "EFI") + + dirs = [ + d + for d in os.listdir(efi_src_path) + if d != "Linux" and os.path.isdir(os.path.join(efi_src_path, d)) + ] + for d in dirs: + trace( + "Copying {} to {}".format(os.path.join(efi_src_path, d), efi_path) + ) + copytree(os.path.join(efi_src_path, d), efi_path, dirs_exist_ok=True) + + copytree( + os.path.join(src, "loader"), + os.path.join(dest, "loader"), + dirs_exist_ok=True, + ) + + except Exception as e: + debug("Failed to install EFI: {}.".format(e)) + return 1 + else: + debug("Successfully installed EFI") + return 0 + + class ImagePartitionInstallTarget(InstallTarget): def __init__(self) -> None: super().__init__( "image_partition", "export image into image_partition and update EFI." ) - def setup_subparser(self, parser: typing.Any) -> None: - parser.add_argument( + def setup_subparser(self, subparser: typing.Any) -> None: + subparser.add_argument( "--efi-device", action="store", - default="/dev/sda1", + default="", dest="efi_device", help="The device containing the EFI partition.", ) + subparser.add_argument( + "--efi-options", + action="store", + default="", + dest="efi_options", + help="The mount options for the EFI partition.", + ) + subparser.add_argument( + "--efi-fs-type", + action="store", + default="vfat", + dest="efi_fs_type", + help="The filesystem used on the EFI partition.", + ) - parser.add_argument( + subparser.add_argument( "--image-device", action="store", default="", dest="image_device", help="The device containing the images.", ) - parser.add_argument( + subparser.add_argument( "--image-fs-type", action="store", default="btrfs", dest="image_fs_type", help="The filesystem type containing the image " "[defaults to btrfs].", ) - parser.add_argument( + subparser.add_argument( "--image-options", action="store", default="subvol=/.images", @@ -52,24 +132,52 @@ def setup_subparser(self, parser: typing.Any) -> None: help="Options used to mount image filessystem " "[defaults to: subvol=/.images]", ) + subparser.add_argument( + "--add-bootloader", + action="store_true", + default=False, + dest="include_bootloader", + help="Install the boot loader files in addition to the kernel.", + ) + subparser.add_argument( + "--overwrite", + action="store_true", + default=False, + dest="overwrite", + help="Overwrite existing images/kernels.", + ) - def __call__(self, parse_result: typing.Any) -> None: - with TemporaryDirectory() as tempdir: - with mount.Mount( - parse_result.image_device, - os.path.join(tempdir, "images"), - options=parse_result.image_options, - fs_type=parse_result.image_fs_type, - ) as images_mnt: - exported_file = tool.export_into_directory( - parse_result.system_name, - images_mnt, - version=parse_result.system_version, - repository=parse_result.repository, - ) + def __call__( + self, *, parse_result: typing.Any, tmp_dir: str, image_file: str + ) -> int: + if not parse_result.efi_device: + print("No --efi-device provided, stopping.") + exit(1) + if not parse_result.image_device: + print("No --image-device provided, stopping.") + exit(1) - tool.copy_efi_partition( - image_file=exported_file, - efi_device=parse_result.efi_device, - tempdir=tempdir, - ) + with mount.Mount( + parse_result.image_device, + os.path.join(tmp_dir, "images"), + options=parse_result.image_options, + fs_type=parse_result.image_fs_type, + ) as images_mnt: + _copy_file(image_file, images_mnt, overwrite=parse_result.overwrite) + + with mount.Mount( + parse_result.efi_device, + os.path.join(tmp_dir, "efi_dest"), + options=parse_result.efi_options, + fs_type=parse_result.efi_fs_type, + ) as efi_dest_mnt: + return tool.execute_with_system_mounted( + lambda e, _: _copy_efi( + e, + efi_dest_mnt, + include_bootloader=parse_result.include_bootloader, + overwrite=parse_result.overwrite, + ), + image_file=image_file, + tmp_dir=tmp_dir, + ) diff --git a/cleanroom/firestarter/installtarget.py b/cleanroom/firestarter/installtarget.py index dc76984..651c0cf 100755 --- a/cleanroom/firestarter/installtarget.py +++ b/cleanroom/firestarter/installtarget.py @@ -14,7 +14,9 @@ def __init__(self, name: str, help_string: str) -> None: self._name = name self._help_string = help_string - def __call__(self, parse_result: typing.Any) -> None: + def __call__( + self, *, parse_result: typing.Any, tmp_dir: str, image_file: str + ) -> int: assert False def setup_subparser(self, subparser: typing.Any) -> None: diff --git a/cleanroom/firestarter/main.py b/cleanroom/firestarter/main.py index ecb9950..236fb80 100755 --- a/cleanroom/firestarter/main.py +++ b/cleanroom/firestarter/main.py @@ -5,24 +5,28 @@ @author: Tobias Hunger """ - -from cleanroom.firestarter.imagepartitioninstalltarget import ( - ImagePartitionInstallTarget, -) from cleanroom.firestarter.installtarget import InstallTarget + from cleanroom.firestarter.containerfsinstalltarget import ( ContainerFilesystemInstallTarget, ) -from cleanroom.firestarter.directoryinstalltarget import DirectoryInstallTarget +from cleanroom.firestarter.copyinstalltarget import CopyInstallTarget +from cleanroom.firestarter.imagepartitioninstalltarget import ( + ImagePartitionInstallTarget, +) from cleanroom.firestarter.mountinstalltarget import MountInstallTarget -from cleanroom.firestarter.qemubootinstalltarget import QemuBootInstallTarget +from cleanroom.firestarter.partitioninstalltarget import PartitionInstallTarget from cleanroom.firestarter.qemuinstalltarget import QemuInstallTarget +from cleanroom.firestarter.qemuimageinstalltarget import QemuImageInstallTarget from cleanroom.firestarter.tarballinstalltarget import TarballInstallTarget -from cleanroom.printer import Printer + +from cleanroom.printer import Printer, trace, debug +from cleanroom.firestarter.tools import BorgMount from argparse import ArgumentParser import os import sys +from tempfile import TemporaryDirectory import typing @@ -58,9 +62,14 @@ def _parse_commandline( ) subparsers = parser.add_subparsers( - help="Installation target specifics", dest="target_type" + help="Installation target specifics", dest="subcommand", required=True, ) for it in install_targets: + debug( + 'Setting up subparser for "{}" with help "{}".'.format( + it.name, it.help_string + ) + ) it.setup_subparser(subparsers.add_parser(it.name, help=it.help_string)) return parser.parse_args(args[1:]) @@ -69,13 +78,14 @@ def _parse_commandline( # Main section: -def main(*command_args: str): - known_install_targets = [ +def main(*command_args: str) -> int: + known_install_targets: typing.List[InstallTarget] = [ ContainerFilesystemInstallTarget(), - DirectoryInstallTarget(), + CopyInstallTarget(), ImagePartitionInstallTarget(), MountInstallTarget(), - QemuBootInstallTarget(), + PartitionInstallTarget(), + QemuImageInstallTarget(), QemuInstallTarget(), TarballInstallTarget(), ] @@ -89,16 +99,48 @@ def main(*command_args: str): pr.set_verbosity(parse_result.verbose) pr.show_verbosity_level() + trace("Arguments parsed from command line: {}.".format(parse_result)) + install_target = next( - x for x in known_install_targets if x.name == parse_result.target_type + x for x in known_install_targets if x.name == parse_result.subcommand ) - install_target(parse_result) + assert install_target + debug("Install target {} found.".format(install_target.name)) + + with TemporaryDirectory(prefix="fs_{}".format(install_target.name)) as tmp_dir: + trace("Using temporary directory: {}.".format(tmp_dir)) + + image_dir = os.path.join(tmp_dir, "borg") + os.makedirs(image_dir) + + with BorgMount( + image_dir, + system_name=parse_result.system_name, + repository=parse_result.repository, + version=parse_result.system_version, + ) as image_file: + trace("Mounted borg directory with image file: {}.".format(image_file)) + debug( + "Running install target with parse_args={}, tmp_dir={} and image_file={}.".format( + parse_result, tmp_dir, image_file + ) + ) + result = install_target( + parse_result=parse_result, tmp_dir=tmp_dir, image_file=image_file, + ) + debug("Install target done: return code: {}.".format(result)) + trace("Starting cleanup.") + + trace("Done, leaving with return code {}.".format(result)) + return result def run(): current_directory = os.getcwd() try: - main(*sys.argv) + result = main(*sys.argv) finally: os.chdir(current_directory) + + return result diff --git a/cleanroom/firestarter/mountinstalltarget.py b/cleanroom/firestarter/mountinstalltarget.py index 940c814..711dfa8 100755 --- a/cleanroom/firestarter/mountinstalltarget.py +++ b/cleanroom/firestarter/mountinstalltarget.py @@ -12,11 +12,10 @@ import os from shlex import split -from tempfile import TemporaryDirectory import typing -def _execution(efi: str, rootfs: str, *, command: str) -> None: +def _execution(efi: str, rootfs: str, *, command: str) -> int: to_exec = command or '/usr/bin/bash -c "read -n1 -s"' prompt = "" if command else "<<< Press any key to continue >>>" @@ -34,7 +33,7 @@ def _execution(efi: str, rootfs: str, *, command: str) -> None: if prompt: print(prompt) - tool.run(*split(to_exec), env=env) + return tool.run(*split(to_exec), env=env).returncode class MountInstallTarget(InstallTarget): @@ -45,16 +44,17 @@ def __init__(self) -> None: "the given command is done executing.", ) - def __call__(self, parse_result: typing.Any) -> None: - tool.execute_with_system_mounted( + def __call__( + self, *, parse_result: typing.Any, tmp_dir: str, image_file: str + ) -> int: + return tool.execute_with_system_mounted( lambda e, r: _execution(e, r, command=parse_result.command), - repository=parse_result.repository, - system_name=parse_result.system_name, - system_version=parse_result.system_version, + image_file=image_file, + tmp_dir=tmp_dir, ) - def setup_subparser(self, parser: typing.Any) -> None: - parser.add_argument( + def setup_subparser(self, subparser: typing.Any) -> None: + subparser.add_argument( "--command", action="store", nargs="?", diff --git a/cleanroom/firestarter/partitioninstalltarget.py b/cleanroom/firestarter/partitioninstalltarget.py new file mode 100755 index 0000000..4e777fd --- /dev/null +++ b/cleanroom/firestarter/partitioninstalltarget.py @@ -0,0 +1,121 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +"""Partitioning install target + +@author: Tobias Hunger +""" + +from cleanroom.printer import error, trace +import cleanroom.helper.disk as disk +import cleanroom.helper.run as run + +from cleanroom.firestarter.installtarget import InstallTarget + +import os +import typing + + +def validate_device(dev: str, dir: str) -> typing.Tuple[str, str, str]: + if not os.path.isdir(dir): + ("{} is not a directory.".format(dir)) + return ("", "", "") + + if not disk.is_block_device(dev): + ("{} is not a block device.".format(dev)) + return ("", "", "") + + return (dev, "raw", dir) + + +def validate_image( + file: str, format: str, size: str, dir: str +) -> typing.Tuple[str, str, str]: + if not os.path.isdir(dir): + ("{} is not a directory.".format(dir)) + return ("", "", "") + + if file.startswith("/dev/") or file.startswith("/sys/"): + ('"{}" does not look like a image file name.'.format(file)) + return ("", "", "") + if not os.path.exists(file): + if not size: + ('No size for missing image file "{}"'.format(file)) + return ("", "", "") + disk.create_image_file(file, disk.byte_size(size), disk_format=format) + + if not os.path.isfile(file): + ('"{}" exists but is no file.'.format(file)) + return ("", "", "") + + return (file, format, dir) + + +def parse_arguments(args: typing.Any) -> typing.List[typing.Tuple[str, str, str]]: + device_list: typing.List[typing.Tuple[str, str, str]] = [] + device_map: typing.Dict[str, bool] = {} + + for m in args.mappings: + parts = m.split(":") + (dev, format, dir) = ("", "", "") + trace("==> {}.".format(":".join(parts))) + + if len(parts) == 2: + (dev, format, dir) = validate_device(*parts) + elif len(parts) == 3: + (dev, format, dir) = validate_image(parts[0], parts[1], "", parts[2]) + elif len(parts) == 4: + (dev, format, dir) = validate_image(parts[0], parts[1], parts[2], parts[3]) + + if not dev or not dir: + error('Failed to parse device mapping "{}"'.format(m)) + return [] + + if dev in device_map: + error('Multiple definitions of device "{}".'.format(dev)) + return [] + else: + device_map[dev] = True + + device_list.append((dev, format, dir)) + + return device_list + + +def create_device(dev: str, format: str) -> disk.Device: + if os.path.isfile(dev): + return disk.NbdDevice(dev, disk_format=format) + assert format == "raw" + return disk.Device(dev) + + +class PartitionInstallTarget(InstallTarget): + def __init__(self) -> None: + super().__init__("partition", "Partition installation devices") + + def setup_subparser(self, subparser: typing.Any) -> None: + subparser.add_argument( + "mappings", + metavar="(|::?):", + help="A mapping of device to systemd-repart directory, separated by :", + nargs="+", + ) + + def __call__( + self, *, parse_result: typing.Any, tmp_dir: str, image_file: str + ) -> int: + device_list = parse_arguments(parse_result) + if not device_list: + return 1 + + for (dev, format, dir) in device_list: + with create_device(dev, format) as d: + dev_node = d.device() + run.run( + "/usr/bin/systemd-repart", + "--definitions={}".format(dir), + "--dry-run=no", + "--empty=force", + dev_node, + ) + + return 0 diff --git a/cleanroom/firestarter/qemubootinstalltarget.py b/cleanroom/firestarter/qemubootinstalltarget.py deleted file mode 100755 index 5d6b773..0000000 --- a/cleanroom/firestarter/qemubootinstalltarget.py +++ /dev/null @@ -1,53 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -"""Firestarter: Simple qemu runner - -@author: Tobias Hunger -""" - - -from cleanroom.firestarter.installtarget import InstallTarget -import cleanroom.firestarter.tools as tool -import cleanroom.firestarter.qemutools as qemu_tool - -import os -from tempfile import TemporaryDirectory -import typing - - -class QemuBootInstallTarget(InstallTarget): - def __init__(self) -> None: - super().__init__("qemu_boot", "Boot image in qemu") - - def __call__(self, parse_result: typing.Any) -> None: - if not "DISPLAY" in os.environ: - print("No DISPLAY variable set: Can not start qemu.") - exit(1) - - with TemporaryDirectory(prefix="clrm_qemu_") as tempdir: - extracted_version = tool.write_image( - parse_result.system_name, - tempdir, - repository=parse_result.repository, - version=parse_result.system_version, - ) - - extracted_image = os.path.join(tempdir, "clrm_{}".format(extracted_version)) - assert os.path.isfile(extracted_image) - - clrm_device = "{}:raw".format(extracted_image) - if parse_result.usb_clrm: - clrm_device += ":usb" - - qemu_tool.run_qemu( - parse_result, drives=[clrm_device], work_directory=tempdir, - ) - - def setup_subparser(self, parser: typing.Any) -> None: - qemu_tool.setup_parser_for_qemu(parser) - parser.add_argument( - "--usb-clrm", - dest="usb_clrm", - action="store_true", - help="Put CLRM onto a USB stick", - ) diff --git a/cleanroom/firestarter/qemuimageinstalltarget.py b/cleanroom/firestarter/qemuimageinstalltarget.py new file mode 100755 index 0000000..3a1a565 --- /dev/null +++ b/cleanroom/firestarter/qemuimageinstalltarget.py @@ -0,0 +1,172 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +"""Firestarter: qemu_image install command + +@author: Tobias Hunger +""" + + +from cleanroom.firestarter.installtarget import InstallTarget +import cleanroom.firestarter.qemutools as qemu_tool +import cleanroom.firestarter.tools as tool +import cleanroom.helper.disk as disk +import cleanroom.helper.mount as mount +from cleanroom.helper.run import run +from cleanroom.printer import debug, verbose, trace + +import os +from shutil import copyfile, copytree +import typing + + +def _create_hdd_image(device: disk.Device): + verbose("hdd.img created.") + partitioner = disk.Partitioner(device) + + partitioner.repartition( + [ + disk.Partitioner.efi_partition(size=disk.byte_size("512M")), + disk.Partitioner.swap_partition(size=disk.byte_size("1G"), name="swap"), + disk.Partitioner.data_partition(name="data"), + ] + ) + + verbose("hdd.img repartitioned.") + + debug("Format EFI partitition.") + run("/usr/bin/mkfs.vfat", "-F32", device.device(1)) + debug("Set up swap partitition.") + run("/usr/bin/mkswap", device.device(2)) + debug("Format data partitition.") + run("/usr/bin/mkfs.btrfs", "-L", "fs_btrfs", device.device(3)) + + +def _setup_btrfs(mount_point: str): + trace("Creating subvolumes.") + run("btrfs", "subvol", "create", "@btrfs", work_directory=mount_point) + run("btrfs", "subvol", "create", "@home", work_directory=mount_point) + run("btrfs", "subvol", "create", "@var", work_directory=mount_point) + run("btrfs", "subvol", "create", ".images", work_directory=mount_point) + + +def _copy_efi(src: str, dest: str) -> int: + try: + efi_path = os.path.join(dest, "EFI") + os.makedirs(efi_path, exist_ok=True) + + trace("Copying bootloader.") + + efi_src_path = os.path.join(src, "EFI") + + dirs = [ + d + for d in os.listdir(efi_src_path) + if os.path.isdir(os.path.join(efi_src_path, d)) + ] + for d in dirs: + dest = os.path.join(efi_path, d) + trace("Copying {} to {}".format(os.path.join(efi_src_path, d), dest)) + copytree( + os.path.join(efi_src_path, d), dest, dirs_exist_ok=True, + ) + + copytree( + os.path.join(src, "loader"), + os.path.join(dest, "loader"), + dirs_exist_ok=True, + ) + + except Exception as e: + debug("Failed to install EFI: {}.".format(e)) + return 1 + else: + debug("Successfully installed EFI") + return 0 + + +def create_qemu_image( + image_path: str, + *, + image_size: int, + image_format: str = "qcow2", + system_image_file: str, + tmp_dir: str, +) -> str: + trace("Creating image file {}.".format(image_path)) + with disk.NbdDevice.new_image_file( + image_path, image_size, disk_format=image_format + ) as device: + _create_hdd_image(device) + + debug("mounting data partition for further setup.") + with mount.Mount( + device.device(3), + os.path.join(tmp_dir, "data"), + fs_type="btrfs", + options="subvolid=0", + fallback_cwd=os.getcwd(), + ) as data_dir: + _setup_btrfs(data_dir) + + trace("Copying image file") + copyfile( + system_image_file, + os.path.join(data_dir, ".images", os.path.basename(system_image_file)), + ) + + with mount.Mount( + device.device(1), + os.path.join(tmp_dir, "efi_dest"), + options="defaults", + fs_type="vfat", + ) as efi_dest_mnt: + tool.execute_with_system_mounted( + lambda e, _: _copy_efi(e, efi_dest_mnt,), + image_file=system_image_file, + tmp_dir=tmp_dir, + ) + + return image_path + + +class QemuImageInstallTarget(InstallTarget): + def __init__(self) -> None: + super().__init__("qemu-image", "Set up hdd image and start it in qemu") + + def __call__( + self, *, parse_result: typing.Any, tmp_dir: str, image_file: str + ) -> int: + image_path = create_qemu_image( + os.path.join(tmp_dir, "hdd.img"), + image_size=parse_result.hdd_size, + image_format=parse_result.hdd_format, + system_image_file=image_file, + tmp_dir=tmp_dir, + ) + + return qemu_tool.run_qemu( + parse_result, + drives=["{}:{}".format(image_path, parse_result.hdd_format)], + work_directory=tmp_dir, + ) + + def setup_subparser(self, subparser: typing.Any) -> None: + qemu_tool.setup_parser_for_qemu(subparser) + + subparser.add_argument( + "--hdd-size", + dest="hdd_size", + action="store", + nargs="?", + default="10G", + help="Size of HDD to generate.", + ) + + subparser.add_argument( + "--hdd-format", + dest="hdd_format", + action="store", + nargs="?", + default="qcow2", + help="Format of HDD to generate.", + ) diff --git a/cleanroom/firestarter/qemuinstalltarget.py b/cleanroom/firestarter/qemuinstalltarget.py index 1784340..22784b4 100755 --- a/cleanroom/firestarter/qemuinstalltarget.py +++ b/cleanroom/firestarter/qemuinstalltarget.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- -"""Firestarter binary. +"""Firestarter: Simple qemu runner @author: Tobias Hunger """ @@ -8,136 +8,35 @@ from cleanroom.firestarter.installtarget import InstallTarget import cleanroom.firestarter.qemutools as qemu_tool -import cleanroom.firestarter.tools as tool -import cleanroom.helper.disk as disk -import cleanroom.helper.mount as mount -from cleanroom.helper.run import run -from cleanroom.printer import debug, verbose, trace import os -from tempfile import TemporaryDirectory import typing -def _create_hdd_image(device): - verbose("hdd.img created.") - partitioner = disk.Partitioner(device) - - partitioner.repartition( - [ - disk.Partitioner.efi_partition(size="512M"), - disk.Partitioner.swap_partition(size="1G", name="swap"), - disk.Partitioner.data_partition(name="data"), - ] - ) - - verbose("hdd.img repartitioned.") - - debug("Format EFI partitition.") - run("/usr/bin/mkfs.vfat", device.device(1)) - debug("Set up swap partitition.") - run("/usr/bin/mkswap", device.device(2)) - debug("Format data partitition.") - run("/usr/bin/mkfs.btrfs", "-L", "fs_btrfs", device.device(3)) - - -def _setup_btrfs(mount_point: str): - trace("Creating subvolumes.") - run("btrfs", "subvol", "create", "@btrfs", work_directory=mount_point) - run("btrfs", "subvol", "create", "@home", work_directory=mount_point) - run("btrfs", "subvol", "create", "@var", work_directory=mount_point) - run("btrfs", "subvol", "create", ".images", work_directory=mount_point) - - -def create_qemu_image( - image_path: str, - *, - image_size: int, - image_format: str = "qcow2", - system_name: str, - system_version: str = "", - repository: str, - tempdir: str -) -> str: - trace("Creating image file {}.".format(image_path)) - with disk.NbdDevice.new_image_file( - image_path, image_size, disk_format=image_format - ) as device: - _create_hdd_image(device) - - debug("mounting data partition for further setup.") - with mount.Mount( - device.device(3), - os.path.join(tempdir, "data"), - fs_type="btrfs", - options="subvolid=0", - fallback_cwd=os.getcwd(), - ) as data_dir: - _setup_btrfs(data_dir) - - extract_location = os.path.join(data_dir, ".images") - verbose("Extracting system image to {}.".format(extract_location)) - extracted_version = tool.write_image( - system_name, - extract_location, - repository=repository, - version=system_version, - ) - - extracted_image = os.path.join( - data_dir, ".images", "clrm_{}".format(extracted_version) - ) - assert os.path.isfile(extracted_image) - - tool.copy_efi_partition( - image_file=extracted_image, - efi_device=device.device(1), - tempdir=tempdir, - kernel_only=False, - ) - - return image_path - - class QemuInstallTarget(InstallTarget): def __init__(self) -> None: - super().__init__("qemu", "Set up hdd image and start it in qemu") - - def __call__(self, parse_result: typing.Any) -> None: - with TemporaryDirectory(prefix="clrm_qemu_") as tempdir: - image_path = create_qemu_image( - os.path.join(tempdir, "hdd.img"), - image_size=parse_result.hdd_size, - image_format=parse_result.hdd_format, - system_name=parse_result.system_name, - system_version=parse_result.system_version, - repository=parse_result.repository, - tempdir=tempdir, - ) + super().__init__("qemu", "Boot image in qemu") - qemu_tool.run_qemu( - parse_result, - drives=["{}:{}".format(image_path, parse_result.hdd_format)], - work_directory=tempdir, - ) + def __call__( + self, *, parse_result: typing.Any, tmp_dir: str, image_file: str + ) -> int: + if not "DISPLAY" in os.environ: + print("No DISPLAY variable set: Can not start qemu.") + exit(1) - def setup_subparser(self, parser: typing.Any) -> None: - qemu_tool.setup_parser_for_qemu(parser) + clrm_device = "{}:raw:read-only".format(image_file) + if parse_result.usb_clrm: + clrm_device += ":usb" - parser.add_argument( - "--hdd-size", - dest="hdd_size", - action="store", - nargs="?", - default="10G", - help="Size of HDD to generate.", + return qemu_tool.run_qemu( + parse_result, drives=[clrm_device], work_directory=tmp_dir, ) - parser.add_argument( - "--hdd-format", - dest="hdd_format", - action="store", - nargs="?", - default="qcow2", - help="Format of HDD to generate.", + def setup_subparser(self, subparser: typing.Any) -> None: + qemu_tool.setup_parser_for_qemu(subparser) + subparser.add_argument( + "--usb-clrm", + dest="usb_clrm", + action="store_true", + help="Put CLRM onto a virtual USB stick", ) diff --git a/cleanroom/firestarter/qemutools.py b/cleanroom/firestarter/qemutools.py index bed1040..128fd2b 100755 --- a/cleanroom/firestarter/qemutools.py +++ b/cleanroom/firestarter/qemutools.py @@ -11,7 +11,14 @@ import typing -def _append_network(hostname, *, hostfwd=[], mac="", net="", host=""): +def _append_network( + hostname: str, + *, + hostfwd: typing.List[str] = [], + mac: str = "", + net: str = "", + host: str = "" +): hostfwd_args = ["hostfwd={}".format(p) for p in hostfwd] hostfwd_str = "," + ",".join(hostfwd_args) if hostfwd_args else "" @@ -32,11 +39,14 @@ def _append_network(hostname, *, hostfwd=[], mac="", net="", host=""): ] -def _append_hdd(bootindex, counter, disk): +def _append_hdd(bootindex: int, counter: int, disk: str): disk_parts = disk.split(":") usb_disk = "usb" in disk_parts + read_only = "read-only" in disk_parts if usb_disk: disk_parts.remove("usb") + if read_only: + disk_parts.remove("read-only") if len(disk_parts) < 2: disk_parts.append("qcow2") @@ -48,15 +58,21 @@ def _append_hdd(bootindex, counter, disk): if usb_disk: driver = "usb-storage" + drive_extra = "" + if read_only: + drive_extra += ",read-only" + return [ "-drive", - "file={},format={},if=none,id=disk{}".format(disk_parts[0], disk_parts[1], c), + "file={},format={},if=none,id=disk{}{}".format( + disk_parts[0], disk_parts[1], c, drive_extra + ), "-device", "{},drive=disk{},bootindex={}".format(driver, c, bootindex), ] -def _append_fs(fs, *, read_only=False): +def _append_fs(fs: str, *, read_only: bool = False): fs_parts = fs.split(":") assert len(fs_parts) == 2 @@ -70,7 +86,7 @@ def _append_fs(fs, *, read_only=False): ] -def _append_efi(efi_vars): +def _append_efi(efi_vars: str): if not os.path.exists(efi_vars): copyfile("/usr/share/ovmf/x64/OVMF_VARS.fd", efi_vars) return [ @@ -173,7 +189,7 @@ def setup_parser_for_qemu(parser: typing.Any) -> None: def run_qemu( parse_result: typing.Any, *, drives: typing.List[str] = [], work_directory: str -): +) -> int: qemu_args = [ "/usr/bin/qemu-system-x86_64", "--enable-kvm", @@ -233,3 +249,5 @@ def run_qemu( print("Qemu run Failed with return code {}.".format(result.returncode)) print("Qemu stdout: {}".format(result.stdout)) print("Qemu stderr: {}".format(result.stderr)) + + return result.returncode diff --git a/cleanroom/firestarter/tarballinstalltarget.py b/cleanroom/firestarter/tarballinstalltarget.py index 511e140..9433a5f 100755 --- a/cleanroom/firestarter/tarballinstalltarget.py +++ b/cleanroom/firestarter/tarballinstalltarget.py @@ -8,63 +8,68 @@ from cleanroom.firestarter.installtarget import InstallTarget import cleanroom.firestarter.tools as tool -from cleanroom.helper.btrfs import BtrfsHelper import os import typing -def _tar(efi_fs: str, rootfs: str, *, tarball_name: str, efi_tarball_name: str) -> None: +def _tar(efi_fs: str, rootfs: str, *, tarball_name: str, efi_tarball_name: str) -> int: # Extract data + result = 0 if efi_tarball_name: - tool.run( + result = tool.run( "/usr/bin/bash", "-c", '( cd {} ; tar -cf "{}" --auto-compress .) '.format( efi_fs, efi_tarball_name ), - ) + ).returncode if tarball_name: - tool.run( + result += tool.run( "/usr/bin/bash", "-c", '( cd {} ; tar -cf "{}" --auto-compress .) '.format(rootfs, tarball_name), - ) + ).returncode + + return result class TarballInstallTarget(InstallTarget): def __init__(self) -> None: super().__init__("tarball", "Creates a tarball from the system image.") - def setup_subparser(self, parser: typing.Any) -> None: - parser.add_argument( + def setup_subparser(self, subparser: typing.Any) -> None: + subparser.add_argument( "--efi-tarball", action="store", dest="efi_tarball", help="The tarball containing the EFI partition. [Default: empty -- skip]", ) - parser.add_argument( + subparser.add_argument( "--tarball", action="store", dest="tarball", help="The tarball containing the root filesystem image [Default: empty -- skip].", ) - def __call__(self, parse_result: typing.Any) -> None: + def __call__( + self, *, parse_result: typing.Any, tmp_dir: str, image_file: str + ) -> int: if not parse_result.tarball and not parse_result.efi_tarball: - return + return 1 + + assert os.path.isfile(image_file) # Mount filessystems and copy the rootfs into import_dir: - tool.execute_with_system_mounted( + return tool.execute_with_system_mounted( lambda e, r: _tar( e, r, tarball_name=parse_result.tarball, efi_tarball_name=parse_result.efi_tarball, ), - repository=parse_result.repository, - system_name=parse_result.system_name, - system_version=parse_result.system_version, + image_file=image_file, + tmp_dir=tmp_dir, ) diff --git a/cleanroom/firestarter/tools.py b/cleanroom/firestarter/tools.py index 8c7789f..e12296f 100755 --- a/cleanroom/firestarter/tools.py +++ b/cleanroom/firestarter/tools.py @@ -11,11 +11,7 @@ import cleanroom.helper.mount as mount import os -from shutil import chown, copyfile -from distutils.dir_util import copy_tree import subprocess -import sys -from tempfile import TemporaryDirectory import typing @@ -23,7 +19,10 @@ def run( - *args, work_directory: str = "", check: bool = True, env: os._Environ = os.environ + *args: str, + work_directory: str = "", + check: bool = True, + env: typing.Any = os.environ, ## What is a better type for this? ) -> subprocess.CompletedProcess: env["LC_ALL"] = "en_US.UTF-8" @@ -49,7 +48,7 @@ def run( return result -def run_borg(*args, work_directory: str = "") -> subprocess.CompletedProcess: +def run_borg(*args: str, work_directory: str = "") -> subprocess.CompletedProcess: return run("/usr/bin/borg", *args, work_directory=work_directory) @@ -77,141 +76,66 @@ def find_archive( return archive_to_use, archive_to_use[len(system_name) + 1 :] -def extract_archive(archive: str, target_directory: str, *, repository: str) -> None: - run_borg( - "extract", "{}::{}".format(repository, archive), work_directory=target_directory - ) - - -def write_image( - system_name: str, target_directory: str, *, repository: str, version: str = "" -) -> str: - (archive_to_extract, extracted_version) = find_archive( - system_name, repository=repository, version=version - ) - if not archive_to_extract: - if version: - print( - 'Could not find version "{}" of system "{}" to extract.'.format( - version, system_name - ) - ) - else: - print('Could not find system "{}" to extract'.format(system_name)) - sys.exit(2) - - extract_archive(archive_to_extract, target_directory, repository=repository) - - return extracted_version - - -def export_into_directory( - system_name: str, - target_directory: str, - *, - repository: str, - version: str = "", - create_directory: bool = False, - owner: str = "", - group: str = "", - mode: int = 0 -) -> str: - if not os.path.isdir(target_directory): - if create_directory: - os.makedirs(target_directory) - - assert os.path.isdir(target_directory) - - with TemporaryDirectory(prefix="clrm_dir_", dir=target_directory) as tempdir: - extracted_version = write_image( - system_name, tempdir, repository=repository, version=version - ) - - exported_file_name = "clrm_{}".format(extracted_version) - exported_file = os.path.join(tempdir, exported_file_name) - assert os.path.isfile(exported_file) +def execute_with_system_mounted( + to_execute: typing.Callable[[str, str], int], *, image_file: str, tmp_dir: str +) -> int: + assert os.path.isfile(image_file) - if group or owner: - chown(exported_file, user=owner or "root", group=group or "root") - if mode: - os.chmod(exported_file, mode) + with disk.NbdDevice(image_file, disk_format="raw", read_only=True) as device: + verbose("Mounting EFI...") + device.wait_for_device_node(partition=1) + with mount.Mount( + device.device(1), + os.path.join(tmp_dir, "EFI"), + fs_type="vfat", + options="ro", + ) as efi: + verbose("Mounting root filesystem...") + with mount.Mount( + device.device(2), + os.path.join(tmp_dir, "root"), + fs_type="squashfs", + options="ro", + ) as root: - target_file = os.path.join(target_directory, exported_file_name) - os.rename(exported_file, target_file) + trace('Executing with EFI "{}" and root "{}".'.format(efi, root)) + result = to_execute(efi, root) - # Create symlink: - link_location = os.path.join(target_directory, "latest.img") - if os.path.islink(link_location): - os.unlink(link_location) - os.symlink("./{}".format(exported_file_name), link_location) - if group or owner: - chown(link_location, user=owner or "root", group=group or "root") + return result - return target_file +class BorgMount: + def __init__( + self, mnt_point: str, *, repository: str, system_name: str, version: str, + ) -> None: + if not os.path.isdir(mnt_point): + raise OSError('Mount point "{}" is not a directory.'.format(mnt_point)) -def copy_efi_partition( - *, image_file: str, efi_device, tempdir: str, kernel_only: bool = True -): - verbose("Copying EFI configuration out of image file.") - with disk.NbdDevice(image_file, disk_format="raw") as internal_device: - internal_device.wait_for_device_node(partition=1) - with mount.Mount( - internal_device.device(1), os.path.join(tempdir, "_efi") - ) as int_efi: - with mount.Mount( - efi_device, os.path.join(tempdir, "efi"), fs_type="vfat" - ) as efi: - if kernel_only: - img_dir = os.path.join(int_efi, "EFI", "Linux") - efi_dir = os.path.join(efi, "EFI", "Linux") - assert os.path.isdir(img_dir) - if not os.path.isdir(efi_dir): - os.makedirs(efi_dir) - - for f in [ - f - for f in os.listdir(img_dir) - if os.path.isfile(os.path.join(img_dir, f)) - ]: - trace("Copying EFI kernel {}.".format(f)) - copyfile(os.path.join(img_dir, f), os.path.join(efi_dir, f)) - else: - trace("Copying EFI folder into system.") - copy_tree(int_efi, efi) + (archive, _) = find_archive(system_name, repository=repository, version=version) + if not archive: + raise OSError("Failed to find repository or system.") + self._mnt_point = mnt_point + self._repository = repository + self._archive = archive + self._version = version -def execute_with_system_mounted( - to_execute: typing.Callable[[str, str], None], - *, - repository: str, - system_name: str, - system_version: str = "" -) -> None: - with TemporaryDirectory(prefix="clrm_qemu_") as tempdir: - verbose("Extracting image") - image_path = export_into_directory( - system_name, tempdir, repository=repository, version=system_version + def __enter__(self) -> typing.Any: + run_borg( + "mount", "{}::{}".format(self._repository, self._archive), self._mnt_point ) - assert os.path.isfile(image_path) + # find image file: + image_files = [ + f + for f in os.listdir(self._mnt_point) + if self._version in f and os.path.isfile(os.path.join(self._mnt_point, f)) + ] + assert len(image_files) == 1 - with disk.NbdDevice(image_path, disk_format="raw") as device: - verbose("Mounting EFI...") - device.wait_for_device_node(partition=1) - with mount.Mount( - device.device(1), - os.path.join(tempdir, "EFI"), - fs_type="vfat", - options="ro", - ) as efi: - verbose("Mounting root filesystem...") - with mount.Mount( - device.device(2), - os.path.join(tempdir, "root"), - fs_type="squashfs", - options="ro", - ) as root: - - verbose('Executing with EFI "{}" and root "{}".'.format(efi, root)) - to_execute(efi, root) + return os.path.join(self._mnt_point, image_files[0]) + + def __exit__( + self, exc_type: typing.Any, exc_val: typing.Any, exc_tb: typing.Any + ) -> None: + mount.umount(self._mnt_point) diff --git a/cleanroom/helper/archlinux/pacman.py b/cleanroom/helper/archlinux/pacman.py index 7821d0c..e5352e2 100644 --- a/cleanroom/helper/archlinux/pacman.py +++ b/cleanroom/helper/archlinux/pacman.py @@ -5,10 +5,8 @@ """ -from ...binarymanager import Binaries from ...printer import debug, info from ...systemcontext import SystemContext -from ..btrfs import BtrfsHelper from ..run import run from ..mount import umount_all, mount @@ -137,7 +135,7 @@ def _pacman_keyinit(system_context: SystemContext, pacman_key_command: str) -> N ) -def _mountpoint(root_dir, folder, dev, **kwargs): +def _mountpoint(root_dir: str, folder: str, dev: str, **kwargs: typing.Any): debug("Mounting {} in chroot.".format(folder)) path = os.path.join(root_dir, folder) if not os.path.isdir(path): @@ -145,7 +143,7 @@ def _mountpoint(root_dir, folder, dev, **kwargs): mount(dev, path, **kwargs) -def _mount_directories_if_needed(root_dir, *, pacman_in_filesystem=False): +def _mount_directories_if_needed(root_dir: str, *, pacman_in_filesystem: bool = False): debug("Preparing pacman chroot for external pacman run.") _mountpoint(root_dir, "proc", "proc", options="nosuid,noexec,nodev", fs_type="proc") _mountpoint( @@ -172,7 +170,7 @@ def _mount_directories_if_needed(root_dir, *, pacman_in_filesystem=False): ) -def _unmount_directories_if_needed(root_dir, *, pacman_in_filesystem=False): +def _umount_directories_if_needed(root_dir: str, *, pacman_in_filesystem: bool = False): debug("Cleaning up pacman chroot.") umount_all(root_dir) @@ -182,7 +180,7 @@ def _run_pacman( *args: str, pacman_command: str, pacman_in_filesystem: bool, - **kwargs + **kwargs: typing.Any ) -> None: _sanity_check(system_context) @@ -310,7 +308,7 @@ def pacman( pacman_command=pacman_command, pacman_in_filesystem=previous_pacstate ) - _unmount_directories_if_needed( + _umount_directories_if_needed( system_context.fs_directory, pacman_in_filesystem=previous_pacstate ) @@ -367,21 +365,21 @@ def pacman_report( ) # Generate file list: - qlin = os.path.join(directory, "pacman-Ql.txt.in") + ql_in = os.path.join(directory, "pacman-Ql.txt.in") action = ["-Ql"] _run_pacman( system_context, *action, - stdout=qlin, + stdout=ql_in, pacman_command=pacman_command, pacman_in_filesystem=False ) # Filter prefix from file list: - with open(qlin, "r") as input_fd: + with open(ql_in, "r") as input_fd: with open(os.path.join(directory, "pacman-Ql.txt"), "w") as output_fd: for line in input_fd: output_fd.write(line.replace(system_context.fs_directory, "")) # Remove prefix-ed version: - os.remove(qlin) + os.remove(ql_in) diff --git a/cleanroom/helper/btrfs.py b/cleanroom/helper/btrfs.py index 243892b..f0b3f2d 100644 --- a/cleanroom/helper/btrfs.py +++ b/cleanroom/helper/btrfs.py @@ -8,12 +8,12 @@ from ..printer import trace from .run import run -import os.path +import os import typing class BtrfsHelper: - def __init__(self, btrfs_command): + def __init__(self, btrfs_command: str): assert btrfs_command self._command = btrfs_command diff --git a/cleanroom/helper/debian/apt.py b/cleanroom/helper/debian/apt.py index d513ce7..c0c5008 100644 --- a/cleanroom/helper/debian/apt.py +++ b/cleanroom/helper/debian/apt.py @@ -5,7 +5,6 @@ """ -from ...binarymanager import Binaries from ...systemcontext import SystemContext from ..run import run @@ -20,22 +19,10 @@ def _package_type(system_context: SystemContext) -> typing.Optional[str]: return system_context.substitution("CLRM_PACKAGE_TYPE", "") -def _set_package_type(system_context: SystemContext) -> None: - system_context.set_substitution("CLRM_PACKAGE_TYPE", "deb") - - def _apt_state(system_context: SystemContext) -> bool: return system_context.substitution("APT_INSTALL_STATE", str(False)) -def _set_apt_state(system_context: SystemContext, internal: bool = False) -> None: - system_context.set_substitution("APT_INSTALL_STATE", str(internal)) - - -def _fs_directory(system_context: SystemContext) -> str: - return system_context.fs_directory - - def _dpkg_state_directory(system_context: SystemContext, internal: bool = False) -> str: if internal: return system_context.file_name("/var/lib/dpkg") @@ -83,8 +70,8 @@ def debootstrap( variant: str = "", target: str, mirror: str, - include: str = None, - exclude: str = None, + include: typing.Optional[str] = None, + exclude: typing.Optional[str] = None, debootstrap_command: str ) -> None: """Run debootstrap on host.""" @@ -94,7 +81,7 @@ def debootstrap( assert suite assert target - args = [] # type: typing.List[str] + args: typing.List[str] = [] if variant: args.append("--variant={}".format(variant)) if include: diff --git a/cleanroom/helper/disk.py b/cleanroom/helper/disk.py index 086a867..8864b1d 100644 --- a/cleanroom/helper/disk.py +++ b/cleanroom/helper/disk.py @@ -11,11 +11,9 @@ from ..printer import debug, trace, warn from .run import run -import collections import json import math import os -import os.path import subprocess from re import findall import stat @@ -23,23 +21,25 @@ from time import sleep -Disk = collections.namedtuple( - "Disk", - [ - "label", - "id", - "device", - "unit", - "firstlba", - "lastlba", - "partitions", - "sectorsize", - ], -) -Partition = collections.namedtuple( - "Partition", - ["node", "start", "size", "partition_type", "uuid", "name", "sectorsize"], -) +class Disk(typing.NamedTuple): + label: str + id: str + device: str + unit: str + firstlba: typing.Optional[int] + lastlba: typing.Optional[int] + partitions: typing.List[Partition] + sectorsize: int + + +class Partition(typing.NamedTuple): + node: typing.Optional[str] + start: typing.Optional[int] + size: typing.Optional[int] + partition_type: str + uuid: str + name: str + sectorsize: int def _is_root() -> bool: @@ -61,10 +61,14 @@ def is_nbd_device_in_use(device: str, *, nbd_client_command: str = ""): # Not connected according to nbd-client, now try to open: # https://unix.stackexchange.com/questions/33508/check-which-network-block-devices-are-in-use # says this extra step is necessary. + trace("Running extra open check...") fd = os.open(device, os.O_EXCL) ret_val = fd == -1 if fd != -1: os.close(fd) + + trace(" Extra check result: {}.".format(ret_val)) + return ret_val @@ -137,10 +141,10 @@ def create_image_file( if not os.path.exists(file_name): trace("New image file") - with open(file_name, "a") as f: + with open(file_name, "a") as _: pass trace(".... image file created.") - run("/usr/bin/attr", "+C", file_name, returncode=None) + run("/usr/bin/chattr", "+C", file_name, returncode=None) trace(".... nocow attribtue set on file (if supported).") run( @@ -164,7 +168,9 @@ def __init__(self, device: str) -> None: def __enter__(self) -> typing.Any: return self - def __exit__(self, exc_type, exc_val, exc_tb) -> None: + def __exit__( + self, exc_type: typing.Any, exc_val: typing.Any, exc_tb: typing.Any + ) -> None: pass def device(self, partition: typing.Optional[int] = None) -> str: @@ -173,12 +179,12 @@ def device(self, partition: typing.Optional[int] = None) -> str: return "{}{}".format(self._device, partition) def close(self): - pass + self._device = "" def wait_for_device_node(self, partition: typing.Optional[int] = None) -> bool: dev = self.device(partition) trace('Waiting for "{}".'.format(dev)) - for i in range(20): + for _ in range(20): if is_block_device(dev): return True elif os.path.exists(dev): @@ -227,6 +233,7 @@ def __init__( nbd_client_command: str = "", sync_command: str = "", modprobe_command: str = "", + read_only: bool = False, ) -> None: assert os.path.isfile(file_name) @@ -241,6 +248,7 @@ def __init__( disk_format=disk_format, qemu_nbd_command=qemu_nbd_command, modprobe_command=modprobe_command, + read_only=read_only, ) assert device @@ -255,16 +263,18 @@ def __init__( def __enter__(self) -> typing.Any: return self - def __exit__(self, exc_type, exc_val, exc_tb) -> None: + def __exit__( + self, exc_type: typing.Any, exc_val: typing.Any, exc_tb: typing.Any + ) -> None: self.close() def close(self) -> None: - if self._device: + if self.device(): run( self._sync_command or "/usr/bin/sync" ) # make sure changes are synced to disk! self._delete_nbd_block_device(self._device, self._qemu_nbd_command) - self._device = "" + super().close() def device(self, partition: typing.Optional[int] = None) -> str: if partition is None: @@ -287,6 +297,7 @@ def _create_nbd_block_device( qemu_nbd_command: str = "", nbd_client_command: str = "", modprobe_command: str = "", + read_only: bool = False, ) -> typing.Optional[str]: assert _is_root() assert os.path.isfile(file_name) @@ -295,6 +306,9 @@ def _create_nbd_block_device( trace("Loading nbd kernel module...") run(modprobe_command or "/usr/bin/modprobe", "nbd") + assert is_block_device(_nbd_device(0)) + debug("nbd kernel module is installed and ready.") + nbd_count = _get_max_nbd_count() for counter in range(nbd_count): @@ -308,12 +322,18 @@ def _create_nbd_block_device( trace("{} is in use, skipping".format(device)) continue + args = [ + "--connect={}".format(device), + "--format={}".format(disk_format), + file_name, + ] + if read_only: + args.append("-r") + try: result = run( qemu_nbd_command or "/usr/bin/qemu-nbd", - "--connect={}".format(device), - "--format={}".format(disk_format), - file_name, + *args, returncode=None, timeout=5, stdout="/dev/null", @@ -366,15 +386,15 @@ def __init__( self._flock_command = flock_command or "/usr/bin/flock" self._sfdisk_command = sfdisk_command or "/usr/bin/sfdisk" self._device = device - self._data = None # type: typing.Optional[Disk] + self._data: typing.Optional[Disk] = None self._get_partition_data() @staticmethod def swap_partition( *, - start: typing.Optional[str] = None, - size: typing.Any = "4G", + start: typing.Optional[int] = None, + size: int = byte_size("4G"), name: str = "swap partition", ) -> Partition: return Partition( @@ -389,7 +409,7 @@ def swap_partition( @staticmethod def efi_partition( - *, start: typing.Optional[str] = None, size: typing.Any = "512M" + *, start: typing.Optional[int] = None, size: int = byte_size("512M") ) -> Partition: return Partition( node=None, @@ -404,8 +424,8 @@ def efi_partition( @staticmethod def data_partition( *, - start: typing.Optional[str] = None, - size: typing.Any = None, + start: typing.Optional[int] = None, + size: typing.Optional[int] = None, partition_type: str = "2d212206-b0ee-482e-9fec-e7c208bef27a", partition_uuid: str = "", name: str, diff --git a/cleanroom/helper/file.py b/cleanroom/helper/file.py index 8ca2593..2af3914 100644 --- a/cleanroom/helper/file.py +++ b/cleanroom/helper/file.py @@ -50,11 +50,11 @@ def expand_files( """ def func(f: str): - return ( - file_name(system_context, f) - if system_context - else os.path.join(os.getcwd(), f) - ) + if system_context: + return file_name(system_context, f) + if os.path.isabs(f): + return f + return os.path.join(os.getcwd(), f) to_iterate = map(func, files) @@ -68,7 +68,7 @@ def func(f: str): def _check_file( system_context: SystemContext, f: str, - op: typing.Callable, + op: typing.Callable[..., bool], description: str, work_directory: typing.Optional[str] = None, ) -> bool: @@ -321,7 +321,7 @@ def prepend_file( def _file_op( system_context: typing.Optional[SystemContext], - op: typing.Callable, + op: typing.Callable[..., None], description: str, *args: str, to_outside: bool = False, diff --git a/cleanroom/helper/group.py b/cleanroom/helper/group.py index dfeb823..9c86729 100644 --- a/cleanroom/helper/group.py +++ b/cleanroom/helper/group.py @@ -7,12 +7,15 @@ from .run import run -import collections -import os.path +import os import typing -Group = collections.namedtuple("Group", ["name", "password", "gid", "members"]) +class Group(typing.NamedTuple): + name: str + password: str + gid: int + members: typing.List[str] def _group_data(group_file: str, name: str) -> typing.Optional[Group]: @@ -23,7 +26,7 @@ def _group_data(group_file: str, name: str) -> typing.Optional[Group]: for line in group: if line.endswith("\n"): line = line[:-1] - current_group = line.split(":") # type: typing.Any + current_group: typing.Any = line.split(":") if current_group[0] == name: current_group[2] = int(current_group[2]) if current_group[3] == "": diff --git a/cleanroom/helper/mount.py b/cleanroom/helper/mount.py index 06f3d0e..d3e45c2 100644 --- a/cleanroom/helper/mount.py +++ b/cleanroom/helper/mount.py @@ -9,7 +9,7 @@ from .run import run import re -import os.path +import os import stat import typing @@ -54,7 +54,7 @@ def mount_points( def umount(directory: str, chroot: typing.Optional[str] = None) -> None: - """Unmount a directory.""" + """Umount a directory.""" assert len(mount_points(directory)) == 1 run("/usr/bin/umount", _map_into_chroot(directory, chroot)) @@ -63,7 +63,7 @@ def umount(directory: str, chroot: typing.Optional[str] = None) -> None: def umount_all(directory: str, chroot: typing.Optional[str] = None) -> bool: - """Unmount all mount points below a directory.""" + """Umount all mount points below a directory.""" sub_mounts = mount_points(directory, chroot=chroot) if sub_mounts: @@ -168,7 +168,9 @@ def __enter__(self) -> typing.Any: ) return self._directory - def __exit__(self, exc_type, exc_val, exc_tb) -> None: + def __exit__( + self, exc_type: typing.Any, exc_val: typing.Any, exc_tb: typing.Any + ) -> None: if self._fallback_cwd: os.chdir(self._fallback_cwd) umount(self._directory) diff --git a/cleanroom/helper/run.py b/cleanroom/helper/run.py index fc899d8..a7ab168 100644 --- a/cleanroom/helper/run.py +++ b/cleanroom/helper/run.py @@ -19,15 +19,15 @@ def _quote_args(*args: str) -> str: def report_completed_process( - channel: typing.Optional[typing.Callable], + channel: typing.Optional[typing.Callable[..., None]], completed_process: subprocess.CompletedProcess, ) -> None: """Report the completion state of an external command.""" if channel is None: return - stdout = "" - stderr = stdout + stdout: str = "" + stderr: str = stdout if completed_process.stdout is not None: stdout = completed_process.stdout @@ -45,7 +45,7 @@ def run( *args: str, returncode: typing.Optional[int] = 0, work_directory: typing.Optional[str] = None, - trace_output: typing.Optional[typing.Callable] = trace, + trace_output: typing.Optional[typing.Callable[..., None]] = trace, chroot: typing.Optional[str] = None, shell: bool = False, stdout: typing.Optional[str] = None, @@ -122,7 +122,7 @@ def run( def _report_output_lines( - channel: typing.Callable, headline: str, line_data: str + channel: typing.Callable[..., None], headline: str, line_data: str ) -> None: """Pretty-print output lines.""" channel(headline) diff --git a/cleanroom/helper/user.py b/cleanroom/helper/user.py index 52114bc..16d70a8 100644 --- a/cleanroom/helper/user.py +++ b/cleanroom/helper/user.py @@ -7,18 +7,21 @@ from .run import run -import collections -import os.path +import os import typing -User = collections.namedtuple( - "User", ["name", "password", "uid", "gid", "comment", "home", "shell"] -) +class User(typing.NamedTuple): + name: str + password: str + uid: int + gid: int + comment: str + home: str + shell: str def _user_data(passwd_file: str, name: str) -> typing.Optional[User]: - assert isinstance(name, str) if not os.path.isfile(passwd_file): return None with open(passwd_file, "r") as passwd: @@ -89,19 +92,19 @@ def useradd( def usermod( self, - user_name, + user_name: str, *, - comment="", - home="", - gid=-1, - uid=-1, - lock=None, - rename="", - shell="", - append=False, - groups="", - password="", - expire=None, + comment: str = "", + home: str = "", + gid: int = -1, + uid: int = -1, + lock: typing.Optional[bool] = None, + rename: str = "", + shell: str = "", + append: bool = False, + groups: str = "", + password: str = "", + expire: typing.Optional[str] = None, root_directory: str ) -> bool: """Modify an existing user.""" @@ -122,7 +125,7 @@ def usermod( if lock is not None: if lock: command.append("--lock") - elif not lock: + else: command.append("--unlock") if expire is not None: diff --git a/cleanroom/imager.py b/cleanroom/imager.py index ab7505b..5e63dc5 100644 --- a/cleanroom/imager.py +++ b/cleanroom/imager.py @@ -15,7 +15,6 @@ from .helper.run import run as helper_run from .printer import info, debug, fail, success, trace, verbose -import collections import os import shutil import tempfile @@ -27,20 +26,25 @@ class DataProvider: def write_root_partition(self, target_device: str): - assert False + pass def write_verity_partition(self, target_device: str): - assert False + pass - def has_linux_kernel(self): - assert False + def has_linux_kernel(self) -> bool: + return False def write_linux_kernel(self, target_directory: str): - assert False + pass class FileDataProvider(DataProvider): - def __init__(self, root_partition, verity_partition, kernel_file): + def __init__( + self, + root_partition: str, + verity_partition: str, + kernel_file: typing.Optional[str], + ): self._root_partition = root_partition self._verity_partition = verity_partition self._kernel_file = kernel_file @@ -51,8 +55,8 @@ def write_root_partition(self, target_device: str): def write_verity_partition(self, target_device: str): shutil.copyfile(self._verity_partition, target_device) - def has_linux_kernel(self): - return self._kernel_file + def has_linux_kernel(self) -> bool: + return self._kernel_file != "" def write_linux_kernel(self, target_directory: str): if self._kernel_file: @@ -62,42 +66,40 @@ def write_linux_kernel(self, target_directory: str): ) -ExtraPartition = collections.namedtuple( - "ExtraPartition", ["size", "filesystem", "label", "contents"] -) -ImageConfig = collections.namedtuple( - "ImageConfig", - [ - "path", - "disk_format", - "force", - "repartition", - "efi_size", - "swap_size", - "extra_partitions", - ], -) -RawImageConfig = collections.namedtuple( - "RawImageConfig", - [ - "path", - "disk_format", - "force", - "repartition", - "min_device_size", - "efi_size", - "root_size", - "verity_size", - "swap_size", - "root_hash", - "efi_label", - "root_label", - "verity_label", - "swap_label", - "extra_partitions", - "writer", - ], -) +class ExtraPartition(typing.NamedTuple): + size: int + filesystem: str + label: str + contents: str + + +class ImageConfig(typing.NamedTuple): + path: str + disk_format: str + force: bool + repartition: bool + efi_size: int + swap_size: int + extra_partitions: typing.List[ExtraPartition] + + +class RawImageConfig(typing.NamedTuple): + path: str + disk_format: str + force: bool + repartition: bool + min_device_size: int + efi_size: int + root_size: int + verity_size: int + swap_size: int + root_hash: str + efi_label: typing.Optional[str] + root_label: typing.Optional[str] + verity_label: typing.Optional[str] + swap_label: typing.Optional[str] + extra_partitions: typing.List[ExtraPartition] + writer: DataProvider def _minimum_efi_size(kernel_size: int) -> int: @@ -154,7 +156,7 @@ def _parse_extra_partition_value(value: str) -> typing.Optional[ExtraPartition]: def parse_extra_partitions( extra_partition_data: typing.List[str], ) -> typing.List[ExtraPartition]: - result = [] # type: typing.List[ExtraPartition] + result: typing.List[ExtraPartition] = [] for ep in extra_partition_data: parsed_ep = _parse_extra_partition_value(ep) if ep else None if parsed_ep: @@ -171,7 +173,7 @@ def _file_size(file_name: str) -> int: def _get_tree_size(start_path: str) -> int: total_size = 0 - for dirpath, dirnames, filenames in os.walk(start_path): + for dirpath, _, filenames in os.walk(start_path): for f in filenames: fp = os.path.join(dirpath, f) # skip if it is symbolic link @@ -294,8 +296,8 @@ def _work_on_device_node( partition_devices = _repartition( device, ic.repartition, - ic.root_label, - ic.verity_label, + ic.root_label if ic.root_label else "", + ic.verity_label if ic.verity_label else "", efi_size=ic.efi_size, root_size=ic.root_size, verity_size=ic.verity_size, @@ -357,7 +359,7 @@ def _find_or_create_device_node( nbd_client_command: str, sync_command: str, modprobe_command: str, -) -> typing.ContextManager: +) -> disk.Device: if disk.is_block_device(path): _validate_size_of_block_device(path, min_device_size) return disk.Device(path) @@ -418,7 +420,7 @@ def _repartition( root_size: int, verity_size: int, swap_size: int = 0, - extra_partitions: typing.Tuple[ExtraPartition, ...] = (), + extra_partitions: typing.List[ExtraPartition] = [], flock_command: str, sfdisk_command: str, ) -> typing.Mapping[str, str]: @@ -442,7 +444,7 @@ def _repartition( trace("Setting basic partitions") partitions = [ - partitioner.efi_partition(start="1m", size=efi_size), + partitioner.efi_partition(start=disk.byte_size("1m"), size=efi_size), partitioner.data_partition( size=root_size, name=root_label, @@ -610,7 +612,7 @@ def _prepare_efi_partition( efi_dev: str, root_dev: str, has_kernel: bool, - kernel_file_writer, + kernel_file_writer: typing.Callable[..., None], *, efi_emulator: typing.Optional[str], extra_efi_files: typing.Optional[str], @@ -682,10 +684,6 @@ def _prepare_efi_partition( helper_run("/usr/bin/sync") # make sure changes are synced to disk! -def _file_to_partition(device: str, file_name: str) -> None: - shutil.copyfile(file_name, device) - - def _format_partition(device: str, filesystem: str, *label_args: str) -> None: helper_run("/usr/bin/mkfs.{}".format(filesystem), *label_args, device) @@ -703,7 +701,7 @@ def _prepare_extra_partition( verbose("Preparing extra partition on {} using {}.".format(device, filesystem)) - label_args = () # type: typing.Tuple[str, ...] + label_args: typing.Tuple[str, ...] = () if label is not None: debug('... setting label to "{}".'.format(label)) if filesystem == "fat" or filesystem == "vfat": diff --git a/cleanroom/location.py b/cleanroom/location.py index a993767..e4b435a 100644 --- a/cleanroom/location.py +++ b/cleanroom/location.py @@ -19,7 +19,7 @@ def __init__( file_name: typing.Optional[str] = None, line_number: typing.Optional[int] = None, description: typing.Optional[str] = None, - parent: Location = None + parent: typing.Optional[Location] = None ) -> None: """Constructor.""" if line_number is not None: diff --git a/cleanroom/preflight.py b/cleanroom/preflight.py index 3645999..178ebee 100644 --- a/cleanroom/preflight.py +++ b/cleanroom/preflight.py @@ -6,14 +6,14 @@ from .exceptions import PreflightError -from .printer import debug, fail, h2, success, warn +from .printer import debug, fail, success import os import typing def preflight_check( - title: str, func: typing.Callable[[], None], *, ignore_errors=False + title: str, func: typing.Callable[[], None], *, ignore_errors: bool = False ) -> None: try: func() diff --git a/cleanroom/printer.py b/cleanroom/printer.py index 9230fd3..e9e4d9e 100644 --- a/cleanroom/printer.py +++ b/cleanroom/printer.py @@ -173,7 +173,7 @@ def _print(self, *args: str, verbosity: int = 0) -> None: def _print_at_verbosity_level(self, verbosity: int) -> bool: return verbosity <= self._verbose - def h1(self, *args, verbosity: int = 0) -> None: + def h1(self, *args: str, verbosity: int = 0) -> None: """Print big headline.""" intro = "\n\n{}============================================{}".format( self._h1_suffix, self._ansi_reset diff --git a/cleanroom/systemcontext.py b/cleanroom/systemcontext.py index d0bb7af..47e545c 100644 --- a/cleanroom/systemcontext.py +++ b/cleanroom/systemcontext.py @@ -104,7 +104,7 @@ def __enter__(self) -> typing.Any: h2("Creating system {}".format(self._system_name)) return self - def __exit__(self, exc_type, exc_val, exc_tb): + def __exit__(self, exc_type: typing.Any, exc_val: typing.Any, exc_tb: typing.Any): pass @property @@ -119,6 +119,13 @@ def repository_base_directory(self) -> str: def system_name(self) -> str: return self._system_name + @property + def pretty_system_name(self) -> str: + name = self._system_name + if name.startswith("system-"): + name = name[7:] + return name + @property def systems_definition_directory(self) -> str: return self._systems_definition_directory @@ -184,6 +191,7 @@ def _setup_core_substitutions(self) -> None: ) self.set_substitution("SYSTEM_HELPER_DIR", self.system_helper_directory) self.set_substitution("SYSTEM_NAME", self.system_name) + self.set_substitution("PRETTY_SYSTEM_NAME", self.pretty_system_name) ts = "unknown" if self.timestamp is None else self.timestamp self.set_substitution("TIMESTAMP", ts) diff --git a/cleanroom/systemsmanager.py b/cleanroom/systemsmanager.py index 51d1f44..b2f70d8 100644 --- a/cleanroom/systemsmanager.py +++ b/cleanroom/systemsmanager.py @@ -150,7 +150,7 @@ def _add_system(self, system_name: str) -> typing.Optional[_DependencyNode]: location = Location( file_name="", line_number=1, description="System setup" ) - exec_obj_list.append(ExecObject(location.next_line(), "_teardown", [], {})) + exec_obj_list.append(ExecObject(location.next_line(), "_teardown", (), {})) debug('"{}" depends on "{}"'.format(system_name, base_system_name)) diff --git a/cleanroom/workdir.py b/cleanroom/workdir.py index 3d28505..3b87520 100644 --- a/cleanroom/workdir.py +++ b/cleanroom/workdir.py @@ -59,7 +59,7 @@ def __init__( """Constructor.""" self._btrfs_helper = btrfs_helper self._work_directory = work_directory - self._temp_directory: typing.Optional[tempfile.TemporaryDirectory] = None + self._temp_directory: typing.Optional[tempfile.TemporaryDirectory[str]] = None if work_directory: if not os.path.exists(work_directory): @@ -76,7 +76,7 @@ def __init__( trace('Using existing work directory in "{}".'.format(work_directory)) if not umount_all(work_directory): raise PreflightError( - "Failed to unmount all in work " + "Failed to umount all in work " 'directory "{}".'.format(work_directory) ) if clear_scratch_directory: @@ -102,7 +102,7 @@ def __enter__(self) -> typing.Any: self._temp_directory.__enter__() return self - def __exit__(self, exc_type, exc_val, exc_tb): + def __exit__(self, exc_type: typing.Any, exc_val: typing.Any, exc_tb: typing.Any): """Exit a context.""" if self._temp_directory: tmp_directory = self._temp_directory diff --git a/examples/README.md b/examples/README.md index 87f9eea..5153712 100644 --- a/examples/README.md +++ b/examples/README.md @@ -92,7 +92,7 @@ or make sure that root can start UI applications for this to work: export BORG_PASSPHRASE=foobar "${CLRM_BASE}/firestarter" \ --repository="${BASE_DIR}/borg_repository \ - system-example qemu_boot + system-example qemu-image ``` Log in as root user using password root1234 diff --git a/examples/system-example.def b/examples/system-example.def index 754370d..81be6ff 100644 --- a/examples/system-example.def +++ b/examples/system-example.def @@ -5,8 +5,6 @@ based_on type-server set_hostname server pretty=Server set_machine_id cccccccccccccccccccccccccccccccc -add_partition - # pkg_amd_cpu add_partition 00_esp device=disk0 type=esp minSize=100M maxSize=100M uuid=aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa diff --git a/examples/type-base.def b/examples/type-base.def index adc6b5a..6fd9855 100644 --- a/examples/type-base.def +++ b/examples/type-base.def @@ -10,7 +10,7 @@ based_on scratch ### # ROOT: path to the systems filesystem root # Set up some more substitutions: -set KERNEL_CMDLINE 'quiet' +set KERNEL_CMDLINE 'debug' # sed and gzip are needed for locale-gen # which is needed by some of the pacman hooks @@ -54,6 +54,7 @@ remove /usr/lib/systemd/system/timers.target.wants/shadow.timer ensure_no_unused_shell_files ensure_hwdb ensure_ldconfig +ensure_depmod ensure_no_kernel_install ensure_no_update_service ensure_no_sysusers