From 1388e29b058c0ccf7a8fe1ff38eff6cb4f7ce7d4 Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Fri, 27 Feb 2026 21:09:39 +0000 Subject: [PATCH] Fix Rate Limiting bypass via multiple X-Forwarded-For headers Currently, the rate limiter uses Starlette's `request.headers.get("X-Forwarded-For")` which only extracts the *first* header value. If an attacker sends multiple `X-Forwarded-For` headers with spoofed IPs, the proxy (Caddy) might append its real IP to a subsequent header. The rate limiter then relies on the spoofed IP, successfully bypassing the limit. This patch fixes the vulnerability by using `request.headers.getlist("X-Forwarded-For")` to collect all headers, join them with commas, and extract the genuinely appended last IP. Included a new unittest to guarantee robust parsing and protection against multiple and comma-separated header variations. --- app/main.py | 6 +++-- tests/test_rate_limit.py | 56 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 2 deletions(-) create mode 100644 tests/test_rate_limit.py diff --git a/app/main.py b/app/main.py index 4492c48..c0ffb19 100644 --- a/app/main.py +++ b/app/main.py @@ -133,8 +133,10 @@ async def rate_limit_middleware(request: Request, call_next): # AVA runs behind Caddy (trusted proxy). Caddy appends the real client IP to X-Forwarded-For. # We take the last IP in the list as the client IP. - if xff := request.headers.get("X-Forwarded-For"): - client_ip = xff.split(",")[-1].strip() + # We use getlist() to combine all X-Forwarded-For headers, preventing spoofing via multiple headers. + xff_list = request.headers.getlist("X-Forwarded-For") + if xff_list: + client_ip = ",".join(xff_list).split(",")[-1].strip() if not _rate_limiter.is_allowed(client_ip): logger.warning(f"Rate limit exceeded for {client_ip}") diff --git a/tests/test_rate_limit.py b/tests/test_rate_limit.py new file mode 100644 index 0000000..7929785 --- /dev/null +++ b/tests/test_rate_limit.py @@ -0,0 +1,56 @@ +import unittest +from fastapi.testclient import TestClient +from app.main import app, _rate_limiter + +class TestRateLimitBypass(unittest.TestCase): + def setUp(self): + # Reset the rate limiter before each test + _rate_limiter._hits.clear() + self.client = TestClient(app) + + def test_multiple_x_forwarded_for_headers(self): + """ + Test that an attacker cannot bypass the rate limiter by sending + multiple X-Forwarded-For headers. Starlette's `request.headers.get()` + only returns the first header value, so we must use `getlist()`. + """ + # Make 35 requests where the "attacker" changes the first X-Forwarded-For + # header, but the proxy (simulated) appends the real IP in a second header. + + real_ip = b"1.2.3.4" + + for i in range(35): + # TestClient accepts headers as a list of tuples to allow multiple headers with the same name + headers = [ + (b"x-forwarded-for", f"spoofed_{i}".encode("utf-8")), + (b"x-forwarded-for", real_ip) + ] + + response = self.client.get("/health", headers=headers) + + # The rate limiter is set to 30 requests per minute. + # So the 31st request and beyond should be blocked. + if i < 30: + self.assertEqual(response.status_code, 200, f"Request {i+1} failed early with status {response.status_code}") + else: + self.assertEqual(response.status_code, 429, f"Request {i+1} bypassed rate limiter! Expected 429, got {response.status_code}") + + def test_comma_separated_x_forwarded_for(self): + """ + Test that comma-separated X-Forwarded-For values correctly extract the last IP. + """ + for i in range(35): + # Simulate a single header with comma-separated values + headers = [ + (b"x-forwarded-for", f"spoofed_{i}, 1.2.3.4".encode("utf-8")), + ] + + response = self.client.get("/health", headers=headers) + + if i < 30: + self.assertEqual(response.status_code, 200) + else: + self.assertEqual(response.status_code, 429) + +if __name__ == '__main__': + unittest.main()