Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added examples/__init__.py
Empty file.
57 changes: 28 additions & 29 deletions examples/filesystem.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import os
import stat
from __future__ import annotations

from datetime import datetime
import stat
from pathlib import Path
from typing import TYPE_CHECKING

from flow.record import RecordDescriptor, RecordWriter

if TYPE_CHECKING:
from collections.abc import Iterator


descriptor = """
filesystem/unix/entry
string path;
Expand All @@ -22,34 +27,32 @@
FilesystemFile = RecordDescriptor(descriptor)


def hash_file(path, t):
f = open(path, "rb")
while 1:
d = f.read(4096)
if d == "":
break
f.close()
def hash_file(path: str | Path) -> None:
with Path(path).open("rb") as f:
while True:
d = f.read(4096)
if not d:
break


class FilesystemIterator:
basepath = None

def __init__(self, basepath):
def __init__(self, basepath: str | None):
self.basepath = basepath
self.recordType = FilesystemFile

def classify(self, source, classification):
def classify(self, source: str, classification: str) -> None:
self.recordType = FilesystemFile.base(_source=source, _classification=classification)

def iter(self, path):
path = os.path.abspath(path)
return self._iter(path)
def iter(self, path: str | Path) -> Iterator[FilesystemFile]:
return self._iter(Path(path).resolve())

def _iter(self, path):
if path.startswith("/proc"):
def _iter(self, path: Path) -> Iterator[FilesystemFile]:
if path.is_relative_to("/proc"):
return

st = os.lstat(path)
st = path.lstat()

abspath = path
if self.basepath and abspath.startswith(self.basepath):
Expand All @@ -59,7 +62,7 @@ def _iter(self, path):

link = None
if ifmt == stat.S_IFLNK:
link = os.readlink(path)
link = path.readlink()

yield self.recordType(
path=abspath,
Expand All @@ -69,20 +72,16 @@ def _iter(self, path):
size=st.st_size,
uid=st.st_uid,
gid=st.st_gid,
ctime=datetime.fromtimestamp(st.st_ctime),
mtime=datetime.fromtimestamp(st.st_mtime),
atime=datetime.fromtimestamp(st.st_atime),
ctime=st.st_ctime,
mtime=st.st_mtime,
atime=st.st_atime,
link=link,
)

if ifmt == stat.S_IFDIR:
for i in os.listdir(path):
if i in (".", ".."):
continue

fullpath = os.path.join(path, i)
for e in self.iter(fullpath):
yield e
for i in path.iterdir():
fullpath = path.joinpath(i)
yield from self.iter(fullpath)


chunk = []
Expand Down
21 changes: 12 additions & 9 deletions examples/passivedns.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
#!/usr/bin/env pypy
import record
from __future__ import annotations

import sys
import datetime
from datetime import datetime, timezone

import net.ipv4

import record
from fileprocessing import DirectoryProcessor

UTC_TIMEZONE = timezone.utc


def ts(s):
return datetime.datetime.fromtimestamp(float(s))
def ts(s: float) -> datetime:
return datetime.fromtimestamp(float(s), tz=UTC_TIMEZONE)


def ip(s):
def ip(s: str) -> net.ipv4.Address:
return net.ipv4.Address(s)


Expand All @@ -21,7 +24,7 @@ class SeparatedFile:
seperator = None
format = None

def __init__(self, fp, seperator, format):
def __init__(self, fp: list[str], seperator: str | None, format: list[tuple]):
self.fp = fp
self.seperator = seperator
self.format = format
Expand All @@ -46,7 +49,7 @@ def __iter__(self):
yield recordtype(**r)


def PassiveDnsFile(fp):
def PassiveDnsFile(fp: list[str]) -> SeparatedFile:
return SeparatedFile(fp, "||", PASSIVEDNS_FORMAT)


Expand All @@ -63,7 +66,7 @@ def PassiveDnsFile(fp):
]


def main():
def main() -> None:
rs = record.RecordOutput(sys.stdout)
for r in DirectoryProcessor(sys.argv[1], PassiveDnsFile, r"\.log\.gz"):
rs.write(r)
Expand Down
8 changes: 5 additions & 3 deletions examples/tcpconn.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import random
from datetime import datetime, timezone

from datetime import datetime
from flow import record

UTC_TIMEZONE = timezone.utc

descriptor = """
network/traffic/tcp/connection
datetime ts;
Expand Down Expand Up @@ -32,9 +34,9 @@

rs = record.RecordWriter()

for i in range(500):
for _ in range(500):
r = conn(
ts=datetime.now(),
ts=datetime.now(tz=UTC_TIMEZONE),
src=random.choice(ip_list),
srcport=random.choice(port_list),
dst=random.choice(ip_list),
Expand Down
6 changes: 2 additions & 4 deletions flow/record/adapter/elastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import threading
from typing import TYPE_CHECKING

import urllib3

try:
import elasticsearch
import elasticsearch.helpers
Expand Down Expand Up @@ -103,8 +105,6 @@ def __init__(

if not verify_certs:
# Disable InsecureRequestWarning of urllib3, caused by the verify_certs flag.
import urllib3

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

self.metadata_fields = {}
Expand Down Expand Up @@ -235,8 +235,6 @@ def __init__(

if not verify_certs:
# Disable InsecureRequestWarning of urllib3, caused by the verify_certs flag.
import urllib3

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

def __iter__(self) -> Iterator[Record]:
Expand Down
1 change: 1 addition & 0 deletions flow/record/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ def _asdict(self, fields: list[str] | None = None, exclude: list[str] | None = N
return OrderedDict((k, getattr(self, k)) for k in self.__slots__ if k not in exclude)

if TYPE_CHECKING:

def __getattr__(self, name: str) -> Any: ...

def __setattr__(self, k: str, v: Any) -> None:
Expand Down
3 changes: 1 addition & 2 deletions flow/record/tools/rdump.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/usr/bin/env python
from __future__ import annotations

import argparse
import logging
import sys
from importlib import import_module
Expand Down Expand Up @@ -69,8 +70,6 @@ def list_adapters() -> None:

@catch_sigpipe
def main(argv: list[str] | None = None) -> int:
import argparse

parser = argparse.ArgumentParser(
description="Record dumper, a tool that can read, write and filter records",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
Expand Down
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,14 @@ duckdb = [
splunk = [
"httpx",
]
xlsx = [
"openpyxl",
]
test = [
"flow.record[compression]",
"flow.record[avro]",
"flow.record[elastic]",
"flow.record[xlsx]",
"duckdb; platform_python_implementation != 'PyPy' and python_version < '3.12'", # duckdb
"pytz; platform_python_implementation != 'PyPy' and python_version < '3.12'", # duckdb
"tqdm",
Expand Down
2 changes: 1 addition & 1 deletion tests/standalone_test.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import traceback
from typing import Callable


Expand All @@ -14,6 +15,5 @@ def main(glob: dict[str, Callable[..., None]]) -> None:
print("PASSED")
except Exception:
print("FAILED")
import traceback

traceback.print_exc()
12 changes: 9 additions & 3 deletions tests/test_fieldtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,15 +376,21 @@ def test_uri_type() -> None:
assert r.path.protocol == "http"
assert r.path.hostname == "example.com"

with pytest.warns(DeprecationWarning):
with pytest.warns(
DeprecationWarning, match=r"Do not use class uri\(...\) for filesystem paths, use class path\(...\)"
):
r = TestRecord(uri.from_windows(r"c:\windows\program files\Fox-IT B.V\flow.exe"))
assert r.path.filename == "flow.exe"

r = TestRecord()
with pytest.warns(DeprecationWarning):
with pytest.warns(
DeprecationWarning, match=r"Do not use class uri\(...\) for filesystem paths, use class path\(...\)"
):
r.path = uri.normalize(r"c:\Users\Fox-IT\Downloads\autoruns.exe")
assert r.path.filename == "autoruns.exe"
with pytest.warns(DeprecationWarning):
with pytest.warns(
DeprecationWarning, match=r"Do not use class uri\(...\) for filesystem paths, use class path\(...\)"
):
assert r.path.dirname == uri.normalize(r"\Users\Fox-IT\Downloads")
assert r.path.dirname == "/Users/Fox-IT/Downloads"

Expand Down
12 changes: 7 additions & 5 deletions tests/test_packer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,17 @@ def test_uri_packing() -> None:
],
)

# construct with an url
# Construct with an url
record = TestRecord("http://www.google.com/evil.bin")
data = packer.pack(record)
record = packer.unpack(data)
assert record.path == "http://www.google.com/evil.bin"
assert record.path.filename == "evil.bin"
assert record.path.dirname == "/"

# construct from uri() -> for windows=True
with pytest.warns(DeprecationWarning):
with pytest.warns(
DeprecationWarning, match=r"Do not use class uri\(...\) for filesystem paths, use class path\(...\)"
):
path = uri.from_windows(r"c:\Program Files\Fox-IT\flow is awesome.exe")
record = TestRecord(path)
data = packer.pack(record)
Expand All @@ -40,8 +41,9 @@ def test_uri_packing() -> None:
assert record.path.filename == "flow is awesome.exe"
assert record.path.dirname == "/Program Files/Fox-IT"

# construct using uri.from_windows()
with pytest.warns(DeprecationWarning):
with pytest.warns(
DeprecationWarning, match=r"Do not use class uri\(...\) for filesystem paths, use class path\(...\)"
):
path = uri.from_windows(r"c:\Users\Hello World\foo.bar.exe")
record = TestRecord(path)
data = packer.pack(record)
Expand Down
3 changes: 1 addition & 2 deletions tests/test_rdump.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import flow.record.fieldtypes
from flow.record import RecordDescriptor, RecordReader, RecordWriter
from flow.record.adapter.line import field_types_for_record_descriptor
from flow.record.fieldtypes import flow_record_tz
from flow.record.tools import rdump

Expand Down Expand Up @@ -681,8 +682,6 @@ def test_rdump_line_verbose(tmp_path: Path, capsys: pytest.CaptureFixture, rdump
writer.write(TestRecord(counter=2))
writer.write(TestRecord(counter=3))

from flow.record.adapter.line import field_types_for_record_descriptor

field_types_for_record_descriptor.cache_clear()
assert field_types_for_record_descriptor.cache_info().currsize == 0
rdump.main([str(record_path), *rdump_params])
Expand Down
3 changes: 1 addition & 2 deletions tests/test_xlsx_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import pytest

from flow.record import fieldtypes
from flow.record.adapter.xlsx import sanitize_fieldvalues

if TYPE_CHECKING:
from collections.abc import Iterator
Expand All @@ -27,8 +28,6 @@ def mock_openpyxl_package(monkeypatch: pytest.MonkeyPatch) -> Iterator[MagicMock


def test_sanitize_field_values(mock_openpyxl_package: MagicMock) -> None:
from flow.record.adapter.xlsx import sanitize_fieldvalues

assert list(
sanitize_fieldvalues(
[
Expand Down