Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/flink.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ jobs:
sudo yum install https://mirror.stream.centos.org/9-stream/BaseOS/x86_64/os/Packages/tzdata-2025a-1.el9.noarch.rpm -y
sudo .github/workflows/util/install-flink-resources.sh
git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
cd velox4j && git reset --hard 288d181a1b05c47f1f17339eb498dd6375f7aec8
cd velox4j && git reset --hard 889bafcf2fa04e8c31a30edbdf40fe203ef58484
git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch
$GITHUB_WORKSPACE/build/mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
cd ..
Expand Down
32 changes: 31 additions & 1 deletion .github/workflows/util/install-flink-resources.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

LIBRDKAFKA_VERSION="v2.10.0"
CPPKAFKA_VERSION="v0.4.1"
FROCKSDB_VERSION="FRocksDB-6.20.3"
FROCKSDB_REPO="ververica/frocksdb"

function wget_and_untar {
local URL=$1
Expand All @@ -39,6 +41,28 @@ function wget_and_untar {
popd
}

function github_checkout {
  # Clone a GitHub repository at a given branch/tag into ${DEPENDENCY_DIR},
  # skipping the clone if a checkout directory already exists (optionally
  # deleting it first after an interactive prompt).
  # Globals:   DEPENDENCY_DIR (read), SUDO (read; optional privilege prefix)
  # Arguments: $1 - "owner/repo" slug, $2 - branch or tag name,
  #            remaining args - extra parameters forwarded to `git clone`
  local REPO=$1
  shift
  local VERSION=$1
  shift
  local GIT_CLONE_PARAMS=("$@")
  local DIRNAME
  DIRNAME=$(basename "$REPO")
  SUDO="${SUDO:-""}"
  # Validate the derived checkout name before touching the filesystem.
  if [ -z "${DIRNAME}" ]; then
    echo "Failed to get repo name from ${REPO}" >&2
    exit 1
  fi
  cd "${DEPENDENCY_DIR}" || exit 1
  # An existing directory may be a stale checkout; offer to wipe it so the
  # clone below starts clean. `--` stops option parsing for safety.
  if [ -d "${DIRNAME}" ] && prompt "${DIRNAME} already exists. Delete?"; then
    ${SUDO} rm -rf -- "${DIRNAME}"
  fi
  if [ ! -d "${DIRNAME}" ]; then
    git clone -q -b "$VERSION" "${GIT_CLONE_PARAMS[@]}" "https://github.com/${REPO}.git"
  fi
}

function cmake_install_dir {
pushd "./${DEPENDENCY_DIR}/$1"
# remove the directory argument
Expand All @@ -60,7 +84,7 @@ function cmake_install {
fi

mkdir -p "${BINARY_DIR}"
COMPILER_FLAGS=$(get_cxx_flags)
COMPILER_FLAGS="-g -gdwarf-2"
# Add platform specific CXX flags if any
COMPILER_FLAGS+=${OS_CXXFLAGS}

Expand Down Expand Up @@ -93,9 +117,15 @@ function install_cppkafka {
cmake_install_dir cppkafka -DBUILD_TESTS=OFF
}

function install_rocksdb {
  # Fetch and build FRocksDB (the Flink fork of RocksDB) via CMake.
  # Globals: FROCKSDB_REPO, FROCKSDB_VERSION (read).
  # Quote the expansions so unset/odd values cannot word-split (SC2086).
  github_checkout "${FROCKSDB_REPO}" "${FROCKSDB_VERSION}"
  cmake_install_dir frocksdb -DWITH_GFLAGS=OFF -DWITH_TESTS=OFF -DFAIL_ON_WARNINGS=OFF
}

function install_velox_deps {
  # Build every third-party dependency in order, timing each step
  # through the run_and_time helper.
  local dep
  for dep in librdkafka cppkafka rocksdb; do
    run_and_time "install_${dep}"
  done
}

install_velox_deps
26 changes: 25 additions & 1 deletion gluten-flink/docs/Flink.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ As some features have not been committed to upstream, you have to use the follow
## fetch velox4j code
git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
cd velox4j
git reset --hard 288d181a1b05c47f1f17339eb498dd6375f7aec8
git reset --hard 889bafcf2fa04e8c31a30edbdf40fe203ef58484
mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
```
**Get gluten**
Expand Down Expand Up @@ -140,6 +140,30 @@ bin/sql-client.sh -f data-generator.sql

TODO

### RocksDB State

**Get & compile RocksDB**
```bash
git clone -b FRocksDB-6.20.3 https://github.com/ververica/frocksdb.git
cd frocksdb
make rocksdbjava -i
```

**Configure the RocksDB backend**
- Copy the compiled jar package to the `${FLINK_HOME}/gluten_lib` directory.
```bash
cp ${ROCKSDB_COMPILE_DIR}/java/target/rocksdbjni-6.20.3-linux64.jar ${FLINK_HOME}/gluten_lib
```
- Modify `${FLINK_HOME}/bin/config.sh` as follows:
```
GLUTEN_JAR="$FLINK_HOME/gluten_lib/gluten-flink-loader-1.6.0.jar:$FLINK_HOME/gluten_lib/velox4j-0.1.0-SNAPSHOT.jar:$FLINK_HOME/gluten_lib/gluten-flink-runtime-1.6.0.jar:$FLINK_HOME/gluten_lib/rocksdbjni-6.20.3-linux64.jar"
echo "$GLUTEN_JAR""$FLINK_CLASSPATH""$FLINK_DIST"
```
- Set the RocksDB configuration in `${FLINK_HOME}/conf/config.yaml`:
```
state.backend.type: rocksdb
```

## Performance
We are working on supporting the [Nexmark](https://github.com/nexmark/nexmark) benchmark for Flink.
Query q0 is currently supported.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.apache.gluten.rexnode.AggregateCallConverter;
import org.apache.gluten.rexnode.Utils;
import org.apache.gluten.rexnode.WindowUtils;
import org.apache.gluten.table.runtime.operators.GlutenOneInputOperator;
import org.apache.gluten.util.LogicalTypeConverter;
import org.apache.gluten.util.PlanNodeIdGenerator;

Expand Down Expand Up @@ -52,12 +51,16 @@
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
import org.apache.flink.table.planner.plan.utils.AggregateUtil;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.TableConfigUtils;
import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
Expand Down Expand Up @@ -176,6 +179,14 @@ protected Transformation<RowData> translateToPlanInternal(
final ZoneId shiftTimeZone =
TimeWindowUtil.getShiftTimeZone(
windowing.getTimeAttributeType(), TableConfigUtils.getLocalTimeZone(config));
final AggregateInfoList aggInfoList =
AggregateUtil.deriveStreamWindowAggregateInfoList(
planner.getTypeFactory(),
inputRowType,
JavaScalaConversionUtil.toScala(Arrays.asList(aggCalls)),
needRetraction,
windowing.getWindow(),
true); // isStateBackendDataViews

// --- Begin Gluten-specific code changes ---
// TODO: velox window not equal to flink window.
Expand Down Expand Up @@ -245,23 +256,30 @@ protected Transformation<RowData> translateToPlanInternal(
windowType,
outputType,
rowtimeIndex);
final OneInputStreamOperator windowOperator =
new GlutenOneInputOperator(
final RowDataKeySelector selector =
KeySelectorUtil.getRowDataSelector(
planner.getFlinkContext().getClassLoader(),
grouping,
InternalTypeInfo.of(inputRowType));
LogicalType[] accTypes =
Arrays.stream(aggInfoList.getAccTypes())
.map(x -> x.getLogicalType())
.collect(Collectors.toList())
.toArray(new LogicalType[] {});
final OneInputStreamOperator<RowData, RowData> windowOperator =
new org.apache.gluten.table.runtime.operators.WindowAggOperator<RowData, RowData, Long>(
new StatefulPlanNode(windowAgg.getId(), windowAgg),
PlanNodeIdGenerator.newId(),
inputType,
Map.of(windowAgg.getId(), outputType),
RowData.class,
RowData.class,
"StreamExecWindowAggregate");
"StreamExecWindowAggregate",
selector.getProducedType(),
aggInfoList.getAggNames(),
accTypes);
// --- End Gluten-specific code changes ---

final RowDataKeySelector selector =
KeySelectorUtil.getRowDataSelector(
planner.getFlinkContext().getClassLoader(),
grouping,
InternalTypeInfo.of(inputRowType));

final OneInputTransformation<RowData, RowData> transform =
ExecNodeUtil.createOneInputTransformation(
inputTransform,
Expand Down
6 changes: 6 additions & 0 deletions gluten-flink/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,14 +240,7 @@ private void createOffloadedOneInputOperator(
Class<?> inClass = supportsVectorInput ? StatefulRecord.class : RowData.class;
Class<?> outClass = supportsVectorOutput ? StatefulRecord.class : RowData.class;
GlutenOneInputOperator<?, ?> newOneInputOp =
new GlutenOneInputOperator<>(
planNode,
sourceOperator.getId(),
sourceOperator.getInputType(),
sourceOperator.getOutputTypes(),
inClass,
outClass,
sourceOperator.getDescription());
sourceOperator.cloneWithInputOutputClasses(inClass, outClass);
offloadedOpConfig.setStreamOperator(newOneInputOp);
if (supportsVectorOutput) {
setOffloadedOutputSerializer(offloadedOpConfig, sourceOperator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public class GlutenOneInputOperator<IN, OUT> extends TableStreamOperator<OUT>
private transient GlutenSessionResource sessionResource;
private transient Query query;
private transient ExternalStreams.BlockingQueue inputQueue;
private transient SerialTask task;
protected transient SerialTask task;
private final Class<IN> inClass;
private final Class<OUT> outClass;
private transient VectorInputBridge<IN> inputBridge;
Expand Down Expand Up @@ -191,6 +191,18 @@ private void processElementInternal() {
}
}

/**
 * Creates a copy of this operator that differs only in its declared input/output classes.
 *
 * <p>The clone shares this operator's plan node, id, input/output types, and description;
 * only the element classes change. Used when re-wiring an operator for vectorized
 * input/output (e.g. swapping {@code RowData} for {@code StatefulRecord}).
 *
 * @param newInClass the input element class for the new operator
 * @param newOutClass the output element class for the new operator
 * @return a new {@code GlutenOneInputOperator} with the given input/output classes
 */
public <NIN, NOUT> GlutenOneInputOperator<NIN, NOUT> cloneWithInputOutputClasses(
    Class<NIN> newInClass, Class<NOUT> newOutClass) {
  return new GlutenOneInputOperator<>(
      this.glutenPlan,
      this.id,
      this.inputType,
      this.outputTypes,
      newInClass,
      newOutClass,
      this.description);
}

@Override
public void processWatermark(Watermark mark) throws Exception {
task.notifyWatermark(mark.getTimestamp());
Expand Down Expand Up @@ -260,8 +272,7 @@ public void initializeState(StateInitializationContext context) throws Exception
if (task == null) {
initSession();
}
// TODO: implement it
task.initializeState(0);
task.initializeState(0, null);
super.initializeState(context);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import io.github.zhztheplayer.velox4j.memory.MemoryManager;
import io.github.zhztheplayer.velox4j.session.Session;

import org.apache.flink.runtime.state.KeyedStateBackend;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;

Expand All @@ -29,6 +31,7 @@ class GlutenSessionResource {
private Session session;
private MemoryManager memoryManager;
private BufferAllocator allocator;
private KeyedStateBackend<?> keyedStateBackend;

public GlutenSessionResource() {
this.memoryManager = MemoryManager.create(AllocationListener.NOOP);
Expand Down Expand Up @@ -62,4 +65,12 @@ public MemoryManager getMemoryManager() {
/** Returns the Arrow buffer allocator owned by this session resource. */
public BufferAllocator getAllocator() {
  return allocator;
}

/** Returns the keyed state backend, or {@code null} if none has been set. */
public KeyedStateBackend<?> getKeyedStateBackend() {
  return keyedStateBackend;
}

/** Stores the keyed state backend for later retrieval via {@link #getKeyedStateBackend()}. */
public void setKeyedStateBackend(KeyedStateBackend<?> keyedStateBackend) {
  this.keyedStateBackend = keyedStateBackend;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception {
public void initializeState(FunctionInitializationContext context) throws Exception {
initSession();
// TODO: implement it
this.task.initializeState(0);
this.task.initializeState(0, null);
}

public String[] notifyCheckpointComplete(long checkpointId) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ public void snapshotState(StateSnapshotContext context) throws Exception {
public void initializeState(StateInitializationContext context) throws Exception {
initSession();
// TODO: implement it
task.initializeState(0);
task.initializeState(0, null);
super.initializeState(context);
}

Expand Down
Loading