diff --git a/apps/eth/ethrpc/api_backend.go b/apps/eth/ethrpc/api_backend.go index 9933681a..cc719566 100644 --- a/apps/eth/ethrpc/api_backend.go +++ b/apps/eth/ethrpc/api_backend.go @@ -383,7 +383,7 @@ func (e *EthAPIBackend) Call(ctx context.Context, args TransactionArgs, blockNrO rdCall.FuncName = "Call" rdCall.Params = string(requestByt) - response, err := e.chain.HandleRead(rdCall) + response, err := e.chain.HandleReading(rdCall) if err != nil { return nil, err } @@ -441,7 +441,7 @@ func (e *EthAPIBackend) SendTx(ctx context.Context, signedTx *ethtypes.Transacti }, } - return e.chain.HandleTxn(signedWrCall) + return e.chain.HandleWriting(signedWrCall) } func YuTxn2EthTxn(yuSignedTxn *yutypes.SignedTxn) (*ethtypes.Transaction, error) { @@ -790,7 +790,7 @@ func (e *EthAPIBackend) adaptChainRead(req any, funcName string) (*yucontext.Res Params: params, } - resp, err := e.chain.HandleRead(rdCall) + resp, err := e.chain.HandleReading(rdCall) if err != nil { logrus.Error(fmt.Errorf("EthAPIBackend %v meet err: %v, param:%v", funcName, err, params)) return nil, fmt.Errorf("EthAPIBackend %v meet err: %v", funcName, err) diff --git a/common/p2p_topic.go b/common/p2p_topic.go index 3e54bfc3..a66cd6c4 100644 --- a/common/p2p_topic.go +++ b/common/p2p_topic.go @@ -1,8 +1,9 @@ package common const ( - StartBlockTopic = "start-block" - EndBlockTopic = "end-block" - FinalizeBlockTopic = "finalize-block" - UnpackedTxnsTopic = "unpacked-txns" + StartBlockTopic = "start-block" + EndBlockTopic = "end-block" + FinalizeBlockTopic = "finalize-block" + UnpackedWritingTopic = "unpacked-writing" + UnpackedExtraWritingTopic = "unpacked-extra-writing" ) diff --git a/core/kernel/handle_input.go b/core/kernel/handle_input.go index d6ded86d..817ac486 100644 --- a/core/kernel/handle_input.go +++ b/core/kernel/handle_input.go @@ -11,9 +11,9 @@ import ( "github.com/yu-org/yu/metrics" ) -// HandleTxn handles txn from outside. -// You can also self-define your input by calling HandleTxn (not only by default http and ws) -func (k *Kernel) HandleTxn(signedWrCall *protocol.SignedWrCall) error { +// HandleWriting handles txn from outside. +// You can also self-define your input by calling HandleWriting (not only by default http and ws) +func (k *Kernel) HandleWriting(signedWrCall *protocol.SignedWrCall) error { stxn, err := NewSignedTxn(signedWrCall.Call, signedWrCall.Pubkey, signedWrCall.Address, signedWrCall.Signature) if err != nil { return err @@ -23,21 +23,44 @@ func (k *Kernel) HandleTxn(signedWrCall *protocol.SignedWrCall) error { if err != nil { return err } - err = k.handleTxnLocally(stxn) + err = k.handleTxnLocally(stxn, "") if err != nil { return err } go func() { - err = k.pubUnpackedTxns(FromArray(stxn)) + err = k.pubUnpackedWritings(FromArray(stxn)) if err != nil { - logrus.Error("publish unpacked txns error: ", err) + logrus.Error("publish unpacked writing error: ", err) } }() return nil } -func (k *Kernel) handleTxnLocally(stxn *SignedTxn) error { +func (k *Kernel) HandleExtraWriting(call *protocol.SignedWrCall) error { + stxn, err := NewSignedTxn(call.Call, call.Pubkey, call.Address, call.Signature) + if err != nil { + return err + } + exWrCall := call.Call + _, err = k.Land.GetExtraWriting(exWrCall.TripodName, exWrCall.FuncName) + if err != nil { + return err + } + err = k.handleTxnLocally(stxn, common.UnpackedExtraWritingTopic) + if err != nil { + return err + } + go func() { + err = k.pubExtraWritings(FromArray(stxn)) + if err != nil { + logrus.Error("publish extra writings error: ", err) + } + }() + return nil +} + +func (k *Kernel) handleTxnLocally(stxn *SignedTxn, topic string) error { metrics.KernelHandleTxnCounter.WithLabelValues().Inc() tri := k.Land.GetTripod(stxn.TripodName()) if tri != nil { @@ -53,10 +76,13 @@ func (k *Kernel) handleTxnLocally(stxn *SignedTxn) error { if err != nil { return err } - return k.Pool.Insert(stxn) + if topic == "" { + return k.Pool.Insert(stxn) + } + return k.Pool.InsertWithTopic(topic, stxn) } -func (k *Kernel) HandleRead(rdCall *common.RdCall) (*context.ResponseData, error) { +func (k *Kernel) HandleReading(rdCall *common.RdCall) (*context.ResponseData, error) { ctx, err := context.NewReadContext(rdCall) if err != nil { return nil, err diff --git a/core/kernel/http.go b/core/kernel/http.go index b56ec1b9..e783eb17 100644 --- a/core/kernel/http.go +++ b/core/kernel/http.go @@ -21,6 +21,10 @@ func (k *Kernel) HandleHttp() { api.POST(RdCallType, func(c *gin.Context) { k.handleHttpRd(c) }) + // POST extra-writing call + api.POST(ExWrCallType, func(c *gin.Context) { + k.handleHttpExWr(c) + }) api.GET("block", k.GetBlock) @@ -48,7 +52,20 @@ func (k *Kernel) handleHttpWr(c *gin.Context) { return } - err = k.HandleTxn(signedWrCall) + err = k.HandleWriting(signedWrCall) + if err != nil { + c.AbortWithError(http.StatusBadRequest, err) + } +} + +func (k *Kernel) handleHttpExWr(c *gin.Context) { + signedExWrCall, err := GetSignedWrCall(c) + if err != nil { + c.AbortWithError(http.StatusBadRequest, err) + return + } + + err = k.HandleExtraWriting(signedExWrCall) if err != nil { c.AbortWithError(http.StatusBadRequest, err) } @@ -61,7 +78,7 @@ func (k *Kernel) handleHttpRd(c *gin.Context) { return } - respData, err := k.HandleRead(rdCall) + respData, err := k.HandleReading(rdCall) if err != nil { c.AbortWithError(http.StatusBadRequest, err) return diff --git a/core/kernel/kernel.go b/core/kernel/kernel.go index b2d87562..8af78ba8 100644 --- a/core/kernel/kernel.go +++ b/core/kernel/kernel.go @@ -48,7 +48,7 @@ func NewKernel( wg: &sync.WaitGroup{}, } - env.Execute = k.OrderedExecute + env.Execute = k.SeqExecuteWritings // Configure the handlers in P2P network @@ -103,49 +103,90 @@ func (k *Kernel) InitBlockChain() { }) } -func (k *Kernel) AcceptUnpkgTxns() error { - txns, err := k.subUnpackedTxns() +func (k *Kernel) AcceptUnpackedTxns() error { + writings, err := k.subUnpackedWritings() if err != nil { return err } - for _, txn := range txns { + for _, txn := range writings { if k.CheckReplayAttack(txn) { continue } txn.FromP2P = true - logrus.WithField("p2p", "accept-txn"). + logrus.WithField("p2p", "accept-writing"). Tracef("txn(%s) from network, content: %v", txn.TxnHash.String(), txn.Raw.WrCall) err = k.Pool.CheckTxn(txn) if err != nil { - logrus.Error("check txn from P2P into txpool error: ", err) + logrus.Error("check writing from P2P into txpool error: ", err) continue } err = k.Pool.Insert(txn) if err != nil { - logrus.Error("insert txn from P2P into txpool error: ", err) + logrus.Error("insert writing from P2P into txpool error: ", err) + } + } + + extraWritings, err := k.subExtraWritings() + if err != nil { + return err + } + + for _, txn := range extraWritings { + if k.CheckReplayAttack(txn) { + continue + } + txn.FromP2P = true + + logrus.WithField("p2p", "accept-extra-writing"). + Tracef("txn(%s) from network, content: %v", txn.TxnHash.String(), txn.Raw.WrCall) + + err = k.Pool.CheckTxn(txn) + if err != nil { + logrus.Error("check extra writing from P2P into txpool error: ", err) + continue + } + err = k.Pool.Insert(txn) + if err != nil { + logrus.Error("insert extra writing from P2P into txpool error: ", err) } } return nil } -func (k *Kernel) subUnpackedTxns() (types.SignedTxns, error) { - byt, err := k.P2pNetwork.SubP2P(common.UnpackedTxnsTopic) +func (k *Kernel) subUnpackedWritings() (types.SignedTxns, error) { + byt, err := k.P2pNetwork.SubP2P(common.UnpackedWritingTopic) + if err != nil { + return nil, err + } + return types.DecodeSignedTxns(byt) +} + +func (k *Kernel) pubUnpackedWritings(txns types.SignedTxns) error { + byt, err := txns.Encode() + if err != nil { + return err + } + return k.P2pNetwork.PubP2P(common.UnpackedWritingTopic, byt) +} + +func (k *Kernel) subExtraWritings() (types.SignedTxns, error) { + byt, err := k.P2pNetwork.SubP2P(common.UnpackedExtraWritingTopic) if err != nil { return nil, err } return types.DecodeSignedTxns(byt) } -func (k *Kernel) pubUnpackedTxns(txns types.SignedTxns) error { +func (k *Kernel) pubExtraWritings(txns types.SignedTxns) error { byt, err := txns.Encode() if err != nil { return err } - return k.P2pNetwork.PubP2P(common.UnpackedTxnsTopic, byt) + return k.P2pNetwork.PubP2P(common.UnpackedExtraWritingTopic, byt) } func (k *Kernel) GetTripodInstance(name string) any { diff --git a/core/kernel/run.go b/core/kernel/run.go index 67e43d2e..7216bfb8 100644 --- a/core/kernel/run.go +++ b/core/kernel/run.go @@ -3,17 +3,17 @@ package kernel import ( "github.com/sirupsen/logrus" - . "github.com/yu-org/yu/common" - . "github.com/yu-org/yu/common/yerror" + "github.com/yu-org/yu/common" + "github.com/yu-org/yu/common/yerror" "github.com/yu-org/yu/core/context" - . "github.com/yu-org/yu/core/tripod" - . "github.com/yu-org/yu/core/types" + "github.com/yu-org/yu/core/tripod" + "github.com/yu-org/yu/core/types" ytime "github.com/yu-org/yu/utils/time" ) func (k *Kernel) AcceptUnpkgTxnsJob() { for { - err := k.AcceptUnpkgTxns() + err := k.AcceptUnpackedTxns() if err != nil { logrus.Errorf("accept unpacked txns error: %s", err.Error()) } @@ -26,7 +26,7 @@ func (k *Kernel) Run() { k.wg.Done() }() switch k.RunMode { - case LocalNode: + case common.LocalNode: for { select { case <-k.stopChan: @@ -44,7 +44,7 @@ func (k *Kernel) Run() { } } - case MasterWorker: + case common.MasterWorker: for { select { case <-k.stopChan: @@ -56,18 +56,18 @@ func (k *Kernel) Run() { } } default: - logrus.Panic(NoRunMode) + logrus.Panic(yerror.NoRunMode) } } -func (k *Kernel) LocalRun() (newBlock *Block, err error) { +func (k *Kernel) LocalRun() (newBlock *types.Block, err error) { newBlock, err = k.makeNewBasicBlock() if err != nil { return } // start a new block - err = k.Land.RangeList(func(tri *Tripod) error { + err = k.Land.RangeList(func(tri *tripod.Tripod) error { // start := time.Now() tri.BlockCycle.StartBlock(newBlock) // metrics.StartBlockDuration.WithLabelValues(strconv.FormatInt(int64(newBlock.Height), 10), tri.Name()).Observe(time.Now().Sub(start).Seconds()) @@ -78,7 +78,7 @@ func (k *Kernel) LocalRun() (newBlock *Block, err error) { } // end block - err = k.Land.RangeList(func(tri *Tripod) error { + err = k.Land.RangeList(func(tri *tripod.Tripod) error { // start := time.Now() tri.BlockCycle.EndBlock(newBlock) // metrics.EndBlockDuration.WithLabelValues(strconv.FormatInt(int64(newBlock.Height), 10), tri.Name()).Observe(time.Now().Sub(start).Seconds()) @@ -89,7 +89,7 @@ func (k *Kernel) LocalRun() (newBlock *Block, err error) { } // finalize this block - err = k.Land.RangeList(func(tri *Tripod) error { + err = k.Land.RangeList(func(tri *tripod.Tripod) error { // start := time.Now() tri.BlockCycle.FinalizeBlock(newBlock) // metrics.FinalizeBlockDuration.WithLabelValues(strconv.FormatInt(int64(newBlock.Height), 10), tri.Name()).Observe(time.Now().Sub(start).Seconds()) @@ -98,7 +98,7 @@ func (k *Kernel) LocalRun() (newBlock *Block, err error) { return } -func (k *Kernel) makeGenesisBlock() *Block { +func (k *Kernel) makeGenesisBlock() *types.Block { genesisBlock := k.Chain.NewEmptyBlock() genesisBlock.Timestamp = ytime.NowTsU64() @@ -108,7 +108,7 @@ func (k *Kernel) makeGenesisBlock() *Block { return genesisBlock } -func (k *Kernel) makeNewBasicBlock() (*Block, error) { +func (k *Kernel) makeNewBasicBlock() (*types.Block, error) { newBlock := k.Chain.NewEmptyBlock() newBlock.Timestamp = ytime.NowTsU64() @@ -123,10 +123,10 @@ func (k *Kernel) makeNewBasicBlock() (*Block, error) { return newBlock, nil } -func (k *Kernel) OrderedExecute(block *Block) error { +func (k *Kernel) SeqExecuteWritings(block *types.Block) error { stxns := block.Txns - receipts := make(map[Hash]*Receipt) + receipts := make(map[common.Hash]*types.Receipt) for i, stxn := range stxns { wrCall := stxn.Raw.WrCall @@ -137,12 +137,12 @@ func (k *Kernel) OrderedExecute(block *Block) error { continue } - writing, _ := k.Land.GetWriting(wrCall.TripodName, wrCall.FuncName) + write, _ := k.Land.GetWriting(wrCall.TripodName, wrCall.FuncName) - err = writing(ctx) - if IfLeiOut(ctx.LeiCost, block) { + err = write(ctx) + if types.IfLeiOut(ctx.LeiCost, block) { k.State.Discard() - receipt := k.HandleError(OutOfLei, ctx, block, stxn) + receipt := k.HandleError(yerror.OutOfLei, ctx, block, stxn) receipts[stxn.TxnHash] = receipt break } @@ -166,8 +166,8 @@ func (k *Kernel) OrderedExecute(block *Block) error { return k.PostExecute(block, receipts) } -func (k *Kernel) PostExecute(block *Block, receipts map[Hash]*Receipt) error { - k.Land.RangeList(func(t *Tripod) error { +func (k *Kernel) PostExecute(block *types.Block, receipts map[common.Hash]*types.Receipt) error { + k.Land.RangeList(func(t *tripod.Tripod) error { t.Committer.Commit(block) return nil }) @@ -185,13 +185,13 @@ func (k *Kernel) PostExecute(block *Block, receipts map[Hash]*Receipt) error { } // Because tripod.Committer could update this field. - if block.StateRoot == NullHash && stateRoot != nil { - block.StateRoot = BytesToHash(stateRoot) + if block.StateRoot == common.NullHash && stateRoot != nil { + block.StateRoot = common.BytesToHash(stateRoot) } // Because tripod.Committer could update this field. - if block.ReceiptRoot == NullHash { - block.ReceiptRoot, err = CaculateReceiptRoot(receipts) + if block.ReceiptRoot == common.NullHash { + block.ReceiptRoot, err = types.CaculateReceiptRoot(receipts) } return err } @@ -233,22 +233,22 @@ func (k *Kernel) MasterWorkerRun() error { return nil } -func (k *Kernel) HandleError(err error, ctx *context.WriteContext, block *Block, stxn *SignedTxn) *Receipt { +func (k *Kernel) HandleError(err error, ctx *context.WriteContext, block *types.Block, stxn *types.SignedTxn) *types.Receipt { logrus.Error("push error: ", err.Error()) - receipt := NewReceipt(ctx.Events, err, ctx.Extra) + receipt := types.NewReceipt(ctx.Events, err, ctx.Extra) k.HandleReceipt(ctx, receipt, block, stxn) return receipt } -func (k *Kernel) HandleEvent(ctx *context.WriteContext, block *Block, stxn *SignedTxn) *Receipt { - receipt := NewReceipt(ctx.Events, nil, ctx.Extra) +func (k *Kernel) HandleEvent(ctx *context.WriteContext, block *types.Block, stxn *types.SignedTxn) *types.Receipt { + receipt := types.NewReceipt(ctx.Events, nil, ctx.Extra) k.HandleReceipt(ctx, receipt, block, stxn) return receipt } -func (k *Kernel) HandleReceipt(ctx *context.WriteContext, receipt *Receipt, block *Block, stxn *SignedTxn) { +func (k *Kernel) HandleReceipt(ctx *context.WriteContext, receipt *types.Receipt, block *types.Block, stxn *types.SignedTxn) { receipt.FillMetadata(block, stxn, ctx.LeiCost) - receipt.BlockStage = ExecuteTxnsStage + receipt.BlockStage = common.ExecuteTxnsStage if k.Sub != nil { k.Sub.Emit(receipt) diff --git a/core/kernel/websocket.go b/core/kernel/websocket.go index 8291a312..13092641 100644 --- a/core/kernel/websocket.go +++ b/core/kernel/websocket.go @@ -18,6 +18,10 @@ func (k *Kernel) HandleWS() { r.GET(RdApiPath, func(ctx *gin.Context) { k.handleWS(ctx, reading) }) + // POST extra-writing call + r.POST(ExWrApiPath, func(ctx *gin.Context) { + k.handleWS(ctx, extraWriting) + }) r.GET(SubResultsPath, func(ctx *gin.Context) { k.handleWS(ctx, subscription) @@ -31,6 +35,7 @@ func (k *Kernel) HandleWS() { const ( reading = iota writing + extraWriting subscription ) @@ -57,6 +62,8 @@ func (k *Kernel) handleWS(ctx *gin.Context, typ int) { k.handleWsWr(ctx, string(params)) //case reading: // k.handleWsRd(c, req, string(params)) + case extraWriting: + k.handleWsExWr(ctx, string(params)) } } @@ -68,7 +75,20 @@ func (k *Kernel) handleWsWr(ctx *gin.Context, params string) { return } - err = k.HandleTxn(signedWrCall) + err = k.HandleWriting(signedWrCall) + if err != nil { + ctx.AbortWithError(http.StatusInternalServerError, err) + } +} + +func (k *Kernel) handleWsExWr(ctx *gin.Context, params string) { + signedExWrCall, err := GetSignedWrCall(ctx) + if err != nil { + ctx.AbortWithError(http.StatusBadRequest, err) + return + } + + err = k.HandleExtraWriting(signedExWrCall) if err != nil { ctx.AbortWithError(http.StatusInternalServerError, err) } diff --git a/core/protocol/url.go b/core/protocol/url.go index 62cce4dc..fdaceeb0 100644 --- a/core/protocol/url.go +++ b/core/protocol/url.go @@ -13,10 +13,11 @@ import ( const ( // RootApiPath For developers, every customized Writing and Read of tripods // will base on '/api'. - RootApiPath = "/api" - WrCallType = "writing" - RdCallType = "reading" - AdminType = "admin" + RootApiPath = "/api" + WrCallType = "writing" + RdCallType = "reading" + ExWrCallType = "extra-writing" + AdminType = "admin" TripodNameKey = "tripod_name" FuncNameKey = "func_name" @@ -25,6 +26,7 @@ const ( var ( WrApiPath = filepath.Join(RootApiPath, WrCallType) + ExWrApiPath = filepath.Join(RootApiPath, ExWrCallType) RdApiPath = filepath.Join(RootApiPath, RdCallType) AdminApiPath = filepath.Join(RootApiPath, AdminType) SubResultsPath = "/subscribe/results" diff --git a/infra/p2p/topics.go b/infra/p2p/topics.go index 6e33e54a..41454e6f 100644 --- a/infra/p2p/topics.go +++ b/infra/p2p/topics.go @@ -14,7 +14,7 @@ func (p *LibP2P) AddDefaultTopics() { p.AddTopic(StartBlockTopic) p.AddTopic(EndBlockTopic) p.AddTopic(FinalizeBlockTopic) - p.AddTopic(UnpackedTxnsTopic) + p.AddTopic(UnpackedWritingTopic) } func (p *LibP2P) AddTopic(topicName string) {