diff --git a/pkg/synchronizer/synchronizer.go b/pkg/synchronizer/synchronizer.go index 7f122274..69cd45a1 100644 --- a/pkg/synchronizer/synchronizer.go +++ b/pkg/synchronizer/synchronizer.go @@ -29,8 +29,9 @@ type SynchronizerOption func(*SynchronizerConfig) // SynchronizerConfig holds configuration for the Synchronizer type SynchronizerConfig struct { - MaxTsDiff time.Duration - OnStarted func() + MaxTsDiff time.Duration + OnStarted func() + AudioPTSAdjustmentDisabled bool } // WithMaxTsDiff sets the maximum acceptable difference between RTP packets @@ -48,6 +49,17 @@ func WithOnStarted(onStarted func()) SynchronizerOption { } } +// WithAudioPTSAdjustmentDisabled disables automatic audio PTS adjustments after sender reports. +// Use case: a media processing pipeline that needs a stable, monotonically increasing +// PTS sequence, where small adjustments coming from RTCP sender reports could cause gaps in the audio. +// Such a pipeline can opt out of automatic PTS adjustments and handle the gap itself, +// e.g. by modifying tempo to compensate instead. +func WithAudioPTSAdjustmentDisabled() SynchronizerOption { + return func(config *SynchronizerConfig) { + config.AudioPTSAdjustmentDisabled = true + } +} + // a single Synchronizer is shared between all audio and video writers type Synchronizer struct { sync.RWMutex diff --git a/pkg/synchronizer/track.go b/pkg/synchronizer/track.go index 3f886543..8c4c07b1 100644 --- a/pkg/synchronizer/track.go +++ b/pkg/synchronizer/track.go @@ -46,7 +46,8 @@ type TrackSynchronizer struct { *rtpConverter // config - maxTsDiff time.Duration // maximum acceptable difference between RTP packets + maxTsDiff time.Duration // maximum acceptable difference between RTP packets + audioPTSAdjustmentsDisabled bool // disable audio packets PTS adjustments on SRs // timing info startTime time.Time // time first packet was pushed @@ -67,11 +68,12 @@ type TrackSynchronizer struct { func newTrackSynchronizer(s *Synchronizer, track TrackRemote) *TrackSynchronizer { t := 
&TrackSynchronizer{ - sync: s, - track: track, - logger: logger.GetLogger().WithValues("trackID", track.ID(), "codec", track.Codec().MimeType), - rtpConverter: newRTPConverter(int64(track.Codec().ClockRate)), - maxTsDiff: s.config.MaxTsDiff, + sync: s, + track: track, + logger: logger.GetLogger().WithValues("trackID", track.ID(), "codec", track.Codec().MimeType), + rtpConverter: newRTPConverter(int64(track.Codec().ClockRate)), + maxTsDiff: s.config.MaxTsDiff, + audioPTSAdjustmentsDisabled: s.config.AudioPTSAdjustmentDisabled, } return t @@ -123,11 +125,14 @@ func (t *TrackSynchronizer) GetPTS(pkt *rtp.Packet) (time.Duration, error) { t.startRTP = ts - t.toRTP(pts) } - if t.currentPTSOffset > t.desiredPTSOffset { - t.currentPTSOffset = max(t.currentPTSOffset-maxAdjustment, t.desiredPTSOffset) - } else if t.currentPTSOffset < t.desiredPTSOffset { - t.currentPTSOffset = min(t.currentPTSOffset+maxAdjustment, t.desiredPTSOffset) + if t.shouldAdjustPTS() { + if t.currentPTSOffset > t.desiredPTSOffset { + t.currentPTSOffset = max(t.currentPTSOffset-maxAdjustment, t.desiredPTSOffset) + } else if t.currentPTSOffset < t.desiredPTSOffset { + t.currentPTSOffset = min(t.currentPTSOffset+maxAdjustment, t.desiredPTSOffset) + } } + adjusted := pts + t.currentPTSOffset // if past end time, return EOF @@ -179,6 +184,14 @@ func (t *TrackSynchronizer) acceptable(d time.Duration) bool { return d > -t.maxTsDiff && d < t.maxTsDiff } +func (t *TrackSynchronizer) shouldAdjustPTS() bool { + adjustmentEnabled := true + if t.track.Kind() == webrtc.RTPCodecTypeAudio { + adjustmentEnabled = !t.audioPTSAdjustmentsDisabled + } + return adjustmentEnabled && (t.currentPTSOffset != t.desiredPTSOffset) +} + type rtpConverter struct { ts uint64 rtp uint64