Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
4f9c948
WIP
mheffner Nov 21, 2025
2c77e47
framing
mheffner Nov 24, 2025
f0c651a
Move task spawning into fluent receiver
mheffner Nov 24, 2025
6715b49
Push down cancellation token
mheffner Nov 24, 2025
00b64c8
Some renames
mheffner Nov 24, 2025
838b934
Clean up initialization to just logs
mheffner Nov 24, 2025
0e58205
WIP otlp convert
mheffner Nov 25, 2025
ec29571
Cleanups on conversion to OTLP
mheffner Nov 25, 2025
b31a54a
Send async
mheffner Nov 26, 2025
45af058
Fix for single encoding future per conection
mheffner Nov 26, 2025
84f243b
Fix arg parsing, remove printout
mheffner Nov 26, 2025
8210bf3
Remove drop
mheffner Nov 26, 2025
027634c
Remove logging
mheffner Nov 26, 2025
1219c82
Move to feature flag and fix spawning
mheffner Nov 26, 2025
f07ca3f
Fix config
mheffner Nov 27, 2025
60cac37
Switch to direct serde, fix timeout
mheffner Nov 28, 2025
11b56c9
Cleanups and performance
mheffner Nov 28, 2025
ec0b929
Fix tests
mheffner Nov 28, 2025
93eb678
Drain batch on exit
mheffner Nov 28, 2025
d0c3c64
Fix batching for Forward messages
mheffner Nov 30, 2025
128f034
Rename to socket_path to better match
mheffner Dec 1, 2025
ceed7b5
Update readme
mheffner Dec 1, 2025
7a1eefa
Comments
mheffner Dec 1, 2025
3f8a9b8
Send full batch immediately after encoding
mheffner Dec 1, 2025
c001c5c
Lost biasing in refactor
mheffner Dec 1, 2025
5672210
Add counters
mheffner Dec 2, 2025
83aed54
Fix conditional_wait
mheffner Dec 2, 2025
656ad5c
Merge from main.
mheffner Dec 14, 2025
7f75944
Update src/receivers/fluent/receiver.rs
mheffner Dec 14, 2025
3e34fb0
Switch to ownership on record convert
mheffner Dec 14, 2025
1218d49
Remove additional clones
mheffner Dec 14, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pin-project = "1.1.10"
futures-util = "0.3.31"
rotel_python_processor_sdk = { path = "rotel_python_processor_sdk", optional = true }
pyo3 = { version = "0.24.1", optional = true }
chrono = "0.4.40"
chrono = { version = "0.4.40", features = [ "serde" ] }
serde = { version = "1.0.217", features = ["derive"] }
thiserror = "2.0.12"
lz4_flex = { version = "0.11.3", default-features = false, features = ["std"] }
Expand All @@ -84,6 +84,8 @@ parquet = { version = "55.1.0", optional = true }
arrow = { version = "55.1.0", optional = true }
base64 = { version = "0.22.1", optional = true }
wildcard = "0.3.0"
rmpv = { version = "1.3.0", features = ["with-serde"], optional = true }
rmp-serde = { version = "1.3.0", optional = true }
aws-config = { version = "1.8.6", optional = true }
aws-credential-types = { version = "1.2.6", optional = true }
# Use a custom branch that backdates to otel sdk 0.30.0 (we are not compatible with 0.31.0 yet). Also sort data points + attributes for visual stability
Expand Down Expand Up @@ -113,6 +115,7 @@ prost-build = "0.13.4"

[features]
default = ["rdkafka", "aws_iam"]
fluent_receiver = ["dep:rmpv", "dep:rmp-serde"]
pprof = ["dep:pprof"]
pyo3 = ["dep:pyo3", "dep:rotel_python_processor_sdk"]
rdkafka = ["dep:rdkafka"]
Expand Down
41 changes: 41 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,47 @@ rotel start \
--clickhouse-exporter-endpoint "https://clickhouse.example.com:8443"
```

### Fluent Receiver configuration

_The Fluent Receiver is currently only included when built with the opt-in feature `--features fluent_receiver`._

The Fluent Receiver allows Rotel to receive telemetry data in Fluentd/Fluent Bit [forward protocol format](https://chronosphere.io/learn/forward-protocol-fluentd-fluent-bit/). This enables compatibility with existing Fluentd and Fluent Bit deployments, allowing them to send logs directly to Rotel for processing and export. Select the Fluent receiver with the option `--receiver fluent`.

The receiver supports both UNIX domain sockets and TCP endpoints, converting incoming Fluent messages to OpenTelemetry logs format.

| Option | Default | Description |
|-------------------------------------|---------|---------------------------------------------------------|
| --fluent-receiver-socket-path | | Path to UNIX socket file for receiving Fluent messages |
| --fluent-receiver-endpoint | | TCP endpoint to bind (e.g., 127.0.0.1:24224) |

**Note**: At least one of `--fluent-receiver-socket-path` or `--fluent-receiver-endpoint` must be specified when using the Fluent
receiver.

**Example Usage**:

```bash
# Using UNIX socket
rotel start \
--receiver fluent \
--fluent-receiver-socket-path /var/run/rotel-fluent.sock \
[...exporter args]

# Using TCP endpoint
rotel start \
--receiver fluent \
--fluent-receiver-endpoint 127.0.0.1:24224 \
[...exporter args]

# Using both socket and TCP
rotel start \
--receiver fluent \
--fluent-receiver-socket-path /var/run/rotel-fluent.sock \
--fluent-receiver-endpoint 0.0.0.0:24224 \
[...exporter args]
```

_Compression and message acknowledgement are not supported at the moment._

### Batch configuration

You can configure the properties of the batch processor, controlling both the size limit of the batch and how long the
Expand Down
7 changes: 7 additions & 0 deletions src/init/activation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ fn all_traces_receivers_disabled(rc: &HashMap<Receiver, ReceiverConfig>) -> bool
return false;
}
}
#[cfg(feature = "fluent_receiver")]
ReceiverConfig::Fluent(_) => {}
}
}
true
Expand All @@ -84,6 +86,9 @@ fn all_metrics_receivers_disabled(rc: &HashMap<Receiver, ReceiverConfig>) -> boo
return false;
}
}

#[cfg(feature = "fluent_receiver")]
ReceiverConfig::Fluent(_) => {}
}
}
true
Expand All @@ -103,6 +108,8 @@ fn all_logs_receivers_disabled(rc: &HashMap<Receiver, ReceiverConfig>) -> bool {
return false;
}
}
#[cfg(feature = "fluent_receiver")]
ReceiverConfig::Fluent(_) => return false,
}
}
true
Expand Down
32 changes: 32 additions & 0 deletions src/init/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use crate::init::datadog_exporter::DatadogRegion;
use crate::init::pprof;
use crate::init::wait;
use crate::listener::Listener;
#[cfg(feature = "fluent_receiver")]
use crate::receivers::fluent::receiver::FluentReceiver;
#[cfg(feature = "rdkafka")]
use crate::receivers::kafka::offset_ack_committer::KafkaOffsetCommitter;
#[cfg(feature = "rdkafka")]
Expand Down Expand Up @@ -908,6 +910,36 @@ impl Agent {
let receivers_cancel = receivers_cancel.clone();
receivers_task_set.spawn(async move { kafka.run(receivers_cancel).await });
}
#[cfg(feature = "fluent_receiver")]
ReceiverConfig::Fluent(config) => {
let fluent = FluentReceiver::new(config.clone(), logs_output.clone()).await?;

let mut fluent_task_set = JoinSet::new();
// Fluent receiver may spawn multiple listener tasks
fluent
.start(&mut fluent_task_set, &receivers_cancel)
.await?;

let receivers_cancel = receivers_cancel.clone();
receivers_task_set.spawn(async move {
loop {
select! {
e = wait::wait_for_any_task(&mut fluent_task_set) => {
match e {
Ok(()) => {
info!("Unexpected early exit of fluent receiver task.");
},
Err(e) => break Err(e),
}
},
_ = receivers_cancel.cancelled() => {
// Wait up to 500 millis for fluent tasks to finish
break wait::wait_for_tasks_with_timeout(&mut fluent_task_set, Duration::from_millis(500)).await;
}
}
}
});
}
}
}

Expand Down
12 changes: 12 additions & 0 deletions src/init/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use crate::init::clickhouse_exporter::ClickhouseExporterArgs;
use crate::init::datadog_exporter::DatadogExporterArgs;
#[cfg(feature = "file_exporter")]
use crate::init::file_exporter::FileExporterArgs;
#[cfg(feature = "fluent_receiver")]
use crate::init::fluent_receiver::FluentReceiverArgs;
#[cfg(feature = "rdkafka")]
use crate::init::kafka_exporter::KafkaExporterArgs;
#[cfg(feature = "rdkafka")]
Expand Down Expand Up @@ -60,6 +62,10 @@ pub struct AgentRun {
#[cfg(feature = "rdkafka")]
pub kafka_receiver: KafkaReceiverArgs,

#[command(flatten)]
#[cfg(feature = "fluent_receiver")]
pub fluent_receiver: FluentReceiverArgs,

/// Single receiver (type)
#[arg(value_enum, long, env = "ROTEL_RECEIVER")]
pub receiver: Option<Receiver>,
Expand Down Expand Up @@ -153,6 +159,8 @@ impl Default for AgentRun {
otlp_receiver: OTLPReceiverArgs::default(),
#[cfg(feature = "rdkafka")]
kafka_receiver: KafkaReceiverArgs::default(),
#[cfg(feature = "fluent_receiver")]
fluent_receiver: FluentReceiverArgs::default(),
otlp_with_trace_processor: Vec::new(),
otlp_with_logs_processor: Vec::new(),
otlp_with_metrics_processor: Vec::new(),
Expand Down Expand Up @@ -250,6 +258,8 @@ pub enum Receiver {
Otlp,
#[cfg(feature = "rdkafka")]
Kafka,
#[cfg(feature = "fluent_receiver")]
Fluent,
}

impl FromStr for Receiver {
Expand All @@ -259,6 +269,8 @@ impl FromStr for Receiver {
"otlp" => Ok(Receiver::Otlp),
#[cfg(feature = "rdkafka")]
"kafka" => Ok(Receiver::Kafka),
#[cfg(feature = "fluent_receiver")]
"fluent" => Ok(Receiver::Fluent),
_ => Err("Unknown receiver"),
}
}
Expand Down
6 changes: 6 additions & 0 deletions src/init/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use crate::init::otlp_exporter::{
};
use crate::init::parse::parse_bool_value;
use crate::init::xray_exporter::XRayExporterArgs;
#[cfg(feature = "fluent_receiver")]
use crate::receivers::fluent::config::FluentReceiverConfig;
#[cfg(feature = "rdkafka")]
use crate::receivers::kafka::config::KafkaReceiverConfig;
use crate::receivers::otlp::OTLPReceiverConfig;
Expand Down Expand Up @@ -200,6 +202,8 @@ pub(crate) enum ReceiverConfig {
Otlp(OTLPReceiverConfig),
#[cfg(feature = "rdkafka")]
Kafka(KafkaReceiverConfig),
#[cfg(feature = "fluent_receiver")]
Fluent(FluentReceiverConfig),
}

impl TryIntoConfig for ExporterArgs {
Expand Down Expand Up @@ -598,6 +602,8 @@ fn get_receiver_config(config: &AgentRun, receiver: Receiver) -> ReceiverConfig
Receiver::Otlp => ReceiverConfig::Otlp(OTLPReceiverConfig::from(&config.otlp_receiver)),
#[cfg(feature = "rdkafka")]
Receiver::Kafka => ReceiverConfig::Kafka(config.kafka_receiver.build_config()),
#[cfg(feature = "fluent_receiver")]
Receiver::Fluent => ReceiverConfig::Fluent(config.fluent_receiver.build_config()),
}
}

Expand Down
38 changes: 38 additions & 0 deletions src/init/fluent_receiver.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// SPDX-License-Identifier: Apache-2.0

use crate::init::parse;
use crate::receivers::fluent::config::FluentReceiverConfig;
use clap::Args;
use serde::Deserialize;
use std::net::SocketAddr;
use std::path::PathBuf;

#[derive(Debug, Args, Clone, Deserialize)]
#[serde(default)]
pub struct FluentReceiverArgs {
/// Path to the UNIX socket file for Fluent receiver
#[arg(long, env = "ROTEL_FLUENT_RECEIVER_SOCKET_PATH")]
pub fluent_receiver_socket_path: Option<PathBuf>,

/// TCP endpoint for Fluent receiver (e.g., 127.0.0.1:23890)
#[arg(long, env = "ROTEL_FLUENT_RECEIVER_ENDPOINT", value_parser = parse::parse_endpoint)]
pub fluent_receiver_endpoint: Option<SocketAddr>,
}

impl Default for FluentReceiverArgs {
fn default() -> Self {
Self {
fluent_receiver_socket_path: None,
fluent_receiver_endpoint: None,
}
}
}

impl FluentReceiverArgs {
pub fn build_config(&self) -> FluentReceiverConfig {
FluentReceiverConfig::new(
self.fluent_receiver_socket_path.clone(),
self.fluent_receiver_endpoint,
)
}
}
3 changes: 3 additions & 0 deletions src/init/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ pub mod wait;
#[cfg(feature = "rdkafka")]
mod kafka_receiver;

#[cfg(feature = "fluent_receiver")]
mod fluent_receiver;

mod awsemf_exporter;
mod clickhouse_exporter;
mod datadog_exporter;
Expand Down
32 changes: 32 additions & 0 deletions src/receivers/fluent/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// SPDX-License-Identifier: Apache-2.0

use std::net::SocketAddr;
use std::path::PathBuf;

/// Configuration for the Fluent receiver
#[derive(Debug, Clone)]
pub struct FluentReceiverConfig {
/// Path to the UNIX socket file (optional)
pub socket_path: Option<PathBuf>,

/// TCP endpoint to bind to (optional)
pub endpoint: Option<SocketAddr>,
}

impl Default for FluentReceiverConfig {
fn default() -> Self {
Self {
socket_path: None,
endpoint: None,
}
}
}

impl FluentReceiverConfig {
pub fn new(socket_path: Option<PathBuf>, endpoint: Option<SocketAddr>) -> Self {
Self {
socket_path,
endpoint,
}
}
}
Loading