From add8d849d1ef756c181a9e47aed0c36872c1b07e Mon Sep 17 00:00:00 2001 From: Miauwkeru Date: Wed, 23 Jul 2025 14:31:26 +0000 Subject: [PATCH 01/18] Send raw command string instead of a dictionary --- flow/record/fieldtypes/__init__.py | 45 ++++++++++-------------------- flow/record/jsonpacker.py | 5 +--- tests/_utils.py | 13 +++++++++ tests/adapter/test_json.py | 26 ++++++++++++++--- 4 files changed, 51 insertions(+), 38 deletions(-) diff --git a/flow/record/fieldtypes/__init__.py b/flow/record/fieldtypes/__init__.py index 5aa97ad5..3be19ca7 100644 --- a/flow/record/fieldtypes/__init__.py +++ b/flow/record/fieldtypes/__init__.py @@ -755,10 +755,11 @@ class command(FieldType): executable: path | None = None args: list[str] | None = None + _raw: str | None = None _path_type: type[path] = None - _posix: bool + _posix: bool = True - def __new__(cls, value: str): + def __new__(cls, value: str | None): if cls is not command: return super().__new__(cls) @@ -776,17 +777,12 @@ def __new__(cls, value: str): cls = windows_command if windows else posix_command return super().__new__(cls) - def __init__(self, value: str | tuple[str, tuple[str]] | None): + def __init__(self, value: str | None): if value is None: return - if isinstance(value, str): - self.executable, self.args = self._split(value) - return - - executable, self.args = value - self.executable = self._path_type(executable) - self.args = list(self.args) + self._raw = value + self.executable, self.args = self._split(self._raw) def __repr__(self) -> str: return f"(executable={self.executable!r}, args={self.args})" @@ -795,30 +791,29 @@ def __eq__(self, other: object) -> bool: if isinstance(other, command): return self.executable == other.executable and self.args == other.args if isinstance(other, str): - return self._join() == other + return self._raw == other if isinstance(other, (tuple, list)): return self.executable == other[0] and self.args == list(other[1:]) return False - def _split(self, value: str) -> tuple[str, list[str]]: + def _split(self, value: str) -> tuple[path, list[str]]: executable, *args = shlex.split(value, posix=self._posix) executable = executable.strip("'\" ") return self._path_type(executable), args def _join(self) -> str: - return shlex.join([str(self.executable), *self.args]) + return self._raw - def _pack(self) -> tuple[tuple[str, list], str]: + def _pack(self) -> tuple[str, int]: command_type = TYPE_WINDOWS if isinstance(self, windows_command) else TYPE_POSIX if self.executable: - _exec, _ = self.executable._pack() - return ((_exec, self.args), command_type) + return (self._raw, command_type) return (None, command_type) @classmethod - def _unpack(cls, data: tuple[tuple[str, tuple] | None, int]) -> command: + def _unpack(cls, data: tuple[str | None, int]) -> command: _value, _type = data if _type == TYPE_WINDOWS: return windows_command(_value) @@ -843,18 +838,8 @@ class windows_command(command): _posix = False _path_type = windows_path - def _split(self, value: str) -> tuple[str, list[str]]: - executable, args = super()._split(value) - if args: - args = [" ".join(args)] + def _split(self, value: str) -> tuple[path, list[str]]: + executable, _args = super()._split(value) + args = [" ".join(_args)] if _args else _args return executable, args - - def _join(self) -> str: - arg = f" {self.args[0]}" if self.args else "" - executable_str = str(self.executable) - - if " " in executable_str: - return f"'{executable_str}'{arg}" - - return f"{executable_str}{arg}" diff --git a/flow/record/jsonpacker.py b/flow/record/jsonpacker.py index 6f1d85c6..2c772332 100644 --- a/flow/record/jsonpacker.py +++ b/flow/record/jsonpacker.py @@ -75,10 +75,7 @@ def pack_obj(self, obj: Any) -> dict | str: if isinstance(obj, fieldtypes.path): return str(obj) if isinstance(obj, fieldtypes.command): - return { - "executable": obj.executable, - "args": obj.args, - } + return obj._raw raise TypeError(f"Unpackable type {type(obj)}") diff --git a/tests/_utils.py b/tests/_utils.py index 23ce61c9..e06859fa 100644 --- a/tests/_utils.py +++ b/tests/_utils.py @@ -42,3 +42,16 @@ def generate_plain_records(count: int = 100) -> Iterator[Record]: for i in range(count): yield TestRecord(number=i, dt=datetime.datetime.now(datetime.timezone.utc)) + + +def generate_command_records(count: int = 100) -> Iterator[Record]: + TestRecord = RecordDescriptor( + "test/addapter/command", + [ + ("uint32", "number"), + ("command", "data"), + ], + ) + + for i in range(count): + yield TestRecord(number=i, data=f"/path/to/file {i}") diff --git a/tests/adapter/test_json.py b/tests/adapter/test_json.py index 1e741d8a..dd52cfbf 100644 --- a/tests/adapter/test_json.py +++ b/tests/adapter/test_json.py @@ -1,24 +1,42 @@ from __future__ import annotations import json -from typing import TYPE_CHECKING +from enum import Enum, auto +from typing import TYPE_CHECKING, Callable import pytest from flow.record import RecordReader, RecordWriter -from tests._utils import generate_records +from tests._utils import generate_command_records, generate_records if TYPE_CHECKING: + from collections.abc import Iterator from pathlib import Path + from flow.record import Record -def test_json_adapter(tmp_path: Path) -> None: + +class GeneratorType(Enum): + RECORDS = auto() + COMMAND = auto() + + +FUNCTIONS = { + GeneratorType.RECORDS: generate_records, + GeneratorType.COMMAND: generate_command_records, +} + + +@pytest.mark.parametrize("generator_function", FUNCTIONS.keys()) +def test_json_adapter(generator_function: GeneratorType, tmp_path: Path) -> None: json_file = tmp_path.joinpath("records.json") record_adapter_path = f"jsonfile://{json_file}" writer = RecordWriter(record_adapter_path) nr_records = 1337 - for record in generate_records(nr_records): + gen_func: Callable[[int], Iterator[Record]] = FUNCTIONS.get(generator_function) + + for record in gen_func(nr_records): writer.write(record) writer.flush() From 7db734577a1c927c4e71223dd21cf44b1cbdb15f Mon Sep 17 00:00:00 2001 From: Miauwkeru Date: Thu, 21 Aug 2025 07:56:53 +0000 Subject: [PATCH 02/18] Add tests for empty command types --- flow/record/fieldtypes/__init__.py | 14 +++++++++----- tests/fieldtypes/test_fieldtypes.py | 18 ++++++++++++++++++ 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/flow/record/fieldtypes/__init__.py b/flow/record/fieldtypes/__init__.py index 3be19ca7..0728e50f 100644 --- a/flow/record/fieldtypes/__init__.py +++ b/flow/record/fieldtypes/__init__.py @@ -759,8 +759,8 @@ class command(FieldType): _path_type: type[path] = None _posix: bool = True - def __new__(cls, value: str | None): - if cls is not command: + def __new__(cls, value: str | None = None): + if cls is not command or value is None: return super().__new__(cls) if not isinstance(value, str): @@ -777,7 +777,7 @@ def __new__(cls, value: str | None): cls = windows_command if windows else posix_command return super().__new__(cls) - def __init__(self, value: str | None): + def __init__(self, value: str | None = None): if value is None: return @@ -798,8 +798,12 @@ def __eq__(self, other: object) -> bool: return False def _split(self, value: str) -> tuple[path, list[str]]: - executable, *args = shlex.split(value, posix=self._posix) - executable = executable.strip("'\" ") + if value: + executable, *args = shlex.split(value, posix=self._posix) + executable = executable.strip("'\" ") + else: + executable = "" + args = [] return self._path_type(executable), args diff --git a/tests/fieldtypes/test_fieldtypes.py b/tests/fieldtypes/test_fieldtypes.py index 65e2b1fd..2de9cecc 100644 --- a/tests/fieldtypes/test_fieldtypes.py +++ b/tests/fieldtypes/test_fieldtypes.py @@ -1033,6 +1033,24 @@ def test_datetime_comparisions() -> None: assert dt("2023-01-02") != datetime(2023, 3, 4, tzinfo=UTC) +@pytest.mark.parametrize( + "command_cls", + [ + fieldtypes.posix_command, + fieldtypes.windows_command, + fieldtypes.command, + ], +) +def test_empty_command(command_cls: type[command]) -> None: + command = command_cls() + assert command.executable is None + assert command.args is None + + command = command_cls("") + assert command.executable == "" + assert command.args == [] + + def test_command_record() -> None: TestRecord = RecordDescriptor( "test/command", From e82ccedd6deb446278d4c9ade30f03a8a4e7c00d Mon Sep 17 00:00:00 2001 From: Miauwkeru Date: Thu, 21 Aug 2025 08:35:04 +0000 Subject: [PATCH 03/18] Add a test to read/write command type records from a json file Remove old test --- tests/_utils.py | 13 ------ tests/adapter/test_json.py | 66 +++++++++++++++++++---------- tests/fieldtypes/test_fieldtypes.py | 6 ++- 3 files changed, 48 insertions(+), 37 deletions(-) diff --git a/tests/_utils.py b/tests/_utils.py index e06859fa..23ce61c9 100644 --- a/tests/_utils.py +++ b/tests/_utils.py @@ -42,16 +42,3 @@ def generate_plain_records(count: int = 100) -> Iterator[Record]: for i in range(count): yield TestRecord(number=i, dt=datetime.datetime.now(datetime.timezone.utc)) - - -def generate_command_records(count: int = 100) -> Iterator[Record]: - TestRecord = RecordDescriptor( - "test/addapter/command", - [ - ("uint32", "number"), - ("command", "data"), - ], - ) - - for i in range(count): - yield TestRecord(number=i, data=f"/path/to/file {i}") diff --git a/tests/adapter/test_json.py b/tests/adapter/test_json.py index dd52cfbf..f54f0d3f 100644 --- a/tests/adapter/test_json.py +++ b/tests/adapter/test_json.py @@ -1,42 +1,25 @@ from __future__ import annotations import json -from enum import Enum, auto -from typing import TYPE_CHECKING, Callable +from typing import TYPE_CHECKING import pytest -from flow.record import RecordReader, RecordWriter -from tests._utils import generate_command_records, generate_records +from flow.record import RecordReader, RecordWriter, fieldtypes +from flow.record.base import RecordDescriptor +from tests._utils import generate_records if TYPE_CHECKING: - from collections.abc import Iterator from pathlib import Path - from flow.record import Record - -class GeneratorType(Enum): - RECORDS = auto() - COMMAND = auto() - - -FUNCTIONS = { - GeneratorType.RECORDS: generate_records, - GeneratorType.COMMAND: generate_command_records, -} - - -@pytest.mark.parametrize("generator_function", FUNCTIONS.keys()) -def test_json_adapter(generator_function: GeneratorType, tmp_path: Path) -> None: +def test_json_adapter(tmp_path: Path) -> None: json_file = tmp_path.joinpath("records.json") record_adapter_path = f"jsonfile://{json_file}" writer = RecordWriter(record_adapter_path) nr_records = 1337 - gen_func: Callable[[int], Iterator[Record]] = FUNCTIONS.get(generator_function) - - for record in gen_func(nr_records): + for record in generate_records(nr_records): writer.write(record) writer.flush() @@ -135,3 +118,40 @@ def test_json_adapter_with_record_descriptors(tmp_path: Path, record_adapter_pat elif record["_type"] == "record": assert "_recorddescriptor" in record assert descriptor_seen == 2 + + +def test_json_command_fieldtype(tmp_path: pathlib.Path) -> None: + json_file = tmp_path.joinpath("records.json") + record_adapter_path = f"jsonfile://{json_file}" + writer = RecordWriter(record_adapter_path) + + TestRecord = RecordDescriptor( + "test/command", + [ + ("command", "commando"), + ], + ) + + writer.write( + TestRecord( + commando=fieldtypes.windows_command("C:\\help.exe data"), + ) + ) + writer.write( + TestRecord( + commando=fieldtypes.posix_command("/usr/bin/env bash"), + ) + ) + writer.write(TestRecord()) + writer.flush() + + reader = RecordReader(record_adapter_path) + records = [record for record in reader] + + records[0].commando.executable == "C:\\help.exe" + records[0].commando.args == ["data"] + + records[1].commando.executable == "/usr/bin/env" + records[1].commando.args == ["bash"] + + assert len(records) == 3 diff --git a/tests/fieldtypes/test_fieldtypes.py b/tests/fieldtypes/test_fieldtypes.py index 2de9cecc..4e19434c 100644 --- a/tests/fieldtypes/test_fieldtypes.py +++ b/tests/fieldtypes/test_fieldtypes.py @@ -1115,7 +1115,11 @@ def test_command_integration_none(tmp_path: pathlib.Path) -> None: # Test weird command strings for windows ("windows.dll something,or,somethingelse", "windows.dll", ["something,or,somethingelse"]), # Test environment variables - (r"%WINDIR%\\windows.dll something,or,somethingelse", r"%WINDIR%\\windows.dll", ["something,or,somethingelse"]), + ( + r"%WINDIR%\\windows.dll something,or,somethingelse", + r"%WINDIR%\\windows.dll", + ["something,or,somethingelse"], + ), # Test a quoted path (r"'c:\path to some exe' /d /a", r"c:\path to some exe", [r"/d /a"]), # Test a unquoted path From 6af358689b7b7fc933435dec6903445de06ab5fe Mon Sep 17 00:00:00 2001 From: Miauwkeru Date: Thu, 21 Aug 2025 08:48:12 +0000 Subject: [PATCH 04/18] Fix linting issues --- tests/adapter/test_json.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/tests/adapter/test_json.py b/tests/adapter/test_json.py index f54f0d3f..8d6d8b38 100644 --- a/tests/adapter/test_json.py +++ b/tests/adapter/test_json.py @@ -1,6 +1,7 @@ from __future__ import annotations import json +import pathlib from typing import TYPE_CHECKING import pytest @@ -146,12 +147,12 @@ def test_json_command_fieldtype(tmp_path: pathlib.Path) -> None: writer.flush() reader = RecordReader(record_adapter_path) - records = [record for record in reader] + records = list(reader) - records[0].commando.executable == "C:\\help.exe" - records[0].commando.args == ["data"] + assert records[0].commando.executable == "C:\\help.exe" + assert records[0].commando.args == ["data"] - records[1].commando.executable == "/usr/bin/env" - records[1].commando.args == ["bash"] + assert records[1].commando.executable == "/usr/bin/env" + assert records[1].commando.args == ["bash"] assert len(records) == 3 From 82ea9969559ec93727122b29b9219e57335ca4b7 Mon Sep 17 00:00:00 2001 From: Miauwkeru Date: Thu, 21 Aug 2025 08:51:36 +0000 Subject: [PATCH 05/18] Fix linting properly --- tests/adapter/test_json.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/adapter/test_json.py b/tests/adapter/test_json.py index 8d6d8b38..5d81427d 100644 --- a/tests/adapter/test_json.py +++ b/tests/adapter/test_json.py @@ -1,7 +1,6 @@ from __future__ import annotations import json -import pathlib from typing import TYPE_CHECKING import pytest @@ -121,7 +120,7 @@ def test_json_adapter_with_record_descriptors(tmp_path: Path, record_adapter_pat assert descriptor_seen == 2 -def test_json_command_fieldtype(tmp_path: pathlib.Path) -> None: +def test_json_command_fieldtype(tmp_path: Path) -> None: json_file = tmp_path.joinpath("records.json") record_adapter_path = f"jsonfile://{json_file}" writer = RecordWriter(record_adapter_path) From 0c1d393691169dfc925f0e7a258b90eab7faae8a Mon Sep 17 00:00:00 2001 From: Miauwkeru Date: Thu, 21 Aug 2025 14:37:36 +0000 Subject: [PATCH 06/18] Unify posix_command and windows_command into one command class kept the from_windows and from_posix classmethods to enforce the path type --- flow/record/fieldtypes/__init__.py | 85 +++++++++++------------------ tests/adapter/test_json.py | 6 +- tests/adapter/test_xlsx.py | 4 +- tests/fieldtypes/test_fieldtypes.py | 59 ++++++++++++-------- 4 files changed, 73 insertions(+), 81 deletions(-) diff --git a/flow/record/fieldtypes/__init__.py b/flow/record/fieldtypes/__init__.py index 0728e50f..28349ee4 100644 --- a/flow/record/fieldtypes/__init__.py +++ b/flow/record/fieldtypes/__init__.py @@ -755,33 +755,24 @@ class command(FieldType): executable: path | None = None args: list[str] | None = None - _raw: str | None = None - _path_type: type[path] = None - _posix: bool = True + _raw: str + _path_type: path - def __new__(cls, value: str | None = None): - if cls is not command or value is None: + def __new__(cls, value: str | None = None, path_type: type[path] = path): + if cls is not command: return super().__new__(cls) - if not isinstance(value, str): + if not isinstance(value, str) and value is not None: raise TypeError(f"Expected a value of type 'str' not {type(value)}") - # pre checking for windows like paths - # This checks for windows like starts of a path: - # an '%' for an environment variable - # r'\\' for a UNC path - # the strip and check for ":" on the second line is for `:` - stripped_value = value.lstrip("\"'") - windows = value.startswith((r"\\", "%")) or (len(stripped_value) >= 2 and stripped_value[1] == ":") - - cls = windows_command if windows else posix_command return super().__new__(cls) - def __init__(self, value: str | None = None): - if value is None: - return + def __init__(self, value: str | None = None, path_type: type[path] = path): + self._raw = (value or "").strip() + + # Detect the kind of path from the passed value + self._path_type = type(path(self._raw.lstrip("\"'"))) if path_type is path else path_type - self._raw = value self.executable, self.args = self._split(self._raw) def __repr__(self) -> str: @@ -798,52 +789,38 @@ def __eq__(self, other: object) -> bool: return False def _split(self, value: str) -> tuple[path, list[str]]: - if value: - executable, *args = shlex.split(value, posix=self._posix) - executable = executable.strip("'\" ") - else: - executable = "" - args = [] + if not (value): + return self._path_type(), [] + + executable, *args = shlex.split(value, posix=self._path_type is posix_path) + executable = self._path_type(executable.strip("'\" ")) - return self._path_type(executable), args + if self._path_type is windows_path: + args = [" ".join(args)] if args else args + + return executable, args def _join(self) -> str: return self._raw def _pack(self) -> tuple[str, int]: - command_type = TYPE_WINDOWS if isinstance(self, windows_command) else TYPE_POSIX - if self.executable: - return (self._raw, command_type) - return (None, command_type) + path_type = TYPE_WINDOWS if self._path_type is windows_path else TYPE_POSIX + return self._raw, path_type @classmethod - def _unpack(cls, data: tuple[str | None, int]) -> command: - _value, _type = data - if _type == TYPE_WINDOWS: - return windows_command(_value) - - return posix_command(_value) + def _unpack(cls, data: tuple[str, int]) -> command: + raw_str, path_type = data + if path_type == TYPE_POSIX: + return command(raw_str, posix_path) + if path_type == TYPE_WINDOWS: + return command(raw_str, windows_path) + # default, infer type of path from str + return command(raw_str) @classmethod def from_posix(cls, value: str) -> command: - return posix_command(value) + return command(value, posix_path) @classmethod def from_windows(cls, value: str) -> command: - return windows_command(value) - - -class posix_command(command): - _posix = True - _path_type = posix_path - - -class windows_command(command): - _posix = False - _path_type = windows_path - - def _split(self, value: str) -> tuple[path, list[str]]: - executable, _args = super()._split(value) - args = [" ".join(_args)] if _args else _args - - return executable, args + return command(value, windows_path) diff --git a/tests/adapter/test_json.py b/tests/adapter/test_json.py index 5d81427d..5da62923 100644 --- a/tests/adapter/test_json.py +++ b/tests/adapter/test_json.py @@ -5,7 +5,7 @@ import pytest -from flow.record import RecordReader, RecordWriter, fieldtypes +from flow.record import RecordReader, RecordWriter from flow.record.base import RecordDescriptor from tests._utils import generate_records @@ -134,12 +134,12 @@ def test_json_command_fieldtype(tmp_path: Path) -> None: writer.write( TestRecord( - commando=fieldtypes.windows_command("C:\\help.exe data"), + commando="C:\\help.exe data", ) ) writer.write( TestRecord( - commando=fieldtypes.posix_command("/usr/bin/env bash"), + commando="/usr/bin/env bash", ) ) writer.write(TestRecord()) diff --git a/tests/adapter/test_xlsx.py b/tests/adapter/test_xlsx.py index 558b7234..d495b714 100644 --- a/tests/adapter/test_xlsx.py +++ b/tests/adapter/test_xlsx.py @@ -39,9 +39,9 @@ def test_sanitize_field_values(mock_openpyxl_package: MagicMock) -> None: fieldtypes.net.ipaddress("13.37.13.37"), ["Shaken", "Not", "Stirred"], fieldtypes.posix_path("/home/user"), - fieldtypes.posix_command("/bin/bash -c 'echo hello world'"), + fieldtypes.command.from_posix("/bin/bash -c 'echo hello world'"), fieldtypes.windows_path("C:\\Users\\user\\Desktop"), - fieldtypes.windows_command("C:\\Some.exe /?"), + fieldtypes.command.from_windows("C:\\Some.exe /?"), ] ) ) == [ diff --git a/tests/fieldtypes/test_fieldtypes.py b/tests/fieldtypes/test_fieldtypes.py index 4e19434c..f169f00d 100644 --- a/tests/fieldtypes/test_fieldtypes.py +++ b/tests/fieldtypes/test_fieldtypes.py @@ -22,10 +22,8 @@ command, fieldtype_for_value, net, - posix_command, posix_path, uri, - windows_command, windows_path, ) from flow.record.fieldtypes import datetime as dt @@ -1033,20 +1031,16 @@ def test_datetime_comparisions() -> None: assert dt("2023-01-02") != datetime(2023, 3, 4, tzinfo=UTC) -@pytest.mark.parametrize( - "command_cls", - [ - fieldtypes.posix_command, - fieldtypes.windows_command, - fieldtypes.command, - ], -) -def test_empty_command(command_cls: type[command]) -> None: - command = command_cls() - assert command.executable is None - assert command.args is None +def test_empty_command() -> None: + command = fieldtypes.command() + assert command.executable == "" + assert command.args == [] - command = command_cls("") + command = fieldtypes.command("") + assert command.executable == "" + assert command.args == [] + + command = fieldtypes.command(" ") assert command.executable == "" assert command.args == [] @@ -1060,12 +1054,12 @@ def test_command_record() -> None: ) record = TestRecord(commando="help.exe -h") - assert isinstance(record.commando, posix_command) + assert isinstance(record.commando.executable, posix_path) assert record.commando.executable == "help.exe" assert record.commando.args == ["-h"] record = TestRecord(commando="something.so -h -q -something") - assert isinstance(record.commando, posix_command) + assert isinstance(record.commando.executable, posix_path) assert record.commando.executable == "something.so" assert record.commando.args == ["-h", "-q", "-something"] @@ -1103,8 +1097,26 @@ def test_command_integration_none(tmp_path: pathlib.Path) -> None: writer.write(record) with RecordReader(tmp_path / "command_record") as reader: for record in reader: - assert record.commando.executable is None - assert record.commando.args is None + assert record.commando.executable == "" + assert record.commando.args == [] + + +def test_integration_correct_path(tmp_path: pathlib.Path) -> None: + TestRecord = RecordDescriptor( + "test/command", + [ + ("command", "commando"), + ], + ) + + with RecordWriter(tmp_path / "command_record") as writer: + record = TestRecord(commando=command.from_windows("hello.exe -d")) + writer.write(record) + with RecordReader(tmp_path / "command_record") as reader: + for record in reader: + assert record.commando.executable == "hello.exe" + assert isinstance(record.commando.executable, windows_path) + assert record.commando.args == ["-d"] @pytest.mark.parametrize( @@ -1133,13 +1145,14 @@ def test_command_integration_none(tmp_path: pathlib.Path) -> None: # Test an empty command string (r"''", r"", []), # Test None - (None, None, None), + (None, "", []), ], ) def test_command_windows(command_string: str, expected_executable: str, expected_argument: list[str]) -> None: - cmd = windows_command(command_string) + cmd = command.from_windows(command_string) assert cmd.executable == expected_executable + assert isinstance(cmd.executable, windows_path) assert cmd.args == expected_argument @@ -1150,12 +1163,14 @@ def test_command_windows(command_string: str, expected_executable: str, expected ("some_file.so -h asdsad -f asdsadas", "some_file.so", ["-h", "asdsad", "-f", "asdsadas"]), # Test command with spaces (r"/bin/hello\ world -h -word", r"/bin/hello world", ["-h", "-word"]), + (r" /bin/hello\ world", r"/bin/hello world", []), ], ) def test_command_posix(command_string: str, expected_executable: str, expected_argument: list[str]) -> None: - cmd = posix_command(command_string) + cmd = command(command_string) assert cmd.executable == expected_executable + assert isinstance(cmd.executable, fieldtypes.posix_path) assert cmd.args == expected_argument From 218541627f2986122592e7d99ad73cf2c1fd032b Mon Sep 17 00:00:00 2001 From: Miauwkeru Date: Thu, 21 Aug 2025 14:52:01 +0000 Subject: [PATCH 07/18] enforce command.from_posix on test_command_posix --- tests/fieldtypes/test_fieldtypes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/fieldtypes/test_fieldtypes.py b/tests/fieldtypes/test_fieldtypes.py index f169f00d..7706d944 100644 --- a/tests/fieldtypes/test_fieldtypes.py +++ b/tests/fieldtypes/test_fieldtypes.py @@ -1167,7 +1167,7 @@ def test_command_windows(command_string: str, expected_executable: str, expected ], ) def test_command_posix(command_string: str, expected_executable: str, expected_argument: list[str]) -> None: - cmd = command(command_string) + cmd = command.from_posix(command_string) assert cmd.executable == expected_executable assert isinstance(cmd.executable, fieldtypes.posix_path) From 81bcb0326a69de90f514d428b8470a25b10ef0ca Mon Sep 17 00:00:00 2001 From: Miauwkeru Date: Fri, 22 Aug 2025 11:51:19 +0200 Subject: [PATCH 08/18] Apply suggestions from code review Co-authored-by: Yun Zheng Hu --- flow/record/fieldtypes/__init__.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flow/record/fieldtypes/__init__.py b/flow/record/fieldtypes/__init__.py index 28349ee4..72bc4fe2 100644 --- a/flow/record/fieldtypes/__init__.py +++ b/flow/record/fieldtypes/__init__.py @@ -767,11 +767,11 @@ def __new__(cls, value: str | None = None, path_type: type[path] = path): return super().__new__(cls) - def __init__(self, value: str | None = None, path_type: type[path] = path): + def __init__(self, value: str | None = None, path_type: type[path] | None = None): self._raw = (value or "").strip() - # Detect the kind of path from the passed value - self._path_type = type(path(self._raw.lstrip("\"'"))) if path_type is path else path_type + # Detect the kind of path from value if not specified + self._path_type = path_type or type(path(self._raw.lstrip("\"'"))) self.executable, self.args = self._split(self._raw) From a0bb171734367629de6ac5d85582b821e7df1f8d Mon Sep 17 00:00:00 2001 From: Miauwkeru Date: Fri, 22 Aug 2025 10:07:36 +0000 Subject: [PATCH 09/18] Add docstring to command type --- flow/record/fieldtypes/__init__.py | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/flow/record/fieldtypes/__init__.py b/flow/record/fieldtypes/__init__.py index 72bc4fe2..14fb8ed8 100644 --- a/flow/record/fieldtypes/__init__.py +++ b/flow/record/fieldtypes/__init__.py @@ -752,13 +752,29 @@ def __repr__(self) -> str: class command(FieldType): + """The command fieldtype splits a command string into an ``executable`` and its arguments. + + Args: + value: the string that contains the command and arguments + path_type: When specified it forces the command to use a specific path type + + Example: + + 'c:\\windows\\malware.exe /info' -> windows_path('c:\\windows\\malware.exe) ['/info'] + '/usr/bin/env bash' -> posix_path('/usr/bin/env') ['bash'] + + # In this situation, the executable path needs to be quoted. + 'c:\\user\\John Doe\\malware.exe /all /the /things' -> windows_path('c:\\user\\John') + ['Doe\\malware.exe /all /the /things'] + """ + executable: path | None = None args: list[str] | None = None _raw: str - _path_type: path + _path_type: type[path] - def __new__(cls, value: str | None = None, path_type: type[path] = path): + def __new__(cls, value: str | None = None, path_type: type[path] | None = None): if cls is not command: return super().__new__(cls) From 8257bb3d549f369cfc2a2599c204784f52f73661 Mon Sep 17 00:00:00 2001 From: Miauwkeru Date: Fri, 22 Aug 2025 10:07:52 +0000 Subject: [PATCH 10/18] Fix test_command_record --- tests/fieldtypes/test_fieldtypes.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/fieldtypes/test_fieldtypes.py b/tests/fieldtypes/test_fieldtypes.py index 7706d944..5393b677 100644 --- a/tests/fieldtypes/test_fieldtypes.py +++ b/tests/fieldtypes/test_fieldtypes.py @@ -1053,13 +1053,16 @@ def test_command_record() -> None: ], ) + # path defaults to type depending on the os it runs on, se we emulate this here + _type = windows_path if os.name == "nt" else posix_path + record = TestRecord(commando="help.exe -h") - assert isinstance(record.commando.executable, posix_path) + assert isinstance(record.commando.executable, _type) assert record.commando.executable == "help.exe" assert record.commando.args == ["-h"] record = TestRecord(commando="something.so -h -q -something") - assert isinstance(record.commando.executable, posix_path) + assert isinstance(record.commando.executable, _type) assert record.commando.executable == "something.so" assert record.commando.args == ["-h", "-q", "-something"] From 4f99c98ef863330c3d855c9b2ee5c835e8e34707 Mon Sep 17 00:00:00 2001 From: Miauwkeru Date: Fri, 22 Aug 2025 10:25:49 +0000 Subject: [PATCH 11/18] Fix issues --- flow/record/fieldtypes/__init__.py | 2 ++ tests/fieldtypes/test_fieldtypes.py | 3 ++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/flow/record/fieldtypes/__init__.py b/flow/record/fieldtypes/__init__.py index 14fb8ed8..ee0d5fb3 100644 --- a/flow/record/fieldtypes/__init__.py +++ b/flow/record/fieldtypes/__init__.py @@ -760,6 +760,8 @@ class command(FieldType): Example: + .. code-block:: text + 'c:\\windows\\malware.exe /info' -> windows_path('c:\\windows\\malware.exe) ['/info'] '/usr/bin/env bash' -> posix_path('/usr/bin/env') ['bash'] diff --git a/tests/fieldtypes/test_fieldtypes.py b/tests/fieldtypes/test_fieldtypes.py index 5393b677..4136e380 100644 --- a/tests/fieldtypes/test_fieldtypes.py +++ b/tests/fieldtypes/test_fieldtypes.py @@ -1062,9 +1062,10 @@ def test_command_record() -> None: assert record.commando.args == ["-h"] record = TestRecord(commando="something.so -h -q -something") + args = ["-h", "-q", "-something"] assert isinstance(record.commando.executable, _type) assert record.commando.executable == "something.so" - assert record.commando.args == ["-h", "-q", "-something"] + assert record.commando.args == [" ".join(args)] if os.name == "nt" else args def test_command_integration(tmp_path: pathlib.Path) -> None: From 3ab677132b590c472c471690dfc204903aa8d919 Mon Sep 17 00:00:00 2001 From: Yun Zheng Hu Date: Mon, 27 Oct 2025 13:13:02 +0000 Subject: [PATCH 12/18] Refactored command a bit * Move the `str` type check into __init__ * Make the command value explicity `str` type. So cannot be initialized using None * mark `path_type` keyword argument only for clarity --- flow/record/fieldtypes/__init__.py | 19 +++++++------------ tests/fieldtypes/test_fieldtypes.py | 16 ++++++++++++---- 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/flow/record/fieldtypes/__init__.py b/flow/record/fieldtypes/__init__.py index ee0d5fb3..f19eb992 100644 --- a/flow/record/fieldtypes/__init__.py +++ b/flow/record/fieldtypes/__init__.py @@ -776,17 +776,12 @@ class command(FieldType): _raw: str _path_type: type[path] - def __new__(cls, value: str | None = None, path_type: type[path] | None = None): - if cls is not command: - return super().__new__(cls) + def __init__(self, value: str = "", *, path_type: type[path] | None = None): - if not isinstance(value, str) and value is not None: + if not isinstance(value, str): raise TypeError(f"Expected a value of type 'str' not {type(value)}") - return super().__new__(cls) - - def __init__(self, value: str | None = None, path_type: type[path] | None = None): - self._raw = (value or "").strip() + self._raw = value.strip() # Detect the kind of path from value if not specified self._path_type = path_type or type(path(self._raw.lstrip("\"'"))) @@ -829,16 +824,16 @@ def _pack(self) -> tuple[str, int]: def _unpack(cls, data: tuple[str, int]) -> command: raw_str, path_type = data if path_type == TYPE_POSIX: - return command(raw_str, posix_path) + return command(raw_str, path_type=posix_path) if path_type == TYPE_WINDOWS: - return command(raw_str, windows_path) + return command(raw_str, path_type=windows_path) # default, infer type of path from str return command(raw_str) @classmethod def from_posix(cls, value: str) -> command: - return command(value, posix_path) + return command(value, path_type=posix_path) @classmethod def from_windows(cls, value: str) -> command: - return command(value, windows_path) + return command(value, path_type=windows_path) diff --git a/tests/fieldtypes/test_fieldtypes.py b/tests/fieldtypes/test_fieldtypes.py index 4136e380..4ebc3f48 100644 --- a/tests/fieldtypes/test_fieldtypes.py +++ b/tests/fieldtypes/test_fieldtypes.py @@ -1053,7 +1053,7 @@ def test_command_record() -> None: ], ) - # path defaults to type depending on the os it runs on, se we emulate this here + # path defaults to type depending on the os it runs on, so we emulate this here _type = windows_path if os.name == "nt" else posix_path record = TestRecord(commando="help.exe -h") @@ -1096,11 +1096,21 @@ def test_command_integration_none(tmp_path: pathlib.Path) -> None: ], ) + # None with RecordWriter(tmp_path / "command_record") as writer: - record = TestRecord(commando=command.from_posix(None)) + record = TestRecord(commando=None) writer.write(record) with RecordReader(tmp_path / "command_record") as reader: for record in reader: + assert record.commando is None + + # empty string + with RecordWriter(tmp_path / "command_record") as writer: + record = TestRecord(commando="") + writer.write(record) + with RecordReader(tmp_path / "command_record") as reader: + for record in reader: + assert record.commando == "" assert record.commando.executable == "" assert record.commando.args == [] @@ -1148,8 +1158,6 @@ def test_integration_correct_path(tmp_path: pathlib.Path) -> None: (r"\\192.168.1.2\Users\test\hello.exe /d /a", r"\\192.168.1.2\Users\test\hello.exe", [r"/d /a"]), # Test an empty command string (r"''", r"", []), - # Test None - (None, "", []), ], ) def test_command_windows(command_string: str, expected_executable: str, expected_argument: list[str]) -> None: From cdac7221049843e47dce1805e00b876453b652a4 Mon Sep 17 00:00:00 2001 From: Yun Zheng Hu Date: Mon, 27 Oct 2025 13:33:11 +0000 Subject: [PATCH 13/18] Fix linting and add single and double quote command test --- flow/record/fieldtypes/__init__.py | 1 - tests/fieldtypes/test_fieldtypes.py | 8 +++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/flow/record/fieldtypes/__init__.py b/flow/record/fieldtypes/__init__.py index f19eb992..c7ba4e55 100644 --- a/flow/record/fieldtypes/__init__.py +++ b/flow/record/fieldtypes/__init__.py @@ -777,7 +777,6 @@ class command(FieldType): _path_type: type[path] def __init__(self, value: str = "", *, path_type: type[path] | None = None): - if not isinstance(value, str): raise TypeError(f"Expected a value of type 'str' not {type(value)}") diff --git a/tests/fieldtypes/test_fieldtypes.py b/tests/fieldtypes/test_fieldtypes.py index 4ebc3f48..58a50b4e 100644 --- a/tests/fieldtypes/test_fieldtypes.py +++ b/tests/fieldtypes/test_fieldtypes.py @@ -1146,8 +1146,10 @@ def test_integration_correct_path(tmp_path: pathlib.Path) -> None: r"%WINDIR%\\windows.dll", ["something,or,somethingelse"], ), - # Test a quoted path + # Test a single quoted path (r"'c:\path to some exe' /d /a", r"c:\path to some exe", [r"/d /a"]), + # Test a double quoted path + (r'"c:\path to some exe" /d /a', r"c:\path to some exe", [r"/d /a"]), # Test a unquoted path (r"\Users\test\hello.exe", r"\Users\test\hello.exe", []), # Test an unquoted path with a path as argument @@ -1176,6 +1178,10 @@ def test_command_windows(command_string: str, expected_executable: str, expected # Test command with spaces (r"/bin/hello\ world -h -word", r"/bin/hello world", ["-h", "-word"]), (r" /bin/hello\ world", r"/bin/hello world", []), + # Test single quoted command + (r"'/tmp/ /test/hello' -h -word", r"/tmp/ /test/hello", ["-h", "-word"]), + # Test double quoted command + (r'"/tmp/ /test/hello" -h -word', r"/tmp/ /test/hello", ["-h", "-word"]), ], ) def test_command_posix(command_string: str, expected_executable: str, expected_argument: list[str]) -> None: From f6551d8ed14c2aa704867638f1ad0594ca351bbe Mon Sep 17 00:00:00 2001 From: Miauwkeru Date: Tue, 28 Oct 2025 13:52:05 +0100 Subject: [PATCH 14/18] Change command.args into a tuple --- flow/record/fieldtypes/__init__.py | 10 +++--- tests/adapter/test_json.py | 4 +-- tests/fieldtypes/test_fieldtypes.py | 52 ++++++++++++++--------------- 3 files changed, 33 insertions(+), 33 deletions(-) diff --git a/flow/record/fieldtypes/__init__.py b/flow/record/fieldtypes/__init__.py index c7ba4e55..2967dc16 100644 --- a/flow/record/fieldtypes/__init__.py +++ b/flow/record/fieldtypes/__init__.py @@ -771,7 +771,7 @@ class command(FieldType): """ executable: path | None = None - args: list[str] | None = None + args: tuple[str] | None = None _raw: str _path_type: type[path] @@ -796,13 +796,13 @@ def __eq__(self, other: object) -> bool: if isinstance(other, str): return self._raw == other if isinstance(other, (tuple, list)): - return self.executable == other[0] and self.args == list(other[1:]) + return self.executable == other[0] and self.args == (*other[1:],) return False - def _split(self, value: str) -> tuple[path, list[str]]: + def _split(self, value: str) -> tuple[path, tuple[str, ...]]: if not (value): - return self._path_type(), [] + return self._path_type(), () executable, *args = shlex.split(value, posix=self._path_type is posix_path) executable = self._path_type(executable.strip("'\" ")) @@ -810,7 +810,7 @@ def _split(self, value: str) -> tuple[path, list[str]]: if self._path_type is windows_path: args = [" ".join(args)] if args else args - return executable, args + return executable, (*args,) def _join(self) -> str: return self._raw diff --git a/tests/adapter/test_json.py b/tests/adapter/test_json.py index 5da62923..074cfd51 100644 --- a/tests/adapter/test_json.py +++ b/tests/adapter/test_json.py @@ -149,9 +149,9 @@ def test_json_command_fieldtype(tmp_path: Path) -> None: records = list(reader) assert records[0].commando.executable == "C:\\help.exe" - assert records[0].commando.args == ["data"] + assert records[0].commando.args == ("data",) assert records[1].commando.executable == "/usr/bin/env" - assert records[1].commando.args == ["bash"] + assert records[1].commando.args == ("bash",) assert len(records) == 3 diff --git a/tests/fieldtypes/test_fieldtypes.py b/tests/fieldtypes/test_fieldtypes.py index 58a50b4e..f7c1e996 100644 --- a/tests/fieldtypes/test_fieldtypes.py +++ b/tests/fieldtypes/test_fieldtypes.py @@ -1034,15 +1034,15 @@ def test_datetime_comparisions() -> None: def test_empty_command() -> None: command = fieldtypes.command() assert command.executable == "" - assert command.args == [] + assert command.args == () command = fieldtypes.command("") assert command.executable == "" - assert command.args == [] + assert command.args == () command = fieldtypes.command(" ") assert command.executable == "" - assert command.args == [] + assert command.args == () def test_command_record() -> None: @@ -1059,13 +1059,13 @@ def test_command_record() -> None: record = TestRecord(commando="help.exe -h") assert isinstance(record.commando.executable, _type) assert record.commando.executable == "help.exe" - assert record.commando.args == ["-h"] + assert record.commando.args == ("-h",) record = TestRecord(commando="something.so -h -q -something") - args = ["-h", "-q", "-something"] + args = ("-h", "-q", "-something") assert isinstance(record.commando.executable, _type) assert record.commando.executable == "something.so" - assert record.commando.args == [" ".join(args)] if os.name == "nt" else args + assert record.commando.args == (" ".join(args),) if os.name == "nt" else args def test_command_integration(tmp_path: pathlib.Path) -> None: @@ -1080,12 +1080,12 @@ def test_command_integration(tmp_path: pathlib.Path) -> None: record = TestRecord(commando=r"\\.\\?\some_command.exe -h,help /d quiet") writer.write(record) assert record.commando.executable == r"\\.\\?\some_command.exe" - assert record.commando.args == [r"-h,help /d quiet"] + assert record.commando.args == (r"-h,help /d quiet",) with RecordReader(tmp_path / "command_record") as reader: for record in reader: assert record.commando.executable == r"\\.\\?\some_command.exe" - assert record.commando.args == [r"-h,help /d quiet"] + assert record.commando.args == (r"-h,help /d quiet",) def test_command_integration_none(tmp_path: pathlib.Path) -> None: @@ -1112,7 +1112,7 @@ def test_command_integration_none(tmp_path: pathlib.Path) -> None: for record in reader: assert record.commando == "" assert record.commando.executable == "" - assert record.commando.args == [] + assert record.commando.args == () def test_integration_correct_path(tmp_path: pathlib.Path) -> None: @@ -1130,39 +1130,39 @@ def test_integration_correct_path(tmp_path: pathlib.Path) -> None: for record in reader: assert record.commando.executable == "hello.exe" assert isinstance(record.commando.executable, windows_path) - assert record.commando.args == ["-d"] + assert record.commando.args == ("-d",) @pytest.mark.parametrize( ("command_string", "expected_executable", "expected_argument"), [ # Test relative windows paths - ("windows.exe something,or,somethingelse", "windows.exe", ["something,or,somethingelse"]), + ("windows.exe something,or,somethingelse", "windows.exe", ("something,or,somethingelse",)), # Test weird command strings for windows - ("windows.dll something,or,somethingelse", "windows.dll", ["something,or,somethingelse"]), + ("windows.dll something,or,somethingelse", "windows.dll", ("something,or,somethingelse",)), # Test environment variables ( r"%WINDIR%\\windows.dll something,or,somethingelse", r"%WINDIR%\\windows.dll", - ["something,or,somethingelse"], + ("something,or,somethingelse",), ), # Test a single quoted path - (r"'c:\path to some exe' /d /a", r"c:\path to some exe", [r"/d /a"]), + (r"'c:\path to some exe' /d /a", r"c:\path to some exe", (r"/d /a",)), # Test a double quoted path - (r'"c:\path to some exe" /d /a', r"c:\path to some exe", [r"/d /a"]), + (r'"c:\path to some exe" /d /a', r"c:\path to some exe", (r"/d /a",)), # Test a unquoted path - (r"\Users\test\hello.exe", r"\Users\test\hello.exe", []), + (r"\Users\test\hello.exe", r"\Users\test\hello.exe", ()), # Test an unquoted path with a path as argument - (r"\Users\test\hello.exe c:\startmepls.exe", r"\Users\test\hello.exe", [r"c:\startmepls.exe"]), + (r"\Users\test\hello.exe c:\startmepls.exe", r"\Users\test\hello.exe", (r"c:\startmepls.exe",)), # Test a quoted UNC path - (r"'\\192.168.1.2\Program Files\hello.exe'", r"\\192.168.1.2\Program Files\hello.exe", []), + (r"'\\192.168.1.2\Program Files\hello.exe'", r"\\192.168.1.2\Program Files\hello.exe", ()), # Test an unquoted UNC path - (r"\\192.168.1.2\Users\test\hello.exe /d /a", r"\\192.168.1.2\Users\test\hello.exe", [r"/d /a"]), + (r"\\192.168.1.2\Users\test\hello.exe /d /a", r"\\192.168.1.2\Users\test\hello.exe", (r"/d /a",)), # Test an empty command string - (r"''", r"", []), + (r"''", r"", ()), ], ) -def test_command_windows(command_string: str, expected_executable: str, expected_argument: list[str]) -> None: +def test_command_windows(command_string: str, expected_executable: str, expected_argument: tuple[str, ...]) -> None: cmd = command.from_windows(command_string) assert cmd.executable == expected_executable @@ -1174,14 +1174,14 @@ def test_command_windows(command_string: str, expected_executable: str, expected ("command_string", "expected_executable", "expected_argument"), [ # Test relative posix command - ("some_file.so -h asdsad -f asdsadas", "some_file.so", ["-h", "asdsad", "-f", "asdsadas"]), + ("some_file.so -h asdsad -f asdsadas", "some_file.so", ("-h", "asdsad", "-f", "asdsadas")), # Test command with spaces - (r"/bin/hello\ world -h -word", r"/bin/hello world", ["-h", "-word"]), - (r" /bin/hello\ world", r"/bin/hello world", []), + (r"/bin/hello\ world -h -word", r"/bin/hello world", ("-h", "-word")), + (r" /bin/hello\ world", r"/bin/hello world", ()), # Test single quoted command - (r"'/tmp/ /test/hello' -h -word", r"/tmp/ /test/hello", ["-h", "-word"]), + (r"'/tmp/ /test/hello' -h -word", r"/tmp/ /test/hello", ("-h", "-word")), # Test double quoted command - (r'"/tmp/ /test/hello" -h -word', r"/tmp/ /test/hello", ["-h", "-word"]), + (r'"/tmp/ /test/hello" -h -word', r"/tmp/ /test/hello", ("-h", "-word")), ], ) def test_command_posix(command_string: str, expected_executable: str, expected_argument: list[str]) -> None: From c7994de592ee23b7a665d28c769b34d5841cb22f Mon Sep 17 00:00:00 2001 From: Miauwkeru Date: Wed, 10 Dec 2025 18:13:53 +0100 Subject: [PATCH 15/18] Add tests and functionality to assign executable and args --- flow/record/fieldtypes/__init__.py | 72 +++++++++++++++++++++-------- flow/record/jsonpacker.py | 2 +- tests/fieldtypes/test_fieldtypes.py | 46 ++++++++++++++++-- 3 files changed, 96 insertions(+), 24 deletions(-) diff --git a/flow/record/fieldtypes/__init__.py b/flow/record/fieldtypes/__init__.py index 2967dc16..01e008dc 100644 --- a/flow/record/fieldtypes/__init__.py +++ b/flow/record/fieldtypes/__init__.py @@ -770,22 +770,21 @@ class command(FieldType): ['Doe\\malware.exe /all /the /things'] """ - executable: path | None = None - args: tuple[str] | None = None + __executable: path + __args: tuple[str, ...] - _raw: str - _path_type: type[path] + __path_type: type[path] def __init__(self, value: str = "", *, path_type: type[path] | None = None): if not isinstance(value, str): raise TypeError(f"Expected a value of type 'str' not {type(value)}") - self._raw = value.strip() + raw = value.strip() # Detect the kind of path from value if not specified - self._path_type = path_type or type(path(self._raw.lstrip("\"'"))) + self.__path_type = path_type or type(path(raw.lstrip("\"'"))) - self.executable, self.args = self._split(self._raw) + self.executable, self.args = self._split(raw) def __repr__(self) -> str: return f"(executable={self.executable!r}, args={self.args})" @@ -794,30 +793,25 @@ def __eq__(self, other: object) -> bool: if isinstance(other, command): return self.executable == other.executable and self.args == other.args if isinstance(other, str): - return self._raw == other + return self.raw == other if isinstance(other, (tuple, list)): return self.executable == other[0] and self.args == (*other[1:],) return False - def _split(self, value: str) -> tuple[path, tuple[str, ...]]: + def _split(self, value: str) -> tuple[str, tuple[str, ...]]: if not (value): - return self._path_type(), () + return "", () - executable, *args = shlex.split(value, posix=self._path_type is posix_path) - executable = self._path_type(executable.strip("'\" ")) - - if self._path_type is windows_path: - args = [" ".join(args)] if args else args - - return executable, (*args,) + executable, *args = shlex.split(value, posix=self.__path_type is posix_path) + return executable.strip("'\" "), (*args,) def _join(self) -> str: - return self._raw + return self.raw def _pack(self) -> tuple[str, int]: - path_type = TYPE_WINDOWS if self._path_type is windows_path else TYPE_POSIX - return self._raw, path_type + path_type = TYPE_WINDOWS if self.__path_type is windows_path else TYPE_POSIX + return self.raw, path_type @classmethod def _unpack(cls, data: tuple[str, int]) -> command: @@ -829,6 +823,44 @@ def _unpack(cls, data: tuple[str, int]) -> command: # default, infer type of path from str return command(raw_str) + @property + def executable(self) -> path: + return self.__executable + + @property + def args(self) -> tuple[str, ...]: + return self.__args + + @executable.setter + def executable(self, val: str | path | None) -> None: + self.__executable = self.__path_type(val) + + @args.setter + def args(self, val: str | tuple[str, ...] | list[str] | None) -> None: + if val is None: + self.__args = () + elif isinstance(val, (tuple, list)): + if val and self.__path_type is windows_path: + val = (" ".join(val),) + self.__args = (*val,) + else: + self.__args = tuple(shlex.split(val, posix=self.__path_type is posix_path)) + + @property + def raw(self) -> str: + exe = str(self.executable) + + if " " in exe: + exe = shlex.quote(exe) + + result = [exe] + if self.__path_type is posix_path: + result.extend(shlex.quote(part) if " " in part else part for part in self.args) + else: + result.extend(self.args) + + return " ".join(result) + @classmethod def from_posix(cls, value: str) -> command: return command(value, path_type=posix_path) diff --git a/flow/record/jsonpacker.py b/flow/record/jsonpacker.py index 2c772332..344b5538 100644 --- a/flow/record/jsonpacker.py +++ b/flow/record/jsonpacker.py @@ -75,7 +75,7 @@ def pack_obj(self, obj: Any) -> dict | str: if isinstance(obj, fieldtypes.path): return str(obj) if isinstance(obj, fieldtypes.command): - return obj._raw + return obj.raw raise TypeError(f"Unpackable type {type(obj)}") diff --git a/tests/fieldtypes/test_fieldtypes.py b/tests/fieldtypes/test_fieldtypes.py index f7c1e996..a63f282a 100644 --- a/tests/fieldtypes/test_fieldtypes.py +++ b/tests/fieldtypes/test_fieldtypes.py @@ -1077,15 +1077,16 @@ def test_command_integration(tmp_path: pathlib.Path) -> None: ) with RecordWriter(tmp_path / "command_record") as writer: - record = TestRecord(commando=r"\\.\\?\some_command.exe -h,help /d quiet") + record = TestRecord(commando=r"\.\?\some_command.exe -h,help /d quiet") writer.write(record) - assert record.commando.executable == r"\\.\\?\some_command.exe" + assert record.commando.executable == r"\.\?\some_command.exe" assert record.commando.args == (r"-h,help /d quiet",) with RecordReader(tmp_path / "command_record") as reader: for record in reader: - assert record.commando.executable == r"\\.\\?\some_command.exe" + assert record.commando.executable == r"\.\?\some_command.exe" assert record.commando.args == (r"-h,help /d quiet",) + assert record.commando.raw == r"\?\some_command.exe -h,help /d quiet" def test_command_integration_none(tmp_path: pathlib.Path) -> None: @@ -1113,6 +1114,7 @@ def test_command_integration_none(tmp_path: pathlib.Path) -> None: assert record.commando == "" assert record.commando.executable == "" assert record.commando.args == () + assert record.commando.raw == "" def test_integration_correct_path(tmp_path: pathlib.Path) -> None: @@ -1215,6 +1217,44 @@ def test_command_equal() -> None: assert command("hello.so") != ("hello.so", "") +def test_command_assign_posix() -> None: + _command = command("/") + + assert _command.raw == "/" + + # Test whether we can assign executable + _command.executable = "/path/to/home dir/" + assert _command.raw == "'/path/to/home dir'" + + # Test whether it uses the underlying path + _command.executable = fieldtypes.windows_path("path\\to\\dir") + assert _command.raw == r"path/to/dir" + + # As it is windows, this should change to be one value + _command.args = ["command", "arguments", "for", "posix"] + assert _command.args == ("command", "arguments", "for", "posix") + assert _command.raw == r"path/to/dir command arguments for posix" + + _command.args = ("command", "-c", "command string") + assert _command.raw == "path/to/dir command -c 'command string'" + + +def test_command_assign_windows() -> None: + _command = command("c:\\") + + assert _command.raw == "c:\\" + + _command.executable = r"c:\windows\path" + assert _command.raw == r"c:\windows\path" + + _command.executable = r"c:\windows\path to file" + assert _command.raw == r"'c:\windows\path to file'" + + _command.args = ("command", "arguments", "for", "windows") + assert _command.args == ("command arguments for windows",) + assert _command.raw == r"'c:\windows\path to file' command arguments for windows" + + def test_command_failed() -> None: with pytest.raises(TypeError, match="Expected a value of type 'str'"): command(b"failed") From 44364711aa96f99d534acb2c9e8dccbe3ff8e028 Mon Sep 17 00:00:00 2001 From: Miauwkeru Date: Thu, 11 Dec 2025 13:14:26 +0100 Subject: [PATCH 16/18] Remove command:_join() --- flow/record/fieldtypes/__init__.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/flow/record/fieldtypes/__init__.py b/flow/record/fieldtypes/__init__.py index 01e008dc..ddeae14a 100644 --- a/flow/record/fieldtypes/__init__.py +++ b/flow/record/fieldtypes/__init__.py @@ -806,9 +806,6 @@ def _split(self, value: str) -> tuple[str, tuple[str, ...]]: executable, *args = shlex.split(value, posix=self.__path_type is posix_path) return executable.strip("'\" "), (*args,) - def _join(self) -> str: - return self.raw - def _pack(self) -> tuple[str, int]: path_type = TYPE_WINDOWS if self.__path_type is windows_path else TYPE_POSIX return self.raw, path_type From eef028746110dcc8e879548cf7809799a0d12b0c Mon Sep 17 00:00:00 2001 From: Miauwkeru Date: Thu, 11 Dec 2025 13:31:54 +0100 Subject: [PATCH 17/18] Fix potential issue when dealing with assigning args with a string on a windows command --- flow/record/fieldtypes/__init__.py | 15 +++++++++------ tests/fieldtypes/test_fieldtypes.py | 7 +++++++ 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/flow/record/fieldtypes/__init__.py b/flow/record/fieldtypes/__init__.py index ddeae14a..411371c7 100644 --- a/flow/record/fieldtypes/__init__.py +++ b/flow/record/fieldtypes/__init__.py @@ -836,12 +836,15 @@ def executable(self, val: str | path | None) -> None: def args(self, val: str | tuple[str, ...] | list[str] | None) -> None: if val is None: self.__args = () - elif isinstance(val, (tuple, list)): - if val and self.__path_type is windows_path: - val = (" ".join(val),) - self.__args = (*val,) - else: - self.__args = tuple(shlex.split(val, posix=self.__path_type is posix_path)) + return + + if isinstance(val, str): + val = tuple(shlex.split(val, posix=self.__path_type is posix_path)) + + if val and self.__path_type is windows_path: + val = (" ".join(val),) + + self.__args = (*val,) @property def raw(self) -> str: diff --git a/tests/fieldtypes/test_fieldtypes.py b/tests/fieldtypes/test_fieldtypes.py index a63f282a..56c32e8d 100644 --- a/tests/fieldtypes/test_fieldtypes.py +++ b/tests/fieldtypes/test_fieldtypes.py @@ -1238,6 +1238,10 @@ def test_command_assign_posix() -> None: _command.args = ("command", "-c", "command string") assert _command.raw == "path/to/dir command -c 'command string'" + _command.args = "command -c 'command string2'" + assert _command.args == ("command", "-c", "command string2") + assert _command.raw == "path/to/dir command -c 'command string2'" + def test_command_assign_windows() -> None: _command = command("c:\\") @@ -1254,6 +1258,9 @@ def test_command_assign_windows() -> None: assert _command.args == ("command arguments for windows",) assert _command.raw == r"'c:\windows\path to file' command arguments for windows" + _command.args = "command arguments for windows2" + assert _command.args == ("command arguments for windows2",) + def test_command_failed() -> None: with pytest.raises(TypeError, match="Expected a value of type 'str'"): From f1cbabf00379d11a6f9ccb9d4b45445d2a88e5c2 Mon Sep 17 00:00:00 2001 From: Miauwkeru Date: Fri, 12 Dec 2025 11:43:01 +0100 Subject: [PATCH 18/18] Remove joining arguments for windows command --- flow/record/fieldtypes/__init__.py | 15 +++++++-------- tests/fieldtypes/test_fieldtypes.py | 25 +++++++++++++++---------- 2 files changed, 22 insertions(+), 18 deletions(-) diff --git a/flow/record/fieldtypes/__init__.py b/flow/record/fieldtypes/__init__.py index 411371c7..80cc89af 100644 --- a/flow/record/fieldtypes/__init__.py +++ b/flow/record/fieldtypes/__init__.py @@ -800,7 +800,7 @@ def __eq__(self, other: object) -> bool: return False def _split(self, value: str) -> tuple[str, tuple[str, ...]]: - if not (value): + if not value: return "", () executable, *args = shlex.split(value, posix=self.__path_type is posix_path) @@ -839,12 +839,11 @@ def args(self, val: str | tuple[str, ...] | list[str] | None) -> None: return if isinstance(val, str): - val = tuple(shlex.split(val, posix=self.__path_type is posix_path)) - - if val and self.__path_type is windows_path: - val = (" ".join(val),) - - self.__args = (*val,) + self.__args = tuple(shlex.split(val, posix=self.__path_type is posix_path)) + elif isinstance(val, list): + self.__args = tuple(val) + else: + self.__args = val @property def raw(self) -> str: @@ -854,11 +853,11 @@ def raw(self) -> str: exe = shlex.quote(exe) result = [exe] + # Only quote on posix paths as shlex doesn't remove the quotes on non posix paths if self.__path_type is posix_path: result.extend(shlex.quote(part) if " " in part else part for part in self.args) else: result.extend(self.args) - return " ".join(result) @classmethod diff --git a/tests/fieldtypes/test_fieldtypes.py b/tests/fieldtypes/test_fieldtypes.py index 56c32e8d..46e9a392 100644 --- a/tests/fieldtypes/test_fieldtypes.py +++ b/tests/fieldtypes/test_fieldtypes.py @@ -1065,7 +1065,7 @@ def test_command_record() -> None: args = ("-h", "-q", "-something") assert isinstance(record.commando.executable, _type) assert record.commando.executable == "something.so" - assert record.commando.args == (" ".join(args),) if os.name == "nt" else args + assert record.commando.args == args def test_command_integration(tmp_path: pathlib.Path) -> None: @@ -1080,12 +1080,12 @@ def test_command_integration(tmp_path: pathlib.Path) -> None: record = TestRecord(commando=r"\.\?\some_command.exe -h,help /d quiet") writer.write(record) assert record.commando.executable == r"\.\?\some_command.exe" - assert record.commando.args == (r"-h,help /d quiet",) + assert record.commando.args == (r"-h,help", "/d", "quiet") with RecordReader(tmp_path / "command_record") as reader: for record in reader: assert record.commando.executable == r"\.\?\some_command.exe" - assert record.commando.args == (r"-h,help /d quiet",) + assert record.commando.args == (r"-h,help", "/d", "quiet") assert record.commando.raw == r"\?\some_command.exe -h,help /d quiet" @@ -1149,9 +1149,9 @@ def test_integration_correct_path(tmp_path: pathlib.Path) -> None: ("something,or,somethingelse",), ), # Test a single quoted path - (r"'c:\path to some exe' /d /a", r"c:\path to some exe", (r"/d /a",)), + (r"'c:\path to some exe' /d /a", r"c:\path to some exe", ("/d", "/a")), # Test a double quoted path - (r'"c:\path to some exe" /d /a', r"c:\path to some exe", (r"/d /a",)), + (r'"c:\path to some exe" /d /a', r"c:\path to some exe", ("/d", "/a")), # Test a unquoted path (r"\Users\test\hello.exe", r"\Users\test\hello.exe", ()), # Test an unquoted path with a path as argument @@ -1159,7 +1159,7 @@ def test_integration_correct_path(tmp_path: pathlib.Path) -> None: # Test a quoted UNC path (r"'\\192.168.1.2\Program Files\hello.exe'", r"\\192.168.1.2\Program Files\hello.exe", ()), # Test an unquoted UNC path - (r"\\192.168.1.2\Users\test\hello.exe /d /a", r"\\192.168.1.2\Users\test\hello.exe", (r"/d /a",)), + (r"\\192.168.1.2\Users\test\hello.exe /d /a", r"\\192.168.1.2\Users\test\hello.exe", ("/d", "/a")), # Test an empty command string (r"''", r"", ()), ], @@ -1207,8 +1207,13 @@ def test_command_equal() -> None: # Compare paths that contain spaces assert command("'/home/some folder/file' -h") == "'/home/some folder/file' -h" assert command("'c:\\Program files\\some.dll' -h -q") == "'c:\\Program files\\some.dll' -h -q" - assert command("'c:\\program files\\some.dll' -h -q") == ["c:\\program files\\some.dll", "-h -q"] - assert command("'c:\\Program files\\some.dll' -h -q") == ("c:\\Program files\\some.dll", "-h -q") + assert command("'c:\\program files\\some.dll' -h -q") == ["c:\\program files\\some.dll", "-h", "-q"] + assert command("'c:\\Program files\\some.dll' -h -q") == ("c:\\Program files\\some.dll", "-h", "-q") + + assert ( + command(r"'c:\Program Files\some.dll' --command 'hello world'") + == r"'c:\Program Files\some.dll' --command 'hello world'" + ) # Test failure conditions assert command("hello.so -h") != 1 @@ -1255,11 +1260,11 @@ def test_command_assign_windows() -> None: assert _command.raw == r"'c:\windows\path to file'" _command.args = ("command", "arguments", "for", "windows") - assert _command.args == ("command arguments for windows",) + assert _command.args == ("command", "arguments", "for", "windows") assert _command.raw == r"'c:\windows\path to file' command arguments for windows" _command.args = "command arguments for windows2" - assert _command.args == ("command arguments for windows2",) + assert _command.args == ("command", "arguments", "for", "windows2") def test_command_failed() -> None: