diff --git a/Dockerfile b/Dockerfile index 3cb93bf9e..2adcf90f2 100644 --- a/Dockerfile +++ b/Dockerfile @@ -271,6 +271,7 @@ RUN --mount=type=cache,target=/root/.cache/pip \ zstandard \ pdoc \ numpy \ + dwarffi \ ratarmountcore[full] FROM python_builder AS version_generator diff --git a/pyplugins/apis/kffi.py b/pyplugins/apis/kffi.py index 8f9d740ed..83ba68d50 100644 --- a/pyplugins/apis/kffi.py +++ b/pyplugins/apis/kffi.py @@ -33,12 +33,13 @@ yield from plugins.kffi.write_kernel_memory(0xffff888000000000, b"\x90\x90\x90\x90") """ -import functools import inspect from os.path import isfile, join, realpath from typing import Any, Generator, Iterator, Optional, Tuple, Union -from wrappers.ctypes_wrap import Ptr, VtypeJsonGroup +from dwarffi.instances import Ptr +from dwarffi.dffi import DFFI + from wrappers.generic import Wrapper from wrappers.ptregs_wrap import get_pt_regs_wrapper @@ -77,11 +78,10 @@ def __init__(self) -> None: self.logger.debug(f"Loading ISF file: {self.isf}") self.igloo_ko_isf = realpath(join(kernel, f"../igloo.ko.{arch}.json.xz")) - self.ffi = VtypeJsonGroup([self.igloo_ko_isf, self.isf]) + self.ffi = DFFI([self.igloo_ko_isf, self.isf]) self._tramp_callbacks = {} self._tramp_addresses = {} self.tramp_init = False - self._type_cache = {} def __init_tramp_functionality(self): if self.tramp_init: @@ -97,12 +97,6 @@ def __init_tramp_functionality(self): self.portal.register_interrupt_handler( "kffi", self._tramp_interrupt_handler) - def _get_type(self, type_name: str) -> Any: - if type_name in self._type_cache: - return self._type_cache[type_name] - self._type_cache[type_name] = self.ffi.get_type(type_name) - return self._type_cache[type_name] - def new(self, type_: str) -> Any: """ Create a new instance of a type. @@ -113,15 +107,12 @@ def new(self, type_: str) -> Any: Returns: Any: Instance of the type, or None if type not found. """ - t = self._get_type(type_) - if not t: + try: + return self.ffi.new(type_) + except (KeyError, ValueError): return None - size = t.size - buf = b"\x00" * size - return self.ffi.create_instance(t, buf) - def from_buffer(self, type_: str, buf: bytes, - instance_offset_in_buffer: int = 0) -> Any: + def from_buffer(self, type_: str, buf: bytes, instance_offset_in_buffer: int = 0) -> Any: """ Create an instance of a type from a buffer. @@ -133,8 +124,9 @@ def from_buffer(self, type_: str, buf: bytes, Returns: Any: Instance of the type. """ - t = self._get_type(type_) - return self.ffi.create_instance(t, buf, instance_offset_in_buffer) + """Create an instance of a type from a buffer.""" + # Ensure we pass a bytearray to dwarffi + return self.ffi.from_buffer(type_, bytearray(buf), offset=instance_offset_in_buffer) def get_field_casted(self, struct: Any, field: str) -> Any: """ @@ -147,7 +139,7 @@ def get_field_casted(self, struct: Any, field: str) -> Any: Any: Field value, or None if error occurs. """ try: - return self.panda.ffi.cast(struct._instance_type_def.fields[field].type_info["name"], getattr(struct, field)) + return self.ffi.cast(struct._instance_type_def.fields[field].type_info["name"], getattr(struct, field)) except Exception as e: self.logger.error(f"Error casting field {field} of struct {struct}: {e}") return None @@ -164,14 +156,14 @@ def read_type_panda(self, cpu: Any, addr: int, type_: str) -> Any: Returns: Any: Instance of the type, or None if read fails. """ - t = self._get_type(type_) - if not t: + size = self.ffi.sizeof(type_) + if not size: return None - buf = plugins.mem.read_bytes_panda(cpu, addr, t.size) + buf = plugins.mem.read_bytes_panda(cpu, addr, size) if not buf: self.logger.error(f"Failed to read bytes from {addr:#x}") return None - return self.ffi.create_instance(t, buf) + return self.ffi.from_buffer(type_, bytearray(buf), address=addr) def read_type(self, addr: int, type_: str) -> Generator[Any, Any, Any]: """ @@ -184,18 +176,16 @@ def read_type(self, addr: int, type_: str) -> Generator[Any, Any, Any]: Returns: Any: Instance of the type, or None if read fails. """ - t = self._get_type(type_) - if not t: + size = self.ffi.sizeof(type_) + if not size: return None if isinstance(addr, Ptr): addr = addr.address - buf = yield from plugins.mem.read_bytes(addr, t.size) + buf = yield from plugins.mem.read_bytes(addr, size) if not buf: self.logger.error(f"Failed to read bytes from {addr:#x}") return None - instance = self.ffi.create_instance(t, buf) - if not hasattr(instance, "_address"): - setattr(instance, "_address", addr) + instance = self.ffi.from_buffer(type_, buf, address=addr) return instance def deref(self, ptr: Ptr) -> Generator[Any, Any, Any]: @@ -211,7 +201,7 @@ def deref(self, ptr: Ptr) -> Generator[Any, Any, Any]: if ptr.address == 0: self.logger.error(f"Pointer address is 0: {ptr}") return None - val = yield from self.read_type(ptr.address, ptr._subtype_info.get("name")) + val = yield from self.read_type(ptr.address, ptr.points_to_type_name) return val def ref(self, thing: Any) -> Optional[int]: @@ -224,11 +214,7 @@ def ref(self, thing: Any) -> Optional[int]: Returns: int: The address, or None if no address attribute. """ - if hasattr(thing, "address"): - return thing.address - if hasattr(thing, "_address"): - return thing._address - return None + return self.ffi.addressof(thing) def get_enum_dict(self, enum_name: str) -> Wrapper: """ @@ -240,13 +226,12 @@ def get_enum_dict(self, enum_name: str) -> Wrapper: Returns: Wrapper: Wrapper containing enum constants. """ - enum = self.ffi.get_enum(enum_name) - if not enum: + enum = self.ffi.get_type(enum_name) + if not enum or not hasattr(enum, "constants"): self.logger.error(f"Enum {enum_name} not found in ISF") return {} return Wrapper(enum.constants) - @functools.lru_cache def get_struct_size(self, struct_name: str) -> Optional[int]: """ Get the size of a struct. @@ -257,9 +242,10 @@ def get_struct_size(self, struct_name: str) -> Optional[int]: Returns: Optional[int]: Size of the struct, or None if not found. """ - struct = self.ffi.get_user_type(struct_name) - if struct: - return struct.size + try: + return self.ffi.sizeof(struct_name) + except (KeyError, ValueError): + return None def sizeof(self, struct_name: str) -> Optional[int]: """ @@ -283,9 +269,13 @@ def get_function_address(self, function: str) -> Optional[int]: Returns: Optional[int]: Address of the function, or None if not found. """ - sym = self.ffi.get_symbol(function) - if sym: + sym = self.ffi.get_symbol(function, path=self.igloo_ko_isf) + if sym and hasattr(sym, "address") and sym.address not in [None, 0]: + return sym.address + sym = self.ffi.get_symbol(function, path=self.isf) + if sym and hasattr(sym, "address") and sym.address not in [None, 0]: return sym.address + return None def _fixup_igloo_module_baseaddr(self, addr): self.ffi.vtypejsons[self.igloo_ko_isf].shift_symbol_addresses(addr) @@ -326,7 +316,7 @@ def _prepare_ffi_call(self, func_ptr: int, args: list, func_name: str = None) -> kind = param_type.get("kind") # Unsigned base type: convert negative ints if kind == "base": - base_type = self.ffi.get_base_type(param_type.get("name")) + base_type = self.ffi.get_type(param_type.get("name")) if base_type and base_type.signed is False and isinstance(arg, int) and arg < 0: arg = arg % (1 << (base_type.size * 8)) # Pointer: allow int or Ptr @@ -364,15 +354,15 @@ def _prepare_ffi_call(self, func_ptr: int, args: list, func_name: str = None) -> total_bytes += len(b) elif type(arg).__name__ == 'BoundTypeInstance': if not hasattr(arg, 'address'): - to_write = arg.to_bytes() + to_write = bytes(self.ffi.buffer(arg)) raw_addr = yield from self.kmalloc(len(to_write) + 64) if not raw_addr: raise RuntimeError("Failed to allocate kernel memory for BoundTypeInstance") aligned_addr = (raw_addr + 63) & ~63 yield from plugins.mem.write_bytes(aligned_addr, to_write) boundtype_ptrs[i] = aligned_addr - elif hasattr(arg, 'to_bytes') and hasattr(arg, 'size'): - b = arg.to_bytes() + elif hasattr(arg, '__bytes__'): + b = bytes(arg) arg_bytes.append(b) arg_ptr_indices.append((i, total_bytes, len(b))) total_bytes += len(b) @@ -405,7 +395,9 @@ def _prepare_ffi_call(self, func_ptr: int, args: list, func_name: str = None) -> ffi_call.args[i] = boundtype_ptrs[i] else: raise TypeError(f"Unsupported argument type for FFI: {type(arg)}") - return ffi_call.to_bytes(), kmem_addr, func_typeinfo + + # ZERO-COPY UPGRADE: Extract the FFI portal structure dynamically via memoryview slice + return bytes(self.ffi.buffer(ffi_call)), kmem_addr, func_typeinfo def call_kernel_function( self, func: Union[int, str], *args: Any) -> Generator[Any, Any, Any]: @@ -462,24 +454,26 @@ def call_kernel_function( kind = ret_type.get("kind") name = ret_type.get("name") if kind == "base": - base_type = self.ffi.get_base_type(name) + base_type = self.ffi.get_type(name) if base_type: # Unsigned fixup if base_type.signed is False and result < 0: result = result % (1 << (base_type.size * 8)) # Convert to correct Python type - if base_type.kind == "int" or base_type.kind == "pointer": + if base_type.kind in ("int", "pointer"): result = int(result) elif base_type.kind == "float": result = float(result) elif base_type.kind == "bool": result = bool(result) elif kind == "enum": - enum_type = self.ffi.get_enum(name) - if enum_type: - result = enum_type and enum_type.base and self.ffi.get_base_type(enum_type.base) - if enum_type: - result = self.ffi.create_instance(enum_type, result.to_bytes(enum_type.size, 'little'))._value + try: + enum_size = self.ffi.sizeof(name) + # Bind bytearray and extract the bound EnumInstance underlying value dynamically + buf = bytearray(result.to_bytes(enum_size, 'little')) + result = self.ffi.from_buffer(name, buf)._value + except (KeyError, ValueError): + pass elif kind == "pointer": # Return a Ptr object ptr_type = ret_type.get("subtype") @@ -489,8 +483,6 @@ def call_kernel_function( struct_type = name if result != 0: val = yield from self.read_type(result, struct_type) - if not hasattr(val, "_address"): - setattr(val, "_address", result) result = val else: result = None diff --git a/pyplugins/apis/syscalls.py b/pyplugins/apis/syscalls.py index 5dd4a86b4..820d379d3 100644 --- a/pyplugins/apis/syscalls.py +++ b/pyplugins/apis/syscalls.py @@ -22,6 +22,39 @@ ] +class SyscallEvent: + __slots__ = ('_sce', 'name') + + def __init__(self, sce): + self._sce = sce + + # 1. Fast C-string extraction natively supported by dwarffi. + # Calling bytes() on the array field does a direct buffer slice + # (equivalent to your old start:start+size math) without the boilerplate. + raw_bytes = bytes(sce.syscall_name) + + # 2. Parse the C-string + self.name = raw_bytes.split(b'\x00', 1)[0].decode('utf-8', errors='replace') + + def __getattr__(self, attr): + # 3. Transparently pass through any standard field accesses + # (e.g., event.orig_x0) to the underlying dwarffi instance. + return getattr(self._sce, attr) + + def __setattr__(self, attr, value): + # If the attribute belongs to the wrapper, set it locally + if attr in self.__slots__: + object.__setattr__(self, attr, value) + else: + # Otherwise, forward the write to the underlying dwarffi instance + setattr(self._sce, attr, value) + + @property + def size(self) -> int: + # Expose the size from the underlying dwarffi BoundTypeInstance + return self._sce._instance_type_def.size + + class ValueFilter: """ ValueFilter @@ -499,55 +532,6 @@ def _syscall_interrupt_handler(self) -> bool: if func_name: self._name_to_hook_ptrs[func_name].append(hook_ptr) - ''' - On repeated calls to the same syscall in portal we produce new - syscall_event objects. However, it doesn't update the version of the object - that the function has. This means that when it sets values we have a - different object and can fail. - - So we keep the original syscall_event object in a dictionary and check if the - sequence number is the same. If it is we use the original object. - ''' - - def _get_syscall_event(self, cpu: int, arg: int) -> Any: - """ - Retrieve the syscall event object from the guest. - - Parameters - ---------- - cpu : int - CPU id. - arg : int - Argument pointer. - - Returns - ------- - Any - The syscall event object. - """ - sce = plugins.kffi.read_type_panda(cpu, arg, "syscall_event") - - # 1. Get field metadata - field_def = sce._instance_type_def.fields["syscall_name"] - - # 2. Calculate absolute start and end positions in the buffer - start = sce._instance_offset + field_def.offset - - # We need the size of the array to slice correctly. - # The accessor helper can calculate this from the type info. - size = sce._instance_vtype_accessor.get_type_size(field_def.type_info) - - # 3. Slice the buffer directly (Fast) - # We access the protected _instance_buffer directly for speed - name_bytes = sce._instance_buffer[start: start + size] - - # 4. Parse the C-string (Fast) - # Split at the first null byte, take the left side, and decode - sce.name = name_bytes.split(b'\x00', 1)[0].decode( - 'utf-8', errors='replace') - - return sce - def _get_proto(self, cpu: int, sce: Any, on_all: bool) -> SyscallPrototype: """ Get the syscall prototype for a given event. @@ -605,19 +589,8 @@ def _get_proto(self, cpu: int, sce: Any, on_all: bool) -> SyscallPrototype: # If still not found, create a new generic prototype with unknown args if not proto: - # Generate generic argument names (unknown1, unknown2, etc.) - generic_args = [] - for i in range(6): # Most syscalls have 6 or fewer args - arg_name = f"unknown{i+1}" - generic_args.append(("int", arg_name)) - - # Create new prototype with the appropriate name and generic args - proto = SyscallPrototype( - name=f'sys_{cleaned_name}', - args=generic_args - ) - - # Add to our table for future lookups + generic_args = [("int", f"unknown{i+1}") for i in range(6)] + proto = SyscallPrototype(name=f'sys_{cleaned_name}', args=generic_args) self._syscall_info_table[cleaned_name] = proto self.logger.error( f"Syscall {name} not registered {cleaned_name=}, created generic prototype with {len(generic_args)} args") @@ -722,7 +695,7 @@ def _syscall_event(self, cpu: int, is_enter: Optional[bool] = None) -> Any: arg = self.panda.arch.get_arg(cpu, 1, convention="syscall") # 1. Get Event Object (No bytes serialization yet) - sce = self._get_syscall_event(cpu, arg) + sce = SyscallEvent(plugins.kffi.read_type_panda(cpu, arg, "syscall_event")) hook_ptr = sce.hook.address if hook_ptr not in self._hooks: return @@ -730,15 +703,12 @@ def _syscall_event(self, cpu: int, is_enter: Optional[bool] = None) -> Any: # 2. Unpack Hook Data including read_only flag # _hooks now stores (on_all, handle, is_method, read_only, original_func) entry = self._hooks[hook_ptr] - on_all = entry[0] - f = entry[1] - is_method = entry[2] - read_only = entry[3] + on_all, f, is_method, read_only = entry[0], entry[1], entry[2], entry[3] - # 3. Calculate 'original' bytes ONLY if we might write back (not read_only) + # 1. ZERO-COPY UPGRADE original = None if not read_only: - original = sce.to_bytes() + original = bytes(plugins.kffi.ffi.buffer(sce)) # 4. Get Prototype (Optimized with Caching) proto = self._get_proto(cpu, sce, on_all) @@ -768,7 +738,7 @@ def _syscall_event(self, cpu: int, is_enter: Optional[bool] = None) -> Any: # 5. Write Back (Skipped if read_only) if not read_only: - new = sce.to_bytes() + new = bytes(plugins.kffi.ffi.buffer(sce)) if original != new: yield from plugins.mem.write_bytes(arg, new) @@ -817,11 +787,8 @@ def _register_syscall_hook( sch.comm_filter_enabled = False # Handle complex argument filtering - arg_filters = hook_config.get("arg_filters", []) - if arg_filters is None: - arg_filters = [] - - for i in range(6): # IGLOO_SYSCALL_MAXARGS + arg_filters = hook_config.get("arg_filters", []) or [] + for i in range(6): # IGLOO_SYSCALL_MAXARGS if i < len(arg_filters) and arg_filters[i] is not None: arg_filter = arg_filters[i] if arg_filter.__class__.__name__ == 'ValueFilter': @@ -882,8 +849,8 @@ def _register_syscall_hook( sch.retval_filter.max_value = 0 sch.retval_filter.bitmask = 0 - as_bytes = sch.to_bytes() - + # 3. ZERO-COPY UPGRADE + as_bytes = bytes(plugins.kffi.ffi.buffer(sch)) # Send to kernel via portal result = yield PortalCmd(hop.HYPER_OP_REGISTER_SYSCALL_HOOK, size=len(as_bytes), data=as_bytes) self._hook_info[result] = hook_config diff --git a/pyplugins/apis/uprobes.py b/pyplugins/apis/uprobes.py index 6781618b5..1ba4a8917 100644 --- a/pyplugins/apis/uprobes.py +++ b/pyplugins/apis/uprobes.py @@ -186,7 +186,7 @@ def _uprobe_event(self, cpu: Any, is_enter: bool) -> Any: original_bytes = None if not read_only: - original_bytes = pt_regs.to_bytes()[:] + original_bytes = bytes(plugins.kffi.ffi.buffer(pt_regs_raw)) fn_to_call = f if not is_method else self._resolve_callback( f, is_method, hook_id) @@ -216,7 +216,7 @@ def _uprobe_event(self, cpu: Any, is_enter: bool) -> Any: return if not read_only: - new_bytes = pt_regs.to_bytes() + new_bytes = bytes(plugins.kffi.ffi.buffer(pt_regs_raw)) if original_bytes != new_bytes: plugins.mem.write_bytes_panda(cpu, ptregs_addr, new_bytes) return fn_ret @@ -343,7 +343,7 @@ def _register_uprobe(self, config: Dict[str, Any]) -> Iterator[Optional[int]]: else: reg.comm[0] = 0 - reg_bytes = reg.to_bytes() + reg_bytes = bytes(plugins.kffi.ffi.buffer(reg)) result = yield PortalCmd(hop.HYPER_OP_REGISTER_UPROBE, offset, len(reg_bytes), None, reg_bytes) if result is None: diff --git a/pyplugins/testing/ioctl_interaction_test.py b/pyplugins/testing/ioctl_interaction_test.py index 75df764f2..0755dbe67 100644 --- a/pyplugins/testing/ioctl_interaction_test.py +++ b/pyplugins/testing/ioctl_interaction_test.py @@ -81,8 +81,7 @@ def siocdevprivate(self, regs, proto, syscall, fd, op, arg): @plugins.syscalls.syscall("on_sys_ioctl_return", arg_filters=[None, 0x89f1]) def ioctl_ret(self, regs, proto, syscall, fd, op, arg): ifreq = yield from plugins.kffi.read_type(arg, "ifreq") - interface = bytes(ifreq.ifr_ifrn.ifrn_name).decode( - "latin-1").rstrip("\x00") + interface = plugins.kffi.ffi.string(ifreq.ifr_ifrn.ifrn_name).decode("latin-1") self.logger.info(f"Interface: {interface}") esw_reg_ptr = ifreq.ifr_ifru.ifru_data.address off = yield from plugins.mem.read_int(esw_reg_ptr) diff --git a/pyplugins/testing/kffi_test.py b/pyplugins/testing/kffi_test.py index a455e1cab..c4b7626e6 100644 --- a/pyplugins/testing/kffi_test.py +++ b/pyplugins/testing/kffi_test.py @@ -70,11 +70,12 @@ def test_kffi(self, regs, proto, syscall, fd, op, arg): exe_file_ptr = yield from plugins.kffi.call_kernel_function("get_task_exe_file", task) exe_file = yield from plugins.kffi.read_type(exe_file_ptr, "file") buf = yield from plugins.kffi.kmalloc(64) - f_path_ptr = plugins.kffi.ref(exe_file)+exe_file.f_path.offset + f_path_ptr = plugins.kffi.ffi.addressof(exe_file, "f_path").address + # Testing types with a known result inode = yield from plugins.kffi.read_type(exe_file.f_inode, "inode") i_opflags = plugins.kffi.get_field_casted(inode, "i_opflags") - assert self.panda.ffi.typeof("unsigned short") == self.panda.ffi.typeof(i_opflags) + assert 'short unsigned int' == plugins.kffi.ffi.typeof(i_opflags).name # char *d_path(const struct path *path, char *buf, int buflen) path = yield from plugins.kffi.call_kernel_function("d_path", f_path_ptr, buf, 64) exe_path = yield from plugins.mem.read_str(path) diff --git a/pyplugins/wrappers/ctypes_wrap.py b/pyplugins/wrappers/ctypes_wrap.py deleted file mode 100644 index 16aac86c7..000000000 --- a/pyplugins/wrappers/ctypes_wrap.py +++ /dev/null @@ -1,1385 +0,0 @@ -""" -DWARF2JSON ISF Loader and Type Binding Utilities -================================================ - -This module provides classes and functions to parse the JSON output -from the dwarf2json tool (https://github.com/volatilityfoundation/dwarf2json), -specifically its Intermediate Symbol File (ISF) format. - -Key Features: -------------- - -- Fast parsing with ujson if available -- Lazy loading and caching of types, enums, and symbols -- .json.xz compressed file support -- Memory-efficient type objects using __slots__ -- Symbol lookup by address -- Flexible type input for create_instance (including base/enum types) -- Field and value writing, including array element assignment -- to_bytes() method for serializing instances -- Generic get_type() method for resolving type names - -How to Use: ------------ - -1. Load an ISF file (JSON or JSON.XZ): - - from wrappers.ctypes_wrap import load_isf_json - isf = load_isf_json("vmlinux.isf.json.xz") - -2. Access type definitions: - - my_struct_def = isf.get_user_type("my_struct") - int_def = isf.get_base_type("int") - enum_def = isf.get_enum("my_enum") - -3. Create an instance of a type bound to a buffer: - - buf = bytearray(my_struct_def.size) - instance = isf.create_instance("my_struct", buf) - print(instance.field1) - instance.field1 = 42 - print(instance.to_bytes()) - -4. Work with arrays and enums: - - arr = instance.array_field - arr[0] = 123 - print(arr[0]) - print(instance.enum_field.name) - -5. Lookup symbols by address: - - syms = isf.get_symbols_by_address(0xffffffffdeadbeef) - for sym in syms: - print(sym.name, sym.address) - -6. Create and manipulate base/enum type instances: - - int_buf = bytearray(int_def.size) - int_instance = isf.create_instance("int", int_buf) - int_instance._value = 123 - print(int_instance._value) - -Command Line Usage: -------------------- - - python -m wrappers.ctypes_wrap vmlinux.isf.json.xz [options] - -Options: - --find-symbol-at ADDRESS Find symbols at a given address - --get-type TYPE_NAME Test the generic get_type method - --test-write Test writing to struct fields - --test-to-bytes Test to_bytes() on an instance - --test-array-write Test writing to array elements - --test-base-enum-instance Test creating base/enum type instances - -v, --verbose Print detailed info - -See the __main__ section for more CLI details and test examples. - -Main Classes and Functions: ---------------------------- - -- load_isf_json(path_or_file): Load and parse an ISF file, returning a VtypeJson accessor. -- VtypeJson: Main accessor for types, enums, symbols, and instance creation. -- BoundTypeInstance: Represents a struct/union/base/enum instance bound to a buffer. -- BoundArrayView: View for array fields, supporting element access and assignment. -- EnumInstance: Wrapper for enum values, supporting name and value access. -""" - -try: - import ujson as json - _JSON_LIB_USED = "ujson" -except ImportError: - import json - _JSON_LIB_USED = "json" - -import base64 -import struct # For unpacking data from buffers -import argparse # For command-line argument parsing -import sys # For exiting script -import lzma # For handling .xz compressed files -from typing import List, Dict, Any, Optional, Union - - -class SourceMetadata: - """Represents source file metadata within the ISF.""" - __slots__ = 'kind', 'name', 'hash_type', 'hash_value' - - def __init__(self, data: Dict[str, Any]): - self.kind: Optional[str] = data.get("kind") - self.name: Optional[str] = data.get("name") - self.hash_type: Optional[str] = data.get("hash_type") - self.hash_value: Optional[str] = data.get("hash_value") - - def __repr__(self) -> str: - return f"" - - -class UnixMetadata: - """Represents Unix-specific (Linux/Mac) metadata within the ISF.""" - __slots__ = 'symbols', 'types' - - def __init__(self, data: Dict[str, Any]): - self.symbols: List[SourceMetadata] = [ - SourceMetadata(s_data) for s_data in data.get("symbols", []) if s_data - ] - self.types: List[SourceMetadata] = [ - SourceMetadata(t_data) for t_data in data.get("types", []) if t_data - ] - - def __repr__(self) -> str: - return f"" - - -class VtypeMetadata: - """Represents the top-level metadata in the ISF.""" - __slots__ = 'linux', 'mac', 'producer', 'format_version' - - def __init__(self, data: Dict[str, Any]): - self.linux: Optional[UnixMetadata] = UnixMetadata( - data["linux"]) if data.get("linux") else None - self.mac: Optional[UnixMetadata] = UnixMetadata( - data["mac"]) if data.get("mac") else None - self.producer: Dict[str, str] = data.get("producer", {}) - self.format_version: Optional[str] = data.get("format") - - def __repr__(self) -> str: - return f"" - - -class VtypeBaseType: - """Represents a base type definition in the ISF (e.g., int, char).""" - __slots__ = 'name', 'size', 'signed', 'kind', 'endian', '_compiled_struct' - - def __init__(self, name: str, data: Dict[str, Any]): - self.name: str = name - self.size: Optional[int] = data.get("size") - self.signed: Optional[bool] = data.get("signed") - self.kind: Optional[str] = data.get("kind") - self.endian: Optional[str] = data.get("endian") - self._compiled_struct: Optional[struct.Struct] = None - - def get_compiled_struct(self) -> Optional[struct.Struct]: - if hasattr(self, '_compiled_struct') and self._compiled_struct is not None: - if self.size == 0 and self._compiled_struct is None: - return None - if self.size != 0: - return self._compiled_struct - elif self.size == 0: - self._compiled_struct = None - return None - - if self.size is None or self.kind is None or self.endian is None: - return None - if self.size == 0 and self.kind == "void": - self._compiled_struct = None - return None - - endian_char = '<' if self.endian == 'little' else '>' - fmt_char: Optional[str] = None - - if self.kind == "int" or self.kind == "pointer": - if self.size == 1: - fmt_char = 'b' if self.signed else 'B' - elif self.size == 2: - fmt_char = 'h' if self.signed else 'H' - elif self.size == 4: - fmt_char = 'i' if self.signed else 'I' - elif self.size == 8: - fmt_char = 'q' if self.signed else 'Q' - elif self.kind == "char": - if self.size == 1: - fmt_char = 'b' if self.signed else 'B' - elif self.kind == "bool": - if self.size == 1: - fmt_char = '?' - elif self.kind == "float": - if self.size == 4: - fmt_char = 'f' - elif self.size == 8: - fmt_char = 'd' - - if fmt_char: - try: - self._compiled_struct = struct.Struct(endian_char + fmt_char) - except struct.error: - self._compiled_struct = None - else: - self._compiled_struct = None - return self._compiled_struct - - def __repr__(self) -> str: - return f"" - - -class VtypeStructField: - """Represents a field within a user-defined struct or union.""" - __slots__ = 'name', 'type_info', 'offset', 'anonymous' - - def __init__(self, name: str, data: Dict[str, Any]): - self.name: str = name - self.type_info: Dict[str, Any] = data.get("type", {}) - self.offset: Optional[int] = data.get("offset") - self.anonymous: Optional[bool] = data.get("anonymous", False) - - def __repr__(self) -> str: - type_kind = self.type_info.get('kind', 'unknown') - type_name_val = self.type_info.get('name', '') - name_part = f" TypeName='{type_name_val}'" if type_name_val else "" - return f"" - - -class VtypeUserType: - """Represents a user-defined type (struct or union) in the ISF.""" - __slots__ = 'name', 'size', 'fields', 'kind' - - def __init__(self, name: str, data: Dict[str, Any]): - self.name: str = name - self.size: Optional[int] = data.get("size") - self.fields: Dict[str, VtypeStructField] = { - f_name: VtypeStructField(f_name, f_data) for f_name, f_data in data.get("fields", {}).items() if f_data - } - self.kind: Optional[str] = data.get("kind") # "struct" or "union" - - def __repr__(self) -> str: - return f"" - - -class VtypeEnum: - """Represents an enumeration type in the ISF.""" - __slots__ = 'name', 'size', 'base', 'constants', '_val_to_name' - - def __init__(self, name: str, data: Dict[str, Any]): - self.name: str = name - self.size: Optional[int] = data.get("size") - self.base: Optional[str] = data.get("base") - self.constants: Dict[str, int] = data.get("constants", {}) - self._val_to_name: Optional[Dict[int, str]] = None - - def get_name_for_value(self, value: int) -> Optional[str]: - if self._val_to_name is None: - self._val_to_name = {v: k for k, v in self.constants.items()} - return self._val_to_name.get(value) - - def __repr__(self) -> str: - return f"" - - -class VtypeSymbol: - """Represents a symbol (variable or function) in the ISF.""" - __slots__ = 'name', 'type_info', 'address', 'constant_data' - - def __init__(self, name: str, data: Dict[str, Any]): - self.name: str = name - self.type_info: Optional[Dict[str, Any]] = data.get("type") - self.address: Optional[int] = data.get("address") - self.constant_data: Optional[str] = data.get("constant_data") - - def get_decoded_constant_data(self) -> Optional[bytes]: - if self.constant_data: - try: - return base64.b64decode(self.constant_data) - except Exception: - return None - return None - - def __repr__(self) -> str: - type_kind = self.type_info.get( - 'kind', 'N/A') if self.type_info else 'N/A' - addr = f"{self.address:#x}" if self.address is not None else 'N/A' - return f"" - - -class BoundArrayView: - """A view into an array field of a BoundTypeInstance, allowing get/set of elements.""" - __slots__ = '_parent_instance', '_array_field_name', '_array_subtype_info', '_array_count', '_element_size', '_array_start_offset_in_parent' - - def __init__(self, parent_instance: 'BoundTypeInstance', array_field_name: str, - array_type_info: Dict[str, Any], array_start_offset_in_parent: int): - self._parent_instance = parent_instance - self._array_field_name = array_field_name # For error messages - self._array_subtype_info = array_type_info.get("subtype") - if self._array_subtype_info is None: - raise ValueError( - f"Array field '{array_field_name}' has no subtype information. type_info={array_type_info}") - self._array_count = array_type_info.get("count", 0) - - # Pre-calculate element size for efficiency - self._element_size = parent_instance._instance_vtype_accessor.get_type_size( - self._array_subtype_info) - if self._element_size is None: - raise ValueError( - f"Cannot determine element size for array '{array_field_name}'.\n" - f" Parent struct: {getattr(parent_instance, '_instance_type_name', None)}\n" - f" Array type_info: {array_type_info}\n" - f" Subtype info: {self._array_subtype_info}\n" - f" get_type_size(subtype_info) returned None.\n" - f" Check that the subtype is defined in the ISF and has a valid size.") - - self._array_start_offset_in_parent = array_start_offset_in_parent - - def _get_element_offset_in_parent_struct(self, index: int) -> int: - if not 0 <= index < self._array_count: - raise IndexError( - f"Array index {index} out of bounds for array '{self._array_field_name}' of size {self._array_count}.") - # type: ignore - return self._array_start_offset_in_parent + (index * self._element_size) - - def __getitem__(self, index: int) -> Any: - element_offset = self._get_element_offset_in_parent_struct(index) - # _read_data expects offset relative to parent struct start - return self._parent_instance._read_data( - self._array_subtype_info, - element_offset, - f"{self._array_field_name}[{index}]" - ) - - def __setitem__(self, index: int, value: Any): - element_offset = self._get_element_offset_in_parent_struct(index) - # _write_data expects offset relative to parent struct start - self._parent_instance._write_data( - self._array_subtype_info, - element_offset, - value, - f"{self._array_field_name}[{index}]" - ) - # Invalidate parent's cache for this array field, as its content (via this view) has changed. - if self._array_field_name in self._parent_instance._instance_cache: - del self._parent_instance._instance_cache[self._array_field_name] - - def __len__(self) -> int: - return self._array_count - - def __iter__(self): - for i in range(self._array_count): - yield self[i] - - def __repr__(self) -> str: - # Displaying all elements can be verbose for large arrays - # Consider showing first few and count, or just type and count - preview_count = min(self._array_count, 3) - items_preview = [repr(self[i]) for i in range(preview_count)] - if self._array_count > preview_count: - items_preview.append("...") - return f"" - - -class BoundTypeInstance: - """Represents an instance of a DWARF type bound to a memory buffer (bytearray for writability).""" - - def __init__(self, type_name: str, type_def: Union[VtypeUserType, VtypeBaseType, VtypeEnum], - buffer: bytearray, vtype_accessor: 'VtypeJson', - instance_offset_in_buffer: int = 0): - if not isinstance(buffer, bytearray): - raise TypeError( - "Internal Error: BoundTypeInstance expects a bytearray.") - self._instance_type_name = type_name - self._instance_type_def = type_def - self._instance_buffer = buffer - self._instance_vtype_accessor = vtype_accessor - self._instance_offset = instance_offset_in_buffer - self._instance_cache = {} - - @property - def _value(self) -> Any: - if isinstance(self._instance_type_def, VtypeUserType): - raise AttributeError( - f"'{self._instance_type_name}' is a struct/union and does not have a direct '._value' attribute. Access its fields instead.") - if isinstance(self._instance_type_def, VtypeBaseType): - base_type_def = self._instance_type_def - compiled_struct_obj = base_type_def.get_compiled_struct() - if base_type_def.size == 0: - return None - if compiled_struct_obj is None: - raise ValueError( - f"Cannot get compiled struct for base type '{base_type_def.name}'") - try: - return compiled_struct_obj.unpack_from(self._instance_buffer, self._instance_offset)[0] - except struct.error as e: - raise struct.error( - f"Error unpacking value for base type '{base_type_def.name}': {e}") - elif isinstance(self._instance_type_def, VtypeEnum): - enum_def = self._instance_type_def - if enum_def.base is None: - raise ValueError(f"Enum '{enum_def.name}' has no base type.") - base_type_def = self._instance_vtype_accessor.get_base_type( - enum_def.base) - if base_type_def is None: - raise ValueError( - f"Base type '{enum_def.base}' for enum '{enum_def.name}' not found.") - compiled_struct_obj = base_type_def.get_compiled_struct() - if compiled_struct_obj is None: - raise ValueError( - f"Cannot get compiled struct for enum base type '{enum_def.base}'.") - try: - int_val = compiled_struct_obj.unpack_from( - self._instance_buffer, self._instance_offset)[0] - return EnumInstance(enum_def, int_val) - except struct.error as e: - raise struct.error( - f"Error unpacking value for enum '{enum_def.name}': {e}") - else: - raise TypeError( - f"'._value' property not applicable to internal type: {type(self._instance_type_def).__name__}") - - @_value.setter - def _value(self, new_value: Any): - if isinstance(self._instance_type_def, VtypeUserType): - raise AttributeError( - f"Cannot set '._value' on a struct/union '{self._instance_type_name}'. Set its fields instead.") - if isinstance(self._instance_type_def, VtypeBaseType): - base_type_def = self._instance_type_def - compiled_struct_obj = base_type_def.get_compiled_struct() - if base_type_def.size == 0: - if new_value is not None: - raise ValueError("Cannot assign value to void type.") - return - if compiled_struct_obj is None: - raise ValueError( - f"Cannot get compiled struct for base type '{base_type_def.name}' to write value.") - # Handle negative values for unsigned types - if base_type_def.signed is False and isinstance(new_value, int) and new_value < 0: - new_value = new_value % (1 << (base_type_def.size * 8)) - try: - compiled_struct_obj.pack_into( - self._instance_buffer, self._instance_offset, new_value) - except struct.error as e: - raise struct.error( - f"Error packing value for base type '{base_type_def.name}': {e}") - elif isinstance(self._instance_type_def, VtypeEnum): - enum_def = self._instance_type_def - if enum_def.base is None: - raise ValueError( - f"Enum '{enum_def.name}' has no base type for writing.") - base_type_def = self._instance_vtype_accessor.get_base_type( - enum_def.base) - if base_type_def is None: - raise ValueError( - f"Base type '{enum_def.base}' for enum '{enum_def.name}' not found for writing.") - compiled_struct_obj = base_type_def.get_compiled_struct() - if compiled_struct_obj is None: - raise ValueError( - f"Cannot get compiled struct for enum base type '{enum_def.base}' for writing.") - int_val_to_write: int - if isinstance(new_value, EnumInstance): - int_val_to_write = new_value._value - elif isinstance(new_value, int): - int_val_to_write = new_value - elif isinstance(new_value, str): - found_val = enum_def.constants.get(new_value) - if found_val is None: - raise ValueError( - f"Enum constant name '{new_value}' not found in enum '{enum_def.name}'.") - int_val_to_write = found_val - else: - raise TypeError( - f"Cannot write type '{type(new_value)}' to enum instance. Expected EnumInstance, int, or str.") - try: - compiled_struct_obj.pack_into( - self._instance_buffer, self._instance_offset, int_val_to_write) - except struct.error as e: - raise struct.error( - f"Error packing value for enum '{enum_def.name}': {e}") - else: - raise TypeError( - f"'._value' property setter not applicable to internal type: {type(self._instance_type_def).__name__}") - if '_value' in self._instance_cache: - del self._instance_cache['_value'] - - def _read_data(self, field_type_info: Dict[str, Any], field_offset_in_struct: int, field_name_for_error: str) -> Any: - kind = field_type_info.get("kind") - name = field_type_info.get("name") - absolute_field_offset = self._instance_offset + field_offset_in_struct - - if kind == "base": - if name is None: - raise ValueError( - f"Base type for field '{field_name_for_error}' has no name.") - base_type_def = self._instance_vtype_accessor.get_base_type(name) - if base_type_def is None: - raise ValueError( - f"Base type '{name}' not found for field '{field_name_for_error}'.") - compiled_struct_obj = base_type_def.get_compiled_struct() - if base_type_def.size == 0: - return None - if compiled_struct_obj is None: - raise ValueError( - f"Cannot get compiled struct for base type '{name}'.") - try: - return compiled_struct_obj.unpack_from(self._instance_buffer, absolute_field_offset)[0] - except struct.error as e: - raise struct.error( - f"Error unpacking base type '{name}' for field '{field_name_for_error}': {e}") - - elif kind == "pointer": - ptr_base_type = self._instance_vtype_accessor.get_base_type( - "pointer") - if ptr_base_type is None: - raise ValueError("Base type 'pointer' definition not found.") - compiled_struct_obj = ptr_base_type.get_compiled_struct() - if compiled_struct_obj is None: - raise ValueError( - "Cannot get compiled struct for 'pointer' base type.") - try: - address = compiled_struct_obj.unpack_from( - self._instance_buffer, absolute_field_offset)[0] - return Ptr(address, field_type_info.get("subtype"), self._instance_vtype_accessor) - except struct.error as e: - raise struct.error( - f"Error unpacking pointer for field '{field_name_for_error}': {e}") - - elif kind == "array": - # Return a BoundArrayView instance instead of a Python list - return BoundArrayView(self, field_name_for_error, field_type_info, field_offset_in_struct) - - elif kind == "struct" or kind == "union": - if name is None: - raise ValueError( - f"User type for field '{field_name_for_error}' has no name.") - user_type_def = self._instance_vtype_accessor.get_user_type(name) - if user_type_def is None: - raise ValueError( - f"User type '{name}' not found for field '{field_name_for_error}'.") - return BoundTypeInstance(name, user_type_def, self._instance_buffer, self._instance_vtype_accessor, absolute_field_offset) - - elif kind == "enum": - if name is None: - raise ValueError( - f"Enum type for field '{field_name_for_error}' has no name.") - enum_def = self._instance_vtype_accessor.get_enum(name) - if enum_def is None: - raise ValueError( - f"Enum type '{name}' not found for field '{field_name_for_error}'.") - if enum_def.base is None: - raise ValueError(f"Enum '{name}' has no base type.") - base_type_def = self._instance_vtype_accessor.get_base_type( - enum_def.base) - if base_type_def is None: - raise ValueError( - f"Base type '{enum_def.base}' for enum '{name}' not found.") - compiled_struct_obj = base_type_def.get_compiled_struct() - if compiled_struct_obj is None: - raise ValueError( - f"Cannot get compiled struct for enum base type '{enum_def.base}'.") - int_val = compiled_struct_obj.unpack_from( - self._instance_buffer, absolute_field_offset)[0] - return EnumInstance(enum_def, int_val) - - elif kind == "bitfield": - bit_length = field_type_info.get("bit_length") - bit_position = field_type_info.get("bit_position") - underlying_type_info = field_type_info.get("type") - if None in [bit_length, bit_position, underlying_type_info]: - raise ValueError( - f"Bitfield '{field_name_for_error}' missing properties.") - underlying_base_name = underlying_type_info.get("name") - if underlying_base_name is None: - raise ValueError( - f"Bitfield '{field_name_for_error}' underlying type has no name.") - underlying_base_def = self._instance_vtype_accessor.get_base_type( - underlying_base_name) - if underlying_base_def is None or underlying_base_def.size is None: - raise ValueError( - f"Cannot get underlying type definition or size for bitfield '{field_name_for_error}'.") - compiled_struct_obj = underlying_base_def.get_compiled_struct() - if compiled_struct_obj is None: - raise ValueError( - f"Cannot get compiled struct for bitfield '{field_name_for_error}' underlying type.") - storage_unit_val = compiled_struct_obj.unpack_from( - self._instance_buffer, absolute_field_offset)[0] - mask = (1 << bit_length) - 1 - return (storage_unit_val >> bit_position) & mask - - elif kind == "function": - return f"" - elif kind == "void" and field_type_info.get("name") == "void": - return None - else: - raise ValueError( - f"Unsupported/invalid type kind '{kind}' for field '{field_name_for_error}'.") - - def _write_data(self, field_type_info: Dict[str, Any], field_offset_in_struct: int, - value_to_write: Any, field_name_for_error: str): - kind = field_type_info.get("kind") - name = field_type_info.get("name") - absolute_field_offset = self._instance_offset + field_offset_in_struct - - if kind == "base": - if name is None: - raise ValueError( - f"Base type for field '{field_name_for_error}' has no name.") - base_type_def = self._instance_vtype_accessor.get_base_type(name) - if base_type_def is None: - raise ValueError( - f"Base type '{name}' not found for field '{field_name_for_error}'.") - compiled_struct_obj = base_type_def.get_compiled_struct() - if base_type_def.size == 0: - if value_to_write is not None: - raise ValueError( - f"Cannot write value to void type field '{field_name_for_error}'.") - return - if compiled_struct_obj is None: - raise ValueError( - f"Cannot get compiled struct for base type '{name}' to write field '{field_name_for_error}'.") - # Handle negative values for unsigned types - if base_type_def.signed is False and isinstance(value_to_write, int) and value_to_write < 0: - value_to_write = value_to_write % (1 << (base_type_def.size * 8)) - # Handle wrapping for signed types - if base_type_def.signed is True and isinstance(value_to_write, int): - bits = base_type_def.size * 8 - max_signed = (1 << (bits - 1)) - 1 - min_signed = -(1 << (bits - 1)) - if value_to_write > max_signed: - value_to_write = value_to_write - (1 << bits) - if value_to_write < min_signed: - value_to_write = ((value_to_write + (1 << bits)) % (1 << bits)) + min_signed - try: - compiled_struct_obj.pack_into( - self._instance_buffer, absolute_field_offset, value_to_write) - except struct.error as e: - raise struct.error( - f"Error packing base type '{name}' for field '{field_name_for_error}': {e}") - - elif kind == "pointer": - ptr_base_type = self._instance_vtype_accessor.get_base_type( - "pointer") - if ptr_base_type is None: - raise ValueError( - "Base type 'pointer' definition not found for writing.") - compiled_struct_obj = ptr_base_type.get_compiled_struct() - if compiled_struct_obj is None: - raise ValueError( - "Cannot get compiled struct for 'pointer' base type for writing.") - address_to_write: int - if isinstance(value_to_write, Ptr): - address_to_write = value_to_write.address - elif isinstance(value_to_write, int): - address_to_write = value_to_write - else: - raise TypeError( - f"Cannot write type '{type(value_to_write)}' to pointer field '{field_name_for_error}'. Expected Ptr or int.") - try: - compiled_struct_obj.pack_into( - self._instance_buffer, absolute_field_offset, address_to_write) - except struct.error as e: - raise struct.error( - f"Error packing pointer for field '{field_name_for_error}': {e}") - - elif kind == "enum": - if name is None: - raise ValueError( - f"Enum type for field '{field_name_for_error}' has no name.") - enum_def = self._instance_vtype_accessor.get_enum(name) - if enum_def is None: - raise ValueError( - f"Enum type '{name}' not found for field '{field_name_for_error}'.") - if enum_def.base is None: - raise ValueError( - f"Enum '{name}' has no base type defined for writing.") - base_type_def = self._instance_vtype_accessor.get_base_type( - enum_def.base) - if base_type_def is None: - raise ValueError( - f"Base type '{enum_def.base}' for enum '{name}' not found for writing.") - compiled_struct_obj = base_type_def.get_compiled_struct() - if compiled_struct_obj is None: - raise ValueError( - f"Cannot get compiled struct for enum base type '{enum_def.base}' for writing.") - int_val_to_write: int - if isinstance(value_to_write, EnumInstance): - int_val_to_write = value_to_write._value - elif isinstance(value_to_write, int): - int_val_to_write = value_to_write - elif isinstance(value_to_write, str): - found_val = enum_def.constants.get(value_to_write) - if found_val is None: - raise ValueError( - f"Enum constant name '{value_to_write}' not found in enum '{name}'.") - int_val_to_write = found_val - else: - raise TypeError( - f"Cannot write type '{type(value_to_write)}' to enum field '{field_name_for_error}'. Expected EnumInstance, int, or str.") - try: - compiled_struct_obj.pack_into( - self._instance_buffer, absolute_field_offset, int_val_to_write) - except struct.error as e: - raise struct.error( - f"Error packing enum '{name}' for field '{field_name_for_error}': {e}") - - elif kind == "bitfield": - bit_length = field_type_info.get("bit_length") - bit_position = field_type_info.get("bit_position") - underlying_type_info = field_type_info.get("type") - if None in [bit_length, bit_position, underlying_type_info]: - raise ValueError( - f"Bitfield '{field_name_for_error}' missing properties for writing.") - underlying_base_name = underlying_type_info.get("name") - if underlying_base_name is None: - raise ValueError( - f"Bitfield '{field_name_for_error}' underlying type has no name for writing.") - underlying_base_def = self._instance_vtype_accessor.get_base_type( - underlying_base_name) - if underlying_base_def is None or underlying_base_def.size is None: - raise ValueError( - f"Cannot get underlying type definition or size for bitfield '{field_name_for_error}' for writing.") - compiled_struct_obj = underlying_base_def.get_compiled_struct() - if compiled_struct_obj is None: - raise ValueError( - f"Cannot get compiled struct for bitfield '{field_name_for_error}' underlying type for writing.") - current_storage_val = compiled_struct_obj.unpack_from( - self._instance_buffer, absolute_field_offset)[0] - if not isinstance(value_to_write, int): - raise TypeError( - f"Value for bitfield '{field_name_for_error}' must be an integer, got {type(value_to_write)}.") - mask = (1 << bit_length) - 1 - value_to_set = value_to_write & mask - new_storage_val = (current_storage_val & ~( - mask << bit_position)) | (value_to_set << bit_position) - try: - compiled_struct_obj.pack_into( - self._instance_buffer, absolute_field_offset, new_storage_val) - except struct.error as e: - raise struct.error( - f"Error packing bitfield '{field_name_for_error}': {e}") - - elif kind == "array" or kind == "struct" or kind == "union": - raise NotImplementedError( - f"Direct assignment to field '{field_name_for_error}' of type '{kind}' is not supported. Modify elements or nested fields individually.") - else: - raise TypeError( - f"Cannot write to field '{field_name_for_error}' of unhandled type kind '{kind}'.") - - def __getattr__(self, name: str) -> Any: - if name.startswith('_instance_'): - return super().__getattribute__(name) - - if isinstance(self._instance_type_def, VtypeUserType): - if name in self._instance_cache: - return self._instance_cache[name] - field_def = self._instance_type_def.fields.get(name) - if field_def is None: - raise AttributeError( - f"'{self._instance_type_name}' (struct/union) has no attribute '{name}'") - if field_def.offset is None: - raise ValueError( - f"Field '{name}' in '{self._instance_type_name}' has no offset.") - try: - val = self._read_data( - field_def.type_info, field_def.offset, name) - # Cache the BoundArrayView if an array is accessed - if field_def.type_info.get("kind") in ["struct", "union", "array"]: - self._instance_cache[name] = val - return val - except (struct.error, ValueError) as e: - raise AttributeError(f"Error processing field '{name}': {e}") - - raise AttributeError( - f"Type '{self._instance_type_name}' (kind: {self._instance_type_def.__class__.__name__}) has no attribute '{name}'. Use '._value' for base/enum types.") - - def __setattr__(self, name: str, new_value: Any): - if name.startswith('_instance_'): - super().__setattr__(name, new_value) - return - - if isinstance(self._instance_type_def, VtypeUserType): - field_def = self._instance_type_def.fields.get(name) - if field_def is None: - super().__setattr__(name, new_value) - return - if field_def.offset is None: - raise ValueError( - f"Field '{name}' in '{self._instance_type_name}' has no offset, cannot write.") - try: - # If setting an array field, the user should be using the BoundArrayView's __setitem__ - # This __setattr__ is for regular fields. - if field_def.type_info.get("kind") == "array": - raise NotImplementedError( - f"Direct assignment to array field '{name}' is not supported. Access elements like '{name}[index] = value'.") - - self._write_data(field_def.type_info, - field_def.offset, new_value, name) - if name in self._instance_cache: - del self._instance_cache[name] - return - except (struct.error, ValueError, TypeError, NotImplementedError) as e: - raise AttributeError(f"Error setting field '{name}': {e}") - - raise AttributeError( - f"Cannot set attribute '{name}' on type '{self._instance_type_name}' (kind: {self._instance_type_def.__class__.__name__}). Use '._value' for base/enum types.") - - def to_bytes(self) -> bytes: - size = self._instance_type_def.size - if size is None: - raise ValueError( - f"Cannot determine size for type '{self._instance_type_name}' (kind: {self._instance_type_def.__class__.__name__}) to get bytes.") - if size == 0: - return b'' - start = self._instance_offset - end = start + size - return bytes(self._instance_buffer[start:end]) - - @property - def offset(self) -> int: - """Returns the offset of this instance within the buffer.""" - return self._instance_offset - - def __repr__(self) -> str: - return f"" - - def __dir__(self): - attrs = list(super().__dir__()) - if isinstance(self._instance_type_def, VtypeUserType): - attrs.extend(self._instance_type_def.fields.keys()) - if isinstance(self._instance_type_def, (VtypeBaseType, VtypeEnum)): - attrs.append('_value') - return sorted(list(set(a for a in attrs if a != '_instance_cache'))) - - -class Ptr: - __slots__ = 'address', '_subtype_info', '_vtype_accessor' - - def __init__(self, address: int, subtype_info: Optional[Dict[str, Any]], vtype_accessor: 'VtypeJson'): - self.address = address - self._subtype_info = subtype_info - self._vtype_accessor = vtype_accessor - - def __repr__(self) -> str: - subtype_str = "void" - if self._subtype_info: - kind, name = self._subtype_info.get( - "kind"), self._subtype_info.get("name") - subtype_str = name if name else (kind if kind else "unknown") - return f"" - - @property - def points_to_type_info( - self) -> Optional[Dict[str, Any]]: return self._subtype_info - - @property - def points_to_type_name(self) -> str: - if not self._subtype_info: - return "void" - name, kind = self._subtype_info.get( - "name"), self._subtype_info.get("kind") - if name: - return name - return "void" if kind == "base" and not name else (kind if kind else "unknown") - - -class EnumInstance: - __slots__ = '_enum_def', '_value' - - def __init__(self, enum_def: VtypeEnum, value: int): - self._enum_def = enum_def - self._value = value - - @property - def name( - self) -> Optional[str]: return self._enum_def.get_name_for_value(self._value) - - def __repr__(self) -> str: - name_part = f"{self._enum_def.name}.{self.name}" if self.name else f"{self._enum_def.name} (value)" - return f"" - - def __int__(self) -> int: return self._value - - def __eq__(self, other): - if isinstance(other, EnumInstance): - return self._value == other._value and self._enum_def.name == other._enum_def.name - if isinstance(other, int): - return self._value == other - if isinstance(other, str): - return self.name == other - return False - - -class VtypeJson: - def __init__(self, data: Dict[str, Any]): - self.metadata: VtypeMetadata = VtypeMetadata(data.get("metadata", {})) - self._raw_base_types: Dict[str, Any] = data.get("base_types", {}) - self._parsed_base_types_cache: Dict[str, VtypeBaseType] = {} - self._raw_user_types: Dict[str, Any] = data.get("user_types", {}) - self._parsed_user_types_cache: Dict[str, VtypeUserType] = {} - self._raw_enums: Dict[str, Any] = data.get("enums", {}) - self._parsed_enums_cache: Dict[str, VtypeEnum] = {} - self._raw_symbols: Dict[str, Any] = data.get("symbols", {}) - self._parsed_symbols_cache: Dict[str, VtypeSymbol] = {} - self._address_to_symbol_list_cache: Optional[Dict[int, - List[VtypeSymbol]]] = None - - def shift_symbol_addresses(self, delta: int): - """ - Shift all symbol addresses by the given delta. Updates both raw symbol data and cached VtypeSymbol objects. - """ - # Update raw symbol data - for sym_name, sym_data in self._raw_symbols.items(): - if sym_data is not None and "address" in sym_data and sym_data["address"] not in [None, 0]: - sym_data["address"] += delta - # Update cached VtypeSymbol objects - for sym_obj in self._parsed_symbols_cache.values(): - if sym_obj.address not in [None, 0]: - sym_obj.address += delta - # Invalidate address-to-symbol cache - self._address_to_symbol_list_cache = None - - def get_base_type(self, name: str) -> Optional[VtypeBaseType]: - if name in self._parsed_base_types_cache: - return self._parsed_base_types_cache[name] - raw_data = self._raw_base_types.get(name) - if raw_data is None: - return None - obj = VtypeBaseType(name, raw_data) - self._parsed_base_types_cache[name] = obj - return obj - - def get_user_type(self, name: str) -> Optional[VtypeUserType]: - if name in self._parsed_user_types_cache: - return self._parsed_user_types_cache[name] - raw_data = self._raw_user_types.get(name) - if raw_data is None: - return None - obj = VtypeUserType(name, raw_data) - self._parsed_user_types_cache[name] = obj - return obj - - def get_enum(self, name: str) -> Optional[VtypeEnum]: - if name in self._parsed_enums_cache: - return self._parsed_enums_cache[name] - raw_data = self._raw_enums.get(name) - if raw_data is None: - return None - obj = VtypeEnum(name, raw_data) - self._parsed_enums_cache[name] = obj - return obj - - def get_symbol(self, name: str) -> Optional[VtypeSymbol]: - if name in self._parsed_symbols_cache: - return self._parsed_symbols_cache[name] - raw_data = self._raw_symbols.get(name) - if raw_data is None: - return None - obj = VtypeSymbol(name, raw_data) - self._parsed_symbols_cache[name] = obj - return obj - - def get_type(self, name: str) -> Optional[Union[VtypeUserType, VtypeBaseType, VtypeEnum]]: - original_name = name - name_lower = name.lower() - - if name_lower.startswith("struct "): - type_name_to_find = original_name[len("struct "):].strip() - return self.get_user_type(type_name_to_find) - elif name_lower.startswith("union "): - type_name_to_find = original_name[len("union "):].strip() - user_type = self.get_user_type(type_name_to_find) - if user_type and user_type.kind == "union": - return user_type - return None - elif name_lower.startswith("enum "): - type_name_to_find = original_name[len("enum "):].strip() - return self.get_enum(type_name_to_find) - - found_type = self.get_user_type(original_name) - if found_type: - return found_type - found_type = self.get_enum(original_name) - if found_type: - return found_type - found_type = self.get_base_type(original_name) - if found_type: - return found_type - return None - - def get_symbols_by_address(self, target_address: int) -> List[VtypeSymbol]: - if self._address_to_symbol_list_cache is None: - self._address_to_symbol_list_cache = {} - for symbol_name in self._raw_symbols.keys(): - symbol_obj = self.get_symbol(symbol_name) - if symbol_obj and symbol_obj.address is not None: - self._address_to_symbol_list_cache.setdefault( - symbol_obj.address, []).append(symbol_obj) - return self._address_to_symbol_list_cache.get(target_address, []) - - def get_type_size(self, type_info: Dict[str, Any]) -> Optional[int]: - kind, name = type_info.get("kind"), type_info.get("name") - if kind == "base": - base_def = self.get_base_type(name) if name else None - return base_def.size if base_def else None - if kind == "pointer": - ptr_base_def = self.get_base_type("pointer") - return ptr_base_def.size if ptr_base_def else None - if kind in ["struct", "union"]: - user_def = self.get_user_type(name) if name else None - return user_def.size if user_def else None - if kind == "enum": - enum_def = self.get_enum(name) if name else None - if not enum_def or not enum_def.base: - return None - base_type_for_enum = self.get_base_type(enum_def.base) - return base_type_for_enum.size if base_type_for_enum else None - if kind == "array": - count, subtype_info = type_info.get( - "count"), type_info.get("subtype") - if None in [count, subtype_info]: - return None - element_size = self.get_type_size(subtype_info) - return count * element_size if element_size is not None else None - if kind == "bitfield": - underlying_type_info = type_info.get("type") - return self.get_type_size(underlying_type_info) if underlying_type_info else None - return None - - def create_instance(self, type_input: Union[str, VtypeUserType, VtypeBaseType, VtypeEnum], - buffer: Union[bytes, bytearray], - instance_offset_in_buffer: int = 0) -> BoundTypeInstance: - - type_def: Optional[Union[VtypeUserType, - VtypeBaseType, VtypeEnum]] = None - type_name_for_instance: str - - processed_buffer: bytearray - if isinstance(buffer, bytes): - processed_buffer = bytearray(buffer) - elif isinstance(buffer, bytearray): - processed_buffer = buffer - else: - raise TypeError( - "Input buffer for create_instance must be bytes or bytearray.") - - if isinstance(type_input, str): - type_name_for_instance = type_input - type_def = self.get_type(type_input) - elif isinstance(type_input, (VtypeUserType, VtypeBaseType, VtypeEnum)): - type_def = type_input - type_name_for_instance = type_def.name - else: - raise TypeError( - f"type_input must be a string (type name) or a VtypeUserType/BaseType/Enum object, got {type(type_input)}") - - if type_def: - if not hasattr(type_def, 'size') or type_def.size is None: - if not (hasattr(type_def, 'kind') and getattr(type_def, 'kind') == 'void' and type_def.size == 0): - raise ValueError( - f"Type definition for '{type_name_for_instance}' (kind: {type_def.__class__.__name__}) lacks a valid size attribute.") - - if type_def.size is not None: - effective_len = len(processed_buffer) - \ - instance_offset_in_buffer - if type_def.size > effective_len: - raise ValueError( - f"Buffer too small for '{type_name_for_instance}' at offset {instance_offset_in_buffer}. Need {type_def.size}, got {effective_len}.") - return BoundTypeInstance(type_name_for_instance, type_def, processed_buffer, self, instance_offset_in_buffer) - - raise ValueError( - f"Type definition for '{type_input if isinstance(type_input, str) else type_input.name}' not found.") - - def __repr__(self) -> str: - return (f"") - - -class VtypeJsonGroup: - """ - Container for multiple related VtypeJson objects, loaded from a list of file paths or file-like objects. - Methods are dispatched in order to each VtypeJson until a result is found. - """ - def __init__(self, file_list: list): - self._file_order = list(file_list) - self.vtypejsons = {} - for f in self._file_order: - self.vtypejsons[f] = load_isf_json(f) - - @property - def paths(self): - return list(self._file_order) - - def get_vtypejson(self, path): - return self.vtypejsons[path] - - def get_base_type(self, name: str): - for f in self._file_order: - res = self.vtypejsons[f].get_base_type(name) - if res is not None: - return res - return None - - def get_user_type(self, name: str): - for f in self._file_order: - res = self.vtypejsons[f].get_user_type(name) - if res is not None: - return res - return None - - def get_enum(self, name: str): - for f in self._file_order: - res = self.vtypejsons[f].get_enum(name) - if res is not None: - return res - return None - - def get_symbol(self, name: str): - for f in self._file_order: - res = self.vtypejsons[f].get_symbol(name) - if hasattr(res, 'address') and res.address in [None, 0]: - continue - if res: - return res - - def get_type(self, name: str): - for f in self._file_order: - res = self.vtypejsons[f].get_type(name) - if res is not None: - return res - return None - - def get_symbols_by_address(self, target_address: int): - results = [] - for f in self._file_order: - results.extend(self.vtypejsons[f].get_symbols_by_address(target_address)) - return results - - def get_type_size(self, type_info: dict): - for f in self._file_order: - res = self.vtypejsons[f].get_type_size(type_info) - if res is not None: - return res - return None - - def create_instance(self, type_input, buffer, instance_offset_in_buffer=0): - for f in self._file_order: - try: - return self.vtypejsons[f].create_instance(type_input, buffer, instance_offset_in_buffer) - except ValueError: - continue - raise ValueError(f"Type definition for '{type_input if isinstance(type_input, str) else getattr(type_input, 'name', type(type_input))}' not found in any VtypeJson.") - - def shift_symbol_addresses(self, delta: int, path: str = None): - """ - Shift symbol addresses for all or a specific VtypeJson in the group. - If path is None, shift all; else shift only the VtypeJson for the given path. - """ - if path is None: - for f in self._file_order: - self.vtypejsons[f].shift_symbol_addresses(delta) - else: - self.vtypejsons[path].shift_symbol_addresses(delta) - - def __repr__(self): - return f"" - - -def load_isf_json(json_input: Union[str, object]) -> VtypeJson: - raw_data: Any - input_is_path_str = isinstance(json_input, str) - if input_is_path_str: - path_str = str(json_input) - is_xz = path_str.endswith(".xz") - try: - if is_xz: - with lzma.open(path_str, 'rt', encoding='utf-8') as f: - raw_data = json.load(f) - else: - with open(path_str, 'r', encoding='utf-8') as f: - raw_data = json.load(f) - except FileNotFoundError: - raise FileNotFoundError( - f"The ISF JSON file was not found: {path_str}") - except (IOError, OSError) as e: - raise ValueError( - f"Could not open or read file '{path_str}'. Error: {e}") from e - except json.JSONDecodeError as e: - raise ValueError( - f"Error decoding JSON from file {path_str} (using {_JSON_LIB_USED}).") from e - except lzma.LZMAError as e: - raise ValueError(f"Error decompressing XZ file {path_str}.") from e - elif hasattr(json_input, 'read'): - try: - raw_data = json.load(json_input) - except json.JSONDecodeError as e: - raise ValueError( - f"Error decoding JSON from file-like object (using {_JSON_LIB_USED}).") from e - else: - raise TypeError( - f"Input must be a JSON string (path or content), or a file-like object. Got {type(json_input)}.") - if not isinstance(raw_data, dict): - raise ValueError( - "ISF JSON root must be an object, not a list or other type.") - return VtypeJson(raw_data) - - -if __name__ == '__main__': - cli_parser = argparse.ArgumentParser( - description="Load and parse a dwarf2json ISF (Intermediate Symbol File) JSON or JSON.XZ.", - epilog=f"This script uses the '{_JSON_LIB_USED}' library for JSON parsing." - ) - cli_parser.add_argument("json_file_path", type=str, - help="Path to the ISF JSON or JSON.XZ file.") - cli_parser.add_argument( - "-v", "--verbose", action="store_true", help="Print detailed info.") - cli_parser.add_argument("--find-symbol-at", type=lambda x: int(x, 0), - metavar="ADDRESS", help="Find symbols at address.") - cli_parser.add_argument( - "--test-write", action="store_true", help="Run field write test.") - cli_parser.add_argument( - "--test-to-bytes", action="store_true", help="Run to_bytes() test.") - cli_parser.add_argument("--test-base-enum-instance", action="store_true", - help="Test creating instances of base/enum types.") - cli_parser.add_argument( - "--get-type", type=str, help="Test the generic get_type method with the provided type name.") - cli_parser.add_argument( - "--test-array-write", action="store_true", help="Test writing to array elements.") - - args = cli_parser.parse_args() - print( - f"Attempting to load ISF file: {args.json_file_path} (using {_JSON_LIB_USED})") - - try: - isf_data: VtypeJson = load_isf_json(args.json_file_path) - print("\nSuccessfully loaded ISF JSON.") - print(f" ISF Representation: {isf_data}") - - if args.get_type: - print(f"\n--- Testing get_type('{args.get_type}') ---") - found_type_obj = isf_data.get_type(args.get_type) - if found_type_obj: - print(f" Found type: {found_type_obj}") - print(f" Type class: {found_type_obj.__class__.__name__}") - else: - print(f" Type '{args.get_type}' not found.") - - if args.find_symbol_at is not None: - print( - f"\n--- Finding symbols at address {args.find_symbol_at:#x} ---") - symbols_at_addr = isf_data.get_symbols_by_address( - args.find_symbol_at) - if symbols_at_addr: - print( - f" Found {len(symbols_at_addr)} symbol(s) at {args.find_symbol_at:#x}:") - for sym_obj in symbols_at_addr: - print(f" - {sym_obj}") - else: - print( - f" No symbols found at address {args.find_symbol_at:#x}.") - if isf_data._address_to_symbol_list_cache is not None: - print( - f" Address-to-symbol cache is now populated with {len(isf_data._address_to_symbol_list_cache)} entries.") - - if args.verbose: - print("\n--- Verbose Information ---") - print( - f" Metadata Producer: {isf_data.metadata.producer.get('name', 'N/A')}, Version: {isf_data.metadata.producer.get('version', 'N/A')}") - print(f" ISF Format Version: {isf_data.metadata.format_version}") - print( - f" Number of raw base types defined: {len(isf_data._raw_base_types)}") - print( - f" Number of raw user types defined: {len(isf_data._raw_user_types)}") - print(f" Number of raw enums defined: {len(isf_data._raw_enums)}") - print( - f" Number of raw symbols defined: {len(isf_data._raw_symbols)}") - - if args.test_write or args.test_to_bytes or args.test_array_write: - print( - "\n--- Testing Field Write, To Bytes, and/or Array Write Functionality ---") - # This test assumes 'my_struct' and 'portal_ffi_call' are defined as in previous examples or in your ISF. - # Adjust struct name and fields as necessary. - struct_to_test = "my_struct" # or "portal_ffi_call" if that's in your ISF - struct_def = isf_data.get_user_type(struct_to_test) - - if struct_def and struct_def.size is not None: - buffer_data = bytearray(struct_def.size) - - # Initialize buffer for "my_struct" (example) - if struct_to_test == "my_struct": - struct.pack_into(" 0 else 'N/A'}") - if len(args_array_view) > 0: - args_array_view[0] = 0xAAAAAAAAAAAAAAAA - print( - " Set args_array_view[0] = 0xAAAAAAAAAAAAAAAA") - print( - f" New args_array_view[0]: {args_array_view[0]}") - if len(args_array_view) > 1: - args_array_view[1] = 0xBBBBBBBBBBBBBBBB - print( - " Set args_array_view[1] = 0xBBBBBBBBBBBBBBBB") - print( - f" New args_array_view[1]: {args_array_view[1]}") - - # Verify directly from buffer if possible (assuming 'args' field offset and element size) - args_field_def = struct_def.fields.get("args") - if args_field_def and args_field_def.offset is not None: - subtype_info = args_field_def.type_info.get("subtype") - if subtype_info: - element_size = isf_data.get_type_size(subtype_info) - if element_size: - if len(args_array_view) > 0: - val0 = struct.unpack_from( - f"<{isf_data.get_base_type(subtype_info.get('name')).get_compiled_struct().format[-1]}", buffer_data, instance._instance_offset + args_field_def.offset)[0] - print( - f" Buffer check args[0]: {val0:#x}") - if len(args_array_view) > 1: - val1 = struct.unpack_from( - f"<{isf_data.get_base_type(subtype_info.get('name')).get_compiled_struct().format[-1]}", buffer_data, instance._instance_offset + args_field_def.offset + element_size)[0] - print( - f" Buffer check args[1]: {val1:#x}") - - if args.test_to_bytes: - instance_bytes = instance.to_bytes() - print( - f" instance.to_bytes() (hex) for '{struct_to_test}': {instance_bytes.hex()}") - else: - print( - f" Skipping write/to_bytes/array_write test: '{struct_to_test}' not found or has no size.") - - if args.test_base_enum_instance: - # ... (base/enum instance test as before) ... - print("\n--- Testing Base/Enum Instance Creation ---") - int_def = isf_data.get_base_type("int") - if int_def and int_def.size is not None: - int_buffer = bytearray(int_def.size) - struct.pack_into(" bool: def to_bytes(self): """Pass-through to underlying bound object for serialization.""" - return self._obj.to_bytes() + return bytes(self._obj) # --- Standard Accessors (Proxied via get_register for simplicity) --- @@ -605,28 +605,21 @@ def get_syscall_number(self) -> Optional[int]: class AArch64PtRegsWrapper(PtRegsWrapper): """Wrapper for AArch64 pt_regs""" - # Helper for nested access: obj.unnamed_field_0.unnamed_field_0 - # We can pre-calculate the getter to jump straight to the inner struct - _ACCESSORS = {} - @staticmethod - def _get_inner(obj): - return obj.unnamed_field_0.unnamed_field_0 - # Build accessors for i in range(31): _ACCESSORS[f"x{i}"] = ( - lambda obj, i=i: AArch64PtRegsWrapper._get_inner(obj).regs[i], - lambda obj, val, i=i: AArch64PtRegsWrapper._get_inner(obj).regs.__setitem__(i, val) + lambda obj, i=i: obj.regs[i], + lambda obj, val, i=i: obj.regs.__setitem__(i, val) ) - _ACCESSORS["sp"] = (lambda obj: AArch64PtRegsWrapper._get_inner(obj).sp, - lambda obj, val: setattr(AArch64PtRegsWrapper._get_inner(obj), 'sp', val)) - _ACCESSORS["pc"] = (lambda obj: AArch64PtRegsWrapper._get_inner(obj).pc, - lambda obj, val: setattr(AArch64PtRegsWrapper._get_inner(obj), 'pc', val)) - _ACCESSORS["pstate"] = (lambda obj: AArch64PtRegsWrapper._get_inner(obj).pstate, - lambda obj, val: setattr(AArch64PtRegsWrapper._get_inner(obj), 'pstate', val)) + _ACCESSORS["sp"] = (lambda obj: obj.sp, + lambda obj, val: setattr(obj, 'sp', val)) + _ACCESSORS["pc"] = (lambda obj: obj.pc, + lambda obj, val: setattr(obj, 'pc', val)) + _ACCESSORS["pstate"] = (lambda obj: obj.pstate, + lambda obj, val: setattr(obj, 'pstate', val)) # Direct fields _ACCESSORS["syscallno"] = (_make_attr_getter("syscallno"), _make_attr_setter("syscallno")) @@ -658,7 +651,7 @@ def _is_aarch32_mode(self) -> bool: self._checking_mode = True try: # Check pstate nRW bit (bit 4). 1 = 32-bit. - pstate = self._get_inner(self._obj).pstate + pstate = self._obj.pstate return (pstate & 0x10) != 0 except (AttributeError, TypeError): return False @@ -836,27 +829,20 @@ def get_userland_arg(self, num: int) -> Optional[int]: class PowerPCPtRegsWrapper(PtRegsWrapper): """Wrapper for PowerPC pt_regs""" - # Helper to traverse PANDA's nested union structure for PPC - # obj.unnamed_field_0.unnamed_field_0.gpr[i] - @staticmethod - def _get_inner(obj): - # Optimistic access: assumes standard PANDA nesting - return obj.unnamed_field_0.unnamed_field_0 - _ACCESSORS = {} # 1. GPRs (r0-r31) for i in range(32): _ACCESSORS[f"r{i}"] = ( - lambda obj, i=i: PowerPCPtRegsWrapper._get_inner(obj).gpr[i], - lambda obj, val, i=i: PowerPCPtRegsWrapper._get_inner(obj).gpr.__setitem__(i, val) + lambda obj, i=i: obj.gpr[i], + lambda obj, val, i=i: obj.gpr.__setitem__(i, val) ) # 2. Special Registers (Direct fields in inner struct) for reg in ["nip", "msr", "orig_gpr3", "ctr", "link", "xer", "ccr", "softe", "trap", "dar", "dsisr", "result"]: _ACCESSORS[reg] = ( - lambda obj, r=reg: getattr(PowerPCPtRegsWrapper._get_inner(obj), r), - lambda obj, val, r=reg: setattr(PowerPCPtRegsWrapper._get_inner(obj), r, val) + lambda obj, r=reg: getattr(obj, r), + lambda obj, val, r=reg: setattr(obj, r, val) ) # 3. Aliases @@ -870,13 +856,6 @@ def _get_inner(obj): def __init__(self, obj: Any, panda: Optional[Any] = None) -> None: super().__init__(obj, panda) - # Check if we need to adjust the object for non-nested structures (rare case) - if not hasattr(obj, "unnamed_field_0"): - # If struct is flat, we patch _get_inner to return obj identity - # (Note: This monkeypatch is per-instance if we attach it to self, - # but accessors are class-level. Standard PANDA is nested. - # We assume nested for the optimized path). - pass def get_syscall_arg(self, num: int) -> Optional[int]: """Get PowerPC syscall argument"""