Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
271 changes: 268 additions & 3 deletions cmd/m3fs/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package main

import (
"bufio"
"encoding/json"
"fmt"
"os"
Expand All @@ -41,6 +42,7 @@ import (
"github.com/open3fs/m3fs/pkg/pg/model"
"github.com/open3fs/m3fs/pkg/storage"
"github.com/open3fs/m3fs/pkg/task"
"github.com/open3fs/m3fs/pkg/utils"
)

var clusterCmd = &cli.Command{
Expand Down Expand Up @@ -121,6 +123,46 @@ var clusterCmd = &cli.Command{
},
},
},
{
Name: "add-client",
Usage: "Add 3fs cluster clients",
Action: addClient,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "config",
Aliases: []string{"c"},
Usage: "Path to the cluster configuration file",
Destination: &configFilePath,
Required: true,
},
&cli.StringFlag{
Name: "workdir",
Aliases: []string{"w"},
Usage: "Path to the working directory (default is current directory)",
Destination: &workDir,
},
},
},
{
Name: "delete-client",
Usage: "Delete 3fs cluster clients",
Action: deleteClient,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "config",
Aliases: []string{"c"},
Usage: "Path to the cluster configuration file",
Destination: &configFilePath,
Required: true,
},
&cli.StringFlag{
Name: "workdir",
Aliases: []string{"w"},
Usage: "Path to the working directory (default is current directory)",
Destination: &workDir,
},
},
},
{
Name: "prepare",
Usage: "Prepare to deploy a 3fs cluster",
Expand Down Expand Up @@ -201,7 +243,9 @@ func createCluster(ctx *cli.Context) error {
BeginNodeID: 10001,
},
new(mgmtd.InitUserAndChainTask),
new(fsclient.Create3FSClientServiceTask),
&fsclient.Create3FSClientServiceTask{
ClientNodes: cfg.Services.Client.Nodes,
},
)
if err != nil {
return errors.Trace(err)
Expand All @@ -223,7 +267,9 @@ func deleteCluster(ctx *cli.Context) error {
}

runnerTasks := []task.Interface{
new(fsclient.Delete3FSClientServiceTask),
&fsclient.Delete3FSClientServiceTask{
ClientNodes: cfg.Services.Client.Nodes,
},
new(storage.DeleteStorageServiceTask),
new(meta.DeleteMetaServiceTask),
new(mgmtd.DeleteMgmtdServiceTask),
Expand Down Expand Up @@ -350,6 +396,56 @@ func syncNodeModels(cfg *config.Config, db *gorm.DB) error {
return nil
}

func syncClientModels(cfg *config.Config, db *gorm.DB) error {
var nodes []model.Node
err := db.Model(new(model.Node)).Find(&nodes).Error
if err != nil {
return errors.Trace(err)
}
nodesMap := make(map[string]model.Node, len(nodes))
for _, node := range nodes {
nodesMap[node.Name] = node
}

var clients []model.FuseClient
err = db.Model(new(model.FuseClient)).Find(&clients).Error
if err != nil {
return errors.Trace(err)
}
clientsMap := make(map[uint]model.FuseClient, len(clients))
for _, client := range clients {
clientsMap[client.NodeID] = client
}

clientNodes := cfg.Services.Client.Nodes
err = db.Transaction(func(tx *gorm.DB) error {
for _, clientNode := range clientNodes {
node, ok := nodesMap[clientNode]
if !ok {
return errors.Errorf("client node %s not found", clientNode)
}
_, ok = clientsMap[node.ID]
if ok {
continue
}
client := model.FuseClient{
Name: cfg.Services.Client.ContainerName,
NodeID: node.ID,
HostMountpoint: cfg.Services.Client.HostMountpoint,
}
if err = tx.Model(&client).Create(&client).Error; err != nil {
return errors.Trace(err)
}
}
return nil
})
if err != nil {
return errors.Trace(err)
}

return nil
}

func addStorageNodes(ctx *cli.Context) error {
cfg, err := loadClusterConfig()
if err != nil {
Expand Down Expand Up @@ -402,6 +498,17 @@ func addStorageNodes(ctx *cli.Context) error {
if len(newStorNodes) == 0 && changePlan == nil {
return errors.New("No new storage nodes to add")
}

newStorNodeNames := make([]string, len(newStorNodes))
for i, node := range newStorNodes {
newStorNodeNames[i] = node.Name
}
log.Logger.Infof("New storage nodes to add: %v", newStorNodeNames)
if !waitUserConfirm() {
log.Logger.Infof("Operation cancelled by user")
return nil
}

storCreated := false
for _, step := range steps {
if step.OperationType == model.ChangePlanStepOpType.CreateStorService && step.FinishAt != nil {
Expand Down Expand Up @@ -448,10 +555,168 @@ func addStorageNodes(ctx *cli.Context) error {
runner.Runtime.Store(task.RuntimeNodesMapKey, nodesMap)

if err = runner.Run(ctx.Context); err != nil {
return errors.Annotate(err, "create cluster")
return errors.Annotate(err, "add storage nodes")
}

log.Logger.Infof("Add storage nodes success")

return nil
}

func addClient(ctx *cli.Context) error {
return syncClient(ctx, true, false)
}

func deleteClient(ctx *cli.Context) error {
return syncClient(ctx, false, true)
}

func syncClient(ctx *cli.Context, allowAdd, allowDelete bool) error {
cfg, err := loadClusterConfig()
if err != nil {
return errors.Trace(err)
}

db, err := setupDB(cfg)
if err != nil {
return errors.Trace(err)
}

currentClientNodes := []string{}
err = db.Model(&model.FuseClient{}).
Joins("INNER JOIN nodes ON fuse_clients.node_id = nodes.id").
Pluck("nodes.name", &currentClientNodes).Error
if err != nil {
return errors.Trace(err)
}

desiredClientNodes := cfg.Services.Client.Nodes
currentSet := utils.NewSet(currentClientNodes...)
desiredSet := utils.NewSet(desiredClientNodes...)

nodesToAdd := []string{}
for _, node := range desiredClientNodes {
if !currentSet.Contains(node) {
nodesToAdd = append(nodesToAdd, node)
}
}

nodesToDelete := []string{}
for _, node := range currentClientNodes {
if !desiredSet.Contains(node) {
nodesToDelete = append(nodesToDelete, node)
}
}

hasNodesToAdd := len(nodesToAdd) > 0 && allowAdd
hasNodesToDelete := len(nodesToDelete) > 0 && allowDelete
if !hasNodesToAdd && !hasNodesToDelete {
log.Logger.Infof("No client nodes need to add or delete")
return nil
}

log.Logger.Infof("Current client nodes: %v", currentClientNodes)
if hasNodesToAdd {
log.Logger.Infof("Client nodes to add: %v", nodesToAdd)
}
if hasNodesToDelete {
log.Logger.Infof("Client nodes to delete: %v", nodesToDelete)
}

if !waitUserConfirm() {
log.Logger.Infof("Operation cancelled by user")
return nil
}

if err = syncNodeModels(cfg, db); err != nil {
return errors.Trace(err)
}

nodes := []*model.Node{}
err = db.Model(new(model.Node)).Find(&nodes).Error
if err != nil {
return errors.Trace(err)
}
nodesMap := make(map[string]*model.Node, len(nodes))
for _, node := range nodes {
nodesMap[node.Name] = node
}

if hasNodesToDelete {
log.Logger.Infof("Deleting client nodes: %v", nodesToDelete)

tempCfg := *cfg
tempCfg.Services.Client.Nodes = nodesToDelete

runner, err := task.NewRunner(&tempCfg,
&fsclient.Delete3FSClientServiceTask{
ClientNodes: nodesToDelete,
},
)
if err != nil {
return errors.Trace(err)
}
runner.Init()

if err = runner.Run(ctx.Context); err != nil {
return errors.Annotate(err, "delete client")
}

err = db.Transaction(func(tx *gorm.DB) error {
for _, nodeName := range nodesToDelete {
var node model.Node
if err := tx.Where("name = ?", nodeName).First(&node).Error; err != nil {
return errors.Trace(err)
}
if err := tx.Delete(&model.FuseClient{}, "node_id = ?", node.ID).Error; err != nil {
return errors.Trace(err)
}
}
return nil
})
if err != nil {
return errors.Annotate(err, "delete client records from database")
}

log.Logger.Infof("Delete client success")
}

if hasNodesToAdd {
log.Logger.Infof("Adding client nodes: %v", nodesToAdd)

runner, err := task.NewRunner(cfg,
&fsclient.Create3FSClientServiceTask{
ClientNodes: nodesToAdd,
DeleteContainerIfExists: true,
},
)
if err != nil {
return errors.Trace(err)
}
runner.Init()
runner.Runtime.Store(task.RuntimeDbKey, db)
runner.Runtime.Store(task.RuntimeNodesMapKey, nodesMap)

if err = runner.Run(ctx.Context); err != nil {
return errors.Annotate(err, "add client")
}

log.Logger.Infof("Add client success")
}

if err = syncClientModels(cfg, db); err != nil {
return errors.Trace(err)
}

return nil
}

func waitUserConfirm() bool {
fmt.Print("Do you want to continue with the operation? (Y/N): ")
reader := bufio.NewReader(os.Stdin)
input, err := reader.ReadString('\n')
if err != nil {
return false
}
return strings.ToUpper(strings.TrimSpace(input)) == "Y"
}
Loading