Skip to content

Commit 37cf8e4

Browse files
author
Bert Vermeiren
committed
feat(memory_pool): add TrackConsumersPool::metrics() to expose consumer snapshots
1 parent abf8f61 commit 37cf8e4

File tree

1 file changed

+83
-0
lines changed
  • datafusion/execution/src/memory_pool

1 file changed

+83
-0
lines changed

datafusion/execution/src/memory_pool/pool.rs

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,32 @@ impl TrackedConsumer {
302302
}
303303
}
304304

305+
/// A point-in-time snapshot of a tracked memory consumer's state.
306+
///
307+
/// Returned by [`TrackConsumersPool::metrics()`].
308+
#[derive(Debug, Clone)]
309+
pub struct MemoryConsumerMetrics {
310+
/// The name of the memory consumer
311+
pub name: String,
312+
/// Whether this consumer can spill to disk
313+
pub can_spill: bool,
314+
/// The number of bytes currently reserved by this consumer
315+
pub reserved: usize,
316+
/// The peak number of bytes reserved by this consumer
317+
pub peak: usize,
318+
}
319+
320+
impl From<&TrackedConsumer> for MemoryConsumerMetrics {
321+
fn from(tracked: &TrackedConsumer) -> Self {
322+
Self {
323+
name: tracked.name.clone(),
324+
can_spill: tracked.can_spill,
325+
reserved: tracked.reserved(),
326+
peak: tracked.peak(),
327+
}
328+
}
329+
}
330+
305331
/// A [`MemoryPool`] that tracks the consumers that have
306332
/// reserved memory within the inner memory pool.
307333
///
@@ -381,6 +407,15 @@ impl<I: MemoryPool> TrackConsumersPool<I> {
381407
}
382408
}
383409

410+
/// Returns a snapshot of all currently tracked consumers.
411+
pub fn metrics(&self) -> Vec<MemoryConsumerMetrics> {
412+
self.tracked_consumers
413+
.lock()
414+
.values()
415+
.map(Into::into)
416+
.collect()
417+
}
418+
384419
/// Returns a formatted string with the top memory consumers.
385420
pub fn report_top(&self, top: usize) -> String {
386421
let mut consumers = self
@@ -778,6 +813,54 @@ mod tests {
778813
test_per_pool_type(tracked_greedy_pool);
779814
}
780815

816+
#[test]
817+
fn test_track_consumers_pool_metrics() {
818+
let track_consumers_pool = Arc::new(TrackConsumersPool::new(
819+
GreedyMemoryPool::new(1000),
820+
NonZeroUsize::new(3).unwrap(),
821+
));
822+
let memory_pool: Arc<dyn MemoryPool> = Arc::clone(&track_consumers_pool) as _;
823+
824+
// Empty pool has no metrics
825+
assert!(track_consumers_pool.metrics().is_empty());
826+
827+
// Register consumers with different spill settings
828+
let r1 = MemoryConsumer::new("spilling")
829+
.with_can_spill(true)
830+
.register(&memory_pool);
831+
let r2 = MemoryConsumer::new("non-spilling").register(&memory_pool);
832+
833+
// Grow r1 in two steps to verify peak tracking
834+
r1.grow(100);
835+
r1.grow(50);
836+
r1.shrink(50); // reserved=100, peak=150
837+
838+
r2.grow(200); // reserved=200, peak=200
839+
840+
let mut metrics = track_consumers_pool.metrics();
841+
metrics.sort_by_key(|m| m.name.clone());
842+
843+
assert_eq!(metrics.len(), 2);
844+
845+
let m_non = &metrics[0];
846+
assert_eq!(m_non.name, "non-spilling");
847+
assert!(!m_non.can_spill);
848+
assert_eq!(m_non.reserved, 200);
849+
assert_eq!(m_non.peak, 200);
850+
851+
let m_spill = &metrics[1];
852+
assert_eq!(m_spill.name, "spilling");
853+
assert!(m_spill.can_spill);
854+
assert_eq!(m_spill.reserved, 100);
855+
assert_eq!(m_spill.peak, 150);
856+
857+
// Unregistered consumers are removed from metrics
858+
drop(r2);
859+
let metrics = track_consumers_pool.metrics();
860+
assert_eq!(metrics.len(), 1);
861+
assert_eq!(metrics[0].name, "spilling");
862+
}
863+
781864
#[test]
782865
fn test_tracked_consumers_pool_use_beyond_errors() {
783866
let setting = make_settings();

0 commit comments

Comments
 (0)