diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 7690d7dc..fd95c24e 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -22,7 +22,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
- java: [8, 11]
+ java: [17, 21]
steps:
- uses: actions/checkout@v3
@@ -37,5 +37,5 @@ jobs:
- name: Build without Scala
run: ./gradlew build --scan --stacktrace -Porg.gradle.project.enable_scala=false
- - name: Build with Scala
- run: ./gradlew build --scan --stacktrace -Porg.gradle.project.enable_scala=true
+# - name: Build with Scala
+# run: ./gradlew build --scan --stacktrace -Porg.gradle.project.enable_scala=true
diff --git a/README.md b/README.md
index 9ed2a0cb..8fee9dff 100644
--- a/README.md
+++ b/README.md
@@ -57,7 +57,7 @@ You will need to set up your environment in order to develop, debug, and execute
Flink supports Linux, OS X, and Windows as development environments for Flink programs and local execution. The following software is required for a Flink development setup and should be installed on your system:
- Git
-- a JDK for Java 8 or Java 11 (a JRE is not sufficient; other versions of Java are currently not supported)
+- a JDK for Java 11 or Java 17 or Java 21 (a JRE is not sufficient; other versions of Java are currently not supported)
- an IDE for Java (and/or Scala) development with Gradle support
- We recommend [IntelliJ](https://www.jetbrains.com/idea/), but [Eclipse](https://www.eclipse.org/downloads/) or [Visual Studio Code](https://code.visualstudio.com/) (with the [Java extension pack](https://code.visualstudio.com/docs/java/java-tutorial)) can also be used so long as you stick to Java
- For Scala, you will need to use IntelliJ (and its [Scala plugin](https://plugins.jetbrains.com/plugin/1347-scala/))
@@ -130,7 +130,7 @@ You can also selectively apply this plugin in a single subproject if desired.
The project needs to be imported as a gradle project into your IDE.
-Then you should be able to open [`RideCleansingTest`](ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingTest.java) and run this test.
+Then you should be able to open [`RideCleansingTest`](ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingUnitTest.java) and run this test.
> **:information_source: Note for Scala users:** You will need to use IntelliJ with the JetBrains Scala plugin, and you will need to add a Scala 2.12 SDK to the Global Libraries section of the Project Structure as well as to the module you are working on.
> IntelliJ will ask you for the latter when you open a Scala file.
@@ -241,6 +241,8 @@ For Java/Scala exercises and solutions, we provide special tasks that can be lis
- [Exercise](long-ride-alerts/README.md)
- [Discussion](long-ride-alerts/DISCUSSION.md)
+
+
## Contribute
If you would like to contribute to this repository or add new exercises, please read the [contributing](CONTRIBUTING.md) guide.
diff --git a/README_zh.md b/README_zh.md
index 1914e08e..d73b9c3c 100644
--- a/README_zh.md
+++ b/README_zh.md
@@ -59,7 +59,7 @@ under the License.
Linux、OS X 和 Windows 均可作为 Flink 程序和本地执行的开发环境。 Flink 开发设置需要以下软件,它们应该安装在系统上:
- Git
-- Java 8 或者 Java 11 版本的 JDK (JRE不满足要求;目前不支持其他版本的Java)
+- Java 11 或者 Java 17 或者 Java 21 版本的 JDK (JRE不满足要求;目前不支持其他版本的Java)
- 支持 Gradle 的 Java (及/或 Scala) 开发IDE
- 推荐使用 [IntelliJ](https://www.jetbrains.com/idea/), 但 [Eclipse](https://www.eclipse.org/downloads/) 或 [Visual Studio Code](https://code.visualstudio.com/) (安装 [Java extension pack](https://code.visualstudio.com/docs/java/java-tutorial) 插件) 也可以用于Java环境
- 为了使用 Scala, 需要使用 IntelliJ (及其 [Scala plugin](https://plugins.jetbrains.com/plugin/1347-scala/) 插件)
@@ -134,7 +134,7 @@ org.gradle.project.enable_scala = true
本项目应作为 gradle 项目导入到IDE中。
-然后应该可以打开 [`RideCleansingTest`](ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingTest.java) 并运行此测试。
+然后应该可以打开 [`RideCleansingTest`](ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingUnitTest.java) 并运行此测试。
> **:information_source: Scala 用户须知:** 需要将 IntelliJ 与 JetBrains Scala 插件一起使用,并且需要将 Scala 2.12 SDK 添加到项目结构的全局库部分以及工作模块中。
> 当打开 Scala 文件时,IntelliJ 会要求提供后者(JetBrains Scala 插件)。
diff --git a/build.gradle b/build.gradle
index 3344a8e2..add9a22c 100644
--- a/build.gradle
+++ b/build.gradle
@@ -16,15 +16,15 @@
*/
plugins {
- id 'com.github.johnrengelman.shadow' version '7.0.0' apply false
- id "com.diffplug.spotless" version "6.4.2" apply false
+ id 'com.gradleup.shadow' version '9.0.0-rc2' apply false
+ id "com.diffplug.spotless" version "7.2.1" apply false
}
description = "Flink Training Exercises"
allprojects {
group = 'org.apache.flink'
- version = '1.17-SNAPSHOT'
+ version = '2.0.0-SNAPSHOT'
apply plugin: 'com.diffplug.spotless'
@@ -33,7 +33,7 @@ allprojects {
target '*.gradle', '*.md', '.gitignore'
trimTrailingWhitespace()
- indentWithSpaces(4)
+ leadingTabsToSpaces(4)
endWithNewline()
}
@@ -89,13 +89,13 @@ subprojects {
if (project.properties['org.gradle.project.enable_scala'].trim() == 'true') {
apply plugin: 'scala'
}
- apply plugin: 'com.github.johnrengelman.shadow'
+ apply plugin: 'com.gradleup.shadow'
apply plugin: 'checkstyle'
apply plugin: 'eclipse'
ext {
- javaVersion = '1.8'
- flinkVersion = '1.17.0'
+ javaVersion = '17'
+ flinkVersion = '2.0.0'
scalaBinaryVersion = '2.12'
log4jVersion = '2.12.1'
junitVersion = '4.13'
@@ -127,9 +127,12 @@ subprojects {
shadow "org.apache.logging.log4j:log4j-core:${log4jVersion}"
shadow "org.apache.flink:flink-clients:${flinkVersion}"
- shadow "org.apache.flink:flink-java:${flinkVersion}"
+// removed
+// shadow "org.apache.flink:flink-java:${flinkVersion}"
+ shadow "org.apache.flink:flink-runtime:${flinkVersion}"
shadow "org.apache.flink:flink-streaming-java:${flinkVersion}"
- shadow "org.apache.flink:flink-streaming-scala_${scalaBinaryVersion}:${flinkVersion}"
+// removed
+// shadow "org.apache.flink:flink-streaming-scala_${scalaBinaryVersion}:${flinkVersion}"
// allows using Flink's web UI when running in the IDE:
shadow "org.apache.flink:flink-runtime-web:${flinkVersion}"
@@ -174,7 +177,7 @@ subprojects {
spotless {
java {
- googleJavaFormat('1.7').aosp()
+ googleJavaFormat('1.28.0').aosp()
// \# refers to static imports
importOrder('org.apache.flink', 'org.apache.flink.shaded', '', 'javax', 'java', 'scala', '\\#')
diff --git a/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiFareGenerator.java b/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiFareGenerator.java
index 58fbe687..0e3e9010 100644
--- a/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiFareGenerator.java
+++ b/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiFareGenerator.java
@@ -18,7 +18,7 @@
package org.apache.flink.training.exercises.common.sources;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.training.exercises.common.datatypes.TaxiFare;
import org.apache.flink.training.exercises.common.utils.DataGenerator;
diff --git a/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideGenerator.java b/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideGenerator.java
index 975ff2bf..9aaddebd 100644
--- a/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideGenerator.java
+++ b/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideGenerator.java
@@ -18,7 +18,7 @@
package org.apache.flink.training.exercises.common.sources;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
import java.util.ArrayList;
diff --git a/common/src/test/java/org/apache/flink/training/exercises/testing/ComposedKeyedProcessFunction.java b/common/src/test/java/org/apache/flink/training/exercises/testing/ComposedKeyedProcessFunction.java
index c3ecf2cc..e71b145c 100644
--- a/common/src/test/java/org/apache/flink/training/exercises/testing/ComposedKeyedProcessFunction.java
+++ b/common/src/test/java/org/apache/flink/training/exercises/testing/ComposedKeyedProcessFunction.java
@@ -18,7 +18,7 @@
package org.apache.flink.training.exercises.testing;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.training.exercises.common.utils.MissingSolutionException;
import org.apache.flink.util.Collector;
@@ -48,7 +48,7 @@ public ComposedKeyedProcessFunction(
}
@Override
- public void open(Configuration parameters) throws Exception {
+ public void open(OpenContext parameters) throws Exception {
try {
exercise.setRuntimeContext(this.getRuntimeContext());
diff --git a/common/src/test/java/org/apache/flink/training/exercises/testing/ComposedPipeline.java b/common/src/test/java/org/apache/flink/training/exercises/testing/ComposedPipeline.java
index ba277d00..018ebb78 100644
--- a/common/src/test/java/org/apache/flink/training/exercises/testing/ComposedPipeline.java
+++ b/common/src/test/java/org/apache/flink/training/exercises/testing/ComposedPipeline.java
@@ -19,7 +19,7 @@
package org.apache.flink.training.exercises.testing;
import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.training.exercises.common.utils.MissingSolutionException;
/**
diff --git a/common/src/test/java/org/apache/flink/training/exercises/testing/ComposedRichCoFlatMapFunction.java b/common/src/test/java/org/apache/flink/training/exercises/testing/ComposedRichCoFlatMapFunction.java
index 97d5ce21..64e4c130 100644
--- a/common/src/test/java/org/apache/flink/training/exercises/testing/ComposedRichCoFlatMapFunction.java
+++ b/common/src/test/java/org/apache/flink/training/exercises/testing/ComposedRichCoFlatMapFunction.java
@@ -18,7 +18,7 @@
package org.apache.flink.training.exercises.testing;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.training.exercises.common.utils.MissingSolutionException;
import org.apache.flink.util.Collector;
@@ -50,7 +50,7 @@ public ComposedRichCoFlatMapFunction(
}
@Override
- public void open(Configuration parameters) throws Exception {
+ public void open(OpenContext parameters) throws Exception {
try {
exercise.setRuntimeContext(this.getRuntimeContext());
diff --git a/common/src/test/java/org/apache/flink/training/exercises/testing/ComposedTwoInputPipeline.java b/common/src/test/java/org/apache/flink/training/exercises/testing/ComposedTwoInputPipeline.java
index 95e24983..064a5b92 100644
--- a/common/src/test/java/org/apache/flink/training/exercises/testing/ComposedTwoInputPipeline.java
+++ b/common/src/test/java/org/apache/flink/training/exercises/testing/ComposedTwoInputPipeline.java
@@ -19,7 +19,7 @@
package org.apache.flink.training.exercises.testing;
import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.training.exercises.common.utils.MissingSolutionException;
/**
diff --git a/common/src/test/java/org/apache/flink/training/exercises/testing/ExecutablePipeline.java b/common/src/test/java/org/apache/flink/training/exercises/testing/ExecutablePipeline.java
index 685edc47..1ef30d67 100644
--- a/common/src/test/java/org/apache/flink/training/exercises/testing/ExecutablePipeline.java
+++ b/common/src/test/java/org/apache/flink/training/exercises/testing/ExecutablePipeline.java
@@ -19,7 +19,7 @@
package org.apache.flink.training.exercises.testing;
import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
public interface ExecutablePipeline {
JobExecutionResult execute(SourceFunction source, TestSink sink) throws Exception;
diff --git a/common/src/test/java/org/apache/flink/training/exercises/testing/ExecutableTwoInputPipeline.java b/common/src/test/java/org/apache/flink/training/exercises/testing/ExecutableTwoInputPipeline.java
index 3f9aba90..663e4d91 100644
--- a/common/src/test/java/org/apache/flink/training/exercises/testing/ExecutableTwoInputPipeline.java
+++ b/common/src/test/java/org/apache/flink/training/exercises/testing/ExecutableTwoInputPipeline.java
@@ -19,7 +19,7 @@
package org.apache.flink.training.exercises.testing;
import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
public interface ExecutableTwoInputPipeline {
JobExecutionResult execute(
diff --git a/common/src/test/java/org/apache/flink/training/exercises/testing/ParallelTestSource.java b/common/src/test/java/org/apache/flink/training/exercises/testing/ParallelTestSource.java
index 6dcfc022..78d00087 100644
--- a/common/src/test/java/org/apache/flink/training/exercises/testing/ParallelTestSource.java
+++ b/common/src/test/java/org/apache/flink/training/exercises/testing/ParallelTestSource.java
@@ -21,7 +21,7 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.legacy.RichParallelSourceFunction;
public class ParallelTestSource extends RichParallelSourceFunction
implements ResultTypeQueryable {
@@ -37,8 +37,9 @@ public ParallelTestSource(T... events) {
@Override
public void run(SourceContext ctx) {
- int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
- int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+ int indexOfThisSubtask = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
+ int numberOfParallelSubtasks =
+ getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
int subtask = 0;
// the elements of the testStream are assigned to the parallel instances in a round-robin
diff --git a/common/src/test/java/org/apache/flink/training/exercises/testing/TestSink.java b/common/src/test/java/org/apache/flink/training/exercises/testing/TestSink.java
index 4fef8b9d..b3c89638 100644
--- a/common/src/test/java/org/apache/flink/training/exercises/testing/TestSink.java
+++ b/common/src/test/java/org/apache/flink/training/exercises/testing/TestSink.java
@@ -20,8 +20,8 @@
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.ListAccumulator;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.api.common.functions.OpenContext;
+import org.apache.flink.streaming.api.functions.sink.legacy.RichSinkFunction;
import java.util.List;
@@ -38,7 +38,7 @@ public TestSink() {
}
@Override
- public void open(Configuration parameters) {
+ public void open(OpenContext parameters) {
getRuntimeContext().addAccumulator(name, new ListAccumulator());
}
diff --git a/config/checkstyle/checkstyle.xml b/config/checkstyle/checkstyle.xml
index 56d219df..05dc6ef3 100644
--- a/config/checkstyle/checkstyle.xml
+++ b/config/checkstyle/checkstyle.xml
@@ -247,7 +247,7 @@ This file is based on the checkstyle file of Apache Beam.
-
+
diff --git a/gradle.properties b/gradle.properties
index 6be414c4..b7ee9cf0 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -3,3 +3,11 @@ org.gradle.parallel = true
# Scala exercises can be enabled by setting this to true
org.gradle.project.enable_scala = false
+
+# This is required for Java 16+ to compile the code
+org.gradle.jvmargs=-XX:MaxMetaspaceSize=512m \
+ --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED \
+ --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED \
+ --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED \
+ --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED \
+ --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED
diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar
index f3d88b1c..9bbc975c 100644
Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index 69a97150..d4081da4 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,5 +1,7 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-7.1-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-8.14.3-bin.zip
+networkTimeout=10000
+validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
diff --git a/gradlew b/gradlew
index 2fe81a7d..faf93008 100755
--- a/gradlew
+++ b/gradlew
@@ -1,7 +1,7 @@
-#!/usr/bin/env sh
+#!/bin/sh
#
-# Copyright 2015 the original author or authors.
+# Copyright © 2015-2021 the original authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -15,80 +15,115 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+# SPDX-License-Identifier: Apache-2.0
+#
##############################################################################
-##
-## Gradle start up script for UN*X
-##
+#
+# Gradle start up script for POSIX generated by Gradle.
+#
+# Important for running:
+#
+# (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is
+# noncompliant, but you have some other compliant shell such as ksh or
+# bash, then to run this script, type that shell name before the whole
+# command line, like:
+#
+# ksh Gradle
+#
+# Busybox and similar reduced shells will NOT work, because this script
+# requires all of these POSIX shell features:
+# * functions;
+# * expansions «$var», «${var}», «${var:-default}», «${var+SET}»,
+# «${var#prefix}», «${var%suffix}», and «$( cmd )»;
+# * compound commands having a testable exit status, especially «case»;
+# * various built-in commands including «command», «set», and «ulimit».
+#
+# Important for patching:
+#
+# (2) This script targets any POSIX shell, so it avoids extensions provided
+# by Bash, Ksh, etc; in particular arrays are avoided.
+#
+# The "traditional" practice of packing multiple parameters into a
+# space-separated string is a well documented source of bugs and security
+# problems, so this is (mostly) avoided, by progressively accumulating
+# options in "$@", and eventually passing that to Java.
+#
+# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS,
+# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly;
+# see the in-line comments for details.
+#
+# There are tweaks for specific operating systems such as AIX, CygWin,
+# Darwin, MinGW, and NonStop.
+#
+# (3) This script is generated from the Groovy template
+# https://github.com/gradle/gradle/blob/HEAD/platforms/jvm/plugins-application/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt
+# within the Gradle project.
+#
+# You can find Gradle at https://github.com/gradle/gradle/.
+#
##############################################################################
# Attempt to set APP_HOME
+
# Resolve links: $0 may be a link
-PRG="$0"
-# Need this for relative symlinks.
-while [ -h "$PRG" ] ; do
- ls=`ls -ld "$PRG"`
- link=`expr "$ls" : '.*-> \(.*\)$'`
- if expr "$link" : '/.*' > /dev/null; then
- PRG="$link"
- else
- PRG=`dirname "$PRG"`"/$link"
- fi
+app_path=$0
+
+# Need this for daisy-chained symlinks.
+while
+ APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path
+ [ -h "$app_path" ]
+do
+ ls=$( ls -ld "$app_path" )
+ link=${ls#*' -> '}
+ case $link in #(
+ /*) app_path=$link ;; #(
+ *) app_path=$APP_HOME$link ;;
+ esac
done
-SAVED="`pwd`"
-cd "`dirname \"$PRG\"`/" >/dev/null
-APP_HOME="`pwd -P`"
-cd "$SAVED" >/dev/null
-APP_NAME="Gradle"
-APP_BASE_NAME=`basename "$0"`
-
-# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
-DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
+# This is normally unused
+# shellcheck disable=SC2034
+APP_BASE_NAME=${0##*/}
+# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036)
+APP_HOME=$( cd -P "${APP_HOME:-./}" > /dev/null && printf '%s\n' "$PWD" ) || exit
# Use the maximum available, or set MAX_FD != -1 to use that value.
-MAX_FD="maximum"
+MAX_FD=maximum
warn () {
echo "$*"
-}
+} >&2
die () {
echo
echo "$*"
echo
exit 1
-}
+} >&2
# OS specific support (must be 'true' or 'false').
cygwin=false
msys=false
darwin=false
nonstop=false
-case "`uname`" in
- CYGWIN* )
- cygwin=true
- ;;
- Darwin* )
- darwin=true
- ;;
- MINGW* )
- msys=true
- ;;
- NONSTOP* )
- nonstop=true
- ;;
+case "$( uname )" in #(
+ CYGWIN* ) cygwin=true ;; #(
+ Darwin* ) darwin=true ;; #(
+ MSYS* | MINGW* ) msys=true ;; #(
+ NONSTOP* ) nonstop=true ;;
esac
CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
+
# Determine the Java command to use to start the JVM.
if [ -n "$JAVA_HOME" ] ; then
if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
# IBM's JDK on AIX uses strange locations for the executables
- JAVACMD="$JAVA_HOME/jre/sh/java"
+ JAVACMD=$JAVA_HOME/jre/sh/java
else
- JAVACMD="$JAVA_HOME/bin/java"
+ JAVACMD=$JAVA_HOME/bin/java
fi
if [ ! -x "$JAVACMD" ] ; then
die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME
@@ -97,87 +132,120 @@ Please set the JAVA_HOME variable in your environment to match the
location of your Java installation."
fi
else
- JAVACMD="java"
- which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
+ JAVACMD=java
+ if ! command -v java >/dev/null 2>&1
+ then
+ die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
Please set the JAVA_HOME variable in your environment to match the
location of your Java installation."
+ fi
fi
# Increase the maximum file descriptors if we can.
-if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then
- MAX_FD_LIMIT=`ulimit -H -n`
- if [ $? -eq 0 ] ; then
- if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then
- MAX_FD="$MAX_FD_LIMIT"
- fi
- ulimit -n $MAX_FD
- if [ $? -ne 0 ] ; then
- warn "Could not set maximum file descriptor limit: $MAX_FD"
- fi
- else
- warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT"
- fi
+if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then
+ case $MAX_FD in #(
+ max*)
+ # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked.
+ # shellcheck disable=SC2039,SC3045
+ MAX_FD=$( ulimit -H -n ) ||
+ warn "Could not query maximum file descriptor limit"
+ esac
+ case $MAX_FD in #(
+ '' | soft) :;; #(
+ *)
+ # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked.
+ # shellcheck disable=SC2039,SC3045
+ ulimit -n "$MAX_FD" ||
+ warn "Could not set maximum file descriptor limit to $MAX_FD"
+ esac
fi
-# For Darwin, add options to specify how the application appears in the dock
-if $darwin; then
- GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\""
-fi
+# Collect all arguments for the java command, stacking in reverse order:
+# * args from the command line
+# * the main class name
+# * -classpath
+# * -D...appname settings
+# * --module-path (only if needed)
+# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables.
# For Cygwin or MSYS, switch paths to Windows format before running java
-if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then
- APP_HOME=`cygpath --path --mixed "$APP_HOME"`
- CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`
- JAVACMD=`cygpath --unix "$JAVACMD"`
-
- # We build the pattern for arguments to be converted via cygpath
- ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null`
- SEP=""
- for dir in $ROOTDIRSRAW ; do
- ROOTDIRS="$ROOTDIRS$SEP$dir"
- SEP="|"
- done
- OURCYGPATTERN="(^($ROOTDIRS))"
- # Add a user-defined pattern to the cygpath arguments
- if [ "$GRADLE_CYGPATTERN" != "" ] ; then
- OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)"
- fi
+if "$cygwin" || "$msys" ; then
+ APP_HOME=$( cygpath --path --mixed "$APP_HOME" )
+ CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" )
+
+ JAVACMD=$( cygpath --unix "$JAVACMD" )
+
# Now convert the arguments - kludge to limit ourselves to /bin/sh
- i=0
- for arg in "$@" ; do
- CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -`
- CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option
-
- if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition
- eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"`
- else
- eval `echo args$i`="\"$arg\""
+ for arg do
+ if
+ case $arg in #(
+ -*) false ;; # don't mess with options #(
+ /?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath
+ [ -e "$t" ] ;; #(
+ *) false ;;
+ esac
+ then
+ arg=$( cygpath --path --ignore --mixed "$arg" )
fi
- i=`expr $i + 1`
+ # Roll the args list around exactly as many times as the number of
+ # args, so each arg winds up back in the position where it started, but
+ # possibly modified.
+ #
+ # NB: a `for` loop captures its iteration list before it begins, so
+ # changing the positional parameters here affects neither the number of
+ # iterations, nor the values presented in `arg`.
+ shift # remove old arg
+ set -- "$@" "$arg" # push replacement arg
done
- case $i in
- 0) set -- ;;
- 1) set -- "$args0" ;;
- 2) set -- "$args0" "$args1" ;;
- 3) set -- "$args0" "$args1" "$args2" ;;
- 4) set -- "$args0" "$args1" "$args2" "$args3" ;;
- 5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;;
- 6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;;
- 7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;;
- 8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;;
- 9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;;
- esac
fi
-# Escape application args
-save () {
- for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done
- echo " "
-}
-APP_ARGS=`save "$@"`
-# Collect all arguments for the java command, following the shell quoting and substitution rules
-eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS"
+# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
+DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
+
+# Collect all arguments for the java command:
+# * DEFAULT_JVM_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments,
+# and any embedded shellness will be escaped.
+# * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be
+# treated as '${Hostname}' itself on the command line.
+
+set -- \
+ "-Dorg.gradle.appname=$APP_BASE_NAME" \
+ -classpath "$CLASSPATH" \
+ org.gradle.wrapper.GradleWrapperMain \
+ "$@"
+
+# Stop when "xargs" is not available.
+if ! command -v xargs >/dev/null 2>&1
+then
+ die "xargs is not available"
+fi
+
+# Use "xargs" to parse quoted args.
+#
+# With -n1 it outputs one arg per line, with the quotes and backslashes removed.
+#
+# In Bash we could simply go:
+#
+# readarray ARGS < <( xargs -n1 <<<"$var" ) &&
+# set -- "${ARGS[@]}" "$@"
+#
+# but POSIX shell has neither arrays nor command substitution, so instead we
+# post-process each arg (as a line of input to sed) to backslash-escape any
+# character that might be a shell metacharacter, then use eval to reverse
+# that process (while maintaining the separation between arguments), and wrap
+# the whole thing up as a single "set" statement.
+#
+# This will of course break if any of these variables contains a newline or
+# an unmatched quote.
+#
+
+eval "set -- $(
+ printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" |
+ xargs -n1 |
+ sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' |
+ tr '\n' ' '
+ )" '"$@"'
exec "$JAVACMD" "$@"
diff --git a/gradlew.bat b/gradlew.bat
index 9109989e..9b42019c 100644
--- a/gradlew.bat
+++ b/gradlew.bat
@@ -13,8 +13,10 @@
@rem See the License for the specific language governing permissions and
@rem limitations under the License.
@rem
+@rem SPDX-License-Identifier: Apache-2.0
+@rem
-@if "%DEBUG%" == "" @echo off
+@if "%DEBUG%"=="" @echo off
@rem ##########################################################################
@rem
@rem Gradle startup script for Windows
@@ -25,7 +27,8 @@
if "%OS%"=="Windows_NT" setlocal
set DIRNAME=%~dp0
-if "%DIRNAME%" == "" set DIRNAME=.
+if "%DIRNAME%"=="" set DIRNAME=.
+@rem This is normally unused
set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME%
@@ -40,13 +43,13 @@ if defined JAVA_HOME goto findJavaFromJavaHome
set JAVA_EXE=java.exe
%JAVA_EXE% -version >NUL 2>&1
-if "%ERRORLEVEL%" == "0" goto init
+if %ERRORLEVEL% equ 0 goto execute
-echo.
-echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
-echo.
-echo Please set the JAVA_HOME variable in your environment to match the
-echo location of your Java installation.
+echo. 1>&2
+echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. 1>&2
+echo. 1>&2
+echo Please set the JAVA_HOME variable in your environment to match the 1>&2
+echo location of your Java installation. 1>&2
goto fail
@@ -54,48 +57,36 @@ goto fail
set JAVA_HOME=%JAVA_HOME:"=%
set JAVA_EXE=%JAVA_HOME%/bin/java.exe
-if exist "%JAVA_EXE%" goto init
+if exist "%JAVA_EXE%" goto execute
-echo.
-echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME%
-echo.
-echo Please set the JAVA_HOME variable in your environment to match the
-echo location of your Java installation.
+echo. 1>&2
+echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% 1>&2
+echo. 1>&2
+echo Please set the JAVA_HOME variable in your environment to match the 1>&2
+echo location of your Java installation. 1>&2
goto fail
-:init
-@rem Get command-line arguments, handling Windows variants
-
-if not "%OS%" == "Windows_NT" goto win9xME_args
-
-:win9xME_args
-@rem Slurp the command line arguments.
-set CMD_LINE_ARGS=
-set _SKIP=2
-
-:win9xME_args_slurp
-if "x%~1" == "x" goto execute
-
-set CMD_LINE_ARGS=%*
-
:execute
@rem Setup the command line
set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
+
@rem Execute Gradle
-"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS%
+"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %*
:end
@rem End local scope for the variables with windows NT shell
-if "%ERRORLEVEL%"=="0" goto mainEnd
+if %ERRORLEVEL% equ 0 goto mainEnd
:fail
rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of
rem the _cmd.exe /c_ return code!
-if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1
-exit /b 1
+set EXIT_CODE=%ERRORLEVEL%
+if %EXIT_CODE% equ 0 set EXIT_CODE=1
+if not ""=="%GRADLE_EXIT_CONSOLE%" exit %EXIT_CODE%
+exit /b %EXIT_CODE%
:mainEnd
if "%OS%"=="Windows_NT" endlocal
diff --git a/hourly-tips/src/main/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsExercise.java b/hourly-tips/src/main/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsExercise.java
index 79c89aee..286020a7 100644
--- a/hourly-tips/src/main/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsExercise.java
+++ b/hourly-tips/src/main/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsExercise.java
@@ -22,9 +22,9 @@
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.sink.legacy.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.training.exercises.common.datatypes.TaxiFare;
import org.apache.flink.training.exercises.common.sources.TaxiFareGenerator;
import org.apache.flink.training.exercises.common.utils.MissingSolutionException;
diff --git a/hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java b/hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java
index 6ae443ff..c1c0cb80 100644
--- a/hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java
+++ b/hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java
@@ -23,17 +23,18 @@
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.sink.legacy.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
-import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.training.exercises.common.datatypes.TaxiFare;
import org.apache.flink.training.exercises.common.sources.TaxiFareGenerator;
import org.apache.flink.util.Collector;
+import java.time.Duration;
+
/**
* Java reference implementation for the Hourly Tips exercise from the Flink training.
*
@@ -89,12 +90,12 @@ public JobExecutionResult execute() throws Exception {
// compute tips per hour for each driver
DataStream> hourlyTips =
fares.keyBy((TaxiFare fare) -> fare.driverId)
- .window(TumblingEventTimeWindows.of(Time.hours(1)))
+ .window(TumblingEventTimeWindows.of(Duration.ofHours(1)))
.process(new AddTips());
// find the driver with the highest sum of tips for each hour
DataStream> hourlyMax =
- hourlyTips.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).maxBy(2);
+ hourlyTips.windowAll(TumblingEventTimeWindows.of(Duration.ofHours(1))).maxBy(2);
/* You should explore how this alternative (commented out below) behaves.
* In what ways is the same as, and different from, the solution above (using a windowAll)?
diff --git a/hourly-tips/src/test/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsTest.java b/hourly-tips/src/test/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsTest.java
index 1e379c35..46c0c2d4 100644
--- a/hourly-tips/src/test/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsTest.java
+++ b/hourly-tips/src/test/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsTest.java
@@ -21,7 +21,7 @@
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.training.exercises.common.datatypes.TaxiFare;
import org.apache.flink.training.exercises.common.utils.DataGenerator;
diff --git a/long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java b/long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java
index f555e5da..48ac2092 100644
--- a/long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java
+++ b/long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java
@@ -21,13 +21,13 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
-import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.sink.legacy.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator;
import org.apache.flink.training.exercises.common.utils.MissingSolutionException;
@@ -99,7 +99,7 @@ public static void main(String[] args) throws Exception {
public static class AlertFunction extends KeyedProcessFunction {
@Override
- public void open(Configuration config) throws Exception {
+ public void open(OpenContext config) throws Exception {
throw new MissingSolutionException();
}
diff --git a/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java b/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java
index e542817c..329f82a9 100644
--- a/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java
+++ b/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java
@@ -21,15 +21,15 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
-import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.sink.legacy.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator;
import org.apache.flink.util.Collector;
@@ -104,7 +104,7 @@ public static class AlertFunction extends KeyedProcessFunction rideState;
@Override
- public void open(Configuration config) {
+ public void open(OpenContext config) {
ValueStateDescriptor rideStateDescriptor =
new ValueStateDescriptor<>("ride event", TaxiRide.class);
rideState = getRuntimeContext().getState(rideStateDescriptor);
diff --git a/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesIntegrationTest.java b/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesIntegrationTest.java
index 792a8dd9..0a5f1092 100644
--- a/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesIntegrationTest.java
+++ b/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesIntegrationTest.java
@@ -20,7 +20,7 @@
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
import org.apache.flink.training.exercises.testing.ComposedPipeline;
diff --git a/ride-cleansing/src/main/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingExercise.java b/ride-cleansing/src/main/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingExercise.java
index 1f07312f..20ce9598 100644
--- a/ride-cleansing/src/main/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingExercise.java
+++ b/ride-cleansing/src/main/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingExercise.java
@@ -21,9 +21,9 @@
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.sink.legacy.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator;
import org.apache.flink.training.exercises.common.utils.MissingSolutionException;
diff --git a/ride-cleansing/src/solution/java/org/apache/flink/training/solutions/ridecleansing/RideCleansingSolution.java b/ride-cleansing/src/solution/java/org/apache/flink/training/solutions/ridecleansing/RideCleansingSolution.java
index 46786268..61798bd5 100644
--- a/ride-cleansing/src/solution/java/org/apache/flink/training/solutions/ridecleansing/RideCleansingSolution.java
+++ b/ride-cleansing/src/solution/java/org/apache/flink/training/solutions/ridecleansing/RideCleansingSolution.java
@@ -21,9 +21,9 @@
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.sink.legacy.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator;
import org.apache.flink.training.exercises.common.utils.GeoUtils;
diff --git a/rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java b/rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java
index 0662dfc0..f80bc10a 100644
--- a/rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java
+++ b/rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java
@@ -19,13 +19,13 @@
package org.apache.flink.training.exercises.ridesandfares;
import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
-import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.sink.legacy.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.training.exercises.common.datatypes.RideAndFare;
import org.apache.flink.training.exercises.common.datatypes.TaxiFare;
import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
@@ -100,7 +100,7 @@ public static class EnrichmentFunction
extends RichCoFlatMapFunction {
@Override
- public void open(Configuration config) throws Exception {
+ public void open(OpenContext config) throws Exception {
throw new MissingSolutionException();
}
diff --git a/rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java b/rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java
index 3cdf17a6..7fea72ab 100644
--- a/rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java
+++ b/rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java
@@ -19,15 +19,16 @@
package org.apache.flink.training.solutions.ridesandfares;
import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
-import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.sink.legacy.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.training.exercises.common.datatypes.RideAndFare;
import org.apache.flink.training.exercises.common.datatypes.TaxiFare;
import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
@@ -127,7 +128,7 @@ public static class EnrichmentFunction
private ValueState fareState;
@Override
- public void open(Configuration config) {
+ public void open(OpenContext config) {
rideState =
getRuntimeContext()