From cbfc54ffb7313fc231c423b7ce1b4f132850a310 Mon Sep 17 00:00:00 2001 From: jmvermeulen Date: Mon, 23 Feb 2026 11:15:26 +0100 Subject: [PATCH 1/4] feat: add NixOS network manager support for IP extraction NixOS uses the Nix functional language for declarative system configuration, storing network settings in /etc/nixos/*.nix files. None of the existing 8 network manager parsers can handle this format, resulting in empty IP addresses for NixOS targets. This adds a regex-based parser that extracts: - IPv4/IPv6 addresses from .addresses blocks - Default gateway from defaultGateway - DNS nameservers from nameservers list - Interface names from interfaces block - DHCP status from dhcpcd.enable / useDHCP Detection uses /etc/NIXOS marker file or /etc/nixos/configuration.nix. Includes test fixtures based on real-world NixOS networking.nix generated by nixos-infect, with tests for IP, DNS, and gateway extraction. --- .../plugins/os/unix/linux/network_managers.py | 96 +++++++++++++++++++ .../os/unix/_os/ips/nixos/configuration.nix | 13 +++ .../os/unix/_os/ips/nixos/networking.nix | 31 ++++++ tests/plugins/os/unix/test_ips.py | 45 +++++++++ 4 files changed, 185 insertions(+) create mode 100644 tests/_data/plugins/os/unix/_os/ips/nixos/configuration.nix create mode 100644 tests/_data/plugins/os/unix/_os/ips/nixos/networking.nix diff --git a/dissect/target/plugins/os/unix/linux/network_managers.py b/dissect/target/plugins/os/unix/linux/network_managers.py index 116254e706..afe30977cd 100644 --- a/dissect/target/plugins/os/unix/linux/network_managers.py +++ b/dissect/target/plugins/os/unix/linux/network_managers.py @@ -79,6 +79,8 @@ def create_config(self, path: TargetPath) -> dict | None: config = self._parse_netplan_config(path) elif self.name == "wicked": config = self._parse_wicked_config(path) + elif self.name == "nixos": + config = self._parse_nixos_config(path) elif self.name == "interfaces": config = self._parse_text_config(("#"), " ", path) elif isinstance(self.parser, ConfigParser): @@ -177,6 +179,91 @@ def _parse_text_config(self, comments: str, delim: str, path: TargetPath) -> dic return dict(config) + def _parse_nixos_config(self, path: TargetPath) -> dict: + """Internal function to parse NixOS network configuration files. + + NixOS uses the Nix functional language for system configuration. Network settings + are declared in ``/etc/nixos/configuration.nix`` or imported ``.nix`` files + (commonly ``networking.nix``). This parser uses regex to extract IP addresses, + gateways, DNS servers, interface names, and DHCP settings from these files. + + Args: + path: A path to the NixOS configuration file. + + Returns: + Dictionary containing parsed NixOS network configuration. + """ + config = defaultdict(dict) + option_dict = {} + + try: + text = path.open("rt").read() + except Exception: + log.debug("Failed to read NixOS config file %s", path) + return dict(config) + + # Extract IP addresses from ipv4.addresses and ipv6.addresses blocks. + # We avoid matching address values inside .routes blocks or udev rules. + addresses = [] + in_addresses_block = False + for line in text.split("\n"): + stripped = line.strip() + if re.search(r'\.addresses\s*=\s*\[', stripped): + in_addresses_block = True + if in_addresses_block: + for addr in re.findall(r'address\s*=\s*"([^"]+)"', stripped): + if addr and ("." in addr or ":" in addr): + addresses.append(addr) + if "]" in stripped: + in_addresses_block = False + if addresses: + option_dict["address"] = addresses + + # Extract default gateway: defaultGateway = "x.x.x.x" + gateway_match = re.search(r'defaultGateway\s*=\s*"([^"]+)"', text) + if gateway_match and gateway_match.group(1): + option_dict["gateway"] = gateway_match.group(1) + + # Extract DNS nameservers: nameservers = [ "x.x.x.x" "y.y.y.y" ] + dns_match = re.search(r'nameservers\s*=\s*\[(.*?)\]', text, re.DOTALL) + if dns_match: + dns_servers = re.findall(r'"([^"]+)"', dns_match.group(1)) + if dns_servers: + option_dict["dns"] = dns_servers + + # Extract interface names from networking.interfaces.NAME or interfaces block + iface_names = set() + for match in re.finditer(r'interfaces\.([a-zA-Z][a-zA-Z0-9_-]*)\.(?:ipv[46]|useDHCP)', text): + iface_names.add(match.group(1)) + lines = text.split("\n") + in_interfaces_block = False + brace_depth = 0 + for line in lines: + stripped = line.strip() + if re.search(r'interfaces\s*=\s*\{', stripped): + in_interfaces_block = True + brace_depth = 1 + continue + if in_interfaces_block: + brace_depth += stripped.count("{") - stripped.count("}") + iface_match = re.match(r'([a-zA-Z][a-zA-Z0-9_-]*)\s*=\s*\{', stripped) + if iface_match and brace_depth == 2: + iface_names.add(iface_match.group(1)) + if brace_depth <= 0: + in_interfaces_block = False + if iface_names: + option_dict["name"] = list(iface_names) + + # Extract DHCP setting + dhcp_match = re.search(r'(?:dhcpcd\.enable|useDHCP)\s*=\s*(true|false)', text) + if dhcp_match: + option_dict["dhcp"] = dhcp_match.group(1) + + for section in self.sections: + config[section] = option_dict + + return dict(config) + def _parse_xml_config(self, xml: ElementTree, sections: list, options: list) -> dict: """Internal function to parse a xml based Linux network manager based configuration file into a dict. @@ -679,6 +766,12 @@ def should_ignore_ip(ip: str) -> bool: ("/etc/sysconfig/network-scripts/ifcfg-*", "/usr/sbin/ifup", "/usr/sbin/ifdown"), ("/etc/sysconfig/network-scripts/ifcfg-*", "/etc/sysconfig/network/ifcfg-*"), ), + # NixOS declarative networking + NetworkManager( + "nixos", + ("/etc/NIXOS", "/etc/nixos/configuration.nix"), + ("/etc/nixos/*.nix",), + ), # Interfaces folder/files NetworkManager( "interfaces", @@ -716,6 +809,9 @@ def should_ignore_ip(ip: str) -> bool: ["ifupdown"], ["ipaddr", "bootproto", "dns", "gateway", "name", "device", "dns1"], ), + "nixos": Template( + "nixos", None, ["nixos"], ["address", "gateway", "dns", "dhcp", "name"] + ), "interfaces": Template( "interfaces", None, ["interfaces"], ["iface", "address", "gateway", "netmask", "dns-nameservers"] ), diff --git a/tests/_data/plugins/os/unix/_os/ips/nixos/configuration.nix b/tests/_data/plugins/os/unix/_os/ips/nixos/configuration.nix new file mode 100644 index 0000000000..8ba2059376 --- /dev/null +++ b/tests/_data/plugins/os/unix/_os/ips/nixos/configuration.nix @@ -0,0 +1,13 @@ +{ ... }: { + imports = [ + ./hardware-configuration.nix + ./networking.nix + ]; + + boot.tmp.cleanOnBoot = true; + zramSwap.enable = true; + networking.hostName = "test-nixos"; + networking.domain = ""; + services.openssh.enable = true; + system.stateVersion = "23.11"; +} diff --git a/tests/_data/plugins/os/unix/_os/ips/nixos/networking.nix b/tests/_data/plugins/os/unix/_os/ips/nixos/networking.nix new file mode 100644 index 0000000000..578be2fe93 --- /dev/null +++ b/tests/_data/plugins/os/unix/_os/ips/nixos/networking.nix @@ -0,0 +1,31 @@ +{ lib, ... }: { + networking = { + nameservers = [ "10.13.37.1" + "10.13.37.2" + ]; + defaultGateway = "10.13.37.0"; + defaultGateway6 = { + address = ""; + interface = "eth0"; + }; + dhcpcd.enable = false; + usePredictableInterfaceNames = lib.mkForce false; + interfaces = { + eth0 = { + ipv4.addresses = [ + { address="10.13.37.10"; prefixLength=24; } + ]; + ipv6.addresses = [ + { address="2001:db8::1"; prefixLength=64; } + ]; + ipv4.routes = [ { address = "10.13.37.0"; prefixLength = 32; } ]; + ipv6.routes = [ { address = ""; prefixLength = 128; } ]; + }; + + }; + }; + services.udev.extraRules = '' + ATTR{address}=="52:54:00:12:34:56", NAME="eth0" + + ''; +} diff --git a/tests/plugins/os/unix/test_ips.py b/tests/plugins/os/unix/test_ips.py index bf7e0b56ad..765b47f047 100644 --- a/tests/plugins/os/unix/test_ips.py +++ b/tests/plugins/os/unix/test_ips.py @@ -252,6 +252,51 @@ def test_regression_ips_unique_strings(target_unix: Target, fs_unix: VirtualFile assert target_unix.ips == ["1.2.3.4"] +def test_ips_nixos_static(target_unix_users: Target, fs_unix: VirtualFilesystem) -> None: + """Test statically defined ipv4 and ipv6 addresses in NixOS /etc/nixos/*.nix.""" + + nixos_dir = absolute_path("_data/plugins/os/unix/_os/ips/nixos") + fs_unix.map_file("/etc/nixos/configuration.nix", nixos_dir / "configuration.nix") + fs_unix.map_file("/etc/nixos/networking.nix", nixos_dir / "networking.nix") + # NixOS detection relies on /etc/NIXOS marker file + fs_unix.map_file_fh("/etc/NIXOS", BytesIO(b"")) + + target_unix_users.add_plugin(LinuxPlugin) + results = target_unix_users.ips + + assert sorted(results) == sorted(["10.13.37.10", "2001:db8::1"]) + + +def test_ips_nixos_dns(target_unix_users: Target, fs_unix: VirtualFilesystem) -> None: + """Test DNS nameserver extraction from NixOS configuration.""" + + nixos_dir = absolute_path("_data/plugins/os/unix/_os/ips/nixos") + fs_unix.map_file("/etc/nixos/configuration.nix", nixos_dir / "configuration.nix") + fs_unix.map_file("/etc/nixos/networking.nix", nixos_dir / "networking.nix") + fs_unix.map_file_fh("/etc/NIXOS", BytesIO(b"")) + + target_unix_users.add_plugin(LinuxPlugin) + results = target_unix_users.dns + + assert len(results) == 1 + assert results == [{"10.13.37.1", "10.13.37.2"}] + + +def test_ips_nixos_gateway(target_unix_users: Target, fs_unix: VirtualFilesystem) -> None: + """Test gateway extraction from NixOS configuration.""" + + nixos_dir = absolute_path("_data/plugins/os/unix/_os/ips/nixos") + fs_unix.map_file("/etc/nixos/configuration.nix", nixos_dir / "configuration.nix") + fs_unix.map_file("/etc/nixos/networking.nix", nixos_dir / "networking.nix") + fs_unix.map_file_fh("/etc/NIXOS", BytesIO(b"")) + + target_unix_users.add_plugin(LinuxPlugin) + results = target_unix_users.gateway + + assert len(results) == 1 + assert results == [{"10.13.37.0"}] + + def test_ips_dhcp_lease_files(target_unix: Target, fs_unix: VirtualFilesystem) -> None: """Test if we can detect DHCP lease files from NetworkManager and dhclient.""" From 637dde454d6351be6fa2d864e910be1e51cc51bb Mon Sep 17 00:00:00 2001 From: jmvermeulen Date: Mon, 23 Feb 2026 13:48:31 +0100 Subject: [PATCH 2/4] feat: move NixOS network support from network_managers to network plugin Move NixOS network configuration parsing to the new network.py plugin as requested in code review. The network_managers plugin is deprecated. Changes: - Add NixOSConfigParser to network.py yielding UnixInterfaceRecord - Extract IPs with prefixLength as proper ip_interface objects - Detect NixOS via /etc/NIXOS marker or /etc/nixos/configuration.nix - Revert all network_managers.py changes - Move test fixtures to tests/_data/plugins/os/unix/linux/nixos/ (LFS) - Add tests to test_network.py instead of test_ips.py --- .../target/plugins/os/unix/linux/network.py | 139 +++++++++++++++++- .../plugins/os/unix/linux/network_managers.py | 96 ------------ .../os/unix/_os/ips/nixos/configuration.nix | 13 -- .../os/unix/_os/ips/nixos/networking.nix | 31 ---- .../os/unix/linux/nixos/configuration.nix | 3 + .../os/unix/linux/nixos/networking.nix | 3 + tests/plugins/os/unix/linux/test_network.py | 61 ++++++++ tests/plugins/os/unix/test_ips.py | 45 ------ 8 files changed, 205 insertions(+), 186 deletions(-) delete mode 100644 tests/_data/plugins/os/unix/_os/ips/nixos/configuration.nix delete mode 100644 tests/_data/plugins/os/unix/_os/ips/nixos/networking.nix create mode 100644 tests/_data/plugins/os/unix/linux/nixos/configuration.nix create mode 100644 tests/_data/plugins/os/unix/linux/nixos/networking.nix diff --git a/dissect/target/plugins/os/unix/linux/network.py b/dissect/target/plugins/os/unix/linux/network.py index 9e63e1919f..f2457b775e 100644 --- a/dissect/target/plugins/os/unix/linux/network.py +++ b/dissect/target/plugins/os/unix/linux/network.py @@ -763,5 +763,142 @@ def parse_debian_centos_dhclient_lease(log_record: LogRecord) -> DhcpLease | Non return DhcpLease(None, ip_interface(ip), None) if (ip := match.group(1)) else None -MANAGERS = [NetworkManagerConfigParser, SystemdNetworkConfigParser, ProcConfigParser] +class NixOSConfigParser(LinuxNetworkConfigParser): + """NixOS declarative network configuration parser. + + NixOS uses the Nix functional language for system configuration. Network settings + are declared in ``/etc/nixos/configuration.nix`` or imported ``.nix`` files + (commonly ``networking.nix``). This parser uses regex to extract IP addresses, + gateways, DNS servers, interface names, and DHCP settings from these files. + + References: + - https://nixos.org/manual/nixos/stable/#sec-networking + """ + + config_paths: tuple[str, ...] = ("/etc/nixos/",) + + def interfaces(self) -> Iterator[UnixInterfaceRecord]: + # Only parse if this is actually a NixOS system + if not ( + self._target.fs.path("/etc/NIXOS").exists() + or self._target.fs.path("/etc/nixos/configuration.nix").exists() + ): + return + + for config_file in self._config_files(self.config_paths, "*.nix"): + try: + text = config_file.open("rt").read() + except Exception as e: + self._target.log.warning("Error reading NixOS config file %s", config_file) + self._target.log.debug("", exc_info=e) + continue + + ip_interfaces: set[NetInterface] = set() + gateways: set[NetAddress] = set() + dns: set[NetAddress] = set() + iface_names: set[str] = set() + dhcp_ipv4 = False + dhcp_ipv6 = False + + # Extract IP addresses from ipv4.addresses and ipv6.addresses blocks. + # Avoid matching address values inside .routes blocks or udev rules. + in_addresses_block = False + for line in text.split("\n"): + stripped = line.strip() + if re.search(r"\.addresses\s*=\s*\[", stripped): + in_addresses_block = True + if in_addresses_block: + for match in re.finditer( + r'address\s*=\s*"([^"]+)";\s*prefixLength\s*=\s*(\d+)', + stripped, + ): + addr, prefix = match.group(1), match.group(2) + if addr and ("." in addr or ":" in addr): + try: + ip_interfaces.add(ip_interface(f"{addr}/{prefix}")) + except ValueError: + self._target.log.debug("Invalid NixOS address: %s/%s", addr, prefix) + if "]" in stripped: + in_addresses_block = False + + # Extract default gateway: defaultGateway = "x.x.x.x" + gateway_match = re.search(r'defaultGateway\s*=\s*"([^"]+)"', text) + if gateway_match and gateway_match.group(1): + try: + gateways.add(ip_address(gateway_match.group(1))) + except ValueError: + self._target.log.debug("Invalid NixOS gateway: %s", gateway_match.group(1)) + + # Extract DNS nameservers: nameservers = [ "x.x.x.x" "y.y.y.y" ] + dns_match = re.search(r"nameservers\s*=\s*\[(.*?)\]", text, re.DOTALL) + if dns_match: + for server in re.findall(r'"([^"]+)"', dns_match.group(1)): + try: + dns.add(ip_address(server)) + except ValueError: + self._target.log.debug("Invalid NixOS DNS server: %s", server) + + # Extract interface names from interfaces.NAME.ipvX or interfaces = { NAME = { } } + for match in re.finditer(r"interfaces\.([a-zA-Z][a-zA-Z0-9_-]*)\.(?:ipv[46]|useDHCP)", text): + iface_names.add(match.group(1)) + + in_interfaces_block = False + brace_depth = 0 + for line in text.split("\n"): + stripped = line.strip() + if re.search(r"interfaces\s*=\s*\{", stripped): + in_interfaces_block = True + brace_depth = 1 + continue + if in_interfaces_block: + brace_depth += stripped.count("{") - stripped.count("}") + iface_match = re.match(r"([a-zA-Z][a-zA-Z0-9_-]*)\s*=\s*\{", stripped) + if iface_match and brace_depth == 2: + iface_names.add(iface_match.group(1)) + if brace_depth <= 0: + in_interfaces_block = False + + # Extract DHCP setting + dhcp_match = re.search(r"(?:dhcpcd\.enable|useDHCP)\s*=\s*(true|false)", text) + if dhcp_match: + dhcp_ipv4 = dhcp_match.group(1) == "true" + dhcp_ipv6 = dhcp_match.group(1) == "true" + + # Only yield if we found any network information + if not (ip_interfaces or gateways or dns or dhcp_ipv4 or dhcp_ipv6): + continue + + # Yield one record per interface, or a single record if no interface names found + if iface_names: + for name in sorted(iface_names): + yield UnixInterfaceRecord( + name=name, + type="static" if not (dhcp_ipv4 or dhcp_ipv6) else "dhcp", + enabled=None, + cidr=ip_interfaces, + gateway=list(gateways), + dns=list(dns), + source=str(config_file), + dhcp_ipv4=dhcp_ipv4, + dhcp_ipv6=dhcp_ipv6, + configurator="nixos", + _target=self._target, + ) + else: + yield UnixInterfaceRecord( + name=None, + type="static" if not (dhcp_ipv4 or dhcp_ipv6) else "dhcp", + enabled=None, + cidr=ip_interfaces, + gateway=list(gateways), + dns=list(dns), + source=str(config_file), + dhcp_ipv4=dhcp_ipv4, + dhcp_ipv6=dhcp_ipv6, + configurator="nixos", + _target=self._target, + ) + + +MANAGERS = [NetworkManagerConfigParser, SystemdNetworkConfigParser, NixOSConfigParser, ProcConfigParser] LEASERS = [DhclientLeaseParser, NetworkManagerLeaseParser] diff --git a/dissect/target/plugins/os/unix/linux/network_managers.py b/dissect/target/plugins/os/unix/linux/network_managers.py index afe30977cd..116254e706 100644 --- a/dissect/target/plugins/os/unix/linux/network_managers.py +++ b/dissect/target/plugins/os/unix/linux/network_managers.py @@ -79,8 +79,6 @@ def create_config(self, path: TargetPath) -> dict | None: config = self._parse_netplan_config(path) elif self.name == "wicked": config = self._parse_wicked_config(path) - elif self.name == "nixos": - config = self._parse_nixos_config(path) elif self.name == "interfaces": config = self._parse_text_config(("#"), " ", path) elif isinstance(self.parser, ConfigParser): @@ -179,91 +177,6 @@ def _parse_text_config(self, comments: str, delim: str, path: TargetPath) -> dic return dict(config) - def _parse_nixos_config(self, path: TargetPath) -> dict: - """Internal function to parse NixOS network configuration files. - - NixOS uses the Nix functional language for system configuration. Network settings - are declared in ``/etc/nixos/configuration.nix`` or imported ``.nix`` files - (commonly ``networking.nix``). This parser uses regex to extract IP addresses, - gateways, DNS servers, interface names, and DHCP settings from these files. - - Args: - path: A path to the NixOS configuration file. - - Returns: - Dictionary containing parsed NixOS network configuration. - """ - config = defaultdict(dict) - option_dict = {} - - try: - text = path.open("rt").read() - except Exception: - log.debug("Failed to read NixOS config file %s", path) - return dict(config) - - # Extract IP addresses from ipv4.addresses and ipv6.addresses blocks. - # We avoid matching address values inside .routes blocks or udev rules. - addresses = [] - in_addresses_block = False - for line in text.split("\n"): - stripped = line.strip() - if re.search(r'\.addresses\s*=\s*\[', stripped): - in_addresses_block = True - if in_addresses_block: - for addr in re.findall(r'address\s*=\s*"([^"]+)"', stripped): - if addr and ("." in addr or ":" in addr): - addresses.append(addr) - if "]" in stripped: - in_addresses_block = False - if addresses: - option_dict["address"] = addresses - - # Extract default gateway: defaultGateway = "x.x.x.x" - gateway_match = re.search(r'defaultGateway\s*=\s*"([^"]+)"', text) - if gateway_match and gateway_match.group(1): - option_dict["gateway"] = gateway_match.group(1) - - # Extract DNS nameservers: nameservers = [ "x.x.x.x" "y.y.y.y" ] - dns_match = re.search(r'nameservers\s*=\s*\[(.*?)\]', text, re.DOTALL) - if dns_match: - dns_servers = re.findall(r'"([^"]+)"', dns_match.group(1)) - if dns_servers: - option_dict["dns"] = dns_servers - - # Extract interface names from networking.interfaces.NAME or interfaces block - iface_names = set() - for match in re.finditer(r'interfaces\.([a-zA-Z][a-zA-Z0-9_-]*)\.(?:ipv[46]|useDHCP)', text): - iface_names.add(match.group(1)) - lines = text.split("\n") - in_interfaces_block = False - brace_depth = 0 - for line in lines: - stripped = line.strip() - if re.search(r'interfaces\s*=\s*\{', stripped): - in_interfaces_block = True - brace_depth = 1 - continue - if in_interfaces_block: - brace_depth += stripped.count("{") - stripped.count("}") - iface_match = re.match(r'([a-zA-Z][a-zA-Z0-9_-]*)\s*=\s*\{', stripped) - if iface_match and brace_depth == 2: - iface_names.add(iface_match.group(1)) - if brace_depth <= 0: - in_interfaces_block = False - if iface_names: - option_dict["name"] = list(iface_names) - - # Extract DHCP setting - dhcp_match = re.search(r'(?:dhcpcd\.enable|useDHCP)\s*=\s*(true|false)', text) - if dhcp_match: - option_dict["dhcp"] = dhcp_match.group(1) - - for section in self.sections: - config[section] = option_dict - - return dict(config) - def _parse_xml_config(self, xml: ElementTree, sections: list, options: list) -> dict: """Internal function to parse a xml based Linux network manager based configuration file into a dict. @@ -766,12 +679,6 @@ def should_ignore_ip(ip: str) -> bool: ("/etc/sysconfig/network-scripts/ifcfg-*", "/usr/sbin/ifup", "/usr/sbin/ifdown"), ("/etc/sysconfig/network-scripts/ifcfg-*", "/etc/sysconfig/network/ifcfg-*"), ), - # NixOS declarative networking - NetworkManager( - "nixos", - ("/etc/NIXOS", "/etc/nixos/configuration.nix"), - ("/etc/nixos/*.nix",), - ), # Interfaces folder/files NetworkManager( "interfaces", @@ -809,9 +716,6 @@ def should_ignore_ip(ip: str) -> bool: ["ifupdown"], ["ipaddr", "bootproto", "dns", "gateway", "name", "device", "dns1"], ), - "nixos": Template( - "nixos", None, ["nixos"], ["address", "gateway", "dns", "dhcp", "name"] - ), "interfaces": Template( "interfaces", None, ["interfaces"], ["iface", "address", "gateway", "netmask", "dns-nameservers"] ), diff --git a/tests/_data/plugins/os/unix/_os/ips/nixos/configuration.nix b/tests/_data/plugins/os/unix/_os/ips/nixos/configuration.nix deleted file mode 100644 index 8ba2059376..0000000000 --- a/tests/_data/plugins/os/unix/_os/ips/nixos/configuration.nix +++ /dev/null @@ -1,13 +0,0 @@ -{ ... }: { - imports = [ - ./hardware-configuration.nix - ./networking.nix - ]; - - boot.tmp.cleanOnBoot = true; - zramSwap.enable = true; - networking.hostName = "test-nixos"; - networking.domain = ""; - services.openssh.enable = true; - system.stateVersion = "23.11"; -} diff --git a/tests/_data/plugins/os/unix/_os/ips/nixos/networking.nix b/tests/_data/plugins/os/unix/_os/ips/nixos/networking.nix deleted file mode 100644 index 578be2fe93..0000000000 --- a/tests/_data/plugins/os/unix/_os/ips/nixos/networking.nix +++ /dev/null @@ -1,31 +0,0 @@ -{ lib, ... }: { - networking = { - nameservers = [ "10.13.37.1" - "10.13.37.2" - ]; - defaultGateway = "10.13.37.0"; - defaultGateway6 = { - address = ""; - interface = "eth0"; - }; - dhcpcd.enable = false; - usePredictableInterfaceNames = lib.mkForce false; - interfaces = { - eth0 = { - ipv4.addresses = [ - { address="10.13.37.10"; prefixLength=24; } - ]; - ipv6.addresses = [ - { address="2001:db8::1"; prefixLength=64; } - ]; - ipv4.routes = [ { address = "10.13.37.0"; prefixLength = 32; } ]; - ipv6.routes = [ { address = ""; prefixLength = 128; } ]; - }; - - }; - }; - services.udev.extraRules = '' - ATTR{address}=="52:54:00:12:34:56", NAME="eth0" - - ''; -} diff --git a/tests/_data/plugins/os/unix/linux/nixos/configuration.nix b/tests/_data/plugins/os/unix/linux/nixos/configuration.nix new file mode 100644 index 0000000000..5120e7a052 --- /dev/null +++ b/tests/_data/plugins/os/unix/linux/nixos/configuration.nix @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:8856ba0a47f6f1ba5596c6e04e3a5982ea98bd88174377978ca49561d7c31150 +size 275 diff --git a/tests/_data/plugins/os/unix/linux/nixos/networking.nix b/tests/_data/plugins/os/unix/linux/nixos/networking.nix new file mode 100644 index 0000000000..fdae6b7eaf --- /dev/null +++ b/tests/_data/plugins/os/unix/linux/nixos/networking.nix @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:839698cc541350d10c5ba4b2b9d0b6339ea8622580ebe49824d1461ee578673b +size 749 diff --git a/tests/plugins/os/unix/linux/test_network.py b/tests/plugins/os/unix/linux/test_network.py index b94ddf505f..441926d1e2 100644 --- a/tests/plugins/os/unix/linux/test_network.py +++ b/tests/plugins/os/unix/linux/test_network.py @@ -16,6 +16,7 @@ LinuxNetworkPlugin, NetworkManagerConfigParser, NetworkManagerLeaseParser, + NixOSConfigParser, ProcConfigParser, SyslogConfigParser, SystemdNetworkConfigParser, @@ -441,3 +442,63 @@ def test_syslog_config_parser_multiple_lines(target_unix_users: Target, fs_unix: assert results[2].cidr == [ip_interface("10.13.37.1/32")] assert results[1].cidr == [ip_interface("10.13.37.2/24")] assert results[0].cidr == [ip_interface("10.13.37.3/32")] + + +def test_nixos_config_parser(target_linux: Target, fs_linux: VirtualFilesystem) -> None: + """Test NixOS declarative network configuration parsing.""" + fixture_dir = absolute_path("_data/plugins/os/unix/linux/nixos") + fs_linux.map_file("/etc/nixos/configuration.nix", fixture_dir / "configuration.nix") + fs_linux.map_file("/etc/nixos/networking.nix", fixture_dir / "networking.nix") + fs_linux.map_file_fh("/etc/NIXOS", BytesIO(b"")) + + parser = NixOSConfigParser(target_linux) + interfaces = list(parser.interfaces()) + + assert len(interfaces) == 1 + iface = interfaces[0] + + assert iface.name == "eth0" + assert iface.type == "static" + assert iface.enabled is None + assert set(iface.cidr) == {ip_interface("10.13.37.10/24"), ip_interface("2001:db8::1/64")} + assert iface.gateway == [ip_address("10.13.37.0")] + assert set(iface.dns) == {ip_address("10.13.37.1"), ip_address("10.13.37.2")} + assert iface.source == "/etc/nixos/networking.nix" + assert not iface.dhcp_ipv4 + assert not iface.dhcp_ipv6 + assert iface.configurator == "nixos" + + +def test_nixos_config_parser_dhcp(target_linux: Target, fs_linux: VirtualFilesystem) -> None: + """Test NixOS DHCP configuration detection.""" + nix_config = textwrap.dedent("""\ + { ... }: { + networking = { + useDHCP = true; + interfaces = { + ens3 = {}; + }; + }; + } + """) + + fs_linux.map_file_fh("/etc/nixos/configuration.nix", BytesIO(nix_config.encode())) + fs_linux.map_file_fh("/etc/NIXOS", BytesIO(b"")) + + parser = NixOSConfigParser(target_linux) + interfaces = list(parser.interfaces()) + + assert len(interfaces) == 1 + assert interfaces[0].name == "ens3" + assert interfaces[0].type == "dhcp" + assert interfaces[0].dhcp_ipv4 + assert interfaces[0].dhcp_ipv6 + assert interfaces[0].configurator == "nixos" + + +def test_nixos_config_parser_not_nixos(target_linux: Target, fs_linux: VirtualFilesystem) -> None: + """Test that NixOS parser yields nothing on non-NixOS systems.""" + parser = NixOSConfigParser(target_linux) + interfaces = list(parser.interfaces()) + + assert len(interfaces) == 0 diff --git a/tests/plugins/os/unix/test_ips.py b/tests/plugins/os/unix/test_ips.py index 765b47f047..bf7e0b56ad 100644 --- a/tests/plugins/os/unix/test_ips.py +++ b/tests/plugins/os/unix/test_ips.py @@ -252,51 +252,6 @@ def test_regression_ips_unique_strings(target_unix: Target, fs_unix: VirtualFile assert target_unix.ips == ["1.2.3.4"] -def test_ips_nixos_static(target_unix_users: Target, fs_unix: VirtualFilesystem) -> None: - """Test statically defined ipv4 and ipv6 addresses in NixOS /etc/nixos/*.nix.""" - - nixos_dir = absolute_path("_data/plugins/os/unix/_os/ips/nixos") - fs_unix.map_file("/etc/nixos/configuration.nix", nixos_dir / "configuration.nix") - fs_unix.map_file("/etc/nixos/networking.nix", nixos_dir / "networking.nix") - # NixOS detection relies on /etc/NIXOS marker file - fs_unix.map_file_fh("/etc/NIXOS", BytesIO(b"")) - - target_unix_users.add_plugin(LinuxPlugin) - results = target_unix_users.ips - - assert sorted(results) == sorted(["10.13.37.10", "2001:db8::1"]) - - -def test_ips_nixos_dns(target_unix_users: Target, fs_unix: VirtualFilesystem) -> None: - """Test DNS nameserver extraction from NixOS configuration.""" - - nixos_dir = absolute_path("_data/plugins/os/unix/_os/ips/nixos") - fs_unix.map_file("/etc/nixos/configuration.nix", nixos_dir / "configuration.nix") - fs_unix.map_file("/etc/nixos/networking.nix", nixos_dir / "networking.nix") - fs_unix.map_file_fh("/etc/NIXOS", BytesIO(b"")) - - target_unix_users.add_plugin(LinuxPlugin) - results = target_unix_users.dns - - assert len(results) == 1 - assert results == [{"10.13.37.1", "10.13.37.2"}] - - -def test_ips_nixos_gateway(target_unix_users: Target, fs_unix: VirtualFilesystem) -> None: - """Test gateway extraction from NixOS configuration.""" - - nixos_dir = absolute_path("_data/plugins/os/unix/_os/ips/nixos") - fs_unix.map_file("/etc/nixos/configuration.nix", nixos_dir / "configuration.nix") - fs_unix.map_file("/etc/nixos/networking.nix", nixos_dir / "networking.nix") - fs_unix.map_file_fh("/etc/NIXOS", BytesIO(b"")) - - target_unix_users.add_plugin(LinuxPlugin) - results = target_unix_users.gateway - - assert len(results) == 1 - assert results == [{"10.13.37.0"}] - - def test_ips_dhcp_lease_files(target_unix: Target, fs_unix: VirtualFilesystem) -> None: """Test if we can detect DHCP lease files from NetworkManager and dhclient.""" From ab65d4d3b26ef6547822974df0074702a0431cf3 Mon Sep 17 00:00:00 2001 From: jmvermeulen Date: Wed, 25 Feb 2026 19:25:18 +0100 Subject: [PATCH 3/4] refactor: use configutil Nix parser for NixOS network config extraction Add a Nix configuration parser to configutil that handles the subset of Nix syntax used in NixOS configuration files (attribute sets, dotted paths, lists, strings, booleans, lib.mkForce/mkDefault). Refactor NixOSConfigParser to use configutil.parse(hint="nix") instead of regex, and move the class before lease parsers to maintain file structure. --- dissect/target/helpers/configutil.py | 453 ++++++++++++++++++ .../target/plugins/os/unix/linux/network.py | 261 +++++----- tests/helpers/test_configutil.py | 37 ++ 3 files changed, 614 insertions(+), 137 deletions(-) diff --git a/dissect/target/helpers/configutil.py b/dissect/target/helpers/configutil.py index 998ed4d871..3aee6bdde3 100644 --- a/dissect/target/helpers/configutil.py +++ b/dissect/target/helpers/configutil.py @@ -1,9 +1,11 @@ from __future__ import annotations +import enum import io import json import re import sys +import textwrap from collections import deque from collections.abc import Callable, ItemsView, Iterable, Iterator, KeysView from configparser import ConfigParser, MissingSectionHeaderError @@ -911,6 +913,456 @@ def parse_file(self, fh: TextIO) -> None: _update_dictionary(self.parsed_data, key, value) +class _NixTokenType(enum.Enum): + LBRACE = "{" + RBRACE = "}" + LBRACKET = "[" + RBRACKET = "]" + EQUALS = "=" + SEMICOLON = ";" + COLON = ":" + DOT = "." + STRING = "str" + MULTILINE_STRING = "ml" + IDENT = "ident" + EOF = "eof" + + +@dataclass +class _NixToken: + type: _NixTokenType + value: str + + +class _NixTokenizer: + """Character-by-character tokenizer for Nix configuration files. + + Produces a stream of tokens from ``.nix`` file text, handling string literals, + multiline strings (``'' ... ''``), line comments (``#``), and block comments + (``/* ... */``). + """ + + def __init__(self, text: str) -> None: + self._text = text + self._pos = 0 + self._length = len(text) + + def _peek(self, offset: int = 0) -> str | None: + pos = self._pos + offset + return self._text[pos] if pos < self._length else None + + def _advance(self) -> str: + ch = self._text[self._pos] + self._pos += 1 + return ch + + def _skip_whitespace_and_comments(self) -> None: + while self._pos < self._length: + ch = self._peek() + if ch in (" ", "\t", "\n", "\r"): + self._pos += 1 + elif ch == "#": + while self._pos < self._length and self._text[self._pos] != "\n": + self._pos += 1 + elif ch == "/" and self._peek(1) == "*": + self._pos += 2 + while self._pos < self._length: + if self._text[self._pos] == "*" and self._peek(1) == "/": + self._pos += 2 + break + self._pos += 1 + else: + break + + def _read_string(self) -> str: + """Read a ``"..."`` string literal with basic escape handling.""" + self._advance() # consume opening " + parts = [] + while self._pos < self._length: + ch = self._text[self._pos] + if ch == "\\": + self._pos += 1 + if self._pos < self._length: + escaped = self._text[self._pos] + if escaped == "n": + parts.append("\n") + elif escaped == "t": + parts.append("\t") + elif escaped == "r": + parts.append("\r") + else: + parts.append(escaped) + self._pos += 1 + elif ch == '"': + self._pos += 1 + break + else: + parts.append(ch) + self._pos += 1 + return "".join(parts) + + def _read_multiline_string(self) -> str: + """Read a ``'' ... ''`` multiline string literal.""" + self._pos += 2 # consume opening '' + parts = [] + while self._pos < self._length: + ch = self._text[self._pos] + if ch == "'" and self._peek(1) == "'": + # Check for escape: ''' means literal '' + if self._peek(2) == "'": + parts.append("''") + self._pos += 3 + else: + self._pos += 2 # consume closing '' + break + elif ch == "'" and self._peek(1) == "\\": + # ''\ escape sequences + self._pos += 2 + if self._pos < self._length: + escaped = self._text[self._pos] + if escaped == "n": + parts.append("\n") + elif escaped == "t": + parts.append("\t") + else: + parts.append(escaped) + self._pos += 1 + else: + parts.append(ch) + self._pos += 1 + content = "".join(parts) + return textwrap.dedent(content) + + def _read_identifier(self) -> str: + """Read a bare identifier: alphanumeric, ``_``, or ``-``.""" + start = self._pos + while self._pos < self._length: + ch = self._text[self._pos] + if ch.isalnum() or ch in ("_", "-"): + self._pos += 1 + else: + break + return self._text[start : self._pos] + + def _read_path(self) -> str: + """Read a path literal starting with ``./`` or ``/``.""" + start = self._pos + while self._pos < self._length: + ch = self._text[self._pos] + if ch in (" ", "\t", "\n", "\r", ";", "]", "}"): + break + self._pos += 1 + return self._text[start : self._pos] + + def tokenize(self) -> list[_NixToken]: + tokens = [] + while True: + self._skip_whitespace_and_comments() + if self._pos >= self._length: + tokens.append(_NixToken(_NixTokenType.EOF, "")) + break + + ch = self._peek() + + if ch == "{": + tokens.append(_NixToken(_NixTokenType.LBRACE, "{")) + self._advance() + elif ch == "}": + tokens.append(_NixToken(_NixTokenType.RBRACE, "}")) + self._advance() + elif ch == "[": + tokens.append(_NixToken(_NixTokenType.LBRACKET, "[")) + self._advance() + elif ch == "]": + tokens.append(_NixToken(_NixTokenType.RBRACKET, "]")) + self._advance() + elif ch == "=": + tokens.append(_NixToken(_NixTokenType.EQUALS, "=")) + self._advance() + elif ch == ";": + tokens.append(_NixToken(_NixTokenType.SEMICOLON, ";")) + self._advance() + elif ch == ":": + tokens.append(_NixToken(_NixTokenType.COLON, ":")) + self._advance() + elif ch == ".": + if self._peek(1) == "/": + # Path literal: ./foo/bar + tokens.append(_NixToken(_NixTokenType.STRING, self._read_path())) + else: + tokens.append(_NixToken(_NixTokenType.DOT, ".")) + self._advance() + elif ch == "/": + # Absolute path literal + tokens.append(_NixToken(_NixTokenType.STRING, self._read_path())) + elif ch == '"': + tokens.append(_NixToken(_NixTokenType.STRING, self._read_string())) + elif ch == "'" and self._peek(1) == "'": + tokens.append(_NixToken(_NixTokenType.MULTILINE_STRING, self._read_multiline_string())) + elif ch == ",": + # Commas appear in function headers { lib, ... }: — skip + self._advance() + elif ch.isalnum() or ch == "_": + tokens.append(_NixToken(_NixTokenType.IDENT, self._read_identifier())) + else: + # Skip unknown characters + self._advance() + + return tokens + + +class _NixParser: + """Recursive descent parser over a Nix token stream. + + Parses the subset of Nix used in NixOS configuration files into nested + Python dictionaries, lists, strings, booleans, and integers. + """ + + def __init__(self, tokens: list[_NixToken]) -> None: + self._tokens = tokens + self._pos = 0 + + def _peek(self, offset: int = 0) -> _NixToken: + pos = self._pos + offset + if pos < len(self._tokens): + return self._tokens[pos] + return _NixToken(_NixTokenType.EOF, "") + + def _advance(self) -> _NixToken: + token = self._tokens[self._pos] + self._pos += 1 + return token + + def _expect(self, token_type: _NixTokenType) -> _NixToken: + token = self._advance() + if token.type != token_type: + raise ConfigurationParsingError( + f"Expected {token_type.value}, got {token.type.value} ({token.value!r})" + ) + return token + + def parse(self) -> dict: + """Entry point. Handles optional function header, optional let-in, then parses body.""" + self._skip_function_header() + self._skip_let_in() + if self._peek().type == _NixTokenType.LBRACE: + return self._parse_attr_set() + return {} + + def _skip_function_header(self) -> None: + """Skip ``{ lib, ..., ... }:`` function parameter pattern. + + Detects by scanning from the first ``{`` to find if the matching ``}`` is followed by ``:``.""" + if self._peek().type != _NixTokenType.LBRACE: + return + + # Scan ahead to find matching RBRACE + depth = 0 + scan_pos = self._pos + while scan_pos < len(self._tokens): + t = self._tokens[scan_pos] + if t.type == _NixTokenType.LBRACE: + depth += 1 + elif t.type == _NixTokenType.RBRACE: + depth -= 1 + if depth == 0: + # Check if next token is COLON — indicates function header + if scan_pos + 1 < len(self._tokens) and self._tokens[scan_pos + 1].type == _NixTokenType.COLON: + self._pos = scan_pos + 2 # skip past }: + return + scan_pos += 1 + + def _skip_let_in(self) -> None: + """Skip ``let ... in`` binding blocks.""" + while self._peek().type == _NixTokenType.IDENT and self._peek().value == "let": + self._advance() # consume "let" + # Skip until we find "in" at the same depth + depth = 0 + while self._pos < len(self._tokens): + t = self._peek() + if t.type == _NixTokenType.LBRACE: + depth += 1 + self._advance() + elif t.type == _NixTokenType.RBRACE: + depth -= 1 + self._advance() + elif t.type == _NixTokenType.IDENT and t.value == "in" and depth == 0: + self._advance() # consume "in" + break + else: + self._advance() + + def _parse_attr_set(self) -> dict: + """Parse ``{ key = value; ... }`` into a dict.""" + self._expect(_NixTokenType.LBRACE) + result: dict = {} + + while self._peek().type != _NixTokenType.RBRACE and self._peek().type != _NixTokenType.EOF: + # Parse key path (possibly dotted) + key_parts = self._parse_key_path() + if not key_parts: + # Skip unexpected tokens + self._advance() + continue + + self._expect(_NixTokenType.EQUALS) + value = self._parse_value() + self._expect(_NixTokenType.SEMICOLON) + + self._merge_dotted_key(result, key_parts, value) + + if self._peek().type == _NixTokenType.RBRACE: + self._advance() # consume } + + return result + + def _parse_key_path(self) -> list[str]: + """Parse a dotted key path like ``networking.interfaces.eth0``.""" + token = self._peek() + if token.type == _NixTokenType.STRING: + parts = [self._advance().value] + elif token.type == _NixTokenType.IDENT: + parts = [self._advance().value] + else: + return [] + + while self._peek().type == _NixTokenType.DOT: + self._advance() # consume DOT + token = self._peek() + if token.type in (_NixTokenType.IDENT, _NixTokenType.STRING): + parts.append(self._advance().value) + else: + break + + return parts + + def _parse_value(self) -> str | int | bool | dict | list: + """Parse a value expression.""" + token = self._peek() + + if token.type == _NixTokenType.STRING: + return self._advance().value + + if token.type == _NixTokenType.MULTILINE_STRING: + return self._advance().value + + if token.type == _NixTokenType.LBRACE: + return self._parse_attr_set() + + if token.type == _NixTokenType.LBRACKET: + return self._parse_list() + + if token.type == _NixTokenType.IDENT: + # Check for lib.mkForce / lib.mkDefault / lib.mkOverride + if token.value == "lib" and self._peek(1).type == _NixTokenType.DOT: + return self._unwrap_lib_call() + + value = self._advance().value + + # Convert native types + if value == "true": + return True + if value == "false": + return False + if value == "null": + return None + try: + return int(value) + except ValueError: + pass + + # Bare identifier used as value (e.g., variable reference) — check if followed by + # more identifiers (function application like: pkgs.something) + while self._peek().type == _NixTokenType.DOT: + self._advance() # consume DOT + if self._peek().type == _NixTokenType.IDENT: + value = f"{value}.{self._advance().value}" + else: + break + + return value + + # Fallback: skip unexpected token + return self._advance().value + + def _parse_list(self) -> list: + """Parse ``[ elem1 elem2 ... ]`` into a Python list.""" + self._expect(_NixTokenType.LBRACKET) + result = [] + + while self._peek().type != _NixTokenType.RBRACKET and self._peek().type != _NixTokenType.EOF: + result.append(self._parse_value()) + + if self._peek().type == _NixTokenType.RBRACKET: + self._advance() # consume ] + + return result + + def _unwrap_lib_call(self) -> str | int | bool | dict | list: + """Unwrap ``lib.mkForce value``, ``lib.mkDefault value``, ``lib.mkOverride N value``.""" + self._advance() # consume "lib" + self._advance() # consume DOT + func_name = self._advance().value # e.g., "mkForce", "mkDefault", "mkOverride", "mkIf" + + if func_name == "mkOverride": + # lib.mkOverride — skip priority, return value + self._parse_value() # consume and discard priority + return self._parse_value() + + if func_name == "mkIf": + # lib.mkIf — skip condition, return value + self._parse_value() # consume and discard condition + return self._parse_value() + + # lib.mkForce / lib.mkDefault / others — next token is the value + return self._parse_value() + + def _merge_dotted_key(self, target: dict, parts: list[str], value: ...) -> None: + """Merge a dotted key path into nested dicts. + + ``["a", "b", "c"]`` with value ``1`` produces ``{"a": {"b": {"c": 1}}}``. + Merges with existing dicts at each level. + """ + current = target + for part in parts[:-1]: + if part not in current: + current[part] = {} + elif not isinstance(current[part], dict): + current[part] = {} + current = current[part] + final_key = parts[-1] + existing = current.get(final_key) + if isinstance(existing, dict) and isinstance(value, dict): + existing.update(value) + else: + current[final_key] = value + + +class Nix(ConfigurationParser): + """Parse NixOS ``.nix`` configuration files into nested dictionaries. + + Handles the subset of Nix used in declarative NixOS configurations: + function headers, attribute sets (with dotted paths), lists, strings, + multiline strings, booleans, integers, and ``lib.mkForce``/``lib.mkDefault`` + wrappers. + + Examples: + + .. code-block:: + + >>> nix_data = '{ ... }: { networking.hostName = "test"; }' + >>> parser = Nix() + >>> parser.read_file(io.StringIO(nix_data)) + >>> parser.parsed_data + {"networking": {"hostName": "test"}} + """ + + def parse_file(self, fh: TextIO) -> None: + text = fh.read() + tokens = _NixTokenizer(text).tokenize() + self.parsed_data = _NixParser(tokens).parse() + + @dataclass(frozen=True) class ParserOptions: collapse: bool | set[str] | None = None @@ -970,6 +1422,7 @@ def create_parser(self, options: ParserOptions | None = None) -> ConfigurationPa "template": ParserConfig(Txt), "toml": ParserConfig(Toml), "leases": ParserConfig(Leases), + "nix": ParserConfig(Nix), "meta_bare": ParserConfig(Default), # Return the basic config parser } diff --git a/dissect/target/plugins/os/unix/linux/network.py b/dissect/target/plugins/os/unix/linux/network.py index f2457b775e..849cc6a291 100644 --- a/dissect/target/plugins/os/unix/linux/network.py +++ b/dissect/target/plugins/os/unix/linux/network.py @@ -394,6 +394,130 @@ def _parse_gateway(self, value: str | None) -> NetAddress | None: return ip_address(value) +class NixOSConfigParser(LinuxNetworkConfigParser): + """NixOS declarative network configuration parser. + + NixOS uses the Nix functional language for system configuration. Network settings + are declared in ``/etc/nixos/configuration.nix`` or imported ``.nix`` files + (commonly ``networking.nix``). This parser uses :func:`configutil.parse` with + ``hint="nix"`` to parse ``.nix`` files into dictionaries, then extracts network + interface information from the resulting structure. + + References: + - https://nixos.org/manual/nixos/stable/#sec-networking + """ + + config_paths: tuple[str, ...] = ("/etc/nixos/",) + + def interfaces(self) -> Iterator[UnixInterfaceRecord]: + if not ( + self._target.fs.path("/etc/NIXOS").exists() + or self._target.fs.path("/etc/nixos/configuration.nix").exists() + ): + return + + for config_file in self._config_files(self.config_paths, "*.nix"): + try: + config = configutil.parse(config_file, hint="nix") + except Exception as e: + self._target.log.warning("Error parsing NixOS config file %s", config_file) + self._target.log.debug("", exc_info=e) + continue + + networking = config.get("networking", {}) + if not networking: + continue + + dns: set[NetAddress] = set() + for server in networking.get("nameservers", []): + try: + dns.add(ip_address(str(server))) + except ValueError: + self._target.log.debug("Invalid NixOS DNS server: %s", server) + + gateways: set[NetAddress] = set() + for gw_key in ("defaultGateway", "defaultGateway6"): + gw = networking.get(gw_key) + if isinstance(gw, str) and gw: + try: + gateways.add(ip_address(gw)) + except ValueError: + self._target.log.debug("Invalid NixOS gateway: %s", gw) + elif isinstance(gw, dict): + gw_addr = gw.get("address", "") + if gw_addr: + try: + gateways.add(ip_address(str(gw_addr))) + except ValueError: + self._target.log.debug("Invalid NixOS gateway: %s", gw_addr) + + use_dhcp = networking.get("useDHCP", False) is True + dhcpcd = networking.get("dhcpcd", {}) + if isinstance(dhcpcd, dict) and dhcpcd.get("enable") is True: + use_dhcp = True + + interfaces_cfg = networking.get("interfaces", {}) + if not isinstance(interfaces_cfg, dict): + interfaces_cfg = {} + + if not interfaces_cfg: + if dns or gateways or use_dhcp: + yield UnixInterfaceRecord( + name=None, + type="dhcp" if use_dhcp else "static", + enabled=None, + dns=list(dns), + gateway=list(gateways), + source=str(config_file), + dhcp_ipv4=use_dhcp, + dhcp_ipv6=use_dhcp, + configurator="nixos", + _target=self._target, + ) + continue + + for iface_name, iface_cfg in sorted(interfaces_cfg.items()): + if not isinstance(iface_cfg, dict): + continue + + ip_interfaces: set[NetInterface] = set() + iface_dhcp = use_dhcp + + if iface_cfg.get("useDHCP") is True: + iface_dhcp = True + elif iface_cfg.get("useDHCP") is False: + iface_dhcp = False + + for ip_version in ("ipv4", "ipv6"): + ip_cfg = iface_cfg.get(ip_version, {}) + if not isinstance(ip_cfg, dict): + continue + for addr_entry in ip_cfg.get("addresses", []): + if not isinstance(addr_entry, dict): + continue + addr = addr_entry.get("address") + prefix = addr_entry.get("prefixLength") + if addr and prefix is not None: + try: + ip_interfaces.add(ip_interface(f"{addr}/{prefix}")) + except ValueError: + self._target.log.debug("Invalid NixOS address: %s/%s", addr, prefix) + + yield UnixInterfaceRecord( + name=iface_name, + type="dhcp" if iface_dhcp else "static", + enabled=None, + cidr=ip_interfaces, + gateway=list(gateways), + dns=list(dns), + source=str(config_file), + dhcp_ipv4=iface_dhcp, + dhcp_ipv6=iface_dhcp, + configurator="nixos", + _target=self._target, + ) + + class DhclientLeaseParser(LinuxNetworkConfigParser): """Parse network interfaces from dhclient DHCP ``.leases`` files. @@ -763,142 +887,5 @@ def parse_debian_centos_dhclient_lease(log_record: LogRecord) -> DhcpLease | Non return DhcpLease(None, ip_interface(ip), None) if (ip := match.group(1)) else None -class NixOSConfigParser(LinuxNetworkConfigParser): - """NixOS declarative network configuration parser. - - NixOS uses the Nix functional language for system configuration. Network settings - are declared in ``/etc/nixos/configuration.nix`` or imported ``.nix`` files - (commonly ``networking.nix``). This parser uses regex to extract IP addresses, - gateways, DNS servers, interface names, and DHCP settings from these files. - - References: - - https://nixos.org/manual/nixos/stable/#sec-networking - """ - - config_paths: tuple[str, ...] = ("/etc/nixos/",) - - def interfaces(self) -> Iterator[UnixInterfaceRecord]: - # Only parse if this is actually a NixOS system - if not ( - self._target.fs.path("/etc/NIXOS").exists() - or self._target.fs.path("/etc/nixos/configuration.nix").exists() - ): - return - - for config_file in self._config_files(self.config_paths, "*.nix"): - try: - text = config_file.open("rt").read() - except Exception as e: - self._target.log.warning("Error reading NixOS config file %s", config_file) - self._target.log.debug("", exc_info=e) - continue - - ip_interfaces: set[NetInterface] = set() - gateways: set[NetAddress] = set() - dns: set[NetAddress] = set() - iface_names: set[str] = set() - dhcp_ipv4 = False - dhcp_ipv6 = False - - # Extract IP addresses from ipv4.addresses and ipv6.addresses blocks. - # Avoid matching address values inside .routes blocks or udev rules. - in_addresses_block = False - for line in text.split("\n"): - stripped = line.strip() - if re.search(r"\.addresses\s*=\s*\[", stripped): - in_addresses_block = True - if in_addresses_block: - for match in re.finditer( - r'address\s*=\s*"([^"]+)";\s*prefixLength\s*=\s*(\d+)', - stripped, - ): - addr, prefix = match.group(1), match.group(2) - if addr and ("." in addr or ":" in addr): - try: - ip_interfaces.add(ip_interface(f"{addr}/{prefix}")) - except ValueError: - self._target.log.debug("Invalid NixOS address: %s/%s", addr, prefix) - if "]" in stripped: - in_addresses_block = False - - # Extract default gateway: defaultGateway = "x.x.x.x" - gateway_match = re.search(r'defaultGateway\s*=\s*"([^"]+)"', text) - if gateway_match and gateway_match.group(1): - try: - gateways.add(ip_address(gateway_match.group(1))) - except ValueError: - self._target.log.debug("Invalid NixOS gateway: %s", gateway_match.group(1)) - - # Extract DNS nameservers: nameservers = [ "x.x.x.x" "y.y.y.y" ] - dns_match = re.search(r"nameservers\s*=\s*\[(.*?)\]", text, re.DOTALL) - if dns_match: - for server in re.findall(r'"([^"]+)"', dns_match.group(1)): - try: - dns.add(ip_address(server)) - except ValueError: - self._target.log.debug("Invalid NixOS DNS server: %s", server) - - # Extract interface names from interfaces.NAME.ipvX or interfaces = { NAME = { } } - for match in re.finditer(r"interfaces\.([a-zA-Z][a-zA-Z0-9_-]*)\.(?:ipv[46]|useDHCP)", text): - iface_names.add(match.group(1)) - - in_interfaces_block = False - brace_depth = 0 - for line in text.split("\n"): - stripped = line.strip() - if re.search(r"interfaces\s*=\s*\{", stripped): - in_interfaces_block = True - brace_depth = 1 - continue - if in_interfaces_block: - brace_depth += stripped.count("{") - stripped.count("}") - iface_match = re.match(r"([a-zA-Z][a-zA-Z0-9_-]*)\s*=\s*\{", stripped) - if iface_match and brace_depth == 2: - iface_names.add(iface_match.group(1)) - if brace_depth <= 0: - in_interfaces_block = False - - # Extract DHCP setting - dhcp_match = re.search(r"(?:dhcpcd\.enable|useDHCP)\s*=\s*(true|false)", text) - if dhcp_match: - dhcp_ipv4 = dhcp_match.group(1) == "true" - dhcp_ipv6 = dhcp_match.group(1) == "true" - - # Only yield if we found any network information - if not (ip_interfaces or gateways or dns or dhcp_ipv4 or dhcp_ipv6): - continue - - # Yield one record per interface, or a single record if no interface names found - if iface_names: - for name in sorted(iface_names): - yield UnixInterfaceRecord( - name=name, - type="static" if not (dhcp_ipv4 or dhcp_ipv6) else "dhcp", - enabled=None, - cidr=ip_interfaces, - gateway=list(gateways), - dns=list(dns), - source=str(config_file), - dhcp_ipv4=dhcp_ipv4, - dhcp_ipv6=dhcp_ipv6, - configurator="nixos", - _target=self._target, - ) - else: - yield UnixInterfaceRecord( - name=None, - type="static" if not (dhcp_ipv4 or dhcp_ipv6) else "dhcp", - enabled=None, - cidr=ip_interfaces, - gateway=list(gateways), - dns=list(dns), - source=str(config_file), - dhcp_ipv4=dhcp_ipv4, - dhcp_ipv6=dhcp_ipv6, - configurator="nixos", - _target=self._target, - ) - - MANAGERS = [NetworkManagerConfigParser, SystemdNetworkConfigParser, NixOSConfigParser, ProcConfigParser] LEASERS = [DhclientLeaseParser, NetworkManagerLeaseParser] diff --git a/tests/helpers/test_configutil.py b/tests/helpers/test_configutil.py index 241928b1d9..f3c4e92231 100644 --- a/tests/helpers/test_configutil.py +++ b/tests/helpers/test_configutil.py @@ -17,6 +17,7 @@ Indentation, Json, Leases, + Nix, ScopeManager, SystemD, parse, @@ -401,3 +402,39 @@ def test_leases_parser(string_data: str, expected_output: dict) -> None: parser.parse_file(StringIO(string_data)) assert parser.parsed_data == expected_output + + +def test_nix_parser_syntax() -> None: + """Test Nix parser syntax: function headers, dotted keys, types, lists, lib.mkForce, comments.""" + data = textwrap.dedent("""\ + { lib, ... }: + # Comment + { + a.b = "1"; a.c = "2"; + types = { bool = true; int = 42; empty = {}; }; + items = [ "x" { address = "10.0.0.1"; prefixLength = 24; } ]; + forced = lib.mkForce false; + } + """) + result = parse_data(Nix, data) + assert result["a"] == {"b": "1", "c": "2"} + assert result["types"] == {"bool": True, "int": 42, "empty": {}} + assert result["items"] == ["x", {"address": "10.0.0.1", "prefixLength": 24}] + assert result["forced"] is False + + +def test_nix_parser_networking_fixture() -> None: + """Test full networking.nix fixture from nixos-infect.""" + fixture = Path(absolute_path("_data/plugins/os/unix/linux/nixos/networking.nix")) + result = parse_data(Nix, fixture.read_text()) + + networking = result["networking"] + assert networking["nameservers"] == ["10.13.37.1", "10.13.37.2"] + assert networking["defaultGateway"] == "10.13.37.0" + assert networking["defaultGateway6"] == {"address": "", "interface": "eth0"} + assert networking["dhcpcd"]["enable"] is False + assert networking["usePredictableInterfaceNames"] is False + + eth0 = networking["interfaces"]["eth0"] + assert eth0["ipv4"]["addresses"] == [{"address": "10.13.37.10", "prefixLength": 24}] + assert eth0["ipv6"]["addresses"] == [{"address": "2001:db8::1", "prefixLength": 64}] From 13d6ff93a77fad8234d24590f1aa1cb98b40355e Mon Sep 17 00:00:00 2001 From: jmvermeulen Date: Wed, 25 Feb 2026 19:33:33 +0100 Subject: [PATCH 4/4] refactor: trim Nix parser to minimal feature set Remove features not used in nixos-infect networking configs: - let...in blocks, lib.mkOverride/mkIf, block comments - multiline string escapes, null values, path literals --- dissect/target/helpers/configutil.py | 117 +++------------------------ 1 file changed, 13 insertions(+), 104 deletions(-) diff --git a/dissect/target/helpers/configutil.py b/dissect/target/helpers/configutil.py index 3aee6bdde3..07decdcc54 100644 --- a/dissect/target/helpers/configutil.py +++ b/dissect/target/helpers/configutil.py @@ -938,8 +938,7 @@ class _NixTokenizer: """Character-by-character tokenizer for Nix configuration files. Produces a stream of tokens from ``.nix`` file text, handling string literals, - multiline strings (``'' ... ''``), line comments (``#``), and block comments - (``/* ... */``). + multiline strings (``'' ... ''``), and line comments (``#``). """ def __init__(self, text: str) -> None: @@ -964,13 +963,6 @@ def _skip_whitespace_and_comments(self) -> None: elif ch == "#": while self._pos < self._length and self._text[self._pos] != "\n": self._pos += 1 - elif ch == "/" and self._peek(1) == "*": - self._pos += 2 - while self._pos < self._length: - if self._text[self._pos] == "*" and self._peek(1) == "/": - self._pos += 2 - break - self._pos += 1 else: break @@ -1006,32 +998,12 @@ def _read_multiline_string(self) -> str: self._pos += 2 # consume opening '' parts = [] while self._pos < self._length: - ch = self._text[self._pos] - if ch == "'" and self._peek(1) == "'": - # Check for escape: ''' means literal '' - if self._peek(2) == "'": - parts.append("''") - self._pos += 3 - else: - self._pos += 2 # consume closing '' - break - elif ch == "'" and self._peek(1) == "\\": - # ''\ escape sequences - self._pos += 2 - if self._pos < self._length: - escaped = self._text[self._pos] - if escaped == "n": - parts.append("\n") - elif escaped == "t": - parts.append("\t") - else: - parts.append(escaped) - self._pos += 1 - else: - parts.append(ch) - self._pos += 1 - content = "".join(parts) - return textwrap.dedent(content) + if self._text[self._pos] == "'" and self._peek(1) == "'": + self._pos += 2 # consume closing '' + break + parts.append(self._text[self._pos]) + self._pos += 1 + return "".join(parts).strip() def _read_identifier(self) -> str: """Read a bare identifier: alphanumeric, ``_``, or ``-``.""" @@ -1044,16 +1016,6 @@ def _read_identifier(self) -> str: break return self._text[start : self._pos] - def _read_path(self) -> str: - """Read a path literal starting with ``./`` or ``/``.""" - start = self._pos - while self._pos < self._length: - ch = self._text[self._pos] - if ch in (" ", "\t", "\n", "\r", ";", "]", "}"): - break - self._pos += 1 - return self._text[start : self._pos] - def tokenize(self) -> list[_NixToken]: tokens = [] while True: @@ -1086,15 +1048,8 @@ def tokenize(self) -> list[_NixToken]: tokens.append(_NixToken(_NixTokenType.COLON, ":")) self._advance() elif ch == ".": - if self._peek(1) == "/": - # Path literal: ./foo/bar - tokens.append(_NixToken(_NixTokenType.STRING, self._read_path())) - else: - tokens.append(_NixToken(_NixTokenType.DOT, ".")) - self._advance() - elif ch == "/": - # Absolute path literal - tokens.append(_NixToken(_NixTokenType.STRING, self._read_path())) + tokens.append(_NixToken(_NixTokenType.DOT, ".")) + self._advance() elif ch == '"': tokens.append(_NixToken(_NixTokenType.STRING, self._read_string())) elif ch == "'" and self._peek(1) == "'": @@ -1142,9 +1097,8 @@ def _expect(self, token_type: _NixTokenType) -> _NixToken: return token def parse(self) -> dict: - """Entry point. Handles optional function header, optional let-in, then parses body.""" + """Entry point. Handles optional function header then parses body.""" self._skip_function_header() - self._skip_let_in() if self._peek().type == _NixTokenType.LBRACE: return self._parse_attr_set() return {} @@ -1172,26 +1126,6 @@ def _skip_function_header(self) -> None: return scan_pos += 1 - def _skip_let_in(self) -> None: - """Skip ``let ... in`` binding blocks.""" - while self._peek().type == _NixTokenType.IDENT and self._peek().value == "let": - self._advance() # consume "let" - # Skip until we find "in" at the same depth - depth = 0 - while self._pos < len(self._tokens): - t = self._peek() - if t.type == _NixTokenType.LBRACE: - depth += 1 - self._advance() - elif t.type == _NixTokenType.RBRACE: - depth -= 1 - self._advance() - elif t.type == _NixTokenType.IDENT and t.value == "in" and depth == 0: - self._advance() # consume "in" - break - else: - self._advance() - def _parse_attr_set(self) -> dict: """Parse ``{ key = value; ... }`` into a dict.""" self._expect(_NixTokenType.LBRACE) @@ -1264,8 +1198,6 @@ def _parse_value(self) -> str | int | bool | dict | list: return True if value == "false": return False - if value == "null": - return None try: return int(value) except ValueError: @@ -1299,22 +1231,10 @@ def _parse_list(self) -> list: return result def _unwrap_lib_call(self) -> str | int | bool | dict | list: - """Unwrap ``lib.mkForce value``, ``lib.mkDefault value``, ``lib.mkOverride N value``.""" + """Unwrap ``lib.mkForce value`` and ``lib.mkDefault value``.""" self._advance() # consume "lib" self._advance() # consume DOT - func_name = self._advance().value # e.g., "mkForce", "mkDefault", "mkOverride", "mkIf" - - if func_name == "mkOverride": - # lib.mkOverride — skip priority, return value - self._parse_value() # consume and discard priority - return self._parse_value() - - if func_name == "mkIf": - # lib.mkIf — skip condition, return value - self._parse_value() # consume and discard condition - return self._parse_value() - - # lib.mkForce / lib.mkDefault / others — next token is the value + self._advance() # consume function name (mkForce, mkDefault, etc.) return self._parse_value() def _merge_dotted_key(self, target: dict, parts: list[str], value: ...) -> None: @@ -1343,18 +1263,7 @@ class Nix(ConfigurationParser): Handles the subset of Nix used in declarative NixOS configurations: function headers, attribute sets (with dotted paths), lists, strings, - multiline strings, booleans, integers, and ``lib.mkForce``/``lib.mkDefault`` - wrappers. - - Examples: - - .. code-block:: - - >>> nix_data = '{ ... }: { networking.hostName = "test"; }' - >>> parser = Nix() - >>> parser.read_file(io.StringIO(nix_data)) - >>> parser.parsed_data - {"networking": {"hostName": "test"}} + booleans, integers, and ``lib.mkForce``/``lib.mkDefault`` wrappers. """ def parse_file(self, fh: TextIO) -> None: