Skip to content
6 changes: 3 additions & 3 deletions flow/record/adapter/elastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ def __init__(
retry_on_timeout=True,
max_retries=self.max_retries,
)

self.json_packer = JsonRecordPacker()
self.json_packer = JsonRecordPacker(pack_descriptors=False)

self.thread = threading.Thread(target=self.streaming_bulk_thread)
self.thread.start()
Expand All @@ -122,7 +122,7 @@ def excepthook(self, exc: threading.ExceptHookArgs, *args, **kwargs) -> None:

def record_to_document(self, record: Record, index: str) -> dict:
"""Convert a record to a Elasticsearch compatible document dictionary"""
rdict = record._asdict()
rdict = self.json_packer.pack_obj(record)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If possible, could the reviewer comment on this change? We are unsure exactly what its implications are.


# Store record metadata under `_record_metadata`.
rdict_meta = {
Expand Down
4 changes: 4 additions & 0 deletions flow/record/jsonpacker.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ def pack_obj(self, obj: Any) -> dict | str:
if field_type == "boolean" and isinstance(serial[field_name], int):
serial[field_name] = bool(serial[field_name])

# Flatten command type
elif field_type == "command" and isinstance(serial[field_name], fieldtypes.command):
serial[field_name] = serial[field_name]._join()

return serial
if isinstance(obj, RecordDescriptor):
return {
Expand Down
43 changes: 37 additions & 6 deletions tests/test_elastic_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,50 @@
[
("string", "field_one"),
("string", "field_two"),
("string", "field_three"),
],
)

# Descriptor with non-string field types (command, boolean, bytes). It is used
# as a parametrized input to the Elasticsearch writer test in this file, so the
# test covers how such values end up in the JSON document.
# The descriptor name "my/record" is the name that test expects to find in the
# record metadata.
AnotherRecord = RecordDescriptor(
"my/record",
[
("command", "field_one"),
("boolean", "field_two"),
("bytes", "field_three"),
],
)


@pytest.mark.parametrize(
"record",
"record, expected_output",
[
MyRecord("first", "record"),
MyRecord("second", "record"),
(
MyRecord("first", "record", "!"),
{
"field_one": "first",
"field_two": "record",
"field_three": "!",
},
),
(
MyRecord("second", "record", "!"),
{
"field_one": "second",
"field_two": "record",
"field_three": "!",
},
),
(
AnotherRecord("/bin/bash -c 'echo hello'", False, b"\x01\x02\x03\x04"),
{
"field_one": "/bin/bash -c 'echo hello'",
"field_two": False,
"field_three": "AQIDBA==",
},
),
],
)
def test_elastic_writer_metadata(record: Record) -> None:
def test_elastic_writer_metadata(record: MyRecord | AnotherRecord, expected_output: dict) -> None:
options = {
"_meta_foo": "some value",
"_meta_bar": "another value",
Expand All @@ -40,8 +72,7 @@ def test_elastic_writer_metadata(record: Record) -> None:
"_index": "some-index",
"_source": json.dumps(
{
"field_one": record.field_one,
"field_two": record.field_two,
**expected_output,
"_record_metadata": {
"descriptor": {
"name": "my/record",
Expand Down
16 changes: 16 additions & 0 deletions tests/test_json_packer.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,19 @@ def test_record_pack_surrogateescape() -> None:

# pack the json string back to a record and make sure it is the same as before
assert packer.unpack(data) == record


def test_record_pack_command_type() -> None:
    """Check that ``command`` fields are packed to JSON as plain command-line strings."""
    descriptor = RecordDescriptor(
        "test/record_with_commands",
        [
            ("command", "win_command"),
            ("command", "nix_command"),
        ],
    )
    win_cmdline = "foo.exe /H /E /L /O"
    nix_cmdline = "/bin/bash -c 'echo hello'"

    packed = JsonRecordPacker().pack(descriptor(win_command=win_cmdline, nix_command=nix_cmdline))

    # Only the leading part of the JSON is pinned: both command fields must
    # appear first, as flat strings. Anything the packer emits after them is
    # deliberately not checked here.
    expected_prefix = '{"win_command": "foo.exe /H /E /L /O", "nix_command": "/bin/bash -c \'echo hello\'", '
    assert packed.startswith(expected_prefix)
Loading