Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions apps/eth/ethrpc/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ func (e *EthAPIBackend) Call(ctx context.Context, args TransactionArgs, blockNrO
rdCall.FuncName = "Call"
rdCall.Params = string(requestByt)

response, err := e.chain.HandleRead(rdCall)
response, err := e.chain.HandleReading(rdCall)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -441,7 +441,7 @@ func (e *EthAPIBackend) SendTx(ctx context.Context, signedTx *ethtypes.Transacti
},
}

return e.chain.HandleTxn(signedWrCall)
return e.chain.HandleWriting(signedWrCall)
}

func YuTxn2EthTxn(yuSignedTxn *yutypes.SignedTxn) (*ethtypes.Transaction, error) {
Expand Down Expand Up @@ -790,7 +790,7 @@ func (e *EthAPIBackend) adaptChainRead(req any, funcName string) (*yucontext.Res
Params: params,
}

resp, err := e.chain.HandleRead(rdCall)
resp, err := e.chain.HandleReading(rdCall)
if err != nil {
logrus.Error(fmt.Errorf("EthAPIBackend %v meet err: %v, param:%v", funcName, err, params))
return nil, fmt.Errorf("EthAPIBackend %v meet err: %v", funcName, err)
Expand Down
9 changes: 5 additions & 4 deletions common/p2p_topic.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package common

const (
StartBlockTopic = "start-block"
EndBlockTopic = "end-block"
FinalizeBlockTopic = "finalize-block"
UnpackedTxnsTopic = "unpacked-txns"
StartBlockTopic = "start-block"
EndBlockTopic = "end-block"
FinalizeBlockTopic = "finalize-block"
UnpackedWritingTopic = "unpacked-writing"
UnpackedExtraWritingTopic = "unpacked-extra-writing"
)
44 changes: 35 additions & 9 deletions core/kernel/handle_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (
"github.com/yu-org/yu/metrics"
)

// HandleTxn handles txn from outside.
// You can also self-define your input by calling HandleTxn (not only by default http and ws)
func (k *Kernel) HandleTxn(signedWrCall *protocol.SignedWrCall) error {
// HandleWriting handles txn from outside.
// You can also self-define your input by calling HandleWriting (not only by default http and ws)
func (k *Kernel) HandleWriting(signedWrCall *protocol.SignedWrCall) error {
stxn, err := NewSignedTxn(signedWrCall.Call, signedWrCall.Pubkey, signedWrCall.Address, signedWrCall.Signature)
if err != nil {
return err
Expand All @@ -23,21 +23,44 @@ func (k *Kernel) HandleTxn(signedWrCall *protocol.SignedWrCall) error {
if err != nil {
return err
}
err = k.handleTxnLocally(stxn)
err = k.handleTxnLocally(stxn, "")
if err != nil {
return err
}
go func() {
err = k.pubUnpackedTxns(FromArray(stxn))
err = k.pubUnpackedWritings(FromArray(stxn))
if err != nil {
logrus.Error("publish unpacked txns error: ", err)
logrus.Error("publish unpacked writing error: ", err)
}
}()

return nil
}

func (k *Kernel) handleTxnLocally(stxn *SignedTxn) error {
func (k *Kernel) HandleExtraWriting(call *protocol.SignedWrCall) error {
stxn, err := NewSignedTxn(call.Call, call.Pubkey, call.Address, call.Signature)
if err != nil {
return err
}
exWrCall := call.Call
_, err = k.Land.GetExtraWriting(exWrCall.TripodName, exWrCall.FuncName)
if err != nil {
return err
}
err = k.handleTxnLocally(stxn, common.UnpackedExtraWritingTopic)
if err != nil {
return err
}
go func() {
err = k.pubExtraWritings(FromArray(stxn))
if err != nil {
logrus.Error("publish extra writings error: ", err)
}
}()
return nil
}

func (k *Kernel) handleTxnLocally(stxn *SignedTxn, topic string) error {
metrics.KernelHandleTxnCounter.WithLabelValues().Inc()
tri := k.Land.GetTripod(stxn.TripodName())
if tri != nil {
Expand All @@ -53,10 +76,13 @@ func (k *Kernel) handleTxnLocally(stxn *SignedTxn) error {
if err != nil {
return err
}
return k.Pool.Insert(stxn)
if topic == "" {
return k.Pool.Insert(stxn)
}
return k.Pool.InsertWithTopic(topic, stxn)
}

func (k *Kernel) HandleRead(rdCall *common.RdCall) (*context.ResponseData, error) {
func (k *Kernel) HandleReading(rdCall *common.RdCall) (*context.ResponseData, error) {
ctx, err := context.NewReadContext(rdCall)
if err != nil {
return nil, err
Expand Down
21 changes: 19 additions & 2 deletions core/kernel/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ func (k *Kernel) HandleHttp() {
api.POST(RdCallType, func(c *gin.Context) {
k.handleHttpRd(c)
})
// POST extra-writing call
api.POST(ExWrCallType, func(c *gin.Context) {
k.handleHttpExWr(c)
})

api.GET("block", k.GetBlock)

Expand Down Expand Up @@ -48,7 +52,20 @@ func (k *Kernel) handleHttpWr(c *gin.Context) {
return
}

err = k.HandleTxn(signedWrCall)
err = k.HandleWriting(signedWrCall)
if err != nil {
c.AbortWithError(http.StatusBadRequest, err)
}
}

func (k *Kernel) handleHttpExWr(c *gin.Context) {
signedExWrCall, err := GetSignedWrCall(c)
if err != nil {
c.AbortWithError(http.StatusBadRequest, err)
return
}

err = k.HandleExtraWriting(signedExWrCall)
if err != nil {
c.AbortWithError(http.StatusBadRequest, err)
}
Expand All @@ -61,7 +78,7 @@ func (k *Kernel) handleHttpRd(c *gin.Context) {
return
}

respData, err := k.HandleRead(rdCall)
respData, err := k.HandleReading(rdCall)
if err != nil {
c.AbortWithError(http.StatusBadRequest, err)
return
Expand Down
63 changes: 52 additions & 11 deletions core/kernel/kernel.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func NewKernel(
wg: &sync.WaitGroup{},
}

env.Execute = k.OrderedExecute
env.Execute = k.SeqExecuteWritings

// Configure the handlers in P2P network

Expand Down Expand Up @@ -103,49 +103,90 @@ func (k *Kernel) InitBlockChain() {
})
}

func (k *Kernel) AcceptUnpkgTxns() error {
txns, err := k.subUnpackedTxns()
func (k *Kernel) AcceptUnpackedTxns() error {
writings, err := k.subUnpackedWritings()
if err != nil {
return err
}

for _, txn := range txns {
for _, txn := range writings {
if k.CheckReplayAttack(txn) {
continue
}
txn.FromP2P = true

logrus.WithField("p2p", "accept-txn").
logrus.WithField("p2p", "accept-writing").
Tracef("txn(%s) from network, content: %v", txn.TxnHash.String(), txn.Raw.WrCall)

err = k.Pool.CheckTxn(txn)
if err != nil {
logrus.Error("check txn from P2P into txpool error: ", err)
logrus.Error("check writing from P2P into txpool error: ", err)
continue
}
err = k.Pool.Insert(txn)
if err != nil {
logrus.Error("insert txn from P2P into txpool error: ", err)
logrus.Error("insert writing from P2P into txpool error: ", err)
}
}

extraWritings, err := k.subExtraWritings()
if err != nil {
return err
}

for _, txn := range extraWritings {
if k.CheckReplayAttack(txn) {
continue
}
txn.FromP2P = true

logrus.WithField("p2p", "accept-extra-writing").
Tracef("txn(%s) from network, content: %v", txn.TxnHash.String(), txn.Raw.WrCall)

err = k.Pool.CheckTxn(txn)
if err != nil {
logrus.Error("check extra writing from P2P into txpool error: ", err)
continue
}
err = k.Pool.Insert(txn)
if err != nil {
logrus.Error("insert extra writing from P2P into txpool error: ", err)
}
}

return nil
}

func (k *Kernel) subUnpackedTxns() (types.SignedTxns, error) {
byt, err := k.P2pNetwork.SubP2P(common.UnpackedTxnsTopic)
func (k *Kernel) subUnpackedWritings() (types.SignedTxns, error) {
byt, err := k.P2pNetwork.SubP2P(common.UnpackedWritingTopic)
if err != nil {
return nil, err
}
return types.DecodeSignedTxns(byt)
}

func (k *Kernel) pubUnpackedWritings(txns types.SignedTxns) error {
byt, err := txns.Encode()
if err != nil {
return err
}
return k.P2pNetwork.PubP2P(common.UnpackedWritingTopic, byt)
}

func (k *Kernel) subExtraWritings() (types.SignedTxns, error) {
byt, err := k.P2pNetwork.SubP2P(common.UnpackedExtraWritingTopic)
if err != nil {
return nil, err
}
return types.DecodeSignedTxns(byt)
}

func (k *Kernel) pubUnpackedTxns(txns types.SignedTxns) error {
func (k *Kernel) pubExtraWritings(txns types.SignedTxns) error {
byt, err := txns.Encode()
if err != nil {
return err
}
return k.P2pNetwork.PubP2P(common.UnpackedTxnsTopic, byt)
return k.P2pNetwork.PubP2P(common.UnpackedExtraWritingTopic, byt)
}

func (k *Kernel) GetTripodInstance(name string) any {
Expand Down
Loading