diff --git a/.github/workflows/beam_PostCommit_Java_PVR_Flink_Batch.yml b/.github/workflows/beam_PostCommit_Java_PVR_Flink_Batch.yml index 0a808f2f8617..d09c1772c7b5 100644 --- a/.github/workflows/beam_PostCommit_Java_PVR_Flink_Batch.yml +++ b/.github/workflows/beam_PostCommit_Java_PVR_Flink_Batch.yml @@ -63,11 +63,13 @@ env: jobs: beam_PostCommit_Java_PVR_Flink_Batch: - name: ${{ matrix.job_name }} (${{ matrix.job_phrase }}) + name: ${{ matrix.job_name }} (${{ matrix.flink_version }}) strategy: matrix: job_name: ["beam_PostCommit_Java_PVR_Flink_Batch"] job_phrase: ["Run Java_PVR_Flink_Batch PostCommit"] + # every major version + flink_version: ['1.20', '2.0'] timeout-minutes: 240 runs-on: [self-hosted, ubuntu-20.04, highmem] if: | @@ -83,13 +85,21 @@ jobs: with: comment_phrase: ${{ matrix.job_phrase }} github_token: ${{ secrets.GITHUB_TOKEN }} - github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }}) + github_job: ${{ matrix.job_name }} (${{ matrix.flink_version }}) - name: Setup environment uses: ./.github/actions/setup-environment-action + - name: run validatesPortableRunnerBatchDataSet script + uses: ./.github/actions/gradle-command-self-hosted-action + if: startsWith(matrix.flink_version, '1') + with: + gradle-command: :runners:flink:${{ matrix.flink_version }}:job-server:validatesPortableRunnerBatchDataSet + env: + CLOUDSDK_CONFIG: ${{ env.KUBELET_GCLOUD_CONFIG_PATH }} - name: run validatesPortableRunnerBatch script uses: ./.github/actions/gradle-command-self-hosted-action + if: startsWith(matrix.flink_version, '2') with: - gradle-command: :runners:flink:1.20:job-server:validatesPortableRunnerBatchDataSet + gradle-command: :runners:flink:${{ matrix.flink_version }}:job-server:validatesPortableRunnerBatch env: CLOUDSDK_CONFIG: ${{ env.KUBELET_GCLOUD_CONFIG_PATH }} - name: Archive JUnit Test Results @@ -98,9 +108,4 @@ jobs: with: name: JUnit Test Results path: "**/build/reports/tests/" - - name: Upload test report - uses: actions/upload-artifact@v4 - with: - name: java-code-coverage-report - path: "**/build/test-results/**/*.xml" # TODO: Investigate 'Max retries exceeded' issue with EnricoMi/publish-unit-test-result-action@v2. diff --git a/.github/workflows/beam_PostCommit_Java_PVR_Flink_Streaming.yml b/.github/workflows/beam_PostCommit_Java_PVR_Flink_Streaming.yml index 3d40c300db0b..7bb170b277e0 100644 --- a/.github/workflows/beam_PostCommit_Java_PVR_Flink_Streaming.yml +++ b/.github/workflows/beam_PostCommit_Java_PVR_Flink_Streaming.yml @@ -52,13 +52,15 @@ env: jobs: beam_PostCommit_Java_PVR_Flink_Streaming: - name: ${{ matrix.job_name }} (${{ matrix.job_phrase }}) + name: ${{ matrix.job_name }} (${{ matrix.flink_version }}) runs-on: [self-hosted, ubuntu-20.04, main] timeout-minutes: 120 strategy: matrix: job_name: [beam_PostCommit_Java_PVR_Flink_Streaming] job_phrase: [Run Java Flink PortableValidatesRunner Streaming] + # every major version + flink_version: [ '1.20', '2.0' ] if: | github.event_name == 'workflow_dispatch' || github.event_name == 'pull_request_target' || @@ -71,13 +73,13 @@ jobs: with: comment_phrase: ${{ matrix.job_phrase }} github_token: ${{ secrets.GITHUB_TOKEN }} - github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }}) + github_job: ${{ matrix.job_name }} (${{ matrix.flink_version }}) - name: Setup environment uses: ./.github/actions/setup-environment-action - name: run PostCommit Java Flink PortableValidatesRunner Streaming script uses: ./.github/actions/gradle-command-self-hosted-action with: - gradle-command: runners:flink:1.20:job-server:validatesPortableRunnerStreaming + gradle-command: runners:flink:${{ matrix.flink_version }}:job-server:validatesPortableRunnerStreaming - name: Archive JUnit Test Results uses: actions/upload-artifact@v4 if: ${{ !success() }} diff --git a/CHANGES.md b/CHANGES.md index 3e55990184d4..1bd807abe1e4 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -61,8 +61,8 @@ * New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)). * New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)). -* Flink 2.0 support for Java classic Flink runner ([#36947](https://github.com/apache/beam/issues/36947)). - Also added intial, experimental support for Portable Flink runner since this Beam version. +* Flink 2.0 support for Java Classic and Portable Flink Runners ([#36947](https://github.com/apache/beam/issues/36947)), + experimental support for other SDK languages including Python. ## I/Os diff --git a/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java b/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java index ae918083256a..d68acdb68638 100644 --- a/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java +++ b/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java @@ -696,8 +696,9 @@ private void translateImpulse( public static class IsFlinkNativeTransform implements NativeTransforms.IsNativeTransform { @Override public boolean test(RunnerApi.PTransform pTransform) { - return STREAMING_IMPULSE_TRANSFORM_URN.equals( - PTransformTranslation.urnForTransformOrNull(pTransform)); + String urn = PTransformTranslation.urnForTransformOrNull(pTransform); + return STREAMING_IMPULSE_TRANSFORM_URN.equals(urn) + || PTransformTranslation.RESHUFFLE_URN.equals(urn); } } diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle index d51c338f1893..7f33d2371496 100644 --- a/sdks/java/core/build.gradle +++ b/sdks/java/core/build.gradle @@ -125,7 +125,6 @@ dependencies { shadowTest library.java.guava_testlib shadowTest library.java.mockito_core shadowTest library.java.hamcrest - shadowTest "com.esotericsoftware.kryo:kryo:2.21" shadowTest library.java.quickcheck_core shadowTest library.java.quickcheck_generators shadowTest library.java.zstd_jni diff --git a/sdks/java/extensions/avro/build.gradle b/sdks/java/extensions/avro/build.gradle index 75eb474c60fe..e3afb5ff52e4 100644 --- a/sdks/java/extensions/avro/build.gradle +++ b/sdks/java/extensions/avro/build.gradle @@ -75,6 +75,7 @@ dependencies { testImplementation project(path: ":sdks:java:extensions:avro:vendored-test", configuration: "shadowTest") testImplementation library.java.junit testImplementation "org.tukaani:xz:1.9" // marked as optional in avro + testImplementation "com.esotericsoftware:kryo:5.6.2" // Used by Avro coder test testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") testRuntimeOnly library.java.slf4j_jdk14 avroVersions.each { k,v -> @@ -86,6 +87,7 @@ dependencies { "avroVersion$k" project(path: ":runners:direct-java", configuration: "shadow") "avroVersion$k" library.java.slf4j_jdk14 "avroVersion$k" "org.tukaani:xz:1.9" // marked as optional in avro + "avroVersion$k" "com.esotericsoftware:kryo:5.6.2" // Used by Avro coder test "avroVersion$k" library.java.zstd_jni // marked as optional in avro "avroVersion$k" "org.apache.avro:avro:$v:tests" "avroVersion${k}Generate" "org.apache.avro:avro-tools:$v" diff --git a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoderTest.java b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoderTest.java index fd13e42c0157..2c0337979c05 100644 --- a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoderTest.java +++ b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoderTest.java @@ -286,6 +286,7 @@ public void testKryoSerialization() throws Exception { // Kryo instantiation Kryo kryo = new Kryo(); + kryo.setRegistrationRequired(false); kryo.setInstantiatorStrategy(new StdInstantiatorStrategy()); kryo.addDefaultSerializer(AvroCoder.SerializableSchemaSupplier.class, JavaSerializer.class); diff --git a/sdks/java/extensions/euphoria/build.gradle b/sdks/java/extensions/euphoria/build.gradle index 3f41963638ba..d26b3b4d3661 100644 --- a/sdks/java/extensions/euphoria/build.gradle +++ b/sdks/java/extensions/euphoria/build.gradle @@ -30,7 +30,7 @@ dependencies { implementation library.java.joda_time implementation library.java.slf4j_api implementation library.java.vendored_guava_32_1_2_jre - testImplementation project(":sdks:java:extensions:kryo") + testImplementation project(path: ":sdks:java:extensions:kryo", configuration: "shadow") testImplementation library.java.slf4j_api testImplementation library.java.hamcrest testImplementation library.java.mockito_core diff --git a/sdks/java/extensions/kryo/build.gradle b/sdks/java/extensions/kryo/build.gradle index 29dfd453cc37..88ef2b46fc8d 100644 --- a/sdks/java/extensions/kryo/build.gradle +++ b/sdks/java/extensions/kryo/build.gradle @@ -19,7 +19,7 @@ plugins { id 'org.apache.beam.module' } ext { - kryoVersion = '5.5.0' + kryoVersion = '5.6.2' } applyJavaNature( automaticModuleName: 'org.apache.beam.sdk.extensions.kryo',