Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions pkg/synchronizer/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ type SynchronizerOption func(*SynchronizerConfig)

// SynchronizerConfig holds configuration for the Synchronizer
type SynchronizerConfig struct {
MaxTsDiff time.Duration
OnStarted func()
MaxTsDiff time.Duration
OnStarted func()
AudioPTSAdjustmentDisabled bool
}

// WithMaxTsDiff sets the maximum acceptable difference between RTP packets
Expand All @@ -48,6 +49,17 @@ func WithOnStarted(onStarted func()) SynchronizerOption {
}
}

// WithAudioPTSAdjustmentDisabled - disables auto PTS adjustments after sender reports
// Use case: when media processing pipeline needs stable - monotonically increasing
// PTS sequence - small adjustments coming from RTCP sender reports could cause gaps in the audio
// Media processing pipeline could opt out of auto PTS adjustments and handle the gap
// by e.g modifying tempo to compensate instead
func WithAudioPTSAdjustmentDisabled() SynchronizerOption {
return func(config *SynchronizerConfig) {
config.AudioPTSAdjustmentDisabled = true
}
}

// a single Synchronizer is shared between all audio and video writers
type Synchronizer struct {
sync.RWMutex
Expand Down
33 changes: 23 additions & 10 deletions pkg/synchronizer/track.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ type TrackSynchronizer struct {
*rtpConverter

// config
maxTsDiff time.Duration // maximum acceptable difference between RTP packets
maxTsDiff time.Duration // maximum acceptable difference between RTP packets
audioPTSAdjustmentsDisabled bool // disable audio packets PTS adjustments on SRs

// timing info
startTime time.Time // time first packet was pushed
Expand All @@ -67,11 +68,12 @@ type TrackSynchronizer struct {

func newTrackSynchronizer(s *Synchronizer, track TrackRemote) *TrackSynchronizer {
t := &TrackSynchronizer{
sync: s,
track: track,
logger: logger.GetLogger().WithValues("trackID", track.ID(), "codec", track.Codec().MimeType),
rtpConverter: newRTPConverter(int64(track.Codec().ClockRate)),
maxTsDiff: s.config.MaxTsDiff,
sync: s,
track: track,
logger: logger.GetLogger().WithValues("trackID", track.ID(), "codec", track.Codec().MimeType),
rtpConverter: newRTPConverter(int64(track.Codec().ClockRate)),
maxTsDiff: s.config.MaxTsDiff,
audioPTSAdjustmentsDisabled: s.config.AudioPTSAdjustmentDisabled,
}

return t
Expand Down Expand Up @@ -123,11 +125,14 @@ func (t *TrackSynchronizer) GetPTS(pkt *rtp.Packet) (time.Duration, error) {
t.startRTP = ts - t.toRTP(pts)
}

if t.currentPTSOffset > t.desiredPTSOffset {
t.currentPTSOffset = max(t.currentPTSOffset-maxAdjustment, t.desiredPTSOffset)
} else if t.currentPTSOffset < t.desiredPTSOffset {
t.currentPTSOffset = min(t.currentPTSOffset+maxAdjustment, t.desiredPTSOffset)
if t.shouldAdjustPTS() {
if t.currentPTSOffset > t.desiredPTSOffset {
t.currentPTSOffset = max(t.currentPTSOffset-maxAdjustment, t.desiredPTSOffset)
} else if t.currentPTSOffset < t.desiredPTSOffset {
t.currentPTSOffset = min(t.currentPTSOffset+maxAdjustment, t.desiredPTSOffset)
}
}

adjusted := pts + t.currentPTSOffset

// if past end time, return EOF
Expand Down Expand Up @@ -179,6 +184,14 @@ func (t *TrackSynchronizer) acceptable(d time.Duration) bool {
return d > -t.maxTsDiff && d < t.maxTsDiff
}

func (t *TrackSynchronizer) shouldAdjustPTS() bool {
adjustmentEnabled := true
if t.track.Kind() == webrtc.RTPCodecTypeAudio {
adjustmentEnabled = !t.audioPTSAdjustmentsDisabled
}
return adjustmentEnabled && (t.currentPTSOffset != t.desiredPTSOffset)
}

type rtpConverter struct {
ts uint64
rtp uint64
Expand Down
Loading