Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions pkg/synchronizer/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ import (
type participantSynchronizer struct {
sync.Mutex

tracks map[uint32]*TrackSynchronizer
tracks map[uint32]*TrackSynchronizer
maxRemovedPTS time.Duration // high-water mark from tracks that have been removed
}

func newParticipantSynchronizer() *participantSynchronizer {
Expand All @@ -43,17 +44,26 @@ func (p *participantSynchronizer) onSenderReport(pkt *rtcp.SenderReport) {
}
}

func (p *participantSynchronizer) removeTrack(ssrc uint32) {
p.Lock()
defer p.Unlock()

if t := p.tracks[ssrc]; t != nil {
if pts := t.LastPTSAdjusted(); pts > p.maxRemovedPTS {
p.maxRemovedPTS = pts
}
t.Close()
}
delete(p.tracks, ssrc)
}

func (p *participantSynchronizer) getMaxPTSAdjusted() time.Duration {
p.Lock()
defer p.Unlock()

var maxPTS time.Duration
maxPTS := p.maxRemovedPTS
for _, t := range p.tracks {
t.Lock()
pts := t.lastPTSAdjusted
t.Unlock()

if pts > maxPTS {
if pts := t.LastPTSAdjusted(); pts > maxPTS {
maxPTS = pts
}
}
Expand Down
7 changes: 1 addition & 6 deletions pkg/synchronizer/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,12 +307,7 @@ func (s *Synchronizer) RemoveTrack(trackID string) {
return
}

p.Lock()
if ts := p.tracks[ssrc]; ts != nil {
ts.Close()
}
delete(p.tracks, ssrc)
p.Unlock()
p.removeTrack(ssrc)
}

func (s *Synchronizer) GetStartedAt() int64 {
Expand Down
29 changes: 28 additions & 1 deletion pkg/synchronizer/synchronizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
)

const timeTolerance = time.Millisecond * 10
const fakeAudioTrackID = "audio-1"

// ---------- helpers ----------

Expand All @@ -37,7 +38,7 @@ func pkt(ts uint32) jitter.ExtPacket {

func fakeAudio48k(ssrc uint32) *synchronizerfakes.FakeTrackRemote {
f := &synchronizerfakes.FakeTrackRemote{}
f.IDReturns("audio-1")
f.IDReturns(fakeAudioTrackID)
f.KindReturns(webrtc.RTPCodecTypeAudio)
f.SSRCReturns(webrtc.SSRC(ssrc))
f.CodecReturns(webrtc.RTPCodecParameters{
Expand Down Expand Up @@ -300,3 +301,29 @@ func TestOnSenderReport_LateVideoStart_SmallSROffset_NoHugeNegativeDrift(t *test
near(t, extra, srOffset, timeTolerance)
}
}

// Regression: tracks removed before End() must still contribute to endedAt.
func TestEnd_RemovedTracks_StillContributeDuration(t *testing.T) {
s := synchronizer.NewSynchronizerWithOptions()

audio := fakeAudio48k(0xA001)
ts := s.AddTrack(audio, "p1")
ts.Initialize(pkt(1000).Packet)
_, _ = ts.GetPTS(pkt(1000))

// Push ~2s of audio (100 packets at 20ms)
step := uint32(48000 * 20 / 1000) // 960 ticks
cur := uint32(1000)
for range 100 {
cur += step
_, _ = ts.GetPTS(pkt(cur))
}

// Remove the track before calling End (reproduces the egress shutdown race)
s.RemoveTrack(fakeAudioTrackID)
s.End()

duration := time.Duration(s.GetEndedAt() - s.GetStartedAt())
require.Greater(t, duration, time.Second,
"duration must reflect removed track's PTS; got %v", duration)
}
6 changes: 6 additions & 0 deletions pkg/synchronizer/track.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,12 @@ func (t *TrackSynchronizer) initialize(extPkt jitter.ExtPacket) {
)
}

func (t *TrackSynchronizer) LastPTSAdjusted() time.Duration {
t.Lock()
defer t.Unlock()
return t.lastPTSAdjusted
}

func (t *TrackSynchronizer) Close() {
t.Lock()
defer t.Unlock()
Expand Down
Loading