From 21220bf4412344eb336c04ab3d26db893ce03dd4 Mon Sep 17 00:00:00 2001 From: dakshing Date: Sat, 20 Dec 2025 15:39:30 +0530 Subject: [PATCH] adding rate limiter --- java/pom.xml | 1 + java/rate-limiter/Dockerfile | 14 ++++ java/rate-limiter/README.md | 0 java/rate-limiter/docker-compose.yml | 22 +++++ java/rate-limiter/pom.xml | 80 +++++++++++++++++++ .../java/com/dsl/ratelimiter/Application.java | 32 ++++++++ .../factory/RateLimiterFactory.java | 43 ++++++++++ .../ratelimiter/handler/RateLimitHandler.java | 48 +++++++++++ .../dsl/ratelimiter/server/NettyServer.java | 49 ++++++++++++ .../strategy/RateLimiterStrategy.java | 11 +++ .../strategy/impl/LuaRateLimiter.java | 49 ++++++++++++ .../src/main/resources/lua/fixed_window.lua | 10 +++ .../src/main/resources/lua/leaking_bucket.lua | 23 ++++++ .../main/resources/lua/sliding_counter.lua | 23 ++++++ .../src/main/resources/lua/sliding_log.lua | 15 ++++ .../src/main/resources/lua/token_bucket.lua | 27 +++++++ java/url-shortener/Dockerfile | 10 +-- 17 files changed, 449 insertions(+), 8 deletions(-) create mode 100644 java/rate-limiter/Dockerfile create mode 100644 java/rate-limiter/README.md create mode 100644 java/rate-limiter/docker-compose.yml create mode 100644 java/rate-limiter/pom.xml create mode 100644 java/rate-limiter/src/main/java/com/dsl/ratelimiter/Application.java create mode 100644 java/rate-limiter/src/main/java/com/dsl/ratelimiter/factory/RateLimiterFactory.java create mode 100644 java/rate-limiter/src/main/java/com/dsl/ratelimiter/handler/RateLimitHandler.java create mode 100644 java/rate-limiter/src/main/java/com/dsl/ratelimiter/server/NettyServer.java create mode 100644 java/rate-limiter/src/main/java/com/dsl/ratelimiter/strategy/RateLimiterStrategy.java create mode 100644 java/rate-limiter/src/main/java/com/dsl/ratelimiter/strategy/impl/LuaRateLimiter.java create mode 100644 java/rate-limiter/src/main/resources/lua/fixed_window.lua create mode 100644 java/rate-limiter/src/main/resources/lua/leaking_bucket.lua create mode 100644 java/rate-limiter/src/main/resources/lua/sliding_counter.lua create mode 100644 java/rate-limiter/src/main/resources/lua/sliding_log.lua create mode 100644 java/rate-limiter/src/main/resources/lua/token_bucket.lua diff --git a/java/pom.xml b/java/pom.xml index 8e13dcd..7bc37dd 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -11,6 +11,7 @@ dsl-common url-shortener + rate-limiter diff --git a/java/rate-limiter/Dockerfile b/java/rate-limiter/Dockerfile new file mode 100644 index 0000000..64655d5 --- /dev/null +++ b/java/rate-limiter/Dockerfile @@ -0,0 +1,14 @@ +FROM maven:3.9.6-eclipse-temurin-21 AS build +WORKDIR /app + +COPY . . + +RUN mvn clean package -DskipTests -pl rate-limiter -am + +FROM eclipse-temurin:21-jre-alpine +WORKDIR /app +COPY --from=build /app/rate-limiter/target/rate-limiter-1.0-SNAPSHOT.jar app.jar +COPY --from=build /app/rate-limiter/target/lib lib + + +ENTRYPOINT ["java", "-cp", "app.jar:lib/*", "com.dsl.ratelimiter.Application"] \ No newline at end of file diff --git a/java/rate-limiter/README.md b/java/rate-limiter/README.md new file mode 100644 index 0000000..e69de29 diff --git a/java/rate-limiter/docker-compose.yml b/java/rate-limiter/docker-compose.yml new file mode 100644 index 0000000..e1a34aa --- /dev/null +++ b/java/rate-limiter/docker-compose.yml @@ -0,0 +1,22 @@ +version: '3.8' + +services: + # 1. Redis + redis: + image: redis:alpine + container_name: limiter-redis + ports: + - "6379:6379" + + # 2. Rate Limiter App + app: + container_name: rate-limiter-app + build: + context: .. + dockerfile: rate-limiter/Dockerfile + ports: + - "8080:8080" + depends_on: + - redis + environment: + - REDIS_HOST=redis \ No newline at end of file diff --git a/java/rate-limiter/pom.xml b/java/rate-limiter/pom.xml new file mode 100644 index 0000000..f7044b9 --- /dev/null +++ b/java/rate-limiter/pom.xml @@ -0,0 +1,80 @@ + + + 4.0.0 + + + + com.dsl + distributed-systems-lab + 1.0-SNAPSHOT + + + com.dsl.ratelimiter + rate-limiter + 1.0-SNAPSHOT + + + 6.3.2.RELEASE + 4.1.107.Final + 2.0.12 + + + + + io.lettuce + lettuce-core + ${lettuce.version} + + + + org.slf4j + slf4j-simple + ${slf4j.version} + + + + io.netty + netty-codec-http + ${netty.version} + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + 3.6.1 + + + copy-dependencies + package + + copy-dependencies + + + ${project.build.directory}/lib + + + + + + org.apache.maven.plugins + maven-jar-plugin + 3.3.0 + + + + true + lib/ + com.dsl.ratelimiter.Application + + + + + + + + \ No newline at end of file diff --git a/java/rate-limiter/src/main/java/com/dsl/ratelimiter/Application.java b/java/rate-limiter/src/main/java/com/dsl/ratelimiter/Application.java new file mode 100644 index 0000000..9f4ae70 --- /dev/null +++ b/java/rate-limiter/src/main/java/com/dsl/ratelimiter/Application.java @@ -0,0 +1,32 @@ +package com.dsl.ratelimiter; + +import com.dsl.ratelimiter.factory.RateLimiterFactory; +import com.dsl.ratelimiter.handler.RateLimitHandler; +import com.dsl.ratelimiter.server.NettyServer; +import com.dsl.ratelimiter.strategy.RateLimiterStrategy; +import io.lettuce.core.RedisClient; +import io.lettuce.core.api.StatefulRedisConnection; + +public class Application { + + public static void main(String[] args) throws Exception { + int port = 8080; + String redisHost = System.getenv().getOrDefault("REDIS_HOST", "localhost"); + String redisUrl = "redis://" + redisHost + ":6379"; + + System.out.println("Starting Rate Limiter on port " + port); + System.out.println("Connecting to Redis at " + redisHost + "..."); + + try (RedisClient redisClient = RedisClient.create(redisUrl)) { + StatefulRedisConnection connection = redisClient.connect(); + + RateLimiterFactory factory = new RateLimiterFactory(connection.sync()); + RateLimiterStrategy rateLimiter = factory.get(RateLimiterFactory.Type.TOKEN_BUCKET); + + System.out.println("Token Bucket Strategy Loaded."); + + NettyServer server = new NettyServer<>(port, new RateLimitHandler(rateLimiter)); + server.start(); + } + } +} \ No newline at end of file diff --git a/java/rate-limiter/src/main/java/com/dsl/ratelimiter/factory/RateLimiterFactory.java b/java/rate-limiter/src/main/java/com/dsl/ratelimiter/factory/RateLimiterFactory.java new file mode 100644 index 0000000..939c131 --- /dev/null +++ b/java/rate-limiter/src/main/java/com/dsl/ratelimiter/factory/RateLimiterFactory.java @@ -0,0 +1,43 @@ +package com.dsl.ratelimiter.factory; + +import com.dsl.ratelimiter.strategy.impl.LuaRateLimiter; +import com.dsl.ratelimiter.strategy.RateLimiterStrategy; +import io.lettuce.core.api.sync.RedisCommands; + +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; + +public class RateLimiterFactory { + + public enum Type { TOKEN_BUCKET, LEAKING_BUCKET, FIXED_WINDOW, SLIDING_LOG, SLIDING_COUNTER } + + private final RedisCommands redis; + private final Map scriptShas = new HashMap<>(); + + public RateLimiterFactory(RedisCommands redis) { + this.redis = redis; + load(Type.TOKEN_BUCKET, "token_bucket.lua"); + load(Type.LEAKING_BUCKET, "leaking_bucket.lua"); + load(Type.FIXED_WINDOW, "fixed_window.lua"); + load(Type.SLIDING_LOG, "sliding_log.lua"); + load(Type.SLIDING_COUNTER, "sliding_counter.lua"); + } + + private void load(Type type, String file) { + try (InputStream is = getClass().getResourceAsStream("/lua/" + file)) { + if (is == null) throw new RuntimeException("Script not found: " + file); + String script = new String(is.readAllBytes(), StandardCharsets.UTF_8); + scriptShas.put(type, redis.scriptLoad(script)); + } catch (Exception e) { + throw new RuntimeException("Failed to load script: " + file, e); + } + } + + public RateLimiterStrategy get(Type type) { + String sha = scriptShas.get(type); + boolean needsMillis = (type == Type.SLIDING_LOG); + return new LuaRateLimiter(redis, sha, needsMillis); + } +} \ No newline at end of file diff --git a/java/rate-limiter/src/main/java/com/dsl/ratelimiter/handler/RateLimitHandler.java b/java/rate-limiter/src/main/java/com/dsl/ratelimiter/handler/RateLimitHandler.java new file mode 100644 index 0000000..25df2eb --- /dev/null +++ b/java/rate-limiter/src/main/java/com/dsl/ratelimiter/handler/RateLimitHandler.java @@ -0,0 +1,48 @@ +package com.dsl.ratelimiter.handler; + +import com.dsl.ratelimiter.strategy.RateLimiterStrategy; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.*; +import io.netty.util.CharsetUtil; + +@ChannelHandler.Sharable +public class RateLimitHandler extends SimpleChannelInboundHandler { + + private final RateLimiterStrategy strategy; + private final long capacity; + private final long rate; + + public RateLimitHandler(RateLimiterStrategy strategy) { + this.strategy = strategy; + this.capacity = 10; + this.rate = 1; + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) { + String userId = "anonymous"; + if (req.headers().contains("X-User-ID")) { + userId = req.headers().get("X-User-ID"); + } + + String key = "rate_limit:" + userId; + + boolean allowed = strategy.isAllowed(key, capacity, rate); + + if (allowed) { + sendResponse(ctx, HttpResponseStatus.OK, "Request Allowed for " + userId); + } else { + sendResponse(ctx, HttpResponseStatus.TOO_MANY_REQUESTS, "Rate Limit Exceeded"); + } + } + + private void sendResponse(ChannelHandlerContext ctx, HttpResponseStatus status, String content) { + FullHttpResponse response = new DefaultFullHttpResponse( + HttpVersion.HTTP_1_1, status, Unpooled.copiedBuffer(content, CharsetUtil.UTF_8)); + response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain"); + response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes()); + ctx.writeAndFlush(response); + } +} \ No newline at end of file diff --git a/java/rate-limiter/src/main/java/com/dsl/ratelimiter/server/NettyServer.java b/java/rate-limiter/src/main/java/com/dsl/ratelimiter/server/NettyServer.java new file mode 100644 index 0000000..b4e7693 --- /dev/null +++ b/java/rate-limiter/src/main/java/com/dsl/ratelimiter/server/NettyServer.java @@ -0,0 +1,49 @@ +package com.dsl.ratelimiter.server; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpServerCodec; + +public class NettyServer> { + + private final int port; + private final T handler; + + public NettyServer(int port, T handler) { + this.port = port; + this.handler = handler; + } + + public void start() throws Exception { + EventLoopGroup bossGroup = new NioEventLoopGroup(1); + EventLoopGroup workerGroup = new NioEventLoopGroup(); + + try { + ServerBootstrap b = new ServerBootstrap(); + b.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) { + ch.pipeline().addLast(new HttpServerCodec()); + ch.pipeline().addLast(new HttpObjectAggregator(65536)); + ch.pipeline().addLast(handler); + } + }); + + ChannelFuture f = b.bind(port).sync(); + f.channel().closeFuture().sync(); + } finally { + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } + } + +} diff --git a/java/rate-limiter/src/main/java/com/dsl/ratelimiter/strategy/RateLimiterStrategy.java b/java/rate-limiter/src/main/java/com/dsl/ratelimiter/strategy/RateLimiterStrategy.java new file mode 100644 index 0000000..b89c1b0 --- /dev/null +++ b/java/rate-limiter/src/main/java/com/dsl/ratelimiter/strategy/RateLimiterStrategy.java @@ -0,0 +1,11 @@ +package com.dsl.ratelimiter.strategy; + +public interface RateLimiterStrategy { + /** + * @param key Unique identifier (e.g., "ip:127.0.0.1") + * @param limit Max requests + * @param periodSeconds Window size + * @return true if allowed, false if blocked + */ + boolean isAllowed(String key, long limit, long periodSeconds); +} \ No newline at end of file diff --git a/java/rate-limiter/src/main/java/com/dsl/ratelimiter/strategy/impl/LuaRateLimiter.java b/java/rate-limiter/src/main/java/com/dsl/ratelimiter/strategy/impl/LuaRateLimiter.java new file mode 100644 index 0000000..694b2d0 --- /dev/null +++ b/java/rate-limiter/src/main/java/com/dsl/ratelimiter/strategy/impl/LuaRateLimiter.java @@ -0,0 +1,49 @@ +package com.dsl.ratelimiter.strategy.impl; + +import com.dsl.ratelimiter.strategy.RateLimiterStrategy; +import io.lettuce.core.ScriptOutputType; +import io.lettuce.core.api.sync.RedisCommands; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; + +public class LuaRateLimiter implements RateLimiterStrategy { + private static final Logger logger = LoggerFactory.getLogger(LuaRateLimiter.class); + + private final RedisCommands redis; + private final String scriptSha; + private final boolean needsMillis; + + public LuaRateLimiter(RedisCommands redis, String scriptSha, boolean needsMillis) { + this.redis = redis; + this.scriptSha = scriptSha; + this.needsMillis = needsMillis; + } + + @Override + public boolean isAllowed(String key, long limit, long period) { + long now = needsMillis ? Instant.now().toEpochMilli() : Instant.now().getEpochSecond(); + + try { + // Execute Lua Script + Object result = redis.evalsha(scriptSha, ScriptOutputType.INTEGER, + new String[]{key}, + String.valueOf(limit), + String.valueOf(period), + String.valueOf(now) + ); + + if (result instanceof Long) { + return ((Long) result) == 1L; + } + return false; + + } catch (Exception e) { + // Circuit Breaker - Fail Open + // If Redis is down/slow, we allow traffic to preserve user experience. + logger.error("Redis rate limiter failed for key: {}. Defaulting to Allow.", key, e); + return true; + } + } +} \ No newline at end of file diff --git a/java/rate-limiter/src/main/resources/lua/fixed_window.lua b/java/rate-limiter/src/main/resources/lua/fixed_window.lua new file mode 100644 index 0000000..4bbc516 --- /dev/null +++ b/java/rate-limiter/src/main/resources/lua/fixed_window.lua @@ -0,0 +1,10 @@ +local key = KEYS[1] +local limit = tonumber(ARGV[1]) +local window = tonumber(ARGV[2]) + +local current = redis.call('INCR', key) +if current == 1 then + redis.call('EXPIRE', key, window) -- Setting the window at the first request +end + +if current <= limit then return 1 else return 0 end \ No newline at end of file diff --git a/java/rate-limiter/src/main/resources/lua/leaking_bucket.lua b/java/rate-limiter/src/main/resources/lua/leaking_bucket.lua new file mode 100644 index 0000000..4a55f37 --- /dev/null +++ b/java/rate-limiter/src/main/resources/lua/leaking_bucket.lua @@ -0,0 +1,23 @@ +local key = KEYS[1] +local capacity = tonumber(ARGV[1]) +local leak_rate = tonumber(ARGV[2]) +local now = tonumber(ARGV[3]) + +local last_leak = tonumber(redis.call('HGET', key, 'last_leak') or 0) +local water = tonumber(redis.call('HGET', key, 'water') or 0) + +local elapsed = math.max(0, now - last_leak) -- time since last request came in +local leaked = elapsed * leak_rate + +if leaked > 0 then + water = math.max(0, water - leaked) + last_leak = now +end + +if water < capacity then + water = water + 1 + redis.call('HSET', key, 'last_leak', last_leak, 'water', water) + redis.call('EXPIRE', key, 60) + return 1 +end +return 0 \ No newline at end of file diff --git a/java/rate-limiter/src/main/resources/lua/sliding_counter.lua b/java/rate-limiter/src/main/resources/lua/sliding_counter.lua new file mode 100644 index 0000000..96e12a4 --- /dev/null +++ b/java/rate-limiter/src/main/resources/lua/sliding_counter.lua @@ -0,0 +1,23 @@ +local key = KEYS[1] +local limit = tonumber(ARGV[1]) +local window = tonumber(ARGV[2]) +local now = tonumber(ARGV[3]) -- Seconds + +local current_window = math.floor(now / window) -- bucketing time into windows +local prev_window = current_window - 1 + +local cur_key = key .. ':' .. current_window +local prev_key = key .. ':' .. prev_window + +local cur_count = tonumber(redis.call('GET', cur_key) or 0) +local prev_count = tonumber(redis.call('GET', prev_key) or 0) + +local weight = 1 - ((now % window) / window) +local estimate = cur_count + (prev_count * weight) + +if estimate < limit then + redis.call('INCR', cur_key) + redis.call('EXPIRE', cur_key, window * 2) + return 1 +end +return 0 \ No newline at end of file diff --git a/java/rate-limiter/src/main/resources/lua/sliding_log.lua b/java/rate-limiter/src/main/resources/lua/sliding_log.lua new file mode 100644 index 0000000..4fd7f93 --- /dev/null +++ b/java/rate-limiter/src/main/resources/lua/sliding_log.lua @@ -0,0 +1,15 @@ +local key = KEYS[1] +local limit = tonumber(ARGV[1]) +local window = tonumber(ARGV[2]) +local now = tonumber(ARGV[3]) -- Milliseconds +local window_start = now - (window * 1000) + +redis.call('ZREMRANGEBYSCORE', key, '-inf', window_start) -- Remove old requests +local count = redis.call('ZCARD', key) -- Get number of requests in the window + +if count < limit then + redis.call('ZADD', key, now, now) + redis.call('EXPIRE', key, window) + return 1 +end +return 0 \ No newline at end of file diff --git a/java/rate-limiter/src/main/resources/lua/token_bucket.lua b/java/rate-limiter/src/main/resources/lua/token_bucket.lua new file mode 100644 index 0000000..11f090c --- /dev/null +++ b/java/rate-limiter/src/main/resources/lua/token_bucket.lua @@ -0,0 +1,27 @@ +local key = KEYS[1] +local max_tokens = tonumber(ARGV[1]) +local refill_rate = tonumber(ARGV[2]) -- tokens per second +local now = tonumber(ARGV[3]) +local requested = 1 + +local last_refill = tonumber(redis.call('HGET', key, 'last_refill') or 0) +local tokens = tonumber(redis.call('HGET', key, 'tokens') or max_tokens) + +if last_refill > 0 then + local elapsed = math.max(0, now - last_refill) + local refill = elapsed * refill_rate + if refill > 0 then + tokens = math.min(max_tokens, tokens + refill) + last_refill = now + end +else + last_refill = now +end + +if tokens >= requested then + tokens = tokens - requested + redis.call('HSET', key, 'last_refill', last_refill, 'tokens', tokens) + redis.call('EXPIRE', key, 60) + return 1 +end +return 0 \ No newline at end of file diff --git a/java/url-shortener/Dockerfile b/java/url-shortener/Dockerfile index 55118fa..a227660 100644 --- a/java/url-shortener/Dockerfile +++ b/java/url-shortener/Dockerfile @@ -2,15 +2,9 @@ FROM maven:3.9.6-eclipse-temurin-21 AS build WORKDIR /app -COPY pom.xml . +COPY . . -COPY dsl-common/pom.xml dsl-common/pom.xml -COPY dsl-common/src dsl-common/src - -COPY url-shortener/pom.xml url-shortener/pom.xml -COPY url-shortener/src url-shortener/src - -RUN mvn clean package -DskipTests +RUN mvn clean package -DskipTests -pl url-shortener -am # --- Runtime Stage --- FROM eclipse-temurin:21-jre-alpine