forked from vectordotdev/vector
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathkafka.rs
More file actions
108 lines (94 loc) · 3.22 KB
/
kafka.rs
File metadata and controls
108 lines (94 loc) · 3.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
use crate::internal_events::KafkaStatisticsReceived;
use crate::tls::TlsOptions;
use rdkafka::{consumer::ConsumerContext, ClientConfig, ClientContext, Statistics};
use serde::{Deserialize, Serialize};
use snafu::Snafu;
use std::path::{Path, PathBuf};
/// Errors produced by the Kafka helpers in this file.
#[derive(Debug, Snafu)]
enum KafkaError {
// Carries the offending path; constructed when a path cannot be turned into a `&str`.
#[snafu(display("invalid path: {:?}", path))]
InvalidPath { path: PathBuf },
}
/// Compression choices for Kafka.
///
/// With `rename_all = "lowercase"` the variants (de)serialize as `none`, `gzip`,
/// `snappy`, `lz4` and `zstd`. The derived `Default` is [`KafkaCompression::None`].
// NOTE(review): presumably mapped onto librdkafka's `compression.codec` by the
// caller — that mapping is not visible in this file; confirm at the use site.
#[derive(Clone, Copy, Debug, Derivative, Deserialize, Serialize)]
#[derivative(Default)]
#[serde(rename_all = "lowercase")]
pub enum KafkaCompression {
/// No compression (the default variant).
#[derivative(Default)]
None,
Gzip,
Snappy,
Lz4,
Zstd,
}
/// Authentication settings for a Kafka client: an optional SASL section and an
/// optional TLS section.
///
/// Both sections default to `None`; `KafkaAuthConfig::apply` treats a missing
/// section the same as a disabled one.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct KafkaAuthConfig {
/// SASL settings, if any.
pub sasl: Option<KafkaSaslConfig>,
/// TLS settings, if any.
pub tls: Option<KafkaTlsConfig>,
}
/// SASL section of [`KafkaAuthConfig`].
///
/// The remaining fields are only read by `KafkaAuthConfig::apply` when
/// `enabled` is `Some(true)`; each one is optional and skipped when `None`.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct KafkaSaslConfig {
/// Turns SASL on. `None` is treated as `false`.
pub enabled: Option<bool>,
/// Written to the client's `sasl.username` setting.
pub username: Option<String>,
/// Written to the client's `sasl.password` setting.
pub password: Option<String>,
/// Written to the client's `sasl.mechanism` setting.
pub mechanism: Option<String>,
}
/// TLS section of [`KafkaAuthConfig`].
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct KafkaTlsConfig {
/// Turns TLS on. `None` is treated as `false` by `KafkaAuthConfig::apply`.
pub enabled: Option<bool>,
/// Shared TLS options. Flattened by serde, so its fields sit at the same
/// level as `enabled` in the serialized form rather than under an `options` key.
#[serde(flatten)]
pub options: TlsOptions,
}
impl KafkaAuthConfig {
    /// Writes the authentication-related settings of this config into `client`.
    ///
    /// `security.protocol` is always set, chosen from whether SASL and/or TLS
    /// is enabled:
    ///
    /// | SASL | TLS | `security.protocol` |
    /// |------|-----|---------------------|
    /// | off  | off | `plaintext`         |
    /// | off  | on  | `ssl`               |
    /// | on   | off | `sasl_plaintext`    |
    /// | on   | on  | `sasl_ssl`          |
    ///
    /// A section counts as "on" only when it is present and its `enabled`
    /// field is `Some(true)`. The individual `sasl.*` / `ssl.*` keys are set
    /// only for enabled sections, and only for fields that are `Some`.
    ///
    /// # Errors
    ///
    /// Propagates the error from `pathbuf_to_string` when a configured CA,
    /// certificate or key path cannot be converted to a string. Settings
    /// written before the failing path remain applied to `client`.
    pub fn apply(&self, client: &mut ClientConfig) -> crate::Result<()> {
        // Keep a section only if it is both present and explicitly enabled, so
        // the code below never has to unwrap.
        let sasl = self.sasl.as_ref().filter(|sasl| sasl.enabled == Some(true));
        let tls = self.tls.as_ref().filter(|tls| tls.enabled == Some(true));

        let protocol = match (sasl.is_some(), tls.is_some()) {
            (true, true) => "sasl_ssl",
            (true, false) => "sasl_plaintext",
            (false, true) => "ssl",
            (false, false) => "plaintext",
        };
        client.set("security.protocol", protocol);

        if let Some(sasl) = sasl {
            if let Some(username) = sasl.username.as_ref() {
                client.set("sasl.username", username);
            }
            if let Some(password) = sasl.password.as_ref() {
                client.set("sasl.password", password);
            }
            if let Some(mechanism) = sasl.mechanism.as_ref() {
                client.set("sasl.mechanism", mechanism);
            }
        }

        if let Some(tls) = tls {
            let options = &tls.options;
            if let Some(ca_file) = &options.ca_file {
                client.set("ssl.ca.location", pathbuf_to_string(ca_file)?);
            }
            if let Some(crt_file) = &options.crt_file {
                client.set("ssl.certificate.location", pathbuf_to_string(crt_file)?);
            }
            if let Some(key_file) = &options.key_file {
                client.set("ssl.key.location", pathbuf_to_string(key_file)?);
            }
            if let Some(key_pass) = &options.key_pass {
                client.set("ssl.key.password", key_pass);
            }
        }

        Ok(())
    }
}
/// Borrows `path` as a `&str`.
///
/// # Errors
///
/// Returns `KafkaError::InvalidPath` (carrying an owned copy of `path`) when
/// `Path::to_str` yields `None`, i.e. the path is not valid Unicode.
fn pathbuf_to_string(path: &Path) -> crate::Result<&str> {
    match path.to_str() {
        Some(utf8) => Ok(utf8),
        None => Err(KafkaError::InvalidPath {
            path: path.to_path_buf(),
        }
        .into()),
    }
}
/// Stateless rdkafka context type; it implements `ClientContext` and
/// `ConsumerContext` below so that received statistics are emitted as
/// `KafkaStatisticsReceived` events.
pub struct KafkaStatisticsContext;
impl ClientContext for KafkaStatisticsContext {
fn stats(&self, statistics: Statistics) {
emit!(&KafkaStatisticsReceived {
statistics: &statistics
});
}
}
// Empty impl: no `ConsumerContext` methods are overridden here, so this only
// marks `KafkaStatisticsContext` as usable where a `ConsumerContext` is required.
impl ConsumerContext for KafkaStatisticsContext {}