Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 24 additions & 47 deletions pkg/discovery/p2p/xatu/xatu.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,89 +137,66 @@ func (c *Coordinator) startCrons(ctx context.Context) error {
gocron.DurationJob(c.config.Restart),
gocron.NewTask(
func(ctx context.Context) {
forkIDHashes := make([][]byte, len(c.config.ForkIDHashes))
var bootNodes []string

// Fetch execution boot node
forkIDHashes := make([][]byte, len(c.config.ForkIDHashes))
for i, forkIDHash := range c.config.ForkIDHashes {
forkIDHashBytes, err := hex.DecodeString(forkIDHash[2:])
if err == nil {
forkIDHashes[i] = forkIDHashBytes
}
}

req := xatu.GetDiscoveryExecutionNodeRecordRequest{
execReq := xatu.GetDiscoveryExecutionNodeRecordRequest{
NetworkIds: c.config.NetworkIds,
ForkIdHashes: forkIDHashes,
}

md := metadata.New(c.config.Headers)
ctx = metadata.NewOutgoingContext(ctx, md)

res, err := c.pb.GetDiscoveryExecutionNodeRecord(ctx, &req, grpc.UseCompressor(gzip.Name))

execRes, err := c.pb.GetDiscoveryExecutionNodeRecord(ctx, &execReq, grpc.UseCompressor(gzip.Name))
if err != nil {
c.log.WithError(err).Error("Failed to get a execution discovery node record")

return
c.log.WithError(err).Error("Failed to get execution discovery node record")
} else {
bootNodes = append(bootNodes, execRes.NodeRecord)
}

if err = c.discV5.UpdateBootNodes([]string{res.NodeRecord}); err != nil {
c.log.WithError(err).Error("Failed to update discV5 boot nodes")

return
}

if err := c.discV5.Start(ctx); err != nil {
c.log.WithError(err).Error("Failed to start discV5")

return
}
},
ctx,
),
gocron.WithStartAt(gocron.WithStartImmediately()),
); err != nil {
return err
}

if _, err := c.scheduler.NewJob(
gocron.DurationJob(c.config.Restart),
gocron.NewTask(
func(ctx context.Context) {
// Fetch consensus boot node
forkDigests := make([][]byte, len(c.config.ForkDigests))

for i, forkDigest := range c.config.ForkDigests {
forkDigestBytes, err := hex.DecodeString(forkDigest[2:])
if err == nil {
forkDigests[i] = forkDigestBytes
}
}

req := xatu.GetDiscoveryConsensusNodeRecordRequest{
consReq := xatu.GetDiscoveryConsensusNodeRecordRequest{
NetworkIds: c.config.NetworkIds,
ForkDigests: forkDigests,
}

md := metadata.New(c.config.Headers)
ctx = metadata.NewOutgoingContext(ctx, md)

res, err := c.pb.GetDiscoveryConsensusNodeRecord(ctx, &req, grpc.UseCompressor(gzip.Name))

consRes, err := c.pb.GetDiscoveryConsensusNodeRecord(ctx, &consReq, grpc.UseCompressor(gzip.Name))
if err != nil {
c.log.WithError(err).Error("Failed to get a consensus discovery node record")

return
c.log.WithError(err).Error("Failed to get consensus discovery node record")
} else {
bootNodes = append(bootNodes, consRes.NodeRecord)
}

if err = c.discV5.UpdateBootNodes([]string{res.NodeRecord}); err != nil {
c.log.WithError(err).Error("Failed to update discV5 boot nodes")
// Update boot nodes if we have any
if len(bootNodes) > 0 {
if err = c.discV5.UpdateBootNodes(bootNodes); err != nil {
c.log.WithError(err).Error("Failed to update discV5 boot nodes")

return
}
return
}

if err := c.discV5.Start(ctx); err != nil {
c.log.WithError(err).Error("Failed to start discV5")
if err := c.discV5.Start(ctx); err != nil {
c.log.WithError(err).Error("Failed to start discV5")

return
return
}
}
},
ctx,
Expand Down
Loading