Skip to content

Commit fbcb73e

Browse files
authored
Fix data race contributing to wrong value of ended_at (#868)
* Fix data race contributing to wrong value of ended_at Fix `endedAt == startedAt` (zero duration) when all tracks are removed before `Synchronizer.End()` is called. This is a race in the egress shutdown path: track workers call `RemoveTrack` (which deletes from `p.tracks`) before the pipeline calls `End()`. When `End()` iterates participants to find `maxPTS`, the tracks map is already empty, so `maxPTS = 0` and `endedAt = startedAt`. The bug was latent before the duration inflation fix but masked by `time.Now()` fallback. * extract LastPTSAdjusted for better encapsulation * adding LastPTSAdjusted
1 parent 3302552 commit fbcb73e

File tree

4 files changed

+52
-14
lines changed

4 files changed

+52
-14
lines changed

pkg/synchronizer/participant.go

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ import (
2525
type participantSynchronizer struct {
2626
sync.Mutex
2727

28-
tracks map[uint32]*TrackSynchronizer
28+
tracks map[uint32]*TrackSynchronizer
29+
maxRemovedPTS time.Duration // high-water mark from tracks that have been removed
2930
}
3031

3132
func newParticipantSynchronizer() *participantSynchronizer {
@@ -43,17 +44,26 @@ func (p *participantSynchronizer) onSenderReport(pkt *rtcp.SenderReport) {
4344
}
4445
}
4546

47+
func (p *participantSynchronizer) removeTrack(ssrc uint32) {
48+
p.Lock()
49+
defer p.Unlock()
50+
51+
if t := p.tracks[ssrc]; t != nil {
52+
if pts := t.LastPTSAdjusted(); pts > p.maxRemovedPTS {
53+
p.maxRemovedPTS = pts
54+
}
55+
t.Close()
56+
}
57+
delete(p.tracks, ssrc)
58+
}
59+
4660
func (p *participantSynchronizer) getMaxPTSAdjusted() time.Duration {
4761
p.Lock()
4862
defer p.Unlock()
4963

50-
var maxPTS time.Duration
64+
maxPTS := p.maxRemovedPTS
5165
for _, t := range p.tracks {
52-
t.Lock()
53-
pts := t.lastPTSAdjusted
54-
t.Unlock()
55-
56-
if pts > maxPTS {
66+
if pts := t.LastPTSAdjusted(); pts > maxPTS {
5767
maxPTS = pts
5868
}
5969
}

pkg/synchronizer/synchronizer.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -307,12 +307,7 @@ func (s *Synchronizer) RemoveTrack(trackID string) {
307307
return
308308
}
309309

310-
p.Lock()
311-
if ts := p.tracks[ssrc]; ts != nil {
312-
ts.Close()
313-
}
314-
delete(p.tracks, ssrc)
315-
p.Unlock()
310+
p.removeTrack(ssrc)
316311
}
317312

318313
func (s *Synchronizer) GetStartedAt() int64 {

pkg/synchronizer/synchronizer_test.go

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
)
1717

1818
const timeTolerance = time.Millisecond * 10
19+
const fakeAudioTrackID = "audio-1"
1920

2021
// ---------- helpers ----------
2122

@@ -37,7 +38,7 @@ func pkt(ts uint32) jitter.ExtPacket {
3738

3839
func fakeAudio48k(ssrc uint32) *synchronizerfakes.FakeTrackRemote {
3940
f := &synchronizerfakes.FakeTrackRemote{}
40-
f.IDReturns("audio-1")
41+
f.IDReturns(fakeAudioTrackID)
4142
f.KindReturns(webrtc.RTPCodecTypeAudio)
4243
f.SSRCReturns(webrtc.SSRC(ssrc))
4344
f.CodecReturns(webrtc.RTPCodecParameters{
@@ -300,3 +301,29 @@ func TestOnSenderReport_LateVideoStart_SmallSROffset_NoHugeNegativeDrift(t *test
300301
near(t, extra, srOffset, timeTolerance)
301302
}
302303
}
304+
305+
// Regression: tracks removed before End() must still contribute to endedAt.
306+
func TestEnd_RemovedTracks_StillContributeDuration(t *testing.T) {
307+
s := synchronizer.NewSynchronizerWithOptions()
308+
309+
audio := fakeAudio48k(0xA001)
310+
ts := s.AddTrack(audio, "p1")
311+
ts.Initialize(pkt(1000).Packet)
312+
_, _ = ts.GetPTS(pkt(1000))
313+
314+
// Push ~2s of audio (100 packets at 20ms)
315+
step := uint32(48000 * 20 / 1000) // 960 ticks
316+
cur := uint32(1000)
317+
for range 100 {
318+
cur += step
319+
_, _ = ts.GetPTS(pkt(cur))
320+
}
321+
322+
// Remove the track before calling End (reproduces the egress shutdown race)
323+
s.RemoveTrack(fakeAudioTrackID)
324+
s.End()
325+
326+
duration := time.Duration(s.GetEndedAt() - s.GetStartedAt())
327+
require.Greater(t, duration, time.Second,
328+
"duration must reflect removed track's PTS; got %v", duration)
329+
}

pkg/synchronizer/track.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,12 @@ func (t *TrackSynchronizer) initialize(extPkt jitter.ExtPacket) {
229229
)
230230
}
231231

232+
func (t *TrackSynchronizer) LastPTSAdjusted() time.Duration {
233+
t.Lock()
234+
defer t.Unlock()
235+
return t.lastPTSAdjusted
236+
}
237+
232238
func (t *TrackSynchronizer) Close() {
233239
t.Lock()
234240
defer t.Unlock()

0 commit comments

Comments
 (0)