diff --git a/pkg/synchronizer/participant.go b/pkg/synchronizer/participant.go index 06417ba0..553fdf6d 100644 --- a/pkg/synchronizer/participant.go +++ b/pkg/synchronizer/participant.go @@ -25,7 +25,8 @@ import ( type participantSynchronizer struct { sync.Mutex - tracks map[uint32]*TrackSynchronizer + tracks map[uint32]*TrackSynchronizer + maxRemovedPTS time.Duration // high-water mark from tracks that have been removed } func newParticipantSynchronizer() *participantSynchronizer { @@ -43,17 +44,26 @@ func (p *participantSynchronizer) onSenderReport(pkt *rtcp.SenderReport) { } } +func (p *participantSynchronizer) removeTrack(ssrc uint32) { + p.Lock() + defer p.Unlock() + + if t := p.tracks[ssrc]; t != nil { + if pts := t.LastPTSAdjusted(); pts > p.maxRemovedPTS { + p.maxRemovedPTS = pts + } + t.Close() + } + delete(p.tracks, ssrc) +} + func (p *participantSynchronizer) getMaxPTSAdjusted() time.Duration { p.Lock() defer p.Unlock() - var maxPTS time.Duration + maxPTS := p.maxRemovedPTS for _, t := range p.tracks { - t.Lock() - pts := t.lastPTSAdjusted - t.Unlock() - - if pts > maxPTS { + if pts := t.LastPTSAdjusted(); pts > maxPTS { maxPTS = pts } } diff --git a/pkg/synchronizer/synchronizer.go b/pkg/synchronizer/synchronizer.go index 0222d5d2..9227aa7d 100644 --- a/pkg/synchronizer/synchronizer.go +++ b/pkg/synchronizer/synchronizer.go @@ -307,12 +307,7 @@ func (s *Synchronizer) RemoveTrack(trackID string) { return } - p.Lock() - if ts := p.tracks[ssrc]; ts != nil { - ts.Close() - } - delete(p.tracks, ssrc) - p.Unlock() + p.removeTrack(ssrc) } func (s *Synchronizer) GetStartedAt() int64 { diff --git a/pkg/synchronizer/synchronizer_test.go b/pkg/synchronizer/synchronizer_test.go index e211c21b..a485989e 100644 --- a/pkg/synchronizer/synchronizer_test.go +++ b/pkg/synchronizer/synchronizer_test.go @@ -16,6 +16,7 @@ import ( ) const timeTolerance = time.Millisecond * 10 +const fakeAudioTrackID = "audio-1" // ---------- helpers ---------- @@ -37,7 +38,7 @@ func pkt(ts uint32) jitter.ExtPacket { func fakeAudio48k(ssrc uint32) *synchronizerfakes.FakeTrackRemote { f := &synchronizerfakes.FakeTrackRemote{} - f.IDReturns("audio-1") + f.IDReturns(fakeAudioTrackID) f.KindReturns(webrtc.RTPCodecTypeAudio) f.SSRCReturns(webrtc.SSRC(ssrc)) f.CodecReturns(webrtc.RTPCodecParameters{ @@ -300,3 +301,29 @@ func TestOnSenderReport_LateVideoStart_SmallSROffset_NoHugeNegativeDrift(t *test near(t, extra, srOffset, timeTolerance) } } + +// Regression: tracks removed before End() must still contribute to endedAt. +func TestEnd_RemovedTracks_StillContributeDuration(t *testing.T) { + s := synchronizer.NewSynchronizerWithOptions() + + audio := fakeAudio48k(0xA001) + ts := s.AddTrack(audio, "p1") + ts.Initialize(pkt(1000).Packet) + _, _ = ts.GetPTS(pkt(1000)) + + // Push ~2s of audio (100 packets at 20ms) + step := uint32(48000 * 20 / 1000) // 960 ticks + cur := uint32(1000) + for range 100 { + cur += step + _, _ = ts.GetPTS(pkt(cur)) + } + + // Remove the track before calling End (reproduces the egress shutdown race) + s.RemoveTrack(fakeAudioTrackID) + s.End() + + duration := time.Duration(s.GetEndedAt() - s.GetStartedAt()) + require.Greater(t, duration, time.Second, + "duration must reflect removed track's PTS; got %v", duration) +} diff --git a/pkg/synchronizer/track.go b/pkg/synchronizer/track.go index 3858f0b3..5b3a1e15 100644 --- a/pkg/synchronizer/track.go +++ b/pkg/synchronizer/track.go @@ -229,6 +229,12 @@ func (t *TrackSynchronizer) initialize(extPkt jitter.ExtPacket) { ) } +func (t *TrackSynchronizer) LastPTSAdjusted() time.Duration { + t.Lock() + defer t.Unlock() + return t.lastPTSAdjusted +} + func (t *TrackSynchronizer) Close() { t.Lock() defer t.Unlock()