Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/poa/poa_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func runNode(poaNode *Poa, mockP2P *p2p.MockP2p, wg *sync.WaitGroup) {

k := kernel.NewKernel(cfg, env, land)
for i := 0; i < 10; i++ {
err := k.LocalRun()
err := k.LoopRun()
if err != nil {
panic(err)
}
Expand Down
4 changes: 4 additions & 0 deletions core/context/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ func (rc *ReadContext) GetBlockHash() *common.Hash {
return rc.BlockHash
}

func (rc *ReadContext) GetParams() string {
return rc.rdCall.Params
}

func (rc *ReadContext) Json(code int, v any) {
rc.resp = &ResponseData{
StatusCode: code,
Expand Down
132 changes: 65 additions & 67 deletions core/kernel/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,39 +20,27 @@ func (k *Kernel) Run() {
}

}()

switch k.RunMode {
case LocalNode:
for {
select {
case <-k.stopChan:
logrus.Info("Stop the Chain!")
for {
select {
case <-k.stopChan:
logrus.Info("Stop the Chain!")
return
default:
block, err := k.LoopRun()
if err != nil {
logrus.Panicf("local-run blockchain error: %s on Block(%d)", err.Error(), block.Height)
}
if block.Height == k.cfg.MaxBlockNum {
logrus.Infof("Stop the Chain on Block(%d)", block.Height)
return
default:
block, err := k.LocalRun()
if err != nil {
logrus.Panicf("local-run blockchain error: %s on Block(%d)", err.Error(), block.Height)
}
if block.Height == k.cfg.MaxBlockNum {
logrus.Infof("Stop the Chain on Block(%d)", block.Height)
return
}
}

}
case MasterWorker:
for {
err := k.MasterWorkerRun()
logrus.Errorf("master-worker-run blockchain error: %s", err.Error())
}

default:
logrus.Panic(NoRunMode)
}

}

func (k *Kernel) LocalRun() (newBlock *Block, err error) {
func (k *Kernel) LoopRun() (newBlock *Block, err error) {
newBlock, err = k.makeNewBasicBlock()
if err != nil {
return
Expand All @@ -61,9 +49,12 @@ func (k *Kernel) LocalRun() (newBlock *Block, err error) {
// start a new block
err = k.Land.RangeList(func(tri *Tripod) error {
// start := time.Now()
tri.BlockCycle.StartBlock(newBlock)
err = tri.StartBlock(newBlock)
if err != nil {
logrus.Errorf("tripod(%s) StartBlock failed: %v", tri.Name(), err)
}
return err
// metrics.StartBlockDuration.WithLabelValues(strconv.FormatInt(int64(newBlock.Height), 10), tri.Name()).Observe(time.Now().Sub(start).Seconds())
return nil
})
if err != nil {
return
Expand All @@ -72,9 +63,12 @@ func (k *Kernel) LocalRun() (newBlock *Block, err error) {
// end block
err = k.Land.RangeList(func(tri *Tripod) error {
// start := time.Now()
tri.BlockCycle.EndBlock(newBlock)
err = tri.EndBlock(newBlock)
if err != nil {
logrus.Errorf("tripod(%s) EndBlock failed: %v", tri.Name(), err)
}
return err
// metrics.EndBlockDuration.WithLabelValues(strconv.FormatInt(int64(newBlock.Height), 10), tri.Name()).Observe(time.Now().Sub(start).Seconds())
return nil
})
if err != nil {
return
Expand All @@ -83,9 +77,12 @@ func (k *Kernel) LocalRun() (newBlock *Block, err error) {
// finalize this block
err = k.Land.RangeList(func(tri *Tripod) error {
// start := time.Now()
tri.BlockCycle.FinalizeBlock(newBlock)
err = tri.FinalizeBlock(newBlock)
if err != nil {
logrus.Errorf("tripod(%s) FinalizeBlock failed: %v", tri.Name(), err)
}
return err
// metrics.FinalizeBlockDuration.WithLabelValues(strconv.FormatInt(int64(newBlock.Height), 10), tri.Name()).Observe(time.Now().Sub(start).Seconds())
return nil
})
return
}
Expand Down Expand Up @@ -188,42 +185,43 @@ func (k *Kernel) PostExecute(block *Block, receipts map[Hash]*Receipt) error {
return err
}

func (k *Kernel) MasterWorkerRun() error {
//workersIps, err := k.allWorkersIP()
//if err != nil {
// return err
//}
//
//newBlock := k.Chain.NewDefaultBlock()
//
//err = k.nortifyWorker(workersIps, StartBlockPath, newBlock)
//if err != nil {
// return err
//}
//
//// todo: if need broadcast block,
//// k.readyBroadcastBlock(newBlock)
//
//err = k.SyncTxns(newBlock)
//if err != nil {
// return err
//}
//
//err = k.nortifyWorker(workersIps, EndBlockPath, newBlock)
//if err != nil {
// return err
//}
//
//go func() {
// err := k.nortifyWorker(workersIps, ExecuteTxnsPath, newBlock)
// if err != nil {
// logrus.Errorf("nortify worker executing txns error: %s", err.Error())
// }
//}()
//
//return k.nortifyWorker(workersIps, FinalizeBlockPath, newBlock)
return nil
}
//func (k *Kernel) MasterWorkerRun() (newBlock *Block, err error) {
// newBlock, err = k.makeNewBasicBlock()
// if err != nil {
// return
// }
//
// // start a new block
// err = k.Land.RangeList(func(tri *Tripod) error {
// // start := time.Now()
// tri.BlockCycle.StartBlock(newBlock)
// // metrics.StartBlockDuration.WithLabelValues(strconv.FormatInt(int64(newBlock.Height), 10), tri.Name()).Observe(time.Now().Sub(start).Seconds())
// return nil
// })
// if err != nil {
// return
// }
//
// // end block
// err = k.Land.RangeList(func(tri *Tripod) error {
// // start := time.Now()
// tri.BlockCycle.EndBlock(newBlock)
// // metrics.EndBlockDuration.WithLabelValues(strconv.FormatInt(int64(newBlock.Height), 10), tri.Name()).Observe(time.Now().Sub(start).Seconds())
// return nil
// })
// if err != nil {
// return
// }
//
// // finalize this block
// err = k.Land.RangeList(func(tri *Tripod) error {
// // start := time.Now()
// tri.BlockCycle.FinalizeBlock(newBlock)
// // metrics.FinalizeBlockDuration.WithLabelValues(strconv.FormatInt(int64(newBlock.Height), 10), tri.Name()).Observe(time.Now().Sub(start).Seconds())
// return nil
// })
// return
//}

func (k *Kernel) HandleError(err error, ctx *context.WriteContext, block *Block, stxn *SignedTxn) *Receipt {
logrus.Error("push error: ", err.Error())
Expand Down
46 changes: 28 additions & 18 deletions core/startup/grpc.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,30 @@
package startup

//func StartGrpcServer(cfg *config.KernelConf) {
// if cfg.RunMode != common.MasterWorker {
// return
// }
// lis, err := net.Listen("tcp", cfg.GrpcPort)
// if err != nil {
// logrus.Fatal("listen for grpc failed: ", err)
// }
// grpcServer := grpc.NewServer()
// goproto.RegisterStateDBServer(grpcServer, state.NewGrpcMptKV(StateDB))
// goproto.RegisterLandServer(grpcServer, tripod.NewGrpcLand(Land))
// // TODO: add chain server, pool server, txndb server.
//
// err = grpcServer.Serve(lis)
// if err != nil {
// logrus.Fatal("failed to serve grpc: ", err)
// }
//}
import (
"github.com/sirupsen/logrus"
"github.com/yu-org/yu/common"
"github.com/yu-org/yu/config"
"github.com/yu-org/yu/core/tripod"
"github.com/yu-org/yu/core/types/goproto"
"google.golang.org/grpc"
"net"
)

func StartGrpcServer(cfg *config.KernelConf) {
if cfg.RunMode != common.MasterWorker {
return
}
lis, err := net.Listen("tcp", cfg.GrpcPort)
if err != nil {
logrus.Fatal("listen for grpc failed: ", err)
}
grpcServer := grpc.NewServer()
//goproto.RegisterStateDBServer(grpcServer, state.NewGrpcMptKV(StateDB))
goproto.RegisterLandServer(grpcServer, tripod.NewGrpcLand(Land))
// TODO: add chain server, pool server, txndb server.

err = grpcServer.Serve(lis)
if err != nil {
logrus.Fatal("failed to serve grpc: ", err)
}
}
2 changes: 1 addition & 1 deletion core/startup/startup.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func InitKernel(cfg *config.KernelConf) *kernel.Kernel {
StateDB = state.NewStateDB(cfg.StatedbType, kvdb)
}

// StartGrpcServer(cfg)
StartGrpcServer(cfg)

chainEnv := &env.ChainEnv{
State: StateDB,
Expand Down
Loading