Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions flow/record/adapter/csvfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,20 @@
from flow.record.adapter import AbstractReader, AbstractWriter
from flow.record.base import Record, normalize_fieldname
from flow.record.selector import make_selector
from flow.record.utils import is_stdout
from flow.record.utils import boolean_argument, is_stdout

if TYPE_CHECKING:
from collections.abc import Iterator

__usage__ = """
Comma-separated values (CSV) adapter
---
Write usage: rdump -w csvfile://[PATH]?lineterminator=[TERMINATOR]
Write usage: rdump -w csvfile://[PATH]?lineterminator=[TERMINATOR]&header=[HEADER]
Read usage: rdump csvfile://[PATH]?fields=[FIELDS]
[PATH]: path to file. Leave empty or "-" to output to stdout

Optional parameters:
[HEADER]: if set to false, it will not print the CSV header (default: true)
[TERMINATOR]: line terminator, default is \\r\\n
[FIELDS]: comma-separated list of CSV fields (in case of missing CSV header)
"""
Expand All @@ -34,6 +35,7 @@ def __init__(
fields: str | list[str] | None = None,
exclude: str | list[str] | None = None,
lineterminator: str = "\r\n",
header: str = "true",
**kwargs,
):
self.fp = None
Expand All @@ -52,13 +54,16 @@ def __init__(
self.fields = self.fields.split(",")
if isinstance(self.exclude, str):
self.exclude = self.exclude.split(",")
self.header = boolean_argument(header)

def write(self, r: Record) -> None:
rdict = r._asdict(fields=self.fields, exclude=self.exclude)
if not self.desc or self.desc != r._desc:
self.desc = r._desc
self.writer = csv.DictWriter(self.fp, rdict, lineterminator=self.lineterminator)
self.writer.writeheader()
if self.header:
# Write header only if it is requested
self.writer.writeheader()
self.writer.writerow(rdict)

def flush(self) -> None:
Expand Down
11 changes: 6 additions & 5 deletions flow/record/adapter/elastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from flow.record.base import Record, RecordDescriptor
from flow.record.fieldtypes import fieldtype_for_value
from flow.record.jsonpacker import JsonRecordPacker
from flow.record.utils import boolean_argument

if TYPE_CHECKING:
from collections.abc import Iterator
Expand Down Expand Up @@ -72,9 +73,9 @@

self.index = index
self.uri = uri
verify_certs = str(verify_certs).lower() in ("1", "true")
http_compress = str(http_compress).lower() in ("1", "true")
self.hash_record = str(hash_record).lower() in ("1", "true")
verify_certs = boolean_argument(verify_certs)
http_compress = boolean_argument(http_compress)
self.hash_record = boolean_argument(hash_record)
queue_size = int(queue_size)

if not uri.lower().startswith(("http://", "https://")):
Expand Down Expand Up @@ -216,8 +217,8 @@
self.index = index
self.uri = uri
self.selector = selector
verify_certs = str(verify_certs).lower() in ("1", "true")
http_compress = str(http_compress).lower() in ("1", "true")
verify_certs = boolean_argument(verify_certs)
http_compress = boolean_argument(http_compress)

Check warning on line 221 in flow/record/adapter/elastic.py

View check run for this annotation

Codecov / codecov/patch

flow/record/adapter/elastic.py#L220-L221

Added lines #L220 - L221 were not covered by tests

if not uri.lower().startswith(("http://", "https://")):
uri = "http://" + uri
Expand Down
4 changes: 2 additions & 2 deletions flow/record/adapter/jsonfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from flow.record.adapter import AbstractReader, AbstractWriter
from flow.record.fieldtypes import fieldtype_for_value
from flow.record.selector import make_selector
from flow.record.utils import is_stdout
from flow.record.utils import boolean_argument, is_stdout

if TYPE_CHECKING:
from collections.abc import Iterator
Expand All @@ -33,7 +33,7 @@ class JsonfileWriter(AbstractWriter):
def __init__(
self, path: str | Path | BinaryIO, indent: str | int | None = None, descriptors: bool = True, **kwargs
):
self.descriptors = str(descriptors).lower() in ("true", "1")
self.descriptors = boolean_argument(descriptors)
self.fp = record.open_path_or_stream(path, "w")
if isinstance(indent, str):
indent = int(indent)
Expand Down
4 changes: 2 additions & 2 deletions flow/record/adapter/splunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

from flow.record.adapter import AbstractReader, AbstractWriter
from flow.record.jsonpacker import JsonRecordPacker
from flow.record.utils import to_base64, to_bytes, to_str
from flow.record.utils import boolean_argument, to_base64, to_bytes, to_str

if TYPE_CHECKING:
from flow.record.base import Record
Expand Down Expand Up @@ -218,7 +218,7 @@ def __init__(
self.token = f"Splunk {self.token}"

# Assume verify=True unless specified otherwise.
self.verify = str(ssl_verify).lower() not in ("0", "false")
self.verify = boolean_argument(ssl_verify)
if not self.verify:
log.warning("Certificate verification is disabled")

Expand Down
20 changes: 17 additions & 3 deletions flow/record/tools/rdump.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,11 @@ def main(argv: list[str] | None = None) -> int:
output.add_argument("--skip", metavar="COUNT", type=int, default=0, help="Skip the first COUNT records")
output.add_argument("-w", "--writer", metavar="OUTPUT", default=None, help="Write records to output")
output.add_argument(
"-m", "--mode", default=None, choices=("csv", "json", "jsonlines", "line", "line-verbose"), help="Output mode"
"-m",
"--mode",
default=None,
choices=("csv", "csv-no-header", "json", "jsonlines", "line", "line-verbose"),
help="Output mode",
)
output.add_argument(
"--split", metavar="COUNT", default=None, type=int, help="Write record files smaller than COUNT records"
Expand Down Expand Up @@ -180,6 +184,15 @@ def main(argv: list[str] | None = None) -> int:
default=argparse.SUPPRESS,
help="Short for --mode=line-verbose",
)
aliases.add_argument(
"-Cn",
"--csv-no-header",
action="store_const",
const="csv-no-header",
dest="mode",
default=argparse.SUPPRESS,
help="Short for --mode=csv-no-header",
)

args = parser.parse_args(argv)

Expand All @@ -198,6 +211,7 @@ def main(argv: list[str] | None = None) -> int:
if not args.writer:
mode_to_uri = {
"csv": "csvfile://",
"csv-no-header": "csvfile://?header=false",
"json": "jsonfile://?indent=2&descriptors=false",
"jsonlines": "jsonfile://?descriptors=false",
"line": "line://",
Expand All @@ -210,7 +224,7 @@ def main(argv: list[str] | None = None) -> int:
"format_spec": args.format,
}
query = urlencode({k: v for k, v in qparams.items() if v})
uri += "&" if urlparse(uri).query else "?" + query
uri += f"&{query}" if urlparse(uri).query else f"?{query}"

if args.split:
if not args.writer:
Expand All @@ -221,7 +235,7 @@ def main(argv: list[str] | None = None) -> int:
query_dict = dict(parse_qsl(parsed.query))
query_dict.update({"count": args.split, "suffix-length": args.suffix_length})
query = urlencode(query_dict)
uri = parsed.scheme + "://" + parsed.netloc + parsed.path + "?" + query
uri = f"{parsed.scheme}://{parsed.netloc}{parsed.path}?{query}"

record_field_rewriter = None
if fields or fields_to_exclude or args.exec_expression:
Expand Down
29 changes: 29 additions & 0 deletions flow/record/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,32 @@ def remove_handler(self, callback: Callable[..., None]) -> None:
def __call__(self, *args, **kwargs) -> None:
for h in self.handlers:
h(*args, **kwargs)


def boolean_argument(value: str | bool | int) -> bool:
"""Convert a string, boolean, or integer to a boolean value.

This function interprets various string representations of boolean values,
such as "true", "false", "1", "0", "yes", "no".
It also accepts boolean and integer values directly.

Arguments:
value: The value to convert. Can be a string, boolean, or integer.

Returns:
bool: The converted boolean value.

Raises:
ValueError: If the value cannot be interpreted as a boolean.
"""
if isinstance(value, bool):
return value
if isinstance(value, int):
return bool(value)
if isinstance(value, str):
value = value.lower()
if value in ("true", "1", "y", "yes", "on"):
return True
if value in ("false", "0", "n", "no", "off"):
return False
raise ValueError(f"Invalid boolean argument: {value}")
2 changes: 1 addition & 1 deletion tests/test_record_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ def test_record_adapter_archive(tmp_path: Path) -> None:

# defaults to always archive by /YEAR/MONTH/DAY/ dir structure
outdir = tmp_path.joinpath(f"{dt:%Y/%m/%d}")
assert len(list(outdir.iterdir()))
assert list(outdir.iterdir())

# read the archived records and test filename and counts
count2 = 0
Expand Down
20 changes: 20 additions & 0 deletions tests/test_regression.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import sys
from datetime import datetime, timezone
from io import BytesIO
from pathlib import Path
from typing import Callable
from unittest.mock import MagicMock, patch

Expand Down Expand Up @@ -691,5 +692,24 @@ def test_record_writer_default_stdout(capsysbinary: pytest.CaptureFixture) -> No
assert stdout.startswith(b"\x00\x00\x00\x0f\xc4\rRECORDSTREAM\n")


def test_rdump_selected_fields(capsysbinary: pytest.CaptureFixture) -> None:
    """Test rdump regression where selected fields was not propagated properly to adapter.

    Dumps the example pastebin record as CSV twice: once with the default
    header and once with ``--csv-no-header``, asserting the exact bytes on
    stdout in both cases.
    """

    # Pastebin record used for this test
    example_records_json_path = Path(__file__).parent.parent / "examples" / "records.json"

    # rdump --fields key,title,syntax --csv
    rdump.main([str(example_records_json_path), "--fields", "key,title,syntax", "--csv"])
    captured = capsysbinary.readouterr()
    assert captured.err == b""
    assert captured.out == b"key,title,syntax\r\nQ42eWSaF,A sample pastebin record,text\r\n"

    # rdump --fields key,title,syntax --csv-no-header
    rdump.main([str(example_records_json_path), "--fields", "key,title,syntax", "--csv-no-header"])
    captured = capsysbinary.readouterr()
    assert captured.err == b""
    assert captured.out == b"Q42eWSaF,A sample pastebin record,text\r\n"


if __name__ == "__main__":
__import__("standalone_test").main(globals())
25 changes: 25 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import pytest

from flow.record.utils import boolean_argument


def test_boolean_argument() -> None:
    """``boolean_argument`` maps all documented spellings to booleans."""
    truthy_inputs = ["True", "true", "trUe", "1", "yes", "y", "on", True, 1]
    falsy_inputs = ["False", "false", "0", "no", "n", "off", False, 0]

    for given in truthy_inputs:
        assert boolean_argument(given) is True
    for given in falsy_inputs:
        assert boolean_argument(given) is False

    # Anything outside the recognised spellings must raise.
    with pytest.raises(ValueError, match="Invalid boolean argument: .*"):
        boolean_argument("maybe")