Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 28 additions & 10 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

"github.com/pion/webrtc/v4"
"go.uber.org/atomic"
"golang.org/x/mod/semver"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"

Expand Down Expand Up @@ -72,6 +71,7 @@ type engineHandler interface {
OnStreamTrailer(*livekit.DataStream_Trailer)
OnLocalTrackSubscribed(trackSubscribed *livekit.TrackSubscribed)
OnSubscribedQualityUpdate(subscribedQualityUpdate *livekit.SubscribedQualityUpdate)
OnMediaSectionsRequirement(mediaSectionsRequirement *livekit.MediaSectionsRequirement)
}

type nullEngineHandler struct{}
Expand Down Expand Up @@ -103,6 +103,8 @@ func (n *nullEngineHandler) OnStreamTrailer(*livekit.DataStream_Trailer)
func (n *nullEngineHandler) OnLocalTrackSubscribed(trackSubscribed *livekit.TrackSubscribed) {}
func (n *nullEngineHandler) OnSubscribedQualityUpdate(subscribedQualityUpdate *livekit.SubscribedQualityUpdate) {
}
func (n *nullEngineHandler) OnMediaSectionsRequirement(mediaSectionsRequirement *livekit.MediaSectionsRequirement) {
}

// -------------------------------------------

Expand All @@ -125,6 +127,7 @@ const (
type RTCEngine struct {
log protoLogger.Logger

useSinglePeerConnection bool
engineHandler engineHandler
cbGetLocalParticipantSID func() string

Expand Down Expand Up @@ -166,18 +169,20 @@ type RTCEngine struct {
}

func NewRTCEngine(
useSinglePeerConnection bool,
engineHandler engineHandler,
getLocalParticipantSID func() string,
) *RTCEngine {
e := &RTCEngine{
log: logger,
useSinglePeerConnection: useSinglePeerConnection,
engineHandler: engineHandler,
cbGetLocalParticipantSID: getLocalParticipantSID,
trackPublishedListeners: make(map[string]chan *livekit.TrackPublishedResponse),
joinTimeout: 15 * time.Second,
reliableMsgSeq: 1,
}
if semver.Compare("v"+Version, "v3.0.0") < 0 {
if !useSinglePeerConnection {
e.signalling = signalling.NewSignalling(signalling.SignallingParams{
Logger: e.log,
})
Expand Down Expand Up @@ -299,7 +304,7 @@ func (e *RTCEngine) IsConnected() bool {
e.pclock.Lock()
defer e.pclock.Unlock()

if e.publisher == nil || e.subscriber == nil {
if e.publisher == nil || (!e.useSinglePeerConnection && e.subscriber == nil) {
return false
}
if e.subscriberPrimary {
Expand Down Expand Up @@ -445,6 +450,10 @@ func (e *RTCEngine) createPublisherPCLocked(configuration webrtc.Configuration)
}

func (e *RTCEngine) createSubscriberPCLocked(configuration webrtc.Configuration) error {
if e.useSinglePeerConnection {
return nil
}

var err error
if e.subscriber, err = NewPCTransport(PCTransportParams{
Configuration: configuration,
Expand Down Expand Up @@ -817,6 +826,7 @@ func (e *RTCEngine) createSubscriberPCAnswerAndSend() error {
e.log.Errorw("could not set subscriber local description", err)
return err
}
e.log.Debugw("sending answer for subscriber", "answer", answer)
if err := e.signalTransport.SendMessage(
e.signalling.SignalSdpAnswer(
protosignalling.ToProtoSessionDescription(answer, 0),
Expand Down Expand Up @@ -1242,14 +1252,18 @@ func (e *RTCEngine) OnReconnectResponse(res *livekit.ReconnectResponse) error {
e.pclock.Lock()
defer e.pclock.Unlock()

if err := e.publisher.SetConfiguration(configuration); err != nil {
e.log.Errorw("could not set rtc configuration for publisher", err)
return err
if e.publisher != nil {
if err := e.publisher.SetConfiguration(configuration); err != nil {
e.log.Errorw("could not set rtc configuration for publisher", err)
return err
}
}

if err := e.subscriber.SetConfiguration(configuration); err != nil {
e.log.Errorw("could not set rtc configuration for subscriber", err)
return err
if e.subscriber != nil {
if err := e.subscriber.SetConfiguration(configuration); err != nil {
e.log.Errorw("could not set rtc configuration for subscriber", err)
return err
}
}

return nil
Expand All @@ -1274,7 +1288,7 @@ func (e *RTCEngine) OnOffer(sd webrtc.SessionDescription, offerId uint32) {
return
}

e.log.Debugw("received offer for subscriber")
e.log.Debugw("received offer for subscriber", "offer", sd, "offerId", offerId)
if err := e.subscriber.SetRemoteDescription(sd); err != nil {
e.log.Errorw("could not set remote description", err)
return
Expand Down Expand Up @@ -1371,6 +1385,10 @@ func (e *RTCEngine) OnSubscribedQualityUpdate(subscribedQualityUpdate *livekit.S
e.engineHandler.OnSubscribedQualityUpdate(subscribedQualityUpdate)
}

func (e *RTCEngine) OnMediaSectionsRequirement(mediaSectionsRequirement *livekit.MediaSectionsRequirement) {
e.engineHandler.OnMediaSectionsRequirement(mediaSectionsRequirement)
}

// ------------------------------------

func setConfiguration(pcTransport *PCTransport, configuration webrtc.Configuration) {
Expand Down
36 changes: 18 additions & 18 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,26 @@ require (
github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731
github.com/livekit/media-sdk v0.0.0-20250518151703-b07af88637c5
github.com/livekit/mediatransportutil v0.0.0-20250519131108-fb90f5acfded
github.com/livekit/protocol v1.39.4-0.20250807105828-ccbae8154e54
github.com/livekit/protocol v1.40.1-0.20250826073447-c714707269e5
github.com/magefile/mage v1.15.0
github.com/pion/dtls/v3 v3.0.7
github.com/pion/interceptor v0.1.40
github.com/pion/rtcp v1.2.15
github.com/pion/rtp v1.8.21
github.com/pion/sdp/v3 v3.0.15
github.com/pion/webrtc/v4 v4.1.3
github.com/pion/webrtc/v4 v4.1.5-0.20250825162555-4b37165dcc27
github.com/stretchr/testify v1.10.0
github.com/twitchtv/twirp v8.1.3+incompatible
go.uber.org/atomic v1.11.0
golang.org/x/crypto v0.40.0
golang.org/x/exp v0.0.0-20250718183923-645b1fa84792
google.golang.org/protobuf v1.36.7
golang.org/x/crypto v0.41.0
golang.org/x/exp v0.0.0-20250819193227-8b4c13bb791b
google.golang.org/protobuf v1.36.8
)

require golang.org/x/mod v0.26.0
require golang.org/x/mod v0.27.0

require (
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.6-20250717185734-6c6e0d3c608e.1 // indirect
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.8-20250717185734-6c6e0d3c608e.1 // indirect
buf.build/go/protovalidate v0.14.0 // indirect
buf.build/go/protoyaml v0.6.0 // indirect
cel.dev/expr v0.24.0 // indirect
Expand All @@ -38,7 +38,7 @@ require (
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dennwc/iters v1.1.0 // indirect
github.com/dennwc/iters v1.2.2 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/frostbyte73/core v0.1.1 // indirect
github.com/fsnotify/fsnotify v1.9.0 // indirect
Expand All @@ -54,7 +54,7 @@ require (
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
github.com/lithammer/shortuuid/v4 v4.2.0 // indirect
github.com/livekit/psrpc v0.6.1-0.20250726180611-3915e005e741 // indirect
github.com/nats-io/nats.go v1.44.0 // indirect
github.com/nats-io/nats.go v1.45.0 // indirect
github.com/nats-io/nkeys v0.4.11 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/opencontainers/runc v1.1.14 // indirect
Expand All @@ -64,26 +64,26 @@ require (
github.com/pion/mdns/v2 v2.0.7 // indirect
github.com/pion/randutil v0.1.0 // indirect
github.com/pion/sctp v1.8.39 // indirect
github.com/pion/srtp/v3 v3.0.6 // indirect
github.com/pion/srtp/v3 v3.0.7 // indirect
github.com/pion/stun/v3 v3.0.0 // indirect
github.com/pion/transport/v3 v3.0.7 // indirect
github.com/pion/turn/v4 v4.0.2 // indirect
github.com/pion/turn/v4 v4.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/puzpuzpuz/xsync/v3 v3.5.1 // indirect
github.com/redis/go-redis/v9 v9.12.0 // indirect
github.com/redis/go-redis/v9 v9.12.1 // indirect
github.com/stoewer/go-strcase v1.3.1 // indirect
github.com/wlynxg/anet v0.0.5 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
go.uber.org/zap/exp v0.3.0 // indirect
golang.org/x/net v0.42.0 // indirect
golang.org/x/net v0.43.0 // indirect
golang.org/x/sync v0.16.0 // indirect
golang.org/x/sys v0.34.0 // indirect
golang.org/x/text v0.27.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250804133106-a7a43d27e69b // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250804133106-a7a43d27e69b // indirect
google.golang.org/grpc v1.74.2 // indirect
golang.org/x/sys v0.35.0 // indirect
golang.org/x/text v0.28.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250825161204-c5933d9347a5 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250825161204-c5933d9347a5 // indirect
google.golang.org/grpc v1.75.0 // indirect
gopkg.in/hraban/opus.v2 v2.0.0-20230925203106-0188a62cb302 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading
Loading