diff --git a/cmd/relay/format.go b/cmd/relay/format.go new file mode 100644 index 0000000..cbcbd21 --- /dev/null +++ b/cmd/relay/format.go @@ -0,0 +1,97 @@ +package main + +import ( + "encoding/hex" + "fmt" + "net" + + "github.com/nareix/joy5/format" + "github.com/nareix/joy5/format/flv" + "github.com/nareix/joy5/format/flv/flvio" + "github.com/nareix/joy5/format/rtmp" +) + +var debugRtmpChunkData = false +var debugRtmpNetEvent = false +var debugRtmpStage = false +var debugRtmpOptsMap = map[string]*bool{ + "chunk": &debugRtmpChunkData, + "net": &debugRtmpNetEvent, + "stage": &debugRtmpStage, +} + +var debugFlvHeader = false +var debugFlvOptsMap = map[string]*bool{ + "filehdr": &debugFlvHeader, +} + +func handleRtmpClientFlags(c *rtmp.Client) { + if debugRtmpNetEvent { + c.LogEvent = func(c *rtmp.Conn, nc net.Conn, e int) { + es := rtmp.EventString[e] + fmt.Println("RtmpEvent", nc.LocalAddr(), nc.RemoteAddr(), es) + } + } +} + +func handleRtmpServerFlags(s *rtmp.Server) { + if debugRtmpNetEvent { + s.LogEvent = func(c *rtmp.Conn, nc net.Conn, e int) { + es := rtmp.EventString[e] + fmt.Println("RtmpEvent", nc.LocalAddr(), nc.RemoteAddr(), es) + } + } +} + +func handleRtmpConnFlags(c *rtmp.Conn) { + if debugRtmpChunkData { + c.LogChunkDataEvent = func(isRead bool, b []byte) { + dir := "" + if isRead { + dir = "<" + } else { + dir = ">" + } + fmt.Println(dir, len(b)) + fmt.Print(hex.Dump(b)) + } + } + if debugRtmpStage { + c.LogStageEvent = func(e string, url string) { + fmt.Println("RtmpStage", e, url) + } + } +} + +func handleFlvDemuxerFlags(r *flv.Demuxer) { + if debugFlvHeader { + r.LogHeaderEvent = func(flags uint8) { + avflags := "" + if flags&flvio.FILE_HAS_AUDIO != 0 { + avflags += "A" + } + if flags&flvio.FILE_HAS_VIDEO != 0 { + avflags += "V" + } + fmt.Println("FLVHeader", "AVFlags", avflags) + } + } +} + +func newFormatOpener() *format.URLOpener { + fo := &format.URLOpener{ + OnNewFlvDemuxer: func(r *flv.Demuxer) { + handleFlvDemuxerFlags(r) + }, + OnNewRtmpConn: func(c *rtmp.Conn) { + handleRtmpConnFlags(c) + }, + OnNewRtmpServer: func(s *rtmp.Server) { + handleRtmpServerFlags(s) + }, + OnNewRtmpClient: func(c *rtmp.Client) { + handleRtmpClientFlags(c) + }, + } + return fo +} diff --git a/cmd/relay/main.go b/cmd/relay/main.go new file mode 100644 index 0000000..f1434f7 --- /dev/null +++ b/cmd/relay/main.go @@ -0,0 +1,39 @@ +package main + +import ( + "net/http" + "os" + "os/signal" + "syscall" + + "github.com/nareix/joy5/cmd/relay/handlers" + "gopkg.in/yaml.v3" +) + +var ( + config appconfig +) + +func readConfigs() { + yamlFile, err := os.ReadFile("config.yaml") + if err != nil { + panic(err) + } + err = yaml.Unmarshal(yamlFile, &config) + if err != nil { + panic(err) + } +} + +func main() { + readConfigs() + svc, _ := doPubsubRtmp(":1935") + + h := handlers.SetHttpHandlers(svc) + go http.ListenAndServe(":8181", h) + + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + <-sigs + svc.Stop() +} diff --git a/cmd/relay/models.go b/cmd/relay/models.go new file mode 100644 index 0000000..ac5e234 --- /dev/null +++ b/cmd/relay/models.go @@ -0,0 +1,17 @@ +package main + +type appconfig struct { + Accounts map[string]Restream `yaml:"accounts"` +} + +type Endpoint struct { + Name string `json:"name"` + URL string `json:"url"` + Enabled bool `json:"enabled"` +} + +type Restream struct { + ID string `yaml:"id"` + Name string `yaml:"name"` + Endpoints map[string]*Endpoint `yaml:"endpoints"` +} diff --git a/cmd/relay/pubsub.go b/cmd/relay/pubsub.go new file mode 100644 index 0000000..efc150a --- /dev/null +++ b/cmd/relay/pubsub.go @@ -0,0 +1,425 @@ +package main + +import ( + "bytes" + "context" + "errors" + "log" + "net" + "strings" + "sync" + "sync/atomic" + "time" + "unsafe" + + "github.com/nareix/joy5/av" + "github.com/nareix/joy5/format" + "github.com/nareix/joy5/format/rtmp" +) + +type gopCacheSnapshot struct { + pkts []av.Packet + idx int +} + +type gopCache struct { + pkts []av.Packet + idx int + curst unsafe.Pointer +} + +func (gc *gopCache) put(pkt av.Packet) { + if pkt.IsKeyFrame { + gc.pkts = []av.Packet{} + } + gc.pkts = append(gc.pkts, pkt) + gc.idx++ + st := &gopCacheSnapshot{ + pkts: gc.pkts, + idx: gc.idx, + } + atomic.StorePointer(&gc.curst, unsafe.Pointer(st)) +} + +func (gc *gopCache) curSnapshot() *gopCacheSnapshot { + return (*gopCacheSnapshot)(atomic.LoadPointer(&gc.curst)) +} + +type gopCacheReadCursor struct { + lastidx int +} + +func (rc *gopCacheReadCursor) advance(cur *gopCacheSnapshot) []av.Packet { + lastidx := rc.lastidx + rc.lastidx = cur.idx + if diff := cur.idx - lastidx; diff <= len(cur.pkts) { + return cur.pkts[len(cur.pkts)-diff:] + } else { + return cur.pkts + } +} + +type mergeSeqhdr struct { + cb func(av.Packet) + hdrpkt av.Packet +} + +func (m *mergeSeqhdr) do(pkt av.Packet) { + switch pkt.Type { + case av.H264DecoderConfig: + m.hdrpkt.VSeqHdr = append([]byte(nil), pkt.Data...) + case av.H264: + pkt.Metadata = m.hdrpkt.Metadata + if pkt.IsKeyFrame { + pkt.VSeqHdr = m.hdrpkt.VSeqHdr + } + m.cb(pkt) + case av.AACDecoderConfig: + m.hdrpkt.ASeqHdr = append([]byte(nil), pkt.Data...) + case av.AAC: + pkt.Metadata = m.hdrpkt.Metadata + pkt.ASeqHdr = m.hdrpkt.ASeqHdr + m.cb(pkt) + case av.Metadata: + m.hdrpkt.Metadata = pkt.Data + } +} + +type splitSeqhdr struct { + cb func(av.Packet) error + hdrpkt av.Packet +} + +func (s *splitSeqhdr) sendmeta(pkt av.Packet) error { + if bytes.Compare(s.hdrpkt.Metadata, pkt.Metadata) != 0 { + if err := s.cb(av.Packet{ + Type: av.Metadata, + Data: pkt.Metadata, + }); err != nil { + return err + } + s.hdrpkt.Metadata = pkt.Metadata + } + return nil +} + +func (s *splitSeqhdr) do(pkt av.Packet) error { + switch pkt.Type { + case av.H264: + if err := s.sendmeta(pkt); err != nil { + return err + } + if pkt.IsKeyFrame { + if bytes.Compare(s.hdrpkt.VSeqHdr, pkt.VSeqHdr) != 0 { + if err := s.cb(av.Packet{ + Type: av.H264DecoderConfig, + Data: pkt.VSeqHdr, + }); err != nil { + return err + } + s.hdrpkt.VSeqHdr = pkt.VSeqHdr + } + } + return s.cb(pkt) + case av.AAC: + if err := s.sendmeta(pkt); err != nil { + return err + } + if bytes.Compare(s.hdrpkt.ASeqHdr, pkt.ASeqHdr) != 0 { + if err := s.cb(av.Packet{ + Type: av.AACDecoderConfig, + Data: pkt.ASeqHdr, + }); err != nil { + return err + } + s.hdrpkt.ASeqHdr = pkt.ASeqHdr + } + return s.cb(pkt) + } + return nil +} + +type streamSub struct { + notify chan struct{} + stop chan struct{} // deactivate the sub +} + +type streamPub struct { + cancel func() + gc *gopCache +} + +type stream struct { + n int64 // number of subs + pub + sub sync.Map // subscribers + pub unsafe.Pointer // +} + +func (s *stream) curGopCacheSnapshot() *gopCacheSnapshot { + sp := (*streamPub)(atomic.LoadPointer(&s.pub)) + if sp == nil { + return nil + } + return sp.gc.curSnapshot() +} + +func (s *stream) notifySub() { + s.sub.Range(func(key, value interface{}) bool { + ss := value.(*streamSub) + select { + case ss.notify <- struct{}{}: + default: + } + return true + }) +} + +func (s *stream) setPub(r av.PacketReader) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + sp := &streamPub{ + cancel: cancel, + gc: &gopCache{}, + } + + oldsp := (*streamPub)(atomic.SwapPointer(&s.pub, unsafe.Pointer(sp))) + if oldsp != nil { + oldsp.cancel() + } + + seqmerge := mergeSeqhdr{ + cb: func(pkt av.Packet) { + sp.gc.put(pkt) + s.notifySub() + }, + } + + for { + select { + case <-ctx.Done(): + return + default: + } + + pkt, err := r.ReadPacket() + if err != nil { + return + } + seqmerge.do(pkt) + } +} + +type streams struct { + l sync.RWMutex + m map[string]*stream +} + +func newStreams() *streams { + return &streams{ + m: map[string]*stream{}, + } +} + +func (ss *streams) get(k string) *stream { + ss.l.Lock() + defer ss.l.Unlock() + + return ss.m[k] +} + +func (ss *streams) add(k string) (*stream, func()) { + ss.l.Lock() + defer ss.l.Unlock() + + log.Println("stream", k, "add") + + s, ok := ss.m[k] + if !ok { + s = &stream{} + ss.m[k] = s + } + s.n++ + + return s, func() { + log.Println("stream", k, "remove") + + ss.l.Lock() + defer ss.l.Unlock() + + s.n-- + if s.n == 0 { + delete(ss.m, k) + } + } +} + +// add external sub, we are actively restreaming to +func (s *stream) addSubExt(ss *streamSub, key string, close <-chan bool, w av.PacketWriter) { + + var cursor *gopCacheReadCursor + var lastsp *streamPub + + seqsplit := splitSeqhdr{ + cb: func(pkt av.Packet) error { + return w.WritePacket(pkt) + }, + } + + for { + var pkts []av.Packet + + sp := (*streamPub)(atomic.LoadPointer(&s.pub)) + if sp != lastsp { + cursor = &gopCacheReadCursor{} + lastsp = sp + } + if sp != nil { + cur := sp.gc.curSnapshot() + if cur != nil { + pkts = cursor.advance(cur) + } + } + + if len(pkts) == 0 { + select { + case <-ss.stop: + log.Println("stop substream", key) + return + case <-close: + return + case <-ss.notify: + } + } else { + for _, pkt := range pkts { + if err := seqsplit.do(pkt); err != nil { + return + } + } + } + } +} + +func activateSubStream(s *stream, ep_id, ep_url string) { + log.Println("activateSub", ep_id, ep_url) + + ss := &streamSub{ + notify: make(chan struct{}, 1), + stop: make(chan struct{}, 1), + } + s.sub.Store(ep_id, ss) + defer s.sub.Delete(ep_id) + + fo := newFormatOpener() + var err error + var w *format.Writer + for { + select { + case <-ss.stop: + log.Println("stop substream creation key:", ep_id) + return + default: + } + + if w, err = fo.Create(ep_url); err != nil { + log.Println("DialFailed", err) + time.Sleep(5 * time.Second) + } else { + break + } + } + + nc2 := w.NetConn + defer nc2.Close() + log.Println("Dial outbound OK") + + log.Println("activate substream", ep_id, ep_url) + s.addSubExt(ss, ep_id, w.Rtmp.CloseNotify(), w) + log.Println("deactivate substream", ep_id, ep_url) +} + +type pubsubService struct { + streams *streams + lis net.Listener +} + +func (s *pubsubService) StopSubStream(key, subkey string) { + stream := s.streams.get(key) + if stream == nil { + log.Println("no active stream!") + return + } + stream.sub.Range(func(key, value interface{}) bool { + if key.(string) == subkey { + p := value.(*streamSub) + p.stop <- struct{}{} + } + return true + }) +} + +func (s *pubsubService) handleRtmpConn(c *rtmp.Conn, nc net.Conn) { + streamPublishPrefix := "/live/" + + if !strings.HasPrefix(c.URL.Path, streamPublishPrefix) { + return + } + pubkey := strings.TrimPrefix(c.URL.Path, streamPublishPrefix) + log.Println("[HandleConn] pubkey:", pubkey) + + stream, remove := s.streams.add(pubkey) + defer remove() + + if c.Publishing { + account := config.Accounts[pubkey] + + for epID, ep := range account.Endpoints { + if !ep.Enabled { + continue + } + // make local copy to avoid a race + epID, epURL := epID, ep.URL + go activateSubStream(stream, epID, epURL) + } + + stream.setPub(c) + } +} + +func doPubsubRtmp(listenAddr string) (svc *pubsubService, err error) { + svc = &pubsubService{} + svc.lis, err = net.Listen("tcp", listenAddr) + if err != nil { + return + } + svc.streams = newStreams() + + s := rtmp.NewServer() + + handleRtmpServerFlags(s) + s.LogEvent = func(c *rtmp.Conn, nc net.Conn, e int) { + es := rtmp.EventString[e] + log.Println(nc.LocalAddr(), nc.RemoteAddr(), es) + } + s.HandleConn = svc.handleRtmpConn + + go func() { + for { + nc, err := svc.lis.Accept() + if err != nil { + if errors.Is(err, net.ErrClosed) { + return + } + + time.Sleep(time.Second) + continue + } + go s.HandleNetConn(nc) + } + }() + return +} + +func (s *pubsubService) Stop() { + s.lis.Close() +} diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..3e0a29a --- /dev/null +++ b/config.yaml @@ -0,0 +1,13 @@ +accounts: + 145ddbd8-97fa-4e7f-acc7-c5211d6fcde0: + name: AwesomeTV LLC + endpoints: + 451e4604-246e-4790-9cf9-18312aaaefd0: + name: Telegram + url: rtmps://dc4-1.rtmp.t.me/s/2222222222:secretkey + enabled: true + 7da3748a-c649-4b42-93cc-5369d19508f4: + name: Facebook + url: + enabled: false + diff --git a/format/rtmp/client.go b/format/rtmp/client.go index 242c497..07f43a5 100644 --- a/format/rtmp/client.go +++ b/format/rtmp/client.go @@ -6,6 +6,7 @@ import ( "crypto/tls" "net" "net/url" + "os" "time" ) @@ -16,6 +17,10 @@ func (t *Client) FromNetConn(nc net.Conn, u *url.URL, flags int) (c *Conn, err e } c_ := NewConn(rw) c_.URL = u + // c_.LogChunkDataEvent = func(isRead bool, b []byte) { + // fmt.Println("LogChunkDataEvent >", len(b), isRead) + // fmt.Println(hex.EncodeToString(b)) + // } nc.SetDeadline(time.Now().Add(time.Second * 15)) if err = c_.Prepare(StageGotPublishOrPlayCommand, flags); err != nil { @@ -88,7 +93,14 @@ func (t *Client) Dial(url_ string, flags int) (c *Conn, nc net.Conn, err error) if nc_, err = t.doDial(host); err != nil { return } - nc_ = tls.Client(nc_, &tls.Config{InsecureSkipVerify: true}) + f, err := os.OpenFile("c:/users/user/keys", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600) + if err != nil { + panic(err) + } + nc_ = tls.Client(nc_, &tls.Config{ + InsecureSkipVerify: true, + KeyLogWriter: f, + }) } var c_ *Conn diff --git a/format/rtmp/cmd.go b/format/rtmp/cmd.go index fdba509..bf2a675 100644 --- a/format/rtmp/cmd.go +++ b/format/rtmp/cmd.go @@ -41,7 +41,7 @@ func (c *Conn) writeBasicConf() (err error) { if err = c.writeSetPeerBandwidth(2500000, 2); err != nil { return } - if err = c.setAndWriteChunkSize(65536); err != nil { + if err = c.setAndWriteChunkSize(4096 * 4); err != nil { return } return @@ -322,7 +322,7 @@ func (c *Conn) writeConnect(path string) (err error) { if err = c.writeCommand(3, 0, "connect", 1, flvio.AMFMap{ {K: "app", V: path}, - {K: "flashVer", V: "LNX 9,0,124,2"}, + {K: "flashVer", V: "FMLE/3.0 (compatible; FMSc/1.0)"}, {K: "tcUrl", V: getTcURL(c.URL)}, {K: "fpad", V: false}, {K: "capabilities", V: 15}, diff --git a/go.mod b/go.mod index a7328d8..043f676 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,10 @@ module github.com/nareix/joy5 go 1.13 require ( + github.com/go-chi/chi/v5 v5.2.1 + github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/spf13/cobra v0.0.4-0.20190109003409-7547e83b2d85 github.com/spf13/pflag v1.0.4-0.20181223182923-24fa6976df40 golang.org/x/net v0.0.0-20190522155817-f3200d17e092 + gopkg.in/yaml.v3 v3.0.1 ) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..02f97d9 --- /dev/null +++ b/go.sum @@ -0,0 +1,15 @@ +github.com/go-chi/chi/v5 v5.2.1 h1:KOIHODQj58PmL80G2Eak4WdvUzjSJSm0vG72crDCqb8= +github.com/go-chi/chi/v5 v5.2.1/go.mod h1:L2yAIGWB3H+phAw1NxKwWM+7eUH/lU8pOMm5hHcoops= +github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= +github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/spf13/cobra v0.0.4-0.20190109003409-7547e83b2d85 h1:RghwryY75x76zKqO9v7NF+9lcmfW1/RNZBfqK4LSCKE= +github.com/spf13/cobra v0.0.4-0.20190109003409-7547e83b2d85/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= +github.com/spf13/pflag v1.0.4-0.20181223182923-24fa6976df40 h1:2gwxRRQ5I+FcDbxGtkIC9kWD7EFBewHjQqD8rDQAVQA= +github.com/spf13/pflag v1.0.4-0.20181223182923-24fa6976df40/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20190522155817-f3200d17e092/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=