Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
<modules>
<module>dsl-common</module>
<module>url-shortener</module>
<module>rate-limiter</module>
</modules>

<properties>
Expand Down
14 changes: 14 additions & 0 deletions java/rate-limiter/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
FROM maven:3.9.6-eclipse-temurin-21 AS build
WORKDIR /app

COPY . .

RUN mvn clean package -DskipTests -pl rate-limiter -am

FROM eclipse-temurin:21-jre-alpine
WORKDIR /app
COPY --from=build /app/rate-limiter/target/rate-limiter-1.0-SNAPSHOT.jar app.jar
COPY --from=build /app/rate-limiter/target/lib lib


ENTRYPOINT ["java", "-cp", "app.jar:lib/*", "com.dsl.ratelimiter.Application"]
Empty file added java/rate-limiter/README.md
Empty file.
22 changes: 22 additions & 0 deletions java/rate-limiter/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
version: '3.8'

services:
# 1. Redis
redis:
image: redis:alpine
container_name: limiter-redis
ports:
- "6379:6379"

# 2. Rate Limiter App
app:
container_name: rate-limiter-app
build:
context: ..
dockerfile: rate-limiter/Dockerfile
ports:
- "8080:8080"
depends_on:
- redis
environment:
- REDIS_HOST=redis
80 changes: 80 additions & 0 deletions java/rate-limiter/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<!-- Reference to the parent POM -->
<parent>
<groupId>com.dsl</groupId>
<artifactId>distributed-systems-lab</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>

<groupId>com.dsl.ratelimiter</groupId>
<artifactId>rate-limiter</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<lettuce.version>6.3.2.RELEASE</lettuce.version>
<netty.version>4.1.107.Final</netty.version>
<slf4j.version>2.0.12</slf4j.version>
</properties>

<dependencies>
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>${lettuce.version}</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>${slf4j.version}</version>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
<version>${netty.version}</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>3.6.1</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>com.dsl.ratelimiter.Application</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.dsl.ratelimiter;

import com.dsl.ratelimiter.factory.RateLimiterFactory;
import com.dsl.ratelimiter.handler.RateLimitHandler;
import com.dsl.ratelimiter.server.NettyServer;
import com.dsl.ratelimiter.strategy.RateLimiterStrategy;
import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;

public class Application {

public static void main(String[] args) throws Exception {
int port = 8080;
String redisHost = System.getenv().getOrDefault("REDIS_HOST", "localhost");
String redisUrl = "redis://" + redisHost + ":6379";

System.out.println("Starting Rate Limiter on port " + port);
System.out.println("Connecting to Redis at " + redisHost + "...");

try (RedisClient redisClient = RedisClient.create(redisUrl)) {
StatefulRedisConnection<String, String> connection = redisClient.connect();

RateLimiterFactory factory = new RateLimiterFactory(connection.sync());
RateLimiterStrategy rateLimiter = factory.get(RateLimiterFactory.Type.TOKEN_BUCKET);

System.out.println("Token Bucket Strategy Loaded.");

NettyServer<RateLimitHandler> server = new NettyServer<>(port, new RateLimitHandler(rateLimiter));
server.start();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.dsl.ratelimiter.factory;

import com.dsl.ratelimiter.strategy.impl.LuaRateLimiter;
import com.dsl.ratelimiter.strategy.RateLimiterStrategy;
import io.lettuce.core.api.sync.RedisCommands;

import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class RateLimiterFactory {

public enum Type { TOKEN_BUCKET, LEAKING_BUCKET, FIXED_WINDOW, SLIDING_LOG, SLIDING_COUNTER }

private final RedisCommands<String, String> redis;
private final Map<Type, String> scriptShas = new HashMap<>();

public RateLimiterFactory(RedisCommands<String, String> redis) {
this.redis = redis;
load(Type.TOKEN_BUCKET, "token_bucket.lua");
load(Type.LEAKING_BUCKET, "leaking_bucket.lua");
load(Type.FIXED_WINDOW, "fixed_window.lua");
load(Type.SLIDING_LOG, "sliding_log.lua");
load(Type.SLIDING_COUNTER, "sliding_counter.lua");
}

private void load(Type type, String file) {
try (InputStream is = getClass().getResourceAsStream("/lua/" + file)) {
if (is == null) throw new RuntimeException("Script not found: " + file);
String script = new String(is.readAllBytes(), StandardCharsets.UTF_8);
scriptShas.put(type, redis.scriptLoad(script));
} catch (Exception e) {
throw new RuntimeException("Failed to load script: " + file, e);
}
}

public RateLimiterStrategy get(Type type) {
String sha = scriptShas.get(type);
boolean needsMillis = (type == Type.SLIDING_LOG);
return new LuaRateLimiter(redis, sha, needsMillis);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package com.dsl.ratelimiter.handler;

import com.dsl.ratelimiter.strategy.RateLimiterStrategy;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;

@ChannelHandler.Sharable
public class RateLimitHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

private final RateLimiterStrategy strategy;
private final long capacity;
private final long rate;

public RateLimitHandler(RateLimiterStrategy strategy) {
this.strategy = strategy;
this.capacity = 10;
this.rate = 1;
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) {
String userId = "anonymous";
if (req.headers().contains("X-User-ID")) {
userId = req.headers().get("X-User-ID");
}

String key = "rate_limit:" + userId;

boolean allowed = strategy.isAllowed(key, capacity, rate);

if (allowed) {
sendResponse(ctx, HttpResponseStatus.OK, "Request Allowed for " + userId);
} else {
sendResponse(ctx, HttpResponseStatus.TOO_MANY_REQUESTS, "Rate Limit Exceeded");
}
}

private void sendResponse(ChannelHandlerContext ctx, HttpResponseStatus status, String content) {
FullHttpResponse response = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, status, Unpooled.copiedBuffer(content, CharsetUtil.UTF_8));
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
ctx.writeAndFlush(response);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.dsl.ratelimiter.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;

public class NettyServer<T extends SimpleChannelInboundHandler<?>> {

private final int port;
private final T handler;

public NettyServer(int port, T handler) {
this.port = port;
this.handler = handler;
}

public void start() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new HttpServerCodec());
ch.pipeline().addLast(new HttpObjectAggregator(65536));
ch.pipeline().addLast(handler);
}
});

ChannelFuture f = b.bind(port).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.dsl.ratelimiter.strategy;

public interface RateLimiterStrategy {
/**
* @param key Unique identifier (e.g., "ip:127.0.0.1")
* @param limit Max requests
* @param periodSeconds Window size
* @return true if allowed, false if blocked
*/
boolean isAllowed(String key, long limit, long periodSeconds);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.dsl.ratelimiter.strategy.impl;

import com.dsl.ratelimiter.strategy.RateLimiterStrategy;
import io.lettuce.core.ScriptOutputType;
import io.lettuce.core.api.sync.RedisCommands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;

public class LuaRateLimiter implements RateLimiterStrategy {
private static final Logger logger = LoggerFactory.getLogger(LuaRateLimiter.class);

private final RedisCommands<String, String> redis;
private final String scriptSha;
private final boolean needsMillis;

public LuaRateLimiter(RedisCommands<String, String> redis, String scriptSha, boolean needsMillis) {
this.redis = redis;
this.scriptSha = scriptSha;
this.needsMillis = needsMillis;
}

@Override
public boolean isAllowed(String key, long limit, long period) {
long now = needsMillis ? Instant.now().toEpochMilli() : Instant.now().getEpochSecond();

try {
// Execute Lua Script
Object result = redis.evalsha(scriptSha, ScriptOutputType.INTEGER,
new String[]{key},
String.valueOf(limit),
String.valueOf(period),
String.valueOf(now)
);

if (result instanceof Long) {
return ((Long) result) == 1L;
}
return false;

} catch (Exception e) {
// Circuit Breaker - Fail Open
// If Redis is down/slow, we allow traffic to preserve user experience.
logger.error("Redis rate limiter failed for key: {}. Defaulting to Allow.", key, e);
return true;
}
}
}
10 changes: 10 additions & 0 deletions java/rate-limiter/src/main/resources/lua/fixed_window.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])

local current = redis.call('INCR', key)
if current == 1 then
redis.call('EXPIRE', key, window) -- Setting the window at the first request
end

if current <= limit then return 1 else return 0 end
Loading