Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,33 @@ func deleteFunc(con net.Conn, instanceCnf *config.Instance, args []string) error

return nil
}
func delete2Func(con net.Conn, instanceCnf *config.Instance, args []string) error {
ylogger.Zero.Info().Msg("Execute delete2 command")

ylogger.Zero.Info().Str("name", args[0]).Msg("delete2")
msg := message.NewDelete2Message(args[0], confirm, garbage).Encode()
_, err := con.Write(msg)
if err != nil {
return err
}

ylogger.Zero.Debug().Bytes("msg", msg).Msg("constructed delete2 msg")

client := client.NewYClient(con)
protoReader := proc.NewProtoReader(client)

ansType, body, err := protoReader.ReadPacket()
if err != nil {
ylogger.Zero.Debug().Err(err).Msg("error while receiving answer")
return err
}

if ansType != message.MessageTypeReadyForQuery {
return fmt.Errorf("failed to delete2, msg: %v", body)
}

return nil
}

func untrashifyFunc(con net.Conn, instanceCnf *config.Instance, args []string) error {
ylogger.Zero.Info().Msg("Execute untrashify command")
Expand Down Expand Up @@ -408,6 +435,13 @@ var goolCmd = &cobra.Command{
RunE: Runner(goolFunc),
}

var delete2Cmd = &cobra.Command{
Use: "deleteTrash",
Short: "deleteTrash",
RunE: Runner(delete2Func),
Args: cobra.ExactArgs(1), // name_prefix
}

func init() {
rootCmd.PersistentFlags().StringVarP(&cfgPath, "config", "c", "/etc/yproxy/yproxy.yaml", "path to yproxy config file")
rootCmd.PersistentFlags().StringVarP(&logLevel, "log-level", "l", "", "log level")
Expand Down Expand Up @@ -446,6 +480,10 @@ func init() {
untrashifyCmd.PersistentFlags().Uint64VarP(&segmentNum, "segnum", "s", 0, "logical number of a segment")
untrashifyCmd.PersistentFlags().BoolVarP(&confirm, "confirm", "", false, "confirm deletion")
rootCmd.AddCommand(untrashifyCmd)

delete2Cmd.PersistentFlags().BoolVarP(&confirm, "confirm", "", false, "confirm deletion")
delete2Cmd.PersistentFlags().BoolVarP(&garbage, "garbage", "g", false, "delete garbage")
rootCmd.AddCommand(delete2Cmd)
}

func main() {
Expand Down
55 changes: 55 additions & 0 deletions pkg/message/delete2_message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package message

import (
"encoding/binary"
)

type Delete2Message struct { //seg port
Prefix string
Confirm bool
Garbage bool
}

var _ ProtoMessage = &Delete2Message{}

func NewDelete2Message(prefix string, confirm bool, garbage bool) *Delete2Message {
return &Delete2Message{
Prefix: prefix,
Confirm: confirm,
Garbage: garbage,
}
}

func (c *Delete2Message) Encode() []byte {
bt := []byte{
byte(MessageTypeDelete2),
0,
0,
0,
}

if c.Confirm {
bt[1] = 1
}
if c.Garbage {
bt[2] = 1
}

bt = append(bt, []byte(c.Prefix)...)
bt = append(bt, 0)

ln := len(bt) + 8
bs := make([]byte, 8)
binary.BigEndian.PutUint64(bs, uint64(ln))
return append(bs, bt...)
}

func (c *Delete2Message) Decode(body []byte) {
if body[1] == 1 {
c.Confirm = true
}
if body[2] == 1 {
c.Garbage = true
}
c.Prefix, _ = GetCstring(body[4:])
}
3 changes: 3 additions & 0 deletions pkg/message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const (
MessageTypeCopyV2
MessageTypeCopyComplete
MessageTypeListV2
MessageTypeDelete2

MessageCollectObsolete = MessageType(64)
MessageDeleteObsolete = MessageType(65)
Expand Down Expand Up @@ -92,6 +93,8 @@ func (m MessageType) String() string {
return "DELETE OBSOLETE"
case MessageTypeCopyComplete:
return "COPY COMPLETE"
case MessageTypeDelete2:
return "DELETE2"
}
return "UNKNOWN"
}
7 changes: 5 additions & 2 deletions pkg/object/objectInfo.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package object

import "time"

type ObjectInfo struct {
Path string
Size int64
Path string
Size int64
LastMod time.Time
}
107 changes: 107 additions & 0 deletions pkg/proc/delete_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import (
"fmt"
"path"
"strings"
"time"

"github.com/pkg/errors"
"github.com/yezzey-gp/yproxy/config"
"github.com/yezzey-gp/yproxy/pkg/backups"
"github.com/yezzey-gp/yproxy/pkg/database"
"github.com/yezzey-gp/yproxy/pkg/message"
"github.com/yezzey-gp/yproxy/pkg/object"
"github.com/yezzey-gp/yproxy/pkg/storage"
"github.com/yezzey-gp/yproxy/pkg/ylogger"
)
Expand Down Expand Up @@ -155,7 +157,112 @@ func (dh *BasicGarbageMgr) HandleDeleteGarbage(msg message.DeleteMessage) error
}
return nil
}
func (dh *BasicGarbageMgr) ListDelete2Files(bucket string, msg message.Delete2Message) ([]*object.ObjectInfo, error) {
//get first backup lsn
var err error

//list files in storage
ylogger.Zero.Info().Str("path", msg.Prefix).Msg("listing prefix")
objectMetas, err := dh.StorageInterractor.ListBucketPath(bucket, msg.Prefix, true)
if err != nil {
return nil, errors.Wrap(err, "could not list objects")
}
ylogger.Zero.Info().Int("amount", len(objectMetas)).Msg("objects count")

filesToDelete := make([]*object.ObjectInfo, 0)
for i := range objectMetas {
filename := objectMetas[i].Path
ylogger.Zero.Debug().Str("name", filename).Msg("lookup chunk")
if !strings.HasPrefix(filename, msg.Prefix) {
continue
}
ylogger.Zero.Debug().Str("file", objectMetas[i].Path).
Str("has prefix so will be deleted", msg.Prefix)

filesToDelete = append(filesToDelete, objectMetas[i])

}

ylogger.Zero.Info().Int("amount", len(filesToDelete)).Msg("files will be deleted")

return filesToDelete, nil
}

func (dh *BasicGarbageMgr) DeletePrefixInBucket(bucket string, msg message.Delete2Message) error {
fileList, err := dh.ListDelete2Files(bucket, msg)
if err != nil {
return errors.Wrap(err, "failed to delete file")
}
uploads, err := dh.StorageInterractor.ListFailedMultipartUploads(bucket)
if err != nil {
return err
}
ylogger.Zero.Info().Str("bucket", bucket).Int("amount", len(uploads)).Msg("multipart uploads will be aborted")

for _, file := range fileList {
ylogger.Zero.Info().Str("bucket", bucket).Str("file", file.Path).Msg("file will be deleted")
}
for _, upload := range uploads {
ylogger.Zero.Info().Str("bucket", bucket).Str("uploadId", upload).Msg("upload will be aborted")
}

if !msg.Confirm { //do not delete files if no confirmation flag provided
ylogger.Zero.Info().Msg("do not perform actual delete files as no confirmation flag provided")
return nil
}
if !msg.Garbage {
ylogger.Zero.Info().Msg("delete any files blocked now")
msg.Garbage = true
}
if msg.Garbage && !strings.Contains(msg.Prefix, "trash") {
ylogger.Zero.Info().Msg("prefix doesnt contain trash aborted")
return nil
}
var failed []*object.ObjectInfo
retryCount := 0
for len(fileList) > 0 && retryCount < 10 {
retryCount++
for i := 0; i < len(fileList); i++ {
if !msg.Garbage {
ylogger.Zero.Info().Str("bucket", bucket).Str("path", fileList[i].Path).Msg("simply delete without any 'plan B' (do nothing)")

} else if strings.Contains(fileList[i].Path, "trash") && fileList[i].LastMod.Add(time.Hour*24*7).Unix() < time.Now().Unix() {
ylogger.Zero.Info().Str("bucket", bucket).Str("path", fileList[i].Path).Msg("simply delete without any 'plan B'")
err = dh.StorageInterractor.DeleteObject(bucket, fileList[i].Path)

}
if err != nil {
ylogger.Zero.Warn().AnErr("err", err).Str("bucket", bucket).Str("file", fileList[i].Path).Msg("failed to delete file")
failed = append(failed, fileList[i])
}
}
fileList = failed
failed = make([]*object.ObjectInfo, 0)
}

if len(fileList) > 0 {
ylogger.Zero.Error().Str("bucket", bucket).Int("failed files count", len(fileList)).Msg("some files were not moved")
ylogger.Zero.Error().Str("bucket", bucket).Any("failed files", fileList).Msg("failed to move some files")
return errors.Wrap(err, "failed to move some files")
}

for key, uploadId := range uploads {
if err := dh.StorageInterractor.AbortMultipartUpload(bucket, key, uploadId); err != nil {
return err
}
}

return nil
}

func (dh *BasicGarbageMgr) HandleDelete2Prefix(msg message.Delete2Message) error {
for _, b := range dh.StorageInterractor.ListBuckets() {
if err := dh.DeletePrefixInBucket(b, msg); err != nil {
return err
}
}
return nil
}
func (dh *BasicGarbageMgr) HandleDeleteFile(msg message.DeleteMessage) error {
if !msg.Confirm {
return nil
Expand Down
51 changes: 50 additions & 1 deletion pkg/proc/interaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,49 @@ func ProcessDeleteExtended(msg message.DeleteMessage, s storage.StorageInteracto
return nil
}

func ProcessDelete2Extended(msg message.Delete2Message, s storage.StorageInteractor, bs storage.StorageInteractor, ycl client.YproxyClient, cnf *config.Vacuum) error {
ycl.SetExternalFilePath(msg.Prefix)

dbInterractor := &database.DatabaseHandler{}
backupHandler := &backups.StorageBackupInteractor{Storage: bs}

var dh = &BasicGarbageMgr{
StorageInterractor: s,
DbInterractor: dbInterractor,
BackupInterractor: backupHandler,
Cnf: cnf,
}

if msg.Garbage {
ylogger.Zero.Debug().
Str("Prefix", msg.Prefix).
Bool("confirm", msg.Confirm).Msg("requested to delete old trash files")
} else {
ylogger.Zero.Debug().
Str("Name", msg.Prefix).
Bool("confirm", msg.Confirm).Msg("requested to delete any files")
}
err := dh.HandleDelete2Prefix(msg)
if err != nil {
_ = ycl.ReplyError(err, "failed to finish operation")
return err
}

if _, err := ycl.GetRW().Write(message.NewReadyForQueryMessage().Encode()); err != nil {
_ = ycl.ReplyError(err, "failed to upload")
return err
}
if !msg.Confirm {
ylogger.Zero.Warn().Msg("It was a dry-run, nothing was deleted")
} else if msg.Garbage {
ylogger.Zero.Info().Msg("Deleted garbage successfully")
} else {
ylogger.Zero.Info().Msg("Deleted chunk successfully")
}

return nil
}

func ProcessUntrashify(msg message.UntrashifyMessage, s storage.StorageInteractor, bs storage.StorageInteractor, ycl client.YproxyClient) error {
ycl.SetExternalFilePath(msg.Name)

Expand Down Expand Up @@ -768,7 +811,13 @@ func ProcConn(s storage.StorageInteractor, bs storage.StorageInteractor, cr cryp
if err != nil {
return err
}

case message.MessageTypeDelete2:
msg := message.Delete2Message{}
msg.Decode(body)
err := ProcessDelete2Extended(msg, s, bs, ycl, cnf)
if err != nil {
return err
}
case message.MessageTypeUntrashify:
// receive message
msg := message.UntrashifyMessage{}
Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/s3storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,11 +235,11 @@ func (s *S3StorageInteractor) ListBucketPath(bucket, prefix string, useCache boo
if cPath[0] != '/' {
cPath = "/" + cPath
}

ylogger.Zero.Debug().Str("path", path).Str("cpath", cPath).Msg("appending file to s3 result")
metas = append(metas, &object.ObjectInfo{
Path: cPath,
Size: *obj.Size,
Path: cPath,
Size: *obj.Size,
LastMod: *obj.LastModified,
})
}

Expand Down
Loading