From d75cb4fd5d33f19dc2655b0a9e939c5adb91ebb5 Mon Sep 17 00:00:00 2001 From: marr97 Date: Wed, 11 Mar 2026 18:02:37 +0300 Subject: [PATCH 1/2] feature: add virtual threads to LoadEngine and validation tests --- .../java/com/volta/engine/LoadEngine.java | 34 ++++++++++++----- src/test/java/com/volta/HttpSenderTest.java | 35 ----------------- .../java/com/volta/engine/LoadEngineTest.java | 38 ++++++++++++++++++- 3 files changed, 61 insertions(+), 46 deletions(-) delete mode 100644 src/test/java/com/volta/HttpSenderTest.java diff --git a/src/main/java/com/volta/engine/LoadEngine.java b/src/main/java/com/volta/engine/LoadEngine.java index 12d4194..ebdedc1 100644 --- a/src/main/java/com/volta/engine/LoadEngine.java +++ b/src/main/java/com/volta/engine/LoadEngine.java @@ -2,6 +2,8 @@ import com.volta.http.HttpSender; import java.net.http.HttpResponse; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,25 +37,39 @@ public void start() { long endTime = System.nanoTime() + (long) durationSeconds * 1_000_000_000L; long sendNextTime = System.nanoTime(); - try (HttpSender sender = new HttpSender()) { + try (HttpSender sender = new HttpSender(); + ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) { + while (running && System.nanoTime() < endTime) { - while (System.nanoTime() < sendNextTime) { - // busy-wait - } + long waitMillis = (sendNextTime - System.nanoTime()) / 1_000_000; - try { - HttpResponse response = sender.send(url); - log.info("Status: {}, Body: {}", response.statusCode(), response.body()); - } catch (Exception e) { - log.error("Request failed", e); + if (waitMillis > 0) { + try { + Thread.sleep(waitMillis); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } } + + executor.submit( + () -> { + try { + HttpResponse response = sender.send(url); + log.info("Status: {}, Body: {}", response.statusCode(), response.body()); + } catch (Exception e) { + log.error("Request failed", e); + } + }); + sendNextTime += intervalNanos; } } catch (Exception e) { log.error("Sender closed or failed to initialize", e); } + running = false; log.info("Test finished"); } diff --git a/src/test/java/com/volta/HttpSenderTest.java b/src/test/java/com/volta/HttpSenderTest.java deleted file mode 100644 index 4b1f931..0000000 --- a/src/test/java/com/volta/HttpSenderTest.java +++ /dev/null @@ -1,35 +0,0 @@ -package com.volta; - -import static com.github.tomakehurst.wiremock.client.WireMock.*; -import static org.junit.jupiter.api.Assertions.*; - -import com.github.tomakehurst.wiremock.junit5.WireMockTest; -import java.net.http.HttpResponse; -import org.junit.jupiter.api.Test; - -@WireMockTest(httpPort = 8089) -class HttpSenderTest { - - private final HttpSender sender = new HttpSender(); - - @Test - void sendReturns200ForValidUrl() throws Exception { - stubFor(get("/test").willReturn(ok("Hello from mock!"))); - - HttpResponse response = sender.send("http://localhost:8089/test"); - assertEquals(200, response.statusCode()); - } - - @Test - void responseBodyIsNotEmpty() throws Exception { - stubFor(get("/test").willReturn(ok("Hello from mock!"))); - - HttpResponse response = sender.send("http://localhost:8089/test"); - assertFalse(response.body().isEmpty()); - } - - @Test - void sendThrowsOnInvalidUrl() { - assertThrows(Exception.class, () -> sender.send("http://localhost:1")); - } -} diff --git a/src/test/java/com/volta/engine/LoadEngineTest.java b/src/test/java/com/volta/engine/LoadEngineTest.java index 30b1b61..f1b925e 100644 --- a/src/test/java/com/volta/engine/LoadEngineTest.java +++ b/src/test/java/com/volta/engine/LoadEngineTest.java @@ -92,13 +92,47 @@ void shouldHandleServerErrors() { wireMock.stubFor(get("/error").willReturn(serverError())); LoadEngine engine = new LoadEngine(baseUrl + "/error", 10, 2); - assertDoesNotThrow(() -> engine.start()); + assertDoesNotThrow(engine::start); } @Test void shouldHandleInvalidUrl() { LoadEngine engine = new LoadEngine("http://invalid-host-that-does-not-exist:9999/test", 5, 2); - assertDoesNotThrow(() -> engine.start()); + assertDoesNotThrow(engine::start); + } + + @Test + void shouldRejectNullUrl() { + assertThrows(IllegalArgumentException.class, () -> new LoadEngine(null, 10, 5)); + } + + @Test + void shouldRejectEmptyUrl() { + assertThrows(IllegalArgumentException.class, () -> new LoadEngine("", 10, 5)); + } + + @Test + void shouldRejectZeroRps() { + assertThrows( + IllegalArgumentException.class, () -> new LoadEngine("http://localhost/test", 0, 5)); + } + + @Test + void shouldRejectNegativeRps() { + assertThrows( + IllegalArgumentException.class, () -> new LoadEngine("http://localhost/test", -1, 5)); + } + + @Test + void shouldRejectZeroDuration() { + assertThrows( + IllegalArgumentException.class, () -> new LoadEngine("http://localhost/test", 10, 0)); + } + + @Test + void shouldRejectNegativeDuration() { + assertThrows( + IllegalArgumentException.class, () -> new LoadEngine("http://localhost/test", 10, -1)); } } From 74f9e56fd94e606afd11a1dbe6b1654a9274a530 Mon Sep 17 00:00:00 2001 From: marr97 Date: Sat, 14 Mar 2026 18:35:51 +0300 Subject: [PATCH 2/2] fix: use virtual threads with semaphore throttling in load engine --- .../java/com/volta/engine/LoadEngine.java | 28 +++++++++++++++---- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/volta/engine/LoadEngine.java b/src/main/java/com/volta/engine/LoadEngine.java index ebdedc1..cf66585 100644 --- a/src/main/java/com/volta/engine/LoadEngine.java +++ b/src/main/java/com/volta/engine/LoadEngine.java @@ -4,6 +4,7 @@ import java.net.http.HttpResponse; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -14,20 +15,21 @@ public class LoadEngine { private final int targetRps; private final int durationSeconds; private volatile boolean running = false; + private static final int MAX_CONCURRENT_REQUESTS = 1000; - public LoadEngine(String URL, int targetRPS, int durationSeconds) { - if (URL == null || URL.isBlank()) { + public LoadEngine(String url, int targetRps, int durationSeconds) { + if (url == null || url.isBlank()) { throw new IllegalArgumentException("URL must not be empty"); } - if (targetRPS <= 0) { + if (targetRps <= 0) { throw new IllegalArgumentException("RPS must be positive"); } if (durationSeconds <= 0) { throw new IllegalArgumentException("Duration must be positive"); } - this.url = URL; - this.targetRps = targetRPS; + this.url = url; + this.targetRps = targetRps; this.durationSeconds = durationSeconds; } @@ -37,6 +39,8 @@ public void start() { long endTime = System.nanoTime() + (long) durationSeconds * 1_000_000_000L; long sendNextTime = System.nanoTime(); + Semaphore semaphore = new Semaphore(MAX_CONCURRENT_REQUESTS); + try (HttpSender sender = new HttpSender(); ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) { @@ -53,6 +57,13 @@ public void start() { } } + try { + semaphore.acquire(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + executor.submit( () -> { try { @@ -60,15 +71,22 @@ public void start() { log.info("Status: {}, Body: {}", response.statusCode(), response.body()); } catch (Exception e) { log.error("Request failed", e); + } finally { + semaphore.release(); } }); + if (System.nanoTime() - sendNextTime > 1_000_000_000L) { + sendNextTime = System.nanoTime(); + } + sendNextTime += intervalNanos; } } catch (Exception e) { log.error("Sender closed or failed to initialize", e); } + // try-with-resources handles executor.close() and sender.close() running = false; log.info("Test finished"); }