From 00ac1c42a4a5759fdece8cc0f582bf3bcde33aba Mon Sep 17 00:00:00 2001 From: Kyle Liberti Date: Mon, 18 Aug 2025 20:52:45 -0400 Subject: [PATCH] Migrate CruiseControlMetricsReporterTest to TestContainers Signed-off-by: Kyle Liberti --- build.gradle | 1 + .../CruiseControlMetricsReporterSslTest.java | 24 +- .../CruiseControlMetricsReporterTest.java | 123 ++++++--- .../utils/CCEmbeddedBrokerBuilder.java | 3 - .../utils/CCEmbeddedKraftCluster.java | 258 ++++++++++++++++++ .../utils/CCKafkaIntegrationTestHarness.java | 5 +- 6 files changed, 364 insertions(+), 50 deletions(-) create mode 100644 cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCEmbeddedKraftCluster.java diff --git a/build.gradle b/build.gradle index e8ed73fbad..40f941ab75 100644 --- a/build.gradle +++ b/build.gradle @@ -484,6 +484,7 @@ project(':cruise-control-metrics-reporter') { testImplementation "org.apache.kafka:kafka-storage:$kafkaVersion" testImplementation 'commons-io:commons-io:2.11.0' testImplementation "org.apache.zookeeper:zookeeper:${zookeeperVersion}" + testImplementation "org.testcontainers:kafka:1.21.3" testOutput sourceSets.test.output } diff --git a/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporterSslTest.java b/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporterSslTest.java index 6cb0501b45..7c16f30444 100644 --- a/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporterSslTest.java +++ b/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporterSslTest.java @@ -7,20 +7,16 @@ import java.io.File; import java.io.IOException; import java.util.Properties; -import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaTestUtils; import 
org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.security.auth.SecurityProtocol; -import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; -import org.apache.kafka.network.SocketServerConfigs; -import org.apache.kafka.server.config.ReplicationConfigs; -import org.apache.kafka.server.config.ServerLogConfigs; import org.junit.Assert; +import javax.net.ssl.KeyManagerFactory; import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG; import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_CONFIG; - public class CruiseControlMetricsReporterSslTest extends CruiseControlMetricsReporterTest { private File _trustStoreFile; @@ -37,7 +33,6 @@ public CruiseControlMetricsReporterSslTest() { @Override public Properties overridingProps() { Properties props = new Properties(); - int port = CCKafkaTestUtils.findLocalPort(); // We need to convert all the properties to the Cruise Control properties. 
setSecurityConfigs(props, "producer"); for (String configName : ProducerConfig.configNames()) { @@ -48,14 +43,19 @@ public Properties overridingProps() { } } props.setProperty(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, CruiseControlMetricsReporter.class.getName()); - props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "SSL://127.0.0.1:" + port); - props.setProperty(CruiseControlMetricsReporterConfig.config(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG), "127.0.0.1:" + port); + props.setProperty("listeners", "PLAINTEXT://0.0.0.0:9092,BROKER://0.0.0.0:9093,CONTROLLER://0.0.0.0:9094,TC-0://0.0.0.0:9095"); + props.setProperty("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,BROKER:SSL,CONTROLLER:PLAINTEXT,TC-0:SSL"); + props.setProperty(CruiseControlMetricsReporterConfig.config(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG), "127.0.0.1:" + "9093"); props.setProperty(CruiseControlMetricsReporterConfig.config(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG), SecurityProtocol.SSL.name); props.setProperty(CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG, "100"); props.setProperty(CRUISE_CONTROL_METRICS_TOPIC_CONFIG, TOPIC); - props.setProperty(ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_CONFIG, "1"); - props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1"); - props.setProperty(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, "2"); + props.setProperty("log.flush.interval.messages", "1"); + props.setProperty("offsets.topic.replication.factor", "1"); + props.setProperty("default.replication.factor", "2"); + + // The Kafka brokers should use the same key manager algorithm as the host that generates the certs + props.setProperty(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, KeyManagerFactory.getDefaultAlgorithm()); + return props; } diff --git a/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporterTest.java 
b/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporterTest.java index b72263dbe1..b1b4b04d2f 100644 --- a/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporterTest.java +++ b/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporterTest.java @@ -7,21 +7,25 @@ import com.linkedin.kafka.cruisecontrol.metricsreporter.exception.KafkaTopicDescriptionException; import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.CruiseControlMetric; import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.MetricSerde; -import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCEmbeddedBroker; +import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCEmbeddedKraftCluster; import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaClientsIntegrationTestHarness; import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; -import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaTestUtils; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -34,18 +38,17 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import 
org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; -import org.apache.kafka.network.SocketServerConfigs; -import org.apache.kafka.server.config.ReplicationConfigs; -import org.apache.kafka.server.config.ServerLogConfigs; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.testcontainers.kafka.KafkaContainer; import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter.DEFAULT_BOOTSTRAP_SERVERS_HOST; import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter.DEFAULT_BOOTSTRAP_SERVERS_PORT; import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter.getTopicDescription; +import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG; import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_AUTO_CREATE_CONFIG; +import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_CONFIG; import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_NUM_PARTITIONS_CONFIG; import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_REPLICATION_FACTOR_CONFIG; import static com.linkedin.kafka.cruisecontrol.metricsreporter.metric.RawMetricType.*; @@ -53,15 +56,22 @@ import static org.junit.Assert.assertTrue; public class CruiseControlMetricsReporterTest extends CCKafkaClientsIntegrationTestHarness { + private static final String KAFKA_VERSION = "3.9.1"; + private static final int NUM_OF_BROKERS = 2; protected static final String TOPIC = "CruiseControlMetricsReporterTest"; private static final String HOST = "127.0.0.1"; + protected 
CCEmbeddedKraftCluster _cluster; /** * Setup the unit test. */ @Before public void setUp() { - super.setUp(); + List> brokerConfigs = buildBrokerConfigs(); + _cluster = new CCEmbeddedKraftCluster(KAFKA_VERSION, NUM_OF_BROKERS, brokerConfigs); + _cluster.start(); + _bootstrapUrl = _cluster.getExternalBootstrapAddress(); + Properties props = new Properties(); props.setProperty(ProducerConfig.ACKS_CONFIG, "-1"); AtomicInteger failed = new AtomicInteger(0); @@ -80,23 +90,27 @@ public void onCompletion(RecordMetadata recordMetadata, Exception e) { assertEquals(0, failed.get()); } + /** + * Cleans up resources after each test method execution. + */ @After public void tearDown() { - super.tearDown(); + _cluster.close(); } @Override public Properties overridingProps() { Properties props = new Properties(); - int port = CCKafkaTestUtils.findLocalPort(); + props.setProperty("listeners", "PLAINTEXT://0.0.0.0:9092,BROKER://0.0.0.0:9093,CONTROLLER://0.0.0.0:9094,TC-0://0.0.0.0:9095"); + props.setProperty("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,BROKER:PLAINTEXT,CONTROLLER:PLAINTEXT,TC-0:PLAINTEXT"); props.setProperty(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, CruiseControlMetricsReporter.class.getName()); - props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://" + HOST + ":" + port); - props.setProperty(CruiseControlMetricsReporterConfig.config(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG), HOST + ":" + port); - props.setProperty(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG, "100"); - props.setProperty(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_CONFIG, TOPIC); - props.setProperty(ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_CONFIG, "1"); - props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1"); - props.setProperty(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, "2"); + 
props.setProperty(CruiseControlMetricsReporterConfig.config(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG), HOST + ":9093"); + props.setProperty(CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG, "100"); + props.setProperty(CRUISE_CONTROL_METRICS_TOPIC_CONFIG, TOPIC); + props.setProperty("log.flush.interval.messages", "1"); + props.setProperty("offsets.topic.replication.factor", "1"); + props.setProperty("default.replication.factor", "2"); + return props; } @@ -185,11 +199,50 @@ public void testReportingMetrics() { assertEquals("Expected " + expectedMetricTypes + ", but saw " + metricTypes, expectedMetricTypes, metricTypes); } + /** + * Waits until the metadata of a Kafka topic changes compared to a previously known description. + * + * @param adminClient The Kafka AdminClient instance used to fetch topic metadata. + * @param oldTopicDescription The original TopicDescription to compare against. + * @param timeout The maximum time to wait for a metadata change. + * + * @return The updated TopicDescription once a change is detected. 
+ */ + private TopicDescription waitForTopicMetadataChange(AdminClient adminClient, + TopicDescription oldTopicDescription, + Duration timeout) throws InterruptedException { + long deadline = System.currentTimeMillis() + timeout.toMillis(); + String topicName = oldTopicDescription.name(); + int oldPartitionCount = oldTopicDescription.partitions().size(); + + while (System.currentTimeMillis() < deadline) { + try { + TopicDescription newTopicDescription = adminClient + .describeTopics(Collections.singleton(topicName)) + .all() + .get(timeout.toMillis(), TimeUnit.MILLISECONDS) + .get(topicName); + + int newPartitionCount = newTopicDescription.partitions().size(); + + if (newPartitionCount != oldPartitionCount) { + return newTopicDescription; + } + + Thread.sleep(500); + } catch (ExecutionException | TimeoutException e) { + // transient error, retry + } + } + + throw new RuntimeException("Timeout waiting for topic metadata change for topic: " + topicName); + } + @Test public void testUpdatingMetricsTopicConfig() throws InterruptedException { Properties props = new Properties(); setSecurityConfigs(props, "admin"); - props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()); + props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, _cluster.getExternalBootstrapAddress()); AdminClient adminClient = AdminClient.create(props); // For compatibility with Kafka 4.0 and beyond we must use new API methods. 
@@ -200,43 +253,45 @@ public void testUpdatingMetricsTopicConfig() throws InterruptedException { throw new RuntimeException(e); } assertEquals(1, topicDescription.partitions().size()); + + KafkaContainer broker = _cluster.getBrokers().get(0); + // Shutdown broker - _brokers.get(0).shutdown(); + broker.stop(); + // Change broker config Map brokerConfig = buildBrokerConfigs().get(0); brokerConfig.put(CRUISE_CONTROL_METRICS_TOPIC_AUTO_CREATE_CONFIG, "true"); brokerConfig.put(CRUISE_CONTROL_METRICS_TOPIC_NUM_PARTITIONS_CONFIG, "2"); brokerConfig.put(CRUISE_CONTROL_METRICS_TOPIC_REPLICATION_FACTOR_CONFIG, "1"); - CCEmbeddedBroker broker = new CCEmbeddedBroker(brokerConfig); + CCEmbeddedKraftCluster.setBrokerConfigViaEnvVars(broker, brokerConfig); + // Restart broker - broker.startup(); + broker.start(); + // Wait for broker to boot up - Thread.sleep(5000); + _cluster.waitUntilBrokerIsReady(adminClient, 0, Duration.ofSeconds(30)); - // Check whether the topic config is updated - try { - topicDescription = getTopicDescription(adminClient, TOPIC); - } catch (KafkaTopicDescriptionException e) { - throw new RuntimeException(e); - } - assertEquals(2, topicDescription.partitions().size()); + // Wait for topic metadata configuration change to propagate + TopicDescription newTopicDescription = waitForTopicMetadataChange(adminClient, topicDescription, Duration.ofSeconds(30)); + + assertEquals(2, newTopicDescription.partitions().size()); } @Test public void testGetKafkaBootstrapServersConfigure() { // Test with a "listeners" config with a host - Map brokerConfig = buildBrokerConfigs().get(0); - Map listenersMap = Collections.singletonMap( - SocketServerConfigs.LISTENERS_CONFIG, brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG)); + Map brokerConfig = _cluster.getBrokers().get(0).getEnvMap(); + Map listenersMap = Collections.singletonMap("listeners", brokerConfig.get("KAFKA_LISTENERS")); String bootstrapServers = CruiseControlMetricsReporter.getBootstrapServers(listenersMap); 
String urlParse = "\\[?([0-9a-zA-Z\\-%._:]*)]?:(-?[0-9]+)"; Pattern urlParsePattern = Pattern.compile(urlParse); assertTrue(urlParsePattern.matcher(bootstrapServers).matches()); - assertEquals(HOST, bootstrapServers.split(":")[0]); + assertEquals("0.0.0.0", bootstrapServers.split(":")[0]); // Test with a "listeners" config without a host in the first listener. String listeners = "SSL://:1234,PLAINTEXT://myhost:4321"; - listenersMap = Collections.singletonMap(SocketServerConfigs.LISTENERS_CONFIG, listeners); + listenersMap = Collections.singletonMap("listeners", listeners); bootstrapServers = CruiseControlMetricsReporter.getBootstrapServers(listenersMap); assertTrue(urlParsePattern.matcher(bootstrapServers).matches()); assertEquals(DEFAULT_BOOTSTRAP_SERVERS_HOST, bootstrapServers.split(":")[0]); @@ -244,7 +299,7 @@ public void testGetKafkaBootstrapServersConfigure() { // Test with "listeners" and "port" config together. listenersMap = new HashMap<>(); - listenersMap.put(SocketServerConfigs.LISTENERS_CONFIG, listeners); + listenersMap.put("listeners", listeners); listenersMap.put("port", "43"); bootstrapServers = CruiseControlMetricsReporter.getBootstrapServers(listenersMap); assertTrue(urlParsePattern.matcher(bootstrapServers).matches()); diff --git a/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCEmbeddedBrokerBuilder.java b/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCEmbeddedBrokerBuilder.java index c6a1fcb4fe..5fd34a082d 100644 --- a/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCEmbeddedBrokerBuilder.java +++ b/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCEmbeddedBrokerBuilder.java @@ -241,9 +241,6 @@ private void validate() throws IllegalArgumentException { if (_logDirectory == null) { throw new IllegalArgumentException("log directory 
must be specified"); } - if (_zkConnect == null) { - throw new IllegalArgumentException("zkConnect must be specified"); - } } /** diff --git a/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCEmbeddedKraftCluster.java b/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCEmbeddedKraftCluster.java new file mode 100644 index 0000000000..7dcb49bced --- /dev/null +++ b/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCEmbeddedKraftCluster.java @@ -0,0 +1,258 @@ +/* + * Copyright 2017 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information. + */ + +package com.linkedin.kafka.cruisecontrol.metricsreporter.utils; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.DescribeClusterResult; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.Uuid; +import org.testcontainers.containers.BindMode; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.kafka.KafkaContainer; +import org.testcontainers.lifecycle.Startable; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.MountableFile; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * The {@code CCEmbeddedKraftCluster} class creates an embedded KRaft Kafka cluster used for testing purposes. 
+ */ +public class CCEmbeddedKraftCluster implements Startable { + private static final int HOST_PORT_START = 10090; + private static final int MAPPED_CONTAINER_PORT = 9095; + + private final int _brokersNum; + private final Network _network; + private final List _brokers; + + public CCEmbeddedKraftCluster(String brokerVersion, int numOfBrokers, List> brokerConfigs) { + if (numOfBrokers <= 0) { + throw new IllegalArgumentException("numOfBrokers '" + numOfBrokers + "' must be greater than 0"); + } + + this._brokersNum = numOfBrokers; + this._network = Network.newNetwork(); + + String controllerQuorumVoters = IntStream + .range(0, numOfBrokers) + .mapToObj(brokerNum -> String.format("%d@broker-%d:9094", brokerNum, brokerNum)) + .collect(Collectors.joining(",")); + + String clusterId = Uuid.randomUuid().toString(); + this._brokers = + IntStream + .range(0, numOfBrokers) + .mapToObj(brokerNum -> { + Map brokerConfig = brokerConfigs.get(brokerNum); + + String networkAlias = "broker-" + brokerNum; + int externalPort = HOST_PORT_START + brokerNum; + + KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("apache/kafka") + .withTag(brokerVersion) + ) { + /* + * At this time, the `configure()` method of the KafkaContainer class overwrites the user configured + * Kafka `listener` and `listener.security.protocol.map` configurations, forcing PLAINTEXT protocol for + * listeners exposed outside the container network. To allow for TLS connections to the TestContainer + * Kafka cluster from clients outside the container network we must override the `configure()` method.
+ * + * Tracking issue in TestContainer's Kafka module here: + * https://github.com/testcontainers/testcontainers-java/issues/10035 + */ + @Override + protected void configure() { } + } + .withNetwork(this._network) + .withNetworkAliases(networkAlias) + .withExposedPorts(9092, MAPPED_CONTAINER_PORT) + .withListener("0.0.0.0:" + MAPPED_CONTAINER_PORT, () -> "localhost:" + externalPort) + .withEnv("CLUSTER_ID", clusterId) + // Uncomment the following line when debugging Kafka cluster problems. + //.withLogConsumer(outputFrame -> System.out.print(networkAlias + " | " + outputFrame.getUtf8String())) + // Uncomment the following line when debugging SSL connection problems. + //.withEnv("KAFKA_OPTS", "-Djavax.net.debug=ssl,handshake") + .withStartupTimeout(Duration.ofMinutes(1)); + kafkaContainer.setPortBindings(List.of(externalPort + ":" + MAPPED_CONTAINER_PORT)); + + brokerConfig.put("broker.id", brokerNum + ""); + brokerConfig.put("node.id", brokerNum + ""); + brokerConfig.put("controller.quorum.voters", controllerQuorumVoters); + + // TestContainers automatically sets `inter.broker.listener.name` so we must disable `security.inter.broker.protocol` + // https://kafka.apache.org/documentation/#brokerconfigs_inter.broker.listener.name + brokerConfig.remove("security.inter.broker.protocol"); + + setBrokerConfigViaEnvVars(kafkaContainer, brokerConfig); + + // Mount metrics reporter and copy generated certs into container + setupContainerResources(kafkaContainer, brokerConfig); + + return kafkaContainer; + }) + .collect(Collectors.toList()); + } + + private void copyCertToContainer(KafkaContainer container, Map config, String key) { + if (config.containsKey(key)) { + String path = config.get(key).toString(); + container.withCopyToContainer(MountableFile.forHostPath(path, 0644), path); + } + } + + private void setupContainerResources(KafkaContainer kafkaContainer, Map brokerConfig) { + Path libsDir = Paths.get("build", "libs").toAbsolutePath().normalize(); + + try { + Path
jarPath = Files.list(libsDir) + .filter(path -> path.getFileName().toString().startsWith("cruise-control-metrics-reporter")) + .filter(path -> path.getFileName().toString().endsWith(".jar")) + .findFirst() + .orElseThrow(() -> new IllegalStateException("Cruise Control Metrics Reporter jar not found in: " + libsDir)); + + kafkaContainer.withFileSystemBind( + jarPath.toString(), + "/opt/kafka/libs/cruise-control-metrics-reporter.jar", + BindMode.READ_ONLY + ); + + copyCertToContainer(kafkaContainer, brokerConfig, "ssl.truststore.location"); + copyCertToContainer(kafkaContainer, brokerConfig, "ssl.keystore.location"); + copyCertToContainer(kafkaContainer, brokerConfig, "cruise.control.metrics.reporter.ssl.keystore.location"); + } catch (IOException e) { + throw new RuntimeException("Failed to mount Kafka container resources", e); + } + } + + /** + * Updates the broker configuration of the Kafka container. + * + * @param kafkaContainer The Kafka container to be updated. + * @param brokerConfig A map of properties that will override the default broker + * configuration. 
+ */ + public static void setBrokerConfigViaEnvVars(KafkaContainer kafkaContainer, Map brokerConfig) { + for (Map.Entry entry : brokerConfig.entrySet()) { + String key = String.valueOf(entry.getKey()); + Object rawValue = entry.getValue(); + String value; + + if (rawValue instanceof Collection) { + value = String.join(",", ((Collection) rawValue).stream() + .map(Object::toString) + .toArray(String[]::new)); + } else if (rawValue instanceof String[]) { + value = String.join(",", (String[]) rawValue); + } else if (rawValue instanceof org.apache.kafka.common.config.types.Password) { + value = ((org.apache.kafka.common.config.types.Password) rawValue).value(); + } else if (rawValue instanceof String) { + value = (String) rawValue; + } else { + value = String.valueOf(rawValue); + } + + // Convert Kafka broker properties key to env var format: e.g., log.retention.hours -> KAFKA_LOG_RETENTION_HOURS + String envKey = "KAFKA_" + key.toUpperCase().replace('.', '_'); + kafkaContainer.withEnv(envKey, value); + } + } + + /** + * Returns list of KafkaContainer broker objects within the TestContainer Kafka cluster. + * + * @return List of KafkaContainer broker objects within the TestContainer Kafka cluster. + */ + public List getBrokers() { + return this._brokers; + } + + /** + * Returns a comma-separated list of bootstrap server addresses using plain text protocol. + * + * @return A comma-separated list of bootstrap server addresses using plain text protocol in the form {@code host:port}. + */ + public String getPlainBootstrapAddress() { + return _brokers.stream().map(KafkaContainer::getBootstrapServers).collect(Collectors.joining(",")); + } + + /** + * Returns a comma-separated list of bootstrap server addresses that are reachable from outside + * the Docker network used by the TestContainers Kafka cluster. + * + * @return A comma-separated list of external bootstrap server addresses in the form {@code host:port}. 
+ */ + public String getExternalBootstrapAddress() { + return _brokers.stream() + .map(broker -> String.format("%s:%d", broker.getHost(), broker.getMappedPort(MAPPED_CONTAINER_PORT))) + .collect(Collectors.joining(",")); + } + + @Override + public void start() { + _brokers.parallelStream().forEach(GenericContainer::start); + + Properties props = new Properties(); + props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getPlainBootstrapAddress()); + AdminClient adminClient = AdminClient.create(props); + + IntStream.range(0, _brokersNum).parallel().forEach( + id -> waitUntilBrokerIsReady(adminClient, id, Duration.ofSeconds(30)) + ); + + adminClient.close(); + } + + @Override + public void stop() { + this._brokers.stream().parallel().forEach(GenericContainer::stop); + } + + /** + * Waits until the identified broker is part of the Kafka cluster. + * + * @param adminClient The Kafka AdminClient instance. + * @param brokerId The id of broker. + * @param timeout The maximum duration to wait for the broker to join the cluster. + * @throws RuntimeException if the broker does not become part of the cluster before the timeout, + * or if the thread is interrupted while waiting. 
+ */ + public void waitUntilBrokerIsReady(AdminClient adminClient, int brokerId, Duration timeout) { + long deadline = System.currentTimeMillis() + timeout.toMillis(); + + while (System.currentTimeMillis() < deadline) { + try { + DescribeClusterResult cluster = adminClient.describeCluster(); + Collection nodes = cluster.nodes().get(); + + boolean found = nodes.stream().anyMatch(node -> node.id() == brokerId); + if (found) { + return; + } + + Thread.sleep(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while waiting for broker to become ready", e); + } catch (ExecutionException e) { + // Cluster info might not be ready yet, ignore and retry + } + } + + throw new RuntimeException("Broker " + brokerId + " did not become ready within timeout"); + } +} diff --git a/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCKafkaIntegrationTestHarness.java b/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCKafkaIntegrationTestHarness.java index 1b547f2523..3f57546da1 100644 --- a/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCKafkaIntegrationTestHarness.java +++ b/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCKafkaIntegrationTestHarness.java @@ -91,7 +91,10 @@ protected List> buildBrokerConfigs() { protected Map createBrokerConfig(int brokerId) { CCEmbeddedBrokerBuilder builder = new CCEmbeddedBrokerBuilder(); - builder.zkConnect(zookeeper()); + CCEmbeddedZookeeper zookeeper = zookeeper(); + if (zookeeper != null) { + builder.zkConnect(zookeeper); + } builder.nodeId(brokerId); builder.enable(securityProtocol()); if (securityProtocol() == SecurityProtocol.SSL) {