diff --git a/flow/record/fieldtypes/__init__.py b/flow/record/fieldtypes/__init__.py index 5aa97ad5..80cc89af 100644 --- a/flow/record/fieldtypes/__init__.py +++ b/flow/record/fieldtypes/__init__.py @@ -752,41 +752,39 @@ def __repr__(self) -> str: class command(FieldType): - executable: path | None = None - args: list[str] | None = None + """The command fieldtype splits a command string into an ``executable`` and its arguments. - _path_type: type[path] = None - _posix: bool + Args: + value: the string that contains the command and arguments + path_type: When specified it forces the command to use a specific path type - def __new__(cls, value: str): - if cls is not command: - return super().__new__(cls) + Example: - if not isinstance(value, str): - raise TypeError(f"Expected a value of type 'str' not {type(value)}") + .. code-block:: text + + 'c:\\windows\\malware.exe /info' -> windows_path('c:\\windows\\malware.exe) ['/info'] + '/usr/bin/env bash' -> posix_path('/usr/bin/env') ['bash'] - # pre checking for windows like paths - # This checks for windows like starts of a path: - # an '%' for an environment variable - # r'\\' for a UNC path - # the strip and check for ":" on the second line is for `:` - stripped_value = value.lstrip("\"'") - windows = value.startswith((r"\\", "%")) or (len(stripped_value) >= 2 and stripped_value[1] == ":") + # In this situation, the executable path needs to be quoted. + 'c:\\user\\John Doe\\malware.exe /all /the /things' -> windows_path('c:\\user\\John') + ['Doe\\malware.exe /all /the /things'] + """ - cls = windows_command if windows else posix_command - return super().__new__(cls) + __executable: path + __args: tuple[str, ...] - def __init__(self, value: str | tuple[str, tuple[str]] | None): - if value is None: - return + __path_type: type[path] - if isinstance(value, str): - self.executable, self.args = self._split(value) - return + def __init__(self, value: str = "", *, path_type: type[path] | None = None): + if not isinstance(value, str): + raise TypeError(f"Expected a value of type 'str' not {type(value)}") + + raw = value.strip() - executable, self.args = value - self.executable = self._path_type(executable) - self.args = list(self.args) + # Detect the kind of path from value if not specified + self.__path_type = path_type or type(path(raw.lstrip("\"'"))) + + self.executable, self.args = self._split(raw) def __repr__(self) -> str: return f"(executable={self.executable!r}, args={self.args})" @@ -795,66 +793,77 @@ def __eq__(self, other: object) -> bool: if isinstance(other, command): return self.executable == other.executable and self.args == other.args if isinstance(other, str): - return self._join() == other + return self.raw == other if isinstance(other, (tuple, list)): - return self.executable == other[0] and self.args == list(other[1:]) + return self.executable == other[0] and self.args == (*other[1:],) return False - def _split(self, value: str) -> tuple[str, list[str]]: - executable, *args = shlex.split(value, posix=self._posix) - executable = executable.strip("'\" ") - - return self._path_type(executable), args + def _split(self, value: str) -> tuple[str, tuple[str, ...]]: + if not value: + return "", () - def _join(self) -> str: - return shlex.join([str(self.executable), *self.args]) + executable, *args = shlex.split(value, posix=self.__path_type is posix_path) + return executable.strip("'\" "), (*args,) - def _pack(self) -> tuple[tuple[str, list], str]: - command_type = TYPE_WINDOWS if isinstance(self, windows_command) else TYPE_POSIX - if self.executable: - _exec, _ = self.executable._pack() - return ((_exec, self.args), command_type) - return (None, command_type) - - @classmethod - def _unpack(cls, data: tuple[tuple[str, tuple] | None, int]) -> command: - _value, _type = data - if _type == TYPE_WINDOWS: - return windows_command(_value) - - return posix_command(_value) + def _pack(self) -> tuple[str, int]: + path_type = TYPE_WINDOWS if self.__path_type is windows_path else TYPE_POSIX + return self.raw, path_type @classmethod - def from_posix(cls, value: str) -> command: - return posix_command(value) + def _unpack(cls, data: tuple[str, int]) -> command: + raw_str, path_type = data + if path_type == TYPE_POSIX: + return command(raw_str, path_type=posix_path) + if path_type == TYPE_WINDOWS: + return command(raw_str, path_type=windows_path) + # default, infer type of path from str + return command(raw_str) - @classmethod - def from_windows(cls, value: str) -> command: - return windows_command(value) + @property + def executable(self) -> path: + return self.__executable + @property + def args(self) -> tuple[str, ...]: + return self.__args -class posix_command(command): - _posix = True - _path_type = posix_path + @executable.setter + def executable(self, val: str | path | None) -> None: + self.__executable = self.__path_type(val) + @args.setter + def args(self, val: str | tuple[str, ...] | list[str] | None) -> None: + if val is None: + self.__args = () + return -class windows_command(command): - _posix = False - _path_type = windows_path + if isinstance(val, str): + self.__args = tuple(shlex.split(val, posix=self.__path_type is posix_path)) + elif isinstance(val, list): + self.__args = tuple(val) + else: + self.__args = val - def _split(self, value: str) -> tuple[str, list[str]]: - executable, args = super()._split(value) - if args: - args = [" ".join(args)] + @property + def raw(self) -> str: + exe = str(self.executable) - return executable, args + if " " in exe: + exe = shlex.quote(exe) - def _join(self) -> str: - arg = f" {self.args[0]}" if self.args else "" - executable_str = str(self.executable) + result = [exe] + # Only quote on posix paths as shlex doesn't remove the quotes on non posix paths + if self.__path_type is posix_path: + result.extend(shlex.quote(part) if " " in part else part for part in self.args) + else: + result.extend(self.args) + return " ".join(result) - if " " in executable_str: - return f"'{executable_str}'{arg}" + @classmethod + def from_posix(cls, value: str) -> command: + return command(value, path_type=posix_path) - return f"{executable_str}{arg}" + @classmethod + def from_windows(cls, value: str) -> command: + return command(value, path_type=windows_path) diff --git a/flow/record/jsonpacker.py b/flow/record/jsonpacker.py index 6f1d85c6..344b5538 100644 --- a/flow/record/jsonpacker.py +++ b/flow/record/jsonpacker.py @@ -75,10 +75,7 @@ def pack_obj(self, obj: Any) -> dict | str: if isinstance(obj, fieldtypes.path): return str(obj) if isinstance(obj, fieldtypes.command): - return { - "executable": obj.executable, - "args": obj.args, - } + return obj.raw raise TypeError(f"Unpackable type {type(obj)}") diff --git a/tests/adapter/test_json.py b/tests/adapter/test_json.py index 1e741d8a..074cfd51 100644 --- a/tests/adapter/test_json.py +++ b/tests/adapter/test_json.py @@ -6,6 +6,7 @@ import pytest from flow.record import RecordReader, RecordWriter +from flow.record.base import RecordDescriptor from tests._utils import generate_records if TYPE_CHECKING: @@ -117,3 +118,40 @@ def test_json_adapter_with_record_descriptors(tmp_path: Path, record_adapter_pat elif record["_type"] == "record": assert "_recorddescriptor" in record assert descriptor_seen == 2 + + +def test_json_command_fieldtype(tmp_path: Path) -> None: + json_file = tmp_path.joinpath("records.json") + record_adapter_path = f"jsonfile://{json_file}" + writer = RecordWriter(record_adapter_path) + + TestRecord = RecordDescriptor( + "test/command", + [ + ("command", "commando"), + ], + ) + + writer.write( + TestRecord( + commando="C:\\help.exe data", + ) + ) + writer.write( + TestRecord( + commando="/usr/bin/env bash", + ) + ) + writer.write(TestRecord()) + writer.flush() + + reader = RecordReader(record_adapter_path) + records = list(reader) + + assert records[0].commando.executable == "C:\\help.exe" + assert records[0].commando.args == ("data",) + + assert records[1].commando.executable == "/usr/bin/env" + assert records[1].commando.args == ("bash",) + + assert len(records) == 3 diff --git a/tests/adapter/test_xlsx.py b/tests/adapter/test_xlsx.py index 558b7234..d495b714 100644 --- a/tests/adapter/test_xlsx.py +++ b/tests/adapter/test_xlsx.py @@ -39,9 +39,9 @@ def test_sanitize_field_values(mock_openpyxl_package: MagicMock) -> None: fieldtypes.net.ipaddress("13.37.13.37"), ["Shaken", "Not", "Stirred"], fieldtypes.posix_path("/home/user"), - fieldtypes.posix_command("/bin/bash -c 'echo hello world'"), + fieldtypes.command.from_posix("/bin/bash -c 'echo hello world'"), fieldtypes.windows_path("C:\\Users\\user\\Desktop"), - fieldtypes.windows_command("C:\\Some.exe /?"), + fieldtypes.command.from_windows("C:\\Some.exe /?"), ] ) ) == [ diff --git a/tests/fieldtypes/test_fieldtypes.py b/tests/fieldtypes/test_fieldtypes.py index 65e2b1fd..46e9a392 100644 --- a/tests/fieldtypes/test_fieldtypes.py +++ b/tests/fieldtypes/test_fieldtypes.py @@ -22,10 +22,8 @@ command, fieldtype_for_value, net, - posix_command, posix_path, uri, - windows_command, windows_path, ) from flow.record.fieldtypes import datetime as dt @@ -1033,6 +1031,20 @@ def test_datetime_comparisions() -> None: assert dt("2023-01-02") != datetime(2023, 3, 4, tzinfo=UTC) +def test_empty_command() -> None: + command = fieldtypes.command() + assert command.executable == "" + assert command.args == () + + command = fieldtypes.command("") + assert command.executable == "" + assert command.args == () + + command = fieldtypes.command(" ") + assert command.executable == "" + assert command.args == () + + def test_command_record() -> None: TestRecord = RecordDescriptor( "test/command", @@ -1041,15 +1053,19 @@ def test_command_record() -> None: ], ) + # path defaults to type depending on the os it runs on, so we emulate this here + _type = windows_path if os.name == "nt" else posix_path + record = TestRecord(commando="help.exe -h") - assert isinstance(record.commando, posix_command) + assert isinstance(record.commando.executable, _type) assert record.commando.executable == "help.exe" - assert record.commando.args == ["-h"] + assert record.commando.args == ("-h",) record = TestRecord(commando="something.so -h -q -something") - assert isinstance(record.commando, posix_command) + args = ("-h", "-q", "-something") + assert isinstance(record.commando.executable, _type) assert record.commando.executable == "something.so" - assert record.commando.args == ["-h", "-q", "-something"] + assert record.commando.args == args def test_command_integration(tmp_path: pathlib.Path) -> None: @@ -1061,15 +1077,16 @@ def test_command_integration(tmp_path: pathlib.Path) -> None: ) with RecordWriter(tmp_path / "command_record") as writer: - record = TestRecord(commando=r"\\.\\?\some_command.exe -h,help /d quiet") + record = TestRecord(commando=r"\.\?\some_command.exe -h,help /d quiet") writer.write(record) - assert record.commando.executable == r"\\.\\?\some_command.exe" - assert record.commando.args == [r"-h,help /d quiet"] + assert record.commando.executable == r"\.\?\some_command.exe" + assert record.commando.args == (r"-h,help", "/d", "quiet") with RecordReader(tmp_path / "command_record") as reader: for record in reader: - assert record.commando.executable == r"\\.\\?\some_command.exe" - assert record.commando.args == [r"-h,help /d quiet"] + assert record.commando.executable == r"\.\?\some_command.exe" + assert record.commando.args == (r"-h,help", "/d", "quiet") + assert record.commando.raw == r"\?\some_command.exe -h,help /d quiet" def test_command_integration_none(tmp_path: pathlib.Path) -> None: @@ -1080,44 +1097,78 @@ def test_command_integration_none(tmp_path: pathlib.Path) -> None: ], ) + # None + with RecordWriter(tmp_path / "command_record") as writer: + record = TestRecord(commando=None) + writer.write(record) + with RecordReader(tmp_path / "command_record") as reader: + for record in reader: + assert record.commando is None + + # empty string + with RecordWriter(tmp_path / "command_record") as writer: + record = TestRecord(commando="") + writer.write(record) + with RecordReader(tmp_path / "command_record") as reader: + for record in reader: + assert record.commando == "" + assert record.commando.executable == "" + assert record.commando.args == () + assert record.commando.raw == "" + + +def test_integration_correct_path(tmp_path: pathlib.Path) -> None: + TestRecord = RecordDescriptor( + "test/command", + [ + ("command", "commando"), + ], + ) + with RecordWriter(tmp_path / "command_record") as writer: - record = TestRecord(commando=command.from_posix(None)) + record = TestRecord(commando=command.from_windows("hello.exe -d")) writer.write(record) with RecordReader(tmp_path / "command_record") as reader: for record in reader: - assert record.commando.executable is None - assert record.commando.args is None + assert record.commando.executable == "hello.exe" + assert isinstance(record.commando.executable, windows_path) + assert record.commando.args == ("-d",) @pytest.mark.parametrize( ("command_string", "expected_executable", "expected_argument"), [ # Test relative windows paths - ("windows.exe something,or,somethingelse", "windows.exe", ["something,or,somethingelse"]), + ("windows.exe something,or,somethingelse", "windows.exe", ("something,or,somethingelse",)), # Test weird command strings for windows - ("windows.dll something,or,somethingelse", "windows.dll", ["something,or,somethingelse"]), + ("windows.dll something,or,somethingelse", "windows.dll", ("something,or,somethingelse",)), # Test environment variables - (r"%WINDIR%\\windows.dll something,or,somethingelse", r"%WINDIR%\\windows.dll", ["something,or,somethingelse"]), - # Test a quoted path - (r"'c:\path to some exe' /d /a", r"c:\path to some exe", [r"/d /a"]), + ( + r"%WINDIR%\\windows.dll something,or,somethingelse", + r"%WINDIR%\\windows.dll", + ("something,or,somethingelse",), + ), + # Test a single quoted path + (r"'c:\path to some exe' /d /a", r"c:\path to some exe", ("/d", "/a")), + # Test a double quoted path + (r'"c:\path to some exe" /d /a', r"c:\path to some exe", ("/d", "/a")), # Test a unquoted path - (r"\Users\test\hello.exe", r"\Users\test\hello.exe", []), + (r"\Users\test\hello.exe", r"\Users\test\hello.exe", ()), # Test an unquoted path with a path as argument - (r"\Users\test\hello.exe c:\startmepls.exe", r"\Users\test\hello.exe", [r"c:\startmepls.exe"]), + (r"\Users\test\hello.exe c:\startmepls.exe", r"\Users\test\hello.exe", (r"c:\startmepls.exe",)), # Test a quoted UNC path - (r"'\\192.168.1.2\Program Files\hello.exe'", r"\\192.168.1.2\Program Files\hello.exe", []), + (r"'\\192.168.1.2\Program Files\hello.exe'", r"\\192.168.1.2\Program Files\hello.exe", ()), # Test an unquoted UNC path - (r"\\192.168.1.2\Users\test\hello.exe /d /a", r"\\192.168.1.2\Users\test\hello.exe", [r"/d /a"]), + (r"\\192.168.1.2\Users\test\hello.exe /d /a", r"\\192.168.1.2\Users\test\hello.exe", ("/d", "/a")), # Test an empty command string - (r"''", r"", []), - # Test None - (None, None, None), + (r"''", r"", ()), ], ) -def test_command_windows(command_string: str, expected_executable: str, expected_argument: list[str]) -> None: - cmd = windows_command(command_string) +def test_command_windows(command_string: str, expected_executable: str, expected_argument: tuple[str, ...]) -> None: + cmd = command.from_windows(command_string) assert cmd.executable == expected_executable + assert isinstance(cmd.executable, windows_path) assert cmd.args == expected_argument @@ -1125,15 +1176,21 @@ def test_command_windows(command_string: str, expected_executable: str, expected ("command_string", "expected_executable", "expected_argument"), [ # Test relative posix command - ("some_file.so -h asdsad -f asdsadas", "some_file.so", ["-h", "asdsad", "-f", "asdsadas"]), + ("some_file.so -h asdsad -f asdsadas", "some_file.so", ("-h", "asdsad", "-f", "asdsadas")), # Test command with spaces - (r"/bin/hello\ world -h -word", r"/bin/hello world", ["-h", "-word"]), + (r"/bin/hello\ world -h -word", r"/bin/hello world", ("-h", "-word")), + (r" /bin/hello\ world", r"/bin/hello world", ()), + # Test single quoted command + (r"'/tmp/ /test/hello' -h -word", r"/tmp/ /test/hello", ("-h", "-word")), + # Test double quoted command + (r'"/tmp/ /test/hello" -h -word', r"/tmp/ /test/hello", ("-h", "-word")), ], ) def test_command_posix(command_string: str, expected_executable: str, expected_argument: list[str]) -> None: - cmd = posix_command(command_string) + cmd = command.from_posix(command_string) assert cmd.executable == expected_executable + assert isinstance(cmd.executable, fieldtypes.posix_path) assert cmd.args == expected_argument @@ -1150,8 +1207,13 @@ def test_command_equal() -> None: # Compare paths that contain spaces assert command("'/home/some folder/file' -h") == "'/home/some folder/file' -h" assert command("'c:\\Program files\\some.dll' -h -q") == "'c:\\Program files\\some.dll' -h -q" - assert command("'c:\\program files\\some.dll' -h -q") == ["c:\\program files\\some.dll", "-h -q"] - assert command("'c:\\Program files\\some.dll' -h -q") == ("c:\\Program files\\some.dll", "-h -q") + assert command("'c:\\program files\\some.dll' -h -q") == ["c:\\program files\\some.dll", "-h", "-q"] + assert command("'c:\\Program files\\some.dll' -h -q") == ("c:\\Program files\\some.dll", "-h", "-q") + + assert ( + command(r"'c:\Program Files\some.dll' --command 'hello world'") + == r"'c:\Program Files\some.dll' --command 'hello world'" + ) # Test failure conditions assert command("hello.so -h") != 1 @@ -1160,6 +1222,51 @@ def test_command_equal() -> None: assert command("hello.so") != ("hello.so", "") +def test_command_assign_posix() -> None: + _command = command("/") + + assert _command.raw == "/" + + # Test whether we can assign executable + _command.executable = "/path/to/home dir/" + assert _command.raw == "'/path/to/home dir'" + + # Test whether it uses the underlying path + _command.executable = fieldtypes.windows_path("path\\to\\dir") + assert _command.raw == r"path/to/dir" + + # As it is windows, this should change to be one value + _command.args = ["command", "arguments", "for", "posix"] + assert _command.args == ("command", "arguments", "for", "posix") + assert _command.raw == r"path/to/dir command arguments for posix" + + _command.args = ("command", "-c", "command string") + assert _command.raw == "path/to/dir command -c 'command string'" + + _command.args = "command -c 'command string2'" + assert _command.args == ("command", "-c", "command string2") + assert _command.raw == "path/to/dir command -c 'command string2'" + + +def test_command_assign_windows() -> None: + _command = command("c:\\") + + assert _command.raw == "c:\\" + + _command.executable = r"c:\windows\path" + assert _command.raw == r"c:\windows\path" + + _command.executable = r"c:\windows\path to file" + assert _command.raw == r"'c:\windows\path to file'" + + _command.args = ("command", "arguments", "for", "windows") + assert _command.args == ("command", "arguments", "for", "windows") + assert _command.raw == r"'c:\windows\path to file' command arguments for windows" + + _command.args = "command arguments for windows2" + assert _command.args == ("command", "arguments", "for", "windows2") + + def test_command_failed() -> None: with pytest.raises(TypeError, match="Expected a value of type 'str'"): command(b"failed")