Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 157 additions & 34 deletions cmd/m3fs/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package main

import (
"bufio"
"encoding/json"
"fmt"
"os"
Expand Down Expand Up @@ -142,6 +143,26 @@ var clusterCmd = &cli.Command{
},
},
},
{
Name: "delete-client",
Usage: "Delete 3fs cluster clients",
Action: deleteClient,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "config",
Aliases: []string{"c"},
Usage: "Path to the cluster configuration file",
Destination: &configFilePath,
Required: true,
},
&cli.StringFlag{
Name: "workdir",
Aliases: []string{"w"},
Usage: "Path to the working directory (default is current directory)",
Destination: &workDir,
},
},
},
{
Name: "prepare",
Usage: "Prepare to deploy a 3fs cluster",
Expand Down Expand Up @@ -246,7 +267,9 @@ func deleteCluster(ctx *cli.Context) error {
}

runnerTasks := []task.Interface{
new(fsclient.Delete3FSClientServiceTask),
&fsclient.Delete3FSClientServiceTask{
ClientNodes: cfg.Services.Client.Nodes,
},
new(storage.DeleteStorageServiceTask),
new(meta.DeleteMetaServiceTask),
new(mgmtd.DeleteMgmtdServiceTask),
Expand Down Expand Up @@ -475,6 +498,17 @@ func addStorageNodes(ctx *cli.Context) error {
if len(newStorNodes) == 0 && changePlan == nil {
return errors.New("No new storage nodes to add")
}

newStorNodeNames := make([]string, len(newStorNodes))
for i, node := range newStorNodes {
newStorNodeNames[i] = node.Name
}
log.Logger.Infof("New storage nodes to add: %v", newStorNodeNames)
if !waitUserConfirm() {
log.Logger.Infof("Operation cancelled by user")
return nil
}

storCreated := false
for _, step := range steps {
if step.OperationType == model.ChangePlanStepOpType.CreateStorService && step.FinishAt != nil {
Expand Down Expand Up @@ -530,6 +564,14 @@ func addStorageNodes(ctx *cli.Context) error {
}

func addClient(ctx *cli.Context) error {
return syncClient(ctx, true, false)
}

func deleteClient(ctx *cli.Context) error {
return syncClient(ctx, false, true)
}

func syncClient(ctx *cli.Context, allowAdd, allowDelete bool) error {
cfg, err := loadClusterConfig()
if err != nil {
return errors.Trace(err)
Expand All @@ -540,60 +582,141 @@ func addClient(ctx *cli.Context) error {
return errors.Trace(err)
}

nodes := []*model.Node{}
err = db.Model(new(model.Node)).Find(&nodes).Error
currentClientNodes := []string{}
err = db.Model(&model.FuseClient{}).
Joins("INNER JOIN nodes ON fuse_clients.node_id = nodes.id").
Pluck("nodes.name", &currentClientNodes).Error
if err != nil {
return errors.Trace(err)
}
nodesMap := make(map[string]*model.Node, len(nodes))
for _, node := range nodes {
nodesMap[node.Name] = node
}

clientNodes := []string{}
err = db.Raw(
"SELECT nodes.name FROM fuse_clients INNER JOIN nodes ON fuse_clients.node_id = nodes.id",
).Scan(&clientNodes).Error
if err != nil {
return errors.Trace(err)
desiredClientNodes := cfg.Services.Client.Nodes
currentSet := utils.NewSet(currentClientNodes...)
desiredSet := utils.NewSet(desiredClientNodes...)

nodesToAdd := []string{}
for _, node := range desiredClientNodes {
if !currentSet.Contains(node) {
nodesToAdd = append(nodesToAdd, node)
}
}
clientNodeSet := utils.NewSet(clientNodes...)
newClientNodes := []string{}
for _, clientNode := range cfg.Services.Client.Nodes {
if !clientNodeSet.Contains(clientNode) {
newClientNodes = append(newClientNodes, clientNode)

nodesToDelete := []string{}
for _, node := range currentClientNodes {
if !desiredSet.Contains(node) {
nodesToDelete = append(nodesToDelete, node)
}
}

hasNodesToAdd := len(nodesToAdd) > 0 && allowAdd
hasNodesToDelete := len(nodesToDelete) > 0 && allowDelete
if !hasNodesToAdd && !hasNodesToDelete {
log.Logger.Infof("No client nodes need to add or delete")
return nil
}

log.Logger.Infof("Current client nodes: %v", currentClientNodes)
if hasNodesToAdd {
log.Logger.Infof("Client nodes to add: %v", nodesToAdd)
}
if hasNodesToDelete {
log.Logger.Infof("Client nodes to delete: %v", nodesToDelete)
}

if !waitUserConfirm() {
log.Logger.Infof("Operation cancelled by user")
return nil
}

if err = syncNodeModels(cfg, db); err != nil {
return errors.Trace(err)
}
if len(newClientNodes) == 0 {
return errors.New("No new client nodes to add")
}

runner, err := task.NewRunner(cfg,
&fsclient.Create3FSClientServiceTask{
ClientNodes: newClientNodes,
DeleteContainerIfExists: true,
},
)
nodes := []*model.Node{}
err = db.Model(new(model.Node)).Find(&nodes).Error
if err != nil {
return errors.Trace(err)
}
runner.Init()
runner.Runtime.Store(task.RuntimeDbKey, db)
runner.Runtime.Store(task.RuntimeNodesMapKey, nodesMap)
nodesMap := make(map[string]*model.Node, len(nodes))
for _, node := range nodes {
nodesMap[node.Name] = node
}

if err = runner.Run(ctx.Context); err != nil {
return errors.Annotate(err, "add client")
if hasNodesToDelete {
log.Logger.Infof("Deleting client nodes: %v", nodesToDelete)

tempCfg := *cfg
tempCfg.Services.Client.Nodes = nodesToDelete

runner, err := task.NewRunner(&tempCfg,
&fsclient.Delete3FSClientServiceTask{
ClientNodes: nodesToDelete,
},
)
if err != nil {
return errors.Trace(err)
}
runner.Init()

if err = runner.Run(ctx.Context); err != nil {
return errors.Annotate(err, "delete client")
}

err = db.Transaction(func(tx *gorm.DB) error {
for _, nodeName := range nodesToDelete {
var node model.Node
if err := tx.Where("name = ?", nodeName).First(&node).Error; err != nil {
return errors.Trace(err)
}
if err := tx.Delete(&model.FuseClient{}, "node_id = ?", node.ID).Error; err != nil {
return errors.Trace(err)
}
}
return nil
})
if err != nil {
return errors.Annotate(err, "delete client records from database")
}

log.Logger.Infof("Delete client success")
}

if hasNodesToAdd {
log.Logger.Infof("Adding client nodes: %v", nodesToAdd)

runner, err := task.NewRunner(cfg,
&fsclient.Create3FSClientServiceTask{
ClientNodes: nodesToAdd,
DeleteContainerIfExists: true,
},
)
if err != nil {
return errors.Trace(err)
}
runner.Init()
runner.Runtime.Store(task.RuntimeDbKey, db)
runner.Runtime.Store(task.RuntimeNodesMapKey, nodesMap)

if err = runner.Run(ctx.Context); err != nil {
return errors.Annotate(err, "add client")
}

log.Logger.Infof("Add client success")
}

if err = syncClientModels(cfg, db); err != nil {
return errors.Trace(err)
}

log.Logger.Infof("Add client success")

return nil
}

func waitUserConfirm() bool {
fmt.Print("Do you want to continue with the operation? (Y/N): ")
reader := bufio.NewReader(os.Stdin)
input, err := reader.ReadString('\n')
if err != nil {
return false
}
return strings.ToUpper(strings.TrimSpace(input)) == "Y"
}
10 changes: 6 additions & 4 deletions pkg/3fs_client/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,15 +151,17 @@ func (t *Create3FSClientServiceTask) Init(r *task.Runtime, logger log.Interface)
// Delete3FSClientServiceTask is a task for deleting a 3fs client services.
type Delete3FSClientServiceTask struct {
task.BaseTask

// ClientNodes is the nodes name of new client nodes
ClientNodes []string
}

// Init initializes the task.
func (t *Delete3FSClientServiceTask) Init(r *task.Runtime, logger log.Interface) {
t.BaseTask.SetName("Delete3FSClientServiceTask")
t.BaseTask.Init(r, logger)
client := r.Services.Client
nodes := make([]config.Node, len(client.Nodes))
for i, node := range client.Nodes {
nodes := make([]config.Node, len(t.ClientNodes))
for i, node := range t.ClientNodes {
nodes[i] = r.Nodes[node]
}
workDir := getServiceWorkDir(r.WorkDir)
Expand All @@ -168,7 +170,7 @@ func (t *Delete3FSClientServiceTask) Init(r *task.Runtime, logger log.Interface)
Nodes: nodes,
Parallel: true,
NewStep: steps.NewRm3FSContainerStepFunc(
client.ContainerName,
r.Services.Client.ContainerName,
ServiceName,
workDir),
},
Expand Down