6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
@@ -22,7 +22,7 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        java: [8, 11]
+        java: [17, 21]

     steps:
       - uses: actions/checkout@v3
@@ -37,5 +37,5 @@ jobs:
       - name: Build without Scala
         run: ./gradlew build --scan --stacktrace -Porg.gradle.project.enable_scala=false

-      - name: Build with Scala
-        run: ./gradlew build --scan --stacktrace -Porg.gradle.project.enable_scala=true
+      # - name: Build with Scala
+      # run: ./gradlew build --scan --stacktrace -Porg.gradle.project.enable_scala=true
6 changes: 4 additions & 2 deletions README.md
@@ -57,7 +57,7 @@ You will need to set up your environment in order to develop, debug, and execute
 Flink supports Linux, OS X, and Windows as development environments for Flink programs and local execution. The following software is required for a Flink development setup and should be installed on your system:

 - Git
-- a JDK for Java 8 or Java 11 (a JRE is not sufficient; other versions of Java are currently not supported)
+- a JDK for Java 11 or Java 17 or Java 21 (a JRE is not sufficient; other versions of Java are currently not supported)
 - an IDE for Java (and/or Scala) development with Gradle support
   - We recommend [IntelliJ](https://www.jetbrains.com/idea/), but [Eclipse](https://www.eclipse.org/downloads/) or [Visual Studio Code](https://code.visualstudio.com/) (with the [Java extension pack](https://code.visualstudio.com/docs/java/java-tutorial)) can also be used so long as you stick to Java
   - For Scala, you will need to use IntelliJ (and its [Scala plugin](https://plugins.jetbrains.com/plugin/1347-scala/))
@@ -130,7 +130,7 @@ You can also selectively apply this plugin in a single subproject if desired.

 The project needs to be imported as a gradle project into your IDE.

-Then you should be able to open [`RideCleansingTest`](ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingTest.java) and run this test.
+Then you should be able to open [`RideCleansingTest`](ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingUnitTest.java) and run this test.

 > **:information_source: Note for Scala users:** You will need to use IntelliJ with the JetBrains Scala plugin, and you will need to add a Scala 2.12 SDK to the Global Libraries section of the Project Structure as well as to the module you are working on.
 > IntelliJ will ask you for the latter when you open a Scala file.
@@ -241,6 +241,8 @@ For Java/Scala exercises and solutions, we provide special tasks that can be lis
 - [Exercise](long-ride-alerts/README.md)
 - [Discussion](long-ride-alerts/DISCUSSION.md)

+<a name="contributing"></a>
+
 ## Contribute

 If you would like to contribute to this repository or add new exercises, please read the [contributing](CONTRIBUTING.md) guide.
4 changes: 2 additions & 2 deletions README_zh.md
@@ -59,7 +59,7 @@ under the License.
 Linux、OS X 和 Windows 均可作为 Flink 程序和本地执行的开发环境。 Flink 开发设置需要以下软件,它们应该安装在系统上:

 - Git
-- Java 8 或者 Java 11 版本的 JDK (JRE不满足要求;目前不支持其他版本的Java)
+- Java 11 或者 Java 17 或者 Java 21 版本的 JDK (JRE不满足要求;目前不支持其他版本的Java)
 - 支持 Gradle 的 Java (及/或 Scala) 开发IDE
   - 推荐使用 [IntelliJ](https://www.jetbrains.com/idea/), 但 [Eclipse](https://www.eclipse.org/downloads/) 或 [Visual Studio Code](https://code.visualstudio.com/) (安装 [Java extension pack](https://code.visualstudio.com/docs/java/java-tutorial) 插件) 也可以用于Java环境
   - 为了使用 Scala, 需要使用 IntelliJ (及其 [Scala plugin](https://plugins.jetbrains.com/plugin/1347-scala/) 插件)
@@ -134,7 +134,7 @@ org.gradle.project.enable_scala = true

 本项目应作为 gradle 项目导入到IDE中。

-然后应该可以打开 [`RideCleansingTest`](ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingTest.java) 并运行此测试。
+然后应该可以打开 [`RideCleansingTest`](ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingUnitTest.java) 并运行此测试。

 > **:information_source: Scala 用户须知:** 需要将 IntelliJ 与 JetBrains Scala 插件一起使用,并且需要将 Scala 2.12 SDK 添加到项目结构的全局库部分以及工作模块中。
 > 当打开 Scala 文件时,IntelliJ 会要求提供后者(JetBrains Scala 插件)。
23 changes: 13 additions & 10 deletions build.gradle
@@ -16,15 +16,15 @@
  */

 plugins {
-    id 'com.github.johnrengelman.shadow' version '7.0.0' apply false
-    id "com.diffplug.spotless" version "6.4.2" apply false
+    id 'com.gradleup.shadow' version '9.0.0-rc2' apply false
+    id "com.diffplug.spotless" version "7.2.1" apply false
 }

 description = "Flink Training Exercises"

 allprojects {
     group = 'org.apache.flink'
-    version = '1.17-SNAPSHOT'
+    version = '2.0.0-SNAPSHOT'

     apply plugin: 'com.diffplug.spotless'

@@ -33,7 +33,7 @@ allprojects {
             target '*.gradle', '*.md', '.gitignore'

             trimTrailingWhitespace()
-            indentWithSpaces(4)
+            leadingTabsToSpaces(4)
             endWithNewline()
         }

@@ -89,13 +89,13 @@ subprojects {
     if (project.properties['org.gradle.project.enable_scala'].trim() == 'true') {
         apply plugin: 'scala'
     }
-    apply plugin: 'com.github.johnrengelman.shadow'
+    apply plugin: 'com.gradleup.shadow'
     apply plugin: 'checkstyle'
     apply plugin: 'eclipse'

     ext {
-        javaVersion = '1.8'
-        flinkVersion = '1.17.0'
+        javaVersion = '17'
+        flinkVersion = '2.0.0'
         scalaBinaryVersion = '2.12'
         log4jVersion = '2.12.1'
         junitVersion = '4.13'
@@ -127,9 +127,12 @@ subprojects {
         shadow "org.apache.logging.log4j:log4j-core:${log4jVersion}"

         shadow "org.apache.flink:flink-clients:${flinkVersion}"
-        shadow "org.apache.flink:flink-java:${flinkVersion}"
+        // removed
+        // shadow "org.apache.flink:flink-java:${flinkVersion}"
+        shadow "org.apache.flink:flink-runtime:${flinkVersion}"
         shadow "org.apache.flink:flink-streaming-java:${flinkVersion}"
-        shadow "org.apache.flink:flink-streaming-scala_${scalaBinaryVersion}:${flinkVersion}"
+        // removed
+        // shadow "org.apache.flink:flink-streaming-scala_${scalaBinaryVersion}:${flinkVersion}"

         // allows using Flink's web UI when running in the IDE:
         shadow "org.apache.flink:flink-runtime-web:${flinkVersion}"
@@ -174,7 +177,7 @@ subprojects {

     spotless {
         java {
-            googleJavaFormat('1.7').aosp()
+            googleJavaFormat('1.28.0').aosp()

             // \# refers to static imports
             importOrder('org.apache.flink', 'org.apache.flink.shaded', '', 'javax', 'java', 'scala', '\\#')
@@ -23,9 +23,12 @@
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
 import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
 import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator;

+import java.time.Duration;
+
 /**
  * Example that counts the rides for each driver.
  *
@@ -47,7 +50,18 @@ public static void main(String[] args) throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

         // start the data generator
-        DataStream<TaxiRide> rides = env.addSource(new TaxiRideGenerator());
+        DataStream<TaxiRide> rides =
+                env.fromSource(
+                        new TaxiRideGenerator(),
+                        new BoundedOutOfOrdernessTimestampExtractor<TaxiRide>(
+                                Duration.ofSeconds(10)) {
+
+                            @Override
+                            public long extractTimestamp(TaxiRide taxiRide) {
+                                return taxiRide.getEventTimeMillis();
+                            }
+                        },
+                        "taxi ride");

         // map each ride to a tuple of (driverId, 1)
         DataStream<Tuple2<Long, Long>> tuples =
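The hunk above passes `Duration.ofSeconds(10)` as the out-of-orderness bound for the ride stream. As a rough illustration of what such a bound means, here is a minimal, Flink-free sketch: the watermark trails the largest timestamp seen so far by the bound. The class `BoundedOutOfOrdernessSketch` and its methods are hypothetical names used only for this example; this is a simplified model of the idea, not Flink's implementation, which differs in details such as periodic emission.

```java
import java.time.Duration;

/** Hypothetical, simplified model of a bounded-out-of-orderness watermark. */
public class BoundedOutOfOrdernessSketch {

    private final long boundMillis;
    private long maxTimestamp = Long.MIN_VALUE;

    public BoundedOutOfOrdernessSketch(Duration bound) {
        this.boundMillis = bound.toMillis();
    }

    /** Records an event timestamp and returns the watermark after seeing it. */
    public long onEvent(long timestampMillis) {
        maxTimestamp = Math.max(maxTimestamp, timestampMillis);
        return currentWatermark();
    }

    /** The watermark trails the largest timestamp seen so far by the bound. */
    public long currentWatermark() {
        return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - boundMillis;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch wm =
                new BoundedOutOfOrdernessSketch(Duration.ofSeconds(10));
        System.out.println(wm.onEvent(25_000L)); // watermark is 15000
        System.out.println(wm.onEvent(18_000L)); // older event: watermark stays at 15000
        System.out.println(wm.onEvent(40_000L)); // watermark advances to 30000
    }
}
```

In this model, an event that arrives up to 10 seconds behind the newest one is still ahead of the watermark, which is the tolerance the out-of-order ride stream relies on.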
@@ -18,53 +18,69 @@

 package org.apache.flink.training.exercises.common.sources;

-import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
 import org.apache.flink.training.exercises.common.datatypes.TaxiFare;
-import org.apache.flink.training.exercises.common.utils.DataGenerator;

-import java.time.Duration;
 import java.time.Instant;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.atomic.AtomicLong;

 /**
  * This SourceFunction generates a data stream of TaxiFare records.
  *
  * <p>The stream is generated in order.
  */
-public class TaxiFareGenerator implements SourceFunction<TaxiFare> {
+public class TaxiFareGenerator extends DataGeneratorSource<TaxiFare> {

-    private volatile boolean running = true;
-    private Instant limitingTimestamp = Instant.MAX;
-
-    /** Create a bounded TaxiFareGenerator that runs only for the specified duration. */
-    public static TaxiFareGenerator runFor(Duration duration) {
-        TaxiFareGenerator generator = new TaxiFareGenerator();
-        generator.limitingTimestamp = DataGenerator.BEGINNING.plus(duration);
-        return generator;
-    }
-
-    @Override
-    public void run(SourceContext<TaxiFare> ctx) throws Exception {
-
-        long id = 1;
-
-        while (running) {
-            TaxiFare fare = new TaxiFare(id);
+    private static Instant limitingTimestamp = Instant.MAX;

+    /**
+     * build taxi fare deque.
+     *
+     * @return taxiFareDeque
+     */
+    public static ConcurrentLinkedDeque<TaxiFare> buildTaxiFareDeque() {
+        ConcurrentLinkedDeque<TaxiFare> taxiFareDeque = new ConcurrentLinkedDeque<>();
+        for (int i = 1; ; i++) {
+            TaxiFare fare = new TaxiFare(i);
             // don't emit events that exceed the specified limit
             if (fare.startTime.compareTo(limitingTimestamp) >= 0) {
                 break;
             }
-
-            ++id;
-            ctx.collect(fare);
-
-            // match our event production rate to that of the TaxiRideGenerator
-            Thread.sleep(TaxiRideGenerator.SLEEP_MILLIS_PER_EVENT);
+            taxiFareDeque.push(fare);
         }
+        return taxiFareDeque;
     }
+
+    /** TaxiFareGenerator. */
+    public TaxiFareGenerator() {
+        this(buildTaxiFareDeque());
+    }

-    @Override
-    public void cancel() {
-        running = false;
+    /**
+     * TaxiFareGenerator.
+     *
+     * @param taxiFareDeque taxiFareDeque
+     */
+    public TaxiFareGenerator(ConcurrentLinkedDeque<TaxiFare> taxiFareDeque) {
+        super(
+                new GeneratorFunction<Long, TaxiFare>() {
+
+                    private final AtomicLong id = new AtomicLong(0);
+                    private final AtomicLong maxStartTime = new AtomicLong(0);
+
+                    @Override
+                    public TaxiFare map(Long value) throws Exception {
+                        synchronized (this) {
+                            return taxiFareDeque.poll();
+                        }
+                    }
+                },
+                taxiFareDeque.size(),
+                RateLimiterStrategy.perSecond(200),
+                TypeInformation.of(TaxiFare.class));
     }
 }
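The new `TaxiFareGenerator` fills a `ConcurrentLinkedDeque` with `push` and drains it with `poll`. Since the Javadoc states that the stream is generated in order, it is worth checking how those two JDK calls interact. The following is a minimal, Flink-free sketch; the class `DequeOrderSketch` and its methods are hypothetical names for illustration and are not part of this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;

/** Hypothetical helper showing how the insertion method affects poll() order. */
public class DequeOrderSketch {

    /** Inserts 1..n at the head with push(), then drains from the head with poll(). */
    public static List<Integer> drainAfterPush(int n) {
        ConcurrentLinkedDeque<Integer> deque = new ConcurrentLinkedDeque<>();
        for (int i = 1; i <= n; i++) {
            deque.push(i); // equivalent to addFirst(i)
        }
        return drain(deque);
    }

    /** Inserts 1..n at the tail with add(), then drains from the head with poll(). */
    public static List<Integer> drainAfterAdd(int n) {
        ConcurrentLinkedDeque<Integer> deque = new ConcurrentLinkedDeque<>();
        for (int i = 1; i <= n; i++) {
            deque.add(i); // equivalent to addLast(i)
        }
        return drain(deque);
    }

    private static List<Integer> drain(ConcurrentLinkedDeque<Integer> deque) {
        List<Integer> out = new ArrayList<>();
        Integer next;
        // poll() removes the head element and returns null once the deque is empty
        while ((next = deque.poll()) != null) {
            out.add(next);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(drainAfterPush(3)); // prints [3, 2, 1]
        System.out.println(drainAfterAdd(3)); // prints [1, 2, 3]
    }
}
```

With `push`, the most recently inserted element is polled first, so elements come out in reverse insertion order; `add` (or `offer`) keeps insertion order.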
@@ -18,69 +18,82 @@

 package org.apache.flink.training.exercises.common.sources;

-import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
 import org.apache.flink.training.exercises.common.datatypes.TaxiRide;

 import java.util.ArrayList;
 import java.util.List;
 import java.util.PriorityQueue;
 import java.util.Random;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.atomic.AtomicLong;

 /**
  * This SourceFunction generates a data stream of TaxiRide records.
  *
  * <p>The stream is produced out-of-order.
  */
-public class TaxiRideGenerator implements SourceFunction<TaxiRide> {
+public class TaxiRideGenerator extends DataGeneratorSource<TaxiRide> {

     public static final int SLEEP_MILLIS_PER_EVENT = 10;
     private static final int BATCH_SIZE = 5;
-    private volatile boolean running = true;

-    @Override
-    public void run(SourceContext<TaxiRide> ctx) throws Exception {
+    /** TaxiRideGenerator. */
+    public TaxiRideGenerator() {
+        super(
+                new GeneratorFunction<Long, TaxiRide>() {

-        PriorityQueue<TaxiRide> endEventQ = new PriorityQueue<>(100);
-        long id = 0;
-        long maxStartTime = 0;
+                    private final AtomicLong id = new AtomicLong(0);
+                    private final AtomicLong maxStartTime = new AtomicLong(0);
+                    private final PriorityQueue<TaxiRide> endEventQ = new PriorityQueue<>(100);
+                    private final ConcurrentLinkedDeque<TaxiRide> deque =
+                            new ConcurrentLinkedDeque<>();

-        while (running) {
+                    @Override
+                    public TaxiRide map(Long value) throws Exception {
+                        synchronized (this) {
+                            if (deque.isEmpty()) {
+                                long idLocal = id.get();
+                                // generate a batch of START events
+                                List<TaxiRide> startEvents = new ArrayList<TaxiRide>(BATCH_SIZE);
+                                for (int i = 1; i <= BATCH_SIZE; i++) {
+                                    TaxiRide ride = new TaxiRide(idLocal + i, true);
+                                    startEvents.add(ride);
+                                    // the start times may be in order, but let's not assume that
+                                    maxStartTime.set(
+                                            Math.max(
+                                                    maxStartTime.get(), ride.getEventTimeMillis()));
+                                }
+                                // enqueue the corresponding END events
+                                for (int i = 1; i <= BATCH_SIZE; i++) {
+                                    endEventQ.add(new TaxiRide(idLocal + i, false));
+                                }

-            // generate a batch of START events
-            List<TaxiRide> startEvents = new ArrayList<TaxiRide>(BATCH_SIZE);
-            for (int i = 1; i <= BATCH_SIZE; i++) {
-                TaxiRide ride = new TaxiRide(id + i, true);
-                startEvents.add(ride);
-                // the start times may be in order, but let's not assume that
-                maxStartTime = Math.max(maxStartTime, ride.getEventTimeMillis());
-            }
+                                // release the END events coming before the end of this new batch
+                                // (this allows a few END events to precede their matching START
+                                // event)
+                                while (endEventQ.peek().getEventTimeMillis()
+                                        <= maxStartTime.get()) {
+                                    TaxiRide ride = endEventQ.poll();
+                                    deque.push(ride);
+                                }

-            // enqueue the corresponding END events
-            for (int i = 1; i <= BATCH_SIZE; i++) {
-                endEventQ.add(new TaxiRide(id + i, false));
-            }
+                                // then emit the new START events (out-of-order)
+                                java.util.Collections.shuffle(startEvents, new Random(id.get()));
+                                startEvents.iterator().forEachRemaining(deque::push);

-            // release the END events coming before the end of this new batch
-            // (this allows a few END events to precede their matching START event)
-            while (endEventQ.peek().getEventTimeMillis() <= maxStartTime) {
-                TaxiRide ride = endEventQ.poll();
-                ctx.collect(ride);
-            }
-
-            // then emit the new START events (out-of-order)
-            java.util.Collections.shuffle(startEvents, new Random(id));
-            startEvents.iterator().forEachRemaining(r -> ctx.collect(r));
-
-            // prepare for the next batch
-            id += BATCH_SIZE;
-
-            // don't go too fast
-            Thread.sleep(BATCH_SIZE * SLEEP_MILLIS_PER_EVENT);
-        }
-    }
-
-    @Override
-    public void cancel() {
-        running = false;
+                                // prepare for the next batch
+                                id.set(id.get() + BATCH_SIZE);
+                            }
+                            return deque.poll();
+                        }
+                    }
+                },
+                Long.MAX_VALUE,
+                RateLimiterStrategy.perSecond(200),
+                TypeInformation.of(TaxiRide.class));
     }
 }
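The batching scheme described in the comments above (generate a batch of START events, enqueue the matching END events, release the END events that are due, then emit the shuffled START events) can be tried out without Flink. The sketch below follows the emission order of the removed `run` loop. The `Event` class and its timing model (one ride starting per minute, rides lasting 1 to 5 minutes) are assumptions made for this example only; the real `TaxiRide` derives its timestamps from code not shown in this diff.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Random;

/** Hypothetical, Flink-free model of the START/END batching scheme. */
public class RideBatchSketch {

    /** Stand-in for TaxiRide with an assumed timing model. */
    public static final class Event {
        public final long rideId;
        public final boolean isStart;
        public final long timeMillis;

        public Event(long rideId, boolean isStart) {
            this.rideId = rideId;
            this.isStart = isStart;
            // Assumption: one ride starts per minute, and rides last 1 to 5 minutes.
            long start = rideId * 60_000L;
            long duration = (1 + rideId % 5) * 60_000L;
            this.timeMillis = isStart ? start : start + duration;
        }
    }

    public static final int BATCH_SIZE = 5;

    // END events wait here, ordered by event time, until they are due.
    private final PriorityQueue<Event> endEventQ =
            new PriorityQueue<>(Comparator.comparingLong((Event e) -> e.timeMillis));
    private long id = 0;
    private long maxStartTime = 0;

    /** Produces the events of the next batch in emission order. */
    public List<Event> nextBatch() {
        List<Event> out = new ArrayList<>();
        List<Event> startEvents = new ArrayList<>(BATCH_SIZE);
        for (int i = 1; i <= BATCH_SIZE; i++) {
            Event start = new Event(id + i, true);
            startEvents.add(start);
            maxStartTime = Math.max(maxStartTime, start.timeMillis);
            endEventQ.add(new Event(id + i, false));
        }
        // Release END events that are due no later than the newest START of this batch.
        while (!endEventQ.isEmpty() && endEventQ.peek().timeMillis <= maxStartTime) {
            out.add(endEventQ.poll());
        }
        // Emit the START events in a shuffled but reproducible order.
        Collections.shuffle(startEvents, new Random(id));
        out.addAll(startEvents);
        id += BATCH_SIZE;
        return out;
    }

    public static void main(String[] args) {
        RideBatchSketch generator = new RideBatchSketch();
        for (Event e : generator.nextBatch()) {
            System.out.println((e.isStart ? "START " : "END   ") + e.rideId + " @ " + e.timeMillis);
        }
    }
}
```

Under the assumed timing model, the first batch releases the END events of rides 1 and 2 ahead of the five shuffled START events, which shows how an END event can precede its matching START.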