From 7b093d7b0792506cb6028ca3760fd7433f31c8cb Mon Sep 17 00:00:00 2001 From: reshke kirill Date: Sat, 29 Nov 2025 06:29:33 +0000 Subject: [PATCH] Add limit for yproxy writer --- pkg/proc/yio/limiter/limiter.go | 66 +++++++++++++++++++++++++++++++++ pkg/proc/yio/yrreader.go | 12 ++---- pkg/proc/yio/ywriter.go | 11 +++++- 3 files changed, 80 insertions(+), 9 deletions(-) diff --git a/pkg/proc/yio/limiter/limiter.go b/pkg/proc/yio/limiter/limiter.go index 8a0291e..8f40d0d 100644 --- a/pkg/proc/yio/limiter/limiter.go +++ b/pkg/proc/yio/limiter/limiter.go @@ -4,7 +4,9 @@ import ( "context" "fmt" "io" + "sync" + "github.com/yezzey-gp/yproxy/config" "github.com/yezzey-gp/yproxy/pkg/ylogger" "golang.org/x/time/rate" ) @@ -48,3 +50,67 @@ func (r *Reader) Read(buf []byte) (int, error) { err = r.limiter.WaitN(r.ctx, n) return n, err } + +type Writer struct { + writer io.WriteCloser + limiter *rate.Limiter + ctx context.Context +} + +// Close implements io.ReadCloser. +func (r *Writer) Close() error { + return r.writer.Close() +} + +func NewWriter(writer io.WriteCloser, limiter *rate.Limiter) *Writer { + return &Writer{ + ctx: context.Background(), + writer: writer, + limiter: limiter, + } +} + +func (r *Writer) Write(buf []byte) (int, error) { + if len(buf) == 0 { + return 0, fmt.Errorf("empty buffer passed") + } + + end := min(r.limiter.Burst(), len(buf)) + n, err := r.writer.Write(buf[:end]) + + if err != nil { + N := max(n, 0) + limiterErr := r.limiter.WaitN(r.ctx, N) + if limiterErr != nil { + ylogger.Zero.Error().Err(limiterErr).Msg("Error happened while limiting") + } + return n, err + } + + err = r.limiter.WaitN(r.ctx, n) + return n, err +} + +var ( + /* Single limiter for all external storage interaction */ + mu sync.Mutex + netLimiter *rate.Limiter = nil +) + +func GetLimiter() *rate.Limiter { + mu.Lock() + defer mu.Unlock() + + if netLimiter != nil { + ylogger.Zero.Debug().Msg("reuse limiter") + return netLimiter + } + + netLimit := config.InstanceConfig().StorageCnf.StorageRateLimit + ylogger.Zero.Debug().Uint64("bytes per sec", netLimit).Msg("allocate limiter") + + netLimiter = rate.NewLimiter(rate.Limit(netLimit), + int(netLimit)) + + return netLimiter +} diff --git a/pkg/proc/yio/yrreader.go b/pkg/proc/yio/yrreader.go index 9f44666..993e85c 100644 --- a/pkg/proc/yio/yrreader.go +++ b/pkg/proc/yio/yrreader.go @@ -5,7 +5,6 @@ import ( "io" "time" - "github.com/yezzey-gp/yproxy/config" "github.com/yezzey-gp/yproxy/pkg/client" "github.com/yezzey-gp/yproxy/pkg/proc/yio/limiter" "github.com/yezzey-gp/yproxy/pkg/settings" @@ -21,7 +20,7 @@ type RestartReader interface { type YRestartReader struct { underlying io.ReadCloser - limiter *rate.Limiter + lim *rate.Limiter s storage.StorageInteractor name string settings []settings.StorageSettings @@ -45,16 +44,13 @@ func (y *YRestartReader) Read(p []byte) (n int, err error) { func NewRestartReader(s storage.StorageInteractor, name string, setts []settings.StorageSettings) RestartReader { - netLimit := config.InstanceConfig().StorageCnf.StorageRateLimit - - l := rate.NewLimiter(rate.Limit(netLimit), - int(netLimit)) + l := limiter.GetLimiter() return &YRestartReader{ s: s, name: name, settings: setts, - limiter: l, + lim: l, } } @@ -74,7 +70,7 @@ func (y *YRestartReader) Restart(offsetStart int64) error { /* with limiter */ - y.underlying = limiter.NewReader(r, y.limiter) + y.underlying = limiter.NewReader(r, y.lim) return nil } diff --git a/pkg/proc/yio/ywriter.go b/pkg/proc/yio/ywriter.go index a1aaae8..eba0214 100644 --- a/pkg/proc/yio/ywriter.go +++ b/pkg/proc/yio/ywriter.go @@ -4,13 +4,17 @@ import ( "io" "github.com/yezzey-gp/yproxy/pkg/client" + "github.com/yezzey-gp/yproxy/pkg/proc/yio/limiter" "github.com/yezzey-gp/yproxy/pkg/ylogger" + "golang.org/x/time/rate" ) /* TBD: support restart */ type YproxyWriter struct { underlying io.WriteCloser + lim *rate.Limiter + selfCl client.YproxyClient offsetReached int64 @@ -34,11 +38,16 @@ func (y *YproxyWriter) Write(p []byte) (n int, err error) { } func NewYproxyWriter(under io.WriteCloser, selfCl client.YproxyClient) io.WriteCloser { - return &YproxyWriter{ + + w := &YproxyWriter{ underlying: under, + lim: limiter.GetLimiter(), selfCl: selfCl, offsetReached: 0, } + + w.underlying = limiter.NewWriter(under, w.lim) + return w } var _ io.WriteCloser = &YproxyWriter{}