diff --git a/pyproject.toml b/pyproject.toml index c0323e7..89882ad 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,13 +8,30 @@ dynamic = ["version"] description = "dwarffi is a Python library for parsing ISF files and providing an interface to access kernel symbols and types." readme = "README.md" requires-python = ">=3.10" -dependencies = [] +dependencies = [ + "msgspec", +] +authors = [ + { name = "Luke Craig", email = "luke.craig@mit.edu" } +] +classifiers = [ + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Intended Audience :: Developers", + "Topic :: Software Development :: Libraries :: Python Modules", + "Topic :: System :: Operating System Kernels", +] + +[project.urls] +Repository = "https://github.com/rehosting/dwarffi" +Issues = "https://github.com/rehosting/dwarffi/issues" [project.optional-dependencies] dev = [ "pytest>=8.0.0", - "ruff>=0.4.0", - "ujson>=5.11.0", + "ruff>=0.4.0" ] [tool.pytest.ini_options] diff --git a/scripts/benchmark_json.py b/scripts/benchmark_json.py new file mode 100644 index 0000000..6d9493d --- /dev/null +++ b/scripts/benchmark_json.py @@ -0,0 +1,156 @@ +import gc +import json +import time +import tracemalloc + +try: + import orjson +except ImportError: + orjson = None + +import msgspec + +# Import the new ISFData struct from your refactored codebase +from dwarffi.types import ISFData + + +def generate_heavy_isf_payload() -> bytes: + print("Generating synthetic heavy ISF payload...") + isf = { + "metadata": {"format": "1.0.0", "producer": {"name": "benchmark"}}, + "base_types": { + "int": {"kind": "int", "size": 4, "signed": True, "endian": "little"}, + "pointer": {"kind": "pointer", "size": 8, "endian": "little"} + }, + "user_types": {}, + "enums": {}, + "symbols": {}, + "typedefs": {} + } + + # Generate 15,000 Structs + for i in range(15000): + isf["user_types"][f"struct_heavy_{i}"] = { + "kind": "struct", + "size": 16, + "fields": { + "field_a": {"offset": 0, "type": {"kind": "base", "name": "int"}}, + "field_b": {"offset": 8, "type": {"kind": "pointer", "subtype": {"kind": "base", "name": "int"}}} + } + } + + # Generate 5,000 Enums + for i in range(5000): + isf["enums"][f"enum_heavy_{i}"] = { + "size": 4, + "base": "int", + "constants": {"A": 0, "B": 1, "C": 2} + } + + # Generate 50,000 Symbols + for i in range(50000): + isf["symbols"][f"sys_func_{i}"] = { + "address": 0xFFFFFFFF81000000 + (i * 16), + "type": {"kind": "struct", "name": f"struct_heavy_{i % 15000}"} + } + + payload = json.dumps(isf).encode("utf-8") + print(f"Payload generated: {len(payload) / 1024 / 1024:.2f} MB\n") + return payload + + +def parse_old_way_json(data: bytes): + """Simulates the legacy VtypeJson parsing and validation using standard json.""" + raw_data = json.loads(data) + + # Legacy manual schema validation + required_sections = ["base_types", "user_types"] + missing = [s for s in required_sections if s not in raw_data] + if missing: + raise ValueError(f"ISF is missing required top-level sections: {missing}") + + for name, definition in raw_data.get("user_types", {}).items(): + if "kind" not in definition: + raise ValueError(f"User type '{name}' is missing the required 'kind' field.") + + return raw_data + + +def parse_old_way_orjson(data: bytes): + """Simulates the legacy VtypeJson parsing and validation using orjson (if available).""" + raw_data = orjson.loads(data) + + required_sections = ["base_types", "user_types"] + missing = [s for s in required_sections if s not in raw_data] + if missing: + raise ValueError(f"ISF is missing required top-level sections: {missing}") + + for name, definition in raw_data.get("user_types", {}).items(): + if "kind" not in definition: + raise ValueError(f"User type '{name}' is missing the required 'kind' field.") + + return raw_data + + +def parse_new_way_msgspec(data: bytes): + """Simulates the new msgspec strict-schema decoding.""" + return msgspec.json.decode(data, type=ISFData) + + +def run_benchmark(name: str, func, payload: bytes, iterations: int = 5): + print(f"--- Benchmarking: {name} ---") + + # 1. Measure Time + gc.disable() # Disable GC to isolate parsing CPU time + start_time = time.perf_counter() + for _ in range(iterations): + _ = func(payload) + end_time = time.perf_counter() + gc.enable() + + avg_time = (end_time - start_time) / iterations + + # 2. Measure Memory + gc.collect() + tracemalloc.start() + + parsed_obj = func(payload) + + current_mem, peak_mem = tracemalloc.get_traced_memory() + tracemalloc.stop() + + # Keep object alive intentionally so 'current_mem' reflects retained size + _ = parsed_obj + + print(f" Average Time : {avg_time:.4f} seconds") + print(f" Peak Memory : {peak_mem / 1024 / 1024:.2f} MB") + print(f" Retained Mem : {current_mem / 1024 / 1024:.2f} MB\n") + + return avg_time, peak_mem, current_mem + + +def main(): + payload = generate_heavy_isf_payload() + iterations = 10 + + time_json, peak_json, ret_json = run_benchmark("Legacy Parser (Standard 'json')", parse_old_way_json, payload, iterations) + + if orjson: + time_orjson, peak_orjson, ret_orjson = run_benchmark("Legacy Parser ('orjson')", parse_old_way_orjson, payload, iterations) + else: + print("--- Benchmarking: Legacy Parser ('orjson') ---\n [orjson not installed, skipping]\n") + time_orjson = float('inf') + + time_msgspec, peak_msgspec, ret_msgspec = run_benchmark("New Parser ('msgspec')", parse_new_way_msgspec, payload, iterations) + + print("================ SUMMARY ================") + print(f"Speedup vs standard json : {time_json / time_msgspec:.2f}x faster") + if orjson: + print(f"Speedup vs orjson : {time_orjson / time_msgspec:.2f}x faster") + + print(f"Peak Memory Reduction : {peak_json / peak_msgspec:.2f}x less RAM used during parsing") + print(f"Retained Memory Reduction: {ret_json / ret_msgspec:.2f}x less RAM held by parsed objects") + print("=========================================") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/src/dwarffi/dffi.py b/src/dwarffi/dffi.py index 21ac0bb..096d7d3 100644 --- a/src/dwarffi/dffi.py +++ b/src/dwarffi/dffi.py @@ -13,7 +13,7 @@ from .backend import BytesBackend, LiveMemoryProxy, MemoryBackend from .instances import BoundArrayView, BoundTypeInstance, Ptr from .parser import VtypeJson -from .types import VtypeBaseType, VtypeEnum, VtypeSymbol, VtypeUserType +from .types import VtypeBaseType, VtypeEnum, VtypeStructField, VtypeSymbol, VtypeUserType # Clean, unified Type Aliases VTYPE_CLASSES = (VtypeBaseType, VtypeEnum, VtypeUserType) @@ -96,7 +96,7 @@ def symbols(self) -> Dict[str, Any]: # First loaded wins => iterate in load order and don't overwrite. for path in self._file_order: vj = self.vtypejsons[path] - for sym_name in vj._raw_symbols.keys(): + for sym_name in vj._isf.symbols.keys(): if sym_name in merged: continue sym = self.get_symbol(sym_name, path=path, include_incomplete=True) @@ -110,7 +110,7 @@ def types(self) -> Dict[str, "VtypeUserType"]: """Returns a dictionary of all user types (structs/unions) across all loaded ISF files.""" merged = {} for path in reversed(self._file_order): - for type_name in self.vtypejsons[path]._raw_user_types.keys(): + for type_name in self.vtypejsons[path]._isf.user_types.keys(): t = self.get_user_type(type_name) if t: merged[type_name] = t @@ -121,7 +121,7 @@ def base_types(self) -> Dict[str, "VtypeBaseType"]: """Returns a dictionary of all base types across all loaded ISF files.""" merged = {} for path in reversed(self._file_order): - for type_name in self.vtypejsons[path]._raw_base_types.keys(): + for type_name in self.vtypejsons[path]._isf.base_types.keys(): t = self.get_base_type(type_name) if t: merged[type_name] = t @@ -132,7 +132,7 @@ def enums(self) -> Dict[str, "VtypeEnum"]: """Returns a dictionary of all enums across all loaded ISF files.""" merged = {} for path in reversed(self._file_order): - for enum_name in self.vtypejsons[path]._raw_enums.keys(): + for enum_name in self.vtypejsons[path]._isf.enums.keys(): t = self.get_enum(enum_name) if t: merged[enum_name] = t @@ -152,7 +152,7 @@ def _resolve_type_info(self, type_info: Dict[str, Any]) -> Dict[str, Any]: td = None for f in self._file_order: - td = self.vtypejsons[f]._raw_typedefs.get(name) + td = self.vtypejsons[f]._isf.typedefs.get(name) if td: break if not td: @@ -599,12 +599,12 @@ def new(self, ctype: Union[str, Vtype, dict], init: Any = None) -> BoundType: # without modifying the core instances engine. dummy_name = f"__dummy_{id(buf)}" primary_isf_path = self._file_order[0] - self.vtypejsons[primary_isf_path]._raw_user_types[dummy_name] = { - "kind": "struct", - "size": size, - "fields": {"arr": {"offset": 0, "type": t}}, - } - self.vtypejsons[primary_isf_path]._parsed_user_types_cache.pop(dummy_name, None) + self.vtypejsons[primary_isf_path]._isf.user_types[dummy_name] = VtypeUserType( + kind="struct", + size=size, + fields={"arr": VtypeStructField(type_info=t, offset=0, name="arr")}, + name=dummy_name + ) instance = self._create_instance(dummy_name, buf) arr_view = instance.arr @@ -709,12 +709,12 @@ def from_address(self, ctype: Union[str, Vtype, dict], address: int) -> BoundTyp dummy_size = count * elem_size dummy_name = f"__dummy_backend_{address}_{hash(str(t_dummy))}" primary_isf_path = self._file_order[0] - self.vtypejsons[primary_isf_path]._raw_user_types[dummy_name] = { - "kind": "struct", - "size": dummy_size, - "fields": {"arr": {"offset": 0, "type": t_dummy}}, - } - self.vtypejsons[primary_isf_path]._parsed_user_types_cache.pop(dummy_name, None) + self.vtypejsons[primary_isf_path]._isf.user_types[dummy_name] = VtypeUserType( + kind="struct", + size=dummy_size, + fields={"arr": VtypeStructField(type_info=t_dummy, offset=0, name="arr")}, + name=dummy_name + ) instance = self._create_instance(dummy_name, proxy, instance_offset_in_buffer=address) return instance.arr @@ -782,12 +782,12 @@ def from_buffer( dummy_name = f"__dummy_{id(python_buffer)}_{offset}_{hash(str(t))}" primary_isf_path = self._file_order[0] - self.vtypejsons[primary_isf_path]._raw_user_types[dummy_name] = { - "kind": "struct", - "size": dummy_size, - "fields": {"arr": {"offset": 0, "type": t}}, - } - self.vtypejsons[primary_isf_path]._parsed_user_types_cache.pop(dummy_name, None) + self.vtypejsons[primary_isf_path]._isf.user_types[dummy_name] = VtypeUserType( + kind="struct", + size=dummy_size, + fields={"arr": VtypeStructField(type_info=t, offset=0, name="arr")}, + name=dummy_name + ) instance = self._create_instance(dummy_name, python_buffer, instance_offset_in_buffer=offset, base_address=address) return instance.arr diff --git a/src/dwarffi/parser.py b/src/dwarffi/parser.py index 27a7a10..4b1cd34 100644 --- a/src/dwarffi/parser.py +++ b/src/dwarffi/parser.py @@ -2,16 +2,9 @@ import lzma from typing import Any, Dict, List, Optional, Union -try: - import ujson as json +import msgspec - _JSON_LIB_USED = "ujson" -except ImportError: - import json - - _JSON_LIB_USED = "json" - -from .types import VtypeBaseType, VtypeEnum, VtypeMetadata, VtypeSymbol, VtypeUserType +from .types import ISFData, VtypeBaseType, VtypeEnum, VtypeMetadata, VtypeSymbol, VtypeUserType class VtypeJson: @@ -19,11 +12,11 @@ class VtypeJson: Parser and container for Intermediate Structure Format (ISF) data. This class handles the ingestion of ISF JSON data (from dictionaries, files, or - compressed .xz streams) and provides a lazy-loading interface to resolve - Dwarf-derived types and symbols. + compressed .xz streams) utilizing msgspec for accelerated loading and strict schema + enforcement. """ - def __init__(self, isf_input: Union[Dict[str, Any], str, io.IOBase]): + def __init__(self, isf_input: Union[Dict[str, Any], bytes, str, io.IOBase]): """ Initializes an ISF definition from a dictionary, file path, or file-like object. @@ -36,63 +29,50 @@ def __init__(self, isf_input: Union[Dict[str, Any], str, io.IOBase]): ValueError: If JSON is malformed or required ISF sections are missing. TypeError: If input is not one of the supported types. """ - raw_data: Dict[str, Any] - - if isinstance(isf_input, dict): - raw_data = isf_input - elif isinstance(isf_input, str): - # Treat string as a file path - is_xz = isf_input.endswith(".xz") - try: + try: + if isinstance(isf_input, dict): + self._isf = msgspec.convert(isf_input, type=ISFData) + elif isinstance(isf_input, bytes): + self._isf = msgspec.json.decode(isf_input, type=ISFData) + elif isinstance(isf_input, str): + is_xz = isf_input.endswith(".xz") if is_xz: - with lzma.open(isf_input, "rt", encoding="utf-8") as f: - raw_data = json.load(f) + with lzma.open(isf_input, "rb") as f: + try: + file_data = f.read() + except lzma.LZMAError as e: + raise ValueError(f"Error decompressing XZ file {isf_input}.") from e + self._isf = msgspec.json.decode(file_data, type=ISFData) else: - with open(isf_input, "r", encoding="utf-8") as f: - raw_data = json.load(f) - except FileNotFoundError as e: - raise FileNotFoundError(f"The ISF JSON file was not found: {isf_input}") from e - except (IOError, OSError) as e: - raise ValueError(f"Could not open or read file '{isf_input}'. Error: {e}") from e - except json.JSONDecodeError as e: - raise ValueError(f"Error decoding JSON from file {isf_input} (using {_JSON_LIB_USED}).") from e - except lzma.LZMAError as e: - raise ValueError(f"Error decompressing XZ file {isf_input}.") from e - elif hasattr(isf_input, "read"): - # Treat as a file-like object - try: - raw_data = json.load(isf_input) - except json.JSONDecodeError as e: - raise ValueError(f"Error decoding JSON from file-like object (using {_JSON_LIB_USED}).") from e - else: - raise TypeError(f"Input must be a dict, file path (str), or file-like object. Got {type(isf_input)}.") - - if not isinstance(raw_data, dict): - raise ValueError("ISF JSON root must be an object, not a list or other type.") - - # Basic Schema Validation - required_sections = ["base_types", "user_types"] - missing = [s for s in required_sections if s not in raw_data] - if missing: - raise ValueError(f"ISF is missing required top-level sections: {missing}") - - # Ensure all user types have a 'kind' - for name, definition in raw_data.get("user_types", {}).items(): - if "kind" not in definition: - raise ValueError(f"User type '{name}' is missing the required 'kind' field (struct, union, etc).") - - # Initialize core data structures and metadata - self.metadata: VtypeMetadata = VtypeMetadata(raw_data.get("metadata", {})) - self._raw_base_types: Dict[str, Any] = raw_data.get("base_types", {}) - self._parsed_base_types_cache: Dict[str, VtypeBaseType] = {} - self._raw_user_types: Dict[str, Any] = raw_data.get("user_types", {}) - self._parsed_user_types_cache: Dict[str, VtypeUserType] = {} - self._raw_enums: Dict[str, Any] = raw_data.get("enums", {}) - self._parsed_enums_cache: Dict[str, VtypeEnum] = {} - self._raw_symbols: Dict[str, Any] = raw_data.get("symbols", {}) - self._parsed_symbols_cache: Dict[str, VtypeSymbol] = {} + with open(isf_input, "rb") as f: + self._isf = msgspec.json.decode(f.read(), type=ISFData) + elif hasattr(isf_input, "read"): + data = isf_input.read() + if isinstance(data, str): + data = data.encode("utf-8") + self._isf = msgspec.json.decode(data, type=ISFData) + else: + raise TypeError(f"Input must be a dict, bytes, file path (str), or file-like object. Got {type(isf_input)}.") + + except FileNotFoundError as e: + raise FileNotFoundError(f"The ISF JSON file was not found: {isf_input}") from e + except (IOError, OSError) as e: + raise ValueError(f"Could not open or read file '{isf_input}'. Error: {e}") from e + except msgspec.ValidationError as e: + err_str = str(e) + if "Expected `object`" in err_str and "got `array`" in err_str: + raise ValueError("ISF JSON root must be an object, not a list or other type.") from e + if "missing required field `kind`" in err_str: + raise ValueError("missing the required 'kind' field (struct, union, etc).") from e + if "missing required field `base_types`" in err_str or "missing required field `user_types`" in err_str: + raise ValueError("ISF is missing required top-level sections") from e + raise ValueError(f"ISF format validation failed: {e}") from e + + except msgspec.DecodeError as e: + raise ValueError(f"Error decoding JSON: {e}") from e + + self.metadata: VtypeMetadata = self._isf.metadata self._address_to_symbol_list_cache: Optional[Dict[int, List[VtypeSymbol]]] = None - self._raw_typedefs: Dict[str, Any] = raw_data.get("typedefs", {}) def _resolve_type_info(self, type_info: Dict[str, Any]) -> Dict[str, Any]: """ @@ -113,7 +93,7 @@ def _resolve_type_info(self, type_info: Dict[str, Any]) -> Dict[str, Any]: if name in visited: raise ValueError(f"Circular typedef: {name}") visited.add(name) - td = self._raw_typedefs.get(name) + td = self._isf.typedefs.get(name) if not td: break current = td @@ -127,63 +107,26 @@ def shift_symbol_addresses(self, delta: int) -> None: Args: delta: The integer amount to shift addresses by. """ - for _sym_name, sym_data in self._raw_symbols.items(): - if ( - sym_data is not None - and "address" in sym_data - and sym_data["address"] not in [None, 0] - ): - sym_data["address"] += delta - for sym_obj in self._parsed_symbols_cache.values(): + for sym_obj in self._isf.symbols.values(): if sym_obj.address not in [None, 0]: sym_obj.address += delta - - # Invalidate the reverse lookup cache after a shift self._address_to_symbol_list_cache = None def get_base_type(self, name: str) -> Optional[VtypeBaseType]: """Retrieves a cached VtypeBaseType object by name.""" - if name in self._parsed_base_types_cache: - return self._parsed_base_types_cache[name] - raw_data = self._raw_base_types.get(name) - if raw_data is None: - return None - obj = VtypeBaseType(name, raw_data) - self._parsed_base_types_cache[name] = obj - return obj + return self._isf.base_types.get(name) def get_user_type(self, name: str) -> Optional[VtypeUserType]: """Retrieves a cached VtypeUserType (struct/union) by name.""" - if name in self._parsed_user_types_cache: - return self._parsed_user_types_cache[name] - raw_data = self._raw_user_types.get(name) - if raw_data is None: - return None - obj = VtypeUserType(name, raw_data) - self._parsed_user_types_cache[name] = obj - return obj + return self._isf.user_types.get(name) def get_enum(self, name: str) -> Optional[VtypeEnum]: """Retrieves a cached VtypeEnum object by name.""" - if name in self._parsed_enums_cache: - return self._parsed_enums_cache[name] - raw_data = self._raw_enums.get(name) - if raw_data is None: - return None - obj = VtypeEnum(name, raw_data) - self._parsed_enums_cache[name] = obj - return obj + return self._isf.enums.get(name) def get_symbol(self, name: str) -> Optional[VtypeSymbol]: """Retrieves a cached VtypeSymbol object by name.""" - if name in self._parsed_symbols_cache: - return self._parsed_symbols_cache[name] - raw_data = self._raw_symbols.get(name) - if raw_data is None: - return None - obj = VtypeSymbol(name, raw_data) - self._parsed_symbols_cache[name] = obj - return obj + return self._isf.symbols.get(name) def get_type(self, name: str) -> Optional[Union[VtypeUserType, VtypeBaseType, VtypeEnum]]: """ @@ -219,9 +162,8 @@ def get_symbols_by_address(self, target_address: int) -> List[VtypeSymbol]: """ if self._address_to_symbol_list_cache is None: self._address_to_symbol_list_cache = {} - for symbol_name in self._raw_symbols.keys(): - symbol_obj = self.get_symbol(symbol_name) - if symbol_obj and symbol_obj.address is not None: + for symbol_obj in self._isf.symbols.values(): + if symbol_obj.address is not None: self._address_to_symbol_list_cache.setdefault(symbol_obj.address, []).append( symbol_obj ) @@ -267,6 +209,6 @@ def get_type_size(self, in_type_info: Dict[str, Any]) -> Optional[int]: def __repr__(self) -> str: return ( - f"" + f"" ) diff --git a/src/dwarffi/types.py b/src/dwarffi/types.py index c8ee7b4..5504bdf 100644 --- a/src/dwarffi/types.py +++ b/src/dwarffi/types.py @@ -2,51 +2,40 @@ import struct from typing import Any, Dict, List, Optional, Tuple, Union +import msgspec -class SourceMetadata: - """Represents source file metadata within the ISF, tracking provenance.""" - - __slots__ = "kind", "name", "hash_type", "hash_value" - def __init__(self, data: Dict[str, Any]): - self.kind: Optional[str] = data.get("kind") - self.name: Optional[str] = data.get("name") - self.hash_type: Optional[str] = data.get("hash_type") - self.hash_value: Optional[str] = data.get("hash_value") +class SourceMetadata(msgspec.Struct): + """Represents source file metadata within the ISF, tracking provenance.""" + kind: Optional[str] = None + name: Optional[str] = None + hash_type: Optional[str] = None + hash_value: Optional[str] = None def __repr__(self) -> str: return f"" -class UnixMetadata: +class UnixMetadata(msgspec.Struct): """Represents Unix-specific (Linux/Mac) metadata grouping symbols and types.""" + symbols: List[Optional[SourceMetadata]] = msgspec.field(default_factory=list) + types: List[Optional[SourceMetadata]] = msgspec.field(default_factory=list) - __slots__ = "symbols", "types" - - def __init__(self, data: Dict[str, Any]): - self.symbols: List[SourceMetadata] = [ - SourceMetadata(s_data) for s_data in data.get("symbols", []) if s_data - ] - self.types: List[SourceMetadata] = [ - SourceMetadata(t_data) for t_data in data.get("types", []) if t_data - ] + def __post_init__(self): + # Filter out null entries + self.symbols = [s for s in self.symbols if s is not None] + self.types = [t for t in self.types if t is not None] def __repr__(self) -> str: return f"" -class VtypeMetadata: +class VtypeMetadata(msgspec.Struct): """Represents the top-level provenance and format metadata in the ISF.""" - - __slots__ = "linux", "mac", "producer", "format_version" - - def __init__(self, data: Dict[str, Any]): - self.linux: Optional[UnixMetadata] = ( - UnixMetadata(data["linux"]) if data.get("linux") else None - ) - self.mac: Optional[UnixMetadata] = UnixMetadata(data["mac"]) if data.get("mac") else None - self.producer: Dict[str, str] = data.get("producer", {}) - self.format_version: Optional[str] = data.get("format") + linux: Optional[UnixMetadata] = None + mac: Optional[UnixMetadata] = None + producer: Dict[str, str] = msgspec.field(default_factory=dict) + format_version: Optional[str] = msgspec.field(name="format", default=None) def __repr__(self) -> str: return ( @@ -113,23 +102,16 @@ def pack_into(self, buffer: Union[bytearray, memoryview], offset: int, value: Un buffer[offset : offset + valid_len] = value[:valid_len] -class VtypeBaseType: +class VtypeBaseType(msgspec.Struct): """ Represents a primitive base type definition (e.g., int, char, float). - - Caches a `struct.Struct` object (or an equivalent duck-type) for - high-performance memory packing/unpacking. """ - - __slots__ = "name", "size", "signed", "kind", "endian", "_compiled_struct" - - def __init__(self, name: str, data: Dict[str, Any]): - self.name: str = name - self.size: int = data.get("size", 0) - self.signed: bool = data.get("signed", False) - self.kind: str = data.get("kind", "int") - self.endian: str = data.get("endian", "little") - self._compiled_struct: Any = None + size: int = 0 + signed: bool = False + kind: str = "int" + endian: str = "little" + name: str = "" + _compiled_struct: Any = msgspec.field(default=None) def get_compiled_struct(self) -> Any: """ @@ -190,16 +172,12 @@ def __repr__(self) -> str: return f"" -class VtypeStructField: +class VtypeStructField(msgspec.Struct): """Represents a single field within a user-defined struct or union.""" - - __slots__ = "name", "type_info", "offset", "anonymous" - - def __init__(self, name: str, data: Dict[str, Any]): - self.name: str = name - self.type_info: Dict[str, Any] = data.get("type", {}) - self.offset: int = data.get("offset", 0) - self.anonymous: bool = data.get("anonymous", False) + type_info: Dict[str, Any] = msgspec.field(name="type", default_factory=dict) + offset: int = 0 + anonymous: bool = False + name: str = "" def __repr__(self) -> str: type_kind = self.type_info.get("kind", "unknown") @@ -208,27 +186,28 @@ def __repr__(self) -> str: return f"" -class VtypeUserType: +class VtypeUserType(msgspec.Struct): """ Represents a complex user-defined type (struct or union). Supports O(1) flattened field lookups and optimized block-unpacking for primitive-only structures. """ - - __slots__ = "name", "size", "fields", "kind", "_flattened_fields", "_aggregated_struct" - - def __init__(self, name: str, data: Dict[str, Any]): - self.name: str = name - self.size: int = data.get("size", 0) - self.fields: Dict[str, VtypeStructField] = { - f_name: VtypeStructField(f_name, f_data) - for f_name, f_data in data.get("fields", {}).items() - if f_data - } - self.kind: str = data.get("kind", "struct") - self._flattened_fields: Optional[Dict[str, Tuple[VtypeStructField, int, Dict[str, Any], Any]]] = None - self._aggregated_struct: Optional[struct.Struct] = None + kind: str + size: int = 0 + fields: Dict[str, Optional[VtypeStructField]] = msgspec.field(default_factory=dict) + name: str = "" + _flattened_fields: Optional[Dict[str, Tuple[VtypeStructField, int, Dict[str, Any], Any]]] = msgspec.field(default=None) + _aggregated_struct: Optional[struct.Struct] = msgspec.field(default=None) + + def __post_init__(self): + if self.fields: + clean_fields = {} + for k, v in self.fields.items(): + if v is not None: + v.name = k + clean_fields[k] = v + self.fields = clean_fields def get_flattened_fields(self, vtype_accessor: Any) -> Dict[str, Tuple[VtypeStructField, int, Dict[str, Any], Any]]: """ @@ -362,17 +341,13 @@ def __str__(self) -> str: return self.pretty_print() -class VtypeEnum: +class VtypeEnum(msgspec.Struct): """Represents a C enumeration and its constant mappings.""" - - __slots__ = "name", "size", "base", "constants", "_val_to_name" - - def __init__(self, name: str, data: Dict[str, Any]): - self.name: str = name - self.size: int = data.get("size", 0) - self.base: Optional[str] = data.get("base") - self.constants: Dict[str, int] = data.get("constants", {}) - self._val_to_name: Optional[Dict[int, str]] = None + size: int = 0 + base: Optional[str] = None + constants: Dict[str, int] = msgspec.field(default_factory=dict) + name: str = "" + _val_to_name: Optional[Dict[int, str]] = msgspec.field(default=None) def get_name_for_value(self, value: int) -> Optional[str]: """Performs a reverse lookup to find a constant name for an integer value.""" @@ -408,16 +383,12 @@ def __str__(self) -> str: return self.pretty_print() -class VtypeSymbol: +class VtypeSymbol(msgspec.Struct): """Represents a global symbol (function or variable) and its memory location.""" - - __slots__ = "name", "type_info", "address", "constant_data" - - def __init__(self, name: str, data: Dict[str, Any]): - self.name: str = name - self.type_info: Optional[Dict[str, Any]] = data.get("type") - self.address: Optional[int] = data.get("address") - self.constant_data: Optional[str] = data.get("constant_data") + type_info: Optional[Dict[str, Any]] = msgspec.field(name="type", default=None) + address: Optional[int] = None + constant_data: Optional[str] = None + name: str = "" def get_decoded_constant_data(self) -> Optional[bytes]: """Decodes base64-encoded constant data associated with the symbol.""" @@ -458,4 +429,39 @@ def pretty_print(self) -> str: return f"Symbol {self.name} @ {addr_str} (Type: {type_str})" def __str__(self) -> str: - return self.pretty_print() \ No newline at end of file + return self.pretty_print() + + +class ISFData(msgspec.Struct): + """Top-Level ISF Parsing Structure""" + base_types: Dict[str, Optional[VtypeBaseType]] + user_types: Dict[str, Optional[VtypeUserType]] + metadata: VtypeMetadata = msgspec.field(default_factory=VtypeMetadata) + enums: Dict[str, Optional[VtypeEnum]] = msgspec.field(default_factory=dict) + symbols: Dict[str, Optional[VtypeSymbol]] = msgspec.field(default_factory=dict) + typedefs: Dict[str, Any] = msgspec.field(default_factory=dict) + + def __post_init__(self): + if self.base_types: + self.base_types = {k: v for k, v in self.base_types.items() if v is not None} + for k, v in self.base_types.items(): + v.name = k + else: + self.base_types = {} + + if self.user_types: + self.user_types = {k: v for k, v in self.user_types.items() if v is not None} + for k, v in self.user_types.items(): + v.name = k + else: + self.user_types = {} + + if self.enums: + self.enums = {k: v for k, v in self.enums.items() if v is not None} + for k, v in self.enums.items(): + v.name = k + + if self.symbols: + self.symbols = {k: v for k, v in self.symbols.items() if v is not None} + for k, v in self.symbols.items(): + v.name = k \ No newline at end of file diff --git a/tests/test_advanced.py b/tests/test_advanced.py index 37aad08..f92ed4f 100644 --- a/tests/test_advanced.py +++ b/tests/test_advanced.py @@ -3,6 +3,7 @@ import pytest from dwarffi import DFFI, BoundTypeInstance +from dwarffi.types import VtypeStructField, VtypeUserType @pytest.fixture @@ -307,17 +308,19 @@ def test_union_in_union_overlap(adv_ffi_env): assert u.a == 0xAABBCCFF def test_function_pointer_repr(adv_ffi_env): - # Inject a struct with a function pointer into the environment - adv_ffi_env.vtypejsons[adv_ffi_env._file_order[0]]._raw_user_types["callback_struct"] = { - "kind": "struct", "size": 8, - "fields": { - "on_click": { - "offset": 0, - # Simulate dwarf2json's "function" kind - "type": {"kind": "function", "name": "click_handler_fn"} - } + # Inject a struct with a function pointer into the environment natively + adv_ffi_env.vtypejsons[adv_ffi_env._file_order[0]]._isf.user_types["callback_struct"] = VtypeUserType( + name="callback_struct", + kind="struct", + size=8, + fields={ + "on_click": VtypeStructField( + name="on_click", + offset=0, + type_info={"kind": "function", "name": "click_handler_fn"} + ) } - } + ) inst = adv_ffi_env.new("struct callback_struct") diff --git a/tests/test_comp.py b/tests/test_comp.py index f7232cb..ad0fd37 100644 --- a/tests/test_comp.py +++ b/tests/test_comp.py @@ -61,6 +61,7 @@ from dwarffi.types import ( VtypeBaseType, VtypeEnum, + VtypeStructField, VtypeSymbol, VtypeUserType, _FallbackBytesStruct, @@ -265,7 +266,7 @@ def test_size_attribute(self): class TestVtypeBaseTypeExoticKinds: def test_bool_kind_compiles(self): - bt = VtypeBaseType("mybool", {"kind": "bool", "size": 1, "signed": False, "endian": "little"}) + bt = VtypeBaseType(name="mybool", kind="bool", size=1, signed=False, endian="little") cs = bt.get_compiled_struct() assert cs is not None buf = bytearray(1) @@ -273,7 +274,7 @@ def test_bool_kind_compiles(self): assert buf[0] == 1 def test_char_kind_unsigned(self): - bt = VtypeBaseType("char", {"kind": "char", "size": 1, "signed": False, "endian": "little"}) + bt = VtypeBaseType(name="char", kind="char", size=1, signed=False, endian="little") cs = bt.get_compiled_struct() assert cs is not None buf = bytearray(1) @@ -281,14 +282,14 @@ def test_char_kind_unsigned(self): assert buf[0] == 255 def test_f16_half_precision(self): - bt = VtypeBaseType("f16", {"kind": "float", "size": 2, "signed": True, "endian": "little"}) + bt = VtypeBaseType(name="f16", kind="float", size=2, signed=True, endian="little") cs = bt.get_compiled_struct() assert cs is not None assert cs.size == 2 def test_f32_and_f64(self): - f32 = VtypeBaseType("f32", {"kind": "float", "size": 4, "signed": True, "endian": "little"}) - f64 = VtypeBaseType("f64", {"kind": "float", "size": 8, "signed": True, "endian": "little"}) + f32 = VtypeBaseType(name="f32", kind="float", size=4, signed=True, endian="little") + f64 = VtypeBaseType(name="f64", kind="float", size=8, signed=True, endian="little") assert f32.get_compiled_struct() is not None assert f64.get_compiled_struct() is not None @@ -350,17 +351,17 @@ def test_union_overlap_blocks_aggregation(self): class TestVtypeEnumCache: def test_val_to_name_starts_none(self): - e = VtypeEnum("e", {"size": 4, "base": "int", "constants": {"A": 1, "B": 2}}) + e = VtypeEnum(name="e", size=4, base="int", constants={"A": 1, "B": 2}) assert e._val_to_name is None def test_get_name_for_value_builds_cache(self): - e = VtypeEnum("e", {"size": 4, "base": "int", "constants": {"A": 1, "B": 2}}) + e = VtypeEnum(name="e", size=4, base="int", constants={"A": 1, "B": 2}) assert e.get_name_for_value(1) == "A" assert e._val_to_name is not None assert e.get_name_for_value(2) == "B" def test_unknown_value_returns_none(self): - e = VtypeEnum("e", {"size": 4, "base": "int", "constants": {"A": 0}}) + e = VtypeEnum(name="e", size=4, base="int", constants={"A": 0}) assert e.get_name_for_value(99) is None @@ -372,29 +373,29 @@ class TestVtypeSymbolConstantData: def test_valid_base64_decodes(self): payload = b"hello world" encoded = base64.b64encode(payload).decode() - s = VtypeSymbol("sym", {"address": 0x1000, "constant_data": encoded}) + s = VtypeSymbol(name="sym", address=0x1000, constant_data=encoded) assert s.get_decoded_constant_data() == payload def test_no_constant_data_returns_none(self): - s = VtypeSymbol("sym", {"address": 0x1000}) + s = VtypeSymbol(name="sym", address=0x1000) assert s.get_decoded_constant_data() is None def test_invalid_base64_returns_none(self): - s = VtypeSymbol("sym", {"address": 0x1000, "constant_data": "!!!not valid!!!"}) + s = VtypeSymbol(name="sym", address=0x1000, constant_data="!!!not valid!!!") assert s.get_decoded_constant_data() is None def test_symbol_pretty_print(self): - s = VtypeSymbol("my_var", {"address": 0xDEAD, "type": {"kind": "base", "name": "int"}}) + s = VtypeSymbol(name="my_var", address=0xDEAD, type_info={"kind": "base", "name": "int"}) out = s.pretty_print() assert "my_var" in out assert "0xdead" in out.lower() def test_symbol_str_equals_pretty_print(self): - s = VtypeSymbol("my_var", {"address": 0x10, "type": {"kind": "base", "name": "int"}}) + s = VtypeSymbol(name="my_var", address=0x10, type_info={"kind": "base", "name": "int"}) assert str(s) == s.pretty_print() def test_symbol_to_dict(self): - s = VtypeSymbol("foo", {"address": 0x400, "type": {"kind": "struct", "name": "bar"}}) + s = VtypeSymbol(name="foo", address=0x400, type_info={"kind": "struct", "name": "bar"}) d = s.to_dict() assert d["name"] == "foo" assert d["address"] == 0x400 @@ -408,7 +409,7 @@ def test_symbol_to_dict(self): class TestEnumInstanceEquality: @pytest.fixture def color_enum(self): - return VtypeEnum("color", {"size": 4, "base": "int", "constants": {"RED": 0, "GREEN": 1}}) + return VtypeEnum(name="color", size=4, base="int", constants={"RED": 0, "GREEN": 1}) def test_equal_to_same_value(self, color_enum): assert EnumInstance(color_enum, 0) == EnumInstance(color_enum, 0) @@ -963,13 +964,13 @@ def test_offsetof_nested_into_non_struct_raises(self, struct_ffi): class TestVtypeTypePrettyPrint: def _make_user_type(self): - return VtypeUserType("foo", { - "kind": "struct", "size": 8, - "fields": {"a": {"offset": 0, "type": {"kind": "base", "name": "int"}, "anonymous": False}}, - }) + return VtypeUserType( + name="foo", kind="struct", size=8, + fields={"a": VtypeStructField(name="a", offset=0, type_info={"kind": "base", "name": "int"}, anonymous=False)} + ) def _make_enum(self): - return VtypeEnum("status", {"size": 4, "base": "int", "constants": {"OK": 0, "ERR": 1}}) + return VtypeEnum(name="status", size=4, base="int", constants={"OK": 0, "ERR": 1}) def test_user_type_to_dict(self): d = self._make_user_type().to_dict()