From 5e1c03b7d080461d8142fbf74db49cdbf98b6b6f Mon Sep 17 00:00:00 2001 From: Milos Pesic Date: Thu, 28 Aug 2025 20:03:47 +0200 Subject: [PATCH 1/4] Decompose desired PTS offset into 2 parts One is fixed and determined on track start up, the other needs to be updated on SRs --- pkg/synchronizer/track.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/synchronizer/track.go b/pkg/synchronizer/track.go index 3b2adc3f..b21c5b55 100644 --- a/pkg/synchronizer/track.go +++ b/pkg/synchronizer/track.go @@ -59,6 +59,7 @@ type TrackSynchronizer struct { // offsets currentPTSOffset time.Duration // presentation timestamp offset (used for a/v sync) desiredPTSOffset time.Duration // desired presentation timestamp offset (used for a/v sync) + basePTSOffset time.Duration // component of the desired PTS offset (set initially to preseve initial offset) // sender reports lastSR uint32 @@ -94,6 +95,7 @@ func (t *TrackSynchronizer) Initialize(pkt *rtp.Packet) { t.currentPTSOffset = time.Duration(now.UnixNano() - startedAt) t.desiredPTSOffset = t.currentPTSOffset + t.basePTSOffset = t.desiredPTSOffset t.startRTP = pkt.Timestamp t.lastTS = pkt.Timestamp @@ -171,7 +173,7 @@ func (t *TrackSynchronizer) onSenderReport(pkt *rtcp.SenderReport) { return } - t.desiredPTSOffset += offset + t.desiredPTSOffset = t.basePTSOffset + offset t.lastSR = pkt.RTPTime } From 19f168c9a01fda2d2069ed5dc5e3113fae9d6bfc Mon Sep 17 00:00:00 2001 From: Milos Pesic Date: Fri, 29 Aug 2025 13:50:21 +0200 Subject: [PATCH 2/4] Adding basic track syncrhonizer unit tests --- go.mod | 7 + go.sum | 4 + pkg/synchronizer/synchronizer_test.go | 254 +++++++++++++++ .../synchronizerfakes/fake_track_remote.go | 290 ++++++++++++++++++ pkg/synchronizer/track.go | 3 + 5 files changed, 558 insertions(+) create mode 100644 pkg/synchronizer/synchronizer_test.go create mode 100644 pkg/synchronizer/synchronizerfakes/fake_track_remote.go diff --git a/go.mod b/go.mod index 84a3a243..cda30743 100644 --- a/go.mod +++ b/go.mod @@ -28,6 +28,11 @@ require ( require golang.org/x/mod v0.27.0 +require ( + github.com/maxbrunsfeld/counterfeiter/v6 v6.11.3 // indirect + golang.org/x/tools v0.36.0 // indirect +) + require ( buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.8-20250717185734-6c6e0d3c608e.1 // indirect buf.build/go/protovalidate v0.14.0 // indirect @@ -87,3 +92,5 @@ require ( gopkg.in/hraban/opus.v2 v2.0.0-20230925203106-0188a62cb302 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) + +tool github.com/maxbrunsfeld/counterfeiter/v6 diff --git a/go.sum b/go.sum index 535ad95e..218564b1 100644 --- a/go.sum +++ b/go.sum @@ -105,6 +105,8 @@ github.com/livekit/psrpc v0.6.1-0.20250726180611-3915e005e741 h1:KKL1u94l6dF9u4c github.com/livekit/psrpc v0.6.1-0.20250726180611-3915e005e741/go.mod h1:AuDC5uOoEjQJEc69v4Li3t77Ocz0e0NdjQEuFfO+vfk= github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg= github.com/magefile/mage v1.15.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= +github.com/maxbrunsfeld/counterfeiter/v6 v6.11.3 h1:Eaq36EIyJNp7b3qDhjV7jmDVq/yPeW2v4pTqzGbOGB4= +github.com/maxbrunsfeld/counterfeiter/v6 v6.11.3/go.mod h1:6KKUoQBZBW6PDXJtNfqeEjPXMj/ITTk+cWK9t9uS5+E= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0= @@ -267,6 +269,8 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/tools v0.36.0 h1:kWS0uv/zsvHEle1LbV5LE8QujrxB3wfQyxHfhOk0Qkg= +golang.org/x/tools v0.36.0/go.mod h1:WBDiHKJK8YgLHlcQPYQzNCkUxUypCaa5ZegCVutKm+s= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= diff --git a/pkg/synchronizer/synchronizer_test.go b/pkg/synchronizer/synchronizer_test.go new file mode 100644 index 00000000..b24688f5 --- /dev/null +++ b/pkg/synchronizer/synchronizer_test.go @@ -0,0 +1,254 @@ +package synchronizer_test + +import ( + "testing" + "time" + + "github.com/pion/rtcp" + "github.com/pion/rtp" + "github.com/pion/webrtc/v4" + "github.com/stretchr/testify/require" + + "github.com/livekit/mediatransportutil" + "github.com/livekit/server-sdk-go/v2/pkg/synchronizer" + "github.com/livekit/server-sdk-go/v2/pkg/synchronizer/synchronizerfakes" +) + +// ---------- helpers ---------- + +func near(t *testing.T, got, want, tol time.Duration) { + t.Helper() + d := got - want + if d < 0 { + d = -d + } + require.LessOrEqualf(t, d, tol, "got %v, want ~%v (±%v)", got, want, tol) +} + +func pkt(ts uint32) *rtp.Packet { + return &rtp.Packet{Header: rtp.Header{Timestamp: ts}} +} + +func fakeAudio48k(ssrc uint32) *synchronizerfakes.FakeTrackRemote { + f := &synchronizerfakes.FakeTrackRemote{} + f.IDReturns("audio-1") + f.KindReturns(webrtc.RTPCodecTypeAudio) + f.SSRCReturns(webrtc.SSRC(ssrc)) + f.CodecReturns(webrtc.RTPCodecParameters{ + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeOpus, + ClockRate: 48000, + }, + }) + return f +} + +func fakeVideo90k(ssrc uint32) *synchronizerfakes.FakeTrackRemote { + f := &synchronizerfakes.FakeTrackRemote{} + f.IDReturns("video-1") + f.KindReturns(webrtc.RTPCodecTypeVideo) + f.SSRCReturns(webrtc.SSRC(ssrc)) + f.CodecReturns(webrtc.RTPCodecParameters{ + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeVP8, // codec value not important here + ClockRate: 90000, + }, + }) + return f +} + +// ---------- tests ---------- + +// Initialize + same timestamp returns previous adjusted value (near zero right after init) +func TestInitialize_AndSameTimestamp(t *testing.T) { + s := synchronizer.NewSynchronizerWithOptions(synchronizer.WithMaxTsDiff(50 * time.Millisecond)) + tr := fakeAudio48k(0xA0010001) + + ts := uint32(1_000_000) + tsync := s.AddTrack(tr, "p1") + + tsync.Initialize(pkt(ts)) + + adj0, err := tsync.GetPTS(pkt(ts)) + require.NoError(t, err) + near(t, adj0, 0, 5*time.Millisecond) + + adj1, err := tsync.GetPTS(pkt(ts)) + require.NoError(t, err) + near(t, adj1, adj0, 1*time.Millisecond) +} + +// 20 ms RTP step at 48 kHz → ~20 ms adjusted PTS (with small tolerance) +func TestMonotonicPTS_SmallRTPDelta(t *testing.T) { + s := synchronizer.NewSynchronizerWithOptions(synchronizer.WithMaxTsDiff(50 * time.Millisecond)) + tr := fakeAudio48k(0xA0010002) + + ts0 := uint32(500_000) + delta20ms := uint32(48000 * 20 / 1000) // 960 ticks + + tsync := s.AddTrack(tr, "p1") + tsync.Initialize(pkt(ts0)) + + // establish startTime + _, _ = tsync.GetPTS(pkt(ts0)) + + adj, err := tsync.GetPTS(pkt(ts0 + delta20ms)) + require.NoError(t, err) + near(t, adj, 20*time.Millisecond, 3*time.Millisecond) +} + +// Large RTP jump with tight MaxTsDiff should reset to small estimatedPTS (not ~2s) +func TestUnacceptableDrift_ResetsToEstimatedPTS(t *testing.T) { + s := synchronizer.NewSynchronizerWithOptions(synchronizer.WithMaxTsDiff(10 * time.Millisecond)) + tr := fakeAudio48k(0xA0010003) + + ts0 := uint32(777_000) + delta2s := uint32(48000 * 2) // 96,000 ticks + + tsync := s.AddTrack(tr, "p1") + tsync.Initialize(pkt(ts0)) + + // establish startTime + _, _ = tsync.GetPTS(pkt(ts0)) + + adj, err := tsync.GetPTS(pkt(ts0 + delta2s)) + require.NoError(t, err) + require.LessOrEqual(t, adj, 150*time.Millisecond, "expected reset to small estimatedPTS, not ~2s") +} + +func TestOnSenderReport_SlewsTowardDesiredOffset(t *testing.T) { + const ( + maxAdjustment = 5 * time.Millisecond + ts0 = uint32(900_000) + stepRTP = uint32(48000 * 20 / 1000) // 20 ms @ 48 kHz + stepDur = 20 * time.Millisecond + desired = 50 * time.Millisecond // target offset from SR + tol = 5 * time.Millisecond + ) + + s := synchronizer.NewSynchronizerWithOptions(synchronizer.WithMaxTsDiff(1 * time.Second)) + tr := fakeAudio48k(0xA0010004) + + tsync := s.AddTrack(tr, "p1") + tsync.Initialize(pkt(ts0)) + + // Anchor wall-clock just before startTime is set. + t0 := time.Now() + _, _ = tsync.GetPTS(pkt(ts0)) // sets startTime ≈ t0 + + cur := ts0 + stepRTP + baseAdj, _ := tsync.GetPTS(pkt(cur)) // baseline adjusted PTS with 20ms content + + // Craft SR so offset ≈ desired: + // offset := NTP - (startTime + pts_at_SR) + // pts_at_SR here is 20ms, startTime ≈ t0 → set NTP to t0 + 20ms + desired + ntp := mediatransportutil.ToNtpTime(t0.Add(stepDur + desired)) + tsync.OnSenderReport(func(d time.Duration) {}) + s.OnRTCP(&rtcp.SenderReport{ + SSRC: uint32(tr.SSRC()), + NTPTime: uint64(ntp), + RTPTime: cur, + }) + + // Converge in N = ceil(desired / 5ms) steps (5ms maxAdjustment) + N := int((desired + 5*time.Millisecond - 1) / (5 * time.Millisecond)) + + for i := 0; i < N; i++ { + cur += stepRTP + _, err := tsync.GetPTS(pkt(cur)) + require.NoError(t, err) + } + + // After N steps, total adjusted delta over base should be: + // content progression (N * 20ms) + desired (slew) + finalAdj, err := tsync.GetPTS(pkt(cur)) // same TS → returns last adjusted + require.NoError(t, err) + + gotDelta := finalAdj - baseAdj + wantDelta := time.Duration(N)*stepDur + desired + + near(t, gotDelta, wantDelta, tol) +} + +// Regression: late video start (~2s) + tiny SR offset (~10ms) must NOT emit a big negative drift +func TestOnSenderReport_LateVideoStart_SmallSROffset_NoHugeNegativeDrift(t *testing.T) { + const ( + lateStart = 2 * time.Second + srOffset = 50 * time.Millisecond + + stepSlew = 5 * time.Millisecond // TrackSynchronizer's maxAdjustment + tol = 5 * time.Millisecond // tolerance for rounding/timing + ) + + s := synchronizer.NewSynchronizerWithOptions(synchronizer.WithMaxTsDiff(2 * time.Second)) + + // 1) Audio publishes immediately → establishes startedAt + audio := fakeAudio48k(0xA0010005) + tsA0 := uint32(1_000_000) + aSync := s.AddTrack(audio, "p1") + aSync.Initialize(pkt(tsA0)) + _, _ = aSync.GetPTS(pkt(tsA0)) + + // Simulate a real late video publish + time.Sleep(lateStart) + + // 2) Video publishes later + video := fakeVideo90k(0x00BEEF01) + tsV0 := uint32(2_000_000) + vSync := s.AddTrack(video, "p1") + vSync.Initialize(pkt(tsV0)) + _, _ = vSync.GetPTS(pkt(tsV0)) + + // 3) First SR: small positive drift + ntp := mediatransportutil.ToNtpTime(time.Now().Add(srOffset)) + var observedDrift time.Duration + vSync.OnSenderReport(func(d time.Duration) { observedDrift = d }) + + s.OnRTCP(&rtcp.SenderReport{ + SSRC: uint32(video.SSRC()), + NTPTime: uint64(ntp), + RTPTime: tsV0, // equals lastTS → SR uses lastPTS at tsV0 + }) + + near(t, observedDrift, srOffset, tol) + + // baseline adjusted PTS at the SR moment (same TS => returns last adjusted) + baseAdj, err := vSync.GetPTS(pkt(tsV0)) + require.NoError(t, err) + + step33ms := uint32(90000 * 33 / 1000) // ~33 ms per 30fps frame at 90 kHz + stepDur := 33 * time.Millisecond + + // Converge in N = ceil(srOffset / stepSlew) steps (50ms / 5ms = 10) + N := int((srOffset + stepSlew - 1) / stepSlew) + + cur := tsV0 + var adj time.Duration + + // Drive N frames to converge + for i := 1; i <= N; i++ { + cur += step33ms + adj, err = vSync.GetPTS(pkt(cur)) + require.NoError(t, err) + } + + // After N steps, the extra beyond content cadence should be ~srOffset + gotDelta := adj - baseAdj + wantDelta := time.Duration(N)*stepDur + srOffset + near(t, gotDelta, wantDelta, tol) + + // Now push more frames and ensure we DON'T keep slewing (stays near srOffset) + const extraFrames = 8 + for i := 1; i <= extraFrames; i++ { + cur += step33ms + adj, err = vSync.GetPTS(pkt(cur)) + require.NoError(t, err) + + // Extra slew measured from the SR moment: + totalContent := time.Duration(N+i) * stepDur + extra := (adj - baseAdj) - totalContent + + // Stay within a small band around srOffset (no steady growth) + near(t, extra, srOffset, tol) + } +} diff --git a/pkg/synchronizer/synchronizerfakes/fake_track_remote.go b/pkg/synchronizer/synchronizerfakes/fake_track_remote.go new file mode 100644 index 00000000..63d051c4 --- /dev/null +++ b/pkg/synchronizer/synchronizerfakes/fake_track_remote.go @@ -0,0 +1,290 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package synchronizerfakes + +import ( + "sync" + + "github.com/livekit/server-sdk-go/v2/pkg/synchronizer" + webrtc "github.com/pion/webrtc/v4" +) + +type FakeTrackRemote struct { + CodecStub func() webrtc.RTPCodecParameters + codecMutex sync.RWMutex + codecArgsForCall []struct { + } + codecReturns struct { + result1 webrtc.RTPCodecParameters + } + codecReturnsOnCall map[int]struct { + result1 webrtc.RTPCodecParameters + } + IDStub func() string + iDMutex sync.RWMutex + iDArgsForCall []struct { + } + iDReturns struct { + result1 string + } + iDReturnsOnCall map[int]struct { + result1 string + } + KindStub func() webrtc.RTPCodecType + kindMutex sync.RWMutex + kindArgsForCall []struct { + } + kindReturns struct { + result1 webrtc.RTPCodecType + } + kindReturnsOnCall map[int]struct { + result1 webrtc.RTPCodecType + } + SSRCStub func() webrtc.SSRC + sSRCMutex sync.RWMutex + sSRCArgsForCall []struct { + } + sSRCReturns struct { + result1 webrtc.SSRC + } + sSRCReturnsOnCall map[int]struct { + result1 webrtc.SSRC + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *FakeTrackRemote) Codec() webrtc.RTPCodecParameters { + fake.codecMutex.Lock() + ret, specificReturn := fake.codecReturnsOnCall[len(fake.codecArgsForCall)] + fake.codecArgsForCall = append(fake.codecArgsForCall, struct { + }{}) + stub := fake.CodecStub + fakeReturns := fake.codecReturns + fake.recordInvocation("Codec", []interface{}{}) + fake.codecMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeTrackRemote) CodecCallCount() int { + fake.codecMutex.RLock() + defer fake.codecMutex.RUnlock() + return len(fake.codecArgsForCall) +} + +func (fake *FakeTrackRemote) CodecCalls(stub func() webrtc.RTPCodecParameters) { + fake.codecMutex.Lock() + defer fake.codecMutex.Unlock() + fake.CodecStub = stub +} + +func (fake *FakeTrackRemote) CodecReturns(result1 webrtc.RTPCodecParameters) { + fake.codecMutex.Lock() + defer fake.codecMutex.Unlock() + fake.CodecStub = nil + fake.codecReturns = struct { + result1 webrtc.RTPCodecParameters + }{result1} +} + +func (fake *FakeTrackRemote) CodecReturnsOnCall(i int, result1 webrtc.RTPCodecParameters) { + fake.codecMutex.Lock() + defer fake.codecMutex.Unlock() + fake.CodecStub = nil + if fake.codecReturnsOnCall == nil { + fake.codecReturnsOnCall = make(map[int]struct { + result1 webrtc.RTPCodecParameters + }) + } + fake.codecReturnsOnCall[i] = struct { + result1 webrtc.RTPCodecParameters + }{result1} +} + +func (fake *FakeTrackRemote) ID() string { + fake.iDMutex.Lock() + ret, specificReturn := fake.iDReturnsOnCall[len(fake.iDArgsForCall)] + fake.iDArgsForCall = append(fake.iDArgsForCall, struct { + }{}) + stub := fake.IDStub + fakeReturns := fake.iDReturns + fake.recordInvocation("ID", []interface{}{}) + fake.iDMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeTrackRemote) IDCallCount() int { + fake.iDMutex.RLock() + defer fake.iDMutex.RUnlock() + return len(fake.iDArgsForCall) +} + +func (fake *FakeTrackRemote) IDCalls(stub func() string) { + fake.iDMutex.Lock() + defer fake.iDMutex.Unlock() + fake.IDStub = stub +} + +func (fake *FakeTrackRemote) IDReturns(result1 string) { + fake.iDMutex.Lock() + defer fake.iDMutex.Unlock() + fake.IDStub = nil + fake.iDReturns = struct { + result1 string + }{result1} +} + +func (fake *FakeTrackRemote) IDReturnsOnCall(i int, result1 string) { + fake.iDMutex.Lock() + defer fake.iDMutex.Unlock() + fake.IDStub = nil + if fake.iDReturnsOnCall == nil { + fake.iDReturnsOnCall = make(map[int]struct { + result1 string + }) + } + fake.iDReturnsOnCall[i] = struct { + result1 string + }{result1} +} + +func (fake *FakeTrackRemote) Kind() webrtc.RTPCodecType { + fake.kindMutex.Lock() + ret, specificReturn := fake.kindReturnsOnCall[len(fake.kindArgsForCall)] + fake.kindArgsForCall = append(fake.kindArgsForCall, struct { + }{}) + stub := fake.KindStub + fakeReturns := fake.kindReturns + fake.recordInvocation("Kind", []interface{}{}) + fake.kindMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeTrackRemote) KindCallCount() int { + fake.kindMutex.RLock() + defer fake.kindMutex.RUnlock() + return len(fake.kindArgsForCall) +} + +func (fake *FakeTrackRemote) KindCalls(stub func() webrtc.RTPCodecType) { + fake.kindMutex.Lock() + defer fake.kindMutex.Unlock() + fake.KindStub = stub +} + +func (fake *FakeTrackRemote) KindReturns(result1 webrtc.RTPCodecType) { + fake.kindMutex.Lock() + defer fake.kindMutex.Unlock() + fake.KindStub = nil + fake.kindReturns = struct { + result1 webrtc.RTPCodecType + }{result1} +} + +func (fake *FakeTrackRemote) KindReturnsOnCall(i int, result1 webrtc.RTPCodecType) { + fake.kindMutex.Lock() + defer fake.kindMutex.Unlock() + fake.KindStub = nil + if fake.kindReturnsOnCall == nil { + fake.kindReturnsOnCall = make(map[int]struct { + result1 webrtc.RTPCodecType + }) + } + fake.kindReturnsOnCall[i] = struct { + result1 webrtc.RTPCodecType + }{result1} +} + +func (fake *FakeTrackRemote) SSRC() webrtc.SSRC { + fake.sSRCMutex.Lock() + ret, specificReturn := fake.sSRCReturnsOnCall[len(fake.sSRCArgsForCall)] + fake.sSRCArgsForCall = append(fake.sSRCArgsForCall, struct { + }{}) + stub := fake.SSRCStub + fakeReturns := fake.sSRCReturns + fake.recordInvocation("SSRC", []interface{}{}) + fake.sSRCMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeTrackRemote) SSRCCallCount() int { + fake.sSRCMutex.RLock() + defer fake.sSRCMutex.RUnlock() + return len(fake.sSRCArgsForCall) +} + +func (fake *FakeTrackRemote) SSRCCalls(stub func() webrtc.SSRC) { + fake.sSRCMutex.Lock() + defer fake.sSRCMutex.Unlock() + fake.SSRCStub = stub +} + +func (fake *FakeTrackRemote) SSRCReturns(result1 webrtc.SSRC) { + fake.sSRCMutex.Lock() + defer fake.sSRCMutex.Unlock() + fake.SSRCStub = nil + fake.sSRCReturns = struct { + result1 webrtc.SSRC + }{result1} +} + +func (fake *FakeTrackRemote) SSRCReturnsOnCall(i int, result1 webrtc.SSRC) { + fake.sSRCMutex.Lock() + defer fake.sSRCMutex.Unlock() + fake.SSRCStub = nil + if fake.sSRCReturnsOnCall == nil { + fake.sSRCReturnsOnCall = make(map[int]struct { + result1 webrtc.SSRC + }) + } + fake.sSRCReturnsOnCall[i] = struct { + result1 webrtc.SSRC + }{result1} +} + +func (fake *FakeTrackRemote) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *FakeTrackRemote) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} + +var _ synchronizer.TrackRemote = new(FakeTrackRemote) diff --git a/pkg/synchronizer/track.go b/pkg/synchronizer/track.go index b21c5b55..b5a44f4b 100644 --- a/pkg/synchronizer/track.go +++ b/pkg/synchronizer/track.go @@ -31,6 +31,9 @@ const ( maxAdjustment = time.Millisecond * 5 ) +//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate + +//counterfeiter:generate . TrackRemote type TrackRemote interface { ID() string Codec() webrtc.RTPCodecParameters From 7dfabcdf8006bba9406fe846641346c3b6ed938c Mon Sep 17 00:00:00 2001 From: Milos Pesic Date: Fri, 29 Aug 2025 16:22:51 +0200 Subject: [PATCH 3/4] increasing time cmp tolerance --- pkg/synchronizer/synchronizer_test.go | 21 ++++++++++----------- pkg/synchronizer/track.go | 2 +- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/pkg/synchronizer/synchronizer_test.go b/pkg/synchronizer/synchronizer_test.go index b24688f5..538042f6 100644 --- a/pkg/synchronizer/synchronizer_test.go +++ b/pkg/synchronizer/synchronizer_test.go @@ -14,6 +14,8 @@ import ( "github.com/livekit/server-sdk-go/v2/pkg/synchronizer/synchronizerfakes" ) +const timeTolerance = time.Millisecond * 10 + // ---------- helpers ---------- func near(t *testing.T, got, want, tol time.Duration) { @@ -71,11 +73,11 @@ func TestInitialize_AndSameTimestamp(t *testing.T) { adj0, err := tsync.GetPTS(pkt(ts)) require.NoError(t, err) - near(t, adj0, 0, 5*time.Millisecond) + near(t, adj0, 0, timeTolerance) adj1, err := tsync.GetPTS(pkt(ts)) require.NoError(t, err) - near(t, adj1, adj0, 1*time.Millisecond) + require.Equal(t, adj0, adj1) } // 20 ms RTP step at 48 kHz → ~20 ms adjusted PTS (with small tolerance) @@ -94,7 +96,7 @@ func TestMonotonicPTS_SmallRTPDelta(t *testing.T) { adj, err := tsync.GetPTS(pkt(ts0 + delta20ms)) require.NoError(t, err) - near(t, adj, 20*time.Millisecond, 3*time.Millisecond) + near(t, adj, 20*time.Millisecond, timeTolerance) } // Large RTP jump with tight MaxTsDiff should reset to small estimatedPTS (not ~2s) @@ -123,7 +125,6 @@ func TestOnSenderReport_SlewsTowardDesiredOffset(t *testing.T) { stepRTP = uint32(48000 * 20 / 1000) // 20 ms @ 48 kHz stepDur = 20 * time.Millisecond desired = 50 * time.Millisecond // target offset from SR - tol = 5 * time.Millisecond ) s := synchronizer.NewSynchronizerWithOptions(synchronizer.WithMaxTsDiff(1 * time.Second)) @@ -167,7 +168,7 @@ func TestOnSenderReport_SlewsTowardDesiredOffset(t *testing.T) { gotDelta := finalAdj - baseAdj wantDelta := time.Duration(N)*stepDur + desired - near(t, gotDelta, wantDelta, tol) + near(t, gotDelta, wantDelta, timeTolerance) } // Regression: late video start (~2s) + tiny SR offset (~10ms) must NOT emit a big negative drift @@ -175,9 +176,7 @@ func TestOnSenderReport_LateVideoStart_SmallSROffset_NoHugeNegativeDrift(t *test const ( lateStart = 2 * time.Second srOffset = 50 * time.Millisecond - - stepSlew = 5 * time.Millisecond // TrackSynchronizer's maxAdjustment - tol = 5 * time.Millisecond // tolerance for rounding/timing + stepSlew = 5 * time.Millisecond // TrackSynchronizer's maxAdjustment ) s := synchronizer.NewSynchronizerWithOptions(synchronizer.WithMaxTsDiff(2 * time.Second)) @@ -210,7 +209,7 @@ func TestOnSenderReport_LateVideoStart_SmallSROffset_NoHugeNegativeDrift(t *test RTPTime: tsV0, // equals lastTS → SR uses lastPTS at tsV0 }) - near(t, observedDrift, srOffset, tol) + near(t, observedDrift, srOffset, timeTolerance) // baseline adjusted PTS at the SR moment (same TS => returns last adjusted) baseAdj, err := vSync.GetPTS(pkt(tsV0)) @@ -235,7 +234,7 @@ func TestOnSenderReport_LateVideoStart_SmallSROffset_NoHugeNegativeDrift(t *test // After N steps, the extra beyond content cadence should be ~srOffset gotDelta := adj - baseAdj wantDelta := time.Duration(N)*stepDur + srOffset - near(t, gotDelta, wantDelta, tol) + near(t, gotDelta, wantDelta, timeTolerance) // Now push more frames and ensure we DON'T keep slewing (stays near srOffset) const extraFrames = 8 @@ -249,6 +248,6 @@ func TestOnSenderReport_LateVideoStart_SmallSROffset_NoHugeNegativeDrift(t *test extra := (adj - baseAdj) - totalContent // Stay within a small band around srOffset (no steady growth) - near(t, extra, srOffset, tol) + near(t, extra, srOffset, timeTolerance) } } diff --git a/pkg/synchronizer/track.go b/pkg/synchronizer/track.go index b5a44f4b..f78212a6 100644 --- a/pkg/synchronizer/track.go +++ b/pkg/synchronizer/track.go @@ -62,7 +62,7 @@ type TrackSynchronizer struct { // offsets currentPTSOffset time.Duration // presentation timestamp offset (used for a/v sync) desiredPTSOffset time.Duration // desired presentation timestamp offset (used for a/v sync) - basePTSOffset time.Duration // component of the desired PTS offset (set initially to preseve initial offset) + basePTSOffset time.Duration // component of the desired PTS offset (set initially to preserve initial offset) // sender reports lastSR uint32 From aff2f5e07cb4a6e85485616160f403b6529318f9 Mon Sep 17 00:00:00 2001 From: Milos Pesic Date: Fri, 29 Aug 2025 17:07:53 +0200 Subject: [PATCH 4/4] Adding slack notifier action --- .github/workflows/slack-notifier.yaml | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100644 .github/workflows/slack-notifier.yaml diff --git a/.github/workflows/slack-notifier.yaml b/.github/workflows/slack-notifier.yaml new file mode 100644 index 00000000..7770345b --- /dev/null +++ b/.github/workflows/slack-notifier.yaml @@ -0,0 +1,25 @@ +name: PR Slack Notifier + +on: + pull_request: + types: [review_requested, reopened, closed] + pull_request_review: + types: [submitted] + +permissions: + contents: read + pull-requests: write + issues: write + +concurrency: + group: pr-slack-${{ github.event.pull_request.number }}-${{ github.workflow }} + cancel-in-progress: false + +jobs: + notify-devs: + runs-on: ubuntu-latest + steps: + - uses: livekit/slack-notifier-action@main + with: + config_json: ${{ secrets.SLACK_NOTIFY_CONFIG_JSON }} + slack_token: ${{ secrets.SLACK_PR_NOTIFIER_TOKEN }}