diff --git a/.github/workflows/slack-notifier.yaml b/.github/workflows/slack-notifier.yaml new file mode 100644 index 00000000..7770345b --- /dev/null +++ b/.github/workflows/slack-notifier.yaml @@ -0,0 +1,25 @@ +name: PR Slack Notifier + +on: + pull_request: + types: [review_requested, reopened, closed] + pull_request_review: + types: [submitted] + +permissions: + contents: read + pull-requests: write + issues: write + +concurrency: + group: pr-slack-${{ github.event.pull_request.number }}-${{ github.workflow }} + cancel-in-progress: false + +jobs: + notify-devs: + runs-on: ubuntu-latest + steps: + - uses: livekit/slack-notifier-action@main + with: + config_json: ${{ secrets.SLACK_NOTIFY_CONFIG_JSON }} + slack_token: ${{ secrets.SLACK_PR_NOTIFIER_TOKEN }} diff --git a/go.mod b/go.mod index 84a3a243..cda30743 100644 --- a/go.mod +++ b/go.mod @@ -28,6 +28,11 @@ require ( require golang.org/x/mod v0.27.0 +require ( + github.com/maxbrunsfeld/counterfeiter/v6 v6.11.3 // indirect + golang.org/x/tools v0.36.0 // indirect +) + require ( buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.8-20250717185734-6c6e0d3c608e.1 // indirect buf.build/go/protovalidate v0.14.0 // indirect @@ -87,3 +92,5 @@ require ( gopkg.in/hraban/opus.v2 v2.0.0-20230925203106-0188a62cb302 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) + +tool github.com/maxbrunsfeld/counterfeiter/v6 diff --git a/go.sum b/go.sum index 535ad95e..218564b1 100644 --- a/go.sum +++ b/go.sum @@ -105,6 +105,8 @@ github.com/livekit/psrpc v0.6.1-0.20250726180611-3915e005e741 h1:KKL1u94l6dF9u4c github.com/livekit/psrpc v0.6.1-0.20250726180611-3915e005e741/go.mod h1:AuDC5uOoEjQJEc69v4Li3t77Ocz0e0NdjQEuFfO+vfk= github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg= github.com/magefile/mage v1.15.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= +github.com/maxbrunsfeld/counterfeiter/v6 v6.11.3 h1:Eaq36EIyJNp7b3qDhjV7jmDVq/yPeW2v4pTqzGbOGB4= +github.com/maxbrunsfeld/counterfeiter/v6 v6.11.3/go.mod h1:6KKUoQBZBW6PDXJtNfqeEjPXMj/ITTk+cWK9t9uS5+E= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0= @@ -267,6 +269,8 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/tools v0.36.0 h1:kWS0uv/zsvHEle1LbV5LE8QujrxB3wfQyxHfhOk0Qkg= +golang.org/x/tools v0.36.0/go.mod h1:WBDiHKJK8YgLHlcQPYQzNCkUxUypCaa5ZegCVutKm+s= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= diff --git a/pkg/synchronizer/synchronizer_test.go b/pkg/synchronizer/synchronizer_test.go new file mode 100644 index 00000000..538042f6 --- /dev/null +++ b/pkg/synchronizer/synchronizer_test.go @@ -0,0 +1,253 @@ +package synchronizer_test + +import ( + "testing" + "time" + + "github.com/pion/rtcp" + "github.com/pion/rtp" + "github.com/pion/webrtc/v4" + "github.com/stretchr/testify/require" + + "github.com/livekit/mediatransportutil" + "github.com/livekit/server-sdk-go/v2/pkg/synchronizer" + "github.com/livekit/server-sdk-go/v2/pkg/synchronizer/synchronizerfakes" +) + +const timeTolerance = time.Millisecond * 10 + +// ---------- helpers ---------- + +func near(t *testing.T, got, want, tol time.Duration) { + t.Helper() + d := got - want + if d < 0 { + d = -d + } + require.LessOrEqualf(t, d, tol, "got %v, want ~%v (±%v)", got, want, tol) +} + +func pkt(ts uint32) *rtp.Packet { + return &rtp.Packet{Header: rtp.Header{Timestamp: ts}} +} + +func fakeAudio48k(ssrc uint32) *synchronizerfakes.FakeTrackRemote { + f := &synchronizerfakes.FakeTrackRemote{} + f.IDReturns("audio-1") + f.KindReturns(webrtc.RTPCodecTypeAudio) + f.SSRCReturns(webrtc.SSRC(ssrc)) + f.CodecReturns(webrtc.RTPCodecParameters{ + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeOpus, + ClockRate: 48000, + }, + }) + return f +} + +func fakeVideo90k(ssrc uint32) *synchronizerfakes.FakeTrackRemote { + f := &synchronizerfakes.FakeTrackRemote{} + f.IDReturns("video-1") + f.KindReturns(webrtc.RTPCodecTypeVideo) + f.SSRCReturns(webrtc.SSRC(ssrc)) + f.CodecReturns(webrtc.RTPCodecParameters{ + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeVP8, // codec value not important here + ClockRate: 90000, + }, + }) + return f +} + +// ---------- tests ---------- + +// Initialize + same timestamp returns previous adjusted value (near zero right after init) +func TestInitialize_AndSameTimestamp(t *testing.T) { + s := synchronizer.NewSynchronizerWithOptions(synchronizer.WithMaxTsDiff(50 * time.Millisecond)) + tr := fakeAudio48k(0xA0010001) + + ts := uint32(1_000_000) + tsync := s.AddTrack(tr, "p1") + + tsync.Initialize(pkt(ts)) + + adj0, err := tsync.GetPTS(pkt(ts)) + require.NoError(t, err) + near(t, adj0, 0, timeTolerance) + + adj1, err := tsync.GetPTS(pkt(ts)) + require.NoError(t, err) + require.Equal(t, adj0, adj1) +} + +// 20 ms RTP step at 48 kHz → ~20 ms adjusted PTS (with small tolerance) +func TestMonotonicPTS_SmallRTPDelta(t *testing.T) { + s := synchronizer.NewSynchronizerWithOptions(synchronizer.WithMaxTsDiff(50 * time.Millisecond)) + tr := fakeAudio48k(0xA0010002) + + ts0 := uint32(500_000) + delta20ms := uint32(48000 * 20 / 1000) // 960 ticks + + tsync := s.AddTrack(tr, "p1") + tsync.Initialize(pkt(ts0)) + + // establish startTime + _, _ = tsync.GetPTS(pkt(ts0)) + + adj, err := tsync.GetPTS(pkt(ts0 + delta20ms)) + require.NoError(t, err) + near(t, adj, 20*time.Millisecond, timeTolerance) +} + +// Large RTP jump with tight MaxTsDiff should reset to small estimatedPTS (not ~2s) +func TestUnacceptableDrift_ResetsToEstimatedPTS(t *testing.T) { + s := synchronizer.NewSynchronizerWithOptions(synchronizer.WithMaxTsDiff(10 * time.Millisecond)) + tr := fakeAudio48k(0xA0010003) + + ts0 := uint32(777_000) + delta2s := uint32(48000 * 2) // 96,000 ticks + + tsync := s.AddTrack(tr, "p1") + tsync.Initialize(pkt(ts0)) + + // establish startTime + _, _ = tsync.GetPTS(pkt(ts0)) + + adj, err := tsync.GetPTS(pkt(ts0 + delta2s)) + require.NoError(t, err) + require.LessOrEqual(t, adj, 150*time.Millisecond, "expected reset to small estimatedPTS, not ~2s") +} + +func TestOnSenderReport_SlewsTowardDesiredOffset(t *testing.T) { + const ( + maxAdjustment = 5 * time.Millisecond + ts0 = uint32(900_000) + stepRTP = uint32(48000 * 20 / 1000) // 20 ms @ 48 kHz + stepDur = 20 * time.Millisecond + desired = 50 * time.Millisecond // target offset from SR + ) + + s := synchronizer.NewSynchronizerWithOptions(synchronizer.WithMaxTsDiff(1 * time.Second)) + tr := fakeAudio48k(0xA0010004) + + tsync := s.AddTrack(tr, "p1") + tsync.Initialize(pkt(ts0)) + + // Anchor wall-clock just before startTime is set. + t0 := time.Now() + _, _ = tsync.GetPTS(pkt(ts0)) // sets startTime ≈ t0 + + cur := ts0 + stepRTP + baseAdj, _ := tsync.GetPTS(pkt(cur)) // baseline adjusted PTS with 20ms content + + // Craft SR so offset ≈ desired: + // offset := NTP - (startTime + pts_at_SR) + // pts_at_SR here is 20ms, startTime ≈ t0 → set NTP to t0 + 20ms + desired + ntp := mediatransportutil.ToNtpTime(t0.Add(stepDur + desired)) + tsync.OnSenderReport(func(d time.Duration) {}) + s.OnRTCP(&rtcp.SenderReport{ + SSRC: uint32(tr.SSRC()), + NTPTime: uint64(ntp), + RTPTime: cur, + }) + + // Converge in N = ceil(desired / 5ms) steps (5ms maxAdjustment) + N := int((desired + 5*time.Millisecond - 1) / (5 * time.Millisecond)) + + for i := 0; i < N; i++ { + cur += stepRTP + _, err := tsync.GetPTS(pkt(cur)) + require.NoError(t, err) + } + + // After N steps, total adjusted delta over base should be: + // content progression (N * 20ms) + desired (slew) + finalAdj, err := tsync.GetPTS(pkt(cur)) // same TS → returns last adjusted + require.NoError(t, err) + + gotDelta := finalAdj - baseAdj + wantDelta := time.Duration(N)*stepDur + desired + + near(t, gotDelta, wantDelta, timeTolerance) +} + +// Regression: late video start (~2s) + tiny SR offset (~10ms) must NOT emit a big negative drift +func TestOnSenderReport_LateVideoStart_SmallSROffset_NoHugeNegativeDrift(t *testing.T) { + const ( + lateStart = 2 * time.Second + srOffset = 50 * time.Millisecond + stepSlew = 5 * time.Millisecond // TrackSynchronizer's maxAdjustment + ) + + s := synchronizer.NewSynchronizerWithOptions(synchronizer.WithMaxTsDiff(2 * time.Second)) + + // 1) Audio publishes immediately → establishes startedAt + audio := fakeAudio48k(0xA0010005) + tsA0 := uint32(1_000_000) + aSync := s.AddTrack(audio, "p1") + aSync.Initialize(pkt(tsA0)) + _, _ = aSync.GetPTS(pkt(tsA0)) + + // Simulate a real late video publish + time.Sleep(lateStart) + + // 2) Video publishes later + video := fakeVideo90k(0x00BEEF01) + tsV0 := uint32(2_000_000) + vSync := s.AddTrack(video, "p1") + vSync.Initialize(pkt(tsV0)) + _, _ = vSync.GetPTS(pkt(tsV0)) + + // 3) First SR: small positive drift + ntp := mediatransportutil.ToNtpTime(time.Now().Add(srOffset)) + var observedDrift time.Duration + vSync.OnSenderReport(func(d time.Duration) { observedDrift = d }) + + s.OnRTCP(&rtcp.SenderReport{ + SSRC: uint32(video.SSRC()), + NTPTime: uint64(ntp), + RTPTime: tsV0, // equals lastTS → SR uses lastPTS at tsV0 + }) + + near(t, observedDrift, srOffset, timeTolerance) + + // baseline adjusted PTS at the SR moment (same TS => returns last adjusted) + baseAdj, err := vSync.GetPTS(pkt(tsV0)) + require.NoError(t, err) + + step33ms := uint32(90000 * 33 / 1000) // ~33 ms per 30fps frame at 90 kHz + stepDur := 33 * time.Millisecond + + // Converge in N = ceil(srOffset / stepSlew) steps (50ms / 5ms = 10) + N := int((srOffset + stepSlew - 1) / stepSlew) + + cur := tsV0 + var adj time.Duration + + // Drive N frames to converge + for i := 1; i <= N; i++ { + cur += step33ms + adj, err = vSync.GetPTS(pkt(cur)) + require.NoError(t, err) + } + + // After N steps, the extra beyond content cadence should be ~srOffset + gotDelta := adj - baseAdj + wantDelta := time.Duration(N)*stepDur + srOffset + near(t, gotDelta, wantDelta, timeTolerance) + + // Now push more frames and ensure we DON'T keep slewing (stays near srOffset) + const extraFrames = 8 + for i := 1; i <= extraFrames; i++ { + cur += step33ms + adj, err = vSync.GetPTS(pkt(cur)) + require.NoError(t, err) + + // Extra slew measured from the SR moment: + totalContent := time.Duration(N+i) * stepDur + extra := (adj - baseAdj) - totalContent + + // Stay within a small band around srOffset (no steady growth) + near(t, extra, srOffset, timeTolerance) + } +} diff --git a/pkg/synchronizer/synchronizerfakes/fake_track_remote.go b/pkg/synchronizer/synchronizerfakes/fake_track_remote.go new file mode 100644 index 00000000..63d051c4 --- /dev/null +++ b/pkg/synchronizer/synchronizerfakes/fake_track_remote.go @@ -0,0 +1,290 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package synchronizerfakes + +import ( + "sync" + + "github.com/livekit/server-sdk-go/v2/pkg/synchronizer" + webrtc "github.com/pion/webrtc/v4" +) + +type FakeTrackRemote struct { + CodecStub func() webrtc.RTPCodecParameters + codecMutex sync.RWMutex + codecArgsForCall []struct { + } + codecReturns struct { + result1 webrtc.RTPCodecParameters + } + codecReturnsOnCall map[int]struct { + result1 webrtc.RTPCodecParameters + } + IDStub func() string + iDMutex sync.RWMutex + iDArgsForCall []struct { + } + iDReturns struct { + result1 string + } + iDReturnsOnCall map[int]struct { + result1 string + } + KindStub func() webrtc.RTPCodecType + kindMutex sync.RWMutex + kindArgsForCall []struct { + } + kindReturns struct { + result1 webrtc.RTPCodecType + } + kindReturnsOnCall map[int]struct { + result1 webrtc.RTPCodecType + } + SSRCStub func() webrtc.SSRC + sSRCMutex sync.RWMutex + sSRCArgsForCall []struct { + } + sSRCReturns struct { + result1 webrtc.SSRC + } + sSRCReturnsOnCall map[int]struct { + result1 webrtc.SSRC + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *FakeTrackRemote) Codec() webrtc.RTPCodecParameters { + fake.codecMutex.Lock() + ret, specificReturn := fake.codecReturnsOnCall[len(fake.codecArgsForCall)] + fake.codecArgsForCall = append(fake.codecArgsForCall, struct { + }{}) + stub := fake.CodecStub + fakeReturns := fake.codecReturns + fake.recordInvocation("Codec", []interface{}{}) + fake.codecMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeTrackRemote) CodecCallCount() int { + fake.codecMutex.RLock() + defer fake.codecMutex.RUnlock() + return len(fake.codecArgsForCall) +} + +func (fake *FakeTrackRemote) CodecCalls(stub func() webrtc.RTPCodecParameters) { + fake.codecMutex.Lock() + defer fake.codecMutex.Unlock() + fake.CodecStub = stub +} + +func (fake *FakeTrackRemote) CodecReturns(result1 webrtc.RTPCodecParameters) { + fake.codecMutex.Lock() + defer fake.codecMutex.Unlock() + fake.CodecStub = nil + fake.codecReturns = struct { + result1 webrtc.RTPCodecParameters + }{result1} +} + +func (fake *FakeTrackRemote) CodecReturnsOnCall(i int, result1 webrtc.RTPCodecParameters) { + fake.codecMutex.Lock() + defer fake.codecMutex.Unlock() + fake.CodecStub = nil + if fake.codecReturnsOnCall == nil { + fake.codecReturnsOnCall = make(map[int]struct { + result1 webrtc.RTPCodecParameters + }) + } + fake.codecReturnsOnCall[i] = struct { + result1 webrtc.RTPCodecParameters + }{result1} +} + +func (fake *FakeTrackRemote) ID() string { + fake.iDMutex.Lock() + ret, specificReturn := fake.iDReturnsOnCall[len(fake.iDArgsForCall)] + fake.iDArgsForCall = append(fake.iDArgsForCall, struct { + }{}) + stub := fake.IDStub + fakeReturns := fake.iDReturns + fake.recordInvocation("ID", []interface{}{}) + fake.iDMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeTrackRemote) IDCallCount() int { + fake.iDMutex.RLock() + defer fake.iDMutex.RUnlock() + return len(fake.iDArgsForCall) +} + +func (fake *FakeTrackRemote) IDCalls(stub func() string) { + fake.iDMutex.Lock() + defer fake.iDMutex.Unlock() + fake.IDStub = stub +} + +func (fake *FakeTrackRemote) IDReturns(result1 string) { + fake.iDMutex.Lock() + defer fake.iDMutex.Unlock() + fake.IDStub = nil + fake.iDReturns = struct { + result1 string + }{result1} +} + +func (fake *FakeTrackRemote) IDReturnsOnCall(i int, result1 string) { + fake.iDMutex.Lock() + defer fake.iDMutex.Unlock() + fake.IDStub = nil + if fake.iDReturnsOnCall == nil { + fake.iDReturnsOnCall = make(map[int]struct { + result1 string + }) + } + fake.iDReturnsOnCall[i] = struct { + result1 string + }{result1} +} + +func (fake *FakeTrackRemote) Kind() webrtc.RTPCodecType { + fake.kindMutex.Lock() + ret, specificReturn := fake.kindReturnsOnCall[len(fake.kindArgsForCall)] + fake.kindArgsForCall = append(fake.kindArgsForCall, struct { + }{}) + stub := fake.KindStub + fakeReturns := fake.kindReturns + fake.recordInvocation("Kind", []interface{}{}) + fake.kindMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeTrackRemote) KindCallCount() int { + fake.kindMutex.RLock() + defer fake.kindMutex.RUnlock() + return len(fake.kindArgsForCall) +} + +func (fake *FakeTrackRemote) KindCalls(stub func() webrtc.RTPCodecType) { + fake.kindMutex.Lock() + defer fake.kindMutex.Unlock() + fake.KindStub = stub +} + +func (fake *FakeTrackRemote) KindReturns(result1 webrtc.RTPCodecType) { + fake.kindMutex.Lock() + defer fake.kindMutex.Unlock() + fake.KindStub = nil + fake.kindReturns = struct { + result1 webrtc.RTPCodecType + }{result1} +} + +func (fake *FakeTrackRemote) KindReturnsOnCall(i int, result1 webrtc.RTPCodecType) { + fake.kindMutex.Lock() + defer fake.kindMutex.Unlock() + fake.KindStub = nil + if fake.kindReturnsOnCall == nil { + fake.kindReturnsOnCall = make(map[int]struct { + result1 webrtc.RTPCodecType + }) + } + fake.kindReturnsOnCall[i] = struct { + result1 webrtc.RTPCodecType + }{result1} +} + +func (fake *FakeTrackRemote) SSRC() webrtc.SSRC { + fake.sSRCMutex.Lock() + ret, specificReturn := fake.sSRCReturnsOnCall[len(fake.sSRCArgsForCall)] + fake.sSRCArgsForCall = append(fake.sSRCArgsForCall, struct { + }{}) + stub := fake.SSRCStub + fakeReturns := fake.sSRCReturns + fake.recordInvocation("SSRC", []interface{}{}) + fake.sSRCMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeTrackRemote) SSRCCallCount() int { + fake.sSRCMutex.RLock() + defer fake.sSRCMutex.RUnlock() + return len(fake.sSRCArgsForCall) +} + +func (fake *FakeTrackRemote) SSRCCalls(stub func() webrtc.SSRC) { + fake.sSRCMutex.Lock() + defer fake.sSRCMutex.Unlock() + fake.SSRCStub = stub +} + +func (fake *FakeTrackRemote) SSRCReturns(result1 webrtc.SSRC) { + fake.sSRCMutex.Lock() + defer fake.sSRCMutex.Unlock() + fake.SSRCStub = nil + fake.sSRCReturns = struct { + result1 webrtc.SSRC + }{result1} +} + +func (fake *FakeTrackRemote) SSRCReturnsOnCall(i int, result1 webrtc.SSRC) { + fake.sSRCMutex.Lock() + defer fake.sSRCMutex.Unlock() + fake.SSRCStub = nil + if fake.sSRCReturnsOnCall == nil { + fake.sSRCReturnsOnCall = make(map[int]struct { + result1 webrtc.SSRC + }) + } + fake.sSRCReturnsOnCall[i] = struct { + result1 webrtc.SSRC + }{result1} +} + +func (fake *FakeTrackRemote) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *FakeTrackRemote) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} + +var _ synchronizer.TrackRemote = new(FakeTrackRemote) diff --git a/pkg/synchronizer/track.go b/pkg/synchronizer/track.go index 3b2adc3f..f78212a6 100644 --- a/pkg/synchronizer/track.go +++ b/pkg/synchronizer/track.go @@ -31,6 +31,9 @@ const ( maxAdjustment = time.Millisecond * 5 ) +//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate + +//counterfeiter:generate . TrackRemote type TrackRemote interface { ID() string Codec() webrtc.RTPCodecParameters @@ -59,6 +62,7 @@ type TrackSynchronizer struct { // offsets currentPTSOffset time.Duration // presentation timestamp offset (used for a/v sync) desiredPTSOffset time.Duration // desired presentation timestamp offset (used for a/v sync) + basePTSOffset time.Duration // component of the desired PTS offset (set initially to preserve initial offset) // sender reports lastSR uint32 @@ -94,6 +98,7 @@ func (t *TrackSynchronizer) Initialize(pkt *rtp.Packet) { t.currentPTSOffset = time.Duration(now.UnixNano() - startedAt) t.desiredPTSOffset = t.currentPTSOffset + t.basePTSOffset = t.desiredPTSOffset t.startRTP = pkt.Timestamp t.lastTS = pkt.Timestamp @@ -171,7 +176,7 @@ func (t *TrackSynchronizer) onSenderReport(pkt *rtcp.SenderReport) { return } - t.desiredPTSOffset += offset + t.desiredPTSOffset = t.basePTSOffset + offset t.lastSR = pkt.RTPTime }