Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ RUN --mount=type=cache,target=/root/.cache/pip \
zstandard \
pdoc \
numpy \
dwarffi \
ratarmountcore[full]

FROM python_builder AS version_generator
Expand Down
108 changes: 50 additions & 58 deletions pyplugins/apis/kffi.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@
yield from plugins.kffi.write_kernel_memory(0xffff888000000000, b"\x90\x90\x90\x90")
"""

import functools
import inspect
from os.path import isfile, join, realpath
from typing import Any, Generator, Iterator, Optional, Tuple, Union

from wrappers.ctypes_wrap import Ptr, VtypeJsonGroup
from dwarffi.instances import Ptr
from dwarffi.dffi import DFFI

from wrappers.generic import Wrapper
from wrappers.ptregs_wrap import get_pt_regs_wrapper

Expand Down Expand Up @@ -77,11 +78,10 @@ def __init__(self) -> None:

self.logger.debug(f"Loading ISF file: {self.isf}")
self.igloo_ko_isf = realpath(join(kernel, f"../igloo.ko.{arch}.json.xz"))
self.ffi = VtypeJsonGroup([self.igloo_ko_isf, self.isf])
self.ffi = DFFI([self.igloo_ko_isf, self.isf])
self._tramp_callbacks = {}
self._tramp_addresses = {}
self.tramp_init = False
self._type_cache = {}

def __init_tramp_functionality(self):
if self.tramp_init:
Expand All @@ -97,12 +97,6 @@ def __init_tramp_functionality(self):
self.portal.register_interrupt_handler(
"kffi", self._tramp_interrupt_handler)

def _get_type(self, type_name: str) -> Any:
if type_name in self._type_cache:
return self._type_cache[type_name]
self._type_cache[type_name] = self.ffi.get_type(type_name)
return self._type_cache[type_name]

def new(self, type_: str) -> Any:
"""
Create a new instance of a type.
Expand All @@ -113,15 +107,12 @@ def new(self, type_: str) -> Any:
Returns:
Any: Instance of the type, or None if type not found.
"""
t = self._get_type(type_)
if not t:
try:
return self.ffi.new(type_)
except (KeyError, ValueError):
return None
size = t.size
buf = b"\x00" * size
return self.ffi.create_instance(t, buf)

def from_buffer(self, type_: str, buf: bytes,
instance_offset_in_buffer: int = 0) -> Any:
def from_buffer(self, type_: str, buf: bytes, instance_offset_in_buffer: int = 0) -> Any:
"""
Create an instance of a type from a buffer.

Expand All @@ -133,8 +124,9 @@ def from_buffer(self, type_: str, buf: bytes,
Returns:
Any: Instance of the type.
"""
t = self._get_type(type_)
return self.ffi.create_instance(t, buf, instance_offset_in_buffer)
"""Create an instance of a type from a buffer."""
# Ensure we pass a bytearray to dwarffi
return self.ffi.from_buffer(type_, bytearray(buf), offset=instance_offset_in_buffer)

def get_field_casted(self, struct: Any, field: str) -> Any:
"""
Expand All @@ -147,7 +139,7 @@ def get_field_casted(self, struct: Any, field: str) -> Any:
Any: Field value, or None if error occurs.
"""
try:
return self.panda.ffi.cast(struct._instance_type_def.fields[field].type_info["name"], getattr(struct, field))
return self.ffi.cast(struct._instance_type_def.fields[field].type_info["name"], getattr(struct, field))
except Exception as e:
self.logger.error(f"Error casting field {field} of struct {struct}: {e}")
return None
Expand All @@ -164,14 +156,14 @@ def read_type_panda(self, cpu: Any, addr: int, type_: str) -> Any:
Returns:
Any: Instance of the type, or None if read fails.
"""
t = self._get_type(type_)
if not t:
size = self.ffi.sizeof(type_)
if not size:
return None
buf = plugins.mem.read_bytes_panda(cpu, addr, t.size)
buf = plugins.mem.read_bytes_panda(cpu, addr, size)
if not buf:
self.logger.error(f"Failed to read bytes from {addr:#x}")
return None
return self.ffi.create_instance(t, buf)
return self.ffi.from_buffer(type_, bytearray(buf), address=addr)

def read_type(self, addr: int, type_: str) -> Generator[Any, Any, Any]:
"""
Expand All @@ -184,18 +176,16 @@ def read_type(self, addr: int, type_: str) -> Generator[Any, Any, Any]:
Returns:
Any: Instance of the type, or None if read fails.
"""
t = self._get_type(type_)
if not t:
size = self.ffi.sizeof(type_)
if not size:
return None
if isinstance(addr, Ptr):
addr = addr.address
buf = yield from plugins.mem.read_bytes(addr, t.size)
buf = yield from plugins.mem.read_bytes(addr, size)
if not buf:
self.logger.error(f"Failed to read bytes from {addr:#x}")
return None
instance = self.ffi.create_instance(t, buf)
if not hasattr(instance, "_address"):
setattr(instance, "_address", addr)
instance = self.ffi.from_buffer(type_, buf, address=addr)
return instance

def deref(self, ptr: Ptr) -> Generator[Any, Any, Any]:
Expand All @@ -211,7 +201,7 @@ def deref(self, ptr: Ptr) -> Generator[Any, Any, Any]:
if ptr.address == 0:
self.logger.error(f"Pointer address is 0: {ptr}")
return None
val = yield from self.read_type(ptr.address, ptr._subtype_info.get("name"))
val = yield from self.read_type(ptr.address, ptr.points_to_type_name)
return val

def ref(self, thing: Any) -> Optional[int]:
Expand All @@ -224,11 +214,7 @@ def ref(self, thing: Any) -> Optional[int]:
Returns:
int: The address, or None if no address attribute.
"""
if hasattr(thing, "address"):
return thing.address
if hasattr(thing, "_address"):
return thing._address
return None
return self.ffi.addressof(thing)

def get_enum_dict(self, enum_name: str) -> Wrapper:
"""
Expand All @@ -240,13 +226,12 @@ def get_enum_dict(self, enum_name: str) -> Wrapper:
Returns:
Wrapper: Wrapper containing enum constants.
"""
enum = self.ffi.get_enum(enum_name)
if not enum:
enum = self.ffi.get_type(enum_name)
if not enum or not hasattr(enum, "constants"):
self.logger.error(f"Enum {enum_name} not found in ISF")
return {}
return Wrapper(enum.constants)

@functools.lru_cache
def get_struct_size(self, struct_name: str) -> Optional[int]:
"""
Get the size of a struct.
Expand All @@ -257,9 +242,10 @@ def get_struct_size(self, struct_name: str) -> Optional[int]:
Returns:
Optional[int]: Size of the struct, or None if not found.
"""
struct = self.ffi.get_user_type(struct_name)
if struct:
return struct.size
try:
return self.ffi.sizeof(struct_name)
except (KeyError, ValueError):
return None

def sizeof(self, struct_name: str) -> Optional[int]:
"""
Expand All @@ -283,9 +269,13 @@ def get_function_address(self, function: str) -> Optional[int]:
Returns:
Optional[int]: Address of the function, or None if not found.
"""
sym = self.ffi.get_symbol(function)
if sym:
sym = self.ffi.get_symbol(function, path=self.igloo_ko_isf)
if sym and hasattr(sym, "address") and sym.address not in [None, 0]:
return sym.address
sym = self.ffi.get_symbol(function, path=self.isf)
if sym and hasattr(sym, "address") and sym.address not in [None, 0]:
return sym.address
return None

def _fixup_igloo_module_baseaddr(self, addr):
self.ffi.vtypejsons[self.igloo_ko_isf].shift_symbol_addresses(addr)
Expand Down Expand Up @@ -326,7 +316,7 @@ def _prepare_ffi_call(self, func_ptr: int, args: list, func_name: str = None) ->
kind = param_type.get("kind")
# Unsigned base type: convert negative ints
if kind == "base":
base_type = self.ffi.get_base_type(param_type.get("name"))
base_type = self.ffi.get_type(param_type.get("name"))
if base_type and base_type.signed is False and isinstance(arg, int) and arg < 0:
arg = arg % (1 << (base_type.size * 8))
# Pointer: allow int or Ptr
Expand Down Expand Up @@ -364,15 +354,15 @@ def _prepare_ffi_call(self, func_ptr: int, args: list, func_name: str = None) ->
total_bytes += len(b)
elif type(arg).__name__ == 'BoundTypeInstance':
if not hasattr(arg, 'address'):
to_write = arg.to_bytes()
to_write = bytes(self.ffi.buffer(arg))
raw_addr = yield from self.kmalloc(len(to_write) + 64)
if not raw_addr:
raise RuntimeError("Failed to allocate kernel memory for BoundTypeInstance")
aligned_addr = (raw_addr + 63) & ~63
yield from plugins.mem.write_bytes(aligned_addr, to_write)
boundtype_ptrs[i] = aligned_addr
elif hasattr(arg, 'to_bytes') and hasattr(arg, 'size'):
b = arg.to_bytes()
elif hasattr(arg, '__bytes__'):
b = bytes(arg)
arg_bytes.append(b)
arg_ptr_indices.append((i, total_bytes, len(b)))
total_bytes += len(b)
Expand Down Expand Up @@ -405,7 +395,9 @@ def _prepare_ffi_call(self, func_ptr: int, args: list, func_name: str = None) ->
ffi_call.args[i] = boundtype_ptrs[i]
else:
raise TypeError(f"Unsupported argument type for FFI: {type(arg)}")
return ffi_call.to_bytes(), kmem_addr, func_typeinfo

# ZERO-COPY UPGRADE: Extract the FFI portal structure dynamically via memoryview slice
return bytes(self.ffi.buffer(ffi_call)), kmem_addr, func_typeinfo

def call_kernel_function(
self, func: Union[int, str], *args: Any) -> Generator[Any, Any, Any]:
Expand Down Expand Up @@ -462,24 +454,26 @@ def call_kernel_function(
kind = ret_type.get("kind")
name = ret_type.get("name")
if kind == "base":
base_type = self.ffi.get_base_type(name)
base_type = self.ffi.get_type(name)
if base_type:
# Unsigned fixup
if base_type.signed is False and result < 0:
result = result % (1 << (base_type.size * 8))
# Convert to correct Python type
if base_type.kind == "int" or base_type.kind == "pointer":
if base_type.kind in ("int", "pointer"):
result = int(result)
elif base_type.kind == "float":
result = float(result)
elif base_type.kind == "bool":
result = bool(result)
elif kind == "enum":
enum_type = self.ffi.get_enum(name)
if enum_type:
result = enum_type and enum_type.base and self.ffi.get_base_type(enum_type.base)
if enum_type:
result = self.ffi.create_instance(enum_type, result.to_bytes(enum_type.size, 'little'))._value
try:
enum_size = self.ffi.sizeof(name)
# Bind bytearray and extract the bound EnumInstance underlying value dynamically
buf = bytearray(result.to_bytes(enum_size, 'little'))
result = self.ffi.from_buffer(name, buf)._value
except (KeyError, ValueError):
pass
elif kind == "pointer":
# Return a Ptr object
ptr_type = ret_type.get("subtype")
Expand All @@ -489,8 +483,6 @@ def call_kernel_function(
struct_type = name
if result != 0:
val = yield from self.read_type(result, struct_type)
if not hasattr(val, "_address"):
setattr(val, "_address", result)
result = val
else:
result = None
Expand Down
Loading
Loading