Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,7 @@ project(':cruise-control-metrics-reporter') {
testImplementation "org.apache.kafka:kafka-storage:$kafkaVersion"
testImplementation 'commons-io:commons-io:2.11.0'
testImplementation "org.apache.zookeeper:zookeeper:${zookeeperVersion}"
testImplementation "org.testcontainers:kafka:1.21.3"
testOutput sourceSets.test.output
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,16 @@
import java.io.File;
import java.io.IOException;
import java.util.Properties;
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaTestUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.server.config.ReplicationConfigs;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.junit.Assert;
import javax.net.ssl.KeyManagerFactory;

import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG;
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_CONFIG;


public class CruiseControlMetricsReporterSslTest extends CruiseControlMetricsReporterTest {

private File _trustStoreFile;
Expand All @@ -37,7 +33,6 @@ public CruiseControlMetricsReporterSslTest() {
@Override
public Properties overridingProps() {
Properties props = new Properties();
int port = CCKafkaTestUtils.findLocalPort();
// We need to convert all the properties to the Cruise Control properties.
setSecurityConfigs(props, "producer");
for (String configName : ProducerConfig.configNames()) {
Expand All @@ -48,14 +43,19 @@ public Properties overridingProps() {
}
}
props.setProperty(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, CruiseControlMetricsReporter.class.getName());
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "SSL://127.0.0.1:" + port);
props.setProperty(CruiseControlMetricsReporterConfig.config(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG), "127.0.0.1:" + port);
props.setProperty("listeners", "PLAINTEXT://0.0.0.0:9092,BROKER://0.0.0.0:9093,CONTROLLER://0.0.0.0:9094,TC-0://0.0.0.0:9095");
props.setProperty("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,BROKER:SSL,CONTROLLER:PLAINTEXT,TC-0:SSL");
props.setProperty(CruiseControlMetricsReporterConfig.config(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG), "127.0.0.1:" + "9093");
props.setProperty(CruiseControlMetricsReporterConfig.config(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG), SecurityProtocol.SSL.name);
props.setProperty(CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG, "100");
props.setProperty(CRUISE_CONTROL_METRICS_TOPIC_CONFIG, TOPIC);
props.setProperty(ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_CONFIG, "1");
props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
props.setProperty(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, "2");
props.setProperty("log.flush.interval.messages", "1");
props.setProperty("offsets.topic.replication.factor", "1");
props.setProperty("default.replication.factor", "2");

// The Kafka brokers should use the same key manager algorithm as the host that generates the certs
props.setProperty(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, KeyManagerFactory.getDefaultAlgorithm());

return props;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,25 @@
import com.linkedin.kafka.cruisecontrol.metricsreporter.exception.KafkaTopicDescriptionException;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.CruiseControlMetric;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.MetricSerde;
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCEmbeddedBroker;
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCEmbeddedKraftCluster;
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaClientsIntegrationTestHarness;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaTestUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
Expand All @@ -34,34 +38,40 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.server.config.ReplicationConfigs;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.testcontainers.kafka.KafkaContainer;

import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter.DEFAULT_BOOTSTRAP_SERVERS_HOST;
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter.DEFAULT_BOOTSTRAP_SERVERS_PORT;
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter.getTopicDescription;
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG;
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_AUTO_CREATE_CONFIG;
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_CONFIG;
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_NUM_PARTITIONS_CONFIG;
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_REPLICATION_FACTOR_CONFIG;
import static com.linkedin.kafka.cruisecontrol.metricsreporter.metric.RawMetricType.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class CruiseControlMetricsReporterTest extends CCKafkaClientsIntegrationTestHarness {
private static final String KAFKA_VERSION = "3.9.1";
private static final int NUM_OF_BROKERS = 2;
protected static final String TOPIC = "CruiseControlMetricsReporterTest";
private static final String HOST = "127.0.0.1";
protected CCEmbeddedKraftCluster _cluster;

/**
* Setup the unit test.
*/
@Before
public void setUp() {
super.setUp();
List<Map<Object, Object>> brokerConfigs = buildBrokerConfigs();
_cluster = new CCEmbeddedKraftCluster(KAFKA_VERSION, NUM_OF_BROKERS, brokerConfigs);
_cluster.start();
_bootstrapUrl = _cluster.getExternalBootstrapAddress();

Properties props = new Properties();
props.setProperty(ProducerConfig.ACKS_CONFIG, "-1");
AtomicInteger failed = new AtomicInteger(0);
Expand All @@ -80,23 +90,27 @@ public void onCompletion(RecordMetadata recordMetadata, Exception e) {
assertEquals(0, failed.get());
}

/**
 * Cleans up resources after each test method execution: first tears down the
 * client-side test harness state, then stops the embedded Kraft cluster that
 * was started in setUp().
 */
@After
public void tearDown() {
// Harness cleanup (clients etc.) happens before the brokers go away.
super.tearDown();
// Stop the embedded Kraft cluster created in setUp().
_cluster.close();
}

@Override
public Properties overridingProps() {
Properties props = new Properties();
int port = CCKafkaTestUtils.findLocalPort();
props.setProperty("listeners", "PLAINTEXT://0.0.0.0:9092,BROKER://0.0.0.0:9093,CONTROLLER://0.0.0.0:9094,TC-0://0.0.0.0:9095");
props.setProperty("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,BROKER:PLAINTEXT,CONTROLLER:PLAINTEXT,TC-0:PLAINTEXT");
props.setProperty(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, CruiseControlMetricsReporter.class.getName());
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://" + HOST + ":" + port);
props.setProperty(CruiseControlMetricsReporterConfig.config(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG), HOST + ":" + port);
props.setProperty(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG, "100");
props.setProperty(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_CONFIG, TOPIC);
props.setProperty(ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_CONFIG, "1");
props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
props.setProperty(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, "2");
props.setProperty(CruiseControlMetricsReporterConfig.config(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG), HOST + ":9093");
props.setProperty(CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG, "100");
props.setProperty(CRUISE_CONTROL_METRICS_TOPIC_CONFIG, TOPIC);
props.setProperty("log.flush.interval.messages", "1");
props.setProperty("offsets.topic.replication.factor", "1");
props.setProperty("default.replication.factor", "2");

return props;
}

Expand Down Expand Up @@ -185,11 +199,50 @@ public void testReportingMetrics() {
assertEquals("Expected " + expectedMetricTypes + ", but saw " + metricTypes, expectedMetricTypes, metricTypes);
}

/**
 * Waits until the partition count of a Kafka topic changes compared to a previously
 * known description, polling the cluster via the given AdminClient.
 *
 * @param adminClient The Kafka AdminClient instance used to fetch topic metadata.
 * @param oldTopicDescription The original TopicDescription to compare against.
 * @param timeout The maximum time to wait for a metadata change.
 *
 * @return The updated TopicDescription once a partition-count change is detected.
 * @throws InterruptedException if the calling thread is interrupted while waiting.
 */
private TopicDescription waitForTopicMetadataChange(AdminClient adminClient,
                                                    TopicDescription oldTopicDescription,
                                                    Duration timeout) throws InterruptedException {
  long deadline = System.currentTimeMillis() + timeout.toMillis();
  String topicName = oldTopicDescription.name();
  int oldPartitionCount = oldTopicDescription.partitions().size();

  while (System.currentTimeMillis() < deadline) {
    try {
      // Cap each describe call by the remaining budget so a single slow request
      // cannot block past the overall deadline (previously every attempt used the
      // full timeout).
      long remainingMs = Math.max(1L, deadline - System.currentTimeMillis());
      TopicDescription newTopicDescription = adminClient
          .describeTopics(Collections.singleton(topicName))
          .all()
          .get(remainingMs, TimeUnit.MILLISECONDS)
          .get(topicName);

      if (newTopicDescription.partitions().size() != oldPartitionCount) {
        return newTopicDescription;
      }
    } catch (ExecutionException | TimeoutException e) {
      // Transient error (e.g. broker still restarting); fall through and retry.
    }
    // Back off before the next poll. This applies to both the "no change yet" and
    // the transient-error paths (previously errors retried immediately, busy-looping
    // while the broker was down).
    Thread.sleep(500);
  }

  throw new RuntimeException("Timeout waiting for topic metadata change for topic: " + topicName);
}

@Test
public void testUpdatingMetricsTopicConfig() throws InterruptedException {
Properties props = new Properties();
setSecurityConfigs(props, "admin");
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, _cluster.getExternalBootstrapAddress());
AdminClient adminClient = AdminClient.create(props);

// For compatibility with Kafka 4.0 and beyond we must use new API methods.
Expand All @@ -200,51 +253,53 @@ public void testUpdatingMetricsTopicConfig() throws InterruptedException {
throw new RuntimeException(e);
}
assertEquals(1, topicDescription.partitions().size());

KafkaContainer broker = _cluster.getBrokers().get(0);

// Shutdown broker
_brokers.get(0).shutdown();
broker.stop();

// Change broker config
Map<Object, Object> brokerConfig = buildBrokerConfigs().get(0);
brokerConfig.put(CRUISE_CONTROL_METRICS_TOPIC_AUTO_CREATE_CONFIG, "true");
brokerConfig.put(CRUISE_CONTROL_METRICS_TOPIC_NUM_PARTITIONS_CONFIG, "2");
brokerConfig.put(CRUISE_CONTROL_METRICS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
CCEmbeddedBroker broker = new CCEmbeddedBroker(brokerConfig);
CCEmbeddedKraftCluster.setBrokerConfigViaEnvVars(broker, brokerConfig);

// Restart broker
broker.startup();
broker.start();

// Wait for broker to boot up
Thread.sleep(5000);
_cluster.waitUntilBrokerIsReady(adminClient, 0, Duration.ofSeconds(30));

// Check whether the topic config is updated
try {
topicDescription = getTopicDescription(adminClient, TOPIC);
} catch (KafkaTopicDescriptionException e) {
throw new RuntimeException(e);
}
assertEquals(2, topicDescription.partitions().size());
// Wait for topic metadata configuration change to propagate
TopicDescription newTopicDescription = waitForTopicMetadataChange(adminClient, topicDescription, Duration.ofSeconds(30));

assertEquals(2, newTopicDescription.partitions().size());
}

@Test
public void testGetKafkaBootstrapServersConfigure() {
// Test with a "listeners" config with a host
Map<Object, Object> brokerConfig = buildBrokerConfigs().get(0);
Map<String, Object> listenersMap = Collections.singletonMap(
SocketServerConfigs.LISTENERS_CONFIG, brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG));
Map<String, String> brokerConfig = _cluster.getBrokers().get(0).getEnvMap();
Map<String, Object> listenersMap = Collections.singletonMap("listeners", brokerConfig.get("KAFKA_LISTENERS"));
String bootstrapServers = CruiseControlMetricsReporter.getBootstrapServers(listenersMap);
String urlParse = "\\[?([0-9a-zA-Z\\-%._:]*)]?:(-?[0-9]+)";
Pattern urlParsePattern = Pattern.compile(urlParse);
assertTrue(urlParsePattern.matcher(bootstrapServers).matches());
assertEquals(HOST, bootstrapServers.split(":")[0]);
assertEquals("0.0.0.0", bootstrapServers.split(":")[0]);

// Test with a "listeners" config without a host in the first listener.
String listeners = "SSL://:1234,PLAINTEXT://myhost:4321";
listenersMap = Collections.singletonMap(SocketServerConfigs.LISTENERS_CONFIG, listeners);
listenersMap = Collections.singletonMap("listeners", listeners);
bootstrapServers = CruiseControlMetricsReporter.getBootstrapServers(listenersMap);
assertTrue(urlParsePattern.matcher(bootstrapServers).matches());
assertEquals(DEFAULT_BOOTSTRAP_SERVERS_HOST, bootstrapServers.split(":")[0]);
assertEquals("1234", bootstrapServers.split(":")[1]);

// Test with "listeners" and "port" config together.
listenersMap = new HashMap<>();
listenersMap.put(SocketServerConfigs.LISTENERS_CONFIG, listeners);
listenersMap.put("listeners", listeners);
listenersMap.put("port", "43");
bootstrapServers = CruiseControlMetricsReporter.getBootstrapServers(listenersMap);
assertTrue(urlParsePattern.matcher(bootstrapServers).matches());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,6 @@ private void validate() throws IllegalArgumentException {
if (_logDirectory == null) {
throw new IllegalArgumentException("log directory must be specified");
}
if (_zkConnect == null) {
throw new IllegalArgumentException("zkConnect must be specified");
}
}

/**
Expand Down
Loading
Loading