diff --git a/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCContainerizedKraftCluster.java b/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCContainerizedKraftCluster.java index 707415dd9..80b34e964 100644 --- a/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCContainerizedKraftCluster.java +++ b/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCContainerizedKraftCluster.java @@ -396,6 +396,25 @@ public void shutDownBroker(int brokerId) { timeout, String.format("Broker %s did not shutdown properly.", brokerId) ); + + // Wait until describeLogDirs fails + waitUntil( + () -> { + try { + _adminClient.describeLogDirs(Collections.singletonList(brokerId)) + .allDescriptions() + .get(5, TimeUnit.SECONDS); + return false; + } catch (InterruptedException ie) { + throw new RuntimeException(ie); + } catch (Exception e) { + return true; + } + }, + result -> result, + timeout, + String.format("Broker %s did not fully shut down (logDirs RPC still succeeds).", brokerId) + ); } public static class BrokerWaitStrategy extends AbstractWaitStrategy { diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java index 42ec5fa84..863569ba0 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java @@ -869,9 +869,17 @@ private void executeAndVerifyProposals(AdminClient adminClient, topic.partitions().get(tp.partition()).replicas().stream().anyMatch(rep -> r.brokerId().equals(rep.id()))); } } - assertEquals("The leader should have moved for " + tp, - proposal.newLeader().brokerId().intValue(), topic.partitions().get(tp.partition()).leader().id()); + // Wait for leader movement + //TopicDescription topicAfterLeaderMove = _cluster.waitForTopicMetadata(tp.topic(), Duration.ofSeconds(60), + // topicDescription -> topicDescription.partitions().get(tp.partition()).leader().id() + // == proposal.newLeader().brokerId()); + assertEquals("The leader should have moved for " + tp + + " proposal.newReplicas: " + proposal.newReplicas() + + " proposal.old.replicas " + proposal.oldReplicas() + + " topic.leader " + topic.partitions().get(tp.partition()).leader().id(), + proposal.newLeader().brokerId(), + Integer.valueOf(topic.partitions().get(tp.partition()).leader().id())); } if (isTriggeredByUserRequest) { EasyMock.verify(mockUserTaskInfo, mockUserTaskManager, mockExecutorNotifier, mockLoadMonitor, mockAnomalyDetectorManager);