Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@
*/
package org.apache.flink.table.planner.plan.nodes.exec.common;

import org.apache.gluten.table.runtime.operators.GlutenOneInputOperator;
import org.apache.gluten.util.LogicalTypeConverter;
import org.apache.gluten.util.PlanNodeIdGenerator;
import org.apache.gluten.velox.VeloxSourceSinkFactory;

import org.apache.flink.api.common.io.OutputFormat;
Expand Down Expand Up @@ -73,6 +70,7 @@
import org.apache.flink.table.runtime.operators.sink.ConstraintEnforcer;
import org.apache.flink.table.runtime.operators.sink.RowKindSetter;
import org.apache.flink.table.runtime.operators.sink.SinkOperator;
import org.apache.flink.table.runtime.operators.sink.StreamRecordTimestampInserter;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.BinaryType;
import org.apache.flink.table.types.logical.CharType;
Expand Down Expand Up @@ -558,27 +556,56 @@ private Transformation<RowData> applyRowtimeTransformation(
if (rowtimeFieldIndex == -1) {
return inputTransform;
}
// --- Begin Gluten-specific code changes ---
io.github.zhztheplayer.velox4j.type.RowType outputType =
(io.github.zhztheplayer.velox4j.type.RowType)
LogicalTypeConverter.toVLType(
((InternalTypeInfo) inputTransform.getOutputType()).toLogicalType());
return ExecNodeUtil.createOneInputTransformation(
inputTransform,
createTransformationMeta(
TIMESTAMP_INSERTER_TRANSFORMATION,
String.format("StreamRecordTimestampInserter(rowtime field: %s)", rowtimeFieldIndex),
"StreamRecordTimestampInserter",
config),
// TODO: support it, Map.of() will not be used, hardcode it here.
new GlutenOneInputOperator(
null, PlanNodeIdGenerator.newId(), null, Map.of("1", outputType)),
new StreamRecordTimestampInserter(rowtimeFieldIndex),
inputTransform.getOutputType(),
sinkParallelism,
sinkParallelismConfigured);
// --- End Gluten-specific code changes ---
}

/*
private Transformation<RowData> applyRowtimeTransformation(
Transformation<RowData> inputTransform,
int rowtimeFieldIndex,
int sinkParallelism,
ExecNodeConfig config) {
// Don't apply the transformation/operator if there is no rowtimeFieldIndex
if (rowtimeFieldIndex == -1) {
return inputTransform;
}
// --- Begin Gluten-specific code changes ---
io.github.zhztheplayer.velox4j.type.RowType outputType =
(io.github.zhztheplayer.velox4j.type.RowType)
LogicalTypeConverter.toVLType(
((InternalTypeInfo) inputTransform.getOutputType()).toLogicalType());
return ExecNodeUtil.createOneInputTransformation(
inputTransform,
createTransformationMeta(
TIMESTAMP_INSERTER_TRANSFORMATION,
String.format("StreamRecordTimestampInserter(rowtime field: %s)", rowtimeFieldIndex),
"StreamRecordTimestampInserter",
config),
// TODO: support it, Map.of() will not be used, hardcode it here.
new GlutenOneInputOperator(
null,
PlanNodeIdGenerator.newId(),
null,
Map.of("1", outputType),
RowData.class,
RowData.class),
inputTransform.getOutputType(),
sinkParallelism,
sinkParallelismConfigured);
// --- End Gluten-specific code changes ---
}
*/

private InternalTypeInfo<RowData> getInputTypeInfo() {
return InternalTypeInfo.of(getInputEdges().get(0).getOutputType());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import org.apache.gluten.rexnode.RexConversionContext;
import org.apache.gluten.rexnode.RexNodeConverter;
import org.apache.gluten.rexnode.Utils;
import org.apache.gluten.table.runtime.operators.GlutenVectorOneInputOperator;
import org.apache.gluten.table.runtime.operators.GlutenOneInputOperator;
import org.apache.gluten.util.LogicalTypeConverter;
import org.apache.gluten.util.PlanNodeIdGenerator;

Expand Down Expand Up @@ -53,6 +53,8 @@
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import org.apache.calcite.rex.RexNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

Expand All @@ -68,6 +70,7 @@
minPlanVersion = FlinkVersion.v1_15,
minStateVersion = FlinkVersion.v1_15)
public class StreamExecCalc extends CommonExecCalc implements StreamExecNode<RowData> {
private static final Logger LOG = LoggerFactory.getLogger(StreamExecCalc.class);

public StreamExecCalc(
ReadableConfig tableConfig,
Expand Down Expand Up @@ -142,11 +145,14 @@ public Transformation<RowData> translateToPlanInternal(
(io.github.zhztheplayer.velox4j.type.RowType)
LogicalTypeConverter.toVLType(getOutputType());
final OneInputStreamOperator calOperator =
new GlutenVectorOneInputOperator(
new GlutenOneInputOperator(
new StatefulPlanNode(project.getId(), project),
PlanNodeIdGenerator.newId(),
inputType,
Map.of(project.getId(), outputType));
Map.of(project.getId(), outputType),
RowData.class,
RowData.class,
"StreamExecCalc");
return ExecNodeUtil.createOneInputTransformation(
inputTransform,
new TransformationMetadata("gluten-calc", "Gluten cal operator"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.flink.table.planner.plan.nodes.exec.stream;

import org.apache.gluten.table.runtime.operators.GlutenVectorOneInputOperator;
import org.apache.gluten.table.runtime.operators.GlutenOneInputOperator;
import org.apache.gluten.util.LogicalTypeConverter;
import org.apache.gluten.util.PlanNodeIdGenerator;

Expand Down Expand Up @@ -212,11 +212,14 @@ protected Transformation<RowData> translateToPlanInternal(
deduplicateNode,
outputType);
operator =
new GlutenVectorOneInputOperator(
new GlutenOneInputOperator(
new StatefulPlanNode(streamRankNode.getId(), streamRankNode),
PlanNodeIdGenerator.newId(),
inputType,
Map.of(streamRankNode.getId(), outputType));
Map.of(streamRankNode.getId(), outputType),
RowData.class,
RowData.class,
"StreamExecDeduplicate");
} else {
throw new RuntimeException("ProcTime in deduplicate is not supported.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import org.apache.gluten.streaming.api.operators.GlutenOperator;
import org.apache.gluten.streaming.runtime.partitioner.GlutenKeyGroupStreamPartitioner;
import org.apache.gluten.table.runtime.keyselector.GlutenKeySelector;
import org.apache.gluten.table.runtime.operators.GlutenVectorOneInputOperator;
import org.apache.gluten.table.runtime.operators.GlutenOneInputOperator;
import org.apache.gluten.util.LogicalTypeConverter;
import org.apache.gluten.util.PlanNodeIdGenerator;

Expand Down Expand Up @@ -164,8 +164,14 @@ protected Transformation<RowData> translateToPlanInternal(
partitionFunctionSpec);
PlanNode exchange = new StreamPartitionNode(id, localPartition, parallelism);
final OneInputStreamOperator exchangeKeyGenerator =
new GlutenVectorOneInputOperator(
new StatefulPlanNode(id, exchange), id, glutenInputType, Map.of(id, outputType));
new GlutenOneInputOperator(
new StatefulPlanNode(id, exchange),
id,
glutenInputType,
Map.of(id, outputType),
RowData.class,
RowData.class,
"StreamExecExchange");
inputTransform =
ExecNodeUtil.createOneInputTransformation(
inputTransform,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import org.apache.gluten.rexnode.AggregateCallConverter;
import org.apache.gluten.rexnode.Utils;
import org.apache.gluten.rexnode.WindowUtils;
import org.apache.gluten.table.runtime.operators.GlutenVectorOneInputOperator;
import org.apache.gluten.table.runtime.operators.GlutenOneInputOperator;
import org.apache.gluten.util.LogicalTypeConverter;
import org.apache.gluten.util.PlanNodeIdGenerator;

Expand Down Expand Up @@ -254,11 +254,14 @@ protected Transformation<RowData> translateToPlanInternal(
outputType,
rowtimeIndex);
final OneInputStreamOperator windowOperator =
new GlutenVectorOneInputOperator(
new GlutenOneInputOperator(
new StatefulPlanNode(windowAgg.getId(), windowAgg),
PlanNodeIdGenerator.newId(),
inputType,
Map.of(windowAgg.getId(), outputType));
Map.of(windowAgg.getId(), outputType),
RowData.class,
RowData.class,
"StreamExecGlobalWindowAggregate");
// --- End Gluten-specific code changes ---

final RowDataKeySelector selector =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import org.apache.gluten.rexnode.AggregateCallConverter;
import org.apache.gluten.rexnode.Utils;
import org.apache.gluten.table.runtime.operators.GlutenVectorOneInputOperator;
import org.apache.gluten.table.runtime.operators.GlutenOneInputOperator;
import org.apache.gluten.util.LogicalTypeConverter;
import org.apache.gluten.util.PlanNodeIdGenerator;

Expand Down Expand Up @@ -206,11 +206,14 @@ protected Transformation<RowData> translateToPlanInternal(
new GroupAggregationNode(
PlanNodeIdGenerator.newId(), aggsHandlerNode, keySelectorSpec, outputType);
final OneInputStreamOperator operator =
new GlutenVectorOneInputOperator(
new GlutenOneInputOperator(
new StatefulPlanNode(aggregation.getId(), aggregation),
PlanNodeIdGenerator.newId(),
inputType,
Map.of(aggregation.getId(), outputType));
Map.of(aggregation.getId(), outputType),
RowData.class,
RowData.class,
"StreamExecGroupAggregate");
// --- End Gluten-specific code changes ---

// partitioned aggregation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import org.apache.gluten.rexnode.AggregateCallConverter;
import org.apache.gluten.rexnode.Utils;
import org.apache.gluten.table.runtime.operators.GlutenVectorOneInputOperator;
import org.apache.gluten.table.runtime.operators.GlutenOneInputOperator;
import org.apache.gluten.util.LogicalTypeConverter;
import org.apache.gluten.util.PlanNodeIdGenerator;

Expand Down Expand Up @@ -240,11 +240,14 @@ protected Transformation<RowData> translateToPlanInternal(
1, // TODO: get from window attributes
outputType);
final OneInputStreamOperator windowOperator =
new GlutenVectorOneInputOperator(
new GlutenOneInputOperator(
new StatefulPlanNode(windowAgg.getId(), windowAgg),
PlanNodeIdGenerator.newId(),
inputType,
Map.of(windowAgg.getId(), outputType));
Map.of(windowAgg.getId(), outputType),
RowData.class,
RowData.class,
"StreamExecGroupWindowAggregate");
// --- End Gluten-specific code changes ---

final OneInputTransformation<RowData, RowData> transform =
Expand All @@ -264,6 +267,9 @@ protected Transformation<RowData> translateToPlanInternal(
InternalTypeInfo.of(inputRowType));
transform.setStateKeySelector(selector);
transform.setStateKeyType(selector.getProducedType());

// GlutenKeySelector selector = new GlutenKeySelector();
// transform.setStateKeySelector(selector);
return transform;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import org.apache.gluten.rexnode.RexConversionContext;
import org.apache.gluten.rexnode.RexNodeConverter;
import org.apache.gluten.rexnode.Utils;
import org.apache.gluten.table.runtime.operators.GlutenVectorTwoInputOperator;
import org.apache.gluten.table.runtime.operators.GlutenTwoInputOperator;
import org.apache.gluten.util.LogicalTypeConverter;
import org.apache.gluten.util.PlanNodeIdGenerator;

Expand Down Expand Up @@ -308,13 +308,16 @@ protected Transformation<RowData> translateToPlanInternal(
outputType,
1024);
operator =
new GlutenVectorTwoInputOperator(
new GlutenTwoInputOperator(
new StatefulPlanNode(join.getId(), join),
leftInput.getId(),
rightInput.getId(),
leftInputType,
rightInputType,
Map.of(join.getId(), outputType));
Map.of(join.getId(), outputType),
RowData.class,
RowData.class,
"StreamExecJoin");
// --- End Gluten-specific code changes ---
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import org.apache.gluten.rexnode.AggregateCallConverter;
import org.apache.gluten.rexnode.Utils;
import org.apache.gluten.rexnode.WindowUtils;
import org.apache.gluten.table.runtime.operators.GlutenVectorOneInputOperator;
import org.apache.gluten.table.runtime.operators.GlutenOneInputOperator;
import org.apache.gluten.util.LogicalTypeConverter;
import org.apache.gluten.util.PlanNodeIdGenerator;

Expand Down Expand Up @@ -222,11 +222,14 @@ protected Transformation<RowData> translateToPlanInternal(
outputType,
rowtimeIndex);
final OneInputStreamOperator localAggOperator =
new GlutenVectorOneInputOperator(
new GlutenOneInputOperator(
new StatefulPlanNode(windowAgg.getId(), windowAgg),
PlanNodeIdGenerator.newId(),
inputType,
Map.of(windowAgg.getId(), outputType));
Map.of(windowAgg.getId(), outputType),
RowData.class,
RowData.class,
"StreamExecLocalWindowAggregate");
// --- End Gluten-specific code changes ---

return ExecNodeUtil.createOneInputTransformation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.stream;

import org.apache.gluten.rexnode.Utils;
import org.apache.gluten.table.runtime.operators.GlutenVectorOneInputOperator;
import org.apache.gluten.table.runtime.operators.GlutenOneInputOperator;
import org.apache.gluten.util.LogicalTypeConverter;
import org.apache.gluten.util.PlanNodeIdGenerator;

Expand Down Expand Up @@ -268,11 +268,14 @@ protected Transformation<RowData> translateToPlanInternal(
streamTopNNode,
outputType);
final OneInputStreamOperator operator =
new GlutenVectorOneInputOperator(
new GlutenOneInputOperator(
new StatefulPlanNode(streamRankNode.getId(), streamRankNode),
PlanNodeIdGenerator.newId(),
inputType,
Map.of(streamRankNode.getId(), outputType));
Map.of(streamRankNode.getId(), outputType),
RowData.class,
RowData.class,
"StreamExecRank");
// --- End Gluten-specific code changes ---

OneInputTransformation<RowData, RowData> transform =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import org.apache.gluten.rexnode.RexConversionContext;
import org.apache.gluten.rexnode.RexNodeConverter;
import org.apache.gluten.rexnode.Utils;
import org.apache.gluten.table.runtime.operators.GlutenVectorOneInputOperator;
import org.apache.gluten.table.runtime.operators.GlutenOneInputOperator;
import org.apache.gluten.util.LogicalTypeConverter;
import org.apache.gluten.util.PlanNodeIdGenerator;

Expand Down Expand Up @@ -162,11 +162,14 @@ protected Transformation<RowData> translateToPlanInternal(
rowtimeFieldIndex,
watermarkInterval);
final OneInputStreamOperator watermarkOperator =
new GlutenVectorOneInputOperator(
new GlutenOneInputOperator(
new StatefulPlanNode(watermark.getId(), watermark),
PlanNodeIdGenerator.newId(),
inputType,
Map.of(watermark.getId(), outputType));
Map.of(watermark.getId(), outputType),
RowData.class,
RowData.class,
"StreamExecWatermarkAssigner");

return ExecNodeUtil.createOneInputTransformation(
inputTransform,
Expand Down
Loading
Loading