
Conversation

Contributor

@mheffner mheffner commented Nov 26, 2025

This is a minimal implementation of the Fluent (forwarder) protocol used by Fluentd, Fluent Bit, Docker's Fluent driver, etc. It is essentially MessagePack logs delivered over streaming TCP or Unix sockets.

Log messages can be sent individually as single MessagePack structures. Instead of producing a single-log OTLP resource record per message, we introduce a small batching component in the fluent receiver that keeps reading additional messages and constructs a single resource record from them. While we also perform batching at the pipeline level, that happens after the processing step, so reducing the number of payloads here reduces processing calls. The batch size is hard coded at the moment, but we can make it configurable in the future.
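
As a rough illustration, here is a minimal sketch of the batching idea; the `Batch` type, field names, and the 1024 limit are hypothetical stand-ins, not the receiver's actual internals:

```rust
// Hypothetical stand-in for the receiver's batch: accumulate decoded Fluent
// records and flush them into one resource-level record once full, instead of
// emitting one OTLP resource record per incoming message.
const MAX_BATCH_RECORDS: usize = 1024; // assumed limit; the real one is hard coded elsewhere

struct FluentRecord {
    tag: String,
    timestamp: u64,
    message: String,
}

#[derive(Default)]
struct Batch {
    records: Vec<FluentRecord>,
}

impl Batch {
    fn push(&mut self, record: FluentRecord) {
        self.records.push(record);
    }

    fn is_full(&self) -> bool {
        self.records.len() >= MAX_BATCH_RECORDS
    }

    /// Drain the accumulated records, leaving the batch empty for reuse.
    fn take(&mut self) -> Vec<FluentRecord> {
        std::mem::take(&mut self.records)
    }
}

fn main() {
    let mut batch = Batch::default();
    batch.push(FluentRecord {
        tag: "docker.web".to_string(),
        timestamp: 0,
        message: "hello".to_string(),
    });
    // In the receiver, a full (or flushed) batch becomes a single resource record.
    if batch.is_full() {
        let _records = batch.take();
    }
}
```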

Since this is early and experimental, I've feature-flagged it initially. Depending on the stability, binary size increase, and usefulness, we can move this to a default feature later.

Performance: Tested locally and I see about 125k logs/sec/core with Docker's fluent log driver.

Remaining pieces to follow on:

  • compressed forwarded message support (gzip)
  • delivery acknowledgement (requires sender to opt-in)

@mheffner mheffner marked this pull request as ready for review December 1, 2025 23:57
@mheffner mheffner requested a review from rjenkins December 1, 2025 23:57
let curr_batch = batch.take();

pending_encode = Some(tokio::task::spawn_blocking(move || {
    Some(convert_to_otlp_logs(curr_batch))
Contributor

Do we need to consider partitioning these batches into separate ResourceLogs and ScopeLogs where applicable, particularly for the TCP connection endpoint? For example, if there is a tag such as "service.name", do we want to hoist that into a Resource attribute and partition accordingly?
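
For illustration only, a sketch of that partitioning idea under assumed types (nothing here matches the PR's actual structures): group records by a hoisted key such as `service.name`, with each group becoming its own ResourceLogs.

```rust
use std::collections::HashMap;

// Hypothetical record type; the real receiver works with decoded MessagePack values.
struct Record {
    attributes: HashMap<String, String>,
}

// Partition records by a hoisted "service.name" tag so each group could be
// emitted as its own ResourceLogs with that value as a Resource attribute.
fn partition_by_service(records: Vec<Record>) -> HashMap<Option<String>, Vec<Record>> {
    let mut groups: HashMap<Option<String>, Vec<Record>> = HashMap::new();
    for record in records {
        let key = record.attributes.get("service.name").cloned();
        groups.entry(key).or_default().push(record);
    }
    groups
}
```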

Contributor Author

Great question. At the moment this mostly follows the OTel Collector implementation, which serializes all tags into the log record attributes. In Fluent Bit you can assign tags to messages, but I think that's done at the input/output level, so I don't know how often you'd see a tag like service.name differ across messages received on a single socket connection.

Contributor

@rjenkins rjenkins Dec 11, 2025

OK, so it sounds like we're not messing up the OTel Resource/Scope hierarchy, specifically because we're not sticking anything in them anyway 🙃. Hoisting feels like something we could look at later as a nice-to-have, though with hoisting we have to keep partitioning in mind, so a bit of a pita. We'll see if anyone asks for it.

let mut batch = Batch::new();

// Track single pending encoding task
let mut pending_encode: Option<tokio::task::JoinHandle<Option<ResourceLogs>>> = None;
Contributor

Is there a good reason to use a single encoding task vs. an OrderedFutures similar to how we do this in the exporters?

Contributor Author

This receiver loop operates per connection, so if we ended up with 10 concurrent connections we'd have up to 10 concurrent encoding tasks. There were two alternatives we could have gone with, but neither really felt great to me:

  1. A shared FuturesOrdered across connections (with some locking). Could this lead to starvation and unfair consumption across connections?
  2. A FuturesOrdered per connection. This could lead to a large number of encoding tasks if there were many connections, so we would likely want some global cap on pending futures across all connections to limit memory (sketched below).
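
A rough sketch of option 2, assuming the `futures` and `tokio` crates; `encode` is a hypothetical stand-in for the blocking conversion, and none of the names or limits come from the PR:

```rust
use std::sync::Arc;

use futures::stream::{FuturesOrdered, StreamExt};
use tokio::sync::Semaphore;

// Per-connection FuturesOrdered, with a semaphore shared across all connections
// bounding the total number of in-flight encode tasks.
async fn connection_loop(global_slots: Arc<Semaphore>, batches: Vec<Vec<u8>>) {
    let mut pending = FuturesOrdered::new();

    for batch in batches {
        // Block here if too many encode tasks are already running anywhere.
        let permit = global_slots
            .clone()
            .acquire_owned()
            .await
            .expect("semaphore closed");
        pending.push_back(tokio::task::spawn_blocking(move || {
            let encoded = encode(batch);
            drop(permit); // release the global slot when the work finishes
            encoded
        }));
    }

    // Results come back in submission order, preserving log ordering per connection.
    while let Some(result) = pending.next().await {
        let _encoded = result.expect("encode task panicked");
    }
}

// Stand-in for convert_to_otlp_logs; just returns the batch size here.
fn encode(batch: Vec<u8>) -> usize {
    batch.len()
}
```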

Contributor

Hmm, OK, so it's essentially unbounded, or bounded only by the number of connections. I think it's fine in the sense that we're not artificially constricting throughput, but is there a likelihood this could eat up a lot of heap? IIRC the default max threads for spawn_blocking is 512, so in terms of stack space it shouldn't be too much, but if there are 512 decoders running simultaneously I wonder if they'd eat up a lot more memory. When you ran your 125k logs/sec/core test, how many connections did you use?
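
For reference, the 512 figure is tokio's default cap on the blocking pool that `spawn_blocking` draws from, and it can be tuned when the runtime is built; a small illustration (the numbers here are arbitrary, not what this receiver configures):

```rust
use tokio::runtime::Builder;

fn main() {
    // max_blocking_threads caps how many spawn_blocking tasks can run at once;
    // tokio's default is 512. Values below are examples only.
    let runtime = Builder::new_multi_thread()
        .worker_threads(4)
        .max_blocking_threads(128)
        .enable_all()
        .build()
        .expect("failed to build runtime");

    runtime.block_on(async {
        let handle = tokio::task::spawn_blocking(|| {
            // CPU-heavy work such as the MessagePack -> OTLP conversion would go here.
            2 + 2
        });
        let _ = handle.await;
    });
}
```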

Contributor Author

Yeah, it looks like all of our HTTP-based receivers could have an unbounded memory allocation explosion. Looking at the OTel Collector, they lean on the MaxConnsPerHost config from the Go Transport to limit concurrent connections. That's a somewhat roundabout way to handle it; we could go the same way, or ideally add a "receiver max memory" control that handles the limit in a smarter fashion.
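
One possible shape for that kind of limit, purely as a sketch (the cap, the semaphore approach, and `handle_connection` are assumptions, not anything in this PR): bound accepted connections at the accept loop, so the number of per-connection encode tasks is bounded too.

```rust
use std::sync::Arc;

use tokio::net::{TcpListener, TcpStream};
use tokio::sync::Semaphore;

async fn accept_loop(listener: TcpListener, max_conns: usize) -> std::io::Result<()> {
    let slots = Arc::new(Semaphore::new(max_conns));
    loop {
        // Wait for a free slot before accepting another connection.
        let permit = slots.clone().acquire_owned().await.expect("semaphore closed");
        let (socket, _addr) = listener.accept().await?;
        tokio::spawn(async move {
            handle_connection(socket).await;
            drop(permit); // slot frees up when the connection loop exits
        });
    }
}

// Placeholder for the real per-connection receive loop.
async fn handle_connection(_socket: TcpStream) {}
```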

// Initiate async send without awaiting
if let Some(logs_output) = &self.logs_output {
    let payload_msg = payload::Message::new(None, vec![resource_logs]);
    pending_send = Some(logs_output.send_async(payload_msg));
Contributor

This is kind of cool because you essentially hoist this select logic up to the main select!, so you don't have to await here and can also select on cancellation. However, I find managing the state (setting and clearing pending_send and pending_encode) quite complex and potentially error-prone on refactor. Hopefully we can find patterns to encapsulate this in the future.
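
A stripped-down sketch of that pattern, with assumed channel types standing in for the receiver's own (`frames`, `output`, and the boxed future are illustrative, not the PR's code): the in-flight work lives in an `Option` slot that the single `select!` loop polls alongside new input and shutdown.

```rust
use std::future::Future;
use std::pin::Pin;

use tokio::sync::{mpsc, watch};

type PendingSend = Pin<Box<dyn Future<Output = ()> + Send>>;

async fn connection_loop(
    mut frames: mpsc::Receiver<Vec<u8>>,
    output: mpsc::Sender<Vec<u8>>,
    mut shutdown: watch::Receiver<bool>,
) {
    let mut pending_send: Option<PendingSend> = None;

    loop {
        tokio::select! {
            // Cancellation is selected on together with the in-flight work.
            _ = shutdown.changed() => break,

            // Drive the outstanding send to completion, then clear the slot.
            _ = async { pending_send.as_mut().unwrap().await }, if pending_send.is_some() => {
                pending_send = None;
            }

            // Only pull a new frame when nothing is in flight.
            maybe_frame = frames.recv(), if pending_send.is_none() => {
                match maybe_frame {
                    Some(frame) => {
                        let output = output.clone();
                        pending_send = Some(Box::pin(async move {
                            let _ = output.send(frame).await;
                        }));
                    }
                    None => break, // input side closed
                }
            }
        }
    }
}
```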

Contributor Author

Yeah, it would be nice to layer this into some abstractions so it's less messy and we can handle it similarly across the board.


        Some(any_value::Value::BytesValue(s.as_bytes().to_vec()))
    }
}
Value::Binary(b) => Some(any_value::Value::BytesValue(b.clone())),
Contributor

Unnecessary clone?

Value::Ext(_tag, data) => {
    // Extension types are converted to bytes with a special attribute for the tag
    // For now, just convert to bytes
    Some(any_value::Value::BytesValue(data.clone()))
Contributor

Unnecessary clone?

Contributor Author

I pulled on this a bit more by switching the whole conversion from references to ownership. There's no need to hold references since we fully convert here anyway. I pushed a couple of commits that refactor to this and allow removing the clones.
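
Roughly the shape of that change, with simplified stand-in types rather than the actual MessagePack/OTLP ones: taking the decoded value by ownership lets the byte buffers move into the output instead of being cloned.

```rust
// Simplified stand-ins for the decoded MessagePack value and the OTLP AnyValue.
enum Value {
    Binary(Vec<u8>),
    Ext(i8, Vec<u8>),
}

enum AnyValue {
    Bytes(Vec<u8>),
}

// Converting by value: the buffers move straight through, so no .clone() is
// needed. A by-reference signature (fn convert(value: &Value)) would force the
// clones that the comments above point out.
fn convert(value: Value) -> Option<AnyValue> {
    match value {
        Value::Binary(bytes) => Some(AnyValue::Bytes(bytes)),
        // Extension payloads are still flattened to bytes; the tag is dropped for now.
        Value::Ext(_tag, data) => Some(AnyValue::Bytes(data)),
    }
}
```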

Contributor

@rjenkins rjenkins left a comment

Left a couple small fixes/suggestions. Only other concern would be potentially unbounded memory growth for encoders but otherwise looking good.

@mheffner mheffner merged commit ec30d9f into main Dec 16, 2025
4 checks passed