Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bmq-sdk/src/test/docker/config/bmqbrkrcfg.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"taskConfig": {
"allocatorType": "STACKTRACETEST",
"allocatorType": "COUNTING",
"allocationLimit": 34359738368,
"logController": {
"fileName": "/var/local/bmq/logs/logs.%T.%p",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class BmqBrokerContainer implements BmqBroker {
private static final String OUTPUT_FILENAME = "output.log";
private static final long MAX_CONTAINER_WAIT_TIME_MS = 5000;
private static final long CONTAINER_HEALTH_CHECK_DT_MS = 100;
private static final String BROKER_READY_MESSAGE = "Cluster (local) is available";

private final SessionOptions sessionOptions;
private final DockerClient client;
Expand Down Expand Up @@ -188,28 +189,45 @@ public void start() {
logger.info("Starting container '{}'...", containerName);
client.startContainerCmd(containerId).exec();

long startTime = System.currentTimeMillis();
try {
for (long totalTimeMs = 0;
totalTimeMs < MAX_CONTAINER_WAIT_TIME_MS;
totalTimeMs += CONTAINER_HEALTH_CHECK_DT_MS) {
Thread.sleep(CONTAINER_HEALTH_CHECK_DT_MS);

// Check if container is still running
InspectContainerResponse resp = client.inspectContainerCmd(containerId).exec();
if (!resp.getState().getRunning()) {
logger.error(
"Container '{}' is not running, status = '{}'",
containerId,
resp.getState().getStatus());
throw new RuntimeException(
String.format("Failed to start container '{}'", containerId));
String.format("Failed to start container '%s'", containerId));
}

// Check logs for broker readiness message
if (isBrokerReady()) {
long elapsed = System.currentTimeMillis() - startTime;
logger.info("Container '{}' broker is ready after {} ms", containerId, elapsed);
break;
}

if (totalTimeMs + CONTAINER_HEALTH_CHECK_DT_MS >= MAX_CONTAINER_WAIT_TIME_MS) {
throw new RuntimeException(
String.format(
"Timeout waiting for broker ready in container '%s'",
containerId));
}

Thread.sleep(CONTAINER_HEALTH_CHECK_DT_MS);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}

logger.info("Container '{}' is running", containerId);
logger.info("Container '{}' broker is ready", containerId);

// For BlazingMQ broker running in container default tier should be the 'lcl-{guest
// hostname}'
Expand Down Expand Up @@ -357,6 +375,29 @@ public void onNext(Frame item) {
}
}

private boolean isBrokerReady() {
final boolean[] ready = {false};
ResultCallback.Adapter<Frame> callback =
new ResultCallback.Adapter<Frame>() {
@Override
public void onNext(Frame item) {
if (item.toString().contains(BROKER_READY_MESSAGE)) {
ready[0] = true;
}
}
};
try {
client.logContainerCmd(containerId)
.withStdOut(true)
.withStdErr(true)
.exec(callback)
.awaitCompletion();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return ready[0];
}

private String getHostname() {
// Get container hostname
logger.info("Get '{}' container hostname", containerName);
Expand Down