diff --git a/apps/poa/poa_test.go b/apps/poa/poa_test.go index b86f9dbc..a1866080 100644 --- a/apps/poa/poa_test.go +++ b/apps/poa/poa_test.go @@ -126,7 +126,7 @@ func runNode(poaNode *Poa, mockP2P *p2p.MockP2p, wg *sync.WaitGroup) { cfg := config.InitDefaultCfg() land := tripod.NewLand() - land.SetTripods(poaNode.Tripod) + land.RegisterTripods(poaNode.Tripod) kvdb, err := kv.NewKvdb(&config.KVconf{ KvType: "bolt", diff --git a/common/p2p_topic.go b/common/p2p_topic.go index a66cd6c4..1a4aa0a0 100644 --- a/common/p2p_topic.go +++ b/common/p2p_topic.go @@ -1,9 +1,14 @@ package common const ( - StartBlockTopic = "start-block" - EndBlockTopic = "end-block" - FinalizeBlockTopic = "finalize-block" - UnpackedWritingTopic = "unpacked-writing" - UnpackedExtraWritingTopic = "unpacked-extra-writing" + StartBlockTopic = "start-block" + EndBlockTopic = "end-block" + FinalizeBlockTopic = "finalize-block" + UnpackedWritingTopic = "unpacked-writing" ) + +const TopicWritingTopicPrefix = "topic_writing_" + +func TopicWritingTopic(topic string) string { + return TopicWritingTopicPrefix + topic +} diff --git a/common/types.go b/common/types.go index cc70e451..18c524cc 100644 --- a/common/types.go +++ b/common/types.go @@ -40,6 +40,7 @@ type ( // WrCall from clients, it is an instance of an 'Writing'. WrCall struct { ChainID uint64 `json:"chain_id"` + Topic string `json:"topic,omitempty"` TripodName string `json:"tripod_name"` FuncName string `json:"func_name"` Params string `json:"params"` diff --git a/common/yerror/basic_error.go b/common/yerror/basic_error.go index 0a0a4784..fb7b217c 100644 --- a/common/yerror/basic_error.go +++ b/common/yerror/basic_error.go @@ -126,16 +126,16 @@ func WritingNotFound(name string) ErrWritingNotFound { return ErrWritingNotFound{WritingName: name} } -type ErrExtraWritingNotFound struct { - ExtraWritingName string +type ErrTopicWritingNotFound struct { + TopicWritingName string } -func ExtraWritingNotFound(name string) ErrExtraWritingNotFound { - return ErrExtraWritingNotFound{ExtraWritingName: name} +func TopicWritingNotFound(name string) ErrTopicWritingNotFound { + return ErrTopicWritingNotFound{TopicWritingName: name} } -func (e ErrExtraWritingNotFound) Error() string { - return errors.Errorf("ExtraWriting(%s) NOT Found", e.ExtraWritingName).Error() +func (e ErrTopicWritingNotFound) Error() string { + return errors.Errorf("TopicWriting(%s) NOT Found", e.TopicWritingName).Error() } type ErrReadingNotFound struct { diff --git a/core/kernel/handle_input.go b/core/kernel/handle_input.go index 817ac486..c4873820 100644 --- a/core/kernel/handle_input.go +++ b/core/kernel/handle_input.go @@ -37,24 +37,25 @@ func (k *Kernel) HandleWriting(signedWrCall *protocol.SignedWrCall) error { return nil } -func (k *Kernel) HandleExtraWriting(call *protocol.SignedWrCall) error { +func (k *Kernel) HandleTopicWriting(call *protocol.SignedWrCall) error { stxn, err := NewSignedTxn(call.Call, call.Pubkey, call.Address, call.Signature) if err != nil { return err } - exWrCall := call.Call - _, err = k.Land.GetExtraWriting(exWrCall.TripodName, exWrCall.FuncName) + tpWrCall := call.Call + _, err = k.Land.GetTopicWriting(tpWrCall.TripodName, tpWrCall.FuncName, tpWrCall.Topic) if err != nil { return err } - err = k.handleTxnLocally(stxn, common.UnpackedExtraWritingTopic) + p2pTopic := common.TopicWritingTopic(tpWrCall.Topic) + err = k.handleTxnLocally(stxn, p2pTopic) if err != nil { return err } go func() { - err = k.pubExtraWritings(FromArray(stxn)) + err = k.pubTopicWritings(p2pTopic, FromArray(stxn)) if err != nil { - logrus.Error("publish extra writings error: ", err) + logrus.Error("publish topic writings error: ", err) } }() return nil diff --git a/core/kernel/http.go b/core/kernel/http.go index e783eb17..103a1ada 100644 --- a/core/kernel/http.go +++ b/core/kernel/http.go @@ -21,9 +21,9 @@ func (k *Kernel) HandleHttp() { api.POST(RdCallType, func(c *gin.Context) { k.handleHttpRd(c) }) - // POST extra-writing call - api.POST(ExWrCallType, func(c *gin.Context) { - k.handleHttpExWr(c) + // POST topic-writing call + api.POST(TopicWrCallType, func(c *gin.Context) { + k.handleHttpTopicWr(c) }) api.GET("block", k.GetBlock) @@ -58,14 +58,14 @@ func (k *Kernel) handleHttpWr(c *gin.Context) { } } -func (k *Kernel) handleHttpExWr(c *gin.Context) { - signedExWrCall, err := GetSignedWrCall(c) +func (k *Kernel) handleHttpTopicWr(c *gin.Context) { + signedTopicWrCall, err := GetSignedWrCall(c) if err != nil { c.AbortWithError(http.StatusBadRequest, err) return } - err = k.HandleExtraWriting(signedExWrCall) + err = k.HandleTopicWriting(signedTopicWrCall) if err != nil { c.AbortWithError(http.StatusBadRequest, err) } diff --git a/core/kernel/kernel.go b/core/kernel/kernel.go index 8af78ba8..d8141eb0 100644 --- a/core/kernel/kernel.go +++ b/core/kernel/kernel.go @@ -6,6 +6,7 @@ import ( "github.com/sirupsen/logrus" "github.com/yu-org/yu/common" + "github.com/yu-org/yu/common/yerror" "github.com/yu-org/yu/config" "github.com/yu-org/yu/core/env" "github.com/yu-org/yu/core/tripod" @@ -129,28 +130,34 @@ func (k *Kernel) AcceptUnpackedTxns() error { } } - extraWritings, err := k.subExtraWritings() + topicTxnMap, err := k.subTopicWritings() if err != nil { return err } - for _, txn := range extraWritings { - if k.CheckReplayAttack(txn) { - continue - } - txn.FromP2P = true - - logrus.WithField("p2p", "accept-extra-writing"). - Tracef("txn(%s) from network, content: %v", txn.TxnHash.String(), txn.Raw.WrCall) - - err = k.Pool.CheckTxn(txn) - if err != nil { - logrus.Error("check extra writing from P2P into txpool error: ", err) - continue - } - err = k.Pool.Insert(txn) - if err != nil { - logrus.Error("insert extra writing from P2P into txpool error: ", err) + for topic, txns := range topicTxnMap { + for _, txn := range txns { + if txn == nil { + continue + } + if k.CheckReplayAttack(txn) { + continue + } + txn.FromP2P = true + + logrus.WithField("p2p", "accept-topic-writing"). + WithField("topic", topic). + Tracef("txn(%s) from network, content: %v", txn.TxnHash.String(), txn.Raw.WrCall) + + err = k.Pool.CheckTxn(txn) + if err != nil { + logrus.WithError(err).WithField("topic", topic).Error("check topic writing from P2P into txpool error") + continue + } + err = k.Pool.InsertWithTopic(topic, txn) + if err != nil { + logrus.WithError(err).WithField("topic", topic).Error("insert topic writing from P2P into txpool error") + } } } @@ -173,20 +180,41 @@ func (k *Kernel) pubUnpackedWritings(txns types.SignedTxns) error { return k.P2pNetwork.PubP2P(common.UnpackedWritingTopic, byt) } -func (k *Kernel) subExtraWritings() (types.SignedTxns, error) { - byt, err := k.P2pNetwork.SubP2P(common.UnpackedExtraWritingTopic) - if err != nil { - return nil, err +func (k *Kernel) subTopicWritings() (map[string]types.SignedTxns, error) { + if k.Land == nil { + return nil, nil } - return types.DecodeSignedTxns(byt) + topicTxns := make(map[string]types.SignedTxns) + for _, topicTripod := range k.Land.OrderedTopicTripods() { + if topicTripod.Topic == "" { + continue + } + p2pTopic := common.TopicWritingTopic(topicTripod.Topic) + byt, err := k.P2pNetwork.SubP2P(p2pTopic) + if err != nil { + if err == yerror.NoP2PTopic { + continue + } + return nil, err + } + txns, err := types.DecodeSignedTxns(byt) + if err != nil { + return nil, err + } + if len(txns) == 0 { + continue + } + topicTxns[p2pTopic] = txns + } + return topicTxns, nil } -func (k *Kernel) pubExtraWritings(txns types.SignedTxns) error { +func (k *Kernel) pubTopicWritings(topic string, txns types.SignedTxns) error { byt, err := txns.Encode() if err != nil { return err } - return k.P2pNetwork.PubP2P(common.UnpackedExtraWritingTopic, byt) + return k.P2pNetwork.PubP2P(topic, byt) } func (k *Kernel) GetTripodInstance(name string) any { @@ -214,7 +242,7 @@ func (k *Kernel) WithBronzes(bronzeInstances ...any) *Kernel { t.SetInstance(bronzeInstances[i]) } - k.Land.SetBronzes(bronzes...) + k.Land.RegisterBronzes(bronzes...) for _, bronzeInstance := range bronzeInstances { err := tripod.InjectToBronze(k.Land, bronzeInstance) @@ -237,7 +265,7 @@ func (k *Kernel) WithTripods(tripodInstances ...any) *Kernel { t.SetInstance(tripodInstances[i]) } - k.Land.SetTripods(tripods...) + k.Land.RegisterTripods(tripods...) for _, tri := range tripods { k.Pool.WithTripodCheck(tri.Name(), tri.TxnChecker) diff --git a/core/kernel/websocket.go b/core/kernel/websocket.go index 13092641..f53f9b4e 100644 --- a/core/kernel/websocket.go +++ b/core/kernel/websocket.go @@ -2,11 +2,12 @@ package kernel import ( "fmt" + "net/http" + "github.com/gin-gonic/gin" "github.com/gorilla/websocket" "github.com/sirupsen/logrus" . "github.com/yu-org/yu/core/protocol" - "net/http" ) func (k *Kernel) HandleWS() { @@ -18,9 +19,9 @@ func (k *Kernel) HandleWS() { r.GET(RdApiPath, func(ctx *gin.Context) { k.handleWS(ctx, reading) }) - // POST extra-writing call - r.POST(ExWrApiPath, func(ctx *gin.Context) { - k.handleWS(ctx, extraWriting) + // POST topic-writing call + r.POST(TopicWrApiPath, func(ctx *gin.Context) { + k.handleWS(ctx, topicWriting) }) r.GET(SubResultsPath, func(ctx *gin.Context) { @@ -35,7 +36,7 @@ func (k *Kernel) HandleWS() { const ( reading = iota writing - extraWriting + topicWriting subscription ) @@ -62,8 +63,8 @@ func (k *Kernel) handleWS(ctx *gin.Context, typ int) { k.handleWsWr(ctx, string(params)) //case reading: // k.handleWsRd(c, req, string(params)) - case extraWriting: - k.handleWsExWr(ctx, string(params)) + case topicWriting: + k.handleWsTopicWr(ctx, string(params)) } } @@ -81,14 +82,14 @@ func (k *Kernel) handleWsWr(ctx *gin.Context, params string) { } } -func (k *Kernel) handleWsExWr(ctx *gin.Context, params string) { - signedExWrCall, err := GetSignedWrCall(ctx) +func (k *Kernel) handleWsTopicWr(ctx *gin.Context, params string) { + signedTopicWrCall, err := GetSignedWrCall(ctx) if err != nil { ctx.AbortWithError(http.StatusBadRequest, err) return } - err = k.HandleExtraWriting(signedExWrCall) + err = k.HandleTopicWriting(signedTopicWrCall) if err != nil { ctx.AbortWithError(http.StatusInternalServerError, err) } diff --git a/core/protocol/url.go b/core/protocol/url.go index fdaceeb0..beae7c76 100644 --- a/core/protocol/url.go +++ b/core/protocol/url.go @@ -1,10 +1,11 @@ package protocol import ( + "path/filepath" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/gin-gonic/gin" . "github.com/yu-org/yu/common" - "path/filepath" ) // A complete writing-call url is POST /api/writing @@ -13,11 +14,11 @@ import ( const ( // RootApiPath For developers, every customized Writing and Read of tripods // will base on '/api'. - RootApiPath = "/api" - WrCallType = "writing" - RdCallType = "reading" - ExWrCallType = "extra-writing" - AdminType = "admin" + RootApiPath = "/api" + WrCallType = "writing" + RdCallType = "reading" + TopicWrCallType = "topic-writing" + AdminType = "admin" TripodNameKey = "tripod_name" FuncNameKey = "func_name" @@ -26,7 +27,7 @@ const ( var ( WrApiPath = filepath.Join(RootApiPath, WrCallType) - ExWrApiPath = filepath.Join(RootApiPath, ExWrCallType) + TopicWrApiPath = filepath.Join(RootApiPath, TopicWrCallType) RdApiPath = filepath.Join(RootApiPath, RdCallType) AdminApiPath = filepath.Join(RootApiPath, AdminType) SubResultsPath = "/subscribe/results" diff --git a/core/tripod/dev/funcs.go b/core/tripod/dev/funcs.go index 22a98104..a8e2326e 100644 --- a/core/tripod/dev/funcs.go +++ b/core/tripod/dev/funcs.go @@ -12,8 +12,9 @@ type ( // This operation has no consensus reached in the blockchain network. // respObj is a json object Reading func(ctx *ReadContext) - // ExtraWriting will not be executed in the Executor. It should be invoked explicitly. - ExtraWriting func(ctx *WriteContext) error + // TopicWriting will not be executed in the Executor. It should be invoked explicitly, for example, in the BlockCycle. + // topic is the topic name, it means this writing will be inserted in the topic txpool. + TopicWriting func(topic string, ctx *WriteContext) error // P2pHandler is a p2p server handler. You can define the services in P2P server. // Just like TCP handler. P2pHandler func([]byte) ([]byte, error) diff --git a/core/tripod/inject_test.go b/core/tripod/inject_test.go index 1ae5dddd..9adc4217 100644 --- a/core/tripod/inject_test.go +++ b/core/tripod/inject_test.go @@ -62,7 +62,7 @@ func TestInject(t *testing.T) { boyi.SetLand(land) boyi.SetInstance(boyi) - land.SetTripods(testTri.Tripod, dayu.Tripod, boyi.Tripod) + land.RegisterTripods(testTri.Tripod, dayu.Tripod, boyi.Tripod) err := InjectToTripod(testTri) assert.NoError(t, err) diff --git a/core/tripod/land.go b/core/tripod/land.go index 014b6822..542b8da9 100644 --- a/core/tripod/land.go +++ b/core/tripod/land.go @@ -1,6 +1,9 @@ package tripod import ( + "fmt" + "strings" + . "github.com/yu-org/yu/common/yerror" . "github.com/yu-org/yu/core/tripod/dev" ) @@ -10,30 +13,67 @@ type Land struct { // Key: the Name of Tripod tripodsMap map[string]*Tripod + // Key: topic::tripodName + topicTripods map[string]*Tripod + orderedTopicTripods []string + bronzes map[string]*Bronze } func NewLand() *Land { return &Land{ - tripodsMap: make(map[string]*Tripod), - orderedTripods: make([]*Tripod, 0), - bronzes: make(map[string]*Bronze), + tripodsMap: make(map[string]*Tripod), + orderedTripods: make([]*Tripod, 0), + topicTripods: make(map[string]*Tripod), + orderedTopicTripods: make([]string, 0), + bronzes: make(map[string]*Bronze), } } -func (l *Land) SetBronzes(bronzes ...*Bronze) { +func (l *Land) RegisterBronzes(bronzes ...*Bronze) { for _, bronze := range bronzes { l.bronzes[bronze.Name()] = bronze } } -func (l *Land) SetTripods(tripods ...*Tripod) { +func (l *Land) RegisterTripods(tripods ...*Tripod) { for _, tri := range tripods { triName := tri.Name() l.tripodsMap[triName] = tri l.orderedTripods = append(l.orderedTripods, tri) + + for topic := range tri.topicWritings { + l.registerTopicTripod(topic, tri) + } + } +} + +const topicTripodKeySep = "::" + +func makeTopicTripodKey(topic, tripodName string) string { + return fmt.Sprintf("%s%s%s", topic, topicTripodKeySep, tripodName) +} + +func (l *Land) registerTopicTripod(topic string, tri *Tripod) { + key := makeTopicTripodKey(topic, tri.Name()) + if _, exists := l.topicTripods[key]; !exists { + l.orderedTopicTripods = append(l.orderedTopicTripods, key) } + l.topicTripods[key] = tri +} + +func splitTopicTripodKey(key string) (topic, tripodName string) { + parts := strings.SplitN(key, topicTripodKeySep, 2) + if len(parts) != 2 { + return key, "" + } + return parts[0], parts[1] +} + +type TopicTripod struct { + Topic string + TripodName string } func (l *Land) GetTripodInstance(name string) interface{} { @@ -59,15 +99,20 @@ func (l *Land) GetWriting(tripodName, wrName string) (Writing, error) { return fn, nil } -func (l *Land) GetExtraWriting(tripodName, ewName string) (ExtraWriting, error) { - tripod, ok := l.tripodsMap[tripodName] +func (l *Land) GetTopicWriting(tripodName, ewName, topic string) (TopicWriting, error) { + key := makeTopicTripodKey(topic, tripodName) + tripod, ok := l.topicTripods[key] if !ok { - return nil, TripodNotFound(tripodName) + if _, exist := l.tripodsMap[tripodName]; !exist { + return nil, TripodNotFound(tripodName) + } + return nil, TopicWritingNotFound(topic) } - ew := tripod.GetExtraWriting(ewName) + ew := tripod.GetTopicWriting(topic) if ew == nil { - return nil, ExtraWritingNotFound(ewName) + return nil, TopicWritingNotFound(topic) } + _ = ewName return ew, nil } @@ -102,3 +147,37 @@ func (l *Land) RangeList(fn func(*Tripod) error) error { } return nil } + +func (l *Land) TopicTripods() map[string]*Tripod { + result := make(map[string]*Tripod, len(l.topicTripods)) + for key, tri := range l.topicTripods { + result[key] = tri + } + return result +} + +func (l *Land) OrderedTopicTripods() []TopicTripod { + result := make([]TopicTripod, 0, len(l.orderedTopicTripods)) + for _, key := range l.orderedTopicTripods { + topic, tripodName := splitTopicTripodKey(key) + result = append(result, TopicTripod{ + Topic: topic, + TripodName: tripodName, + }) + } + return result +} + +func (l *Land) TopicNames() []string { + names := make([]string, 0, len(l.orderedTopicTripods)) + seen := make(map[string]struct{}) + for _, key := range l.orderedTopicTripods { + topic, _ := splitTopicTripodKey(key) + if _, exists := seen[topic]; exists { + continue + } + seen[topic] = struct{}{} + names = append(names, topic) + } + return names +} diff --git a/core/tripod/land_grpc.go b/core/tripod/land_grpc.go index 3a134afe..a3937105 100644 --- a/core/tripod/land_grpc.go +++ b/core/tripod/land_grpc.go @@ -8,7 +8,7 @@ func NewGrpcLand(land *Land) *GrpcLand { return &GrpcLand{land} } -//func (g *GrpcLand) SetTripods(_ context.Context, info *goproto.TripodsInfo) (*emptypb.Empty, error) { +//func (g *GrpcLand) RegisterTripods(_ context.Context, info *goproto.TripodsInfo) (*emptypb.Empty, error) { // tripods := make([]*Tripod, 0) // for _, triInfo := range info.Tripods { // tripod := NewTripodWithName(triInfo.Name) @@ -24,6 +24,6 @@ func NewGrpcLand(land *Land) *GrpcLand { // // tripods = append(tripods) // } -// g.land.SetTripods(tripods...) +// g.land.RegisterTripods(tripods...) // return nil, nil //} diff --git a/core/tripod/tripod.go b/core/tripod/tripod.go index f77a1cf4..28de2ebd 100644 --- a/core/tripod/tripod.go +++ b/core/tripod/tripod.go @@ -1,16 +1,17 @@ package tripod import ( + "path/filepath" + "reflect" + "runtime" + "strings" + "github.com/sirupsen/logrus" "github.com/yu-org/yu/common" "github.com/yu-org/yu/core/context" "github.com/yu-org/yu/core/env" "github.com/yu-org/yu/core/tripod/dev" "github.com/yu-org/yu/core/types" - "path/filepath" - "reflect" - "runtime" - "strings" ) type Tripod struct { @@ -34,8 +35,8 @@ type Tripod struct { writings map[string]dev.Writing // Key: Reading Name readings map[string]dev.Reading - // Key: ExtraWriting Name - extraWritings map[string]dev.ExtraWriting + // Key: TopicWriting Topic + topicWritings map[string]dev.TopicWriting // key: p2p-handler type code P2pHandlers map[int]dev.P2pHandler } @@ -49,7 +50,7 @@ func NewTripodWithName(name string) *Tripod { name: name, writings: make(map[string]dev.Writing), readings: make(map[string]dev.Reading), - extraWritings: make(map[string]dev.ExtraWriting), + topicWritings: make(map[string]dev.TopicWriting), P2pHandlers: make(map[int]dev.P2pHandler), BlockVerifier: new(DefaultBlockVerifier), @@ -96,8 +97,8 @@ func (t *Tripod) SetInstance(tripodInstance any) { logrus.Infof("register Reading (%s) into Tripod(%s) \n", name, t.name) } - for name, _ := range t.extraWritings { - logrus.Infof("register ExtraWriting (%s) into Tripod(%s) \n", name, t.name) + for topic := range t.topicWritings { + logrus.Infof("register TopicWriting (%s) into Tripod(%s) \n", topic, t.name) } t.Instance = tripodInstance @@ -159,11 +160,10 @@ func (t *Tripod) SetWritings(wrs ...dev.Writing) { } } -func (t *Tripod) SetExtraWritings(extraWritings ...dev.ExtraWriting) { - for _, ew := range extraWritings { - name := getFuncName(ew) - t.extraWritings[name] = ew - } +func (t *Tripod) SetTopicWriting(topic string, topicWriting dev.TopicWriting) *Tripod { + t.topicWritings[topic] = topicWriting + t.registerTopicP2P(topic) + return t } func (t *Tripod) SetReadings(readings ...dev.Reading) { @@ -192,8 +192,8 @@ func (t *Tripod) ExistWriting(name string) bool { return ok } -func (t *Tripod) ExistExtraWriting(name string) bool { - _, ok := t.extraWritings[name] +func (t *Tripod) ExistTopicWriting(topic string) bool { + _, ok := t.topicWritings[topic] return ok } @@ -205,12 +205,12 @@ func (t *Tripod) GetWritingFromLand(tripodName, funcName string) (dev.Writing, e return t.Land.GetWriting(tripodName, funcName) } -func (t *Tripod) GetExtraWriting(name string) dev.ExtraWriting { - return t.extraWritings[name] +func (t *Tripod) GetTopicWriting(topic string) dev.TopicWriting { + return t.topicWritings[topic] } -func (t *Tripod) GetExtraWritingFromLand(tripodName, funcName string) (dev.ExtraWriting, error) { - return t.Land.GetExtraWriting(tripodName, funcName) +func (t *Tripod) GetTopicWritingFromLand(tripodName, funcName, topic string) (dev.TopicWriting, error) { + return t.Land.GetTopicWriting(tripodName, funcName, topic) } func (t *Tripod) GetReading(name string) dev.Reading { @@ -237,9 +237,9 @@ func (t *Tripod) AllWritingNames() []string { return allNames } -func (t *Tripod) AllExtraWritingNames() []string { +func (t *Tripod) AllTopicWritingNames() []string { allNames := make([]string, 0) - for name, _ := range t.extraWritings { + for name, _ := range t.topicWritings { allNames = append(allNames, name) } return allNames @@ -296,3 +296,11 @@ func (t *Tripod) HandleReceipt(ctx *context.WriteContext, receipt *types.Receipt t.Sub.Emit(receipt) } } + +func (t *Tripod) registerTopicP2P(topic string) { + if t.ChainEnv == nil || t.P2pNetwork == nil { + return + } + p2pTopic := common.TopicWritingTopic(topic) + t.P2pNetwork.AddTopic(p2pTopic) +} diff --git a/core/types/goproto/tripod.pb.go b/core/types/goproto/tripod.pb.go index 6620ec8b..5c6c9f26 100644 --- a/core/types/goproto/tripod.pb.go +++ b/core/types/goproto/tripod.pb.go @@ -343,13 +343,13 @@ var file_tripod_proto_depIdxs = []int32{ 3, // 5: Tripod.StartBlock:input_type -> TripodBlockRequest 3, // 6: Tripod.EndBlock:input_type -> TripodBlockRequest 3, // 7: Tripod.FinalizeBlock:input_type -> TripodBlockRequest - 0, // 8: Land.SetTripods:input_type -> TripodsInfo + 0, // 8: Land.RegisterTripods:input_type -> TripodsInfo 6, // 9: Tripod.CheckTxn:output_type -> Err 7, // 10: Tripod.VerifyBlock:output_type -> Bool 6, // 11: Tripod.StartBlock:output_type -> Err 6, // 12: Tripod.EndBlock:output_type -> Err 6, // 13: Tripod.FinalizeBlock:output_type -> Err - 8, // 14: Land.SetTripods:output_type -> google.protobuf.Empty + 8, // 14: Land.RegisterTripods:output_type -> google.protobuf.Empty 9, // [9:15] is the sub-list for method output_type 3, // [3:9] is the sub-list for method input_type 3, // [3:3] is the sub-list for extension type_name diff --git a/infra/p2p/interface.go b/infra/p2p/interface.go index 0688c437..33e470a9 100644 --- a/infra/p2p/interface.go +++ b/infra/p2p/interface.go @@ -13,6 +13,7 @@ type P2pNetwork interface { ConnectBootNodes() error AddTopic(topicName string) + HasTopic(topicName string) bool SetHandlers(handlers map[int]dev.P2pHandler) RequestPeer(peerID peer.ID, code int, request []byte) (response []byte, err error) diff --git a/infra/p2p/mock.go b/infra/p2p/mock.go index dd562484..096766e5 100644 --- a/infra/p2p/mock.go +++ b/infra/p2p/mock.go @@ -34,6 +34,11 @@ func (m *MockP2p) AddTopic(topicName string) { m.topicChan[topicName] = make(chan []byte, m.nodesNum) } +func (m *MockP2p) HasTopic(topicName string) bool { + _, ok := m.topicChan[topicName] + return ok +} + func (m *MockP2p) SetHandlers(handlers map[int]dev.P2pHandler) {} func (m *MockP2p) RequestPeer(peerID peer.ID, code int, request []byte) (response []byte, err error) { diff --git a/infra/p2p/p2p.go b/infra/p2p/p2p.go index fc653d8d..bb46766a 100644 --- a/infra/p2p/p2p.go +++ b/infra/p2p/p2p.go @@ -5,6 +5,12 @@ import ( "bytes" "context" "fmt" + "io" + "math/rand" + "os" + "strconv" + "sync" + "github.com/libp2p/go-libp2p" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/crypto" @@ -18,10 +24,6 @@ import ( "github.com/yu-org/yu/common/yerror" "github.com/yu-org/yu/config" "github.com/yu-org/yu/core/tripod/dev" - "io" - "math/rand" - "os" - "strconv" ) const ( @@ -34,6 +36,10 @@ type LibP2P struct { bootNodes []*peerstore.AddrInfo pid protocol.ID ps *pubsub.PubSub + + topicMu sync.RWMutex + registeredTopics map[string]*pubsub.Topic + subscriptions map[string]*pubsub.Subscription } func NewP2P(cfg *config.P2pConf) P2pNetwork { @@ -59,10 +65,12 @@ func NewP2P(cfg *config.P2pConf) P2pNetwork { } p := &LibP2P{ - host: p2pHost, - bootNodes: bootNodes, - pid: protocol.ID(cfg.ProtocolID), - ps: ps, + host: p2pHost, + bootNodes: bootNodes, + pid: protocol.ID(cfg.ProtocolID), + ps: ps, + registeredTopics: make(map[string]*pubsub.Topic), + subscriptions: make(map[string]*pubsub.Subscription), } p.AddDefaultTopics() return p @@ -123,7 +131,9 @@ func (p *LibP2P) RequestPeer(peerID peerstore.ID, code int, request []byte) ([]b } func (p *LibP2P) PubP2P(topic string, msg []byte) error { - t, ok := TopicsMap[topic] + p.topicMu.RLock() + t, ok := p.registeredTopics[topic] + p.topicMu.RUnlock() if !ok { return yerror.NoP2PTopic } @@ -131,7 +141,9 @@ func (p *LibP2P) PubP2P(topic string, msg []byte) error { } func (p *LibP2P) SubP2P(topic string) ([]byte, error) { - sub, ok := SubsMap[topic] + p.topicMu.RLock() + sub, ok := p.subscriptions[topic] + p.topicMu.RUnlock() if !ok { return nil, yerror.NoP2PTopic } diff --git a/infra/p2p/topics.go b/infra/p2p/topics.go index 41454e6f..c46ce3bf 100644 --- a/infra/p2p/topics.go +++ b/infra/p2p/topics.go @@ -1,15 +1,9 @@ package p2p import ( - pubsub "github.com/libp2p/go-libp2p-pubsub" . "github.com/yu-org/yu/common" ) -var ( - TopicsMap = make(map[string]*pubsub.Topic, 0) - SubsMap = make(map[string]*pubsub.Subscription, 0) -) - func (p *LibP2P) AddDefaultTopics() { p.AddTopic(StartBlockTopic) p.AddTopic(EndBlockTopic) @@ -18,14 +12,31 @@ func (p *LibP2P) AddDefaultTopics() { } func (p *LibP2P) AddTopic(topicName string) { + p.topicMu.RLock() + if _, exists := p.registeredTopics[topicName]; exists { + p.topicMu.RUnlock() + return + } + p.topicMu.RUnlock() + topic, err := p.ps.Join(topicName) if err != nil { return } - TopicsMap[topicName] = topic sub, err := topic.Subscribe() if err != nil { return } - SubsMap[topicName] = sub + + p.topicMu.Lock() + p.registeredTopics[topicName] = topic + p.subscriptions[topicName] = sub + p.topicMu.Unlock() +} + +func (p *LibP2P) HasTopic(topicName string) bool { + p.topicMu.RLock() + defer p.topicMu.RUnlock() + _, ok := p.registeredTopics[topicName] + return ok }