diff --git a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java index ee8c91a3757c..5825447086fc 100644 --- a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java +++ b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java @@ -37,6 +37,7 @@ import org.apache.flink.FlinkVersion; import org.apache.flink.api.dag.Transformation; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; @@ -205,11 +206,15 @@ protected Transformation translateToPlanInternal( // TODO: support more window types. Tuple5 windowSpecParams = WindowUtils.extractWindowParameters(windowing); + Tuple2 windowStartAndEndIndexes = + WindowUtils.getWindowStartAndEndIndexes(namedWindowProperties, (RowType) getOutputType()); long size = windowSpecParams.f0; long slide = windowSpecParams.f1; long offset = windowSpecParams.f2; int rowtimeIndex = windowSpecParams.f3; int windowType = windowSpecParams.f4; + int windowStartIndex = windowStartAndEndIndexes.f0; + int windowEndIndex = windowStartAndEndIndexes.f1; PartitionFunctionSpec sliceAssignerSpec = new StreamWindowPartitionFunctionSpec( inputType, rowtimeIndex, size, slide, offset, windowType); @@ -252,7 +257,10 @@ protected Transformation translateToPlanInternal( offset, windowType, outputType, - rowtimeIndex); + false, + rowtimeIndex, + windowStartIndex, + windowEndIndex); final OneInputStreamOperator windowOperator = new GlutenVectorOneInputOperator( new StatefulPlanNode(windowAgg.getId(), windowAgg), diff --git a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java index 4ffcf7998c27..d1b01e046323 100644 --- a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java +++ b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java @@ -220,7 +220,10 @@ protected Transformation translateToPlanInternal( offset, windowType, outputType, - rowtimeIndex); + false, + rowtimeIndex, + -1, + -1); final OneInputStreamOperator localAggOperator = new GlutenVectorOneInputOperator( new StatefulPlanNode(windowAgg.getId(), windowAgg), diff --git a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java index afe5d0148396..8efb8be4e3a9 100644 --- a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java +++ b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java @@ -37,6 +37,7 @@ import org.apache.flink.FlinkVersion; import org.apache.flink.api.dag.Transformation; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; @@ -197,11 +198,16 @@ protected Transformation translateToPlanInternal( // TODO: support more window types. Tuple5 windowSpecParams = WindowUtils.extractWindowParameters(windowing); + Tuple2 windowStartAndEndIndexes = + WindowUtils.getWindowStartAndEndIndexes(namedWindowProperties, (RowType) getOutputType()); long size = windowSpecParams.f0; long slide = windowSpecParams.f1; long offset = windowSpecParams.f2; int rowtimeIndex = windowSpecParams.f3; int windowType = windowSpecParams.f4; + boolean isRowTime = windowing.isRowtime(); + int windowStartIndex = windowStartAndEndIndexes.f0; + int windowEndIndex = windowStartAndEndIndexes.f1; PartitionFunctionSpec sliceAssignerSpec = new StreamWindowPartitionFunctionSpec( inputType, rowtimeIndex, size, slide, offset, windowType); @@ -210,25 +216,29 @@ protected Transformation translateToPlanInternal( PlanNodeIdGenerator.newId(), AggregateStep.SINGLE, groupingKeys, - groupingKeys, + isRowTime ? groupingKeys : List.of(), aggNames, aggregates, false, List.of(new EmptyNode(inputType)), null, List.of()); + // processing time window can not apply to local-global aggregate optimization, so here we need + // to set local aggregtate as null when it is not event time window. PlanNode localAgg = - new AggregationNode( - PlanNodeIdGenerator.newId(), - AggregateStep.SINGLE, - groupingKeys, - groupingKeys, - aggNames, - aggregates, - false, - List.of(new EmptyNode(inputType)), - null, - List.of()); + isRowTime + ? new AggregationNode( + PlanNodeIdGenerator.newId(), + AggregateStep.SINGLE, + groupingKeys, + groupingKeys, + aggNames, + aggregates, + false, + List.of(new EmptyNode(inputType)), + null, + List.of()) + : null; PlanNode windowAgg = new StreamWindowAggregationNode( PlanNodeIdGenerator.newId(), @@ -244,7 +254,10 @@ protected Transformation translateToPlanInternal( offset, windowType, outputType, - rowtimeIndex); + windowing.isRowtime(), + rowtimeIndex, + windowStartIndex, + windowEndIndex); final OneInputStreamOperator windowOperator = new GlutenVectorOneInputOperator( new StatefulPlanNode(windowAgg.getId(), windowAgg), diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java index 6f7a64f6f4ee..021969f2a264 100644 --- a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java +++ b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java @@ -16,16 +16,24 @@ */ package org.apache.gluten.rexnode; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.table.planner.plan.logical.CumulativeWindowSpec; import org.apache.flink.table.planner.plan.logical.HoppingWindowSpec; +import org.apache.flink.table.planner.plan.logical.SessionWindowSpec; import org.apache.flink.table.planner.plan.logical.SliceAttachedWindowingStrategy; import org.apache.flink.table.planner.plan.logical.TimeAttributeWindowingStrategy; import org.apache.flink.table.planner.plan.logical.TumblingWindowSpec; import org.apache.flink.table.planner.plan.logical.WindowAttachedWindowingStrategy; import org.apache.flink.table.planner.plan.logical.WindowSpec; import org.apache.flink.table.planner.plan.logical.WindowingStrategy; +import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty; +import org.apache.flink.table.runtime.groupwindow.WindowEnd; +import org.apache.flink.table.runtime.groupwindow.WindowStart; +import org.apache.flink.table.types.logical.RowType; import java.time.Duration; +import java.util.List; /** Utility to store some useful functions. */ public class WindowUtils { @@ -53,24 +61,27 @@ public static Tuple5 extractWindowParameters if (windowOffset != null) { offset = windowOffset.toMillis(); } + windowType = 0; } else if (windowSpec instanceof TumblingWindowSpec) { size = ((TumblingWindowSpec) windowSpec).getSize().toMillis(); Duration windowOffset = ((TumblingWindowSpec) windowSpec).getOffset(); if (windowOffset != null) { offset = windowOffset.toMillis(); } + windowType = 1; + } else if (windowSpec instanceof CumulativeWindowSpec) { + windowType = 2; + } else if (windowSpec instanceof SessionWindowSpec) { + windowType = 3; } else { throw new RuntimeException("Not support window spec " + windowSpec); } - if (windowing instanceof TimeAttributeWindowingStrategy) { if (windowing.isRowtime()) { rowtimeIndex = ((TimeAttributeWindowingStrategy) windowing).getTimeAttributeIndex(); } - windowType = 0; } else if (windowing instanceof WindowAttachedWindowingStrategy) { rowtimeIndex = ((WindowAttachedWindowingStrategy) windowing).getWindowEnd(); - windowType = 1; } else if (windowing instanceof SliceAttachedWindowingStrategy) { rowtimeIndex = ((SliceAttachedWindowingStrategy) windowing).getSliceEnd(); } else { @@ -79,4 +90,18 @@ public static Tuple5 extractWindowParameters return new Tuple5( size, slide, offset, rowtimeIndex, windowType); } + + public static Tuple2 getWindowStartAndEndIndexes( + NamedWindowProperty[] props, RowType outputType) { + int windowStartIndex = -1, windowEndIndex = -1; + List outputNames = outputType.getFieldNames(); + for (NamedWindowProperty prop : props) { + if (prop.getProperty() instanceof WindowStart) { + windowStartIndex = outputNames.indexOf(prop.getName()); + } else if (prop.getProperty() instanceof WindowEnd) { + windowEndIndex = outputNames.indexOf(prop.getName()); + } + } + return new Tuple2(windowStartIndex, windowEndIndex); + } } diff --git a/gluten-flink/runtime/src/main/java/org/apache/flink/client/StreamGraphTranslator.java b/gluten-flink/runtime/src/main/java/org/apache/flink/client/StreamGraphTranslator.java index db702d56053e..3fdbd4ad4aaf 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/flink/client/StreamGraphTranslator.java +++ b/gluten-flink/runtime/src/main/java/org/apache/flink/client/StreamGraphTranslator.java @@ -132,7 +132,7 @@ private void buildGlutenChains(StreamConfig taskConfig, Map nodeToNonChainedOuts = new HashMap<>(outEdges.size()); taskConfig @@ -216,14 +216,16 @@ private void buildGlutenChains(StreamConfig taskConfig, Map implements WatermarkGaugeExposingOutput> { protected final Map>> outputs; - private final Random random = new XORShiftRandom(); private final WatermarkGauge watermarkGauge = new WatermarkGauge(); protected final Counter numRecordsOutForTask; diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxQueryConfig.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxQueryConfig.java index 788ecb4a68b7..a1026cfe49a5 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxQueryConfig.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxQueryConfig.java @@ -31,6 +31,8 @@ public class VeloxQueryConfig { private static final String keyVeloxAdjustTimestampToSessionTimeZone = "adjust_timestamp_to_session_timezone"; private static final String keyVeloxSessionTimezone = "session_timezone"; + private static final String kStreamingAggregationMinOutputBatchRows = + "streaming_aggregation_min_output_batch_rows"; public static Config getConfig(RuntimeContext context) { if (!(context instanceof StreamingRuntimeContext)) { @@ -47,6 +49,7 @@ public static Config getConfig(RuntimeContext context) { } else { configMap.put(keyVeloxSessionTimezone, localTimeZone); } + configMap.put(kStreamingAggregationMinOutputBatchRows, String.valueOf(1)); return Config.create(configMap); } } diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxSessionConfig.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxSessionConfig.java new file mode 100644 index 000000000000..ee55a258654b --- /dev/null +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxSessionConfig.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.table.runtime.config; + +import io.github.zhztheplayer.velox4j.session.Session; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +public class VeloxSessionConfig implements Serializable { + private final Map sessions; + + private static VeloxSessionConfig config = new VeloxSessionConfig(); + + private VeloxSessionConfig() { + sessions = new HashMap<>(); + } + + public static VeloxSessionConfig getSessionConfig() { + return config; + } + + public void addSession(String id, Session session) { + sessions.put(id, session); + } + + public Session getSession(String id) { + return sessions.getOrDefault(id, null); + } +} diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorOneInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorOneInputOperator.java index e859e15ca37b..3b21d60e74ee 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorOneInputOperator.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorOneInputOperator.java @@ -18,6 +18,7 @@ import org.apache.gluten.streaming.api.operators.GlutenOperator; import org.apache.gluten.table.runtime.config.VeloxQueryConfig; +import org.apache.gluten.table.runtime.config.VeloxSessionConfig; import io.github.zhztheplayer.velox4j.Velox4j; import io.github.zhztheplayer.velox4j.config.ConnectorConfig; @@ -82,6 +83,7 @@ public GlutenVectorOneInputOperator( void initGlutenTask() { memoryManager = MemoryManager.create(AllocationListener.NOOP); session = Velox4j.newSession(memoryManager); + VeloxSessionConfig.getSessionConfig().addSession(id, session); // add a mock input as velox not allow the source is empty. StatefulPlanNode mockInput = new StatefulPlanNode( diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java index 472bd0bfed0a..9cb09a14f7ed 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java @@ -33,7 +33,6 @@ import io.github.zhztheplayer.velox4j.stateful.StatefulElement; import io.github.zhztheplayer.velox4j.type.RowType; -import org.apache.flink.api.common.state.CheckpointListener; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; @@ -53,7 +52,7 @@ * instead of RowData to avoid data convert. */ public class GlutenVectorSourceFunction extends RichParallelSourceFunction - implements CheckpointedFunction, CheckpointListener { + implements CheckpointedFunction { private static final Logger LOG = LoggerFactory.getLogger(GlutenVectorSourceFunction.class); private final StatefulPlanNode planNode; @@ -105,7 +104,6 @@ public void open(Configuration parameters) throws Exception { new Query( planNode, VeloxQueryConfig.getConfig(getRuntimeContext()), ConnectorConfig.empty()); allocator = new RootAllocator(Long.MAX_VALUE); - task = session.queryOps().execute(query); task.addSplit(id, split); task.noMoreSplits(id); @@ -133,7 +131,6 @@ public void run(SourceContext sourceContext) throws Exception { } taskMetrics.updateMetrics(task, id); } - task.close(); session.close(); memoryManager.close(); @@ -170,13 +167,11 @@ public void initializeState(FunctionInitializationContext context) throws Except this.task.initializeState(0); } - @Override - public void notifyCheckpointComplete(long checkpointId) throws Exception { + public String[] notifyCheckpointComplete(long checkpointId) throws Exception { // TODO: notify velox - this.task.notifyCheckpointComplete(checkpointId); + return this.task.notifyCheckpointComplete(checkpointId); } - @Override public void notifyCheckpointAborted(long checkpointId) throws Exception { // TODO: notify velox this.task.notifyCheckpointAborted(checkpointId); diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/typeutils/GlutenRowVectorSerializer.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/typeutils/GlutenRowVectorSerializer.java index db17a47a2d59..6f340ad1e50b 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/typeutils/GlutenRowVectorSerializer.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/typeutils/GlutenRowVectorSerializer.java @@ -16,11 +16,10 @@ */ package org.apache.gluten.table.runtime.typeutils; -import io.github.zhztheplayer.velox4j.Velox4j; +import org.apache.gluten.streaming.api.operators.GlutenOperator; +import org.apache.gluten.table.runtime.config.VeloxSessionConfig; + import io.github.zhztheplayer.velox4j.data.RowVector; -import io.github.zhztheplayer.velox4j.memory.AllocationListener; -import io.github.zhztheplayer.velox4j.memory.MemoryManager; -import io.github.zhztheplayer.velox4j.session.Session; import io.github.zhztheplayer.velox4j.stateful.StatefulRecord; import io.github.zhztheplayer.velox4j.type.RowType; @@ -31,24 +30,23 @@ import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; -import java.io.Closeable; import java.io.IOException; /** Serializer for {@link RowVector}. */ @Internal -public class GlutenRowVectorSerializer extends TypeSerializer implements Closeable { +public class GlutenRowVectorSerializer extends TypeSerializer { private static final long serialVersionUID = 1L; private final RowType rowType; - private transient MemoryManager memoryManager; - private transient Session session; + private final GlutenOperator operator; - public GlutenRowVectorSerializer(RowType rowType) { + public GlutenRowVectorSerializer(RowType rowType, GlutenOperator operator) { this.rowType = rowType; + this.operator = operator; } @Override public TypeSerializer duplicate() { - return new GlutenRowVectorSerializer(rowType); + return new GlutenRowVectorSerializer(rowType, operator); } @Override @@ -65,14 +63,15 @@ public void serialize(StatefulRecord record, DataOutputView target) throws IOExc @Override public StatefulRecord deserialize(DataInputView source) throws IOException { - if (memoryManager == null) { - memoryManager = MemoryManager.create(AllocationListener.NOOP); - session = Velox4j.newSession(memoryManager); - } int len = source.readInt(); byte[] str = new byte[len]; source.readFully(str); - RowVector rowVector = session.baseVectorOps().deserializeOne(new String(str)).asRowVector(); + RowVector rowVector = + VeloxSessionConfig.getSessionConfig() + .getSession(operator.getId()) + .baseVectorOps() + .deserializeOne(new String(str)) + .asRowVector(); StatefulRecord record = new StatefulRecord(null, 0, 0, false, -1); record.setRowVector(rowVector); return record; @@ -131,15 +130,7 @@ public int getLength() { @Override public TypeSerializerSnapshot snapshotConfiguration() { - return new RowVectorSerializerSnapshot(rowType); - } - - @Override - public void close() { - if (memoryManager != null) { - memoryManager.close(); - session.close(); - } + return new RowVectorSerializerSnapshot(rowType, operator); } /** {@link TypeSerializerSnapshot} for Gluten RowVector.. */ @@ -148,14 +139,16 @@ public static final class RowVectorSerializerSnapshot private static final int CURRENT_VERSION = 1; private RowType rowType; + private GlutenOperator operator; @SuppressWarnings("unused") public RowVectorSerializerSnapshot() { // this constructor is used when restoring from a checkpoint/savepoint. } - RowVectorSerializerSnapshot(RowType rowType) { + RowVectorSerializerSnapshot(RowType rowType, GlutenOperator operator) { this.rowType = rowType; + this.operator = operator; } @Override @@ -172,7 +165,7 @@ public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCode @Override public GlutenRowVectorSerializer restoreSerializer() { - return new GlutenRowVectorSerializer(rowType); + return new GlutenRowVectorSerializer(rowType, operator); } @Override diff --git a/gluten-flink/ut/src/test/resources/nexmark/q5.sql b/gluten-flink/ut/src/test/resources/nexmark/q5.sql deleted file mode 100644 index 98954bacf201..000000000000 --- a/gluten-flink/ut/src/test/resources/nexmark/q5.sql +++ /dev/null @@ -1,38 +0,0 @@ -CREATE TABLE nexmark_q5 ( - auction BIGINT, - num BIGINT -) WITH ( - 'connector' = 'blackhole' -); - -INSERT INTO nexmark_q5 -SELECT AuctionBids.auction, AuctionBids.num - FROM ( - SELECT - auction, - count(*) AS num, - window_start AS starttime, - window_end AS endtime - FROM TABLE( - HOP(TABLE bid, DESCRIPTOR(`dateTime`), INTERVAL '2' SECOND, INTERVAL '10' SECOND)) - GROUP BY auction, window_start, window_end - ) AS AuctionBids - JOIN ( - SELECT - max(CountBids.num) AS maxn, - CountBids.starttime, - CountBids.endtime - FROM ( - SELECT - count(*) AS num, - window_start AS starttime, - window_end AS endtime - FROM TABLE( - HOP(TABLE bid, DESCRIPTOR(`dateTime`), INTERVAL '2' SECOND, INTERVAL '10' SECOND)) - GROUP BY auction, window_start, window_end - ) AS CountBids - GROUP BY CountBids.starttime, CountBids.endtime - ) AS MaxBids - ON AuctionBids.starttime = MaxBids.starttime AND - AuctionBids.endtime = MaxBids.endtime AND - AuctionBids.num >= MaxBids.maxn; diff --git a/gluten-flink/ut/src/test/resources/nexmark/q7.sql b/gluten-flink/ut/src/test/resources/nexmark/q7.sql deleted file mode 100644 index 1b0ec308e94b..000000000000 --- a/gluten-flink/ut/src/test/resources/nexmark/q7.sql +++ /dev/null @@ -1,21 +0,0 @@ -CREATE TABLE nexmark_q7 ( - auction BIGINT, - bidder BIGINT, - price BIGINT, - `dateTime` TIMESTAMP(3), - extra VARCHAR -) WITH ( - 'connector' = 'blackhole' -); - -INSERT INTO nexmark_q7 -SELECT B.auction, B.price, B.bidder, B.`dateTime`, B.extra -from bid B -JOIN ( - SELECT MAX(price) AS maxprice, window_end as `dateTime` - FROM TABLE( - TUMBLE(TABLE bid, DESCRIPTOR(`dateTime`), INTERVAL '10' SECOND)) - GROUP BY window_start, window_end -) B1 -ON B.price = B1.maxprice -WHERE B.`dateTime` BETWEEN B1.`dateTime` - INTERVAL '10' SECOND AND B1.`dateTime`; diff --git a/gluten-flink/ut/src/test/resources/nexmark/q8.sql b/gluten-flink/ut/src/test/resources/nexmark/q8.sql deleted file mode 100644 index 1b112f9fc752..000000000000 --- a/gluten-flink/ut/src/test/resources/nexmark/q8.sql +++ /dev/null @@ -1,27 +0,0 @@ -CREATE TABLE nexmark_q8 ( - id BIGINT, - name VARCHAR, - stime TIMESTAMP(3) -) WITH ( - 'connector' = 'blackhole' -); - -INSERT INTO nexmark_q8 -SELECT P.id, P.name, P.starttime -FROM ( - SELECT id, name, - window_start AS starttime, - window_end AS endtime - FROM TABLE( - TUMBLE(TABLE person, DESCRIPTOR(`dateTime`), INTERVAL '10' SECOND)) - GROUP BY id, name, window_start, window_end -) P -JOIN ( - SELECT seller, - window_start AS starttime, - window_end AS endtime - FROM TABLE( - TUMBLE(TABLE auction, DESCRIPTOR(`dateTime`), INTERVAL '10' SECOND)) - GROUP BY seller, window_start, window_end -) A -ON P.id = A.seller AND P.starttime = A.starttime AND P.endtime = A.endtime;