6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
@@ -22,7 +22,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
java: [8, 11]
java: [17, 21]

steps:
- uses: actions/checkout@v3
@@ -37,5 +37,5 @@ jobs:
- name: Build without Scala
run: ./gradlew build --scan --stacktrace -Porg.gradle.project.enable_scala=false

- name: Build with Scala
run: ./gradlew build --scan --stacktrace -Porg.gradle.project.enable_scala=true
# - name: Build with Scala
# run: ./gradlew build --scan --stacktrace -Porg.gradle.project.enable_scala=true
6 changes: 4 additions & 2 deletions README.md
@@ -57,7 +57,7 @@ You will need to set up your environment in order to develop, debug, and execute
Flink supports Linux, OS X, and Windows as development environments for Flink programs and local execution. The following software is required for a Flink development setup and should be installed on your system:

- Git
- a JDK for Java 8 or Java 11 (a JRE is not sufficient; other versions of Java are currently not supported)
- a JDK for Java 11, Java 17, or Java 21 (a JRE is not sufficient; other versions of Java are currently not supported)
- an IDE for Java (and/or Scala) development with Gradle support
- We recommend [IntelliJ](https://www.jetbrains.com/idea/), but [Eclipse](https://www.eclipse.org/downloads/) or [Visual Studio Code](https://code.visualstudio.com/) (with the [Java extension pack](https://code.visualstudio.com/docs/java/java-tutorial)) can also be used so long as you stick to Java
- For Scala, you will need to use IntelliJ (and its [Scala plugin](https://plugins.jetbrains.com/plugin/1347-scala/))
@@ -130,7 +130,7 @@ You can also selectively apply this plugin in a single subproject if desired.

The project needs to be imported as a gradle project into your IDE.

Then you should be able to open [`RideCleansingTest`](ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingTest.java) and run this test.
Then you should be able to open [`RideCleansingTest`](ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingUnitTest.java) and run this test.

> **:information_source: Note for Scala users:** You will need to use IntelliJ with the JetBrains Scala plugin, and you will need to add a Scala 2.12 SDK to the Global Libraries section of the Project Structure as well as to the module you are working on.
> IntelliJ will ask you for the latter when you open a Scala file.
@@ -241,6 +241,8 @@ For Java/Scala exercises and solutions, we provide special tasks that can be lis
- [Exercise](long-ride-alerts/README.md)
- [Discussion](long-ride-alerts/DISCUSSION.md)

<a name="contributing"></a>

## Contribute

If you would like to contribute to this repository or add new exercises, please read the [contributing](CONTRIBUTING.md) guide.
4 changes: 2 additions & 2 deletions README_zh.md
@@ -59,7 +59,7 @@ under the License.
Linux, OS X, and Windows can all serve as development environments for Flink programs and local execution. A Flink development setup requires the following software, which should be installed on your system:

- Git
- a JDK for Java 8 or Java 11 (a JRE is not sufficient; other versions of Java are currently not supported)
- a JDK for Java 11, Java 17, or Java 21 (a JRE is not sufficient; other versions of Java are currently not supported)
- an IDE for Java (and/or Scala) development with Gradle support
- We recommend [IntelliJ](https://www.jetbrains.com/idea/), but [Eclipse](https://www.eclipse.org/downloads/) or [Visual Studio Code](https://code.visualstudio.com/) (with the [Java extension pack](https://code.visualstudio.com/docs/java/java-tutorial)) can also be used for Java
- For Scala, you will need to use IntelliJ (and its [Scala plugin](https://plugins.jetbrains.com/plugin/1347-scala/))
@@ -134,7 +134,7 @@ org.gradle.project.enable_scala = true

This project should be imported into your IDE as a gradle project.

Then you should be able to open [`RideCleansingTest`](ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingTest.java) and run this test.
Then you should be able to open [`RideCleansingTest`](ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingUnitTest.java) and run this test.

> **:information_source: Note for Scala users:** You will need to use IntelliJ with the JetBrains Scala plugin, and you will need to add a Scala 2.12 SDK to the Global Libraries section of the Project Structure as well as to the module you are working on.
> When you open a Scala file, IntelliJ will ask you for the latter (the JetBrains Scala plugin).
23 changes: 13 additions & 10 deletions build.gradle
@@ -16,15 +16,15 @@
*/

plugins {
id 'com.github.johnrengelman.shadow' version '7.0.0' apply false
id "com.diffplug.spotless" version "6.4.2" apply false
id 'com.gradleup.shadow' version '9.0.0-rc2' apply false
id "com.diffplug.spotless" version "7.2.1" apply false
}

description = "Flink Training Exercises"

allprojects {
group = 'org.apache.flink'
version = '1.17-SNAPSHOT'
version = '2.0.0-SNAPSHOT'

apply plugin: 'com.diffplug.spotless'

@@ -33,7 +33,7 @@ allprojects {
target '*.gradle', '*.md', '.gitignore'

trimTrailingWhitespace()
indentWithSpaces(4)
leadingTabsToSpaces(4)
endWithNewline()
}

@@ -89,13 +89,13 @@ subprojects {
if (project.properties['org.gradle.project.enable_scala'].trim() == 'true') {
apply plugin: 'scala'
}
apply plugin: 'com.github.johnrengelman.shadow'
apply plugin: 'com.gradleup.shadow'
apply plugin: 'checkstyle'
apply plugin: 'eclipse'

ext {
javaVersion = '1.8'
flinkVersion = '1.17.0'
javaVersion = '17'
flinkVersion = '2.0.0'
scalaBinaryVersion = '2.12'
log4jVersion = '2.12.1'
junitVersion = '4.13'
@@ -127,9 +127,12 @@ subprojects {
shadow "org.apache.logging.log4j:log4j-core:${log4jVersion}"

shadow "org.apache.flink:flink-clients:${flinkVersion}"
shadow "org.apache.flink:flink-java:${flinkVersion}"
// removed
// shadow "org.apache.flink:flink-java:${flinkVersion}"
shadow "org.apache.flink:flink-runtime:${flinkVersion}"
shadow "org.apache.flink:flink-streaming-java:${flinkVersion}"
shadow "org.apache.flink:flink-streaming-scala_${scalaBinaryVersion}:${flinkVersion}"
// removed
// shadow "org.apache.flink:flink-streaming-scala_${scalaBinaryVersion}:${flinkVersion}"

// allows using Flink's web UI when running in the IDE:
shadow "org.apache.flink:flink-runtime-web:${flinkVersion}"
@@ -174,7 +177,7 @@

spotless {
java {
googleJavaFormat('1.7').aosp()
googleJavaFormat('1.28.0').aosp()

// \# refers to static imports
importOrder('org.apache.flink', 'org.apache.flink.shaded', '', 'javax', 'java', 'scala', '\\#')
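
For reference, the spotless setup above (AOSP-flavored google-java-format plus the custom importOrder) arranges imports roughly into the groups sketched below, separated by blank lines; the particular classes are illustrative and com.example.thirdparty.Widget is hypothetical:

import org.apache.flink.util.Collector;

import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;

import com.example.thirdparty.Widget;

import javax.annotation.Nullable;

import java.time.Duration;
import java.util.List;

import static org.junit.Assert.assertEquals;

public class ImportOrderExample {}
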
Original file line number Diff line number Diff line change
@@ -18,7 +18,7 @@

package org.apache.flink.training.exercises.common.sources;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.training.exercises.common.datatypes.TaxiFare;
import org.apache.flink.training.exercises.common.utils.DataGenerator;

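
The only change in this file (and in several of the files below) is the import path: in Flink 2.0 the old SourceFunction interface lives under org.apache.flink.streaming.api.functions.source.legacy. A minimal sketch of a source written against the relocated interface (the class itself is illustrative, not part of this PR):

import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;

public class ExampleLegacySource implements SourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        int i = 0;
        // emit a few records and stop; real generators keep looping until cancelled
        while (running && i < 3) {
            ctx.collect("event-" + i++);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
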
Original file line number Diff line number Diff line change
@@ -18,7 +18,7 @@

package org.apache.flink.training.exercises.common.sources;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.training.exercises.common.datatypes.TaxiRide;

import java.util.ArrayList;
Original file line number Diff line number Diff line change
@@ -18,7 +18,7 @@

package org.apache.flink.training.exercises.testing;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.training.exercises.common.utils.MissingSolutionException;
import org.apache.flink.util.Collector;
@@ -48,7 +48,7 @@ public ComposedKeyedProcessFunction(
}

@Override
public void open(Configuration parameters) throws Exception {
public void open(OpenContext parameters) throws Exception {

try {
exercise.setRuntimeContext(this.getRuntimeContext());
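
The second hunk reflects Flink 2.0's rich-function lifecycle: open() now receives an OpenContext instead of a Configuration. A minimal sketch of the new signature on a KeyedProcessFunction (the class and its logic are illustrative):

import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class AnnotatingFunction extends KeyedProcessFunction<String, String, String> {

    private transient long seenByThisSubtask;

    @Override
    public void open(OpenContext openContext) throws Exception {
        // one-time, per-parallel-instance setup still happens here
        seenByThisSubtask = 0L;
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        seenByThisSubtask++;
        out.collect(value + " (record #" + seenByThisSubtask + " in this subtask)");
    }
}
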
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@
package org.apache.flink.training.exercises.testing;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.training.exercises.common.utils.MissingSolutionException;

/**
Original file line number Diff line number Diff line change
@@ -18,7 +18,7 @@

package org.apache.flink.training.exercises.testing;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.training.exercises.common.utils.MissingSolutionException;
import org.apache.flink.util.Collector;
@@ -50,7 +50,7 @@ public ComposedRichCoFlatMapFunction(
}

@Override
public void open(Configuration parameters) throws Exception {
public void open(OpenContext parameters) throws Exception {

try {
exercise.setRuntimeContext(this.getRuntimeContext());
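
The same open(OpenContext) signature applies to two-input functions such as RichCoFlatMapFunction; a minimal illustrative sketch, not part of this PR:

import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class TaggingCoFlatMap extends RichCoFlatMapFunction<String, Long, String> {

    private transient String tag;

    @Override
    public void open(OpenContext openContext) throws Exception {
        // initialize per-instance helpers in open(), as before
        tag = "[" + getClass().getSimpleName() + "] ";
    }

    @Override
    public void flatMap1(String value, Collector<String> out) {
        out.collect(tag + "left: " + value);
    }

    @Override
    public void flatMap2(Long value, Collector<String> out) {
        out.collect(tag + "right: " + value);
    }
}
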
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@
package org.apache.flink.training.exercises.testing;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.training.exercises.common.utils.MissingSolutionException;

/**
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@
package org.apache.flink.training.exercises.testing;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;

public interface ExecutablePipeline<IN, OUT> {
JobExecutionResult execute(SourceFunction<IN> source, TestSink<OUT> sink) throws Exception;
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@
package org.apache.flink.training.exercises.testing;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;

public interface ExecutableTwoInputPipeline<IN1, IN2, OUT> {
JobExecutionResult execute(
Original file line number Diff line number Diff line change
@@ -21,7 +21,7 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.legacy.RichParallelSourceFunction;

public class ParallelTestSource<T> extends RichParallelSourceFunction<T>
implements ResultTypeQueryable<T> {
@@ -37,8 +37,9 @@ public ParallelTestSource(T... events) {

@Override
public void run(SourceContext<T> ctx) {
int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
int indexOfThisSubtask = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
int numberOfParallelSubtasks =
getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
int subtask = 0;

// the elements of the testStream are assigned to the parallel instances in a round-robin
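
In Flink 2.0 the subtask index and parallelism are no longer read directly from the RuntimeContext; they come from its TaskInfo, as the hunk above shows. A minimal sketch against the same accessors (the class and the emitted records are illustrative):

import org.apache.flink.streaming.api.functions.source.legacy.RichParallelSourceFunction;

public class SubtaskReportingSource extends RichParallelSourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) {
        // both values now come from the TaskInfo attached to the runtime context
        int subtask = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
        int parallelism = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
        if (running) {
            ctx.collect("subtask " + subtask + " of " + parallelism);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
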
Original file line number Diff line number Diff line change
@@ -20,8 +20,8 @@

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.ListAccumulator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.sink.legacy.RichSinkFunction;

import java.util.List;

@@ -38,7 +38,7 @@ public TestSink() {
}

@Override
public void open(Configuration parameters) {
public void open(OpenContext parameters) {
getRuntimeContext().addAccumulator(name, new ListAccumulator<OUT>());
}

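
TestSink combines both migrations: RichSinkFunction now lives under the sink.legacy package and open() takes an OpenContext. A hedged sketch of a sink in that style, collecting its output into a named accumulator (the class, constructor, and accumulator name are illustrative, not the repository's actual TestSink):

import org.apache.flink.api.common.accumulators.ListAccumulator;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.sink.legacy.RichSinkFunction;

public class CollectingSink<T> extends RichSinkFunction<T> {

    private final String accumulatorName;
    private transient ListAccumulator<T> accumulator;

    public CollectingSink(String accumulatorName) {
        this.accumulatorName = accumulatorName;
    }

    @Override
    public void open(OpenContext openContext) {
        // register a fresh accumulator for each parallel instance
        accumulator = new ListAccumulator<>();
        getRuntimeContext().addAccumulator(accumulatorName, accumulator);
    }

    @Override
    public void invoke(T value, Context context) {
        accumulator.add(value);
    }
}

A test can then read the collected values back from the JobExecutionResult via getAccumulatorResult(accumulatorName).
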
2 changes: 1 addition & 1 deletion config/checkstyle/checkstyle.xml
@@ -247,7 +247,7 @@ This file is based on the checkstyle file of Apache Beam.
<!-- Checks for Javadoc comments. -->
<!-- See http://checkstyle.sf.net/config_javadoc.html -->
<module name="JavadocMethod">
<property name="scope" value="protected"/>
<property name="accessModifiers" value="protected"/>
<property name="severity" value="error"/>
<property name="allowMissingParamTags" value="true"/>
<property name="allowMissingReturnTag" value="true"/>
8 changes: 8 additions & 0 deletions gradle.properties
@@ -3,3 +3,11 @@ org.gradle.parallel = true

# Scala exercises can be enabled by setting this to true
org.gradle.project.enable_scala = false

# This is required for Java 16+ to compile the code
org.gradle.jvmargs=-XX:MaxMetaspaceSize=512m \
--add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED \
--add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED \
--add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED \
--add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED \
--add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED
Binary file modified gradle/wrapper/gradle-wrapper.jar
Binary file not shown.
4 changes: 3 additions & 1 deletion gradle/wrapper/gradle-wrapper.properties
@@ -1,5 +1,7 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-7.1-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.14.3-bin.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists