diff --git a/bmq-sdk/src/test/docker/config/bmqbrkrcfg.json b/bmq-sdk/src/test/docker/config/bmqbrkrcfg.json
index 82f4153..df17713 100644
--- a/bmq-sdk/src/test/docker/config/bmqbrkrcfg.json
+++ b/bmq-sdk/src/test/docker/config/bmqbrkrcfg.json
@@ -1,6 +1,6 @@
{
"taskConfig": {
- "allocatorType": "STACKTRACETEST",
+ "allocatorType": "COUNTING",
"allocationLimit": 34359738368,
"logController": {
"fileName": "/var/local/bmq/logs/logs.%T.%p",
diff --git a/bmq-sdk/src/test/java/com/bloomberg/bmq/it/util/BmqBrokerContainer.java b/bmq-sdk/src/test/java/com/bloomberg/bmq/it/util/BmqBrokerContainer.java
index 3b29f17..ca24ea2 100644
--- a/bmq-sdk/src/test/java/com/bloomberg/bmq/it/util/BmqBrokerContainer.java
+++ b/bmq-sdk/src/test/java/com/bloomberg/bmq/it/util/BmqBrokerContainer.java
@@ -53,6 +53,7 @@ public class BmqBrokerContainer implements BmqBroker {
private static final String OUTPUT_FILENAME = "output.log";
private static final long MAX_CONTAINER_WAIT_TIME_MS = 5000;
private static final long CONTAINER_HEALTH_CHECK_DT_MS = 100;
+ private static final String BROKER_READY_MESSAGE = "Cluster (local) is available";
private final SessionOptions sessionOptions;
private final DockerClient client;
@@ -188,12 +189,13 @@ public void start() {
logger.info("Starting container '{}'...", containerName);
client.startContainerCmd(containerId).exec();
+ long startTime = System.currentTimeMillis();
try {
for (long totalTimeMs = 0;
totalTimeMs < MAX_CONTAINER_WAIT_TIME_MS;
totalTimeMs += CONTAINER_HEALTH_CHECK_DT_MS) {
- Thread.sleep(CONTAINER_HEALTH_CHECK_DT_MS);
+ // Check if container is still running
InspectContainerResponse resp = client.inspectContainerCmd(containerId).exec();
if (!resp.getState().getRunning()) {
logger.error(
@@ -201,15 +203,31 @@ public void start() {
containerId,
resp.getState().getStatus());
throw new RuntimeException(
- String.format("Failed to start container '{}'", containerId));
+ String.format("Failed to start container '%s'", containerId));
+ }
+
+ // Check logs for broker readiness message
+ if (isBrokerReady()) {
+ long elapsed = System.currentTimeMillis() - startTime;
+ logger.info("Container '{}' broker is ready after {} ms", containerId, elapsed);
+ break;
+ }
+
+ if (totalTimeMs + CONTAINER_HEALTH_CHECK_DT_MS >= MAX_CONTAINER_WAIT_TIME_MS) {
+ throw new RuntimeException(
+ String.format(
+ "Timeout waiting for broker ready in container '%s'",
+ containerId));
}
+
+ Thread.sleep(CONTAINER_HEALTH_CHECK_DT_MS);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
- logger.info("Container '{}' is running", containerId);
+ logger.info("Container '{}' broker is ready", containerId);
// For BlazingMQ broker running in container default tier should be the 'lcl-{guest
// hostname}'
@@ -357,6 +375,29 @@ public void onNext(Frame item) {
}
}
+ private boolean isBrokerReady() {
+ final boolean[] ready = {false};
+ ResultCallback.Adapter callback =
+ new ResultCallback.Adapter() {
+ @Override
+ public void onNext(Frame item) {
+ if (item.toString().contains(BROKER_READY_MESSAGE)) {
+ ready[0] = true;
+ }
+ }
+ };
+ try {
+ client.logContainerCmd(containerId)
+ .withStdOut(true)
+ .withStdErr(true)
+ .exec(callback)
+ .awaitCompletion();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ return ready[0];
+ }
+
private String getHostname() {
// Get container hostname
logger.info("Get '{}' container hostname", containerName);