diff --git a/fastrace/src/collector/global_collector.rs b/fastrace/src/collector/global_collector.rs index a5e93f4..b998e76 100644 --- a/fastrace/src/collector/global_collector.rs +++ b/fastrace/src/collector/global_collector.rs @@ -6,10 +6,8 @@ use std::collections::HashMap; use std::sync::Arc; use std::sync::LazyLock; use std::sync::atomic::AtomicBool; -use std::sync::atomic::AtomicU64; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; -use std::time::Duration; use fastant::Anchor; use fastant::Instant; @@ -36,7 +34,6 @@ use crate::util::command_bus::CommandSender; static NEXT_COLLECT_ID: AtomicUsize = AtomicUsize::new(0); static GLOBAL_COLLECTOR: Mutex> = Mutex::new(None); -static REPORT_INTERVAL: AtomicU64 = AtomicU64::new(0); static REPORTER_READY: AtomicBool = AtomicBool::new(false); static COMMAND_BUS: LazyLock> = LazyLock::new(CommandBus::new); @@ -139,14 +136,14 @@ impl GlobalCollect { // Note that: relationships are not built completely for now so a further job is needed. // - // Every `SpanSet` has its own root spans whose `raw_span.parent_id`s are equal to + // Every `SpanSet` has its own root spans whose `raw_span.parent_id` are equal to // `SpanId::default()`. // // Every root span can have multiple parents where mainly comes from `Span::enter_with_parents`. // Those parents are recorded into `CollectToken` which has several `CollectTokenItem`s. Look // into a `CollectTokenItem`, `parent_ids` can be found. // - // For example, we have a `SpanSet::LocalSpansInner` and a `CollectToken` as follow: + // For example, we have a `SpanSet::LocalSpansInner` and a `CollectToken` as follows: // // SpanSet::LocalSpansInner::spans CollectToken::parent_ids // +------+-----------+-----+ +------------+------------+ @@ -211,8 +208,8 @@ pub(crate) struct GlobalCollector { active_collectors: HashMap, - // Vectors to be reused by collection loops. They must be empty outside of the - // `handle_commands` loop. + // Vectors to be reused by collection loops. They must be empty outside + // the `handle_commands` loop. start_collects: Vec, cancel_collects: Vec, drop_collects: Vec, @@ -222,7 +219,6 @@ pub(crate) struct GlobalCollector { impl GlobalCollector { fn start(reporter: impl Reporter, config: Config) { - REPORT_INTERVAL.store(config.report_interval.as_nanos() as u64, Ordering::Relaxed); REPORTER_READY.store(true, Ordering::Relaxed); let mut global_collector = GLOBAL_COLLECTOR.lock(); @@ -250,9 +246,12 @@ impl GlobalCollector { .name("fastrace-global-collector".to_string()) .spawn(move || { loop { - GLOBAL_COLLECTOR.lock().as_mut().unwrap().handle_commands(); - let report_interval = - Duration::from_nanos(REPORT_INTERVAL.load(Ordering::Relaxed)); + let mut global_collector = GLOBAL_COLLECTOR.lock(); + let collector = global_collector.as_mut().unwrap(); + let report_interval = collector.config.report_interval; + collector.handle_commands(); + drop(global_collector); + COMMAND_BUS.wait_timeout(report_interval); } }) @@ -275,7 +274,7 @@ impl GlobalCollector { CollectCommand::SubmitSpans(cmd) => self.submit_spans.push(cmd), }); - // If the reporter is not set, global collectior only clears the channel and then dismiss + // If the reporter is not set, global collector only clears the channel and then dismiss // all messages. if self.reporter.is_none() { self.start_collects.clear(); diff --git a/fastrace/src/util/command_bus.rs b/fastrace/src/util/command_bus.rs index 90ffe98..cbbf080 100644 --- a/fastrace/src/util/command_bus.rs +++ b/fastrace/src/util/command_bus.rs @@ -1,4 +1,4 @@ -// Copyright 2025 FastLabs Developers +// Copyright 2024 FastLabs Developers // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/fastrace/src/util/spsc.rs b/fastrace/src/util/spsc.rs index abf668e..1b4172d 100644 --- a/fastrace/src/util/spsc.rs +++ b/fastrace/src/util/spsc.rs @@ -1,4 +1,20 @@ +// Copyright 2024 FastLabs Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This file is derived from [1] under the original license header: // Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. +// [1]: https://github.com/tikv/minitrace-rust/blob/v0.6.4/minitrace/src/util/spsc.rs use std::collections::VecDeque; @@ -9,18 +25,13 @@ use rtrb::RingBuffer; pub fn bounded(capacity: usize) -> (Sender, Receiver) { let (tx, rx) = RingBuffer::new(capacity); - ( - Sender { - tx, - pending: VecDeque::new(), - }, - Receiver { rx }, - ) + let pending_msgs = VecDeque::new(); + (Sender { tx, pending_msgs }, Receiver { rx }) } pub struct Sender { tx: Producer, - pending: VecDeque, + pending_msgs: VecDeque, } pub struct Receiver { @@ -32,40 +43,42 @@ pub struct ChannelClosed; impl Sender { #[inline] - pub fn is_under_pressure(&self) -> bool { + pub(crate) fn is_under_pressure(&self) -> bool { let capacity = self.tx.buffer().capacity(); self.tx.slots() * 2 < capacity } pub fn send(&mut self, value: T) { - while let Some(pending_value) = self.pending.pop_front() { + while let Some(pending_value) = self.pending_msgs.pop_front() { if let Err(PushError::Full(pending_value)) = self.tx.push(pending_value) { - self.pending.push_front(pending_value); - self.pending.push_back(value); + self.pending_msgs.push_front(pending_value); + self.pending_msgs.push_back(value); return; } } if let Err(PushError::Full(value)) = self.tx.push(value) { - self.pending.push_back(value); + self.pending_msgs.push_back(value); } } } impl Drop for Sender { fn drop(&mut self) { - for command in self.pending.drain(..) { - drop(self.tx.push(command)); + for msg in std::mem::take(&mut self.pending_msgs) { + self.tx.push(msg).ok(); } } } impl Receiver { pub fn try_recv(&mut self) -> Result, ChannelClosed> { - match self.rx.pop() { - Ok(val) => Ok(Some(val)), - Err(_) if self.rx.is_abandoned() => Err(ChannelClosed), - Err(_) => Ok(None), + if let Ok(val) = self.rx.pop() { + Ok(Some(val)) + } else if self.rx.is_abandoned() { + Err(ChannelClosed) + } else { + Ok(None) } } }