From a3dfed781c58bc0e2808fb0f4d5408bc33d39573 Mon Sep 17 00:00:00 2001 From: zouyunhe Date: Tue, 14 Oct 2025 07:47:06 +0000 Subject: [PATCH 1/7] support proctime window --- .../StreamExecGlobalWindowAggregate.java | 10 +- .../StreamExecLocalWindowAggregate.java | 5 +- .../stream/StreamExecWindowAggregate.java | 41 +++++--- .../apache/gluten/rexnode/WindowUtils.java | 36 ++++++- .../SourceTransformationTranslator.java | 95 +++++++++++++++++++ .../runtime/config/VeloxQueryConfig.java | 3 + 6 files changed, 172 insertions(+), 18 deletions(-) diff --git a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java index ee8c91a3757c..5825447086fc 100644 --- a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java +++ b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java @@ -37,6 +37,7 @@ import org.apache.flink.FlinkVersion; import org.apache.flink.api.dag.Transformation; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; @@ -205,11 +206,15 @@ protected Transformation translateToPlanInternal( // TODO: support more window types. 
Tuple5 windowSpecParams = WindowUtils.extractWindowParameters(windowing); + Tuple2 windowStartAndEndIndexes = + WindowUtils.getWindowStartAndEndIndexes(namedWindowProperties, (RowType) getOutputType()); long size = windowSpecParams.f0; long slide = windowSpecParams.f1; long offset = windowSpecParams.f2; int rowtimeIndex = windowSpecParams.f3; int windowType = windowSpecParams.f4; + int windowStartIndex = windowStartAndEndIndexes.f0; + int windowEndIndex = windowStartAndEndIndexes.f1; PartitionFunctionSpec sliceAssignerSpec = new StreamWindowPartitionFunctionSpec( inputType, rowtimeIndex, size, slide, offset, windowType); @@ -252,7 +257,10 @@ protected Transformation translateToPlanInternal( offset, windowType, outputType, - rowtimeIndex); + false, + rowtimeIndex, + windowStartIndex, + windowEndIndex); final OneInputStreamOperator windowOperator = new GlutenVectorOneInputOperator( new StatefulPlanNode(windowAgg.getId(), windowAgg), diff --git a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java index 4ffcf7998c27..d1b01e046323 100644 --- a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java +++ b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java @@ -220,7 +220,10 @@ protected Transformation translateToPlanInternal( offset, windowType, outputType, - rowtimeIndex); + false, + rowtimeIndex, + -1, + -1); final OneInputStreamOperator localAggOperator = new GlutenVectorOneInputOperator( new StatefulPlanNode(windowAgg.getId(), windowAgg), diff --git a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java 
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java index afe5d0148396..667ed1876fdb 100644 --- a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java +++ b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java @@ -37,6 +37,7 @@ import org.apache.flink.FlinkVersion; import org.apache.flink.api.dag.Transformation; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; @@ -65,6 +66,8 @@ import org.apache.calcite.rel.core.AggregateCall; import org.apache.commons.math3.util.ArithmeticUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nullable; @@ -97,6 +100,7 @@ minStateVersion = FlinkVersion.v1_15) public class StreamExecWindowAggregate extends StreamExecWindowAggregateBase { + private static final Logger LOG = LoggerFactory.getLogger(StreamExecWindowAggregate.class); public static final String WINDOW_AGGREGATE_TRANSFORMATION = "window-aggregate"; private static final long WINDOW_AGG_MEMORY_RATIO = 100; @@ -197,11 +201,16 @@ protected Transformation translateToPlanInternal( // TODO: support more window types. 
Tuple5 windowSpecParams = WindowUtils.extractWindowParameters(windowing); + Tuple2 windowStartAndEndIndexes = + WindowUtils.getWindowStartAndEndIndexes(namedWindowProperties, (RowType) getOutputType()); long size = windowSpecParams.f0; long slide = windowSpecParams.f1; long offset = windowSpecParams.f2; int rowtimeIndex = windowSpecParams.f3; int windowType = windowSpecParams.f4; + boolean isRowTime = windowing.isRowtime(); + int windowStartIndex = windowStartAndEndIndexes.f0; + int windowEndIndex = windowStartAndEndIndexes.f1; PartitionFunctionSpec sliceAssignerSpec = new StreamWindowPartitionFunctionSpec( inputType, rowtimeIndex, size, slide, offset, windowType); @@ -210,7 +219,7 @@ protected Transformation translateToPlanInternal( PlanNodeIdGenerator.newId(), AggregateStep.SINGLE, groupingKeys, - groupingKeys, + isRowTime ? groupingKeys : List.of(), aggNames, aggregates, false, @@ -218,17 +227,19 @@ protected Transformation translateToPlanInternal( null, List.of()); PlanNode localAgg = - new AggregationNode( - PlanNodeIdGenerator.newId(), - AggregateStep.SINGLE, - groupingKeys, - groupingKeys, - aggNames, - aggregates, - false, - List.of(new EmptyNode(inputType)), - null, - List.of()); + isRowTime + ? 
new AggregationNode( + PlanNodeIdGenerator.newId(), + AggregateStep.SINGLE, + groupingKeys, + groupingKeys, + aggNames, + aggregates, + false, + List.of(new EmptyNode(inputType)), + null, + List.of()) + : null; PlanNode windowAgg = new StreamWindowAggregationNode( PlanNodeIdGenerator.newId(), @@ -244,7 +255,11 @@ protected Transformation translateToPlanInternal( offset, windowType, outputType, - rowtimeIndex); + windowing.isRowtime(), + rowtimeIndex, + windowStartIndex, + windowEndIndex); + LOG.info("windowStartIndex:{}, windowEndIndex:{}", windowStartIndex, windowEndIndex); final OneInputStreamOperator windowOperator = new GlutenVectorOneInputOperator( new StatefulPlanNode(windowAgg.getId(), windowAgg), diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java index 6f7a64f6f4ee..3f8f8ad5aaf5 100644 --- a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java +++ b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java @@ -16,20 +16,32 @@ */ package org.apache.gluten.rexnode; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.table.planner.plan.logical.CumulativeWindowSpec; import org.apache.flink.table.planner.plan.logical.HoppingWindowSpec; +import org.apache.flink.table.planner.plan.logical.SessionWindowSpec; import org.apache.flink.table.planner.plan.logical.SliceAttachedWindowingStrategy; import org.apache.flink.table.planner.plan.logical.TimeAttributeWindowingStrategy; import org.apache.flink.table.planner.plan.logical.TumblingWindowSpec; import org.apache.flink.table.planner.plan.logical.WindowAttachedWindowingStrategy; import org.apache.flink.table.planner.plan.logical.WindowSpec; import org.apache.flink.table.planner.plan.logical.WindowingStrategy; +import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty; +import 
org.apache.flink.table.runtime.groupwindow.WindowEnd; +import org.apache.flink.table.runtime.groupwindow.WindowStart; +import org.apache.flink.table.types.logical.RowType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.time.Duration; +import java.util.List; /** Utility to store some useful functions. */ public class WindowUtils { + private static final Logger LOG = LoggerFactory.getLogger(WindowUtils.class); // Get names for project node. public static Tuple5 extractWindowParameters( WindowingStrategy windowing) { @@ -53,24 +65,28 @@ public static Tuple5 extractWindowParameters if (windowOffset != null) { offset = windowOffset.toMillis(); } + windowType = 0; } else if (windowSpec instanceof TumblingWindowSpec) { size = ((TumblingWindowSpec) windowSpec).getSize().toMillis(); Duration windowOffset = ((TumblingWindowSpec) windowSpec).getOffset(); if (windowOffset != null) { offset = windowOffset.toMillis(); } + windowType = 1; + } else if (windowSpec instanceof CumulativeWindowSpec) { + windowType = 2; + } else if (windowSpec instanceof SessionWindowSpec) { + windowType = 3; } else { throw new RuntimeException("Not support window spec " + windowSpec); } - + LOG.info("window strategy:{}", windowing.getClass().getName()); if (windowing instanceof TimeAttributeWindowingStrategy) { if (windowing.isRowtime()) { rowtimeIndex = ((TimeAttributeWindowingStrategy) windowing).getTimeAttributeIndex(); } - windowType = 0; } else if (windowing instanceof WindowAttachedWindowingStrategy) { rowtimeIndex = ((WindowAttachedWindowingStrategy) windowing).getWindowEnd(); - windowType = 1; } else if (windowing instanceof SliceAttachedWindowingStrategy) { rowtimeIndex = ((SliceAttachedWindowingStrategy) windowing).getSliceEnd(); } else { @@ -79,4 +95,18 @@ public static Tuple5 extractWindowParameters return new Tuple5( size, slide, offset, rowtimeIndex, windowType); } + + public static Tuple2 getWindowStartAndEndIndexes( + NamedWindowProperty[] props, RowType 
outputType) { + int windowStartIndex = -1, windowEndIndex = -1; + List outputNames = outputType.getFieldNames(); + for (NamedWindowProperty prop : props) { + if (prop.getProperty() instanceof WindowStart) { + windowStartIndex = outputNames.indexOf(prop.getName()); + } else if (prop.getProperty() instanceof WindowEnd) { + windowEndIndex = outputNames.indexOf(prop.getName()); + } + } + return new Tuple2(windowStartIndex, windowEndIndex); + } } diff --git a/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java b/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java index 5e7dd62f0df9..31dcd08a3633 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java +++ b/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java @@ -22,6 +22,8 @@ import org.apache.gluten.util.PlanNodeIdGenerator; import org.apache.gluten.util.ReflectUtils; +import io.github.zhztheplayer.velox4j.connector.KafkaConnectorSplit; +import io.github.zhztheplayer.velox4j.connector.KafkaTableHandle; import io.github.zhztheplayer.velox4j.connector.NexmarkConnectorSplit; import io.github.zhztheplayer.velox4j.connector.NexmarkTableHandle; import io.github.zhztheplayer.velox4j.plan.PlanNode; @@ -31,6 +33,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.connector.source.Source; import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.streaming.api.graph.SimpleTransformationTranslator; import org.apache.flink.streaming.api.graph.StreamGraph; @@ -43,8 +46,11 @@ import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; +import java.util.UUID; import 
static org.apache.flink.util.Preconditions.checkNotNull; @@ -130,6 +136,95 @@ private Collection translateInternal( null, transformation.getOutputType(), "Source: " + transformation.getName()); + } else if (sourceClazz.getSimpleName().equals("KafkaSource")) { + RowType outputType = + (RowType) + LogicalTypeConverter.toVLType( + ((InternalTypeInfo) transformation.getOutputType()).toLogicalType()); + String connectorId = "connector-kafka"; + Source kafkaSource = transformation.getSource(); + Properties properties = + (Properties) ReflectUtils.getObjectField(sourceClazz, kafkaSource, "props"); + Object kafkaSubscriber = ReflectUtils.getObjectField(sourceClazz, kafkaSource, "subscriber"); + Object topics = + ReflectUtils.getObjectField(kafkaSubscriber.getClass(), kafkaSubscriber, "topics"); + Object deserializer = + ReflectUtils.getObjectField(sourceClazz, kafkaSource, "deserializationSchema"); + if (deserializer.getClass().getSimpleName().equals("KafkaDeserializationSchemaWrapper")) { + deserializer = + ReflectUtils.getObjectField( + deserializer.getClass(), deserializer, "kafkaDeserializationSchema"); + if (deserializer.getClass().getSimpleName().equals("DynamicKafkaDeserializationSchema")) { + deserializer = + ReflectUtils.getObjectField( + deserializer.getClass(), deserializer, "valueDeserialization"); + } + } + Object offsetStartInitializer = + ReflectUtils.getObjectField(sourceClazz, kafkaSource, "startingOffsetsInitializer"); + String startupMode = "group-offsets"; + String offsetStartInitializerClazzName = offsetStartInitializer.getClass().getSimpleName(); + if (offsetStartInitializerClazzName.equals("LatestOffsetsInitializer")) { + startupMode = "latest-offsets"; + } else if (offsetStartInitializerClazzName.equals("ReaderHandledOffsetsInitializer")) { + Long offset = + (Long) + ReflectUtils.getObjectField( + offsetStartInitializer.getClass(), offsetStartInitializer, "startingOffset"); + startupMode = + offset == -1 ? "latest-offsets" : offset == -2 ? 
"earliest-offsets" : "group-offsets"; + } + String planId = PlanNodeIdGenerator.newId(); + String topic = ((List) topics).get(0); + String format = + deserializer.getClass().getSimpleName().equals("JsonParserRowDataDeserializationSchema") + ? "json" + : "raw"; + Map kafkaTableParameters = new HashMap(); + for (String key : properties.stringPropertyNames()) { + kafkaTableParameters.put(key, properties.getProperty(key)); + } + kafkaTableParameters.put("topic", topic); + kafkaTableParameters.put("format", format); + kafkaTableParameters.put("scan.startup.mode", startupMode); + kafkaTableParameters.put( + "enable.auto.commit", + context.getStreamGraph().getCheckpointConfig().isCheckpointingEnabled() + ? "false" + : "true"); + kafkaTableParameters.put( + "client.id", + properties.getProperty("client.id.prefix", "connector-kafka") + "-" + UUID.randomUUID()); + KafkaTableHandle kafkaTableHandle = + new KafkaTableHandle(connectorId, topic, outputType, kafkaTableParameters); + KafkaConnectorSplit connectorSplit = + new KafkaConnectorSplit( + connectorId, + 0, + false, + kafkaTableParameters.get("bootstrap.servers"), + kafkaTableParameters.get("group.id"), + format, + Boolean.valueOf(kafkaTableParameters.getOrDefault("enable.auto.commit", "false")), + "latest", + List.of()); + TableScanNode kafkaScan = new TableScanNode(planId, outputType, kafkaTableHandle, List.of()); + StreamOperatorFactory operatorFactory = + SimpleOperatorFactory.of( + new GlutenStreamSource( + new GlutenVectorSourceFunction( + new StatefulPlanNode(kafkaScan.getId(), kafkaScan), + Map.of(kafkaScan.getId(), outputType), + kafkaScan.getId(), + connectorSplit))); + streamGraph.addLegacySource( + transformationId, + slotSharingGroup, + transformation.getCoLocationGroupKey(), + operatorFactory, + null, + transformation.getOutputType(), + "Source: " + transformation.getName()); } else { SourceOperatorFactory operatorFactory = new SourceOperatorFactory<>( diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxQueryConfig.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxQueryConfig.java index 788ecb4a68b7..a1026cfe49a5 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxQueryConfig.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxQueryConfig.java @@ -31,6 +31,8 @@ public class VeloxQueryConfig { private static final String keyVeloxAdjustTimestampToSessionTimeZone = "adjust_timestamp_to_session_timezone"; private static final String keyVeloxSessionTimezone = "session_timezone"; + private static final String kStreamingAggregationMinOutputBatchRows = + "streaming_aggregation_min_output_batch_rows"; public static Config getConfig(RuntimeContext context) { if (!(context instanceof StreamingRuntimeContext)) { @@ -47,6 +49,7 @@ public static Config getConfig(RuntimeContext context) { } else { configMap.put(keyVeloxSessionTimezone, localTimeZone); } + configMap.put(kStreamingAggregationMinOutputBatchRows, String.valueOf(1)); return Config.create(configMap); } } From 9dc019844b74481a29349a035df202e8ec9283ac Mon Sep 17 00:00:00 2001 From: zouyunhe Date: Thu, 16 Oct 2025 01:46:29 +0000 Subject: [PATCH 2/7] remove useless changes --- .../stream/StreamExecWindowAggregate.java | 6 +- .../apache/gluten/rexnode/WindowUtils.java | 4 - .../SourceTransformationTranslator.java | 95 ------------------- 3 files changed, 2 insertions(+), 103 deletions(-) diff --git a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java index 667ed1876fdb..8efb8be4e3a9 100644 --- a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java +++ 
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java @@ -66,8 +66,6 @@ import org.apache.calcite.rel.core.AggregateCall; import org.apache.commons.math3.util.ArithmeticUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import javax.annotation.Nullable; @@ -100,7 +98,6 @@ minStateVersion = FlinkVersion.v1_15) public class StreamExecWindowAggregate extends StreamExecWindowAggregateBase { - private static final Logger LOG = LoggerFactory.getLogger(StreamExecWindowAggregate.class); public static final String WINDOW_AGGREGATE_TRANSFORMATION = "window-aggregate"; private static final long WINDOW_AGG_MEMORY_RATIO = 100; @@ -226,6 +223,8 @@ protected Transformation translateToPlanInternal( List.of(new EmptyNode(inputType)), null, List.of()); + // processing time window can not apply to local-global aggregate optimization, so here we need + // to set local aggregate as null when it is not event time window. PlanNode localAgg = isRowTime ? 
new AggregationNode( @@ -259,7 +258,6 @@ protected Transformation translateToPlanInternal( rowtimeIndex, windowStartIndex, windowEndIndex); - LOG.info("windowStartIndex:{}, windowEndIndex:{}", windowStartIndex, windowEndIndex); final OneInputStreamOperator windowOperator = new GlutenVectorOneInputOperator( new StatefulPlanNode(windowAgg.getId(), windowAgg), diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java index 3f8f8ad5aaf5..5a419509e931 100644 --- a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java +++ b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java @@ -32,9 +32,6 @@ import org.apache.flink.table.runtime.groupwindow.WindowStart; import org.apache.flink.table.types.logical.RowType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.time.Duration; import java.util.List; @@ -80,7 +77,6 @@ public static Tuple5 extractWindowParameters } else { throw new RuntimeException("Not support window spec " + windowSpec); } - LOG.info("window strategy:{}", windowing.getClass().getName()); if (windowing instanceof TimeAttributeWindowingStrategy) { if (windowing.isRowtime()) { rowtimeIndex = ((TimeAttributeWindowingStrategy) windowing).getTimeAttributeIndex(); diff --git a/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java b/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java index 31dcd08a3633..5e7dd62f0df9 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java +++ b/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java @@ -22,8 +22,6 @@ import org.apache.gluten.util.PlanNodeIdGenerator; import 
org.apache.gluten.util.ReflectUtils; -import io.github.zhztheplayer.velox4j.connector.KafkaConnectorSplit; -import io.github.zhztheplayer.velox4j.connector.KafkaTableHandle; import io.github.zhztheplayer.velox4j.connector.NexmarkConnectorSplit; import io.github.zhztheplayer.velox4j.connector.NexmarkTableHandle; import io.github.zhztheplayer.velox4j.plan.PlanNode; @@ -33,7 +31,6 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.connector.source.Source; import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.streaming.api.graph.SimpleTransformationTranslator; import org.apache.flink.streaming.api.graph.StreamGraph; @@ -46,11 +43,8 @@ import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Properties; -import java.util.UUID; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -136,95 +130,6 @@ private Collection translateInternal( null, transformation.getOutputType(), "Source: " + transformation.getName()); - } else if (sourceClazz.getSimpleName().equals("KafkaSource")) { - RowType outputType = - (RowType) - LogicalTypeConverter.toVLType( - ((InternalTypeInfo) transformation.getOutputType()).toLogicalType()); - String connectorId = "connector-kafka"; - Source kafkaSource = transformation.getSource(); - Properties properties = - (Properties) ReflectUtils.getObjectField(sourceClazz, kafkaSource, "props"); - Object kafkaSubscriber = ReflectUtils.getObjectField(sourceClazz, kafkaSource, "subscriber"); - Object topics = - ReflectUtils.getObjectField(kafkaSubscriber.getClass(), kafkaSubscriber, "topics"); - Object deserializer = - ReflectUtils.getObjectField(sourceClazz, kafkaSource, "deserializationSchema"); - if (deserializer.getClass().getSimpleName().equals("KafkaDeserializationSchemaWrapper")) { - deserializer = - ReflectUtils.getObjectField( - 
deserializer.getClass(), deserializer, "kafkaDeserializationSchema"); - if (deserializer.getClass().getSimpleName().equals("DynamicKafkaDeserializationSchema")) { - deserializer = - ReflectUtils.getObjectField( - deserializer.getClass(), deserializer, "valueDeserialization"); - } - } - Object offsetStartInitializer = - ReflectUtils.getObjectField(sourceClazz, kafkaSource, "startingOffsetsInitializer"); - String startupMode = "group-offsets"; - String offsetStartInitializerClazzName = offsetStartInitializer.getClass().getSimpleName(); - if (offsetStartInitializerClazzName.equals("LatestOffsetsInitializer")) { - startupMode = "latest-offsets"; - } else if (offsetStartInitializerClazzName.equals("ReaderHandledOffsetsInitializer")) { - Long offset = - (Long) - ReflectUtils.getObjectField( - offsetStartInitializer.getClass(), offsetStartInitializer, "startingOffset"); - startupMode = - offset == -1 ? "latest-offsets" : offset == -2 ? "earliest-offsets" : "group-offsets"; - } - String planId = PlanNodeIdGenerator.newId(); - String topic = ((List) topics).get(0); - String format = - deserializer.getClass().getSimpleName().equals("JsonParserRowDataDeserializationSchema") - ? "json" - : "raw"; - Map kafkaTableParameters = new HashMap(); - for (String key : properties.stringPropertyNames()) { - kafkaTableParameters.put(key, properties.getProperty(key)); - } - kafkaTableParameters.put("topic", topic); - kafkaTableParameters.put("format", format); - kafkaTableParameters.put("scan.startup.mode", startupMode); - kafkaTableParameters.put( - "enable.auto.commit", - context.getStreamGraph().getCheckpointConfig().isCheckpointingEnabled() - ? 
"false" - : "true"); - kafkaTableParameters.put( - "client.id", - properties.getProperty("client.id.prefix", "connector-kafka") + "-" + UUID.randomUUID()); - KafkaTableHandle kafkaTableHandle = - new KafkaTableHandle(connectorId, topic, outputType, kafkaTableParameters); - KafkaConnectorSplit connectorSplit = - new KafkaConnectorSplit( - connectorId, - 0, - false, - kafkaTableParameters.get("bootstrap.servers"), - kafkaTableParameters.get("group.id"), - format, - Boolean.valueOf(kafkaTableParameters.getOrDefault("enable.auto.commit", "false")), - "latest", - List.of()); - TableScanNode kafkaScan = new TableScanNode(planId, outputType, kafkaTableHandle, List.of()); - StreamOperatorFactory operatorFactory = - SimpleOperatorFactory.of( - new GlutenStreamSource( - new GlutenVectorSourceFunction( - new StatefulPlanNode(kafkaScan.getId(), kafkaScan), - Map.of(kafkaScan.getId(), outputType), - kafkaScan.getId(), - connectorSplit))); - streamGraph.addLegacySource( - transformationId, - slotSharingGroup, - transformation.getCoLocationGroupKey(), - operatorFactory, - null, - transformation.getOutputType(), - "Source: " + transformation.getName()); } else { SourceOperatorFactory operatorFactory = new SourceOperatorFactory<>( From 2909b250b89e7425a78dd10d2c3cd7909308829b Mon Sep 17 00:00:00 2001 From: zouyunhe Date: Thu, 16 Oct 2025 01:48:21 +0000 Subject: [PATCH 3/7] remove useless changes --- .../src/main/java/org/apache/gluten/rexnode/WindowUtils.java | 1 - 1 file changed, 1 deletion(-) diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java index 5a419509e931..021969f2a264 100644 --- a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java +++ b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java @@ -38,7 +38,6 @@ /** Utility to store some useful functions. 
*/ public class WindowUtils { - private static final Logger LOG = LoggerFactory.getLogger(WindowUtils.class); // Get names for project node. public static Tuple5 extractWindowParameters( WindowingStrategy windowing) { From 332fb11ba7d615037a92124ecf6e131fdabba3d7 Mon Sep 17 00:00:00 2001 From: zouyunhe Date: Wed, 11 Jun 2025 10:49:59 +0000 Subject: [PATCH 4/7] support kafka data source --- .github/workflows/flink.yml | 2 +- gluten-flink/docs/Flink.md | 2 +- .../nodes/exec/common/CommonExecSink.java | 5 +- .../stream/StreamExecTableSourceScan.java | 8 +- ...er.java => FromElementsSourceFactory.java} | 31 +++- .../gluten/velox/KafkaSourceSinkFactory.java | 141 +++++++++++++++ .../gluten/velox/NexmarkSourceFactory.java | 107 +++++++++++ ...SinkBuilder.java => PrintSinkFactory.java} | 25 ++- ...apache.gluten.velox.VeloxSourceSinkFactory | 4 + .../SourceTransformationTranslator.java | 168 ------------------ .../gluten/velox/VeloxSourceSinkFactory.java | 58 ++++++ gluten-flink/ut/pom.xml | 30 ++++ .../runtime/stream/custom/NexmarkTest.java | 102 +++++++++-- .../src/test/resources/nexmark/ddl_kafka.sql | 46 +++++ .../src/test/resources/nexmark/ddl_views.sql | 6 +- 15 files changed, 533 insertions(+), 202 deletions(-) rename gluten-flink/planner/src/main/java/org/apache/gluten/velox/{VeloxSourceBuilder.java => FromElementsSourceFactory.java} (81%) create mode 100644 gluten-flink/planner/src/main/java/org/apache/gluten/velox/KafkaSourceSinkFactory.java create mode 100644 gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java rename gluten-flink/planner/src/main/java/org/apache/gluten/velox/{VeloxSinkBuilder.java => PrintSinkFactory.java} (86%) create mode 100644 gluten-flink/planner/src/main/resources/META-INF/services/org.apache.gluten.velox.VeloxSourceSinkFactory delete mode 100644 gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java create mode 100644 
gluten-flink/runtime/src/main/java/org/apache/gluten/velox/VeloxSourceSinkFactory.java create mode 100644 gluten-flink/ut/src/test/resources/nexmark/ddl_kafka.sql diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml index 4c0f1fdd4e8e..099ee734389b 100644 --- a/.github/workflows/flink.yml +++ b/.github/workflows/flink.yml @@ -61,7 +61,7 @@ jobs: sudo yum install https://mirror.stream.centos.org/9-stream/BaseOS/x86_64/os/Packages/tzdata-2025a-1.el9.noarch.rpm -y sudo .github/workflows/util/install-flink-resources.sh git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git - cd velox4j && git reset --hard 14eea127c5088f972cdf1ca0987fd95429485a0e + cd velox4j && git reset --hard 1753fa68f71d8a1a0df2d4a0ff346ae00e973e9c git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true cd .. diff --git a/gluten-flink/docs/Flink.md b/gluten-flink/docs/Flink.md index bc3e2ed09240..a73b56bb32fd 100644 --- a/gluten-flink/docs/Flink.md +++ b/gluten-flink/docs/Flink.md @@ -48,7 +48,7 @@ As some features have not been committed to upstream, you have to use the follow ## fetch velox4j code git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git cd velox4j -git reset --hard 14eea127c5088f972cdf1ca0987fd95429485a0e +git reset --hard 1753fa68f71d8a1a0df2d4a0ff346ae00e973e9c mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true ``` **Get gluten** diff --git a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java index f11d3f79587e..efd23619688e 100644 --- a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java +++ b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java @@ -19,7 +19,7 @@ import 
org.apache.gluten.table.runtime.operators.GlutenOneInputOperator; import org.apache.gluten.util.LogicalTypeConverter; import org.apache.gluten.util.PlanNodeIdGenerator; -import org.apache.gluten.velox.VeloxSinkBuilder; +import org.apache.gluten.velox.VeloxSourceSinkFactory; import org.apache.flink.api.common.io.OutputFormat; import org.apache.flink.api.dag.Transformation; @@ -470,7 +470,8 @@ public Optional generateUid(String name) { Transformation sinkTransformation = createSinkFunctionTransformation( sinkFunction, env, inputTransform, rowtimeFieldIndex, sinkMeta, sinkParallelism); - return VeloxSinkBuilder.build(env.getConfiguration(), sinkTransformation); + return VeloxSourceSinkFactory.getFactory(sinkTransformation) + .buildSink(env.getConfiguration(), sinkTransformation); // --- End Gluten-specific code changes --- } else if (runtimeProvider instanceof OutputFormatProvider) { OutputFormat outputFormat = diff --git a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java index 07189d5f5eee..0d7f7533f2e0 100644 --- a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java +++ b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java @@ -16,7 +16,7 @@ */ package org.apache.flink.table.planner.plan.nodes.exec.stream; -import org.apache.gluten.velox.VeloxSourceBuilder; +import org.apache.gluten.velox.VeloxSourceSinkFactory; import org.apache.flink.FlinkVersion; import org.apache.flink.api.common.io.InputFormat; @@ -106,7 +106,11 @@ protected Transformation translateToPlanInternal( .getScanTableSource( planner.getFlinkContext(), ShortcutUtils.unwrapTypeFactory(planner)); Transformation sourceTransformation = super.translateToPlanInternal(planner, config); - 
return VeloxSourceBuilder.build(sourceTransformation, tableSource); + VeloxSourceSinkFactory factory = VeloxSourceSinkFactory.getFactory(sourceTransformation); + return factory.buildSource( + sourceTransformation, + tableSource, + planner.getExecEnv().getCheckpointConfig().isCheckpointingEnabled()); // --- End Gluten-specific code changes --- } } diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/VeloxSourceBuilder.java b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/FromElementsSourceFactory.java similarity index 81% rename from gluten-flink/planner/src/main/java/org/apache/gluten/velox/VeloxSourceBuilder.java rename to gluten-flink/planner/src/main/java/org/apache/gluten/velox/FromElementsSourceFactory.java index 7c56bef72990..5a5ad79aeb95 100644 --- a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/VeloxSourceBuilder.java +++ b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/FromElementsSourceFactory.java @@ -28,6 +28,8 @@ import io.github.zhztheplayer.velox4j.plan.TableScanNode; import org.apache.flink.api.dag.Transformation; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.transformations.LegacySourceTransformation; import org.apache.flink.table.connector.source.ScanTableSource; import org.apache.flink.table.data.RowData; @@ -40,22 +42,27 @@ import java.util.List; import java.util.Map; -public class VeloxSourceBuilder { +public class FromElementsSourceFactory implements VeloxSourceSinkFactory { - public static Transformation build( - Transformation transformation, ScanTableSource scanTableSource) { + @SuppressWarnings("rawtypes") + @Override + public boolean match(Transformation transformation) { if (transformation instanceof LegacySourceTransformation) { - if (scanTableSource.getClass().getSimpleName().equals("TestValuesScanLookupTableSource")) { - return buildFromElementsSource(transformation, 
scanTableSource); + StreamSource source = ((LegacySourceTransformation) transformation).getOperator(); + String sourceFunctionName = source.getUserFunction().getClass().getSimpleName(); + if (sourceFunctionName.equals("FromElementsFunction")) { + return true; } } - return transformation; + return false; } - /** `FromElementsSource` is designed for ut tests, and we map it to velox source. */ @SuppressWarnings({"rawtypes", "unchecked"}) - private static Transformation buildFromElementsSource( - Transformation transformation, ScanTableSource tableSource) { + @Override + public Transformation buildSource( + Transformation transformation, + ScanTableSource tableSource, + boolean checkpointEnabled) { LegacySourceTransformation sourceTransformation = (LegacySourceTransformation) transformation; try { @@ -106,4 +113,10 @@ private static Transformation buildFromElementsSource( throw new FlinkRuntimeException(e); } } + + @Override + public Transformation buildSink( + ReadableConfig config, Transformation transformation) { + throw new FlinkRuntimeException("Unimplemented method 'buildSink'"); + } } diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/KafkaSourceSinkFactory.java b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/KafkaSourceSinkFactory.java new file mode 100644 index 000000000000..c9b20d4c33e0 --- /dev/null +++ b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/KafkaSourceSinkFactory.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.velox; + +import org.apache.gluten.streaming.api.operators.GlutenStreamSource; +import org.apache.gluten.table.runtime.operators.GlutenVectorSourceFunction; +import org.apache.gluten.util.LogicalTypeConverter; +import org.apache.gluten.util.PlanNodeIdGenerator; +import org.apache.gluten.util.ReflectUtils; + +import io.github.zhztheplayer.velox4j.connector.KafkaConnectorSplit; +import io.github.zhztheplayer.velox4j.connector.KafkaTableHandle; +import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode; +import io.github.zhztheplayer.velox4j.plan.TableScanNode; +import io.github.zhztheplayer.velox4j.type.RowType; + +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.streaming.api.transformations.LegacySourceTransformation; +import org.apache.flink.streaming.api.transformations.SourceTransformation; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; + +public class KafkaSourceSinkFactory implements VeloxSourceSinkFactory { + + @SuppressWarnings("rawtypes") + @Override + public boolean match(Transformation transformation) { + if 
(transformation instanceof SourceTransformation) { + Source source = ((SourceTransformation) transformation).getSource(); + return source.getClass().getSimpleName().equals("KafkaSource"); + } + return false; + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + @Override + public Transformation buildSource( + Transformation transformation, + ScanTableSource tableSource, + boolean checkpointEnabled) { + RowType outputType = + (RowType) + LogicalTypeConverter.toVLType( + ((InternalTypeInfo) transformation.getOutputType()).toLogicalType()); + try { + Class tableSourceClazz = + Class.forName("org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource"); + Properties properties = + (Properties) ReflectUtils.getObjectField(tableSourceClazz, tableSource, "properties"); + List topics = + (List) ReflectUtils.getObjectField(tableSourceClazz, tableSource, "topics"); + DecodingFormat decodingFormat = + (DecodingFormat) + ReflectUtils.getObjectField(tableSourceClazz, tableSource, "valueDecodingFormat"); + String startupMode = + String.valueOf(ReflectUtils.getObjectField(tableSourceClazz, tableSource, "startupMode")); + String connectorId = "connector-kafka"; + String planId = PlanNodeIdGenerator.newId(); + String topic = topics.get(0); + String format = + decodingFormat.getClass().getName().contains("JsonFormatFactory") ? "json" : "raw"; + Map kafkaTableParameters = new HashMap(); + for (String key : properties.stringPropertyNames()) { + kafkaTableParameters.put(key, properties.getProperty(key)); + } + kafkaTableParameters.put("topic", topic); + kafkaTableParameters.put("format", format); + kafkaTableParameters.put( + "scan.startup.mode", + startupMode.equals("LATEST") + ? "latest-offsets" + : startupMode.equals("EARLIEST") ? "earliest-offsets" : "group-offsets"); + kafkaTableParameters.put("enable.auto.commit", checkpointEnabled ? 
"false" : "true"); + kafkaTableParameters.put( + "client.id", + properties.getProperty("client.id.prefix", connectorId) + "-" + UUID.randomUUID()); + KafkaTableHandle kafkaTableHandle = + new KafkaTableHandle(connectorId, topic, outputType, kafkaTableParameters); + KafkaConnectorSplit connectorSplit = + new KafkaConnectorSplit( + connectorId, + 0, + false, + kafkaTableParameters.get("bootstrap.servers"), + kafkaTableParameters.get("group.id"), + format, + Boolean.valueOf(kafkaTableParameters.getOrDefault("enable.auto.commit", "false")), + "latest", + List.of()); + TableScanNode kafkaScan = new TableScanNode(planId, outputType, kafkaTableHandle, List.of()); + GlutenStreamSource sourceOp = + new GlutenStreamSource( + new GlutenVectorSourceFunction( + new StatefulPlanNode(kafkaScan.getId(), kafkaScan), + Map.of(kafkaScan.getId(), outputType), + kafkaScan.getId(), + connectorSplit)); + SourceTransformation sourceTransformation = (SourceTransformation) transformation; + return new LegacySourceTransformation( + sourceTransformation.getName(), + sourceOp, + transformation.getOutputType(), + sourceTransformation.getParallelism(), + sourceTransformation.getBoundedness(), + false); + } catch (Exception e) { + throw new FlinkRuntimeException(e); + } + } + + @Override + public Transformation buildSink( + ReadableConfig config, Transformation transformation) { + throw new FlinkRuntimeException("Unimplemented method 'buildSink'"); + } +} diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java new file mode 100644 index 000000000000..07ce6089e6f1 --- /dev/null +++ b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.velox; + +import org.apache.gluten.streaming.api.operators.GlutenStreamSource; +import org.apache.gluten.table.runtime.operators.GlutenVectorSourceFunction; +import org.apache.gluten.util.LogicalTypeConverter; +import org.apache.gluten.util.PlanNodeIdGenerator; +import org.apache.gluten.util.ReflectUtils; + +import io.github.zhztheplayer.velox4j.connector.NexmarkConnectorSplit; +import io.github.zhztheplayer.velox4j.connector.NexmarkTableHandle; +import io.github.zhztheplayer.velox4j.plan.PlanNode; +import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode; +import io.github.zhztheplayer.velox4j.plan.TableScanNode; +import io.github.zhztheplayer.velox4j.type.RowType; + +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.streaming.api.transformations.LegacySourceTransformation; +import org.apache.flink.streaming.api.transformations.SourceTransformation; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; + +import java.util.List; +import java.util.Map; + +public class NexmarkSourceFactory implements VeloxSourceSinkFactory { + + 
@SuppressWarnings("rawtypes") + @Override + public boolean match(Transformation transformation) { + if (transformation instanceof SourceTransformation) { + Class sourceClazz = ((SourceTransformation) transformation).getSource().getClass(); + return sourceClazz.getSimpleName().equals("NexmarkSource"); + } + return false; + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + @Override + public Transformation buildSource( + Transformation transformation, + ScanTableSource tableSource, + boolean checkpointEnabled) { + RowType outputType = + (RowType) + LogicalTypeConverter.toVLType( + ((InternalTypeInfo) transformation.getOutputType()).toLogicalType()); + Object nexmarkSource = ((SourceTransformation) transformation).getSource(); + String id = PlanNodeIdGenerator.newId(); + List nexmarkSourceSplits = + (List) + ReflectUtils.invokeObjectMethod( + nexmarkSource.getClass(), + nexmarkSource, + "getSplits", + new Class[] {int.class}, + new Object[] {transformation.getParallelism()}); + Object nexmarkSourceSplit = nexmarkSourceSplits.get(0); + Object generatorConfig = + ReflectUtils.getObjectField( + nexmarkSourceSplit.getClass(), nexmarkSourceSplit, "generatorConfig"); + Long maxEvents = + (Long) + ReflectUtils.getObjectField(generatorConfig.getClass(), generatorConfig, "maxEvents"); + PlanNode tableScan = + new TableScanNode(id, outputType, new NexmarkTableHandle("connector-nexmark"), List.of()); + GlutenStreamSource sourceOp = + new GlutenStreamSource( + new GlutenVectorSourceFunction( + new StatefulPlanNode(tableScan.getId(), tableScan), + Map.of(id, outputType), + id, + new NexmarkConnectorSplit( + "connector-nexmark", + maxEvents > Integer.MAX_VALUE ? 
Integer.MAX_VALUE : maxEvents.intValue()))); + return new LegacySourceTransformation( + transformation.getName(), + sourceOp, + transformation.getOutputType(), + transformation.getParallelism(), + ((SourceTransformation) transformation).getBoundedness(), + false); + } + + @Override + public Transformation buildSink( + ReadableConfig config, Transformation transformation) { + throw new UnsupportedOperationException("Unimplemented method 'buildSink'"); + } +} diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/VeloxSinkBuilder.java b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/PrintSinkFactory.java similarity index 86% rename from gluten-flink/planner/src/main/java/org/apache/gluten/velox/VeloxSinkBuilder.java rename to gluten-flink/planner/src/main/java/org/apache/gluten/velox/PrintSinkFactory.java index f27dec0a44af..2b6554f63d99 100644 --- a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/VeloxSinkBuilder.java +++ b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/PrintSinkFactory.java @@ -36,6 +36,8 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; import org.apache.flink.streaming.api.transformations.LegacySinkTransformation; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.operators.sink.SinkOperator; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.util.FlinkRuntimeException; @@ -43,9 +45,11 @@ import java.util.List; import java.util.Map; -public class VeloxSinkBuilder { +public class PrintSinkFactory implements VeloxSourceSinkFactory { - public static Transformation build(ReadableConfig config, Transformation transformation) { + @SuppressWarnings("rawtypes") + @Override + public boolean match(Transformation transformation) { if (transformation instanceof 
LegacySinkTransformation) { SimpleOperatorFactory operatorFactory = (SimpleOperatorFactory) ((LegacySinkTransformation) transformation).getOperatorFactory(); @@ -56,14 +60,23 @@ public static Transformation build(ReadableConfig config, Transformation transfo .getClass() .getSimpleName() .equals("RowDataPrintFunction")) { - return buildPrintSink(config, (LegacySinkTransformation) transformation); + return true; } } - return transformation; + return false; } - private static LegacySinkTransformation buildPrintSink( - ReadableConfig config, LegacySinkTransformation transformation) { + @Override + public Transformation buildSource( + Transformation transformation, + ScanTableSource tableSource, + boolean checkpointEnabled) { + throw new FlinkRuntimeException("Unimplemented method 'buildSource'"); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + @Override + public Transformation buildSink(ReadableConfig config, Transformation transformation) { Transformation inputTrans = (Transformation) transformation.getInputs().get(0); InternalTypeInfo inputTypeInfo = (InternalTypeInfo) inputTrans.getOutputType(); String logDir = config.get(CoreOptions.FLINK_LOG_DIR); diff --git a/gluten-flink/planner/src/main/resources/META-INF/services/org.apache.gluten.velox.VeloxSourceSinkFactory b/gluten-flink/planner/src/main/resources/META-INF/services/org.apache.gluten.velox.VeloxSourceSinkFactory new file mode 100644 index 000000000000..9d7623b7ec72 --- /dev/null +++ b/gluten-flink/planner/src/main/resources/META-INF/services/org.apache.gluten.velox.VeloxSourceSinkFactory @@ -0,0 +1,4 @@ +org.apache.gluten.velox.FromElementsSourceFactory +org.apache.gluten.velox.KafkaSourceSinkFactory +org.apache.gluten.velox.PrintSinkFactory +org.apache.gluten.velox.NexmarkSourceFactory diff --git a/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java 
b/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java deleted file mode 100644 index 5e7dd62f0df9..000000000000 --- a/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.streaming.runtime.translators; - -import org.apache.gluten.streaming.api.operators.GlutenStreamSource; -import org.apache.gluten.table.runtime.operators.GlutenVectorSourceFunction; -import org.apache.gluten.util.LogicalTypeConverter; -import org.apache.gluten.util.PlanNodeIdGenerator; -import org.apache.gluten.util.ReflectUtils; - -import io.github.zhztheplayer.velox4j.connector.NexmarkConnectorSplit; -import io.github.zhztheplayer.velox4j.connector.NexmarkTableHandle; -import io.github.zhztheplayer.velox4j.plan.PlanNode; -import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode; -import io.github.zhztheplayer.velox4j.plan.TableScanNode; -import io.github.zhztheplayer.velox4j.type.RowType; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.connector.source.SourceSplit; -import org.apache.flink.streaming.api.graph.SimpleTransformationTranslator; -import org.apache.flink.streaming.api.graph.StreamGraph; -import org.apache.flink.streaming.api.graph.TransformationTranslator; -import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; -import org.apache.flink.streaming.api.operators.SourceOperatorFactory; -import org.apache.flink.streaming.api.operators.StreamOperatorFactory; -import org.apache.flink.streaming.api.transformations.SourceTransformation; -import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; - -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * A {@link TransformationTranslator} for the {@link SourceTransformation}. - * - * @param The type of the elements that this source produces. 
- */ -@Internal -public class SourceTransformationTranslator - extends SimpleTransformationTranslator> { - - @Override - protected Collection translateForBatchInternal( - final SourceTransformation transformation, final Context context) { - - return translateInternal( - transformation, context, false /* don't emit progressive watermarks */); - } - - @Override - protected Collection translateForStreamingInternal( - final SourceTransformation transformation, final Context context) { - - return translateInternal(transformation, context, true /* emit progressive watermarks */); - } - - private Collection translateInternal( - final SourceTransformation transformation, - final Context context, - boolean emitProgressiveWatermarks) { - checkNotNull(transformation); - checkNotNull(context); - - final StreamGraph streamGraph = context.getStreamGraph(); - final String slotSharingGroup = context.getSlotSharingGroup(); - final int transformationId = transformation.getId(); - final ExecutionConfig executionConfig = streamGraph.getExecutionConfig(); - - // --- Begin Gluten-specific code changes --- - Class sourceClazz = transformation.getSource().getClass(); - if (sourceClazz.getSimpleName().equals("NexmarkSource")) { - RowType outputType = - (RowType) - LogicalTypeConverter.toVLType( - ((InternalTypeInfo) transformation.getOutputType()).toLogicalType()); - String id = PlanNodeIdGenerator.newId(); - Object nexmarkSource = transformation.getSource(); - List nexmarkSourceSplits = - (List) - ReflectUtils.invokeObjectMethod( - sourceClazz, - nexmarkSource, - "getSplits", - new Class[] {int.class}, - new Object[] {transformation.getParallelism()}); - Object nexmarkSourceSplit = nexmarkSourceSplits.get(0); - Object generatorConfig = - ReflectUtils.getObjectField( - nexmarkSourceSplit.getClass(), nexmarkSourceSplit, "generatorConfig"); - Long maxEvents = - (Long) - ReflectUtils.getObjectField(generatorConfig.getClass(), generatorConfig, "maxEvents"); - PlanNode tableScan = - new 
TableScanNode(id, outputType, new NexmarkTableHandle("connector-nexmark"), List.of()); - StreamOperatorFactory operatorFactory = - SimpleOperatorFactory.of( - new GlutenStreamSource( - new GlutenVectorSourceFunction( - new StatefulPlanNode(tableScan.getId(), tableScan), - Map.of(id, outputType), - id, - new NexmarkConnectorSplit( - "connector-nexmark", - maxEvents > Integer.MAX_VALUE - ? Integer.MAX_VALUE - : maxEvents.intValue())))); - streamGraph.addLegacySource( - transformationId, - slotSharingGroup, - transformation.getCoLocationGroupKey(), - operatorFactory, - null, - transformation.getOutputType(), - "Source: " + transformation.getName()); - } else { - SourceOperatorFactory operatorFactory = - new SourceOperatorFactory<>( - transformation.getSource(), - transformation.getWatermarkStrategy(), - emitProgressiveWatermarks); - - operatorFactory.setChainingStrategy(transformation.getChainingStrategy()); - operatorFactory.setCoordinatorListeningID(transformation.getCoordinatorListeningID()); - - streamGraph.addSource( - transformationId, - slotSharingGroup, - transformation.getCoLocationGroupKey(), - operatorFactory, - null, - transformation.getOutputType(), - "Source: " + transformation.getName()); - } - // --- End Gluten-specific code changes --- - - final int parallelism = - transformation.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT - ? 
transformation.getParallelism() - : executionConfig.getParallelism(); - - streamGraph.setParallelism( - transformationId, parallelism, transformation.isParallelismConfigured()); - streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism()); - - streamGraph.setSupportsConcurrentExecutionAttempts( - transformationId, transformation.isSupportsConcurrentExecutionAttempts()); - - return Collections.singleton(transformationId); - } -} diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/velox/VeloxSourceSinkFactory.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/velox/VeloxSourceSinkFactory.java new file mode 100644 index 000000000000..1946d1b90bd9 --- /dev/null +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/velox/VeloxSourceSinkFactory.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gluten.velox; + +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.data.RowData; +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.HashSet; +import java.util.ServiceLoader; +import java.util.Set; + +public interface VeloxSourceSinkFactory { + + /** Match the conditions to determine whether the operator can be offloaded to velox. */ + boolean match(Transformation transformation); + + /** Build source transformation that offload the operator to velox. */ + Transformation buildSource( + Transformation transformation, + ScanTableSource tableSource, + boolean checkpointEnabled); + + /** Build sink transformation that offload the operator to velox. */ + Transformation buildSink(ReadableConfig config, Transformation transformation); + + /** Choose the matched source/sink factory by given transformation. 
*/ + static VeloxSourceSinkFactory getFactory(Transformation transformation) { + ServiceLoader factories = + ServiceLoader.load(VeloxSourceSinkFactory.class); + Set factoryNames = new HashSet<>(); + for (VeloxSourceSinkFactory factory : factories) { + factoryNames.add(factory.getClass().getName()); + if (factory.match(transformation)) { + return factory; + } + } + throw new FlinkRuntimeException( + "No implemented factory found to build velox transformation, available factories:" + + factoryNames); + } +} diff --git a/gluten-flink/ut/pom.xml b/gluten-flink/ut/pom.xml index 3583f7823fce..74311055a603 100644 --- a/gluten-flink/ut/pom.xml +++ b/gluten-flink/ut/pom.xml @@ -152,6 +152,36 @@ tests test + + com.salesforce.kafka.test + kafka-junit5 + 3.2.5 + test + + + org.apache.kafka + kafka_2.12 + 3.4.0 + test + + + org.apache.flink + flink-connector-kafka + 3.3.0-1.19 + test + + + org.apache.flink + flink-connector-base + ${flink.version} + test + + + org.apache.flink + flink-json + ${flink.version} + test + diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java index 5454b2939647..c527b5f28ad7 100644 --- a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java +++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java @@ -18,13 +18,18 @@ import org.apache.gluten.table.runtime.stream.common.Velox4jEnvironment; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.core.execution.JobClient; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import com.salesforce.kafka.test.junit5.SharedKafkaTestResource; +import
com.salesforce.kafka.test.listeners.PlainListener; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,9 +66,30 @@ public class NexmarkTest { put("PERSON_PROPORTION", "1"); put("AUCTION_PROPORTION", "3"); put("BID_PROPORTION", "46"); + put("NEXMARK_TABLE", "datagen"); } }; + private static final int KAFKA_PORT = 9092; + private static String topicName = "nexmark"; + + @RegisterExtension + public static final SharedKafkaTestResource sharedKafkaTestResource = + new SharedKafkaTestResource() + .withBrokers(1) + .registerListener(new PlainListener().onPorts(KAFKA_PORT)); + + private static final Map KAFKA_VARIABLES = + new HashMap<>() { + { + put("BOOTSTRAP_SERVERS", "localhost:9092"); + put("NEXMARK_TABLE", "kafka"); + } + }; + + private static final List VIEWS = List.of("person", "auction", "bid", "B"); + private static final List FUNCTIONS = List.of("count_char"); + private static StreamTableEnvironment tEnv; @BeforeAll @@ -76,31 +102,49 @@ static void setup() { EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build(); tEnv = StreamTableEnvironment.create(env, settings); - - setupNexmarkEnvironment(tEnv); } @Test - void testAllNexmarkQueries() throws ExecutionException, InterruptedException, TimeoutException { + void testAllNexmarkSourceQueries() + throws ExecutionException, InterruptedException, TimeoutException { + setupNexmarkEnvironment(tEnv, "ddl_gen.sql", NEXMARK_VARIABLES); List queryFiles = getQueries(); assertThat(queryFiles).isNotEmpty(); + LOG.warn("Found {} Nexmark query files: {}", queryFiles.size(), queryFiles); + for (String queryFile : queryFiles) { + LOG.warn("Executing nexmark query from file: {}", queryFile); + executeQuery(tEnv, queryFile, false); + } + clearEnvironment(tEnv); + } + + @Test + void testAllKafkaSourceQueries() + throws ExecutionException,
InterruptedException, TimeoutException { + sharedKafkaTestResource.getKafkaTestUtils().createTopic(topicName, 1, (short) 1); + setupNexmarkEnvironment(tEnv, "ddl_kafka.sql", KAFKA_VARIABLES); + List queryFiles = getQueries(); + assertThat(queryFiles).isNotEmpty(); LOG.warn("Found {} Nexmark query files: {}", queryFiles.size(), queryFiles); for (String queryFile : queryFiles) { - LOG.warn("Executing query from file: {}", queryFile); - executeQuery(tEnv, queryFile); + LOG.warn("Executing kafka query from file:{}", queryFile); + executeQuery(tEnv, queryFile, true); } + clearEnvironment(tEnv); } - private static void setupNexmarkEnvironment(StreamTableEnvironment tEnv) { - String createNexmarkSource = readSqlFromFile(NEXMARK_RESOURCE_DIR + "/ddl_gen.sql"); - createNexmarkSource = replaceVariables(createNexmarkSource, NEXMARK_VARIABLES); + private static void setupNexmarkEnvironment( + StreamTableEnvironment tEnv, String sourceFileName, Map variables) { + String createNexmarkSource = readSqlFromFile(NEXMARK_RESOURCE_DIR + "/" + sourceFileName); + createNexmarkSource = replaceVariables(createNexmarkSource, variables); tEnv.executeSql(createNexmarkSource); String createTableView = readSqlFromFile(NEXMARK_RESOURCE_DIR + "/ddl_views.sql"); String[] sqlTableView = createTableView.split(";"); for (String sql : sqlTableView) { + sql = replaceVariables(sql, variables); String trimmedSql = sql.trim(); if (!trimmedSql.isEmpty()) { tEnv.executeSql(trimmedSql); @@ -116,7 +160,23 @@ private static String replaceVariables(String sql, Map variables return result; } - private void executeQuery(StreamTableEnvironment tEnv, String queryFileName) + private static void clearEnvironment(StreamTableEnvironment tEnv) { + for (int i = 0; i <= 22; ++i) { + String tableName = "nexmark_q" + i; + String sql = String.format("drop table if exists %s", tableName); + tEnv.executeSql(sql); + } + for (String view : VIEWS) { + String sql = String.format("drop view if exists %s", view); + 
tEnv.executeSql(sql); + } + for (String func : FUNCTIONS) { + String sql = String.format("drop function if exists %s", func); + tEnv.executeSql(sql); + } + } + + private void executeQuery(StreamTableEnvironment tEnv, String queryFileName, boolean kafkaSource) throws ExecutionException, InterruptedException, TimeoutException { String queryContent = readSqlFromFile(NEXMARK_RESOURCE_DIR + "/" + queryFileName); @@ -136,7 +196,11 @@ private void executeQuery(StreamTableEnvironment tEnv, String queryFileName) String insertQuery = sqlStatements[sqlStatements.length - 2].trim(); if (!insertQuery.isEmpty()) { TableResult insertResult = tEnv.executeSql(insertQuery); - waitForJobCompletion(insertResult, 30000); + if (kafkaSource) { + assertThat(checkJobRunningStatus(insertResult, 30000) == true); + } else { + waitForJobCompletion(insertResult, 30000); + } } assertTrue(sqlStatements[sqlStatements.length - 1].trim().isEmpty()); } @@ -147,6 +211,24 @@ private void waitForJobCompletion(TableResult result, long timeoutMs) result.getJobClient().get().getJobExecutionResult().get(timeoutMs, TimeUnit.MILLISECONDS); } + private boolean checkJobRunningStatus(TableResult result, long timeoutMs) + throws InterruptedException { + long startTime = System.currentTimeMillis(); + assertTrue(result.getJobClient().isPresent()); + JobClient jobClient = result.getJobClient().get(); + while (System.currentTimeMillis() < startTime + timeoutMs) { + if (jobClient.getJobStatus().complete(JobStatus.RUNNING)) { + jobClient.cancel(); + return true; + } else { + Thread.sleep(1000); + } + } + LOG.warn("Job not running in " + timeoutMs + " millseconds."); + jobClient.cancel(); + return false; + } + private List getQueries() { URL resourceUrl = getClass().getClassLoader().getResource(NEXMARK_RESOURCE_DIR); diff --git a/gluten-flink/ut/src/test/resources/nexmark/ddl_kafka.sql b/gluten-flink/ut/src/test/resources/nexmark/ddl_kafka.sql new file mode 100644 index 000000000000..28affcdd348b --- /dev/null +++ 
b/gluten-flink/ut/src/test/resources/nexmark/ddl_kafka.sql @@ -0,0 +1,46 @@ +CREATE TABLE kafka ( + event_type int, + person ROW< + id BIGINT, + name VARCHAR, + emailAddress VARCHAR, + creditCard VARCHAR, + city VARCHAR, + state VARCHAR, + `dateTime` TIMESTAMP(3), + extra VARCHAR>, + auction ROW< + id BIGINT, + itemName VARCHAR, + description VARCHAR, + initialBid BIGINT, + reserve BIGINT, + `dateTime` TIMESTAMP(3), + expires TIMESTAMP(3), + seller BIGINT, + category BIGINT, + extra VARCHAR>, + bid ROW< + auction BIGINT, + bidder BIGINT, + price BIGINT, + channel VARCHAR, + url VARCHAR, + `dateTime` TIMESTAMP(3), + extra VARCHAR>, + `dateTime` AS + CASE + WHEN event_type = 0 THEN person.`dateTime` + WHEN event_type = 1 THEN auction.`dateTime` + ELSE bid.`dateTime` + END, + WATERMARK FOR `dateTime` AS `dateTime` - INTERVAL '4' SECOND +) WITH ( + 'connector' = 'kafka', + 'topic' = 'nexmark', + 'properties.bootstrap.servers' = '${BOOTSTRAP_SERVERS}', + 'properties.group.id' = 'nexmark', + 'scan.startup.mode' = 'earliest-offset', + 'sink.partitioner' = 'round-robin', + 'format' = 'json' +); \ No newline at end of file diff --git a/gluten-flink/ut/src/test/resources/nexmark/ddl_views.sql b/gluten-flink/ut/src/test/resources/nexmark/ddl_views.sql index 54902b4b44a0..36f368dd92db 100644 --- a/gluten-flink/ut/src/test/resources/nexmark/ddl_views.sql +++ b/gluten-flink/ut/src/test/resources/nexmark/ddl_views.sql @@ -8,7 +8,7 @@ SELECT person.state, `dateTime`, person.extra -FROM datagen WHERE event_type = 0; +FROM ${NEXMARK_TABLE} WHERE event_type = 0; CREATE VIEW auction AS SELECT @@ -22,7 +22,7 @@ SELECT auction.seller, auction.category, auction.extra -FROM datagen WHERE event_type = 1; +FROM ${NEXMARK_TABLE} WHERE event_type = 1; CREATE VIEW bid AS SELECT @@ -33,4 +33,4 @@ SELECT bid.url, `dateTime`, bid.extra -FROM datagen WHERE event_type = 2; +FROM ${NEXMARK_TABLE} WHERE event_type = 2; From 992023a90f9f997ab387f1e24971d7e9fbda42a6 Mon Sep 17 00:00:00 2001 From: 
zouyunhe Date: Tue, 9 Dec 2025 03:54:23 +0000 Subject: [PATCH 5/7] fix reviews --- .../nodes/exec/common/CommonExecSink.java | 3 +- .../stream/StreamExecTableSourceScan.java | 3 +- .../velox/FromElementsSourceFactory.java | 10 ++-- .../gluten/velox/KafkaSourceSinkFactory.java | 4 +- .../gluten/velox/NexmarkSourceFactory.java | 4 +- .../apache/gluten/velox/PrintSinkFactory.java | 4 +- .../gluten/velox/VeloxSourceSinkFactory.java | 57 ++++++++++++++----- .../runtime/stream/custom/NexmarkTest.java | 4 +- .../src/test/resources/nexmark/ddl_kafka.sql | 2 +- 9 files changed, 59 insertions(+), 32 deletions(-) diff --git a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java index efd23619688e..a774da124927 100644 --- a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java +++ b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java @@ -470,8 +470,7 @@ public Optional generateUid(String name) { Transformation sinkTransformation = createSinkFunctionTransformation( sinkFunction, env, inputTransform, rowtimeFieldIndex, sinkMeta, sinkParallelism); - return VeloxSourceSinkFactory.getFactory(sinkTransformation) - .buildSink(env.getConfiguration(), sinkTransformation); + return VeloxSourceSinkFactory.buildSink(env.getConfiguration(), sinkTransformation); // --- End Gluten-specific code changes --- } else if (runtimeProvider instanceof OutputFormatProvider) { OutputFormat outputFormat = diff --git a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java index 0d7f7533f2e0..23143d60ace7 100644 --- 
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java +++ b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java @@ -106,8 +106,7 @@ protected Transformation translateToPlanInternal( .getScanTableSource( planner.getFlinkContext(), ShortcutUtils.unwrapTypeFactory(planner)); Transformation sourceTransformation = super.translateToPlanInternal(planner, config); - VeloxSourceSinkFactory factory = VeloxSourceSinkFactory.getFactory(sourceTransformation); - return factory.buildSource( + return VeloxSourceSinkFactory.buildSource( sourceTransformation, tableSource, planner.getExecEnv().getCheckpointConfig().isCheckpointingEnabled()); diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/FromElementsSourceFactory.java b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/FromElementsSourceFactory.java index 5a5ad79aeb95..70d6dcd87e7a 100644 --- a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/FromElementsSourceFactory.java +++ b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/FromElementsSourceFactory.java @@ -29,6 +29,7 @@ import org.apache.flink.api.dag.Transformation; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.streaming.api.functions.source.FromElementsFunction; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.transformations.LegacySourceTransformation; import org.apache.flink.table.connector.source.ScanTableSource; @@ -49,17 +50,14 @@ public class FromElementsSourceFactory implements VeloxSourceSinkFactory { public boolean match(Transformation transformation) { if (transformation instanceof LegacySourceTransformation) { StreamSource source = ((LegacySourceTransformation) transformation).getOperator(); - String sourceFunctionName = source.getUserFunction().getClass().getSimpleName(); - if 
(sourceFunctionName.equals("FromElementsFunction")) { - return true; - } + return source.getUserFunction() instanceof FromElementsFunction; } return false; } @SuppressWarnings({"rawtypes", "unchecked"}) @Override - public Transformation buildSource( + public Transformation buildVeloxSource( Transformation transformation, ScanTableSource tableSource, boolean checkpointEnabled) { @@ -115,7 +113,7 @@ public Transformation buildSource( } @Override - public Transformation buildSink( + public Transformation buildVeloxSink( ReadableConfig config, Transformation transformation) { throw new FlinkRuntimeException("Unimplemented method 'buildSink'"); } diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/KafkaSourceSinkFactory.java b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/KafkaSourceSinkFactory.java index c9b20d4c33e0..f840cdce805c 100644 --- a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/KafkaSourceSinkFactory.java +++ b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/KafkaSourceSinkFactory.java @@ -59,7 +59,7 @@ public boolean match(Transformation transformation) { @SuppressWarnings({"unchecked", "rawtypes"}) @Override - public Transformation buildSource( + public Transformation buildVeloxSource( Transformation transformation, ScanTableSource tableSource, boolean checkpointEnabled) { @@ -134,7 +134,7 @@ public Transformation buildSource( } @Override - public Transformation buildSink( + public Transformation buildVeloxSink( ReadableConfig config, Transformation transformation) { throw new FlinkRuntimeException("Unimplemented method 'buildSink'"); } diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java index 07ce6089e6f1..c7598c81f0af 100644 --- a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java +++ 
b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java @@ -54,7 +54,7 @@ public boolean match(Transformation transformation) { @SuppressWarnings({"rawtypes", "unchecked"}) @Override - public Transformation buildSource( + public Transformation buildVeloxSource( Transformation transformation, ScanTableSource tableSource, boolean checkpointEnabled) { @@ -100,7 +100,7 @@ public Transformation buildSource( } @Override - public Transformation buildSink( + public Transformation buildVeloxSink( ReadableConfig config, Transformation transformation) { throw new UnsupportedOperationException("Unimplemented method 'buildSink'"); } diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/PrintSinkFactory.java b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/PrintSinkFactory.java index 2b6554f63d99..b6415c84397b 100644 --- a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/PrintSinkFactory.java +++ b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/PrintSinkFactory.java @@ -67,7 +67,7 @@ public boolean match(Transformation transformation) { } @Override - public Transformation buildSource( + public Transformation buildVeloxSource( Transformation transformation, ScanTableSource tableSource, boolean checkpointEnabled) { @@ -76,7 +76,7 @@ public Transformation buildSource( @SuppressWarnings({"rawtypes", "unchecked"}) @Override - public Transformation buildSink(ReadableConfig config, Transformation transformation) { + public Transformation buildVeloxSink(ReadableConfig config, Transformation transformation) { Transformation inputTrans = (Transformation) transformation.getInputs().get(0); InternalTypeInfo inputTypeInfo = (InternalTypeInfo) inputTrans.getOutputType(); String logDir = config.get(CoreOptions.FLINK_LOG_DIR); diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/velox/VeloxSourceSinkFactory.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/velox/VeloxSourceSinkFactory.java 
index 1946d1b90bd9..07e8593e9db7 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/velox/VeloxSourceSinkFactory.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/velox/VeloxSourceSinkFactory.java @@ -20,39 +20,70 @@ import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.connector.source.ScanTableSource; import org.apache.flink.table.data.RowData; -import org.apache.flink.util.FlinkRuntimeException; -import java.util.HashSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Optional; import java.util.ServiceLoader; -import java.util.Set; public interface VeloxSourceSinkFactory { - /** Match the conditions to determine whether the operator can be offloaded to velox. */ + static final Logger LOG = LoggerFactory.getLogger(VeloxSourceSinkFactory.class); + + /** Match the conditions to determine if the operator can be offloaded to velox. */ boolean match(Transformation transformation); /** Build source transformation that offload the operator to velox. */ - Transformation buildSource( + Transformation buildVeloxSource( Transformation transformation, ScanTableSource tableSource, boolean checkpointEnabled); /** Build sink transformation that offload the operator to velox. */ - Transformation buildSink(ReadableConfig config, Transformation transformation); + Transformation buildVeloxSink( + ReadableConfig config, Transformation transformation); /** Choose the matched source/sink factory by given transformation. 
*/ - static VeloxSourceSinkFactory getFactory(Transformation transformation) { + private static Optional getFactory( + Transformation transformation) { ServiceLoader factories = ServiceLoader.load(VeloxSourceSinkFactory.class); - Set factoryNames = new HashSet<>(); for (VeloxSourceSinkFactory factory : factories) { - factoryNames.add(factory.getClass().getName()); if (factory.match(transformation)) { - return factory; + return Optional.of(factory); } } - throw new FlinkRuntimeException( - "Not find implemented factory to build velox transformation, available factories:" - + factoryNames); + return Optional.empty(); + } + + /** Build Velox source, or fallback to flink orignal source . */ + static Transformation buildSource( + Transformation transformation, + ScanTableSource tableSource, + boolean checkpointEnabled) { + Optional factory = getFactory(transformation); + if (factory.isEmpty()) { + LOG.warn( + "Not find matched factory to build velox source transformation, and we will use flink original transformation {} instead.", + transformation.getClass().getName()); + return transformation; + } else { + return factory.get().buildVeloxSource(transformation, tableSource, checkpointEnabled); + } + } + + /** Build Velox sink, or fallback to flink original sink. 
*/ + static Transformation buildSink( + ReadableConfig config, Transformation transformation) { + Optional factory = getFactory(transformation); + if (factory.isEmpty()) { + LOG.warn( + "Not find matched factory to build velox sink transformation, and we will use flink original transformation {} instead.", + transformation.getClass().getName()); + return transformation; + } else { + return factory.get().buildVeloxSink(config, transformation); + } } } diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java index c527b5f28ad7..476c4cba4d01 100644 --- a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java +++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java @@ -74,7 +74,7 @@ public class NexmarkTest { private static String topicName = "nexmark"; @RegisterExtension - public static final SharedKafkaTestResource sharedKafkaTestResource = + public static final SharedKafkaTestResource kafkaInstance = new SharedKafkaTestResource() .withBrokers(1) .registerListener(new PlainListener().onPorts(KAFKA_PORT)); @@ -122,7 +122,7 @@ void testAllNexmarkSourceQueries() @Test void testAllKafkaSourceQueries() throws ExecutionException, InterruptedException, TimeoutException { - sharedKafkaTestResource.getKafkaTestUtils().createTopic(topicName, 1, (short) 1); + kafkaInstance.getKafkaTestUtils().createTopic(topicName, 1, (short) 1); setupNexmarkEnvironment(tEnv, "ddl_kafka.sql", KAFKA_VARIABLES); List queryFiles = getQueries(); assertThat(queryFiles).isNotEmpty(); diff --git a/gluten-flink/ut/src/test/resources/nexmark/ddl_kafka.sql b/gluten-flink/ut/src/test/resources/nexmark/ddl_kafka.sql index 28affcdd348b..27757eaeafcd 100644 --- a/gluten-flink/ut/src/test/resources/nexmark/ddl_kafka.sql +++ b/gluten-flink/ut/src/test/resources/nexmark/ddl_kafka.sql @@ 
-43,4 +43,4 @@ CREATE TABLE kafka ( 'scan.startup.mode' = 'earliest-offset', 'sink.partitioner' = 'round-robin', 'format' = 'json' -); \ No newline at end of file +); From 9b5739abc4a654a2ebb93d9e9acc33add5dfc33b Mon Sep 17 00:00:00 2001 From: zouyunhe Date: Fri, 12 Dec 2025 02:18:51 +0000 Subject: [PATCH 6/7] optimize code --- .../nodes/exec/common/CommonExecSink.java | 4 +++- .../stream/StreamExecTableSourceScan.java | 8 ++++++-- .../velox/FromElementsSourceFactory.java | 9 ++++----- .../gluten/velox/KafkaSourceSinkFactory.java | 10 +++++----- .../gluten/velox/NexmarkSourceFactory.java | 8 ++------ .../apache/gluten/velox/PrintSinkFactory.java | 11 +++++------ .../gluten/velox/VeloxSourceSinkFactory.java | 19 +++++++------------ 7 files changed, 32 insertions(+), 37 deletions(-) diff --git a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java index a774da124927..a1b0e0e7b796 100644 --- a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java +++ b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.io.OutputFormat; import org.apache.flink.api.dag.Transformation; import org.apache.flink.api.java.typeutils.InputTypeConfigurable; +import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.streaming.api.datastream.CustomSinkOperatorUidHashes; @@ -470,7 +471,8 @@ public Optional generateUid(String name) { Transformation sinkTransformation = createSinkFunctionTransformation( sinkFunction, env, inputTransform, rowtimeFieldIndex, sinkMeta, sinkParallelism); - return 
VeloxSourceSinkFactory.buildSink(env.getConfiguration(), sinkTransformation); + return VeloxSourceSinkFactory.buildSink( + sinkTransformation, Map.of(Configuration.class.getName(), env.getConfiguration())); // --- End Gluten-specific code changes --- } else if (runtimeProvider instanceof OutputFormatProvider) { OutputFormat outputFormat = diff --git a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java index 23143d60ace7..90b3981f0f84 100644 --- a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java +++ b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java @@ -40,6 +40,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; import java.util.Collections; +import java.util.Map; /** * Stream {@link ExecNode} to read data from an external source defined by a {@link @@ -108,8 +109,11 @@ protected Transformation translateToPlanInternal( Transformation sourceTransformation = super.translateToPlanInternal(planner, config); return VeloxSourceSinkFactory.buildSource( sourceTransformation, - tableSource, - planner.getExecEnv().getCheckpointConfig().isCheckpointingEnabled()); + Map.of( + ScanTableSource.class.getName(), + tableSource, + "checkpoint.enabled", + planner.getExecEnv().getCheckpointConfig().isCheckpointingEnabled())); // --- End Gluten-specific code changes --- } } diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/FromElementsSourceFactory.java b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/FromElementsSourceFactory.java index 70d6dcd87e7a..da31edeccd14 100644 --- a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/FromElementsSourceFactory.java +++ 
b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/FromElementsSourceFactory.java @@ -28,7 +28,6 @@ import io.github.zhztheplayer.velox4j.plan.TableScanNode; import org.apache.flink.api.dag.Transformation; -import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.streaming.api.functions.source.FromElementsFunction; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.transformations.LegacySourceTransformation; @@ -58,12 +57,12 @@ public boolean match(Transformation transformation) { @SuppressWarnings({"rawtypes", "unchecked"}) @Override public Transformation buildVeloxSource( - Transformation transformation, - ScanTableSource tableSource, - boolean checkpointEnabled) { + Transformation transformation, Map parameters) { LegacySourceTransformation sourceTransformation = (LegacySourceTransformation) transformation; try { + ScanTableSource tableSource = + (ScanTableSource) parameters.get(ScanTableSource.class.getName()); Class tableSourceClazz = Class.forName( "org.apache.flink.table.planner.factories.TestValuesTableFactory$TestValuesScanTableSourceWithoutProjectionPushDown"); @@ -114,7 +113,7 @@ public Transformation buildVeloxSource( @Override public Transformation buildVeloxSink( - ReadableConfig config, Transformation transformation) { + Transformation transformation, Map parameters) { throw new FlinkRuntimeException("Unimplemented method 'buildSink'"); } } diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/KafkaSourceSinkFactory.java b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/KafkaSourceSinkFactory.java index f840cdce805c..54e2b8b4361a 100644 --- a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/KafkaSourceSinkFactory.java +++ b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/KafkaSourceSinkFactory.java @@ -30,7 +30,6 @@ import org.apache.flink.api.connector.source.Source; import org.apache.flink.api.dag.Transformation; -import 
org.apache.flink.configuration.ReadableConfig; import org.apache.flink.streaming.api.transformations.LegacySourceTransformation; import org.apache.flink.streaming.api.transformations.SourceTransformation; import org.apache.flink.table.connector.format.DecodingFormat; @@ -60,14 +59,15 @@ public boolean match(Transformation transformation) { @SuppressWarnings({"unchecked", "rawtypes"}) @Override public Transformation buildVeloxSource( - Transformation transformation, - ScanTableSource tableSource, - boolean checkpointEnabled) { + Transformation transformation, Map parameters) { RowType outputType = (RowType) LogicalTypeConverter.toVLType( ((InternalTypeInfo) transformation.getOutputType()).toLogicalType()); try { + ScanTableSource tableSource = + (ScanTableSource) parameters.get(ScanTableSource.class.getName()); + boolean checkpointEnabled = (Boolean) parameters.get("checkpoint.enabled"); Class tableSourceClazz = Class.forName("org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource"); Properties properties = @@ -135,7 +135,7 @@ public Transformation buildVeloxSource( @Override public Transformation buildVeloxSink( - ReadableConfig config, Transformation transformation) { + Transformation transformation, Map parameters) { throw new FlinkRuntimeException("Unimplemented method 'buildSink'"); } } diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java index c7598c81f0af..736f3cc3c72b 100644 --- a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java +++ b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java @@ -30,10 +30,8 @@ import io.github.zhztheplayer.velox4j.type.RowType; import org.apache.flink.api.dag.Transformation; -import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.streaming.api.transformations.LegacySourceTransformation; import 
org.apache.flink.streaming.api.transformations.SourceTransformation; -import org.apache.flink.table.connector.source.ScanTableSource; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; @@ -55,9 +53,7 @@ public boolean match(Transformation transformation) { @SuppressWarnings({"rawtypes", "unchecked"}) @Override public Transformation buildVeloxSource( - Transformation transformation, - ScanTableSource tableSource, - boolean checkpointEnabled) { + Transformation transformation, Map parameters) { RowType outputType = (RowType) LogicalTypeConverter.toVLType( @@ -101,7 +97,7 @@ public Transformation buildVeloxSource( @Override public Transformation buildVeloxSink( - ReadableConfig config, Transformation transformation) { + Transformation transformation, Map parameters) { throw new UnsupportedOperationException("Unimplemented method 'buildSink'"); } } diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/PrintSinkFactory.java b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/PrintSinkFactory.java index b6415c84397b..b00a76a21f16 100644 --- a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/PrintSinkFactory.java +++ b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/PrintSinkFactory.java @@ -31,12 +31,11 @@ import org.apache.flink.api.dag.Transformation; import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; -import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; import org.apache.flink.streaming.api.transformations.LegacySinkTransformation; -import org.apache.flink.table.connector.source.ScanTableSource; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.operators.sink.SinkOperator; import 
org.apache.flink.table.runtime.typeutils.InternalTypeInfo; @@ -68,17 +67,17 @@ public boolean match(Transformation transformation) { @Override public Transformation buildVeloxSource( - Transformation transformation, - ScanTableSource tableSource, - boolean checkpointEnabled) { + Transformation transformation, Map parameters) { throw new FlinkRuntimeException("Unimplemented method 'buildSource'"); } @SuppressWarnings({"rawtypes", "unchecked"}) @Override - public Transformation buildVeloxSink(ReadableConfig config, Transformation transformation) { + public Transformation buildVeloxSink( + Transformation transformation, Map parameters) { Transformation inputTrans = (Transformation) transformation.getInputs().get(0); InternalTypeInfo inputTypeInfo = (InternalTypeInfo) inputTrans.getOutputType(); + Configuration config = (Configuration) parameters.get(Configuration.class.getName()); String logDir = config.get(CoreOptions.FLINK_LOG_DIR); String printPath; if (logDir != null) { diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/velox/VeloxSourceSinkFactory.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/velox/VeloxSourceSinkFactory.java index 07e8593e9db7..aff1bb779557 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/velox/VeloxSourceSinkFactory.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/velox/VeloxSourceSinkFactory.java @@ -17,13 +17,12 @@ package org.apache.gluten.velox; import org.apache.flink.api.dag.Transformation; -import org.apache.flink.configuration.ReadableConfig; -import org.apache.flink.table.connector.source.ScanTableSource; import org.apache.flink.table.data.RowData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Map; import java.util.Optional; import java.util.ServiceLoader; @@ -36,13 +35,11 @@ public interface VeloxSourceSinkFactory { /** Build source transformation that offload the operator to velox. 
*/ Transformation buildVeloxSource( - Transformation transformation, - ScanTableSource tableSource, - boolean checkpointEnabled); + Transformation transformation, Map parameters); /** Build sink transformation that offload the operator to velox. */ Transformation buildVeloxSink( - ReadableConfig config, Transformation transformation); + Transformation transformation, Map parameters); /** Choose the matched source/sink factory by given transformation. */ private static Optional getFactory( @@ -59,9 +56,7 @@ private static Optional getFactory( /** Build Velox source, or fallback to flink orignal source . */ static Transformation buildSource( - Transformation transformation, - ScanTableSource tableSource, - boolean checkpointEnabled) { + Transformation transformation, Map parameters) { Optional factory = getFactory(transformation); if (factory.isEmpty()) { LOG.warn( @@ -69,13 +64,13 @@ static Transformation buildSource( transformation.getClass().getName()); return transformation; } else { - return factory.get().buildVeloxSource(transformation, tableSource, checkpointEnabled); + return factory.get().buildVeloxSource(transformation, parameters); } } /** Build Velox sink, or fallback to flink original sink. 
*/ static Transformation buildSink( - ReadableConfig config, Transformation transformation) { + Transformation transformation, Map parameters) { Optional factory = getFactory(transformation); if (factory.isEmpty()) { LOG.warn( @@ -83,7 +78,7 @@ static Transformation buildSink( transformation.getClass().getName()); return transformation; } else { - return factory.get().buildVeloxSink(config, transformation); + return factory.get().buildVeloxSink(transformation, parameters); } } } From ac8e16fbec2641fc2da41a1e4368c727a0f80d6b Mon Sep 17 00:00:00 2001 From: zouyunhe Date: Wed, 24 Dec 2025 11:34:49 +0000 Subject: [PATCH 7/7] fix memory leak --- .../flink/client/StreamGraphTranslator.java | 12 ++--- .../api/operators/GlutenStreamSource.java | 27 +++++++++++ .../runtime/tasks/GlutenOutputCollector.java | 3 -- .../runtime/config/VeloxSessionConfig.java | 45 +++++++++++++++++++ .../GlutenVectorOneInputOperator.java | 2 + .../operators/GlutenVectorSourceFunction.java | 11 ++--- .../typeutils/GlutenRowVectorSerializer.java | 45 ++++++++----------- .../ut/src/test/resources/nexmark/q5.sql | 38 ---------------- .../ut/src/test/resources/nexmark/q7.sql | 21 --------- .../ut/src/test/resources/nexmark/q8.sql | 27 ----------- 10 files changed, 103 insertions(+), 128 deletions(-) create mode 100644 gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxSessionConfig.java delete mode 100644 gluten-flink/ut/src/test/resources/nexmark/q5.sql delete mode 100644 gluten-flink/ut/src/test/resources/nexmark/q7.sql delete mode 100644 gluten-flink/ut/src/test/resources/nexmark/q8.sql diff --git a/gluten-flink/runtime/src/main/java/org/apache/flink/client/StreamGraphTranslator.java b/gluten-flink/runtime/src/main/java/org/apache/flink/client/StreamGraphTranslator.java index db702d56053e..3fdbd4ad4aaf 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/flink/client/StreamGraphTranslator.java +++ 
b/gluten-flink/runtime/src/main/java/org/apache/flink/client/StreamGraphTranslator.java @@ -132,7 +132,7 @@ private void buildGlutenChains(StreamConfig taskConfig, Map nodeToNonChainedOuts = new HashMap<>(outEdges.size()); taskConfig @@ -216,14 +216,16 @@ private void buildGlutenChains(StreamConfig taskConfig, Map implements WatermarkGaugeExposingOutput> { protected final Map>> outputs; - private final Random random = new XORShiftRandom(); private final WatermarkGauge watermarkGauge = new WatermarkGauge(); protected final Counter numRecordsOutForTask; diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxSessionConfig.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxSessionConfig.java new file mode 100644 index 000000000000..ee55a258654b --- /dev/null +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxSessionConfig.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gluten.table.runtime.config; + +import io.github.zhztheplayer.velox4j.session.Session; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +public class VeloxSessionConfig implements Serializable { + private final Map sessions; + + private static VeloxSessionConfig config = new VeloxSessionConfig(); + + private VeloxSessionConfig() { + sessions = new HashMap<>(); + } + + public static VeloxSessionConfig getSessionConfig() { + return config; + } + + public void addSession(String id, Session session) { + sessions.put(id, session); + } + + public Session getSession(String id) { + return sessions.getOrDefault(id, null); + } +} diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorOneInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorOneInputOperator.java index e859e15ca37b..3b21d60e74ee 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorOneInputOperator.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorOneInputOperator.java @@ -18,6 +18,7 @@ import org.apache.gluten.streaming.api.operators.GlutenOperator; import org.apache.gluten.table.runtime.config.VeloxQueryConfig; +import org.apache.gluten.table.runtime.config.VeloxSessionConfig; import io.github.zhztheplayer.velox4j.Velox4j; import io.github.zhztheplayer.velox4j.config.ConnectorConfig; @@ -82,6 +83,7 @@ public GlutenVectorOneInputOperator( void initGlutenTask() { memoryManager = MemoryManager.create(AllocationListener.NOOP); session = Velox4j.newSession(memoryManager); + VeloxSessionConfig.getSessionConfig().addSession(id, session); // add a mock input as velox not allow the source is empty. 
StatefulPlanNode mockInput = new StatefulPlanNode( diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java index 472bd0bfed0a..9cb09a14f7ed 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java @@ -33,7 +33,6 @@ import io.github.zhztheplayer.velox4j.stateful.StatefulElement; import io.github.zhztheplayer.velox4j.type.RowType; -import org.apache.flink.api.common.state.CheckpointListener; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; @@ -53,7 +52,7 @@ * instead of RowData to avoid data convert. */ public class GlutenVectorSourceFunction extends RichParallelSourceFunction - implements CheckpointedFunction, CheckpointListener { + implements CheckpointedFunction { private static final Logger LOG = LoggerFactory.getLogger(GlutenVectorSourceFunction.class); private final StatefulPlanNode planNode; @@ -105,7 +104,6 @@ public void open(Configuration parameters) throws Exception { new Query( planNode, VeloxQueryConfig.getConfig(getRuntimeContext()), ConnectorConfig.empty()); allocator = new RootAllocator(Long.MAX_VALUE); - task = session.queryOps().execute(query); task.addSplit(id, split); task.noMoreSplits(id); @@ -133,7 +131,6 @@ public void run(SourceContext sourceContext) throws Exception { } taskMetrics.updateMetrics(task, id); } - task.close(); session.close(); memoryManager.close(); @@ -170,13 +167,11 @@ public void initializeState(FunctionInitializationContext context) throws Except this.task.initializeState(0); } - @Override - public void notifyCheckpointComplete(long 
checkpointId) throws Exception { + public String[] notifyCheckpointComplete(long checkpointId) throws Exception { // TODO: notify velox - this.task.notifyCheckpointComplete(checkpointId); + return this.task.notifyCheckpointComplete(checkpointId); } - @Override public void notifyCheckpointAborted(long checkpointId) throws Exception { // TODO: notify velox this.task.notifyCheckpointAborted(checkpointId); diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/typeutils/GlutenRowVectorSerializer.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/typeutils/GlutenRowVectorSerializer.java index db17a47a2d59..6f340ad1e50b 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/typeutils/GlutenRowVectorSerializer.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/typeutils/GlutenRowVectorSerializer.java @@ -16,11 +16,10 @@ */ package org.apache.gluten.table.runtime.typeutils; -import io.github.zhztheplayer.velox4j.Velox4j; +import org.apache.gluten.streaming.api.operators.GlutenOperator; +import org.apache.gluten.table.runtime.config.VeloxSessionConfig; + import io.github.zhztheplayer.velox4j.data.RowVector; -import io.github.zhztheplayer.velox4j.memory.AllocationListener; -import io.github.zhztheplayer.velox4j.memory.MemoryManager; -import io.github.zhztheplayer.velox4j.session.Session; import io.github.zhztheplayer.velox4j.stateful.StatefulRecord; import io.github.zhztheplayer.velox4j.type.RowType; @@ -31,24 +30,23 @@ import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; -import java.io.Closeable; import java.io.IOException; /** Serializer for {@link RowVector}. 
*/ @Internal -public class GlutenRowVectorSerializer extends TypeSerializer implements Closeable { +public class GlutenRowVectorSerializer extends TypeSerializer { private static final long serialVersionUID = 1L; private final RowType rowType; - private transient MemoryManager memoryManager; - private transient Session session; + private final GlutenOperator operator; - public GlutenRowVectorSerializer(RowType rowType) { + public GlutenRowVectorSerializer(RowType rowType, GlutenOperator operator) { this.rowType = rowType; + this.operator = operator; } @Override public TypeSerializer duplicate() { - return new GlutenRowVectorSerializer(rowType); + return new GlutenRowVectorSerializer(rowType, operator); } @Override @@ -65,14 +63,15 @@ public void serialize(StatefulRecord record, DataOutputView target) throws IOExc @Override public StatefulRecord deserialize(DataInputView source) throws IOException { - if (memoryManager == null) { - memoryManager = MemoryManager.create(AllocationListener.NOOP); - session = Velox4j.newSession(memoryManager); - } int len = source.readInt(); byte[] str = new byte[len]; source.readFully(str); - RowVector rowVector = session.baseVectorOps().deserializeOne(new String(str)).asRowVector(); + RowVector rowVector = + VeloxSessionConfig.getSessionConfig() + .getSession(operator.getId()) + .baseVectorOps() + .deserializeOne(new String(str)) + .asRowVector(); StatefulRecord record = new StatefulRecord(null, 0, 0, false, -1); record.setRowVector(rowVector); return record; @@ -131,15 +130,7 @@ public int getLength() { @Override public TypeSerializerSnapshot snapshotConfiguration() { - return new RowVectorSerializerSnapshot(rowType); - } - - @Override - public void close() { - if (memoryManager != null) { - memoryManager.close(); - session.close(); - } + return new RowVectorSerializerSnapshot(rowType, operator); } /** {@link TypeSerializerSnapshot} for Gluten RowVector.. 
*/ @@ -148,14 +139,16 @@ public static final class RowVectorSerializerSnapshot private static final int CURRENT_VERSION = 1; private RowType rowType; + private GlutenOperator operator; @SuppressWarnings("unused") public RowVectorSerializerSnapshot() { // this constructor is used when restoring from a checkpoint/savepoint. } - RowVectorSerializerSnapshot(RowType rowType) { + RowVectorSerializerSnapshot(RowType rowType, GlutenOperator operator) { this.rowType = rowType; + this.operator = operator; } @Override @@ -172,7 +165,7 @@ public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCode @Override public GlutenRowVectorSerializer restoreSerializer() { - return new GlutenRowVectorSerializer(rowType); + return new GlutenRowVectorSerializer(rowType, operator); } @Override diff --git a/gluten-flink/ut/src/test/resources/nexmark/q5.sql b/gluten-flink/ut/src/test/resources/nexmark/q5.sql deleted file mode 100644 index 98954bacf201..000000000000 --- a/gluten-flink/ut/src/test/resources/nexmark/q5.sql +++ /dev/null @@ -1,38 +0,0 @@ -CREATE TABLE nexmark_q5 ( - auction BIGINT, - num BIGINT -) WITH ( - 'connector' = 'blackhole' -); - -INSERT INTO nexmark_q5 -SELECT AuctionBids.auction, AuctionBids.num - FROM ( - SELECT - auction, - count(*) AS num, - window_start AS starttime, - window_end AS endtime - FROM TABLE( - HOP(TABLE bid, DESCRIPTOR(`dateTime`), INTERVAL '2' SECOND, INTERVAL '10' SECOND)) - GROUP BY auction, window_start, window_end - ) AS AuctionBids - JOIN ( - SELECT - max(CountBids.num) AS maxn, - CountBids.starttime, - CountBids.endtime - FROM ( - SELECT - count(*) AS num, - window_start AS starttime, - window_end AS endtime - FROM TABLE( - HOP(TABLE bid, DESCRIPTOR(`dateTime`), INTERVAL '2' SECOND, INTERVAL '10' SECOND)) - GROUP BY auction, window_start, window_end - ) AS CountBids - GROUP BY CountBids.starttime, CountBids.endtime - ) AS MaxBids - ON AuctionBids.starttime = MaxBids.starttime AND - AuctionBids.endtime = MaxBids.endtime AND - 
AuctionBids.num >= MaxBids.maxn; diff --git a/gluten-flink/ut/src/test/resources/nexmark/q7.sql b/gluten-flink/ut/src/test/resources/nexmark/q7.sql deleted file mode 100644 index 1b0ec308e94b..000000000000 --- a/gluten-flink/ut/src/test/resources/nexmark/q7.sql +++ /dev/null @@ -1,21 +0,0 @@ -CREATE TABLE nexmark_q7 ( - auction BIGINT, - bidder BIGINT, - price BIGINT, - `dateTime` TIMESTAMP(3), - extra VARCHAR -) WITH ( - 'connector' = 'blackhole' -); - -INSERT INTO nexmark_q7 -SELECT B.auction, B.price, B.bidder, B.`dateTime`, B.extra -from bid B -JOIN ( - SELECT MAX(price) AS maxprice, window_end as `dateTime` - FROM TABLE( - TUMBLE(TABLE bid, DESCRIPTOR(`dateTime`), INTERVAL '10' SECOND)) - GROUP BY window_start, window_end -) B1 -ON B.price = B1.maxprice -WHERE B.`dateTime` BETWEEN B1.`dateTime` - INTERVAL '10' SECOND AND B1.`dateTime`; diff --git a/gluten-flink/ut/src/test/resources/nexmark/q8.sql b/gluten-flink/ut/src/test/resources/nexmark/q8.sql deleted file mode 100644 index 1b112f9fc752..000000000000 --- a/gluten-flink/ut/src/test/resources/nexmark/q8.sql +++ /dev/null @@ -1,27 +0,0 @@ -CREATE TABLE nexmark_q8 ( - id BIGINT, - name VARCHAR, - stime TIMESTAMP(3) -) WITH ( - 'connector' = 'blackhole' -); - -INSERT INTO nexmark_q8 -SELECT P.id, P.name, P.starttime -FROM ( - SELECT id, name, - window_start AS starttime, - window_end AS endtime - FROM TABLE( - TUMBLE(TABLE person, DESCRIPTOR(`dateTime`), INTERVAL '10' SECOND)) - GROUP BY id, name, window_start, window_end -) P -JOIN ( - SELECT seller, - window_start AS starttime, - window_end AS endtime - FROM TABLE( - TUMBLE(TABLE auction, DESCRIPTOR(`dateTime`), INTERVAL '10' SECOND)) - GROUP BY seller, window_start, window_end -) A -ON P.id = A.seller AND P.starttime = A.starttime AND P.endtime = A.endtime;