From 05510e019dd52a4d4bfbc163b1b2c29aa991bfaa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?x=C2=B3u=C2=B3?= Date: Thu, 17 Jul 2025 18:16:55 +0800 Subject: [PATCH 1/2] support to add new client (#145) Signed-off-by: xxxuuu --- cmd/m3fs/cluster.go | 146 +++++++++++++++++++++++++++++++++++++++- pkg/3fs_client/tasks.go | 14 +++- 2 files changed, 156 insertions(+), 4 deletions(-) diff --git a/cmd/m3fs/cluster.go b/cmd/m3fs/cluster.go index c062a27..ac25075 100644 --- a/cmd/m3fs/cluster.go +++ b/cmd/m3fs/cluster.go @@ -41,6 +41,7 @@ import ( "github.com/open3fs/m3fs/pkg/pg/model" "github.com/open3fs/m3fs/pkg/storage" "github.com/open3fs/m3fs/pkg/task" + "github.com/open3fs/m3fs/pkg/utils" ) var clusterCmd = &cli.Command{ @@ -121,6 +122,26 @@ var clusterCmd = &cli.Command{ }, }, }, + { + Name: "add-client", + Usage: "Add 3fs cluster clients", + Action: addClient, + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "config", + Aliases: []string{"c"}, + Usage: "Path to the cluster configuration file", + Destination: &configFilePath, + Required: true, + }, + &cli.StringFlag{ + Name: "workdir", + Aliases: []string{"w"}, + Usage: "Path to the working directory (default is current directory)", + Destination: &workDir, + }, + }, + }, { Name: "prepare", Usage: "Prepare to deploy a 3fs cluster", @@ -201,7 +222,9 @@ func createCluster(ctx *cli.Context) error { BeginNodeID: 10001, }, new(mgmtd.InitUserAndChainTask), - new(fsclient.Create3FSClientServiceTask), + &fsclient.Create3FSClientServiceTask{ + ClientNodes: cfg.Services.Client.Nodes, + }, ) if err != nil { return errors.Trace(err) @@ -350,6 +373,56 @@ func syncNodeModels(cfg *config.Config, db *gorm.DB) error { return nil } +func syncClientModels(cfg *config.Config, db *gorm.DB) error { + var nodes []model.Node + err := db.Model(new(model.Node)).Find(&nodes).Error + if err != nil { + return errors.Trace(err) + } + nodesMap := make(map[string]model.Node, len(nodes)) + for _, node := range nodes { + nodesMap[node.Name] = node + } + + var clients []model.FuseClient + err = db.Model(new(model.FuseClient)).Find(&clients).Error + if err != nil { + return errors.Trace(err) + } + clientsMap := make(map[uint]model.FuseClient, len(clients)) + for _, client := range clients { + clientsMap[client.NodeID] = client + } + + clientNodes := cfg.Services.Client.Nodes + err = db.Transaction(func(tx *gorm.DB) error { + for _, clientNode := range clientNodes { + node, ok := nodesMap[clientNode] + if !ok { + return errors.Errorf("client node %s not found", clientNode) + } + _, ok = clientsMap[node.ID] + if ok { + continue + } + client := model.FuseClient{ + Name: cfg.Services.Client.ContainerName, + NodeID: node.ID, + HostMountpoint: cfg.Services.Client.HostMountpoint, + } + if err = tx.Model(&client).Create(&client).Error; err != nil { + return errors.Trace(err) + } + } + return nil + }) + if err != nil { + return errors.Trace(err) + } + + return nil +} + func addStorageNodes(ctx *cli.Context) error { cfg, err := loadClusterConfig() if err != nil { @@ -448,10 +521,79 @@ func addStorageNodes(ctx *cli.Context) error { runner.Runtime.Store(task.RuntimeNodesMapKey, nodesMap) if err = runner.Run(ctx.Context); err != nil { - return errors.Annotate(err, "create cluster") + return errors.Annotate(err, "add storage nodes") } log.Logger.Infof("Add storage nodes success") return nil } + +func addClient(ctx *cli.Context) error { + cfg, err := loadClusterConfig() + if err != nil { + return errors.Trace(err) + } + + db, err := setupDB(cfg) + if err != nil { + return errors.Trace(err) + } + + nodes := []*model.Node{} + err = db.Model(new(model.Node)).Find(&nodes).Error + if err != nil { + return errors.Trace(err) + } + nodesMap := make(map[string]*model.Node, len(nodes)) + for _, node := range nodes { + nodesMap[node.Name] = node + } + + clientNodes := []string{} + err = db.Raw( + "SELECT nodes.name FROM fuse_clients INNER JOIN nodes ON fuse_clients.node_id = nodes.id", + ).Scan(&clientNodes).Error + if err != nil { + return errors.Trace(err) + } + clientNodeSet := utils.NewSet(clientNodes...) + newClientNodes := []string{} + for _, clientNode := range cfg.Services.Client.Nodes { + if !clientNodeSet.Contains(clientNode) { + newClientNodes = append(newClientNodes, clientNode) + } + } + + if err = syncNodeModels(cfg, db); err != nil { + return errors.Trace(err) + } + if len(newClientNodes) == 0 { + return errors.New("No new client nodes to add") + } + + runner, err := task.NewRunner(cfg, + &fsclient.Create3FSClientServiceTask{ + ClientNodes: newClientNodes, + DeleteContainerIfExists: true, + }, + ) + if err != nil { + return errors.Trace(err) + } + runner.Init() + runner.Runtime.Store(task.RuntimeDbKey, db) + runner.Runtime.Store(task.RuntimeNodesMapKey, nodesMap) + + if err = runner.Run(ctx.Context); err != nil { + return errors.Annotate(err, "add client") + } + + if err = syncClientModels(cfg, db); err != nil { + return errors.Trace(err) + } + + log.Logger.Infof("Add client success") + + return nil +} diff --git a/pkg/3fs_client/tasks.go b/pkg/3fs_client/tasks.go index 70d137c..fb2dc2b 100644 --- a/pkg/3fs_client/tasks.go +++ b/pkg/3fs_client/tasks.go @@ -63,15 +63,20 @@ func getServiceWorkDir(workDir string) string { // Create3FSClientServiceTask is a task for creating 3fs client services. type Create3FSClientServiceTask struct { task.BaseTask + + // ClientNodes is the nodes name of new client nodes + ClientNodes []string + // DeleteContainerIfExists is a flag to delete the container if it exists. + DeleteContainerIfExists bool } // Init initializes the task. func (t *Create3FSClientServiceTask) Init(r *task.Runtime, logger log.Interface) { t.BaseTask.SetName("Create3FSClientServiceTask") t.BaseTask.Init(r, logger) - nodes := make([]config.Node, len(r.Cfg.Services.Client.Nodes)) + nodes := make([]config.Node, len(t.ClientNodes)) client := r.Cfg.Services.Client - for i, node := range client.Nodes { + for i, node := range t.ClientNodes { nodes[i] = r.Nodes[node] } runContainerVolumes := []*external.VolumeArgs{} @@ -84,6 +89,10 @@ func (t *Create3FSClientServiceTask) Init(r *task.Runtime, logger log.Interface) } workDir := getServiceWorkDir(r.WorkDir) t.SetSteps([]task.StepConfig{ + { + Nodes: []config.Node{nodes[0]}, + NewStep: steps.NewGenAdminCliConfigStep(), + }, { Nodes: nodes, Parallel: true, @@ -126,6 +135,7 @@ func (t *Create3FSClientServiceTask) Init(r *task.Runtime, logger log.Interface) WorkDir: workDir, ExtraVolumes: runContainerVolumes, UseRdmaNetwork: true, + DeleteIfExists: t.DeleteContainerIfExists, ModelObjFunc: func(s *task.BaseStep) any { return &model.FuseClient{ Name: r.Services.Client.ContainerName, From 7dd35f6b0a75e67524f22d1218b216e88b278941 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?x=C2=B3u=C2=B3?= Date: Fri, 18 Jul 2025 17:46:47 +0800 Subject: [PATCH 2/2] support to delete client (#149) Signed-off-by: xxxuuu --- cmd/m3fs/cluster.go | 191 +++++++++++++++++++++++++++++++++------- pkg/3fs_client/tasks.go | 10 ++- 2 files changed, 163 insertions(+), 38 deletions(-) diff --git a/cmd/m3fs/cluster.go b/cmd/m3fs/cluster.go index ac25075..7211baf 100644 --- a/cmd/m3fs/cluster.go +++ b/cmd/m3fs/cluster.go @@ -15,6 +15,7 @@ package main import ( + "bufio" "encoding/json" "fmt" "os" @@ -142,6 +143,26 @@ var clusterCmd = &cli.Command{ }, }, }, + { + Name: "delete-client", + Usage: "Delete 3fs cluster clients", + Action: deleteClient, + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "config", + Aliases: []string{"c"}, + Usage: "Path to the cluster configuration file", + Destination: &configFilePath, + Required: true, + }, + &cli.StringFlag{ + Name: "workdir", + Aliases: []string{"w"}, + Usage: "Path to the working directory (default is current directory)", + Destination: &workDir, + }, + }, + }, { Name: "prepare", Usage: "Prepare to deploy a 3fs cluster", @@ -246,7 +267,9 @@ func deleteCluster(ctx *cli.Context) error { } runnerTasks := []task.Interface{ - new(fsclient.Delete3FSClientServiceTask), + &fsclient.Delete3FSClientServiceTask{ + ClientNodes: cfg.Services.Client.Nodes, + }, new(storage.DeleteStorageServiceTask), new(meta.DeleteMetaServiceTask), new(mgmtd.DeleteMgmtdServiceTask), @@ -475,6 +498,17 @@ func addStorageNodes(ctx *cli.Context) error { if len(newStorNodes) == 0 && changePlan == nil { return errors.New("No new storage nodes to add") } + + newStorNodeNames := make([]string, len(newStorNodes)) + for i, node := range newStorNodes { + newStorNodeNames[i] = node.Name + } + log.Logger.Infof("New storage nodes to add: %v", newStorNodeNames) + if !waitUserConfirm() { + log.Logger.Infof("Operation cancelled by user") + return nil + } + storCreated := false for _, step := range steps { if step.OperationType == model.ChangePlanStepOpType.CreateStorService && step.FinishAt != nil { @@ -530,6 +564,14 @@ func addStorageNodes(ctx *cli.Context) error { } func addClient(ctx *cli.Context) error { + return syncClient(ctx, true, false) +} + +func deleteClient(ctx *cli.Context) error { + return syncClient(ctx, false, true) +} + +func syncClient(ctx *cli.Context, allowAdd, allowDelete bool) error { cfg, err := loadClusterConfig() if err != nil { return errors.Trace(err) @@ -540,60 +582,141 @@ func addClient(ctx *cli.Context) error { return errors.Trace(err) } - nodes := []*model.Node{} - err = db.Model(new(model.Node)).Find(&nodes).Error + currentClientNodes := []string{} + err = db.Model(&model.FuseClient{}). + Joins("INNER JOIN nodes ON fuse_clients.node_id = nodes.id"). + Pluck("nodes.name", ¤tClientNodes).Error if err != nil { return errors.Trace(err) } - nodesMap := make(map[string]*model.Node, len(nodes)) - for _, node := range nodes { - nodesMap[node.Name] = node - } - clientNodes := []string{} - err = db.Raw( - "SELECT nodes.name FROM fuse_clients INNER JOIN nodes ON fuse_clients.node_id = nodes.id", - ).Scan(&clientNodes).Error - if err != nil { - return errors.Trace(err) + desiredClientNodes := cfg.Services.Client.Nodes + currentSet := utils.NewSet(currentClientNodes...) + desiredSet := utils.NewSet(desiredClientNodes...) + + nodesToAdd := []string{} + for _, node := range desiredClientNodes { + if !currentSet.Contains(node) { + nodesToAdd = append(nodesToAdd, node) + } } - clientNodeSet := utils.NewSet(clientNodes...) - newClientNodes := []string{} - for _, clientNode := range cfg.Services.Client.Nodes { - if !clientNodeSet.Contains(clientNode) { - newClientNodes = append(newClientNodes, clientNode) + + nodesToDelete := []string{} + for _, node := range currentClientNodes { + if !desiredSet.Contains(node) { + nodesToDelete = append(nodesToDelete, node) } } + hasNodesToAdd := len(nodesToAdd) > 0 && allowAdd + hasNodesToDelete := len(nodesToDelete) > 0 && allowDelete + if !hasNodesToAdd && !hasNodesToDelete { + log.Logger.Infof("No client nodes need to add or delete") + return nil + } + + log.Logger.Infof("Current client nodes: %v", currentClientNodes) + if hasNodesToAdd { + log.Logger.Infof("Client nodes to add: %v", nodesToAdd) + } + if hasNodesToDelete { + log.Logger.Infof("Client nodes to delete: %v", nodesToDelete) + } + + if !waitUserConfirm() { + log.Logger.Infof("Operation cancelled by user") + return nil + } + if err = syncNodeModels(cfg, db); err != nil { return errors.Trace(err) } - if len(newClientNodes) == 0 { - return errors.New("No new client nodes to add") - } - runner, err := task.NewRunner(cfg, - &fsclient.Create3FSClientServiceTask{ - ClientNodes: newClientNodes, - DeleteContainerIfExists: true, - }, - ) + nodes := []*model.Node{} + err = db.Model(new(model.Node)).Find(&nodes).Error if err != nil { return errors.Trace(err) } - runner.Init() - runner.Runtime.Store(task.RuntimeDbKey, db) - runner.Runtime.Store(task.RuntimeNodesMapKey, nodesMap) + nodesMap := make(map[string]*model.Node, len(nodes)) + for _, node := range nodes { + nodesMap[node.Name] = node + } - if err = runner.Run(ctx.Context); err != nil { - return errors.Annotate(err, "add client") + if hasNodesToDelete { + log.Logger.Infof("Deleting client nodes: %v", nodesToDelete) + + tempCfg := *cfg + tempCfg.Services.Client.Nodes = nodesToDelete + + runner, err := task.NewRunner(&tempCfg, + &fsclient.Delete3FSClientServiceTask{ + ClientNodes: nodesToDelete, + }, + ) + if err != nil { + return errors.Trace(err) + } + runner.Init() + + if err = runner.Run(ctx.Context); err != nil { + return errors.Annotate(err, "delete client") + } + + err = db.Transaction(func(tx *gorm.DB) error { + for _, nodeName := range nodesToDelete { + var node model.Node + if err := tx.Where("name = ?", nodeName).First(&node).Error; err != nil { + return errors.Trace(err) + } + if err := tx.Delete(&model.FuseClient{}, "node_id = ?", node.ID).Error; err != nil { + return errors.Trace(err) + } + } + return nil + }) + if err != nil { + return errors.Annotate(err, "delete client records from database") + } + + log.Logger.Infof("Delete client success") + } + + if hasNodesToAdd { + log.Logger.Infof("Adding client nodes: %v", nodesToAdd) + + runner, err := task.NewRunner(cfg, + &fsclient.Create3FSClientServiceTask{ + ClientNodes: nodesToAdd, + DeleteContainerIfExists: true, + }, + ) + if err != nil { + return errors.Trace(err) + } + runner.Init() + runner.Runtime.Store(task.RuntimeDbKey, db) + runner.Runtime.Store(task.RuntimeNodesMapKey, nodesMap) + + if err = runner.Run(ctx.Context); err != nil { + return errors.Annotate(err, "add client") + } + + log.Logger.Infof("Add client success") } if err = syncClientModels(cfg, db); err != nil { return errors.Trace(err) } - log.Logger.Infof("Add client success") - return nil } + +func waitUserConfirm() bool { + fmt.Print("Do you want to continue with the operation? (Y/N): ") + reader := bufio.NewReader(os.Stdin) + input, err := reader.ReadString('\n') + if err != nil { + return false + } + return strings.ToUpper(strings.TrimSpace(input)) == "Y" +} diff --git a/pkg/3fs_client/tasks.go b/pkg/3fs_client/tasks.go index fb2dc2b..2d252c4 100644 --- a/pkg/3fs_client/tasks.go +++ b/pkg/3fs_client/tasks.go @@ -151,15 +151,17 @@ func (t *Create3FSClientServiceTask) Init(r *task.Runtime, logger log.Interface) // Delete3FSClientServiceTask is a task for deleting a 3fs client services. type Delete3FSClientServiceTask struct { task.BaseTask + + // ClientNodes is the nodes name of new client nodes + ClientNodes []string } // Init initializes the task. func (t *Delete3FSClientServiceTask) Init(r *task.Runtime, logger log.Interface) { t.BaseTask.SetName("Delete3FSClientServiceTask") t.BaseTask.Init(r, logger) - client := r.Services.Client - nodes := make([]config.Node, len(client.Nodes)) - for i, node := range client.Nodes { + nodes := make([]config.Node, len(t.ClientNodes)) + for i, node := range t.ClientNodes { nodes[i] = r.Nodes[node] } workDir := getServiceWorkDir(r.WorkDir) @@ -168,7 +170,7 @@ func (t *Delete3FSClientServiceTask) Init(r *task.Runtime, logger log.Interface) Nodes: nodes, Parallel: true, NewStep: steps.NewRm3FSContainerStepFunc( - client.ContainerName, + r.Services.Client.ContainerName, ServiceName, workDir), },