diff --git a/README.md b/README.md index 47e73daa..16922bdf 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,83 @@ -**Rate-limiting pattern** +# Rate Limiting library for .NET + +Below is the description of the proposed solution. The original poblem statement is retained for reference further below. + +## Proposed Solution + +### Considerations + +In approaching the problem, the following considerations, assumptions and acknowledged limitations have been considered: + +* Generic framework
+ Rate-limiting is frequently associated with web frameworks in handling of incoming HTTP requests. However, it's not necessarily limited to such domain, and the need can manifest itself in other communication contexts, such as: + + * RPC (e.g. gRPC, XML-RPC, etc.) + * Message-oriented protocols (e.g. IRC-like chats) + * Database engines (e.g. some SQL db engine using rate limiter as query and execution governor limiting the rate a client can submit queries) + * Even in telephony, limiting incoming call volume and frequency of calls from a particular caller + * ... and so on. + + Application contexts are unlimited. As such, the proposed solution attempts to make no assumption of contex, and strives to be as generic as possible. It sets up a general execution framework, while leaving specifics of applied domain to be delegated to a higher implementation layer via extensibility. + + One example of such consideration is leaving it out to processor to decide how to handle approved and denied requests. In the hypothetical reference example of `UberRpcRequestProcessor`, I chose to use lambda callbacks for communicating back evaluation of the request. But I could've chosen to use `event` C# mechanism, or leave it to decorate request with properties describing the result. But I explicitly avoided defining such contract in `IRequestProcessor` interface -- to be most generic and make least assumptions as possible. +* Asynchronous support
+ It may appear that `async`/`await` is used without justified need, as library doesn't itself deal with any IO or similar operations that would justify use of async model. However, pursuant to consideration stated earlier, it does so to allow implementors and extensions to perform asynchronous operation if their needs so require. +* Applicaction adaptability and flexibility
+ Processor really defines the framework of how policies are declared, managed, configured, etc. For some applications it may make sense to have hard attributes, in others, like in ASP.NET, using named attributes with preconfigured policy templates, yet in others maybe processor wants to have some kind of callback to custom evaluate each potential query for a very fluid and dynamic policy specification (e.g. realtime datastore query to see which target points points are configured how at this precise point in time.), etc.. IOW, this library doesn't impose a particular framework for how to specify and manage policy mappings, and leaving it to the specific processor adapter that's written to specifically plug into a particular framework or service natively. +* Distributed support
+ Consideration here is that modern systems typically have more than one machine in the pool to handle services (i.e. a web farm), and thus rate limiter use may be "global" to cover the whole service pool as a single rate-limited entity. As such, library supports extensibility for data store beyond local memory, albeit the default example `DefaultStateProvider` is limited to local memory of a single box. Nevertheless, it should be easy to see how another state provider can be made to utilize SQL Server, memcached, Reddis, or other applicable storage and/or caching technologies that would allow for distributed/scaled support across farms. + +### Design + +The design can generally be seen broken down in 3 major parts: + +* `RateLimiter`, the main class to which a request is passed for rate limiting evaluation. It is generic and non-service specific. It uses service-specific implementation of RequestProcessor for interoperability with applicable service. +* RequestProcessor, a class that implements `IRequestProcessor` that has the following responsibilities. + * Integration into service context and injection of its request handling in service's request handling pipeline. + * Passing allowed requests to service and handling of rejected requests in service meaningful way. + * Mapping of service's way of marking of applicable policies to requests and provided services, e.g. API endpoints.
+ For example, one specific implementation may use attribute decorations on handling methods to specify endpoints' applicable policies. Yet another may use attributes to decorate message payload types or possibly message payload fields to describe applicable policies. Yet another way may be to have a registry generated on startup that provides mapping lookups at runtime for applicable policies. It all depends on the what is meaningful to the underlying services for which provider is written and how implementers chose to handle the policy references. +* PolicyStateProvidera class that implements `IPolicyStateProvider` that is responsible for providing storage mechanism for policies' state and concrete implementation of general policy types. + +`RateLimiter` is instantiated with a specific RequestProcessor for specific service. + +RequestProcessor knows how to integrate into its specific service, how to handle accepted and rejected requests, and how to create mapping of endpoints and policies. It also uses state provider for policy data store needs. See example implementation `UberRpcServiceProcessor` for a hypothetical `UberRpcService` service. + +#### Extensibility + +Extensibility is achieved by keeping `RateLimiter` bare bones, deferring customization to the processor and state providers. + +Besides the provided basic policies, users can define additional policy types by simply extening from `RatePolicy`, and extending state provider to provide implementation of evaluation of that policy. + +The state provider itself is expected to be written such that it can be extended in some form. + +##### New Services + +After you had a chance to see how UberRpc is handled, now lets suppose that we want to use `RateLimter` with a new QuantumService. This `QuantumServiceProcessor` would be a new processor (defined in some new library/project) that knows how to plug in into QuantumService. However, quantrum world being bizarre, we now need a new policy kind that is not already provided by current solution. Let's call this new policy `SchrödingerCatRatePolicy` and derive it from `RatePolicy`, and give it a property `WaveFunction` of some relevant type. + +For now, we'll be happy with running on a single box and thus happy with `DefaultStateProvider` that just uses local memory. However, this provider doesn't know how to handle `SchrödingerCatRatePolicy`. Thus we can create new `QuantumLocalMemoryStateProvider` and derive it from `DefaultStateProvider` so that we don't need to reimplement existing policy kinds. + +In `QuantumLocalMemoryStateProvider` we'd `override GetPolicy()` to recognize new policy and provide concrete implementation, and call base for all others. The concrete `ConcreteSchrödingerCatRatePolicy` will then have implementation of its logic of evaluating and collapsing policy's `WaveFunction` to determine the fate of the cat request, and memory for storing necessary data to perform its evaluations. + +Thus, we now provided new service support, and provided support for new policy type by extending existing code and only providing the additional, marginal functionality necessary for the new service. + +##### New State Stores + +Local memory-based `DefaultStateProvider` is just a default, simple provider, but not necessarily suitable for service farms, since policy state is local to each member of the farm. + +It's possible, however, to extend rate limiter by implementing new state providers. Some such candidates are ones that use SQLServer, PostgreSQL, Reddis, Memcached, etc. + +Such implementations would, at minimum, need to provide concrete implementations for the basic policies, so as to know how to evaluate them using data stored in the respective technologies. + +When creating such new state providers, implementors should consider extensibility considerations in-turn, such as the example above where `SchrödingerCatRatePolicy` may be desired to work with non-local memory but with new state provider, and that new state provider can in-turn be extended to add support for `SchrödingerCatRatePolicy` while reusing all other policies. + +### Limitations +* There are a lot more unit tests possible, and had to limit to basic, illustrative ones for time reasons. +* Some test cases aren't supported, due to their fuzzy nature. E.g. library hints for light support for efficiency mode to avoid locking at expense of imprecise results. Performing statistical testing to cover these is beyond scope. Some can be handled using the Fakes lib, which, too, kept out of scope. But, given the nature of the middleware and desire for middleware to be efficient, I did give a sample code on how rate limiter library can be made flexible and allow for flexible tradeoffs. +* `DefaultDataStore` internally assumes that it's the only instance in the process, and only one `RateLimiter` using it exists. It's not, however a hard limitation, and it could be changed with a bit more code to not have this limitation in the future. + +## Original Problem Statement +**Rate-limiting pattern** Rate limiting involves restricting the number of requests that a client can make. A client is identified with an access token, which is used for every request to a resource. @@ -17,7 +96,7 @@ The goal is to design a class(-es) that manages each API resource's rate limits We're more interested in the design itself than in some intelligent and tricky rate-limiting algorithm. There is no need to use a database (in-memory storage is fine) or any web framework. Do not waste time on preparing complex environment, reusable class library covered by a set of tests is more than enough. -There is a Test Project set up for you to use. However, you are welcome to create your own test project and use whatever test runner you like. +There is a Test Project set up for you to use. However, you are welcome to create your own test project and use whatever test runner you like. You are welcome to ask any questions regarding the requirements—treat us as product owners, analysts, or whoever knows the business. If you have any questions or concerns, please submit them as a [GitHub issue](https://github.com/crexi-dev/rate-limiter/issues). diff --git a/RateLimiter.Tests/RateLimiter.Tests.csproj b/RateLimiter.Tests/RateLimiter.Tests.csproj index 5cbfc4e8..21684bcb 100644 --- a/RateLimiter.Tests/RateLimiter.Tests.csproj +++ b/RateLimiter.Tests/RateLimiter.Tests.csproj @@ -1,15 +1,16 @@  - net6.0 + net9.0 latest enable - + + - \ No newline at end of file + \ No newline at end of file diff --git a/RateLimiter.Tests/RateLimiterTest.cs b/RateLimiter.Tests/RateLimiterTest.cs index 172d44a7..e36e1f2d 100644 --- a/RateLimiter.Tests/RateLimiterTest.cs +++ b/RateLimiter.Tests/RateLimiterTest.cs @@ -1,13 +1,506 @@ -using NUnit.Framework; +using System; +using System.Linq; +using System.Reflection; +using System.Threading; +using System.Threading.Tasks; +using NUnit.Framework; +using RateLimiter.Policies; +using RateLimiter.Policies.StateProviders; +using RateLimiter.UberRpc; +using UberRpcService; namespace RateLimiter.Tests; [TestFixture] public class RateLimiterTest { - [Test] - public void Example() + class TestRpcService { - Assert.That(true, Is.True); + public Task PlainCallAsync() + { + return Task.CompletedTask; + } + + private uint _concurrentOneDegreePolicyCalls = 0; + [ConcurrentWindowRatePolicy(1)] + public async Task ConcurrentOneDegreePolicyCall(TimeSpan timeSpan) + { + var call = Interlocked.Increment(ref _concurrentOneDegreePolicyCalls); + await Task.Delay(timeSpan); + return call; + } + + private uint _concurrentHighDegreePolicyCalls = 0; + [ConcurrentWindowRatePolicy(1000, ArgIndecies = [], WithAuthContext = false)] + public async Task ConcurrentHighDegreePolicyCall(TimeSpan timeSpan) + { + var call = Interlocked.Increment(ref _concurrentHighDegreePolicyCalls); + await Task.Delay(timeSpan); + return call; + } + + private uint _fixedPolicyCalls = 0; + [FixedWindowRatePolicy(100, 10, ArgIndecies = [], WithAuthContext = false)] + public async Task FixedWindowPolicyCall(TimeSpan timeSpan) + { + var call = Interlocked.Increment(ref _fixedPolicyCalls); + await Task.Delay(timeSpan); + return call; + } + + private uint _fixedPolicyCallsToo = 0; + [FixedWindowRatePolicy(100, 10)] + public async Task FixedWindowPolicyCallToo(TimeSpan timeSpan) + { + var call = Interlocked.Increment(ref _fixedPolicyCallsToo); + await Task.Delay(timeSpan); + return call; + } + + private uint _fixedPolicyWithAuthCalls = 0; + [FixedWindowRatePolicy(100, 10, ArgIndecies = [], WithAuthContext = true)] + public async Task FixedWindowPolicyWithAuthCall(TimeSpan timeSpan) + { + var call = Interlocked.Increment(ref _fixedPolicyWithAuthCalls); + await Task.Delay(timeSpan); + return call; + } + + private uint _fixedPolicyNullArgIndeciesCalls = 0; + [FixedWindowRatePolicy(100, 10)] + public async Task FixedWindowPolicyNullArgIndeciesCall(TimeSpan timeSpan) + { + var call = Interlocked.Increment(ref _fixedPolicyNullArgIndeciesCalls); + await Task.Delay(timeSpan); + return call; + } + + private uint _fixedPolicyBadArgIndexCalls = 0; + [FixedWindowRatePolicy(100, 10, ArgIndecies = [10, 20])] + public async Task FixedWindowPolicyBadArgIndexCall(TimeSpan timeSpan) + { + var call = Interlocked.Increment(ref _fixedPolicyBadArgIndexCalls); + await Task.Delay(timeSpan); + return call; + } + + private uint _slidingPolicyCalls = 0; + [SlidingWindowRatePolicy(100, 10, ArgIndecies = [], WithAuthContext = false)] + public async Task SlidingWindowPolicyCall(TimeSpan timeSpan) + { + var call = Interlocked.Increment(ref _slidingPolicyCalls); + await Task.Delay(timeSpan); + return call; + } + + // Test with multiple args + }; + + private static RateLimiter SetupRateLimitedService(Func accepted, Func denied) + { + var rpcService = new UberRpcService(new TestRpcService()); + var requestProcessor = new UberRpcRequestProcessor( + new DefaultStateProvider( + new DefaultStateProvider.Options { + // In these set of tests, we test the strict behavior as it's more precise. + // testing performance is tricker since, by its proposed nature, the behavior + // would be approximate and thus stocastic, requiring a more elaborate testing + // with some statistical analysis that's currently beyond the scope. + // It's a TODO for the future. + Precision = DefaultStateProvider.Options.PrecisionOption.Strict + }), + accepted, denied + ); + return new RateLimiter(requestProcessor); + } + + + [Test(Description = "Most basic test that minimally validates that plain call works, without rate limits.")] + public async Task PlainCallWorks() + { + var accepted = false; + var denied = false; + var rpcService = new UberRpcService(new TestRpcService()); + var rateLimiter = SetupRateLimitedService(request => { + accepted = true; + return Task.CompletedTask; + }, request => { + denied = true; + return Task.CompletedTask; + }); + + var incomingMessage = new UberRpcServiceCallMessage // i.e. incomingMessage = transport.Read(); + { + MethodName = nameof(TestRpcService.PlainCallAsync), + }; + + await rateLimiter.CheckRequest(new UberRpcRequest(incomingMessage)); + + Assert.That(accepted, Is.True); + Assert.That(denied, Is.False); + } + + [Test(Description = $"Validates {nameof(ConcurrentWindowRatePolicy)} of single {nameof(ConcurrentWindowRatePolicy.Degree)} is properly enforced.")] + public async Task ConcurrentOneDegreeCallWorks() + { + const string ContextAccepted = "Accepted"; + var rpcService = new UberRpcService(new TestRpcService()); + var rateLimiter = SetupRateLimitedService(async request => { + request.Context[ContextAccepted] = true; + await rpcService.InvokeRpcCall(request.Request); + }, request => { + request.Context[ContextAccepted] = false; + return Task.CompletedTask; + }); + + var incomingMessage1 = new UberRpcServiceCallMessage // i.e. incomingMessage = transport.Read(); + { + MethodName = nameof(TestRpcService.ConcurrentOneDegreePolicyCall), + Args = [TimeSpan.FromMilliseconds(100)] + }; + var incomingMessage2 = new UberRpcServiceCallMessage // i.e. incomingMessage = transport.Read(); + { + MethodName = nameof(TestRpcService.ConcurrentOneDegreePolicyCall), + Args = [TimeSpan.FromMilliseconds(100)] + }; + + UberRpcRequest request1 = new(incomingMessage1); + UberRpcRequest request2 = new(incomingMessage2); + var call1Task = rateLimiter.CheckRequest(request1); + var call2Task = rateLimiter.CheckRequest(request2); + + await Task.WhenAll(call1Task, call2Task); + + Assert.That(request1.Context[ContextAccepted], Is.True); + Assert.That(request2.Context[ContextAccepted], Is.False); + } + + [Test(Description = $"Validates {nameof(ConcurrentWindowRatePolicy)} of some high {nameof(ConcurrentWindowRatePolicy.Degree)} is properly enforced.")] + public async Task ConcurrentHighDegreeCallWorks() + { + const string ContextAccepted = "Accepted"; + const string ContextSequence = "Sequence"; + var PolicyHighDegree = typeof(TestRpcService) + .GetMethod(nameof(TestRpcService.ConcurrentHighDegreePolicyCall))! + .GetCustomAttribute()! + .Degree; + var rpcService = new UberRpcService(new TestRpcService()); + var rateLimiter = SetupRateLimitedService(async request => { + request.Context[ContextAccepted] = true; + await rpcService.InvokeRpcCall(request.Request); + }, request => { + request.Context[ContextAccepted] = false; + return Task.CompletedTask; + }); + + var requestTasks = Enumerable.Range(0, (int)(PolicyHighDegree * 2.5)) + .Select(n => { + var request = new UberRpcRequest (new UberRpcServiceCallMessage { + MethodName = nameof(TestRpcService.ConcurrentHighDegreePolicyCall), + Args = [TimeSpan.FromMilliseconds(n % 2 == 0 ? 100 : 200)], + }); + request.Context[ContextSequence] = n; + return request; + }) + .Select(async r => { + await Task.Delay(r.Context[ContextSequence] < PolicyHighDegree * 2 ? 0 : 150); + await rateLimiter.CheckRequest(r); + return r; + }).ToArray(); + + await Task.WhenAll(requestTasks); + + var requests = requestTasks.Select(t => t.Result).ToArray(); + + for (var i = 0; i < PolicyHighDegree; ++i) + { + Assert.That(requests[i].Context[ContextAccepted], Is.True); + } + for (var i = PolicyHighDegree; i < PolicyHighDegree * 2; ++i) + { + Assert.That(requests[i].Context[ContextAccepted], Is.False); + } + for (var i = PolicyHighDegree * 2; i < (int)(PolicyHighDegree * 2.5); ++i) + { + Assert.That(requests[i].Context[ContextAccepted], Is.True); + } + } + + [Test(Description = $"Validates {nameof(FixedWindowRatePolicy)} of some {nameof(FixedWindowRatePolicy.PeriodQuantity)} over {nameof(FixedWindowRatePolicy.Period)} is properly enforced.")] + public async Task FixedWindowCallWorks() + { + const string ContextAccepted = "Accepted"; + const string ContextSequence = "Sequence"; + var attribute = typeof(TestRpcService) + .GetMethod(nameof(TestRpcService.FixedWindowPolicyCall))! + .GetCustomAttribute()!; + var PeriodQuantity = attribute.PeriodQuantity; + var Period = attribute.Period; + var rpcService = new UberRpcService(new TestRpcService()); + var rateLimiter = SetupRateLimitedService(async request => { + request.Context[ContextAccepted] = true; + await rpcService.InvokeRpcCall(request.Request); + }, request => { + request.Context[ContextAccepted] = false; + return Task.CompletedTask; + }); + + var requestTasks = Enumerable.Range(0, (int)(PeriodQuantity * 3)) + .Select(n => { + var request = new UberRpcRequest (new UberRpcServiceCallMessage { + MethodName = nameof(TestRpcService.FixedWindowPolicyCall), + Args = [Period], + }); + request.Context[ContextSequence] = n; + return request; + }) + .Select(async r => { + await Task.Delay(r.Context[ContextSequence] < PeriodQuantity * 2 ? TimeSpan.Zero : Period); + await rateLimiter.CheckRequest(r); + return r; + }).ToArray(); + + await Task.WhenAll(requestTasks); + + var requests = requestTasks.Select(t => t.Result).ToArray(); + + for (var i = 0; i < PeriodQuantity; ++i) + { + Assert.That(requests[i].Context[ContextAccepted], Is.True); + } + for (var i = PeriodQuantity; i < PeriodQuantity * 2; ++i) + { + Assert.That(requests[i].Context[ContextAccepted], Is.False); + } + for (var i = PeriodQuantity * 2; i < (int)(PeriodQuantity * 3); ++i) + { + Assert.That(requests[i].Context[ContextAccepted], Is.True); + } + } + + [Test( Description = $"Validates {nameof(SlidingWindowRatePolicy)} of some {nameof(SlidingWindowRatePolicy.PeriodQuantity)} over {nameof(SlidingWindowRatePolicy.Period)} is properly enforced.")] + [Ignore("This test is timing sensitive to and highly influenced by thread/task scheduling, giving unstable results. It needs Fakes library to control for DateTime values")] + public async Task SlidingWindowCallWorks() + { + const string ContextAccepted = "Accepted"; + const string ContextSequence = "Sequence"; + var attribute = typeof(TestRpcService) + .GetMethod(nameof(TestRpcService.SlidingWindowPolicyCall))! + .GetCustomAttribute()!; + var PeriodQuantity = attribute.PeriodQuantity; + var Period = attribute.Period; + var rpcService = new UberRpcService(new TestRpcService()); + var rateLimiter = SetupRateLimitedService(async request => { + request.Context[ContextAccepted] = true; + await rpcService.InvokeRpcCall(request.Request); + }, request => { + request.Context[ContextAccepted] = false; + return Task.CompletedTask; + }); + + var requestTasks = Enumerable.Range(0, (int)(PeriodQuantity * 3)) + .Select(n => { + var request = new UberRpcRequest (new UberRpcServiceCallMessage { + MethodName = nameof(TestRpcService.SlidingWindowPolicyCall), + Args = [Period], + }); + request.Context[ContextSequence] = n; + return request; + }) + .Select(async r => { + var sequence = (int)r.Context[ContextSequence]; + var delay = sequence * Period / PeriodQuantity / (sequence < PeriodQuantity * 2 ? 2 : 4) + + (sequence < PeriodQuantity * 2 ? TimeSpan.Zero : (Period / 2)); + await Task.Delay(delay); + await rateLimiter.CheckRequest(r); + return r; + }).ToArray(); + + await Task.WhenAll(requestTasks); + + var requests = requestTasks.Select(t => t.Result).ToArray(); + + for (var i = 0; i < PeriodQuantity; ++i) + { + Assert.That(requests[i].Context[ContextAccepted], Is.True); + } + for (var i = PeriodQuantity; i < PeriodQuantity * 2; ++i) + { + Assert.That(requests[i].Context[ContextAccepted], Is.False); + } + + int allowed = 0; + for (var i = PeriodQuantity * 2; i < (int)(PeriodQuantity * 3); ++i) + { + allowed += requests[i].Context[ContextAccepted]; + } + + Assert.That(allowed, Is.EqualTo(PeriodQuantity / 2)); + } + + [Test(Description = $"Validates {nameof(FixedWindowRatePolicy)} property differentiates two separate methods with the same policy.")] + public async Task PolicyDiscriminatesOnMethodsCorrectly() + { + const string ContextAccepted = "Accepted"; + const string ContextSequence = "Sequence"; + var attribute = typeof(TestRpcService) + .GetMethod(nameof(TestRpcService.FixedWindowPolicyCall))! + .GetCustomAttribute()!; + var PeriodQuantity = attribute.PeriodQuantity; + var Period = attribute.Period; + var rpcService = new UberRpcService(new TestRpcService()); + var rateLimiter = SetupRateLimitedService(async request => { + request.Context[ContextAccepted] = true; + await rpcService.InvokeRpcCall(request.Request); + }, request => { + request.Context[ContextAccepted] = false; + return Task.CompletedTask; + }); + + var requestTasks = Enumerable.Range(0, (int)(PeriodQuantity * 3)) + .SelectMany(n => { + var request = new UberRpcRequest (new UberRpcServiceCallMessage { + MethodName =nameof(TestRpcService.FixedWindowPolicyCall), + Args = [Period], + }); + request.Context[ContextSequence] = n; + var requestToo = new UberRpcRequest (new UberRpcServiceCallMessage { + MethodName = nameof(TestRpcService.FixedWindowPolicyCallToo), + Args = [Period], + }); + requestToo.Context[ContextSequence] = n; + return new[] { request, requestToo }; + }) + .Select(async r => { + await Task.Delay(r.Context[ContextSequence] < PeriodQuantity * 2 ? TimeSpan.Zero : Period); + await rateLimiter.CheckRequest(r); + return r; + }).ToArray(); + + await Task.WhenAll(requestTasks); + + var requests = requestTasks.Select(t => t.Result).ToArray(); + + for (var i = 0; i < PeriodQuantity * 2; ++i) + { + Assert.That(requests[i].Context[ContextAccepted], Is.True); + } + for (var i = PeriodQuantity * 2; i < PeriodQuantity * 4; ++i) + { + Assert.That(requests[i].Context[ContextAccepted], Is.False); + } + for (var i = PeriodQuantity * 4; i < PeriodQuantity * 6; ++i) + { + Assert.That(requests[i].Context[ContextAccepted], Is.True); + } + } + + [Test(Description = $"Validates {nameof(FixedWindowRatePolicy)} property differentiates two separate auth contexts with the same policy.")] + public async Task PolicyDiscriminatesOnAuthContextCorrectly() + { + const string ContextAccepted = "Accepted"; + const string ContextSequence = "Sequence"; + var attribute = typeof(TestRpcService) + .GetMethod(nameof(TestRpcService.FixedWindowPolicyWithAuthCall))! + .GetCustomAttribute()!; + var PeriodQuantity = attribute.PeriodQuantity; + var Period = attribute.Period; + var rpcService = new UberRpcService(new TestRpcService()); + var rateLimiter = SetupRateLimitedService(async request => { + request.Context[ContextAccepted] = true; + await rpcService.InvokeRpcCall(request.Request); + }, request => { + request.Context[ContextAccepted] = false; + return Task.CompletedTask; + }); + + var requestTasks = Enumerable.Range(0, (int)(PeriodQuantity * 6)) + .Select(n => { + var request = new UberRpcRequest (new UberRpcServiceCallMessage { + MethodName =nameof(TestRpcService.FixedWindowPolicyWithAuthCall), + AuthorizationContext = n % 2, + Args = [Period], + }); + request.Context[ContextSequence] = n; + return request; + }) + .Select(async r => { + await Task.Delay(r.Context[ContextSequence] < PeriodQuantity * 4 ? TimeSpan.Zero : Period); + await rateLimiter.CheckRequest(r); + return r; + }).ToArray(); + + await Task.WhenAll(requestTasks); + + var requests = requestTasks.Select(t => t.Result).ToArray(); + + for (var i = 0; i < PeriodQuantity * 2; ++i) + { + Assert.That(requests[i].Context[ContextAccepted], Is.True); + } + for (var i = PeriodQuantity * 2; i < PeriodQuantity * 4; ++i) + { + Assert.That(requests[i].Context[ContextAccepted], Is.False); + } + for (var i = PeriodQuantity * 4; i < PeriodQuantity * 6; ++i) + { + Assert.That(requests[i].Context[ContextAccepted], Is.True); + } + } + + [Test(Description = $"Validates {nameof(FixedWindowRatePolicy)} property handles attribute with no indecies mentioned.")] + public async Task PolicyHandlesNoArgIndeciesMentionedCorrectly() + { + const string ContextAccepted = "Accepted"; + var attribute = typeof(TestRpcService) + .GetMethod(nameof(TestRpcService.FixedWindowPolicyNullArgIndeciesCall))! + .GetCustomAttribute()!; + var PeriodQuantity = attribute.PeriodQuantity; + var Period = attribute.Period; + var rpcService = new UberRpcService(new TestRpcService()); + var rateLimiter = SetupRateLimitedService(async request => { + request.Context[ContextAccepted] = true; + await rpcService.InvokeRpcCall(request.Request); + }, request => { + request.Context[ContextAccepted] = false; + return Task.CompletedTask; + }); + + var request = new UberRpcRequest (new UberRpcServiceCallMessage { + MethodName =nameof(TestRpcService.FixedWindowPolicyNullArgIndeciesCall), + Args = [Period], + }); + + await rateLimiter.CheckRequest(request); + + Assert.That(request.Context[ContextAccepted], Is.True); + } + + [Test(Description = $"Validates {nameof(FixedWindowRatePolicy)} property handles attribute with out of range arg index specified.")] + public void PolicyHandlesBadArgIndexCorrectly() + { + const string ContextAccepted = "Accepted"; + var attribute = typeof(TestRpcService) + .GetMethod(nameof(TestRpcService.FixedWindowPolicyBadArgIndexCall))! + .GetCustomAttribute()!; + var PeriodQuantity = attribute.PeriodQuantity; + var Period = attribute.Period; + var rpcService = new UberRpcService(new TestRpcService()); + var rateLimiter = SetupRateLimitedService(async request => { + request.Context[ContextAccepted] = true; + await rpcService.InvokeRpcCall(request.Request); + }, request => { + request.Context[ContextAccepted] = false; + return Task.CompletedTask; + }); + + var request = new UberRpcRequest (new UberRpcServiceCallMessage { + MethodName =nameof(TestRpcService.FixedWindowPolicyBadArgIndexCall), + Args = [Period], + }); + + Assert.ThrowsAsync(() => + rateLimiter.CheckRequest(request) + ); } } \ No newline at end of file diff --git a/RateLimiter.sln b/RateLimiter.sln index 626a7bfa..5dadfaf0 100644 --- a/RateLimiter.sln +++ b/RateLimiter.sln @@ -7,6 +7,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RateLimiter", "RateLimiter\ EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RateLimiter.Tests", "RateLimiter.Tests\RateLimiter.Tests.csproj", "{C4F9249B-010E-46BE-94B8-DD20D82F1E60}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "UberRpcService", "UberRpcService\UberRpcService.csproj", "{C4F9249B-010E-46BE-94B8-DD20D82F1E61}" +EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{9B206889-9841-4B5E-B79B-D5B2610CCCFF}" ProjectSection(SolutionItems) = preProject README.md = README.md @@ -26,6 +28,10 @@ Global {C4F9249B-010E-46BE-94B8-DD20D82F1E60}.Debug|Any CPU.Build.0 = Debug|Any CPU {C4F9249B-010E-46BE-94B8-DD20D82F1E60}.Release|Any CPU.ActiveCfg = Release|Any CPU {C4F9249B-010E-46BE-94B8-DD20D82F1E60}.Release|Any CPU.Build.0 = Release|Any CPU + {C4F9249B-010E-46BE-94B8-DD20D82F1E61}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {C4F9249B-010E-46BE-94B8-DD20D82F1E61}.Debug|Any CPU.Build.0 = Debug|Any CPU + {C4F9249B-010E-46BE-94B8-DD20D82F1E61}.Release|Any CPU.ActiveCfg = Release|Any CPU + {C4F9249B-010E-46BE-94B8-DD20D82F1E61}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/RateLimiter/Policies/RatePolicy.cs b/RateLimiter/Policies/RatePolicy.cs new file mode 100644 index 00000000..08ca6434 --- /dev/null +++ b/RateLimiter/Policies/RatePolicy.cs @@ -0,0 +1,148 @@ +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace RateLimiter.Policies +{ + // TODO: Idea here is that sometimes it may be desireable to deny rejected request, + // while at other times it may be desireable instead of rejecting to simply queue + // and await when policy has an allocation to grant, without immeidately being + // rejected. Not implemented, but retained here for the idea. + // As well as possibly future use for other set of options. + // public readonly struct RatePolicyOptions + // { + // public bool AwaitLease { get; init; } = false; + + // public RatePolicyOptions() + // { + // } + // } + + /// + /// Base class for all rate policies. + /// + /// + /// + /// This is the base class that all types of policies must derive from, and provides + /// general mean of evaluating policy's grant for a rquest. + /// + /// + /// In general, the inheritance hierarchy is expected to be ConcreteKindRatePolicy : KindRatePolicy : RatePolicy , where: + /// + /// + /// RatePolicy + /// Base for all policies (this class). + /// + /// + /// KindRatePolicy + /// + /// The kind of policy, like , or + /// that are still classes + /// that define properties of the policy and are to be inherited by state provider's specific + /// implementations. + /// + /// + /// + /// ConcreteKindRatePolicy + /// + /// Concrete, store-provider-specific implementation of KindRatePolicy. + /// This implementation would typically deal with actual evaluation of policy with + /// knowledge how to retrieve and persist policy state information with that specifc + /// state store mechanism. + /// + /// + /// + /// + /// + /// While library currently provides a few Kind of policies -- namely , + /// , and , along with + /// that implements them all -- new kinds of policies + /// can be introduced, thereby extending provided set. When new kind of policies are introduced, a state + /// provider supporting those also needs to be introduced (a new one, or one extending support from + /// existing ones). Newly introduced, generally reusable policies are candidates to be included in + /// the general distribution of the library. + /// + /// + public abstract class RatePolicy + { + // See above comment. + // protected RatePolicyOptions PolicyOptions { get; init; } + + /// + /// Obtains lease for a specified request key. Caller should examine + /// to see if request is granted by the policy. + /// + /// Key of the request scope by which allocation bucket policy is to be evaluated. + /// Cancellation token. + /// Async; An instance of . + public abstract Task ObtainLease(string requestPolicyKey, CancellationToken cancel = default); + } + + /// + /// Fixed-window policy. + /// + /// + /// + /// Fixed-window policy is a kind where a fixed number of + /// requests are allowed during a . Requests can be + /// granted anywhere during the period upto the allowed quantity, and then quantity is not + /// replenished until new period begins. + /// + /// + public abstract class FixedWindowRatePolicy : RatePolicy + { + /// + /// Period for replenishment. + /// + public TimeSpan Period { get; set; } + + /// + /// Allowed quantity per period. + /// + public uint PeriodQuantity { get; set; } + } + + /// + /// Sliding-window policy. + /// + /// + /// + /// Sliding-window policy is a kind where a fixed number of + /// requests are allowed during a , where period "slides" + /// with the request. In other words, at any point in time, it looks back the exact period to ensure new + /// request doesn't exceed total allowed requests. The moment prior request expires out of the look-back + /// window, it's allocation is now available back in the pool. + /// + /// + public abstract class SlidingWindowRatePolicy : RatePolicy + { + /// + /// Sliding period for replenishment. + /// + public TimeSpan Period { get; set; } + + /// + /// Allowed quantity per period. + /// + public uint PeriodQuantity { get; set; } + } + + /// + /// Concurrent-window policy. + /// + /// + /// + /// Concurrent-window policy is a kind where there is a strict + /// limit on the number of concurrently executing requests. The moment request + /// completes execution, its allocation is immediately returned to the pool to + /// be available for use by the next request. + /// + /// + public abstract class ConcurrentWindowRatePolicy : RatePolicy + { + /// + /// Concurrency degree. I.e. number of allowed concurrently executing requests. + /// + public uint Degree { get; set; } + } +} \ No newline at end of file diff --git a/RateLimiter/Policies/StateProviders/DefaultStateProvider.cs b/RateLimiter/Policies/StateProviders/DefaultStateProvider.cs new file mode 100644 index 00000000..5732a918 --- /dev/null +++ b/RateLimiter/Policies/StateProviders/DefaultStateProvider.cs @@ -0,0 +1,274 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +namespace RateLimiter.Policies.StateProviders; + +/// +/// Policy state provider that uses locally memory for storing policy state. +/// +/// +/// +/// Note that is not designed or intended to be used with more than +/// one instance in the same process. Due to use of static members to store +/// data, multiple instances can potentially interact with unintended effects, especially in case where +/// request keys collide. While limitation exists currently, in the future, it may be eliminated +/// to allow for more flexibility, if it's determined that more than one rate limiter backed by this +/// provider support is needed. +/// +/// +public class DefaultStateProvider : IPolicyStateProvider +{ + private static readonly object _dummy = new(); // hack: dummy var to make switch expression on template type work syntactically. + + /// + /// Options to specify behavior of . + /// + public struct Options + { + /// + /// Enum to indicate precision options. + /// + public enum PrecisionOption + { + /// + /// Strict precision. + /// + /// + /// + /// Indicates that policy should favor use of strict precision evaluation algorithms over performance, valuing exactness over performance. + /// + /// + Strict, + + /// + /// Performance precision. + /// + /// + /// + /// Indicates that policy should favor performance over strict precision evaluation algorithms valuing performance over exactness, thus allowing for possibility of exceeding specified policy limits. + /// + /// + Performance + } + + /// + /// Precision to use when evaluating policies. + /// + public PrecisionOption Precision { get; init; } + } + + private readonly Options _options; + + public DefaultStateProvider() : this(new Options { Precision = Options.PrecisionOption.Strict}) + { } + + public DefaultStateProvider(Options options) + { + _options = options; + } + + public TPolicyType GetPolicy() where TPolicyType : RatePolicy + { + return (_dummy switch + { + object when typeof(TPolicyType) == typeof(Policies.FixedWindowRatePolicy) => new FixedWindowRatePolicy(_options) as TPolicyType, + object when typeof(TPolicyType) == typeof(Policies.SlidingWindowRatePolicy) => new SlidingWindowRatePolicy(_options) as TPolicyType, + object when typeof(TPolicyType) == typeof(Policies.ConcurrentWindowRatePolicy) => new ConcurrentWindowRatePolicy() as TPolicyType, + _ => throw new Exception($"Unsupporeted policy type {typeof(TPolicyType).Name}"), + })!; + } + + private sealed class ConcurrentWindowRatePolicy : Policies.ConcurrentWindowRatePolicy + { + private static readonly ConcurrentDictionary _map = new(); + public override Task ObtainLease(string requestPolicyKey, CancellationToken cancel = default) + { + var count = _map.AddOrUpdate(requestPolicyKey, 1, (k, v) => ++v); + + var lease = new PolicyLease(count <= Degree, () => + { + if (0 == _map.AddOrUpdate(requestPolicyKey, 0, (k, v) => --v)) + { + _map.TryRemove(KeyValuePair.Create(requestPolicyKey, 0U)); + } + }); + + return Task.FromResult(lease); + } + } + + private sealed class FixedWindowRatePolicy : Policies.FixedWindowRatePolicy + { + private static readonly ConcurrentDictionary, uint> _performanceMap = new(); + private static readonly Dictionary, Tuple> _strictMap = []; + + private readonly Options _options; + + internal FixedWindowRatePolicy(Options options) + { + _options = options; + } + + public override Task ObtainLease(string requestPolicyKey, CancellationToken cancel = default) + { + var key = Tuple.Create(requestPolicyKey, Period); + uint count; + + switch (_options.Precision) + { + case Options.PrecisionOption.Performance: + count = _performanceMap.AddOrUpdate(key, 1, (k, v) => ++v); + + if (1 == count) + { + Task.Run(async () => { + await Task.Delay(Period); + _performanceMap.TryRemove(key, out _); + }, CancellationToken.None); + } + + break; + case Options.PrecisionOption.Strict: + lock (_strictMap) + { + if (!_strictMap.ContainsKey(key)) + { + count = 1U; + _strictMap[key] = Tuple.Create(DateTime.UtcNow + Period, count); + Task.Run(async () => { + await Task.Delay(Period); + lock (_strictMap) + { + if (_strictMap.ContainsKey(key)) + { + (var time, _) = _strictMap[key]; + if(DateTime.UtcNow < time) + { + _strictMap.Remove(key); + } + } + } + }, CancellationToken.None); + } + else + { + (var time, count) = _strictMap[key]; + if (DateTime.UtcNow < time) + { + _strictMap[key] = Tuple.Create(time, ++count); + } + else + { + count = 1U; + _strictMap[key] = Tuple.Create(DateTime.UtcNow + Period, count); + } + } + } + break; + default: + throw new NotSupportedException(); + } + + + + var lease = new PolicyLease(count <= PeriodQuantity); + return Task.FromResult(lease); + } + } + + private sealed class SlidingWindowRatePolicy : Policies.SlidingWindowRatePolicy + { + private static readonly ConcurrentDictionary, ConcurrentQueue> _map = new(); + + private readonly Options _options; + + internal SlidingWindowRatePolicy(Options options) + { + _options = options; + } + + public override Task ObtainLease(string requestPolicyKey, CancellationToken cancel = default) + { + var key = Tuple.Create(requestPolicyKey, Period); + var list = _map.GetOrAdd(key, k => new()); + bool grant; + + bool freshList; + + switch (_options.Precision) + { + case Options.PrecisionOption.Strict: + lock (list) + { + while (list.TryPeek(out var head)) + { + if (head < DateTime.UtcNow) + { + list.TryDequeue(out _); + continue; + } + + break; + } + + grant = list.Count < PeriodQuantity; + if (grant) + { + list.Enqueue(DateTime.UtcNow + Period); + } + } + break; + case Options.PrecisionOption.Performance: + freshList = list.IsEmpty; + grant = list.Count < PeriodQuantity; + if (grant) + { + list.Enqueue(DateTime.UtcNow + Period); + } + if (freshList) + { + Task.Run(async () => { + while(true) + { + DateTime? next = null; + + lock(list) + { + + while (list.TryPeek(out var head)) + { + if (head < DateTime.UtcNow) + { + list.TryDequeue(out _); + continue; + } + + next = head; + + break; + } + } + + if (next.HasValue) + { + await Task.Delay(next.Value - DateTime.UtcNow); + continue; + } + + return; + } + }, CancellationToken.None); + } + break; + default: + throw new NotSupportedException(); + } + + var lease = new PolicyLease(grant); + return Task.FromResult(lease); + } + } +} \ No newline at end of file diff --git a/RateLimiter/PolicyLease.cs b/RateLimiter/PolicyLease.cs new file mode 100644 index 00000000..3e91d8cf --- /dev/null +++ b/RateLimiter/PolicyLease.cs @@ -0,0 +1,64 @@ +using System; +using System.Threading.Tasks; + +namespace RateLimiter; +/// +/// Lease returned by . +/// +/// +/// +/// Lease represents two things. One, it represents policy's answer to the query if a request is allowed +/// by the policy or if policy wishes to limit the request. Second, it represents a borrowing of policy's +/// allowance for allowed leases for the duration of the request, whereby once request processing +/// is completed, lease is disposed, returning policy's allocation back to the pool to be available +/// for other requests. The return of the lease back to the pool is accomplished via disposal +/// infrastructure by calling . +/// +/// +public class PolicyLease : IAsyncDisposable + { + /// + /// Indicates if policy has granted an allocation for the request (true), or denied it (false). + /// + public bool IsGranted { get; private set; } + + private readonly Func? _onRelease; + + /// + /// Constructs lease, indicating if it was granted, and synchronous callback to call on disposal. + /// + /// Indicates if lease is granted. + /// Callback to release lease. + public PolicyLease(bool granted, Action onRelease) + : this(granted, () => { onRelease?.Invoke(); return Task.CompletedTask; }) + { } + + /// + /// Constructs lease, indicating if it was granted, and asynchronous callback to call on disposal. + /// + /// Indicates if lease is granted. + /// Callback to release lease. + public PolicyLease(bool granted, Func onRelease) + : this(granted) + { + _onRelease = onRelease; + } + + /// + /// Constructs lease, indicating if it was granted. + /// + /// Indicates if lease is granted. + public PolicyLease(bool granted) + { + IsGranted = granted; + } + + /// + /// Releases the lease, and calls release callback if one was passed in a constructor. + /// + /// + public virtual async ValueTask DisposeAsync() + { + await (_onRelease?.Invoke() ?? ValueTask.CompletedTask.AsTask()); + } + } \ No newline at end of file diff --git a/RateLimiter/RateLimiter.cs b/RateLimiter/RateLimiter.cs new file mode 100644 index 00000000..2dba7d61 --- /dev/null +++ b/RateLimiter/RateLimiter.cs @@ -0,0 +1,201 @@ +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using RateLimiter.Policies; + +namespace RateLimiter +{ + /// + /// Marker interface to represent a request for to examine. + /// + /// + /// + /// At this time, the interface is nothing more than a marker interface with no methods to simply aid argument typing. + /// + /// + public interface IRequest + { } + + /// + /// Interface to represent policy factory provider. + /// + /// + /// + /// Different policy state providers can have their specific implementations of general policies. + /// This interface provides means for querying for and creating state provider specific + /// implementations of requested policy type. + /// + /// + /// Library includes , a local memory based state provider, that's limited to local use. + /// + /// + /// It's possible, however, to extend other providers, such as, for example, one that + /// uses SQLServer to allow limiting requests across a farm of servers. Or possibly one + /// that uses Memchached as its store. Or Reddis. Depending on capabilities of the backing + /// technology, it's possible that some rate policies may or may not be supported, or possibly + /// supported, but with some affects on their precision. For example, while it may be possible + /// to increment a count precisely with SQLServer as SQLServer provides capabilities for atomic + /// data modifications, the same may not be possible with Memcached if memcached doesn't support + /// atomic, exclusive modification with guaranteed value outcome (but at the same time win on + /// efficiency and speed of vis-a-vis SQLServer). As such, one must carefuly weigh pros and cons + /// of each state provider before choosing one. + /// + /// + public interface IPolicyStateProvider + { + /// + /// Queries for and returns provider-specific implementation of requested policy . + /// + /// Policy type implementation to create. + /// Instance of specific implementation of requested policy. + TPolicyType GetPolicy() where TPolicyType : RatePolicy; + } + + /// + /// Interface that generalizes representation of service-specific integration implementations. + /// + /// + /// + /// A request processor is a specific implementation of some service, like ASP.NET, HttpClient, gRPC, UberRpcService, etc. + /// Processor handles all aspects of: + /// + /// + /// Integration into service context and injection of its request handling in service's request handling pipeline. + /// + /// + /// Passing allowed requests to service and handling of rejected requests in service meaningful way. + /// + /// + /// Mapping of service's way of marking of applicable policies to requests and provided + /// services, e.g. API endpoints.
+ /// For example, one specific implementation may use attribute decorations on handling methods to specify + /// endpoints' applicable policies. Yet another may use attributes to decorate message payload types or + /// possibly message payload fields to describe applicable policies. Yet another way may be to have + /// a registry generated on startup that provides mapping lookups at runtime for applicable policies. + /// It all depends on the what is meaningful to the underlying services for which provider is written + /// and how implementers chose to handle the policy references. + ///
+ ///
+ ///
+ ///
+ public interface IRequestProcessor + { + /// + /// Callback from for requests that have been determined not to be restricted by some policy. + /// + /// Requested in context of evaluation. + /// Cancellation token. + /// Async task. + Task AcceptRequest(IRequest request/*, failureContext*/, CancellationToken cancel = default); + + /// + /// Callback from for requests that have been determined to be restricted by some policy. + /// + /// Requested in context of evaluation. + /// Cancellation token. + /// Async task. + Task DenyRequest(IRequest request/*, failureContext*/, CancellationToken cancel = default); + + /// + /// Queries policy mapping for applicable policies for a request, and scope of application for each matched policy. + /// + /// Request to be mapped for applicable policies and their effective scopes. + /// Cancellation token. + /// + /// + /// Each request can have zero or more policies that are applicable to it, and service-specific processor + /// is responsible for providing mapping between specific requests and all rate limiting policies + /// applicable to that request, as well as the scope of each policy. Scope may be applicable either + /// in case there's some hierarchy to request's structure or some context such as being limited + /// per particular user, or being limited per particular argument to request. + /// + /// + /// The return of the call is an + /// where represents some scope bucket for the policy, + /// and is the applicable + /// at that scope. + /// + /// + /// + /// Async; representing applicable rate policies + /// and their respective scopes. See remarks. + /// + Task>> GetRequestPolicies(IRequest request, CancellationToken cancel = default); + } + + /// + /// Rate limiter to evaluate requests for applicable rate policies and make determination + /// if request is to be accepted or denied. + /// + public sealed class RateLimiter + { + private readonly IRequestProcessor requestProcessor; + + /// + /// Constructs using service-specific implementation of . + /// + /// Request processor. + public RateLimiter(IRequestProcessor requestProcessor) + { + // todo + this.requestProcessor = requestProcessor; + } + + /// + /// Checks request against applicable rate policies to determine if request should be accepted or denied. + /// + /// Request to evaluate. + /// Cancellation token. + /// + /// + /// The method queries request processor for applicable policies, evaluates each policy, + /// and determines if policy is to be accepted or denied. In case of accepted requests, + /// calls to + /// have processor pass request to the underlying service; otherwise calls + /// . + /// + /// + /// Async task. + public async Task CheckRequest(IRequest request, CancellationToken cancel = default) + { + var requestPolicyMatches = await requestProcessor.GetRequestPolicies(request, cancel); + + using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancel); + var cancelationToken = cts.Token; + + var rejectedPolicyDetected = false; + var policyEvalTasks = requestPolicyMatches.Select(async kv => { + var (requestPolicyKey, policy) = kv; + var policyLease = await policy.ObtainLease(requestPolicyKey, cancelationToken); + if (!policyLease.IsGranted) + { + cts.Cancel(); + rejectedPolicyDetected = true; + } + return policyLease; + }).ToArray(); + + await Task.WhenAll(policyEvalTasks); + + // TODO: what to do in case policy eval threw exception. Does it mean reject request? + var policyEvaluations = policyEvalTasks.Select(t => t.Result).ToArray(); + + try + { + if (rejectedPolicyDetected) + { + await requestProcessor.DenyRequest(request, cancel); + } + else + { + await requestProcessor.AcceptRequest(request, cancel); + } + } + finally + { + await Task.WhenAll(policyEvaluations.Select(p => p.DisposeAsync().AsTask())); + } + } + } +} \ No newline at end of file diff --git a/RateLimiter/RateLimiter.csproj b/RateLimiter/RateLimiter.csproj index 19962f52..20025e38 100644 --- a/RateLimiter/RateLimiter.csproj +++ b/RateLimiter/RateLimiter.csproj @@ -1,7 +1,7 @@  - net6.0 + net9.0 latest enable - \ No newline at end of file + \ No newline at end of file diff --git a/UberRpcService/UberRpcRequestProcessor.cs b/UberRpcService/UberRpcRequestProcessor.cs new file mode 100644 index 00000000..f108ce6f --- /dev/null +++ b/UberRpcService/UberRpcRequestProcessor.cs @@ -0,0 +1,286 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reflection; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using RateLimiter.Policies; +using UberRpcService; + +namespace RateLimiter.UberRpc; + +/// +/// Request processor implementation to be used with for +/// supporting service technology. +/// +/// Service interface of . +/// +/// +/// This processor uses attributes for decorating RPC methods to declare applicable policies for each service API. +/// +/// +public sealed class UberRpcRequestProcessor : IRequestProcessor + where TService : class +{ + private readonly IPolicyStateProvider _policyStateProvider; + private readonly Func _acceptedCallback; + private readonly Func _deniedCallback; + + /// + /// C'tor. + /// + /// Policy state provider to query for concrete policy implementations. + /// Callback for accepted requests. + /// Callback for denied requests. + public UberRpcRequestProcessor(IPolicyStateProvider policyStateProvider, Func accepted, Func denied) + { + ArgumentNullException.ThrowIfNull(policyStateProvider); + ArgumentNullException.ThrowIfNull(accepted); + ArgumentNullException.ThrowIfNull(denied); + + _policyStateProvider = policyStateProvider; + _acceptedCallback = accepted; + _deniedCallback = denied; + } + + /// + /// Called by when it accpets request. This will cause accept callback to be invoked. + /// + /// Request in context. + /// Cancellation token. + /// Async. + /// Thrown if request is not of type. Shouldn't happen. + public async Task AcceptRequest(IRequest request, CancellationToken cancel = default) + { + ArgumentNullException.ThrowIfNull(request); + var uberRpcRequest = request as UberRpcRequest + ?? throw new ArgumentOutOfRangeException(nameof(request), $"Request is not of {nameof(UberRpcRequest)} type."); + await _acceptedCallback(uberRpcRequest); + } + + /// + /// Called by when it accpets request. This will cause accept callback to be invoked. + /// + /// Request in context. + /// Cancellation token. + /// Async. + /// Thrown if request is not of type. Shouldn't happen. + public async Task DenyRequest(IRequest request, CancellationToken cancel = default) + { + ArgumentNullException.ThrowIfNull(request); + var uberRpcRequest = request as UberRpcRequest + ?? throw new ArgumentOutOfRangeException(nameof(request), $"Request is not of {nameof(UberRpcRequest)} type."); + await _deniedCallback(uberRpcRequest); + } + + /// + /// Returns applicable policies for the request and their applicable scope. + /// + /// Request in context. + /// Cancellation token. + /// Enumeration of policies and their scope. + /// + public Task>> GetRequestPolicies(IRequest request, CancellationToken cancel = default) + { + if (request is UberRpcRequest uberRpcRequest) + { + var methodInfo = typeof(TService).GetMethod(uberRpcRequest.Request.MethodName); + if (methodInfo == null) + { + // Let service itself handle invalid request; service-invalid requests aren't subject to rate policy. + return Task.FromResult(Enumerable.Empty>()); + } + + return Task.FromResult(methodInfo.GetCustomAttributes(true) + .Where(a => a != null) + .Select(attribute => { + var sb = new StringBuilder(); + sb.Append(uberRpcRequest.Request.MethodName); + sb.Append(':'); + if (attribute.WithAuthContext && uberRpcRequest.Request.AuthorizationContext.HasValue) + { + sb.Append(uberRpcRequest.Request.AuthorizationContext); + sb.Append(':'); + } + foreach (var argIndex in attribute.ArgIndecies) + { + if (uberRpcRequest.Request.Args == null || argIndex >= uberRpcRequest.Request.Args.Length) + { + // TODO: Custom exception type? + throw new Exception("Attribute references argument index that is out of range."); + } + sb.Append(uberRpcRequest.Request.Args[argIndex].GetHashCode()); + sb.Append(':'); + } + + var policy = attribute.GetPolicy(_policyStateProvider); + + return new KeyValuePair(sb.ToString(), policy); + }) + ); + } + else + { + // We can either throw b/c we got request that's not related to this service, or probably + // better be amiacable here and just return no policies as we don't have anything to do + // with this request. + return Task.FromResult(Enumerable.Empty>()); + } + } +} + +/// +/// Wrapper for request. +/// +public class UberRpcRequest : IRequest +{ + /// + /// Wrapped request. + /// + public readonly UberRpcServiceCallMessage Request; + + /// + /// General purpose context dictionary for the request. + /// + public readonly Dictionary Context = []; + + public UberRpcRequest(UberRpcServiceCallMessage request) + { + Request = request; + } +} + +/// +/// Base attribute that provides some common properties for all attributes. +/// +[AttributeUsage(AttributeTargets.Method)] +public abstract class UberRpcRatePolicyBaseAttribute : Attribute +{ + /// + /// Specifies if policy should discriminate on authorization context, i.e. separate + /// allocation bucket per user, or if single bucket for all users. + /// + public bool WithAuthContext = true; + + /// + /// Specifies, by index position, which API methods parameters should be used to discriminate on. + /// + /// + /// + /// Each reference argument value is evaluated using its to be used + /// to construct a hash to discriminate on that value. As such, referenced argument + /// type must produce plausible hashcode of value for correct behavior. + /// + /// + public int[] ArgIndecies = []; + + /// + /// Get policy that deriving attribute represents. + /// + /// Policy factory to use to create policy instance. + /// + internal abstract RatePolicy GetPolicy(IPolicyStateProvider factory); +} + +/// +/// Specifies that API is bound to . +/// +[AttributeUsage(AttributeTargets.Method)] +public class ConcurrentWindowRatePolicyAttribute : UberRpcRatePolicyBaseAttribute +{ + /// + /// See . + /// + public uint Degree { get; protected set; } + + public ConcurrentWindowRatePolicyAttribute(uint degree) + { + Degree = degree; + } + + internal override RatePolicy GetPolicy(IPolicyStateProvider factory) + { + //TODO: opportunity to cache policies via weak reference with same degree and thus reduce number of instances floating around. + var concurrentPolicy = factory.GetPolicy(); + + concurrentPolicy.Degree = Degree; + return concurrentPolicy; + } +} + +/// +/// Specifies that API is bound to . +/// +[AttributeUsage(AttributeTargets.Method)] +public class FixedWindowRatePolicyAttribute : UberRpcRatePolicyBaseAttribute +{ + /// + /// See . + /// + public TimeSpan Period { get; protected set; } + + /// + /// See . + /// + public uint PeriodQuantity { get; protected set; } + + public FixedWindowRatePolicyAttribute(TimeSpan period, uint periodQuantity) + { + Period = period; + PeriodQuantity = periodQuantity; + } + + // Because https://stackoverflow.com/questions/51000597/c-sharp-a-timespan-in-attribute + public FixedWindowRatePolicyAttribute(int milliseconds, uint periodQuantity) + : this (TimeSpan.FromMilliseconds(milliseconds), periodQuantity) + { } + + internal override RatePolicy GetPolicy(IPolicyStateProvider factory) + { + //TODO: opportunity to cache policies via weak reference with same degree and thus reduce number of instances floating around. + var fixedPolicy = factory.GetPolicy(); + + fixedPolicy.Period = Period; + fixedPolicy.PeriodQuantity = PeriodQuantity; + return fixedPolicy; + } +} + +/// +/// Specifies that API is bound to . +/// +[AttributeUsage(AttributeTargets.Method)] +public class SlidingWindowRatePolicyAttribute : UberRpcRatePolicyBaseAttribute +{ + /// + /// See . + /// + public TimeSpan Period { get; protected set; } + + /// + /// See . + /// + public uint PeriodQuantity { get; protected set; } + + public SlidingWindowRatePolicyAttribute(TimeSpan period, uint periodQuantity) + { + Period = period; + PeriodQuantity = periodQuantity; + } + + // Because https://stackoverflow.com/questions/51000597/c-sharp-a-timespan-in-attribute + public SlidingWindowRatePolicyAttribute(int milliseconds, uint periodQuantity) + : this (TimeSpan.FromMilliseconds(milliseconds), periodQuantity) + { } + + internal override RatePolicy GetPolicy(IPolicyStateProvider factory) + { + //TODO: opportunity to cache policies via weak reference with same degree and thus reduce number of instances floating around. + var fixedPolicy = factory.GetPolicy(); + + fixedPolicy.Period = Period; + fixedPolicy.PeriodQuantity = PeriodQuantity; + return fixedPolicy; + } +} diff --git a/UberRpcService/UberRpcService.cs b/UberRpcService/UberRpcService.cs new file mode 100644 index 00000000..98909be9 --- /dev/null +++ b/UberRpcService/UberRpcService.cs @@ -0,0 +1,114 @@ +using System; +using System.Threading.Tasks; + +namespace UberRpcService; + +public class UberRpcServiceCallMessage +{ + public required string MethodName { get; init; } + public int? AuthorizationContext { get; init; } + public object[]? Args { get; init; } +} + +public class UberRpcServiceResponseMessage +{ + public enum ResultCode + { + Success, + Fail, + NoMethod, + TooMuch + } + + public ResultCode ResultStatus { get; init; } + + public object? ReturnValue { get; init; } + + public Exception? Exception { get; init; } +} + +/// +/// Some example, generic, not yet super popular or even yet-known and yet-heard-of super RPC +/// service that one day will win over gRPC monopoly, like totally. +/// +/// Service interface to wrap RPC service over. +public class UberRpcService where TService : class +{ + private readonly TService _service; + + public UberRpcService(TService service) + { + _service = service; + } + + public async Task InvokeRpcCall(UberRpcServiceCallMessage message) + { + var method = typeof(TService).GetMethod(message.MethodName); + if (method == null) + { + return new UberRpcServiceResponseMessage + { + ResultStatus = UberRpcServiceResponseMessage.ResultCode.NoMethod + }; + } + + try + { + var methodReturn = method.Invoke(_service, message.Args); + + if (method.ReturnType.IsAssignableTo(typeof(Task))) + { + // Async call + var methodTask = methodReturn as Task; + + try + { + await methodTask!; + } + catch + { } + + if (!methodTask!.IsCompletedSuccessfully) + { + return new UberRpcServiceResponseMessage + { + ResultStatus = UberRpcServiceResponseMessage.ResultCode.Fail, + Exception = methodTask.Exception + }; + } + + if (method.ReturnType.GenericTypeArguments.Length == 1) + { + return new UberRpcServiceResponseMessage + { + ResultStatus = UberRpcServiceResponseMessage.ResultCode.Success, + ReturnValue = ((Task)methodTask).Result + }; + } + + return new UberRpcServiceResponseMessage + { + ResultStatus = UberRpcServiceResponseMessage.ResultCode.Success, + ReturnValue = null + }; + } + else + { + // Sync call + return new UberRpcServiceResponseMessage + { + ResultStatus = UberRpcServiceResponseMessage.ResultCode.Success, + ReturnValue = methodReturn + }; + } + } + catch (Exception e) + { + return new UberRpcServiceResponseMessage + { + ResultStatus = UberRpcServiceResponseMessage.ResultCode.Fail, + Exception = e + }; + } + } +} \ No newline at end of file diff --git a/UberRpcService/UberRpcService.csproj b/UberRpcService/UberRpcService.csproj new file mode 100644 index 00000000..7754f2f0 --- /dev/null +++ b/UberRpcService/UberRpcService.csproj @@ -0,0 +1,10 @@ + + + + + + net9.0 + latest + enable + + \ No newline at end of file