From 499dd5665d8577f1a21a9434881ccd0317d071c9 Mon Sep 17 00:00:00 2001 From: Milos Pesic Date: Wed, 25 Mar 2026 17:57:31 +0100 Subject: [PATCH 1/3] Fix data race contributing to wrong value of ended_at Fix `endedAt == startedAt` (zero duration) when all tracks are removed before `Synchronizer.End()` is called. This is a race in the egress shutdown path: track workers call `RemoveTrack` (which deletes from `p.tracks`) before the pipeline calls `End()`. When `End()` iterates participants to find `maxPTS`, the tracks map is already empty, so `maxPTS = 0` and `endedAt = startedAt`. The bug was latent before the duration inflation fix but masked by `time.Now()` fallback. --- pkg/synchronizer/participant.go | 20 ++++++++++++++++-- pkg/synchronizer/synchronizer.go | 7 +------ pkg/synchronizer/synchronizer_test.go | 29 ++++++++++++++++++++++++++- 3 files changed, 47 insertions(+), 9 deletions(-) diff --git a/pkg/synchronizer/participant.go b/pkg/synchronizer/participant.go index 06417ba0..56d468eb 100644 --- a/pkg/synchronizer/participant.go +++ b/pkg/synchronizer/participant.go @@ -25,7 +25,8 @@ import ( type participantSynchronizer struct { sync.Mutex - tracks map[uint32]*TrackSynchronizer + tracks map[uint32]*TrackSynchronizer + maxRemovedPTS time.Duration // high-water mark from tracks that have been removed } func newParticipantSynchronizer() *participantSynchronizer { @@ -43,11 +44,26 @@ func (p *participantSynchronizer) onSenderReport(pkt *rtcp.SenderReport) { } } +func (p *participantSynchronizer) removeTrack(ssrc uint32) { + p.Lock() + defer p.Unlock() + + if t := p.tracks[ssrc]; t != nil { + t.Lock() + if t.lastPTSAdjusted > p.maxRemovedPTS { + p.maxRemovedPTS = t.lastPTSAdjusted + } + t.Unlock() + t.Close() + } + delete(p.tracks, ssrc) +} + func (p *participantSynchronizer) getMaxPTSAdjusted() time.Duration { p.Lock() defer p.Unlock() - var maxPTS time.Duration + maxPTS := p.maxRemovedPTS for _, t := range p.tracks { t.Lock() pts := t.lastPTSAdjusted diff --git a/pkg/synchronizer/synchronizer.go b/pkg/synchronizer/synchronizer.go index 0222d5d2..9227aa7d 100644 --- a/pkg/synchronizer/synchronizer.go +++ b/pkg/synchronizer/synchronizer.go @@ -307,12 +307,7 @@ func (s *Synchronizer) RemoveTrack(trackID string) { return } - p.Lock() - if ts := p.tracks[ssrc]; ts != nil { - ts.Close() - } - delete(p.tracks, ssrc) - p.Unlock() + p.removeTrack(ssrc) } func (s *Synchronizer) GetStartedAt() int64 { diff --git a/pkg/synchronizer/synchronizer_test.go b/pkg/synchronizer/synchronizer_test.go index e211c21b..a485989e 100644 --- a/pkg/synchronizer/synchronizer_test.go +++ b/pkg/synchronizer/synchronizer_test.go @@ -16,6 +16,7 @@ import ( ) const timeTolerance = time.Millisecond * 10 +const fakeAudioTrackID = "audio-1" // ---------- helpers ---------- @@ -37,7 +38,7 @@ func pkt(ts uint32) jitter.ExtPacket { func fakeAudio48k(ssrc uint32) *synchronizerfakes.FakeTrackRemote { f := &synchronizerfakes.FakeTrackRemote{} - f.IDReturns("audio-1") + f.IDReturns(fakeAudioTrackID) f.KindReturns(webrtc.RTPCodecTypeAudio) f.SSRCReturns(webrtc.SSRC(ssrc)) f.CodecReturns(webrtc.RTPCodecParameters{ @@ -300,3 +301,29 @@ func TestOnSenderReport_LateVideoStart_SmallSROffset_NoHugeNegativeDrift(t *test near(t, extra, srOffset, timeTolerance) } } + +// Regression: tracks removed before End() must still contribute to endedAt. +func TestEnd_RemovedTracks_StillContributeDuration(t *testing.T) { + s := synchronizer.NewSynchronizerWithOptions() + + audio := fakeAudio48k(0xA001) + ts := s.AddTrack(audio, "p1") + ts.Initialize(pkt(1000).Packet) + _, _ = ts.GetPTS(pkt(1000)) + + // Push ~2s of audio (100 packets at 20ms) + step := uint32(48000 * 20 / 1000) // 960 ticks + cur := uint32(1000) + for range 100 { + cur += step + _, _ = ts.GetPTS(pkt(cur)) + } + + // Remove the track before calling End (reproduces the egress shutdown race) + s.RemoveTrack(fakeAudioTrackID) + s.End() + + duration := time.Duration(s.GetEndedAt() - s.GetStartedAt()) + require.Greater(t, duration, time.Second, + "duration must reflect removed track's PTS; got %v", duration) +} From 07a4887bc13758874bf6ea19c341b19663f26ba7 Mon Sep 17 00:00:00 2001 From: Milos Pesic Date: Wed, 25 Mar 2026 20:17:41 +0100 Subject: [PATCH 2/3] extract LastPTSAdjusted for better encapsulation --- pkg/synchronizer/participant.go | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/pkg/synchronizer/participant.go b/pkg/synchronizer/participant.go index 56d468eb..553fdf6d 100644 --- a/pkg/synchronizer/participant.go +++ b/pkg/synchronizer/participant.go @@ -49,11 +49,9 @@ func (p *participantSynchronizer) removeTrack(ssrc uint32) { defer p.Unlock() if t := p.tracks[ssrc]; t != nil { - t.Lock() - if t.lastPTSAdjusted > p.maxRemovedPTS { - p.maxRemovedPTS = t.lastPTSAdjusted + if pts := t.LastPTSAdjusted(); pts > p.maxRemovedPTS { + p.maxRemovedPTS = pts } - t.Unlock() t.Close() } delete(p.tracks, ssrc) @@ -65,11 +63,7 @@ func (p *participantSynchronizer) getMaxPTSAdjusted() time.Duration { maxPTS := p.maxRemovedPTS for _, t := range p.tracks { - t.Lock() - pts := t.lastPTSAdjusted - t.Unlock() - - if pts > maxPTS { + if pts := t.LastPTSAdjusted(); pts > maxPTS { maxPTS = pts } } From 98438bbd283daef215919a35fe493d7bba23915f Mon Sep 17 00:00:00 2001 From: Milos Pesic Date: Wed, 25 Mar 2026 20:18:09 +0100 Subject: [PATCH 3/3] adding LastPTSAdjusted --- pkg/synchronizer/track.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/synchronizer/track.go b/pkg/synchronizer/track.go index 3858f0b3..5b3a1e15 100644 --- a/pkg/synchronizer/track.go +++ b/pkg/synchronizer/track.go @@ -229,6 +229,12 @@ func (t *TrackSynchronizer) initialize(extPkt jitter.ExtPacket) { ) } +func (t *TrackSynchronizer) LastPTSAdjusted() time.Duration { + t.Lock() + defer t.Unlock() + return t.lastPTSAdjusted +} + func (t *TrackSynchronizer) Close() { t.Lock() defer t.Unlock()