Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 11 additions & 12 deletions fastrace/src/collector/global_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::sync::LazyLock;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::time::Duration;

use fastant::Anchor;
use fastant::Instant;
Expand All @@ -36,7 +34,6 @@ use crate::util::command_bus::CommandSender;

static NEXT_COLLECT_ID: AtomicUsize = AtomicUsize::new(0);
static GLOBAL_COLLECTOR: Mutex<Option<GlobalCollector>> = Mutex::new(None);
static REPORT_INTERVAL: AtomicU64 = AtomicU64::new(0);
static REPORTER_READY: AtomicBool = AtomicBool::new(false);
static COMMAND_BUS: LazyLock<CommandBus<CollectCommand>> = LazyLock::new(CommandBus::new);

Expand Down Expand Up @@ -139,14 +136,14 @@ impl GlobalCollect {

// Note that: relationships are not built completely for now so a further job is needed.
//
// Every `SpanSet` has its own root spans whose `raw_span.parent_id`s are equal to
// Every `SpanSet` has its own root spans whose `raw_span.parent_id` are equal to
// `SpanId::default()`.
//
// Every root span can have multiple parents where mainly comes from `Span::enter_with_parents`.
// Those parents are recorded into `CollectToken` which has several `CollectTokenItem`s. Look
// into a `CollectTokenItem`, `parent_ids` can be found.
//
// For example, we have a `SpanSet::LocalSpansInner` and a `CollectToken` as follow:
// For example, we have a `SpanSet::LocalSpansInner` and a `CollectToken` as follows:
//
// SpanSet::LocalSpansInner::spans CollectToken::parent_ids
// +------+-----------+-----+ +------------+------------+
Expand Down Expand Up @@ -211,8 +208,8 @@ pub(crate) struct GlobalCollector {

active_collectors: HashMap<usize, ActiveCollector>,

// Vectors to be reused by collection loops. They must be empty outside of the
// `handle_commands` loop.
// Vectors to be reused by collection loops. They must be empty outside
// the `handle_commands` loop.
start_collects: Vec<StartCollect>,
cancel_collects: Vec<CancelCollect>,
drop_collects: Vec<DropCollect>,
Expand All @@ -222,7 +219,6 @@ pub(crate) struct GlobalCollector {

impl GlobalCollector {
fn start(reporter: impl Reporter, config: Config) {
REPORT_INTERVAL.store(config.report_interval.as_nanos() as u64, Ordering::Relaxed);
REPORTER_READY.store(true, Ordering::Relaxed);

let mut global_collector = GLOBAL_COLLECTOR.lock();
Expand Down Expand Up @@ -250,9 +246,12 @@ impl GlobalCollector {
.name("fastrace-global-collector".to_string())
.spawn(move || {
loop {
GLOBAL_COLLECTOR.lock().as_mut().unwrap().handle_commands();
let report_interval =
Duration::from_nanos(REPORT_INTERVAL.load(Ordering::Relaxed));
let mut global_collector = GLOBAL_COLLECTOR.lock();
let collector = global_collector.as_mut().unwrap();
let report_interval = collector.config.report_interval;
collector.handle_commands();
drop(global_collector);

COMMAND_BUS.wait_timeout(report_interval);
}
})
Expand All @@ -275,7 +274,7 @@ impl GlobalCollector {
CollectCommand::SubmitSpans(cmd) => self.submit_spans.push(cmd),
});

// If the reporter is not set, global collectior only clears the channel and then dismiss
// If the reporter is not set, global collector only clears the channel and then dismiss
// all messages.
if self.reporter.is_none() {
self.start_collects.clear();
Expand Down
2 changes: 1 addition & 1 deletion fastrace/src/util/command_bus.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2025 FastLabs Developers
// Copyright 2024 FastLabs Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
51 changes: 32 additions & 19 deletions fastrace/src/util/spsc.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,20 @@
// Copyright 2024 FastLabs Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// This file is derived from [1] under the original license header:
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.
// [1]: https://github.com/tikv/minitrace-rust/blob/v0.6.4/minitrace/src/util/spsc.rs

use std::collections::VecDeque;

Expand All @@ -9,18 +25,13 @@ use rtrb::RingBuffer;

pub fn bounded<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
let (tx, rx) = RingBuffer::new(capacity);
(
Sender {
tx,
pending: VecDeque::new(),
},
Receiver { rx },
)
let pending_msgs = VecDeque::new();
(Sender { tx, pending_msgs }, Receiver { rx })
}

pub struct Sender<T> {
tx: Producer<T>,
pending: VecDeque<T>,
pending_msgs: VecDeque<T>,
}

pub struct Receiver<T> {
Expand All @@ -32,40 +43,42 @@ pub struct ChannelClosed;

impl<T> Sender<T> {
#[inline]
pub fn is_under_pressure(&self) -> bool {
pub(crate) fn is_under_pressure(&self) -> bool {
let capacity = self.tx.buffer().capacity();
self.tx.slots() * 2 < capacity
}

pub fn send(&mut self, value: T) {
while let Some(pending_value) = self.pending.pop_front() {
while let Some(pending_value) = self.pending_msgs.pop_front() {
if let Err(PushError::Full(pending_value)) = self.tx.push(pending_value) {
self.pending.push_front(pending_value);
self.pending.push_back(value);
self.pending_msgs.push_front(pending_value);
self.pending_msgs.push_back(value);
return;
}
}

if let Err(PushError::Full(value)) = self.tx.push(value) {
self.pending.push_back(value);
self.pending_msgs.push_back(value);
}
}
}

impl<T> Drop for Sender<T> {
fn drop(&mut self) {
for command in self.pending.drain(..) {
drop(self.tx.push(command));
for msg in std::mem::take(&mut self.pending_msgs) {
self.tx.push(msg).ok();
}
}
}

impl<T> Receiver<T> {
pub fn try_recv(&mut self) -> Result<Option<T>, ChannelClosed> {
match self.rx.pop() {
Ok(val) => Ok(Some(val)),
Err(_) if self.rx.is_abandoned() => Err(ChannelClosed),
Err(_) => Ok(None),
if let Ok(val) = self.rx.pop() {
Ok(Some(val))
} else if self.rx.is_abandoned() {
Err(ChannelClosed)
} else {
Ok(None)
}
}
}