diff --git a/cmd/m3fs/cluster.go b/cmd/m3fs/cluster.go index ac25075..7211baf 100644 --- a/cmd/m3fs/cluster.go +++ b/cmd/m3fs/cluster.go @@ -15,6 +15,7 @@ package main import ( + "bufio" "encoding/json" "fmt" "os" @@ -142,6 +143,26 @@ var clusterCmd = &cli.Command{ }, }, }, + { + Name: "delete-client", + Usage: "Delete 3fs cluster clients", + Action: deleteClient, + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "config", + Aliases: []string{"c"}, + Usage: "Path to the cluster configuration file", + Destination: &configFilePath, + Required: true, + }, + &cli.StringFlag{ + Name: "workdir", + Aliases: []string{"w"}, + Usage: "Path to the working directory (default is current directory)", + Destination: &workDir, + }, + }, + }, { Name: "prepare", Usage: "Prepare to deploy a 3fs cluster", @@ -246,7 +267,9 @@ func deleteCluster(ctx *cli.Context) error { } runnerTasks := []task.Interface{ - new(fsclient.Delete3FSClientServiceTask), + &fsclient.Delete3FSClientServiceTask{ + ClientNodes: cfg.Services.Client.Nodes, + }, new(storage.DeleteStorageServiceTask), new(meta.DeleteMetaServiceTask), new(mgmtd.DeleteMgmtdServiceTask), @@ -475,6 +498,17 @@ func addStorageNodes(ctx *cli.Context) error { if len(newStorNodes) == 0 && changePlan == nil { return errors.New("No new storage nodes to add") } + + newStorNodeNames := make([]string, len(newStorNodes)) + for i, node := range newStorNodes { + newStorNodeNames[i] = node.Name + } + log.Logger.Infof("New storage nodes to add: %v", newStorNodeNames) + if !waitUserConfirm() { + log.Logger.Infof("Operation cancelled by user") + return nil + } + storCreated := false for _, step := range steps { if step.OperationType == model.ChangePlanStepOpType.CreateStorService && step.FinishAt != nil { @@ -530,6 +564,14 @@ func addStorageNodes(ctx *cli.Context) error { } func addClient(ctx *cli.Context) error { + return syncClient(ctx, true, false) +} + +func deleteClient(ctx *cli.Context) error { + return syncClient(ctx, false, true) +} + +func syncClient(ctx *cli.Context, allowAdd, allowDelete bool) error { cfg, err := loadClusterConfig() if err != nil { return errors.Trace(err) @@ -540,60 +582,141 @@ func addClient(ctx *cli.Context) error { return errors.Trace(err) } - nodes := []*model.Node{} - err = db.Model(new(model.Node)).Find(&nodes).Error + currentClientNodes := []string{} + err = db.Model(&model.FuseClient{}). + Joins("INNER JOIN nodes ON fuse_clients.node_id = nodes.id"). + Pluck("nodes.name", ¤tClientNodes).Error if err != nil { return errors.Trace(err) } - nodesMap := make(map[string]*model.Node, len(nodes)) - for _, node := range nodes { - nodesMap[node.Name] = node - } - clientNodes := []string{} - err = db.Raw( - "SELECT nodes.name FROM fuse_clients INNER JOIN nodes ON fuse_clients.node_id = nodes.id", - ).Scan(&clientNodes).Error - if err != nil { - return errors.Trace(err) + desiredClientNodes := cfg.Services.Client.Nodes + currentSet := utils.NewSet(currentClientNodes...) + desiredSet := utils.NewSet(desiredClientNodes...) + + nodesToAdd := []string{} + for _, node := range desiredClientNodes { + if !currentSet.Contains(node) { + nodesToAdd = append(nodesToAdd, node) + } } - clientNodeSet := utils.NewSet(clientNodes...) - newClientNodes := []string{} - for _, clientNode := range cfg.Services.Client.Nodes { - if !clientNodeSet.Contains(clientNode) { - newClientNodes = append(newClientNodes, clientNode) + + nodesToDelete := []string{} + for _, node := range currentClientNodes { + if !desiredSet.Contains(node) { + nodesToDelete = append(nodesToDelete, node) } } + hasNodesToAdd := len(nodesToAdd) > 0 && allowAdd + hasNodesToDelete := len(nodesToDelete) > 0 && allowDelete + if !hasNodesToAdd && !hasNodesToDelete { + log.Logger.Infof("No client nodes need to add or delete") + return nil + } + + log.Logger.Infof("Current client nodes: %v", currentClientNodes) + if hasNodesToAdd { + log.Logger.Infof("Client nodes to add: %v", nodesToAdd) + } + if hasNodesToDelete { + log.Logger.Infof("Client nodes to delete: %v", nodesToDelete) + } + + if !waitUserConfirm() { + log.Logger.Infof("Operation cancelled by user") + return nil + } + if err = syncNodeModels(cfg, db); err != nil { return errors.Trace(err) } - if len(newClientNodes) == 0 { - return errors.New("No new client nodes to add") - } - runner, err := task.NewRunner(cfg, - &fsclient.Create3FSClientServiceTask{ - ClientNodes: newClientNodes, - DeleteContainerIfExists: true, - }, - ) + nodes := []*model.Node{} + err = db.Model(new(model.Node)).Find(&nodes).Error if err != nil { return errors.Trace(err) } - runner.Init() - runner.Runtime.Store(task.RuntimeDbKey, db) - runner.Runtime.Store(task.RuntimeNodesMapKey, nodesMap) + nodesMap := make(map[string]*model.Node, len(nodes)) + for _, node := range nodes { + nodesMap[node.Name] = node + } - if err = runner.Run(ctx.Context); err != nil { - return errors.Annotate(err, "add client") + if hasNodesToDelete { + log.Logger.Infof("Deleting client nodes: %v", nodesToDelete) + + tempCfg := *cfg + tempCfg.Services.Client.Nodes = nodesToDelete + + runner, err := task.NewRunner(&tempCfg, + &fsclient.Delete3FSClientServiceTask{ + ClientNodes: nodesToDelete, + }, + ) + if err != nil { + return errors.Trace(err) + } + runner.Init() + + if err = runner.Run(ctx.Context); err != nil { + return errors.Annotate(err, "delete client") + } + + err = db.Transaction(func(tx *gorm.DB) error { + for _, nodeName := range nodesToDelete { + var node model.Node + if err := tx.Where("name = ?", nodeName).First(&node).Error; err != nil { + return errors.Trace(err) + } + if err := tx.Delete(&model.FuseClient{}, "node_id = ?", node.ID).Error; err != nil { + return errors.Trace(err) + } + } + return nil + }) + if err != nil { + return errors.Annotate(err, "delete client records from database") + } + + log.Logger.Infof("Delete client success") + } + + if hasNodesToAdd { + log.Logger.Infof("Adding client nodes: %v", nodesToAdd) + + runner, err := task.NewRunner(cfg, + &fsclient.Create3FSClientServiceTask{ + ClientNodes: nodesToAdd, + DeleteContainerIfExists: true, + }, + ) + if err != nil { + return errors.Trace(err) + } + runner.Init() + runner.Runtime.Store(task.RuntimeDbKey, db) + runner.Runtime.Store(task.RuntimeNodesMapKey, nodesMap) + + if err = runner.Run(ctx.Context); err != nil { + return errors.Annotate(err, "add client") + } + + log.Logger.Infof("Add client success") } if err = syncClientModels(cfg, db); err != nil { return errors.Trace(err) } - log.Logger.Infof("Add client success") - return nil } + +func waitUserConfirm() bool { + fmt.Print("Do you want to continue with the operation? (Y/N): ") + reader := bufio.NewReader(os.Stdin) + input, err := reader.ReadString('\n') + if err != nil { + return false + } + return strings.ToUpper(strings.TrimSpace(input)) == "Y" +} diff --git a/pkg/3fs_client/tasks.go b/pkg/3fs_client/tasks.go index fb2dc2b..2d252c4 100644 --- a/pkg/3fs_client/tasks.go +++ b/pkg/3fs_client/tasks.go @@ -151,15 +151,17 @@ func (t *Create3FSClientServiceTask) Init(r *task.Runtime, logger log.Interface) // Delete3FSClientServiceTask is a task for deleting a 3fs client services. type Delete3FSClientServiceTask struct { task.BaseTask + + // ClientNodes is the nodes name of new client nodes + ClientNodes []string } // Init initializes the task. func (t *Delete3FSClientServiceTask) Init(r *task.Runtime, logger log.Interface) { t.BaseTask.SetName("Delete3FSClientServiceTask") t.BaseTask.Init(r, logger) - client := r.Services.Client - nodes := make([]config.Node, len(client.Nodes)) - for i, node := range client.Nodes { + nodes := make([]config.Node, len(t.ClientNodes)) + for i, node := range t.ClientNodes { nodes[i] = r.Nodes[node] } workDir := getServiceWorkDir(r.WorkDir) @@ -168,7 +170,7 @@ func (t *Delete3FSClientServiceTask) Init(r *task.Runtime, logger log.Interface) Nodes: nodes, Parallel: true, NewStep: steps.NewRm3FSContainerStepFunc( - client.ContainerName, + r.Services.Client.ContainerName, ServiceName, workDir), },