Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 23 additions & 20 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,10 +231,10 @@ func (e *RTCEngine) SetLogger(l protoLogger.Logger) {
e.signalHandler.SetLogger(l)
e.signalTransport.SetLogger(l)
if e.publisher != nil {
e.publisher.SetLogger(l)
e.publisher.SetLogger(l.WithValues("transport", livekit.SignalTarget_PUBLISHER))
}
if e.subscriber != nil {
e.subscriber.SetLogger(l)
e.subscriber.SetLogger(l.WithValues("transport", livekit.SignalTarget_SUBSCRIBER))
}
}

Expand All @@ -254,7 +254,7 @@ func (e *RTCEngine) JoinContext(
)
if e.signallingVersion == signalling.SignallingVersionV2 {
e.pclock.Lock()
e.createPublisherPCLocked(webrtc.Configuration{}, false)
e.createPublisherPCLocked(webrtc.Configuration{})

publisherOffer, err = e.publisher.GetOffer()
if err != nil {
Expand Down Expand Up @@ -369,23 +369,23 @@ func (e *RTCEngine) configure(
if e.publisher != nil {
setConfiguration(e.publisher, configuration)
} else {
if err := e.createPublisherPCLocked(configuration, !e.subscriberPrimary); err != nil {
if err := e.createPublisherPCLocked(configuration); err != nil {
return err
}
}

if e.subscriber != nil {
setConfiguration(e.subscriber, configuration)
} else {
if err := e.createSubscriberPCLocked(configuration, e.subscriberPrimary); err != nil {
if err := e.createSubscriberPCLocked(configuration); err != nil {
return err
}
}

return nil
}

func (e *RTCEngine) createPublisherPCLocked(configuration webrtc.Configuration, isPrimary bool) error {
func (e *RTCEngine) createPublisherPCLocked(configuration webrtc.Configuration) error {
var err error
if e.publisher, err = NewPCTransport(PCTransportParams{
Configuration: configuration,
Expand All @@ -397,7 +397,7 @@ func (e *RTCEngine) createPublisherPCLocked(configuration webrtc.Configuration,
}); err != nil {
return err
}
e.publisher.SetLogger(e.log)
e.publisher.SetLogger(e.log.WithValues("transport", livekit.SignalTarget_PUBLISHER))

e.publisher.pc.OnICECandidate(func(candidate *webrtc.ICECandidate) {
if candidate == nil {
Expand All @@ -407,20 +407,23 @@ func (e *RTCEngine) createPublisherPCLocked(configuration webrtc.Configuration,
init := candidate.ToJSON()
e.log.Debugw(
"local ICE candidate",
"target", livekit.SignalTarget_PUBLISHER,
"transport", livekit.SignalTarget_PUBLISHER,
"candidate", init.Candidate,
)
if err := e.signalTransport.SendMessage(
e.signalling.SignalICECandidate(
protosignalling.ToProtoTrickle(init, livekit.SignalTarget_PUBLISHER, false),
),
); err != nil {
e.log.Errorw("could not send ICE candidates for publisher", err)
e.log.Errorw(
"could not send ICE candidate", err,
"transport", livekit.SignalTarget_PUBLISHER,
)
}
})

e.publisher.pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
e.handleICEConnectionStateChange(e.publisher, livekit.SignalTarget_PUBLISHER, isPrimary, state)
e.handleICEConnectionStateChange(e.publisher, livekit.SignalTarget_PUBLISHER, state)
})

e.publisher.OnOffer = func(offer webrtc.SessionDescription) {
Expand Down Expand Up @@ -459,7 +462,6 @@ func (e *RTCEngine) createPublisherPCLocked(configuration webrtc.Configuration,

// SIGNALLING-V2-TODO: may need a separate peer connection
// SIGNALLING-V2-TODO: instantiating this should rely on signal transport strategy rather than signalling version
// SIGNALLING-V2-TODO: for signalling v2 instantiate publisher PC before connect and then do just SetConfiguration in OnConnectResponse
if e.signallingVersion == signalling.SignallingVersionV2 {
e.signallingDC, err = e.publisher.pc.CreateDataChannel(signallingDataChannelName, &webrtc.DataChannelInit{
Ordered: &trueVal,
Expand All @@ -475,6 +477,7 @@ func (e *RTCEngine) createPublisherPCLocked(configuration webrtc.Configuration,
SignalHandler: e.signalHandler,
})
e.signalTransport.SetAsyncTransport(signallingTransportDataChannel)
e.signalTransport.Start()
})
e.signallingDC.OnClose(func() {
// SIGNALLING-V2-TODO: should call SignalTransportHandler.OnClose
Expand All @@ -486,15 +489,15 @@ func (e *RTCEngine) createPublisherPCLocked(configuration webrtc.Configuration,
return nil
}

func (e *RTCEngine) createSubscriberPCLocked(configuration webrtc.Configuration, isPrimary bool) error {
func (e *RTCEngine) createSubscriberPCLocked(configuration webrtc.Configuration) error {
var err error
if e.subscriber, err = NewPCTransport(PCTransportParams{
Configuration: configuration,
RetransmitBufferSize: e.connParams.RetransmitBufferSize,
}); err != nil {
return err
}
e.subscriber.SetLogger(e.log)
e.subscriber.SetLogger(e.log.WithValues("transport", livekit.SignalTarget_SUBSCRIBER))

e.subscriber.OnRemoteDescriptionSettled(e.createSubscriberPCAnswerAndSend)

Expand All @@ -506,20 +509,23 @@ func (e *RTCEngine) createSubscriberPCLocked(configuration webrtc.Configuration,
init := candidate.ToJSON()
e.log.Debugw(
"local ICE candidate",
"target", livekit.SignalTarget_SUBSCRIBER,
"transport", livekit.SignalTarget_SUBSCRIBER,
"candidate", init.Candidate,
)
if err := e.signalTransport.SendMessage(
e.signalling.SignalICECandidate(
protosignalling.ToProtoTrickle(init, livekit.SignalTarget_SUBSCRIBER, false),
),
); err != nil {
e.log.Errorw("could not send ICE candidates for subscriber", err)
e.log.Errorw(
"could not send ICE candidate", err,
"transport", livekit.SignalTarget_SUBSCRIBER,
)
}
})

e.subscriber.pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
e.handleICEConnectionStateChange(e.subscriber, livekit.SignalTarget_SUBSCRIBER, isPrimary, state)
e.handleICEConnectionStateChange(e.subscriber, livekit.SignalTarget_SUBSCRIBER, state)
})

e.subscriber.pc.OnTrack(func(remote *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
Expand All @@ -545,7 +551,6 @@ func (e *RTCEngine) createSubscriberPCLocked(configuration webrtc.Configuration,
func (e *RTCEngine) handleICEConnectionStateChange(
transport *PCTransport,
signalTarget livekit.SignalTarget,
isPrimary bool,
state webrtc.ICEConnectionState,
) {
switch state {
Expand All @@ -559,9 +564,7 @@ func (e *RTCEngine) handleICEConnectionStateChange(
e.log.Debugw("ICE disconnected", "transport", signalTarget)
case webrtc.ICEConnectionStateFailed:
e.log.Debugw("ICE failed", "transport", signalTarget)
if isPrimary {
e.handleDisconnect(false)
}
e.handleDisconnect(false)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cnderrauber Addressing comment from previous PR. Handling disconnect on non-primary PC also.

}
}

Expand Down
13 changes: 10 additions & 3 deletions signalling/signaltransport_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,19 @@ func (s *signalTransportHttp) connect(
s.params.Signalling.SignalSdpOffer(protosignalling.ToProtoSessionDescription(publisherOffer, 0))
}

return s.sendHttpRequest(
urlPrefix+s.params.Signalling.Path(),
path := urlPrefix + s.params.Signalling.Path()
wireMessage := s.params.Signalling.PendingMessages()

startedAt := time.Now()
msg, err := s.sendHttpRequest(
path,
http.MethodPost,
token,
s.params.Signalling.PendingMessages(),
wireMessage,
)
s.params.Logger.Debugw("connect response received", "alapsed", time.Since(startedAt))

return msg, err
}

func (s *signalTransportHttp) sendHttpRequest(
Expand Down
5 changes: 4 additions & 1 deletion signalling/signaltransport_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,10 @@ func (s *signalTransportWebSocket) connect(
}

header := NewHTTPHeaderWithToken(token)
path := u.String()

startedAt := time.Now()
conn, hresp, err := websocket.DefaultDialer.DialContext(ctx, u.String(), header)
conn, hresp, err := websocket.DefaultDialer.DialContext(ctx, path, header)
if err != nil {
fields := []interface{}{
"duration", time.Since(startedAt),
Expand All @@ -236,6 +238,7 @@ func (s *signalTransportWebSocket) connect(
if err != nil {
return nil, err
}
s.params.Logger.Debugw("first message received", "elapsed", time.Since(startedAt))

return res, nil
}
Expand Down
Loading