Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ void initPropertiesAndConfigs() {
// Injection of equality testing code of logical plan (de)serialization
conf.setClassVar(ConfVars.LOGICAL_PLAN_REWRITE_RULE_PROVIDER_CLASS, LogicalPlanTestRuleProvider.class);
conf.setClassVar(ConfVars.GLOBAL_PLAN_REWRITE_RULE_PROVIDER_CLASS, GlobalPlanTestRuleProvider.class);
conf.setLongVar(ConfVars.$DIST_QUERY_BROADCAST_CROSS_JOIN_THRESHOLD, 1024 * 1024); // 1GB
conf.setLongVar(ConfVars.$DIST_QUERY_BROADCAST_CROSS_JOIN_LIMIT, 1024 * 1024); // 1GB

conf.setInt(ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES.varname, 4);
conf.setInt(ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB.varname, 2000);
Expand Down
16 changes: 8 additions & 8 deletions tajo-common/src/main/java/org/apache/tajo/SessionVars.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,10 @@ public enum SessionVars implements ConfigKey {
Validators.bool()),

// for distributed query strategies
BROADCAST_NON_CROSS_JOIN_THRESHOLD(ConfVars.$DIST_QUERY_BROADCAST_NON_CROSS_JOIN_THRESHOLD,
BROADCAST_NON_CROSS_JOIN_THRESHOLD(ConfVars.$DIST_QUERY_BROADCAST_NON_CROSS_JOIN_LIMIT,
"restriction for the total size of broadcasted table for non-cross join (kb)", DEFAULT, Long.class,
Validators.min("0")),
BROADCAST_CROSS_JOIN_THRESHOLD(ConfVars.$DIST_QUERY_BROADCAST_CROSS_JOIN_THRESHOLD,
BROADCAST_CROSS_JOIN_THRESHOLD(ConfVars.$DIST_QUERY_BROADCAST_CROSS_JOIN_LIMIT,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name of the above two session variables also need to be changed.

"restriction for the total size of broadcasted table for cross join (kb)", DEFAULT, Long.class,
Validators.min("0")),

Expand Down Expand Up @@ -125,13 +125,13 @@ public enum SessionVars implements ConfigKey {
Integer.class, Validators.min("0")),
HASH_SHUFFLE_BUFFER_SIZE(ConfVars.$EXECUTOR_HASH_SHUFFLE_BUFFER_SIZE, "hash-shuffle buffer size for local disk I/O (mb)"
, DEFAULT, Integer.class, Validators.min("1")),
HASH_JOIN_SIZE_LIMIT(ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD, "limited size for hash join (mb)", DEFAULT,
HASH_JOIN_SIZE_LIMIT(ConfVars.$EXECUTOR_HASH_JOIN_SIZE_LIMIT, "limited size for hash join (mb)", DEFAULT,
Long.class, Validators.min("0")),
INNER_HASH_JOIN_SIZE_LIMIT(ConfVars.$EXECUTOR_INNER_HASH_JOIN_SIZE_THRESHOLD,
INNER_HASH_JOIN_SIZE_LIMIT(ConfVars.$EXECUTOR_INNER_HASH_JOIN_SIZE_LIMIT,
"limited size for hash inner join (mb)", DEFAULT, Long.class, Validators.min("0")),
OUTER_HASH_JOIN_SIZE_LIMIT(ConfVars.$EXECUTOR_OUTER_HASH_JOIN_SIZE_THRESHOLD, "limited size for hash outer join (mb)",
OUTER_HASH_JOIN_SIZE_LIMIT(ConfVars.$EXECUTOR_OUTER_HASH_JOIN_SIZE_LIMIT, "limited size for hash outer join (mb)",
DEFAULT, Long.class, Validators.min("0")),
HASH_GROUPBY_SIZE_LIMIT(ConfVars.$EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD, "limited size for hash groupby (mb)",
HASH_GROUPBY_SIZE_LIMIT(ConfVars.$EXECUTOR_GROUPBY_INMEMORY_HASH_LIMIT, "limited size for hash groupby (mb)",
DEFAULT, Long.class, Validators.min("0")),
MAX_OUTPUT_FILE_SIZE(ConfVars.$MAX_OUTPUT_FILE_SIZE, "Maximum per-output file size (mb). 0 means infinite.", DEFAULT,
Long.class, Validators.min("0")),
Expand All @@ -145,7 +145,7 @@ public enum SessionVars implements ConfigKey {

// for index
INDEX_ENABLED(ConfVars.$INDEX_ENABLED, "index scan enabled", DEFAULT),
INDEX_SELECTIVITY_THRESHOLD(ConfVars.$INDEX_SELECTIVITY_THRESHOLD, "the selectivity threshold for index scan",
INDEX_SELECTIVITY_LIMIT(ConfVars.$INDEX_SELECTIVITY_LIMIT, "the selectivity threshold for index scan",
DEFAULT),

// for partition overwrite
Expand Down Expand Up @@ -176,7 +176,7 @@ public enum SessionVars implements ConfigKey {
TEST_FILTER_PUSHDOWN_ENABLED(ConfVars.$TEST_FILTER_PUSHDOWN_ENABLED, "filter push down enabled", TEST_VAR),
TEST_MIN_TASK_NUM(ConfVars.$TEST_MIN_TASK_NUM, "(test only) min task num", TEST_VAR),
TEST_PLAN_SHAPE_FIX_ENABLED(ConfVars.$TEST_PLAN_SHAPE_FIX_ENABLED, "(test only) plan shape fix enabled", TEST_VAR),
TEST_TIM_SORT_THRESHOLD_FOR_RADIX_SORT(ConfVars.$TEST_TIM_SORT_THRESHOLD_FOR_RADIX_SORT, "(test only) Tim sort threshold for radix sort", TEST_VAR)
TEST_TIM_SORT_LIMIT_FOR_RADIX_SORT(ConfVars.$TEST_TIM_SORT_LIMIT_FOR_RADIX_SORT, "(test only) Tim sort threshold for radix sort", TEST_VAR)
;

public static final Map<String, SessionVars> SESSION_VARS = Maps.newHashMap();
Expand Down
18 changes: 9 additions & 9 deletions tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public static enum ConfVars implements ConfigKey {
WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM("tajo.worker.resource.disk.parallel-execution.num", 2,
Validators.min("1")),

WORKER_HEARTBEAT_QUEUE_THRESHOLD_RATE("tajo.worker.heartbeat.queue.threshold-rate", 0.1f, Validators.min("0")),//10%
WORKER_HEARTBEAT_QUEUE_LIMIT_RATE("tajo.worker.heartbeat.queue.limit-rate", 0.1f, Validators.min("0")),//10%
WORKER_HEARTBEAT_IDLE_INTERVAL("tajo.worker.heartbeat.idle.interval", 10 * 1000), // 10 sec
WORKER_HEARTBEAT_ACTIVE_INTERVAL("tajo.worker.heartbeat.active.interval", 1000), // 1 sec

Expand Down Expand Up @@ -333,9 +333,9 @@ public static enum ConfVars implements ConfigKey {


// for distributed query strategies
$DIST_QUERY_BROADCAST_NON_CROSS_JOIN_THRESHOLD("tajo.dist-query.broadcast.non-cross-join.threshold-kb", 5 * 1024l,
$DIST_QUERY_BROADCAST_NON_CROSS_JOIN_LIMIT("tajo.dist-query.broadcast.non-cross-join.limit-kb", 5 * 1024l,
Validators.min("0")), // 5 MB
$DIST_QUERY_BROADCAST_CROSS_JOIN_THRESHOLD("tajo.dist-query.broadcast.cross-join.threshold-kb", 1 * 1024l,
$DIST_QUERY_BROADCAST_CROSS_JOIN_LIMIT("tajo.dist-query.broadcast.cross-join.limit-kb", 1 * 1024l,
Validators.min("0")), // 1 MB

$DIST_QUERY_JOIN_TASK_VOLUME("tajo.dist-query.join.task-volume-mb", 64),
Expand All @@ -351,12 +351,12 @@ public static enum ConfVars implements ConfigKey {

// for physical Executors
$EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE("tajo.executor.external-sort.buffer-mb", 200),
$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.common.in-memory-hash-threshold-mb", 64l, Validators.min("0")),
$EXECUTOR_INNER_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.inner.in-memory-hash-threshold-mb", 64l,
$EXECUTOR_HASH_JOIN_SIZE_LIMIT("tajo.executor.join.common.in-memory-hash-limit-mb", 64l, Validators.min("0")),
$EXECUTOR_INNER_HASH_JOIN_SIZE_LIMIT("tajo.executor.join.inner.in-memory-hash-limit-mb", 64l,
Validators.min("0")),
$EXECUTOR_OUTER_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.outer.in-memory-hash-threshold-mb", 64l,
$EXECUTOR_OUTER_HASH_JOIN_SIZE_LIMIT("tajo.executor.join.outer.in-memory-hash-limit-mb", 64l,
Validators.min("0")),
$EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD("tajo.executor.groupby.in-memory-hash-threshold-mb", 64l,
$EXECUTOR_GROUPBY_INMEMORY_HASH_LIMIT("tajo.executor.groupby.in-memory-hash-limit-mb", 64l,
Validators.min("0")),
$EXECUTOR_HASH_SHUFFLE_BUFFER_SIZE("tajo.executor.hash-shuffle.buffer-mb", 100, Validators.min("1")),
$MAX_OUTPUT_FILE_SIZE("tajo.query.max-outfile-size-mb", 0), // zero means infinite
Expand All @@ -368,7 +368,7 @@ public static enum ConfVars implements ConfigKey {

// for index
$INDEX_ENABLED("tajo.query.index.enabled", false),
$INDEX_SELECTIVITY_THRESHOLD("tajo.query.index.selectivity.threshold", 0.05f),
$INDEX_SELECTIVITY_LIMIT("tajo.query.index.selectivity.limit", 0.05f),

// Client -----------------------------------------------------------------
$CLIENT_SESSION_EXPIRY_TIME("tajo.client.session.expiry-time-sec", 3600), // default time is one hour.
Expand Down Expand Up @@ -397,7 +397,7 @@ public static enum ConfVars implements ConfigKey {
$TEST_FILTER_PUSHDOWN_ENABLED("tajo.test.plan.filter-pushdown.enabled", true),
$TEST_MIN_TASK_NUM("tajo.test.min-task-num", -1),
$TEST_PLAN_SHAPE_FIX_ENABLED("tajo.test.plan.shape.fix.enabled", false), // used for explain statement test
$TEST_TIM_SORT_THRESHOLD_FOR_RADIX_SORT("tajo.test.executor.radix-sort.tim-sort-threshold", 65536),
$TEST_TIM_SORT_LIMIT_FOR_RADIX_SORT("tajo.test.executor.radix-sort.tim-sort-limit", 65536),

// Behavior Control ---------------------------------------------------------
$BEHAVIOR_ARITHMETIC_ABORT("tajo.behavior.arithmetic-abort", false),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public class TestRadixSort {

static {
queryContext = new QueryContext(new TajoConf());
queryContext.setInt(SessionVars.TEST_TIM_SORT_THRESHOLD_FOR_RADIX_SORT, 0);
queryContext.setInt(SessionVars.TEST_TIM_SORT_LIMIT_FOR_RADIX_SORT, 0);

schema = SchemaBuilder.builder().addAll(new Column[]{
new Column("col0", Type.INT8),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public TestIndexScan() throws ServiceException, SQLException, NoSuchSessionVaria
super(TajoConstants.DEFAULT_DATABASE_NAME);
Map<String,String> sessionVars = new HashMap<>();
sessionVars.put(SessionVars.INDEX_ENABLED.keyname(), "true");
sessionVars.put(SessionVars.INDEX_SELECTIVITY_THRESHOLD.keyname(), "0.01f");
sessionVars.put(SessionVars.INDEX_SELECTIVITY_LIMIT.keyname(), "0.01f");
client.updateSessionVariables(sessionVars);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,46 +58,46 @@ public TestJoinQuery(String joinOption) throws Exception {
super(TajoConstants.DEFAULT_DATABASE_NAME, joinOption);

testingCluster.setAllTajoDaemonConfValue(ConfVars.$TEST_BROADCAST_JOIN_ENABLED.varname, "true");
testingCluster.setAllTajoDaemonConfValue(ConfVars.$DIST_QUERY_BROADCAST_NON_CROSS_JOIN_THRESHOLD.varname,
testingCluster.setAllTajoDaemonConfValue(ConfVars.$DIST_QUERY_BROADCAST_NON_CROSS_JOIN_LIMIT.varname,
"" + 5);
testingCluster.setAllTajoDaemonConfValue(ConfVars.$DIST_QUERY_BROADCAST_CROSS_JOIN_THRESHOLD.varname,
testingCluster.setAllTajoDaemonConfValue(ConfVars.$DIST_QUERY_BROADCAST_CROSS_JOIN_LIMIT.varname,
1024 * 1024 + "");

testingCluster.setAllTajoDaemonConfValue(
ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname,
ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.defaultVal);
ConfVars.$EXECUTOR_HASH_JOIN_SIZE_LIMIT.varname,
ConfVars.$EXECUTOR_HASH_JOIN_SIZE_LIMIT.defaultVal);

testingCluster.setAllTajoDaemonConfValue(ConfVars.$JOIN_HASH_TABLE_SIZE.keyname(), "100");

testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname,
ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.defaultVal);
testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD.varname,
ConfVars.$EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD.defaultVal);
testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_HASH_JOIN_SIZE_LIMIT.varname,
ConfVars.$EXECUTOR_HASH_JOIN_SIZE_LIMIT.defaultVal);
testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_GROUPBY_INMEMORY_HASH_LIMIT.varname,
ConfVars.$EXECUTOR_GROUPBY_INMEMORY_HASH_LIMIT.defaultVal);

if (joinOption.indexOf("NoBroadcast") >= 0) {
testingCluster.setAllTajoDaemonConfValue(ConfVars.$DIST_QUERY_BROADCAST_CROSS_JOIN_THRESHOLD.varname,
testingCluster.setAllTajoDaemonConfValue(ConfVars.$DIST_QUERY_BROADCAST_CROSS_JOIN_LIMIT.varname,
1024 * 1024 + "");
testingCluster.setAllTajoDaemonConfValue(ConfVars.$TEST_BROADCAST_JOIN_ENABLED.varname, "false");
}

if (joinOption.indexOf("Hash") >= 0) {
testingCluster.setAllTajoDaemonConfValue(
ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname, String.valueOf(256));
testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname,
ConfVars.$EXECUTOR_HASH_JOIN_SIZE_LIMIT.varname, String.valueOf(256));
testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_HASH_JOIN_SIZE_LIMIT.varname,
String.valueOf(256));
testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD.varname,
testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_GROUPBY_INMEMORY_HASH_LIMIT.varname,
String.valueOf(256));
testingCluster.setAllTajoDaemonConfValue(ConfVars.$DIST_QUERY_BROADCAST_CROSS_JOIN_THRESHOLD.varname,
testingCluster.setAllTajoDaemonConfValue(ConfVars.$DIST_QUERY_BROADCAST_CROSS_JOIN_LIMIT.varname,
1024 * 1024 + "");
}
if (joinOption.indexOf("Sort") >= 0) {
testingCluster.setAllTajoDaemonConfValue(
ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname, String.valueOf(1));
testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname,
ConfVars.$EXECUTOR_HASH_JOIN_SIZE_LIMIT.varname, String.valueOf(1));
testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_HASH_JOIN_SIZE_LIMIT.varname,
String.valueOf(0));
testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD.varname,
testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_GROUPBY_INMEMORY_HASH_LIMIT.varname,
String.valueOf(0));
testingCluster.setAllTajoDaemonConfValue(ConfVars.$DIST_QUERY_BROADCAST_CROSS_JOIN_THRESHOLD.varname,
testingCluster.setAllTajoDaemonConfValue(ConfVars.$DIST_QUERY_BROADCAST_CROSS_JOIN_LIMIT.varname,
1024 * 1024 + "");
}
}
Expand All @@ -121,19 +121,19 @@ public static void setup() throws Exception {
public static void classTearDown() throws SQLException {
testingCluster.setAllTajoDaemonConfValue(ConfVars.$TEST_BROADCAST_JOIN_ENABLED.varname,
ConfVars.$TEST_BROADCAST_JOIN_ENABLED.defaultVal);
testingCluster.setAllTajoDaemonConfValue(ConfVars.$DIST_QUERY_BROADCAST_NON_CROSS_JOIN_THRESHOLD.varname,
ConfVars.$DIST_QUERY_BROADCAST_NON_CROSS_JOIN_THRESHOLD.defaultVal);
testingCluster.setAllTajoDaemonConfValue(ConfVars.$DIST_QUERY_BROADCAST_CROSS_JOIN_THRESHOLD.varname,
testingCluster.setAllTajoDaemonConfValue(ConfVars.$DIST_QUERY_BROADCAST_NON_CROSS_JOIN_LIMIT.varname,
ConfVars.$DIST_QUERY_BROADCAST_NON_CROSS_JOIN_LIMIT.defaultVal);
testingCluster.setAllTajoDaemonConfValue(ConfVars.$DIST_QUERY_BROADCAST_CROSS_JOIN_LIMIT.varname,
1024 * 1024 + "");

testingCluster.setAllTajoDaemonConfValue(
ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname,
ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.defaultVal);
ConfVars.$EXECUTOR_HASH_JOIN_SIZE_LIMIT.varname,
ConfVars.$EXECUTOR_HASH_JOIN_SIZE_LIMIT.defaultVal);

testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname,
ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.defaultVal);
testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD.varname,
ConfVars.$EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD.defaultVal);
testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_HASH_JOIN_SIZE_LIMIT.varname,
ConfVars.$EXECUTOR_HASH_JOIN_SIZE_LIMIT.defaultVal);
testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_GROUPBY_INMEMORY_HASH_LIMIT.varname,
ConfVars.$EXECUTOR_GROUPBY_INMEMORY_HASH_LIMIT.defaultVal);

if (--reference == 0) {
dropCommonTables();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ Available Session Variables:
\set SORT_LIST_SIZE [int value] - The initial size of list for in-memory sort
\set JOIN_HASH_TABLE_SIZE [int value] - The initial size of hash table for in-memory hash join
\set INDEX_ENABLED [true or false] - index scan enabled
\set INDEX_SELECTIVITY_THRESHOLD [real value] - the selectivity threshold for index scan
\set INDEX_SELECTIVITY_LIMIT [real value] - the selectivity threshold for index scan
\set PARTITION_NO_RESULT_OVERWRITE_ENABLED [true or false] - If true, a partitioned table is overwritten even if a sub query leads to no result. Otherwise, the table data will be kept if there is no result
\set ARITHABORT [true or false] - If true, a running query will be terminated when an overflow or divide-by-zero occurs.
\set FETCH_ROWNUM [int value] - The number of rows to be fetched from Master at a time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public static List<UnSafeTuple> sort(QueryContext queryContext, UnSafeTupleList
Comparator<UnSafeTuple> comp) {
UnSafeTuple[] in = list.toArray(new UnSafeTuple[list.size()]);
RadixSortContext context = new RadixSortContext(in, schema, sortSpecs, comp,
queryContext.getInt(SessionVars.TEST_TIM_SORT_THRESHOLD_FOR_RADIX_SORT));
queryContext.getInt(SessionVars.TEST_TIM_SORT_LIMIT_FOR_RADIX_SORT));

long before = System.currentTimeMillis();
recursiveCallForNextKey(context, 0, context.in.length, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void serviceStart() throws Exception {
NodeResources.createResource(systemConf.getIntVar(TajoConf.ConfVars.TASK_RESOURCE_MINIMUM_MEMORY)));

// if resource changed over than 30%, send reports
float queueingRate = systemConf.getFloatVar(TajoConf.ConfVars.WORKER_HEARTBEAT_QUEUE_THRESHOLD_RATE);
float queueingRate = systemConf.getFloatVar(TajoConf.ConfVars.WORKER_HEARTBEAT_QUEUE_LIMIT_RATE);
this.queueingThreshold = Math.max((int) Math.floor(maxContainer * queueingRate), 1);
LOG.info("Queueing threshold:" + queueingThreshold);

Expand Down
Loading