From 966d4a630e61a25e44439555da8334d4b1ee5b96 Mon Sep 17 00:00:00 2001 From: pujichun Date: Thu, 15 Aug 2024 22:28:14 +0800 Subject: [PATCH 1/3] feat: add udp appender --- README-zh.md | 14 ++++++ README.md | 13 +++++ examples/udp-log.rs | 53 ++++++++++++++++++++ src/appender/mod.rs | 1 + src/appender/udp.rs | 114 ++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 195 insertions(+) create mode 100644 examples/udp-log.rs create mode 100644 src/appender/udp.rs diff --git a/README-zh.md b/README-zh.md index f15aee7..413dd21 100644 --- a/README-zh.md +++ b/README-zh.md @@ -166,6 +166,20 @@ let logger = ftlog::builder() .unwrap(); logger.init().unwrap(); ``` + +### 日志发送到 UDP 服务器 + +将日志发送到 UDP 协议的日志机集中收集 + +```rust +// 目标服务监听的地址 +let target_addr = "127.0.0.1:8080"; +let udp_appender = UdpAppender::builder() + .target(target_addr.parse().unwrap()) + .build(); +ftlog::builder().root(udp_appender).try_init().unwrap(); +``` + ## 可选功能 - **tsc** 使用TSC寄存器作为时钟源,实现同等时间精度下更快地获取时间戳。 diff --git a/README.md b/README.md index 29c7077..891c4aa 100644 --- a/README.md +++ b/README.md @@ -254,6 +254,19 @@ let logger = ftlog::builder() let _guard = logger.init().unwrap(); ``` +### Sending Logs to a UDP Server + +Send logs to a logging server using the UDP protocol + +```rust +// Address that the target server is listening on +let target_addr = "127.0.0.1:8080"; +let udp_appender = UdpAppender::builder() + .target(target_addr.parse().unwrap()) + .build(); +ftlog::builder().root(udp_appender).try_init().unwrap(); +``` + ## Features - **tsc** Use [TSC](https://en.wikipedia.org/wiki/Time_Stamp_Counter) for clock source for higher performance without diff --git a/examples/udp-log.rs b/examples/udp-log.rs new file mode 100644 index 0000000..3131a60 --- /dev/null +++ b/examples/udp-log.rs @@ -0,0 +1,53 @@ +use std::net::UdpSocket; +use std::sync::mpsc; +use std::thread; + +use ftlog::{appender::udp::UdpAppender, info}; +use std::time::Duration; + +fn main() { + let target_addr = "127.0.0.1:8080"; + // Create a UDP appender that sends log records to UDP socket at target address + let udp_appender = UdpAppender::builder() + .target(target_addr.parse().unwrap()) + .build(); + // Use channel to send exit signal to the child thread + let (tx, rx) = mpsc::channel(); + let handler = thread::spawn(move || { + // Create a UDP socket to receive log records + let socket = UdpSocket::bind(target_addr).unwrap(); + let mut buf = [0; 1024]; + // Set read timeout. Receiving will time out when the log sending is complete. + socket + .set_read_timeout(Some(Duration::from_secs(1))) + .unwrap(); + loop { + // Check if exit signal is received + if let Ok(_) = rx.try_recv() { + println!("Child thread received exit signal."); + break; + } + match socket.recv_from(&mut buf) { + Ok((size, _)) => match std::str::from_utf8(&buf[..size]) { + Ok(s) => print!("Received: {:}", s), + Err(e) => println!("Error utf-8 str: {}", e), + }, + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { + println!("Read timed out."); + } + Err(e) => { + eprintln!("Error receiving data: {}", e); + break; + } + } + } + }); + ftlog::builder().root(udp_appender).try_init().unwrap(); + info!("Hello, world!"); + for i in 0..120 { + info!("running {}!", i); + std::thread::sleep(Duration::from_millis(10)); + } + tx.send(1).unwrap(); + handler.join().unwrap(); +} diff --git a/src/appender/mod.rs b/src/appender/mod.rs index 29fd2c1..03fe912 100644 --- a/src/appender/mod.rs +++ b/src/appender/mod.rs @@ -1,5 +1,6 @@ //! Useful appenders pub mod file; +pub mod udp; pub use file::{FileAppender, Period}; use std::io::Write; diff --git a/src/appender/udp.rs b/src/appender/udp.rs new file mode 100644 index 0000000..44b8985 --- /dev/null +++ b/src/appender/udp.rs @@ -0,0 +1,114 @@ +//! Appender that sends log messages over UDP. +//! +//! UdpAppender is a appender that sends log messages over UDP. +//! It is useful when you want to send log messages to a log server. +//! +//! ```rust +//! use ftlog::appender::udp::UdpAppender; +//! let appender = UdpAppender::builder().target("127.0.0.1:8080".parse().unwrap()).build(); +//! ``` + +use std::io::Write; +use std::net::{SocketAddr, ToSocketAddrs, UdpSocket}; +use std::sync::Arc; +use typed_builder::TypedBuilder; + +/// Appender log messages by UDP. +pub struct UdpAppender { + /// UDP socket + socket: Arc, + /// Target address + target: SocketAddr, +} + +#[derive(TypedBuilder)] +#[builder(build_method(vis = "", name = __build_udp_appender), builder_method(vis = "pub"))] +pub struct UdpAppenderBuilder { + target: SocketAddr, + #[builder(default = "127.0.0.1:0".parse().unwrap())] + bind_addr: SocketAddr, +} + +#[allow(dead_code, non_camel_case_types, missing_docs)] +#[automatically_derived] +impl<__bind_addr: typed_builder::Optional> + UdpAppenderBuilderBuilder<((SocketAddr,), __bind_addr)> +{ + pub fn build(self) -> UdpAppender { + let builder = self.__build_udp_appender(); + // Create a UDP socket and bind to the local address + let socket = UdpSocket::bind(&builder.bind_addr).expect("failed to bind to UDP socket"); + UdpAppender { + socket: Arc::new(socket), + target: builder.target.to_socket_addrs().unwrap().next().unwrap(), + } + } +} + +impl UdpAppender { + /// UdpAppender builder + /// you can configure the remote server address and bind local address + /// ```rust + /// use ftlog::appender::udp::UdpAppender; + /// let appender = UdpAppender::builder() + /// .target("127.0.0.1:8080".parse().unwrap()) + /// .bind_addr("127.0.0.1:0") + /// .build(); + /// ``` + + pub fn builder() -> UdpAppenderBuilderBuilder { + UdpAppenderBuilder::builder() + } + + pub fn new(target: T) -> Self { + Self::builder() + .target(target.to_socket_addrs().unwrap().next().unwrap()) + .build() + } +} + +impl Write for UdpAppender { + /// Write log record to the UDP socket + fn write(&mut self, record: &[u8]) -> std::io::Result { + if record.is_empty() { + return Ok(0); + } + // send the record to the target + self.socket.send_to(record, &self.target)?; + Ok(record.len()) + } + + /// Flush always return Ok, because UDP is connectionless + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } +} + +#[cfg(test)] +mod test { + use super::*; + use std::io::Write; + use std::net::UdpSocket; + use std::sync::{Arc, Mutex}; + use std::thread; + #[test] + fn test_udp_appender() { + let mut appender = UdpAppender::new("127.0.0.1:8080"); + let flag = Arc::new(Mutex::new(false)); + let flag_clone = flag.clone(); + // create a thread to receive the record + let handler = thread::spawn(move || { + let socket = UdpSocket::bind("127.0.0.1:8080").unwrap(); + // socket recv + let mut buf = [0; 1024]; + let (size, _) = socket.recv_from(&mut buf).unwrap(); + let mut f = flag_clone.lock().unwrap(); + // check the record + *f = &buf[..size] == b"hello"; + }); + // write record + appender.write("hello".as_bytes()).unwrap(); + handler.join().unwrap(); + assert!(*flag.clone().lock().unwrap()); + } +} From 1b557ee39684242afbfa56846b6af84d82f46fe5 Mon Sep 17 00:00:00 2001 From: pujichun Date: Thu, 29 Aug 2024 15:36:24 +0800 Subject: [PATCH 2/3] fix(appender): fix udp appender doctest error --- src/appender/udp.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/appender/udp.rs b/src/appender/udp.rs index 44b8985..73b05dd 100644 --- a/src/appender/udp.rs +++ b/src/appender/udp.rs @@ -40,7 +40,7 @@ impl<__bind_addr: typed_builder::Optional> let socket = UdpSocket::bind(&builder.bind_addr).expect("failed to bind to UDP socket"); UdpAppender { socket: Arc::new(socket), - target: builder.target.to_socket_addrs().unwrap().next().unwrap(), + target: builder.target, } } } @@ -52,7 +52,7 @@ impl UdpAppender { /// use ftlog::appender::udp::UdpAppender; /// let appender = UdpAppender::builder() /// .target("127.0.0.1:8080".parse().unwrap()) - /// .bind_addr("127.0.0.1:0") + /// .bind_addr("127.0.0.1:0".parse().unwrap()) /// .build(); /// ``` From 796dcf24e088f72eafc36babc57bf5d94a03987c Mon Sep 17 00:00:00 2001 From: chun Date: Sat, 7 Sep 2024 12:27:27 +0800 Subject: [PATCH 3/3] fix: fix `test_udp_appender` timeout --- src/appender/udp.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/appender/udp.rs b/src/appender/udp.rs index 73b05dd..a72907e 100644 --- a/src/appender/udp.rs +++ b/src/appender/udp.rs @@ -89,6 +89,7 @@ mod test { use super::*; use std::io::Write; use std::net::UdpSocket; + use std::sync::mpsc::channel; use std::sync::{Arc, Mutex}; use std::thread; #[test] @@ -96,16 +97,19 @@ mod test { let mut appender = UdpAppender::new("127.0.0.1:8080"); let flag = Arc::new(Mutex::new(false)); let flag_clone = flag.clone(); + let (tx, rx) = channel(); // create a thread to receive the record let handler = thread::spawn(move || { let socket = UdpSocket::bind("127.0.0.1:8080").unwrap(); // socket recv + let _ = tx.send(()); let mut buf = [0; 1024]; let (size, _) = socket.recv_from(&mut buf).unwrap(); let mut f = flag_clone.lock().unwrap(); // check the record *f = &buf[..size] == b"hello"; }); + rx.recv().unwrap(); // write record appender.write("hello".as_bytes()).unwrap(); handler.join().unwrap();