From 889b14ff5c2d6200144931b5e27a155d21a95b6a Mon Sep 17 00:00:00 2001 From: zouyunhe Date: Mon, 9 Feb 2026 06:37:03 +0000 Subject: [PATCH] support rocksdb state for window operator --- .github/workflows/flink.yml | 2 +- .../workflows/util/install-flink-resources.sh | 32 +++- gluten-flink/docs/Flink.md | 26 ++- .../stream/StreamExecWindowAggregate.java | 38 +++-- gluten-flink/runtime/pom.xml | 6 + .../client/OffloadedJobGraphGenerator.java | 9 +- .../operators/GlutenOneInputOperator.java | 17 +- .../operators/GlutenSessionResource.java | 11 ++ .../operators/GlutenSourceFunction.java | 2 +- .../operators/GlutenTwoInputOperator.java | 2 +- .../runtime/operators/WindowAggOperator.java | 158 ++++++++++++++++++ 11 files changed, 277 insertions(+), 26 deletions(-) create mode 100644 gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/WindowAggOperator.java diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml index 3e893afce0b4..5bd953c77cf8 100644 --- a/.github/workflows/flink.yml +++ b/.github/workflows/flink.yml @@ -62,7 +62,7 @@ jobs: sudo yum install https://mirror.stream.centos.org/9-stream/BaseOS/x86_64/os/Packages/tzdata-2025a-1.el9.noarch.rpm -y sudo .github/workflows/util/install-flink-resources.sh git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git - cd velox4j && git reset --hard 288d181a1b05c47f1f17339eb498dd6375f7aec8 + cd velox4j && git reset --hard 889bafcf2fa04e8c31a30edbdf40fe203ef58484 git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch $GITHUB_WORKSPACE/build/mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true cd .. 
diff --git a/.github/workflows/util/install-flink-resources.sh b/.github/workflows/util/install-flink-resources.sh index 23c638cef823..600192a9a1da 100755 --- a/.github/workflows/util/install-flink-resources.sh +++ b/.github/workflows/util/install-flink-resources.sh @@ -16,6 +16,8 @@ LIBRDKAFKA_VERSION="v2.10.0" CPPKAFKA_VERSION="v0.4.1" +FROCKSDB_VERSION="FRocksDB-6.20.3" +FROCKSDB_REPO="ververica/frocksdb" function wget_and_untar { local URL=$1 @@ -39,6 +41,28 @@ function wget_and_untar { popd } +function github_checkout { + local REPO=$1 + shift + local VERSION=$1 + shift + local GIT_CLONE_PARAMS=("$@") + local DIRNAME + DIRNAME=$(basename "$REPO") + SUDO="${SUDO:-""}" + cd "${DEPENDENCY_DIR}" || exit + if [ -z "${DIRNAME}" ]; then + echo "Failed to get repo name from ${REPO}" + exit 1 + fi + if [ -d "${DIRNAME}" ] && prompt "${DIRNAME} already exists. Delete?"; then + ${SUDO} rm -rf "${DIRNAME}" + fi + if [ ! -d "${DIRNAME}" ]; then + git clone -q -b "$VERSION" "${GIT_CLONE_PARAMS[@]}" "https://github.com/${REPO}.git" + fi +} + function cmake_install_dir { pushd "./${DEPENDENCY_DIR}/$1" # remove the directory argument @@ -60,7 +84,7 @@ function cmake_install { fi mkdir -p "${BINARY_DIR}" - COMPILER_FLAGS=$(get_cxx_flags) + COMPILER_FLAGS="-g -gdwarf-2" # Add platform specific CXX flags if any COMPILER_FLAGS+=${OS_CXXFLAGS} @@ -93,9 +117,15 @@ function install_cppkafka { cmake_install_dir cppkafka -DBUILD_TESTS=OFF } +function install_rocksdb { + github_checkout ${FROCKSDB_REPO} ${FROCKSDB_VERSION} + cmake_install_dir frocksdb -DWITH_GFLAGS=OFF -DWITH_TESTS=OFF -DFAIL_ON_WARNINGS=OFF +} + function install_velox_deps { run_and_time install_librdkafka run_and_time install_cppkafka + run_and_time install_rocksdb } install_velox_deps diff --git a/gluten-flink/docs/Flink.md b/gluten-flink/docs/Flink.md index f84aa9945e2d..572df629c17b 100644 --- a/gluten-flink/docs/Flink.md +++ b/gluten-flink/docs/Flink.md @@ -48,7 +48,7 @@ As some features have not been committed to 
upstream, you have to use the follow ## fetch velox4j code git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git cd velox4j -git reset --hard 288d181a1b05c47f1f17339eb498dd6375f7aec8 +git reset --hard 889bafcf2fa04e8c31a30edbdf40fe203ef58484 mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true ``` **Get gluten** @@ -140,6 +140,30 @@ bin/sql-client.sh -f data-generator.sql TODO +### RocksDB State + +**Get & compile RocksDB** +```bash +git clone -b FRocksDB-6.20.3 https://github.com/ververica/frocksdb.git +cd frocksdb +make rocksdbjava -i +``` + +**Configure the RocksDB backend** +- Copy the compiled jar package to the `${FLINK_HOME}/gluten_lib` directory. + ```bash + cp ${ROCKSDB_COMPILE_DIR}/java/target/rocksdbjni-6.20.3-linux64.jar ${FLINK_HOME}/gluten_lib + ``` +- Modify `${FLINK_HOME}/bin/config.sh` as follows: + ``` + GLUTEN_JAR="$FLINK_HOME/gluten_lib/gluten-flink-loader-1.6.0.jar:$FLINK_HOME/gluten_lib/velox4j-0.1.0-SNAPSHOT.jar:$FLINK_HOME/gluten_lib/gluten-flink-runtime-1.6.0.jar:$FLINK_HOME/gluten_lib/rocksdbjni-6.20.3-linux64.jar" + echo "$GLUTEN_JAR""$FLINK_CLASSPATH""$FLINK_DIST" + ``` +- Set the RocksDB state backend in `${FLINK_HOME}/conf/config.yaml`: + ``` + state.backend.type: rocksdb + ``` + ## Performance We are working on supporting the [Nexmark](https://github.com/nexmark/nexmark) benchmark for Flink. Now the q0 has been supported. 
diff --git a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java index 499a65516db3..93e070398672 100644 --- a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java +++ b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java @@ -19,7 +19,6 @@ import org.apache.gluten.rexnode.AggregateCallConverter; import org.apache.gluten.rexnode.Utils; import org.apache.gluten.rexnode.WindowUtils; -import org.apache.gluten.table.runtime.operators.GlutenOneInputOperator; import org.apache.gluten.util.LogicalTypeConverter; import org.apache.gluten.util.PlanNodeIdGenerator; @@ -52,12 +51,16 @@ import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata; import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil; +import org.apache.flink.table.planner.plan.utils.AggregateInfoList; +import org.apache.flink.table.planner.plan.utils.AggregateUtil; import org.apache.flink.table.planner.plan.utils.KeySelectorUtil; +import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; import org.apache.flink.table.planner.utils.TableConfigUtils; import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty; import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.runtime.util.TimeWindowUtil; +import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; @@ -176,6 +179,14 @@ protected Transformation translateToPlanInternal( final ZoneId 
shiftTimeZone = TimeWindowUtil.getShiftTimeZone( windowing.getTimeAttributeType(), TableConfigUtils.getLocalTimeZone(config)); + final AggregateInfoList aggInfoList = + AggregateUtil.deriveStreamWindowAggregateInfoList( + planner.getTypeFactory(), + inputRowType, + JavaScalaConversionUtil.toScala(Arrays.asList(aggCalls)), + needRetraction, + windowing.getWindow(), + true); // isStateBackendDataViews // --- Begin Gluten-specific code changes --- // TODO: velox window not equal to flink window. @@ -245,23 +256,30 @@ protected Transformation translateToPlanInternal( windowType, outputType, rowtimeIndex); - final OneInputStreamOperator windowOperator = - new GlutenOneInputOperator( + final RowDataKeySelector selector = + KeySelectorUtil.getRowDataSelector( + planner.getFlinkContext().getClassLoader(), + grouping, + InternalTypeInfo.of(inputRowType)); + LogicalType[] accTypes = + Arrays.stream(aggInfoList.getAccTypes()) + .map(x -> x.getLogicalType()) + .collect(Collectors.toList()) + .toArray(new LogicalType[] {}); + final OneInputStreamOperator windowOperator = + new org.apache.gluten.table.runtime.operators.WindowAggOperator( new StatefulPlanNode(windowAgg.getId(), windowAgg), PlanNodeIdGenerator.newId(), inputType, Map.of(windowAgg.getId(), outputType), RowData.class, RowData.class, - "StreamExecWindowAggregate"); + "StreamExecWindowAggregate", + selector.getProducedType(), + aggInfoList.getAggNames(), + accTypes); // --- End Gluten-specific code changes --- - final RowDataKeySelector selector = - KeySelectorUtil.getRowDataSelector( - planner.getFlinkContext().getClassLoader(), - grouping, - InternalTypeInfo.of(inputRowType)); - final OneInputTransformation transform = ExecNodeUtil.createOneInputTransformation( inputTransform, diff --git a/gluten-flink/runtime/pom.xml b/gluten-flink/runtime/pom.xml index cfb58166bbba..fc3d272d5567 100644 --- a/gluten-flink/runtime/pom.xml +++ b/gluten-flink/runtime/pom.xml @@ -63,6 +63,12 @@ ${flink.version} provided + + 
org.apache.flink + flink-statebackend-rocksdb + ${flink.version} + provided + diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/client/OffloadedJobGraphGenerator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/client/OffloadedJobGraphGenerator.java index 622777eebef8..42784fce289e 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/client/OffloadedJobGraphGenerator.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/client/OffloadedJobGraphGenerator.java @@ -240,14 +240,7 @@ private void createOffloadedOneInputOperator( Class inClass = supportsVectorInput ? StatefulRecord.class : RowData.class; Class outClass = supportsVectorOutput ? StatefulRecord.class : RowData.class; GlutenOneInputOperator newOneInputOp = - new GlutenOneInputOperator<>( - planNode, - sourceOperator.getId(), - sourceOperator.getInputType(), - sourceOperator.getOutputTypes(), - inClass, - outClass, - sourceOperator.getDescription()); + sourceOperator.cloneWithInputOutputClasses(inClass, outClass); offloadedOpConfig.setStreamOperator(newOneInputOp); if (supportsVectorOutput) { setOffloadedOutputSerializer(offloadedOpConfig, sourceOperator); diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java index df0b8b921fbf..a2733c2438ff 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java @@ -65,7 +65,7 @@ public class GlutenOneInputOperator extends TableStreamOperator private transient GlutenSessionResource sessionResource; private transient Query query; private transient ExternalStreams.BlockingQueue inputQueue; - private transient SerialTask task; + protected transient SerialTask task; private final Class 
inClass; private final Class outClass; private transient VectorInputBridge inputBridge; @@ -191,6 +191,18 @@ private void processElementInternal() { } } + public GlutenOneInputOperator cloneWithInputOutputClasses( + Class newInClass, Class newOutClass) { + return new GlutenOneInputOperator<>( + this.glutenPlan, + this.id, + this.inputType, + this.outputTypes, + newInClass, + newOutClass, + this.description); + } + @Override public void processWatermark(Watermark mark) throws Exception { task.notifyWatermark(mark.getTimestamp()); @@ -260,8 +272,7 @@ public void initializeState(StateInitializationContext context) throws Exception if (task == null) { initSession(); } - // TODO: implement it - task.initializeState(0); + task.initializeState(0, null); super.initializeState(context); } diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSessionResource.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSessionResource.java index b54102c466f5..ea38229e9500 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSessionResource.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSessionResource.java @@ -21,6 +21,8 @@ import io.github.zhztheplayer.velox4j.memory.MemoryManager; import io.github.zhztheplayer.velox4j.session.Session; +import org.apache.flink.runtime.state.KeyedStateBackend; + import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; @@ -29,6 +31,7 @@ class GlutenSessionResource { private Session session; private MemoryManager memoryManager; private BufferAllocator allocator; + private KeyedStateBackend keyedStateBackend; public GlutenSessionResource() { this.memoryManager = MemoryManager.create(AllocationListener.NOOP); @@ -62,4 +65,12 @@ public MemoryManager getMemoryManager() { public BufferAllocator getAllocator() { return allocator; } + + public 
KeyedStateBackend getKeyedStateBackend() { + return keyedStateBackend; + } + + public void setKeyedStateBackend(KeyedStateBackend keyedStateBackend) { + this.keyedStateBackend = keyedStateBackend; + } } diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java index 5c7b0c874d08..ea0ddcbc7c8d 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java @@ -215,7 +215,7 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception { public void initializeState(FunctionInitializationContext context) throws Exception { initSession(); // TODO: implement it - this.task.initializeState(0); + this.task.initializeState(0, null); } public String[] notifyCheckpointComplete(long checkpointId) throws Exception { diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java index 6d4765af97aa..2352d749436b 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java @@ -253,7 +253,7 @@ public void snapshotState(StateSnapshotContext context) throws Exception { public void initializeState(StateInitializationContext context) throws Exception { initSession(); // TODO: implement it - task.initializeState(0); + task.initializeState(0, null); super.initializeState(context); } diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/WindowAggOperator.java 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/WindowAggOperator.java new file mode 100644 index 000000000000..50d91f2c73af --- /dev/null +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/WindowAggOperator.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.gluten.table.runtime.operators;
+
+import org.apache.gluten.util.LogicalTypeConverter;
+import org.apache.gluten.util.ReflectUtils;
+
+import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
+import io.github.zhztheplayer.velox4j.stateful.RocksDBKeyedStateBackendParameters;
+import io.github.zhztheplayer.velox4j.type.BigIntType;
+import io.github.zhztheplayer.velox4j.type.RowType;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.internal.InternalValueState;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.window.tvf.state.WindowValueState;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType.RowField;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link GlutenOneInputOperator} for window aggregation that registers a keyed value state named
+ * {@code "window-aggs"} and, when the keyed state backend is a {@link RocksDBKeyedStateBackend},
+ * hands the backend's native RocksDB handles to the native task via {@code task.initializeState}.
+ *
+ * <p>NOTE(review): this copy of the patch has diamond operators ({@code new WindowAggOperator<>},
+ * {@code new ValueStateDescriptor<>}) but no generic parameters on the declarations, so the type
+ * arguments were presumably lost in extraction. Restore them from the upstream file; they are
+ * deliberately not guessed here.
+ */
+public class WindowAggOperator extends GlutenOneInputOperator {
+  /** Name of the keyed value state; also used to look up its RocksDB column family. */
+  private static final String WINDOW_STATE_NAME = "window-aggs";
+
+  // Created in initializeState(); transient so it never takes part in operator serialization,
+  // matching how the parent class marks its runtime-only fields.
+  private transient WindowValueState windowState;
+
+  private final InternalTypeInfo keyType;
+  private final String[] accNames;
+  private final LogicalType[] accTypes;
+
+  /**
+   * Creates the operator.
+   *
+   * @param plan native plan node, forwarded to the parent operator
+   * @param id operator id, forwarded to the parent operator
+   * @param inputType native input row type, forwarded to the parent operator
+   * @param outputTypes native output row types, forwarded to the parent operator
+   * @param inClass input record class, forwarded to the parent operator
+   * @param outClass output record class, forwarded to the parent operator
+   * @param description operator description, forwarded to the parent operator
+   * @param keyType type information of the grouping key
+   * @param accNames names used for the fields of the state value row type sent to native code
+   * @param accTypes field types used for the Flink-side state serializer and, paired by index
+   *     with {@code accNames}, for the state value row type sent to native code
+   */
+  public WindowAggOperator(
+      StatefulPlanNode plan,
+      String id,
+      RowType inputType,
+      Map outputTypes,
+      Class inClass,
+      Class outClass,
+      String description,
+      InternalTypeInfo keyType,
+      String[] accNames,
+      LogicalType[] accTypes) {
+    super(plan, id, inputType, outputTypes, inClass, outClass, description);
+    this.keyType = keyType;
+    this.accNames = accNames;
+    this.accTypes = accTypes;
+  }
+
+  /** Returns the type information of the grouping key. */
+  public InternalTypeInfo getKeyType() {
+    return keyType;
+  }
+
+  /**
+   * Misspelled alias of {@link #getKeyType()}, kept so existing callers keep compiling.
+   *
+   * @deprecated use {@link #getKeyType()} instead.
+   */
+  @Deprecated
+  public InternalTypeInfo getKeyTye() {
+    return getKeyType();
+  }
+
+  /** Returns the names passed to the constructor as {@code accNames} (not a copy). */
+  public String[] getAggregateNames() {
+    return accNames;
+  }
+
+  /** Returns the types passed to the constructor as {@code accTypes} (not a copy). */
+  public LogicalType[] getAggregateTypes() {
+    return accTypes;
+  }
+
+  /**
+   * Runs the parent initialization, registers the {@code "window-aggs"} value state on the keyed
+   * state backend and, for a RocksDB backend only, passes the RocksDB parameters to the native
+   * task. With any other backend the state is registered but nothing extra is sent to the task.
+   *
+   * <p>NOTE(review): the parent's {@code initializeState} already calls {@code
+   * task.initializeState(0, null)}, so with RocksDB the task is initialized twice. Confirm the
+   * native side tolerates that.
+   *
+   * @throws IllegalStateException if the operator has no keyed state backend
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public void initializeState(StateInitializationContext context) throws Exception {
+    super.initializeState(context);
+    KeyedStateBackend stateBackend = getKeyedStateBackend();
+    if (stateBackend == null) {
+      // Fail with a readable message instead of a bare NullPointerException on the next call.
+      throw new IllegalStateException(
+          "WindowAggOperator requires a keyed state backend, but none is available");
+    }
+    ValueStateDescriptor descriptor =
+        new ValueStateDescriptor<>(WINDOW_STATE_NAME, new RowDataSerializer(accTypes));
+    // Registering the state through Flink first is what lets the column-family lookup in
+    // createRocksDbParameters() find a handle under the same name.
+    ValueState state =
+        stateBackend.getOrCreateKeyedState(LongSerializer.INSTANCE, descriptor);
+    this.windowState = new WindowValueState<>((InternalValueState) state);
+    if (stateBackend instanceof RocksDBKeyedStateBackend) {
+      task.initializeState(0, createRocksDbParameters((RocksDBKeyedStateBackend) stateBackend));
+    }
+  }
+
+  /** Collects the native RocksDB handles and state type descriptions for the native task. */
+  private RocksDBKeyedStateBackendParameters createRocksDbParameters(
+      RocksDBKeyedStateBackend rocksDbBackend) throws Exception {
+    // The RocksDB instance and the column-family lookup are not public on the Flink backend,
+    // hence the reflective access.
+    RocksDB dbInstance =
+        (RocksDB) ReflectUtils.getObjectField(RocksDBKeyedStateBackend.class, rocksDbBackend, "db");
+    ColumnFamilyHandle columnFamilyHandle =
+        (ColumnFamilyHandle)
+            ReflectUtils.invokeObjectMethod(
+                RocksDBKeyedStateBackend.class,
+                rocksDbBackend,
+                "getColumnFamilyHandle",
+                new Class[] {String.class},
+                new Object[] {WINDOW_STATE_NAME});
+    String jobId = getRuntimeContext().getJobInfo().getJobId().toString();
+    String operatorId = getRuntimeContext().getOperatorUniqueID().toString();
+    // NOTE(review): fields are built from accNames.length entries while the Flink serializer
+    // above uses every entry of accTypes. If the two arrays can differ in length (e.g. an
+    // aggregate with several accumulator fields), the row type sent to native code will not
+    // match the serialized state. Confirm against the planner.
+    List accFields = new ArrayList<>(accNames.length);
+    for (int i = 0; i < accNames.length; ++i) {
+      accFields.add(new RowField(accNames[i], accTypes[i]));
+    }
+    // NOTE(review): the meaning of the literal 1 (third argument) is not visible in this patch.
+    return new RocksDBKeyedStateBackendParameters(
+        jobId,
+        operatorId,
+        1,
+        dbInstance.getNativeHandle(),
+        rocksDbBackend.getReadOptions().getNativeHandle(),
+        rocksDbBackend.getWriteOptions().getNativeHandle(),
+        List.of(WINDOW_STATE_NAME),
+        Map.of(WINDOW_STATE_NAME, operatorId),
+        Map.of(WINDOW_STATE_NAME, columnFamilyHandle.getNativeHandle()),
+        Map.of(WINDOW_STATE_NAME, LogicalTypeConverter.toVLType(keyType.toLogicalType())),
+        Map.of(
+            WINDOW_STATE_NAME,
+            LogicalTypeConverter.toVLType(
+                new org.apache.flink.table.types.logical.RowType(accFields))),
+        Map.of(WINDOW_STATE_NAME, new BigIntType()));
+  }
+
+  /**
+   * Returns a new {@code WindowAggOperator} with the same plan, id, types, description, key type
+   * and accumulator metadata, but with the given input/output record classes.
+   */
+  @Override
+  public WindowAggOperator cloneWithInputOutputClasses(
+      Class newInClass, Class newOutClass) {
+    return new WindowAggOperator<>(
+        getPlanNode(),
+        getId(),
+        getInputType(),
+        getOutputTypes(),
+        newInClass,
+        newOutClass,
+        getDescription(),
+        keyType,
+        accNames,
+        accTypes);
+  }
+
+  /**
+   * Intentionally a no-op: the key passed in is ignored and no current key is set here.
+   *
+   * <p>NOTE(review): presumably keys are managed by the native task rather than by Flink's keyed
+   * context. Confirm.
+   */
+  @Override
+  public void setCurrentKey(Object key) {}
+
+  /** Closes the parent operator; there is no operator-specific cleanup yet. */
+  @Override
+  public void close() throws Exception {
+    super.close();
+  }
+}