diff --git a/dissect/target/helpers/configutil.py b/dissect/target/helpers/configutil.py index 998ed4d871..07decdcc54 100644 --- a/dissect/target/helpers/configutil.py +++ b/dissect/target/helpers/configutil.py @@ -1,9 +1,11 @@ from __future__ import annotations +import enum import io import json import re import sys +import textwrap from collections import deque from collections.abc import Callable, ItemsView, Iterable, Iterator, KeysView from configparser import ConfigParser, MissingSectionHeaderError @@ -911,6 +913,365 @@ def parse_file(self, fh: TextIO) -> None: _update_dictionary(self.parsed_data, key, value) +class _NixTokenType(enum.Enum): + LBRACE = "{" + RBRACE = "}" + LBRACKET = "[" + RBRACKET = "]" + EQUALS = "=" + SEMICOLON = ";" + COLON = ":" + DOT = "." + STRING = "str" + MULTILINE_STRING = "ml" + IDENT = "ident" + EOF = "eof" + + +@dataclass +class _NixToken: + type: _NixTokenType + value: str + + +class _NixTokenizer: + """Character-by-character tokenizer for Nix configuration files. + + Produces a stream of tokens from ``.nix`` file text, handling string literals, + multiline strings (``'' ... ''``), and line comments (``#``). + """ + + def __init__(self, text: str) -> None: + self._text = text + self._pos = 0 + self._length = len(text) + + def _peek(self, offset: int = 0) -> str | None: + pos = self._pos + offset + return self._text[pos] if pos < self._length else None + + def _advance(self) -> str: + ch = self._text[self._pos] + self._pos += 1 + return ch + + def _skip_whitespace_and_comments(self) -> None: + while self._pos < self._length: + ch = self._peek() + if ch in (" ", "\t", "\n", "\r"): + self._pos += 1 + elif ch == "#": + while self._pos < self._length and self._text[self._pos] != "\n": + self._pos += 1 + else: + break + + def _read_string(self) -> str: + """Read a ``"..."`` string literal with basic escape handling.""" + self._advance() # consume opening " + parts = [] + while self._pos < self._length: + ch = self._text[self._pos] + if ch == "\\": + self._pos += 1 + if self._pos < self._length: + escaped = self._text[self._pos] + if escaped == "n": + parts.append("\n") + elif escaped == "t": + parts.append("\t") + elif escaped == "r": + parts.append("\r") + else: + parts.append(escaped) + self._pos += 1 + elif ch == '"': + self._pos += 1 + break + else: + parts.append(ch) + self._pos += 1 + return "".join(parts) + + def _read_multiline_string(self) -> str: + """Read a ``'' ... ''`` multiline string literal.""" + self._pos += 2 # consume opening '' + parts = [] + while self._pos < self._length: + if self._text[self._pos] == "'" and self._peek(1) == "'": + self._pos += 2 # consume closing '' + break + parts.append(self._text[self._pos]) + self._pos += 1 + return "".join(parts).strip() + + def _read_identifier(self) -> str: + """Read a bare identifier: alphanumeric, ``_``, or ``-``.""" + start = self._pos + while self._pos < self._length: + ch = self._text[self._pos] + if ch.isalnum() or ch in ("_", "-"): + self._pos += 1 + else: + break + return self._text[start : self._pos] + + def tokenize(self) -> list[_NixToken]: + tokens = [] + while True: + self._skip_whitespace_and_comments() + if self._pos >= self._length: + tokens.append(_NixToken(_NixTokenType.EOF, "")) + break + + ch = self._peek() + + if ch == "{": + tokens.append(_NixToken(_NixTokenType.LBRACE, "{")) + self._advance() + elif ch == "}": + tokens.append(_NixToken(_NixTokenType.RBRACE, "}")) + self._advance() + elif ch == "[": + tokens.append(_NixToken(_NixTokenType.LBRACKET, "[")) + self._advance() + elif ch == "]": + tokens.append(_NixToken(_NixTokenType.RBRACKET, "]")) + self._advance() + elif ch == "=": + tokens.append(_NixToken(_NixTokenType.EQUALS, "=")) + self._advance() + elif ch == ";": + tokens.append(_NixToken(_NixTokenType.SEMICOLON, ";")) + self._advance() + elif ch == ":": + tokens.append(_NixToken(_NixTokenType.COLON, ":")) + self._advance() + elif ch == ".": + tokens.append(_NixToken(_NixTokenType.DOT, ".")) + self._advance() + elif ch == '"': + tokens.append(_NixToken(_NixTokenType.STRING, self._read_string())) + elif ch == "'" and self._peek(1) == "'": + tokens.append(_NixToken(_NixTokenType.MULTILINE_STRING, self._read_multiline_string())) + elif ch == ",": + # Commas appear in function headers { lib, ... }: — skip + self._advance() + elif ch.isalnum() or ch == "_": + tokens.append(_NixToken(_NixTokenType.IDENT, self._read_identifier())) + else: + # Skip unknown characters + self._advance() + + return tokens + + +class _NixParser: + """Recursive descent parser over a Nix token stream. + + Parses the subset of Nix used in NixOS configuration files into nested + Python dictionaries, lists, strings, booleans, and integers. + """ + + def __init__(self, tokens: list[_NixToken]) -> None: + self._tokens = tokens + self._pos = 0 + + def _peek(self, offset: int = 0) -> _NixToken: + pos = self._pos + offset + if pos < len(self._tokens): + return self._tokens[pos] + return _NixToken(_NixTokenType.EOF, "") + + def _advance(self) -> _NixToken: + token = self._tokens[self._pos] + self._pos += 1 + return token + + def _expect(self, token_type: _NixTokenType) -> _NixToken: + token = self._advance() + if token.type != token_type: + raise ConfigurationParsingError( + f"Expected {token_type.value}, got {token.type.value} ({token.value!r})" + ) + return token + + def parse(self) -> dict: + """Entry point. Handles optional function header then parses body.""" + self._skip_function_header() + if self._peek().type == _NixTokenType.LBRACE: + return self._parse_attr_set() + return {} + + def _skip_function_header(self) -> None: + """Skip ``{ lib, ..., ... }:`` function parameter pattern. + + Detects by scanning from the first ``{`` to find if the matching ``}`` is followed by ``:``.""" + if self._peek().type != _NixTokenType.LBRACE: + return + + # Scan ahead to find matching RBRACE + depth = 0 + scan_pos = self._pos + while scan_pos < len(self._tokens): + t = self._tokens[scan_pos] + if t.type == _NixTokenType.LBRACE: + depth += 1 + elif t.type == _NixTokenType.RBRACE: + depth -= 1 + if depth == 0: + # Check if next token is COLON — indicates function header + if scan_pos + 1 < len(self._tokens) and self._tokens[scan_pos + 1].type == _NixTokenType.COLON: + self._pos = scan_pos + 2 # skip past }: + return + scan_pos += 1 + + def _parse_attr_set(self) -> dict: + """Parse ``{ key = value; ... }`` into a dict.""" + self._expect(_NixTokenType.LBRACE) + result: dict = {} + + while self._peek().type != _NixTokenType.RBRACE and self._peek().type != _NixTokenType.EOF: + # Parse key path (possibly dotted) + key_parts = self._parse_key_path() + if not key_parts: + # Skip unexpected tokens + self._advance() + continue + + self._expect(_NixTokenType.EQUALS) + value = self._parse_value() + self._expect(_NixTokenType.SEMICOLON) + + self._merge_dotted_key(result, key_parts, value) + + if self._peek().type == _NixTokenType.RBRACE: + self._advance() # consume } + + return result + + def _parse_key_path(self) -> list[str]: + """Parse a dotted key path like ``networking.interfaces.eth0``.""" + token = self._peek() + if token.type == _NixTokenType.STRING: + parts = [self._advance().value] + elif token.type == _NixTokenType.IDENT: + parts = [self._advance().value] + else: + return [] + + while self._peek().type == _NixTokenType.DOT: + self._advance() # consume DOT + token = self._peek() + if token.type in (_NixTokenType.IDENT, _NixTokenType.STRING): + parts.append(self._advance().value) + else: + break + + return parts + + def _parse_value(self) -> str | int | bool | dict | list: + """Parse a value expression.""" + token = self._peek() + + if token.type == _NixTokenType.STRING: + return self._advance().value + + if token.type == _NixTokenType.MULTILINE_STRING: + return self._advance().value + + if token.type == _NixTokenType.LBRACE: + return self._parse_attr_set() + + if token.type == _NixTokenType.LBRACKET: + return self._parse_list() + + if token.type == _NixTokenType.IDENT: + # Check for lib.mkForce / lib.mkDefault / lib.mkOverride + if token.value == "lib" and self._peek(1).type == _NixTokenType.DOT: + return self._unwrap_lib_call() + + value = self._advance().value + + # Convert native types + if value == "true": + return True + if value == "false": + return False + try: + return int(value) + except ValueError: + pass + + # Bare identifier used as value (e.g., variable reference) — check if followed by + # more identifiers (function application like: pkgs.something) + while self._peek().type == _NixTokenType.DOT: + self._advance() # consume DOT + if self._peek().type == _NixTokenType.IDENT: + value = f"{value}.{self._advance().value}" + else: + break + + return value + + # Fallback: skip unexpected token + return self._advance().value + + def _parse_list(self) -> list: + """Parse ``[ elem1 elem2 ... ]`` into a Python list.""" + self._expect(_NixTokenType.LBRACKET) + result = [] + + while self._peek().type != _NixTokenType.RBRACKET and self._peek().type != _NixTokenType.EOF: + result.append(self._parse_value()) + + if self._peek().type == _NixTokenType.RBRACKET: + self._advance() # consume ] + + return result + + def _unwrap_lib_call(self) -> str | int | bool | dict | list: + """Unwrap ``lib.mkForce value`` and ``lib.mkDefault value``.""" + self._advance() # consume "lib" + self._advance() # consume DOT + self._advance() # consume function name (mkForce, mkDefault, etc.) + return self._parse_value() + + def _merge_dotted_key(self, target: dict, parts: list[str], value: ...) -> None: + """Merge a dotted key path into nested dicts. + + ``["a", "b", "c"]`` with value ``1`` produces ``{"a": {"b": {"c": 1}}}``. + Merges with existing dicts at each level. + """ + current = target + for part in parts[:-1]: + if part not in current: + current[part] = {} + elif not isinstance(current[part], dict): + current[part] = {} + current = current[part] + final_key = parts[-1] + existing = current.get(final_key) + if isinstance(existing, dict) and isinstance(value, dict): + existing.update(value) + else: + current[final_key] = value + + +class Nix(ConfigurationParser): + """Parse NixOS ``.nix`` configuration files into nested dictionaries. + + Handles the subset of Nix used in declarative NixOS configurations: + function headers, attribute sets (with dotted paths), lists, strings, + booleans, integers, and ``lib.mkForce``/``lib.mkDefault`` wrappers. + """ + + def parse_file(self, fh: TextIO) -> None: + text = fh.read() + tokens = _NixTokenizer(text).tokenize() + self.parsed_data = _NixParser(tokens).parse() + + @dataclass(frozen=True) class ParserOptions: collapse: bool | set[str] | None = None @@ -970,6 +1331,7 @@ def create_parser(self, options: ParserOptions | None = None) -> ConfigurationPa "template": ParserConfig(Txt), "toml": ParserConfig(Toml), "leases": ParserConfig(Leases), + "nix": ParserConfig(Nix), "meta_bare": ParserConfig(Default), # Return the basic config parser } diff --git a/dissect/target/plugins/os/unix/linux/network.py b/dissect/target/plugins/os/unix/linux/network.py index 9e63e1919f..849cc6a291 100644 --- a/dissect/target/plugins/os/unix/linux/network.py +++ b/dissect/target/plugins/os/unix/linux/network.py @@ -394,6 +394,130 @@ def _parse_gateway(self, value: str | None) -> NetAddress | None: return ip_address(value) +class NixOSConfigParser(LinuxNetworkConfigParser): + """NixOS declarative network configuration parser. + + NixOS uses the Nix functional language for system configuration. Network settings + are declared in ``/etc/nixos/configuration.nix`` or imported ``.nix`` files + (commonly ``networking.nix``). This parser uses :func:`configutil.parse` with + ``hint="nix"`` to parse ``.nix`` files into dictionaries, then extracts network + interface information from the resulting structure. + + References: + - https://nixos.org/manual/nixos/stable/#sec-networking + """ + + config_paths: tuple[str, ...] = ("/etc/nixos/",) + + def interfaces(self) -> Iterator[UnixInterfaceRecord]: + if not ( + self._target.fs.path("/etc/NIXOS").exists() + or self._target.fs.path("/etc/nixos/configuration.nix").exists() + ): + return + + for config_file in self._config_files(self.config_paths, "*.nix"): + try: + config = configutil.parse(config_file, hint="nix") + except Exception as e: + self._target.log.warning("Error parsing NixOS config file %s", config_file) + self._target.log.debug("", exc_info=e) + continue + + networking = config.get("networking", {}) + if not networking: + continue + + dns: set[NetAddress] = set() + for server in networking.get("nameservers", []): + try: + dns.add(ip_address(str(server))) + except ValueError: + self._target.log.debug("Invalid NixOS DNS server: %s", server) + + gateways: set[NetAddress] = set() + for gw_key in ("defaultGateway", "defaultGateway6"): + gw = networking.get(gw_key) + if isinstance(gw, str) and gw: + try: + gateways.add(ip_address(gw)) + except ValueError: + self._target.log.debug("Invalid NixOS gateway: %s", gw) + elif isinstance(gw, dict): + gw_addr = gw.get("address", "") + if gw_addr: + try: + gateways.add(ip_address(str(gw_addr))) + except ValueError: + self._target.log.debug("Invalid NixOS gateway: %s", gw_addr) + + use_dhcp = networking.get("useDHCP", False) is True + dhcpcd = networking.get("dhcpcd", {}) + if isinstance(dhcpcd, dict) and dhcpcd.get("enable") is True: + use_dhcp = True + + interfaces_cfg = networking.get("interfaces", {}) + if not isinstance(interfaces_cfg, dict): + interfaces_cfg = {} + + if not interfaces_cfg: + if dns or gateways or use_dhcp: + yield UnixInterfaceRecord( + name=None, + type="dhcp" if use_dhcp else "static", + enabled=None, + dns=list(dns), + gateway=list(gateways), + source=str(config_file), + dhcp_ipv4=use_dhcp, + dhcp_ipv6=use_dhcp, + configurator="nixos", + _target=self._target, + ) + continue + + for iface_name, iface_cfg in sorted(interfaces_cfg.items()): + if not isinstance(iface_cfg, dict): + continue + + ip_interfaces: set[NetInterface] = set() + iface_dhcp = use_dhcp + + if iface_cfg.get("useDHCP") is True: + iface_dhcp = True + elif iface_cfg.get("useDHCP") is False: + iface_dhcp = False + + for ip_version in ("ipv4", "ipv6"): + ip_cfg = iface_cfg.get(ip_version, {}) + if not isinstance(ip_cfg, dict): + continue + for addr_entry in ip_cfg.get("addresses", []): + if not isinstance(addr_entry, dict): + continue + addr = addr_entry.get("address") + prefix = addr_entry.get("prefixLength") + if addr and prefix is not None: + try: + ip_interfaces.add(ip_interface(f"{addr}/{prefix}")) + except ValueError: + self._target.log.debug("Invalid NixOS address: %s/%s", addr, prefix) + + yield UnixInterfaceRecord( + name=iface_name, + type="dhcp" if iface_dhcp else "static", + enabled=None, + cidr=ip_interfaces, + gateway=list(gateways), + dns=list(dns), + source=str(config_file), + dhcp_ipv4=iface_dhcp, + dhcp_ipv6=iface_dhcp, + configurator="nixos", + _target=self._target, + ) + + class DhclientLeaseParser(LinuxNetworkConfigParser): """Parse network interfaces from dhclient DHCP ``.leases`` files. @@ -763,5 +887,5 @@ def parse_debian_centos_dhclient_lease(log_record: LogRecord) -> DhcpLease | Non return DhcpLease(None, ip_interface(ip), None) if (ip := match.group(1)) else None -MANAGERS = [NetworkManagerConfigParser, SystemdNetworkConfigParser, ProcConfigParser] +MANAGERS = [NetworkManagerConfigParser, SystemdNetworkConfigParser, NixOSConfigParser, ProcConfigParser] LEASERS = [DhclientLeaseParser, NetworkManagerLeaseParser] diff --git a/tests/_data/plugins/os/unix/linux/nixos/configuration.nix b/tests/_data/plugins/os/unix/linux/nixos/configuration.nix new file mode 100644 index 0000000000..5120e7a052 --- /dev/null +++ b/tests/_data/plugins/os/unix/linux/nixos/configuration.nix @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:8856ba0a47f6f1ba5596c6e04e3a5982ea98bd88174377978ca49561d7c31150 +size 275 diff --git a/tests/_data/plugins/os/unix/linux/nixos/networking.nix b/tests/_data/plugins/os/unix/linux/nixos/networking.nix new file mode 100644 index 0000000000..fdae6b7eaf --- /dev/null +++ b/tests/_data/plugins/os/unix/linux/nixos/networking.nix @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:839698cc541350d10c5ba4b2b9d0b6339ea8622580ebe49824d1461ee578673b +size 749 diff --git a/tests/helpers/test_configutil.py b/tests/helpers/test_configutil.py index 241928b1d9..f3c4e92231 100644 --- a/tests/helpers/test_configutil.py +++ b/tests/helpers/test_configutil.py @@ -17,6 +17,7 @@ Indentation, Json, Leases, + Nix, ScopeManager, SystemD, parse, @@ -401,3 +402,39 @@ def test_leases_parser(string_data: str, expected_output: dict) -> None: parser.parse_file(StringIO(string_data)) assert parser.parsed_data == expected_output + + +def test_nix_parser_syntax() -> None: + """Test Nix parser syntax: function headers, dotted keys, types, lists, lib.mkForce, comments.""" + data = textwrap.dedent("""\ + { lib, ... }: + # Comment + { + a.b = "1"; a.c = "2"; + types = { bool = true; int = 42; empty = {}; }; + items = [ "x" { address = "10.0.0.1"; prefixLength = 24; } ]; + forced = lib.mkForce false; + } + """) + result = parse_data(Nix, data) + assert result["a"] == {"b": "1", "c": "2"} + assert result["types"] == {"bool": True, "int": 42, "empty": {}} + assert result["items"] == ["x", {"address": "10.0.0.1", "prefixLength": 24}] + assert result["forced"] is False + + +def test_nix_parser_networking_fixture() -> None: + """Test full networking.nix fixture from nixos-infect.""" + fixture = Path(absolute_path("_data/plugins/os/unix/linux/nixos/networking.nix")) + result = parse_data(Nix, fixture.read_text()) + + networking = result["networking"] + assert networking["nameservers"] == ["10.13.37.1", "10.13.37.2"] + assert networking["defaultGateway"] == "10.13.37.0" + assert networking["defaultGateway6"] == {"address": "", "interface": "eth0"} + assert networking["dhcpcd"]["enable"] is False + assert networking["usePredictableInterfaceNames"] is False + + eth0 = networking["interfaces"]["eth0"] + assert eth0["ipv4"]["addresses"] == [{"address": "10.13.37.10", "prefixLength": 24}] + assert eth0["ipv6"]["addresses"] == [{"address": "2001:db8::1", "prefixLength": 64}] diff --git a/tests/plugins/os/unix/linux/test_network.py b/tests/plugins/os/unix/linux/test_network.py index b94ddf505f..441926d1e2 100644 --- a/tests/plugins/os/unix/linux/test_network.py +++ b/tests/plugins/os/unix/linux/test_network.py @@ -16,6 +16,7 @@ LinuxNetworkPlugin, NetworkManagerConfigParser, NetworkManagerLeaseParser, + NixOSConfigParser, ProcConfigParser, SyslogConfigParser, SystemdNetworkConfigParser, @@ -441,3 +442,63 @@ def test_syslog_config_parser_multiple_lines(target_unix_users: Target, fs_unix: assert results[2].cidr == [ip_interface("10.13.37.1/32")] assert results[1].cidr == [ip_interface("10.13.37.2/24")] assert results[0].cidr == [ip_interface("10.13.37.3/32")] + + +def test_nixos_config_parser(target_linux: Target, fs_linux: VirtualFilesystem) -> None: + """Test NixOS declarative network configuration parsing.""" + fixture_dir = absolute_path("_data/plugins/os/unix/linux/nixos") + fs_linux.map_file("/etc/nixos/configuration.nix", fixture_dir / "configuration.nix") + fs_linux.map_file("/etc/nixos/networking.nix", fixture_dir / "networking.nix") + fs_linux.map_file_fh("/etc/NIXOS", BytesIO(b"")) + + parser = NixOSConfigParser(target_linux) + interfaces = list(parser.interfaces()) + + assert len(interfaces) == 1 + iface = interfaces[0] + + assert iface.name == "eth0" + assert iface.type == "static" + assert iface.enabled is None + assert set(iface.cidr) == {ip_interface("10.13.37.10/24"), ip_interface("2001:db8::1/64")} + assert iface.gateway == [ip_address("10.13.37.0")] + assert set(iface.dns) == {ip_address("10.13.37.1"), ip_address("10.13.37.2")} + assert iface.source == "/etc/nixos/networking.nix" + assert not iface.dhcp_ipv4 + assert not iface.dhcp_ipv6 + assert iface.configurator == "nixos" + + +def test_nixos_config_parser_dhcp(target_linux: Target, fs_linux: VirtualFilesystem) -> None: + """Test NixOS DHCP configuration detection.""" + nix_config = textwrap.dedent("""\ + { ... }: { + networking = { + useDHCP = true; + interfaces = { + ens3 = {}; + }; + }; + } + """) + + fs_linux.map_file_fh("/etc/nixos/configuration.nix", BytesIO(nix_config.encode())) + fs_linux.map_file_fh("/etc/NIXOS", BytesIO(b"")) + + parser = NixOSConfigParser(target_linux) + interfaces = list(parser.interfaces()) + + assert len(interfaces) == 1 + assert interfaces[0].name == "ens3" + assert interfaces[0].type == "dhcp" + assert interfaces[0].dhcp_ipv4 + assert interfaces[0].dhcp_ipv6 + assert interfaces[0].configurator == "nixos" + + +def test_nixos_config_parser_not_nixos(target_linux: Target, fs_linux: VirtualFilesystem) -> None: + """Test that NixOS parser yields nothing on non-NixOS systems.""" + parser = NixOSConfigParser(target_linux) + interfaces = list(parser.interfaces()) + + assert len(interfaces) == 0