Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 81 additions & 72 deletions flow/record/fieldtypes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -752,41 +752,39 @@ def __repr__(self) -> str:


class command(FieldType):
executable: path | None = None
args: list[str] | None = None
"""The command fieldtype splits a command string into an ``executable`` and its arguments.

_path_type: type[path] = None
_posix: bool
Args:
value: the string that contains the command and arguments
path_type: When specified it forces the command to use a specific path type

def __new__(cls, value: str):
if cls is not command:
return super().__new__(cls)
Example:

if not isinstance(value, str):
raise TypeError(f"Expected a value of type 'str' not {type(value)}")
.. code-block:: text

'c:\\windows\\malware.exe /info' -> windows_path('c:\\windows\\malware.exe) ['/info']
'/usr/bin/env bash' -> posix_path('/usr/bin/env') ['bash']

# pre checking for windows like paths
# This checks for windows like starts of a path:
# an '%' for an environment variable
# r'\\' for a UNC path
# the strip and check for ":" on the second line is for `<drive_letter>:`
stripped_value = value.lstrip("\"'")
windows = value.startswith((r"\\", "%")) or (len(stripped_value) >= 2 and stripped_value[1] == ":")
# In this situation, the executable path needs to be quoted.
'c:\\user\\John Doe\\malware.exe /all /the /things' -> windows_path('c:\\user\\John')
['Doe\\malware.exe /all /the /things']
"""

cls = windows_command if windows else posix_command
return super().__new__(cls)
__executable: path
__args: tuple[str, ...]

def __init__(self, value: str | tuple[str, tuple[str]] | None):
if value is None:
return
__path_type: type[path]

if isinstance(value, str):
self.executable, self.args = self._split(value)
return
def __init__(self, value: str = "", *, path_type: type[path] | None = None):
if not isinstance(value, str):
raise TypeError(f"Expected a value of type 'str' not {type(value)}")

raw = value.strip()

executable, self.args = value
self.executable = self._path_type(executable)
self.args = list(self.args)
# Detect the kind of path from value if not specified
self.__path_type = path_type or type(path(raw.lstrip("\"'")))

self.executable, self.args = self._split(raw)

def __repr__(self) -> str:
return f"(executable={self.executable!r}, args={self.args})"
Expand All @@ -795,66 +793,77 @@ def __eq__(self, other: object) -> bool:
if isinstance(other, command):
return self.executable == other.executable and self.args == other.args
if isinstance(other, str):
return self._join() == other
return self.raw == other
if isinstance(other, (tuple, list)):
return self.executable == other[0] and self.args == list(other[1:])
return self.executable == other[0] and self.args == (*other[1:],)

return False

def _split(self, value: str) -> tuple[str, list[str]]:
executable, *args = shlex.split(value, posix=self._posix)
executable = executable.strip("'\" ")

return self._path_type(executable), args
def _split(self, value: str) -> tuple[str, tuple[str, ...]]:
if not value:
return "", ()

def _join(self) -> str:
return shlex.join([str(self.executable), *self.args])
executable, *args = shlex.split(value, posix=self.__path_type is posix_path)
return executable.strip("'\" "), (*args,)

def _pack(self) -> tuple[tuple[str, list], str]:
command_type = TYPE_WINDOWS if isinstance(self, windows_command) else TYPE_POSIX
if self.executable:
_exec, _ = self.executable._pack()
return ((_exec, self.args), command_type)
return (None, command_type)

@classmethod
def _unpack(cls, data: tuple[tuple[str, tuple] | None, int]) -> command:
_value, _type = data
if _type == TYPE_WINDOWS:
return windows_command(_value)

return posix_command(_value)
def _pack(self) -> tuple[str, int]:
path_type = TYPE_WINDOWS if self.__path_type is windows_path else TYPE_POSIX
return self.raw, path_type

@classmethod
def from_posix(cls, value: str) -> command:
return posix_command(value)
def _unpack(cls, data: tuple[str, int]) -> command:
raw_str, path_type = data
if path_type == TYPE_POSIX:
return command(raw_str, path_type=posix_path)
if path_type == TYPE_WINDOWS:
return command(raw_str, path_type=windows_path)
# default, infer type of path from str
return command(raw_str)

@classmethod
def from_windows(cls, value: str) -> command:
return windows_command(value)
@property
def executable(self) -> path:
return self.__executable

@property
def args(self) -> tuple[str, ...]:
return self.__args

class posix_command(command):
_posix = True
_path_type = posix_path
@executable.setter
def executable(self, val: str | path | None) -> None:
self.__executable = self.__path_type(val)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with the latest merged PR #200 we should be able to directly initialize path with the value to determine the correct path type. Then you can als get rid of __path_type.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean removing the whole __path_type in general? As in that case, there will be issues with for example hello_world.exe that will be interpreted differently depending on what system it gets run on.

If you just mean this specific case, this could still be an issue because the whole type can change from windows to posix path and vise versa. Tho I already have an idea on how to fix that specific issue.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah wait I thought __path_type was a function to determine the path type.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, no. It is a type[path], so either a windows_path or a posix_path :)


@args.setter
def args(self, val: str | tuple[str, ...] | list[str] | None) -> None:
if val is None:
self.__args = ()
return

class windows_command(command):
_posix = False
_path_type = windows_path
if isinstance(val, str):
self.__args = tuple(shlex.split(val, posix=self.__path_type is posix_path))
elif isinstance(val, list):
self.__args = tuple(val)
else:
self.__args = val

def _split(self, value: str) -> tuple[str, list[str]]:
executable, args = super()._split(value)
if args:
args = [" ".join(args)]
@property
def raw(self) -> str:
exe = str(self.executable)

return executable, args
if " " in exe:
exe = shlex.quote(exe)

def _join(self) -> str:
arg = f" {self.args[0]}" if self.args else ""
executable_str = str(self.executable)
result = [exe]
# Only quote on posix paths as shlex doesn't remove the quotes on non posix paths
if self.__path_type is posix_path:
result.extend(shlex.quote(part) if " " in part else part for part in self.args)
else:
result.extend(self.args)
return " ".join(result)

if " " in executable_str:
return f"'{executable_str}'{arg}"
@classmethod
def from_posix(cls, value: str) -> command:
return command(value, path_type=posix_path)

return f"{executable_str}{arg}"
@classmethod
def from_windows(cls, value: str) -> command:
return command(value, path_type=windows_path)
5 changes: 1 addition & 4 deletions flow/record/jsonpacker.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,7 @@ def pack_obj(self, obj: Any) -> dict | str:
if isinstance(obj, fieldtypes.path):
return str(obj)
if isinstance(obj, fieldtypes.command):
return {
"executable": obj.executable,
"args": obj.args,
}
return obj.raw

raise TypeError(f"Unpackable type {type(obj)}")

Expand Down
38 changes: 38 additions & 0 deletions tests/adapter/test_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import pytest

from flow.record import RecordReader, RecordWriter
from flow.record.base import RecordDescriptor
from tests._utils import generate_records

if TYPE_CHECKING:
Expand Down Expand Up @@ -117,3 +118,40 @@ def test_json_adapter_with_record_descriptors(tmp_path: Path, record_adapter_pat
elif record["_type"] == "record":
assert "_recorddescriptor" in record
assert descriptor_seen == 2


def test_json_command_fieldtype(tmp_path: Path) -> None:
json_file = tmp_path.joinpath("records.json")
record_adapter_path = f"jsonfile://{json_file}"
writer = RecordWriter(record_adapter_path)

TestRecord = RecordDescriptor(
"test/command",
[
("command", "commando"),
],
)

writer.write(
TestRecord(
commando="C:\\help.exe data",
)
)
writer.write(
TestRecord(
commando="/usr/bin/env bash",
)
)
writer.write(TestRecord())
writer.flush()

reader = RecordReader(record_adapter_path)
records = list(reader)

assert records[0].commando.executable == "C:\\help.exe"
assert records[0].commando.args == ("data",)

assert records[1].commando.executable == "/usr/bin/env"
assert records[1].commando.args == ("bash",)

assert len(records) == 3
4 changes: 2 additions & 2 deletions tests/adapter/test_xlsx.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ def test_sanitize_field_values(mock_openpyxl_package: MagicMock) -> None:
fieldtypes.net.ipaddress("13.37.13.37"),
["Shaken", "Not", "Stirred"],
fieldtypes.posix_path("/home/user"),
fieldtypes.posix_command("/bin/bash -c 'echo hello world'"),
fieldtypes.command.from_posix("/bin/bash -c 'echo hello world'"),
fieldtypes.windows_path("C:\\Users\\user\\Desktop"),
fieldtypes.windows_command("C:\\Some.exe /?"),
fieldtypes.command.from_windows("C:\\Some.exe /?"),
]
)
) == [
Expand Down
Loading