Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions api/protos/bootstrap/v1/bootstrap.proto
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ message Upstream {

// Timeout for upstream connection stream.
// If unset defaults to no timeout.
// A new stream will be opened (retried) after the timeout is hit.
// Usage example: 2m to represent 2 minutes.
string stream_timeout = 3;

Expand All @@ -58,6 +59,87 @@ message Upstream {
// jitter=5s -> stream timeout is a random value between 15s and 20s.
// If unset defaults to no jitter.
string stream_timeout_jitter = 4;

// Optional. Maximum timeout for blocked gRPC sends.
// If this timeout is reached, the stream will be disconnected and re-opened.
//
// The value of gRPC send timeout is generally,
// min(stream_timeout - stream_open_duration, stream_send_max_timeout)
//
// A stream_send_min_timeout buffer is added for short final send timeout values to prevent
// scenarios where the stream deadline is reached too quickly. The complete formula for the
// value is therefore,
// stream send timeout = max(stream_send_min_timeout, min(stream_timeout - stream_open_duration, stream_send_max_timeout))
//
// If unset, the stream_timeout value is used.
//
// Examples:
//
// Ex 1:
// stream_timeout = 15m
// stream_send_max_timeout = 5m
// stream_send_min_timeout = 1m
// ... 1m in send blocks
// final send timeout = max(1m, min(15m - 1m, 5m)) = 5m
// ... 11m in send blocks
// final send timeout = max(1m, min(15m - 11m, 5m)) = 4m
// ... 14.5m in send blocks
// A 1m buffer is added for short final send timeout values to prevent scenarios where
// the stream deadline is reached too quickly.
// final send timeout = max(1m, min(15m - 14.5m, 5m)) = 1m
//
// Ex 2:
// stream_timeout = 5m
// stream_send_max_timeout = 10m
// stream_send_min_timeout = "" // not configured
// ... 1m in send blocks
// final send timeout = min(5m - 1m, 10m) = 4m
// ... 4m in send blocks
// final send timeout = min(5m - 4m, 10m) = 1m
// ... > 5m in send blocks will never occur because of the 5m stream timeout
//
// Ex 3:
// stream_timeout = "" // not configured
// stream_send_max_timeout = 5m
// stream_send_min_timeout = 4m
// ... in all send block scenarios,
// final send timeout = max(4m, 5m) = 5m
//
// Ex 4:
// stream_timeout = 10m
// stream_send_max_timeout = "" // not configured
// stream_send_min_timeout = 1m
// ... in all send block scenarios,
// final send timeout = max(1m, 10m) = 10m
//
string stream_send_max_timeout = 5;

// Optional. Used in conjunction with stream_send_max_timeout.
// Refer to the documentation of stream_send_max_timeout.
// This value must not be > stream_send_max_timeout. Server enforced.
string stream_send_min_timeout = 6;

// Optional. Maximum timeout for blocked gRPC recvs.
// If this timeout is reached, a NACK will be sent to the upstream server with the same
// version and nonce as the prior attempted send.
//
// The value of gRPC recv timeout is generally,
// min(stream_timeout - stream_open_duration, stream_recv_max_timeout)
//
// A stream_recv_min_timeout buffer is added for short final recv timeout values to prevent
// scenarios where the stream deadline is reached too quickly. The complete formula for the
// value is therefore,
// stream recv timeout = max(stream_recv_min_timeout, min(stream_timeout - stream_open_duration, stream_recv_max_timeout))
//
// If unset, the stream_timeout value is used.
//
// Please refer to stream_send_max_timeout for examples, replacing send with recv.
string stream_recv_max_timeout = 7;

// Optional. Used in conjunction with stream_recv_max_timeout.
// Refer to the documentation of stream_recv_max_timeout.
// This value must not be > stream_recv_max_timeout. Server enforced.
string stream_recv_min_timeout = 8;
}

// [#next-free-field: 3]
Expand Down
65 changes: 46 additions & 19 deletions internal/app/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,25 +85,7 @@ func RunWithContext(ctx context.Context, cancel context.CancelFunc, bootstrapCon
panicOnError(ctx, logger, err, "failed to configure stats client")

// Initialize upstream client.
upstreamPort := strconv.FormatUint(uint64(bootstrapConfig.OriginServer.Address.PortValue), 10)
upstreamAddress := net.JoinHostPort(bootstrapConfig.OriginServer.Address.Address, upstreamPort)
upstreamStreamTimeout, err := util.StringToDuration(bootstrapConfig.OriginServer.StreamTimeout, 0*time.Second)
panicOnError(ctx, logger, err, "failed to parse upstream stream timeout")
upstreamStreamJitter, err := util.StringToDuration(bootstrapConfig.OriginServer.StreamTimeoutJitter, 0*time.Second)
panicOnError(ctx, logger, err, "failed to parse upstream stream jitter")
upstreamConnTimeout, err := util.StringToDuration(bootstrapConfig.OriginServer.KeepAliveTime, 5*time.Minute)
panicOnError(ctx, logger, err, "failed to parse upstream connection timeout")
upstreamClient, err := upstream.New(
ctx,
upstreamAddress,
upstream.CallOptions{
StreamTimeout: upstreamStreamTimeout,
StreamTimeoutJitter: upstreamStreamJitter,
ConnKeepaliveTimeout: upstreamConnTimeout,
},
logger,
scope,
)
upstreamClient, err := initializeUpstreamClient(ctx, bootstrapConfig, logger, scope)
panicOnError(ctx, logger, err, "failed to initialize upstream client")

// Initialize request aggregation mapper component.
Expand Down Expand Up @@ -180,6 +162,51 @@ func RunWithContext(ctx context.Context, cancel context.CancelFunc, bootstrapCon
}
}

// initializeUpstreamClient builds the upstream (origin server) client from the bootstrap
// configuration.
//
// The dial address is derived from OriginServer.Address. Every duration-valued setting is
// parsed with util.StringToDuration; the second argument is presumably the fallback used
// for an unset string (zero for the stream timeouts, 5 minutes for the keepalive time).
//
// The function panics through the logger when a duration cannot be parsed, or when a
// configured min send/recv timeout exceeds its configured max. Otherwise it returns the
// result of upstream.New unchanged.
func initializeUpstreamClient(
	ctx context.Context,
	bootstrapConfig *bootstrapv1.Bootstrap,
	logger log.Logger,
	scope tally.Scope,
) (upstream.Client, error) {
	upstreamPort := strconv.FormatUint(uint64(bootstrapConfig.OriginServer.Address.PortValue), 10)
	upstreamAddress := net.JoinHostPort(bootstrapConfig.OriginServer.Address.Address, upstreamPort)

	upstreamStreamTimeout, err := util.StringToDuration(bootstrapConfig.OriginServer.StreamTimeout, 0*time.Second)
	panicOnError(ctx, logger, err, "failed to parse upstream stream timeout")
	upstreamStreamJitter, err := util.StringToDuration(bootstrapConfig.OriginServer.StreamTimeoutJitter, 0*time.Second)
	panicOnError(ctx, logger, err, "failed to parse upstream stream jitter")
	upstreamStreamSendMaxTimeout, err := util.StringToDuration(bootstrapConfig.OriginServer.StreamSendMaxTimeout, 0*time.Second)
	panicOnError(ctx, logger, err, "failed to parse upstream stream send max timeout")
	upstreamStreamSendMinTimeout, err := util.StringToDuration(bootstrapConfig.OriginServer.StreamSendMinTimeout, 0*time.Second)
	panicOnError(ctx, logger, err, "failed to parse upstream stream send min timeout")
	upstreamStreamRecvMaxTimeout, err := util.StringToDuration(bootstrapConfig.OriginServer.StreamRecvMaxTimeout, 0*time.Second)
	panicOnError(ctx, logger, err, "failed to parse upstream stream recv max timeout")
	upstreamStreamRecvMinTimeout, err := util.StringToDuration(bootstrapConfig.OriginServer.StreamRecvMinTimeout, 0*time.Second)
	panicOnError(ctx, logger, err, "failed to parse upstream stream recv min timeout")
	upstreamConnTimeout, err := util.StringToDuration(bootstrapConfig.OriginServer.KeepAliveTime, 5*time.Minute)
	panicOnError(ctx, logger, err, "failed to parse upstream connection timeout")

	// Validate min <= max only when a max is configured: an unset max falls back to the
	// stream timeout, so a min on its own is a legal configuration.
	// panicOnError cannot be used here because err is nil once all parsing has succeeded,
	// which would silently skip the panic; report through the logger directly instead.
	if upstreamStreamSendMaxTimeout > 0 && upstreamStreamSendMinTimeout > upstreamStreamSendMaxTimeout {
		logger.With(
			"stream_send_min_timeout", upstreamStreamSendMinTimeout.String(),
			"stream_send_max_timeout", upstreamStreamSendMaxTimeout.String(),
		).Panic(ctx, "stream send min timeout cannot be larger than stream send max timeout")
	}
	if upstreamStreamRecvMaxTimeout > 0 && upstreamStreamRecvMinTimeout > upstreamStreamRecvMaxTimeout {
		logger.With(
			"stream_recv_min_timeout", upstreamStreamRecvMinTimeout.String(),
			"stream_recv_max_timeout", upstreamStreamRecvMaxTimeout.String(),
		).Panic(ctx, "stream recv min timeout cannot be larger than stream recv max timeout")
	}

	return upstream.New(
		ctx,
		upstreamAddress,
		upstream.CallOptions{
			StreamTimeout:        upstreamStreamTimeout,
			StreamTimeoutJitter:  upstreamStreamJitter,
			StreamSendMaxTimeout: upstreamStreamSendMaxTimeout,
			StreamSendMinTimeout: upstreamStreamSendMinTimeout,
			StreamRecvMaxTimeout: upstreamStreamRecvMaxTimeout,
			StreamRecvMinTimeout: upstreamStreamRecvMinTimeout,
			ConnKeepaliveTimeout: upstreamConnTimeout,
		},
		logger,
		scope,
	)
}

func panicOnError(ctx context.Context, logger log.Logger, err error, msg string) {
if err != nil {
logger.With("error", err).Panic(ctx, msg)
Expand Down
10 changes: 9 additions & 1 deletion internal/app/transport/stream.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
package transport

import (
"context"
"time"

status "google.golang.org/genproto/googleapis/rpc/status"
)

// Stream abstracts the grpc client stream and DiscoveryRequest/Response.
type Stream interface {
	// SendMsg sends a discovery request carrying the given version and response nonce.
	// errorDetail is copied onto the request as-is; pass nil for a plain request
	// (presumably an ACK) and a non-nil status to report a failure (presumably a NACK).
	SendMsg(version string, nonce string, errorDetail *status.Status) error

	// RecvMsg waits for the next response on the stream.
	RecvMsg() (Response, error)

	// RecvMsgWithTimeout behaves like RecvMsg but gives up once timeout elapses or ctx is
	// done, in which case the context's error is returned with a nil Response.
	RecvMsgWithTimeout(ctx context.Context, timeout time.Duration) (Response, error)

	// CloseSend delegates to CloseSend on the underlying gRPC client stream.
	CloseSend() error
}
27 changes: 25 additions & 2 deletions internal/app/transport/streamv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package transport

import (
"context"
"time"

v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/envoyproxy/xds-relay/internal/pkg/log"
status "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc"
)

Expand All @@ -25,11 +27,11 @@ func NewStreamV2(clientStream grpc.ClientStream, req Request, l log.Logger) Stre
}
}

func (s *streamv2) SendMsg(version string, nonce string) error {
func (s *streamv2) SendMsg(version string, nonce string, errorDetail *status.Status) error {
msg := s.initialRequest.GetRaw().V2
msg.VersionInfo = version
msg.ResponseNonce = nonce
msg.ErrorDetail = nil
msg.ErrorDetail = errorDetail
s.logger.With(
"request_type", msg.GetTypeUrl(),
"request_version", msg.GetVersionInfo(),
Expand All @@ -51,6 +53,27 @@ func (s *streamv2) RecvMsg() (Response, error) {
return NewResponseV2(s.initialRequest.GetRaw().V2, resp), nil
}

// RecvMsgWithTimeout waits for the next response, giving up after timeout or when ctx is
// done, whichever comes first.
//
// RecvMsg runs on its own goroutine and hands its outcome over a one-slot buffered channel,
// so that goroutine can always deliver its result and exit even if this call has already
// returned. When the deadline or a ctx cancellation wins, the context's error is returned
// together with a nil Response.
//
// NOTE(review): after a timeout the goroutine is still blocked inside RecvMsg and whatever
// it eventually receives is dropped. Confirm callers never start another receive on the
// same stream while that call is outstanding.
func (s *streamv2) RecvMsgWithTimeout(ctx context.Context, timeout time.Duration) (Response, error) {
	type recvResult struct {
		msg Response
		err error
	}

	deadlineCtx, stop := context.WithTimeout(ctx, timeout)
	defer stop()

	results := make(chan recvResult, 1)
	go func() {
		msg, recvErr := s.RecvMsg()
		results <- recvResult{msg: msg, err: recvErr}
		close(results)
	}()

	select {
	case got := <-results:
		return got.msg, got.err
	case <-deadlineCtx.Done():
		return nil, deadlineCtx.Err()
	}
}

// CloseSend forwards to CloseSend on the wrapped gRPC client stream and returns its error
// unchanged.
func (s *streamv2) CloseSend() error {
	err := s.grpcClientStream.CloseSend()
	return err
}
27 changes: 25 additions & 2 deletions internal/app/transport/streamv3.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package transport

import (
"context"
"time"

v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/xds-relay/internal/pkg/log"
status "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc"
)

Expand All @@ -25,11 +27,11 @@ func NewStreamV3(clientStream grpc.ClientStream, req Request, l log.Logger) Stre
}
}

func (s *streamv3) SendMsg(version string, nonce string) error {
func (s *streamv3) SendMsg(version string, nonce string, errorDetail *status.Status) error {
msg := s.initialRequest.GetRaw().V3
msg.VersionInfo = version
msg.ErrorDetail = nil
msg.ResponseNonce = nonce
msg.ErrorDetail = errorDetail
s.logger.With(
"request_type", msg.GetTypeUrl(),
"request_version", msg.GetVersionInfo(),
Expand All @@ -53,6 +55,27 @@ func (s *streamv3) RecvMsg() (Response, error) {
return NewResponseV3(s.initialRequest.GetRaw().V3, resp), nil
}

// RecvMsgWithTimeout waits for the next response, giving up after timeout or when ctx is
// done, whichever comes first.
//
// RecvMsg runs on its own goroutine and hands its outcome over a one-slot buffered channel,
// so that goroutine can always deliver its result and exit even if this call has already
// returned. When the deadline or a ctx cancellation wins, the context's error is returned
// together with a nil Response.
//
// NOTE(review): after a timeout the goroutine is still blocked inside RecvMsg and whatever
// it eventually receives is dropped. Confirm callers never start another receive on the
// same stream while that call is outstanding.
func (s *streamv3) RecvMsgWithTimeout(ctx context.Context, timeout time.Duration) (Response, error) {
	type recvResult struct {
		msg Response
		err error
	}

	deadlineCtx, stop := context.WithTimeout(ctx, timeout)
	defer stop()

	results := make(chan recvResult, 1)
	go func() {
		msg, recvErr := s.RecvMsg()
		results <- recvResult{msg: msg, err: recvErr}
		close(results)
	}()

	select {
	case got := <-results:
		return got.msg, got.err
	case <-deadlineCtx.Done():
		return nil, deadlineCtx.Err()
	}
}

// CloseSend forwards to CloseSend on the wrapped gRPC client stream and returns its error
// unchanged.
func (s *streamv3) CloseSend() error {
	err := s.grpcClientStream.CloseSend()
	return err
}
Loading