From 6f9c2d9af82b0c1898a92c70585aed65a8ca36f0 Mon Sep 17 00:00:00 2001
From: JSCU-CNI <121175071+JSCU-CNI@users.noreply.github.com>
Date: Wed, 4 Mar 2026 17:54:16 +0100
Subject: [PATCH] Refactor and extend OpenVPN plugins
---
dissect/target/plugins/apps/vpn/openvpn.py | 220 --------
.../plugins/apps/vpn/openvpn/__init__.py | 0
.../target/plugins/apps/vpn/openvpn/client.py | 297 +++++++++++
.../plugins/apps/vpn/openvpn/openvpn.py | 9 +
.../target/plugins/apps/vpn/openvpn/server.py | 490 ++++++++++++++++++
.../target/plugins/apps/vpn/openvpn/util.py | 84 +++
.../apps/vpn/openvpn/{ => client}/client.conf | 0
.../apps/vpn/openvpn/client/client.crt | 0
.../apps/vpn/openvpn/client/client.key | 0
.../apps/vpn/openvpn/client/client.ovpn | 3 +
.../apps/vpn/openvpn/client/config.json | 3 +
.../apps/vpn/openvpn/client/name-profile.log | 3 +
.../apps/vpn/openvpn/regression/client.conf | 3 +
.../apps/vpn/openvpn/regression/server.conf | 3 +
.../plugins/apps/vpn/openvpn/server.conf | 3 -
.../plugins/apps/vpn/openvpn/server/log.db | 3 +
.../apps/vpn/openvpn/server/openvpn.log | 3 +
.../vpn/openvpn/server/openvpnas.node.log | 3 +
.../apps/vpn/openvpn/server/server.conf | 3 +
.../apps/vpn/openvpn/server/status.log | 3 +
.../apps/vpn/openvpn/server/userprop.db | 3 +
tests/plugins/apps/vpn/openvpn/__init__.py | 0
tests/plugins/apps/vpn/openvpn/test_client.py | 122 +++++
.../apps/vpn/openvpn/test_regression.py | 123 +++++
tests/plugins/apps/vpn/openvpn/test_server.py | 231 +++++++++
tests/plugins/apps/vpn/test_openvpn.py | 127 -----
26 files changed, 1389 insertions(+), 350 deletions(-)
delete mode 100644 dissect/target/plugins/apps/vpn/openvpn.py
create mode 100644 dissect/target/plugins/apps/vpn/openvpn/__init__.py
create mode 100644 dissect/target/plugins/apps/vpn/openvpn/client.py
create mode 100644 dissect/target/plugins/apps/vpn/openvpn/openvpn.py
create mode 100644 dissect/target/plugins/apps/vpn/openvpn/server.py
create mode 100644 dissect/target/plugins/apps/vpn/openvpn/util.py
rename tests/_data/plugins/apps/vpn/openvpn/{ => client}/client.conf (100%)
mode change 100644 => 100755
create mode 100644 tests/_data/plugins/apps/vpn/openvpn/client/client.crt
create mode 100644 tests/_data/plugins/apps/vpn/openvpn/client/client.key
create mode 100755 tests/_data/plugins/apps/vpn/openvpn/client/client.ovpn
create mode 100755 tests/_data/plugins/apps/vpn/openvpn/client/config.json
create mode 100755 tests/_data/plugins/apps/vpn/openvpn/client/name-profile.log
create mode 100644 tests/_data/plugins/apps/vpn/openvpn/regression/client.conf
create mode 100644 tests/_data/plugins/apps/vpn/openvpn/regression/server.conf
delete mode 100644 tests/_data/plugins/apps/vpn/openvpn/server.conf
create mode 100755 tests/_data/plugins/apps/vpn/openvpn/server/log.db
create mode 100755 tests/_data/plugins/apps/vpn/openvpn/server/openvpn.log
create mode 100755 tests/_data/plugins/apps/vpn/openvpn/server/openvpnas.node.log
create mode 100755 tests/_data/plugins/apps/vpn/openvpn/server/server.conf
create mode 100755 tests/_data/plugins/apps/vpn/openvpn/server/status.log
create mode 100755 tests/_data/plugins/apps/vpn/openvpn/server/userprop.db
create mode 100644 tests/plugins/apps/vpn/openvpn/__init__.py
create mode 100644 tests/plugins/apps/vpn/openvpn/test_client.py
create mode 100644 tests/plugins/apps/vpn/openvpn/test_regression.py
create mode 100644 tests/plugins/apps/vpn/openvpn/test_server.py
delete mode 100644 tests/plugins/apps/vpn/test_openvpn.py
diff --git a/dissect/target/plugins/apps/vpn/openvpn.py b/dissect/target/plugins/apps/vpn/openvpn.py
deleted file mode 100644
index 4e136045f6..0000000000
--- a/dissect/target/plugins/apps/vpn/openvpn.py
+++ /dev/null
@@ -1,220 +0,0 @@
-from __future__ import annotations
-
-import io
-import itertools
-from itertools import product
-from typing import TYPE_CHECKING, Final
-
-from dissect.target.exceptions import ConfigurationParsingError, UnsupportedPluginError
-from dissect.target.helpers.configutil import Default, ListUnwrapper, _update_dictionary
-from dissect.target.helpers.record import TargetRecordDescriptor
-from dissect.target.plugin import OperatingSystem, Plugin, arg, export
-
-if TYPE_CHECKING:
- from collections.abc import Iterator
-
- from dissect.target.helpers import fsutil
- from dissect.target.target import Target
-
-COMMON_ELEMENTS = [
- ("string", "name"), # basename of .conf file
- ("string", "proto"),
- ("string", "dev"),
- ("string", "ca"),
- ("string", "cert"),
- ("string", "key"),
- ("boolean", "redacted_key"),
- ("string", "tls_auth"),
- ("string", "status"),
- ("string", "log"),
- ("string", "source"),
-]
-
-OpenVPNServer = TargetRecordDescriptor(
- "application/vpn/openvpn/server",
- [
- ("net.ipaddress", "local"),
- ("uint16", "port"),
- ("string", "dh"),
- ("string", "topology"),
- ("string", "server"),
- ("string", "ifconfig_pool_persist"),
- ("string[]", "pushed_options"),
- ("boolean", "client_to_client"),
- ("boolean", "duplicate_cn"),
- *COMMON_ELEMENTS,
- ],
-)
-
-OpenVPNClient = TargetRecordDescriptor(
- "application/vpn/openvpn/client",
- [
- ("string[]", "remote"),
- *COMMON_ELEMENTS,
- ],
-)
-
-
-class OpenVPNParser(Default):
- def __init__(self, *args, **kwargs):
- boolean_fields = OpenVPNServer.getfields("boolean") + OpenVPNClient.getfields("boolean")
- self.boolean_field_names = {field.name.replace("_", "-") for field in boolean_fields}
-
- super().__init__(*args, separator=(r"\s",), collapse=["key", "ca", "cert"], **kwargs)
-
- def parse_file(self, fh: io.TextIOBase) -> None:
- root = {}
- iterator = self.line_reader(fh)
- for line in iterator:
- if line.startswith("<"):
- key = line.strip().strip("<>")
- value = self._read_blob(iterator)
- _update_dictionary(root, key, value)
- continue
-
- self._parse_line(root, line)
-
- self.parsed_data = ListUnwrapper.unwrap(root)
-
- def _read_blob(self, lines: Iterator[str]) -> str | list[dict]:
- """Read the whole section between ```` sections."""
- output = ""
- with io.StringIO() as buffer:
- for line in lines:
- if "" in line:
- break
-
- buffer.write(line)
- output = buffer.getvalue()
-
- # Check for connection profile blocks
- if not output.startswith("-----"):
- profile_dict = {}
- for line in output.splitlines():
- self._parse_line(profile_dict, line)
-
- # We put it as a list as _update_dictionary appends data in a list.
- output = [profile_dict]
-
- return output
-
- def _parse_line(self, root: dict, line: str) -> None:
- key, *value = self.SEPARATOR.split(line, 1)
- # Unquote data
- value = value[0].strip() if value else ""
-
- value = value.strip("'\"")
-
- if key in self.boolean_field_names:
- value = True
-
- _update_dictionary(root, key, value)
-
-
-class OpenVPNPlugin(Plugin):
- """OpenVPN configuration parser.
-
- References:
- - man (8) openvpn
- """
-
- __namespace__ = "openvpn"
-
- config_globs = (
- # This catches openvpn@, openvpn-client@, and openvpn-server@ systemd configurations
- # Linux
- "/etc/openvpn/",
- # Windows
- "sysvol/Program Files/OpenVPN/config/",
- )
-
- user_config_paths: Final[dict[str, list[str]]] = {
- OperatingSystem.WINDOWS.value: ["OpenVPN/config/"],
- OperatingSystem.OSX.value: ["Library/Application Support/OpenVPN Connect/profiles/"],
- }
-
- def __init__(self, target: Target):
- super().__init__(target)
- self.configs: list[fsutil.TargetPath] = []
- for base, glob in product(self.config_globs, ["*.conf", "*.ovpn"]):
- self.configs.extend(self.target.fs.path(base).rglob(glob))
-
- user_paths = self.user_config_paths.get(target.os, [])
- for path, glob, user_details in itertools.product(
- user_paths, ["*.conf", "*.ovpn"], self.target.user_details.all_with_home()
- ):
- self.configs.extend(user_details.home_path.joinpath(path).rglob(glob))
-
- def check_compatible(self) -> None:
- if not self.configs:
- raise UnsupportedPluginError("No OpenVPN configuration files found")
-
- def _load_config(self, parser: OpenVPNParser, config_path: fsutil.TargetPath) -> dict | None:
- with config_path.open("rt") as file:
- try:
- parser.parse_file(file)
- except ConfigurationParsingError as e:
- # Couldn't parse file, continue
- self.target.log.info("An issue occurred during parsing of %s, continuing", config_path)
- self.target.log.debug("", exc_info=e)
- return None
-
- return parser.parsed_data
-
- @export(record=[OpenVPNServer, OpenVPNClient])
- @arg("--export-key", action="store_true", help="export private keys to records")
- def config(self, export_key: bool = False) -> Iterator[OpenVPNServer | OpenVPNClient]:
- """Parses config files from openvpn interfaces."""
- # We define the parser here so we can reuse it
- parser = OpenVPNParser()
-
- for config_path in self.configs:
- config = self._load_config(parser, config_path)
-
- common_elements = {
- "name": config_path.stem,
- "proto": config.get("proto", "udp"), # Default is UDP
- "dev": config.get("dev"),
- "ca": config.get("ca"),
- "cert": config.get("cert"),
- "key": config.get("key"),
- "status": config.get("status"),
- "log": config.get("log"),
- "source": config_path,
- "_target": self.target,
- }
-
- if not export_key and "PRIVATE KEY" in common_elements.get("key"):
- common_elements.update({"key": None})
- common_elements.update({"redacted_key": True})
-
- tls_auth = config.get("tls-auth", "")
- # The format of tls-auth is 'tls-auth ta.key '.
- # NUM is either 0 or 1 depending on whether the configuration
- # is for the client or server, and that does not interest us
- # This gets rid of the number at the end, while still supporting spaces
- tls_auth = " ".join(tls_auth.split(" ")[:-1]).strip("'\"")
-
- common_elements.update({"tls_auth": tls_auth})
-
- if "client" in config:
- remote = config.get("remote", [])
-
- yield OpenVPNClient(
- **common_elements,
- remote=remote,
- )
- else:
- # Defaults here are taken from `man (8) openvpn`
- yield OpenVPNServer(
- **common_elements,
- local=config.get("local", "0.0.0.0"),
- port=int(config.get("port", "1194")),
- dh=config.get("dh"),
- topology=config.get("topology"),
- server=config.get("server"),
- ifconfig_pool_persist=config.get("ifconfig-pool-persist"),
- pushed_options=config.get("push", []),
- client_to_client=config.get("client-to-client", False),
- duplicate_cn=config.get("duplicate-cn", False),
- )
diff --git a/dissect/target/plugins/apps/vpn/openvpn/__init__.py b/dissect/target/plugins/apps/vpn/openvpn/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/dissect/target/plugins/apps/vpn/openvpn/client.py b/dissect/target/plugins/apps/vpn/openvpn/client.py
new file mode 100644
index 0000000000..3972e5e7a4
--- /dev/null
+++ b/dissect/target/plugins/apps/vpn/openvpn/client.py
@@ -0,0 +1,297 @@
+from __future__ import annotations
+
+import json
+import re
+from datetime import datetime, timezone
+from typing import TYPE_CHECKING
+
+from dissect.target.exceptions import UnsupportedPluginError
+from dissect.target.helpers.certificate import (
+ COMMON_CERTIFICATE_FIELDS,
+ parse_x509,
+)
+from dissect.target.helpers.fsutil import open_decompress
+from dissect.target.helpers.record import TargetRecordDescriptor
+from dissect.target.plugin import export
+from dissect.target.plugins.apps.vpn.openvpn.openvpn import OpenVPNPlugin
+from dissect.target.plugins.apps.vpn.openvpn.util import OpenVPNParser, parse_config
+
+if TYPE_CHECKING:
+ from collections.abc import Iterator
+ from pathlib import Path
+
+ from dissect.target.target import Target
+
+
+OpenVPNLogRecord = TargetRecordDescriptor(
+ "application/vpn/openvpn/client/log",
+ [
+ ("datetime", "ts"),
+ ("string", "message"),
+ ("path", "source"),
+ ],
+)
+
+OpenVPNProfileRecord = TargetRecordDescriptor(
+ "application/vpn/openvpn/client/profile",
+ [
+ ("datetime", "ts"),
+ ("string", "proto"),
+ ("string", "dev"),
+ ("string[]", "remote"),
+ ("string", "ca"),
+ ("string", "cert"),
+ ("string", "key"),
+ ("string", "auth"),
+ ("string", "status"),
+ ("string", "log"),
+ ("string", "verb"),
+ ("string", "tls_auth"),
+ ("path", "source"),
+ ],
+)
+
+OpenVPNCertificateRecord = TargetRecordDescriptor(
+ "application/vpn/openvpn/client/profile/certificate",
+ [
+ ("datetime", "ts"),
+ *COMMON_CERTIFICATE_FIELDS,
+ ("path", "source"),
+ ],
+)
+
+OpenVPNConfigProxyRecord = TargetRecordDescriptor(
+ "application/vpn/openvpn/client/config/proxy",
+ [
+ ("datetime", "ts"),
+ ("string", "proxy_id"),
+ ("string", "display_name"),
+ ("net.ipaddress", "host"),
+ ("uint16", "port"),
+ ("string", "username"),
+ ("string", "password"),
+ ("path", "source"),
+ ],
+)
+
+OpenVPNConfigProfileRecord = TargetRecordDescriptor(
+ "application/vpn/openvpn/client/config/profile",
+ [
+ ("datetime", "ts"),
+ ("string", "profile_id"),
+ ("string", "display_name"),
+ ("net.ipaddress", "host"),
+ ("path", "file_path"),
+ ("datetime", "last_connected"),
+ ("string", "saved_password"),
+ ("string", "private_key_password"),
+ ("path", "source"),
+ ],
+)
+
+RE_LOG_MESSAGE = re.compile(r"^(?P\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}(?:\+\d{4})?)\s*(?P.+)")
+
+
+class OpenVPNClientPlugin(OpenVPNPlugin):
+ """OpenVPN client plugin.
+
+ Tested on Windows OpenVPN GUI v11.56.0.0, Windows OpenVPN Connect Client v3.8.0 and macOS OpenVPN Connect v3.
+
+ Linux OpenVPN clients store logs in the journal or syslog.
+
+ References:
+ - https://support.openvpn.com/hc/en-us/articles/35154796757275-CloudConnexa-Where-to-Find-OpenVPN-Client-Logs
+ - https://codeberg.org/OpenVPN/openvpn3-linux#logging
+ """
+
+ __namespace__ = "openvpn.client"
+
+ DEFAULT_SYSTEM_PATHS = (
+ # Windows
+ "/sysvol/Program Files/OpenVPN",
+ # macOS
+ "/Library/Application Support/OpenVPN Connect",
+ )
+ DEFAULT_USER_PATHS = (
+ # Windows
+ "OpenVPN",
+ "AppData/Roaming/OpenVPN Connect",
+ # macOS
+ "Library/Application Support/OpenVPN Connect",
+ )
+
+ def __init__(self, target: Target):
+ super().__init__(target)
+
+ self.log_files = list(self._find_log_files())
+ self.profile_files = list(self._find_profile_files())
+ self.config_files = list(self._find_config_files())
+
+ def _find_log_files(self) -> Iterator[Path]:
+ """Search user home folders and system paths for OpenVPN log files."""
+
+ for user_details in self.target.user_details.all_with_home():
+ home_dir = user_details.home_path
+
+ for openvpn_path in self.DEFAULT_USER_PATHS:
+ if (install_path := home_dir.joinpath(openvpn_path)).is_dir():
+ yield from install_path.rglob("*.log")
+
+ for system_path in self.DEFAULT_SYSTEM_PATHS:
+ if (log_path := self.target.fs.path(system_path)).is_dir():
+ yield from log_path.rglob("*.log")
+
+ def _find_profile_files(self) -> Iterator[Path]:
+ """Search user home folders and system paths for connection profile files."""
+
+ for user_details in self.target.user_details.all_with_home():
+ home_dir = user_details.home_path
+
+ for openvpn_path in self.DEFAULT_USER_PATHS:
+ if (install_path := home_dir.joinpath(openvpn_path)).is_dir():
+ yield from install_path.rglob("*.ovpn")
+
+ for openvpn_path in self.DEFAULT_SYSTEM_PATHS:
+ if (install_path := self.target.fs.path(openvpn_path)).is_dir():
+ yield from install_path.rglob("*.ovpn")
+
+ def _find_config_files(self) -> Iterator[Path]:
+ """Searches user home folders for OpenVPN Connect client config files."""
+
+ for user_details in self.target.user_details.all_with_home():
+ home_dir = user_details.home_path
+
+ for openvpn_path in self.DEFAULT_USER_PATHS:
+ if (install_path := home_dir.joinpath(openvpn_path)).is_dir():
+ yield from install_path.rglob("*.json")
+
+ def check_compatible(self) -> None:
+ if not any([self.log_files, self.profile_files, self.config_files]):
+ raise UnsupportedPluginError("No OpenVPN Client file(s) found on target")
+
+ @export(record=OpenVPNLogRecord)
+ def logs(self) -> Iterator[OpenVPNLogRecord]:
+ """Parses full log files from OpenVPN installs"""
+
+ for log_file in self.log_files:
+ for line in open_decompress(log_file, "rt"):
+ if not (line := line.strip()):
+ continue
+
+ if not (match := RE_LOG_MESSAGE.search(line)):
+ self.target.log.warning("Unable to match OpenVPN log line in %s: %r", log_file, line)
+ continue
+
+ group = match.groupdict()
+ ts = (
+ datetime.strptime(group["normal_ts"], "%Y-%m-%d %H:%M:%S")
+ .replace(tzinfo=self.target.datetime.tzinfo)
+ .astimezone(timezone.utc)
+ )
+
+ yield OpenVPNLogRecord(
+ ts=ts,
+ message=group["message"],
+ source=log_file,
+ _target=self.target,
+ )
+
+ @export(record=OpenVPNProfileRecord)
+ def profiles(self) -> Iterator[OpenVPNProfileRecord]:
+ """Yield OpenVPN client connection profile (*.ovpn) records."""
+
+ parser = OpenVPNParser(boolean_fields={})
+
+ for profile_path in self.profile_files:
+ if not (config := parse_config(self.target, parser, profile_path)):
+ continue
+
+ yield OpenVPNProfileRecord(
+ ts=profile_path.lstat().st_mtime,
+ proto=config.get("proto"),
+ dev=config.get("dev"),
+ remote=config.get("remote"),
+ ca=config.get("ca"),
+ cert=config.get("cert"),
+ key=config.get("key"),
+ auth=config.get("auth"),
+ status=config.get("status"),
+ log=config.get("log"),
+ verb=config.get("verb"),
+ tls_auth=config.get("tls-auth"),
+ source=profile_path,
+ _target=self.target,
+ )
+
+ # Yield certificate records for each x509 blob in the config
+ for cert in config.get("ca"), config.get("cert"), config.get("key"):
+ if cert.startswith("-----"):
+ data = cert
+ elif (path := profile_path.parent.joinpath(cert)).is_file():
+ data = path.read_text()
+ else:
+ self.target.log.warning("Profile %s references invalid certificate: %r", profile_path, cert)
+ continue
+
+ try:
+ crt = parse_x509(data)
+ except (ValueError, TypeError) as e:
+ self.target.log.warning("Unable to parse OpenVPN Server certificate in %s: %s", profile_path, e)
+ self.target.log.debug("", exc_info=e)
+ continue
+
+ yield OpenVPNCertificateRecord(
+ ts=crt.not_valid_before,
+ **crt._asdict(),
+ source=profile_path,
+ _target=self.target,
+ )
+
+ @export(records=[OpenVPNConfigProxyRecord, OpenVPNConfigProfileRecord])
+ def config(self) -> Iterator[OpenVPNConfigProxyRecord, OpenVPNConfigProfileRecord]:
+ """Yield Windows OpenVPN Connect client configuration records.
+
+ Currently does not parse embedded certificates in ``config.json``.
+ """
+
+ for config_path in self.config_files:
+ with config_path.open("rt") as fh:
+ try:
+ status_data = json.loads(json.loads(json.loads(fh.read())["persist:root"])["status"])
+ except (UnicodeDecodeError, json.JSONDecodeError) as e:
+ self.target.warning("Failed to parse JSON in file %s: %s", config_path, e)
+ continue
+
+ proxy_list = status_data.get("proxyList", {})
+ profile_list = status_data.get("profiles", {})
+
+ for proxy_id, group in proxy_list.items():
+ yield OpenVPNConfigProxyRecord(
+ ts=config_path.lstat().st_mtime,
+ proxy_id=proxy_id,
+ display_name=group["displayName"],
+ host=group["hostname"],
+ port=int(group["port"]),
+ username=group["username"],
+ password=group["password"],
+ source=config_path,
+ _target=self.target,
+ )
+
+ for profile_id, group in profile_list.items():
+ last_connected = datetime.fromtimestamp(
+ (group["lastConnected"] / 1000), self.target.datetime.tzinfo
+ ).strftime("%Y-%m-%d %H:%M:%S%z")
+
+ yield OpenVPNConfigProfileRecord(
+ ts=last_connected,
+ profile_id=profile_id,
+ display_name=group["profileDisplayName"],
+ host=group["hostname"],
+ file_path=group["filePath"],
+ last_connected=last_connected,
+ saved_password=group["savedPassword"],
+ private_key_password=group["privateKeyPassword"],
+ source=config_path,
+ _target=self.target,
+ )
diff --git a/dissect/target/plugins/apps/vpn/openvpn/openvpn.py b/dissect/target/plugins/apps/vpn/openvpn/openvpn.py
new file mode 100644
index 0000000000..96be7f8df0
--- /dev/null
+++ b/dissect/target/plugins/apps/vpn/openvpn/openvpn.py
@@ -0,0 +1,9 @@
+from __future__ import annotations
+
+from dissect.target.plugin import NamespacePlugin
+
+
+class OpenVPNPlugin(NamespacePlugin):
+ """OpenVPN namespace plugin."""
+
+ __namespace__ = "openvpn"
diff --git a/dissect/target/plugins/apps/vpn/openvpn/server.py b/dissect/target/plugins/apps/vpn/openvpn/server.py
new file mode 100644
index 0000000000..c0f33b598e
--- /dev/null
+++ b/dissect/target/plugins/apps/vpn/openvpn/server.py
@@ -0,0 +1,490 @@
+from __future__ import annotations
+
+import re
+from datetime import datetime, timezone
+from typing import TYPE_CHECKING
+
+from dissect.database import SQLite3
+from dissect.database.exception import Error as DBError
+
+from dissect.target.exceptions import UnsupportedPluginError
+from dissect.target.helpers.certificate import COMMON_CERTIFICATE_FIELDS, parse_x509
+from dissect.target.helpers.fsutil import open_decompress
+from dissect.target.helpers.record import TargetRecordDescriptor
+from dissect.target.plugin import arg, export
+from dissect.target.plugins.apps.vpn.openvpn.openvpn import OpenVPNPlugin
+from dissect.target.plugins.apps.vpn.openvpn.util import OpenVPNParser, parse_config
+
+if TYPE_CHECKING:
+ from collections.abc import Iterator
+ from pathlib import Path
+
+ from dissect.target.target import Target
+
+
+OpenVPNLogRecord = TargetRecordDescriptor(
+ "application/vpn/openvpn/server/log",
+ [
+ ("datetime", "ts"),
+ ("string", "message"),
+ ("path", "source"),
+ ],
+)
+
+OpenVPNLiveConnectionRecord = TargetRecordDescriptor(
+ "application/vpn/openvpn/server/connection/live",
+ [
+ ("datetime", "client_conn_since"),
+ ("string", "client_common_name"),
+ ("net.ipaddress", "client_ip"),
+ ("varint", "client_port"),
+ ("net.ipaddress[]", "client_vpn_ip"),
+ ("string", "client_username"),
+ ("string", "client_id"),
+ ("string", "peer_id"),
+ ("varint", "bytes_received"),
+ ("varint", "bytes_sent"),
+ ("string", "client_ciphers"),
+ ("path", "source"),
+ ],
+)
+
+
+OpenVPNHistoryConnectionRecord = TargetRecordDescriptor(
+ "application/vpn/openvpn/server/connection/history",
+ [
+ ("datetime", "ts"),
+ ("string", "client_id"),
+ ("string", "client_username"),
+ ("net.ipaddress", "client_ip"),
+ ("varint", "client_port"),
+ ("net.ipaddress[]", "client_vpn_ip"),
+ ("string", "client_proto"),
+ ("string", "client_version"),
+ ("string", "client_platform"),
+ ("string", "client_plat_rel"),
+ ("string", "client_gui_ver"),
+ ("string", "client_ciphers"),
+ ("string", "client_ssl"),
+ ("string", "client_hwaddr"),
+ ("uint16", "client_conn_duration"),
+ ("path", "source"),
+ ],
+)
+
+OpenVPNConfigRecord = TargetRecordDescriptor(
+ "application/vpn/openvpn/server/config",
+ [
+ ("datetime", "ts"),
+ ("net.ipaddress", "local"),
+ ("uint16", "port"),
+ ("string", "proto"),
+ ("string", "dev"),
+ ("string", "ca"),
+ ("string", "cert"),
+ ("string", "key"),
+ ("string", "dh"),
+ ("string", "auth"),
+ ("string", "topology"),
+ ("string", "server"),
+ ("string", "ifconfig_pool_persist"),
+ ("string[]", "pushed_options"),
+ ("boolean", "client_to_client"),
+ ("boolean", "duplicate_cn"),
+ ("string", "status"),
+ ("string", "log"),
+ ("string", "verb"),
+ ("string", "tls_auth"),
+ ("path", "source"),
+ ],
+)
+
+OpenVPNCertificateRecord = TargetRecordDescriptor(
+ "application/vpn/openvpn/server/config/certificate",
+ [
+ ("datetime", "ts"),
+ *COMMON_CERTIFICATE_FIELDS,
+ ("path", "source"),
+ ],
+)
+
+OpenVPNUser = TargetRecordDescriptor(
+ "application/vpn/openvpn/server/config/user",
+ [
+ ("string", "user_id"),
+ ("string", "user_name"),
+ ("string", "user_type"),
+ ("boolean", "is_superuser"),
+ ("string", "password_digest"),
+ ("string", "user_auth_type"),
+ ("path", "source"),
+ ],
+)
+
+RE_LOG_MESSAGE = re.compile(r"^(?P\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}(?:\+\d{4})?)\s*(?P.+)")
+RE_LOG_CONNECTION = re.compile(
+ r"^\[stdout#info\] \[OVPN (?P\d+)\] OUT: '(?P\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) "
+ r"(?P\d{1,3}(?:\.\d{1,3}){3}):(?P\d+) peer info: (?P[A-Z_]+)=(?P\S+)'$"
+)
+
+
+class OpenVPNServerPlugin(OpenVPNPlugin):
+ """OpenVPN Server (Linux) plugin.
+
+ Supports OpenVPN Server v2 and OpenVPN Access Server (AS) artifacts. Tested with OpenVPN Server v2.6.14
+ and OpenVPN Access Server v3.0.1 on Linux. Does not parse OpenVPN Server v2 and v3 logs as those
+ are saved in the Linux journal or syslog.
+
+ Does not parse custom ``log`` or ``log-append`` directives (yet).
+
+ References:
+ - https://openvpn.net/vpn-server-resources/logging-and-debug-flag-options-for-%20access-server/
+ - https://openvpn.net/as-docs/tutorials/tutorial--syslog.html
+ """
+
+ __namespace__ = "openvpn.server"
+
+ def __init__(self, target: Target):
+ super().__init__(target)
+
+ self.DEFAULT_LOG_GLOBS = [
+ # OpenVPN Server
+ "/var/log/openvpn.log*",
+ "/sysvol/Program files/OpenVPN/log/*.log",
+ # OpenVPN Access Server
+ "/var/log/openvpnas.log*",
+ "/var/log/openvpnas.node.log",
+ # OpenVPN Server v2
+ "/etc/openvpn/server/openvpn.log",
+ ]
+
+ self.DEFAULT_CONFIG_GLOBS = [
+ # OpenVPN Server
+ "/etc/openvpn/server.conf",
+ "/etc/openvpn/server/server.conf",
+ "/sysvol/Program Files/OpenVPN/config/*.conf",
+ # OpenVPN Access Server
+ "/usr/local/openvpn_as/etc/as.conf",
+ ]
+
+ self.DEFAULT_STATUS_PATHS = [
+ "/run/openvpn-server/status-server.log",
+ "/var/log/openvpn/status.log",
+ "/etc/openvpn/openvpn-status.log",
+ "/var/log/openvpn/openvpn-status.log",
+ ]
+
+ self.DEFAULT_CONNECTION_DB_PATHS = [
+ "/usr/local/openvpn_as/etc/db/log.db",
+ ]
+
+ self.DEFAULT_USERS_DB_PATHS = [
+ "/usr/local/openvpn_as/etc/db/userprop.db",
+ "/usr/local/openvpn_as/etc/db/cluster.db",
+ ]
+
+ self.config_files = list(self._find_config_files())
+ self.log_files = list(self._find_logs_files())
+ self.user_db_files = list(self._find_user_db_files())
+ self.connection_db_files = list(self._find_connection_db_files())
+ self.status_files = list(self._find_status_files())
+
+ def _find_config_files(self) -> Iterator[Path]:
+ """Find configuration files for OpenVPN installs."""
+ seen = set()
+
+ for config_path in self.DEFAULT_CONFIG_GLOBS:
+ if "*" in config_path:
+ base, _, glob = config_path.rpartition("/")
+ for path in self.target.fs.path(base).glob(glob):
+ if path not in seen:
+ seen.add(path)
+ yield path
+
+ elif (path := self.target.fs.path(config_path)).is_file() and path not in seen:
+ seen.add(path)
+ yield path
+
+ def _find_logs_files(self) -> Iterator[Path]:
+ """Find log paths for OpenVPN installs"""
+ seen = set()
+
+ for log in self.DEFAULT_LOG_GLOBS:
+ if "*" in log:
+ base, _, glob = log.rpartition("/")
+ for path in self.target.fs.path(base).glob(glob):
+ if path not in seen:
+ seen.add(path)
+ yield path
+ elif (path := self.target.fs.path(log)).is_file() and path not in seen:
+ seen.add(path)
+ yield path
+
+ def _find_user_db_files(self) -> Iterator[Path]:
+ """Find database paths for OpenVPN installs."""
+ for db in self.DEFAULT_USERS_DB_PATHS:
+ if (db_path := self.target.fs.path(db)).is_file():
+ yield db_path
+
+ def _find_connection_db_files(self) -> Iterator[Path]:
+ """Find database paths for OpenVPN installs."""
+ for db in self.DEFAULT_CONNECTION_DB_PATHS:
+ if (db_path := self.target.fs.path(db)).is_file():
+ yield db_path
+
+ def _find_status_files(self) -> Iterator[Path]:
+ """Find OpenVPN server status files."""
+ for file in self.DEFAULT_STATUS_PATHS:
+ if (path := self.target.fs.path(file)).is_file():
+ yield path
+
+ def check_compatible(self) -> None:
+ if not any([self.config_files, self.log_files, self.user_db_files, self.connection_db_files]):
+ raise UnsupportedPluginError("No OpenVPN Server install found on target")
+
+ @export(record=[OpenVPNConfigRecord, OpenVPNCertificateRecord])
+ @arg("--export-key", action="store_true", help="export private keys to records")
+ def config(self, export_key: bool = False) -> Iterator[OpenVPNConfigRecord, OpenVPNCertificateRecord]:
+ """Yield OpenVPN server configuration records."""
+
+ parser = OpenVPNParser(boolean_fields=OpenVPNConfigRecord.getfields("boolean"))
+
+ for config_path in self.config_files:
+ if not (config := parse_config(self.target, parser, config_path)):
+ continue
+
+ # Infers default values from openvpn man page (8)
+ yield OpenVPNConfigRecord(
+ ts=config_path.lstat().st_mtime,
+ local=config.get("local", "0.0.0.0"),
+ port=int(config.get("port", "1194")),
+ proto=config.get("proto"),
+ dev=config.get("dev"),
+ ca=config.get("ca"),
+ cert=config.get("cert"),
+ key=config.get("key") if export_key else None,
+ dh=config.get("dh"),
+ auth=config.get("auth"),
+ topology=config.get("topology"),
+ server=config.get("server"),
+ ifconfig_pool_persist=config.get("ifconfig-pool-persist"),
+ pushed_options=config.get("push"),
+ client_to_client=config.get("client-to-client", False),
+ duplicate_cn=config.get("duplicate-cn", False),
+ status=config.get("status"),
+ log=config.get("log"),
+ verb=config.get("verb"),
+ tls_auth=config.get("tls-auth"),
+ source=config_path,
+ _target=self.target,
+ )
+
+ # Yield certificate records for each x509 blob in the config
+ for cert in config.get("ca"), config.get("cert"), config.get("key"):
+ try:
+ crt = parse_x509(cert)
+ except (ValueError, TypeError) as e:
+ self.target.log.warning("Unable to parse OpenVPN Server certificate in %s: %s", config_path, e)
+ self.target.log.debug("", exc_info=e)
+ continue
+
+ yield OpenVPNCertificateRecord(
+ ts=crt.not_valid_before,
+ **crt._asdict(),
+ source=config_path,
+ _target=self.target,
+ )
+
+ @export(record=[OpenVPNLogRecord])
+ def logs(self) -> Iterator[OpenVPNLogRecord]:
+ """Yields OpenVPN Server logs."""
+
+ for log_file in self.log_files:
+ for line in open_decompress(log_file, "rt"):
+ if not (line := line.strip()):
+ continue
+
+ if not (match := RE_LOG_MESSAGE.search(line)):
+ self.target.log.warning("Unable to match OpenVPN log line in %s: %r", log_file, line)
+ continue
+
+ group = match.groupdict()
+ ts = parse_datetime(group["normal_ts"], self.target.datetime.tzinfo)
+
+ yield OpenVPNLogRecord(
+ ts=ts,
+ message=group["message"],
+ source=log_file,
+ _target=self.target,
+ )
+
+ @export(record=OpenVPNUser)
+ def users(self) -> Iterator[OpenVPNUser]:
+ """Yield configured users from OpenVPN Server databases."""
+
+ for db_path in self.user_db_files:
+ try:
+ db = SQLite3(db_path)
+ except DBError as e:
+ self.target.log.warning("Unable to open SQLite3 database %s: %s", db_path, e)
+ continue
+
+ # Connect the profile id and name with the user config
+ users = {}
+ for row in db.table("profile").rows():
+ users[row["id"]] = {"username": row["name"], "type": row["type"]}
+ for row in db.table("config").rows():
+ if row["profile_id"] in users:
+ users[row["profile_id"]][row["name"]] = row["value"]
+
+ for user_id, user_info in users.items():
+ yield OpenVPNUser(
+ user_id=user_id,
+ user_name=user_info.get("username"),
+ user_type=user_info.get("type"),
+ is_superuser=user_info.get("prop_superuser") == "true",
+ password_digest=user_info.get("pvt_password_digest"),
+ user_auth_type=user_info.get("user_auth_type"),
+ source=db_path,
+ _target=self.target,
+ )
+
+ @export(record=[OpenVPNLiveConnectionRecord, OpenVPNHistoryConnectionRecord])
+ def connections(self) -> Iterator[OpenVPNLiveConnectionRecord | OpenVPNHistoryConnectionRecord]:
+ """Yield live and historic OpenVPN Server connections with clients."""
+
+ # TODO: Refactor this function.
+ yield from self._live_connections()
+ yield from self._history_log_connections()
+ yield from self._history_db_connections()
+
+ def _live_connections(self) -> Iterator[OpenVPNLiveConnectionRecord]:
+ """Yield live connections from OpenVPN status log files."""
+ client_list = []
+
+ for status_path in self.status_files:
+ for log_file in open_decompress(status_path, "rt"):
+ try:
+ parts = log_file.split(",")
+ if parts[0] == "HEADER" and parts[1] == "CLIENT_LIST":
+ headers = parts[2:]
+ elif parts[0] == "CLIENT_LIST":
+ group = dict(zip(headers, parts[1:], strict=True))
+
+ if group in client_list:
+ continue
+
+ ts = parse_datetime(group["Connected Since"], self.target.datetime.tzinfo)
+ client_list.append(group)
+
+ except Exception as e:
+ self.target.log.warning("Unable to parse OpenVPN status log: %s with error: %e", status_path, e)
+ continue
+
+ for group in client_list:
+ yield OpenVPNLiveConnectionRecord(
+ client_conn_since=ts,
+ client_common_name=group.get("Common Name"),
+ client_ip=group.get("Real Address", "").split(":")[0],
+ client_port=group.get("Real Address", "").split(":")[-1],
+ client_vpn_ip=[ip for f in ("Virtual Address", "Virtual IPv6 Address") if (ip := group.get(f))],
+ client_username=group.get("Username"),
+ client_id=int(group.get("Client ID", 0)),
+ peer_id=int(group.get("Peer ID", 0)),
+ bytes_received=group.get("Bytes Received"),
+ bytes_sent=group.get("Bytes Sent"),
+ client_ciphers=group.get("Data Channel Cipher\n"),
+ source=status_path,
+ _target=self.target,
+ )
+
+ def _history_log_connections(self) -> Iterator[OpenVPNHistoryConnectionRecord]:
+ """Yields history connection logs from regular logs."""
+
+ connection = {}
+
+ for record in self.logs():
+ if not (match := RE_LOG_CONNECTION.search(record.message)):
+ continue
+
+ group = match.groupdict()
+ ts = parse_datetime(group.get("acces_server_ts", group.get("normal_ts")), self.target.datetime.tzinfo)
+
+ # Save all the meta info from one connection
+ if len(connection) == 0:
+ connection.update(
+ {
+ "ip": group["ip"],
+ "ts": ts,
+ "port": int(group["port"]),
+ "connection_id": group["connection_id"],
+ group["key"]: group["value"],
+ }
+ )
+ else:
+ connection.update({group["key"]: group["value"]})
+
+ if group["key"] == "IV_SSO":
+ yield OpenVPNHistoryConnectionRecord(
+ ts=connection.get("ts"),
+ client_id=int(connection.get("connection_id", 0)),
+ client_ip=connection.get("ip"),
+ client_port=int(connection.get("port", 0)),
+ client_proto=connection.get("IV_PROTO"),
+ client_version=connection.get("IV_VER"),
+ client_platform=connection.get("IV_PLAT"),
+ client_plat_rel=connection.get("UV_PLAT_REL"),
+ client_gui_ver=connection.get("IV_GUI_VER"),
+ client_ciphers=connection.get("IV_CIPHERS"),
+ client_ssl=connection.get("IV_SSL"),
+ client_hwaddr=connection.get("IV_HWADDR"),
+ source=record.source,
+ _target=self.target,
+ )
+ connection = {}
+
+ def _history_db_connections(self) -> Iterator[OpenVPNHistoryConnectionRecord]:
+ """Yields history connection logs from SQLite3 databases."""
+
+ ts_fmt = "%Y-%m-%d %H:%M:%S%z"
+
+ for db_path in self.connection_db_files:
+ try:
+ db = SQLite3(db_path)
+ except DBError as e:
+ self.target.log.warning("Unable to open SQLite3 database %s: %s", db_path, e)
+ continue
+
+ for row in db.table("log").rows():
+ if row.service != "VPN":
+ continue
+
+ yield OpenVPNHistoryConnectionRecord(
+ ts=datetime.fromtimestamp(row["timestamp"], self.target.datetime.tzinfo).strftime(ts_fmt),
+ client_id=row["node"],
+ client_username=row["username"],
+ client_ip=row["real_ip"],
+ client_port=int(row["port"]),
+ client_vpn_ip=[row["vpn_ip"]],
+ client_proto=row["proto"],
+ client_version=row["version"],
+ client_platform=row["platform"],
+ client_gui_ver=row["gui_version"],
+ client_conn_duration=int(row["duration"]),
+ source=db_path,
+ _target=self.target,
+ )
+
+
+def parse_datetime(datetime_str: str, target_tz: timezone) -> datetime:
+ """Convert local system datetime from log and status files to UTC datetime objects.
+
+ OpenVPN Server v2 example::
+ 2025-10-20 19:20:14
+
+ OpenVPN Access Server example::
+ 2025-10-09T18:45:25+1000
+ """
+
+ fmt = "%Y-%m-%dT%H:%M:%S%z" if "T" in datetime_str else "%Y-%m-%d %H:%M:%S"
+ return datetime.strptime(datetime_str, fmt).replace(tzinfo=target_tz).astimezone(timezone.utc)
diff --git a/dissect/target/plugins/apps/vpn/openvpn/util.py b/dissect/target/plugins/apps/vpn/openvpn/util.py
new file mode 100644
index 0000000000..abafb1a93f
--- /dev/null
+++ b/dissect/target/plugins/apps/vpn/openvpn/util.py
@@ -0,0 +1,84 @@
+from __future__ import annotations
+
+import io
+from typing import TYPE_CHECKING
+
+from dissect.target.exceptions import ConfigurationParsingError
+from dissect.target.helpers.configutil import Default, ListUnwrapper, _update_dictionary
+
+if TYPE_CHECKING:
+ from collections.abc import Iterator
+ from pathlib import Path
+
+ from dissect.target.target import Target
+
+
+class OpenVPNParser(Default):
+ def __init__(self, *args, boolean_fields: dict | None = None, **kwargs):
+ self.boolean_field_names = {field.name.replace("_", "-") for field in boolean_fields} if boolean_fields else {}
+
+ super().__init__(*args, separator=(r"\s",), collapse=["key", "ca", "cert"], **kwargs)
+
+ def parse_file(self, fh: io.TextIOBase) -> None:
+ root = {}
+ iterator = self.line_reader(fh)
+ for line in iterator:
+ if line.startswith("<"):
+ key = line.strip().strip("<>")
+ value = self._read_blob(iterator)
+ _update_dictionary(root, key, value)
+ continue
+
+ self._parse_line(root, line)
+
+ self.parsed_data = ListUnwrapper.unwrap(root)
+
+ def _read_blob(self, lines: Iterator[str]) -> str | list[dict]:
+ """Read the whole section between ```` sections."""
+ output = ""
+ with io.StringIO() as buffer:
+ for line in lines:
+ if "" in line:
+ break
+
+ buffer.write(line)
+ output = buffer.getvalue()
+
+ # Check for connection profile blocks
+ if not output.startswith("-----"):
+ profile_dict = {}
+ for line in output.splitlines():
+ self._parse_line(profile_dict, line)
+
+ # We put it as a list as _update_dictionary appends data in a list.
+ output = [profile_dict]
+
+ return output
+
+ def _parse_line(self, root: dict, line: str) -> None:
+ key, *value = self.SEPARATOR.split(line, 1)
+ value = value[0].strip() if value else ""
+
+ if key in self.boolean_field_names:
+ value = True
+
+ # Format of tls-auth is `tls-auth '/path/to/a ta.key' `, we remove the number
+ if key == "tls-auth":
+ value, _, _ = value.rpartition(" ")
+
+ # Unquote
+ value = value.strip("'\"")
+
+ _update_dictionary(root, key, value)
+
+
+def parse_config(target: Target, parser: OpenVPNParser, config_path: Path) -> dict | None:
+ with config_path.open("rt") as file:
+ try:
+ parser.parse_file(file)
+ except ConfigurationParsingError as e:
+ target.log.info("An issue occurred during parsing of %s, continuing", config_path)
+ target.log.debug("", exc_info=e)
+ return None
+
+ return parser.parsed_data
diff --git a/tests/_data/plugins/apps/vpn/openvpn/client.conf b/tests/_data/plugins/apps/vpn/openvpn/client/client.conf
old mode 100644
new mode 100755
similarity index 100%
rename from tests/_data/plugins/apps/vpn/openvpn/client.conf
rename to tests/_data/plugins/apps/vpn/openvpn/client/client.conf
diff --git a/tests/_data/plugins/apps/vpn/openvpn/client/client.crt b/tests/_data/plugins/apps/vpn/openvpn/client/client.crt
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/tests/_data/plugins/apps/vpn/openvpn/client/client.key b/tests/_data/plugins/apps/vpn/openvpn/client/client.key
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/tests/_data/plugins/apps/vpn/openvpn/client/client.ovpn b/tests/_data/plugins/apps/vpn/openvpn/client/client.ovpn
new file mode 100755
index 0000000000..8db1213f91
--- /dev/null
+++ b/tests/_data/plugins/apps/vpn/openvpn/client/client.ovpn
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:d483fd960f00c8f68bf3a0f182021aa628092349ea55c521f8e0270a1e552b3a
+size 4258
diff --git a/tests/_data/plugins/apps/vpn/openvpn/client/config.json b/tests/_data/plugins/apps/vpn/openvpn/client/config.json
new file mode 100755
index 0000000000..1e7e5eaf7f
--- /dev/null
+++ b/tests/_data/plugins/apps/vpn/openvpn/client/config.json
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:3c96dd1be9ad08621c8b41f7ad57c16e3f6fb40f379551e6e22f78e6f9b396b7
+size 26604
diff --git a/tests/_data/plugins/apps/vpn/openvpn/client/name-profile.log b/tests/_data/plugins/apps/vpn/openvpn/client/name-profile.log
new file mode 100755
index 0000000000..971badbea0
--- /dev/null
+++ b/tests/_data/plugins/apps/vpn/openvpn/client/name-profile.log
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:2a7472692bb659f009c5875b94b0c620d9b8c756f07b5c30a8ee5be4661db743
+size 512
diff --git a/tests/_data/plugins/apps/vpn/openvpn/regression/client.conf b/tests/_data/plugins/apps/vpn/openvpn/regression/client.conf
new file mode 100644
index 0000000000..ef398a9eb0
--- /dev/null
+++ b/tests/_data/plugins/apps/vpn/openvpn/regression/client.conf
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:1061571c0bb616172546943973dcb0689aa1c7ca3f0816082d19c42a4d221d44
+size 3696
diff --git a/tests/_data/plugins/apps/vpn/openvpn/regression/server.conf b/tests/_data/plugins/apps/vpn/openvpn/regression/server.conf
new file mode 100644
index 0000000000..b14c17a211
--- /dev/null
+++ b/tests/_data/plugins/apps/vpn/openvpn/regression/server.conf
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:1b24ffbc5d629d998b7eb15859aacc86a874076f2b15f692830c3dfc69c886d0
+size 11105
diff --git a/tests/_data/plugins/apps/vpn/openvpn/server.conf b/tests/_data/plugins/apps/vpn/openvpn/server.conf
deleted file mode 100644
index 311352a2ed..0000000000
--- a/tests/_data/plugins/apps/vpn/openvpn/server.conf
+++ /dev/null
@@ -1,3 +0,0 @@
-version https://git-lfs.github.com/spec/v1
-oid sha256:738d76687e3749c3c17a3674cbe3b5d033fcf0f37ca68b8d1389b8651e5fc178
-size 11106
diff --git a/tests/_data/plugins/apps/vpn/openvpn/server/log.db b/tests/_data/plugins/apps/vpn/openvpn/server/log.db
new file mode 100755
index 0000000000..4689cd2e2c
--- /dev/null
+++ b/tests/_data/plugins/apps/vpn/openvpn/server/log.db
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:a4aa29bd52f29c71f4bb38924772877cb79d16f57fbef45cb25bc8e8f1bb7d8d
+size 40960
diff --git a/tests/_data/plugins/apps/vpn/openvpn/server/openvpn.log b/tests/_data/plugins/apps/vpn/openvpn/server/openvpn.log
new file mode 100755
index 0000000000..8a124be05e
--- /dev/null
+++ b/tests/_data/plugins/apps/vpn/openvpn/server/openvpn.log
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:f1483dd75eecb9092330a82af30d231f9211d5146d6fe7c349f67d5879ea156e
+size 300
diff --git a/tests/_data/plugins/apps/vpn/openvpn/server/openvpnas.node.log b/tests/_data/plugins/apps/vpn/openvpn/server/openvpnas.node.log
new file mode 100755
index 0000000000..88fb78c704
--- /dev/null
+++ b/tests/_data/plugins/apps/vpn/openvpn/server/openvpnas.node.log
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:9e1f9c346a7ae878854c46874a9dab79c227e209b824a77a1a6b03b193cd4724
+size 7498
diff --git a/tests/_data/plugins/apps/vpn/openvpn/server/server.conf b/tests/_data/plugins/apps/vpn/openvpn/server/server.conf
new file mode 100755
index 0000000000..7f4c460c5a
--- /dev/null
+++ b/tests/_data/plugins/apps/vpn/openvpn/server/server.conf
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:250d5fa4494b865f270dbe060def5846f7bae3bfa5f6b6b813108c34b4a2178e
+size 12444
diff --git a/tests/_data/plugins/apps/vpn/openvpn/server/status.log b/tests/_data/plugins/apps/vpn/openvpn/server/status.log
new file mode 100755
index 0000000000..f3cdb00a85
--- /dev/null
+++ b/tests/_data/plugins/apps/vpn/openvpn/server/status.log
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:74ab933a758f9e7a0ea9a0528d2bca3c1c4ffdf5424b2ad56226c2b662c93878
+size 722
diff --git a/tests/_data/plugins/apps/vpn/openvpn/server/userprop.db b/tests/_data/plugins/apps/vpn/openvpn/server/userprop.db
new file mode 100755
index 0000000000..90a55553ef
--- /dev/null
+++ b/tests/_data/plugins/apps/vpn/openvpn/server/userprop.db
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:e182208c480b0e4471585a4c6fbb8541e876f5007ddb074bda81bb8a6785e0f1
+size 49152
diff --git a/tests/plugins/apps/vpn/openvpn/__init__.py b/tests/plugins/apps/vpn/openvpn/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/tests/plugins/apps/vpn/openvpn/test_client.py b/tests/plugins/apps/vpn/openvpn/test_client.py
new file mode 100644
index 0000000000..ba64bdc236
--- /dev/null
+++ b/tests/plugins/apps/vpn/openvpn/test_client.py
@@ -0,0 +1,122 @@
+from __future__ import annotations
+
+from datetime import datetime, timezone
+from typing import TYPE_CHECKING
+
+from dissect.target.plugins.apps.vpn.openvpn.client import OpenVPNClientPlugin
+from tests._utils import absolute_path
+
+if TYPE_CHECKING:
+ from dissect.target.filesystem import VirtualFilesystem
+ from dissect.target.target import Target
+
+
+def test_logs(target_win_users: Target, fs_win: VirtualFilesystem) -> None:
+ """Test if we can parse OpenVPN client logs."""
+
+ fs_win.map_file(
+ "Users/John/OpenVPN/name-profile.log", absolute_path("_data/plugins/apps/vpn/openvpn/client/name-profile.log")
+ )
+ target_win_users.add_plugin(OpenVPNClientPlugin)
+ records = list(target_win_users.openvpn.client.logs())
+
+ assert records[0].ts == datetime(2025, 10, 20, 11, 20, 13, tzinfo=timezone.utc)
+ assert (
+ records[0].message == "Note: --cipher is not set. OpenVPN versions before 2.5 defaulted to BF-CBC as fallback "
+ "when cipher negotiation failed in this case. If you need this fallback please add "
+ "'--data-ciphers-fallback BF-CBC' to your configuration and/or add BF-CBC to --data-ciphers."
+ )
+ assert records[0].source == "C:\\Users\\John\\OpenVPN\\name-profile.log"
+
+ assert records[1].ts == datetime(2025, 10, 20, 11, 20, 13, tzinfo=timezone.utc)
+ assert (
+ records[1].message == "OpenVPN 2.6.15 [git:v2.6.15/90bdd59a95170169] "
+ "Windows [SSL (OpenSSL)] [LZO] [LZ4] [PKCS11] [AEAD] [DCO] built on Sep 22 2025"
+ )
+ assert records[1].source == "C:\\Users\\John\\OpenVPN\\name-profile.log"
+
+ assert records[2].ts == datetime(2025, 10, 20, 11, 20, 13, tzinfo=timezone.utc)
+ assert records[2].message == "Windows version 10.0 (Windows 10 or greater), amd64 executable"
+ assert records[2].source == "C:\\Users\\John\\OpenVPN\\name-profile.log"
+
+
+def test_profiles(target_win_users: Target, fs_win: VirtualFilesystem) -> None:
+ """Test if we can find and parse OpenVPN client connection profile files (*.ovpn)."""
+
+ fs_win.map_file(
+ "Users/John/OpenVPN/client.ovpn", absolute_path("_data/plugins/apps/vpn/openvpn/client/client.ovpn")
+ )
+ target_win_users.add_plugin(OpenVPNClientPlugin)
+ records = list(target_win_users.openvpn.client.profiles())
+
+ assert records[0].proto == "udp"
+ assert records[0].dev == "tun"
+ assert records[0].ca == (
+ "-----BEGIN CERTIFICATE-----\n"
+ "MIIBeTCB/6ADAgECAgRo5QC9MAoGCCqGSM49BAMCMBUxEzARBgNVBAMMCk9wZW5W\n"
+ "UE4gQ0EwHhcNMjUxMDA2MjE1OTU3WhcNMzUxMDA1MjE1OTU3WjAVMRMwEQYDVQQD\n"
+ "DApPcGVuVlBOIENBMHYwEAYHKoZIzj0CAQYFK4EEACIDYgAEH6+NsCoi7mllD7hV\n"
+ "egK88M/hy0kmQD7Jgh5IsO7IX5KIMTb679ltfBZwWJLJ6hMcs4hG/OgP77RrH+Bn\n"
+ "73z7giio9XcDe7iqHB5YrW4I7aIiCfULIfGLSzvOWG8vG9L/oyAwHjAPBgNVHRMB\n"
+ "Af8EBTADAQH/MAsGA1UdDwQEAwIBBjAKBggqhkjOPQQDAgNpADBmAjEApgTC7OSs\n"
+ "IlDx9GKY05zsSwh0AGPsWdL9QL3eZx1kQQZYJmPGAgKSV2GTI+YH2c/bAjEAw1qD\n"
+ "09jj90+YuLMmLyDl+Z+UBHra619/byZK8GddkhJnfOVnAE3F17OKg9ZGFrVq\n"
+ "-----END CERTIFICATE-----\n"
+ )
+ assert records[0].cert == "client.crt"
+ assert records[0].key == "client.key"
+ assert records[0].auth is None
+ assert records[0].status is None
+ assert records[0].log is None
+ assert records[0].verb == "3"
+ assert records[0].tls_auth == "ta.key"
+ assert records[0].source == "C:\\Users\\John\\OpenVPN\\client.ovpn"
+
+ assert records[1].fingerprint.md5 == "cb38a3ebff4c16736434ea8df0244ae4"
+ assert records[1].fingerprint.sha1 == "daf2bf53ba3a3b9080115ec39111f71f40cd8484"
+ assert records[1].fingerprint.sha256 == "91b022ea3af6bcfc58b5123d2d9f990ec388f6746bf692003d4caf9a05c69687"
+ assert records[1].issuer_dn == "CN=OpenVPN CA"
+ assert records[1].not_valid_before == datetime(2025, 10, 6, 21, 59, 57, tzinfo=timezone.utc)
+ assert records[1].not_valid_after == datetime(2035, 10, 5, 21, 59, 57, tzinfo=timezone.utc)
+ assert records[1].subject_dn == "CN=OpenVPN CA"
+ assert records[1].pem
+ assert records[1].source == "C:\\Users\\John\\OpenVPN\\client.ovpn"
+
+
+def test_config(target_win_users: Target, fs_win: VirtualFilesystem) -> None:
+ """Test if we can parse OpenVPN Connect client configuration json files."""
+
+ fs_win.map_file(
+ "Users/John/appdata/roaming/openvpn connect/config.json",
+ absolute_path("_data/plugins/apps/vpn/openvpn/client/config.json"),
+ )
+ target_win_users.add_plugin(OpenVPNClientPlugin)
+ records = list(target_win_users.openvpn.client.config())
+
+ # proxy
+ assert records[0].proxy_id == "proxy_1759841383472_7"
+ assert records[0].display_name == "Access Server - Linux"
+ assert records[0].host == "192.168.173.130"
+ assert records[0].port == 1194
+ assert records[0].username == "openvpn"
+ assert records[0].password == "1XQtc2heauTD"
+ assert records[0].source == "C:\\Users\\John\\AppData\\Roaming\\OpenVPN Connect\\config.json"
+
+ # profiles
+ assert records[1].profile_id == "1759841467737"
+ assert records[1].display_name == "openvpn@192.168.173.130 [profile-userlocked]"
+ assert records[1].host == "192.168.173.130"
+ assert records[1].file_path == "C:\\Users\\test\\Downloads\\profile-userlocked.ovpn"
+ assert records[1].last_connected == datetime(2025, 10, 7, 12, 51, 16, tzinfo=timezone.utc)
+ assert records[1].saved_password == "False"
+ assert records[1].private_key_password == "False"
+ assert records[1].source == "C:\\Users\\John\\AppData\\Roaming\\OpenVPN Connect\\config.json"
+
+ assert records[2].profile_id == "1760949762439"
+ assert records[2].display_name == "192.168.173.130 [openvpn-client]"
+ assert records[2].host == "192.168.173.130"
+ assert records[2].file_path == "C:\\Users\\test\\Downloads\\openvpn-client.ovpn"
+ assert records[2].last_connected == datetime(2025, 10, 20, 14, 34, 26, tzinfo=timezone.utc)
+ assert records[2].saved_password == "False"
+ assert records[2].private_key_password == "False"
+ assert records[2].source == "C:\\Users\\John\\AppData\\Roaming\\OpenVPN Connect\\config.json"
diff --git a/tests/plugins/apps/vpn/openvpn/test_regression.py b/tests/plugins/apps/vpn/openvpn/test_regression.py
new file mode 100644
index 0000000000..6abf98b2d5
--- /dev/null
+++ b/tests/plugins/apps/vpn/openvpn/test_regression.py
@@ -0,0 +1,123 @@
+"""Tests from previous OpenVPNPlugin implementation to ensure backwards compatibility."""
+
+from __future__ import annotations
+
+import io
+from typing import TYPE_CHECKING
+
+import pytest
+
+from dissect.target.plugins.apps.vpn.openvpn.client import OpenVPNClientPlugin
+from dissect.target.plugins.apps.vpn.openvpn.server import OpenVPNServerPlugin
+from dissect.target.plugins.apps.vpn.openvpn.util import OpenVPNParser
+from tests._utils import absolute_path
+
+if TYPE_CHECKING:
+ from dissect.target.filesystem import Filesystem
+ from dissect.target.target import Target
+
+
+@pytest.mark.parametrize(
+ ("target", "fs", "map_path"),
+ [
+ (
+ "target_win_users",
+ "fs_win",
+ "Program Files/OpenVPN/config/helper",
+ ),
+ (
+ "target_win_users",
+ "fs_win",
+ "Users/John/OpenVPN/config/helper",
+ ),
+ ],
+)
+def test_config_client(target: str, fs: str, map_path: str, request: pytest.FixtureRequest) -> None:
+ """Test if we can parse OpenVPN client connection profile files as before."""
+
+ target: Target = request.getfixturevalue(target)
+ fs: Filesystem = request.getfixturevalue(fs)
+
+ fs.map_file(f"{map_path}/client.ovpn", absolute_path("_data/plugins/apps/vpn/openvpn/regression/client.conf"))
+ target.add_plugin(OpenVPNClientPlugin)
+
+ record = next(target.openvpn.client.profiles())
+ assert record.remote == [
+ "my-server-1 1194",
+ "my-server-2 1194",
+ "my-server-3 1195",
+ "my-server-4 11912",
+ ]
+ assert record.proto == "udp"
+ assert record.dev == "tun"
+ assert record.ca == "ca.crt"
+ assert record.cert == "client.crt"
+ assert record.key == "client.key"
+ assert record.tls_auth == "ta.key"
+ assert record.status is None
+ assert record.log is None
+
+
+@pytest.mark.parametrize(
+ ("target", "fs", "map_path"),
+ [
+ (
+ "target_unix_users",
+ "fs_unix",
+ "etc/openvpn",
+ ),
+ ],
+)
+def test_config_server(target: str, fs: str, map_path: str, request: pytest.FixtureRequest) -> None:
+ """Test if we can parse OpenVPN server configuration files as before."""
+
+ target: Target = request.getfixturevalue(target)
+ fs: Filesystem = request.getfixturevalue(fs)
+
+ fs.map_file(f"{map_path}/server.conf", absolute_path("_data/plugins/apps/vpn/openvpn/regression/server.conf"))
+ target.add_plugin(OpenVPNServerPlugin)
+
+ record = next(target.openvpn.server.config())
+
+ assert record.local == "0.0.0.0"
+ assert record.port == 1194
+ assert record.topology is None
+ assert record.server == "10.8.0.0 255.255.255.0"
+ assert record.ifconfig_pool_persist == "/var/log/openvpn/ipp.txt"
+ assert record.pushed_options == [
+ "route 192.168.10.0 255.255.255.0",
+ "route 192.168.20.0 255.255.255.0",
+ "route 192.168.30.0 255.255.255.0",
+ ]
+ assert record.client_to_client.value is False
+ assert record.duplicate_cn.value is False
+ assert record.proto == "udp"
+ assert record.dev == "tun"
+ assert "BEGIN CERTIFICATE" in record.ca
+ assert "BEGIN CERTIFICATE" in record.cert
+ assert record.key is None
+ assert record.tls_auth == "/etc/a ta.key"
+ assert record.status == "/var/log/openvpn/openvpn-status.log"
+ assert record.log is None
+
+
+@pytest.mark.parametrize(
+ ("data_string", "expected_data"),
+ [
+ (
+ "\nroute data\n\n",
+ {"connection": {"list_item0": {"route": "data"}}},
+ ),
+ (
+ "\n----- BEGIN PRIVATE DATA -----\n",
+ {
+ "ca": "----- BEGIN PRIVATE DATA -----\n",
+ },
+ ),
+ ],
+)
+def test_parser(data_string: str, expected_data: dict | list) -> None:
+ parser = OpenVPNParser()
+ parser.parse_file(io.StringIO(data_string))
+
+ assert parser.parsed_data == expected_data
diff --git a/tests/plugins/apps/vpn/openvpn/test_server.py b/tests/plugins/apps/vpn/openvpn/test_server.py
new file mode 100644
index 0000000000..80583e46bc
--- /dev/null
+++ b/tests/plugins/apps/vpn/openvpn/test_server.py
@@ -0,0 +1,231 @@
+from __future__ import annotations
+
+from datetime import datetime, timezone
+from typing import TYPE_CHECKING
+
+from dissect.target.plugins.apps.vpn.openvpn.openvpn import OpenVPNPlugin
+from dissect.target.plugins.apps.vpn.openvpn.server import OpenVPNServerPlugin
+from tests._utils import absolute_path
+
+if TYPE_CHECKING:
+ from dissect.target.filesystem import VirtualFilesystem
+ from dissect.target.target import Target
+
+
+def test_logs(target_unix: Target, fs_unix: VirtualFilesystem) -> None:
+ """Test if we can parse regular OpenVPN server log files."""
+
+ fs_unix.map_file("var/log/openvpn.log", absolute_path("_data/plugins/apps/vpn/openvpn/server/openvpn.log"))
+ target_unix.add_plugin(OpenVPNServerPlugin)
+ records = list(target_unix.openvpn.server.logs())
+
+ assert records[0].ts == datetime(2025, 10, 21, 00, 34, 24, tzinfo=timezone.utc)
+ assert (
+ records[0].message
+ == "OpenVPN 2.6.14 x86_64-pc-linux-gnu [SSL (OpenSSL)] [LZO] [LZ4] [EPOLL] [PKCS11] [MH/PKTINFO] [AEAD] [DCO]"
+ )
+ assert records[0].source == "/var/log/openvpn.log"
+
+ assert records[1].ts == datetime(2025, 10, 21, 00, 34, 25, tzinfo=timezone.utc)
+ assert records[1].message == "Initialization Sequence Completed"
+ assert records[1].source == "/var/log/openvpn.log"
+
+ assert records[2].ts == datetime(2025, 10, 21, 00, 34, 25, tzinfo=timezone.utc)
+ assert (
+ records[2].message
+ == "192.168.173.129:64259 [openvpn-client] Peer Connection Initiated with [AF_INET]192.168.173.129:64259"
+ )
+ assert records[2].source == "/var/log/openvpn.log"
+
+
+def test_config(target_unix: Target, fs_unix: VirtualFilesystem) -> None:
+ """Test if we can parse OpenVPN server configuration files."""
+
+ fs_unix.map_file("etc/openvpn/server.conf", absolute_path("_data/plugins/apps/vpn/openvpn/server/server.conf"))
+ target_unix.add_plugin(OpenVPNServerPlugin)
+ records = list(target_unix.openvpn.server.config(export_key=True))
+
+ assert records[0].ts
+ assert records[0].port == 1194
+ assert records[0].proto == "udp"
+ assert records[0].dev == "tun"
+ assert records[0].ca == (
+ "-----BEGIN CERTIFICATE-----\n"
+ "MIIBeTCB/6ADAgECAgRo5QC9MAoGCCqGSM49BAMCMBUxEzARBgNVBAMMCk9wZW5W\n"
+ "UE4gQ0EwHhcNMjUxMDA2MjE1OTU3WhcNMzUxMDA1MjE1OTU3WjAVMRMwEQYDVQQD\n"
+ "DApPcGVuVlBOIENBMHYwEAYHKoZIzj0CAQYFK4EEACIDYgAEH6+NsCoi7mllD7hV\n"
+ "egK88M/hy0kmQD7Jgh5IsO7IX5KIMTb679ltfBZwWJLJ6hMcs4hG/OgP77RrH+Bn\n"
+ "73z7giio9XcDe7iqHB5YrW4I7aIiCfULIfGLSzvOWG8vG9L/oyAwHjAPBgNVHRMB\n"
+ "Af8EBTADAQH/MAsGA1UdDwQEAwIBBjAKBggqhkjOPQQDAgNpADBmAjEApgTC7OSs\n"
+ "IlDx9GKY05zsSwh0AGPsWdL9QL3eZx1kQQZYJmPGAgKSV2GTI+YH2c/bAjEAw1qD\n"
+ "09jj90+YuLMmLyDl+Z+UBHra619/byZK8GddkhJnfOVnAE3F17OKg9ZGFrVq\n"
+ "-----END CERTIFICATE-----\n"
+ )
+ assert records[0].cert == (
+ "-----BEGIN CERTIFICATE-----\n"
+ "MIIBnjCCASSgAwIBAgIHVzbVWySnYDAKBggqhkjOPQQDAjAVMRMwEQYDVQQDDApP\n"
+ "cGVuVlBOIENBMB4XDTI1MTAwNjIyNDMzNloXDTM1MTAwNTIyNDMzNlowEjEQMA4G\n"
+ "A1UEAwwHb3BlbnZwbjB2MBAGByqGSM49AgEGBSuBBAAiA2IABL5XINW+VxgT86u6\n"
+ "cFWDTUik4+2XNyfLxpvMQwvShcJC5M3ns5Up45cQHHGIAiAZH7Mr2fA6oj5ub8s5\n"
+ "F4LKDujUzjp3b4yokP6Sw/34/A+sb3Asjqx7z54lhW2vH8vl1qNFMEMwDAYDVR0T\n"
+ "AQH/BAIwADALBgNVHQ8EBAMCB4AwEwYDVR0lBAwwCgYIKwYBBQUHAwIwEQYJYIZI\n"
+ "AYb4QgEBBAQDAgeAMAoGCCqGSM49BAMCA2gAMGUCMQCr/b8W9ydlmS9h8Exv1UR9\n"
+ "Ae0jSNSjzSQ06p8uITDxkzCQ+sIiYRi0bi8UZLgG8zoCMFD/Or9tgP3bhdfFsruM\n"
+ "95DAhxXiA5cfLpDXc9OpDSnyFFvjKpcyZ7Zwqh+zvxEcXA==\n"
+ "-----END CERTIFICATE-----\n"
+ )
+ assert records[0].key == (
+ "-----BEGIN PRIVATE KEY-----\n"
+ "MIG2AgEAMBAGByqGSM49AgEGBSuBBAAiBIGeMIGbAgEBBDDtsxwuIQmB8gaB+SvU\n"
+ "XAidYBbRT3J6nbqpnY899nzbrbdLhDN9XvngHjohEyyv8+ehZANiAAS+VyDVvlcY\n"
+ "E/OrunBVg01IpOPtlzcny8abzEML0oXCQuTN57OVKeOXEBxxiAIgGR+zK9nwOqI+\n"
+ "bm/LOReCyg7o1M46d2+MqJD+ksP9+PwPrG9wLI6se8+eJYVtrx/L5dY=\n"
+ "-----END PRIVATE KEY-----\n"
+ )
+ assert records[0].dh == "dh2048.pem"
+ assert records[0].server == "10.8.0.0 255.255.255.0"
+ assert records[0].ifconfig_pool_persist == "/var/log/openvpn/ipp.txt"
+ assert records[0].pushed_options == [
+ "route 192.168.10.0 255.255.255.0",
+ "route 192.168.20.0 255.255.255.0",
+ "route 192.168.30.0 255.255.255.0",
+ ]
+ assert not records[0].client_to_client
+ assert not records[0].duplicate_cn
+ assert records[0].status == "/var/log/openvpn/openvpn-status.log"
+ assert records[0].verb == "3"
+ assert records[0].tls_auth == "/etc/a ta.key"
+ assert records[0].source == "/etc/openvpn/server.conf"
+
+ assert records[1].ts == datetime(2025, 10, 6, 21, 59, 57, tzinfo=timezone.utc)
+ assert records[1].fingerprint.sha1 == "daf2bf53ba3a3b9080115ec39111f71f40cd8484"
+ assert records[1].serial_number == 1759838397
+ assert records[1].serial_number_hex == "68e500bd"
+ assert records[1].not_valid_before == datetime(2025, 10, 6, 21, 59, 57, tzinfo=timezone.utc)
+ assert records[1].not_valid_after == datetime(2035, 10, 5, 21, 59, 57, tzinfo=timezone.utc)
+ assert records[1].issuer_dn == "CN=OpenVPN CA"
+ assert records[1].subject_dn == "CN=OpenVPN CA"
+ assert records[1].pem
+ assert records[1].source == "/etc/openvpn/server.conf"
+
+ assert records[2].fingerprint.md5 == "21f078a3b8443ce1521ae1e9daf92ead"
+ assert records[2].fingerprint.sha1 == "d49e69953e4743b64dc17779c5ac909ee0da20cf"
+ assert records[2].fingerprint.sha256 == "4100214673511d2ff2593a87865beaaa8d3a551e937c8564b97494c7c404934b"
+ assert records[2].hostname == "localhost"
+ assert records[2].issuer_dn == "CN=OpenVPN CA"
+ assert records[2].not_valid_before == datetime(2025, 10, 6, 22, 43, 36, tzinfo=timezone.utc)
+ assert records[2].not_valid_after == datetime(2035, 10, 5, 22, 43, 36, tzinfo=timezone.utc)
+ assert records[2].subject_dn == "CN=openvpn"
+ assert records[2].pem
+ assert records[2].source == "/etc/openvpn/server.conf"
+
+
+def test_users(target_unix: Target, fs_unix: VirtualFilesystem) -> None:
+ """Test if we can parse OpenVPN AS Server configured users."""
+
+ fs_unix.map_file(
+ "usr/local/openvpn_as/etc/db/userprop.db", absolute_path("_data/plugins/apps/vpn/openvpn/server/userprop.db")
+ )
+
+ target_unix.add_plugin(OpenVPNServerPlugin)
+ records = list(target_unix.openvpn.server.users())
+
+ assert records[0].user_id == "1"
+ assert records[0].user_name == "__DEFAULT__"
+ assert records[0].user_type == "user_default"
+ assert not records[0].is_superuser
+ assert records[0].password_digest is None
+ assert records[0].user_auth_type is None
+ assert records[0].source == "/usr/local/openvpn_as/etc/db/userprop.db"
+
+ assert records[1].user_id == "2"
+ assert records[1].user_name == "openvpn"
+ assert records[1].user_type == "user_compile"
+ assert records[1].is_superuser
+ assert records[1].password_digest == "$P$+Azokv3BqzV/ogvLyP0qZA==$y5iPl61UtrrFs9xl2yUZkC2nxp4Qe68LsY2iZAROFQI="
+ assert records[1].user_auth_type == "local"
+ assert records[1].source == "/usr/local/openvpn_as/etc/db/userprop.db"
+
+
+def test_openvpn_server_history_connection(target_unix: Target, fs_unix: VirtualFilesystem) -> None:
+ openvpn_log_file = absolute_path("_data/plugins/apps/vpn/openvpn/server/openvpnas.node.log")
+ openvpn_log_db_file = absolute_path("_data/plugins/apps/vpn/openvpn/server/log.db")
+
+ fs_unix.map_file("var/log/openvpnas.node.log", openvpn_log_file)
+ fs_unix.map_file("usr/local/openvpn_as/etc/db/log.db", openvpn_log_db_file)
+
+ target_unix.add_plugin(OpenVPNServerPlugin)
+
+ connection_records = list(target_unix.openvpn.server.connections())
+
+ # Log file
+ connection_record = connection_records[0]
+ assert connection_record.ts == datetime(2025, 10, 9, 8, 29, 19, tzinfo=timezone.utc)
+ assert connection_record.client_id == "2"
+ assert connection_record.client_ip == "192.168.173.131"
+ assert connection_record.client_port == 34193
+ assert connection_record.client_proto == "8094"
+ assert connection_record.client_version == "v3.11.5"
+ assert connection_record.client_platform == "linux"
+ assert connection_record.client_plat_rel is None
+ assert connection_record.client_gui_ver == "OpenVPN3/Linux/v26"
+ assert connection_record.client_ciphers == "AES-128-GCM:AES-192-GCM:AES-256-GCM:CHACHA20-POLY1305"
+ assert connection_record.client_ssl == "OpenSSL_3.0.13_30_Jan_2024"
+ assert connection_record.client_hwaddr == "e3fd6191e473c963af1810df1316b58b45a5918a6409b9e57561735d71d519dc"
+ assert connection_record.source == "/var/log/openvpnas.node.log"
+
+ # Log database
+ db_record = connection_records[1]
+ assert db_record.ts == datetime(2025, 10, 7, 13, 50, 0, 0, tzinfo=timezone.utc)
+ assert db_record.client_id == "test-VM"
+ assert db_record.client_username == "openvpn"
+ assert db_record.client_ip == "192.168.173.129"
+ assert db_record.client_port == 1194
+ assert db_record.client_vpn_ip == ["172.27.232.2"]
+ assert db_record.client_proto == "UDP"
+ assert db_record.client_version == "3.11.3"
+ assert db_record.client_gui_ver == "OCWindows_3.8.0-4528"
+ assert db_record.client_platform == "win"
+ assert db_record.client_conn_duration == 3526
+ assert db_record.source == "/usr/local/openvpn_as/etc/db/log.db"
+
+ db_record = connection_records[2]
+ assert db_record.ts == datetime(2025, 10, 9, 9, 41, 53, 0, tzinfo=timezone.utc)
+ assert db_record.client_id == "test-VM"
+ assert db_record.client_username == "openvpn"
+ assert db_record.client_ip == "192.168.173.131"
+ assert db_record.client_port == 1194
+ assert db_record.client_vpn_ip == ["172.27.232.2"]
+ assert db_record.client_proto == "UDP"
+ assert db_record.client_version == "v3.11.5"
+ assert db_record.client_gui_ver == "OpenVPN3/Linux/v26"
+ assert db_record.client_platform == "linux"
+ assert db_record.client_conn_duration == 3388
+ assert db_record.source == "/usr/local/openvpn_as/etc/db/log.db"
+
+
+def test_openvpn_server_live_connection(target_unix: Target, fs_unix: VirtualFilesystem) -> None:
+ openvpn_status_file = absolute_path("_data/plugins/apps/vpn/openvpn/server/status.log")
+ openvpn_log_file = absolute_path("_data/plugins/apps/vpn/openvpn/server/openvpn.log")
+ fs_unix.map_file("var/log/openvpn/status.log", openvpn_status_file)
+ fs_unix.map_file("var/log/openvpn/openvpn-status.log", openvpn_status_file)
+ fs_unix.map_file("var/log/openvpn.log", openvpn_log_file)
+
+ target_unix.add_plugin(OpenVPNPlugin)
+ target_unix.add_plugin(OpenVPNServerPlugin)
+ status_records = list(target_unix.openvpn.server.connections())
+
+ status_record = status_records[0]
+ assert status_record.client_conn_since == datetime(2025, 10, 21, 00, 34, 25, tzinfo=timezone.utc)
+ assert status_record.client_common_name == "openvpn-client"
+
+ assert status_record.client_ip == "192.168.173.129"
+ assert status_record.client_port == 64259
+ assert status_record.client_vpn_ip == ["10.8.0.2"]
+ assert status_record.client_username == "UNDEF"
+ assert status_record.client_id == "0"
+ assert status_record.peer_id == "0"
+ assert status_record.bytes_received == 16284
+ assert status_record.bytes_sent == 11160
+ assert status_record.client_ciphers == "AES-256-GCM\n"
+ assert status_record.source == "/var/log/openvpn/status.log"
diff --git a/tests/plugins/apps/vpn/test_openvpn.py b/tests/plugins/apps/vpn/test_openvpn.py
deleted file mode 100644
index 291815b700..0000000000
--- a/tests/plugins/apps/vpn/test_openvpn.py
+++ /dev/null
@@ -1,127 +0,0 @@
-from __future__ import annotations
-
-import io
-from typing import TYPE_CHECKING
-
-import pytest
-
-from dissect.target.plugins.apps.vpn.openvpn import (
- OpenVPNClient,
- OpenVPNParser,
- OpenVPNPlugin,
- OpenVPNServer,
-)
-from tests._utils import absolute_path
-
-if TYPE_CHECKING:
- from pathlib import Path
-
- from dissect.target.filesystem import Filesystem
- from dissect.target.target import Target
-
-
-def map_openvpn_configs(filesystem: Filesystem, target_dir: Path) -> None:
- client_config = absolute_path("_data/plugins/apps/vpn/openvpn/client.conf")
- server_config = absolute_path("_data/plugins/apps/vpn/openvpn/server.conf")
- filesystem.map_file(str(target_dir.joinpath("server.conf")), server_config)
- filesystem.map_file(str(target_dir.joinpath("server.ovpn")), server_config)
- filesystem.map_file(str(target_dir.joinpath("client.conf")), client_config)
- filesystem.map_file(str(target_dir.joinpath("client.ovpn")), client_config)
-
-
-@pytest.mark.parametrize(
- ("target", "fs", "map_path"),
- [
- (
- "target_win_users",
- "fs_win",
- "Program Files/OpenVPN/config/helper",
- ),
- (
- "target_win_users",
- "fs_win",
- "Users/John/OpenVPN/config/helper",
- ),
- (
- "target_unix_users",
- "fs_unix",
- "etc/openvpn",
- ),
- ],
-)
-def test_openvpn_plugin(target: str, fs: str, map_path: str, request: pytest.FixtureRequest) -> None:
- target: Target = request.getfixturevalue(target)
- fs: Filesystem = request.getfixturevalue(fs)
- map_openvpn_configs(fs, fs.path(map_path))
- target.add_plugin(OpenVPNPlugin)
- records = list(target.openvpn.config())
- _verify_records(records)
-
-
-def _verify_records(records: list[OpenVPNClient | OpenVPNServer]) -> None:
- assert len(records) == 4
-
- for record in records:
- if record.name == "server":
- assert record.name == "server"
- assert record.local.val.compressed == "0.0.0.0"
- assert record.port == 1194
- assert record.topology is None
- assert record.server == "10.8.0.0 255.255.255.0"
- assert record.ifconfig_pool_persist == "/var/log/openvpn/ipp.txt"
- assert record.pushed_options == [
- "route 192.168.10.0 255.255.255.0",
- "route 192.168.20.0 255.255.255.0",
- "route 192.168.30.0 255.255.255.0",
- ]
- assert record.client_to_client.value is False
- assert record.duplicate_cn.value is False
- assert record.proto == "udp"
- assert record.dev == "tun"
- assert "BEGIN CERTIFICATE" in record.ca
- assert "BEGIN CERTIFICATE" in record.cert
- assert record.key is None
- assert record.redacted_key
- assert record.tls_auth == "/etc/a ta.key"
- assert record.status == "/var/log/openvpn/openvpn-status.log"
- assert record.log is None
- else:
- # Client
- assert record.remote == [
- "my-server-1 1194",
- "my-server-2 1194",
- "my-server-3 1195",
- "my-server-4 11912",
- ]
- assert record.name == "client"
- assert record.proto == "udp"
- assert record.dev == "tun"
- assert record.ca == "ca.crt"
- assert record.cert == "client.crt"
- assert record.key == "client.key"
- assert not record.redacted_key
- assert record.tls_auth == "ta.key"
- assert record.status is None
- assert record.log is None
-
-
-@pytest.mark.parametrize(
- ("data_string", "expected_data"),
- [
- (
- "\nroute data\n\n",
- {"connection": {"list_item0": {"route": "data"}}},
- ),
- (
- "\n----- BEGIN PRIVATE DATA -----\n",
- {
- "ca": "----- BEGIN PRIVATE DATA -----\n",
- },
- ),
- ],
-)
-def test_openvpn_config(data_string: str, expected_data: dict | list) -> None:
- parser = OpenVPNParser()
- parser.parse_file(io.StringIO(data_string))
-
- assert parser.parsed_data == expected_data