Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions node/rpc/node_rpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,10 +222,9 @@ func (r *RPCServer) GetWorkerInfo(
info := []*protobufs.WorkerInfo{}
for _, worker := range workers {
info = append(info, &protobufs.WorkerInfo{
CoreId: uint32(worker.CoreId),
Filter: worker.Filter,
// TODO(2.1.1+): Expose available storage
AvailableStorage: uint64(worker.TotalStorage),
CoreId: uint32(worker.CoreId),
Filter: worker.Filter,
AvailableStorage: uint64(worker.AvailableStorage),
TotalStorage: uint64(worker.TotalStorage),
})
}
Expand Down
17 changes: 15 additions & 2 deletions node/store/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,9 @@ func encodeWorkerInfo(worker *store.WorkerInfo) ([]byte, error) {
streamListenMultiaddrLen := uint16(len(worker.StreamListenMultiaddr))
filterLen := uint16(len(worker.Filter))

// totalLen = coreId(8) + totalStorage(8) + automatic(1) + allocated(1)
// totalLen = coreId(8) + totalStorage(8) + availableStorage(8) + automatic(1) + allocated(1)
// + 2 + listen + 2 + stream + 2 + filter
totalLen := 8 + 8 + 1 + 1 + 2 + int(listenMultiaddrLen) + 2 +
totalLen := 8 + 8 + 8 + 1 + 1 + 2 + int(listenMultiaddrLen) + 2 +
int(streamListenMultiaddrLen) + 2 + int(filterLen)
data := make([]byte, totalLen)

Expand All @@ -203,6 +203,9 @@ func encodeWorkerInfo(worker *store.WorkerInfo) ([]byte, error) {
binary.BigEndian.PutUint64(data[offset:], uint64(worker.TotalStorage))
offset += 8

binary.BigEndian.PutUint64(data[offset:], uint64(worker.AvailableStorage))
offset += 8

if worker.Automatic {
data[offset] = 1
} else {
Expand Down Expand Up @@ -252,6 +255,15 @@ func decodeWorkerInfo(data []byte) (*store.WorkerInfo, error) {
totalStorage := binary.BigEndian.Uint64(data[offset:])
offset += 8

var availableStorage uint64
// Backwards compatibility
if offset+8 <= len(data) {
availableStorage = binary.BigEndian.Uint64(data[offset:])
offset += 8
} else {
availableStorage = totalStorage
}

if offset+1 > len(data) {
return nil, errors.New("truncated automatic flag")
}
Expand Down Expand Up @@ -312,6 +324,7 @@ func decodeWorkerInfo(data []byte) (*store.WorkerInfo, error) {
StreamListenMultiaddr: streamListenMultiaddr,
Filter: filter,
TotalStorage: uint(totalStorage),
AvailableStorage: uint(availableStorage),
Automatic: automatic,
Allocated: allocated,
}, nil
Expand Down
29 changes: 20 additions & 9 deletions node/worker/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ type WorkerManager struct {
// IPC service clients
serviceClients map[uint]*grpc.ClientConn

// Reconciler configuration (partition estimate)
reconcilerInterval time.Duration
reconcilerBufferBytes uint64
reconcilerBufferPercent float64
reconcilerStatPath string

// In-memory cache for quick lookups
workersByFilter map[string]uint // filter hash -> worker id
filtersByWorker map[uint][]byte // worker id -> filter
Expand All @@ -75,15 +81,16 @@ func NewWorkerManager(
) error,
) typesWorker.WorkerManager {
return &WorkerManager{
store: store,
logger: logger.Named("worker_manager"),
workersByFilter: make(map[string]uint),
filtersByWorker: make(map[uint][]byte),
allocatedWorkers: make(map[uint]bool),
serviceClients: make(map[uint]*grpc.ClientConn),
config: config,
proposeFunc: proposeFunc,
decideFunc: decideFunc,
store: store,
logger: logger.Named("worker_manager"),
workersByFilter: make(map[string]uint),
filtersByWorker: make(map[uint][]byte),
allocatedWorkers: make(map[uint]bool),
serviceClients: make(map[uint]*grpc.ClientConn),
config: config,
proposeFunc: proposeFunc,
decideFunc: decideFunc,
reconcilerStatPath: config.DB.Path,
}
}

Expand All @@ -109,6 +116,8 @@ func (w *WorkerManager) Start(ctx context.Context) error {
return errors.Wrap(err, "start")
}

w.startPartitionReconciler()

w.logger.Info("worker manager started successfully")
return nil
}
Expand Down Expand Up @@ -140,6 +149,7 @@ func (w *WorkerManager) Stop() error {
activeWorkersGauge.Set(0)
allocatedWorkersGauge.Set(0)
totalStorageGauge.Set(0)
availableStorageGauge.Set(0)

w.logger.Info("worker manager stopped")
return nil
Expand Down Expand Up @@ -636,6 +646,7 @@ func (w *WorkerManager) getIPCOfWorker(coreId uint) (
StreamListenMultiaddr: addr.String(),
Filter: nil,
TotalStorage: 0,
AvailableStorage: 0,
Automatic: len(w.config.Engine.DataWorkerP2PMultiaddrs) == 0,
Allocated: false,
})
Expand Down
9 changes: 9 additions & 0 deletions node/worker/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,13 @@ var (
},
[]string{"operation"}, // operation: register/allocate/deallocate/lookup
)

availableStorageGauge = promauto.NewGauge(
prometheus.GaugeOpts{
Namespace: "quilibrium",
Subsystem: "worker_manager",
Name: "available_storage_bytes",
Help: "Aggregated available storage (bytes) across workers as estimated by the manager",
},
)
)
117 changes: 117 additions & 0 deletions node/worker/reconciler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package worker

import (
"syscall"
"time"

"github.com/pkg/errors"
"go.uber.org/zap"
)

func (w *WorkerManager) startPartitionReconciler() {
if w.reconcilerInterval == 0 {
w.reconcilerInterval = 30 * time.Second
}

if w.reconcilerStatPath == "" {
w.reconcilerStatPath = "."
}

go func() {
ticker := time.NewTicker(w.reconcilerInterval)
defer ticker.Stop()
for {
select {
case <-w.ctx.Done():
return
case <-ticker.C:
if err := reconcilePartitionAndPersist(w); err != nil {
w.logger.Error("partition reconcile failed", zap.Error(err))
}
}
}
}()
}

func reconcilePartitionAndPersist(w *WorkerManager) error {
total, avail, err := getPartitionUsage(w.reconcilerStatPath)
if err != nil {
return errors.Wrap(err, "get partition usage")
}

buffer := w.reconcilerBufferBytes
if buffer == 0 && w.reconcilerBufferPercent > 0 {
buffer = uint64(float64(total) * w.reconcilerBufferPercent)
}

if total == 0 {
return errors.New("partition total size is zero")
}

usable := uint64(0)
if avail > buffer {
usable = avail - buffer
} else {
usable = 0
}

workers, err := w.store.RangeWorkers()
if err != nil {
return errors.Wrap(err, "range workers")
}
if len(workers) == 0 {
return nil
}

perWorker := uint64(0)
if len(workers) > 0 {
perWorker = usable / uint64(len(workers))
}

txn, err := w.store.NewTransaction(false)
if err != nil {
return errors.Wrap(err, "new transaction for reconcile")
}

var aggAvailable uint64
for _, worker := range workers {
if worker == nil {
continue
}
if perWorker == 0 || worker.TotalStorage == 0 {
worker.AvailableStorage = 0
} else {
if perWorker > uint64(worker.TotalStorage) {
worker.AvailableStorage = worker.TotalStorage
} else {
worker.AvailableStorage = uint(perWorker)
}
}

if err := w.store.PutWorker(txn, worker); err != nil {
txn.Abort()
return errors.Wrap(err, "put worker during reconcile")
}

aggAvailable += uint64(worker.AvailableStorage)
}

if err := txn.Commit(); err != nil {
txn.Abort()
return errors.Wrap(err, "commit reconcile txn")
}

availableStorageGauge.Set(float64(aggAvailable))
return nil
}

func getPartitionUsage(path string) (uint64, uint64, error) {
var stat syscall.Statfs_t
if err := syscall.Statfs(path, &stat); err != nil {
return 0, 0, errors.Wrapf(err, "statfs %s", path)
}

total := uint64(stat.Blocks) * uint64(stat.Bsize)
avail := uint64(stat.Bavail) * uint64(stat.Bsize)
return total, avail, nil
}
1 change: 1 addition & 0 deletions types/store/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ type WorkerInfo struct {
StreamListenMultiaddr string
Filter []byte
TotalStorage uint
AvailableStorage uint
Automatic bool
Allocated bool
}
Expand Down