1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -32,6 +32,7 @@
-

### Bug Fixes
- loader: fix recycled TID overwriting matched calls in dynamic layout, causing ValueError during rendering @devs6186 #2619
- main: suggest --os flag in unsupported OS error message to help users override ELF OS detection @devs6186 #2577
- render: escape sample-controlled strings before passing to Rich to prevent MarkupError @devs6186 #2699
- Fixed insecure deserialization vulnerability in YAML loading @0x1622 (#2770)
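
As an aside on the render fix listed above: the usual way to neutralize sample-controlled text before handing it to Rich is `rich.markup.escape`. The snippet below is only an illustrative sketch of that general pattern — the sample string is invented and this is not capa's render code:

```python
from rich.console import Console
from rich.markup import escape

console = Console()

# a string recovered from the sample: the stray closing tag would otherwise be
# parsed as Rich markup and raise MarkupError during rendering.
sample_string = "GetProcAddress[/loader]"
console.print(escape(sample_string))  # printed literally, no markup parsing
```

Without the `escape()` call, `console.print(sample_string)` fails because `[/loader]` looks like a closing markup tag with nothing to close.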
129 changes: 99 additions & 30 deletions capa/loader.py
Collaborator:
This file contains many format-based changes. Please revert these if they are not necessary to pass linting checks.

@@ -31,7 +31,11 @@
import capa.features.extractors.common
from capa.rules import RuleSet
from capa.engine import MatchResults
from capa.exceptions import UnsupportedOSError, UnsupportedArchError, UnsupportedFormatError
from capa.exceptions import (
UnsupportedOSError,
UnsupportedArchError,
UnsupportedFormatError,
)
from capa.features.common import (
OS_AUTO,
FORMAT_PE,
@@ -160,9 +164,13 @@ def get_workspace(path: Path, input_format: str, sigpaths: list[Path]):
vw = viv_utils.getWorkspace(str(path), analyze=False, should_save=False)
elif input_format == FORMAT_SC32:
# these are not analyzed nor saved.
vw = viv_utils.getShellcodeWorkspaceFromFile(str(path), arch="i386", analyze=False)
vw = viv_utils.getShellcodeWorkspaceFromFile(
str(path), arch="i386", analyze=False
)
elif input_format == FORMAT_SC64:
vw = viv_utils.getShellcodeWorkspaceFromFile(str(path), arch="amd64", analyze=False)
vw = viv_utils.getShellcodeWorkspaceFromFile(
str(path), arch="amd64", analyze=False
)
else:
raise ValueError("unexpected format: " + input_format)
except envi.exc.SegmentationViolation as e:
@@ -231,20 +239,26 @@ def get_extractor(
import capa.features.extractors.drakvuf.extractor

report = capa.helpers.load_jsonl_from_path(input_path)
return capa.features.extractors.drakvuf.extractor.DrakvufExtractor.from_report(report)
return capa.features.extractors.drakvuf.extractor.DrakvufExtractor.from_report(
report
)

elif backend == BACKEND_VMRAY:
import capa.features.extractors.vmray.extractor

return capa.features.extractors.vmray.extractor.VMRayExtractor.from_zipfile(input_path)
return capa.features.extractors.vmray.extractor.VMRayExtractor.from_zipfile(
input_path
)

elif backend == BACKEND_DOTNET:
import capa.features.extractors.dnfile.extractor

if input_format not in (FORMAT_PE, FORMAT_DOTNET):
raise UnsupportedFormatError()

return capa.features.extractors.dnfile.extractor.DnfileFeatureExtractor(input_path)
return capa.features.extractors.dnfile.extractor.DnfileFeatureExtractor(
input_path
)

elif backend == BACKEND_BINJA:
import capa.features.extractors.binja.find_binja_api as finder
@@ -303,11 +317,15 @@ def get_extractor(
vw.saveWorkspace()
except IOError:
# see #168 for discussion around how to handle non-writable directories
logger.info("source directory is not writable, won't save intermediate workspace")
logger.info(
"source directory is not writable, won't save intermediate workspace"
)
else:
logger.debug("CAPA_SAVE_WORKSPACE unset, not saving workspace")

return capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, input_path, os_)
return capa.features.extractors.viv.extractor.VivisectFeatureExtractor(
vw, input_path, os_
)

elif backend == BACKEND_FREEZE:
return frz.load(input_path.read_bytes())
@@ -320,7 +338,9 @@ def get_extractor(
assert sample_path is not None
buf = sample_path.read_bytes()

return capa.features.extractors.binexport2.extractor.BinExport2FeatureExtractor(be2, buf)
return capa.features.extractors.binexport2.extractor.BinExport2FeatureExtractor(
be2, buf
)

elif backend == BACKEND_IDA:
import capa.features.extractors.ida.idalib as idalib
@@ -351,7 +371,9 @@ def get_extractor(
# -1 - Generic errors (database already open, auto-analysis failed, etc.)
# -2 - User cancelled operation
ret = idapro.open_database(
str(input_path), run_auto_analysis=True, args="-Olumina:host=0.0.0.0 -Osecondary_lumina:host=0.0.0.0 -R"
str(input_path),
run_auto_analysis=True,
args="-Olumina:host=0.0.0.0 -Osecondary_lumina:host=0.0.0.0 -R",
)
if ret != 0:
raise RuntimeError("failed to analyze input file")
@@ -386,12 +408,19 @@ def get_extractor(
monitor = TaskMonitor.DUMMY

# Import file
loader = pyghidra.program_loader().project(project).source(str(input_path)).name(input_path.name)
loader = (
pyghidra.program_loader()
.project(project)
.source(str(input_path))
.name(input_path.name)
)
with loader.load() as load_results:
load_results.save(monitor)

# Open program
program, consumer = pyghidra.consume_program(project, "/" + input_path.name)
program, consumer = pyghidra.consume_program(
project, "/" + input_path.name
)

# Analyze
pyghidra.analyze(program, monitor)
@@ -424,7 +453,9 @@ def __exit__(self, exc_type, exc_val, exc_tb):

import capa.features.extractors.ghidra.extractor

return capa.features.extractors.ghidra.extractor.GhidraFeatureExtractor(ctx_manager=cm, tmpdir=tmpdir)
return capa.features.extractors.ghidra.extractor.GhidraFeatureExtractor(
ctx_manager=cm, tmpdir=tmpdir
)
else:
raise ValueError("unexpected backend: " + backend)

@@ -461,37 +492,55 @@ def get_file_extractors(input_file: Path, input_format: str) -> list[FeatureExtr
if input_format == FORMAT_PE:
import capa.features.extractors.pefile

file_extractors.append(capa.features.extractors.pefile.PefileFeatureExtractor(input_file))
file_extractors.append(
capa.features.extractors.pefile.PefileFeatureExtractor(input_file)
)

elif input_format == FORMAT_DOTNET:
import capa.features.extractors.pefile
import capa.features.extractors.dotnetfile

file_extractors.append(capa.features.extractors.pefile.PefileFeatureExtractor(input_file))
file_extractors.append(capa.features.extractors.dotnetfile.DotnetFileFeatureExtractor(input_file))
file_extractors.append(
capa.features.extractors.pefile.PefileFeatureExtractor(input_file)
)
file_extractors.append(
capa.features.extractors.dotnetfile.DotnetFileFeatureExtractor(input_file)
)

elif input_format == FORMAT_ELF:
import capa.features.extractors.elffile

file_extractors.append(capa.features.extractors.elffile.ElfFeatureExtractor(input_file))
file_extractors.append(
capa.features.extractors.elffile.ElfFeatureExtractor(input_file)
)

elif input_format == FORMAT_CAPE:
import capa.features.extractors.cape.extractor

report = capa.helpers.load_json_from_path(input_file)
file_extractors.append(capa.features.extractors.cape.extractor.CapeExtractor.from_report(report))
file_extractors.append(
capa.features.extractors.cape.extractor.CapeExtractor.from_report(report)
)

elif input_format == FORMAT_DRAKVUF:
import capa.helpers
import capa.features.extractors.drakvuf.extractor

report = capa.helpers.load_jsonl_from_path(input_file)
file_extractors.append(capa.features.extractors.drakvuf.extractor.DrakvufExtractor.from_report(report))
file_extractors.append(
capa.features.extractors.drakvuf.extractor.DrakvufExtractor.from_report(
report
)
)

elif input_format == FORMAT_VMRAY:
import capa.features.extractors.vmray.extractor

file_extractors.append(capa.features.extractors.vmray.extractor.VMRayExtractor.from_zipfile(input_file))
file_extractors.append(
capa.features.extractors.vmray.extractor.VMRayExtractor.from_zipfile(
input_file
)
)

elif input_format == FORMAT_BINEXPORT2:
file_extractors = _get_binexport2_file_extractors(input_file)
@@ -501,7 +550,9 @@ def get_file_extractors(input_file: Path, input_format: str) -> list[FeatureExtr

def get_signatures(sigs_path: Path) -> list[Path]:
if not sigs_path.exists():
raise IOError(f"signatures path {sigs_path} does not exist or cannot be accessed")
raise IOError(
f"signatures path {sigs_path} does not exist or cannot be accessed"
)

paths: list[Path] = []
if sigs_path.is_file():
@@ -525,7 +576,9 @@ def get_signatures(sigs_path: Path) -> list[Path]:
return paths


def get_sample_analysis(format_, arch, os_, extractor, rules_path, feature_counts, library_functions):
def get_sample_analysis(
format_, arch, os_, extractor, rules_path, feature_counts, library_functions
):
if isinstance(extractor, StaticFeatureExtractor):
return rdoc.StaticAnalysis(
format=format_,
@@ -575,12 +628,20 @@ def collect_metadata(
md5, sha1, sha256 = sample_hashes.md5, sample_hashes.sha1, sample_hashes.sha256

global_feats = list(extractor.extract_global_features())
extractor_format = [f.value for (f, _) in global_feats if isinstance(f, capa.features.common.Format)]
extractor_arch = [f.value for (f, _) in global_feats if isinstance(f, capa.features.common.Arch)]
extractor_os = [f.value for (f, _) in global_feats if isinstance(f, capa.features.common.OS)]
extractor_format = [
f.value for (f, _) in global_feats if isinstance(f, capa.features.common.Format)
]
extractor_arch = [
f.value for (f, _) in global_feats if isinstance(f, capa.features.common.Arch)
]
extractor_os = [
f.value for (f, _) in global_feats if isinstance(f, capa.features.common.OS)
]

input_format = (
str(extractor_format[0]) if extractor_format else "unknown" if input_format == FORMAT_AUTO else input_format
str(extractor_format[0])
if extractor_format
else "unknown" if input_format == FORMAT_AUTO else input_format
)
arch = str(extractor_arch[0]) if extractor_arch else "unknown"
os_ = str(extractor_os[0]) if extractor_os else "unknown" if os_ == OS_AUTO else os_
@@ -655,14 +716,18 @@ def result_rec(result: capa.features.common.Result):
threads_by_process[p.address] = []

for t in extractor.get_threads(p):
calls_by_thread[t.address] = []
# use setdefault so that a recycled TID (same pid+tid seen again) accumulates
# calls from all its instances rather than overwriting the prior instance's calls.
calls_by_thread.setdefault(t.address, [])

for c in extractor.get_calls(p, t):
if c.address in matched_calls:
names_by_call[c.address] = extractor.get_call_name(p, t, c)
calls_by_thread[t.address].append(c.address)

if calls_by_thread[t.address]:
# only register the thread address once; a recycled TID must not create
# a duplicate entry in threads_by_process or cause a double-add to matched_threads.
if calls_by_thread[t.address] and t.address not in matched_threads:
matched_threads.add(t.address)
threads_by_process[p.address].append(t.address)
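
An aside on the two comments above: the sketch below shows the accumulation behavior they describe in a minimal, self-contained form. The (pid, tid) tuples and call addresses are invented for illustration, and capa's real extractor objects are not used here:

```python
# hypothetical trace: the OS recycles TID 2000 after the first thread exits,
# so the same (pid, tid) key appears twice with different matched calls.
observed = [
    ((1000, 2000), [0x401000]),  # first thread instance
    ((1000, 2000), [0x402000]),  # recycled TID, second instance
]

calls_by_thread: dict[tuple[int, int], list[int]] = {}
for thread_addr, matched_call_addrs in observed:
    # a plain `calls_by_thread[thread_addr] = []` here would drop 0x401000 when
    # the recycled TID reappears; setdefault keeps the earlier instance's calls.
    calls_by_thread.setdefault(thread_addr, [])
    calls_by_thread[thread_addr].extend(matched_call_addrs)

assert calls_by_thread[(1000, 2000)] == [0x401000, 0x402000]
```

With the old assignment, the layout could reference calls that were no longer recorded for the thread, which is the ValueError during rendering that the changelog entry describes (#2619).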

@@ -700,7 +765,9 @@ def result_rec(result: capa.features.common.Result):
return layout


def compute_static_layout(rules: RuleSet, extractor: StaticFeatureExtractor, capabilities) -> rdoc.StaticLayout:
def compute_static_layout(
rules: RuleSet, extractor: StaticFeatureExtractor, capabilities
) -> rdoc.StaticLayout:
"""
compute a metadata structure that links basic blocks
to the functions in which they're found.
@@ -730,7 +797,9 @@ def compute_static_layout(rules: RuleSet, extractor: StaticFeatureExtractor, cap
rdoc.FunctionLayout(
address=frz.Address.from_capa(f),
matched_basic_blocks=tuple(
rdoc.BasicBlockLayout(address=frz.Address.from_capa(bb)) for bb in bbs if bb in matched_bbs
rdoc.BasicBlockLayout(address=frz.Address.from_capa(bb))
for bb in bbs
if bb in matched_bbs
), # this object is open to extension in the future,
# such as with the function name, etc.
)