diff --git a/engine.go b/engine.go index 26068c31..2f700cde 100644 --- a/engine.go +++ b/engine.go @@ -24,7 +24,6 @@ import ( "github.com/pion/webrtc/v4" "go.uber.org/atomic" - "golang.org/x/mod/semver" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" @@ -72,6 +71,7 @@ type engineHandler interface { OnStreamTrailer(*livekit.DataStream_Trailer) OnLocalTrackSubscribed(trackSubscribed *livekit.TrackSubscribed) OnSubscribedQualityUpdate(subscribedQualityUpdate *livekit.SubscribedQualityUpdate) + OnMediaSectionsRequirement(mediaSectionsRequirement *livekit.MediaSectionsRequirement) } type nullEngineHandler struct{} @@ -103,6 +103,8 @@ func (n *nullEngineHandler) OnStreamTrailer(*livekit.DataStream_Trailer) func (n *nullEngineHandler) OnLocalTrackSubscribed(trackSubscribed *livekit.TrackSubscribed) {} func (n *nullEngineHandler) OnSubscribedQualityUpdate(subscribedQualityUpdate *livekit.SubscribedQualityUpdate) { } +func (n *nullEngineHandler) OnMediaSectionsRequirement(mediaSectionsRequirement *livekit.MediaSectionsRequirement) { +} // ------------------------------------------- @@ -125,6 +127,7 @@ const ( type RTCEngine struct { log protoLogger.Logger + useSinglePeerConnection bool engineHandler engineHandler cbGetLocalParticipantSID func() string @@ -166,18 +169,20 @@ type RTCEngine struct { } func NewRTCEngine( + useSinglePeerConnection bool, engineHandler engineHandler, getLocalParticipantSID func() string, ) *RTCEngine { e := &RTCEngine{ log: logger, + useSinglePeerConnection: useSinglePeerConnection, engineHandler: engineHandler, cbGetLocalParticipantSID: getLocalParticipantSID, trackPublishedListeners: make(map[string]chan *livekit.TrackPublishedResponse), joinTimeout: 15 * time.Second, reliableMsgSeq: 1, } - if semver.Compare("v"+Version, "v3.0.0") < 0 { + if !useSinglePeerConnection { e.signalling = signalling.NewSignalling(signalling.SignallingParams{ Logger: e.log, }) @@ -299,7 +304,7 @@ func (e *RTCEngine) IsConnected() bool { e.pclock.Lock() defer e.pclock.Unlock() - if e.publisher == nil || e.subscriber == nil { + if e.publisher == nil || (!e.useSinglePeerConnection && e.subscriber == nil) { return false } if e.subscriberPrimary { @@ -445,6 +450,10 @@ func (e *RTCEngine) createPublisherPCLocked(configuration webrtc.Configuration) } func (e *RTCEngine) createSubscriberPCLocked(configuration webrtc.Configuration) error { + if e.useSinglePeerConnection { + return nil + } + var err error if e.subscriber, err = NewPCTransport(PCTransportParams{ Configuration: configuration, @@ -817,6 +826,7 @@ func (e *RTCEngine) createSubscriberPCAnswerAndSend() error { e.log.Errorw("could not set subscriber local description", err) return err } + e.log.Debugw("sending answer for subscriber", "answer", answer) if err := e.signalTransport.SendMessage( e.signalling.SignalSdpAnswer( protosignalling.ToProtoSessionDescription(answer, 0), @@ -1242,14 +1252,18 @@ func (e *RTCEngine) OnReconnectResponse(res *livekit.ReconnectResponse) error { e.pclock.Lock() defer e.pclock.Unlock() - if err := e.publisher.SetConfiguration(configuration); err != nil { - e.log.Errorw("could not set rtc configuration for publisher", err) - return err + if e.publisher != nil { + if err := e.publisher.SetConfiguration(configuration); err != nil { + e.log.Errorw("could not set rtc configuration for publisher", err) + return err + } } - if err := e.subscriber.SetConfiguration(configuration); err != nil { - e.log.Errorw("could not set rtc configuration for subscriber", err) - return err + if e.subscriber != nil { + if err := e.subscriber.SetConfiguration(configuration); err != nil { + e.log.Errorw("could not set rtc configuration for subscriber", err) + return err + } } return nil @@ -1274,7 +1288,7 @@ func (e *RTCEngine) OnOffer(sd webrtc.SessionDescription, offerId uint32) { return } - e.log.Debugw("received offer for subscriber") + e.log.Debugw("received offer for subscriber", "offer", sd, "offerId", offerId) if err := e.subscriber.SetRemoteDescription(sd); err != nil { e.log.Errorw("could not set remote description", err) return @@ -1371,6 +1385,10 @@ func (e *RTCEngine) OnSubscribedQualityUpdate(subscribedQualityUpdate *livekit.S e.engineHandler.OnSubscribedQualityUpdate(subscribedQualityUpdate) } +func (e *RTCEngine) OnMediaSectionsRequirement(mediaSectionsRequirement *livekit.MediaSectionsRequirement) { + e.engineHandler.OnMediaSectionsRequirement(mediaSectionsRequirement) +} + // ------------------------------------ func setConfiguration(pcTransport *PCTransport, configuration webrtc.Configuration) { diff --git a/go.mod b/go.mod index a55ef181..8d7d0ff0 100644 --- a/go.mod +++ b/go.mod @@ -10,26 +10,26 @@ require ( github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731 github.com/livekit/media-sdk v0.0.0-20250518151703-b07af88637c5 github.com/livekit/mediatransportutil v0.0.0-20250519131108-fb90f5acfded - github.com/livekit/protocol v1.39.4-0.20250807105828-ccbae8154e54 + github.com/livekit/protocol v1.40.1-0.20250826073447-c714707269e5 github.com/magefile/mage v1.15.0 github.com/pion/dtls/v3 v3.0.7 github.com/pion/interceptor v0.1.40 github.com/pion/rtcp v1.2.15 github.com/pion/rtp v1.8.21 github.com/pion/sdp/v3 v3.0.15 - github.com/pion/webrtc/v4 v4.1.3 + github.com/pion/webrtc/v4 v4.1.5-0.20250825162555-4b37165dcc27 github.com/stretchr/testify v1.10.0 github.com/twitchtv/twirp v8.1.3+incompatible go.uber.org/atomic v1.11.0 - golang.org/x/crypto v0.40.0 - golang.org/x/exp v0.0.0-20250718183923-645b1fa84792 - google.golang.org/protobuf v1.36.7 + golang.org/x/crypto v0.41.0 + golang.org/x/exp v0.0.0-20250819193227-8b4c13bb791b + google.golang.org/protobuf v1.36.8 ) -require golang.org/x/mod v0.26.0 +require golang.org/x/mod v0.27.0 require ( - buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.6-20250717185734-6c6e0d3c608e.1 // indirect + buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.8-20250717185734-6c6e0d3c608e.1 // indirect buf.build/go/protovalidate v0.14.0 // indirect buf.build/go/protoyaml v0.6.0 // indirect cel.dev/expr v0.24.0 // indirect @@ -38,7 +38,7 @@ require ( github.com/benbjohnson/clock v1.3.5 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect - github.com/dennwc/iters v1.1.0 // indirect + github.com/dennwc/iters v1.2.2 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/frostbyte73/core v0.1.1 // indirect github.com/fsnotify/fsnotify v1.9.0 // indirect @@ -54,7 +54,7 @@ require ( github.com/klauspost/cpuid/v2 v2.3.0 // indirect github.com/lithammer/shortuuid/v4 v4.2.0 // indirect github.com/livekit/psrpc v0.6.1-0.20250726180611-3915e005e741 // indirect - github.com/nats-io/nats.go v1.44.0 // indirect + github.com/nats-io/nats.go v1.45.0 // indirect github.com/nats-io/nkeys v0.4.11 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/opencontainers/runc v1.1.14 // indirect @@ -64,26 +64,26 @@ require ( github.com/pion/mdns/v2 v2.0.7 // indirect github.com/pion/randutil v0.1.0 // indirect github.com/pion/sctp v1.8.39 // indirect - github.com/pion/srtp/v3 v3.0.6 // indirect + github.com/pion/srtp/v3 v3.0.7 // indirect github.com/pion/stun/v3 v3.0.0 // indirect github.com/pion/transport/v3 v3.0.7 // indirect - github.com/pion/turn/v4 v4.0.2 // indirect + github.com/pion/turn/v4 v4.1.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/puzpuzpuz/xsync/v3 v3.5.1 // indirect - github.com/redis/go-redis/v9 v9.12.0 // indirect + github.com/redis/go-redis/v9 v9.12.1 // indirect github.com/stoewer/go-strcase v1.3.1 // indirect github.com/wlynxg/anet v0.0.5 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect go.uber.org/zap/exp v0.3.0 // indirect - golang.org/x/net v0.42.0 // indirect + golang.org/x/net v0.43.0 // indirect golang.org/x/sync v0.16.0 // indirect - golang.org/x/sys v0.34.0 // indirect - golang.org/x/text v0.27.0 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20250804133106-a7a43d27e69b // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20250804133106-a7a43d27e69b // indirect - google.golang.org/grpc v1.74.2 // indirect + golang.org/x/sys v0.35.0 // indirect + golang.org/x/text v0.28.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20250825161204-c5933d9347a5 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250825161204-c5933d9347a5 // indirect + google.golang.org/grpc v1.75.0 // indirect gopkg.in/hraban/opus.v2 v2.0.0-20230925203106-0188a62cb302 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 44cf32ac..dabdab59 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.6-20250717185734-6c6e0d3c608e.1 h1:Lg6klmCi3v7VvpqeeLEER9/m5S8y9e9DjhqQnSCNy4k= -buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.6-20250717185734-6c6e0d3c608e.1/go.mod h1:avRlCjnFzl98VPaeCtJ24RrV/wwHFzB8sWXhj26+n/U= +buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.8-20250717185734-6c6e0d3c608e.1 h1:sjY1k5uszbIZfv11HO2keV4SLhNA47SabPO886v7Rvo= +buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.8-20250717185734-6c6e0d3c608e.1/go.mod h1:8EQ5GzyGJQ5tEIwMSxCl8RKJYsjCpAwkdcENoioXT6g= buf.build/go/protovalidate v0.14.0 h1:kr/rC/no+DtRyYX+8KXLDxNnI1rINz0imk5K44ZpZ3A= buf.build/go/protovalidate v0.14.0/go.mod h1:+F/oISho9MO7gJQNYC2VWLzcO1fTPmaTA08SDYJZncA= buf.build/go/protoyaml v0.6.0 h1:Nzz1lvcXF8YgNZXk+voPPwdU8FjDPTUV4ndNTXN0n2w= @@ -35,8 +35,8 @@ github.com/containerd/continuity v0.4.3/go.mod h1:F6PTNCKepoxEaXLQp3wDAjygEnImnZ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dennwc/iters v1.1.0 h1:PsS3DbOU7GxSUQO0e7SGmzHkPhtwOlwbqggJ++Bgnr8= -github.com/dennwc/iters v1.1.0/go.mod h1:M9KuuMBeyEXYTmB7EnI9SCyALFCmPWOIxn5W1L0CjGg= +github.com/dennwc/iters v1.2.2 h1:XH2/Etihiy9ZvPOVCR+icQXeYlhbvS7k0qro4x/2qQo= +github.com/dennwc/iters v1.2.2/go.mod h1:M9KuuMBeyEXYTmB7EnI9SCyALFCmPWOIxn5W1L0CjGg= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/docker/cli v26.1.4+incompatible h1:I8PHdc0MtxEADqYJZvhBrW9bo8gawKwwenxRM7/rLu8= @@ -99,8 +99,8 @@ github.com/livekit/media-sdk v0.0.0-20250518151703-b07af88637c5 h1:aFCwt/rticj5L github.com/livekit/media-sdk v0.0.0-20250518151703-b07af88637c5/go.mod h1:7ssWiG+U4xnbvLih9WiZbhQP6zIKMjgXdUtIE1bm/E8= github.com/livekit/mediatransportutil v0.0.0-20250519131108-fb90f5acfded h1:ylZPdnlX1RW9Z15SD4mp87vT2D2shsk0hpLJwSPcq3g= github.com/livekit/mediatransportutil v0.0.0-20250519131108-fb90f5acfded/go.mod h1:mSNtYzSf6iY9xM3UX42VEI+STHvMgHmrYzEHPcdhB8A= -github.com/livekit/protocol v1.39.4-0.20250807105828-ccbae8154e54 h1:Bh4IzI1UUqyxUnAJScR2rT/7V8mJqkzFSoEuMfm/T5k= -github.com/livekit/protocol v1.39.4-0.20250807105828-ccbae8154e54/go.mod h1:YlgUxAegtU8jZ0tVXoIV/4fHeHqqLvS+6JnPKDbpFPU= +github.com/livekit/protocol v1.40.1-0.20250826073447-c714707269e5 h1:aBqHlrgCI3qzVUAoOx7n5Kt8GQ8g7UbNp69fUt2gO8I= +github.com/livekit/protocol v1.40.1-0.20250826073447-c714707269e5/go.mod h1:YlgUxAegtU8jZ0tVXoIV/4fHeHqqLvS+6JnPKDbpFPU= github.com/livekit/psrpc v0.6.1-0.20250726180611-3915e005e741 h1:KKL1u94l6dF9u4cBwnnfozk27GH1txWy2SlvkfgmzoY= github.com/livekit/psrpc v0.6.1-0.20250726180611-3915e005e741/go.mod h1:AuDC5uOoEjQJEc69v4Li3t77Ocz0e0NdjQEuFfO+vfk= github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg= @@ -111,8 +111,8 @@ github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3N github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo= github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0= github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y= -github.com/nats-io/nats.go v1.44.0 h1:ECKVrDLdh/kDPV1g0gAQ+2+m2KprqZK5O/eJAyAnH2M= -github.com/nats-io/nats.go v1.44.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= +github.com/nats-io/nats.go v1.45.0 h1:/wGPbnYXDM0pLKFjZTX+2JOw9TQPoIgTFrUaH97giwA= +github.com/nats-io/nats.go v1.45.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0= github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= @@ -147,24 +147,24 @@ github.com/pion/sctp v1.8.39 h1:PJma40vRHa3UTO3C4MyeJDQ+KIobVYRZQZ0Nt7SjQnE= github.com/pion/sctp v1.8.39/go.mod h1:cNiLdchXra8fHQwmIoqw0MbLLMs+f7uQ+dGMG2gWebE= github.com/pion/sdp/v3 v3.0.15 h1:F0I1zds+K/+37ZrzdADmx2Q44OFDOPRLhPnNTaUX9hk= github.com/pion/sdp/v3 v3.0.15/go.mod h1:88GMahN5xnScv1hIMTqLdu/cOcUkj6a9ytbncwMCq2E= -github.com/pion/srtp/v3 v3.0.6 h1:E2gyj1f5X10sB/qILUGIkL4C2CqK269Xq167PbGCc/4= -github.com/pion/srtp/v3 v3.0.6/go.mod h1:BxvziG3v/armJHAaJ87euvkhHqWe9I7iiOy50K2QkhY= +github.com/pion/srtp/v3 v3.0.7 h1:QUElw0A/FUg3MP8/KNMZB3i0m8F9XeMnTum86F7S4bs= +github.com/pion/srtp/v3 v3.0.7/go.mod h1:qvnHeqbhT7kDdB+OGB05KA/P067G3mm7XBfLaLiaNF0= github.com/pion/stun/v3 v3.0.0 h1:4h1gwhWLWuZWOJIJR9s2ferRO+W3zA/b6ijOI6mKzUw= github.com/pion/stun/v3 v3.0.0/go.mod h1:HvCN8txt8mwi4FBvS3EmDghW6aQJ24T+y+1TKjB5jyU= github.com/pion/transport/v3 v3.0.7 h1:iRbMH05BzSNwhILHoBoAPxoB9xQgOaJk+591KC9P1o0= github.com/pion/transport/v3 v3.0.7/go.mod h1:YleKiTZ4vqNxVwh77Z0zytYi7rXHl7j6uPLGhhz9rwo= -github.com/pion/turn/v4 v4.0.2 h1:ZqgQ3+MjP32ug30xAbD6Mn+/K4Sxi3SdNOTFf+7mpps= -github.com/pion/turn/v4 v4.0.2/go.mod h1:pMMKP/ieNAG/fN5cZiN4SDuyKsXtNTr0ccN7IToA1zs= -github.com/pion/webrtc/v4 v4.1.3 h1:YZ67Boj9X/hk190jJZ8+HFGQ6DqSZ/fYP3sLAZv7c3c= -github.com/pion/webrtc/v4 v4.1.3/go.mod h1:rsq+zQ82ryfR9vbb0L1umPJ6Ogq7zm8mcn9fcGnxomM= +github.com/pion/turn/v4 v4.1.1 h1:9UnY2HB99tpDyz3cVVZguSxcqkJ1DsTSZ+8TGruh4fc= +github.com/pion/turn/v4 v4.1.1/go.mod h1:2123tHk1O++vmjI5VSD0awT50NywDAq5A2NNNU4Jjs8= +github.com/pion/webrtc/v4 v4.1.5-0.20250825162555-4b37165dcc27 h1:3kMHKeDsmhGui8M9ImsIBUlTLRDsXtt/f5HHk2VUvEM= +github.com/pion/webrtc/v4 v4.1.5-0.20250825162555-4b37165dcc27/go.mod h1:Oab9npu1iZtQRMic3K3toYq5zFPvToe/QBw7dMI2ok4= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/puzpuzpuz/xsync/v3 v3.5.1 h1:GJYJZwO6IdxN/IKbneznS6yPkVC+c3zyY/j19c++5Fg= github.com/puzpuzpuz/xsync/v3 v3.5.1/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA= -github.com/redis/go-redis/v9 v9.12.0 h1:XlVPGlflh4nxfhsNXPA8Qp6EmEfTo0rp8oaBzPipXnU= -github.com/redis/go-redis/v9 v9.12.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw= +github.com/redis/go-redis/v9 v9.12.1 h1:k5iquqv27aBtnTm2tIkROUDp8JBXhXZIVu1InSgvovg= +github.com/redis/go-redis/v9 v9.12.1/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw= github.com/shoenig/test v1.7.0 h1:eWcHtTXa6QLnBvm0jgEabMRN/uJ4DMV3M8xUGgRkZmk= github.com/shoenig/test v1.7.0/go.mod h1:UxJ6u/x2v/TNs/LoLxBNJRV9DiwBBKYxXSyczsBHFoI= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= @@ -195,6 +195,18 @@ github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/otel v1.37.0 h1:9zhNfelUvx0KBfu/gb+ZgeAfAgtWrfHJZcAqFC228wQ= +go.opentelemetry.io/otel v1.37.0/go.mod h1:ehE/umFRLnuLa/vSccNq9oS1ErUlkkK71gMcN34UG8I= +go.opentelemetry.io/otel/metric v1.37.0 h1:mvwbQS5m0tbmqML4NqK+e3aDiO02vsf/WgbsdpcPoZE= +go.opentelemetry.io/otel/metric v1.37.0/go.mod h1:04wGrZurHYKOc+RKeye86GwKiTb9FKm1WHtO+4EVr2E= +go.opentelemetry.io/otel/sdk v1.37.0 h1:ItB0QUqnjesGRvNcmAcU0LyvkVyGJ2xftD29bWdDvKI= +go.opentelemetry.io/otel/sdk v1.37.0/go.mod h1:VredYzxUvuo2q3WRcDnKDjbdvmO0sCzOvVAiY+yUkAg= +go.opentelemetry.io/otel/sdk/metric v1.37.0 h1:90lI228XrB9jCMuSdA0673aubgRobVZFhbjxHHspCPc= +go.opentelemetry.io/otel/sdk/metric v1.37.0/go.mod h1:cNen4ZWfiD37l5NhS+Keb5RXVWZWpRE+9WyVCpbo5ps= +go.opentelemetry.io/otel/trace v1.37.0 h1:HLdcFNbRQBE2imdSEgm/kwqmQj1Or1l/7bW6mxVK7z4= +go.opentelemetry.io/otel/trace v1.37.0/go.mod h1:TlgrlQ+PtQO5XFerSPUYG0JSgGyryXewPGyayAWSBS0= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= @@ -208,21 +220,21 @@ go.uber.org/zap/exp v0.3.0/go.mod h1:5I384qq7XGxYyByIhHm6jg5CHkGY0nsTfbDLgDDlgJQ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= -golang.org/x/crypto v0.40.0 h1:r4x+VvoG5Fm+eJcxMaY8CQM7Lb0l1lsmjGBQ6s8BfKM= -golang.org/x/crypto v0.40.0/go.mod h1:Qr1vMER5WyS2dfPHAlsOj01wgLbsyWtFn/aY+5+ZdxY= -golang.org/x/exp v0.0.0-20250718183923-645b1fa84792 h1:R9PFI6EUdfVKgwKjZef7QIwGcBKu86OEFpJ9nUEP2l4= -golang.org/x/exp v0.0.0-20250718183923-645b1fa84792/go.mod h1:A+z0yzpGtvnG90cToK5n2tu8UJVP2XUATh+r+sfOOOc= +golang.org/x/crypto v0.41.0 h1:WKYxWedPGCTVVl5+WHSSrOBT0O8lx32+zxmHxijgXp4= +golang.org/x/crypto v0.41.0/go.mod h1:pO5AFd7FA68rFak7rOAGVuygIISepHftHnr8dr6+sUc= +golang.org/x/exp v0.0.0-20250819193227-8b4c13bb791b h1:DXr+pvt3nC887026GRP39Ej11UATqWDmWuS99x26cD0= +golang.org/x/exp v0.0.0-20250819193227-8b4c13bb791b/go.mod h1:4QTo5u+SEIbbKW1RacMZq1YEfOBqeXa19JeshGi+zc4= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= -golang.org/x/mod v0.26.0 h1:EGMPT//Ezu+ylkCijjPc+f4Aih7sZvaAr+O3EHBxvZg= -golang.org/x/mod v0.26.0/go.mod h1:/j6NAhSk8iQ723BGAUyoAcn7SlD7s15Dp9Nd/SfeaFQ= +golang.org/x/mod v0.27.0 h1:kb+q2PyFnEADO2IEF935ehFUXlWiNjJWtRNgBLSfbxQ= +golang.org/x/mod v0.27.0/go.mod h1:rWI627Fq0DEoudcK+MBkNkCe0EetEaDSwJJkCcjpazc= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= -golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs= -golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8= +golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE= +golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -236,8 +248,8 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA= -golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= +golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= @@ -249,21 +261,23 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/text v0.27.0 h1:4fGWRpyh641NLlecmyl4LOe6yDdfaYNrGb2zdfo4JV4= -golang.org/x/text v0.27.0/go.mod h1:1D28KMCvyooCX9hBiosv5Tz/+YLxj0j7XhWjpSUF7CU= +golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng= +golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/api v0.0.0-20250804133106-a7a43d27e69b h1:ULiyYQ0FdsJhwwZUwbaXpZF5yUE3h+RA+gxvBu37ucc= -google.golang.org/genproto/googleapis/api v0.0.0-20250804133106-a7a43d27e69b/go.mod h1:oDOGiMSXHL4sDTJvFvIB9nRQCGdLP1o/iVaqQK8zB+M= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250804133106-a7a43d27e69b h1:zPKJod4w6F1+nRGDI9ubnXYhU9NSWoFAijkHkUXeTK8= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250804133106-a7a43d27e69b/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A= -google.golang.org/grpc v1.74.2 h1:WoosgB65DlWVC9FqI82dGsZhWFNBSLjQ84bjROOpMu4= -google.golang.org/grpc v1.74.2/go.mod h1:CtQ+BGjaAIXHs/5YS3i473GqwBBa1zGQNevxdeBEXrM= -google.golang.org/protobuf v1.36.7 h1:IgrO7UwFQGJdRNXH/sQux4R1Dj1WAKcLElzeeRaXV2A= -google.golang.org/protobuf v1.36.7/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= +gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= +gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= +google.golang.org/genproto/googleapis/api v0.0.0-20250825161204-c5933d9347a5 h1:BIRfGDEjiHRrk0QKZe3Xv2ieMhtgRGeLcZQ0mIVn4EY= +google.golang.org/genproto/googleapis/api v0.0.0-20250825161204-c5933d9347a5/go.mod h1:j3QtIyytwqGr1JUDtYXwtMXWPKsEa5LtzIFN1Wn5WvE= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250825161204-c5933d9347a5 h1:eaY8u2EuxbRv7c3NiGK0/NedzVsCcV6hDuU5qPX5EGE= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250825161204-c5933d9347a5/go.mod h1:M4/wBTSeyLxupu3W3tJtOgB14jILAS/XWPSSa3TAlJc= +google.golang.org/grpc v1.75.0 h1:+TW+dqTd2Biwe6KKfhE5JpiYIBWq865PhKGSXiivqt4= +google.golang.org/grpc v1.75.0/go.mod h1:JtPAzKiq4v1xcAB2hydNlWI2RnF85XXcV0mhKXr2ecQ= +google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc= +google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/localparticipant.go b/localparticipant.go index 413b657a..b306f4f1 100644 --- a/localparticipant.go +++ b/localparticipant.go @@ -67,8 +67,8 @@ func (p *LocalParticipant) PublishTrack(track webrtc.TrackLocal, opts *TrackPubl } } - publisher, ok := p.engine.Publisher() - if !ok { + transport := p.getPublishTransport() + if transport == nil { return nil, ErrNoPeerConnection } @@ -79,6 +79,20 @@ func (p *LocalParticipant) PublishTrack(track webrtc.TrackLocal, opts *TrackPubl pub := NewLocalTrackPublication(kind, track, *opts, p.engine) pub.onMuteChanged = p.onTrackMuted + // add transceivers - re-use if possible, AddTrack will try to re-use. + // NOTE: `AddTrack` technically cannot re-use transceiver if it was ever + // used to send media, i. e. if it was ever in a `sendrecv` or `sendonly` + // direction. But, pion does not enforce that based on browser behaviour + // observed in practice. + sender, err := transport.PeerConnection().AddTrack(track) + if err != nil { + return nil, err + } + + // LocalTrack will consume rtcp packets so we don't need to consume again + _, isSampleTrack := track.(*LocalTrack) + pub.setSender(sender, !isSampleTrack) + req := &livekit.AddTrackRequest{ Cid: track.ID(), Name: opts.Name, @@ -105,21 +119,7 @@ func (p *LocalParticipant) PublishTrack(track webrtc.TrackLocal, opts *TrackPubl return nil, err } - // add transceivers - re-use if possible, AddTrack will try to re-use. - // NOTE: `AddTrack` technically cannot re-use transceiver if it was ever - // used to send media, i. e. if it was ever in a `sendrecv` or `sendonly` - // direction. But, pion does not enforce that based on browser behaviour - // observed in practice. - sender, err := publisher.PeerConnection().AddTrack(track) - if err != nil { - return nil, err - } - - // LocalTrack will consume rtcp packets so we don't need to consume again - _, isSampleTrack := track.(*LocalTrack) - pub.setSender(sender, !isSampleTrack) - - publisher.Negotiate() + transport.Negotiate() var pubRes *livekit.TrackPublishedResponse select { @@ -179,48 +179,18 @@ func (p *LocalParticipant) PublishSimulcastTrack(tracks []*LocalTrack, opts *Tra pub := NewLocalTrackPublication(KindFromRTPType(mainTrack.Kind()), nil, *opts, p.engine) pub.onMuteChanged = p.onTrackMuted - var layers []*livekit.VideoLayer - for _, st := range tracksCopy { - layers = append(layers, st.videoLayer) - } - err := p.engine.SendAddTrack( - &livekit.AddTrackRequest{ - Cid: mainTrack.ID(), - Name: opts.Name, - Source: opts.Source, - Type: pub.Kind().ProtoType(), - Width: mainTrack.videoLayer.Width, - Height: mainTrack.videoLayer.Height, - Layers: layers, - SimulcastCodecs: []*livekit.SimulcastCodec{ - { - Codec: mainTrack.Codec().MimeType, - Cid: mainTrack.ID(), - }, - }, - }, - ) - if err != nil { - return nil, err - } - - var pubRes *livekit.TrackPublishedResponse - select { - case pubRes = <-pubChan: - break - case <-time.After(trackPublishTimeout): - return nil, ErrTrackPublishTimeout - } - - publisher, ok := p.engine.Publisher() - if !ok { + transport := p.getPublishTransport() + if transport == nil { return nil, ErrNoPeerConnection } // add transceivers - publishPC := publisher.PeerConnection() - var transceiver *webrtc.RTPTransceiver - var sender *webrtc.RTPSender + var ( + transceiver *webrtc.RTPTransceiver + sender *webrtc.RTPSender + err error + ) + pc := transport.PeerConnection() for idx, st := range tracksCopy { if idx == 0 { // add transceivers - re-use if possible, AddTrack will try to re-use. @@ -228,13 +198,13 @@ func (p *LocalParticipant) PublishSimulcastTrack(tracks []*LocalTrack, opts *Tra // used to send media, i. e. if it was ever in a `sendrecv` or `sendonly` // direction. But, pion does not enforce that based on browser behaviour // observed in practice. - sender, err = publishPC.AddTrack(st) + sender, err = pc.AddTrack(st) if err != nil { return nil, err } // as there is no way to get transceiver from sender, search - for _, tr := range publishPC.GetTransceivers() { + for _, tr := range pc.GetTransceivers() { if tr.Sender() == sender { transceiver = tr break @@ -251,10 +221,43 @@ func (p *LocalParticipant) PublishSimulcastTrack(tracks []*LocalTrack, opts *Tra st.SetTransceiver(transceiver) } + var layers []*livekit.VideoLayer + for _, st := range tracksCopy { + layers = append(layers, st.videoLayer) + } + err = p.engine.SendAddTrack( + &livekit.AddTrackRequest{ + Cid: mainTrack.ID(), + Name: opts.Name, + Source: opts.Source, + Type: pub.Kind().ProtoType(), + Width: mainTrack.videoLayer.Width, + Height: mainTrack.videoLayer.Height, + Layers: layers, + SimulcastCodecs: []*livekit.SimulcastCodec{ + { + Codec: mainTrack.Codec().MimeType, + Cid: mainTrack.ID(), + }, + }, + }, + ) + if err != nil { + return nil, err + } + + var pubRes *livekit.TrackPublishedResponse + select { + case pubRes = <-pubChan: + break + case <-time.After(trackPublishTimeout): + return nil, ErrTrackPublishTimeout + } + pub.updateInfo(pubRes.Track) p.addPublication(pub) - publisher.Negotiate() + transport.Negotiate() p.Callback.OnLocalTrackPublished(pub, p) p.roomCallback.OnLocalTrackPublished(pub, p) @@ -382,17 +385,17 @@ func (p *LocalParticipant) UnpublishTrack(sid string) error { var err error if localTrack, ok := pub.track.(webrtc.TrackLocal); ok { - publisher, ok := p.engine.Publisher() - if !ok { + transport := p.getPublishTransport() + if transport == nil { return ErrNoPeerConnection } - for _, sender := range publisher.pc.GetSenders() { + for _, sender := range transport.pc.GetSenders() { if sender.Track() == localTrack { - err = publisher.pc.RemoveTrack(sender) + err = transport.pc.RemoveTrack(sender) break } } - publisher.Negotiate() + transport.Negotiate() } pub.CloseTrack() @@ -859,3 +862,12 @@ func (p *LocalParticipant) SendFile(filePath string, options StreamBytesOptions) return &writer.Info, nil } + +func (p *LocalParticipant) getPublishTransport() *PCTransport { + publisher, ok := p.engine.Publisher() + if ok { + return publisher + } + + return nil +} diff --git a/room.go b/room.go index 15a23fac..491cccd3 100644 --- a/room.go +++ b/room.go @@ -27,6 +27,7 @@ import ( "github.com/pion/rtcp" "github.com/pion/webrtc/v4" "golang.org/x/exp/maps" + "golang.org/x/mod/semver" "google.golang.org/protobuf/proto" protoLogger "github.com/livekit/protocol/logger" @@ -161,14 +162,15 @@ func WithExtraAttributes(attrs map[string]string) ConnectOption { type PLIWriter func(webrtc.SSRC) type Room struct { - log protoLogger.Logger - engine *RTCEngine - sid string - name string - LocalParticipant *LocalParticipant - callback *RoomCallback - connectionState ConnectionState - sidReady chan struct{} + log protoLogger.Logger + useSinglePeerConnection bool + engine *RTCEngine + sid string + name string + LocalParticipant *LocalParticipant + callback *RoomCallback + connectionState ConnectionState + sidReady chan struct{} remoteParticipants map[livekit.ParticipantIdentity]*RemoteParticipant sidToIdentity map[livekit.ParticipantID]livekit.ParticipantIdentity @@ -192,23 +194,24 @@ type Room struct { // NewRoom can be used to update callbacks before calling Join func NewRoom(callback *RoomCallback) *Room { r := &Room{ - log: logger, - remoteParticipants: make(map[livekit.ParticipantIdentity]*RemoteParticipant), - sidToIdentity: make(map[livekit.ParticipantID]livekit.ParticipantIdentity), - sidDefers: make(map[livekit.ParticipantID]map[livekit.TrackID]func(*RemoteParticipant)), - callback: NewRoomCallback(), - sidReady: make(chan struct{}), - connectionState: ConnectionStateDisconnected, - regionURLProvider: newRegionURLProvider(), - byteStreamHandlers: &sync.Map{}, - byteStreamReaders: &sync.Map{}, - textStreamHandlers: &sync.Map{}, - textStreamReaders: &sync.Map{}, - rpcHandlers: &sync.Map{}, + log: logger, + useSinglePeerConnection: semver.Compare("v"+Version, "v3.0.0") >= 0, + remoteParticipants: make(map[livekit.ParticipantIdentity]*RemoteParticipant), + sidToIdentity: make(map[livekit.ParticipantID]livekit.ParticipantIdentity), + sidDefers: make(map[livekit.ParticipantID]map[livekit.TrackID]func(*RemoteParticipant)), + callback: NewRoomCallback(), + sidReady: make(chan struct{}), + connectionState: ConnectionStateDisconnected, + regionURLProvider: newRegionURLProvider(), + byteStreamHandlers: &sync.Map{}, + byteStreamReaders: &sync.Map{}, + textStreamHandlers: &sync.Map{}, + textStreamReaders: &sync.Map{}, + rpcHandlers: &sync.Map{}, } r.callback.Merge(callback) - r.engine = NewRTCEngine(r, r.getLocalParticipantSID) + r.engine = NewRTCEngine(r.useSinglePeerConnection, r, r.getLocalParticipantSID) r.LocalParticipant = newLocalParticipant(r.engine, r.callback, r.serverInfo) return r } @@ -506,20 +509,39 @@ func (r *Room) addRemoteParticipant(pi *livekit.ParticipantInfo, updateExisting } func (r *Room) sendSyncState() { - subscriber, ok := r.engine.Subscriber() - if !ok || subscriber.pc.RemoteDescription() == nil { + var previousOffer *webrtc.SessionDescription + var previousAnswer *webrtc.SessionDescription + if r.useSinglePeerConnection { + publisher, ok := r.engine.Publisher() + if ok { + previousOffer = publisher.pc.RemoteDescription() + previousAnswer = publisher.pc.LocalDescription() + } + } else { + subscriber, ok := r.engine.Subscriber() + if ok { + previousOffer = subscriber.pc.RemoteDescription() + previousAnswer = subscriber.pc.LocalDescription() + } + } + if previousOffer == nil || previousAnswer == nil { return } - previousSdp := subscriber.pc.LocalDescription() - var trackSids []string + var trackSidsDisabled []string sendUnsub := r.engine.connParams.AutoSubscribe for _, rp := range r.GetRemoteParticipants() { for _, t := range rp.TrackPublications() { if t.IsSubscribed() != sendUnsub { trackSids = append(trackSids, t.SID()) } + + if rpub, ok := t.(*RemoteTrackPublication); ok { + if !rpub.IsEnabled() { + trackSidsDisabled = append(trackSidsDisabled, t.SID()) + } + } } } @@ -550,13 +572,16 @@ func (r *Room) sendSyncState() { getDCinfo(r.engine.GetDataChannelSub(livekit.DataPacket_LOSSY), livekit.SignalTarget_SUBSCRIBER) r.engine.SendSyncState(&livekit.SyncState{ - Answer: protosignalling.ToProtoSessionDescription(*previousSdp, 0), + Offer: protosignalling.ToProtoSessionDescription(*previousOffer, 0), + Answer: protosignalling.ToProtoSessionDescription(*previousAnswer, 0), Subscription: &livekit.UpdateSubscription{ TrackSids: trackSids, Subscribe: !sendUnsub, }, - PublishTracks: publishedTracks, - DataChannels: dataChannels, + PublishTracks: publishedTracks, + DataChannels: dataChannels, + TrackSidsDisabled: trackSidsDisabled, + // MIGRATION-TODO DatachannelReceiveStates }) } @@ -1012,6 +1037,47 @@ func (r *Room) OnSubscribedQualityUpdate(subscribedQualityUpdate *livekit.Subscr } } +func (r *Room) OnMediaSectionsRequirement(mediaSectionsRequirement *livekit.MediaSectionsRequirement) { + addTransceivers := func(transport *PCTransport, kind webrtc.RTPCodecType, count uint32) { + for i := uint32(0); i < count; i++ { + if _, err := transport.PeerConnection().AddTransceiverFromKind( + kind, + webrtc.RTPTransceiverInit{ + Direction: webrtc.RTPTransceiverDirectionRecvonly, + }, + ); err != nil { + r.log.Warnw( + "could not add transceiver", err, + "room", r.name, + "roomID", r.sid, + "participant", r.LocalParticipant.Identity(), + "pID", r.LocalParticipant.SID(), + "kind", kind, + ) + } else { + r.log.Debugw( + "added transceiver of kind", + "room", r.name, + "roomID", r.sid, + "participant", r.LocalParticipant.Identity(), + "pID", r.LocalParticipant.SID(), + "kind", kind, + ) + } + } + } + + publisher, ok := r.engine.Publisher() + if !ok { + r.log.Warnw("no publisher peer connection", ErrNoPeerConnection) + return + } + + addTransceivers(publisher, webrtc.RTPCodecTypeAudio, mediaSectionsRequirement.NumAudios) + addTransceivers(publisher, webrtc.RTPCodecTypeVideo, mediaSectionsRequirement.NumVideos) + publisher.Negotiate() +} + func (r *Room) OnStreamHeader(streamHeader *livekit.DataStream_Header, participantIdentity string) { switch header := streamHeader.ContentHeader.(type) { case *livekit.DataStream_Header_TextHeader: diff --git a/signalling/interfaces.go b/signalling/interfaces.go index 2189ff1c..21ad93ad 100644 --- a/signalling/interfaces.go +++ b/signalling/interfaces.go @@ -136,4 +136,5 @@ type SignalProcessor interface { OnLeave(*livekit.LeaveRequest) OnLocalTrackSubscribed(trackSubscribed *livekit.TrackSubscribed) OnSubscribedQualityUpdate(subscribedQualityUpdate *livekit.SubscribedQualityUpdate) + OnMediaSectionsRequirement(mediaSectionsRequirement *livekit.MediaSectionsRequirement) } diff --git a/signalling/signalhandler.go b/signalling/signalhandler.go index 961c2bcf..6c058290 100644 --- a/signalling/signalhandler.go +++ b/signalling/signalhandler.go @@ -115,6 +115,9 @@ func (s *signalhandler) HandleMessage(msg proto.Message) error { case *livekit.SignalResponse_SubscribedQualityUpdate: s.params.Processor.OnSubscribedQualityUpdate(payload.SubscribedQualityUpdate) + + case *livekit.SignalResponse_MediaSectionsRequirement: + s.params.Processor.OnMediaSectionsRequirement(payload.MediaSectionsRequirement) } return nil