Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 66 additions & 23 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,9 @@ var (
// -------------------------------------------

const (
reliableDataChannelName = "_reliable"
lossyDataChannelName = "_lossy"
signallingDataChannelName = "_signalling"
reliableDataChannelName = "_reliable"
lossyDataChannelName = "_lossy"

maxReconnectCount = 10
initialReconnectInterval = 300 * time.Millisecond
Expand All @@ -123,6 +124,8 @@ const (
type RTCEngine struct {
log protoLogger.Logger

signallingVersion signalling.SignallingVersion

engineHandler engineHandler
cbGetLocalParticipantSID func() string

Expand All @@ -135,6 +138,7 @@ type RTCEngine struct {
signalTransport signalling.SignalTransport

dclock sync.RWMutex
signallingDC *webrtc.DataChannel
reliableDC *webrtc.DataChannel
lossyDC *webrtc.DataChannel
reliableDCSub *webrtc.DataChannel
Expand Down Expand Up @@ -162,32 +166,39 @@ type RTCEngine struct {
onCloseLock sync.Mutex
}

func NewRTCEngine(engineHandler engineHandler, getLocalParticipantSID func() string) *RTCEngine {
func NewRTCEngine(
signallingVersion signalling.SignallingVersion,
engineHandler engineHandler,
getLocalParticipantSID func() string,
) *RTCEngine {
e := &RTCEngine{
log: logger,
signallingVersion: signallingVersion,
engineHandler: engineHandler,
cbGetLocalParticipantSID: getLocalParticipantSID,
trackPublishedListeners: make(map[string]chan *livekit.TrackPublishedResponse),
joinTimeout: 15 * time.Second,
reliableMsgSeq: 1,
}
// SIGNALLING-V2-TODO: have to instantiate objects based on signal version & transport
e.signalling = signalling.NewSignalling(signalling.SignallingParams{
Logger: e.log,
})
e.signalHandler = signalling.NewSignalHandler(signalling.SignalHandlerParams{
Logger: e.log,
Processor: e,
})
e.signalTransport = signalling.NewSignalTransportWebSocket(signalling.SignalTransportWebSocketParams{
Logger: e.log,
Version: Version,
Protocol: PROTOCOL,
Signalling: e.signalling,
SignalTransportHandler: e,
SignalHandler: e.signalHandler,
})
/*
switch signallingVersion {
case signalling.SignallingVersionV1:
e.signalling = signalling.NewSignalling(signalling.SignallingParams{
Logger: e.log,
})
e.signalHandler = signalling.NewSignalHandler(signalling.SignalHandlerParams{
Logger: e.log,
Processor: e,
})
e.signalTransport = signalling.NewSignalTransportWebSocket(signalling.SignalTransportWebSocketParams{
Logger: e.log,
Version: Version,
Protocol: PROTOCOL,
Signalling: e.signalling,
SignalTransportHandler: e,
SignalHandler: e.signalHandler,
})

case signalling.SignallingVersionV2:
e.signalling = signalling.NewSignallingv2(signalling.Signallingv2Params{
Logger: e.log,
})
Expand All @@ -203,7 +214,10 @@ func NewRTCEngine(engineHandler engineHandler, getLocalParticipantSID func() str
Signalling: e.signalling,
SignalHandler: e.signalHandler,
})
*/

default:
return nil
}

e.onClose = []func(){}
return e
Expand Down Expand Up @@ -318,8 +332,8 @@ func (e *RTCEngine) setRTT(rtt uint32) {
func (e *RTCEngine) configure(
iceServers []*livekit.ICEServer,
clientConfig *livekit.ClientConfiguration,
subscriberPrimary *bool) error {

subscriberPrimary *bool,
) error {
configuration := e.makeRTCConfiguration(iceServers, clientConfig)

// reset reliable message sequence
Expand Down Expand Up @@ -467,6 +481,7 @@ func (e *RTCEngine) configure(
return err
}
e.lossyDC.OnMessage(e.handleDataPacket)

e.reliableDC, err = e.publisher.PeerConnection().CreateDataChannel(reliableDataChannelName, &webrtc.DataChannelInit{
Ordered: &trueVal,
})
Expand All @@ -475,6 +490,30 @@ func (e *RTCEngine) configure(
return err
}
e.reliableDC.OnMessage(e.handleDataPacket)

// SIGNALLING-V2-TODO: instantiating this rely on signal transport strategy rather than signalling version
// SIGNALLING-V2-TODO: for signalling v2 instantiate publisher PC before connect and then do just SetConfiguration in OnConnectResponse
if e.signallingVersion == signalling.SignallingVersionV2 {
e.signallingDC, err = e.publisher.PeerConnection().CreateDataChannel(signallingDataChannelName, &webrtc.DataChannelInit{
Ordered: &trueVal,
})
if err != nil {
e.dclock.Unlock()
return err
}
e.signallingDC.OnOpen(func() {
signallingTransportDataChannel := signalling.NewSignalTransportDataChannel(signalling.SignalTransportDataChannelParams{
Logger: e.log,
DataChannel: e.signallingDC,
SignalHandler: e.signalHandler,
})
e.signalTransport.SetAsyncTransport(signallingTransportDataChannel)
})
e.signallingDC.OnClose(func() {
// SIGNALLING-V2-TODO: should call SignalTransportHandler.OnClose
})
e.signallingDC.OnMessage(e.handleSignalling)
}
e.dclock.Unlock()

return nil
Expand Down Expand Up @@ -637,6 +676,10 @@ func (e *RTCEngine) readDataPacket(msg webrtc.DataChannelMessage) (*livekit.Data
return dataPacket, err
}

func (e *RTCEngine) handleSignalling(msg webrtc.DataChannelMessage) {
e.signalHandler.HandleEncodedMessage(msg.Data)
}

func (e *RTCEngine) handleDisconnect(fullReconnect bool) {
// do not retry until fully connected
if e.closed.Load() || !e.hasConnected.Load() {
Expand Down
10 changes: 9 additions & 1 deletion room.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ import (
"github.com/livekit/protocol/livekit"
)

var (
signallingVersion signalling.SignallingVersion = signalling.SignallingVersionV1
)

func WithSignallingVersion(v signalling.SignallingVersion) {
signallingVersion = v
}

var (
_ engineHandler = (*Room)(nil)
)
Expand Down Expand Up @@ -204,7 +212,7 @@ func NewRoom(callback *RoomCallback) *Room {
}
r.callback.Merge(callback)

r.engine = NewRTCEngine(r, r.getLocalParticipantSID)
r.engine = NewRTCEngine(signallingVersion, r, r.getLocalParticipantSID)
r.LocalParticipant = newLocalParticipant(r.engine, r.callback, r.serverInfo)
return r
}
Expand Down
3 changes: 2 additions & 1 deletion signalling/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ var (
ErrInvalidMessageType = errors.New("invalid message type")
ErrInvalidParameter = errors.New("invalid parameter")
ErrCannotDialSignal = errors.New("could not dial signal connection")
ErrEmptyResponse = errors.New("empty response")
ErrMessageQueueNotStarted = errors.New("message queue not started")
ErrMessageQueueFull = errors.New("message queue is full")
)
30 changes: 30 additions & 0 deletions signalling/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,32 @@ import (
"google.golang.org/protobuf/proto"
)

type SignallingVersion int

const (
SignallingVersionUnused SignallingVersion = iota
SignallingVersionV1 // v1
SignallingVersionV2 // v2
)

func (s SignallingVersion) String() string {
switch s {
case SignallingVersionUnused:
return "UNUSED"

case SignallingVersionV1:
return "V1"

case SignallingVersionV2:
return "V2"

default:
return fmt.Sprintf("UNKNOWN: %d", s)
}
}

// ---------------------------

type joinMethod int

const (
Expand Down Expand Up @@ -124,6 +150,7 @@ type ConnectParams struct {

type SignalTransport interface {
SetLogger(l protoLogger.Logger)
SetAsyncTransport(asyncTransport SignalTransport)

Start()
IsStarted() bool
Expand Down Expand Up @@ -153,6 +180,9 @@ type SignalHandler interface {
SetLogger(l protoLogger.Logger)

HandleMessage(msg proto.Message) error
HandleEncodedMessage(data []byte) error

PruneStaleReassemblies()
}

type SignalProcessor interface {
Expand Down
107 changes: 107 additions & 0 deletions signalling/messagequeue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package signalling

import (
"sync"

"github.com/livekit/protocol/logger"
"google.golang.org/protobuf/proto"
)

type messageQueueParams struct {
Logger logger.Logger
HandleMessage func(msg proto.Message)
}

type messageQueue struct {
params messageQueueParams

lock sync.RWMutex
isStarted bool
msgChan chan proto.Message
}

func newMessageQueue(params messageQueueParams) *messageQueue {
return &messageQueue{
params: params,
}
}

func (m *messageQueue) SetLogger(l logger.Logger) {
m.params.Logger = l
}

func (m *messageQueue) Start() {
m.lock.Lock()
defer m.lock.Unlock()

if m.isStarted {
return
}
m.isStarted = true

m.msgChan = make(chan proto.Message, 100)
go m.worker(m.msgChan)
}

func (m *messageQueue) IsStarted() bool {
m.lock.RLock()
defer m.lock.RUnlock()

return m.isStarted
}

func (m *messageQueue) Close() {
m.lock.Lock()
defer m.lock.Unlock()

if !m.isStarted {
return
}
m.isStarted = false

close(m.msgChan)
}

func (m *messageQueue) Enqueue(msg proto.Message) error {
if msg == nil {
return nil
}

m.lock.RLock()
defer m.lock.RUnlock()
if !m.isStarted {
return ErrMessageQueueNotStarted
}

select {
case m.msgChan <- msg:
return nil
default:
return ErrMessageQueueFull
}
}

func (m *messageQueue) worker(msgChan chan proto.Message) {
for {
msg, more := <-msgChan
if !more {
return
}

m.params.HandleMessage(msg)
}
}
9 changes: 9 additions & 0 deletions signalling/signalhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,12 @@ func (s *signalhandler) HandleMessage(msg proto.Message) error {

return nil
}

func (s *signalhandler) HandleEncodedMessage(data []byte) error {
signalResponse := &livekit.SignalResponse{}
if err := proto.Unmarshal(data, signalResponse); err != nil {
return err
}

return s.HandleMessage(signalResponse)
}
6 changes: 6 additions & 0 deletions signalling/signalhandlerunimplemented.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,9 @@ func (s *signalhandlerUnimplemented) SetLogger(l logger.Logger) {}
func (s *signalhandlerUnimplemented) HandleMessage(msg proto.Message) error {
return nil
}

func (s *signalhandlerUnimplemented) HandleEncodedMessage(data []byte) error {
return nil
}

func (s *signalhandlerUnimplemented) PruneStaleReassemblies() {}
Loading
Loading