Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/poa/poa_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func runNode(poaNode *Poa, mockP2P *p2p.MockP2p, wg *sync.WaitGroup) {
cfg := config.InitDefaultCfg()

land := tripod.NewLand()
land.SetTripods(poaNode.Tripod)
land.RegisterTripods(poaNode.Tripod)

kvdb, err := kv.NewKvdb(&config.KVconf{
KvType: "bolt",
Expand Down
15 changes: 10 additions & 5 deletions common/p2p_topic.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package common

const (
StartBlockTopic = "start-block"
EndBlockTopic = "end-block"
FinalizeBlockTopic = "finalize-block"
UnpackedWritingTopic = "unpacked-writing"
UnpackedExtraWritingTopic = "unpacked-extra-writing"
StartBlockTopic = "start-block"
EndBlockTopic = "end-block"
FinalizeBlockTopic = "finalize-block"
UnpackedWritingTopic = "unpacked-writing"
)

const TopicWritingTopicPrefix = "topic_writing_"

func TopicWritingTopic(topic string) string {
return TopicWritingTopicPrefix + topic
}
1 change: 1 addition & 0 deletions common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type (
// WrCall from clients, it is an instance of an 'Writing'.
WrCall struct {
ChainID uint64 `json:"chain_id"`
Topic string `json:"topic,omitempty"`
TripodName string `json:"tripod_name"`
FuncName string `json:"func_name"`
Params string `json:"params"`
Expand Down
12 changes: 6 additions & 6 deletions common/yerror/basic_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,16 +126,16 @@ func WritingNotFound(name string) ErrWritingNotFound {
return ErrWritingNotFound{WritingName: name}
}

type ErrExtraWritingNotFound struct {
ExtraWritingName string
type ErrTopicWritingNotFound struct {
TopicWritingName string
}

func ExtraWritingNotFound(name string) ErrExtraWritingNotFound {
return ErrExtraWritingNotFound{ExtraWritingName: name}
func TopicWritingNotFound(name string) ErrTopicWritingNotFound {
return ErrTopicWritingNotFound{TopicWritingName: name}
}

func (e ErrExtraWritingNotFound) Error() string {
return errors.Errorf("ExtraWriting(%s) NOT Found", e.ExtraWritingName).Error()
func (e ErrTopicWritingNotFound) Error() string {
return errors.Errorf("TopicWriting(%s) NOT Found", e.TopicWritingName).Error()
}

type ErrReadingNotFound struct {
Expand Down
13 changes: 7 additions & 6 deletions core/kernel/handle_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,25 @@ func (k *Kernel) HandleWriting(signedWrCall *protocol.SignedWrCall) error {
return nil
}

func (k *Kernel) HandleExtraWriting(call *protocol.SignedWrCall) error {
func (k *Kernel) HandleTopicWriting(call *protocol.SignedWrCall) error {
stxn, err := NewSignedTxn(call.Call, call.Pubkey, call.Address, call.Signature)
if err != nil {
return err
}
exWrCall := call.Call
_, err = k.Land.GetExtraWriting(exWrCall.TripodName, exWrCall.FuncName)
tpWrCall := call.Call
_, err = k.Land.GetTopicWriting(tpWrCall.TripodName, tpWrCall.FuncName, tpWrCall.Topic)
if err != nil {
return err
}
err = k.handleTxnLocally(stxn, common.UnpackedExtraWritingTopic)
p2pTopic := common.TopicWritingTopic(tpWrCall.Topic)
err = k.handleTxnLocally(stxn, p2pTopic)
if err != nil {
return err
}
go func() {
err = k.pubExtraWritings(FromArray(stxn))
err = k.pubTopicWritings(p2pTopic, FromArray(stxn))
if err != nil {
logrus.Error("publish extra writings error: ", err)
logrus.Error("publish topic writings error: ", err)
}
}()
return nil
Expand Down
12 changes: 6 additions & 6 deletions core/kernel/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ func (k *Kernel) HandleHttp() {
api.POST(RdCallType, func(c *gin.Context) {
k.handleHttpRd(c)
})
// POST extra-writing call
api.POST(ExWrCallType, func(c *gin.Context) {
k.handleHttpExWr(c)
// POST topic-writing call
api.POST(TopicWrCallType, func(c *gin.Context) {
k.handleHttpTopicWr(c)
})

api.GET("block", k.GetBlock)
Expand Down Expand Up @@ -58,14 +58,14 @@ func (k *Kernel) handleHttpWr(c *gin.Context) {
}
}

func (k *Kernel) handleHttpExWr(c *gin.Context) {
signedExWrCall, err := GetSignedWrCall(c)
func (k *Kernel) handleHttpTopicWr(c *gin.Context) {
signedTopicWrCall, err := GetSignedWrCall(c)
if err != nil {
c.AbortWithError(http.StatusBadRequest, err)
return
}

err = k.HandleExtraWriting(signedExWrCall)
err = k.HandleTopicWriting(signedTopicWrCall)
if err != nil {
c.AbortWithError(http.StatusBadRequest, err)
}
Expand Down
82 changes: 55 additions & 27 deletions core/kernel/kernel.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/sirupsen/logrus"

"github.com/yu-org/yu/common"
"github.com/yu-org/yu/common/yerror"
"github.com/yu-org/yu/config"
"github.com/yu-org/yu/core/env"
"github.com/yu-org/yu/core/tripod"
Expand Down Expand Up @@ -129,28 +130,34 @@ func (k *Kernel) AcceptUnpackedTxns() error {
}
}

extraWritings, err := k.subExtraWritings()
topicTxnMap, err := k.subTopicWritings()
if err != nil {
return err
}

for _, txn := range extraWritings {
if k.CheckReplayAttack(txn) {
continue
}
txn.FromP2P = true

logrus.WithField("p2p", "accept-extra-writing").
Tracef("txn(%s) from network, content: %v", txn.TxnHash.String(), txn.Raw.WrCall)

err = k.Pool.CheckTxn(txn)
if err != nil {
logrus.Error("check extra writing from P2P into txpool error: ", err)
continue
}
err = k.Pool.Insert(txn)
if err != nil {
logrus.Error("insert extra writing from P2P into txpool error: ", err)
for topic, txns := range topicTxnMap {
for _, txn := range txns {
if txn == nil {
continue
}
if k.CheckReplayAttack(txn) {
continue
}
txn.FromP2P = true

logrus.WithField("p2p", "accept-topic-writing").
WithField("topic", topic).
Tracef("txn(%s) from network, content: %v", txn.TxnHash.String(), txn.Raw.WrCall)

err = k.Pool.CheckTxn(txn)
if err != nil {
logrus.WithError(err).WithField("topic", topic).Error("check topic writing from P2P into txpool error")
continue
}
err = k.Pool.InsertWithTopic(topic, txn)
if err != nil {
logrus.WithError(err).WithField("topic", topic).Error("insert topic writing from P2P into txpool error")
}
}
}

Expand All @@ -173,20 +180,41 @@ func (k *Kernel) pubUnpackedWritings(txns types.SignedTxns) error {
return k.P2pNetwork.PubP2P(common.UnpackedWritingTopic, byt)
}

func (k *Kernel) subExtraWritings() (types.SignedTxns, error) {
byt, err := k.P2pNetwork.SubP2P(common.UnpackedExtraWritingTopic)
if err != nil {
return nil, err
func (k *Kernel) subTopicWritings() (map[string]types.SignedTxns, error) {
if k.Land == nil {
return nil, nil
}
return types.DecodeSignedTxns(byt)
topicTxns := make(map[string]types.SignedTxns)
for _, topicTripod := range k.Land.OrderedTopicTripods() {
if topicTripod.Topic == "" {
continue
}
p2pTopic := common.TopicWritingTopic(topicTripod.Topic)
byt, err := k.P2pNetwork.SubP2P(p2pTopic)
if err != nil {
if err == yerror.NoP2PTopic {
continue
}
return nil, err
}
txns, err := types.DecodeSignedTxns(byt)
if err != nil {
return nil, err
}
if len(txns) == 0 {
continue
}
topicTxns[p2pTopic] = txns
}
return topicTxns, nil
}

func (k *Kernel) pubExtraWritings(txns types.SignedTxns) error {
func (k *Kernel) pubTopicWritings(topic string, txns types.SignedTxns) error {
byt, err := txns.Encode()
if err != nil {
return err
}
return k.P2pNetwork.PubP2P(common.UnpackedExtraWritingTopic, byt)
return k.P2pNetwork.PubP2P(topic, byt)
}

func (k *Kernel) GetTripodInstance(name string) any {
Expand Down Expand Up @@ -214,7 +242,7 @@ func (k *Kernel) WithBronzes(bronzeInstances ...any) *Kernel {
t.SetInstance(bronzeInstances[i])
}

k.Land.SetBronzes(bronzes...)
k.Land.RegisterBronzes(bronzes...)

for _, bronzeInstance := range bronzeInstances {
err := tripod.InjectToBronze(k.Land, bronzeInstance)
Expand All @@ -237,7 +265,7 @@ func (k *Kernel) WithTripods(tripodInstances ...any) *Kernel {
t.SetInstance(tripodInstances[i])
}

k.Land.SetTripods(tripods...)
k.Land.RegisterTripods(tripods...)

for _, tri := range tripods {
k.Pool.WithTripodCheck(tri.Name(), tri.TxnChecker)
Expand Down
21 changes: 11 additions & 10 deletions core/kernel/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package kernel

import (
"fmt"
"net/http"

"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"github.com/sirupsen/logrus"
. "github.com/yu-org/yu/core/protocol"
"net/http"
)

func (k *Kernel) HandleWS() {
Expand All @@ -18,9 +19,9 @@ func (k *Kernel) HandleWS() {
r.GET(RdApiPath, func(ctx *gin.Context) {
k.handleWS(ctx, reading)
})
// POST extra-writing call
r.POST(ExWrApiPath, func(ctx *gin.Context) {
k.handleWS(ctx, extraWriting)
// POST topic-writing call
r.POST(TopicWrApiPath, func(ctx *gin.Context) {
k.handleWS(ctx, topicWriting)
})

r.GET(SubResultsPath, func(ctx *gin.Context) {
Expand All @@ -35,7 +36,7 @@ func (k *Kernel) HandleWS() {
const (
reading = iota
writing
extraWriting
topicWriting
subscription
)

Expand All @@ -62,8 +63,8 @@ func (k *Kernel) handleWS(ctx *gin.Context, typ int) {
k.handleWsWr(ctx, string(params))
//case reading:
// k.handleWsRd(c, req, string(params))
case extraWriting:
k.handleWsExWr(ctx, string(params))
case topicWriting:
k.handleWsTopicWr(ctx, string(params))
}

}
Expand All @@ -81,14 +82,14 @@ func (k *Kernel) handleWsWr(ctx *gin.Context, params string) {
}
}

func (k *Kernel) handleWsExWr(ctx *gin.Context, params string) {
signedExWrCall, err := GetSignedWrCall(ctx)
func (k *Kernel) handleWsTopicWr(ctx *gin.Context, params string) {
signedTopicWrCall, err := GetSignedWrCall(ctx)
if err != nil {
ctx.AbortWithError(http.StatusBadRequest, err)
return
}

err = k.HandleExtraWriting(signedExWrCall)
err = k.HandleTopicWriting(signedTopicWrCall)
if err != nil {
ctx.AbortWithError(http.StatusInternalServerError, err)
}
Expand Down
15 changes: 8 additions & 7 deletions core/protocol/url.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package protocol

import (
"path/filepath"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/gin-gonic/gin"
. "github.com/yu-org/yu/common"
"path/filepath"
)

// A complete writing-call url is POST /api/writing
Expand All @@ -13,11 +14,11 @@ import (
const (
// RootApiPath For developers, every customized Writing and Read of tripods
// will base on '/api'.
RootApiPath = "/api"
WrCallType = "writing"
RdCallType = "reading"
ExWrCallType = "extra-writing"
AdminType = "admin"
RootApiPath = "/api"
WrCallType = "writing"
RdCallType = "reading"
TopicWrCallType = "topic-writing"
AdminType = "admin"

TripodNameKey = "tripod_name"
FuncNameKey = "func_name"
Expand All @@ -26,7 +27,7 @@ const (

var (
WrApiPath = filepath.Join(RootApiPath, WrCallType)
ExWrApiPath = filepath.Join(RootApiPath, ExWrCallType)
TopicWrApiPath = filepath.Join(RootApiPath, TopicWrCallType)
RdApiPath = filepath.Join(RootApiPath, RdCallType)
AdminApiPath = filepath.Join(RootApiPath, AdminType)
SubResultsPath = "/subscribe/results"
Expand Down
5 changes: 3 additions & 2 deletions core/tripod/dev/funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ type (
// This operation has no consensus reached in the blockchain network.
// respObj is a json object
Reading func(ctx *ReadContext)
// ExtraWriting will not be executed in the Executor. It should be invoked explicitly.
ExtraWriting func(ctx *WriteContext) error
// TopicWriting will not be executed in the Executor. It should be invoked explicitly, for example, in the BlockCycle.
// topic is the topic name, it means this writing will be inserted in the topic txpool.
TopicWriting func(topic string, ctx *WriteContext) error
// P2pHandler is a p2p server handler. You can define the services in P2P server.
// Just like TCP handler.
P2pHandler func([]byte) ([]byte, error)
Expand Down
2 changes: 1 addition & 1 deletion core/tripod/inject_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestInject(t *testing.T) {
boyi.SetLand(land)
boyi.SetInstance(boyi)

land.SetTripods(testTri.Tripod, dayu.Tripod, boyi.Tripod)
land.RegisterTripods(testTri.Tripod, dayu.Tripod, boyi.Tripod)

err := InjectToTripod(testTri)
assert.NoError(t, err)
Expand Down
Loading