Skip to content

Commit 006167f

Browse files
feat(client): add retries support
1 parent b153c54 commit 006167f

File tree

4 files changed

+177
-19
lines changed

4 files changed

+177
-19
lines changed

src/Orb/Core/ClientOptions.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ public Uri BaseUrl
2121

2222
public TimeSpan Timeout { get; set; } = TimeSpan.FromMinutes(1);
2323

24+
public int MaxRetries { get; set; } = 2;
25+
2426
Lazy<string> _apiKey = new(() =>
2527
Environment.GetEnvironmentVariable("ORB_API_KEY")
2628
?? throw new OrbInvalidDataException(

src/Orb/Core/HttpResponse.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,15 @@ await Message.Content.ReadAsStreamAsync(cts.Token).ConfigureAwait(false),
3232
}
3333
}
3434

35+
public async Task<string> ReadAsString(CancellationToken cancellationToken = default)
36+
{
37+
using var cts = CancellationTokenSource.CreateLinkedTokenSource(
38+
this.CancellationToken,
39+
cancellationToken
40+
);
41+
return await Message.Content.ReadAsStringAsync(cts.Token).ConfigureAwait(false);
42+
}
43+
3544
public void Dispose()
3645
{
3746
this.Message.Dispose();

src/Orb/IOrbClient.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ public interface IOrbClient
3232

3333
TimeSpan Timeout { get; init; }
3434

35+
int MaxRetries { get; init; }
36+
3537
string APIKey { get; init; }
3638

3739
string? WebhookSecret { get; init; }

src/Orb/OrbClient.cs

Lines changed: 164 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
11
using System;
2+
using System.Collections.Generic;
3+
using System.IO;
4+
using System.Linq;
5+
using System.Net;
26
using System.Net.Http;
37
using System.Threading;
48
using System.Threading.Tasks;
@@ -25,6 +29,13 @@ namespace Orb;
2529

2630
public sealed class OrbClient : IOrbClient
2731
{
32+
static readonly ThreadLocal<Random> _threadLocalRandom = new(() => new Random());
33+
34+
static Random Random
35+
{
36+
get { return _threadLocalRandom.Value!; }
37+
}
38+
2839
readonly ClientOptions _options;
2940

3041
public HttpClient HttpClient
@@ -51,6 +62,12 @@ public TimeSpan Timeout
5162
init { this._options.Timeout = value; }
5263
}
5364

65+
public int MaxRetries
66+
{
67+
get { return this._options.MaxRetries; }
68+
init { this._options.MaxRetries = value; }
69+
}
70+
5471
public string APIKey
5572
{
5673
get { return this._options.APIKey; }
@@ -169,6 +186,63 @@ public async Task<HttpResponse> Execute<T>(
169186
CancellationToken cancellationToken = default
170187
)
171188
where T : ParamsBase
189+
{
190+
if (this.MaxRetries <= 0)
191+
{
192+
return await ExecuteOnce(request, cancellationToken).ConfigureAwait(false);
193+
}
194+
195+
var retries = 0;
196+
while (true)
197+
{
198+
HttpResponse? response = null;
199+
try
200+
{
201+
response = await ExecuteOnce(request, cancellationToken).ConfigureAwait(false);
202+
}
203+
catch (Exception e)
204+
{
205+
if (++retries > this.MaxRetries || !ShouldRetry(e))
206+
{
207+
throw;
208+
}
209+
}
210+
211+
if (response != null && (++retries > this.MaxRetries || !ShouldRetry(response)))
212+
{
213+
if (response.Message.IsSuccessStatusCode)
214+
{
215+
return response;
216+
}
217+
218+
try
219+
{
220+
throw OrbExceptionFactory.CreateApiException(
221+
response.Message.StatusCode,
222+
await response.ReadAsString(cancellationToken).ConfigureAwait(false)
223+
);
224+
}
225+
catch (HttpRequestException e)
226+
{
227+
throw new OrbIOException("I/O Exception", e);
228+
}
229+
finally
230+
{
231+
response.Dispose();
232+
}
233+
}
234+
235+
var backoff = ComputeRetryBackoff(retries, response);
236+
response?.Dispose();
237+
await Task.Delay(backoff, cancellationToken).ConfigureAwait(false);
238+
}
239+
}
240+
241+
async Task<HttpResponse> ExecuteOnce<T>(
242+
HttpRequest<T> request,
243+
CancellationToken cancellationToken = default
244+
)
245+
where T : ParamsBase
172246
{
173247
using HttpRequestMessage requestMessage = new(request.Method, request.Params.Url(this))
174248
{
@@ -191,29 +265,100 @@ public async Task<HttpResponse> Execute<T>(
191265
)
192266
.ConfigureAwait(false);
193267
}
194-
catch (HttpRequestException e1)
268+
catch (HttpRequestException e)
195269
{
196-
throw new OrbIOException("I/O exception", e1);
270+
throw new OrbIOException("I/O exception", e);
197271
}
198-
if (!responseMessage.IsSuccessStatusCode)
272+
return new() { Message = responseMessage, CancellationToken = cts.Token };
273+
}
274+
275+
static TimeSpan ComputeRetryBackoff(int retries, HttpResponse? response)
276+
{
277+
TimeSpan? apiBackoff = ParseRetryAfterMsHeader(response) ?? ParseRetryAfterHeader(response);
278+
if (apiBackoff != null && apiBackoff < TimeSpan.FromMinutes(1))
199279
{
200-
try
201-
{
202-
throw OrbExceptionFactory.CreateApiException(
203-
responseMessage.StatusCode,
204-
await responseMessage.Content.ReadAsStringAsync(cts.Token).ConfigureAwait(false)
205-
);
206-
}
207-
catch (HttpRequestException e)
208-
{
209-
throw new OrbIOException("I/O Exception", e);
210-
}
211-
finally
212-
{
213-
responseMessage.Dispose();
214-
}
280+
// If the API asks us to wait a certain amount of time (and it's a reasonable amount), then just
281+
// do what it says.
282+
return (TimeSpan)apiBackoff;
215283
}
216-
return new() { Message = responseMessage, CancellationToken = cts.Token };
284+
285+
// Apply exponential backoff, but not more than the max.
286+
var backoffSeconds = Math.Min(0.5 * Math.Pow(2.0, retries - 1), 8.0);
287+
var jitter = 1.0 - 0.25 * Random.NextDouble();
288+
return TimeSpan.FromSeconds(backoffSeconds * jitter);
289+
}
290+
291+
static TimeSpan? ParseRetryAfterMsHeader(HttpResponse? response)
292+
{
293+
IEnumerable<string>? headerValues = null;
294+
response?.Message.Headers.TryGetValues("Retry-After-Ms", out headerValues);
295+
var headerValue = headerValues == null ? null : Enumerable.FirstOrDefault(headerValues);
296+
if (headerValue == null)
297+
{
298+
return null;
299+
}
300+
301+
if (float.TryParse(headerValue.AsSpan(), out var retryAfterMs))
302+
{
303+
return TimeSpan.FromMilliseconds(retryAfterMs);
304+
}
305+
306+
return null;
307+
}
308+
309+
static TimeSpan? ParseRetryAfterHeader(HttpResponse? response)
310+
{
311+
IEnumerable<string>? headerValues = null;
312+
response?.Message.Headers.TryGetValues("Retry-After", out headerValues);
313+
var headerValue = headerValues == null ? null : Enumerable.FirstOrDefault(headerValues);
314+
if (headerValue == null)
315+
{
316+
return null;
317+
}
318+
319+
if (float.TryParse(headerValue.AsSpan(), out var retryAfterSeconds))
320+
{
321+
return TimeSpan.FromSeconds(retryAfterSeconds);
322+
}
323+
else if (DateTimeOffset.TryParse(headerValue.AsSpan(), out var retryAfterDate))
324+
{
325+
return retryAfterDate - DateTimeOffset.Now;
326+
}
327+
328+
return null;
329+
}
330+
331+
static bool ShouldRetry(HttpResponse response)
332+
{
333+
if (
334+
response.Message.Headers.TryGetValues("X-Should-Retry", out var headerValues)
335+
&& bool.TryParse(Enumerable.FirstOrDefault(headerValues), out var shouldRetry)
336+
)
337+
{
338+
// If the server explicitly says whether to retry, then we obey.
339+
return shouldRetry;
340+
}
341+
342+
return response.Message.StatusCode switch
343+
{
344+
// Retry on request timeouts
345+
HttpStatusCode.RequestTimeout
346+
or
347+
// Retry on lock timeouts
348+
HttpStatusCode.Conflict
349+
or
350+
// Retry on rate limits
351+
HttpStatusCode.TooManyRequests
352+
or
353+
// Retry internal errors
354+
>= HttpStatusCode.InternalServerError => true,
355+
_ => false,
356+
};
357+
}
358+
359+
static bool ShouldRetry(Exception e)
360+
{
361+
return e is IOException || e is OrbIOException;
217362
}
218363

219364
public OrbClient()

0 commit comments

Comments
 (0)