Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
- lint: disable rule caching during linting @Maijin #2817
- vmray: skip processes with invalid PID or missing filename @EclipseAditya #2807
- render: use default styling for dynamic -vv API/call details so they are easier to see @devs6186 #1865
- loader: handle struct.error from dnfile and show clear CorruptFile message @devs6186 #2442

### capa Explorer Web
- webui: fix 404 for "View rule in capa-rules" by using encodeURIComponent for rule name in URL @devs6186 #2482
Expand Down
41 changes: 32 additions & 9 deletions capa/features/extractors/dnfile/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@
import capa.features.extractors.dnfile.insn
import capa.features.extractors.dnfile.function
from capa.features.common import Feature
from capa.features.address import NO_ADDRESS, Address, DNTokenAddress, DNTokenOffsetAddress
from capa.features.address import (
NO_ADDRESS,
Address,
DNTokenAddress,
DNTokenOffsetAddress,
)
from capa.features.extractors.dnfile.types import DnType, DnUnmanagedMethod
from capa.features.extractors.base_extractor import (
BBHandle,
Expand All @@ -39,6 +44,7 @@
from capa.features.extractors.dnfile.helpers import (
get_dotnet_types,
get_dotnet_fields,
load_dotnet_image,
get_dotnet_managed_imports,
get_dotnet_managed_methods,
get_dotnet_unmanaged_imports,
Expand Down Expand Up @@ -68,7 +74,9 @@ def __init__(self, pe: dnfile.dnPE):
def get_import(self, token: int) -> Optional[Union[DnType, DnUnmanagedMethod]]:
return self.imports.get(token)

def get_native_import(self, token: int) -> Optional[Union[DnType, DnUnmanagedMethod]]:
def get_native_import(
self, token: int
) -> Optional[Union[DnType, DnUnmanagedMethod]]:
return self.native_imports.get(token)

def get_method(self, token: int) -> Optional[Union[DnType, DnUnmanagedMethod]]:
Expand All @@ -83,18 +91,26 @@ def get_type(self, token: int) -> Optional[Union[DnType, DnUnmanagedMethod]]:

class DnfileFeatureExtractor(StaticFeatureExtractor):
def __init__(self, path: Path):
self.pe: dnfile.dnPE = dnfile.dnPE(str(path))
self.pe = load_dotnet_image(path)
super().__init__(hashes=SampleHashes.from_bytes(path.read_bytes()))

# pre-compute .NET token lookup tables; each .NET method has access to this cache for feature extraction
# most relevant at instruction scope
self.token_cache: DnFileFeatureExtractorCache = DnFileFeatureExtractorCache(self.pe)
self.token_cache: DnFileFeatureExtractorCache = DnFileFeatureExtractorCache(
self.pe
)

# pre-compute these because we'll yield them at *every* scope.
self.global_features: list[tuple[Feature, Address]] = []
self.global_features.extend(capa.features.extractors.dotnetfile.extract_file_format())
self.global_features.extend(capa.features.extractors.dotnetfile.extract_file_os(pe=self.pe))
self.global_features.extend(capa.features.extractors.dotnetfile.extract_file_arch(pe=self.pe))
self.global_features.extend(
capa.features.extractors.dotnetfile.extract_file_format()
)
self.global_features.extend(
capa.features.extractors.dotnetfile.extract_file_os(pe=self.pe)
)
self.global_features.extend(
capa.features.extractors.dotnetfile.extract_file_arch(pe=self.pe)
)

def get_base_address(self):
return NO_ADDRESS
Expand All @@ -112,7 +128,12 @@ def get_functions(self) -> Iterator[FunctionHandle]:
fh: FunctionHandle = FunctionHandle(
address=DNTokenAddress(token),
inner=method,
ctx={"pe": self.pe, "calls_from": set(), "calls_to": set(), "cache": self.token_cache},
ctx={
"pe": self.pe,
"calls_from": set(),
"calls_to": set(),
"cache": self.token_cache,
},
)

# method tokens should be unique
Expand Down Expand Up @@ -160,7 +181,9 @@ def extract_basic_block_features(self, fh, bbh):
def get_instructions(self, fh, bbh):
for insn in bbh.inner.instructions:
yield InsnHandle(
address=DNTokenOffsetAddress(bbh.address, insn.offset - (fh.inner.offset + fh.inner.header_size)),
address=DNTokenOffsetAddress(
bbh.address, insn.offset - (fh.inner.offset + fh.inner.header_size)
),
inner=insn,
)

Expand Down
125 changes: 98 additions & 27 deletions capa/features/extractors/dnfile/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@

from __future__ import annotations

import struct
import logging
from typing import Union, Iterator, Optional
from pathlib import Path

import dnfile
from dncil.cil.body import CilMethodBody
Expand All @@ -30,6 +32,16 @@
logger = logging.getLogger(__name__)


def load_dotnet_image(path: Path) -> dnfile.dnPE:
"""load a .NET PE file, raising CorruptFile on struct.error with the original error message."""
try:
return dnfile.dnPE(str(path))
except struct.error as e:
from capa.loader import CorruptFile

raise CorruptFile(f"Invalid or truncated .NET metadata: {e}") from e


class DnfileMethodBodyReader(CilMethodBodyReaderBase):
def __init__(self, pe: dnfile.dnPE, row: dnfile.mdtable.MethodDefRow):
self.pe: dnfile.dnPE = pe
Expand All @@ -48,7 +60,9 @@ def seek(self, offset: int) -> int:
return self.offset


def resolve_dotnet_token(pe: dnfile.dnPE, token: Token) -> Union[dnfile.base.MDTableRow, InvalidToken, str]:
def resolve_dotnet_token(
pe: dnfile.dnPE, token: Token
) -> Union[dnfile.base.MDTableRow, InvalidToken, str]:
"""map generic token to string or table row"""
assert pe.net is not None
assert pe.net.mdtables is not None
Expand All @@ -59,7 +73,9 @@ def resolve_dotnet_token(pe: dnfile.dnPE, token: Token) -> Union[dnfile.base.MDT
return InvalidToken(token.value)
return user_string

table: Optional[dnfile.base.ClrMetaDataTable] = pe.net.mdtables.tables.get(token.table)
table: Optional[dnfile.base.ClrMetaDataTable] = pe.net.mdtables.tables.get(
token.table
)
if table is None:
# table index is not valid
return InvalidToken(token.value)
Expand All @@ -71,7 +87,9 @@ def resolve_dotnet_token(pe: dnfile.dnPE, token: Token) -> Union[dnfile.base.MDT
return InvalidToken(token.value)


def read_dotnet_method_body(pe: dnfile.dnPE, row: dnfile.mdtable.MethodDefRow) -> Optional[CilMethodBody]:
def read_dotnet_method_body(
pe: dnfile.dnPE, row: dnfile.mdtable.MethodDefRow
) -> Optional[CilMethodBody]:
"""read dotnet method body"""
try:
return CilMethodBody(DnfileMethodBodyReader(pe, row))
Expand All @@ -90,7 +108,9 @@ def read_dotnet_user_string(pe: dnfile.dnPE, token: StringToken) -> Optional[str
return None

try:
user_string: Optional[dnfile.stream.UserString] = pe.net.user_strings.get(token.rid)
user_string: Optional[dnfile.stream.UserString] = pe.net.user_strings.get(
token.rid
)
except UnicodeDecodeError as e:
logger.debug("failed to decode #US stream index 0x%08x (%s)", token.rid, e)
return None
Expand Down Expand Up @@ -151,7 +171,9 @@ def get_dotnet_managed_imports(pe: dnfile.dnPE) -> Iterator[DnType]:
)


def get_dotnet_methoddef_property_accessors(pe: dnfile.dnPE) -> Iterator[tuple[int, str]]:
def get_dotnet_methoddef_property_accessors(
pe: dnfile.dnPE,
) -> Iterator[tuple[int, str]]:
"""get MethodDef methods used to access properties

see https://www.ntcore.com/files/dotnetformat.htm
Expand All @@ -162,7 +184,9 @@ def get_dotnet_methoddef_property_accessors(pe: dnfile.dnPE) -> Iterator[tuple[i
Method (index into the MethodDef table)
Association (index into the Event or Property table; more precisely, a HasSemantics coded index)
"""
for rid, method_semantics in iter_dotnet_table(pe, dnfile.mdtable.MethodSemantics.number):
for rid, method_semantics in iter_dotnet_table(
pe, dnfile.mdtable.MethodSemantics.number
):
assert isinstance(method_semantics, dnfile.mdtable.MethodSemanticsRow)

if method_semantics.Association.row is None:
Expand Down Expand Up @@ -216,17 +240,27 @@ def get_dotnet_managed_methods(pe: dnfile.dnPE) -> Iterator[DnType]:
logger.debug("TypeDef[0x%X] MethodList[0x%X] row is None", rid, idx)
continue

token: int = calculate_dotnet_token_value(method.table.number, method.row_index)
token: int = calculate_dotnet_token_value(
method.table.number, method.row_index
)
access: Optional[str] = accessor_map.get(token)

method_name: str = str(method.row.Name)
if method_name.startswith(("get_", "set_")):
# remove get_/set_
method_name = method_name[4:]

typedefnamespace, typedefname = resolve_nested_typedef_name(nested_class_table, rid, typedef, pe)
typedefnamespace, typedefname = resolve_nested_typedef_name(
nested_class_table, rid, typedef, pe
)

yield DnType(token, typedefname, namespace=typedefnamespace, member=method_name, access=access)
yield DnType(
token,
typedefname,
namespace=typedefnamespace,
member=method_name,
access=access,
)


def get_dotnet_fields(pe: dnfile.dnPE) -> Iterator[DnType]:
Expand All @@ -253,18 +287,28 @@ def get_dotnet_fields(pe: dnfile.dnPE) -> Iterator[DnType]:
logger.debug("TypeDef[0x%X] FieldList[0x%X] row is None", rid, idx)
continue

typedefnamespace, typedefname = resolve_nested_typedef_name(nested_class_table, rid, typedef, pe)
typedefnamespace, typedefname = resolve_nested_typedef_name(
nested_class_table, rid, typedef, pe
)

token: int = calculate_dotnet_token_value(field.table.number, field.row_index)
yield DnType(token, typedefname, namespace=typedefnamespace, member=field.row.Name)
token: int = calculate_dotnet_token_value(
field.table.number, field.row_index
)
yield DnType(
token, typedefname, namespace=typedefnamespace, member=field.row.Name
)


def get_dotnet_managed_method_bodies(pe: dnfile.dnPE) -> Iterator[tuple[int, CilMethodBody]]:
def get_dotnet_managed_method_bodies(
pe: dnfile.dnPE,
) -> Iterator[tuple[int, CilMethodBody]]:
"""get managed methods from MethodDef table"""
for rid, method_def in iter_dotnet_table(pe, dnfile.mdtable.MethodDef.number):
assert isinstance(method_def, dnfile.mdtable.MethodDefRow)

if not method_def.ImplFlags.miIL or any((method_def.Flags.mdAbstract, method_def.Flags.mdPinvokeImpl)):
if not method_def.ImplFlags.miIL or any(
(method_def.Flags.mdAbstract, method_def.Flags.mdPinvokeImpl)
):
# skip methods that do not have a method body
continue

Expand Down Expand Up @@ -310,7 +354,9 @@ def get_dotnet_unmanaged_imports(pe: dnfile.dnPE) -> Iterator[DnUnmanagedMethod]
# ECMA says "Each row of the ImplMap table associates a row in the MethodDef table (MemberForwarded) with the
# name of a routine (ImportName) in some unmanaged DLL (ImportScope)"; so we calculate and map the MemberForwarded
# MethodDef table token to help us later record native import method calls made from CIL
token: int = calculate_dotnet_token_value(member_forward_table, member_forward_row)
token: int = calculate_dotnet_token_value(
member_forward_table, member_forward_row
)

# like Kernel32.dll
if module and "." in module:
Expand All @@ -320,14 +366,18 @@ def get_dotnet_unmanaged_imports(pe: dnfile.dnPE) -> Iterator[DnUnmanagedMethod]
yield DnUnmanagedMethod(token, module, method)


def get_dotnet_table_row(pe: dnfile.dnPE, table_index: int, row_index: int) -> Optional[dnfile.base.MDTableRow]:
def get_dotnet_table_row(
pe: dnfile.dnPE, table_index: int, row_index: int
) -> Optional[dnfile.base.MDTableRow]:
assert pe.net is not None
assert pe.net.mdtables is not None

if row_index - 1 <= 0:
return None

table: Optional[dnfile.base.ClrMetaDataTable] = pe.net.mdtables.tables.get(table_index)
table: Optional[dnfile.base.ClrMetaDataTable] = pe.net.mdtables.tables.get(
table_index
)
if table is None:
return None

Expand All @@ -338,7 +388,10 @@ def get_dotnet_table_row(pe: dnfile.dnPE, table_index: int, row_index: int) -> O


def resolve_nested_typedef_name(
nested_class_table: dict, index: int, typedef: dnfile.mdtable.TypeDefRow, pe: dnfile.dnPE
nested_class_table: dict,
index: int,
typedef: dnfile.mdtable.TypeDefRow,
pe: dnfile.dnPE,
) -> tuple[str, tuple[str, ...]]:
"""Resolves all nested TypeDef class names. Returns the namespace as a str and the nested TypeRef name as a tuple"""

Expand All @@ -351,7 +404,9 @@ def resolve_nested_typedef_name(

while nested_class_table[index] in nested_class_table:
# Iterate through the typedef table to resolve the nested name
table_row = get_dotnet_table_row(pe, dnfile.mdtable.TypeDef.number, nested_class_table[index])
table_row = get_dotnet_table_row(
pe, dnfile.mdtable.TypeDef.number, nested_class_table[index]
)
if table_row is None:
return str(typedef.TypeNamespace), tuple(typedef_name[::-1])

Expand All @@ -360,7 +415,9 @@ def resolve_nested_typedef_name(
index = nested_class_table[index]

# Document the root enclosing details
table_row = get_dotnet_table_row(pe, dnfile.mdtable.TypeDef.number, nested_class_table[index])
table_row = get_dotnet_table_row(
pe, dnfile.mdtable.TypeDef.number, nested_class_table[index]
)
if table_row is None:
return str(typedef.TypeNamespace), tuple(typedef_name[::-1])

Expand Down Expand Up @@ -392,7 +449,9 @@ def resolve_nested_typeref_name(
# Iterate through the typeref table to resolve the nested name
typeref_name.append(name)
name = str(table_row.TypeName)
table_row = get_dotnet_table_row(pe, dnfile.mdtable.TypeRef.number, table_row.ResolutionScope.row_index)
table_row = get_dotnet_table_row(
pe, dnfile.mdtable.TypeRef.number, table_row.ResolutionScope.row_index
)
if table_row is None:
return str(typeref.TypeNamespace), tuple(typeref_name[::-1])

Expand All @@ -412,7 +471,9 @@ def get_dotnet_nested_class_table_index(pe: dnfile.dnPE) -> dict[int, int]:
# Used to find nested classes in typedef
for _, nestedclass in iter_dotnet_table(pe, dnfile.mdtable.NestedClass.number):
assert isinstance(nestedclass, dnfile.mdtable.NestedClassRow)
nested_class_table[nestedclass.NestedClass.row_index] = nestedclass.EnclosingClass.row_index
nested_class_table[nestedclass.NestedClass.row_index] = (
nestedclass.EnclosingClass.row_index
)

return nested_class_table

Expand All @@ -424,17 +485,25 @@ def get_dotnet_types(pe: dnfile.dnPE) -> Iterator[DnType]:
for rid, typedef in iter_dotnet_table(pe, dnfile.mdtable.TypeDef.number):
assert isinstance(typedef, dnfile.mdtable.TypeDefRow)

typedefnamespace, typedefname = resolve_nested_typedef_name(nested_class_table, rid, typedef, pe)
typedefnamespace, typedefname = resolve_nested_typedef_name(
nested_class_table, rid, typedef, pe
)

typedef_token: int = calculate_dotnet_token_value(dnfile.mdtable.TypeDef.number, rid)
typedef_token: int = calculate_dotnet_token_value(
dnfile.mdtable.TypeDef.number, rid
)
yield DnType(typedef_token, typedefname, namespace=typedefnamespace)

for rid, typeref in iter_dotnet_table(pe, dnfile.mdtable.TypeRef.number):
assert isinstance(typeref, dnfile.mdtable.TypeRefRow)

typerefnamespace, typerefname = resolve_nested_typeref_name(typeref.ResolutionScope.row_index, typeref, pe)
typerefnamespace, typerefname = resolve_nested_typeref_name(
typeref.ResolutionScope.row_index, typeref, pe
)

typeref_token: int = calculate_dotnet_token_value(dnfile.mdtable.TypeRef.number, rid)
typeref_token: int = calculate_dotnet_token_value(
dnfile.mdtable.TypeRef.number, rid
)
yield DnType(typeref_token, typerefname, namespace=typerefnamespace)


Expand All @@ -449,7 +518,9 @@ def is_dotnet_mixed_mode(pe: dnfile.dnPE) -> bool:
return not bool(pe.net.Flags.CLR_ILONLY)


def iter_dotnet_table(pe: dnfile.dnPE, table_index: int) -> Iterator[tuple[int, dnfile.base.MDTableRow]]:
def iter_dotnet_table(
pe: dnfile.dnPE, table_index: int
) -> Iterator[tuple[int, dnfile.base.MDTableRow]]:
assert pe.net is not None
assert pe.net.mdtables is not None

Expand Down
Loading
Loading