From abeeb82a22d644575df4141b257265f1a31ed691 Mon Sep 17 00:00:00 2001 From: Joris Hartog Date: Thu, 5 Mar 2026 10:36:27 +0100 Subject: [PATCH] refactor: remove rate_limiter actor Remove the rate_limiter actor and unify in-memory and external store hit paths through a single Store interface. Store operations reduced from 4 (get/set/lock/unlock) to 3 (lock_and_get/set_and_unlock/unlock). In-memory mode uses no-op locks backed by the memory_store actor. --- CHANGELOG.md | 18 + README.md | 28 +- examples/redis/src/app.gleam | 82 ++- gleam.toml | 1 - src/glimit.gleam | 135 ++-- src/glimit/bucket.gleam | 40 +- src/glimit/memory_store.gleam | 76 +-- src/glimit/rate_limiter.gleam | 193 ------ test/glimit_rate_limiter_test.gleam | 345 ----------- test/glimit_test.gleam | 929 ++++++++++++++++++++-------- 10 files changed, 898 insertions(+), 949 deletions(-) delete mode 100644 src/glimit/rate_limiter.gleam delete mode 100644 test/glimit_rate_limiter_test.gleam diff --git a/CHANGELOG.md b/CHANGELOG.md index 00bc9bc..93a9af0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,24 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), ## [Unreleased] +### Added + +- `glimit.hit(limiter, identifier)` for direct rate limit checks without the `apply` wrapper. +- `glimit.HitError` type with `RateLimited`, `Unavailable`, and `StoreLockFailed` constructors (was a re-export, now standalone with the same variants). +- `bucket.compute_ttl(bucket_state)` — computes a TTL in seconds for bucket state, useful for external store adapters. +- `RateLimiter` record has a `now` field (`fn() -> Int`) for test time overrides — construct a new limiter with a different `now` to control time in tests. + +### Changed + +- Removed the internal `rate_limiter` actor. Both in-memory and external store hits now go through a unified `Store` interface (`lock_and_get` / `set_and_unlock` / `unlock`). The builder API (`glimit.new() |> glimit.per_second(10) |> ...`) is unchanged. +- `Store` type simplified from 4 operations (`get`, `set`, `lock`, `unlock`) to 3 (`lock_and_get`, `set_and_unlock`, `unlock`). Adapters combine lock+get and set+unlock into single operations. +- `RateLimiter` record fields changed: `rate_limiter_actor` replaced by `per_second`, `burst_limit`, `store`, `memory_store`, and `now`. The builder API is unchanged. + +### Removed + +- `glimit.sweep()` (was a no-op). +- `glimit.set_now()` — time overrides are now done by constructing a new `RateLimiter` with a different `now` field. + ## 1.3.1 - 2026-03-05 diff --git a/README.md b/README.md index 0adda5d..d57ced6 100644 --- a/README.md +++ b/README.md @@ -41,6 +41,26 @@ func("🚀") // "OK" func("🚀") // "Stop!" ``` +You can also use `glimit.build` and `glimit.hit` for direct rate limit checks +without wrapping a function: + +```gleam +import glimit + +let assert Ok(limiter) = + glimit.new() + |> glimit.per_second(10) + |> glimit.identifier(fn(x) { x }) + |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) + |> glimit.build + +case glimit.hit(limiter, "user_123") { + Ok(Nil) -> // allowed + Error(glimit.RateLimited) -> // rejected + Error(_) -> // store unavailable, fails open +} +``` + More practical examples can be found in the `examples/` directory, such as Wisp or Mist servers, or a Redis backend. @@ -48,7 +68,7 @@ More practical examples can be found in the `examples/` directory, such as Wisp By default, rate limit state is stored in-memory using an OTP actor. For distributed rate limiting across multiple nodes, you can provide a custom `Store` that persists bucket state in an external service like Redis or Postgres. -All token bucket logic stays in glimit — adapters only implement simple get/set/lock/unlock operations. The `glimit/bucket` module provides `to_pairs`/`from_pairs` helpers for serialization. +All token bucket logic stays in glimit — adapters only implement `lock_and_get` / `set_and_unlock` / `unlock` operations. The `glimit/bucket` module provides `to_pairs`/`from_pairs` helpers for serialization. See `examples/redis/` for a complete Redis adapter using [valkyrie](https://hexdocs.pm/valkyrie/). @@ -60,10 +80,10 @@ When no store is configured, the rate limiter uses the default in-memory backend ## Performance -All rate limiter state is held in a single OTP actor. Each hit is one message to this actor, which performs the token bucket calculation inline. +Every hit goes through the pluggable `Store` interface (`lock_and_get` / `set_and_unlock`). In-memory mode backs this with an OTP actor (two messages per hit); external store mode calls the adapter directly. -* **Memory** (in-memory mode): One dict entry per unique identifier. Idle identifiers (full token buckets) are automatically swept every 10 seconds. -* **Fail-open**: If the rate limiter actor is unavailable or a store lock fails, the request is allowed through rather than rejected. +* **Memory** (in-memory mode): One dict entry per unique identifier. Full and idle buckets are automatically swept every 10 seconds. +* **Fail-open**: If the store is unavailable or a lock cannot be acquired, the request is allowed through rather than rejected. ## Documentation diff --git a/examples/redis/src/app.gleam b/examples/redis/src/app.gleam index 5a54519..efb61ae 100644 --- a/examples/redis/src/app.gleam +++ b/examples/redis/src/app.gleam @@ -23,51 +23,67 @@ fn resp_to_string(value: resp.Value) -> Result(String, Nil) { } fn redis_store(conn: valkyrie.Connection) -> glimit.Store { + let lock = fn(key) { + let opts = + valkyrie.SetOptions( + existence_condition: Some(valkyrie.IfNotExists), + return_old: False, + expiry_option: Some(valkyrie.ExpirySeconds(5)), + ) + case valkyrie.set(conn, key <> ":lock", "1", Some(opts), redis_timeout) { + Ok(_) -> Ok(Nil) + Error(_) -> Error(Nil) + } + } + + let get = fn(key) { + case valkyrie.hgetall(conn, key, redis_timeout) { + Ok(fields) -> { + let pairs = + dict.to_list(fields) + |> list.filter_map(fn(p) { + use k <- result.try(resp_to_string(p.0)) + use v <- result.try(resp_to_string(p.1)) + Ok(#(k, v)) + }) + case bucket.from_pairs(pairs) { + Ok(state) -> Ok(Some(state)) + _ -> Ok(None) + } + } + Error(_) -> Error(Nil) + } + } + + let unlock = fn(key) { + let _ = valkyrie.del(conn, [key <> ":lock"], redis_timeout) + Ok(Nil) + } + bucket.Store( - get: fn(key) { - case valkyrie.hgetall(conn, key, redis_timeout) { - Ok(fields) -> { - let pairs = - dict.to_list(fields) - |> list.filter_map(fn(p) { - use k <- result.try(resp_to_string(p.0)) - use v <- result.try(resp_to_string(p.1)) - Ok(#(k, v)) - }) - case bucket.from_pairs(pairs) { - Ok(state) -> Ok(Some(state)) - _ -> Ok(None) - } + lock_and_get: fn(key) { + use _ <- result.try(lock(key)) + case get(key) { + Ok(result) -> Ok(result) + Error(_) -> { + let _ = unlock(key) + Error(Nil) } - Error(_) -> Error(Nil) } }, - set: fn(key, state, ttl) { + set_and_unlock: fn(key, state, ttl) { let fields = bucket.to_pairs(state) |> dict.from_list - case valkyrie.hset(conn, key, fields, redis_timeout) { + let result = case valkyrie.hset(conn, key, fields, redis_timeout) { Ok(_) -> { let _ = valkyrie.expire(conn, key, ttl, None, redis_timeout) Ok(Nil) } Error(_) -> Error(Nil) } + let _ = unlock(key) + result }, - lock: fn(key) { - let opts = - valkyrie.SetOptions( - existence_condition: Some(valkyrie.IfNotExists), - return_old: False, - expiry_option: Some(valkyrie.ExpirySeconds(5)), - ) - case valkyrie.set(conn, key <> ":lock", "1", Some(opts), redis_timeout) { - Ok(_) -> Ok(Nil) - Error(_) -> Error(Nil) - } - }, - unlock: fn(key) { - let _ = valkyrie.del(conn, [key <> ":lock"], redis_timeout) - Ok(Nil) - }, + unlock: unlock, ) } diff --git a/gleam.toml b/gleam.toml index f9efbd0..acdc9d1 100644 --- a/gleam.toml +++ b/gleam.toml @@ -6,7 +6,6 @@ licences = ["MIT"] repository = { type = "github", user = "nootr", repo = "glimit" } target = "erlang" internal_modules = [ - "glimit/rate_limiter", "glimit/memory_store", "glimit/utils", ] diff --git a/src/glimit.gleam b/src/glimit.gleam index a40d24e..3294e75 100644 --- a/src/glimit.gleam +++ b/src/glimit.gleam @@ -1,12 +1,15 @@ //// This module provides a rate limiter that can be used to limit the number of //// requests or function calls per second for a given identifier. //// -//// A single rate limiter actor stores all token bucket state. Each hit is a single -//// message to the rate limiter, which performs the Token Bucket calculation inline. +//// In-memory mode: each hit sends two messages to an OTP actor (get + set). //// A periodic sweep removes full or idle buckets to reduce memory usage. The //// idle threshold defaults to 60 seconds and can be configured via `max_idle`. -//// The rate limiter fails open — if the rate limiter actor is unavailable, -//// requests are allowed through. +//// +//// External store mode: each hit calls the store's lock_and_get/set_and_unlock +//// interface directly, with no actor overhead. +//// +//// The rate limiter fails open — if the store is unavailable, requests are +//// allowed through. //// //// The rate limits are configured using the following two options: //// @@ -61,9 +64,10 @@ //// distributed rate limiting (e.g. across multiple nodes), you can provide a //// custom `Store` that persists bucket state externally (Redis, Postgres, etc.). //// -//// All token bucket logic stays in glimit — adapters only implement simple -//// get/set/lock/unlock operations. The `glimit/bucket` module is public and -//// provides `to_pairs`/`from_pairs` helpers for serialization. +//// All token bucket logic stays in glimit — adapters only implement +//// `lock_and_get` / `set_and_unlock` / `unlock` operations. The `glimit/bucket` +//// module is public and provides `to_pairs`/`from_pairs` helpers for +//// serialization. //// //// See `examples/redis/` for a complete Redis adapter using //// [valkyrie](https://hexdocs.pm/valkyrie/). @@ -74,7 +78,9 @@ import gleam/result import gleam/string import glimit/bucket import glimit/memory_store.{type MemoryStore} -import glimit/rate_limiter +import glimit/utils + +const store_key_prefix = "glimit:" /// A pluggable storage backend for distributed rate limiting. /// See `glimit/bucket.Store` for full documentation. @@ -84,17 +90,26 @@ pub type Store = /// Error type returned when a rate limit check fails. /// -pub type HitError = - rate_limiter.HitError +pub type HitError { + /// The rate limit has been exceeded. + RateLimited + /// The rate limiter is unavailable. + Unavailable + /// The store lock could not be acquired (fails open). + StoreLockFailed +} /// A rate limiter. /// pub type RateLimiter(a, b, id) { RateLimiter( - rate_limiter_actor: rate_limiter.RateLimiterActor(id), on_limit_exceeded: fn(a) -> b, identifier: fn(a) -> id, + per_second: fn(id) -> Int, + burst_limit: fn(id) -> Int, + store: Store, memory_store: Option(MemoryStore), + now: fn() -> Int, ) } @@ -265,8 +280,8 @@ pub fn max_idle( /// Set a pluggable store backend for distributed rate limiting. /// /// When a store is configured, bucket state is read from and written to the -/// store on each hit instead of being kept in the actor's in-memory dictionary. -/// The periodic sweep becomes a no-op since external stores handle expiry via TTL. +/// store on each hit instead of being kept in memory. +/// External stores handle expiry via TTL. /// /// # Example /// @@ -327,6 +342,41 @@ pub fn identifier( RateLimiterBuilder(..limiter, identifier: Some(identifier)) } +fn string_key(identifier: id) -> String { + store_key_prefix <> string.inspect(identifier) +} + +fn store_hit( + store: Store, + now: fn() -> Int, + key: String, + max_token_count: Int, + token_rate: Int, +) -> Result(Nil, HitError) { + use maybe_bucket <- result.try( + store.lock_and_get(key) |> result.replace_error(StoreLockFailed), + ) + let b = case maybe_bucket { + Some(b) -> Ok(b) + None -> bucket.new(max_token_count, token_rate) + } + case b { + Error(_) -> { + let _ = store.unlock(key) + Error(Unavailable) + } + Ok(b) -> { + let #(hit_result, new_b) = bucket.hit(b, now()) + let ttl = bucket.compute_ttl(new_b) + let _ = store.set_and_unlock(key, new_b, ttl) + case hit_result { + Ok(Nil) -> Ok(Nil) + Error(Nil) -> Error(RateLimited) + } + } + } +} + /// Build the rate limiter. /// /// Note that using `apply` will already build the rate limiter, so this function is @@ -356,30 +406,43 @@ pub fn build( None -> Error("`on_limit_exceeded` function is required") }) - // Resolve the store: use provided store or create an in-memory store - use #(resolved_store, mem_store) <- result.try(case config.store { - Some(s) -> Ok(#(s, None)) - None -> { + use #(store, ms) <- result.try(case config.store { + Some(ext_store) -> Ok(#(ext_store, None)) + None -> case memory_store.new(config.max_idle_ms, 10_000) { - Ok(#(s, handle)) -> Ok(#(s, Some(handle))) + Ok(handle) -> Ok(#(memory_store.make_store(handle), Some(handle))) Error(_) -> Error("Failed to start memory store") } - } }) - use rate_limiter_actor <- result.try( - rate_limiter.new(per_second, burst_limit, resolved_store) - |> result.map_error(fn(_) { "Failed to start rate limiter" }), - ) - Ok(RateLimiter( - rate_limiter_actor: rate_limiter_actor, on_limit_exceeded: on_limit_exceeded, identifier: identifier, - memory_store: mem_store, + per_second: per_second, + burst_limit: burst_limit, + store: store, + memory_store: ms, + now: utils.now, )) } +/// Hit the rate limiter for the given identifier directly. +/// +pub fn hit( + limiter: RateLimiter(a, b, id), + identifier: id, +) -> Result(Nil, HitError) { + let key = string_key(identifier) + case utils.rescue(fn() { limiter.burst_limit(identifier) }) { + Error(_) -> Error(Unavailable) + Ok(max) -> + case utils.rescue(fn() { limiter.per_second(identifier) }) { + Error(_) -> Error(Unavailable) + Ok(rate) -> store_hit(limiter.store, limiter.now, key, max, rate) + } + } +} + /// Apply the rate limiter to a request handler or function. /// /// Panics if the rate limiter cannot be started or if the `identifier` @@ -407,11 +470,10 @@ pub fn apply_built( ) -> fn(a) -> b { fn(input: a) -> b { let identifier = limiter.identifier(input) - case rate_limiter.hit(limiter.rate_limiter_actor, identifier) { + case hit(limiter, identifier) { Ok(Nil) -> func(input) - Error(rate_limiter.RateLimited) -> limiter.on_limit_exceeded(input) - Error(rate_limiter.Unavailable) | Error(rate_limiter.StoreLockFailed) -> - func(input) + Error(RateLimited) -> limiter.on_limit_exceeded(input) + Error(Unavailable) | Error(StoreLockFailed) -> func(input) } } } @@ -434,7 +496,7 @@ pub fn get_count(limiter: RateLimiter(a, b, id)) -> Int { pub fn remove(limiter: RateLimiter(a, b, id), identifier: id) -> Nil { case limiter.memory_store { Some(ms) -> { - let key = "glimit:" <> string.inspect(identifier) + let key = string_key(identifier) let _ = memory_store.remove(ms, key) Nil } @@ -442,17 +504,6 @@ pub fn remove(limiter: RateLimiter(a, b, id), identifier: id) -> Nil { } } -/// Remove full or idle buckets from the in-memory store synchronously. -/// -/// No-op if the rate limiter uses an external store. -/// -pub fn sweep(limiter: RateLimiter(a, b, id)) -> Nil { - case limiter.memory_store { - Some(_ms) -> Nil - None -> Nil - } -} - /// Apply the rate limiter to a 2-argument function. /// /// The config's `identifier` and `on_limit_exceeded` receive a `#(a, b)` tuple. diff --git a/src/glimit/bucket.gleam b/src/glimit/bucket.gleam index 36b213f..11dbb40 100644 --- a/src/glimit/bucket.gleam +++ b/src/glimit/bucket.gleam @@ -1,4 +1,4 @@ -//// This module contains pure token-bucket functions used by the rate limiter actor. +//// This module contains pure token-bucket functions used by the rate limiter. //// import gleam/float @@ -18,20 +18,24 @@ const key_token_rate = "tr" /// A pluggable storage backend for distributed rate limiting. /// /// All token bucket logic is handled by glimit — store adapters only need to -/// implement simple get/set/lock/unlock operations on string-keyed bucket state. +/// implement simple lock-and-get / set-and-unlock operations on string-keyed +/// bucket state. /// -/// - `get`: Retrieve bucket state by key. Return `Ok(None)` for a new/missing key. -/// - `set`: Persist bucket state with a TTL in seconds for automatic expiry. -/// - `lock`: Acquire an exclusive lock for the key. Return `Error(Nil)` if unavailable. -/// - `unlock`: Release the lock for the key. +/// - `lock_and_get`: Acquire an exclusive lock for the key and retrieve its +/// bucket state. Return `Ok(None)` for a new/missing key. If the lock cannot +/// be acquired, return `Error(Nil)`. +/// - `set_and_unlock`: Persist bucket state with a TTL in seconds for automatic +/// expiry, then release the lock. +/// - `unlock`: Release the lock without writing. Used on error paths when there +/// is nothing to persist. /// -/// When a lock or get fails, the rate limiter **fails open** (allows the request). +/// When `lock_and_get` fails, the rate limiter **fails open** (allows the +/// request). /// pub type Store { Store( - get: fn(String) -> Result(Option(BucketState), Nil), - set: fn(String, BucketState, Int) -> Result(Nil, Nil), - lock: fn(String) -> Result(Nil, Nil), + lock_and_get: fn(String) -> Result(Option(BucketState), Nil), + set_and_unlock: fn(String, BucketState, Int) -> Result(Nil, Nil), unlock: fn(String) -> Result(Nil, Nil), ) } @@ -159,6 +163,22 @@ pub fn from_pairs(pairs: List(#(String, String))) -> Result(BucketState, Nil) { )) } +/// Compute a TTL in seconds for the bucket state. +/// +/// The TTL is based on the time it takes to fully refill from empty, with a +/// minimum of 60 seconds. +/// +pub fn compute_ttl(b: BucketState) -> Int { + case b.token_rate > 0 { + True -> { + let refill_seconds = + { b.max_token_count + b.token_rate - 1 } / b.token_rate + int.max(refill_seconds * 2, 60) + } + False -> 60 + } +} + /// Returns True if the bucket is full after refilling. /// pub fn is_full(state: BucketState, now: Int) -> Bool { diff --git a/src/glimit/memory_store.gleam b/src/glimit/memory_store.gleam index 2acd7cc..0f146be 100644 --- a/src/glimit/memory_store.gleam +++ b/src/glimit/memory_store.gleam @@ -13,7 +13,7 @@ import glimit/utils const call_timeout = 1000 /// Opaque handle to the in-memory store actor. -/// Provides sweep/count/remove operations that only make sense for in-memory storage. +/// Provides get/set/sweep/count/remove operations for in-memory rate limiting. /// pub opaque type MemoryStore { MemoryStore(subject: Subject(Msg)) @@ -29,15 +29,8 @@ type State { } type Msg { - Get(key: String, reply: Subject(Result(Option(BucketState), Nil))) - Set( - key: String, - state: BucketState, - ttl: Int, - reply: Subject(Result(Nil, Nil)), - ) - Lock(key: String, reply: Subject(Result(Nil, Nil))) - Unlock(key: String, reply: Subject(Result(Nil, Nil))) + Get(key: String, reply: Subject(Option(BucketState))) + Set(key: String, state: BucketState, reply: Subject(Nil)) Sweep(now: Int, max_idle_ms: Option(Int), reply: Subject(Nil)) SweepTimer GetCount(reply: Subject(Int)) @@ -46,16 +39,13 @@ type Msg { /// Create a new in-memory store. /// -/// Returns a tuple of the `Store` interface (for the rate limiter) and a -/// `MemoryStore` handle (for sweep/count/remove operations). -/// /// `max_idle_ms` is the idle eviction threshold, or `None` to disable. /// `sweep_interval_ms` is the interval between automatic sweeps. /// pub fn new( max_idle_ms: Option(Int), sweep_interval_ms: Int, -) -> Result(#(bucket.Store, MemoryStore), Nil) { +) -> Result(MemoryStore, Nil) { let start_result = actor.new_with_initialiser(call_timeout, fn(self_subject) { let state = @@ -77,64 +67,40 @@ pub fn new( |> actor.start case start_result { - Ok(started) -> { - let subject = started.data - let store = make_store(subject) - let handle = MemoryStore(subject: subject) - Ok(#(store, handle)) - } + Ok(started) -> Ok(MemoryStore(subject: started.data)) Error(_) -> Error(Nil) } } -fn make_store(subject: Subject(Msg)) -> bucket.Store { +/// Create a `bucket.Store` backed by this in-memory actor. +/// +/// Lock and unlock are no-ops since the OTP actor serializes messages. +/// +pub fn make_store(store: MemoryStore) -> bucket.Store { bucket.Store( - get: fn(key) { - utils.safe_call(subject, Get(key, _), call_timeout) - |> result.flatten + lock_and_get: fn(key) { + utils.safe_call(store.subject, Get(key, _), call_timeout) }, - set: fn(key, state, ttl) { - utils.safe_call(subject, Set(key, state, ttl, _), call_timeout) - |> result.flatten - }, - lock: fn(_key) { - // No-op: the actor serializes access - Ok(Nil) - }, - unlock: fn(_key) { - // No-op: the actor serializes access - Ok(Nil) + set_and_unlock: fn(key, state, _ttl) { + utils.safe_call(store.subject, Set(key, state, _), call_timeout) }, + unlock: fn(_key) { Ok(Nil) }, ) } fn handle_message(state: State, msg: Msg) -> actor.Next(State, Msg) { case msg { Get(key, reply) -> { - let result = case dict.get(state.data, key) { - Ok(b) -> Ok(Some(b)) - Error(_) -> Ok(None) - } - actor.send(reply, result) + actor.send(reply, dict.get(state.data, key) |> option.from_result) actor.continue(state) } - Set(key, bucket_state, _ttl, reply) -> { + Set(key, bucket_state, reply) -> { let data = dict.insert(state.data, key, bucket_state) - actor.send(reply, Ok(Nil)) + actor.send(reply, Nil) actor.continue(State(..state, data: data)) } - Lock(_key, reply) -> { - actor.send(reply, Ok(Nil)) - actor.continue(state) - } - - Unlock(_key, reply) -> { - actor.send(reply, Ok(Nil)) - actor.continue(state) - } - Sweep(now, max_idle_ms, reply) -> { let data = do_sweep(state.data, now, max_idle_ms) actor.send(reply, Nil) @@ -189,6 +155,12 @@ fn schedule_sweep(state: State) -> Nil { Nil } +/// Return the pid of the memory store actor. +/// +pub fn pid(store: MemoryStore) -> Result(process.Pid, Nil) { + process.subject_owner(store.subject) +} + /// Return the number of tracked identifiers. /// pub fn get_count(store: MemoryStore) -> Int { diff --git a/src/glimit/rate_limiter.gleam b/src/glimit/rate_limiter.gleam deleted file mode 100644 index 28bb06c..0000000 --- a/src/glimit/rate_limiter.gleam +++ /dev/null @@ -1,193 +0,0 @@ -//// This module contains a rate limiter actor that delegates to a pluggable Store. -//// - -import gleam/erlang/process.{type Subject} -import gleam/int -import gleam/option.{type Option, None, Some} -import gleam/otp/actor -import gleam/result -import gleam/string -import glimit/bucket.{type BucketState} -import glimit/utils - -const call_timeout = 1000 - -const store_key_prefix = "glimit:" - -/// Error type returned by `hit`. -/// -pub type HitError { - /// The rate limit has been exceeded. - RateLimited - /// The rate limiter is unavailable. - Unavailable - /// The store lock could not be acquired (fails open). - StoreLockFailed -} - -pub type RateLimiterActor(id) = - Subject(Message(id)) - -/// The rate limiter state. -/// -type State(id) { - State( - /// The maximum number of tokens. - /// - max_token_count: fn(id) -> Int, - /// The rate of token generation per second. - /// - token_rate: fn(id) -> Int, - /// The store backend. - /// - store: bucket.Store, - /// Test time override. - /// - now: Option(Int), - ) -} - -pub type Message(id) { - /// Hit the rate limiter for the given id. - /// - Hit(identifier: id, reply_with: Subject(Result(Nil, HitError))) - /// Set the current time for testing purposes. - /// - SetNow(now: Int, reply_with: Subject(Nil)) -} - -fn get_now(state: State(id)) -> Int { - case state.now { - Some(now) -> now - None -> utils.now() - } -} - -fn handle_store_hit(state: State(id), identifier: id) -> Result(Nil, HitError) { - let key = string_key(identifier) - // 1. Lock - case state.store.lock(key) { - Error(_) -> Error(StoreLockFailed) - Ok(_) -> { - // 2. Get existing state or create new bucket - let bucket_result = case state.store.get(key) { - Ok(Some(b)) -> Ok(b) - Ok(None) -> { - use max <- result.try( - utils.rescue(fn() { state.max_token_count(identifier) }), - ) - use rate <- result.try( - utils.rescue(fn() { state.token_rate(identifier) }), - ) - bucket.new(max, rate) - } - Error(_) -> { - let _ = state.store.unlock(key) - Error(Nil) - } - } - case bucket_result { - Error(_) -> { - let _ = state.store.unlock(key) - Error(Unavailable) - } - Ok(b) -> { - // 3. Refill + consume - let now = get_now(state) - let #(hit_result, new_b) = bucket.hit(b, now) - // 4. Persist - let ttl = compute_ttl(new_b) - let _ = state.store.set(key, new_b, ttl) - // 5. Unlock - let _ = state.store.unlock(key) - case hit_result { - Ok(Nil) -> Ok(Nil) - Error(Nil) -> Error(RateLimited) - } - } - } - } - } -} - -fn string_key(identifier: id) -> String { - store_key_prefix <> string.inspect(identifier) -} - -fn compute_ttl(b: BucketState) -> Int { - case b.token_rate > 0 { - True -> { - // Time to fully refill from empty, plus a buffer - let refill_seconds = - { b.max_token_count + b.token_rate - 1 } / b.token_rate - int.max(refill_seconds * 2, 60) - } - False -> 60 - } -} - -fn handle_message( - state: State(id), - message: Message(id), -) -> actor.Next(State(id), Message(id)) { - case message { - Hit(identifier, client) -> { - let result = handle_store_hit(state, identifier) - actor.send(client, result) - actor.continue(state) - } - - SetNow(now, client) -> { - actor.send(client, Nil) - actor.continue(State(..state, now: Some(now))) - } - } -} - -/// Create a new rate limiter. -/// -pub fn new( - per_second: fn(id) -> Int, - burst_limit: fn(id) -> Int, - store: bucket.Store, -) -> Result(RateLimiterActor(id), Nil) { - actor.new_with_initialiser(call_timeout, fn(self_subject) { - let state = - State( - max_token_count: burst_limit, - token_rate: per_second, - store: store, - now: None, - ) - - Ok( - actor.initialised(state) - |> actor.returning(self_subject), - ) - }) - |> actor.on_message(handle_message) - |> actor.start - |> result.map(fn(started) { started.data }) - |> result.map_error(fn(_) { Nil }) -} - -/// Hit the rate limiter for the given identifier. -/// -pub fn hit( - rate_limiter: RateLimiterActor(id), - identifier: id, -) -> Result(Nil, HitError) { - case utils.safe_call(rate_limiter, Hit(identifier, _), call_timeout) { - Ok(Ok(Nil)) -> Ok(Nil) - Ok(Error(err)) -> Error(err) - Error(Nil) -> Error(Unavailable) - } -} - -/// Set the current time for testing purposes. -/// The `now` value is in milliseconds. -/// -pub fn set_now(rate_limiter: RateLimiterActor(id), now: Int) -> Nil { - let _ = utils.safe_call(rate_limiter, SetNow(now, _), call_timeout) - Nil -} diff --git a/test/glimit_rate_limiter_test.gleam b/test/glimit_rate_limiter_test.gleam deleted file mode 100644 index 69214d3..0000000 --- a/test/glimit_rate_limiter_test.gleam +++ /dev/null @@ -1,345 +0,0 @@ -import gleam/erlang/process -import gleam/list -import gleam/option.{Some} -import gleeunit/should -import glimit/memory_store -import glimit/rate_limiter - -fn new_rl( - per_second: fn(String) -> Int, - burst_limit: fn(String) -> Int, - max_idle_ms: option.Option(Int), -) -> #(rate_limiter.RateLimiterActor(String), memory_store.MemoryStore) { - let assert Ok(#(store, ms)) = memory_store.new(max_idle_ms, 10_000) - let assert Ok(rl) = rate_limiter.new(per_second, burst_limit, store) - #(rl, ms) -} - -pub fn hit_returns_ok_test() { - let #(rl, _ms) = new_rl(fn(_) { 2 }, fn(_) { 2 }, Some(60_000)) - rate_limiter.hit(rl, "a") |> should.be_ok -} - -pub fn hit_rate_limited_test() { - let #(rl, _ms) = new_rl(fn(_) { 2 }, fn(_) { 2 }, Some(60_000)) - rate_limiter.hit(rl, "a") |> should.be_ok - rate_limiter.hit(rl, "a") |> should.be_ok - rate_limiter.hit(rl, "a") |> should.equal(Error(rate_limiter.RateLimited)) -} - -pub fn hit_different_ids_test() { - let #(rl, _ms) = new_rl(fn(_) { 1 }, fn(_) { 1 }, Some(60_000)) - rate_limiter.hit(rl, "a") |> should.be_ok - rate_limiter.hit(rl, "a") |> should.equal(Error(rate_limiter.RateLimited)) - rate_limiter.hit(rl, "b") |> should.be_ok - rate_limiter.hit(rl, "b") |> should.equal(Error(rate_limiter.RateLimited)) -} - -pub fn get_count_empty_test() { - let #(_rl, ms) = new_rl(fn(_) { 2 }, fn(_) { 2 }, Some(60_000)) - memory_store.get_count(ms) |> should.equal(0) -} - -pub fn get_count_after_hits_test() { - let #(rl, ms) = new_rl(fn(_) { 2 }, fn(_) { 2 }, Some(60_000)) - rate_limiter.hit(rl, "a") |> should.be_ok - rate_limiter.hit(rl, "b") |> should.be_ok - memory_store.get_count(ms) |> should.equal(2) -} - -pub fn same_id_same_count_test() { - let #(rl, ms) = new_rl(fn(_) { 2 }, fn(_) { 2 }, Some(60_000)) - rate_limiter.hit(rl, "a") |> should.be_ok - rate_limiter.hit(rl, "a") |> should.be_ok - memory_store.get_count(ms) |> should.equal(1) -} - -pub fn sweep_full_bucket_test() { - let #(rl, ms) = new_rl(fn(_) { 2 }, fn(_) { 2 }, Some(60_000)) - rate_limiter.set_now(rl, 0) - // Hit then let it refill to full - rate_limiter.hit(rl, "a") |> should.be_ok - let assert Ok(Nil) = memory_store.sweep(ms, 1_000_000, Some(60_000)) - // Full bucket should be swept - memory_store.get_count(ms) |> should.equal(0) -} - -pub fn sweep_not_full_bucket_test() { - let #(rl, ms) = new_rl(fn(_) { 2 }, fn(_) { 2 }, Some(60_000)) - rate_limiter.set_now(rl, 0) - rate_limiter.hit(rl, "a") |> should.be_ok - let assert Ok(Nil) = memory_store.sweep(ms, 0, Some(60_000)) - // Not full — should be kept - memory_store.get_count(ms) |> should.equal(1) -} - -pub fn sweep_after_long_time_test() { - let #(rl, ms) = new_rl(fn(_) { 2 }, fn(_) { 2 }, Some(60_000)) - rate_limiter.set_now(rl, 0) - rate_limiter.hit(rl, "a") |> should.be_ok - rate_limiter.hit(rl, "a") |> should.be_ok - rate_limiter.hit(rl, "a") - |> should.equal(Error(rate_limiter.RateLimited)) - - // After a long time, bucket refills to full - let assert Ok(Nil) = memory_store.sweep(ms, 1_000_000, Some(60_000)) - memory_store.get_count(ms) |> should.equal(0) -} - -pub fn sweep_empty_test() { - let #(_rl, ms) = new_rl(fn(_) { 2 }, fn(_) { 2 }, Some(60_000)) - let assert Ok(Nil) = memory_store.sweep(ms, 0, Some(60_000)) -} - -pub fn sweep_mixed_buckets_test() { - let #(rl, ms) = new_rl(fn(_) { 2 }, fn(_) { 2 }, Some(60_000)) - rate_limiter.set_now(rl, 0) - - rate_limiter.hit(rl, "a") |> should.be_ok - rate_limiter.hit(rl, "a") |> should.be_ok - rate_limiter.hit(rl, "b") |> should.be_ok - rate_limiter.hit(rl, "c") |> should.be_ok - rate_limiter.hit(rl, "c") |> should.be_ok - - // After long time, all refill to full - let assert Ok(Nil) = memory_store.sweep(ms, 1_000_000, Some(60_000)) - // All refilled to full → all swept - memory_store.get_count(ms) |> should.equal(0) -} - -pub fn sweep_keeps_active_test() { - let #(rl, ms) = new_rl(fn(_) { 2 }, fn(_) { 2 }, Some(60_000)) - rate_limiter.set_now(rl, 0) - rate_limiter.hit(rl, "a") |> should.be_ok - let assert Ok(Nil) = memory_store.sweep(ms, 0, Some(60_000)) - // "a" has 1 token at t=0, not full → kept - memory_store.get_count(ms) |> should.equal(1) -} - -pub fn sweep_all_active_test() { - let #(rl, ms) = new_rl(fn(_) { 2 }, fn(_) { 2 }, Some(60_000)) - rate_limiter.set_now(rl, 0) - rate_limiter.hit(rl, "a") |> should.be_ok - rate_limiter.hit(rl, "b") |> should.be_ok - rate_limiter.hit(rl, "c") |> should.be_ok - let assert Ok(Nil) = memory_store.sweep(ms, 0, Some(60_000)) - // All have 1 token, not full → all kept - memory_store.get_count(ms) |> should.equal(3) -} - -pub fn remove_test() { - let #(rl, ms) = new_rl(fn(_) { 2 }, fn(_) { 2 }, Some(60_000)) - rate_limiter.hit(rl, "a") |> should.be_ok - memory_store.get_count(ms) |> should.equal(1) - let assert Ok(Nil) = memory_store.remove(ms, "glimit:\"a\"") - memory_store.get_count(ms) |> should.equal(0) -} - -pub fn remove_nonexistent_test() { - let #(_rl, ms) = new_rl(fn(_) { 2 }, fn(_) { 2 }, Some(60_000)) - memory_store.remove(ms, "glimit:\"nonexistent\"") |> should.equal(Ok(Nil)) -} - -pub fn set_now_and_hit_test() { - let #(rl, _ms) = new_rl(fn(_) { 1 }, fn(_) { 3 }, Some(60_000)) - rate_limiter.set_now(rl, 0) - - rate_limiter.hit(rl, "a") |> should.be_ok - rate_limiter.hit(rl, "a") |> should.be_ok - rate_limiter.hit(rl, "a") |> should.be_ok - rate_limiter.hit(rl, "a") - |> should.equal(Error(rate_limiter.RateLimited)) - - rate_limiter.set_now(rl, 1000) - rate_limiter.hit(rl, "a") |> should.be_ok - rate_limiter.hit(rl, "a") - |> should.equal(Error(rate_limiter.RateLimited)) -} - -pub fn sweep_get_count_after_sweep_test() { - let #(rl, ms) = new_rl(fn(_) { 2 }, fn(_) { 2 }, Some(60_000)) - rate_limiter.set_now(rl, 0) - - // Hit "keep" so it's active - rate_limiter.hit(rl, "keep") |> should.be_ok - // Hit "remove" and exhaust - rate_limiter.hit(rl, "remove") |> should.be_ok - rate_limiter.hit(rl, "remove") |> should.be_ok - - // At t=1_000_000, both refill to full - let assert Ok(Nil) = memory_store.sweep(ms, 1_000_000, Some(60_000)) - - // Both full → both swept - memory_store.get_count(ms) |> should.equal(0) -} - -pub fn dynamic_config_test() { - let assert Ok(#(store, _ms)) = memory_store.new(Some(60_000), 10_000) - let assert Ok(rl) = - rate_limiter.new( - fn(id) { - case id { - "fast" -> 10 - _ -> 1 - } - }, - fn(id) { - case id { - "fast" -> 10 - _ -> 1 - } - }, - store, - ) - - rate_limiter.hit(rl, "fast") |> should.be_ok - rate_limiter.hit(rl, "fast") |> should.be_ok - rate_limiter.hit(rl, "slow") |> should.be_ok - rate_limiter.hit(rl, "slow") - |> should.equal(Error(rate_limiter.RateLimited)) -} - -pub fn invalid_config_returns_unavailable_test() { - let assert Ok(#(store, ms)) = memory_store.new(Some(60_000), 10_000) - let assert Ok(rl) = - rate_limiter.new( - fn(id) { - case id { - "bad" -> 0 - _ -> 2 - } - }, - fn(id) { - case id { - "bad" -> 0 - _ -> 2 - } - }, - store, - ) - - // Valid identifier works - rate_limiter.hit(rl, "good") |> should.be_ok - - // Invalid config fails open with Unavailable - rate_limiter.hit(rl, "bad") - |> should.equal(Error(rate_limiter.Unavailable)) - - // Invalid identifier is not stored - memory_store.get_count(ms) |> should.equal(1) -} - -pub fn crashing_callback_returns_unavailable_test() { - let assert Ok(#(store, _ms)) = memory_store.new(Some(60_000), 10_000) - let assert Ok(rl) = - rate_limiter.new( - fn(id) { - case id { - "crash" -> panic as "boom" - _ -> 2 - } - }, - fn(id) { - case id { - "crash" -> panic as "boom" - _ -> 2 - } - }, - store, - ) - - // Crashing callback should return Unavailable, not kill the actor - rate_limiter.hit(rl, "crash") - |> should.equal(Error(rate_limiter.Unavailable)) - - // Actor is still alive and serving other identifiers - rate_limiter.hit(rl, "good") |> should.be_ok -} - -pub fn crashing_single_callback_returns_unavailable_test() { - let assert Ok(#(store, _ms)) = memory_store.new(Some(60_000), 10_000) - let assert Ok(rl) = - rate_limiter.new( - fn(id) { - case id { - "crash" -> panic as "boom" - _ -> 2 - } - }, - fn(_) { 2 }, - store, - ) - - rate_limiter.hit(rl, "crash") - |> should.equal(Error(rate_limiter.Unavailable)) - - rate_limiter.hit(rl, "good") |> should.be_ok -} - -pub fn sweep_idle_bucket_test() { - let #(rl, ms) = new_rl(fn(_) { 1 }, fn(_) { 100 }, Some(60_000)) - rate_limiter.set_now(rl, 0) - - // Exhaust all 100 tokens - list.repeat(Nil, 100) - |> list.each(fn(_) { - let _ = rate_limiter.hit(rl, "a") - Nil - }) - - memory_store.get_count(ms) |> should.equal(1) - - // At t=61_000: tokens = 0 + 61 = 61 < 100, not full - // But idle for 61s > 60s threshold — should be swept - let assert Ok(Nil) = memory_store.sweep(ms, 61_000, Some(60_000)) - - memory_store.get_count(ms) |> should.equal(0) -} - -pub fn sweep_idle_exact_boundary_test() { - let #(rl, ms) = new_rl(fn(_) { 1 }, fn(_) { 100 }, Some(60_000)) - rate_limiter.set_now(rl, 0) - - list.repeat(Nil, 100) - |> list.each(fn(_) { - let _ = rate_limiter.hit(rl, "a") - Nil - }) - - // 60_000 - 0 = 60_000, which is NOT > 60_000 — kept - let assert Ok(Nil) = memory_store.sweep(ms, 60_000, Some(60_000)) - memory_store.get_count(ms) |> should.equal(1) -} - -pub fn sweep_idle_preserves_recent_bucket_test() { - let #(rl, ms) = new_rl(fn(_) { 1 }, fn(_) { 100 }, Some(60_000)) - rate_limiter.set_now(rl, 0) - - list.repeat(Nil, 100) - |> list.each(fn(_) { - let _ = rate_limiter.hit(rl, "a") - Nil - }) - - // At t=59_000: tokens = 59 < 100 (not full), idle for 59s < 60s (not idle) - let assert Ok(Nil) = memory_store.sweep(ms, 59_000, Some(60_000)) - - // Should be kept — not full and not idle - memory_store.get_count(ms) |> should.equal(1) -} - -pub fn dead_rate_limiter_returns_unavailable_test() { - let #(rl, _ms) = new_rl(fn(_) { 2 }, fn(_) { 2 }, Some(60_000)) - - // Trap exits so the kill signal doesn't crash the test process - let _trapped = process.trap_exits(True) - - // Kill the actor - let assert Ok(pid) = process.subject_owner(rl) - process.kill(pid) - process.sleep(10) - - // Hit should return Unavailable, not crash - rate_limiter.hit(rl, "a") - |> should.equal(Error(rate_limiter.Unavailable)) -} diff --git a/test/glimit_test.gleam b/test/glimit_test.gleam index 42a3834..e0357a9 100644 --- a/test/glimit_test.gleam +++ b/test/glimit_test.gleam @@ -8,7 +8,6 @@ import gleeunit/should import glimit import glimit/bucket import glimit/memory_store -import glimit/rate_limiter pub fn main() { gleeunit.main() @@ -59,41 +58,32 @@ pub fn burst_limit_test() { |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) |> glimit.build - let func = - fn(_) { "OK" } - |> glimit.apply_built(limiter) - - rate_limiter.set_now(limiter.rate_limiter_actor, 0) - func(Nil) |> should.equal("OK") - func(Nil) |> should.equal("OK") - func(Nil) |> should.equal("OK") - func(Nil) |> should.equal("Stop!") - func(Nil) |> should.equal("Stop!") - - rate_limiter.set_now(limiter.rate_limiter_actor, 1000) - func(Nil) |> should.equal("OK") - func(Nil) |> should.equal("Stop!") - func(Nil) |> should.equal("Stop!") - - rate_limiter.set_now(limiter.rate_limiter_actor, 3000) - func(Nil) |> should.equal("OK") - func(Nil) |> should.equal("OK") - func(Nil) |> should.equal("Stop!") - func(Nil) |> should.equal("Stop!") - - rate_limiter.set_now(limiter.rate_limiter_actor, 6000) - func(Nil) |> should.equal("OK") - func(Nil) |> should.equal("OK") - func(Nil) |> should.equal("OK") - func(Nil) |> should.equal("Stop!") - func(Nil) |> should.equal("Stop!") - - rate_limiter.set_now(limiter.rate_limiter_actor, 13_000) - func(Nil) |> should.equal("OK") - func(Nil) |> should.equal("OK") - func(Nil) |> should.equal("OK") - func(Nil) |> should.equal("Stop!") - func(Nil) |> should.equal("Stop!") + let limiter = set_now(limiter, 0) + glimit.hit(limiter, "id") |> should.be_ok + glimit.hit(limiter, "id") |> should.be_ok + glimit.hit(limiter, "id") |> should.be_ok + glimit.hit(limiter, "id") |> should.equal(Error(glimit.RateLimited)) + + let limiter = set_now(limiter, 1000) + glimit.hit(limiter, "id") |> should.be_ok + glimit.hit(limiter, "id") |> should.equal(Error(glimit.RateLimited)) + + let limiter = set_now(limiter, 3000) + glimit.hit(limiter, "id") |> should.be_ok + glimit.hit(limiter, "id") |> should.be_ok + glimit.hit(limiter, "id") |> should.equal(Error(glimit.RateLimited)) + + let limiter = set_now(limiter, 6000) + glimit.hit(limiter, "id") |> should.be_ok + glimit.hit(limiter, "id") |> should.be_ok + glimit.hit(limiter, "id") |> should.be_ok + glimit.hit(limiter, "id") |> should.equal(Error(glimit.RateLimited)) + + let limiter = set_now(limiter, 13_000) + glimit.hit(limiter, "id") |> should.be_ok + glimit.hit(limiter, "id") |> should.be_ok + glimit.hit(limiter, "id") |> should.be_ok + glimit.hit(limiter, "id") |> should.equal(Error(glimit.RateLimited)) } pub fn dynamic_per_second_test() { @@ -136,34 +126,26 @@ pub fn dynamic_per_second_static_burst_limit_test() { |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) |> glimit.build - let func = - fn(_) { "OK" } - |> glimit.apply_built(limiter) - - rate_limiter.set_now(limiter.rate_limiter_actor, 0) - func("id") |> should.equal("OK") - func("id") |> should.equal("OK") - func("id") |> should.equal("OK") - func("id") |> should.equal("Stop!") - func("id") |> should.equal("Stop!") - - rate_limiter.set_now(limiter.rate_limiter_actor, 1000) - func("id") |> should.equal("OK") - func("id") |> should.equal("OK") - func("id") |> should.equal("Stop!") - func("id") |> should.equal("Stop!") - - rate_limiter.set_now(limiter.rate_limiter_actor, 0) - func("other") |> should.equal("OK") - func("other") |> should.equal("OK") - func("other") |> should.equal("OK") - func("other") |> should.equal("Stop!") - func("other") |> should.equal("Stop!") - - rate_limiter.set_now(limiter.rate_limiter_actor, 1000) - func("other") |> should.equal("OK") - func("other") |> should.equal("Stop!") - func("other") |> should.equal("Stop!") + let limiter = set_now(limiter, 0) + glimit.hit(limiter, "id") |> should.be_ok + glimit.hit(limiter, "id") |> should.be_ok + glimit.hit(limiter, "id") |> should.be_ok + glimit.hit(limiter, "id") |> should.equal(Error(glimit.RateLimited)) + + let limiter = set_now(limiter, 1000) + glimit.hit(limiter, "id") |> should.be_ok + glimit.hit(limiter, "id") |> should.be_ok + glimit.hit(limiter, "id") |> should.equal(Error(glimit.RateLimited)) + + let limiter = set_now(limiter, 0) + glimit.hit(limiter, "other") |> should.be_ok + glimit.hit(limiter, "other") |> should.be_ok + glimit.hit(limiter, "other") |> should.be_ok + glimit.hit(limiter, "other") |> should.equal(Error(glimit.RateLimited)) + + let limiter = set_now(limiter, 1000) + glimit.hit(limiter, "other") |> should.be_ok + glimit.hit(limiter, "other") |> should.equal(Error(glimit.RateLimited)) } pub fn static_per_second_dynamic_burst_limit_test() { @@ -180,32 +162,24 @@ pub fn static_per_second_dynamic_burst_limit_test() { |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) |> glimit.build - let func = - fn(_) { "OK" } - |> glimit.apply_built(limiter) - - rate_limiter.set_now(limiter.rate_limiter_actor, 0) - func("id") |> should.equal("OK") - func("id") |> should.equal("OK") - func("id") |> should.equal("OK") - func("id") |> should.equal("Stop!") - func("id") |> should.equal("Stop!") + let limiter = set_now(limiter, 0) + glimit.hit(limiter, "id") |> should.be_ok + glimit.hit(limiter, "id") |> should.be_ok + glimit.hit(limiter, "id") |> should.be_ok + glimit.hit(limiter, "id") |> should.equal(Error(glimit.RateLimited)) - rate_limiter.set_now(limiter.rate_limiter_actor, 1000) - func("id") |> should.equal("OK") - func("id") |> should.equal("Stop!") - func("id") |> should.equal("Stop!") + let limiter = set_now(limiter, 1000) + glimit.hit(limiter, "id") |> should.be_ok + glimit.hit(limiter, "id") |> should.equal(Error(glimit.RateLimited)) - rate_limiter.set_now(limiter.rate_limiter_actor, 0) - func("other") |> should.equal("OK") - func("other") |> should.equal("OK") - func("other") |> should.equal("Stop!") - func("other") |> should.equal("Stop!") + let limiter = set_now(limiter, 0) + glimit.hit(limiter, "other") |> should.be_ok + glimit.hit(limiter, "other") |> should.be_ok + glimit.hit(limiter, "other") |> should.equal(Error(glimit.RateLimited)) - rate_limiter.set_now(limiter.rate_limiter_actor, 1000) - func("other") |> should.equal("OK") - func("other") |> should.equal("Stop!") - func("other") |> should.equal("Stop!") + let limiter = set_now(limiter, 1000) + glimit.hit(limiter, "other") |> should.be_ok + glimit.hit(limiter, "other") |> should.equal(Error(glimit.RateLimited)) } pub fn dynamic_per_second_dynamic_burst_limit_test() { @@ -227,35 +201,27 @@ pub fn dynamic_per_second_dynamic_burst_limit_test() { |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) |> glimit.build - let func = - fn(_) { "OK" } - |> glimit.apply_built(limiter) - - rate_limiter.set_now(limiter.rate_limiter_actor, 0) - func("id") |> should.equal("OK") - func("id") |> should.equal("OK") - func("id") |> should.equal("OK") - func("id") |> should.equal("OK") - func("id") |> should.equal("Stop!") - func("id") |> should.equal("Stop!") - - rate_limiter.set_now(limiter.rate_limiter_actor, 1000) - func("id") |> should.equal("OK") - func("id") |> should.equal("OK") - func("id") |> should.equal("Stop!") - func("id") |> should.equal("Stop!") - - rate_limiter.set_now(limiter.rate_limiter_actor, 0) - func("other") |> should.equal("OK") - func("other") |> should.equal("OK") - func("other") |> should.equal("OK") - func("other") |> should.equal("Stop!") - func("other") |> should.equal("Stop!") - - rate_limiter.set_now(limiter.rate_limiter_actor, 1000) - func("other") |> should.equal("OK") - func("other") |> should.equal("Stop!") - func("other") |> should.equal("Stop!") + let limiter = set_now(limiter, 0) + glimit.hit(limiter, "id") |> should.be_ok + glimit.hit(limiter, "id") |> should.be_ok + glimit.hit(limiter, "id") |> should.be_ok + glimit.hit(limiter, "id") |> should.be_ok + glimit.hit(limiter, "id") |> should.equal(Error(glimit.RateLimited)) + + let limiter = set_now(limiter, 1000) + glimit.hit(limiter, "id") |> should.be_ok + glimit.hit(limiter, "id") |> should.be_ok + glimit.hit(limiter, "id") |> should.equal(Error(glimit.RateLimited)) + + let limiter = set_now(limiter, 0) + glimit.hit(limiter, "other") |> should.be_ok + glimit.hit(limiter, "other") |> should.be_ok + glimit.hit(limiter, "other") |> should.be_ok + glimit.hit(limiter, "other") |> should.equal(Error(glimit.RateLimited)) + + let limiter = set_now(limiter, 1000) + glimit.hit(limiter, "other") |> should.be_ok + glimit.hit(limiter, "other") |> should.equal(Error(glimit.RateLimited)) } pub fn sweep_preserves_active_limiters_test() { @@ -266,18 +232,13 @@ pub fn sweep_preserves_active_limiters_test() { |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) |> glimit.build - let func = - fn(_) { "OK" } - |> glimit.apply_built(limiter) - let assert option.Some(ms) = limiter.memory_store - - rate_limiter.set_now(limiter.rate_limiter_actor, 0) + let limiter = set_now(limiter, 0) // Hit "user_a" once — active, not full - func("user_a") |> should.equal("OK") - func("user_b") |> should.equal("OK") - func("user_b") |> should.equal("OK") + glimit.hit(limiter, "user_a") |> should.be_ok + glimit.hit(limiter, "user_b") |> should.be_ok + glimit.hit(limiter, "user_b") |> should.be_ok let assert Ok(Nil) = memory_store.sweep(ms, 0, option.Some(60_000)) @@ -285,8 +246,8 @@ pub fn sweep_preserves_active_limiters_test() { memory_store.get_count(ms) |> should.equal(2) // user_a still has 1 token left - func("user_a") |> should.equal("OK") - func("user_a") |> should.equal("Stop!") + glimit.hit(limiter, "user_a") |> should.be_ok + glimit.hit(limiter, "user_a") |> should.equal(Error(glimit.RateLimited)) } pub fn integration_many_identifiers_test() { @@ -298,13 +259,8 @@ pub fn integration_many_identifiers_test() { |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) |> glimit.build - let func = - fn(_) { "OK" } - |> glimit.apply_built(limiter) - let assert option.Some(ms) = limiter.memory_store - - rate_limiter.set_now(limiter.rate_limiter_actor, 0) + let limiter = set_now(limiter, 0) let ids = [ "id_0", "id_1", "id_2", "id_3", "id_4", "id_5", "id_6", "id_7", "id_8", @@ -314,7 +270,7 @@ pub fn integration_many_identifiers_test() { // Hit each ID i times (id_0: 0 hits, id_1: 1 hit, etc.) list.index_map(ids, fn(id, i) { list.repeat(Nil, i) - |> list.each(fn(_) { func(id) |> ignore }) + |> list.each(fn(_) { glimit.hit(limiter, id) |> ignore }) }) // id_0 was never hit so doesn't exist @@ -337,21 +293,16 @@ pub fn integration_sweep_then_reuse_test() { |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) |> glimit.build - let func = - fn(_) { "OK" } - |> glimit.apply_built(limiter) - let assert option.Some(ms) = limiter.memory_store - - rate_limiter.set_now(limiter.rate_limiter_actor, 0) + let limiter = set_now(limiter, 0) // Exhaust "user_a" (all tokens used) - func("user_a") |> should.equal("OK") - func("user_a") |> should.equal("OK") - func("user_a") |> should.equal("Stop!") + glimit.hit(limiter, "user_a") |> should.be_ok + glimit.hit(limiter, "user_a") |> should.be_ok + glimit.hit(limiter, "user_a") |> should.equal(Error(glimit.RateLimited)) // Hit "user_b" once - func("user_b") |> should.equal("OK") + glimit.hit(limiter, "user_b") |> should.be_ok let assert Ok(Nil) = memory_store.sweep(ms, 0, option.Some(60_000)) @@ -359,13 +310,13 @@ pub fn integration_sweep_then_reuse_test() { memory_store.get_count(ms) |> should.equal(2) // "user_a" is still rate-limited at t=0 - func("user_a") |> should.equal("Stop!") + glimit.hit(limiter, "user_a") |> should.equal(Error(glimit.RateLimited)) // Advance time so user_a gets tokens back - rate_limiter.set_now(limiter.rate_limiter_actor, 1000) - func("user_a") |> should.equal("OK") - func("user_a") |> should.equal("OK") - func("user_a") |> should.equal("Stop!") + let limiter = set_now(limiter, 1000) + glimit.hit(limiter, "user_a") |> should.be_ok + glimit.hit(limiter, "user_a") |> should.be_ok + glimit.hit(limiter, "user_a") |> should.equal(Error(glimit.RateLimited)) } pub fn sub_second_remainder_preservation_test() { @@ -378,38 +329,36 @@ pub fn sub_second_remainder_preservation_test() { |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) |> glimit.build - let reg = limiter.rate_limiter_actor - // Consume all 10 tokens at t=0 - rate_limiter.set_now(reg, 0) + let limiter = set_now(limiter, 0) list.repeat(Nil, 10) - |> list.each(fn(_) { rate_limiter.hit(reg, "id") |> ignore }) - rate_limiter.hit(reg, "id") |> should.equal(Error(rate_limiter.RateLimited)) + |> list.each(fn(_) { glimit.hit(limiter, "id") |> ignore }) + glimit.hit(limiter, "id") |> should.equal(Error(glimit.RateLimited)) // 400ms: tc = 0.0 + 2.0 * 400 / 1000 = 0.8 < 1.0 — still rate limited - rate_limiter.set_now(reg, 400) - rate_limiter.hit(reg, "id") |> should.equal(Error(rate_limiter.RateLimited)) + let limiter = set_now(limiter, 400) + glimit.hit(limiter, "id") |> should.equal(Error(glimit.RateLimited)) // 500ms: tc = 0.8 + 2.0 * 100 / 1000 = 1.0 >= 1.0 — succeeds - rate_limiter.set_now(reg, 500) - rate_limiter.hit(reg, "id") |> should.be_ok - rate_limiter.hit(reg, "id") |> should.equal(Error(rate_limiter.RateLimited)) + let limiter = set_now(limiter, 500) + glimit.hit(limiter, "id") |> should.be_ok + glimit.hit(limiter, "id") |> should.equal(Error(glimit.RateLimited)) // 900ms: tc = 0.0 + 2.0 * 400 / 1000 = 0.8 < 1.0 - rate_limiter.set_now(reg, 900) - rate_limiter.hit(reg, "id") |> should.equal(Error(rate_limiter.RateLimited)) + let limiter = set_now(limiter, 900) + glimit.hit(limiter, "id") |> should.equal(Error(glimit.RateLimited)) // 1000ms: tc = 0.8 + 2.0 * 100 / 1000 = 1.0 >= 1.0 - rate_limiter.set_now(reg, 1000) - rate_limiter.hit(reg, "id") |> should.be_ok - rate_limiter.hit(reg, "id") |> should.equal(Error(rate_limiter.RateLimited)) + let limiter = set_now(limiter, 1000) + glimit.hit(limiter, "id") |> should.be_ok + glimit.hit(limiter, "id") |> should.equal(Error(glimit.RateLimited)) // 2500ms: tc = 0.0 + 2.0 * 1500 / 1000 = 3.0 — 3 tokens - rate_limiter.set_now(reg, 2500) - rate_limiter.hit(reg, "id") |> should.be_ok - rate_limiter.hit(reg, "id") |> should.be_ok - rate_limiter.hit(reg, "id") |> should.be_ok - rate_limiter.hit(reg, "id") |> should.equal(Error(rate_limiter.RateLimited)) + let limiter = set_now(limiter, 2500) + glimit.hit(limiter, "id") |> should.be_ok + glimit.hit(limiter, "id") |> should.be_ok + glimit.hit(limiter, "id") |> should.be_ok + glimit.hit(limiter, "id") |> should.equal(Error(glimit.RateLimited)) } pub fn sub_second_remainder_non_divisible_rate_test() { @@ -422,40 +371,38 @@ pub fn sub_second_remainder_non_divisible_rate_test() { |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) |> glimit.build - let reg = limiter.rate_limiter_actor - // Drain all 5 tokens at t=0 - rate_limiter.set_now(reg, 0) + let limiter = set_now(limiter, 0) list.repeat(Nil, 5) - |> list.each(fn(_) { rate_limiter.hit(reg, "id") |> ignore }) - rate_limiter.hit(reg, "id") |> should.equal(Error(rate_limiter.RateLimited)) + |> list.each(fn(_) { glimit.hit(limiter, "id") |> ignore }) + glimit.hit(limiter, "id") |> should.equal(Error(glimit.RateLimited)) // 333ms: tc = 0.0 + 3.0 * 333 / 1000 = 0.999 < 1.0 - rate_limiter.set_now(reg, 333) - rate_limiter.hit(reg, "id") |> should.equal(Error(rate_limiter.RateLimited)) + let limiter = set_now(limiter, 333) + glimit.hit(limiter, "id") |> should.equal(Error(glimit.RateLimited)) // 334ms: tc = 0.999 + 3.0 * 1 / 1000 = 1.002 >= 1.0 — succeeds - rate_limiter.set_now(reg, 334) - rate_limiter.hit(reg, "id") |> should.be_ok - rate_limiter.hit(reg, "id") |> should.equal(Error(rate_limiter.RateLimited)) + let limiter = set_now(limiter, 334) + glimit.hit(limiter, "id") |> should.be_ok + glimit.hit(limiter, "id") |> should.equal(Error(glimit.RateLimited)) // 666ms: tc = 0.002 + 3.0 * 332 / 1000 = 0.998 < 1.0 - rate_limiter.set_now(reg, 666) - rate_limiter.hit(reg, "id") |> should.equal(Error(rate_limiter.RateLimited)) + let limiter = set_now(limiter, 666) + glimit.hit(limiter, "id") |> should.equal(Error(glimit.RateLimited)) // 667ms: tc = 0.998 + 3.0 * 1 / 1000 = 1.001 >= 1.0 — succeeds - rate_limiter.set_now(reg, 667) - rate_limiter.hit(reg, "id") |> should.be_ok - rate_limiter.hit(reg, "id") |> should.equal(Error(rate_limiter.RateLimited)) + let limiter = set_now(limiter, 667) + glimit.hit(limiter, "id") |> should.be_ok + glimit.hit(limiter, "id") |> should.equal(Error(glimit.RateLimited)) // 3000ms: tc = 0.001 + 3.0 * 2333 / 1000 = 7.0, capped at burst limit 5 - rate_limiter.set_now(reg, 3000) - rate_limiter.hit(reg, "id") |> should.be_ok - rate_limiter.hit(reg, "id") |> should.be_ok - rate_limiter.hit(reg, "id") |> should.be_ok - rate_limiter.hit(reg, "id") |> should.be_ok - rate_limiter.hit(reg, "id") |> should.be_ok - rate_limiter.hit(reg, "id") |> should.equal(Error(rate_limiter.RateLimited)) + let limiter = set_now(limiter, 3000) + glimit.hit(limiter, "id") |> should.be_ok + glimit.hit(limiter, "id") |> should.be_ok + glimit.hit(limiter, "id") |> should.be_ok + glimit.hit(limiter, "id") |> should.be_ok + glimit.hit(limiter, "id") |> should.be_ok + glimit.hit(limiter, "id") |> should.equal(Error(glimit.RateLimited)) } pub fn build_missing_per_second_test() { @@ -492,21 +439,16 @@ pub fn builder_overwrite_test() { |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) |> glimit.build - let func = - fn(_) { "OK" } - |> glimit.apply_built(limiter) - - rate_limiter.set_now(limiter.rate_limiter_actor, 0) + let limiter = set_now(limiter, 0) // burst_limit=2: two hits succeed, third is limited - func(Nil) |> should.equal("OK") - func(Nil) |> should.equal("OK") - // on_limit_exceeded returns "Stop!" (not "wrong") - func(Nil) |> should.equal("Stop!") + glimit.hit(limiter, "id") |> should.be_ok + glimit.hit(limiter, "id") |> should.be_ok + glimit.hit(limiter, "id") |> should.equal(Error(glimit.RateLimited)) // Advance 1 second — per_second=1 so only 1 token refilled (not 999) - rate_limiter.set_now(limiter.rate_limiter_actor, 1000) - func(Nil) |> should.equal("OK") - func(Nil) |> should.equal("Stop!") + let limiter = set_now(limiter, 1000) + glimit.hit(limiter, "id") |> should.be_ok + glimit.hit(limiter, "id") |> should.equal(Error(glimit.RateLimited)) } pub fn apply2_test() { @@ -566,17 +508,12 @@ pub fn custom_max_idle_test() { |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) |> glimit.build - let func = - fn(_) { "OK" } - |> glimit.apply_built(limiter) - let assert option.Some(ms) = limiter.memory_store - - rate_limiter.set_now(limiter.rate_limiter_actor, 0) + let limiter = set_now(limiter, 0) // Exhaust all 100 tokens list.repeat(Nil, 100) - |> list.each(fn(_) { func(Nil) |> ignore }) + |> list.each(fn(_) { glimit.hit(limiter, "id") |> ignore }) memory_store.get_count(ms) |> should.equal(1) @@ -599,17 +536,12 @@ pub fn disabled_idle_eviction_test() { |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) |> glimit.build - let func = - fn(_) { "OK" } - |> glimit.apply_built(limiter) - let assert option.Some(ms) = limiter.memory_store - - rate_limiter.set_now(limiter.rate_limiter_actor, 0) + let limiter = set_now(limiter, 0) // Exhaust all 100 tokens list.repeat(Nil, 100) - |> list.each(fn(_) { func(Nil) |> ignore }) + |> list.each(fn(_) { glimit.hit(limiter, "id") |> ignore }) // At t=61_000: bucket has 61 tokens (not full) and has been idle for >60s. // With default idle eviction this would be swept, but max_idle(0) disables it. @@ -627,16 +559,11 @@ pub fn negative_max_idle_disables_eviction_test() { |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) |> glimit.build - let func = - fn(_) { "OK" } - |> glimit.apply_built(limiter) - let assert option.Some(ms) = limiter.memory_store - - rate_limiter.set_now(limiter.rate_limiter_actor, 0) + let limiter = set_now(limiter, 0) list.repeat(Nil, 100) - |> list.each(fn(_) { func(Nil) |> ignore }) + |> list.each(fn(_) { glimit.hit(limiter, "id") |> ignore }) // At t=61_000: idle for >60s but eviction is disabled let assert Ok(Nil) = memory_store.sweep(ms, 61_000, option.None) @@ -654,15 +581,11 @@ pub fn max_idle_overwrite_test() { |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) |> glimit.build - let func = - fn(_) { "OK" } - |> glimit.apply_built(limiter) - let assert option.Some(ms) = limiter.memory_store + let limiter = set_now(limiter, 0) - rate_limiter.set_now(limiter.rate_limiter_actor, 0) list.repeat(Nil, 100) - |> list.each(fn(_) { func(Nil) |> ignore }) + |> list.each(fn(_) { glimit.hit(limiter, "id") |> ignore }) // At t=61_000: idle 61s, but max_idle is 120s (last set value) — kept let assert Ok(Nil) = memory_store.sweep(ms, 61_000, option.Some(120_000)) @@ -673,7 +596,7 @@ pub fn max_idle_overwrite_test() { memory_store.get_count(ms) |> should.equal(0) } -pub fn dead_rate_limiter_fails_open_test() { +pub fn dead_memory_store_fails_open_test() { let assert Ok(limiter) = glimit.new() |> glimit.per_second(2) @@ -691,8 +614,9 @@ pub fn dead_rate_limiter_fails_open_test() { // Trap exits so the kill signal doesn't crash the test process let _trapped = process.trap_exits(True) - // Kill the rate limiter actor - let assert Ok(pid) = process.subject_owner(limiter.rate_limiter_actor) + // Kill the memory store actor + let assert option.Some(ms) = limiter.memory_store + let assert Ok(pid) = memory_store.pid(ms) process.kill(pid) process.sleep(10) @@ -733,23 +657,507 @@ pub fn per_second_negative_fails_open_test() { func(Nil) |> should.equal("OK") } +// Tests merged from glimit_rate_limiter_test + +pub fn hit_returns_ok_test() { + let assert Ok(limiter) = + glimit.new() + |> glimit.per_second(2) + |> glimit.burst_limit(2) + |> glimit.identifier(fn(x) { x }) + |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) + |> glimit.build + + glimit.hit(limiter, "a") |> should.be_ok +} + +pub fn hit_rate_limited_test() { + let assert Ok(limiter) = + glimit.new() + |> glimit.per_second(2) + |> glimit.burst_limit(2) + |> glimit.identifier(fn(x) { x }) + |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) + |> glimit.build + + glimit.hit(limiter, "a") |> should.be_ok + glimit.hit(limiter, "a") |> should.be_ok + glimit.hit(limiter, "a") |> should.equal(Error(glimit.RateLimited)) +} + +pub fn hit_different_ids_test() { + let assert Ok(limiter) = + glimit.new() + |> glimit.per_second(1) + |> glimit.burst_limit(1) + |> glimit.identifier(fn(x) { x }) + |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) + |> glimit.build + + glimit.hit(limiter, "a") |> should.be_ok + glimit.hit(limiter, "a") |> should.equal(Error(glimit.RateLimited)) + glimit.hit(limiter, "b") |> should.be_ok + glimit.hit(limiter, "b") |> should.equal(Error(glimit.RateLimited)) +} + +pub fn get_count_empty_test() { + let assert Ok(limiter) = + glimit.new() + |> glimit.per_second(2) + |> glimit.burst_limit(2) + |> glimit.identifier(fn(x) { x }) + |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) + |> glimit.build + + glimit.get_count(limiter) |> should.equal(0) +} + +pub fn get_count_after_hits_test() { + let assert Ok(limiter) = + glimit.new() + |> glimit.per_second(2) + |> glimit.burst_limit(2) + |> glimit.identifier(fn(x) { x }) + |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) + |> glimit.build + + glimit.hit(limiter, "a") |> should.be_ok + glimit.hit(limiter, "b") |> should.be_ok + glimit.get_count(limiter) |> should.equal(2) +} + +pub fn same_id_same_count_test() { + let assert Ok(limiter) = + glimit.new() + |> glimit.per_second(2) + |> glimit.burst_limit(2) + |> glimit.identifier(fn(x) { x }) + |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) + |> glimit.build + + glimit.hit(limiter, "a") |> should.be_ok + glimit.hit(limiter, "a") |> should.be_ok + glimit.get_count(limiter) |> should.equal(1) +} + +pub fn sweep_full_bucket_test() { + let assert Ok(limiter) = + glimit.new() + |> glimit.per_second(2) + |> glimit.burst_limit(2) + |> glimit.identifier(fn(x) { x }) + |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) + |> glimit.build + + let assert option.Some(ms) = limiter.memory_store + let limiter = set_now(limiter, 0) + + glimit.hit(limiter, "a") |> should.be_ok + let assert Ok(Nil) = memory_store.sweep(ms, 1_000_000, option.Some(60_000)) + memory_store.get_count(ms) |> should.equal(0) +} + +pub fn sweep_not_full_bucket_test() { + let assert Ok(limiter) = + glimit.new() + |> glimit.per_second(2) + |> glimit.burst_limit(2) + |> glimit.identifier(fn(x) { x }) + |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) + |> glimit.build + + let assert option.Some(ms) = limiter.memory_store + let limiter = set_now(limiter, 0) + + glimit.hit(limiter, "a") |> should.be_ok + let assert Ok(Nil) = memory_store.sweep(ms, 0, option.Some(60_000)) + memory_store.get_count(ms) |> should.equal(1) +} + +pub fn sweep_after_long_time_test() { + let assert Ok(limiter) = + glimit.new() + |> glimit.per_second(2) + |> glimit.burst_limit(2) + |> glimit.identifier(fn(x) { x }) + |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) + |> glimit.build + + let assert option.Some(ms) = limiter.memory_store + let limiter = set_now(limiter, 0) + + glimit.hit(limiter, "a") |> should.be_ok + glimit.hit(limiter, "a") |> should.be_ok + glimit.hit(limiter, "a") |> should.equal(Error(glimit.RateLimited)) + + let assert Ok(Nil) = memory_store.sweep(ms, 1_000_000, option.Some(60_000)) + memory_store.get_count(ms) |> should.equal(0) +} + +pub fn sweep_empty_test() { + let assert Ok(limiter) = + glimit.new() + |> glimit.per_second(2) + |> glimit.burst_limit(2) + |> glimit.identifier(fn(x) { x }) + |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) + |> glimit.build + + let assert option.Some(ms) = limiter.memory_store + let assert Ok(Nil) = memory_store.sweep(ms, 0, option.Some(60_000)) +} + +pub fn sweep_mixed_buckets_test() { + let assert Ok(limiter) = + glimit.new() + |> glimit.per_second(2) + |> glimit.burst_limit(2) + |> glimit.identifier(fn(x) { x }) + |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) + |> glimit.build + + let assert option.Some(ms) = limiter.memory_store + let limiter = set_now(limiter, 0) + + glimit.hit(limiter, "a") |> should.be_ok + glimit.hit(limiter, "a") |> should.be_ok + glimit.hit(limiter, "b") |> should.be_ok + glimit.hit(limiter, "c") |> should.be_ok + glimit.hit(limiter, "c") |> should.be_ok + + let assert Ok(Nil) = memory_store.sweep(ms, 1_000_000, option.Some(60_000)) + memory_store.get_count(ms) |> should.equal(0) +} + +pub fn sweep_keeps_active_test() { + let assert Ok(limiter) = + glimit.new() + |> glimit.per_second(2) + |> glimit.burst_limit(2) + |> glimit.identifier(fn(x) { x }) + |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) + |> glimit.build + + let assert option.Some(ms) = limiter.memory_store + let limiter = set_now(limiter, 0) + + glimit.hit(limiter, "a") |> should.be_ok + let assert Ok(Nil) = memory_store.sweep(ms, 0, option.Some(60_000)) + memory_store.get_count(ms) |> should.equal(1) +} + +pub fn sweep_all_active_test() { + let assert Ok(limiter) = + glimit.new() + |> glimit.per_second(2) + |> glimit.burst_limit(2) + |> glimit.identifier(fn(x) { x }) + |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) + |> glimit.build + + let assert option.Some(ms) = limiter.memory_store + let limiter = set_now(limiter, 0) + + glimit.hit(limiter, "a") |> should.be_ok + glimit.hit(limiter, "b") |> should.be_ok + glimit.hit(limiter, "c") |> should.be_ok + let assert Ok(Nil) = memory_store.sweep(ms, 0, option.Some(60_000)) + memory_store.get_count(ms) |> should.equal(3) +} + +pub fn remove_test() { + let assert Ok(limiter) = + glimit.new() + |> glimit.per_second(2) + |> glimit.burst_limit(2) + |> glimit.identifier(fn(x) { x }) + |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) + |> glimit.build + + let assert option.Some(ms) = limiter.memory_store + + glimit.hit(limiter, "a") |> should.be_ok + memory_store.get_count(ms) |> should.equal(1) + let assert Ok(Nil) = memory_store.remove(ms, "glimit:\"a\"") + memory_store.get_count(ms) |> should.equal(0) +} + +pub fn remove_nonexistent_test() { + let assert Ok(limiter) = + glimit.new() + |> glimit.per_second(2) + |> glimit.burst_limit(2) + |> glimit.identifier(fn(x) { x }) + |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) + |> glimit.build + + let assert option.Some(ms) = limiter.memory_store + memory_store.remove(ms, "glimit:\"nonexistent\"") |> should.equal(Ok(Nil)) +} + +pub fn set_now_and_hit_test() { + let assert Ok(limiter) = + glimit.new() + |> glimit.per_second(1) + |> glimit.burst_limit(3) + |> glimit.identifier(fn(x) { x }) + |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) + |> glimit.build + + let limiter = set_now(limiter, 0) + + glimit.hit(limiter, "a") |> should.be_ok + glimit.hit(limiter, "a") |> should.be_ok + glimit.hit(limiter, "a") |> should.be_ok + glimit.hit(limiter, "a") |> should.equal(Error(glimit.RateLimited)) + + let limiter = set_now(limiter, 1000) + glimit.hit(limiter, "a") |> should.be_ok + glimit.hit(limiter, "a") |> should.equal(Error(glimit.RateLimited)) +} + +pub fn sweep_get_count_after_sweep_test() { + let assert Ok(limiter) = + glimit.new() + |> glimit.per_second(2) + |> glimit.burst_limit(2) + |> glimit.identifier(fn(x) { x }) + |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) + |> glimit.build + + let assert option.Some(ms) = limiter.memory_store + let limiter = set_now(limiter, 0) + + glimit.hit(limiter, "keep") |> should.be_ok + glimit.hit(limiter, "remove") |> should.be_ok + glimit.hit(limiter, "remove") |> should.be_ok + + let assert Ok(Nil) = memory_store.sweep(ms, 1_000_000, option.Some(60_000)) + memory_store.get_count(ms) |> should.equal(0) +} + +pub fn dynamic_config_test() { + let assert Ok(limiter) = + glimit.new() + |> glimit.per_second_fn(fn(id) { + case id { + "fast" -> 10 + _ -> 1 + } + }) + |> glimit.burst_limit_fn(fn(id) { + case id { + "fast" -> 10 + _ -> 1 + } + }) + |> glimit.identifier(fn(x) { x }) + |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) + |> glimit.build + + glimit.hit(limiter, "fast") |> should.be_ok + glimit.hit(limiter, "fast") |> should.be_ok + glimit.hit(limiter, "slow") |> should.be_ok + glimit.hit(limiter, "slow") |> should.equal(Error(glimit.RateLimited)) +} + +pub fn invalid_config_returns_unavailable_test() { + let assert Ok(limiter) = + glimit.new() + |> glimit.per_second_fn(fn(id) { + case id { + "bad" -> 0 + _ -> 2 + } + }) + |> glimit.burst_limit_fn(fn(id) { + case id { + "bad" -> 0 + _ -> 2 + } + }) + |> glimit.identifier(fn(x) { x }) + |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) + |> glimit.build + + let assert option.Some(ms) = limiter.memory_store + + // Valid identifier works + glimit.hit(limiter, "good") |> should.be_ok + + // Invalid config fails open with Unavailable + glimit.hit(limiter, "bad") |> should.equal(Error(glimit.Unavailable)) + + // Invalid identifier is not stored + memory_store.get_count(ms) |> should.equal(1) +} + +pub fn crashing_callback_returns_unavailable_test() { + let assert Ok(limiter) = + glimit.new() + |> glimit.per_second_fn(fn(id) { + case id { + "crash" -> panic as "boom" + _ -> 2 + } + }) + |> glimit.burst_limit_fn(fn(id) { + case id { + "crash" -> panic as "boom" + _ -> 2 + } + }) + |> glimit.identifier(fn(x) { x }) + |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) + |> glimit.build + + // Crashing callback should return Unavailable + glimit.hit(limiter, "crash") |> should.equal(Error(glimit.Unavailable)) + + // Still serving other identifiers + glimit.hit(limiter, "good") |> should.be_ok +} + +pub fn crashing_single_callback_returns_unavailable_test() { + let assert Ok(limiter) = + glimit.new() + |> glimit.per_second_fn(fn(id) { + case id { + "crash" -> panic as "boom" + _ -> 2 + } + }) + |> glimit.burst_limit(2) + |> glimit.identifier(fn(x) { x }) + |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) + |> glimit.build + + glimit.hit(limiter, "crash") |> should.equal(Error(glimit.Unavailable)) + + glimit.hit(limiter, "good") |> should.be_ok +} + +pub fn sweep_idle_bucket_test() { + let assert Ok(limiter) = + glimit.new() + |> glimit.per_second(1) + |> glimit.burst_limit(100) + |> glimit.identifier(fn(x) { x }) + |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) + |> glimit.build + + let assert option.Some(ms) = limiter.memory_store + let limiter = set_now(limiter, 0) + + // Exhaust all 100 tokens + list.repeat(Nil, 100) + |> list.each(fn(_) { + let _ = glimit.hit(limiter, "a") + Nil + }) + + memory_store.get_count(ms) |> should.equal(1) + + // At t=61_000: tokens = 0 + 61 = 61 < 100, not full + // But idle for 61s > 60s threshold — should be swept + let assert Ok(Nil) = memory_store.sweep(ms, 61_000, option.Some(60_000)) + + memory_store.get_count(ms) |> should.equal(0) +} + +pub fn sweep_idle_exact_boundary_test() { + let assert Ok(limiter) = + glimit.new() + |> glimit.per_second(1) + |> glimit.burst_limit(100) + |> glimit.identifier(fn(x) { x }) + |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) + |> glimit.build + + let assert option.Some(ms) = limiter.memory_store + let limiter = set_now(limiter, 0) + + list.repeat(Nil, 100) + |> list.each(fn(_) { + let _ = glimit.hit(limiter, "a") + Nil + }) + + // 60_000 - 0 = 60_000, which is NOT > 60_000 — kept + let assert Ok(Nil) = memory_store.sweep(ms, 60_000, option.Some(60_000)) + memory_store.get_count(ms) |> should.equal(1) +} + +pub fn sweep_idle_preserves_recent_bucket_test() { + let assert Ok(limiter) = + glimit.new() + |> glimit.per_second(1) + |> glimit.burst_limit(100) + |> glimit.identifier(fn(x) { x }) + |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) + |> glimit.build + + let assert option.Some(ms) = limiter.memory_store + let limiter = set_now(limiter, 0) + + list.repeat(Nil, 100) + |> list.each(fn(_) { + let _ = glimit.hit(limiter, "a") + Nil + }) + + // At t=59_000: tokens = 59 < 100 (not full), idle for 59s < 60s (not idle) + let assert Ok(Nil) = memory_store.sweep(ms, 59_000, option.Some(60_000)) + + // Should be kept — not full and not idle + memory_store.get_count(ms) |> should.equal(1) +} + +pub fn dead_memory_store_returns_unavailable_test() { + let assert Ok(limiter) = + glimit.new() + |> glimit.per_second(2) + |> glimit.burst_limit(2) + |> glimit.identifier(fn(x) { x }) + |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) + |> glimit.build + + // Trap exits so the kill signal doesn't crash the test process + let _trapped = process.trap_exits(True) + + // Kill the memory store actor + let assert option.Some(ms) = limiter.memory_store + let assert Ok(pid) = memory_store.pid(ms) + process.kill(pid) + process.sleep(10) + + // Hit should return StoreLockFailed (lock_and_get fails on dead actor), not crash + glimit.hit(limiter, "a") |> should.equal(Error(glimit.StoreLockFailed)) +} + fn ignore(_value: a) -> Nil { Nil } +fn set_now( + limiter: glimit.RateLimiter(a, b, id), + now: Int, +) -> glimit.RateLimiter(a, b, id) { + glimit.RateLimiter(..limiter, now: fn() { now }) +} + // --------------------------------------------------------------------------- // In-memory store for testing the pluggable store backend // --------------------------------------------------------------------------- type StoreMsg { - StoreGet(key: String, reply: Subject(Result(bucket.BucketState, Nil))) - StoreSet( + StoreLockAndGet(key: String, reply: Subject(Result(bucket.BucketState, Nil))) + StoreSetAndUnlock( key: String, state: bucket.BucketState, ttl: Int, reply: Subject(Nil), ) - StoreLock(key: String, reply: Subject(Bool)) StoreUnlock(key: String, reply: Subject(Nil)) } @@ -770,31 +1178,28 @@ fn new_test_store() -> glimit.Store { }) |> actor.on_message(fn(state: StoreState, msg: StoreMsg) { case msg { - StoreGet(key, reply) -> { - case dict.get(state.data, key) { - Ok(v) -> actor.send(reply, Ok(v)) - Error(_) -> actor.send(reply, Error(Nil)) - } - actor.continue(state) - } - StoreSet(key, bucket_state, _ttl, reply) -> { - let data = dict.insert(state.data, key, bucket_state) - actor.send(reply, Nil) - actor.continue(StoreState(..state, data: data)) - } - StoreLock(key, reply) -> { + StoreLockAndGet(key, reply) -> { case dict.get(state.locks, key) { Ok(True) -> { - actor.send(reply, False) + actor.send(reply, Error(Nil)) actor.continue(state) } _ -> { let locks = dict.insert(state.locks, key, True) - actor.send(reply, True) + case dict.get(state.data, key) { + Ok(v) -> actor.send(reply, Ok(v)) + Error(_) -> actor.send(reply, Error(Nil)) + } actor.continue(StoreState(..state, locks: locks)) } } } + StoreSetAndUnlock(key, bucket_state, _ttl, reply) -> { + let data = dict.insert(state.data, key, bucket_state) + let locks = dict.delete(state.locks, key) + actor.send(reply, Nil) + actor.continue(StoreState(data: data, locks: locks)) + } StoreUnlock(key, reply) -> { let locks = dict.delete(state.locks, key) actor.send(reply, Nil) @@ -807,32 +1212,24 @@ fn new_test_store() -> glimit.Store { let store_subject = started.data bucket.Store( - get: fn(key) { + lock_and_get: fn(key) { let reply: Subject(Result(bucket.BucketState, Nil)) = process.new_subject() - process.send(store_subject, StoreGet(key, reply)) + process.send(store_subject, StoreLockAndGet(key, reply)) case process.receive(reply, 1000) { Ok(Ok(v)) -> Ok(option.Some(v)) Ok(Error(_)) -> Ok(option.None) Error(_) -> Error(Nil) } }, - set: fn(key, state, ttl) { + set_and_unlock: fn(key, state, ttl) { let reply = process.new_subject() - process.send(store_subject, StoreSet(key, state, ttl, reply)) + process.send(store_subject, StoreSetAndUnlock(key, state, ttl, reply)) case process.receive(reply, 1000) { Ok(_) -> Ok(Nil) Error(_) -> Error(Nil) } }, - lock: fn(key) { - let reply = process.new_subject() - process.send(store_subject, StoreLock(key, reply)) - case process.receive(reply, 1000) { - Ok(True) -> Ok(Nil) - _ -> Error(Nil) - } - }, unlock: fn(key) { let reply = process.new_subject() process.send(store_subject, StoreUnlock(key, reply)) @@ -902,14 +1299,8 @@ pub fn store_burst_limit_test() { fn(_) { "OK" } |> glimit.apply_built(limiter) - rate_limiter.set_now(limiter.rate_limiter_actor, 0) - func(Nil) |> should.equal("OK") func(Nil) |> should.equal("OK") func(Nil) |> should.equal("OK") - func(Nil) |> should.equal("Stop!") - - // After 1 second, 1 token refills - rate_limiter.set_now(limiter.rate_limiter_actor, 1000) func(Nil) |> should.equal("OK") func(Nil) |> should.equal("Stop!") }