Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions pkg/proc/yio/limiter/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"context"
"fmt"
"io"
"sync"

"github.com/yezzey-gp/yproxy/config"
"github.com/yezzey-gp/yproxy/pkg/ylogger"
"golang.org/x/time/rate"
)
Expand Down Expand Up @@ -48,3 +50,67 @@ func (r *Reader) Read(buf []byte) (int, error) {
err = r.limiter.WaitN(r.ctx, n)
return n, err
}

type Writer struct {
writer io.WriteCloser
limiter *rate.Limiter
ctx context.Context
}

// Close implements io.ReadCloser.
func (r *Writer) Close() error {
return r.writer.Close()
}

func NewWriter(writer io.WriteCloser, limiter *rate.Limiter) *Writer {
return &Writer{
ctx: context.Background(),
writer: writer,
limiter: limiter,
}
}

func (r *Writer) Write(buf []byte) (int, error) {
if len(buf) == 0 {
return 0, fmt.Errorf("empty buffer passed")
}

end := min(r.limiter.Burst(), len(buf))
n, err := r.writer.Write(buf[:end])

if err != nil {
N := max(n, 0)
limiterErr := r.limiter.WaitN(r.ctx, N)
if limiterErr != nil {
ylogger.Zero.Error().Err(limiterErr).Msg("Error happened while limiting")
}
return n, err
}

err = r.limiter.WaitN(r.ctx, n)
return n, err
}

var (
/* Single limiter for all external storage interaction */
mu sync.Mutex
netLimiter *rate.Limiter = nil
)

func GetLimiter() *rate.Limiter {
mu.Lock()
defer mu.Unlock()

if netLimiter != nil {
ylogger.Zero.Debug().Msg("reuse limiter")
return netLimiter
}

netLimit := config.InstanceConfig().StorageCnf.StorageRateLimit
ylogger.Zero.Debug().Uint64("bytes per sec", netLimit).Msg("allocate limiter")

netLimiter = rate.NewLimiter(rate.Limit(netLimit),
int(netLimit))

return netLimiter
}
12 changes: 4 additions & 8 deletions pkg/proc/yio/yrreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"io"
"time"

"github.com/yezzey-gp/yproxy/config"
"github.com/yezzey-gp/yproxy/pkg/client"
"github.com/yezzey-gp/yproxy/pkg/proc/yio/limiter"
"github.com/yezzey-gp/yproxy/pkg/settings"
Expand All @@ -21,7 +20,7 @@ type RestartReader interface {

type YRestartReader struct {
underlying io.ReadCloser
limiter *rate.Limiter
lim *rate.Limiter
s storage.StorageInteractor
name string
settings []settings.StorageSettings
Expand All @@ -45,16 +44,13 @@ func (y *YRestartReader) Read(p []byte) (n int, err error) {
func NewRestartReader(s storage.StorageInteractor,
name string, setts []settings.StorageSettings) RestartReader {

netLimit := config.InstanceConfig().StorageCnf.StorageRateLimit

l := rate.NewLimiter(rate.Limit(netLimit),
int(netLimit))
l := limiter.GetLimiter()

return &YRestartReader{
s: s,
name: name,
settings: setts,
limiter: l,
lim: l,
}
}

Expand All @@ -74,7 +70,7 @@ func (y *YRestartReader) Restart(offsetStart int64) error {

/* with limiter */

y.underlying = limiter.NewReader(r, y.limiter)
y.underlying = limiter.NewReader(r, y.lim)

return nil
}
Expand Down
11 changes: 10 additions & 1 deletion pkg/proc/yio/ywriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@ import (
"io"

"github.com/yezzey-gp/yproxy/pkg/client"
"github.com/yezzey-gp/yproxy/pkg/proc/yio/limiter"
"github.com/yezzey-gp/yproxy/pkg/ylogger"
"golang.org/x/time/rate"
)

/* TBD: support restart */
type YproxyWriter struct {
underlying io.WriteCloser

lim *rate.Limiter

selfCl client.YproxyClient

offsetReached int64
Expand All @@ -34,11 +38,16 @@ func (y *YproxyWriter) Write(p []byte) (n int, err error) {
}

func NewYproxyWriter(under io.WriteCloser, selfCl client.YproxyClient) io.WriteCloser {
return &YproxyWriter{

w := &YproxyWriter{
underlying: under,
lim: limiter.GetLimiter(),
selfCl: selfCl,
offsetReached: 0,
}

w.underlying = limiter.NewWriter(under, w.lim)
return w
}

var _ io.WriteCloser = &YproxyWriter{}
Loading