diff --git a/node/rpc/node_rpc_server.go b/node/rpc/node_rpc_server.go index e29874330..33a91a51c 100644 --- a/node/rpc/node_rpc_server.go +++ b/node/rpc/node_rpc_server.go @@ -222,10 +222,9 @@ func (r *RPCServer) GetWorkerInfo( info := []*protobufs.WorkerInfo{} for _, worker := range workers { info = append(info, &protobufs.WorkerInfo{ - CoreId: uint32(worker.CoreId), - Filter: worker.Filter, - // TODO(2.1.1+): Expose available storage - AvailableStorage: uint64(worker.TotalStorage), + CoreId: uint32(worker.CoreId), + Filter: worker.Filter, + AvailableStorage: uint64(worker.AvailableStorage), TotalStorage: uint64(worker.TotalStorage), }) } diff --git a/node/store/worker.go b/node/store/worker.go index 699132e26..a1e41c317 100644 --- a/node/store/worker.go +++ b/node/store/worker.go @@ -190,9 +190,9 @@ func encodeWorkerInfo(worker *store.WorkerInfo) ([]byte, error) { streamListenMultiaddrLen := uint16(len(worker.StreamListenMultiaddr)) filterLen := uint16(len(worker.Filter)) - // totalLen = coreId(8) + totalStorage(8) + automatic(1) + allocated(1) + // totalLen = coreId(8) + totalStorage(8) + availableStorage(8) + automatic(1) + allocated(1) // + 2 + listen + 2 + stream + 2 + filter - totalLen := 8 + 8 + 1 + 1 + 2 + int(listenMultiaddrLen) + 2 + + totalLen := 8 + 8 + 8 + 1 + 1 + 2 + int(listenMultiaddrLen) + 2 + int(streamListenMultiaddrLen) + 2 + int(filterLen) data := make([]byte, totalLen) @@ -203,6 +203,9 @@ func encodeWorkerInfo(worker *store.WorkerInfo) ([]byte, error) { binary.BigEndian.PutUint64(data[offset:], uint64(worker.TotalStorage)) offset += 8 + binary.BigEndian.PutUint64(data[offset:], uint64(worker.AvailableStorage)) + offset += 8 + if worker.Automatic { data[offset] = 1 } else { @@ -252,6 +255,15 @@ func decodeWorkerInfo(data []byte) (*store.WorkerInfo, error) { totalStorage := binary.BigEndian.Uint64(data[offset:]) offset += 8 + var availableStorage uint64 + // Backwards compatibility + if offset+8 <= len(data) { + availableStorage = binary.BigEndian.Uint64(data[offset:]) + offset += 8 + } else { + availableStorage = totalStorage + } + if offset+1 > len(data) { return nil, errors.New("truncated automatic flag") } @@ -312,6 +324,7 @@ func decodeWorkerInfo(data []byte) (*store.WorkerInfo, error) { StreamListenMultiaddr: streamListenMultiaddr, Filter: filter, TotalStorage: uint(totalStorage), + AvailableStorage: uint(availableStorage), Automatic: automatic, Allocated: allocated, }, nil diff --git a/node/worker/manager.go b/node/worker/manager.go index 239e7773e..6c67549b6 100644 --- a/node/worker/manager.go +++ b/node/worker/manager.go @@ -52,6 +52,12 @@ type WorkerManager struct { // IPC service clients serviceClients map[uint]*grpc.ClientConn + // Reconciler configuration (partition estimate) + reconcilerInterval time.Duration + reconcilerBufferBytes uint64 + reconcilerBufferPercent float64 + reconcilerStatPath string + // In-memory cache for quick lookups workersByFilter map[string]uint // filter hash -> worker id filtersByWorker map[uint][]byte // worker id -> filter @@ -75,15 +81,16 @@ func NewWorkerManager( ) error, ) typesWorker.WorkerManager { return &WorkerManager{ - store: store, - logger: logger.Named("worker_manager"), - workersByFilter: make(map[string]uint), - filtersByWorker: make(map[uint][]byte), - allocatedWorkers: make(map[uint]bool), - serviceClients: make(map[uint]*grpc.ClientConn), - config: config, - proposeFunc: proposeFunc, - decideFunc: decideFunc, + store: store, + logger: logger.Named("worker_manager"), + workersByFilter: make(map[string]uint), + filtersByWorker: make(map[uint][]byte), + allocatedWorkers: make(map[uint]bool), + serviceClients: make(map[uint]*grpc.ClientConn), + config: config, + proposeFunc: proposeFunc, + decideFunc: decideFunc, + reconcilerStatPath: config.DB.Path, } } @@ -109,6 +116,8 @@ func (w *WorkerManager) Start(ctx context.Context) error { return errors.Wrap(err, "start") } + w.startPartitionReconciler() + w.logger.Info("worker manager started successfully") return nil } @@ -140,6 +149,7 @@ func (w *WorkerManager) Stop() error { activeWorkersGauge.Set(0) allocatedWorkersGauge.Set(0) totalStorageGauge.Set(0) + availableStorageGauge.Set(0) w.logger.Info("worker manager stopped") return nil @@ -636,6 +646,7 @@ func (w *WorkerManager) getIPCOfWorker(coreId uint) ( StreamListenMultiaddr: addr.String(), Filter: nil, TotalStorage: 0, + AvailableStorage: 0, Automatic: len(w.config.Engine.DataWorkerP2PMultiaddrs) == 0, Allocated: false, }) diff --git a/node/worker/metrics.go b/node/worker/metrics.go index 4e4499071..f9c9536cf 100644 --- a/node/worker/metrics.go +++ b/node/worker/metrics.go @@ -55,4 +55,13 @@ var ( }, []string{"operation"}, // operation: register/allocate/deallocate/lookup ) + + availableStorageGauge = promauto.NewGauge( + prometheus.GaugeOpts{ + Namespace: "quilibrium", + Subsystem: "worker_manager", + Name: "available_storage_bytes", + Help: "Aggregated available storage (bytes) across workers as estimated by the manager", + }, + ) ) diff --git a/node/worker/reconciler.go b/node/worker/reconciler.go new file mode 100644 index 000000000..d245e3106 --- /dev/null +++ b/node/worker/reconciler.go @@ -0,0 +1,117 @@ +package worker + +import ( + "syscall" + "time" + + "github.com/pkg/errors" + "go.uber.org/zap" +) + +func (w *WorkerManager) startPartitionReconciler() { + if w.reconcilerInterval == 0 { + w.reconcilerInterval = 30 * time.Second + } + + if w.reconcilerStatPath == "" { + w.reconcilerStatPath = "." + } + + go func() { + ticker := time.NewTicker(w.reconcilerInterval) + defer ticker.Stop() + for { + select { + case <-w.ctx.Done(): + return + case <-ticker.C: + if err := reconcilePartitionAndPersist(w); err != nil { + w.logger.Error("partition reconcile failed", zap.Error(err)) + } + } + } + }() +} + +func reconcilePartitionAndPersist(w *WorkerManager) error { + total, avail, err := getPartitionUsage(w.reconcilerStatPath) + if err != nil { + return errors.Wrap(err, "get partition usage") + } + + buffer := w.reconcilerBufferBytes + if buffer == 0 && w.reconcilerBufferPercent > 0 { + buffer = uint64(float64(total) * w.reconcilerBufferPercent) + } + + if total == 0 { + return errors.New("partition total size is zero") + } + + usable := uint64(0) + if avail > buffer { + usable = avail - buffer + } else { + usable = 0 + } + + workers, err := w.store.RangeWorkers() + if err != nil { + return errors.Wrap(err, "range workers") + } + if len(workers) == 0 { + return nil + } + + perWorker := uint64(0) + if len(workers) > 0 { + perWorker = usable / uint64(len(workers)) + } + + txn, err := w.store.NewTransaction(false) + if err != nil { + return errors.Wrap(err, "new transaction for reconcile") + } + + var aggAvailable uint64 + for _, worker := range workers { + if worker == nil { + continue + } + if perWorker == 0 || worker.TotalStorage == 0 { + worker.AvailableStorage = 0 + } else { + if perWorker > uint64(worker.TotalStorage) { + worker.AvailableStorage = worker.TotalStorage + } else { + worker.AvailableStorage = uint(perWorker) + } + } + + if err := w.store.PutWorker(txn, worker); err != nil { + txn.Abort() + return errors.Wrap(err, "put worker during reconcile") + } + + aggAvailable += uint64(worker.AvailableStorage) + } + + if err := txn.Commit(); err != nil { + txn.Abort() + return errors.Wrap(err, "commit reconcile txn") + } + + availableStorageGauge.Set(float64(aggAvailable)) + return nil +} + +func getPartitionUsage(path string) (uint64, uint64, error) { + var stat syscall.Statfs_t + if err := syscall.Statfs(path, &stat); err != nil { + return 0, 0, errors.Wrapf(err, "statfs %s", path) + } + + total := uint64(stat.Blocks) * uint64(stat.Bsize) + avail := uint64(stat.Bavail) * uint64(stat.Bsize) + return total, avail, nil +} diff --git a/types/store/worker.go b/types/store/worker.go index f91b328aa..d859c34aa 100644 --- a/types/store/worker.go +++ b/types/store/worker.go @@ -6,6 +6,7 @@ type WorkerInfo struct { StreamListenMultiaddr string Filter []byte TotalStorage uint + AvailableStorage uint Automatic bool Allocated bool }