Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions src/StackExchange.Redis/CommandRetry/DefaultRetry.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
namespace StackExchange.Redis
{
/// <summary>
/// Command Policy to have all commands being retried for connection exception
/// </summary>
public class AlwaysRetryOnConnectionException : ICommandRetryPolicy
{
/// <summary>
///
/// </summary>
/// <param name="commandStatus"></param>
/// <returns></returns>
public bool ShouldRetryOnConnectionException(CommandStatus commandStatus) => true;
}

/// <summary>
/// Command Policy to have only commands that are not yet sent being retried for connection exception
/// </summary>
public class RetryIfNotSentOnConnectionException : ICommandRetryPolicy
{
/// <summary>
///
/// </summary>
/// <param name="commandStatus"></param>
/// <returns></returns>
public bool ShouldRetryOnConnectionException(CommandStatus commandStatus)
{
return commandStatus == CommandStatus.WaitingToBeSent;
}
}

/// <summary>
/// Command Policy to choose which commands will be retried on a connection exception
/// </summary>
public class CommandRetryPolicy
{
/// <summary>
/// Command Policy to have all commands being retried for connection exception
/// </summary>
/// <returns></returns>
public ICommandRetryPolicy AlwaysRetryOnConnectionException()
{
return new AlwaysRetryOnConnectionException();
}

/// <summary>
/// Command Policy to have only commands that are not yet sent being retried for connection exception
/// </summary>
/// <returns></returns>
public ICommandRetryPolicy RetryIfNotSentOnConnectionException()
{
return new RetryIfNotSentOnConnectionException();
}
}
}
16 changes: 16 additions & 0 deletions src/StackExchange.Redis/CommandRetry/ICommandRetryPolicy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
namespace StackExchange.Redis
{
/// <summary>
/// interface to implement command retry policy
/// </summary>
public interface ICommandRetryPolicy
{
/// <summary>
/// Called when a message failed due to connection error
/// </summary>
/// <param name="commandStatus">current state of the command</param>
/// <returns></returns>
public bool ShouldRetryOnConnectionException(CommandStatus commandStatus);

}
}
15 changes: 0 additions & 15 deletions src/StackExchange.Redis/CommandRetry/IRetryOnReconnectPolicy.cs

This file was deleted.

3 changes: 2 additions & 1 deletion src/StackExchange.Redis/CommandRetry/MessageRetryQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ internal MessageRetryQueue(IMessageRetryHelper messageRetryHelper, int? maxRetry
_messageRetryHelper = messageRetryHelper;
}

public int RetryQueueLength => _queue.Count;
public int CurrentQueueLength => _queue.Count;

[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal bool TryHandleFailedCommand(Message message)
Expand All @@ -31,6 +31,7 @@ internal bool TryHandleFailedCommand(Message message)
int count = _queue.Count;
if (_maxRetryQueueLength.HasValue && count >= _maxRetryQueueLength)
{
// TODO: Log an error here when logging is supported
return false;
}
wasEmpty = count == 0;
Expand Down
39 changes: 0 additions & 39 deletions src/StackExchange.Redis/CommandRetry/RetryOnReconnect.cs

This file was deleted.

38 changes: 19 additions & 19 deletions src/StackExchange.Redis/ConfigurationOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,16 @@ internal static SslProtocols ParseSslProtocols(string key, string value)
return tmp;
}

internal static IRetryOnReconnectPolicy ParseRetryCommandsOnReconnect(string key, string value)
internal static ICommandRetryPolicy ParseCommandRetryPolicy(string key, string value)
{
switch (value.ToLower())
{
case "noretry":
return null;
case "alwaysretry":
return RetryOnReconnect.Always;
return new CommandRetryPolicy().AlwaysRetryOnConnectionException();
case "retryifnotsent":
return RetryOnReconnect.IfNotSent;
return new CommandRetryPolicy().RetryIfNotSentOnConnectionException();
default:
throw new ArgumentOutOfRangeException(key, $"Keyword '{key}' can be NoRetry, AlwaysRetry or RetryIfNotSent; the value '{value}' is not recognised.");
}
Expand Down Expand Up @@ -106,8 +106,8 @@ internal const string
Version = "version",
WriteBuffer = "writeBuffer",
CheckCertificateRevocation = "checkCertificateRevocation",
RetryCommandsOnReconnect = "RetryCommandsOnReconnect",
RetryQueueLength = "RetryQueueLength";
CommandRetryPolicy = "commandRetryPolicy",
RetryQueueMaxLength = "retryQueueMaxLength";


private static readonly Dictionary<string, string> normalizedOptions = new[]
Expand Down Expand Up @@ -138,8 +138,8 @@ internal const string
Version,
WriteBuffer,
CheckCertificateRevocation,
RetryCommandsOnReconnect,
RetryQueueLength,
CommandRetryPolicy,
RetryQueueMaxLength,
}.ToDictionary(x => x, StringComparer.OrdinalIgnoreCase);

public static string TryNormalize(string value)
Expand All @@ -160,7 +160,7 @@ public static string TryNormalize(string value)

private Version defaultVersion;

private int? keepAlive, asyncTimeout, syncTimeout, connectTimeout, responseTimeout, writeBuffer, connectRetry, configCheckSeconds, retryQueueLength;
private int? keepAlive, asyncTimeout, syncTimeout, connectTimeout, responseTimeout, writeBuffer, connectRetry, configCheckSeconds, retryQueueMaxLength;

private Proxy? proxy;

Expand Down Expand Up @@ -358,7 +358,7 @@ public bool PreserveAsyncOrder
/// <summary>
/// The retry policy to be used for command retries during connection reconnects
/// </summary>
public IRetryOnReconnectPolicy RetryCommandsOnReconnect { get; set; }
public ICommandRetryPolicy CommandRetryPolicy { get; set; }

/// <summary>
/// Indicates whether endpoints should be resolved via DNS before connecting.
Expand Down Expand Up @@ -426,9 +426,9 @@ public bool PreserveAsyncOrder
public int ConfigCheckSeconds { get { return configCheckSeconds.GetValueOrDefault(60); } set { configCheckSeconds = value; } }

/// <summary>
/// If retry policy is specified, Retry Queue max length, by default there's no queue limit
/// Maximum length of the retry queue (defaults to 100,000)
/// </summary>
public int? RetryQueueMaxLength { get; set; }
public int? RetryQueueMaxLength { get { return retryQueueMaxLength.GetValueOrDefault(100000); } set { retryQueueMaxLength = value; } }

/// <summary>
/// Parse the configuration from a comma-delimited configuration string
Expand Down Expand Up @@ -495,7 +495,7 @@ public ConfigurationOptions Clone()
ReconnectRetryPolicy = reconnectRetryPolicy,
SslProtocols = SslProtocols,
checkCertificateRevocation = checkCertificateRevocation,
RetryCommandsOnReconnect = RetryCommandsOnReconnect,
CommandRetryPolicy = CommandRetryPolicy,
RetryQueueMaxLength = RetryQueueMaxLength,
};
foreach (var item in EndPoints)
Expand Down Expand Up @@ -581,8 +581,8 @@ public string ToString(bool includePassword)
Append(sb, OptionKeys.ConfigCheckSeconds, configCheckSeconds);
Append(sb, OptionKeys.ResponseTimeout, responseTimeout);
Append(sb, OptionKeys.DefaultDatabase, DefaultDatabase);
Append(sb, OptionKeys.RetryCommandsOnReconnect, RetryCommandsOnReconnect);
Append(sb, OptionKeys.RetryQueueLength, retryQueueLength);
Append(sb, OptionKeys.CommandRetryPolicy, CommandRetryPolicy);
Append(sb, OptionKeys.RetryQueueMaxLength, retryQueueMaxLength);
commandMap?.AppendDeltas(sb);
return sb.ToString();
}
Expand Down Expand Up @@ -660,7 +660,7 @@ private static void Append(StringBuilder sb, string prefix, object value)
private void Clear()
{
ClientName = ServiceName = User = Password = tieBreaker = sslHost = configChannel = null;
keepAlive = syncTimeout = asyncTimeout = connectTimeout = writeBuffer = connectRetry = configCheckSeconds = DefaultDatabase = retryQueueLength = null;
keepAlive = syncTimeout = asyncTimeout = connectTimeout = writeBuffer = connectRetry = configCheckSeconds = DefaultDatabase = retryQueueMaxLength = null;
allowAdmin = abortOnConnectFail = highPrioritySocketThreads = resolveDns = ssl = null;
SslProtocols = null;
defaultVersion = null;
Expand All @@ -671,7 +671,7 @@ private void Clear()
CertificateValidation = null;
ChannelPrefix = default(RedisChannel);
SocketManager = null;
RetryCommandsOnReconnect = null;
CommandRetryPolicy = null;
}

object ICloneable.Clone() => Clone();
Expand Down Expand Up @@ -792,10 +792,10 @@ private void DoParse(string configuration, bool ignoreUnknown)
case OptionKeys.SslProtocols:
SslProtocols = OptionKeys.ParseSslProtocols(key, value);
break;
case OptionKeys.RetryCommandsOnReconnect:
RetryCommandsOnReconnect = OptionKeys.ParseRetryCommandsOnReconnect(key, value);
case OptionKeys.CommandRetryPolicy:
CommandRetryPolicy = OptionKeys.ParseCommandRetryPolicy(key, value);
break;
case OptionKeys.RetryQueueLength:
case OptionKeys.RetryQueueMaxLength:
RetryQueueMaxLength = OptionKeys.ParseInt32(key, value, minValue: 0);
break;
default:
Expand Down
6 changes: 3 additions & 3 deletions src/StackExchange.Redis/ConnectionMultiplexer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1287,7 +1287,7 @@ internal ServerEndPoint GetServerEndPoint(EndPoint endpoint, LogProxy log = null
/// <summary>
/// returns the current length of the retry queue
/// </summary>
public int RetryQueueLength => RetryQueueManager.RetryQueueLength;
public int CurrentRetryQueueLength => RetryQueueManager.CurrentQueueLength;

private ConnectionMultiplexer(ConfigurationOptions configuration)
{
Expand Down Expand Up @@ -2810,9 +2810,9 @@ internal bool TryMessageForRetry(Message message, Exception ex)
}

bool shouldRetry = false;
if (RawConfig.RetryCommandsOnReconnect != null && ex is RedisConnectionException)
if (RawConfig.CommandRetryPolicy != null && ex is RedisConnectionException)
{
shouldRetry = message.IsInternalCall || RawConfig.RetryCommandsOnReconnect.ShouldRetry(message.Status);
shouldRetry = message.IsInternalCall || RawConfig.CommandRetryPolicy.ShouldRetryOnConnectionException(message.Status);
}

if (shouldRetry && RetryQueueManager.TryHandleFailedCommand(message))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public async Task RetryAsyncMessageIntegration(bool retryPolicySet)
ConfigurationOptions configClient = new ConfigurationOptions();
configClient.EndPoints.Add("127.0.0.1");
configAdmin.AbortOnConnectFail = false;
configClient.RetryCommandsOnReconnect = retryPolicySet ? RetryOnReconnect.Always : null;
configClient.CommandRetryPolicy = retryPolicySet ? new CommandRetryPolicy().AlwaysRetryOnConnectionException() : null;

using (var adminMuxer = ConnectionMultiplexer.Connect(configAdmin))
using (var clientmuxer = ConnectionMultiplexer.Connect(configClient))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public void ValidateMaxQueueLengthFails()
[Fact]
public async void RetryMessageSucceeds()
{
using (var muxer = Create(allowAdmin: true, retryPolicy: RetryOnReconnect.Always))
using (var muxer = Create(allowAdmin: true, retryPolicy: new CommandRetryPolicy().AlwaysRetryOnConnectionException()))
{
var conn = muxer.GetDatabase();
var duration = await conn.PingAsync().ForAwait();
Expand All @@ -51,7 +51,7 @@ public void TryHandleFailedMessageSucceedsOnEndPointAvailable()

messageRetryQueue.TryHandleFailedCommand(message);

Assert.True(messageRetryQueue.RetryQueueLength == 0);
Assert.True(messageRetryQueue.CurrentQueueLength == 0);
messageRetryHelper.Verify(failedCommand => failedCommand.TryResendAsync(message), Times.Once);
}

Expand All @@ -63,7 +63,7 @@ public void TryHandleFailedMessageWaitOnEndPointUnAvailable()

messageRetryQueue.TryHandleFailedCommand(message);

Assert.True(messageRetryQueue.RetryQueueLength == 1);
Assert.True(messageRetryQueue.CurrentQueueLength == 1);
messageRetryHelper.Verify(failedCommand => failedCommand.TryResendAsync(message), Times.Never);
}

Expand All @@ -80,7 +80,7 @@ public void TryHandleFailedMessageTimedoutEndPointAvailable()

messageRetryQueue.TryHandleFailedCommand(message);

Assert.True(messageRetryQueue.RetryQueueLength == 0);
Assert.True(messageRetryQueue.CurrentQueueLength == 0);
messageRetryHelper.Verify(failedCommand => failedCommand.TryResendAsync(message), Times.Never);
messageRetryHelper.Verify(failedCommand => failedCommand.SetExceptionAndComplete(message, timeout), Times.Once);
}
Expand All @@ -94,7 +94,7 @@ public void TryHandleFailedMessageGetEndpointThrows()

messageRetryQueue.TryHandleFailedCommand(message);

Assert.True(messageRetryQueue.RetryQueueLength == 0);
Assert.True(messageRetryQueue.CurrentQueueLength == 0);
messageRetryHelper.Verify(failedCommand => failedCommand.TryResendAsync(message), Times.Never);
messageRetryHelper.Verify(failedCommand => failedCommand.SetExceptionAndComplete(message, ex), Times.Once);
}
Expand All @@ -108,7 +108,7 @@ public void TryHandleFailedMessageDrainsQueue()
messageRetryQueue.TryHandleFailedCommand(message);
messageRetryQueue.Dispose();

Assert.True(messageRetryQueue.RetryQueueLength == 0);
Assert.True(messageRetryQueue.CurrentQueueLength == 0);
messageRetryHelper.Verify(failedCommand => failedCommand.TryResendAsync(message), Times.Never);
messageRetryHelper.Verify(failedCommand => failedCommand.SetExceptionAndComplete(message, It.IsAny<Exception>()), Times.Once);
}
Expand All @@ -125,7 +125,7 @@ public void CheckRetryForTimeoutTimesout(bool hasTimedout, int queueLength)
messageRetryQueue.TryHandleFailedCommand(message);
messageRetryQueue.CheckRetryQueueForTimeouts();

Assert.Equal(queueLength, messageRetryQueue.RetryQueueLength);
Assert.Equal(queueLength, messageRetryQueue.CurrentQueueLength);
messageRetryHelper.Verify(failedCommand => failedCommand.TryResendAsync(message), Times.Never);
messageRetryHelper.Verify(failedCommand => failedCommand.SetExceptionAndComplete(message, It.IsAny<Exception>()), Times.Exactly(hasTimedout ? 1 : 0));
}
Expand All @@ -140,7 +140,7 @@ public void TryHandleFailedMessageTimeoutThrow()

messageRetryQueue.TryHandleFailedCommand(message);

Assert.True(messageRetryQueue.RetryQueueLength == 0);
Assert.True(messageRetryQueue.CurrentQueueLength == 0);
messageRetryHelper.Verify(failedCommand => failedCommand.TryResendAsync(message), Times.Never);
messageRetryHelper.Verify(failedCommand => failedCommand.SetExceptionAndComplete(message, It.IsAny<Exception>()), Times.Once);
}
Expand Down
6 changes: 3 additions & 3 deletions tests/StackExchange.Redis.Tests/TestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ internal virtual IInternalConnectionMultiplexer Create(
bool checkConnect = true, string failMessage = null,
string channelPrefix = null, Proxy? proxy = null,
string configuration = null, bool logTransactionData = true,
bool shared = true, int? defaultDatabase = null, IRetryOnReconnectPolicy retryPolicy = null,
bool shared = true, int? defaultDatabase = null, ICommandRetryPolicy retryPolicy = null,
[CallerMemberName] string caller = null)
{
if (Output == null)
Expand Down Expand Up @@ -270,7 +270,7 @@ public static ConnectionMultiplexer CreateDefault(
string channelPrefix = null, Proxy? proxy = null,
string configuration = null, bool logTransactionData = true,
int? defaultDatabase = null,
IRetryOnReconnectPolicy retryPolicy = null,
ICommandRetryPolicy retryPolicy = null,
[CallerMemberName] string caller = null)
{
StringWriter localLog = null;
Expand Down Expand Up @@ -306,7 +306,7 @@ public static ConnectionMultiplexer CreateDefault(
if (connectTimeout != null) config.ConnectTimeout = connectTimeout.Value;
if (proxy != null) config.Proxy = proxy.Value;
if (defaultDatabase != null) config.DefaultDatabase = defaultDatabase.Value;
if (retryPolicy != null) config.RetryCommandsOnReconnect = retryPolicy;
if (retryPolicy != null) config.CommandRetryPolicy = retryPolicy;
var watch = Stopwatch.StartNew();
var task = ConnectionMultiplexer.ConnectAsync(config, log);
if (!task.Wait(config.ConnectTimeout >= (int.MaxValue / 2) ? int.MaxValue : config.ConnectTimeout * 2))
Expand Down