From d2391de878065ed43abaa303f304fd7adf29d397 Mon Sep 17 00:00:00 2001 From: tison Date: Wed, 17 Dec 2025 22:27:16 +0800 Subject: [PATCH] chore: code tidy over spsc Signed-off-by: tison --- fastrace/src/collector/global_collector.rs | 23 +++++---- fastrace/src/util/command_bus.rs | 22 +++++---- fastrace/src/util/spsc.rs | 54 ++++++++++++++-------- 3 files changed, 59 insertions(+), 40 deletions(-) diff --git a/fastrace/src/collector/global_collector.rs b/fastrace/src/collector/global_collector.rs index b8d3f58..3a026ea 100644 --- a/fastrace/src/collector/global_collector.rs +++ b/fastrace/src/collector/global_collector.rs @@ -6,10 +6,8 @@ use std::collections::HashMap; use std::sync::Arc; use std::sync::LazyLock; use std::sync::atomic::AtomicBool; -use std::sync::atomic::AtomicU64; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; -use std::time::Duration; use fastant::Anchor; use fastant::Instant; @@ -36,7 +34,6 @@ use crate::util::command_bus::CommandSender; static NEXT_COLLECT_ID: AtomicUsize = AtomicUsize::new(0); static GLOBAL_COLLECTOR: Mutex> = Mutex::new(None); -static REPORT_INTERVAL: AtomicU64 = AtomicU64::new(0); static REPORTER_READY: AtomicBool = AtomicBool::new(false); static COMMAND_BUS: LazyLock> = LazyLock::new(CommandBus::new); @@ -139,14 +136,14 @@ impl GlobalCollect { // Note that: relationships are not built completely for now so a further job is needed. // - // Every `SpanSet` has its own root spans whose `raw_span.parent_id`s are equal to + // Every `SpanSet` has its own root spans whose `raw_span.parent_id` are equal to // `SpanId::default()`. // // Every root span can have multiple parents where mainly comes from `Span::enter_with_parents`. // Those parents are recorded into `CollectToken` which has several `CollectTokenItem`s. Look // into a `CollectTokenItem`, `parent_ids` can be found. // - // For example, we have a `SpanSet::LocalSpansInner` and a `CollectToken` as follow: + // For example, we have a `SpanSet::LocalSpansInner` and a `CollectToken` as follows: // // SpanSet::LocalSpansInner::spans CollectToken::parent_ids // +------+-----------+-----+ +------------+------------+ @@ -210,8 +207,8 @@ pub(crate) struct GlobalCollector { active_collectors: HashMap, - // Vectors to be reused by collection loops. They must be empty outside of the - // `handle_commands` loop. + // Vectors to be reused by collection loops. They must be empty outside + // the `handle_commands` loop. start_collects: Vec, drop_collects: Vec, commit_collects: Vec, @@ -221,7 +218,6 @@ pub(crate) struct GlobalCollector { impl GlobalCollector { fn start(reporter: impl Reporter, config: Config) { - REPORT_INTERVAL.store(config.report_interval.as_nanos() as u64, Ordering::Relaxed); REPORTER_READY.store(true, Ordering::Relaxed); let mut global_collector = GLOBAL_COLLECTOR.lock(); @@ -249,9 +245,12 @@ impl GlobalCollector { .name("fastrace-global-collector".to_string()) .spawn(move || { loop { - GLOBAL_COLLECTOR.lock().as_mut().unwrap().handle_commands(); - let report_interval = - Duration::from_nanos(REPORT_INTERVAL.load(Ordering::Relaxed)); + let mut global_collector = GLOBAL_COLLECTOR.lock(); + let collector = global_collector.as_mut().unwrap(); + let report_interval = collector.config.report_interval; + collector.handle_commands(); + drop(global_collector); + COMMAND_BUS.wait_timeout(report_interval); } }) @@ -274,7 +273,7 @@ impl GlobalCollector { CollectCommand::SubmitSpans(cmd) => self.submit_spans.push(cmd), }); - // If the reporter is not set, global collectior only clears the channel and then dismiss + // If the reporter is not set, global collector only clears the channel and then dismiss // all messages. if self.reporter.is_none() { self.start_collects.clear(); diff --git a/fastrace/src/util/command_bus.rs b/fastrace/src/util/command_bus.rs index b90e930..cbbf080 100644 --- a/fastrace/src/util/command_bus.rs +++ b/fastrace/src/util/command_bus.rs @@ -1,4 +1,16 @@ -// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. +// Copyright 2024 FastLabs Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. use std::time::Duration; @@ -110,12 +122,6 @@ struct NotifySender { impl NotifySender { fn notify(&self) { #[cfg(not(target_family = "wasm"))] - { - self.notify_tx.try_send(()).ok(); - } - #[cfg(target_family = "wasm")] - { - Ok(()) - } + self.notify_tx.try_send(()).ok(); } } diff --git a/fastrace/src/util/spsc.rs b/fastrace/src/util/spsc.rs index 7ff57d9..1b4172d 100644 --- a/fastrace/src/util/spsc.rs +++ b/fastrace/src/util/spsc.rs @@ -1,4 +1,20 @@ +// Copyright 2024 FastLabs Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This file is derived from [1] under the original license header: // Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. +// [1]: https://github.com/tikv/minitrace-rust/blob/v0.6.4/minitrace/src/util/spsc.rs use std::collections::VecDeque; @@ -9,18 +25,13 @@ use rtrb::RingBuffer; pub fn bounded(capacity: usize) -> (Sender, Receiver) { let (tx, rx) = RingBuffer::new(capacity); - ( - Sender { - tx, - pending_messages: VecDeque::new(), - }, - Receiver { rx }, - ) + let pending_msgs = VecDeque::new(); + (Sender { tx, pending_msgs }, Receiver { rx }) } pub struct Sender { tx: Producer, - pending_messages: VecDeque, + pending_msgs: VecDeque, } pub struct Receiver { @@ -32,39 +43,42 @@ pub struct ChannelClosed; impl Sender { #[inline] - pub fn is_under_pressure(&self) -> bool { + pub(crate) fn is_under_pressure(&self) -> bool { let capacity = self.tx.buffer().capacity(); self.tx.slots() * 2 < capacity } pub fn send(&mut self, value: T) { - while let Some(value) = self.pending_messages.pop_front() { - if let Err(PushError::Full(value)) = self.tx.push(value) { - self.pending_messages.push_front(value); - break; + while let Some(pending_value) = self.pending_msgs.pop_front() { + if let Err(PushError::Full(pending_value)) = self.tx.push(pending_value) { + self.pending_msgs.push_front(pending_value); + self.pending_msgs.push_back(value); + return; } } if let Err(PushError::Full(value)) = self.tx.push(value) { - self.pending_messages.push_back(value); + self.pending_msgs.push_back(value); } } } impl Drop for Sender { fn drop(&mut self) { - for command in self.pending_messages.drain(..) { - drop(self.tx.push(command)); + for msg in std::mem::take(&mut self.pending_msgs) { + self.tx.push(msg).ok(); } } } impl Receiver { pub fn try_recv(&mut self) -> Result, ChannelClosed> { - match self.rx.pop() { - Ok(val) => Ok(Some(val)), - Err(_) if self.rx.is_abandoned() => Err(ChannelClosed), - Err(_) => Ok(None), + if let Ok(val) = self.rx.pop() { + Ok(Some(val)) + } else if self.rx.is_abandoned() { + Err(ChannelClosed) + } else { + Ok(None) } } }