diff --git a/pyplugins/hyperfile/__init__.py b/pyplugins/hyperfile/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/pyplugins/hyperfile/devfs.py b/pyplugins/hyperfile/devfs.py new file mode 100644 index 000000000..15724d601 --- /dev/null +++ b/pyplugins/hyperfile/devfs.py @@ -0,0 +1,187 @@ +from penguin import Plugin, plugins +from hyper.portal import PortalCmd +from hyper.consts import HYPER_OP as hop +from typing import List, Dict, Generator, Optional, Tuple +from hyperfile.models.base import DevFile + +class Devfs(Plugin): + def __init__(self): + self.outdir = self.get_arg("outdir") + self.proj_dir = self.get_arg("proj_dir") + self._pending_devfs: List[Tuple[str, DevFile, int, int]] = [] + self._devfs: Dict[str, DevFile] = {} + + # Cache for directory IDs (path -> id). Root "" is ID 0. + self._dev_dirs: Dict[str, int] = {"": 0} + + plugins.portal.register_interrupt_handler( + "devfs", self._hyperdevfs_interrupt_handler) + + def _get_overridden_methods(self, devfs_file: DevFile) -> Dict[str, callable]: + base = DevFile + overridden = {} + for name in [ + "open", "read", "read_iter", "write", "write_iter", "lseek", "release", "poll", + "ioctl", "compat_ioctl", "mmap", "get_unmapped_area", + "flush", "fsync", "fasync", "lock" + ]: + meth = getattr(devfs_file, name, None) + base_meth = getattr(base, name, None) + # Check if method is overridden (different code object) + if ( + meth is not None and base_meth is not None + and hasattr(meth, "__code__") and hasattr(base_meth, "__code__") + and meth.__code__ is not base_meth.__code__ + ): + overridden[name] = meth + return overridden + + def _make_ops_struct(self, devfs_file: DevFile): + kffi = plugins.kffi + overridden = self._get_overridden_methods(devfs_file) + + # Build the initialization dictionary dynamically + init_data = {} + for name, fn in overridden.items(): + init_data[name] = yield from kffi.callback(fn) + + return kffi.new("struct igloo_dev_ops", init_data) + + def register_devfs(self, devfs_file: DevFile, path: Optional[str] = None, major: Optional[int] = None, minor: Optional[int] = None): + if path: + fname = path + else: + fname = getattr(devfs_file, "PATH", None) + devfs_file.PATH = fname + + if not fname: + raise ValueError("DevFile must define PATH or define it in register_devfs") + + major_num = major if major is not None else getattr(devfs_file, "MAJOR", -1) + minor_num = minor if minor is not None else getattr(devfs_file, "MINOR", 0) + + if fname.startswith("/dev/"): + fname = fname[len("/dev/"):] # Remove leading /dev/ + + # Deduplicate registration + if fname not in self._devfs and devfs_file not in [f for _, f, _, _ in self._pending_devfs]: + plugins.portal.queue_interrupt("devfs") + self._pending_devfs.append((fname, devfs_file, major_num, minor_num)) + + self._devfs[fname] = devfs_file + + def _split_dev_path(self, path: str) -> Tuple[str, str]: + """ + Splits 'a/b/c' into ('a/b', 'c'). Returns ('', 'c') if no slashes. + """ + path = path.strip("/") + if "/" in path: + parent, fname = path.rsplit("/", 1) + return parent, fname + else: + return "", path + + def _get_or_create_dev_dir(self, dir_path: str) -> Generator[int, None, int]: + """ + Recursively creates directories via portal and returns the ID of the final directory. + """ + parts = [p for p in dir_path.strip("/").split("/") if p] + if not parts: + return 0 # Root + + parent_id = 0 + cur_path = "" + + for part in parts: + cur_path = cur_path + "/" + part if cur_path else part + + # Use cache if available + if cur_path in self._dev_dirs: + parent_id = self._dev_dirs[cur_path] + continue + + kffi = plugins.kffi + init_data = { + "name": part.encode("latin-1", errors="ignore"), + "parent_id": parent_id, + "replace": 0 + } + + req = kffi.new("struct portal_devfs_dir_req", init_data) + req_bytes = bytes(req) + + # Ensure HYPER_OP_DEVFS_CREATE_OR_LOOKUP_DIR is defined in your hop consts + result = yield PortalCmd( + hop.HYPER_OP_DEVFS_CREATE_OR_LOOKUP_DIR, + 0, + len(req_bytes), + None, + req_bytes + ) + + if result is None or result < 0: + raise RuntimeError(f"Failed to create/lookup devfs dir '{cur_path}'") + + self._dev_dirs[cur_path] = result + parent_id = result + + return parent_id + + def _register_devfs(self, devfs_list: List[Tuple[str, DevFile, int, int]]) -> Generator[int, None, None]: + for fname, devfs_file, major, minor in devfs_list: + + # 1. Resolve path hierarchy + parent_dir, file_name = self._split_dev_path(fname) + + try: + parent_id = yield from self._get_or_create_dev_dir(parent_dir) + except RuntimeError as e: + self.logger.error(f"Could not register {fname}: {e}") + continue + + # Validate final filename (should be flat now) + if not file_name or "/" in file_name: + self.logger.error(f"Invalid devfs device name after split: '{file_name}'") + continue + + ops = yield from self._make_ops_struct(devfs_file) + kffi = plugins.kffi + + init_data = { + "name": file_name.encode("latin-1", errors="ignore"), + "major": major, + "minor": minor, + "ops": ops, + "replace": 1, + # Dwarffi safely ignores keys that don't exist on the target struct, + # entirely replacing the need for 'hasattr(req, "parent_id")' checks! + "parent_id": parent_id + } + + req = kffi.new("struct portal_devfs_create_req", init_data) + req_bytes = bytes(req) + + result = yield PortalCmd( + hop.HYPER_OP_DEVFS_CREATE_DEVICE, + 0, + len(req_bytes), + None, + req_bytes + ) + + if result == 0 or result is None: + self.logger.error(f"Failed to register devfs device '{fname}' (kernel returned 0)") + continue + + self.logger.info(f"Registered devfs device '{fname}' with kernel") + + def _hyperdevfs_interrupt_handler(self) -> Generator[bool, None, bool]: + if not self._pending_devfs: + return False + + pending = self._pending_devfs[:] + self._pending_devfs.clear() + while pending: + devfs = pending.pop(0) + yield from self._register_devfs([devfs]) + return False \ No newline at end of file diff --git a/pyplugins/hyperfile/models/base.py b/pyplugins/hyperfile/models/base.py new file mode 100644 index 000000000..5ef79edc5 --- /dev/null +++ b/pyplugins/hyperfile/models/base.py @@ -0,0 +1,147 @@ +from wrappers.ptregs_wrap import PtRegsWrapper +from penguin import getColoredLogger + + +class BaseFile: + """ + The root base class for all file types. + Acts as the 'Argument Sink' to prevent object.__init__ failures. + """ + PATH = None + FS = "unknown" + + def __init__(self, *, path: str = None, fs: str = None, **kwargs): + """ + Consumes 'path' and 'fs' arguments. + Swallows any remaining kwargs so object.__init__ doesn't crash. + """ + if path is not None: + self.PATH = path + if fs is not None: + self.FS = fs + + # We do not pass kwargs to super() because object() takes no args. + super().__init__() + + @property + def full_path(self) -> str: + if self.PATH is None: + return "unknown_path" + pth = self.PATH.lstrip("/") + if self.FS == "procfs": + if pth.startswith("/proc/"): + pth = pth[len("/proc/"):] + return f"/proc/{pth}" + elif self.FS == "devfs": + if pth.startswith("/dev/"): + pth = pth[len("/dev/"):] + return f"/dev/{pth}" + elif self.FS == "sysfs": + if pth.startswith("/sys/"): + pth = pth[len("/sys/"):] + return f"/sys/{pth}" + else: + return self.PATH + + @property + def logger(self): + if hasattr(self, "_logger"): + return self._logger + self._logger = getColoredLogger(f"hyperfs.{self.FS}.{self.full_path}") + return self._logger + + +class VFSFile(BaseFile): + """ + Base class defining the VFS interface. + """ + def open(self, ptregs: PtRegsWrapper, inode: int, file: int) -> None: + pass + + def read(self, ptregs: PtRegsWrapper, file: int, user_buf: int, size: int, offset_ptr: int) -> None: + pass + + def read_iter(self, ptregs: PtRegsWrapper, kiocb: int, iov_iter: int) -> None: + pass + + def write(self, ptregs: PtRegsWrapper, file: int, user_buf: int, size: int, offset_ptr: int) -> None: + pass + + def lseek(self, ptregs: PtRegsWrapper, file: int, offset: int, whence: int) -> None: + pass + + def release(self, ptregs: PtRegsWrapper, inode: int, file: int) -> None: + pass + + def poll(self, ptregs: PtRegsWrapper, file: int, poll_table_struct: int) -> None: + pass + + def ioctl(self, ptregs: PtRegsWrapper, file: int, cmd: int, arg: int) -> None: + pass + + def compat_ioctl(self, ptregs: PtRegsWrapper, file: int, cmd: int, arg: int) -> None: + pass + + def mmap(self, ptregs: PtRegsWrapper, file: int, vm_area_struct: int) -> None: + pass + + def get_unmapped_area(self, ptregs: PtRegsWrapper, file: int, addr: int, len_: int, pgoff: int, flags: int) -> None: + pass + + +class ProcFile(VFSFile): + FS = "procfs" + + +class DevFile(VFSFile): + FS = "devfs" + MAJOR = -1 # -1 for dynamic + MINOR = 0 + + def __init__(self, *, major: int = None, minor: int = None, **kwargs): + if major is not None: + self.MAJOR = major + if minor is not None: + self.MINOR = minor + super().__init__(**kwargs) + + def flush(self, ptregs: PtRegsWrapper, file: int, owner: int) -> None: + pass + + def fsync(self, ptregs: PtRegsWrapper, file: int, start: int, end: int, datasync: int) -> None: + pass + + def fasync(self, ptregs: PtRegsWrapper, fd: int, file: int, on: int) -> None: + pass + + def lock(self, ptregs: PtRegsWrapper, file: int, cmd: int, file_lock: int) -> None: + pass + + +class SysFile(BaseFile): + """ + SysFS nodes usually use show/store rather than raw read/write. + """ + FS = "sysfs" + + def show(self, ptregs: PtRegsWrapper, kobj, attr, buf) -> None: + pass + + def store(self, ptregs: PtRegsWrapper, kobj, attr, buf, count) -> None: + pass + + +class SysfsBridge: + """ + Bridging class that maps SysFS show/store to VFS read/write + so we can use standard Read/Write mixins. + """ + def show(self, ptregs, kobj, attr, buf): + # Create a fake 'user_buf' pointer (actually the kernel buf) + # and call the mixin's read method. + # Note: Sysfs show ignores offset/size usually, just dumping the whole thing. + # We might need to adapt arguments based on your specific read implementation. + yield from self.read(ptregs, file=0, user_buf=buf, size=4096, loff=0) + + def store(self, ptregs, kobj, attr, buf, count): + yield from self.write(ptregs, file=0, user_buf=buf, size=count, loff=0) \ No newline at end of file diff --git a/pyplugins/hyperfile/models/ioctl.py b/pyplugins/hyperfile/models/ioctl.py new file mode 100644 index 000000000..c3ddde37e --- /dev/null +++ b/pyplugins/hyperfile/models/ioctl.py @@ -0,0 +1,143 @@ +from wrappers.ptregs_wrap import PtRegsWrapper +from penguin import plugins +from typing import Union + +class IoctlReturnMixin: + ''' + Base mixin that simply returns a constant integer for any IOCTL. + ''' + def __init__(self, *, ioctl_retval: int = 0, **kwargs): + self.ioctl_retval = ioctl_retval + super().__init__(**kwargs) + + def ioctl(self, ptregs: PtRegsWrapper, file: int, cmd: int, arg: int): + ptregs.set_retval(self.ioctl_retval) + +class IoctlZero(IoctlReturnMixin): + ''' + Always returns 0 (Success). + ''' + def __init__(self, **kwargs): + super().__init__(ioctl_retval=0, **kwargs) + +class IoctlUnhandled(IoctlReturnMixin): + ''' + Always returns -25 (-ENOTTY: Inappropriate ioctl for device). + ''' + def __init__(self, **kwargs): + super().__init__(ioctl_retval=-25, **kwargs) + +class IoctlWriteDataArg: + ''' + Writes a constant buffer to the address pointed to by 'arg'. + Returns 0 after writing. + ''' + def __init__(self, *, ioctl_data: Union[bytes, int, str] = b"", retval: int = 0, **kwargs): + if isinstance(ioctl_data, str): + ioctl_data = ioctl_data.encode("utf-8") + self.ioctl_data = ioctl_data + self.retval = retval + super().__init__(**kwargs) + + def ioctl(self, ptregs: PtRegsWrapper, file: int, cmd: int, arg: int): + # Only write if we have data and a valid pointer + if self.ioctl_data and arg != 0: + if isinstance(self.ioctl_data, int): + yield from plugins.mem.write_int(arg, self.ioctl_data) + else: + yield from plugins.mem.write_bytes(arg, self.ioctl_data) + + # Standard success return + ptregs.set_retval(self.retval) + + +class IoctlExternalVFS: + """Modern VFS Ioctl Adapter""" + def __init__(self, *, ioctl_plugin: str = None, ioctl_function: str = "ioctl", **kwargs): + self._func = getattr(getattr(plugins, ioctl_plugin), ioctl_function) + super().__init__(**kwargs) + + def ioctl(self, ptregs, file, cmd, arg): + yield from self._func(ptregs, file, cmd, arg) + + +class IoctlExternalLegacy: + """Legacy Ioctl Adapter""" + def __init__(self, *, ioctl_plugin: str = None, ioctl_function: str = "ioctl", **kwargs): + self._func = getattr(getattr(plugins, ioctl_plugin), ioctl_function) + self._legacy_kwargs = kwargs.copy() + super().__init__(**kwargs) + + def ioctl(self, ptregs, file, cmd, arg): + # Legacy ioctls were often synchronous and returned the value directly + result = self._func(self, self.full_path, cmd, arg, self._legacy_kwargs) + ptregs.set_retval(result if result is not None else 0) + + +class IoctlDispatcher: + """ + The Mixin that DevFile inherits. + It routes the ioctl 'cmd' to the correct Handler. + """ + def __init__(self, *, ioctl_handlers: dict = None, **kwargs): + self.ioctl_handlers = ioctl_handlers or {} + super().__init__(**kwargs) + + def ioctl(self, ptregs: PtRegsWrapper, file: int, cmd: int, arg: int): + # 1. Try exact match + handler = self.ioctl_handlers.get(cmd) + + # 2. Try string match (sometimes yaml parses numbers as strings) + if handler is None: + handler = self.ioctl_handlers.get(str(cmd)) + + # 3. Try wildcard + if handler is None: + handler = self.ioctl_handlers.get("*") + # 4. Dispatch or Fail + if handler: + # We pass 'self' so handlers can access file attributes if needed + yield from handler.handle(self, ptregs, file, cmd, arg) + else: + # Default error for unhandled ioctl + ptregs.set_retval(-25) # -ENOTTY + +class IoctlHandlerBase: + def handle(self, file_obj, ptregs, file, cmd, arg): + raise NotImplementedError + +class IoctlReturnConst(IoctlHandlerBase): + """Returns a static constant.""" + def __init__(self, val): + self.val = val + + def handle(self, file_obj, ptregs, file, cmd, arg): + ptregs.set_retval(self.val) + yield from [] # Ensure it's a generator + +class IoctlPluginVFS(IoctlHandlerBase): + """Calls a modern VFS plugin function.""" + def __init__(self, plugin_name, func_name): + self.func = getattr(getattr(plugins, plugin_name), func_name) + + def handle(self, file_obj, ptregs, file, cmd, arg): + yield from self.func(ptregs, file, cmd, arg) + +class IoctlPluginLegacy(IoctlHandlerBase): + """Calls a legacy plugin function (synchronous/complex return).""" + def __init__(self, plugin_name, func_name, extra_kwargs): + self.func = getattr(getattr(plugins, plugin_name), func_name) + self.extra_kwargs = extra_kwargs + + def handle(self, file_obj, ptregs, file, cmd, arg): + # Legacy signature often expects (self, filename, cmd, arg, details) + # We pass file_obj as 'self' to the plugin + result = self.func( + file_obj, + file_obj.full_path, + cmd, + arg, + self.extra_kwargs + ) + ptregs.set_retval(result if result is not None else 0) + yield from [] # Ensure it's a generator \ No newline at end of file diff --git a/pyplugins/hyperfile/models/read.py b/pyplugins/hyperfile/models/read.py new file mode 100644 index 000000000..db2024251 --- /dev/null +++ b/pyplugins/hyperfile/models/read.py @@ -0,0 +1,293 @@ +from typing import Union +from penguin import plugins +from wrappers.ptregs_wrap import PtRegsWrapper +import os + +class ReadBufWrapper: + ''' + The Logic Mixin: Consumes 'buffer' to set up the buffer. + ''' + def __init__(self, *, buffer: Union[bytes, str] = None, cycle: bool = False, **kwargs): + self._cycle = cycle + if buffer is not None: + self._data = buffer + else: + # If buffer is None, default to empty bytes instead of raising an exception immediately, + # unless you strictly require it. This handles cases where a mixin chain might + # initialize it differently, or if we want a safe default. + # However, based on your request for "const_buf", we'll initialize it safely. + self._data = b"" + + super().__init__(**kwargs) + + def read(self, ptregs: PtRegsWrapper, file: int, user_buf: int, size: int, loff: int): + """ + Reads data once, respecting offset and size. + Returns 0 if offset is beyond the data. + """ + if isinstance(self._data, bytes): + data_bytes = self._data + else: + data_bytes = self._data.encode("utf-8") + data_len = len(data_bytes) + offset = yield from plugins.mem.read_int(loff) + + # Check for cycling + cycle = getattr(self, "_cycle", False) + + if size <= 0 or offset < 0 or (not cycle and offset >= data_len): + ptregs.set_retval(0) + return + + if not cycle: + chunk = min(size, data_len - offset) + yield from plugins.mem.write_bytes(user_buf, data_bytes[offset:offset + chunk]) + yield from plugins.mem.write_int(loff, offset + chunk) + ptregs.set_retval(chunk) + else: + # Cycle: repeat buffer forever, write the requested size in one go + if data_len == 0: + ptregs.set_retval(0) + return + pos = offset % data_len + # Build the output by repeating the buffer as needed + full_repeats = (size + data_len - 1 - pos) // data_len + end_pos = (pos + size) % data_len + if end_pos > pos: + chunk = data_bytes[pos:end_pos] + else: + chunk = data_bytes[pos:] + data_bytes * (full_repeats - 1) + data_bytes[:end_pos] + chunk = chunk[:size] # Ensure exact size + yield from plugins.mem.write_bytes(user_buf, chunk) + yield from plugins.mem.write_int(loff, offset + size) + ptregs.set_retval(size) + + +class ReadConstBuf(ReadBufWrapper): + ''' + The Translator: Takes 'buffer' or 'const_buf' + ''' + def __init__(self, *, const_buf: str = None, buffer: str = None, **kwargs): + self.cycle = False + # Support both argument names + buf = const_buf if const_buf is not None else buffer + super().__init__(buffer=buf, **kwargs) + + +class ReadEmpty(ReadBufWrapper): + ''' + The Preset: Hardcodes the data. + ''' + def __init__(self, **kwargs): + # Just inject the hardcoded value + super().__init__(buffer="", **kwargs) + +class ReadZero(ReadBufWrapper): + ''' + The Preset: Hardcodes the data. + ''' + def __init__(self, **kwargs): + # Just inject the hardcoded value + super().__init__(buffer="0", **kwargs) + +class ReadDefault: + def __init__(self, **kwargs): + super().__init__(**kwargs) + + def read(self, ptregs: PtRegsWrapper, file: int, user_buf: int, size: int, loff: int): + ptregs.set_retval(-22) + +class ReadFromFile: + ''' + The Loader: Takes 'read_filepath' or 'filename', loads it. + ''' + def __init__(self, *, read_filepath: str = None, filename: str = None, **kwargs): + self.filename = read_filepath if read_filepath is not None else filename + super().__init__(**kwargs) + + def read(self, ptregs: PtRegsWrapper, file: int, user_buf: int, size: int, loff: int): + offset = yield from plugins.mem.read_int(loff) + fname = self.filename + if fname is None: + ptregs.set_retval(0) + return + if not os.path.isabs(fname): + # Paths are relative to cwd or caller will resolve relative path + fpath = fname + else: + fpath = fname + try: + with open(fpath, "rb") as f: + f.seek(offset) + chunk = f.read(size) + except Exception: + chunk = b"" + yield from plugins.mem.write_bytes(user_buf, chunk) + yield from plugins.mem.write_int(loff, offset + len(chunk)) + ptregs.set_retval(len(chunk)) + +class ReadConstMap(ReadBufWrapper): + ''' + Reads a sparse map of offsets to values, with optional padding and size. + ''' + def __init__(self, *, vals=None, pad: Union[str, int, bytes]=b"\x00", size: int=0x10000, **kwargs): + self.vals = vals or {} + # Normalize pad to bytes + if isinstance(pad, str): + self.pad = pad.encode() + elif isinstance(pad, int): + self.pad = bytes([pad]) + else: + self.pad = pad + self.size = size + # Render the buffer once at init + data = self._render_file() + super().__init__(buffer=data, **kwargs) + + def _render_file(self): + # sort vals dict by key, lowest to highest + vals = { + k: v for k, v in sorted(self.vals.items(), key=lambda item: item[0]) + } + data = b"" + for off, val in vals.items(): + # Accept str, bytes, list[int], or list[str] + if isinstance(val, str): + val = val.encode() + elif isinstance(val, list): + if not len(val): + continue + first_val = val[0] + if isinstance(first_val, int): + val = bytes(val) + elif isinstance(first_val, str): + val = b"\x00".join([x.encode() for x in val]) + else: + raise ValueError("const_map: list values must be int or str") + elif isinstance(val, bytes): + pass + else: + raise ValueError("const_map: vals must be str, bytes, or list") + # Pad before this value, then add the value + data += self.pad * (off - len(data)) + val + # Pad up to size + assert len(data) <= self.size, f"Data is too long: {len(data)} > size {self.size}" + data += self.pad * (self.size - len(data)) + return data + +class ReadConstMapFile(ReadConstMap): + ''' + Like ReadConstMap, but persists the buffer to a file and reads from it. + ''' + def __init__(self, *, filename, vals=None, pad: Union[str, int, bytes]=b"\x00", size: int=0x10000, **kwargs): + self.filename = filename + self.vals = vals or {} + # Normalize pad to bytes + if isinstance(pad, str): + self.pad = pad.encode() + elif isinstance(pad, int): + self.pad = bytes([pad]) + else: + self.pad = pad + self.size = size + # Create file if it doesn't exist + if not os.path.isabs(self.filename): + # Assume cwd or caller will resolve relative path + fpath = self.filename + else: + fpath = self.filename + if not os.path.isfile(fpath): + data = self._render_file() + with open(fpath, "wb") as f: + f.write(data) + super().__init__(vals=vals, pad=pad, size=size, **kwargs) + + def read(self, ptregs: PtRegsWrapper, file: int, user_buf: int, size: int, loff: int): + offset = yield from plugins.mem.read_int(loff) + # Read from file + with open(self.filename, "rb") as f: + f.seek(offset) + chunk = f.read(size) + yield from plugins.mem.write_bytes(user_buf, chunk) + yield from plugins.mem.write_int(loff, offset + len(chunk)) + ptregs.set_retval(len(chunk)) + +class ReadCycle(ReadBufWrapper): + ''' + Like ReadBufWrapper, but cycles the buffer forever. + ''' + def __init__(self, *, buffer: Union[bytes, str] = None, **kwargs): + super().__init__(buffer=buffer, cycle=True, **kwargs) + +class ReadZeroCycle(ReadCycle): + ''' + Cycles "0" forever. + ''' + def __init__(self, **kwargs): + super().__init__(buffer="0", **kwargs) + +class ReadOneCycle(ReadCycle): + ''' + Cycles "1" forever. + ''' + def __init__(self, **kwargs): + super().__init__(buffer="1", **kwargs) + +class ReadConstBufCycle(ReadCycle): + ''' + Cycles a constant buffer forever. + ''' + def __init__(self, *, buffer: str = None, **kwargs): + super().__init__(buffer=buffer, **kwargs) + +class ReadExternalVFS: + """ + Modern Adapter: Calls a plugin function with the standard VFS signature. + func(ptregs, file, user_buf, size, loff) -> Generator + """ + def __init__(self, *, read_plugin: str = None, read_function: str = "read", **kwargs): + self._func = getattr(getattr(plugins, read_plugin), read_function) + super().__init__(**kwargs) + + def read(self, ptregs: PtRegsWrapper, file: int, user_buf: int, size: int, loff: int): + yield from self._func(ptregs, file, user_buf, size, loff) + +class ReadExternalLegacy: + """ + Legacy Adapter: Adapts the old synchronous/complex return signature to VFS. + func(self, filename, user_buf, size, offset, details=kwargs) -> (data, retval) + """ + def __init__(self, *, read_plugin: str = None, read_function: str = "read", **kwargs): + self._func = getattr(getattr(plugins, read_plugin), read_function) + self._legacy_kwargs = kwargs.copy() # Capture extra args for 'details' + super().__init__(**kwargs) + + def read(self, ptregs: PtRegsWrapper, file: int, user_buf: int, size: int, loff: int): + offset = yield from plugins.mem.read_int(loff) + + # Call the legacy function + # Note: We pass 'self' as the first arg because legacy plugins expected the file instance + # self.full_path comes from BaseFile via composition + filename = getattr(self, "full_path", "unknown") + + val = yield from self._func(self, filename, user_buf, size, offset, details=self._legacy_kwargs) + + # Handle the polymorphic return types of the old system + retval = 0 + write_data = b"" + + if isinstance(val, tuple) and len(val) == 2: + write_data, retval = val + elif isinstance(val, int): + retval = val + elif isinstance(val, (bytes, str)): + write_data = val + retval = len(val) + + if write_data: + if isinstance(write_data, str): + write_data = write_data.encode("utf-8") + yield from plugins.mem.write_bytes(user_buf, write_data) + yield from plugins.mem.write_int(loff, offset + len(write_data)) + + ptregs.set_retval(retval) \ No newline at end of file diff --git a/pyplugins/hyperfile/models/write.py b/pyplugins/hyperfile/models/write.py new file mode 100644 index 000000000..a5ff733dc --- /dev/null +++ b/pyplugins/hyperfile/models/write.py @@ -0,0 +1,171 @@ +from wrappers.ptregs_wrap import PtRegsWrapper +from penguin import plugins +import inspect +from os.path import isabs, join as pjoin + +class WriteDiscard: + ''' + This mixin discards all written data. + ''' + def __init__(self, **kwargs): + # Even though we don't need args, we must pass kwargs up + # in case we are mixed with something that does. + super().__init__(**kwargs) + + def write(self, ptregs: PtRegsWrapper, file: int, user_buf: int, size: int, loff: int) -> None: + """ + Discards all written data. + Always returns the size written. + """ + ptregs.set_retval(size) + +class WriteReturnConst: + ''' + A mixin that returns a constant value on write. + ''' + def __init__(self, *, const: int, **kwargs): + self.const = const + super().__init__(**kwargs) + + def write(self, ptregs: PtRegsWrapper, file: int, user_buf: int, size: int, loff: int) -> None: + ptregs.set_retval(self.const) + + +class WriteUnhandled(WriteReturnConst): + ''' + A mixin that returns -EINVAL on write. + ''' + def __init__(self, **kwargs): + super().__init__(const=-22, **kwargs) + + +class WriteRecord: + ''' + Records all written data into self.written_data. + ''' + def __init__(self, **kwargs): + # Initialize the buffer here instead of doing hasattr checks in the loop. + # Use setdefault in case another mixin touched it, though unlikely. + if not hasattr(self, "written_data"): + self.written_data = b"" + super().__init__(**kwargs) + + def write(self, ptregs: PtRegsWrapper, file: int, user_buf: int, size: int, loff: int) -> None: + """ + Records all written data into self.written_data. + Always returns the size written. + """ + buf = yield from plugins.mem.read_bytes(user_buf, size) + self.written_data += buf + ptregs.set_retval(size) + + +class WriteDefault(WriteRecord): + pass + + +class WriteToFile: + ''' + Writes incoming data to a file on the host. + ''' + def __init__(self, *, write_filepath: str = None, proj_dir: str = None, **kwargs): + self.proj_dir = proj_dir + if not isabs(write_filepath): + # Paths are relative to the project directory, unless absolute + self.write_filepath = pjoin(self.proj_dir, write_filepath) + else: + self.write_filepath = write_filepath + # 2. FORWARD: Pass the rest up. + super().__init__(**kwargs) + + def write(self, ptregs: PtRegsWrapper, file: int, user_buf: int, size: int, loff: int) -> None: + """ + Writes all data to the specified host file. + Always returns the size written. + """ + if not self.write_filepath: + # Fallback if initialized without a path, or return error + ptregs.set_retval(-22) + return + + buf = yield from plugins.mem.read_bytes(user_buf, size) + offset =yield from plugins.mem.read_int(loff) + + with open(self.write_filepath, "wb") as f: + f.seek(offset) + f.write(buf) + + ptregs.set_retval(size) + + +class WriteFromPlugin: + ''' + Calls a function on a plugin to handle the write. + Example usage: + class MyFile(WriteFromPlugin, ...): + def __init__(self): + super().__init__(plugin="myplugin", function="handle_write") + ''' + def __init__(self, *, plugin: str, function: str = "write", **kwargs): + self._kwargs = kwargs + self._kwargs["plugin"] = plugin + self._kwargs["function"] = function + self._plugin_name = plugin + self._plugin_func = function + self._plugin_obj = getattr(plugins, plugin, None) + if self._plugin_obj is None: + raise ValueError(f"WriteFromPlugin: plugin '{plugin}' not found in plugins") + self._func = getattr(self._plugin_obj, self._plugin_func, None) + if self._func is None: + raise ValueError(f"WriteFromPlugin: function '{function}' not found on plugin '{plugin}'") + sig = inspect.signature(self._func) + params = sig.parameters.values() + + required = [ + p for p in params + if p.default is inspect._empty + and p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD) + ] + optional = [ + p for p in params + if p.default is not inspect._empty + and p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD) + ] + self._old_style = (len(required) == 6 and len(optional) == 1) + super().__init__(**kwargs) + + def write(self, ptregs: PtRegsWrapper, file: int, user_buf: int, size: int, loff: int): + buf = yield from plugins.mem.read_bytes(user_buf, size) + if self._old_style: + fname = self.full_path + result = self._func(self, fname, user_buf, size, loff, buf, self._kwargs) + # If the plugin returns a value, use it as retval, else default to size + ptregs.set_retval(result if result is not None else size) + else: + # New style: (self, ptregs, file, user_buf, size, loff) + yield from self._func(ptregs, file, user_buf, size, loff) + +class WriteExternalVFS: + """Modern VFS Write Adapter""" + def __init__(self, *, write_plugin: str = None, write_function: str = "write", **kwargs): + self._func = getattr(getattr(plugins, write_plugin), write_function) + super().__init__(**kwargs) + + def write(self, ptregs, file, user_buf, size, loff): + yield from self._func(ptregs, file, user_buf, size, loff) + +class WriteExternalLegacy: + """Legacy Write Adapter""" + def __init__(self, *, write_plugin: str = None, write_function: str = "write", **kwargs): + self._func = getattr(getattr(plugins, write_plugin), write_function) + self._legacy_kwargs = kwargs.copy() + super().__init__(**kwargs) + + def write(self, ptregs, file, user_buf, size, loff): + # Legacy writes often expected the buffer to be pre-read for them + buf = yield from plugins.mem.read_bytes(user_buf, size) + + result = self._func(self, self.full_path, user_buf, size, loff, buf, self._legacy_kwargs) + + # Legacy plugins usually return the bytes written or an error code + ptregs.set_retval(result if result is not None else size) \ No newline at end of file diff --git a/pyplugins/hyperfile/mtd.py b/pyplugins/hyperfile/mtd.py new file mode 100644 index 000000000..57451f31a --- /dev/null +++ b/pyplugins/hyperfile/mtd.py @@ -0,0 +1,291 @@ +import os +import re +from penguin import Plugin, plugins +from hyper.portal import PortalCmd +from hyper.consts import HYPER_OP as hop + +class MTD(Plugin): + def __init__(self): + self.config = self.get_arg("devices") or {} + self.internal_devices = self._validate_and_build(self.config) + + # Registry for open file handles: { mtd_id: file_object } + self._handles = {} + + # Registry to keep CFFI callbacks alive (prevent Garbage Collection) + self._c_callbacks = [] + self._cb_ptrs = {} # { 'read': int_addr, ... } + + if self.internal_devices: + plugins.portal.register_interrupt_handler( + "mtd", self._mtd_interrupt_handler) + plugins.portal.queue_interrupt("mtd") + + # ------------------------------------------------------------------------- + # Guest -> Host Callbacks + # ------------------------------------------------------------------------- + + def _mtd_read(self, mtd_id, offset, length, buf_ptr): + """Called by Guest Kernel to read data from Host file.""" + f = self._handles.get(mtd_id) + if not f: + return -19 # -ENODEV + + try: + f.seek(offset) + data = f.read(length) + read_len = len(data) + + # Copy data from Python bytes -> Guest Memory Pointer + plugins.kffi.memmove(buf_ptr, data, read_len) + + # If we hit EOF, pad the rest with 0xFF (Simulate Erased Flash) + if read_len < length: + pad_len = length - read_len + pad_ptr = buf_ptr + read_len + pad = b'\xff' * pad_len + plugins.kffi.memmove(pad_ptr, pad, pad_len) + + return 0 # Success + except Exception as e: + self.logger.error(f"MTD Read Error (ID {mtd_id}): {e}") + return -5 # -EIO + + def _mtd_write(self, mtd_id, offset, length, buf_ptr): + """Called by Guest Kernel to write data to Host file.""" + f = self._handles.get(mtd_id) + if not f: + return -19 + + try: + # Read data from Guest Memory Pointer -> Python bytes + # kffi.buffer creates a view, [:] copies it to bytes + data = plugins.kffi.buffer(buf_ptr, length)[:] + + f.seek(offset) + f.write(data) + return 0 + except Exception as e: + self.logger.error(f"MTD Write Error (ID {mtd_id}): {e}") + return -5 + + def _mtd_erase(self, mtd_id, offset, length): + """Called by Guest Kernel to erase a block.""" + f = self._handles.get(mtd_id) + if not f: + return -19 + + try: + # Simulate erase by writing 0xFFs to the file + f.seek(offset) + f.write(b'\xff' * length) + return 0 + except Exception as e: + self.logger.error(f"MTD Erase Error (ID {mtd_id}): {e}") + return -5 + + # ------------------------------------------------------------------------- + # Initialization Logic + # ------------------------------------------------------------------------- + + def _mtd_interrupt_handler(self): + return self._setup_hardware() + + def _setup_hardware(self): + self.logger.info("Initializing MTD Subsystem...") + + # 1. Initialize Callbacks + # We must create these once and store references, otherwise CFFI + # destroys the callback stub and the kernel crashes on invocation. + kffi = plugins.kffi + cb_read = yield from kffi.callback(self._mtd_read) + cb_write = yield from kffi.callback(self._mtd_write) + cb_erase = yield from kffi.callback(self._mtd_erase) + + self._c_callbacks = [cb_read, cb_write, cb_erase] + + # Cache the integer addresses to pass to the struct + self._cb_ptrs = { + 'read': cb_read, + 'write': cb_write, + 'erase': cb_erase + } + + # 2. Scorched Earth (Nuke existing MTDs) + yield from self._cmd_nuke() + + # 3. Create Devices + for dev in self.internal_devices: + yield from self._cmd_create(dev) + + self.logger.info(f"MTD Subsystem Ready. {len(self.internal_devices)} devices created.") + + def _cmd_nuke(self): + kffi = plugins.kffi + req = kffi.new("struct portal_mtd_nuke_req") + req.max_scan_index = 64 + req_bytes = bytes(req) + + result = yield PortalCmd(hop.HYPER_OP_MTD_NUKE, 0, len(req_bytes), None, req_bytes) + + if result is None: + self.logger.error("MTD Nuke command failed (IPC error)") + else: + self.logger.info(f"MTD Nuke complete. Removed {result} devices.") + + def _cmd_create(self, dev): + kffi = plugins.kffi + req = kffi.new("struct portal_mtd_create_req") + + # --- 1. Basic Metadata --- + label_bytes = dev['name'].encode('utf-8')[:63] + for i, b in enumerate(label_bytes): req.label[i] = b + + req.total_size = dev['total_size'] + req.erase_size = dev['geometry']['erase_size'] + req.write_size = dev['geometry']['write_size'] + req.oob_size = dev['geometry']['oob_size'] + req.is_nand = 1 if dev['geometry']['type'] == 'nand' else 0 + + # --- 2. Mode Selection --- + f_handle = None + + if dev['model'] == 'zeros': + req.mode = 0 # RAM Mode + req.cb_read_ptr = 0 + req.cb_write_ptr = 0 + req.cb_erase_ptr = 0 + # Callbacks are ignored in this mode + else: + req.mode = 1 # Callback Mode + req.cb_read_ptr = self._cb_ptrs['read'] + req.cb_write_ptr = self._cb_ptrs['write'] + req.cb_erase_ptr = self._cb_ptrs['erase'] + + # Prepare file handle + try: + # Open with rb+ for Read/Write, rb for Read-Only + mode_str = "rb" if dev.get('mode') == 'ro' else "rb+" + f_handle = open(dev['backing_path'], mode_str) + except Exception as e: + self.logger.error(f"Failed to open backing file {dev['backing_path']}: {e}") + return # Skip this device + + # --- 3. Send Command --- + req_bytes = bytes(req) + + result_id = yield PortalCmd(hop.HYPER_OP_MTD_CREATE, 0, len(req_bytes), None, req_bytes) + + if result_id is None or result_id < 0: + self.logger.error(f"Failed to create MTD device '{dev['name']}'") + if f_handle: f_handle.close() + else: + self.logger.info(f"Created MTD device '{dev['name']}' as mtd{result_id}") + # Map the returned ID to the file handle for the callbacks to use + if f_handle: + self._handles[result_id] = f_handle + + # --- Utility Methods --- + + def _parse_size(self, size_input): + if isinstance(size_input, int): return size_input + units = {"k": 1024, "m": 1024**2, "g": 1024**3} + match = re.match(r"(\d+)([kmgKMG]?)", str(size_input)) + if not match: raise ValueError(f"Invalid size format: {size_input}") + number, unit = match.groups() + return int(number) * units.get(unit.lower(), 1) + + def _get_personality_defaults(self, p_type): + if p_type == "nand": + return {"type": "nand", "erase_size": 131072, "write_size": 2048, "oob_size": 64} + elif p_type == "nor": + return {"type": "nor", "erase_size": 65536, "write_size": 1, "oob_size": 0} + return {} + + def _validate_and_build(self, raw_devices): + device_list = [] + used_ids = set() + + # --- Pass 1: Reserve Explicit IDs --- + for name, dev in raw_devices.items(): + if "id" in dev: + did = int(dev["id"]) + if did in used_ids: + raise ValueError(f"Duplicate device ID explicitly defined: {did} (device: '{name}')") + used_ids.add(did) + + # --- Pass 2: Assign Missing IDs Alphabetically --- + resolved_configs = [] + next_candidate_id = 0 + + for name in sorted(raw_devices.keys()): + dev = raw_devices[name] + + if "id" in dev: + final_id = int(dev["id"]) + else: + while next_candidate_id in used_ids: + next_candidate_id += 1 + final_id = next_candidate_id + used_ids.add(final_id) + + resolved_configs.append((name, final_id, dev)) + + # --- Pass 3: Build Device Nodes --- + for name, dev_id, dev in resolved_configs: + + model = dev.get("model") + final_size = 0 + backing_path = None + + if model == "backing_file": + backing_path = dev.get("backing_file") + if not backing_path: + raise ValueError(f"Device '{name}' (backing_file) missing 'backing_file' path") + + backing_path = os.path.abspath(os.path.expanduser(backing_path)) + if not os.path.exists(backing_path): + raise FileNotFoundError(f"File not found: {backing_path}") + + final_size = os.path.getsize(backing_path) + + elif model == "zeros": + if "size" in dev: + final_size = self._parse_size(dev["size"]) + else: + # Pick a reasonable default based on the intended technology + # NAND defaults to 256MB, NOR defaults to 16MB + p_type_guess = dev.get("personality", {}).get("type", "nand") + if p_type_guess == "nor": + final_size = 16 * 1024 * 1024 # 16MB + else: + final_size = 256 * 1024 * 1024 # 256MB + + else: + raise ValueError(f"Unknown model type '{model}' for device '{name}'") + + # Personality & Geometry + raw_pers = dev.get("personality", {}) + p_type = raw_pers.get("type", "nand") + personality = self._get_personality_defaults(p_type) + + if "erase_size" in raw_pers: + personality["erase_size"] = self._parse_size(raw_pers["erase_size"]) + if "write_size" in raw_pers: + personality["write_size"] = self._parse_size(raw_pers["write_size"]) + if "oob_size" in raw_pers: + personality["oob_size"] = int(raw_pers["oob_size"]) + + device_list.append({ + "name": name, + "id": dev_id, + "model": model, + "mode": dev.get("mode", "rw"), + "total_size": final_size, + "backing_path": backing_path, + "geometry": personality + }) + + device_list.sort(key=lambda x: x["id"]) + + return device_list \ No newline at end of file diff --git a/pyplugins/hyperfile/procfs.py b/pyplugins/hyperfile/procfs.py new file mode 100644 index 000000000..02b7d4eb3 --- /dev/null +++ b/pyplugins/hyperfile/procfs.py @@ -0,0 +1,183 @@ +from penguin import Plugin, plugins +from hyper.portal import PortalCmd +from hyper.consts import HYPER_OP as hop +from typing import List, Dict, Generator, Optional, Tuple +from hyperfile.models.base import ProcFile + + +class Proc(Plugin): + def __init__(self): + self.outdir = self.get_arg("outdir") + self.proj_dir = self.get_arg("proj_dir") + self._pending_procs: List[ProcFile] = [] + self._procs: Dict[str, ProcFile] = {} + self._proc_dirs: Dict[str, int] = {} # path -> dir id + plugins.portal.register_interrupt_handler( + "procfs", self._proc_interrupt_handler) + + def _get_overridden_methods(self, proc_file: ProcFile) -> Dict[str, callable]: + """ + Return a dict of method_name: method for all methods overridden from ProcFile. + """ + base = ProcFile + overridden = {} + for name in [ + "open", "read", "read_iter", "write", "lseek", "release", "poll", + "ioctl", "compat_ioctl", "mmap", "get_unmapped_area" + ]: + meth = getattr(proc_file, name, None) + base_meth = getattr(base, name, None) + # Use __code__ to compare function implementations + if ( + meth is not None and base_meth is not None + and hasattr(meth, "__code__") and hasattr(base_meth, "__code__") + and meth.__code__ is not base_meth.__code__ + ): + overridden[name] = meth + return overridden + + def _make_fops_struct(self, proc_file: ProcFile): + """ + Build a file_operations struct with function pointers for only the overridden methods. + """ + kffi = plugins.kffi + overridden = self._get_overridden_methods(proc_file) + + # Build the initialization dictionary dynamically + init_data = {} + for name, fn in overridden.items(): + init_data[name] = yield from kffi.callback(fn) + + return kffi.new("struct igloo_proc_ops", init_data) + + def register_proc(self, proc_file: ProcFile, path: Optional[str] =None): + """ + Register a ProcFile for later portal registration. + """ + if path: + fname = path + else: + fname = getattr(proc_file, "PATH", None) + proc_file.PATH = path + if not fname: + raise ValueError("ProcFile must define PATH or define it in register_proc") + if fname.startswith("/proc/"): + fname = fname[len("/proc/"):] # Remove leading /proc/ + if fname not in self._procs and proc_file not in self._pending_procs: + plugins.portal.queue_interrupt("procfs") + self._pending_procs.append((fname, proc_file)) + self._procs[fname] = proc_file + + def _get_or_create_proc_dir(self, dir_path: str) -> Generator[int, None, int]: + """ + Create or look up a procfs directory, returning its id. + Handles multi-level paths recursively. Root is id 0. + """ + parts = [p for p in dir_path.strip("/").split("/") if p] + if not parts: + self._proc_dirs[""] = 0 + return 0 + parent_id = 0 + cur_path = "" + for part in parts: + cur_path = cur_path + "/" + part if cur_path else part + if cur_path in self._proc_dirs: + parent_id = self._proc_dirs[cur_path] + else: + kffi = plugins.kffi + + # Dwarffi natively handles null-termination and bounds truncation for byte arrays + init_data = { + "path": part.encode("latin-1", errors="ignore"), + "parent_id": parent_id, + "replace": 0 + } + + req = kffi.new("struct portal_procfs_create_req", init_data) + req_bytes = bytes(req) + + # Now parent_id is passed for each level + result = yield PortalCmd( + hop.HYPER_OP_PROCFS_CREATE_OR_LOOKUP_DIR, + 0, + len(req_bytes), + None, + req_bytes + ) + if not result or result < 0: + raise RuntimeError(f"Failed to create/lookup proc dir '{cur_path}'") + self._proc_dirs[cur_path] = result + parent_id = result + return parent_id + + def _split_proc_path(self, path: str): + """ + Split a procfs path into (parent_dir, file_name). + If no directory, parent_dir is '' (root). + """ + path = path.strip("/") + if "/" in path: + parent, fname = path.rsplit("/", 1) + return parent, fname + else: + return "", path + + def _register_procs(self, procs: List[Tuple[str, ProcFile]]) -> Generator[int, None, None]: + """ + Register proc files with the kernel via portal. + """ + for fname, proc in procs: + # Check for /proc/self or /proc/ + norm_path = fname.strip("/") + if norm_path == "self" or (norm_path.isdigit()): + self.logger.error( + f"Cannot register special procfs path '/proc/{norm_path}': not supported." + ) + continue + + parent_dir, file_name = self._split_proc_path(fname) + parent_id = yield from self._get_or_create_proc_dir(parent_dir) + + # Validate file name + if not file_name or "/" in file_name: + self.logger.error(f"Invalid proc file name: '{file_name}' from path '{fname}'") + continue + + fops = yield from self._make_fops_struct(proc) + kffi = plugins.kffi + + init_data = { + "path": file_name.encode("latin-1", errors="ignore"), + "fops": fops, + "parent_id": parent_id, + "replace": 1 + } + + req = kffi.new("struct portal_procfs_create_req", init_data) + req_bytes = bytes(req) + + result = yield PortalCmd( + hop.HYPER_OP_PROCFS_CREATE_FILE, + 0, + len(req_bytes), + None, + req_bytes + ) + if result == 0 or result is None: + self.logger.error(f"Failed to register proc '{fname}' (kernel returned 0)") + continue + self.logger.info(f"Registered proc '{fname}' with kernel") + + def _proc_interrupt_handler(self) -> Generator[bool, None, bool]: + """ + Process pending proc registrations. + """ + if not self._pending_procs: + return False + + pending = self._pending_procs[:] + while pending: + proc = pending.pop(0) + yield from self._register_procs([proc]) + self._pending_procs.remove(proc) + return False \ No newline at end of file diff --git a/pyplugins/hyperfile/pseudofiles.py b/pyplugins/hyperfile/pseudofiles.py new file mode 100644 index 000000000..62c3030ea --- /dev/null +++ b/pyplugins/hyperfile/pseudofiles.py @@ -0,0 +1,269 @@ +import inspect +from penguin import plugins, Plugin +from hyperfile.models.base import DevFile, ProcFile, SysFile +from hyperfile.models.read import ReadZero, ReadExternalVFS, ReadExternalLegacy, ReadConstBuf, ReadEmpty, ReadFromFile +from hyperfile.models.write import WriteDiscard, WriteExternalVFS, WriteExternalLegacy, WriteToFile +from hyperfile.models.ioctl import ( + IoctlDispatcher, + IoctlReturnConst, + IoctlExternalVFS, + IoctlExternalLegacy, + IoctlZero, + IoctlPluginVFS, + IoctlPluginLegacy, +) + + +class Pseudofiles(Plugin): + def __init__(self): + self.config = self.get_arg("conf") + self._populate_hf_config() + + # 1. MAPPING LEGACY NAMES TO NEW CLASSES + # -------------------------------------- + read_models = { + "zero": ReadZero, + "empty": ReadEmpty, + "const_buf": ReadConstBuf, + "from_file": ReadFromFile, + "return_const": ReadConstBuf, # Legacy compatibility + # "default": ReadDefault # If you implement a default error mixin + } + + write_models = { + "discard": WriteDiscard, + "to_file": WriteToFile, + # "default": WriteUnhandledMixin + } + + ioctl_models = { + "return_const": IoctlZero, + "zero": IoctlReturnConst, + # "symex": IoctlSymexMixin, # If you implement symex later + } + + def _translate_kwargs(self, domain, raw_config): + """ + Converts schema keys (filename, plugin) to Mixin keys (read_filepath, write_plugin). + domain: 'read', 'write', or 'ioctl' + """ + new_kwargs = {} + + # 1. Handle Plugins (collision prone) + if "plugin" in raw_config: + new_kwargs[f"{domain}_plugin"] = raw_config["plugin"] + + if "function" in raw_config: + new_kwargs[f"{domain}_function"] = raw_config["function"] + + # 2. Handle Filenames (collision prone) + if "filename" in raw_config: + if domain == "read": + # For const_map_file or from_file + new_kwargs["read_filepath"] = raw_config["filename"] + elif domain == "write": + new_kwargs["write_filepath"] = raw_config["filename"] + + # 3. Handle 'val' (generic value) + if "val" in raw_config: + if domain == "read": + # const_buf uses 'buffer' internally in ReadBufWrapper + new_kwargs["buffer"] = raw_config["val"] + elif domain == "ioctl": + new_kwargs["ioctl_retval"] = raw_config["val"] + + # 4. Pass through non-conflicting keys (pad, size, etc) + # Careful: 'size' might be used by both read maps and base file props. + # Usually BaseFile consumes 'size' via kwargs last, so it's okay to pass through. + for k, v in raw_config.items(): + if k not in ["plugin", "function", "filename", "val", "model"]: + new_kwargs[k] = v + + return new_kwargs + + def _detect_plugin_style(self, plugin_name, func_name, type_hint): + """ + Introspects the target plugin function to determine if it matches + the legacy signature or the new VFS signature. + """ + plugin = getattr(plugins, plugin_name, None) + if not plugin: + raise ValueError(f"Plugin '{plugin_name}' not found/loaded.") + + func = getattr(plugin, func_name, None) + if not func: + raise ValueError(f"Function '{func_name}' not found in plugin '{plugin_name}'.") + + sig = inspect.signature(func) + params = list(sig.parameters.values()) + + # Filter out 'self' if it's a bound method, though inspect usually handles this. + # We count positional arguments. + + # SIGNATURE HEURISTICS based on your previous code: + # Read Legacy: (self, filename, user_buf, size, offset, details=...) -> 5 args + optional + # Read VFS: (ptregs, file, user_buf, size, loff) -> 5 args (no details kwarg usually) + + # Write Legacy: (self, fname, user_buf, size, loff, buf, details) -> 6 args + # Write VFS: (ptregs, file, user_buf, size, loff) -> 5 args + + # Ioctl Legacy: (self, filename, cmd, arg, details) -> 4 args + # Ioctl VFS: (ptregs, file, cmd, arg) -> 4 args + + # Since arg counts overlap, we look for specific naming conventions + # or the presence of 'details'/'kwargs' which legacy heavily relied on. + + param_names = [p.name for p in params] + + if type_hint == "read": + if "details" in param_names or "filename" in param_names: + return ReadExternalLegacy + return ReadExternalVFS + + elif type_hint == "write": + if "details" in param_names or "buf" in param_names: + # Legacy took the buffer content as an arg ('buf') + return WriteExternalLegacy + return WriteExternalVFS + + elif type_hint == "ioctl": + if "details" in param_names: + return IoctlExternalLegacy + return IoctlExternalVFS + + return None + + def _detect_plugin_style_ioctl(self, plugin_name, func_name): + """ + Inspects plugin to see if it uses legacy signature for IOCTL. + Returns: 'legacy' or 'vfs' + """ + plugin = getattr(plugins, plugin_name, None) + if not plugin: return 'vfs' + func = getattr(plugin, func_name, None) + if not func: return 'vfs' + + sig = inspect.signature(func) + params = [p.name for p in sig.parameters.values()] + + if "details" in params or "filename" in params: + return 'legacy' + return 'vfs' + + def _create_ioctl_handler(self, details): + """ + Creates a specific Handler object for one ioctl entry. + """ + model = details.get("model", "return_const") + + if model == "return_const": + val = details.get("val", 0) + return IoctlReturnConst(val) + + elif model == "from_plugin": + plugin_name = details.get("plugin") + func_name = details.get("function", "ioctl") + + style = self._detect_plugin_style_ioctl(plugin_name, func_name) + + if style == "legacy": + return IoctlPluginLegacy(plugin_name, func_name, details) + else: + return IoctlPluginVFS(plugin_name, func_name) + + # Fallback + return IoctlReturnConst(0) + + def _resolve_mixin(self, domain, conf): + """ + Determines the correct Mixin class to use. + If model == 'from_plugin', performs introspection. + """ + model_name = conf.get("model", "default") # e.g. "zero", "from_plugin" + + # 1. Handle Standard Models + if model_name != "from_plugin": + if domain == "read": + return self.read_models.get(model_name, ReadZero) + elif domain == "write": + return self.write_models.get(model_name, WriteDiscard) + elif domain == "ioctl": + return self.ioctl_models.get(model_name, IoctlZero) # TODO FIX + + # 2. Handle "from_plugin" + plugin_name = conf.get("plugin") + # Default function names if not provided + default_funcs = {"read": "read", "write": "write", "ioctl": "ioctl"} + func_name = conf.get("function", default_funcs[domain]) + + return self._detect_plugin_style(plugin_name, func_name, domain) + + def _create_dynamic_class(self, filename, details, BaseClass): + read_conf = details.get("read", {}) + write_conf = details.get("write", {}) + ioctl_conf = details.get("ioctl", {}) + + # Resolve Classes + R_Mixin = self._resolve_mixin("read", read_conf) + W_Mixin = self._resolve_mixin("write", write_conf) + + # Resolve Ioctl Handlers + handlers_map = {} + for cmd_key, cmd_details in ioctl_conf.items(): + if cmd_key != "*": + try: + cmd_key = int(cmd_key) + except ValueError: + pass + handlers_map[cmd_key] = self._create_ioctl_handler(cmd_details) + + # Translate Args (as defined in previous turn) + r_kwargs = self._translate_kwargs("read", read_conf) + w_kwargs = self._translate_kwargs("write", write_conf) + + # Assemble + safe_name = f"Gen_{filename.replace('/', '_')}" + bases = (R_Mixin, W_Mixin, IoctlDispatcher, BaseClass) + + all_kwargs = {**r_kwargs, **w_kwargs} + all_kwargs['ioctl_handlers'] = handlers_map + all_kwargs['path'] = filename + all_kwargs['fs'] = BaseClass.FS + + return type(safe_name, bases, {})(**all_kwargs) + + def _force_removal(self, filename): + self.config["static_files"][filename] = {"type": "delete"} + + def _populate_hf_config(self): + if not self.config or "pseudofiles" not in self.config: + return + + for filename, details in self.config.get("pseudofiles", {}).items(): + + # Determine Base Class and Registerer based on path prefix + if filename.startswith("/proc/"): + BaseClass = ProcFile + registrar = plugins.procfs.register_proc + elif filename.startswith("/dev/"): + BaseClass = DevFile + registrar = plugins.devfs.register_devfs + self._force_removal(filename) + elif filename.startswith("/sys/"): + # Sysfs uses show/store, so read/write mixins might need + # the SysfsAdapterMixin we discussed earlier, or SysFile needs + # to map read->show internally. + BaseClass = SysFile + registrar = plugins.sysfs.register_sysfs + else: + self.logger.warning(f"Unknown path type for {filename}, skipping.") + continue + # Create the object + instance = self._create_dynamic_class(filename, details, BaseClass) + + # Register it + # Note: Some legacy configs might have specific devfs major/minors + # defined in 'details'. You can extract them and pass them here. + registrar(instance, path=filename) + + self.logger.debug(f"Dynamically registered {filename} as {instance.__class__.__name__}") diff --git a/pyplugins/hyperfile/sysfs.py b/pyplugins/hyperfile/sysfs.py new file mode 100644 index 000000000..3ba36965f --- /dev/null +++ b/pyplugins/hyperfile/sysfs.py @@ -0,0 +1,157 @@ +from penguin import Plugin, plugins +from wrappers.ptregs_wrap import PtRegsWrapper +from hyper.portal import PortalCmd +from hyper.consts import HYPER_OP as hop +from typing import List, Dict, Generator, Optional, Tuple +from hyperfile.models.base import SysFile + + +class Sysfs(Plugin): + def __init__(self): + self.outdir = self.get_arg("outdir") + self.proj_dir = self.get_arg("proj_dir") + self._pending_sysfs: List[Tuple[str, SysFile, int]] = [] + self._sysfs: Dict[str, SysFile] = {} + self._sysfs_dirs: Dict[str, int] = {} # path -> dir id + plugins.portal.register_interrupt_handler( + "sysfs", self._hypersysfs_interrupt_handler) + + def _get_overridden_methods(self, sysfs_file: SysFile) -> Dict[str, callable]: + base = SysFile + overridden = {} + for name in ["show", "store"]: + meth = getattr(sysfs_file, name, None) + base_meth = getattr(base, name, None) + if ( + meth is not None and base_meth is not None + and hasattr(meth, "__code__") and hasattr(base_meth, "__code__") + and meth.__code__ is not base_meth.__code__ + ): + overridden[name] = meth + return overridden + + def _make_ops_struct(self, sysfs_file: SysFile): + kffi = plugins.kffi + overridden = self._get_overridden_methods(sysfs_file) + + # Build the initialization dictionary dynamically + init_data = {} + for name, fn in overridden.items(): + init_data[name] = yield from kffi.callback(fn) + + return kffi.new("struct igloo_sysfs_ops", init_data) + + def register_sysfs(self, sysfs_file: SysFile, path: Optional[str] = None, mode: int = 0o644): + if path: + fname = path + else: + fname = getattr(sysfs_file, "PATH", None) + sysfs_file.PATH = fname + if not fname: + raise ValueError("SysFile must define PATH or define it in register_sysfs") + if fname.startswith("/sys/"): + fname = fname[len("/sys/"):] + # Fix: _pending_sysfs contains (fname, sysfs_file, mode) + if fname not in self._sysfs and sysfs_file not in [f for _, f, _ in self._pending_sysfs]: + plugins.portal.queue_interrupt("sysfs") + self._pending_sysfs.append((fname, sysfs_file, mode)) + self._sysfs[fname] = sysfs_file + + def _get_or_create_sysfs_dir(self, dir_path: str) -> Generator[int, None, int]: + parts = [p for p in dir_path.strip("/").split("/") if p] + if not parts: + self._sysfs_dirs[""] = 0 + return 0 + parent_id = 0 + cur_path = "" + for part in parts: + cur_path = cur_path + "/" + part if cur_path else part + if cur_path in self._sysfs_dirs: + parent_id = self._sysfs_dirs[cur_path] + else: + kffi = plugins.kffi + + init_data = { + "parent_id": parent_id, + "replace": 0, + "mode": 0o755, + "path": part.encode("latin-1", errors="ignore") + } + + req = kffi.new("struct portal_sysfs_create_req", init_data) + req_bytes = bytes(req) + + result = yield PortalCmd( + hop.HYPER_OP_SYSFS_CREATE_OR_LOOKUP_DIR, + 0, + len(req_bytes), + None, + req_bytes + ) + if not result or result < 0: + raise RuntimeError(f"Failed to create/lookup sysfs dir '{cur_path}'") + self._sysfs_dirs[cur_path] = result + parent_id = result + return parent_id + + def _split_sysfs_path(self, path: str): + path = path.strip("/") + if "/" in path: + parent, fname = path.rsplit("/", 1) + return parent, fname + else: + return "", path + + def _register_sysfs(self, sysfs_list: List[Tuple[str, SysFile, int]]) -> Generator[int, None, None]: + for fname, sysfs_file, mode in sysfs_list: + # Require at least one directory level (e.g., "foo/bar") + norm_path = fname.strip("/") + if "/" not in norm_path: + self.logger.error( + f"Cannot register sysfs file '{fname}': must be in a directory (e.g., 'foo/bar')." + ) + continue + + parent_dir, file_name = self._split_sysfs_path(fname) + parent_id = yield from self._get_or_create_sysfs_dir(parent_dir) + + if not file_name or "/" in file_name: + self.logger.error(f"Invalid sysfs file name: '{file_name}' from path '{fname}'") + continue + + ops = yield from self._make_ops_struct(sysfs_file) + kffi = plugins.kffi + + init_data = { + "path": file_name.encode("latin-1", errors="ignore"), + "ops": ops, + "parent_id": parent_id, + "replace": 1, + "mode": mode + } + + req = kffi.new("struct portal_sysfs_create_req", init_data) + req_bytes = bytes(req) + + result = yield PortalCmd( + hop.HYPER_OP_SYSFS_CREATE_FILE, + 0, + len(req_bytes), + None, + req_bytes + ) + if result == 0 or result is None: + self.logger.error(f"Failed to register sysfs '{fname}' (kernel returned 0)") + continue + self.logger.info(f"Registered sysfs '{fname}' with kernel") + + def _hypersysfs_interrupt_handler(self) -> Generator[bool, None, bool]: + if not self._pending_sysfs: + return False + + pending = self._pending_sysfs[:] + self._pending_sysfs.clear() + while pending: + sysfs = pending.pop(0) + yield from self._register_sysfs([sysfs]) + return False diff --git a/pyplugins/interventions/hyperfile.py b/pyplugins/interventions/hyperfile.py deleted file mode 100644 index 64d68120a..000000000 --- a/pyplugins/interventions/hyperfile.py +++ /dev/null @@ -1,586 +0,0 @@ -""" -HyperFile Plugin -================ - -This module implements the HyperFile plugin for the Penguin framework, enabling -hypercall-based file operations between a guest and the host. It provides a model -for virtual files that can be read, written, or controlled via ioctl/getattr -operations from the guest OS. The plugin is designed to be flexible and extensible, -allowing users to specify custom file behaviors via models. - -Features --------- - -- Handles hypercalls for file operations (read, write, ioctl, getattr) -- Supports dynamic file models for custom device/file behaviors -- Logs and tracks file operation results for analysis -- Provides default behaviors for unhandled operations - -Example Usage -------------- - -.. code-block:: python - - from pyplugins.interventions.hyperfile import HyperFile - - # Register the plugin with Penguin, specifying file models and log file - plugin = HyperFile() - -File Model Example ------------------- - -.. code-block:: python - - files = { - "/dev/zero": { - fops.HYP_READ: HyperFile.read_zero, - fops.HYP_WRITE: HyperFile.write_discard, - "size": 0, - } - } - -Classes -------- - -- HyperFile: Main plugin class implementing the hypercall interface. - -Functions ---------- - -- hyper(name: str) -> int: Map operation name to hyperfile operation constant. -- hyper2name(num: int) -> str: Map hyperfile operation constant to operation name. - -""" - -import struct -from typing import Any, Dict, Tuple -from penguin import Plugin, plugins -from hyper.consts import igloo_hypercall_constants as iconsts -from hyper.consts import hyperfs_ops as hops -from hyper.consts import hyperfs_file_ops as fops - -HYP_RETRY = 0xdeadbeef - -try: - from penguin import yaml -except ImportError: - import yaml - - -def hyper(name: str) -> int: - """ - **Map a string operation name to its corresponding hyperfile operation constant.** - - **Parameters** - - `name` (`str`): The operation name ("read", "write", "ioctl", "getattr"). - - **Returns** - - `int`: The corresponding hyperfile operation constant. - - **Raises** - - `ValueError`: If the operation name is unknown. - """ - if name == "read": - return fops.HYP_READ - elif name == "write": - return fops.HYP_WRITE - elif name == "ioctl": - return fops.HYP_IOCTL - elif name == "getattr": - return fops.HYP_GETATTR - raise ValueError(f"Unknown hyperfile operation {name}") - - -def hyper2name(num: int) -> str: - """ - **Map a hyperfile operation constant to its string operation name.** - - **Parameters** - - `num` (`int`): The hyperfile operation constant. - - **Returns** - - `str`: The operation name. - - **Raises** - - `ValueError`: If the operation constant is unknown. - """ - if num == fops.HYP_READ: - return "read" - elif num == fops.HYP_WRITE: - return "write" - elif num == fops.HYP_IOCTL: - return "ioctl" - elif num == fops.HYP_GETATTR: - return "getattr" - raise ValueError(f"Unknown hyperfile operation {num}") - - -class HyperFile(Plugin): - """ - **The HyperFile plugin implements a virtual file interface for the guest OS, - allowing the guest to perform file operations via hypercalls.** - - **Attributes** - - `arch_bytes` (`int`): Number of bytes per architecture word. - - `log_file` (`Optional[str]`): Path to the log file for operation results. - - `files` (`Optional[Dict[str, Dict]]`): File models for virtual devices. - - `logger` (`Any`): Logger instance. - - `endian` (`str`): Endianness format for struct packing. - - `s_word`, `u_word` (`str`): Signed/unsigned word format for struct packing. - - `results` (`Dict`): Stores results of file operations for logging. - - `default_model` (`Dict`): Default model for unhandled file operations. - """ - - def __init__(self) -> None: - """ - **Initialize the HyperFile plugin, set up file models, logging, and - register hypercall handlers.** - - **Returns** - - `None` - """ - panda = self.panda - self.arch_bytes = panda.bits // 8 - self.log_file = self.get_arg("log_file") - self.files = self.get_arg("models") - self.logger = self.get_arg("logger") - - # Struct format strings for endianness and word size - self.endian = '<' if panda.endianness == 'little' else '>' - self.s_word, self.u_word = 'iI' if panda.bits == 32 else 'qQ' - - if self.files is None: - # We can be imported without files, but we'll ignore it - return - - if self.log_file: - # Initialize a blank file so we can tail it - open(self.log_file, "w").close() - - # We track when processes access or IOCTL files we've added here: - self.results = {} # path: {event: ... } - # event="read": {bytes_read: X, data: "0"} - # event="write": {bytes_written: X, data: ...} - # event="icotl": {mode: {count: X, rv: Y}} - - assert isinstance( - self.files, dict), f"Files should be dict, not {self.files}" - - self.default_model = { - fops.HYP_READ: self.read_unhandled, - fops.HYP_WRITE: self.write_unhandled, - fops.HYP_IOCTL: self.ioctl, - fops.HYP_GETATTR: self.getattr, - "size": 0, - } - - # files = {filename: {'read': func, 'write': func, 'ioctl': func}}} - - # On hypercall we dispatch to the appropriate handler: read, write, - # ioctl - @panda.hypercall(iconsts.IGLOO_HYPERFS_MAGIC) - def before_hypercall(cpu): - # We pass args in the arch-syscall ABI specified in pypanda's arch.py - # arm: x8/r7 r0, r1, r2 - # mips: v0, a0, a1, a2 - hc_type = panda.arch.get_arg(cpu, 1, convention="syscall") - if hc_type == hops.HYP_FILE_OP: - self.handle_file_op(cpu) - elif hc_type == hops.HYP_GET_NUM_HYPERFILES: - self.handle_get_num_hyperfiles(cpu) - elif hc_type == hops.HYP_GET_HYPERFILE_PATHS: - self.handle_get_hyperfile_paths(cpu) - - def handle_get_num_hyperfiles(self, cpu: Any) -> None: - """ - **Handle the hypercall to get the number of hyperfiles.** - - **Parameters** - - `cpu` (`Any`): The CPU context from Panda. - - **Returns** - - `None` - """ - num_hyperfiles_addr = self.panda.arch.get_arg( - cpu, 2, convention="syscall") - try: - plugins.mem.write_bytes_panda( - cpu, - num_hyperfiles_addr, - struct.pack(f"{self.endian} {self.u_word}", len(self.files)), - ) - except ValueError: - # Memory r/w failed - tell guest to retry - self.panda.arch.set_retval(cpu, HYP_RETRY) - self.logger.debug( - "Failed to read/write number of hyperfiles from guest - retry") - - def handle_get_hyperfile_paths(self, cpu: Any) -> None: - """ - **Handle the hypercall to get the paths of all hyperfiles.** - - **Parameters** - - `cpu` (`Any`): The CPU context from Panda. - - **Returns** - - `None` - """ - hyperfile_paths_array_ptr = self.panda.arch.get_arg( - cpu, 2, convention="syscall") - n = len(self.files) - hyperfile_paths_ptrs = [None] * n - for i in range(n): - try: - hyperfile_paths_ptrs[i] = self.panda.virtual_memory_read( - cpu, - hyperfile_paths_array_ptr + i * self.arch_bytes, - self.arch_bytes, - fmt="int", - ) - except ValueError: - self.panda.arch.set_retval(cpu, HYP_RETRY) - self.logger.debug( - "Failed to read hyperfile path ptr from guest - retry") - return - for path, buf in zip(self.files.keys(), hyperfile_paths_ptrs): - try: - plugins.mem.write_bytes_panda(cpu, buf, path.encode()) - except ValueError: - self.panda.arch.set_retval(cpu, HYP_RETRY) - self.logger.debug( - "Failed to write hyperfile path to guest - retry") - return - - def handle_file_op(self, cpu: Any) -> None: - """ - **Handle a file operation hypercall (read, write, ioctl, getattr).** - - **Parameters** - - `cpu` (`Any`): The CPU context from Panda. - - **Returns** - - `None` - """ - header_fmt = f"{self.endian} i {self.u_word}" - read_fmt = write_fmt = f"{self.endian} {self.u_word} {self.u_word} q" - ioctl_fmt = f"{self.endian} I {self.u_word}" - getattr_fmt = f"{self.endian} {self.u_word}" - hyperfs_data_size = struct.calcsize( - header_fmt) + max(struct.calcsize(fmt) for fmt in (read_fmt, write_fmt, ioctl_fmt)) - - buf_addr = self.panda.arch.get_arg(cpu, 2, convention="syscall") - try: - buf = plugins.mem.read_bytes_panda(cpu, buf_addr, hyperfs_data_size) - except ValueError: - # Memory read failed - tell guest to retry - self.panda.arch.set_retval(cpu, HYP_RETRY) - self.logger.debug( - "Failed to read hyperfile struct from guest - retry") - return - - # Unpack request with our dynamic format string - type_val, path_ptr = struct.unpack_from(header_fmt, buf) - try: - device_name = plugins.mem.read_str_panda(cpu, path_ptr) - except ValueError: - # Memory read failed - tell guest to retry - self.panda.arch.set_retval(cpu, HYP_RETRY) - self.logger.debug( - "Failed to read hyperfile struct from guest - retry") - return - - if not len(device_name): - # XXX: why does this happen? Probably a bug somewhere else? - self.logger.warning( - "Empty device name in hyperfile request - ignore") - self.panda.arch.set_retval( - cpu, self.panda.to_unsigned_guest(-22), failure=True) - return - - sub_offset = struct.calcsize(header_fmt) - - # Ensure we have a model - if we don't, warn and add default - if device_name not in self.files: - self.logger.warning( - f"Detected {hyper2name(type_val)} event on device {repr(device_name)} but device is not in config. Using defaults.") - self.files[device_name] = { - k: v for k, v in self.default_model.items()} # XXX can't use deepcopy - - model = self.files[device_name] - # Ensure our model specifies the current behavior - if not, warn and - # add default - if type_val not in model: - if not (type_val == fops.HYP_GETATTR and "size" in model): - # If we have a size, we can handle getattr with out default - # method (return size) and it's fine. Otherwise warn - self.logger.warning( - f"Detected {hyper2name(type_val)} event on device {repr(device_name)} but this event is not modeled in config. Using default.") - model[type_val] = self.default_model[type_val] - - # Dispatch based on the type of operation - if type_val == fops.HYP_READ: - buffer, length, offset = struct.unpack_from( - read_fmt, buf, sub_offset) - new_buffer, retval = model[type_val]( - device_name, buffer, length, offset) - - # We need to write new_buffer back into the struct at buffer - # XXX: sizes? overflows? - if len(new_buffer): - try: - plugins.mem.write_bytes_panda(cpu, buffer, new_buffer) - except ValueError: - self.logger.warning( - f"After reading hyperfile {device_name} failed to write result into guest memory at {buffer:x} - retry") - self.panda.arch.set_retval(cpu, HYP_RETRY) - # XXX: If we ever have stateful files, we'll need to tell - # it the read failed - return - - self.handle_result(device_name, "read", retval, length, new_buffer) - - elif type_val == fops.HYP_WRITE: - buffer, length, offset = struct.unpack_from( - write_fmt, buf, sub_offset) - # We're writing data into our pseudofile. First we need to read what the guest - # has given us as data to write - # XXX offset is _internal_ to our data structures, it's how far into the file - # we've seeked. It's NOT related to the guest buffer - try: - contents = plugins.mem.read_bytes_panda(cpu, buffer, length) - except ValueError: - self.logger.warning( - f"Before writing to hyperfile {device_name} failed to read data out of guest memory at {buffer:x} with offset {offset:x}") - self.panda.arch.set_retval(cpu, HYP_RETRY) - # XXX: We might be able to get stuck in a loop here if hyperfs isn't paging in - # what we expect - return - - retval = model[type_val]( - device_name, buffer, length, offset, contents) - self.handle_result( - device_name, - "write", - retval, - length, - offset, - contents) - - elif type_val == fops.HYP_IOCTL: - cmd, arg = struct.unpack_from(ioctl_fmt, buf, sub_offset) - retval = model[type_val](device_name, cmd, arg) - self.handle_result(device_name, "ioctl", retval, cmd, arg) - - elif type_val == fops.HYP_GETATTR: - retval, size_data = model[type_val](device_name, model) - size_bytes = struct.pack(f"{self.endian} q", size_data) - self.handle_result(device_name, "getattr", retval, size_data) - - size_ptr, = struct.unpack_from(getattr_fmt, buf, sub_offset) - try: - plugins.mem.write_bytes_panda(cpu, size_ptr, size_bytes) - except ValueError: - self.logger.debug( - "Failed to write hyperfile size into guest - retry(?)") - self.panda.arch.set_retval(cpu, HYP_RETRY) - return - - self.panda.arch.set_retval(cpu, self.panda.to_unsigned_guest(retval)) - - def handle_result(self, device_name: str, event: str, - retval: int, *data: Any) -> None: - """ - **Record the result of a file operation for logging and analysis.** - - **Parameters** - - `device_name` (`str`): The name of the device/file. - - `event` (`str`): The event type ("read", "write", "ioctl", "getattr"). - - `retval` (`int`): The return value of the operation. - - `*data` (`Any`): Additional data relevant to the event. - - **Returns** - - `None` - """ - if device_name not in self.results: - self.results[device_name] = {} - - if event not in self.results[device_name]: - self.results[device_name][event] = [] - - if event == "read": - requested_length, buffer = data - buffer = buffer.decode("utf-8", errors="ignore") - result = { - "readval": retval, - "bytes_requested": requested_length, - "data": buffer, - } - - elif event == "write": - length, offset, buffer = data - buffer = buffer.decode("utf-8", errors="ignore") - result = { - "retval": retval, - "bytes_requested": length, - "offset": offset, - "data": buffer, - } - - elif event == "ioctl": - cmd, arg = data - result = { - "cmd": cmd, - "arg": arg, - "retval": retval, - } - elif event == "getattr": - result = { - "size": data[0], - "retval": retval, - } - else: - raise ValueError(f"Unknown event {event}") - self.results[device_name][event].append(result) - - # XXX TESTING ONLY, dump log in a stream? - # with open(self.log_file, "w") as f: - # yaml.dump(self.results, f) - - # event="read": {bytes_read: X, data: "0"} - # event="write": {bytes_written: X, data: ...} - # event="icotl": {mode: {count: X, rv: Y}} - - # Function to handle read operations - @staticmethod - def read_zero(devname: str, buffer: int, length: int, - offset: int) -> Tuple[bytes, int]: - """ - **Return a buffer of zero bytes for read operations.** - - **Parameters** - - `devname` (`str`): Device name. - - `buffer` (`int`): Guest buffer address. - - `length` (`int`): Number of bytes to read. - - `offset` (`int`): Offset into the file. - - **Returns** - - `Tuple[bytes, int]`: (Data read, number of bytes read) - """ - data = b"0" - final_data = data[offset: offset + length] - return (final_data, len(final_data)) # data, rv - - # Function to handle write operations - @staticmethod - def write_discard(devname: str, buffer: int, length: int, - offset: int, contents: bytes) -> int: - """ - **Discard written data and return the number of bytes written.** - - **Parameters** - - `devname` (`str`): Device name. - - `buffer` (`int`): Guest buffer address. - - `length` (`int`): Number of bytes to write. - - `offset` (`int`): Offset into the file. - - `contents` (`bytes`): Data to write. - - **Returns** - - `int`: Number of bytes written. - """ - return length - - @staticmethod - def ioctl(devname: str, cmd: int, arg: int) -> int: - """ - **Handle an ioctl operation (default: always succeeds).** - - **Parameters** - - `devname` (`str`): Device name. - - `cmd` (`int`): IOCTL command. - - `arg` (`int`): IOCTL argument. - - **Returns** - - `int`: Return value (0 for success). - """ - return 0 - - @staticmethod - def ioctl_unhandled(devname: str, cmd: int, arg: int) -> int: - """ - **Handle an unhandled ioctl operation.** - - **Parameters** - - `devname` (`str`): Device name. - - `cmd` (`int`): IOCTL command. - - `arg` (`int`): IOCTL argument. - - **Returns** - - `int`: Return value (-25 for ENOTTY). - """ - return -25 # -ENOTTY - - @staticmethod - def read_unhandled(filename: str, buffer: int, length: int, - offset: int) -> Tuple[bytes, int]: - """ - **Handle an unhandled read operation.** - - **Parameters** - - `filename` (`str`): File name. - - `buffer` (`int`): Guest buffer address. - - `length` (`int`): Number of bytes to read. - - `offset` (`int`): Offset into the file. - - **Returns** - - `Tuple[bytes, int]`: (Empty bytes, -22 for EINVAL) - """ - return (b"", -22) # -EINVAL - - @staticmethod - def write_unhandled(filename: str, buffer: int, - length: int, offset: int, contents: bytes) -> int: - """ - **Handle an unhandled write operation.** - - **Parameters** - - `filename` (`str`): File name. - - `buffer` (`int`): Guest buffer address. - - `length` (`int`): Number of bytes to write. - - `offset` (`int`): Offset into the file. - - `contents` (`bytes`): Data to write. - - **Returns** - - `int`: Return value (-22 for EINVAL). - """ - return -22 # -EINVAL - - @staticmethod - def getattr(device_name: str, model: Dict[str, Any]) -> Tuple[int, int]: - """ - **Handle a getattr operation, returning the file size.** - - **Parameters** - - `device_name` (`str`): Device name. - - `model` (`Dict[str, Any]`): File model dictionary. - - **Returns** - - `Tuple[int, int]`: (Return value, file size) - """ - """ - Return retval, size to write into buffer. - Note we could refactor this to be different and take in the panda object as an arg - and handle writing the getattr results into memory. For now we're just returning - a retval + size that's getting written into guest memory by the caller. - """ - return 0, model.get("size", 0) - - def uninit(self) -> None: - """ - **Dump the results to the log file on plugin unload.** - - **Returns** - - `None` - """ - if self.log_file is not None: - with open(self.log_file, "w") as f: - yaml.dump(self.results, f) diff --git a/pyplugins/interventions/pseudofiles.py b/pyplugins/interventions/pseudofiles.py deleted file mode 100644 index f86538cfd..000000000 --- a/pyplugins/interventions/pseudofiles.py +++ /dev/null @@ -1,1287 +0,0 @@ -""" -Pseudofiles Plugin -================== - -This plugin creates, manages, and suggests new pseudofiles. -It is configurable via the Penguin project config file. - -Purpose -------- - -- Registers all pseudofile models as hyperfiles. -- Handles read, write, and ioctl commands issued to pseudofiles. -- Monitors for access attempts to missing files and suggests adding them as pseudofiles. - -Usage ------ - -Place pseudofile models in the Penguin project config. - -Example -------- - -/dev/mydevice: - read: - model: zero - write: - model: discard - ioctl: - '*': - model: return_const - val: 0 - -""" - -import logging -import re -from os.path import dirname, isfile, isabs -from os.path import join as pjoin -from sys import path as syspath -from penguin import Plugin, plugins, yaml - -syspath.append(dirname(__file__)) - -KNOWN_PATHS = [ - "/dev/", - "/dev/pts", - "/sys", - "/proc", - "/run", - # Directories not in static FS that are added by igloo_init (mostly - # irrelevant with wrong prefixes) - "/tmp", - "/dev/ttyS0", - "/dev/console", - "/dev/root", - "/dev/ram", - "/dev/ram0" # We set these up in our init script, common device types - "/dev/null", - "/dev/zero", - "/dev/random", - "/dev/urandom", # Standard devices we should see in devtmpfs - # TODO: pull in devices like how we do during static analysis (e.g., - # /resources/proc_sys.txt) - "/proc/penguin_net", -] - - -# Missing files go into our first log -outfile_missing = "pseudofiles_failures.yaml" -# Files we're modeling go into the second. Particularly useful for defaults -outfile_models = "pseudofiles_modeled.yaml" -MAGIC_SYMEX_RETVAL = 999 - - -def path_interesting(path): - """ - Determines if a path is likely to contain a good candidate for pseudofiles. - - Parameters - ---------- - path : str - File path. - - Returns - ------- - bool - """ - if "/pipe:[" in path: - return False - - if "\\" in path: - # non-printable chars get escaped somewhere - # These are junk - return False - - if path.startswith("/dev/"): - return True - - if path.startswith("/proc/"): - return True - - if path.startswith("/sys/"): - return True - - return False - - -def proc_interesting(path): - """ - Determines if a process is relevant to overall rehosting. - - Parameters - ---------- - path : str - File path. - - Returns - ------- - bool - """ - # Avoid standard procfs files - # Transformed PID references - if path.startswith("/proc/PID"): - return False - if path.startswith("/proc/self"): - return False - if path.startswith("/proc/thread-self"): - return False - - return path.startswith("/proc/") - - -def ignore_cmd(ioctl): - """ - Ignore TTY ioctls, see ioctls.h for T*, TC*, and TIO* ioctls - - Parameters - ---------- - ioctl : int - Ioctl command number. - - Returns - ------- - bool - """ - if ioctl >= 0x5400 and ioctl <= 0x54FF: - return True - return False - - -def ignore_ioctl_path(path): - """ - Filter out ioctl paths that are irrelevant to rehosting. - - Parameters - ---------- - path : str - File path. - - Returns - ------- - bool - """ - # Paths we don't care about: - # /firmadyne/libnvram anything - this reveals the nvram values read though - # socket:{RAW,UDP,TCP,...} - # /proc/*/{mounts,stat,cmdline} - boring? - if path.startswith("/firmadyne/libnvram"): - return True - if path.startswith("/proc/"): - return True - if path.startswith("socket:"): - # XXX We do want to log socket failures and eventually model them! - return True - if "/pipe:[" in path: - return True - return False - - -# Closure so we can pass details through -def make_rwif(details, fn_ref): - def rwif(*args): - return fn_ref(*args, details) - - return rwif - - -def get_total_counts(d): - """ - Get the sum of all "count" values of a nested dictionary - - Parameters - ---------- - d : dict - Dictionary with values to count. - - Returns - ------- - int - """ - return ( - ( - d["count"] - if "count" in d else - sum(map(get_total_counts, d.values())) - ) - if isinstance(d, dict) else 0 - ) - - -def sort_file_failures(d): - """ - Get a sorted version of the file failures dictionary. - - Parameters - ---------- - d : dict - Dictionary to sort. - - Returns - ------- - dict - """ - # This relies on dict iteration being the same as insertion order, - # which is an implementation detail in CPython, - # but OrderedDict is harder to serialize with pyyaml. - return ( - dict( - sorted( - ((k, sort_file_failures(v)) for k, v in d.items()), - key=lambda pair: get_total_counts(pair[1]), - reverse=True, - ) - ) - if isinstance(d, dict) else d - ) - - -class Pseudofiles(Plugin): - """ - Pseudofiles Plugin - ================== - - Creates and manages pseudofiles and guest interactions with them. - Also suggests new paths to add to modeled pseudofiles. - - Attributes - ---------- - outdir : str - Output directory for logs. - proj_dir : str - Project directory, used to find host files. - conf : str - Penguin project config. - verbose : bool - If True, enable verbose logger output. - logging : str - Determines which log output files will be generated. - Can be 'all', 'modeled', 'missing', or 'none'. - - Behavior - -------- - - Create hyperfile to replace or create a new file in the guest. - - Subscribe to all interactions with that hyperfile, and use configuration models - for read, write, and ioctl commands. - """ - def __init__(self): - """ - Initialize pseudofiles plugin. - Creates hyperfiles of all modeled pseudofiles and creates log files. - - Returns - ------- - None - """ - self.outdir = self.get_arg("outdir") - self.proj_dir = self.get_arg("proj_dir") - self.written_data = {} # filename -> data that was written to it - if self.get_arg( - "conf") is None or "pseudofiles" not in self.get_arg("conf"): - raise ValueError( - "No 'pseudofiles' in config: {self.get_arg('conf')}") - self.config = self.get_arg("conf") - if self.get_arg_bool("verbose"): - self.logger.setLevel(logging.DEBUG) - self.logging_enabled = self.get_arg("logging") - if self.logging_enabled is None: - self.logging_enabled = "all" # Default is all logging on - if "all" in self.logging_enabled or "missing" in self.logging_enabled: - self.log_missing = True - else: - self.log_missing = False - self.logger.info(f"logging missing pseudofiles: {self.log_missing}") - if "all" in self.logging_enabled or "modeled" in self.logging_enabled: - self.log_modeled = True - else: - self.log_modeled = False - self.logger.info(f"logging modeled pseudofiles: {self.log_modeled}") - self.did_mtd_warn = False # Set if we've warned about misconfigured MTD devices - # XXX: It has seemed like this should be 1 for some architectures, but - # that can't be right? - self.ENOENT = 2 - self.warned = set() # Syscalls we don't know about that we've seen - - # We track when processes try accessing or IOCTLing on missing files - # here: - self.file_failures = ( - {} - ) # path: {event: {count: X}}. Event is like open/read/ioctl/stat/lstat. - - self.devfs = [] - self.procfs = [] - self.sysfs = [] - # self.last_symex = None - self.warned = set() - self.need_ioctl_hooks = False - self.hf_config = self.populate_hf_config() - - self.logger.debug("Registered pseudofiles:") - for filename, details in self.hf_config.items(): - self.logger.debug(f" {filename}") - - # filename -> {read: model, write: model, ioctls: model} - # Coordinates with hyperfile for modeling behavior! - # Can we just pass our config straight over and load both? - # Need to implement read, write, and IOCTLs - # IOCTLs with symex gets scary, others are easy though? - from hyperfile import HyperFile - if self.log_modeled: - plugins.load( - HyperFile, - { - "models": self.hf_config, - "log_file": pjoin(self.outdir, outfile_models), - "logger": self.logger, - }, - ) - else: - plugins.load( - HyperFile, - { - "models": self.hf_config, - "logger": self.logger, - }, - ) - # Clear results file - we'll update it as we go - if self.log_missing: - self.dump_results() - - plugins.subscribe(plugins.Events, "igloo_hyp_enoent", self.hyp_enoent) - - # Open/openat is a special case with hypercalls helping us out - # because openat requires guest introspection to resolve the dfd, but we just - # did it in the kernel - plugins.subscribe(plugins.Events, "igloo_open", self.fail_detect_opens) - plugins.subscribe( - plugins.Events, - "igloo_ioctl", - self.fail_detect_ioctl) - - # On ioctl return we might want to start symex. We detect failures with - # a special handler though - if self.need_ioctl_hooks: - plugins.syscalls.syscall("on_sys_ioctl_return")( - self.symex_ioctl_return) - - def gen_hyperfile_function(self, filename, details, ftype): - """ - Generate correct hyperfile handler. - - Parameters - ---------- - filename : str - Pseudofile path. - details : dict - Additional pseudofile model information. - ftype : str - Pseudofile type. - - Returns - ------- - function - """ - if ftype not in details or "model" not in details[ftype]: - model = "default" # default is default - else: - model = details[ftype]["model"] - - if hasattr(self, f"{ftype}_{model}"): - # Have a model specified - fn = getattr(self, f"{ftype}_{model}") - elif model == "from_plugin": - plugin_name = details[ftype]["plugin"] - plugin = getattr(plugins, plugin_name) - func = details[ftype].get("function", ftype) - if hasattr(plugin, func): - fn = getattr(plugin, func) - else: - raise ValueError( - f"Hyperfile {filename} depends on plugin {plugin} which does not have function {func}") - else: - if ftype == "ioctl": - guess = {"pseudofiles": {filename: {"*": details}}} - raise ValueError( - f"Invalid ioctl settings. Must specify ioctl number (or '*') within ioctl dictionary, then map each to a model. Did you mean: {guess}" - ) - raise ValueError( - f"Unsupported hyperfile {ftype}_{model} for {filename}: {details[ftype] if ftype in details else None}" - ) - return make_rwif( - details[ftype] if ftype in details else {}, fn - ) - - def populate_hf_config(self): - """ - Populate the hyperfile config dictionary. - - Returns - ------- - None - """ - # XXX We need this import in here, otherwise when we load psueodfiles with panda.load_plugin /path/to/pseudofiles.py - # it sees both FileFailures AND HyperFile. But we only want hyperfile to be loaded by us here, not by our caller. - # we are not currently using HYPER_WRITE so we do not import it - from hyper.consts import hyperfs_file_ops as fops - from hyperfile import (HyperFile, hyper) - HYP_IOCTL = fops.HYP_IOCTL - HYP_READ = fops.HYP_READ - hf_config = {} - for filename, details in self.config["pseudofiles"].items(): - if "logging" in filename: - continue - hf_config[filename] = {} - - for targ, prefix in [ - (self.devfs, "/dev/"), - (self.procfs, "/proc/"), - (self.sysfs, "/sys/"), - ]: - if filename.startswith(prefix): - targ.append(filename[len(prefix):]) - - hf_config[filename]["size"] = details.get("size", 0) - - # Check if any details with non /dev/mtd names has a 'name' - # property - if not filename.startswith("/dev/mtd") and "name" in details: - raise ValueError( - "Pseudofiles: name property can only be set for MTD devices" - ) - if filename.startswith("/dev/mtd") and "name" not in details: - raise ValueError( - "Pseudofiles: name property must be set for MTD devices" - ) - - for ftype in "read", "write", "ioctl": - hf_config[filename][hyper(ftype)] = self.gen_hyperfile_function( - filename, details, ftype) - if ( - ftype == "ioctl" - and ftype in details - and "model" not in details[ftype] - and any([x["model"] == "symex" for x in details[ftype].values()]) - ): - # If we have a symex model we'll need to enable some extra - # introspection - self.need_ioctl_hooks = True - - if len(self.get_arg("conf").get("netdevs", [])): - # If we have netdevs in our config, we'll make the /proc/penguin_net pseudofile with the contents of it - # Here we'll use our make_rwif closure - netdev_val = " ".join(self.get_arg("conf")["netdevs"]) - hf_config["/proc/penguin_net"] = { - HYP_READ: make_rwif({"val": netdev_val}, self.read_const_buf), - "size": len(netdev_val), - } - - hf_config["/proc/mtd"] = { - # Note we don't use our make_rwif closure helper here because these - # are static - HYP_READ: self.proc_mtd_check, - HYP_IOCTL: HyperFile.ioctl_unhandled, - "size": 0, - } - return hf_config - - def symex_ioctl_return(self, regs, proto, syscall, fd, cmd, arg): - """ - Replace ioctl return value to signal that we should start symbolic execution. - - Parameters - ---------- - regs : PtRegsWrapper - CPU registers. - proto : Any - Protocol or plugin-specific context. - syscall : int - Syscall number. - fd : int - File descriptor. - cmd : int - Ioctl command number. - arg : int - Optional additional pointer to a buffer in memory. - - Returns - ------- - None - """ - # We'll return -999 as a magic placeholder value that indicates we should - # Start symex. Is this a terrible hack. You betcha! - rv = syscall.retval - - if rv != MAGIC_SYMEX_RETVAL: - return - - if not hasattr(self, "symex"): - # Initialize symex on first use - from symex import PathExpIoctl - - self.symex = PathExpIoctl(self.outdir, self.config["core"]["fs"]) - - # Look through our config and find the filename with a symex model - # XXX: This is a bit of a hack - we're assuming we only have one symex - # model - filename = None - for fname, file_model in self.config["pseudofiles"].items(): - if "ioctl" in file_model: - for _, model in file_model["ioctl"].items(): - if model["model"] == "symex": - filename = fname - break - - if filename is None: - raise ValueError( - "No filename with symex model found in config, but we got a symex ioctl. Unexpected" - ) - cpu = self.panda.get_cpu() - # It's time to launch symex! - self.symex.do_symex(self.panda, cpu, syscall.pc, filename, cmd) - - # We write down the "failure" so we can see that it happened (and know to query symex - # to get results) - self.log_ioctl_failure(filename, cmd) - - # set retval to 0 with no error. - syscall.retval = 0 - - def hyp_enoent(self, cpu, file): - """ - Log files that return ENOENT. - - Parameters - ---------- - cpu : Any - CPU context from PANDA. - file : str - File path of hyperfile. - - Returns - ------- - None - """ - if any(file.startswith(x) for x in ("/dev/", "/proc/", "/sys/")): - self.centralized_log(file, "syscall") - - ####################################### - def centralized_log(self, path, event, event_details=None): - """ - Log potential pseudofile candidates. - - Parameters - ---------- - path : str - File path. - event : str - Event which triggered the file to be found as missing or interesting. - event_details : Any - Additional context, defaults to None. - - Returns - ------- - None - """ - # Log a failure to open a given path if it's interesting - # We just track count - if not path_interesting(path): - return - - if path.startswith("/proc/"): - # replace /proc/ with /proc/ to avoid a ton of different - # paths - path = re.sub(r"/proc/\d+", "/proc/PID", path) - - if path not in self.file_failures: - self.file_failures[path] = {} - - if event not in self.file_failures[path]: - self.file_failures[path][event] = {"count": 0} - - if "count" not in self.file_failures[path][event]: - # If we ioctl'd before opening, we'll have a count-free entry - self.file_failures[path][event]["count"] = 0 - - self.file_failures[path][event]["count"] += 1 - - if event_details is not None: - if "details" not in self.file_failures[path][event]: - self.file_failures[path][event]["details"] = [] - self.file_failures[path][event]["details"].append(event_details) - - def proc_mtd_check(self, filename, buffer, length, offset, details=None): - """ - The guest is reading /proc/mtd. We should populate this file - dynamically based on the /dev/mtd* devices we've set up. - - These devices have a name in addition to other properties: - /dev/mtd0: - name: mymtdname - read: - model: return_const - buf: "foo" - - Parameters - ---------- - filename : str - Path to mtd file. - buffer : int - Pointer to buffer. - length : int - Length of buffer. - offset : int - Offset into buffer. - details : Any - Additional context. - - Returns - ------- - tuple - """ - - assert filename == "/proc/mtd" - - # For each device in our config that's /dev/mtdX, we'll add a line to the buffer - # Buffer size is limited to 512 in kernel for now. - buf = "" - did_warn = False - for filename, details in self.config["pseudofiles"].items(): - if not filename.startswith("/dev/mtd"): - continue - - idx = filename.split("/dev/mtd")[1] - if idx.startswith("/"): # i.e., /dev/mtd/0 -> 0 - idx = idx[1:] - - if not idx.isdigit(): - if not self.did_mtd_warn: - did_warn = True - self.logger.warning( - f"Mtd device {filename} is non-numeric. Skipping in /proc/mtd report" - ) - continue - - if "name" not in details: - if not self.did_mtd_warn: - did_warn = True - self.logger.warning( - f"Mtd device {filename} has no name. Skipping in /proc/mtd report" - ) - continue - - buf += 'mtd{}: {:08x} {:08x} "{}"\n'.format( - int(idx), 0x1000000, 0x20000, details["name"] - ) - - if did_warn: - self.did_mtd_warn = True - - buf = buf[offset: offset + length].encode() - - if len(buf) == 0: - with open(pjoin(self.outdir, "pseudofiles_proc_mtd.txt"), "w") as f: - f.write("/proc/mtd was read with no devices in config") - - # The guest read /proc/mtd, but we didn't have anything set up in it! Perhaps - # it's looking for a device of a specific name - potential failure we can mitigate - # self.file_failures['/proc/mtd'] = {'read': {'count': 1, 'details': 'special: no mtd devices in pseudofiles'}} - - return (buf, len(buf)) - - def fail_detect_ioctl(self, cpu, fname, cmd): - """ - Detect a failed ioctl call via return value. - - Parameters - ---------- - cpu : Any - CPU context from PANDA. - fname : str - File path to device. - cmd : int - Command number of ioctl. - - Returns - ------- - None - """ - # A regular (non-dyndev) device was ioctl'd and is returning -ENOTTY so - # our hypercall triggers - self.log_ioctl_failure(fname, cmd) - - def fail_detect_opens(self, cpu, fname, fd): - """ - Triggers on failed open calls. - - Parameters - ---------- - cpu : Any - CPU context from PANDA. - fname : str - File path. - fd : int - File descriptor. - - Returns - ------- - None - """ - fd = self.panda.from_unsigned_guest(fd) - - if fd == -self.ENOENT: - # enoent let's gooooo - self.centralized_log(fname, "open") - - def log_ioctl_failure(self, path, cmd): - """ - Format and write ioctl failures to logs. - - Parameters - ---------- - path : str - File path of device with ioctl failure. - cmd : int - Command number of ioctl that failed. - - Returns - ------- - None - """ - # This might trigger twice, depending on the -ENOTTY path - # between our dyndev ioctl handler and do_vfs_ioctl? - - if ignore_ioctl_path(path) or ignore_cmd(cmd): - # Uninteresting - return - - if path not in self.file_failures: - self.file_failures[path] = {} - - if "ioctl" not in self.file_failures[path]: - self.file_failures[path]["ioctl"] = {} - - first = False - if cmd not in self.file_failures[path]["ioctl"]: - self.file_failures[path]["ioctl"][cmd] = {"count": 0} - first = True - - self.file_failures[path]["ioctl"][cmd]["count"] += 1 - if first: - # The first time we see an IOCTL update our results on disk - # This is just relevant if someone's watching the output during a run - # final results are always written at the end. - if self.log_missing: - self.dump_results() - self.logger.debug(f"New ioctl failure observed: {cmd:x} on {path}") - - def read_zero(self, filename, buffer, length, offset, details=None): - """ - Simple peripheral model inspired by firmadyne/firmae. Just return 0. - If we've seen a write to this device, mix that data in with 0s - padding around it. - - Parameters - ---------- - filename : str - File path of pseudofile/peripheral. - buffer : int - Pointer to buffer to fill with 0s. - length : int - Length of read. - offset : int - Offset into device being read from. - details : Any - Additional device context. - - Returns - ------- - tuple - """ - data = b"0" - if filename in self.written_data: - data = self.written_data[filename] - - final_data = data[offset: offset + length] - # XXX if offset > len(data) should we return an error instead of 0? - return (final_data, len(final_data)) # data, rv - - def read_one(self, filename, buffer, length, offset, details=None): - """ - Simple peripheral model to return a read of '1'. - - Parameters - ---------- - filename : str - File path of pseudofile/peripheral. - buffer : int - Pointer to buffer to write to. - length : int - Length of read. - offset : int - Offset into device being read from. - details : Any - Additional device context. - - Returns - ------- - tuple - """ - data = b"1" - if filename in self.written_data: - data = self.written_data[filename] - - final_data = data[offset: offset + length] - # XXX if offset > len(data) should we return an error instead of 0? - return (final_data, len(final_data)) # data, rv - - def read_empty(self, filename, buffer, length, offset, details=None): - """ - Simple peripheral model to return an empty buffer. - - Parameters - ---------- - filename : str - File path of pseudofile/peripheral. - buffer : int - Pointer to buffer to write to. - length : int - Length of read. - offset : int - Offset into device being read from. - details : Any - Additional device context. - - Returns - ------- - tuple - """ - data = b"" - # XXX if offset > len(data) should we return an error instead of 0? - return (data, 0) # data, rv - - def read_const_buf(self, filename, buffer, length, offset, details=None): - """ - Simple peripheral model to return a constant buffer. - - Parameters - ---------- - filename : str - File path of pseudofile/peripheral. - buffer : int - Pointer to buffer to write to. - length : int - Length of read. - offset : int - Offset into device being read from. - details : Any - Additional device context, including buffer contents. - - Returns - ------- - tuple - """ - data = details["val"].encode() + b"\x00" # Null terminate? - final_data = data[offset: offset + length] - # XXX if offset > len(data) should we return an error instead of 0? - if offset > len(data): - return (b"", 0) # -EINVAL - - return (final_data, len(final_data)) # data, rv - - def _render_file(self, details): - """ - Combine data mapping and return buffer. - - Parameters - ---------- - details : Any - Device model details from config. - - Returns - ------- - bytestr - """ - # Given offset: data mapping plus a pad, we - # combine to return a buffer - pad = b"\x00" - if "pad" in details: - if isinstance(details["pad"], str): - pad = details["pad"].encode() - elif isinstance(details["pad"], int): - pad = bytes([details["pad"]]) - else: - raise ValueError("const_map: pad value must be string or int") - - size = details["size"] if "size" in details else 0x10000 - vals = details["vals"] - - # sort vals dict by key, lowest to highest - vals = { - k: v for k, - v in sorted( - vals.items(), - key=lambda item: item[0])} - - # now we flatten. For each offset, val pair - # Need to grab first offset, then pad to that - data = b"" # pad * (list(vals.keys())[0] if len(vals.keys()) else 0) - - for off, val in vals.items(): - # We have offset: value where value - # may be a string, a list of ints (for non-printable chars) - # or a list of strings to be joined by null terminators - - if isinstance(val, str): - val = val.encode() - - elif isinstance(val, list): - if not len(val): - continue # Wat? - - # All shoudl be same type. Could support a list of lists e.g., - # ["key=val", [0x41, 0x00, 0x42], ...]? - first_val = val[0] - for this_val in val[1:]: - if not isinstance(this_val, type(first_val)): - raise ValueError( - f"Need matching vals but we have {this_val} and {first_val}" - ) - - if isinstance(first_val, int): - # We have a list of ints - these are non-printable chars - val = bytes(val) - - elif isinstance(first_val, str): - # Join this list with null bytes - val = b"\x00".join([x.encode() for x in val]) - else: - raise ValueError( - "_render_file: vals must be strings lists of ints/strings" - ) - - # Pad before this value, then add the value - data += pad * (off - len(data)) + val - - # Finally pad up to size - assert len( - data) <= size, f"Data is too long: {len(data)} > size {size}" - data += pad * (size - len(data)) - return data - - def read_const_map(self, filename, buffer, length, offset, details=None): - """ - Read data and returns tuple of buffer and size. - - Parameters - ---------- - filename : str - File path of pseudofile/peripheral. - buffer : int - Pointer to buffer to write to. - length : int - Length of read. - offset : int - Offset into device file. - details : Any - Additional device context. - - Returns - ------- - tuple - """ - data = self._render_file(details) - final_data = data[offset: offset + length] - if offset > len(data): - return (b"", 0) # No data, no bytes read - - return (final_data, len(final_data)) # data, length - - def read_const_map_file(self, filename, buffer, - length, offset, details=None): - """ - Create a file on the host using the specified pad, size, vals - When we read from the guest, we read from the host file. - - Parameters - ---------- - filename : str - File path of device. - buffer : int - Pointer to buffer to write to. - length : int - Length of read. - offset : int - Offset into device. - details : Any - Additional device context. - - Returns - ------- - tuple - """ - # Create a file on the host using the specified pad, size, vals - # When we read from the guest, we read from the host file. - hostfile = details["filename"] - - if not isabs(hostfile): - # Paths are relative to the project directory, unless absolute - hostfile = pjoin(self.proj_dir, hostfile) - - # Create initial host file - if not isfile(hostfile): - data = self._render_file(details) - # Create initial file - with open(hostfile, "wb") as f: - f.write(data) - - # Read from host file - with open(hostfile, "rb") as f: - f.seek(offset) - final_data = f.read(length) - - return (final_data, len(final_data)) # data, length - - def read_from_file(self, filename, buffer, length, offset, details=None): - """ - Read from host file. - - Parameters - ---------- - filename : str - File path of pseudofile/peripheral. - buffer : int - Pointer to buffer to write to. - length : int - Length of read. - offset : int - Offset into device. - details : Any - Additional device context. - - Returns - ------- - tuple - """ - self.logger.debug( - f"Reading {filename} with {length} bytes at {offset}:") - fname = details["filename"] # Host file - - if not isabs(fname): - # Paths are relative to the project directory, unless absolute - fname = pjoin(self.proj_dir, fname) - - with open(fname, "rb") as f: - f.seek(offset) - data = f.read(length) - - return (data, len(data)) - - def write_to_file(self, filename, buffer, length, - offset, contents, details=None): - """ - Write to host file. - - Parameters - ---------- - filename : str - File path of pseudofile/peripheral. - buffer : int - Pointer to buffer to read from. - length : int - Length of read. - offset : int - Offset into device. - details : Any - Additional device context. - - Returns - ------- - tuple - """ - fname = details["filename"] # Host file - if not isabs(fname): - # Paths are relative to the project directory, unless absolute - fname = pjoin(self.proj_dir, fname) - self.logger.debug( - f"Writing {fname} with {length} bytes at {offset}: {contents[:100]}" - ) - - with open(fname, "r+b") as f: - f.seek(offset) - f.write(contents) - - return length - - def write_discard(self, filename, buffer, length, - offset, contents, details=None): - """ - TODO: make this actually discard - not sure where it's used right now - and default is a better model in general - - Parameters - ---------- - filename : str - File path of pseudofile/peripheral. - buffer : int - Pointer to buffer to write to. - length : int - Length of read. - offset : int - Offset into device. - details : Any - Additional device context. - - Returns - ------- - int - """ - return self.write_default( - filename, buffer, length, offset, contents, details) - - def write_default(self, filename, buffer, length, - offset, contents, details=None): - """ - Store the contents for this file - print(f"{filename} writes {length} bytes at {offset}: {contents[:100]}") - - Parameters - ---------- - filename : str - File path of pseudofile/peripheral. - buffer : int - Pointer to buffer to read from. - length : int - Length of read. - offset : int - Offset into device. - details : Any - Additional device context. - - Returns - ------- - int - """ - if filename not in self.written_data: - self.written_data[filename] = b"" - # Seek to offset and write contents - previous = self.written_data[filename][:offset] - if len(previous) < offset: - # Pad with null bytes - previous += b"\x00" * (offset - len(previous)) - self.written_data[filename] = ( - previous - + contents - + ( - self.written_data[filename][offset + length:] - if len(self.written_data[filename]) > offset + length - else b"" - ) - ) - return length - - # XXX on write we can allow and store by default. Or should we explicitly error and require a model? - # def write_default(self, filename, buffer, length, offset, contents, details=None): - # self.centralized_log(filename, 'write') - # return -22 # -EINVAL - we don't support writes - - # default models - log failures - def read_default(self, filename, buffer, length, offset, details=None): - self.centralized_log(filename, "read") - return (b"", -22) # -EINVAL - we don't support reads - - # IOCTL is more complicated than read/write. - # default is a bit of a misnomer, it's our default ioctl handler which - # implements default behavior (i.e., error) on issue of unspecified ioctls, - # but implements what it's told for others - def ioctl_default(self, filename, cmd, arg, ioctl_details): - """ - Given a cmd and arg, return a value. - - Parameters - ---------- - filename : str - Device path. - cmd : int - Ioctl command number. - arg : str - Optional pointer to buffer. - ioctl_details : dict - Dictionary structure is cmd -> {'model': 'return_const'|'symex'|'from_plugin', - 'val': X} - - Returns - ------- - int - """ - # Try to use cmd as our key, but '*' is a fallback - # is_wildcard = False - if cmd in ioctl_details: - cmd_details = ioctl_details[cmd] - elif "*" in ioctl_details: - cmd_details = ioctl_details["*"] - # is_wildcard = True - else: - self.log_ioctl_failure(filename, cmd) - return -25 # -ENOTTY - - model = cmd_details["model"] - - if model == "return_const": - rv = cmd_details["val"] - return rv - - elif model == "symex": - # Symex is tricky and different from normal models. - # First off, these models need to specify a 'val' just like any other - # for us to use after (and, to be honest, during) symex. - # JK: we're gonna always use 0 when doing symex! - - # if self.last_symex: - # We could be smart and encode info in our retval - # or do something else. I don't think we want to fully - # ignore? But we probably could? - # raise NotImplementedError("Uhhhh nested symex") - # self.last_symex = filename - # We'll detect this on the return and know what to do. I think? - return MAGIC_SYMEX_RETVAL - elif model == "from_plugin": - plugin_name = cmd_details["plugin"] - plugin = getattr(plugins, plugin_name) - func = cmd_details.get("function", "ioctl") - if hasattr(plugin, func): - fn = getattr(plugin, func) - else: - raise ValueError( - f"Hyperfile {filename} depends on plugin {plugin} which does not have function {func}") - return fn(filename, cmd, arg, cmd_details) - else: - # This is an actual error - config is malformed. Bail - raise ValueError(f"Unsupported ioctl model {model} for cmd {cmd}") - # return -25 # -ENOTTY - - def dump_results(self): - """ - Dump all file failures to disk as yaml. - - Returns - ------- - None - """ - with open(pjoin(self.outdir, outfile_missing), "w") as f: - out = sort_file_failures(self.file_failures) - yaml.dump(out, f, sort_keys=False) - - if hasattr(self, "symex"): - # Need to tell symex to export results as well - self.symex.save_results() - - def uninit(self): - """ - Uninitialize plugin and update logs for a final time. - - Returns - ------- - None - """ - if self.log_missing: - self.dump_results() diff --git a/pyplugins/testing/devfs_test.py b/pyplugins/testing/devfs_test.py new file mode 100644 index 000000000..458a65ead --- /dev/null +++ b/pyplugins/testing/devfs_test.py @@ -0,0 +1,43 @@ +from penguin import Plugin, plugins +from wrappers.ptregs_wrap import PtRegsWrapper +from hyperfile.models.base import DevFile +from hyperfile.models.read import ReadBufWrapper + +class SimpleDevfsDevice(DevFile, ReadBufWrapper): + PATH = "simpledev" + MAJOR = -1 # dynamic + MINOR = 0 + + def __init__(self): + self.value = 0 + + def open(self, ptregs: PtRegsWrapper, inode: int, file: int): + print("SimpleDevfsDevice.open called") + ptregs.set_retval(0) + + def read(self, ptregs: PtRegsWrapper, file: int, user_buf: int, size: int, loff: int): + """ + Returns the string representation of the stored integer value. + """ + data = str(self.value).encode("utf-8") + yield from self._impl_read(ptregs, file, user_buf, size, loff, data) + + def write(self, ptregs: PtRegsWrapper, file: int, user_buf: int, size: int, offset_ptr: int): + """ + Accepts an integer string and stores it. + """ + if size <= 0: + ptregs.set_retval(0) + return + raw = yield from plugins.mem.read_bytes(user_buf, size) + try: + val = int(raw.decode("utf-8").strip()) + self.value = val + ptregs.set_retval(size) + except Exception: + ptregs.set_retval(-1) + +class DevfsTest(Plugin): + def __init__(self): + plugins.devfs.register_devfs(SimpleDevfsDevice()) + plugins.devfs.register_devfs(SimpleDevfsDevice(), path="/dev/bcde") diff --git a/pyplugins/testing/procfs_test.py b/pyplugins/testing/procfs_test.py new file mode 100644 index 000000000..053151295 --- /dev/null +++ b/pyplugins/testing/procfs_test.py @@ -0,0 +1,158 @@ +from penguin import Plugin, plugins +from wrappers.ptregs_wrap import PtRegsWrapper +from hyperfile.models.base import ProcFile +from hyperfile.models.read import ReadConstBuf +from hyperfile.models.write import WriteDiscard +from hyperfile.models.ioctl import IoctlZero + + +class SimpleProcfsFile(ReadConstBuf, WriteDiscard, IoctlZero, ProcFile): + PATH = "s/i/m/p/l/e/simple_proc" # No /proc prefix + + def __init__(self): + super().__init__(buffer=b"Hello from simple_proc!\n") + + def open(self, ptregs: PtRegsWrapper, inode: int, file: int): + procname = yield from plugins.osi.get_proc_name() + print(f"SimpleProcfsFile.open called in {procname}") + ptregs.set_retval(0) + + def release(self, ptregs: PtRegsWrapper, inode: int, file: int): + procname = yield from plugins.osi.get_proc_name() + print(f"SimpleProcfsFile.release called in {procname}") + ptregs.set_retval(0) + +class CPUinfoFile(ReadConstBuf, ProcFile): + PATH = "/proc/cpuinfo" # No /proc prefix + def __init__(self): + data = b"processor : IGLOO\n" + super().__init__(buffer=data) + +class PenguinNet(ReadConstBuf, ProcFile): + PATH = "/proc/penguin_net" # No /proc prefix + def __init__(self, config): + netdev_val = " ".join(config["netdevs"]) + super().__init__(buffer=netdev_val) + +class ProcMtd(ReadConstBuf, ProcFile): + PATH = "/proc/mtd" # No /proc prefix + + def __init__(self, config): + # Generate the /proc/mtd buffer based on config["pseudofiles"] + buf = self._generate_mtd_buf(config) + super().__init__(buffer=buf) + + def _generate_mtd_buf(self, config): + """ + Generate the /proc/mtd contents based on /dev/mtd* pseudofiles in config. + """ + buf = "" + did_warn = False + for filename, details in config.get("pseudofiles", {}).items(): + if not filename.startswith("/dev/mtd"): + continue + + idx = filename.split("/dev/mtd")[1] + if idx.startswith("/"): + idx = idx[1:] + + if not idx.isdigit(): + # Optionally log a warning here if needed + continue + + if "name" not in details: + # Optionally log a warning here if needed + continue + + buf += 'mtd{}: {:08x} {:08x} "{}"\n'.format( + int(idx), 0x1000000, 0x20000, details["name"] + ) + return buf + +class ModulesFile(ReadConstBuf, ProcFile): + PATH = "/proc/modules" # No /proc prefix + + def __init__(self, modules: dict = None): + self.modules = modules if modules is not None else {} + formatted = self._format_proc_modules(self.modules) + data = formatted.encode("utf-8") if formatted else b"" + super().__init__(buffer=data) + + def _format_proc_modules(self, modules: dict) -> str: + """ + Format a dictionary of modules into /proc/modules output. + + modules: dict mapping module_name -> attribute dict. + + Valid attribute keys (all optional): + init_size: int + core_size: int + refcount: int or "-" + deps: list[str] + state: "LIVE", "COMING", "GOING", "UNFORMED" + base_addr: int + taints: str (e.g., "O", "P", "OE") + """ + + results = [] + + # Kernel state → printed string + state_map = { + "GOING": "Unloading", + "COMING": "Loading", + "LIVE": "Live", + } + + for name, attr in modules.items(): + # ----- Defaults ----- + init_size = attr.get("init_size", 0) + core_size = attr.get("core_size", 0) + total_size = init_size + core_size + + refcount = attr.get("refcount", "-") + deps_list = attr.get("deps", []) + state = attr.get("state", "LIVE") + base_addr = attr.get("base_addr", 0) + taints = attr.get("taints", "") + + # Skip unformed modules like the kernel + if state == "UNFORMED": + continue + + # ----- Format fields ----- + line = f"{name} {total_size}" + + # refcount + if refcount is None: + refcount = "-" + line += f" {refcount}" + + # dependencies + deps = ",".join(deps_list) if deps_list else "-" + line += f" {deps}" + + # state text + state_str = state_map.get(state, "Live") + line += f" {state_str}" + + # address (hex) + line += f" 0x{base_addr:x}" + + # taints (e.g. "(O)" or "(OE)") + if taints: + line += f" ({taints})" + + results.append(line) + + # kernel prints nothing if no modules produce output + return "\n".join(results) if results else "" + + + +class ProcTest(Plugin): + def __init__(self): + plugins.procfs.register_proc(SimpleProcfsFile()) + plugins.procfs.register_proc(CPUinfoFile()) + plugins.procfs.register_proc(ModulesFile()) + plugins.procfs.register_proc(PenguinNet(self.get_arg("conf"))) + plugins.procfs.register_proc(ProcMtd(self.get_arg("conf"))) diff --git a/pyplugins/testing/sysfs_test.py b/pyplugins/testing/sysfs_test.py new file mode 100644 index 000000000..a67e2880b --- /dev/null +++ b/pyplugins/testing/sysfs_test.py @@ -0,0 +1,90 @@ +from penguin import Plugin, plugins +from wrappers.ptregs_wrap import PtRegsWrapper +from hyperfile.models.base import SysFile + +class SimpleSysfsFile(SysFile): + PATH = "/sys/kernel/simple_sysfs/value" # No /sys prefix + + def __init__(self): + self.value = 0 + + def show(self, ptregs: PtRegsWrapper, kobj, attr, buf): + """ + Write the current value as a string to buf. + """ + data = f"{self.value}\n".encode("utf-8") + yield from plugins.mem.write_bytes(buf, data) + ptregs.set_retval(len(data)) + + def store(self, ptregs: PtRegsWrapper, kobj, attr, buf, count): + """ + Read an integer from buf and store it. + """ + if count <= 0: + ptregs.set_retval(0) + return + raw = yield from plugins.mem.read_bytes(buf, count) + try: + val = int(raw.decode("utf-8").strip()) + self.value = val + ptregs.set_retval(count) + except Exception: + ptregs.set_retval(-1) + +class RandomSysfsFile(SysFile): + PATH = "/sys/kernel/simple_sysfs/random" # No /sys prefix + + def show(self, ptregs: PtRegsWrapper, kobj, attr, buf): + """ + Write the current value as a string to buf. + """ + import random + data = f"{random.randint(0, 0xffffffff)}\n".encode("utf-8") + yield from plugins.mem.write_bytes(buf, data) + ptregs.set_retval(len(data)) + + def store(self, ptregs: PtRegsWrapper, kobj, attr, buf, count): + ptregs.set_reval(count) + +class PowerStateSyfsFile(SysFile): + PATH = "/sys/power/state" # No /sys prefix + + def show(self, ptregs: PtRegsWrapper, kobj, attr, buf): + """ + Write the current power state as a string to buf. + """ + data = "mem\n" + data_bytes = data.encode("utf-8") + yield from plugins.mem.write_bytes(buf, data_bytes) + ptregs.set_retval(len(data_bytes)) + + def store(self, ptregs: PtRegsWrapper, kobj, attr, buf, count): + """ + Accepts any write, discards data, and returns the number of bytes written. + """ + ptregs.set_retval(count) + +class BaseStateSyfsFile(SysFile): + PATH = "/sys/s/t/a/state" # No /sys prefix + + def show(self, ptregs: PtRegsWrapper, kobj, attr, buf): + """ + Write the current power state as a string to buf. + """ + data = "mem\n" + data_bytes = data.encode("utf-8") + yield from plugins.mem.write_bytes(buf, data_bytes) + ptregs.set_retval(len(data_bytes)) + + def store(self, ptregs: PtRegsWrapper, kobj, attr, buf, count): + """ + Accepts any write, discards data, and returns the number of bytes written. + """ + ptregs.set_retval(count) + +class SysfsTest(Plugin): + def __init__(self): + plugins.sysfs.register_sysfs(SimpleSysfsFile()) + plugins.sysfs.register_sysfs(RandomSysfsFile()) + plugins.sysfs.register_sysfs(PowerStateSyfsFile()) + plugins.sysfs.register_sysfs(BaseStateSyfsFile()) diff --git a/pyplugins/testing/verifier.py b/pyplugins/testing/verifier.py index 2387d75f0..19faad140 100644 --- a/pyplugins/testing/verifier.py +++ b/pyplugins/testing/verifier.py @@ -13,6 +13,11 @@ from sqlalchemy import create_engine from pengutils.utils.cli_db import syscall_filter +GREEN = "\x1b[32m" +RED = "\x1b[31m" +END = "\x1b[0m" +FAILED = f"{RED}failed{END}" + class Verifier(Plugin): def __init__(self): @@ -218,11 +223,6 @@ def uninit(self): test_cases, results = self.check_test_cases() for tc in test_cases: - GREEN = "\x1b[32m" - RED = "\x1b[31m" - END = "\x1b[0m" - PASSED = f"{GREEN}passed{END}" - FAILED = f"{RED}failed{END}" test_passed = results[tc.name] if not test_passed: self.logger.info( @@ -237,4 +237,6 @@ def uninit(self): f"Verified output written to {join(self.outdir, 'verifier.xml')}") if all(results.values()): - self.logger.info("Verifier: ALL tests passed") + self.logger.info(f"{GREEN}Verifier: ALL tests passed{END}") + else: + self.logger.info(f"{RED}Verifier: Some tests failed{END}") \ No newline at end of file diff --git a/src/resources/source.d/10_setup_mounts.sh b/src/resources/source.d/10_setup_mounts.sh index 069c23b5f..dd4fa176c 100644 --- a/src/resources/source.d/10_setup_mounts.sh +++ b/src/resources/source.d/10_setup_mounts.sh @@ -1,20 +1,21 @@ # Mount sysfs, procfs, and devfs in /igloo/pfs/real -for f in sys proc dev; do - /igloo/utils/busybox mkdir -p /igloo/pfs/real/$f -done -/igloo/utils/busybox mount -t sysfs sysfs /igloo/pfs/real/sys -/igloo/utils/busybox mount -t proc proc /igloo/pfs/real/proc -/igloo/utils/busybox mount -t devtmpfs devtmpfs /igloo/pfs/real/dev +# for f in sys proc dev; do +# /igloo/utils/busybox mkdir -p /igloo/pfs/real/$f +# done +/igloo/utils/busybox mkdir -p /sys /dev/ /proc +/igloo/utils/busybox mount -t sysfs sysfs /sys +/igloo/utils/busybox mount -t proc proc /proc +/igloo/utils/busybox mount -t devtmpfs devtmpfs /dev -# Make hyperfs in /igloo/pfs/fake -/igloo/utils/busybox mkdir -p /igloo/pfs/fake -/igloo/utils/busybox rm -rf /dev # Remove /dev provided by firmware -/igloo/utils/busybox ln -s /igloo/pfs/real/dev /dev # Temp /dev symlink for FUSE -/igloo/utils/busybox mount -t hyperfs hyperfs /igloo/pfs/fake -o passthrough_path=/igloo/pfs/real -/igloo/utils/busybox rm /dev +# # Make hyperfs in /igloo/pfs/fake +# /igloo/utils/busybox mkdir -p /igloo/pfs/fake +# /igloo/utils/busybox rm -rf /dev # Remove /dev provided by firmware +# /igloo/utils/busybox ln -s /igloo/pfs/real/dev /dev # Temp /dev symlink for FUSE +# /igloo/utils/busybox mount -t hyperfs hyperfs /igloo/pfs/fake -o passthrough_path=/igloo/pfs/real +# /igloo/utils/busybox rm /dev -# Bind /sys,/proc,/dev to fake dirs -for f in sys proc dev; do - /igloo/utils/busybox mkdir -p /$f - /igloo/utils/busybox mount --bind /igloo/pfs/fake/$f /$f -done +# # Bind /sys,/proc,/dev to fake dirs +# for f in sys proc dev; do +# /igloo/utils/busybox mkdir -p /$f +# /igloo/utils/busybox mount --bind /igloo/pfs/fake/$f /$f +# done diff --git a/src/resources/source.d/11_coredump_filter.sh b/src/resources/source.d/11_coredump_filter.sh index 2b34d05f6..08ffdfded 100644 --- a/src/resources/source.d/11_coredump_filter.sh +++ b/src/resources/source.d/11_coredump_filter.sh @@ -1 +1 @@ -echo 0xFFFFFFFF > /igloo/pfs/real/proc/1/coredump_filter \ No newline at end of file +echo 0xFFFFFFFF > /proc/1/coredump_filter \ No newline at end of file diff --git a/src/resources/source.d/40_mount_shared_dir.sh b/src/resources/source.d/40_mount_shared_dir.sh index de4a757f5..8a00c060f 100644 --- a/src/resources/source.d/40_mount_shared_dir.sh +++ b/src/resources/source.d/40_mount_shared_dir.sh @@ -12,9 +12,9 @@ if [ ! -z "${SHARED_DIR}" ]; then /igloo/utils/busybox chmod -R 1777 /igloo/shared/core_dumps # Make sure the underlying file is overwritten and not a hyperfs pseudofile at that path. # One might want to make `/proc/sys/kernel/core_pattern` a pseudofile to prevent the guest from overwriting it. - /igloo/utils/busybox echo '/igloo/shared/core_dumps/core_%e.%p' > /igloo/pfs/real/proc/sys/kernel/core_pattern + /igloo/utils/busybox echo '/igloo/shared/core_dumps/core_%e.%p' > /proc/sys/kernel/core_pattern # 2 all processes dump core when possible. The core dump is owned by the current user and no security is applied. This is intended for system debugging situations only. Ptrace is unchecked. This is insecure as it allows regular users to examine the memory contents of privileged processes. # https://sysctl-explorer.net/fs/suid_dumpable/ - /igloo/utils/busybox echo 2 > /igloo/pfs/real/proc/sys/fs/suid_dumpable + /igloo/utils/busybox echo 2 > /proc/sys/fs/suid_dumpable ulimit -c unlimited fi