diff --git a/cmd/recordtester/recordtester.go b/cmd/recordtester/recordtester.go index 1bb93504..3ee9a1ba 100644 --- a/cmd/recordtester/recordtester.go +++ b/cmd/recordtester/recordtester.go @@ -60,6 +60,10 @@ func main() { testMP4 := fs.Bool("mp4", false, "Download MP4 of recording") testStreamHealth := fs.Bool("stream-health", false, "Check stream health during test") testVod := fs.Bool("vod", false, "Check VOD workflow") + testAccessControl := fs.Bool("access-control", false, "Test access control") + testRecording := fs.Bool("recording", true, "Test recordings") + signingKey := fs.String("signing-key", "", "Signing key for access control") + publicKey := fs.String("public-key", "", "Public key for access control") recordObjectStoreId := fs.String("record-object-store-id", "", "ID for the Object Store to use for recording storage. Forwarded to the streams created in the API") discordURL := fs.String("discord-url", "", "URL of Discord's webhook to send messages to Discord channel") discordUserName := fs.String("discord-user-name", "", "User name to use when sending messages to Discord") @@ -216,6 +220,10 @@ func main() { UseHTTP: *useHttp, TestMP4: *testMP4, TestStreamHealth: *testStreamHealth, + TestAccessControl: *testAccessControl, + TestRecording: *testRecording, + SigningKey: *signingKey, + PublicKey: *publicKey, } vtOpts := vodtester.VodTesterOptions{ API: lapi, diff --git a/go.mod b/go.mod index bd3f519f..5561a4da 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/golang/glog v1.0.0 github.com/gosuri/uilive v0.0.3 // indirect github.com/gosuri/uiprogress v0.0.1 - github.com/livepeer/go-api-client v0.2.9-0.20220916171125-c13c05817515 + github.com/livepeer/go-api-client v0.3.2-0.20221111121231-26895f4f5e47 github.com/livepeer/go-livepeer v0.5.31 github.com/livepeer/joy4 v0.1.2-0.20220210094601-95e4d28f5f07 github.com/livepeer/leaderboard-serverless v1.0.0 @@ -53,6 +53,7 @@ require ( github.com/go-ole/go-ole v1.2.6 // indirect github.com/go-stack/stack v1.8.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang-jwt/jwt/v4 v4.4.2 github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect github.com/golang/mock v1.5.0 // indirect github.com/golang/protobuf v1.5.2 // indirect diff --git a/go.sum b/go.sum index f2db92d2..55cb0eeb 100644 --- a/go.sum +++ b/go.sum @@ -462,6 +462,8 @@ github.com/gogo/protobuf v1.3.0/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXP github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang-jwt/jwt/v4 v4.4.2 h1:rcc4lwaZgFMCZ5jxF9ABolDcIHdBytAFgqFPbSJQAYs= +github.com/golang-jwt/jwt/v4 v4.4.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k= github.com/golang/geo v0.0.0-20190916061304-5b978397cfec/go.mod h1:QZ0nwyI2jOfgRAoBvP+ab5aRr7c9x7lhGEJrKvBwjWI= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= @@ -710,8 +712,8 @@ github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.8.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= -github.com/livepeer/go-api-client v0.2.9-0.20220916171125-c13c05817515 h1:3UvLoSvntPi0Z/yW6zskPmZZwA+lnm0pQVIvG/uBnrE= -github.com/livepeer/go-api-client v0.2.9-0.20220916171125-c13c05817515/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw= +github.com/livepeer/go-api-client v0.3.2-0.20221111121231-26895f4f5e47 h1:+uRySA5kZDErpf/hOEFz7iQoJqdoxuWnZTAl8i3cruw= +github.com/livepeer/go-api-client v0.3.2-0.20221111121231-26895f4f5e47/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw= github.com/livepeer/go-livepeer v0.5.31 h1:LcN+qDnqWRws7fdVYc4ucZPVcLQRs2tehUYCQVnlnRw= github.com/livepeer/go-livepeer v0.5.31/go.mod h1:cpBikcGWApkx0cyR0Ht+uAym7j3uAwXGpPbvaOA8XUU= github.com/livepeer/joy4 v0.1.2-0.20191121080656-b2fea45cbded/go.mod h1:xkDdm+akniYxVT9KW1Y2Y7Hso6aW+rZObz3nrA9yTHw= diff --git a/internal/app/recordtester/recordtester_app.go b/internal/app/recordtester/recordtester_app.go index 037e02b4..3b0e6a22 100644 --- a/internal/app/recordtester/recordtester_app.go +++ b/internal/app/recordtester/recordtester_app.go @@ -3,6 +3,7 @@ package recordtester import ( "bytes" "context" + "encoding/base64" "errors" "fmt" "io/ioutil" @@ -10,6 +11,7 @@ import ( "os" "time" + "github.com/golang-jwt/jwt/v4" "github.com/golang/glog" api "github.com/livepeer/go-api-client" "github.com/livepeer/joy4/format/mp4" @@ -42,6 +44,10 @@ type ( UseHTTP bool TestMP4 bool TestStreamHealth bool + TestAccessControl bool + TestRecording bool + SigningKey string + PublicKey string } recordTester struct { @@ -55,6 +61,10 @@ type ( useHTTP bool mp4 bool streamHealth bool + accessControl bool + testRecording bool + signingKey string + publicKey string // mutable fields streamID string @@ -77,6 +87,10 @@ func NewRecordTester(gctx context.Context, opts RecordTesterOptions) IRecordTest useHTTP: opts.UseHTTP, mp4: opts.TestMP4, streamHealth: opts.TestStreamHealth, + accessControl: opts.TestAccessControl, + testRecording: opts.TestRecording, + signingKey: opts.SigningKey, + publicKey: opts.PublicKey, } return rt } @@ -123,7 +137,20 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. streamName := fmt.Sprintf("%s_%s", hostName, time.Now().Format("2006-01-02T15:04:05Z07:00")) var stream *api.Stream for { - stream, err = rt.lapi.CreateStream(api.CreateStreamReq{Name: streamName, Record: true, RecordObjectStoreId: rt.recordObjectStoreId}) + streamOptions := api.CreateStreamReq{Name: streamName, Record: rt.testRecording, RecordObjectStoreId: rt.recordObjectStoreId} + + if rt.accessControl { + streamOptions.PlaybackPolicy = api.PlaybackPolicy{ + Type: "jwt", + } + glog.Infof("Creating stream with access control") + } + + if rt.testRecording { + glog.Infof("Creating stream with recording enabled") + } + + stream, err = rt.lapi.CreateStream(streamOptions) if err != nil { if testers.Timedout(err) && apiTry < 3 { apiTry++ @@ -138,6 +165,7 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. apiTry = 0 rt.streamID = stream.ID rt.stream = stream + defer rt.lapi.DeleteStream(stream.ID) messenger.SendMessage(fmt.Sprintf(":information_source: Created stream id=%s", stream.ID)) // createdAPIStreams = append(createdAPIStreams, stream.ID) glog.V(model.VERBOSE).Infof("Created Livepeer stream id=%s streamKey=%s playbackId=%s name=%s", stream.ID, stream.StreamKey, stream.PlaybackID, streamName) @@ -156,6 +184,19 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. mediaURL := fmt.Sprintf("%s/%s/index.m3u8", ingest.Playback, stream.PlaybackID) glog.V(model.SHORT).Infof("RTMP: %s", rtmpURL) glog.V(model.SHORT).Infof("MEDIA: %s", mediaURL) + + if rt.accessControl && (rt.signingKey != "" && rt.publicKey != "") { + token, err := rt.signJwt(stream) + if err != nil { + return 1, err + } + mediaURL = fmt.Sprintf("%s?jwt=%s", mediaURL, token) + glog.V(model.VERBOSE).Infof("URL with access control for stream id=%s playbackId=%s name=%s mediaURL=%s", stream.ID, stream.PlaybackID, streamName, mediaURL) + } else { + glog.Warningf("No access control for stream id=%s playbackId=%s name=%s mediaURL=%s", stream.ID, stream.PlaybackID, streamName, mediaURL) + return 2, nil + } + if rt.useHTTP { sterr := rt.doOneHTTPStream(fileName, streamName, broadcasters[0], testDuration, stream) if sterr != nil { @@ -280,51 +321,41 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. return 0, err } - sess = sessions[0] - statusShould := livepeer.RecordingStatusReady - if rt.useForceURL { - statusShould = livepeer.RecordingStatusWaiting - } - if sess.RecordingStatus != statusShould { - err := fmt.Errorf("recording status is %s but should be %s", sess.RecordingStatus, statusShould) - return 240, err - // exit(250, fileName, *fileArg, err) - } - if sess.RecordingURL == "" { - err := fmt.Errorf("recording URL should appear by now") - return 249, err - // exit(249, fileName, *fileArg, err) - } - glog.Infof("recordingURL=%s downloading now", sess.RecordingURL) + if rt.testRecording { + sess = sessions[0] + statusShould := livepeer.RecordingStatusReady + if rt.useForceURL { + statusShould = livepeer.RecordingStatusWaiting + } + if sess.RecordingStatus != statusShould { + err := fmt.Errorf("recording status is %s but should be %s", sess.RecordingStatus, statusShould) + return 240, err + } + if sess.RecordingURL == "" { + err := fmt.Errorf("recording URL should appear by now") + return 249, err + } + glog.Infof("recordingURL=%s downloading now", sess.RecordingURL) + if err = rt.isCancelled(); err != nil { + return 0, err + } + if rt.mp4 { + es, err := rt.checkDownMp4(stream, sess.Mp4Url, testDuration, pauseDuration > 0) + if err != nil { + return es, err + } + } - // started := time.Now() - // downloader := testers.NewM3utester2(gctx, sess.RecordingURL, false, false, false, false, 5*time.Second, nil) - // <-downloader.Done() - // glog.Infof(`Pulling stopped after %s`, time.Since(started)) - // exit(55, fileName, *fileArg, err) - glog.Info("Done Record Test") + es, err := rt.checkDown(stream, sess.RecordingURL, testDuration, pauseDuration > 0) - // lapi.DeleteStream(stream.ID) - // exit(0, fileName, *fileArg, err) - if err = rt.isCancelled(); err != nil { - return 0, err - } - if rt.mp4 { - es, err := rt.checkDownMp4(stream, sess.Mp4Url, testDuration, pauseDuration > 0) if err != nil { return es, err } } - es, err := rt.checkDown(stream, sess.RecordingURL, testDuration, pauseDuration > 0) - if es == 0 { - rt.lapi.DeleteStream(stream.ID) - // exit(0, fileName, *fileArg, err) - } + glog.Info("Done Record Test") - // uploader := testers.NewRtmpStreamer(gctx, rtmpURL) - // uploader.StartUpload(fileName, rtmpURL, -1, 30*time.Second) - return es, err + return 0, err } func (rt *recordTester) getIngestInfo() (*api.Ingest, error) { @@ -389,6 +420,31 @@ func (rt *recordTester) isCancelled() error { return nil } +func (rt *recordTester) signJwt(stream *api.Stream) (string, error) { + expiration := time.Now().Add(time.Minute * 5).Unix() + unsignedToken := jwt.NewWithClaims(jwt.SigningMethodES256, jwt.MapClaims{ + "sub": stream.PlaybackID, + "pub": rt.publicKey, + "exp": expiration, + }) + + decodedPrivateKey, _ := base64.StdEncoding.DecodeString(rt.signingKey) + + pk, err := jwt.ParseECPrivateKeyFromPEM(decodedPrivateKey) + + if err != nil { + glog.Errorf("Unable to parse provided signing key for access control signingKey=%s", rt.signingKey) + } + + token, err := unsignedToken.SignedString(pk) + + if err != nil { + glog.Errorf("Unable to sign JWT with provided private key for access control signingKey=%s", rt.signingKey) + } + + return token, nil +} + func (rt *recordTester) checkDownMp4(stream *api.Stream, url string, streamDuration time.Duration, doubled bool) (int, error) { es := 0 started := time.Now()