Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 25 additions & 4 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,23 @@ func NewRTCEngine(engineHandler engineHandler, getLocalParticipantSID func() str
SignalTransportHandler: e,
SignalHandler: e.signalHandler,
})
/*
e.signalling = signalling.NewSignallingv2(signalling.Signallingv2Params{
Logger: e.log,
})
e.signalHandler = signalling.NewSignalHandlerv2(signalling.SignalHandlerv2Params{
Logger: e.log,
Processor: e,
Signalling: e.signalling,
})
e.signalTransport = signalling.NewSignalTransportHybrid(signalling.SignalTransportHybridParams{
Logger: e.log,
Version: Version,
Protocol: PROTOCOL,
Signalling: e.signalling,
SignalHandler: e.signalHandler,
})
*/

e.onClose = []func(){}
return e
Expand Down Expand Up @@ -1085,14 +1102,18 @@ func (e *RTCEngine) validate(
case http.StatusUnauthorized:
errString = "unauthorized: "
case http.StatusNotFound:
errString = "not found: "
errString = "not found"
case http.StatusServiceUnavailable:
errString = "unavailable: "
}
body, err := io.ReadAll(hresp.Body)
if err == nil {
errString += string(body)
if hresp.StatusCode != http.StatusNotFound {
body, err := io.ReadAll(hresp.Body)
if err == nil {
errString += e.signalling.DecodeErrorResponse(body)
}
e.log.Errorw("validation error", errors.New(errString), "httpResponse", hresp)
} else {
e.log.Errorw("validation error", errors.New(errString))
}
return errors.New(errString)
}
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731
github.com/livekit/media-sdk v0.0.0-20250518151703-b07af88637c5
github.com/livekit/mediatransportutil v0.0.0-20250519131108-fb90f5acfded
github.com/livekit/protocol v1.39.4-0.20250721114233-52633eee694f
github.com/livekit/protocol v1.39.4-0.20250725083335-7313a8195a4b
github.com/magefile/mage v1.15.0
github.com/pion/dtls/v3 v3.0.6
github.com/pion/interceptor v0.1.40
Expand Down Expand Up @@ -51,7 +51,7 @@ require (
github.com/klauspost/compress v1.18.0 // indirect
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
github.com/lithammer/shortuuid/v4 v4.2.0 // indirect
github.com/livekit/psrpc v0.6.1-0.20250511053145-465289d72c3c // indirect
github.com/livekit/psrpc v0.6.1-0.20250724161801-262a822e7cd7 // indirect
github.com/nats-io/nats.go v1.43.0 // indirect
github.com/nats-io/nkeys v0.4.11 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
Expand Down Expand Up @@ -81,7 +81,7 @@ require (
golang.org/x/text v0.27.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250721164621-a45f3dfb1074 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250721164621-a45f3dfb1074 // indirect
google.golang.org/grpc v1.73.0 // indirect
google.golang.org/grpc v1.74.2 // indirect
gopkg.in/hraban/opus.v2 v2.0.0-20230925203106-0188a62cb302 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,10 @@ github.com/livekit/media-sdk v0.0.0-20250518151703-b07af88637c5 h1:aFCwt/rticj5L
github.com/livekit/media-sdk v0.0.0-20250518151703-b07af88637c5/go.mod h1:7ssWiG+U4xnbvLih9WiZbhQP6zIKMjgXdUtIE1bm/E8=
github.com/livekit/mediatransportutil v0.0.0-20250519131108-fb90f5acfded h1:ylZPdnlX1RW9Z15SD4mp87vT2D2shsk0hpLJwSPcq3g=
github.com/livekit/mediatransportutil v0.0.0-20250519131108-fb90f5acfded/go.mod h1:mSNtYzSf6iY9xM3UX42VEI+STHvMgHmrYzEHPcdhB8A=
github.com/livekit/protocol v1.39.4-0.20250721114233-52633eee694f h1:Cwe38+/ld3r5dnNmIZSALSoZPWNEMeYPZIi/qjpplLo=
github.com/livekit/protocol v1.39.4-0.20250721114233-52633eee694f/go.mod h1:YlgUxAegtU8jZ0tVXoIV/4fHeHqqLvS+6JnPKDbpFPU=
github.com/livekit/psrpc v0.6.1-0.20250511053145-465289d72c3c h1:WwEr0YBejYbKzk8LSaO9h8h0G9MnE7shyDu8yXQWmEc=
github.com/livekit/psrpc v0.6.1-0.20250511053145-465289d72c3c/go.mod h1:kmD+AZPkWu0MaXIMv57jhNlbiSZZ/Jx4bzlxBDVmJes=
github.com/livekit/protocol v1.39.4-0.20250725083335-7313a8195a4b h1:uxvoeGd0vmGDIL0JyLLV9h2o97tpt3rR9s4ikuLVz/g=
github.com/livekit/protocol v1.39.4-0.20250725083335-7313a8195a4b/go.mod h1:YlgUxAegtU8jZ0tVXoIV/4fHeHqqLvS+6JnPKDbpFPU=
github.com/livekit/psrpc v0.6.1-0.20250724161801-262a822e7cd7 h1:x50axcjXwfwnII7sMhJPyZ6f5LpPapZtsp75KJX8nIQ=
github.com/livekit/psrpc v0.6.1-0.20250724161801-262a822e7cd7/go.mod h1:kmD+AZPkWu0MaXIMv57jhNlbiSZZ/Jx4bzlxBDVmJes=
github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg=
github.com/magefile/mage v1.15.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
Expand Down Expand Up @@ -258,8 +258,8 @@ google.golang.org/genproto/googleapis/api v0.0.0-20250721164621-a45f3dfb1074 h1:
google.golang.org/genproto/googleapis/api v0.0.0-20250721164621-a45f3dfb1074/go.mod h1:vYFwMYFbmA8vl6Z/krj/h7+U/AqpHknwJX4Uqgfyc7I=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250721164621-a45f3dfb1074 h1:qJW29YvkiJmXOYMu5Tf8lyrTp3dOS+K4z6IixtLaCf8=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250721164621-a45f3dfb1074/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A=
google.golang.org/grpc v1.73.0 h1:VIWSmpI2MegBtTuFt5/JWy2oXxtjJ/e89Z70ImfD2ok=
google.golang.org/grpc v1.73.0/go.mod h1:50sbHOUqWoCQGI8V2HQLJM0B+LMlIUjNSZmow7EVBQc=
google.golang.org/grpc v1.74.2 h1:WoosgB65DlWVC9FqI82dGsZhWFNBSLjQ84bjROOpMu4=
google.golang.org/grpc v1.74.2/go.mod h1:CtQ+BGjaAIXHs/5YS3i473GqwBBa1zGQNevxdeBEXrM=
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
3 changes: 3 additions & 0 deletions signalling/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type Signalling interface {
connectParams *ConnectParams,
participantSID string,
) (*http.Request, error)
DecodeErrorResponse(errorDetails []byte) string

SignalLeaveRequest(leave *livekit.LeaveRequest) proto.Message
SignalICECandidate(trickle *livekit.TrickleRequest) proto.Message
Expand All @@ -100,6 +101,8 @@ type Signalling interface {
AckMessageId(ackMessageId uint32)
SetLastProcessedRemoteMessageId(lastProcessedRemoteMessageId uint32)

PendingMessages() proto.Message

SignalConnectRequest(connectRequest *livekit.ConnectRequest) proto.Message
}

Expand Down
7 changes: 1 addition & 6 deletions signalling/signalhandlerv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ func (s *signalhandlerv2) HandleMessage(msg proto.Message) error {
switch msg := wireMessage.GetMessage().(type) {
case *livekit.Signalv2WireMessage_Envelope:
for _, serverMessage := range msg.Envelope.ServerMessages {
/* SIGNALLING-V2-TODO: uncomment once server sends proper id
sequencer := serverMessage.GetSequencer()
if sequencer == nil || sequencer.MessageId == 0 {
s.params.Logger.Warnw(
Expand All @@ -87,16 +86,14 @@ func (s *signalhandlerv2) HandleMessage(msg proto.Message) error {
continue
}

// SIGNALLING-V2-TODO: ask for replay if there are gaps
if lprmi != 0 && sequencer.MessageId != lprmi+1 {
s.params.Logger.Infow(
"gap in message stream",
"last", lprmi,
"current", serverMessage.Sequencer.MessageId,
)
}
*/

// SIGNALLING-V2-TODO: ask for replay if there are gaps

// SIGNALLING-V2-TODO: process messages
switch payload := serverMessage.GetMessage().(type) {
Expand All @@ -116,11 +113,9 @@ func (s *signalhandlerv2) HandleMessage(msg proto.Message) error {
)
}

/* SIGNALLING-V2-TODO: uncomment once sequencer is implemented on both sides
s.lastProcessedRemoteMessageId.Store(sequencer.MessageId)
s.params.Signalling.AckMessageId(sequencer.LastProcessedRemoteMessageId)
s.params.Signalling.SetLastProcessedRemoteMessageId(sequencer.MessageId)
*/
}

case *livekit.Signalv2WireMessage_Fragment:
Expand Down
4 changes: 4 additions & 0 deletions signalling/signalling.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ func (s *signalling) HTTPRequestForValidate(
return req, nil
}

func (s *signalling) DecodeErrorResponse(errorDetails []byte) string {
return string(errorDetails)
}

func (s *signalling) SignalLeaveRequest(leave *livekit.LeaveRequest) proto.Message {
return &livekit.SignalRequest{
Message: &livekit.SignalRequest_Leave{
Expand Down
8 changes: 8 additions & 0 deletions signalling/signallingunimplemented.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ func (s *signallingUnimplemented) HTTPRequestForValidate(
return nil, ErrUnimplemented
}

func (s *signallingUnimplemented) DecodeErrorResponse(errorDetails []byte) string {
return ""
}

func (s *signallingUnimplemented) SignalLeaveRequest(leave *livekit.LeaveRequest) proto.Message {
return nil
}
Expand Down Expand Up @@ -129,6 +133,10 @@ func (u *signallingUnimplemented) AckMessageId(ackMessageId uint32) {}
func (u *signallingUnimplemented) SetLastProcessedRemoteMessageId(lastProcessedRemoteMessageId uint32) {
}

func (u *signallingUnimplemented) PendingMessages() proto.Message {
return nil
}

func (s *signallingUnimplemented) SignalConnectRequest(connectRequest *livekit.ConnectRequest) proto.Message {
return nil
}
53 changes: 47 additions & 6 deletions signalling/signallingv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ package signalling
import (
"bytes"
"context"
"encoding/json"
"net/http"
"net/url"
"runtime"

"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
protosignalling "github.com/livekit/protocol/signalling"
"google.golang.org/protobuf/proto"
)

Expand All @@ -36,11 +38,16 @@ type signallingv2 struct {
signallingUnimplemented

params Signallingv2Params

signalCache *protosignalling.Signalv2ClientMessageCache
}

func NewSignallingv2(params Signallingv2Params) Signalling {
return &signallingv2{
params: params,
signalCache: protosignalling.NewSignalv2ClientMessageCache(protosignalling.SignalCacheParams{
Logger: params.Logger,
}),
}
}

Expand Down Expand Up @@ -133,12 +140,39 @@ func (s *signallingv2) HTTPRequestForValidate(
return req, nil
}

func (s *signallingv2) DecodeErrorResponse(details []byte) string {
var errorDetails struct {
Error string `json:"error"`
}
err := json.Unmarshal(details, &errorDetails)
if err != nil {
return string(details)
}

return errorDetails.Error
}

func (s *signallingv2) AckMessageId(ackMessageId uint32) {
// SIGNALLING-V2-TODO s.signalCache.Clear(ackMessageId)
s.signalCache.Clear(ackMessageId)
}

func (s *signallingv2) SetLastProcessedRemoteMessageId(lastProcessedRemoteMessageId uint32) {
// SIGNALLING-V2-TODO s.signalCache.SetLastProcessedRemoteMessageId(lastProcessedRemoteMessageId)
s.signalCache.SetLastProcessedRemoteMessageId(lastProcessedRemoteMessageId)
}

func (s *signallingv2) PendingMessages() proto.Message {
clientMessages := s.signalCache.GetFromFront()
if len(clientMessages) == 0 {
return nil
}

return &livekit.Signalv2WireMessage{
Message: &livekit.Signalv2WireMessage_Envelope{
Envelope: &livekit.Envelope{
ClientMessages: clientMessages,
},
},
}
}

func (s *signallingv2) SignalConnectRequest(connectRequest *livekit.ConnectRequest) proto.Message {
Expand Down Expand Up @@ -168,13 +202,20 @@ func (s *signallingv2) SignalSdpAnswer(answer *livekit.SessionDescription) proto
return s.cacheAndReturnEnvelope(clientMessage)
}

func (s *signallingv2) SignalICECandidate(trickle *livekit.TrickleRequest) proto.Message {
clientMessage := &livekit.Signalv2ClientMessage{
Message: &livekit.Signalv2ClientMessage_Trickle{
Trickle: trickle,
},
}
return s.cacheAndReturnEnvelope(clientMessage)
}

func (s *signallingv2) cacheAndReturnEnvelope(cm *livekit.Signalv2ClientMessage) proto.Message {
/* SIGNALLING-V2-TODO
sm = s.signalCache.Add(sm)
if sm == nil {
cm = s.signalCache.Add(cm)
if cm == nil {
return nil
}
*/

return &livekit.Signalv2WireMessage{
Message: &livekit.Signalv2WireMessage_Envelope{
Expand Down
14 changes: 7 additions & 7 deletions signalling/signaltransport_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,21 +208,21 @@ func (s *signalTransportHttp) sendHttpRequest(
return nil, err
}

defer hresp.Body.Close()

if hresp.Header.Get("Content-type") != "application/x-protobuf" {
return nil, fmt.Errorf("%w: %s", ErrUnsupportedContentType, hresp.Header.Get("Content-type"))
}

s.params.Logger.Infow("http response received", "elapsed", time.Since(startedAt))

defer hresp.Body.Close()

body, err := io.ReadAll(hresp.Body)
if err != nil {
return nil, err
}

if hresp.StatusCode != http.StatusOK {
return nil, errors.New(string(body))
return nil, errors.New(s.params.Signalling.DecodeErrorResponse(body))
}

if hresp.Header.Get("Content-type") != "application/x-protobuf" {
return nil, fmt.Errorf("%w: %s", ErrUnsupportedContentType, hresp.Header.Get("Content-type"))
}

respWireMessage := &livekit.Signalv2WireMessage{}
Expand Down