Skip to content

Commit ac10b8f

Browse files
author
Bert Vermeiren
committed
feat(memory_pool): add TrackConsumersPool::metrics() to expose consumer snapshots
1 parent abf8f61 commit ac10b8f

1 file changed

Lines changed: 79 additions & 0 deletions

File tree

  • datafusion/execution/src/memory_pool

datafusion/execution/src/memory_pool/pool.rs

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,34 @@ impl TrackedConsumer {
302302
}
303303
}
304304

305+
306+
/// A point-in-time snapshot of a tracked memory consumer's state.
307+
///
308+
/// Returned by [`TrackConsumersPool::metrics()`].
309+
#[derive(Debug, Clone)]
310+
pub struct MemoryConsumerMetrics {
311+
/// The name of the memory consumer
312+
pub name: String,
313+
/// Whether this consumer can spill to disk
314+
pub can_spill: bool,
315+
/// The number of bytes currently reserved by this consumer
316+
pub reserved: usize,
317+
/// The peak number of bytes reserved by this consumer
318+
pub peak: usize,
319+
}
320+
321+
impl From<&TrackedConsumer> for MemoryConsumerMetrics {
322+
fn from(tracked: &TrackedConsumer) -> Self {
323+
Self {
324+
name: tracked.name.clone(),
325+
can_spill: tracked.can_spill,
326+
reserved: tracked.reserved(),
327+
peak: tracked.peak(),
328+
}
329+
}
330+
}
331+
332+
305333
/// A [`MemoryPool`] that tracks the consumers that have
306334
/// reserved memory within the inner memory pool.
307335
///
@@ -381,6 +409,11 @@ impl<I: MemoryPool> TrackConsumersPool<I> {
381409
}
382410
}
383411

412+
/// Returns a snapshot of all currently tracked consumers.
413+
pub fn metrics(&self) -> Vec<MemoryConsumerMetrics> {
414+
self.tracked_consumers.lock().values().map(Into::into).collect()
415+
}
416+
384417
/// Returns a formatted string with the top memory consumers.
385418
pub fn report_top(&self, top: usize) -> String {
386419
let mut consumers = self
@@ -778,6 +811,52 @@ mod tests {
778811
test_per_pool_type(tracked_greedy_pool);
779812
}
780813

814+
#[test]
815+
fn test_track_consumers_pool_metrics() {
816+
let track_consumers_pool = Arc::new(TrackConsumersPool::new(
817+
GreedyMemoryPool::new(1000),
818+
NonZeroUsize::new(3).unwrap(),
819+
));
820+
let memory_pool: Arc<dyn MemoryPool> = Arc::clone(&track_consumers_pool) as _;
821+
822+
// Empty pool has no metrics
823+
assert!(track_consumers_pool.metrics().is_empty());
824+
825+
// Register consumers with different spill settings
826+
let r1 = MemoryConsumer::new("spilling").with_can_spill(true).register(&memory_pool);
827+
let r2 = MemoryConsumer::new("non-spilling").register(&memory_pool);
828+
829+
// Grow r1 in two steps to verify peak tracking
830+
r1.grow(100);
831+
r1.grow(50);
832+
r1.shrink(50); // reserved=100, peak=150
833+
834+
r2.grow(200); // reserved=200, peak=200
835+
836+
let mut metrics = track_consumers_pool.metrics();
837+
metrics.sort_by_key(|m| m.name.clone());
838+
839+
assert_eq!(metrics.len(), 2);
840+
841+
let m_non = &metrics[0];
842+
assert_eq!(m_non.name, "non-spilling");
843+
assert!(!m_non.can_spill);
844+
assert_eq!(m_non.reserved, 200);
845+
assert_eq!(m_non.peak, 200);
846+
847+
let m_spill = &metrics[1];
848+
assert_eq!(m_spill.name, "spilling");
849+
assert!(m_spill.can_spill);
850+
assert_eq!(m_spill.reserved, 100);
851+
assert_eq!(m_spill.peak, 150);
852+
853+
// Unregistered consumers are removed from metrics
854+
drop(r2);
855+
let metrics = track_consumers_pool.metrics();
856+
assert_eq!(metrics.len(), 1);
857+
assert_eq!(metrics[0].name, "spilling");
858+
}
859+
781860
#[test]
782861
fn test_tracked_consumers_pool_use_beyond_errors() {
783862
let setting = make_settings();

0 commit comments

Comments
 (0)