From d290e06a82cc7bf0d6252b6b25b663f4e7fe7c3d Mon Sep 17 00:00:00 2001 From: reshke kirill Date: Fri, 28 Nov 2025 20:41:03 +0000 Subject: [PATCH 1/2] Implement network limiter --- config/instance.go | 6 ++++ config/storage.go | 3 +- go.mod | 5 +-- go.sum | 2 ++ pkg/proc/interaction.go | 3 +- pkg/proc/yio/limiter/limiter.go | 55 +++++++++++++++++++++++++++++++++ pkg/proc/yio/yrreader.go | 16 +++++++++- 7 files changed, 85 insertions(+), 5 deletions(-) create mode 100644 pkg/proc/yio/limiter/limiter.go diff --git a/config/instance.go b/config/instance.go index 8a59766..06a771d 100644 --- a/config/instance.go +++ b/config/instance.go @@ -73,6 +73,9 @@ const ( DefaultPsqlPort = 8432 DefaultEndpointSourceScheme = "https" + + /* 1 GB per second */ + DefaultStorageRateLimit = 1024 * 1024 * 1024 ) func EmbedDefaults(cfgInstance *Instance) { @@ -97,6 +100,9 @@ func EmbedDefaults(cfgInstance *Instance) { if cfgInstance.StorageCnf.EndpointSourceScheme == "" { cfgInstance.StorageCnf.EndpointSourceScheme = DefaultEndpointSourceScheme } + if cfgInstance.StorageCnf.StorageRateLimit == 0 { + cfgInstance.StorageCnf.StorageRateLimit = DefaultStorageRateLimit + } cfgInstance.YezzeyRestoreParanoid = false } diff --git a/config/storage.go b/config/storage.go index 200a6c9..5c75a87 100644 --- a/config/storage.go +++ b/config/storage.go @@ -24,7 +24,8 @@ type Storage struct { TablespaceMap map[string]string `json:"tablespace_map" toml:"tablespace_map" yaml:"tablespace_map"` // how many concurrent connection acquire allowed - StorageConcurrency int64 `json:"storage_concurrency" toml:"storage_concurrency" yaml:"storage_concurrency"` + StorageConcurrency int64 `json:"storage_concurrency" toml:"storage_concurrency" yaml:"storage_concurrency"` + StorageRateLimit uint64 `json:"storage_rate_limit" toml:"storage_rate_limit" yaml:"storage_rate_limit"` StorageRegion string `json:"storage_region" toml:"storage_region" yaml:"storage_region"` diff --git a/go.mod b/go.mod index f620430..7318d2a 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,8 @@ module github.com/yezzey-gp/yproxy -go 1.22.0 +go 1.24.0 -toolchain go1.23.1 +toolchain go1.24.10 require ( github.com/BurntSushi/toml v1.3.2 @@ -38,6 +38,7 @@ require ( golang.org/x/mod v0.17.0 // indirect golang.org/x/net v0.32.0 // indirect golang.org/x/sys v0.28.0 // indirect + golang.org/x/time v0.14.0 // indirect golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect gonum.org/v1/gonum v0.15.1 // indirect ) diff --git a/go.sum b/go.sum index 95a0245..c6f9622 100644 --- a/go.sum +++ b/go.sum @@ -135,6 +135,8 @@ golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= +golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI= +golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= diff --git a/pkg/proc/interaction.go b/pkg/proc/interaction.go index a08f092..4aa3c4c 100644 --- a/pkg/proc/interaction.go +++ b/pkg/proc/interaction.go @@ -312,7 +312,8 @@ func ProcessCopyExtended( } /* get reader */ - readerFromOldBucket := yio.NewYRetryReader(yio.NewRestartReader(oldStorage, path, nil), ycl) + readerFromOldBucket := yio.NewYRetryReader( + yio.NewRestartReader(oldStorage, path, nil), ycl) var fromReader io.Reader fromReader = readerFromOldBucket defer func() { _ = readerFromOldBucket.Close() }() diff --git a/pkg/proc/yio/limiter/limiter.go b/pkg/proc/yio/limiter/limiter.go new file mode 100644 index 0000000..44a5ff3 --- /dev/null +++ b/pkg/proc/yio/limiter/limiter.go @@ -0,0 +1,55 @@ +package limiter + +import ( + "context" + "fmt" + "io" + + "github.com/yezzey-gp/yproxy/pkg/ylogger" + "golang.org/x/time/rate" +) + +type Reader struct { + reader io.ReadCloser + limiter *rate.Limiter + ctx context.Context +} + +// Close implements io.ReadCloser. +func (r *Reader) Close() error { + return r.reader.Close() +} + +func NewReader(reader io.ReadCloser, limiter *rate.Limiter) *Reader { + return &Reader{ + reader: reader, + limiter: limiter, + } +} + +func (r *Reader) Read(buf []byte) (int, error) { + if len(buf) == 0 { + return 0, fmt.Errorf("empty buffer passed") + } + + end := len(buf) + if r.limiter.Burst() < end { + end = r.limiter.Burst() + } + n, err := r.reader.Read(buf[:end]) + + if err != nil { + N := n + if n < 0 { + N = 0 + } + limiterErr := r.limiter.WaitN(r.ctx, N) + if limiterErr != nil { + ylogger.Zero.Error().Err(limiterErr).Msg("Error happened while limiting") + } + return n, err + } + + err = r.limiter.WaitN(r.ctx, n) + return n, err +} diff --git a/pkg/proc/yio/yrreader.go b/pkg/proc/yio/yrreader.go index e625aa7..9f44666 100644 --- a/pkg/proc/yio/yrreader.go +++ b/pkg/proc/yio/yrreader.go @@ -5,10 +5,13 @@ import ( "io" "time" + "github.com/yezzey-gp/yproxy/config" "github.com/yezzey-gp/yproxy/pkg/client" + "github.com/yezzey-gp/yproxy/pkg/proc/yio/limiter" "github.com/yezzey-gp/yproxy/pkg/settings" "github.com/yezzey-gp/yproxy/pkg/storage" "github.com/yezzey-gp/yproxy/pkg/ylogger" + "golang.org/x/time/rate" ) type RestartReader interface { @@ -18,6 +21,7 @@ type RestartReader interface { type YRestartReader struct { underlying io.ReadCloser + limiter *rate.Limiter s storage.StorageInteractor name string settings []settings.StorageSettings @@ -33,16 +37,24 @@ func (y *YRestartReader) Close() error { // Read implements RestartReader. func (y *YRestartReader) Read(p []byte) (n int, err error) { + /* read with rate limiter */ + return y.underlying.Read(p) } func NewRestartReader(s storage.StorageInteractor, name string, setts []settings.StorageSettings) RestartReader { + netLimit := config.InstanceConfig().StorageCnf.StorageRateLimit + + l := rate.NewLimiter(rate.Limit(netLimit), + int(netLimit)) + return &YRestartReader{ s: s, name: name, settings: setts, + limiter: l, } } @@ -60,7 +72,9 @@ func (y *YRestartReader) Restart(offsetStart int64) error { return err } - y.underlying = r + /* with limiter */ + + y.underlying = limiter.NewReader(r, y.limiter) return nil } From 8654accdda9e014748e54505f962f3f6f7750744 Mon Sep 17 00:00:00 2001 From: reshke kirill Date: Fri, 28 Nov 2025 21:24:35 +0000 Subject: [PATCH 2/2] f --- pkg/proc/yio/limiter/limiter.go | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/pkg/proc/yio/limiter/limiter.go b/pkg/proc/yio/limiter/limiter.go index 44a5ff3..8a0291e 100644 --- a/pkg/proc/yio/limiter/limiter.go +++ b/pkg/proc/yio/limiter/limiter.go @@ -22,6 +22,7 @@ func (r *Reader) Close() error { func NewReader(reader io.ReadCloser, limiter *rate.Limiter) *Reader { return &Reader{ + ctx: context.Background(), reader: reader, limiter: limiter, } @@ -32,17 +33,11 @@ func (r *Reader) Read(buf []byte) (int, error) { return 0, fmt.Errorf("empty buffer passed") } - end := len(buf) - if r.limiter.Burst() < end { - end = r.limiter.Burst() - } + end := min(r.limiter.Burst(), len(buf)) n, err := r.reader.Read(buf[:end]) if err != nil { - N := n - if n < 0 { - N = 0 - } + N := max(n, 0) limiterErr := r.limiter.WaitN(r.ctx, N) if limiterErr != nil { ylogger.Zero.Error().Err(limiterErr).Msg("Error happened while limiting")