diff --git a/cmd/m3fs/cluster.go b/cmd/m3fs/cluster.go index c062a27..7211baf 100644 --- a/cmd/m3fs/cluster.go +++ b/cmd/m3fs/cluster.go @@ -15,6 +15,7 @@ package main import ( + "bufio" "encoding/json" "fmt" "os" @@ -41,6 +42,7 @@ import ( "github.com/open3fs/m3fs/pkg/pg/model" "github.com/open3fs/m3fs/pkg/storage" "github.com/open3fs/m3fs/pkg/task" + "github.com/open3fs/m3fs/pkg/utils" ) var clusterCmd = &cli.Command{ @@ -121,6 +123,46 @@ var clusterCmd = &cli.Command{ }, }, }, + { + Name: "add-client", + Usage: "Add 3fs cluster clients", + Action: addClient, + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "config", + Aliases: []string{"c"}, + Usage: "Path to the cluster configuration file", + Destination: &configFilePath, + Required: true, + }, + &cli.StringFlag{ + Name: "workdir", + Aliases: []string{"w"}, + Usage: "Path to the working directory (default is current directory)", + Destination: &workDir, + }, + }, + }, + { + Name: "delete-client", + Usage: "Delete 3fs cluster clients", + Action: deleteClient, + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "config", + Aliases: []string{"c"}, + Usage: "Path to the cluster configuration file", + Destination: &configFilePath, + Required: true, + }, + &cli.StringFlag{ + Name: "workdir", + Aliases: []string{"w"}, + Usage: "Path to the working directory (default is current directory)", + Destination: &workDir, + }, + }, + }, { Name: "prepare", Usage: "Prepare to deploy a 3fs cluster", @@ -201,7 +243,9 @@ func createCluster(ctx *cli.Context) error { BeginNodeID: 10001, }, new(mgmtd.InitUserAndChainTask), - new(fsclient.Create3FSClientServiceTask), + &fsclient.Create3FSClientServiceTask{ + ClientNodes: cfg.Services.Client.Nodes, + }, ) if err != nil { return errors.Trace(err) @@ -223,7 +267,9 @@ func deleteCluster(ctx *cli.Context) error { } runnerTasks := []task.Interface{ - new(fsclient.Delete3FSClientServiceTask), + &fsclient.Delete3FSClientServiceTask{ + ClientNodes: cfg.Services.Client.Nodes, + }, 
new(storage.DeleteStorageServiceTask), new(meta.DeleteMetaServiceTask), new(mgmtd.DeleteMgmtdServiceTask), @@ -350,6 +396,56 @@ func syncNodeModels(cfg *config.Config, db *gorm.DB) error { return nil } +func syncClientModels(cfg *config.Config, db *gorm.DB) error { + var nodes []model.Node + err := db.Model(new(model.Node)).Find(&nodes).Error + if err != nil { + return errors.Trace(err) + } + nodesMap := make(map[string]model.Node, len(nodes)) + for _, node := range nodes { + nodesMap[node.Name] = node + } + + var clients []model.FuseClient + err = db.Model(new(model.FuseClient)).Find(&clients).Error + if err != nil { + return errors.Trace(err) + } + clientsMap := make(map[uint]model.FuseClient, len(clients)) + for _, client := range clients { + clientsMap[client.NodeID] = client + } + + clientNodes := cfg.Services.Client.Nodes + err = db.Transaction(func(tx *gorm.DB) error { + for _, clientNode := range clientNodes { + node, ok := nodesMap[clientNode] + if !ok { + return errors.Errorf("client node %s not found", clientNode) + } + _, ok = clientsMap[node.ID] + if ok { + continue + } + client := model.FuseClient{ + Name: cfg.Services.Client.ContainerName, + NodeID: node.ID, + HostMountpoint: cfg.Services.Client.HostMountpoint, + } + if err = tx.Model(&client).Create(&client).Error; err != nil { + return errors.Trace(err) + } + } + return nil + }) + if err != nil { + return errors.Trace(err) + } + + return nil +} + func addStorageNodes(ctx *cli.Context) error { cfg, err := loadClusterConfig() if err != nil { @@ -402,6 +498,17 @@ func addStorageNodes(ctx *cli.Context) error { if len(newStorNodes) == 0 && changePlan == nil { return errors.New("No new storage nodes to add") } + + newStorNodeNames := make([]string, len(newStorNodes)) + for i, node := range newStorNodes { + newStorNodeNames[i] = node.Name + } + log.Logger.Infof("New storage nodes to add: %v", newStorNodeNames) + if !waitUserConfirm() { + log.Logger.Infof("Operation cancelled by user") + return nil + } + 
storCreated := false for _, step := range steps { if step.OperationType == model.ChangePlanStepOpType.CreateStorService && step.FinishAt != nil { @@ -448,10 +555,168 @@ func addStorageNodes(ctx *cli.Context) error { runner.Runtime.Store(task.RuntimeNodesMapKey, nodesMap) if err = runner.Run(ctx.Context); err != nil { - return errors.Annotate(err, "create cluster") + return errors.Annotate(err, "add storage nodes") } log.Logger.Infof("Add storage nodes success") return nil } + +func addClient(ctx *cli.Context) error { + return syncClient(ctx, true, false) +} + +func deleteClient(ctx *cli.Context) error { + return syncClient(ctx, false, true) +} + +func syncClient(ctx *cli.Context, allowAdd, allowDelete bool) error { + cfg, err := loadClusterConfig() + if err != nil { + return errors.Trace(err) + } + + db, err := setupDB(cfg) + if err != nil { + return errors.Trace(err) + } + + currentClientNodes := []string{} + err = db.Model(&model.FuseClient{}). + Joins("INNER JOIN nodes ON fuse_clients.node_id = nodes.id"). + Pluck("nodes.name", ¤tClientNodes).Error + if err != nil { + return errors.Trace(err) + } + + desiredClientNodes := cfg.Services.Client.Nodes + currentSet := utils.NewSet(currentClientNodes...) + desiredSet := utils.NewSet(desiredClientNodes...) 
+ + nodesToAdd := []string{} + for _, node := range desiredClientNodes { + if !currentSet.Contains(node) { + nodesToAdd = append(nodesToAdd, node) + } + } + + nodesToDelete := []string{} + for _, node := range currentClientNodes { + if !desiredSet.Contains(node) { + nodesToDelete = append(nodesToDelete, node) + } + } + + hasNodesToAdd := len(nodesToAdd) > 0 && allowAdd + hasNodesToDelete := len(nodesToDelete) > 0 && allowDelete + if !hasNodesToAdd && !hasNodesToDelete { + log.Logger.Infof("No client nodes need to add or delete") + return nil + } + + log.Logger.Infof("Current client nodes: %v", currentClientNodes) + if hasNodesToAdd { + log.Logger.Infof("Client nodes to add: %v", nodesToAdd) + } + if hasNodesToDelete { + log.Logger.Infof("Client nodes to delete: %v", nodesToDelete) + } + + if !waitUserConfirm() { + log.Logger.Infof("Operation cancelled by user") + return nil + } + + if err = syncNodeModels(cfg, db); err != nil { + return errors.Trace(err) + } + + nodes := []*model.Node{} + err = db.Model(new(model.Node)).Find(&nodes).Error + if err != nil { + return errors.Trace(err) + } + nodesMap := make(map[string]*model.Node, len(nodes)) + for _, node := range nodes { + nodesMap[node.Name] = node + } + + if hasNodesToDelete { + log.Logger.Infof("Deleting client nodes: %v", nodesToDelete) + + tempCfg := *cfg + tempCfg.Services.Client.Nodes = nodesToDelete + + runner, err := task.NewRunner(&tempCfg, + &fsclient.Delete3FSClientServiceTask{ + ClientNodes: nodesToDelete, + }, + ) + if err != nil { + return errors.Trace(err) + } + runner.Init() + + if err = runner.Run(ctx.Context); err != nil { + return errors.Annotate(err, "delete client") + } + + err = db.Transaction(func(tx *gorm.DB) error { + for _, nodeName := range nodesToDelete { + var node model.Node + if err := tx.Where("name = ?", nodeName).First(&node).Error; err != nil { + return errors.Trace(err) + } + if err := tx.Delete(&model.FuseClient{}, "node_id = ?", node.ID).Error; err != nil { + return 
errors.Trace(err) + } + } + return nil + }) + if err != nil { + return errors.Annotate(err, "delete client records from database") + } + + log.Logger.Infof("Delete client success") + } + + if hasNodesToAdd { + log.Logger.Infof("Adding client nodes: %v", nodesToAdd) + + runner, err := task.NewRunner(cfg, + &fsclient.Create3FSClientServiceTask{ + ClientNodes: nodesToAdd, + DeleteContainerIfExists: true, + }, + ) + if err != nil { + return errors.Trace(err) + } + runner.Init() + runner.Runtime.Store(task.RuntimeDbKey, db) + runner.Runtime.Store(task.RuntimeNodesMapKey, nodesMap) + + if err = runner.Run(ctx.Context); err != nil { + return errors.Annotate(err, "add client") + } + + log.Logger.Infof("Add client success") + } + + if err = syncClientModels(cfg, db); err != nil { + return errors.Trace(err) + } + + return nil +} + +func waitUserConfirm() bool { + fmt.Print("Do you want to continue with the operation? (Y/N): ") + reader := bufio.NewReader(os.Stdin) + input, err := reader.ReadString('\n') + if err != nil { + return false + } + return strings.ToUpper(strings.TrimSpace(input)) == "Y" +} diff --git a/pkg/3fs_client/tasks.go b/pkg/3fs_client/tasks.go index 70d137c..2d252c4 100644 --- a/pkg/3fs_client/tasks.go +++ b/pkg/3fs_client/tasks.go @@ -63,15 +63,20 @@ func getServiceWorkDir(workDir string) string { // Create3FSClientServiceTask is a task for creating 3fs client services. type Create3FSClientServiceTask struct { task.BaseTask + + // ClientNodes is the nodes name of new client nodes + ClientNodes []string + // DeleteContainerIfExists is a flag to delete the container if it exists. + DeleteContainerIfExists bool } // Init initializes the task. 
func (t *Create3FSClientServiceTask) Init(r *task.Runtime, logger log.Interface) { t.BaseTask.SetName("Create3FSClientServiceTask") t.BaseTask.Init(r, logger) - nodes := make([]config.Node, len(r.Cfg.Services.Client.Nodes)) + nodes := make([]config.Node, len(t.ClientNodes)) client := r.Cfg.Services.Client - for i, node := range client.Nodes { + for i, node := range t.ClientNodes { nodes[i] = r.Nodes[node] } runContainerVolumes := []*external.VolumeArgs{} @@ -84,6 +89,10 @@ func (t *Create3FSClientServiceTask) Init(r *task.Runtime, logger log.Interface) } workDir := getServiceWorkDir(r.WorkDir) t.SetSteps([]task.StepConfig{ + { + Nodes: []config.Node{nodes[0]}, + NewStep: steps.NewGenAdminCliConfigStep(), + }, { Nodes: nodes, Parallel: true, @@ -126,6 +135,7 @@ func (t *Create3FSClientServiceTask) Init(r *task.Runtime, logger log.Interface) WorkDir: workDir, ExtraVolumes: runContainerVolumes, UseRdmaNetwork: true, + DeleteIfExists: t.DeleteContainerIfExists, ModelObjFunc: func(s *task.BaseStep) any { return &model.FuseClient{ Name: r.Services.Client.ContainerName, @@ -141,15 +151,17 @@ func (t *Create3FSClientServiceTask) Init(r *task.Runtime, logger log.Interface) // Delete3FSClientServiceTask is a task for deleting a 3fs client services. type Delete3FSClientServiceTask struct { task.BaseTask + + // ClientNodes is the list of node names whose client service is deleted + ClientNodes []string } // Init initializes the task. 
func (t *Delete3FSClientServiceTask) Init(r *task.Runtime, logger log.Interface) { t.BaseTask.SetName("Delete3FSClientServiceTask") t.BaseTask.Init(r, logger) - client := r.Services.Client - nodes := make([]config.Node, len(client.Nodes)) - for i, node := range client.Nodes { + nodes := make([]config.Node, len(t.ClientNodes)) + for i, node := range t.ClientNodes { nodes[i] = r.Nodes[node] } workDir := getServiceWorkDir(r.WorkDir) @@ -158,7 +170,7 @@ func (t *Delete3FSClientServiceTask) Init(r *task.Runtime, logger log.Interface) Nodes: nodes, Parallel: true, NewStep: steps.NewRm3FSContainerStepFunc( - client.ContainerName, + r.Services.Client.ContainerName, ServiceName, workDir), },