diff --git a/cmd/client/main.go b/cmd/client/main.go index c16c88e..1eb1cf2 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -295,6 +295,33 @@ func deleteFunc(con net.Conn, instanceCnf *config.Instance, args []string) error return nil } +func delete2Func(con net.Conn, instanceCnf *config.Instance, args []string) error { + ylogger.Zero.Info().Msg("Execute delete2 command") + + ylogger.Zero.Info().Str("name", args[0]).Msg("delete2") + msg := message.NewDelete2Message(args[0], confirm, garbage).Encode() + _, err := con.Write(msg) + if err != nil { + return err + } + + ylogger.Zero.Debug().Bytes("msg", msg).Msg("constructed delete2 msg") + + client := client.NewYClient(con) + protoReader := proc.NewProtoReader(client) + + ansType, body, err := protoReader.ReadPacket() + if err != nil { + ylogger.Zero.Debug().Err(err).Msg("error while receiving answer") + return err + } + + if ansType != message.MessageTypeReadyForQuery { + return fmt.Errorf("failed to delete2, msg: %v", body) + } + + return nil +} func untrashifyFunc(con net.Conn, instanceCnf *config.Instance, args []string) error { ylogger.Zero.Info().Msg("Execute untrashify command") @@ -408,6 +435,13 @@ var goolCmd = &cobra.Command{ RunE: Runner(goolFunc), } +var delete2Cmd = &cobra.Command{ + Use: "deleteTrash", + Short: "deleteTrash", + RunE: Runner(delete2Func), + Args: cobra.ExactArgs(1), // name_prefix +} + func init() { rootCmd.PersistentFlags().StringVarP(&cfgPath, "config", "c", "/etc/yproxy/yproxy.yaml", "path to yproxy config file") rootCmd.PersistentFlags().StringVarP(&logLevel, "log-level", "l", "", "log level") @@ -446,6 +480,10 @@ func init() { untrashifyCmd.PersistentFlags().Uint64VarP(&segmentNum, "segnum", "s", 0, "logical number of a segment") untrashifyCmd.PersistentFlags().BoolVarP(&confirm, "confirm", "", false, "confirm deletion") rootCmd.AddCommand(untrashifyCmd) + + delete2Cmd.PersistentFlags().BoolVarP(&confirm, "confirm", "", false, "confirm deletion") + delete2Cmd.PersistentFlags().BoolVarP(&garbage, "garbage", "g", false, "delete garbage") + rootCmd.AddCommand(delete2Cmd) } func main() { diff --git a/pkg/message/delete2_message.go b/pkg/message/delete2_message.go new file mode 100644 index 0000000..c938d03 --- /dev/null +++ b/pkg/message/delete2_message.go @@ -0,0 +1,55 @@ +package message + +import ( + "encoding/binary" +) + +type Delete2Message struct { //seg port + Prefix string + Confirm bool + Garbage bool +} + +var _ ProtoMessage = &Delete2Message{} + +func NewDelete2Message(prefix string, confirm bool, garbage bool) *Delete2Message { + return &Delete2Message{ + Prefix: prefix, + Confirm: confirm, + Garbage: garbage, + } +} + +func (c *Delete2Message) Encode() []byte { + bt := []byte{ + byte(MessageTypeDelete2), + 0, + 0, + 0, + } + + if c.Confirm { + bt[1] = 1 + } + if c.Garbage { + bt[2] = 1 + } + + bt = append(bt, []byte(c.Prefix)...) + bt = append(bt, 0) + + ln := len(bt) + 8 + bs := make([]byte, 8) + binary.BigEndian.PutUint64(bs, uint64(ln)) + return append(bs, bt...) +} + +func (c *Delete2Message) Decode(body []byte) { + if body[1] == 1 { + c.Confirm = true + } + if body[2] == 1 { + c.Garbage = true + } + c.Prefix, _ = GetCstring(body[4:]) +} diff --git a/pkg/message/message.go b/pkg/message/message.go index 115396c..3b08c75 100644 --- a/pkg/message/message.go +++ b/pkg/message/message.go @@ -32,6 +32,7 @@ const ( MessageTypeCopyV2 MessageTypeCopyComplete MessageTypeListV2 + MessageTypeDelete2 MessageCollectObsolete = MessageType(64) MessageDeleteObsolete = MessageType(65) @@ -92,6 +93,8 @@ func (m MessageType) String() string { return "DELETE OBSOLETE" case MessageTypeCopyComplete: return "COPY COMPLETE" + case MessageTypeDelete2: + return "DELETE2" } return "UNKNOWN" } diff --git a/pkg/object/objectInfo.go b/pkg/object/objectInfo.go index ae7f549..99ff5d3 100644 --- a/pkg/object/objectInfo.go +++ b/pkg/object/objectInfo.go @@ -1,6 +1,9 @@ package object +import "time" + type ObjectInfo struct { - Path string - Size int64 + Path string + Size int64 + LastMod time.Time } diff --git a/pkg/proc/delete_handler.go b/pkg/proc/delete_handler.go index a3af89d..c2122ad 100644 --- a/pkg/proc/delete_handler.go +++ b/pkg/proc/delete_handler.go @@ -4,12 +4,14 @@ import ( "fmt" "path" "strings" + "time" "github.com/pkg/errors" "github.com/yezzey-gp/yproxy/config" "github.com/yezzey-gp/yproxy/pkg/backups" "github.com/yezzey-gp/yproxy/pkg/database" "github.com/yezzey-gp/yproxy/pkg/message" + "github.com/yezzey-gp/yproxy/pkg/object" "github.com/yezzey-gp/yproxy/pkg/storage" "github.com/yezzey-gp/yproxy/pkg/ylogger" ) @@ -155,7 +157,112 @@ func (dh *BasicGarbageMgr) HandleDeleteGarbage(msg message.DeleteMessage) error } return nil } +func (dh *BasicGarbageMgr) ListDelete2Files(bucket string, msg message.Delete2Message) ([]*object.ObjectInfo, error) { + //get first backup lsn + var err error + + //list files in storage + ylogger.Zero.Info().Str("path", msg.Prefix).Msg("listing prefix") + objectMetas, err := dh.StorageInterractor.ListBucketPath(bucket, msg.Prefix, true) + if err != nil { + return nil, errors.Wrap(err, "could not list objects") + } + ylogger.Zero.Info().Int("amount", len(objectMetas)).Msg("objects count") + + filesToDelete := make([]*object.ObjectInfo, 0) + for i := range objectMetas { + filename := objectMetas[i].Path + ylogger.Zero.Debug().Str("name", filename).Msg("lookup chunk") + if !strings.HasPrefix(filename, msg.Prefix) { + continue + } + ylogger.Zero.Debug().Str("file", objectMetas[i].Path). + Str("has prefix so will be deleted", msg.Prefix) + + filesToDelete = append(filesToDelete, objectMetas[i]) + + } + + ylogger.Zero.Info().Int("amount", len(filesToDelete)).Msg("files will be deleted") + + return filesToDelete, nil +} +func (dh *BasicGarbageMgr) DeletePrefixInBucket(bucket string, msg message.Delete2Message) error { + fileList, err := dh.ListDelete2Files(bucket, msg) + if err != nil { + return errors.Wrap(err, "failed to delete file") + } + uploads, err := dh.StorageInterractor.ListFailedMultipartUploads(bucket) + if err != nil { + return err + } + ylogger.Zero.Info().Str("bucket", bucket).Int("amount", len(uploads)).Msg("multipart uploads will be aborted") + + for _, file := range fileList { + ylogger.Zero.Info().Str("bucket", bucket).Str("file", file.Path).Msg("file will be deleted") + } + for _, upload := range uploads { + ylogger.Zero.Info().Str("bucket", bucket).Str("uploadId", upload).Msg("upload will be aborted") + } + + if !msg.Confirm { //do not delete files if no confirmation flag provided + ylogger.Zero.Info().Msg("do not perform actual delete files as no confirmation flag provided") + return nil + } + if !msg.Garbage { + ylogger.Zero.Info().Msg("delete any files blocked now") + msg.Garbage = true + } + if msg.Garbage && !strings.Contains(msg.Prefix, "trash") { + ylogger.Zero.Info().Msg("prefix doesnt contain trash aborted") + return nil + } + var failed []*object.ObjectInfo + retryCount := 0 + for len(fileList) > 0 && retryCount < 10 { + retryCount++ + for i := 0; i < len(fileList); i++ { + if !msg.Garbage { + ylogger.Zero.Info().Str("bucket", bucket).Str("path", fileList[i].Path).Msg("simply delete without any 'plan B' (do nothing)") + + } else if strings.Contains(fileList[i].Path, "trash") && fileList[i].LastMod.Add(time.Hour*24*7).Unix() < time.Now().Unix() { + ylogger.Zero.Info().Str("bucket", bucket).Str("path", fileList[i].Path).Msg("simply delete without any 'plan B'") + err = dh.StorageInterractor.DeleteObject(bucket, fileList[i].Path) + + } + if err != nil { + ylogger.Zero.Warn().AnErr("err", err).Str("bucket", bucket).Str("file", fileList[i].Path).Msg("failed to delete file") + failed = append(failed, fileList[i]) + } + } + fileList = failed + failed = make([]*object.ObjectInfo, 0) + } + + if len(fileList) > 0 { + ylogger.Zero.Error().Str("bucket", bucket).Int("failed files count", len(fileList)).Msg("some files were not moved") + ylogger.Zero.Error().Str("bucket", bucket).Any("failed files", fileList).Msg("failed to move some files") + return errors.Wrap(err, "failed to move some files") + } + + for key, uploadId := range uploads { + if err := dh.StorageInterractor.AbortMultipartUpload(bucket, key, uploadId); err != nil { + return err + } + } + + return nil +} + +func (dh *BasicGarbageMgr) HandleDelete2Prefix(msg message.Delete2Message) error { + for _, b := range dh.StorageInterractor.ListBuckets() { + if err := dh.DeletePrefixInBucket(b, msg); err != nil { + return err + } + } + return nil +} func (dh *BasicGarbageMgr) HandleDeleteFile(msg message.DeleteMessage) error { if !msg.Confirm { return nil diff --git a/pkg/proc/interaction.go b/pkg/proc/interaction.go index a08f092..0ca13e5 100644 --- a/pkg/proc/interaction.go +++ b/pkg/proc/interaction.go @@ -481,6 +481,49 @@ func ProcessDeleteExtended(msg message.DeleteMessage, s storage.StorageInteracto return nil } +func ProcessDelete2Extended(msg message.Delete2Message, s storage.StorageInteractor, bs storage.StorageInteractor, ycl client.YproxyClient, cnf *config.Vacuum) error { + ycl.SetExternalFilePath(msg.Prefix) + + dbInterractor := &database.DatabaseHandler{} + backupHandler := &backups.StorageBackupInteractor{Storage: bs} + + var dh = &BasicGarbageMgr{ + StorageInterractor: s, + DbInterractor: dbInterractor, + BackupInterractor: backupHandler, + Cnf: cnf, + } + + if msg.Garbage { + ylogger.Zero.Debug(). + Str("Prefix", msg.Prefix). + Bool("confirm", msg.Confirm).Msg("requested to delete old trash files") + } else { + ylogger.Zero.Debug(). + Str("Name", msg.Prefix). + Bool("confirm", msg.Confirm).Msg("requested to delete any files") + } + err := dh.HandleDelete2Prefix(msg) + if err != nil { + _ = ycl.ReplyError(err, "failed to finish operation") + return err + } + + if _, err := ycl.GetRW().Write(message.NewReadyForQueryMessage().Encode()); err != nil { + _ = ycl.ReplyError(err, "failed to upload") + return err + } + if !msg.Confirm { + ylogger.Zero.Warn().Msg("It was a dry-run, nothing was deleted") + } else if msg.Garbage { + ylogger.Zero.Info().Msg("Deleted garbage successfully") + } else { + ylogger.Zero.Info().Msg("Deleted chunk successfully") + } + + return nil +} + func ProcessUntrashify(msg message.UntrashifyMessage, s storage.StorageInteractor, bs storage.StorageInteractor, ycl client.YproxyClient) error { ycl.SetExternalFilePath(msg.Name) @@ -768,7 +811,13 @@ func ProcConn(s storage.StorageInteractor, bs storage.StorageInteractor, cr cryp if err != nil { return err } - + case message.MessageTypeDelete2: + msg := message.Delete2Message{} + msg.Decode(body) + err := ProcessDelete2Extended(msg, s, bs, ycl, cnf) + if err != nil { + return err + } case message.MessageTypeUntrashify: // receive message msg := message.UntrashifyMessage{} diff --git a/pkg/storage/s3storage.go b/pkg/storage/s3storage.go index 4c8edd1..0f4056e 100644 --- a/pkg/storage/s3storage.go +++ b/pkg/storage/s3storage.go @@ -235,11 +235,11 @@ func (s *S3StorageInteractor) ListBucketPath(bucket, prefix string, useCache boo if cPath[0] != '/' { cPath = "/" + cPath } - ylogger.Zero.Debug().Str("path", path).Str("cpath", cPath).Msg("appending file to s3 result") metas = append(metas, &object.ObjectInfo{ - Path: cPath, - Size: *obj.Size, + Path: cPath, + Size: *obj.Size, + LastMod: *obj.LastModified, }) }