Skip to content

Commit 4493ba0

Browse files
authored
Fix duration inflation caused by late-subscribing tracks (#859)
End() previously computed endedAt as wallclock + max PTS offset, which included the late-subscription base offset (e.g. +44s for a track that joined 44s late), inflating reported duration far beyond actual recording time. This replaces the offset-based computation with the actual max adjusted PTS reached by any track, making both endedAt and the drain ceiling directly reflect the media timeline.
1 parent 6376347 commit 4493ba0

File tree

3 files changed

+54
-14
lines changed

3 files changed

+54
-14
lines changed

pkg/synchronizer/participant.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,21 +43,21 @@ func (p *participantSynchronizer) onSenderReport(pkt *rtcp.SenderReport) {
4343
}
4444
}
4545

46-
func (p *participantSynchronizer) getMaxOffset() time.Duration {
46+
func (p *participantSynchronizer) getMaxPTSAdjusted() time.Duration {
4747
p.Lock()
4848
defer p.Unlock()
4949

50-
var maxOffset time.Duration
50+
var maxPTS time.Duration
5151
for _, t := range p.tracks {
5252
t.Lock()
53-
o := max(t.currentPTSOffset, t.desiredPTSOffset)
53+
pts := t.lastPTSAdjusted
5454
t.Unlock()
5555

56-
if o > maxOffset {
57-
maxOffset = o
56+
if pts > maxPTS {
57+
maxPTS = pts
5858
}
5959
}
60-
return maxOffset
60+
return maxPTS
6161
}
6262

6363
func (p *participantSynchronizer) drain(maxPTS time.Duration) {

pkg/synchronizer/synchronizer.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -273,20 +273,20 @@ func (s *Synchronizer) OnRTCP(packet rtcp.Packet) {
273273
}
274274

275275
func (s *Synchronizer) End() {
276-
endTime := time.Now()
277-
278276
s.Lock()
279277
defer s.Unlock()
280278

281-
// find the earliest time we can stop all tracks
282-
var maxOffset time.Duration
279+
// maxPTS is the drain ceiling: the maximum adjusted PTS after which tracks
280+
// return EOF. Use the furthest adjusted PTS any track has actually reached
281+
// so that all tracks can drain up to the same point in the output timeline.
282+
var maxPTS time.Duration
283283
for _, p := range s.psByIdentity {
284-
if m := p.getMaxOffset(); m > maxOffset {
285-
maxOffset = m
284+
if m := p.getMaxPTSAdjusted(); m > maxPTS {
285+
maxPTS = m
286286
}
287287
}
288-
s.endedAt = endTime.Add(maxOffset).UnixNano()
289-
maxPTS := time.Duration(s.endedAt - s.startedAt)
288+
289+
s.endedAt = s.startedAt + int64(maxPTS)
290290

291291
// drain all
292292
for _, p := range s.psByIdentity {

pkg/synchronizer/synchronizer_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,46 @@ func fakeVideo90k(ssrc uint32) *synchronizerfakes.FakeTrackRemote {
6565

6666
// ---------- tests ----------
6767

68+
// Regression: a late-subscribing track must not inflate the reported duration.
69+
func TestEnd_LateTrack_DoesNotInflateDuration(t *testing.T) {
70+
s := synchronizer.NewSynchronizerWithOptions()
71+
72+
audio := fakeAudio48k(0xA001)
73+
ts1 := s.AddTrack(audio, "p1")
74+
ts1.Initialize(pkt(1000).Packet)
75+
_, _ = ts1.GetPTS(pkt(1000))
76+
77+
// Push ~2s of audio (100 packets at 20ms)
78+
step := uint32(48000 * 20 / 1000) // 960 ticks
79+
cur := uint32(1000)
80+
for i := 0; i < 100; i++ {
81+
cur += step
82+
_, _ = ts1.GetPTS(pkt(cur))
83+
}
84+
85+
// Late track subscribes 3s after the first
86+
time.Sleep(3 * time.Second)
87+
88+
late := fakeAudio48k(0xA002)
89+
late.IDReturns("audio-late")
90+
ts2 := s.AddTrack(late, "p2")
91+
ts2.Initialize(pkt(5000).Packet)
92+
_, _ = ts2.GetPTS(pkt(5000))
93+
94+
// Push a few packets on late track
95+
cur2 := uint32(5000)
96+
for i := 0; i < 10; i++ {
97+
cur2 += step
98+
_, _ = ts2.GetPTS(pkt(cur2))
99+
}
100+
101+
s.End()
102+
103+
duration := time.Duration(s.GetEndedAt() - s.GetStartedAt())
104+
require.Less(t, duration, 5*time.Second,
105+
"duration must not be inflated by late track's base offset; got %v", duration)
106+
}
107+
68108
// Initialize + same timestamp returns previous adjusted value (near zero right after init)
69109
func TestInitialize_AndSameTimestamp(t *testing.T) {
70110
s := synchronizer.NewSynchronizerWithOptions(synchronizer.WithMaxTsDiff(50 * time.Millisecond))

0 commit comments

Comments
 (0)