From 9141f75f58c6db9330f6b5278a217106ee270de6 Mon Sep 17 00:00:00 2001 From: Anunay Maheshwari Date: Wed, 8 Apr 2026 11:07:56 +0530 Subject: [PATCH 1/2] feat(PCMLocalTrack): add option to grab track and hw stats --- go.mod | 4 +- go.sum | 6 +- pkg/media/pcmlocaltrack.go | 169 ++++++++++++++++++++++++++++++++++++- 3 files changed, 173 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index 7dac0e84..72972b2d 100644 --- a/go.mod +++ b/go.mod @@ -28,6 +28,8 @@ require ( google.golang.org/protobuf v1.36.11 ) +require github.com/mackerelio/go-osstat v0.2.7 // indirect + require ( buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.11-20260209202127-80ab13bee0bf.1 // indirect buf.build/go/protovalidate v1.1.2 // indirect @@ -136,7 +138,7 @@ require ( golang.org/x/exp v0.0.0-20260212183809-81e46e3db34a // indirect golang.org/x/net v0.50.0 // indirect golang.org/x/sync v0.19.0 - golang.org/x/sys v0.41.0 // indirect + golang.org/x/sys v0.42.0 // indirect golang.org/x/text v0.34.0 // indirect golang.org/x/time v0.14.0 // indirect golang.org/x/tools v0.42.0 // indirect diff --git a/go.sum b/go.sum index 6d844e1e..ba647284 100644 --- a/go.sum +++ b/go.sum @@ -163,6 +163,8 @@ github.com/livekit/protocol v1.45.2-0.20260403151849-8a360e8d0221 h1:loe7h+z1kOu github.com/livekit/protocol v1.45.2-0.20260403151849-8a360e8d0221/go.mod h1:e6QdWDkfot+M2nRh0eitJUS0ZLuwvKCsfiz2pWWSG3s= github.com/livekit/psrpc v0.7.1 h1:ms37az0QTD3UXIWuUC5D/SkmKOlRMVRsI261eBWu/Vw= github.com/livekit/psrpc v0.7.1/go.mod h1:bZ4iHFQptTkbPnB0LasvRNu/OBYXEu1NA6O5BMFo9kk= +github.com/mackerelio/go-osstat v0.2.7 h1:TCavZi10wF49bT6iQZ9eT2keGZQpC69MTDfdJej5e94= +github.com/mackerelio/go-osstat v0.2.7/go.mod h1:dwpYh5pIPmvk+IEwBKNIWRFMB92mrC08CmXOhDC7nQk= github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg= github.com/magefile/mage v1.15.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= github.com/maxbrunsfeld/counterfeiter/v6 v6.12.1 h1:D4O2wLxB384TS3ohBJMfolnxb4qGmoZ1PnWNtit8LYo= @@ -390,8 +392,8 @@ golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= -golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= +golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= diff --git a/pkg/media/pcmlocaltrack.go b/pkg/media/pcmlocaltrack.go index 552963f1..5b6a6ea0 100644 --- a/pkg/media/pcmlocaltrack.go +++ b/pkg/media/pcmlocaltrack.go @@ -10,6 +10,7 @@ import ( "github.com/livekit/media-sdk" "github.com/livekit/media-sdk/opus" protoLogger "github.com/livekit/protocol/logger" + "github.com/livekit/protocol/utils/hwstats" "github.com/pion/webrtc/v4" "go.uber.org/atomic" @@ -17,7 +18,9 @@ import ( ) type PCMLocalTrackParams struct { - Encryptor Encryptor + Encryptor Encryptor + EnableStats bool + EnableHWStats bool } type PCMLocalTrackOption func(*PCMLocalTrackParams) @@ -28,6 +31,18 @@ func WithEncryptor(encryptor Encryptor) PCMLocalTrackOption { } } +func WithTrackStats() PCMLocalTrackOption { + return func(p *PCMLocalTrackParams) { + p.EnableStats = true + } +} + +func WithHWStats() PCMLocalTrackOption { + return func(p *PCMLocalTrackParams) { + p.EnableHWStats = true + } +} + type PCMLocalTrack struct { *webrtc.TrackLocalStaticSample @@ -53,6 +68,20 @@ type PCMLocalTrack struct { closed atomic.Bool muted atomic.Bool bound atomic.Bool + + logger protoLogger.Logger + enableStats bool + logState pcmLocalTrackLogState + cpuStats *hwstats.CPUStats + memStats *hwstats.MemoryStats +} + +type pcmLocalTrackLogState struct { + at time.Time + totalWritten uint64 + totalProcessed uint64 + prevWritten uint64 + prevProcessed uint64 } // NewPCMLocalTrack creates a wrapper around a webrtc.TrackLocalStaticSample that accepts PCM16 samples via the WriteSample method, @@ -103,6 +132,13 @@ func NewPCMLocalTrack( resampledPCMWriter = media.ResampleWriter(pcmWriter, sourceSampleRate) } + var cpuStats *hwstats.CPUStats + var memStats *hwstats.MemoryStats + if params.EnableHWStats { + cpuStats, _ = hwstats.NewCPUStats(nil) + memStats, _ = hwstats.NewMemoryStats() + } + // the final chain of writers: // WriteSample -> resamplesPCMWriter (resamples source to target sample rate as necessary) // -> PCMWriter (encodes PCM -> Opus) @@ -117,6 +153,13 @@ func NewPCMLocalTrack( sourceChannels: sourceChannels, chunkBuffer: new(deque.Deque[media.PCM16Sample]), samplesPerFrame: (sourceSampleRate * sourceChannels * int(defaultPCMFrameDuration/time.Nanosecond)) / 1e9, + logger: logger, + enableStats: params.EnableStats, + cpuStats: cpuStats, + memStats: memStats, + logState: pcmLocalTrackLogState{ + at: time.Now(), + }, } t.cond = sync.NewCond(&t.mu) @@ -184,7 +227,12 @@ func (t *PCMLocalTrack) WriteSample(chunk media.PCM16Sample) error { t.mu.Lock() t.chunkBuffer.PushBack(chunkCopy) t.cond.Broadcast() + t.logState.totalWritten += uint64(len(chunk)) + snapshot := t.collectLogSnapshotLocked(time.Now()) t.mu.Unlock() + if snapshot != nil { + t.emitLogSnapshot(snapshot) + } return nil } @@ -197,13 +245,24 @@ func (t *PCMLocalTrack) processSamples() { break } + var frame media.PCM16Sample + var snapshot *pcmLocalTrackLogSnapshot + t.mu.Lock() - frame := t.getFrameFromChunkBuffer() + frame = t.getFrameFromChunkBuffer() if frame != nil { - t.resampledPCMWriter.WriteSample(frame) + t.logState.totalProcessed += uint64(len(frame)) + snapshot = t.collectLogSnapshotLocked(time.Now()) } t.mu.Unlock() + if frame != nil { + t.resampledPCMWriter.WriteSample(frame) + if snapshot != nil { + t.emitLogSnapshot(snapshot) + } + } + <-ticker.C } @@ -256,6 +315,9 @@ func (t *PCMLocalTrack) Close() error { t.mu.Lock() t.cond.Broadcast() t.mu.Unlock() + if t.cpuStats != nil { + t.cpuStats.Stop() + } } return nil } @@ -267,3 +329,104 @@ func (t *PCMLocalTrack) SampleRate() int { func (t *PCMLocalTrack) String() string { return "PCMLocalTrack" } + +type pcmLocalTrackLogSnapshot struct { + interval time.Duration + queueSamples int + totalWritten uint64 + totalProcessed uint64 + deltaWritten uint64 + deltaProcessed uint64 +} + +func (t *PCMLocalTrack) collectLogSnapshotLocked(now time.Time) *pcmLocalTrackLogSnapshot { + if !t.enableStats && t.cpuStats == nil && t.memStats == nil { + return nil + } + + if t.logState.at.IsZero() { + t.logState.at = now + return nil + } + + const logInterval = 5 * time.Second + if now.Sub(t.logState.at) < logInterval { + return nil + } + + interval := now.Sub(t.logState.at) + snapshot := &pcmLocalTrackLogSnapshot{ + interval: interval, + queueSamples: t.getNumSamplesInChunkBuffer(), + totalWritten: t.logState.totalWritten, + totalProcessed: t.logState.totalProcessed, + deltaWritten: t.logState.totalWritten - t.logState.prevWritten, + deltaProcessed: t.logState.totalProcessed - t.logState.prevProcessed, + } + + t.logState.at = now + t.logState.prevWritten = t.logState.totalWritten + t.logState.prevProcessed = t.logState.totalProcessed + + return snapshot +} + +func (t *PCMLocalTrack) emitLogSnapshot(snapshot *pcmLocalTrackLogSnapshot) { + if snapshot == nil || snapshot.interval <= 0 { + return + } + + elapsed := snapshot.interval.Seconds() + if elapsed == 0 { + return + } + + fields := make([]interface{}, 0, 20) + + if t.enableStats { + chanCount := float64(t.sourceChannels) + if chanCount == 0 { + chanCount = 1 + } + + ingressHz := float64(snapshot.deltaWritten) / chanCount / elapsed + + processedSamples := float64(snapshot.deltaProcessed) + if t.sourceSampleRate != 0 && t.sourceSampleRate != DefaultOpusSampleRate { + processedSamples *= float64(DefaultOpusSampleRate) / float64(t.sourceSampleRate) + } + egressHz := processedSamples / chanCount / elapsed + + queueSeconds := 0.0 + if t.sourceSampleRate != 0 { + queueSeconds = float64(snapshot.queueSamples) / (chanCount * float64(t.sourceSampleRate)) + } + + fields = append(fields, + "interval_s", elapsed, + "ingress_hz", ingressHz, + "egress_hz", egressHz, + "queue_samples", snapshot.queueSamples, + "queue_s", queueSeconds, + "total_written", snapshot.totalWritten, + "total_processed", snapshot.totalProcessed, + "source_sample_rate", t.sourceSampleRate, + ) + } + + if t.cpuStats != nil { + fields = append(fields, + "cpu_load", t.cpuStats.GetCPULoad(), + "num_cpu", t.cpuStats.NumCPU(), + ) + } + + if t.memStats != nil { + used, total, err := t.memStats.GetMemory() + if err == nil { + fields = append(fields, "mem_used", used, "mem_total", total) + } + } + + t.logger.Infow("pcm local track stats", fields...) +} From d62548b8e002cc282d82a78a04ba606883e64f0b Mon Sep 17 00:00:00 2001 From: Anunay Maheshwari Date: Wed, 8 Apr 2026 13:06:22 +0530 Subject: [PATCH 2/2] cleanup --- pkg/media/pcmlocaltrack.go | 33 +++++++++++++++++---------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/pkg/media/pcmlocaltrack.go b/pkg/media/pcmlocaltrack.go index 5b6a6ea0..c3301871 100644 --- a/pkg/media/pcmlocaltrack.go +++ b/pkg/media/pcmlocaltrack.go @@ -69,11 +69,12 @@ type PCMLocalTrack struct { muted atomic.Bool bound atomic.Bool - logger protoLogger.Logger - enableStats bool - logState pcmLocalTrackLogState - cpuStats *hwstats.CPUStats - memStats *hwstats.MemoryStats + logger protoLogger.Logger + enableStats bool + loggingEnabled bool + logState pcmLocalTrackLogState + cpuStats *hwstats.CPUStats + memStats *hwstats.MemoryStats } type pcmLocalTrackLogState struct { @@ -153,10 +154,11 @@ func NewPCMLocalTrack( sourceChannels: sourceChannels, chunkBuffer: new(deque.Deque[media.PCM16Sample]), samplesPerFrame: (sourceSampleRate * sourceChannels * int(defaultPCMFrameDuration/time.Nanosecond)) / 1e9, - logger: logger, - enableStats: params.EnableStats, - cpuStats: cpuStats, - memStats: memStats, + logger: logger, + enableStats: params.EnableStats, + loggingEnabled: params.EnableStats || params.EnableHWStats, + cpuStats: cpuStats, + memStats: memStats, logState: pcmLocalTrackLogState{ at: time.Now(), }, @@ -227,8 +229,11 @@ func (t *PCMLocalTrack) WriteSample(chunk media.PCM16Sample) error { t.mu.Lock() t.chunkBuffer.PushBack(chunkCopy) t.cond.Broadcast() - t.logState.totalWritten += uint64(len(chunk)) - snapshot := t.collectLogSnapshotLocked(time.Now()) + var snapshot *pcmLocalTrackLogSnapshot + if t.loggingEnabled { + t.logState.totalWritten += uint64(len(chunk)) + snapshot = t.collectLogSnapshotLocked(time.Now()) + } t.mu.Unlock() if snapshot != nil { t.emitLogSnapshot(snapshot) @@ -250,7 +255,7 @@ func (t *PCMLocalTrack) processSamples() { t.mu.Lock() frame = t.getFrameFromChunkBuffer() - if frame != nil { + if frame != nil && t.loggingEnabled { t.logState.totalProcessed += uint64(len(frame)) snapshot = t.collectLogSnapshotLocked(time.Now()) } @@ -340,10 +345,6 @@ type pcmLocalTrackLogSnapshot struct { } func (t *PCMLocalTrack) collectLogSnapshotLocked(now time.Time) *pcmLocalTrackLogSnapshot { - if !t.enableStats && t.cpuStats == nil && t.memStats == nil { - return nil - } - if t.logState.at.IsZero() { t.logState.at = now return nil