-
Notifications
You must be signed in to change notification settings - Fork 89
Add file descriptor and fdinfo parsing to Linux #1609
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -154,6 +154,48 @@ class Environ: | |||||
| variable: str | ||||||
| contents: str | ||||||
|
|
||||||
| class FileDescriptor: | ||||||
| def __init__(self, proc: Path, fd_num: str): | ||||||
| self.proc = proc | ||||||
| self.number = fd_num | ||||||
|
|
||||||
| @property | ||||||
| def path(self) -> Path: | ||||||
| """The path to the file descriptor symlink.""" | ||||||
| return self.proc.joinpath(self.number) | ||||||
|
|
||||||
| @property | ||||||
| def info_path(self) -> Path: | ||||||
| """The path to the fdinfo entry.""" | ||||||
| return self.proc.parent.joinpath("fdinfo", self.number) | ||||||
|
|
||||||
| @cached_property | ||||||
| def link(self) -> str: | ||||||
| """Returns the resolved symlink.""" | ||||||
| try: | ||||||
| return str(self.path.readlink()) | ||||||
| except Exception: | ||||||
| return "unknown" | ||||||
|
|
||||||
| @cached_property | ||||||
| def info(self)-> dict[str, str]: | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think putting this into a Also, please document (in the dataclass) what the specific
Suggested change
|
||||||
| """Parsed key-value pairs from fdinfo.""" | ||||||
| data = {} | ||||||
| if not self.info_path.exists(): | ||||||
| return data | ||||||
|
|
||||||
| for line in self.info_path.read_text().split("\n"): | ||||||
| line = line.strip() | ||||||
| if not line: | ||||||
| continue | ||||||
|
|
||||||
| parts = line.split(None, 1) | ||||||
| if len(parts) != 2: | ||||||
| continue | ||||||
| key, value = parts | ||||||
| data[key] = value | ||||||
|
|
||||||
| return data | ||||||
|
|
||||||
| class ProcessStateEnum(StrEnum): | ||||||
| R = "Running" # Running | ||||||
|
|
@@ -488,6 +530,15 @@ def _parse_environ(self) -> Iterator[Environ]: | |||||
|
|
||||||
| yield Environ(variable, contents) | ||||||
|
|
||||||
| def _parse_fd(self) -> Iterator[FileDescriptor]: | ||||||
| """Internal function to parse entries in ``/proc/[pid]/fd/[fd_num]`` and ``/proc/[pid]/fdinfo/[fd_num]``.""" | ||||||
| if not (fd_path := self.get("fd")).exists(): | ||||||
| return | ||||||
|
|
||||||
| for fd_link in fd_path.iterdir(): | ||||||
| yield FileDescriptor(fd_path, fd_link.name) | ||||||
|
|
||||||
|
|
||||||
| @property | ||||||
| def _boottime(self) -> int | None: | ||||||
| """Returns the boot time of the system. | ||||||
|
|
@@ -583,6 +634,10 @@ def environ(self) -> Iterator[Environ]: | |||||
| """Yields the content of the environ file associated with the process.""" | ||||||
| yield from self._parse_environ() | ||||||
|
|
||||||
| def fd(self) -> Iterator[FileDescriptor]: | ||||||
| """Yields the file descriptors associated with the process.""" | ||||||
| yield from self._parse_fd() | ||||||
|
|
||||||
| @property | ||||||
| def uptime(self) -> timedelta: | ||||||
| """Returns the uptime of the system from the moment it was acquired.""" | ||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,6 +30,11 @@ def test_process(target_linux_users: Target, fs_linux_proc: VirtualFilesystem) - | |
| assert environ[0].variable == "VAR" | ||
| assert environ[0].contents == "1" | ||
|
|
||
| fd = list(process.fd()) | ||
| assert "socket" in fd[0].link.name | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This not fails due to |
||
| assert fd[0].info["pos"] == "0" | ||
| assert fd[0].info["flags"] == "00000002" | ||
|
|
||
|
|
||
| def test_process_not_found(target_linux_users: Target, fs_linux_proc: VirtualFilesystem) -> None: | ||
| target_linux_users.add_plugin(ProcPlugin) | ||
|
|
@@ -52,6 +57,11 @@ def test_processes(target_linux_users: Target, fs_linux_proc: VirtualFilesystem) | |
| assert env.variable == "VAR" | ||
| assert env.contents == "1" | ||
|
|
||
| for fd in process.fd(): | ||
| assert "socket" in fd.link.name | ||
| assert fd.info["pos"] == "0" | ||
| assert fd.info["flags"] == "00000002" | ||
|
|
||
|
|
||
| def test_processes_without_boottime(target_linux_users: Target, fs_linux_proc: VirtualFilesystem) -> None: | ||
| target_linux_users.add_plugin(ProcPlugin) | ||
|
|
@@ -70,6 +80,11 @@ def test_processes_without_boottime(target_linux_users: Target, fs_linux_proc: V | |
| assert env.variable == "VAR" | ||
| assert env.contents == "1" | ||
|
|
||
| for fd in process.fd(): | ||
| assert "socket" in fd.link.name | ||
| assert fd.info["pos"] == "0" | ||
| assert fd.info["flags"] == "00000002" | ||
|
|
||
|
|
||
| def test_proc_plugin_incompatible(target_linux_users: Target, fs_linux: VirtualFilesystem) -> None: | ||
| with pytest.raises(UnsupportedPluginError, match="No /proc directory found"): | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is
unknowna value used in/proc/[pid]/fdsymlink targets? If not, better to keep it as close toprocfsas possible. Your test case also fails now because you cast this to a string.Additionaly, can you implement a test case for when the
Exceptionoccurs?