diff --git a/node/app/data_worker_node.go b/node/app/data_worker_node.go
index 47337b7de..609ae43c4 100644
--- a/node/app/data_worker_node.go
+++ b/node/app/data_worker_node.go
@@ -2,10 +2,15 @@ package app
 
 import (
 	"fmt"
+	"net"
 	"os"
 	"sync"
+	"time"
 
+	"github.com/multiformats/go-multiaddr"
+	"github.com/pkg/errors"
 	"go.uber.org/zap"
+	"source.quilibrium.com/quilibrium/monorepo/config"
 	consensustime "source.quilibrium.com/quilibrium/monorepo/node/consensus/time"
 	"source.quilibrium.com/quilibrium/monorepo/node/datarpc"
 	"source.quilibrium.com/quilibrium/monorepo/types/crypto"
@@ -15,6 +20,7 @@ import (
 
 type DataWorkerNode struct {
 	logger         *zap.Logger
+	config         *config.Config
 	dataProofStore store.DataProofStore
 	clockStore     store.ClockStore
 	coinStore      store.TokenStore
@@ -25,12 +31,14 @@ type DataWorkerNode struct {
 	frameProver    crypto.FrameProver
 	globalTimeReel *consensustime.GlobalTimeReel
 	parentProcess  int
+	rpcMultiaddr   string
 	quit           chan struct{}
 	stopOnce       sync.Once
 }
 
 func newDataWorkerNode(
 	logger *zap.Logger,
+	config *config.Config,
 	dataProofStore store.DataProofStore,
 	clockStore store.ClockStore,
 	coinStore store.TokenStore,
@@ -41,10 +49,12 @@ func newDataWorkerNode(
 	globalTimeReel *consensustime.GlobalTimeReel,
 	coreId uint,
 	parentProcess int,
+	rpcMultiaddr string,
 ) (*DataWorkerNode, error) {
 	logger = logger.With(zap.String("process", fmt.Sprintf("worker %d", coreId)))
 	return &DataWorkerNode{
 		logger:         logger,
+		config:         config,
 		dataProofStore: dataProofStore,
 		clockStore:     clockStore,
 		coinStore:      coinStore,
@@ -55,6 +65,7 @@ func newDataWorkerNode(
 		frameProver:    frameProver,
 		globalTimeReel: globalTimeReel,
 		parentProcess:  parentProcess,
+		rpcMultiaddr:   rpcMultiaddr,
 		quit:           make(chan struct{}),
 	}, nil
 }
@@ -63,17 +74,36 @@ func (n *DataWorkerNode) Start(
 	done chan os.Signal,
 	quitCh chan struct{},
 ) error {
+	n.logger.Info(
+		"starting data worker node",
+		zap.Uint("core_id", n.coreId),
+		zap.String("rpc_multiaddr", n.rpcMultiaddr),
+	)
+
 	go func() {
 		err := n.ipcServer.Start()
 		if err != nil {
 			n.logger.Error(
 				"error while starting ipc server for core",
 				zap.Uint64("core", uint64(n.coreId)),
+				zap.Error(err),
 			)
 			n.Stop()
+		} else {
+			n.logger.Info(
+				"IPC server started successfully",
+				zap.Uint("core_id", n.coreId),
+			)
 		}
 	}()
 
+	// Start port health check in background
+	n.logger.Info(
+		"starting port health check monitor",
+		zap.Uint("core_id", n.coreId),
+	)
+	go n.monitorPortHealth()
+
 	n.logger.Info("data worker node started", zap.Uint("core_id", n.coreId))
 
 	select {
@@ -149,3 +179,258 @@ func (n *DataWorkerNode) GetFrameProver() crypto.FrameProver {
 func (n *DataWorkerNode) GetIPCServer() *datarpc.DataWorkerIPCServer {
 	return n.ipcServer
 }
+
+// extractPortFromMultiaddr extracts the TCP port from a multiaddr string,
+// falling back to the UDP port if no TCP component is present
+func extractPortFromMultiaddr(multiaddrStr string) (string, error) {
+	ma, err := multiaddr.NewMultiaddr(multiaddrStr)
+	if err != nil {
+		return "", errors.Wrap(err, "failed to parse multiaddr")
+	}
+
+	port, err := ma.ValueForProtocol(multiaddr.P_TCP)
+	if err != nil {
+		// Try UDP as a fallback
+		port, err = ma.ValueForProtocol(multiaddr.P_UDP)
+		if err != nil {
+			return "", errors.Wrap(err, "failed to extract port from multiaddr")
+		}
+	}
+
+	return port, nil
+}
+
+// isPortListening checks whether a port is currently in use by attempting to
+// bind it: a failed bind means something is already listening on it
+func isPortListening(port string) bool {
+	ln, err := net.Listen("tcp", ":"+port)
+	if err != nil {
+		// Port is in use (listening) or in TIME_WAIT state
+		return true
+	}
+	// The bind succeeded, which is all we needed to know; a close error on
+	// the probe listener is irrelevant here
+	_ = ln.Close()
+	// Port is available (not listening)
+	return false
+}
+
+// waitForPortAvailable waits for a port to become available, checking periodically
+// Returns true if the port becomes available, false if the timeout is reached
+func waitForPortAvailable(port string, timeout time.Duration, logger *zap.Logger) bool {
+	deadline := time.Now().Add(timeout)
+	checkInterval := 500 * time.Millisecond
+
+	logger.Info(
+		"waiting for port to become available",
+		zap.String("port", port),
+		zap.Duration("timeout", timeout),
+	)
+
+	for time.Now().Before(deadline) {
+		if !isPortListening(port) {
+			logger.Info(
+				"port is now available",
+				zap.String("port", port),
+			)
+			return true
+		}
+		time.Sleep(checkInterval)
+	}
+
+	logger.Warn(
+		"port did not become available within timeout",
+		zap.String("port", port),
+		zap.Duration("timeout", timeout),
+	)
+	return false
+}
+
+// WaitForWorkerPortsAvailable waits for both the P2P and stream ports to become available
+// This helps avoid race conditions when processes restart quickly
+// Returns true if all ports are available, false otherwise
+func WaitForWorkerPortsAvailable(
+	logger *zap.Logger,
+	config *config.Config,
+	coreId uint,
+	rpcMultiaddr string,
+	timeout time.Duration,
+) bool {
+	var wg sync.WaitGroup
+	streamPortAvailable := make(chan bool, 1)
+	p2pPortAvailable := make(chan bool, 1)
+
+	// Check stream port in parallel
+	streamPort, err := extractPortFromMultiaddr(rpcMultiaddr)
+	if err != nil {
+		logger.Warn(
+			"failed to extract stream port, skipping stream port availability check",
+			zap.String("multiaddr", rpcMultiaddr),
+			zap.Uint("core_id", coreId),
+			zap.Error(err),
+		)
+		streamPortAvailable <- true // Skip check, assume available
+	} else {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			available := waitForPortAvailable(streamPort, timeout, logger)
+			streamPortAvailable <- available
+		}()
+	}
+
+	// Check P2P port in parallel
+	if config.Engine.DataWorkerBaseP2PPort > 0 {
+		p2pPort := int(config.Engine.DataWorkerBaseP2PPort) + int(coreId) - 1
+		p2pPortStr := fmt.Sprintf("%d", p2pPort)
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			available := waitForPortAvailable(p2pPortStr, timeout, logger)
+			p2pPortAvailable <- available
+		}()
+	} else {
+		p2pPortAvailable <- true // Skip check, assume available
+	}
+
+	// Wait for both checks to complete
+	wg.Wait()
+
+	// Read results
+	streamOk := <-streamPortAvailable
+	p2pOk := <-p2pPortAvailable
+
+	return streamOk && p2pOk
+}
+
+// monitorPortHealth checks whether both the stream port and the P2P listen port
+// are listening after startup
+// The stream port is taken from the worker's RPC multiaddr (derived upstream as
+// base_stream_port + core_index - 1)
+// The P2P listen port is calculated as: base_p2p_port + core_index - 1
+// The stream port check waits for the IPC server to signal readiness before probing
+func (n *DataWorkerNode) monitorPortHealth() {
+	n.logger.Info(
+		"checking port health",
+		zap.Uint("core_id", n.coreId),
+		zap.String("rpc_multiaddr", n.rpcMultiaddr),
+	)
+
+	var wg sync.WaitGroup
+	streamResult := make(chan bool, 1)
+	p2pResult := make(chan bool, 1)
+
+	// Extract stream port from multiaddr
+	streamPort, err := extractPortFromMultiaddr(n.rpcMultiaddr)
+	if err != nil {
+		n.logger.Error(
+			"failed to extract stream port from multiaddr, skipping stream port health check",
+			zap.String("multiaddr", n.rpcMultiaddr),
+			zap.Uint("core_id", n.coreId),
+			zap.Error(err),
+		)
+		streamResult <- false // Mark as failed since we couldn't check
+	} else {
+		// Wait for IPC server to be ready before checking stream port
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			// Wait for the IPC server to start listening
+			n.logger.Debug(
+				"waiting for IPC server to be ready before checking stream port",
+				zap.String("port", streamPort),
+				zap.Uint("core_id", n.coreId),
+			)
+			<-n.ipcServer.Ready()
+			n.logger.Debug(
+				"IPC server is ready, checking stream port",
+				zap.String("port", streamPort),
+				zap.Uint("core_id", n.coreId),
+			)
+			isStreamListening := isPortListening(streamPort)
+			n.logger.Debug(
+				"stream port check completed",
+				zap.String("port", streamPort),
+				zap.Bool("is_listening", isStreamListening),
+				zap.Uint("core_id", n.coreId),
+			)
+
+			if !isStreamListening {
+				n.logger.Warn(
+					"stream port is not listening yet; the IPC server may still be starting up",
+					zap.String("port", streamPort),
+					zap.Uint("core_id", n.coreId),
+				)
+				streamResult <- false
+			} else {
+				n.logger.Info(
+					"stream port is listening successfully",
+					zap.String("port", streamPort),
+					zap.Uint("core_id", n.coreId),
+				)
+				streamResult <- true
+			}
+		}()
+	}
+
+	// Check P2P listen port in parallel
+	// Calculate P2P port: base_p2p_port + core_index - 1
+	if n.config.Engine.DataWorkerBaseP2PPort == 0 {
+		n.logger.Warn(
+			"DataWorkerBaseP2PPort is not set, skipping P2P port health check",
+			zap.Uint("core_id", n.coreId),
+		)
+		p2pResult <- true // Skip check, assume OK
+	} else {
+		p2pPort := int(n.config.Engine.DataWorkerBaseP2PPort) + int(n.coreId) - 1
+		p2pPortStr := fmt.Sprintf("%d", p2pPort)
+
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			n.logger.Debug(
+				"attempting to bind to P2P port to check if it's listening",
+				zap.String("port", p2pPortStr),
+				zap.Uint("core_id", n.coreId),
+			)
+			isP2PListening := isPortListening(p2pPortStr)
+			n.logger.Debug(
+				"P2P port check completed",
+				zap.String("port", p2pPortStr),
+				zap.Bool("is_listening", isP2PListening),
+				zap.Uint("core_id", n.coreId),
+			)
+
+			if !isP2PListening {
+				n.logger.Error(
+					"P2P listen port is not listening yet; the P2P host may still be starting up",
+					zap.String("port", p2pPortStr),
+					zap.Uint("core_id", n.coreId),
+				)
+				p2pResult <- false
+			} else {
+				n.logger.Info(
+					"P2P listen port is listening successfully",
+					zap.String("port", p2pPortStr),
+					zap.Uint("core_id", n.coreId),
+				)
+				p2pResult <- true
+			}
+		}()
+	}
+
+	// Wait for both checks to complete
+	wg.Wait()
+
+	// Read results
+	streamOk := <-streamResult
+	p2pOk := <-p2pResult
+
+	// Both ports are listening; log a success summary
+	if streamOk && p2pOk {
+		n.logger.Info(
+			"all ports are listening successfully",
+			zap.Uint("core_id", n.coreId),
+		)
+	}
+	n.logger.Info(
+		"port health check completed",
+		zap.Uint("core_id", n.coreId),
+	)
+}
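For reference, the helpers above probe availability by attempting to bind the port and treating a failed bind as "already in use". The following standalone sketch exercises the same wait-then-probe behaviour outside the tree (illustrative only, not part of this change; the port number and delays are arbitrary):

package main

import (
	"fmt"
	"net"
	"time"
)

// portBusy reports whether binding the TCP port fails, mirroring the
// bind-probe used by isPortListening above.
func portBusy(port string) bool {
	ln, err := net.Listen("tcp", ":"+port)
	if err != nil {
		return true
	}
	_ = ln.Close()
	return false
}

func main() {
	// Occupy a port, then release it after a delay to simulate a previous
	// worker process shutting down.
	ln, err := net.Listen("tcp", ":39999")
	if err != nil {
		fmt.Println("could not set up the test listener:", err)
		return
	}
	go func() {
		time.Sleep(2 * time.Second)
		_ = ln.Close()
	}()

	// Poll until the port frees up or a deadline passes, like
	// waitForPortAvailable does with its 500ms interval.
	deadline := time.Now().Add(10 * time.Second)
	for portBusy("39999") && time.Now().Before(deadline) {
		time.Sleep(500 * time.Millisecond)
	}
	fmt.Println("port busy after wait:", portBusy("39999"))
}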
diff --git a/node/app/wire_gen.go b/node/app/wire_gen.go
index 4f89c2a25..07d0533e1 100644
--- a/node/app/wire_gen.go
+++ b/node/app/wire_gen.go
@@ -125,7 +125,7 @@ func NewDataWorkerNodeWithProxyPubsub(logger *zap.Logger, config2 *config.Config
 	if err != nil {
 		return nil, err
 	}
-	dataWorkerNode, err := newDataWorkerNode(logger, pebbleDataProofStore, pebbleClockStore, pebbleTokenStore, fileKeyManager, pebbleDB, frameProver, dataWorkerIPCServer, globalTimeReel, coreId, parentProcess)
+	dataWorkerNode, err := newDataWorkerNode(logger, config2, pebbleDataProofStore, pebbleClockStore, pebbleTokenStore, fileKeyManager, pebbleDB, frameProver, dataWorkerIPCServer, globalTimeReel, coreId, parentProcess, rpcMultiaddr)
 	if err != nil {
 		return nil, err
 	}
@@ -182,7 +182,7 @@ func NewDataWorkerNodeWithoutProxyPubsub(logger *zap.Logger, config2 *config.Con
 	if err != nil {
 		return nil, err
 	}
-	dataWorkerNode, err := newDataWorkerNode(logger, pebbleDataProofStore, pebbleClockStore, pebbleTokenStore, fileKeyManager, pebbleDB, frameProver, dataWorkerIPCServer, globalTimeReel, coreId, parentProcess)
+	dataWorkerNode, err := newDataWorkerNode(logger, config2, pebbleDataProofStore, pebbleClockStore, pebbleTokenStore, fileKeyManager, pebbleDB, frameProver, dataWorkerIPCServer, globalTimeReel, coreId, parentProcess, rpcMultiaddr)
 	if err != nil {
 		return nil, err
 	}
diff --git a/node/datarpc/data_worker_ipc_server.go b/node/datarpc/data_worker_ipc_server.go
index edf365d8e..bc0c020e9 100644
--- a/node/datarpc/data_worker_ipc_server.go
+++ b/node/datarpc/data_worker_ipc_server.go
@@ -3,6 +3,7 @@ package datarpc
 import (
 	"context"
 	"encoding/hex"
+	"sync"
 	"time"
 
 	pcrypto "github.com/libp2p/go-libp2p/core/crypto"
@@ -48,6 +49,8 @@ type DataWorkerIPCServer struct {
 	quit            chan struct{}
 	peerInfoCtx     lifecycle.SignalerContext
 	peerInfoCancel  context.CancelFunc
+	ready           chan struct{}
+	readyOnce       sync.Once
 }
 
 func NewDataWorkerIPCServer(
@@ -102,10 +105,12 @@ func NewDataWorkerIPCServer(
 		frameProver:     frameProver,
 		peerInfoManager: peerInfoManager,
 		quit:            make(chan struct{}),
+		ready:           make(chan struct{}),
 	}, nil
 }
 
 func (r *DataWorkerIPCServer) Start() error {
+	r.logger.Info("starting DataWorkerIPCServer", zap.Uint32("core_id", r.coreId))
 	peerInfoCtx, peerInfoCancel, _ := lifecycle.WithSignallerAndCancel(
 		context.Background(),
 	)
@@ -118,15 +123,19 @@ func (r *DataWorkerIPCServer) Start() error {
 	)
 	select {
 	case <-peerInfoReady:
+		r.logger.Info("peer info manager started successfully", zap.Uint32("core_id", r.coreId))
 	case <-time.After(5 * time.Second):
-		r.logger.Warn("peer info manager did not start before timeout")
+		r.logger.Warn("peer info manager did not start before timeout", zap.Uint32("core_id", r.coreId))
 	}
 	r.peerInfoCtx = peerInfoCtx
 	r.peerInfoCancel = peerInfoCancel
 
+	r.logger.Info("respawning server", zap.Uint32("core_id", r.coreId))
 	r.RespawnServer(nil)
+	r.logger.Info("data worker ipc server started", zap.Uint32("core_id", r.coreId))
 
 	<-r.quit
+	r.logger.Info("data worker ipc server quit signal received, stopping", zap.Uint32("core_id", r.coreId))
 	return nil
 }
@@ -218,6 +227,7 @@ func (r *DataWorkerIPCServer) RespawnServer(filter []byte) error {
 		return errors.Wrap(err, "respawn server")
 	}
 
+	r.logger.Info("attempting to listen on address", zap.String("address", r.listenAddrGRPC))
 	lis, err := mn.Listen(mg)
 	if err != nil {
 		return errors.Wrap(err, "respawn server")
@@ -228,6 +238,10 @@ func (r *DataWorkerIPCServer) RespawnServer(filter []byte) error {
 		zap.String("address", r.listenAddrGRPC),
 		zap.String("resolved", lis.Addr().String()),
 	)
+	// Signal that the server is ready (listening)
+	r.readyOnce.Do(func() {
+		close(r.ready)
+	})
 	if len(filter) != 0 {
 		globalTimeReel, err := r.appConsensusEngineFactory.CreateGlobalTimeReel()
 		if err != nil {
@@ -257,6 +271,11 @@ func (r *DataWorkerIPCServer) RespawnServer(filter []byte) error {
 	return nil
 }
 
+// Ready returns a channel that will be closed when the server has started listening
+func (r *DataWorkerIPCServer) Ready() <-chan struct{} {
+	return r.ready
+}
+
 // CreateJoinProof implements protobufs.DataIPCServiceServer.
 func (r *DataWorkerIPCServer) CreateJoinProof(
 	ctx context.Context,
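The Ready() accessor added above relies on the close-once readiness idiom: RespawnServer may run more than once, but the ready channel is closed exactly once, so every current and future waiter is released. A rough standalone illustration of the pattern follows (the type and function names here are made up for the example, not taken from the codebase):

package main

import (
	"fmt"
	"sync"
	"time"
)

// server mirrors the ready/readyOnce pair added to DataWorkerIPCServer.
type server struct {
	ready     chan struct{}
	readyOnce sync.Once
}

func newServer() *server {
	return &server{ready: make(chan struct{})}
}

// markReady may be invoked on every (re)spawn, but the channel is closed
// exactly once, so waiting on Ready() never panics or blocks forever.
func (s *server) markReady() {
	s.readyOnce.Do(func() { close(s.ready) })
}

// Ready returns a channel that is closed once the server is listening.
func (s *server) Ready() <-chan struct{} {
	return s.ready
}

func main() {
	s := newServer()

	go func() {
		time.Sleep(100 * time.Millisecond) // simulate startup work
		s.markReady()
	}()

	select {
	case <-s.Ready():
		fmt.Println("server is listening; safe to run the stream port check")
	case <-time.After(time.Second):
		fmt.Println("timed out waiting for readiness")
	}
}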
diff --git a/node/main.go b/node/main.go
index 8a7d9f8a7..44b0876e1 100644
--- a/node/main.go
+++ b/node/main.go
@@ -528,6 +528,20 @@ func main() {
 		rpcMultiaddr = strings.Replace(rpcMultiaddr, "/quic-v1", "", 1)
 		rpcMultiaddr = strings.Replace(rpcMultiaddr, "udp", "tcp", 1)
 
+		if !app.WaitForWorkerPortsAvailable(
+			logger,
+			nodeConfig,
+			uint(*core),
+			rpcMultiaddr,
+			30*time.Second, // Wait up to 30 seconds for ports to become available
+		) {
+			logger.Error(
+				"required worker ports did not become available within the timeout, exiting",
+				zap.Uint("core_id", uint(*core)),
+			)
+			os.Exit(1)
+		}
+
 		dataWorkerNode, err := app.NewDataWorkerNode(
 			logger,
 			nodeConfig,