Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import org.apache.flink.FlinkVersion;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
Expand Down Expand Up @@ -205,11 +206,15 @@ protected Transformation<RowData> translateToPlanInternal(
// TODO: support more window types.
Tuple5<Long, Long, Long, Integer, Integer> windowSpecParams =
WindowUtils.extractWindowParameters(windowing);
Tuple2<Integer, Integer> windowStartAndEndIndexes =
WindowUtils.getWindowStartAndEndIndexes(namedWindowProperties, (RowType) getOutputType());
long size = windowSpecParams.f0;
long slide = windowSpecParams.f1;
long offset = windowSpecParams.f2;
int rowtimeIndex = windowSpecParams.f3;
int windowType = windowSpecParams.f4;
int windowStartIndex = windowStartAndEndIndexes.f0;
int windowEndIndex = windowStartAndEndIndexes.f1;
PartitionFunctionSpec sliceAssignerSpec =
new StreamWindowPartitionFunctionSpec(
inputType, rowtimeIndex, size, slide, offset, windowType);
Expand Down Expand Up @@ -252,7 +257,10 @@ protected Transformation<RowData> translateToPlanInternal(
offset,
windowType,
outputType,
rowtimeIndex);
false,
rowtimeIndex,
windowStartIndex,
windowEndIndex);
final OneInputStreamOperator windowOperator =
new GlutenVectorOneInputOperator(
new StatefulPlanNode(windowAgg.getId(), windowAgg),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,10 @@ protected Transformation<RowData> translateToPlanInternal(
offset,
windowType,
outputType,
rowtimeIndex);
false,
rowtimeIndex,
-1,
-1);
final OneInputStreamOperator localAggOperator =
new GlutenVectorOneInputOperator(
new StatefulPlanNode(windowAgg.getId(), windowAgg),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import org.apache.flink.FlinkVersion;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
Expand Down Expand Up @@ -197,11 +198,16 @@ protected Transformation<RowData> translateToPlanInternal(
// TODO: support more window types.
Tuple5<Long, Long, Long, Integer, Integer> windowSpecParams =
WindowUtils.extractWindowParameters(windowing);
Tuple2<Integer, Integer> windowStartAndEndIndexes =
WindowUtils.getWindowStartAndEndIndexes(namedWindowProperties, (RowType) getOutputType());
long size = windowSpecParams.f0;
long slide = windowSpecParams.f1;
long offset = windowSpecParams.f2;
int rowtimeIndex = windowSpecParams.f3;
int windowType = windowSpecParams.f4;
boolean isRowTime = windowing.isRowtime();
int windowStartIndex = windowStartAndEndIndexes.f0;
int windowEndIndex = windowStartAndEndIndexes.f1;
PartitionFunctionSpec sliceAssignerSpec =
new StreamWindowPartitionFunctionSpec(
inputType, rowtimeIndex, size, slide, offset, windowType);
Expand All @@ -210,25 +216,29 @@ protected Transformation<RowData> translateToPlanInternal(
PlanNodeIdGenerator.newId(),
AggregateStep.SINGLE,
groupingKeys,
groupingKeys,
isRowTime ? groupingKeys : List.of(),
aggNames,
aggregates,
false,
List.of(new EmptyNode(inputType)),
null,
List.of());
// A processing-time window cannot use the local-global aggregate optimization, so the
// local aggregate is set to null when the window is not an event-time window.
PlanNode localAgg =
new AggregationNode(
PlanNodeIdGenerator.newId(),
AggregateStep.SINGLE,
groupingKeys,
groupingKeys,
aggNames,
aggregates,
false,
List.of(new EmptyNode(inputType)),
null,
List.of());
isRowTime
? new AggregationNode(
PlanNodeIdGenerator.newId(),
AggregateStep.SINGLE,
groupingKeys,
groupingKeys,
aggNames,
aggregates,
false,
List.of(new EmptyNode(inputType)),
null,
List.of())
: null;
PlanNode windowAgg =
new StreamWindowAggregationNode(
PlanNodeIdGenerator.newId(),
Expand All @@ -244,7 +254,10 @@ protected Transformation<RowData> translateToPlanInternal(
offset,
windowType,
outputType,
rowtimeIndex);
windowing.isRowtime(),
rowtimeIndex,
windowStartIndex,
windowEndIndex);
final OneInputStreamOperator windowOperator =
new GlutenVectorOneInputOperator(
new StatefulPlanNode(windowAgg.getId(), windowAgg),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,24 @@
*/
package org.apache.gluten.rexnode;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.table.planner.plan.logical.CumulativeWindowSpec;
import org.apache.flink.table.planner.plan.logical.HoppingWindowSpec;
import org.apache.flink.table.planner.plan.logical.SessionWindowSpec;
import org.apache.flink.table.planner.plan.logical.SliceAttachedWindowingStrategy;
import org.apache.flink.table.planner.plan.logical.TimeAttributeWindowingStrategy;
import org.apache.flink.table.planner.plan.logical.TumblingWindowSpec;
import org.apache.flink.table.planner.plan.logical.WindowAttachedWindowingStrategy;
import org.apache.flink.table.planner.plan.logical.WindowSpec;
import org.apache.flink.table.planner.plan.logical.WindowingStrategy;
import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty;
import org.apache.flink.table.runtime.groupwindow.WindowEnd;
import org.apache.flink.table.runtime.groupwindow.WindowStart;
import org.apache.flink.table.types.logical.RowType;

import java.time.Duration;
import java.util.List;

/** Utility to store some useful functions. */
public class WindowUtils {
Expand Down Expand Up @@ -53,24 +61,27 @@ public static Tuple5<Long, Long, Long, Integer, Integer> extractWindowParameters
if (windowOffset != null) {
offset = windowOffset.toMillis();
}
windowType = 0;
} else if (windowSpec instanceof TumblingWindowSpec) {
size = ((TumblingWindowSpec) windowSpec).getSize().toMillis();
Duration windowOffset = ((TumblingWindowSpec) windowSpec).getOffset();
if (windowOffset != null) {
offset = windowOffset.toMillis();
}
windowType = 1;
} else if (windowSpec instanceof CumulativeWindowSpec) {
windowType = 2;
} else if (windowSpec instanceof SessionWindowSpec) {
windowType = 3;
} else {
throw new RuntimeException("Not support window spec " + windowSpec);
}

if (windowing instanceof TimeAttributeWindowingStrategy) {
if (windowing.isRowtime()) {
rowtimeIndex = ((TimeAttributeWindowingStrategy) windowing).getTimeAttributeIndex();
}
windowType = 0;
} else if (windowing instanceof WindowAttachedWindowingStrategy) {
rowtimeIndex = ((WindowAttachedWindowingStrategy) windowing).getWindowEnd();
windowType = 1;
} else if (windowing instanceof SliceAttachedWindowingStrategy) {
rowtimeIndex = ((SliceAttachedWindowingStrategy) windowing).getSliceEnd();
} else {
Expand All @@ -79,4 +90,18 @@ public static Tuple5<Long, Long, Long, Integer, Integer> extractWindowParameters
return new Tuple5<Long, Long, Long, Integer, Integer>(
size, slide, offset, rowtimeIndex, windowType);
}

/**
 * Locates the positions of the window-start and window-end columns in the operator output row.
 *
 * @param props named window properties emitted by the window aggregate
 * @param outputType row type of the operator output
 * @return (startIndex, endIndex); an index is -1 when the corresponding property is absent
 */
public static Tuple2<Integer, Integer> getWindowStartAndEndIndexes(
    NamedWindowProperty[] props, RowType outputType) {
  List<String> fieldNames = outputType.getFieldNames();
  int startIdx = -1;
  int endIdx = -1;
  for (NamedWindowProperty namedProp : props) {
    // indexOf returns -1 when the property name is not an output field.
    int fieldIdx = fieldNames.indexOf(namedProp.getName());
    if (namedProp.getProperty() instanceof WindowStart) {
      startIdx = fieldIdx;
    } else if (namedProp.getProperty() instanceof WindowEnd) {
      endIdx = fieldIdx;
    }
  }
  return Tuple2.of(startIdx, endIdx);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ private void buildGlutenChains(StreamConfig taskConfig, Map<Integer, StreamConfi
// TODO: determine whether this can safely be set here.
if (isSourceGluten) {
if (taskConfig.getOperatorName().equals("exchange-hash")) {
taskConfig.setTypeSerializerOut(new GlutenRowVectorSerializer(null));
taskConfig.setTypeSerializerOut(new GlutenRowVectorSerializer(null, sourceOperator));
}
Map<IntermediateDataSetID, String> nodeToNonChainedOuts = new HashMap<>(outEdges.size());
taskConfig
Expand Down Expand Up @@ -216,14 +216,16 @@ private void buildGlutenChains(StreamConfig taskConfig, Map<Integer, StreamConfi
taskConfig.setStatePartitioner(0, new GlutenKeySelector());
taskConfig.setStatePartitioner(1, new GlutenKeySelector());
taskConfig.setupNetworkInputs(
new GlutenRowVectorSerializer(null), new GlutenRowVectorSerializer(null));
new GlutenRowVectorSerializer(null, twoInputOperator),
new GlutenRowVectorSerializer(null, twoInputOperator));
} else {
taskConfig.setStreamOperator(
GlutenVectorOneInputOperator oneInputOperator =
new GlutenVectorOneInputOperator(
sourceNode, sourceOperator.getId(), sourceOperator.getInputType(), nodeToOutTypes));
sourceNode, sourceOperator.getId(), sourceOperator.getInputType(), nodeToOutTypes);
taskConfig.setStreamOperator(oneInputOperator);
// TODO: determine whether this can safely be set here.
taskConfig.setStatePartitioner(0, new GlutenKeySelector());
taskConfig.setupNetworkInputs(new GlutenRowVectorSerializer(null));
taskConfig.setupNetworkInputs(new GlutenRowVectorSerializer(null, oneInputOperator));
}
Utils.setNodeToChainedOutputs(taskConfig, nodeToChainedOuts);
Utils.setNodeToNonChainedOutputs(taskConfig, nodeToNonChainedOuts);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
package org.apache.gluten.streaming.api.operators;

import org.apache.gluten.table.runtime.operators.GlutenVectorSourceFunction;
import org.apache.gluten.util.ReflectUtils;

import io.github.zhztheplayer.velox4j.connector.ConnectorSplit;
import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
import io.github.zhztheplayer.velox4j.type.RowType;

import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;

import java.util.Map;
Expand Down Expand Up @@ -63,4 +66,28 @@ public String getId() {
public ConnectorSplit getConnectorSplit() {
return sourceFunction.getConnectorSplit();
}

// Reflectively reads StreamSource's private "ctx" field, because the base class does not
// expose its SourceContext. May return null before the source has been started.
@SuppressWarnings("rawtypes")
private SourceFunction.SourceContext getSourceContext() {
  Object ctx = ReflectUtils.getObjectField(StreamSource.class, this, "ctx");
  return (SourceFunction.SourceContext) ctx;
}

// Forwards checkpoint-complete notifications to the wrapped Gluten source function and
// collects the splits/files it reports as committed for this checkpoint.
@SuppressWarnings({"rawtypes"})
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
  super.notifyCheckpointComplete(checkpointId);
  // Identifiers the source function committed as part of this checkpoint (may be null).
  String[] committed = sourceFunction.notifyCheckpointComplete(checkpointId);
  SourceFunction.SourceContext sourceContext = getSourceContext();
  TaskInfo taskInfo = getRuntimeContext().getTaskInfo();
  // NOTE(review): this condition is evaluated but the body is empty, so `committed`,
  // `sourceContext` and `taskInfo` are currently unused — presumably the committed
  // entries were meant to be emitted/forwarded for StreamingFileWriter tasks.
  // TODO confirm intent and either implement the body or remove the dead check.
  if (sourceContext != null
      && committed != null
      && taskInfo.getTaskName().contains("StreamingFileWriter")) {}
}

// Forwards checkpoint-aborted notifications to the wrapped Gluten source function.
// The superclass is notified first to mirror notifyCheckpointComplete's ordering.
@Override
public void notifyCheckpointAborted(long checkpointId) throws Exception {
  super.notifyCheckpointAborted(checkpointId);
  sourceFunction.notifyCheckpointAborted(checkpointId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,15 @@
import org.apache.flink.streaming.runtime.tasks.WatermarkGaugeExposingOutput;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.XORShiftRandom;

import java.util.Map;
import java.util.Random;

/**
* Collector for gluten, it may contain several outputs, dispatch the record according to node id.
*/
public class GlutenOutputCollector<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {

protected final Map<String, OutputWithChainingCheck<StreamRecord<T>>> outputs;
private final Random random = new XORShiftRandom();
private final WatermarkGauge watermarkGauge = new WatermarkGauge();
protected final Counter numRecordsOutForTask;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public class VeloxQueryConfig {
private static final String keyVeloxAdjustTimestampToSessionTimeZone =
"adjust_timestamp_to_session_timezone";
private static final String keyVeloxSessionTimezone = "session_timezone";
private static final String kStreamingAggregationMinOutputBatchRows =
"streaming_aggregation_min_output_batch_rows";

public static Config getConfig(RuntimeContext context) {
if (!(context instanceof StreamingRuntimeContext)) {
Expand All @@ -47,6 +49,7 @@ public static Config getConfig(RuntimeContext context) {
} else {
configMap.put(keyVeloxSessionTimezone, localTimeZone);
}
configMap.put(kStreamingAggregationMinOutputBatchRows, String.valueOf(1));
return Config.create(configMap);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.table.runtime.config;

import io.github.zhztheplayer.velox4j.session.Session;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Process-wide registry that maps an operator/plan-node id to the Velox {@link Session} created
 * for it, so other runtime components can look the session up later.
 *
 * <p>Implemented as an eagerly-initialized singleton. The backing map is a
 * {@link ConcurrentHashMap} because sessions are registered and queried from different operator
 * (task) threads within the same JVM — a plain {@code HashMap} is not safe for that access
 * pattern.
 */
public class VeloxSessionConfig implements Serializable {
  /** Operator id -> live Velox session. */
  private final Map<String, Session> sessions;

  /** Shared singleton instance; final so it cannot be accidentally reassigned. */
  private static final VeloxSessionConfig config = new VeloxSessionConfig();

  private VeloxSessionConfig() {
    sessions = new ConcurrentHashMap<>();
  }

  /** Returns the shared registry instance. */
  public static VeloxSessionConfig getSessionConfig() {
    return config;
  }

  /**
   * Registers the session created for the given id, replacing any previous entry.
   *
   * @param id operator/plan-node id
   * @param session session to associate with the id
   */
  public void addSession(String id, Session session) {
    sessions.put(id, session);
  }

  /**
   * Looks up the session registered for the given id.
   *
   * @return the session, or {@code null} when none was registered
   */
  public Session getSession(String id) {
    return sessions.get(id);
  }

  /**
   * Removes the session registered for the given id. Call this when the owning operator is
   * closed so the registry does not accumulate stale entries across job restarts.
   *
   * @return the removed session, or {@code null} when none was registered
   */
  public Session removeSession(String id) {
    return sessions.remove(id);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.gluten.streaming.api.operators.GlutenOperator;
import org.apache.gluten.table.runtime.config.VeloxQueryConfig;
import org.apache.gluten.table.runtime.config.VeloxSessionConfig;

import io.github.zhztheplayer.velox4j.Velox4j;
import io.github.zhztheplayer.velox4j.config.ConnectorConfig;
Expand Down Expand Up @@ -82,6 +83,7 @@ public GlutenVectorOneInputOperator(
void initGlutenTask() {
memoryManager = MemoryManager.create(AllocationListener.NOOP);
session = Velox4j.newSession(memoryManager);
VeloxSessionConfig.getSessionConfig().addSession(id, session);
// add a mock input as velox not allow the source is empty.
StatefulPlanNode mockInput =
new StatefulPlanNode(
Expand Down
Loading