From a38429fa16c6d230e58a01016588ab19b784fbc7 Mon Sep 17 00:00:00 2001 From: Abhishek Ravi Date: Wed, 24 Apr 2019 00:48:33 -0700 Subject: [PATCH 1/7] RM planner test and framework changes - Added a new test class for testing RM planner. - Added a test case for a basic scenario. - Updated DrillQueryProfile to get optimal memory per node. - Refactored / improved existing code. --- .../test/framework/DrillQueryProfile.java | 68 +++++++-- .../drill/test/framework/DrillRMConfig.java | 6 +- .../DrillTestFrameworkUnitTests.java | 13 +- .../framework/common/DrillTestNGDefaults.java | 11 +- .../resourcemanagement/QueuePlannerTests.java | 140 ++++++++++++++++++ .../QueueSelectionTests.java | 16 +- ...c-drill-rm-override.conf => basic-rm.conf} | 0 framework/src/test/resources/default-rm.conf | 15 ++ ...le-drill-rm-override.conf => prod-rm.conf} | 0 framework/testng-java.xml | 1 + 10 files changed, 234 insertions(+), 36 deletions(-) create mode 100644 framework/src/test/java/org/apache/drill/test/framework/resourcemanagement/QueuePlannerTests.java rename framework/src/test/resources/{basic-drill-rm-override.conf => basic-rm.conf} (100%) create mode 100644 framework/src/test/resources/default-rm.conf rename framework/src/test/resources/{sample-drill-rm-override.conf => prod-rm.conf} (100%) diff --git a/framework/src/main/java/org/apache/drill/test/framework/DrillQueryProfile.java b/framework/src/main/java/org/apache/drill/test/framework/DrillQueryProfile.java index 83c5b9597..d90e06ee9 100644 --- a/framework/src/main/java/org/apache/drill/test/framework/DrillQueryProfile.java +++ b/framework/src/main/java/org/apache/drill/test/framework/DrillQueryProfile.java @@ -6,6 +6,7 @@ import oadd.org.apache.drill.exec.proto.UserBitShared; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; @JsonIgnoreProperties(ignoreUnknown = true) @@ -203,12 +204,12 @@ public String toString() { } /** - * Get optimal memory allocated per operator. - * Utility parses the DrillQueryProfile + * Get total optimal memory allocated (in bytes) for specified operator, across all drillbits. + * * @param operator * @return */ - public long getOptimalMemoryPerOperator(final UserBitShared.CoreOperatorType operator) { + public long getTotalOptimalMemoryPerOperator(final UserBitShared.CoreOperatorType operator) { return this.fragmentProfiles .stream() .flatMap(f -> f.minorFragmentProfiles @@ -219,6 +220,56 @@ public long getOptimalMemoryPerOperator(final UserBitShared.CoreOperatorType ope .sum(); } + /** + * Returns the max of optimal memory allocated (in bytes) to specified operator on a drillbit. + * + * @param operator + * @return + */ + public long getOptimalMemoryPerOperatorPerNode(final UserBitShared.CoreOperatorType operator) { + return this.fragmentProfiles.stream().flatMap(f -> f.minorFragmentProfiles + .stream()) + .collect(Collectors.groupingBy(m -> m.endpoint.address)) + .entrySet() + .stream() + .collect(Collectors + .toMap(Map.Entry::getKey, + e -> e.getValue() + .stream() + .flatMap(m -> m.operatorProfiles + .stream() + .filter(o -> o.operatorId == operator.getNumber())) + .mapToLong(o -> o.optimalMemAllocation) + .sum())) + .entrySet() + .stream() + .mapToLong(Map.Entry::getValue) + .max() + .orElse(0); + } + + /** + * Total optimal memory required (in bytes) for the query. + * @return total optimal memory required for the query (as estimated by the RM planner). + */ + public long getTotalOptimalMemory() { + return getOperatorsFromProfile() + .stream() + .mapToLong(this::getTotalOptimalMemoryPerOperator) + .sum(); + } + + /** + * Returns the maximum of estimated optimal memory (in bytes) required on a drillbit. + * @return total optimal memory required for the query (as estimated by the RM planner). + */ + public long getTotalOptimalMemoryPerNode() { + return getOperatorsFromProfile() + .stream() + .mapToLong(this::getOptimalMemoryPerOperatorPerNode) + .sum(); + } + /** * Get different operators in the profile. * @return a list of operators in the query profile. @@ -233,17 +284,6 @@ public List getOperatorsFromProfile() { .mapToObj(UserBitShared.CoreOperatorType::forNumber) .collect(Collectors.toList()); } - - /** - * Total optimal memory required for the query. - * @return total optimal memory required for the query (as estimated by the RM planner). - */ - public long getTotalOptimalMemoryEstimate() { - return getOperatorsFromProfile() - .stream() - .mapToLong(this::getOptimalMemoryPerOperator) - .sum(); - } } diff --git a/framework/src/main/java/org/apache/drill/test/framework/DrillRMConfig.java b/framework/src/main/java/org/apache/drill/test/framework/DrillRMConfig.java index 3c97fc823..f3ba7b29c 100644 --- a/framework/src/main/java/org/apache/drill/test/framework/DrillRMConfig.java +++ b/framework/src/main/java/org/apache/drill/test/framework/DrillRMConfig.java @@ -163,7 +163,7 @@ public String render(final int acc) { public static class QueueConfig implements DrillConfigRenderer { @JsonProperty(QUEUE_MAX_QUERY_MEMORY_PER_NODE_KEY) - public long maxQueryMemoryPerNodeInMB; + public long maxQueryMemoryPerNode; //in bytes @JsonProperty(QUEUE_MAX_WAITING_KEY) public int maxWaitingQueries; @@ -185,9 +185,9 @@ public String render(final int acc) { StringBuilder sb = new StringBuilder("{\n"); final int nextAcc = acc+2; - if (maxQueryMemoryPerNodeInMB > 0) { + if (maxQueryMemoryPerNode > 0) { ensureAtleastOneField = true; - sb.append(formatConfig(nextAcc, QUEUE_MAX_QUERY_MEMORY_PER_NODE_KEY, maxQueryMemoryPerNodeInMB)); + sb.append(formatConfig(nextAcc, QUEUE_MAX_QUERY_MEMORY_PER_NODE_KEY, maxQueryMemoryPerNode)); } if (maxWaitingQueries > 0) { diff --git a/framework/src/test/java/org/apache/drill/test/framework/DrillTestFrameworkUnitTests.java b/framework/src/test/java/org/apache/drill/test/framework/DrillTestFrameworkUnitTests.java index d7c39945b..7897a5f51 100644 --- a/framework/src/test/java/org/apache/drill/test/framework/DrillTestFrameworkUnitTests.java +++ b/framework/src/test/java/org/apache/drill/test/framework/DrillTestFrameworkUnitTests.java @@ -22,7 +22,7 @@ import static org.apache.drill.test.framework.DrillTestDefaults.DRILL_EXEC_RM_CONFIG_KEY; import static org.apache.drill.test.framework.common.DrillTestNGDefaults.UNIT_GROUP; -import static org.apache.drill.test.framework.common.DrillTestNGDefaults.SAMPLE_RM_CONFIG_NAME; +import static org.apache.drill.test.framework.common.DrillTestNGDefaults.PROD_RM_CONFIG_FILEPATH; @Test(groups = UNIT_GROUP) public class DrillTestFrameworkUnitTests extends DrillJavaTestBase { @@ -84,7 +84,7 @@ public void testQueryProfileDoesNotExist() { @Test(groups = UNIT_GROUP) public void testReadSampleRMConfigFile() { try { - DrillRMConfig drillRMConfig = DrillRMConfig.load(SAMPLE_RM_CONFIG_NAME); + DrillRMConfig drillRMConfig = DrillRMConfig.load(PROD_RM_CONFIG_FILEPATH); Assert.assertEquals(drillRMConfig.poolName, "root", "Root resource pool name did not match"); @@ -112,7 +112,7 @@ public void testLoadConfigWhenFileDoesNotExist() throws IOException { @Test(groups = UNIT_GROUP) public void testConfigFileRenderer() { try { - DrillRMConfig drillRMConfig = DrillRMConfig.load(SAMPLE_RM_CONFIG_NAME); + DrillRMConfig drillRMConfig = DrillRMConfig.load(PROD_RM_CONFIG_FILEPATH); Assert.assertEquals(drillRMConfig.poolName, "root", "Root resource pool name did not match"); @@ -149,7 +149,7 @@ public void testWriteRMConfigToFile() throws IOException { } try (BufferedWriter writer = new BufferedWriter(new FileWriter(filePath))) { - DrillRMConfig drillRMConfig = DrillRMConfig.load(SAMPLE_RM_CONFIG_NAME); + DrillRMConfig drillRMConfig = DrillRMConfig.load(PROD_RM_CONFIG_FILEPATH); writer.write(DRILL_EXEC_RM_CONFIG_KEY + ":" + drillRMConfig.render()); } @@ -178,14 +178,15 @@ public void testTotalMemoryForQueryProfile() { DrillQueryProfile profile = Utils.getQueryProfile(queryId); Assert.assertEquals(profile.queryId, queryId); - long rmMemEstimate = profile.getTotalOptimalMemoryEstimate(); + long rmMemEstimate = profile.getTotalOptimalMemoryPerNode(); LOG.info("Memory estimated by RM planner: " + rmMemEstimate); Assert.assertTrue(rmMemEstimate > 0, "RM estimated memory should be greater than 0"); List operators = profile.getOperatorsFromProfile(); Assert.assertTrue(operators.size() > 0, "Number of operators in the profile should be greater than 0"); - operators.forEach(LOG::info); + operators.forEach(o -> LOG.info("Operator: " + o + ", Optimal Memory per Node in bytes: " + + profile.getOptimalMemoryPerOperatorPerNode(o))); } catch (Exception e) { e.printStackTrace(); Assert.fail(e.getMessage()); diff --git a/framework/src/test/java/org/apache/drill/test/framework/common/DrillTestNGDefaults.java b/framework/src/test/java/org/apache/drill/test/framework/common/DrillTestNGDefaults.java index fbc4753e7..0aadb756d 100644 --- a/framework/src/test/java/org/apache/drill/test/framework/common/DrillTestNGDefaults.java +++ b/framework/src/test/java/org/apache/drill/test/framework/common/DrillTestNGDefaults.java @@ -5,13 +5,16 @@ public final class DrillTestNGDefaults { public static final String FUNCTIONAL_GROUP = "functional"; public static final String UNIT_GROUP = "unit"; - public static final String SAMPLE_RM_CONFIG_NAME = - DrillTestDefaults.CWD + "/src/test/resources/sample-drill-rm-override.conf"; - public static final String BASIC_RM_CONFIG_NAME = - DrillTestDefaults.CWD + "/src/test/resources/basic-drill-rm-override.conf"; + public static final String PROD_RM_CONFIG_FILEPATH = + DrillTestDefaults.CWD + "/src/test/resources/prod-rm.conf"; + public static final String BASIC_RM_CONFIG_FILEPATH = + DrillTestDefaults.CWD + "/src/test/resources/basic-rm.conf"; + public static final String DEFAULT_RM_CONFIG_FILEPATH = + DrillTestDefaults.CWD + "/src/test/resources/default-rm.conf"; public static String CONNECTION_URL_FOR_DRILLBIT(final String hostnameOrIp) { return String.format("jdbc:drill:drillbit=%s", hostnameOrIp); } public static final String NO_RESOURCE_POOL_ERROR = "No resource pools to choose from for the query"; + public static final String TPCH_01_PARQUET_SCHEMA = "dfs.drilltestdirtpch01parquet"; } diff --git a/framework/src/test/java/org/apache/drill/test/framework/resourcemanagement/QueuePlannerTests.java b/framework/src/test/java/org/apache/drill/test/framework/resourcemanagement/QueuePlannerTests.java new file mode 100644 index 000000000..62156d9c6 --- /dev/null +++ b/framework/src/test/java/org/apache/drill/test/framework/resourcemanagement/QueuePlannerTests.java @@ -0,0 +1,140 @@ +package org.apache.drill.test.framework.resourcemanagement; + +import com.google.common.base.Preconditions; +import org.apache.drill.test.framework.ConnectionPool; +import org.apache.drill.test.framework.DrillQueryProfile; +import org.apache.drill.test.framework.DrillRMConfig; +import org.apache.drill.test.framework.Utils; +import org.apache.drill.test.framework.common.DrillJavaTestBase; +import org.apache.drill.test.framework.common.DrillTestNGDefaults; +import org.apache.log4j.Logger; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.Properties; + +import static org.apache.drill.test.framework.DrillTestDefaults.DRILL_HOME; +import static org.apache.drill.test.framework.DrillTestDefaults.DRILL_RM_OVERRIDE_CONF_FILENAME; +import static org.apache.drill.test.framework.common.DrillTestNGDefaults.DEFAULT_RM_CONFIG_FILEPATH; +import static org.apache.drill.test.framework.common.DrillTestNGDefaults.FUNCTIONAL_GROUP; +import static org.apache.drill.test.framework.common.DrillTestNGDefaults.TPCH_01_PARQUET_SCHEMA; + +@Test(groups = FUNCTIONAL_GROUP) +public class QueuePlannerTests extends DrillJavaTestBase { + private static final Logger LOG = Logger.getLogger(QueuePlannerTests.class); + + @BeforeClass(alwaysRun = true, description = "Invoked before all tests in the class") + private void setup() throws IOException { + cleanup(true); + } + + @AfterMethod(alwaysRun = true, description = "Invoked after all tests in the class are executed") + private void cleanup() { + cleanup(true); + } + + private void cleanup(final boolean restart) { + Preconditions.checkNotNull(connectionPool, + "Cleanup failed! Connection pool has not be instantiated"); + Preconditions.checkNotNull(drillCluster, + "Cleanup failed! Drill cluster information is unavailable"); + drillCluster.runCommand("rm -rf " + DRILL_HOME + "/conf/" + DRILL_RM_OVERRIDE_CONF_FILENAME); + if(restart) { + Utils.restartDrillbits(drillCluster); + } + } + + /** + * Test validates that if max_query_memory_per_node is set to a value + * greater than optimal memory estimated by RM planner, + * the query runs to completion in the queue. + * + * Test also validates that the estimation for optimal memory required does NOT change. + * + * @throws IOException + */ + @Test(groups = FUNCTIONAL_GROUP) + public void testLargerThanOptimalProject() throws IOException { + final String query = "SELECT o_orderkey, o_totalprice, o_comment FROM orders"; + final String queryTag = "test"; //Set tag such that both dev and test are eligible + final String expectedPoolName = "root"; + long expectedOptimalMemoryPerNode = 0; //initialize + long expectedRowCount = 0; //initialize + + final Properties props = Utils.createConnectionProperties( + TPCH_01_PARQUET_SCHEMA, null, queryTag); + + try(Connection conn = ConnectionPool + .createConnection(DrillTestNGDefaults.CONNECTION_URL_FOR_DRILLBIT( + drillCluster.getHosts().get(0)), + null, + null, + props); //Create a connection based on hostname and properties + + Statement stmt = conn.createStatement(); + ResultSet res = stmt.executeQuery(query)) { + final String queryId = Utils.getQueryID(res); //Get query id + LOG.info("QueryID: " + queryId + ", Query: " + query); + + final DrillQueryProfile queryProfile = Utils.getQueryProfile(queryId); //Get query profile + expectedOptimalMemoryPerNode = queryProfile.getTotalOptimalMemoryPerNode(); + LOG.info("QueryID: " + queryId + ", Estimated total optimal memory per node: " + + expectedOptimalMemoryPerNode); + + DrillRMConfig config = DrillRMConfig.load(DEFAULT_RM_CONFIG_FILEPATH); + config.queue.maxQueryMemoryPerNode = (long)(queryProfile.getTotalOptimalMemoryPerNode() * 1.2); + + LOG.info("Setting max_query_memory_per_node: " + config.queue.maxQueryMemoryPerNode); + + Utils.applyRMConfigToDrillCluster(config, drillCluster); + Utils.restartDrillbits(drillCluster); + + long rowCount = 0; + while(res.next()) { + rowCount++; + } + expectedRowCount = rowCount; + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + + try(Connection conn = ConnectionPool + .createConnection(DrillTestNGDefaults.CONNECTION_URL_FOR_DRILLBIT( + drillCluster.getHosts().get(0)), + null, + null, + props); //Create a connection based on hostname and properties + + Statement stmt = conn.createStatement(); + ResultSet res = stmt.executeQuery(query)) { + + final String queryId = Utils.getQueryID(res); //Get query id + LOG.info("QueryID: " + queryId + ", Query: " + query); + + final DrillQueryProfile queryProfile = Utils.getQueryProfile(queryId); //Get query profile + //Validate that the query was allowed into the queue + Assert.assertEquals(queryProfile.queueName, expectedPoolName, + "QueryID: " + queryId + " - The pool names do not match!"); + + Assert.assertEquals(queryProfile.getTotalOptimalMemoryPerNode(), + expectedOptimalMemoryPerNode, + "Optimal memory per node estimation did not match"); + + long rowCount = 0; + while(res.next()) { + rowCount++; + } + Assert.assertEquals(rowCount, expectedRowCount, "Number of rows returned did not match!"); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } +} diff --git a/framework/src/test/java/org/apache/drill/test/framework/resourcemanagement/QueueSelectionTests.java b/framework/src/test/java/org/apache/drill/test/framework/resourcemanagement/QueueSelectionTests.java index 169e469b6..79245e8bf 100644 --- a/framework/src/test/java/org/apache/drill/test/framework/resourcemanagement/QueueSelectionTests.java +++ b/framework/src/test/java/org/apache/drill/test/framework/resourcemanagement/QueueSelectionTests.java @@ -16,9 +16,7 @@ import static org.apache.drill.test.framework.DrillTestDefaults.DRILL_HOME; import static org.apache.drill.test.framework.DrillTestDefaults.DRILL_RM_OVERRIDE_CONF_FILENAME; -import static org.apache.drill.test.framework.common.DrillTestNGDefaults.FUNCTIONAL_GROUP; -import static org.apache.drill.test.framework.common.DrillTestNGDefaults.NO_RESOURCE_POOL_ERROR; -import static org.apache.drill.test.framework.common.DrillTestNGDefaults.BASIC_RM_CONFIG_NAME; +import static org.apache.drill.test.framework.common.DrillTestNGDefaults.*; @SuppressWarnings("Duplicates") @Test(groups = FUNCTIONAL_GROUP) @@ -28,7 +26,7 @@ public class QueueSelectionTests extends DrillJavaTestBase { @BeforeClass(alwaysRun = true, description = "Invoked before all tests in the class") private void setup() throws IOException { cleanup(false); - DrillRMConfig config = DrillRMConfig.load(BASIC_RM_CONFIG_NAME); + DrillRMConfig config = DrillRMConfig.load(BASIC_RM_CONFIG_FILEPATH); Utils.applyRMConfigToDrillCluster(config, drillCluster); Utils.restartDrillbits(drillCluster); } @@ -69,7 +67,7 @@ public void testTagSelectorBasic() throws IOException { //Build a connection with queryTag final Properties props = Utils.createConnectionProperties( - "dfs.drilltestdirtpch01parquet", null, queryTag); + TPCH_01_PARQUET_SCHEMA, null, queryTag); try(Connection conn = ConnectionPool .createConnection( @@ -123,7 +121,7 @@ public void testAclSelectorForUser() throws IOException { //Build a connection with only schema final Properties props = Utils.createConnectionProperties( - "dfs.drilltestdirtpch01parquet", null, null); + TPCH_01_PARQUET_SCHEMA, null, null); try(Connection conn = ConnectionPool .createConnection( @@ -179,7 +177,7 @@ public void testTagSelectBestFitPool() throws IOException { final String user = "bob"; final Properties props = Utils.createConnectionProperties( - "dfs.drilltestdirtpch01parquet", null, queryTag); + TPCH_01_PARQUET_SCHEMA, null, queryTag); try(Connection conn = ConnectionPool .createConnection(DrillTestNGDefaults.CONNECTION_URL_FOR_DRILLBIT( @@ -229,7 +227,7 @@ public void testUnknownUserDoesNotAllowQuery() throws IOException { "FROM orders " + "ORDER BY o_orderkey " + "DESC limit 1"; - final Properties props = Utils.createConnectionProperties("dfs.drilltestdirtpch01parquet", + final Properties props = Utils.createConnectionProperties(TPCH_01_PARQUET_SCHEMA, null, null); //NO Query Tags try(Connection conn = ConnectionPool @@ -266,7 +264,7 @@ public void testUnknownTagDoesNotAllowQuery() { "ORDER BY o_orderkey " + "DESC limit 1"; final String queryTag = "marketing"; //This tag is not configured for basic RM template - final Properties props = Utils.createConnectionProperties("dfs.drilltestdirtpch01parquet", + final Properties props = Utils.createConnectionProperties(TPCH_01_PARQUET_SCHEMA, null, queryTag); //NO Query Tags try(Connection conn = ConnectionPool diff --git a/framework/src/test/resources/basic-drill-rm-override.conf b/framework/src/test/resources/basic-rm.conf similarity index 100% rename from framework/src/test/resources/basic-drill-rm-override.conf rename to framework/src/test/resources/basic-rm.conf diff --git a/framework/src/test/resources/default-rm.conf b/framework/src/test/resources/default-rm.conf new file mode 100644 index 000000000..616cf7674 --- /dev/null +++ b/framework/src/test/resources/default-rm.conf @@ -0,0 +1,15 @@ +drill.exec.rm: { + pool_name: "root", + memory: 1.0, // 90% of total direct memory allocated to Drill + queue_selection_policy: "bestfit", // policy to select queue for a query when multiple queues are eligible + selector: { + tag: "test", + }, + queue: { + max_query_memory_per_node: 1073741824 // supported format regex [0-9]*[kKmMgG]? + max_waiting: 10, // default + max_admissible: 10, // default + max_wait_timeout: 30000, // default in ms + wait_for_preferred_nodes: true // default + } +} \ No newline at end of file diff --git a/framework/src/test/resources/sample-drill-rm-override.conf b/framework/src/test/resources/prod-rm.conf similarity index 100% rename from framework/src/test/resources/sample-drill-rm-override.conf rename to framework/src/test/resources/prod-rm.conf diff --git a/framework/testng-java.xml b/framework/testng-java.xml index 3890701ab..8c97ae543 100644 --- a/framework/testng-java.xml +++ b/framework/testng-java.xml @@ -4,6 +4,7 @@ + From 8f11f93c123449bf3ccea5aede86bed8b37c80b4 Mon Sep 17 00:00:00 2001 From: Abhishek Ravi Date: Wed, 24 Apr 2019 14:29:17 -0700 Subject: [PATCH 2/7] Add support for complex selector - Added support for complex selectors. - Added unit test to check if is correctly mapped to a bean. --- .../drill/test/framework/DrillRMConfig.java | 23 +++++++++++++++++-- .../DrillTestFrameworkUnitTests.java | 17 ++++++++++++++ framework/src/test/resources/basic-rm.conf | 16 +++++++++---- framework/src/test/resources/default-rm.conf | 2 +- 4 files changed, 51 insertions(+), 7 deletions(-) diff --git a/framework/src/main/java/org/apache/drill/test/framework/DrillRMConfig.java b/framework/src/main/java/org/apache/drill/test/framework/DrillRMConfig.java index f3ba7b29c..e4102970e 100644 --- a/framework/src/main/java/org/apache/drill/test/framework/DrillRMConfig.java +++ b/framework/src/main/java/org/apache/drill/test/framework/DrillRMConfig.java @@ -36,6 +36,9 @@ public class DrillRMConfig implements DrillConfigRenderer { //Selector Configurations public static final String SELECTOR_TAG_KEY = "tag"; public static final String SELECTOR_ACL_KEY = "acl"; + public static final String SELECTOR_OR_KEY = "or"; + public static final String SELECTOR_AND_KEY = "and"; + public static final String SELECTOR_NOT_EQUAL_KEY = "not_equal"; //ACL Configurations public static final String ACL_USERS_KEY = "users"; @@ -74,8 +77,10 @@ public class DrillRMConfig implements DrillConfigRenderer { public static class SelectorConfig implements DrillConfigRenderer { public String tag; - public AclConfig acl; + public SelectorConfig not_equal; + public List or; + public List and; @Override public String render() { @@ -98,6 +103,21 @@ public String render(final int acc) { sb.append(formatConfig(nextAcc, SELECTOR_ACL_KEY, acl)); } + if (not_equal != null) { + ensureAtleastOneField = true; + sb.append(formatConfig(nextAcc, SELECTOR_NOT_EQUAL_KEY, not_equal)); + } + + if (or != null) { + ensureAtleastOneField = true; + sb.append(formatConfig(nextAcc, SELECTOR_OR_KEY, or)); + } + + if (and != null) { + ensureAtleastOneField = true; + sb.append(formatConfig(nextAcc, SELECTOR_AND_KEY, and)); + } + if(ensureAtleastOneField) { sb.deleteCharAt(sb.length() - 1) .deleteCharAt(sb.length() - 1) @@ -119,7 +139,6 @@ public String render(final int acc) { public static class AclConfig implements DrillConfigRenderer { public List users; - public List groups; @Override diff --git a/framework/src/test/java/org/apache/drill/test/framework/DrillTestFrameworkUnitTests.java b/framework/src/test/java/org/apache/drill/test/framework/DrillTestFrameworkUnitTests.java index 7897a5f51..c1552419e 100644 --- a/framework/src/test/java/org/apache/drill/test/framework/DrillTestFrameworkUnitTests.java +++ b/framework/src/test/java/org/apache/drill/test/framework/DrillTestFrameworkUnitTests.java @@ -21,6 +21,7 @@ import java.util.Properties; import static org.apache.drill.test.framework.DrillTestDefaults.DRILL_EXEC_RM_CONFIG_KEY; +import static org.apache.drill.test.framework.common.DrillTestNGDefaults.BASIC_RM_CONFIG_FILEPATH; import static org.apache.drill.test.framework.common.DrillTestNGDefaults.UNIT_GROUP; import static org.apache.drill.test.framework.common.DrillTestNGDefaults.PROD_RM_CONFIG_FILEPATH; @@ -96,6 +97,22 @@ public void testReadSampleRMConfigFile() { } } + /** + * Test reading a sample RM config file, with complex selectors, in to a Java Bean. + */ + @Test(groups = UNIT_GROUP) + public void testReadComplexSelectorsRMConfigFile() { + try { + DrillRMConfig drillRMConfig = DrillRMConfig.load(BASIC_RM_CONFIG_FILEPATH); + Assert.assertEquals(drillRMConfig.childPools.get(1).selector.or.size(), 2, + "Or selector should have had 2 children!"); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + /** * Negative test to validate the behavior when the config file does not exist. */ diff --git a/framework/src/test/resources/basic-rm.conf b/framework/src/test/resources/basic-rm.conf index 36653e0f9..935f27782 100644 --- a/framework/src/test/resources/basic-rm.conf +++ b/framework/src/test/resources/basic-rm.conf @@ -21,10 +21,18 @@ drill.exec.rm: { pool_name: "TestPool", memory: 0.5, selector: { - acl: { - users: ["bob"], - group: ["test"] - } + or: [ + { + acl: { + users: ["bob"] + } + }, + { + acl: { + groups: ["test"] + } + } + ] }, queue: { max_query_memory_per_node: 2147483648 // supported format regex [0-9]*[kKmMgG]? diff --git a/framework/src/test/resources/default-rm.conf b/framework/src/test/resources/default-rm.conf index 616cf7674..e1c9aa383 100644 --- a/framework/src/test/resources/default-rm.conf +++ b/framework/src/test/resources/default-rm.conf @@ -3,7 +3,7 @@ drill.exec.rm: { memory: 1.0, // 90% of total direct memory allocated to Drill queue_selection_policy: "bestfit", // policy to select queue for a query when multiple queues are eligible selector: { - tag: "test", + tag: "test" }, queue: { max_query_memory_per_node: 1073741824 // supported format regex [0-9]*[kKmMgG]? From 6b07e4ad6eec56c4c1b1d82a1fc348992b383eea Mon Sep 17 00:00:00 2001 From: Abhishek Ravi Date: Thu, 25 Apr 2019 10:07:26 -0700 Subject: [PATCH 3/7] Add a run script to run tests --- bin/run_testng | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100755 bin/run_testng diff --git a/bin/run_testng b/bin/run_testng new file mode 100755 index 000000000..2e1e9477b --- /dev/null +++ b/bin/run_testng @@ -0,0 +1,11 @@ +#export MAVEN_OPTS=$MAVEN_OPTS" -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006" +bin=`dirname "${BASH_SOURCE-$0}"` +bin=`cd "$bin">/dev/null; pwd` +cd ${bin}/../framework +source ../conf/drillTestConfig.properties +if [[ $1 == "-Dtest="* ]]; then + mvn test $* +else + echo "Running the entire Suite" + mvn test -Ptestng $* +fi From 15a5fbdf41c0c0fc6150116969a0187255766501 Mon Sep 17 00:00:00 2001 From: Abhishek Ravi Date: Thu, 25 Apr 2019 10:12:31 -0700 Subject: [PATCH 4/7] Add help to run script --- bin/run_testng | 2 ++ 1 file changed, 2 insertions(+) diff --git a/bin/run_testng b/bin/run_testng index 2e1e9477b..8a742dcdb 100755 --- a/bin/run_testng +++ b/bin/run_testng @@ -5,6 +5,8 @@ cd ${bin}/../framework source ../conf/drillTestConfig.properties if [[ $1 == "-Dtest="* ]]; then mvn test $* +elif [[ $1 == "-h" || $1 == "-help" ]]; then + echo "bin/runtestng [-Dtest=[#]]" else echo "Running the entire Suite" mvn test -Ptestng $* From 0d509c9fd134f159fad516457d3226bd54ae9203 Mon Sep 17 00:00:00 2001 From: Abhishek Ravi Date: Thu, 25 Apr 2019 10:19:15 -0700 Subject: [PATCH 5/7] Minor cosmetic change --- .../org/apache/drill/test/framework/DrillQueryProfile.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/framework/src/main/java/org/apache/drill/test/framework/DrillQueryProfile.java b/framework/src/main/java/org/apache/drill/test/framework/DrillQueryProfile.java index d90e06ee9..4815b1036 100644 --- a/framework/src/main/java/org/apache/drill/test/framework/DrillQueryProfile.java +++ b/framework/src/main/java/org/apache/drill/test/framework/DrillQueryProfile.java @@ -227,8 +227,10 @@ public long getTotalOptimalMemoryPerOperator(final UserBitShared.CoreOperatorTyp * @return */ public long getOptimalMemoryPerOperatorPerNode(final UserBitShared.CoreOperatorType operator) { - return this.fragmentProfiles.stream().flatMap(f -> f.minorFragmentProfiles - .stream()) + return this.fragmentProfiles + .stream() + .flatMap(f -> f.minorFragmentProfiles + .stream()) .collect(Collectors.groupingBy(m -> m.endpoint.address)) .entrySet() .stream() From 483ee23527638998dfeb8e98c15c5dafe3094ff7 Mon Sep 17 00:00:00 2001 From: Abhishek Ravi Date: Thu, 25 Apr 2019 10:20:39 -0700 Subject: [PATCH 6/7] More minor cosmetic changes --- .../drill/test/framework/DrillQueryProfile.java | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/framework/src/main/java/org/apache/drill/test/framework/DrillQueryProfile.java b/framework/src/main/java/org/apache/drill/test/framework/DrillQueryProfile.java index 4815b1036..70947dec1 100644 --- a/framework/src/main/java/org/apache/drill/test/framework/DrillQueryProfile.java +++ b/framework/src/main/java/org/apache/drill/test/framework/DrillQueryProfile.java @@ -234,15 +234,14 @@ public long getOptimalMemoryPerOperatorPerNode(final UserBitShared.CoreOperatorT .collect(Collectors.groupingBy(m -> m.endpoint.address)) .entrySet() .stream() - .collect(Collectors - .toMap(Map.Entry::getKey, - e -> e.getValue() - .stream() - .flatMap(m -> m.operatorProfiles - .stream() - .filter(o -> o.operatorId == operator.getNumber())) - .mapToLong(o -> o.optimalMemAllocation) - .sum())) + .collect(Collectors.toMap(Map.Entry::getKey, + e -> e.getValue() + .stream() + .flatMap(m -> m.operatorProfiles + .stream() + .filter(o -> o.operatorId == operator.getNumber())) + .mapToLong(o -> o.optimalMemAllocation) + .sum())) .entrySet() .stream() .mapToLong(Map.Entry::getValue) From a566d6a4e9450df81283e038c23235c5e51b8bb0 Mon Sep 17 00:00:00 2001 From: Abhishek Ravi Date: Thu, 2 May 2019 12:02:30 -0700 Subject: [PATCH 7/7] Add more tests for Planner and Selector --- .../AdvancedSelectionTests.java | 143 ++++++++++ ...ionTests.java => BasicSelectionTests.java} | 59 +++- .../CustomSelectionTests.java | 105 +++++++ .../resourcemanagement/QueuePlannerTests.java | 266 ++++++++++++++++++ framework/src/test/resources/basic-rm.conf | 1 + framework/src/test/resources/prod-rm.conf | 10 +- framework/testng-java.xml | 4 +- 7 files changed, 580 insertions(+), 8 deletions(-) create mode 100644 framework/src/test/java/org/apache/drill/test/framework/resourcemanagement/AdvancedSelectionTests.java rename framework/src/test/java/org/apache/drill/test/framework/resourcemanagement/{QueueSelectionTests.java => BasicSelectionTests.java} (83%) create mode 100644 framework/src/test/java/org/apache/drill/test/framework/resourcemanagement/CustomSelectionTests.java diff --git a/framework/src/test/java/org/apache/drill/test/framework/resourcemanagement/AdvancedSelectionTests.java b/framework/src/test/java/org/apache/drill/test/framework/resourcemanagement/AdvancedSelectionTests.java new file mode 100644 index 000000000..f73cb4f0d --- /dev/null +++ b/framework/src/test/java/org/apache/drill/test/framework/resourcemanagement/AdvancedSelectionTests.java @@ -0,0 +1,143 @@ +package org.apache.drill.test.framework.resourcemanagement; + +import com.google.common.base.Preconditions; +import org.apache.drill.test.framework.*; +import org.apache.drill.test.framework.common.DrillJavaTestBase; +import org.apache.drill.test.framework.common.DrillTestNGDefaults; +import org.apache.log4j.Logger; +import org.testng.Assert; +import org.testng.annotations.*; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.Properties; + +import static org.apache.drill.test.framework.DrillTestDefaults.DRILL_HOME; +import static org.apache.drill.test.framework.DrillTestDefaults.DRILL_RM_OVERRIDE_CONF_FILENAME; +import static org.apache.drill.test.framework.common.DrillTestNGDefaults.*; + +@SuppressWarnings("Duplicates") +@Test(groups = FUNCTIONAL_GROUP) +public class AdvancedSelectionTests extends DrillJavaTestBase { + private static final Logger LOG = Logger.getLogger(AdvancedSelectionTests.class); + + @BeforeClass(alwaysRun = true, description = "Invoked before all tests in the class") + private void setup() throws IOException { + cleanup(false); + DrillRMConfig config = DrillRMConfig.load(PROD_RM_CONFIG_FILEPATH); + Utils.applyRMConfigToDrillCluster(config, drillCluster); + Utils.restartDrillbits(drillCluster); + } + + @AfterClass(alwaysRun = true, description = "Invoked after all tests in the class are executed") + private void cleanup() { + cleanup(true); + } + + private void cleanup(final boolean restart) { + Preconditions.checkNotNull(connectionPool, + "Cleanup failed! Connection pool has not be instantiated"); + Preconditions.checkNotNull(drillCluster, + "Cleanup failed! Drill cluster information is unavailable"); + drillCluster.runCommand("rm -rf " + DRILL_HOME + "/conf/" + DRILL_RM_OVERRIDE_CONF_FILENAME); + if(restart) { + Utils.restartDrillbits(drillCluster); + } + } + + /** + * Test to validate that the query is not admitted since the specified user is restricted into the queue. + * + * @throws IOException + */ + @Test(groups = FUNCTIONAL_GROUP) + public void testRestrictUserShortNotation() throws IOException { + final String query = "SELECT o_orderkey " + + "FROM orders " + + "ORDER BY o_orderkey " + + "DESC limit 1"; + final Properties props = Utils.createConnectionProperties(TPCH_01_PARQUET_SCHEMA, + null, null); //NO Query Tags + + try(Connection conn = ConnectionPool + .createConnection(DrillTestNGDefaults.CONNECTION_URL_FOR_DRILLBIT(drillCluster.getHosts().get(0)), + "jane", + null, + props); + Statement stmt = conn.createStatement(); + ResultSet res = stmt.executeQuery(query)) { + final String queryId = Utils.getQueryID(res); //Get query id + LOG.info("Query ID: " + queryId + ", Query: " + query); + + Utils.getQueryProfile(queryId); //Get query profile + Assert.fail("Did not receive expected exception: " + NO_RESOURCE_POOL_ERROR); + } catch (Exception e) { + if (e.getMessage().contains(NO_RESOURCE_POOL_ERROR)) { + LOG.info("Received expected exception: " + e.getMessage()); + } else { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + } + + /** + * Test validates that a hierarchical resource pool notation configuration is read correctly. + * It also validates that the right leaf queue is picked based on Acl and Tag configuration. + * + * @throws IOException + */ + @Test(groups = FUNCTIONAL_GROUP) + public void testAclComplexUserAndTag() throws IOException { + final String query = "SELECT o_orderkey " + + "FROM orders " + + "ORDER BY o_orderkey " + + "DESC limit 1"; + + //Set expectations + final long expectedOrderId = 60000; + final String expectedPoolName = "ExperimentalQueryResourcePool"; + final int expectedRowCount = 1; + final String expectedUser = "alice"; + final String queryTags = "experimental"; + + //Build a connection with only schema + final Properties props = Utils.createConnectionProperties( + TPCH_01_PARQUET_SCHEMA, null, queryTags); + + try(Connection conn = ConnectionPool + .createConnection( + DrillTestNGDefaults.CONNECTION_URL_FOR_DRILLBIT(drillCluster.getHosts().get(0)), + expectedUser, //Provide username for the connection + null, + props); + + Statement stmt = conn.createStatement(); + ResultSet res = stmt.executeQuery(query)) { + final String queryId = Utils.getQueryID(res); //Get query id + LOG.info("Query ID: " + queryId + ", Query: " + query); + + final DrillQueryProfile queryProfile = Utils.getQueryProfile(queryId); //Get query profile + + //Validate that the query was allowed into the queue + Assert.assertEquals(queryProfile.queueName, expectedPoolName, "The pool names do not match!"); + LOG.info("QueryID: " + queryId + ", Queue: " + queryProfile.queueName); + + long rowCount = 0; + while(res.next()) { + rowCount++; + Assert.assertEquals(res.getLong("o_orderkey"), expectedOrderId, + "OrderId expected did not match"); + + } + Assert.assertEquals(rowCount, expectedRowCount, "Number of rows returned did not match!"); + Assert.assertEquals(queryProfile.user, expectedUser); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } +} + diff --git a/framework/src/test/java/org/apache/drill/test/framework/resourcemanagement/QueueSelectionTests.java b/framework/src/test/java/org/apache/drill/test/framework/resourcemanagement/BasicSelectionTests.java similarity index 83% rename from framework/src/test/java/org/apache/drill/test/framework/resourcemanagement/QueueSelectionTests.java rename to framework/src/test/java/org/apache/drill/test/framework/resourcemanagement/BasicSelectionTests.java index 79245e8bf..6e41167f0 100644 --- a/framework/src/test/java/org/apache/drill/test/framework/resourcemanagement/QueueSelectionTests.java +++ b/framework/src/test/java/org/apache/drill/test/framework/resourcemanagement/BasicSelectionTests.java @@ -20,8 +20,8 @@ @SuppressWarnings("Duplicates") @Test(groups = FUNCTIONAL_GROUP) -public class QueueSelectionTests extends DrillJavaTestBase { - private static final Logger LOG = Logger.getLogger(QueueSelectionTests.class); +public class BasicSelectionTests extends DrillJavaTestBase { + private static final Logger LOG = Logger.getLogger(BasicSelectionTests.class); @BeforeClass(alwaysRun = true, description = "Invoked before all tests in the class") private void setup() throws IOException { @@ -286,4 +286,59 @@ public void testUnknownTagDoesNotAllowQuery() { } } } + + /** + * Test validates that user in Acl is evaluated and the right queue is picked based on the Acl. + * + * @throws IOException + */ + @Test(groups = FUNCTIONAL_GROUP) + public void testAclComplexUserOr() throws IOException { + final String query = "SELECT o_orderkey " + + "FROM orders " + + "ORDER BY o_orderkey " + + "DESC limit 1"; + + //Set expectations + final long expectedOrderId = 60000; + final String expectedPoolName = "TestPool"; + final int expectedRowCount = 1; + final String expectedUser = "alice"; + + //Build a connection with only schema + final Properties props = Utils.createConnectionProperties( + TPCH_01_PARQUET_SCHEMA, null, null); + + try(Connection conn = ConnectionPool + .createConnection( + DrillTestNGDefaults.CONNECTION_URL_FOR_DRILLBIT(drillCluster.getHosts().get(0)), + expectedUser, //Provide username for the connection + null, + props); + + Statement stmt = conn.createStatement(); + ResultSet res = stmt.executeQuery(query)) { + final String queryId = Utils.getQueryID(res); //Get query id + LOG.info("Query ID: " + queryId + ", Query: " + query); + + final DrillQueryProfile queryProfile = Utils.getQueryProfile(queryId); //Get query profile + + //Validate that the query was allowed into the queue + Assert.assertEquals(queryProfile.queueName, expectedPoolName, "The pool names do not match!"); + LOG.info("QueryID: " + queryId + ", Queue: " + queryProfile.queueName); + + long rowCount = 0; + while(res.next()) { + rowCount++; + Assert.assertEquals(res.getLong("o_orderkey"), expectedOrderId, + "OrderId expected did not match"); + + } + Assert.assertEquals(rowCount, expectedRowCount, "Number of rows returned did not match!"); + Assert.assertEquals(queryProfile.user, expectedUser); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } } diff --git a/framework/src/test/java/org/apache/drill/test/framework/resourcemanagement/CustomSelectionTests.java b/framework/src/test/java/org/apache/drill/test/framework/resourcemanagement/CustomSelectionTests.java new file mode 100644 index 000000000..ab35e7d03 --- /dev/null +++ b/framework/src/test/java/org/apache/drill/test/framework/resourcemanagement/CustomSelectionTests.java @@ -0,0 +1,105 @@ +package org.apache.drill.test.framework.resourcemanagement; + +import com.google.common.base.Preconditions; +import org.apache.drill.test.framework.ConnectionPool; +import org.apache.drill.test.framework.DrillQueryProfile; +import org.apache.drill.test.framework.DrillRMConfig; +import org.apache.drill.test.framework.Utils; +import org.apache.drill.test.framework.common.DrillJavaTestBase; +import org.apache.drill.test.framework.common.DrillTestNGDefaults; +import org.apache.log4j.Logger; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Properties; + +import static org.apache.drill.test.framework.DrillTestDefaults.DRILL_HOME; +import static org.apache.drill.test.framework.DrillTestDefaults.DRILL_RM_OVERRIDE_CONF_FILENAME; +import static org.apache.drill.test.framework.common.DrillTestNGDefaults.DEFAULT_RM_CONFIG_FILEPATH; +import static org.apache.drill.test.framework.common.DrillTestNGDefaults.FUNCTIONAL_GROUP; +import static org.apache.drill.test.framework.common.DrillTestNGDefaults.TPCH_01_PARQUET_SCHEMA; + +@Test(groups = FUNCTIONAL_GROUP) +public class CustomSelectionTests extends DrillJavaTestBase { + private static final Logger LOG = Logger.getLogger(CustomSelectionTests.class); + + @BeforeClass(alwaysRun = true, description = "Invoked before all tests in the class") + private void setup() throws IOException { + cleanup(true); + } + + @AfterMethod(alwaysRun = true, description = "Invoked after all tests in the class are executed") + private void cleanup() { + cleanup(true); + } + + private void cleanup(final boolean restart) { + Preconditions.checkNotNull(connectionPool, + "Cleanup failed! Connection pool has not be instantiated"); + Preconditions.checkNotNull(drillCluster, + "Cleanup failed! Drill cluster information is unavailable"); + drillCluster.runCommand("rm -rf " + DRILL_HOME + "/conf/" + DRILL_RM_OVERRIDE_CONF_FILENAME); + if(restart) { + Utils.restartDrillbits(drillCluster); + } + } + + /** + * Test validates that a "*" for "users" will allow all users to run query. + * + * @throws IOException + */ + @Test(groups = FUNCTIONAL_GROUP) + public void testStarToAllowUser() throws IOException { + final String query = "SELECT o_orderkey, o_totalprice, o_comment FROM orders"; + final String expectedPoolName = "root"; + final String expectedUserName = "anonymous"; + + DrillRMConfig config = DrillRMConfig.load(DEFAULT_RM_CONFIG_FILEPATH); + config.selector = new DrillRMConfig.SelectorConfig(); + config.selector.acl = new DrillRMConfig.AclConfig(); + config.selector.acl.users = new ArrayList<>(); + config.selector.acl.users.add("*"); + + LOG.info("drill.exec.rm: " + config.render()); + + Utils.applyRMConfigToDrillCluster(config, drillCluster); + Utils.restartDrillbits(drillCluster); + + final Properties props = Utils.createConnectionProperties( + TPCH_01_PARQUET_SCHEMA, null, null); + + try(Connection conn = ConnectionPool + .createConnection(DrillTestNGDefaults.CONNECTION_URL_FOR_DRILLBIT( + drillCluster.getHosts().get(0)), + expectedUserName, + null, + props); //Create a connection based on hostname and properties + + Statement stmt = conn.createStatement(); + ResultSet res = stmt.executeQuery(query)) { + final String queryId = Utils.getQueryID(res); //Get query id + LOG.info("QueryID: " + queryId + ", Query: " + query); + + final DrillQueryProfile queryProfile = Utils.getQueryProfile(queryId); //Get query profile + LOG.info("QueryID: " + queryId + ", QueueName: " + queryProfile.queueName + ", User: " + queryProfile.user); + + //Validate that the query was allowed into the queue + Assert.assertEquals(queryProfile.queueName, expectedPoolName, + "QueryID: " + queryId + " - The pool names do not match!"); + + Assert.assertEquals(queryProfile.user, expectedUserName, + "QueryID: " + queryId + " - The user names do not match!"); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } +} diff --git a/framework/src/test/java/org/apache/drill/test/framework/resourcemanagement/QueuePlannerTests.java b/framework/src/test/java/org/apache/drill/test/framework/resourcemanagement/QueuePlannerTests.java index 62156d9c6..e449afe6f 100644 --- a/framework/src/test/java/org/apache/drill/test/framework/resourcemanagement/QueuePlannerTests.java +++ b/framework/src/test/java/org/apache/drill/test/framework/resourcemanagement/QueuePlannerTests.java @@ -137,4 +137,270 @@ public void testLargerThanOptimalProject() throws IOException { Assert.fail(e.getMessage()); } } + + /** + * Test validates that if max_query_memory_per_node is set to a value equal to + * optimal memory estimated by RM planner, the query runs to completion in the queue. + * + * Test also validates that the estimation for optimal memory required does NOT change. + * + * FIXME: Currently the test fails, bug in RM Planning, enable once it is resolved. + * + * @throws IOException + */ + @Test(groups = FUNCTIONAL_GROUP, enabled = false) + public void testEqualToOptimalProject() throws IOException { + final String query = "SELECT o_orderkey, o_totalprice, o_comment FROM orders"; + final String queryTag = "test"; //Set tag such that both dev and test are eligible + final String expectedPoolName = "root"; + long expectedOptimalMemoryPerNode = 0; //initialize + long expectedRowCount = 0; //initialize + + final Properties props = Utils.createConnectionProperties( + TPCH_01_PARQUET_SCHEMA, null, queryTag); + + try(Connection conn = ConnectionPool + .createConnection(DrillTestNGDefaults.CONNECTION_URL_FOR_DRILLBIT( + drillCluster.getHosts().get(0)), + null, + null, + props); //Create a connection based on hostname and properties + + Statement stmt = conn.createStatement(); + ResultSet res = stmt.executeQuery(query)) { + final String queryId = Utils.getQueryID(res); //Get query id + LOG.info("QueryID: " + queryId + ", Query: " + query); + + final DrillQueryProfile queryProfile = Utils.getQueryProfile(queryId); //Get query profile + expectedOptimalMemoryPerNode = queryProfile.getTotalOptimalMemoryPerNode(); + LOG.info("QueryID: " + queryId + ", Estimated total optimal memory per node: " + + expectedOptimalMemoryPerNode); + + DrillRMConfig config = DrillRMConfig.load(DEFAULT_RM_CONFIG_FILEPATH); + config.queue.maxQueryMemoryPerNode = (long)(queryProfile.getTotalOptimalMemoryPerNode()); + + LOG.info("Setting max_query_memory_per_node: " + config.queue.maxQueryMemoryPerNode); + + Utils.applyRMConfigToDrillCluster(config, drillCluster); + Utils.restartDrillbits(drillCluster); + + long rowCount = 0; + while(res.next()) { + rowCount++; + } + expectedRowCount = rowCount; + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + + try(Connection conn = ConnectionPool + .createConnection(DrillTestNGDefaults.CONNECTION_URL_FOR_DRILLBIT( + drillCluster.getHosts().get(0)), + null, + null, + props); //Create a connection based on hostname and properties + + Statement stmt = conn.createStatement(); + ResultSet res = stmt.executeQuery(query)) { + + final String queryId = Utils.getQueryID(res); //Get query id + LOG.info("QueryID: " + queryId + ", Query: " + query); + + final DrillQueryProfile queryProfile = Utils.getQueryProfile(queryId); //Get query profile + //Validate that the query was allowed into the queue + Assert.assertEquals(queryProfile.queueName, expectedPoolName, + "QueryID: " + queryId + " - The pool names do not match!"); + + Assert.assertEquals(queryProfile.getTotalOptimalMemoryPerNode(), + expectedOptimalMemoryPerNode, + "Optimal memory per node estimation did not match"); + + long rowCount = 0; + while(res.next()) { + rowCount++; + } + Assert.assertEquals(rowCount, expectedRowCount, "Number of rows returned did not match!"); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + /** + * Test validates that if max_query_memory_per_node is set to a value + * greater than optimal memory estimated by RM planner, + * the query runs to completion in the queue. + * + * Test also validates that the estimation for optimal memory required does NOT change. + * + * @throws IOException + */ + @Test(groups = FUNCTIONAL_GROUP) + public void testLargerThanOptimalOrderBy() throws IOException { + final String query = "SELECT o_orderkey, o_totalprice, o_comment FROM orders ORDER BY o_orderkey "; + final String queryTag = "test"; //Set tag such that both dev and test are eligible + final String expectedPoolName = "root"; + long expectedOptimalMemoryPerNode = 0; //initialize + long expectedRowCount = 0; //initialize + + final Properties props = Utils.createConnectionProperties( + TPCH_01_PARQUET_SCHEMA, null, queryTag); + + try(Connection conn = ConnectionPool + .createConnection(DrillTestNGDefaults.CONNECTION_URL_FOR_DRILLBIT( + drillCluster.getHosts().get(0)), + null, + null, + props); //Create a connection based on hostname and properties + + Statement stmt = conn.createStatement(); + ResultSet res = stmt.executeQuery(query)) { + final String queryId = Utils.getQueryID(res); //Get query id + LOG.info("QueryID: " + queryId + ", Query: " + query); + + final DrillQueryProfile queryProfile = Utils.getQueryProfile(queryId); //Get query profile + expectedOptimalMemoryPerNode = queryProfile.getTotalOptimalMemoryPerNode(); + LOG.info("QueryID: " + queryId + ", Estimated total optimal memory per node: " + + expectedOptimalMemoryPerNode); + + DrillRMConfig config = DrillRMConfig.load(DEFAULT_RM_CONFIG_FILEPATH); + config.queue.maxQueryMemoryPerNode = (long)(queryProfile.getTotalOptimalMemoryPerNode() * 1.2); + + LOG.info("Setting max_query_memory_per_node: " + config.queue.maxQueryMemoryPerNode); + + Utils.applyRMConfigToDrillCluster(config, drillCluster); + Utils.restartDrillbits(drillCluster); + + long rowCount = 0; + while(res.next()) { + rowCount++; + } + expectedRowCount = rowCount; + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + + try(Connection conn = ConnectionPool + .createConnection(DrillTestNGDefaults.CONNECTION_URL_FOR_DRILLBIT( + drillCluster.getHosts().get(0)), + null, + null, + props); //Create a connection based on hostname and properties + + Statement stmt = conn.createStatement(); + ResultSet res = stmt.executeQuery(query)) { + + final String queryId = Utils.getQueryID(res); //Get query id + LOG.info("QueryID: " + queryId + ", Query: " + query); + + final DrillQueryProfile queryProfile = Utils.getQueryProfile(queryId); //Get query profile + //Validate that the query was allowed into the queue + Assert.assertEquals(queryProfile.queueName, expectedPoolName, + "QueryID: " + queryId + " - The pool names do not match!"); + + Assert.assertEquals(queryProfile.getTotalOptimalMemoryPerNode(), + expectedOptimalMemoryPerNode, + "Optimal memory per node estimation did not match"); + + long rowCount = 0; + while(res.next()) { + rowCount++; + } + Assert.assertEquals(rowCount, expectedRowCount, "Number of rows returned did not match!"); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + /** + * Test validates that if max_query_memory_per_node is set to a value equal to + * optimal memory estimated by RM planner, the query runs to completion in the queue. + * + * Test also validates that the estimation for optimal memory required does NOT change. + * + * FIXME: Currently the test fails, bug in RM Planning, enable once it is resolved. + * + * @throws IOException + */ + @Test(groups = FUNCTIONAL_GROUP, enabled = false) + public void testEqualToOptimalOrderBy() throws IOException { + final String query = "SELECT o_orderkey, o_totalprice, o_comment FROM orders ORDER BY o_orderkey"; + final String queryTag = "test"; //Set tag such that both dev and test are eligible + final String expectedPoolName = "root"; + long expectedOptimalMemoryPerNode = 0; //initialize + long expectedRowCount = 0; //initialize + + final Properties props = Utils.createConnectionProperties( + TPCH_01_PARQUET_SCHEMA, null, queryTag); + + try(Connection conn = ConnectionPool + .createConnection(DrillTestNGDefaults.CONNECTION_URL_FOR_DRILLBIT( + drillCluster.getHosts().get(0)), + null, + null, + props); //Create a connection based on hostname and properties + + Statement stmt = conn.createStatement(); + ResultSet res = stmt.executeQuery(query)) { + final String queryId = Utils.getQueryID(res); //Get query id + LOG.info("QueryID: " + queryId + ", Query: " + query); + + final DrillQueryProfile queryProfile = Utils.getQueryProfile(queryId); //Get query profile + expectedOptimalMemoryPerNode = queryProfile.getTotalOptimalMemoryPerNode(); + LOG.info("QueryID: " + queryId + ", Estimated total optimal memory per node: " + + expectedOptimalMemoryPerNode); + + DrillRMConfig config = DrillRMConfig.load(DEFAULT_RM_CONFIG_FILEPATH); + config.queue.maxQueryMemoryPerNode = (long)(queryProfile.getTotalOptimalMemoryPerNode()); + + LOG.info("Setting max_query_memory_per_node: " + config.queue.maxQueryMemoryPerNode); + + Utils.applyRMConfigToDrillCluster(config, drillCluster); + Utils.restartDrillbits(drillCluster); + + long rowCount = 0; + while(res.next()) { + rowCount++; + } + expectedRowCount = rowCount; + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + + try(Connection conn = ConnectionPool + .createConnection(DrillTestNGDefaults.CONNECTION_URL_FOR_DRILLBIT( + drillCluster.getHosts().get(0)), + null, + null, + props); //Create a connection based on hostname and properties + + Statement stmt = conn.createStatement(); + ResultSet res = stmt.executeQuery(query)) { + + final String queryId = Utils.getQueryID(res); //Get query id + LOG.info("QueryID: " + queryId + ", Query: " + query); + + final DrillQueryProfile queryProfile = Utils.getQueryProfile(queryId); //Get query profile + //Validate that the query was allowed into the queue + Assert.assertEquals(queryProfile.queueName, expectedPoolName, + "QueryID: " + queryId + " - The pool names do not match!"); + + Assert.assertEquals(queryProfile.getTotalOptimalMemoryPerNode(), + expectedOptimalMemoryPerNode, + "Optimal memory per node estimation did not match"); + + long rowCount = 0; + while(res.next()) { + rowCount++; + } + Assert.assertEquals(rowCount, expectedRowCount, "Number of rows returned did not match!"); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } } diff --git a/framework/src/test/resources/basic-rm.conf b/framework/src/test/resources/basic-rm.conf index 935f27782..1450003cf 100644 --- a/framework/src/test/resources/basic-rm.conf +++ b/framework/src/test/resources/basic-rm.conf @@ -29,6 +29,7 @@ drill.exec.rm: { }, { acl: { + users: ["alice"], groups: ["test"] } } diff --git a/framework/src/test/resources/prod-rm.conf b/framework/src/test/resources/prod-rm.conf index 6af836a15..4eae8a851 100644 --- a/framework/src/test/resources/prod-rm.conf +++ b/framework/src/test/resources/prod-rm.conf @@ -9,7 +9,7 @@ drill.exec.rm:{ selector:{ acl:{ groups:["dev","test","sales:-"] - users:["alice","john"] + users:["alice","john","jane:-"] } }, child_pools:[ @@ -20,7 +20,7 @@ drill.exec.rm:{ tag:"small" }, queue:{ - max_query_memory_per_node:6881, + max_query_memory_per_node:1073741824, max_waiting:5, max_wait_timeout:2000 } @@ -32,7 +32,7 @@ drill.exec.rm:{ tag:"large" }, queue:{ - max_query_memory_per_node:5505, + max_query_memory_per_node:2147483648, max_waiting:2 } }, @@ -43,7 +43,7 @@ drill.exec.rm:{ tag :"experimental" }, queue:{ - max_query_memory_per_node:4587, + max_query_memory_per_node:1073741824, max_waiting:4 } } @@ -59,7 +59,7 @@ drill.exec.rm:{ } }, queue:{ - max_query_memory_per_node:4587, + max_query_memory_per_node:2147483648, max_waiting:10, wait_for_preferred_nodes:false } diff --git a/framework/testng-java.xml b/framework/testng-java.xml index 8c97ae543..03d411d29 100644 --- a/framework/testng-java.xml +++ b/framework/testng-java.xml @@ -3,7 +3,9 @@ - + + +