diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java index b634ac21c2284..b26649a3aac2f 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java @@ -513,7 +513,7 @@ public RightJoin(ExecutionContext ctx, RelDataType rowType, Predicate } } - if (waitingLeft == NOT_WAITING && requested > 0 && !rightNotMatchedIndexes.isEmpty()) { + if (waitingLeft == NOT_WAITING && requested > 0 && (rightNotMatchedIndexes != null && !rightNotMatchedIndexes.isEmpty())) { assert lastPushedInd >= 0; inLoop = true; @@ -657,7 +657,7 @@ public FullOuterJoin(ExecutionContext ctx, RelDataType rowType, Predicate 0 && !rightNotMatchedIndexes.isEmpty()) { + if (waitingLeft == NOT_WAITING && requested > 0 && (rightNotMatchedIndexes != null && !rightNotMatchedIndexes.isEmpty())) { assert lastPushedInd >= 0; inLoop = true; diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteSqlValidator.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteSqlValidator.java index b593eb9f79b8b..0be98cf5900db 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteSqlValidator.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteSqlValidator.java @@ -145,8 +145,7 @@ public IgniteSqlValidator(SqlOperatorTable opTab, CalciteCatalogReader catalogRe super.validateSelect(select, targetRowType); } - - + /** * @param n Node to check limit. * @param nodeName Node name. diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java index 6d4bcb7b6d21c..6f71c884220ea 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java @@ -17,12 +17,21 @@ package org.apache.ignite.internal.processors.query.calcite.exec.rel; +import java.util.ArrayDeque; +import java.util.Deque; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.LockSupport; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; + import com.google.common.collect.ImmutableMap; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.query.calcite.exec.ArrayRowHandler; @@ -38,18 +47,22 @@ import org.apache.ignite.internal.processors.query.calcite.message.TestIoManager; import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription; import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.GridTestKernalContext; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor; import org.junit.After; import org.junit.Before; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME; /** * */ +@RunWith(Parameterized.class) public class AbstractExecutionTest extends GridCommonAbstractTest { /** */ private Throwable lastE; @@ -69,6 +82,50 @@ public class AbstractExecutionTest extends GridCommonAbstractTest { /** */ protected int nodesCnt = 3; + /** */ + enum ExecutionStrategy { + /** */ + FIFO { + @Override public T2 nextTask(Deque> tasks) { + return tasks.pollFirst(); + } + }, + + /** */ + LIFO { + @Override public T2 nextTask(Deque> tasks) { + return tasks.pollLast(); + } + }, + + /** */ + RANDOM { + @Override public T2 nextTask(Deque> tasks) { + return ThreadLocalRandom.current().nextBoolean() ? tasks.pollLast() : tasks.pollFirst(); + } + }; + + /** + * Returns a next task according to the strategy. + * + * @param tasks Task list. + * @return Next task. + */ + public T2 nextTask(Deque> tasks) { + throw new UnsupportedOperationException(); + } + } + + /** */ + @Parameterized.Parameters(name = "Execution strategy = {0}") + public static List parameters() { + return Stream.of(ExecutionStrategy.values()).map(s -> new Object[]{s}).collect(Collectors.toList()); + } + + /** Execution direction. */ + @Parameterized.Parameter + public static ExecutionStrategy execStgy; + /** */ @Before public void setup() throws Exception { @@ -85,7 +142,7 @@ public void setup() throws Exception { GridTestKernalContext kernal = newContext(); QueryTaskExecutorImpl taskExecutor = new QueryTaskExecutorImpl(kernal); - taskExecutor.stripedThreadPoolExecutor(new IgniteStripedThreadPoolExecutor( + taskExecutor.stripedThreadPoolExecutor(new IgniteTestStripedThreadPoolExecutor( kernal.config().getQueryThreadPoolSize(), kernal.igniteInstanceName(), "calciteQry", @@ -114,6 +171,58 @@ public void setup() throws Exception { } } + /** Task reordering executor. */ + private static class IgniteTestStripedThreadPoolExecutor extends org.apache.ignite.thread.IgniteStripedThreadPoolExecutor { + /** */ + final Deque> tasks = new ArrayDeque<>(); + + /** Internal stop flag. */ + AtomicBoolean stop = new AtomicBoolean(); + + /** Inner execution service. */ + ExecutorService exec = Executors.newWorkStealingPool(); + + /** {@inheritDoc} */ + public IgniteTestStripedThreadPoolExecutor(int concurrentLvl, String igniteInstanceName, String threadNamePrefix, Thread.UncaughtExceptionHandler eHnd, boolean allowCoreThreadTimeOut, long keepAliveTime) { + super(concurrentLvl, igniteInstanceName, threadNamePrefix, eHnd, allowCoreThreadTimeOut, keepAliveTime); + + GridTestUtils.runAsync(() -> { + while (!stop.get()) { + synchronized (tasks) { + while (!tasks.isEmpty()) { + T2 r = execStgy.nextTask(tasks); + + exec.execute(() -> super.execute(r.getKey(), r.getValue())); + } + } + + LockSupport.parkNanos(ThreadLocalRandom.current().nextLong(1_000, 10_000)); + } + }); + } + + /** {@inheritDoc} */ + @Override public void execute(Runnable task, int idx) { + synchronized (tasks) { + tasks.add(new T2<>(task, idx)); + } + } + + /** {@inheritDoc} */ + @Override public void shutdown() { + stop.set(true); + + super.shutdown(); + } + + /** {@inheritDoc} */ + @Override public List shutdownNow() { + stop.set(true); + + return super.shutdownNow(); + } + } + /** */ @After public void tearDown() { diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java index e3724f2adf195..8a5145a75ec93 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java @@ -17,11 +17,14 @@ package org.apache.ignite.internal.processors.query.calcite.exec.rel; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; import java.util.Random; import java.util.UUID; +import java.util.stream.Stream; import com.google.common.collect.ImmutableList; import org.apache.calcite.rel.type.RelDataType; import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; @@ -31,7 +34,6 @@ import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils; import org.junit.Before; import org.junit.Test; -import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameter; @@ -39,30 +41,42 @@ * */ @SuppressWarnings("TypeMayBeWeakened") -@RunWith(Parameterized.class) public class ContinuousExecutionTest extends AbstractExecutionTest { /** */ - @Parameter() + @Parameter(1) public int rowsCnt; /** */ - @Parameter(1) + @Parameter(2) public int remoteFragmentsCnt; /** */ - @Parameterized.Parameters(name = "rowsCount={0}, remoteFragmentsCount={1}") - public static List parameters() { - return ImmutableList.of( - new Object[]{10, 1}, - new Object[]{10, 5}, - new Object[]{10, 10}, - new Object[]{100, 1}, - new Object[]{100, 5}, - new Object[]{100, 10}, - new Object[]{100_000, 1}, - new Object[]{100_000, 5}, - new Object[]{100_000, 10} + @Parameterized.Parameters(name = "rowsCount={1}, remoteFragmentsCount={2}") + public static List data() { + List extraParams = new ArrayList<>(); + + ImmutableList newParams = ImmutableList.of( + new Object[] {10, 1}, + new Object[] {10, 5}, + new Object[] {10, 10}, + new Object[] {100, 1}, + new Object[] {100, 5}, + new Object[] {100, 10}, + new Object[] {100_000, 1}, + new Object[] {100_000, 5}, + new Object[] {100_000, 10} ); + + for (Object[] newParam : newParams) { + for (Object[] inheritedParam : AbstractExecutionTest.parameters()) { + Object[] both = Stream.concat(Arrays.stream(inheritedParam), Arrays.stream(newParam)) + .toArray(Object[]::new); + + extraParams.add(both); + } + } + + return extraParams; } /**