From f29e3d3841cf2a857014274eaf4c72ebb4ee5ca9 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Mon, 9 Feb 2026 14:46:36 -0500 Subject: [PATCH 1/4] Exercise Flink 2.0 Python Validates Runner tests --- .../beam_PostCommit_PortableJar_Flink.json | 1 + ...beam_PostCommit_Python_Examples_Flink.json | 1 + .../beam_PostCommit_Python_Portable_Flink.yml | 7 +-- .../beam_PreCommit_Python_PVR_Flink.yml | 3 +- ...nkStreamingPortablePipelineTranslator.java | 21 +++++++-- ...nkStreamingPortablePipelineTranslator.java | 21 +++++++-- .../sql/expansion-service/build.gradle | 1 - .../runners/portability/flink_runner_test.py | 24 ++++++++-- .../python/test-suites/portable/common.gradle | 46 +++++++++---------- 9 files changed, 81 insertions(+), 44 deletions(-) create mode 100644 .github/trigger_files/beam_PostCommit_PortableJar_Flink.json create mode 100644 .github/trigger_files/beam_PostCommit_Python_Examples_Flink.json diff --git a/.github/trigger_files/beam_PostCommit_PortableJar_Flink.json b/.github/trigger_files/beam_PostCommit_PortableJar_Flink.json new file mode 100644 index 000000000000..0967ef424bce --- /dev/null +++ b/.github/trigger_files/beam_PostCommit_PortableJar_Flink.json @@ -0,0 +1 @@ +{} diff --git a/.github/trigger_files/beam_PostCommit_Python_Examples_Flink.json b/.github/trigger_files/beam_PostCommit_Python_Examples_Flink.json new file mode 100644 index 000000000000..0967ef424bce --- /dev/null +++ b/.github/trigger_files/beam_PostCommit_Python_Examples_Flink.json @@ -0,0 +1 @@ +{} diff --git a/.github/workflows/beam_PostCommit_Python_Portable_Flink.yml b/.github/workflows/beam_PostCommit_Python_Portable_Flink.yml index f3c032ebffe2..1592e50e0bb2 100644 --- a/.github/workflows/beam_PostCommit_Python_Portable_Flink.yml +++ b/.github/workflows/beam_PostCommit_Python_Portable_Flink.yml @@ -63,8 +63,9 @@ jobs: job_name: ["beam_PostCommit_Python_Portable_Flink"] job_phrase: ["Run Python Portable Flink"] # TODO: Enable PROCESS https://github.com/apache/beam/issues/35702 - # environment_type: ['DOCKER', 'LOOPBACK', 'PROCESS'] - environment_type: ['DOCKER', 'LOOPBACK'] + # all environment_type: ['DOCKER', 'LOOPBACK', 'PROCESS'] + # Run modes not covered by PreCommit_Python_PVR_Flink (i.e. other than 'LOOPBACK') + environment_type: ['DOCKER'] steps: - uses: actions/checkout@v4 - name: Setup repository @@ -83,7 +84,7 @@ jobs: CLOUDSDK_CONFIG: ${{ env.KUBELET_GCLOUD_CONFIG_PATH}} uses: ./.github/actions/gradle-command-self-hosted-action with: - gradle-command: :sdks:python:test-suites:portable:py310:flinkCompatibilityMatrix${{ matrix.environment_type }} + gradle-command: :sdks:python:test-suites:portable:py310:flink2CompatibilityMatrix${{ matrix.environment_type }} arguments: | -PpythonVersion=3.10 \ - name: Archive Python Test Results diff --git a/.github/workflows/beam_PreCommit_Python_PVR_Flink.yml b/.github/workflows/beam_PreCommit_Python_PVR_Flink.yml index 588605aa2c2d..2c9dcf289552 100644 --- a/.github/workflows/beam_PreCommit_Python_PVR_Flink.yml +++ b/.github/workflows/beam_PreCommit_Python_PVR_Flink.yml @@ -106,7 +106,8 @@ jobs: env: CLOUDSDK_CONFIG: ${{ env.KUBELET_GCLOUD_CONFIG_PATH}} with: - gradle-command: :sdks:python:test-suites:portable:py313:flinkValidatesRunner + # Run Flink 2 tests. Flink 1.20 is covered by PostCommit_Python_ValidatesRunner_Flink + gradle-command: :sdks:python:test-suites:portable:py313:flink2ValidatesRunner arguments: | -PpythonVersion=3.13 \ - name: Archive Python Test Results diff --git a/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java b/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java index d68acdb68638..e8929b84593a 100644 --- a/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java +++ b/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java @@ -100,6 +100,7 @@ import org.apache.beam.sdk.values.WindowedValues.WindowedValueCoder; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.InvalidProtocolBufferException; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BiMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.HashMultiset; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; @@ -987,8 +988,11 @@ private void translateTestStream( // stage String sideInputTag = sideInputId.getLocalName(); String collectionId = - components - .getTransformsOrThrow(sideInputId.getTransformId()) + MoreObjects.firstNonNull( + components.getTransformsOrDefault(sideInputId.getTransformId(), null), + // In the case of optimized pipeline, side input transform may not be found in + // component proto + stagePayload.getComponents().getTransformsOrThrow(sideInputId.getTransformId())) .getInputsOrThrow(sideInputId.getLocalName()); RunnerApi.WindowingStrategy windowingStrategyProto = components.getWindowingStrategiesOrThrow( @@ -1045,8 +1049,11 @@ private TransformedSideInputs transformSideInputs( tagToIntMapping.put(tag, count); count++; String collectionId = - components - .getTransformsOrThrow(sideInput.getKey().getTransformId()) + MoreObjects.firstNonNull( + components.getTransformsOrDefault(sideInput.getKey().getTransformId(), null), + stagePayload + .getComponents() + .getTransformsOrThrow(sideInput.getKey().getTransformId())) .getInputsOrThrow(sideInput.getKey().getLocalName()); DataStream sideInputStream = context.getDataStreamOrThrow(collectionId); TypeInformation tpe = sideInputStream.getType(); @@ -1078,7 +1085,11 @@ private TransformedSideInputs transformSideInputs( TupleTag tag = sideInput.getValue().getTagInternal(); final int intTag = tagToIntMapping.get(tag); RunnerApi.PTransform pTransform = - components.getTransformsOrThrow(sideInput.getKey().getTransformId()); + MoreObjects.firstNonNull( + components.getTransformsOrDefault(sideInput.getKey().getTransformId(), null), + stagePayload + .getComponents() + .getTransformsOrThrow(sideInput.getKey().getTransformId())); String collectionId = pTransform.getInputsOrThrow(sideInput.getKey().getLocalName()); DataStream> sideInputStream = context.getDataStreamOrThrow(collectionId); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java index cad90de8ceed..caa5a1788c86 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java @@ -100,6 +100,7 @@ import org.apache.beam.sdk.values.WindowedValues.WindowedValueCoder; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.InvalidProtocolBufferException; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BiMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.HashMultiset; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; @@ -986,8 +987,11 @@ private void translateTestStream( // stage String sideInputTag = sideInputId.getLocalName(); String collectionId = - components - .getTransformsOrThrow(sideInputId.getTransformId()) + MoreObjects.firstNonNull( + components.getTransformsOrDefault(sideInputId.getTransformId(), null), + // In the case of optimized pipeline, side input transform may not be found in + // component proto + stagePayload.getComponents().getTransformsOrThrow(sideInputId.getTransformId())) .getInputsOrThrow(sideInputId.getLocalName()); RunnerApi.WindowingStrategy windowingStrategyProto = components.getWindowingStrategiesOrThrow( @@ -1044,8 +1048,11 @@ private TransformedSideInputs transformSideInputs( tagToIntMapping.put(tag, count); count++; String collectionId = - components - .getTransformsOrThrow(sideInput.getKey().getTransformId()) + MoreObjects.firstNonNull( + components.getTransformsOrDefault(sideInput.getKey().getTransformId(), null), + stagePayload + .getComponents() + .getTransformsOrThrow(sideInput.getKey().getTransformId())) .getInputsOrThrow(sideInput.getKey().getLocalName()); DataStream sideInputStream = context.getDataStreamOrThrow(collectionId); TypeInformation tpe = sideInputStream.getType(); @@ -1077,7 +1084,11 @@ private TransformedSideInputs transformSideInputs( TupleTag tag = sideInput.getValue().getTagInternal(); final int intTag = tagToIntMapping.get(tag); RunnerApi.PTransform pTransform = - components.getTransformsOrThrow(sideInput.getKey().getTransformId()); + MoreObjects.firstNonNull( + components.getTransformsOrDefault(sideInput.getKey().getTransformId(), null), + stagePayload + .getComponents() + .getTransformsOrThrow(sideInput.getKey().getTransformId())); String collectionId = pTransform.getInputsOrThrow(sideInput.getKey().getLocalName()); DataStream> sideInputStream = context.getDataStreamOrThrow(collectionId); diff --git a/sdks/java/extensions/sql/expansion-service/build.gradle b/sdks/java/extensions/sql/expansion-service/build.gradle index 8b5bd8c69240..562c1ac8dc76 100644 --- a/sdks/java/extensions/sql/expansion-service/build.gradle +++ b/sdks/java/extensions/sql/expansion-service/build.gradle @@ -56,5 +56,4 @@ shadowJar { manifest { attributes(["Multi-Release": true]) } - outputs.upToDateWhen { false } } diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py index dbeef557ab5a..d2cfb6575050 100644 --- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py @@ -18,6 +18,7 @@ import argparse import logging +import platform import shlex import typing import unittest @@ -139,6 +140,7 @@ def _create_conf_dir(cls): cls.test_metrics_path = path.join(cls.conf_dir, 'test-metrics.txt') # path to write Flink configuration to + # Flink 1.x conf: conf_path = path.join(cls.conf_dir, 'flink-conf.yaml') file_reporter = 'org.apache.beam.runners.flink.metrics.FileReporter' with open(conf_path, 'w') as f: @@ -149,6 +151,19 @@ def _create_conf_dir(cls): 'metrics.reporter.file.path: %s' % cls.test_metrics_path, 'metrics.scope.operator: ', ])) + # Flink 2.x conf: + conf_path_2 = path.join(cls.conf_dir, 'config.yaml') + with open(conf_path_2, 'w') as f: + f.write( + '''metrics: + reporters: file + reporter: + file: + class: %s + path: %s + scope: + operator: +''' % (file_reporter, cls.test_metrics_path)) @classmethod def _subprocess_command(cls, job_port, expansion_port): @@ -158,11 +173,12 @@ def _subprocess_command(cls, job_port, expansion_port): cls._create_conf_dir() cls.expansion_port = expansion_port - + platform_specific_opts = [] + if platform.system() == 'Linux': + # UseContainerSupport is supported in Linux and turned on by default + platform_specific_opts.append('-XX:-UseContainerSupport') try: - return [ - 'java', - '-XX:-UseContainerSupport', + return ['java'] + platform_specific_opts + [ '--add-opens=java.base/java.lang=ALL-UNNAMED', '--add-opens=java.base/java.nio=ALL-UNNAMED', '--add-opens=java.base/java.util=ALL-UNNAMED', diff --git a/sdks/python/test-suites/portable/common.gradle b/sdks/python/test-suites/portable/common.gradle index f7fa3e8e0b26..806e11a38419 100644 --- a/sdks/python/test-suites/portable/common.gradle +++ b/sdks/python/test-suites/portable/common.gradle @@ -22,25 +22,33 @@ import org.apache.tools.ant.taskdefs.condition.Os def pythonRootDir = "${rootDir}/sdks/python" def pythonVersionSuffix = project.ext.pythonVersion.replace('.', '') +// TODO(https://github.com/apache/beam/issues/36947): Remove when dropping Flink 1.x support def latestFlinkVersion = project.ext.latestFlinkVersion +def latestFlink2Version = '2.0' def currentJavaVersion = project.ext.currentJavaVersion ext { pythonContainerTask = ":sdks:python:container:py${pythonVersionSuffix}:docker" } -def createFlinkRunnerTestTask(String workerType) { - def taskName = "flinkCompatibilityMatrix${workerType}" - // project(":runners:flink:${latestFlinkVersion}:job-server").shadowJar.archivePath is not resolvable until runtime, so hard-code it here. - def jobServerJar = "${rootDir}/runners/flink/${latestFlinkVersion}/job-server/build/libs/beam-runners-flink-${latestFlinkVersion}-job-server-${version}.jar" +def createFlinkRunnerTestTask(String workerType, String flinkVersion) { + String taskName + + // project(":runners:flink:${flinkVersion}:job-server").shadowJar.archivePath is not resolvable until runtime, so hard-code it here. + def jobServerJar = "${rootDir}/runners/flink/${flinkVersion}/job-server/build/libs/beam-runners-flink-${flinkVersion}-job-server-${version}.jar" def options = "--flink_job_server_jar=${jobServerJar} --environment_type=${workerType}" + if (flinkVersion.startsWith('2')) { + taskName = "flink2CompatibilityMatrix${workerType}" + } else { + taskName = "flinkCompatibilityMatrix${workerType}" + } if (workerType == 'PROCESS') { options += " --environment_options=process_command=${buildDir.absolutePath}/sdk_worker.sh" } def task = toxTask(taskName, 'flink-runner-test', options) // Through the Flink job server, we transitively add dependencies on the expansion services needed in tests. task.configure { - dependsOn ":runners:flink:${latestFlinkVersion}:job-server:shadowJar" + dependsOn ":runners:flink:${flinkVersion}:job-server:shadowJar" // The Java SDK worker is required to execute external transforms. def suffix = getSupportedJavaVersion() dependsOn ":sdks:java:container:${suffix}:docker" @@ -53,31 +61,19 @@ def createFlinkRunnerTestTask(String workerType) { return task } -createFlinkRunnerTestTask('DOCKER') -createFlinkRunnerTestTask('PROCESS') -createFlinkRunnerTestTask('LOOPBACK') +createFlinkRunnerTestTask('DOCKER', latestFlinkVersion) +createFlinkRunnerTestTask('PROCESS', latestFlinkVersion) +createFlinkRunnerTestTask('LOOPBACK', latestFlinkVersion) +createFlinkRunnerTestTask('DOCKER', latestFlink2Version) +createFlinkRunnerTestTask('PROCESS', latestFlink2Version) +createFlinkRunnerTestTask('LOOPBACK', latestFlink2Version) task flinkValidatesRunner() { dependsOn 'flinkCompatibilityMatrixLOOPBACK' } -// TODO(https://github.com/apache/beam/issues/19962): Enable on pre-commit. -tasks.register("flinkTriggerTranscript") { - dependsOn 'setupVirtualenv' - dependsOn ":runners:flink:${latestFlinkVersion}:job-server:shadowJar" - doLast { - exec { - executable 'sh' - args '-c', """ - . ${envdir}/bin/activate \\ - && cd ${pythonRootDir} \\ - && pip install -e .[test] \\ - && pytest \\ - apache_beam/transforms/trigger_test.py::WeakTestStreamTranscriptTest \\ - --test-pipeline-options='--runner=FlinkRunner --environment_type=LOOPBACK --flink_job_server_jar=${project(":runners:flink:${latestFlinkVersion}:job-server:").shadowJar.archivePath}' - """ - } - } +task flink2ValidatesRunner() { + dependsOn 'flink2CompatibilityMatrixLOOPBACK' } // Verifies BEAM-10702. From 74fd01e775551bd0e2c0e420f4dcc3a887c77b6c Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Fri, 13 Feb 2026 10:39:06 -0500 Subject: [PATCH 2/4] clean up TODOs: Move Flink 2.0 as latestFlinkVersion --- .../beam_PostCommit_Go_VR_Flink.json | 2 +- ...beam_PostCommit_Python_Examples_Flink.json | 4 +- .../beam_PostCommit_Python_Portable_Flink.yml | 2 +- ...ostCommit_Python_ValidatesRunner_Flink.yml | 2 +- .../beam_PreCommit_Python_PVR_Flink.yml | 2 +- CHANGES.md | 3 +- .../beam/gradle/BeamModulePlugin.groovy | 3 +- .../python/test-suites/portable/common.gradle | 46 +++++++++---------- 8 files changed, 32 insertions(+), 32 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Go_VR_Flink.json b/.github/trigger_files/beam_PostCommit_Go_VR_Flink.json index d5ac7fc60d7f..83b506b55ed5 100644 --- a/.github/trigger_files/beam_PostCommit_Go_VR_Flink.json +++ b/.github/trigger_files/beam_PostCommit_Go_VR_Flink.json @@ -1,6 +1,6 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 1, + "modification": 2, "https://github.com/apache/beam/pull/32440": "testing datastream optimizations", "https://github.com/apache/beam/pull/32648": "testing addition of Flink 1.19 support" } diff --git a/.github/trigger_files/beam_PostCommit_Python_Examples_Flink.json b/.github/trigger_files/beam_PostCommit_Python_Examples_Flink.json index 0967ef424bce..9f5479a1277b 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Examples_Flink.json +++ b/.github/trigger_files/beam_PostCommit_Python_Examples_Flink.json @@ -1 +1,3 @@ -{} +{ + "modification": "#37313" +} diff --git a/.github/workflows/beam_PostCommit_Python_Portable_Flink.yml b/.github/workflows/beam_PostCommit_Python_Portable_Flink.yml index 1592e50e0bb2..aaf1dd51e264 100644 --- a/.github/workflows/beam_PostCommit_Python_Portable_Flink.yml +++ b/.github/workflows/beam_PostCommit_Python_Portable_Flink.yml @@ -84,7 +84,7 @@ jobs: CLOUDSDK_CONFIG: ${{ env.KUBELET_GCLOUD_CONFIG_PATH}} uses: ./.github/actions/gradle-command-self-hosted-action with: - gradle-command: :sdks:python:test-suites:portable:py310:flink2CompatibilityMatrix${{ matrix.environment_type }} + gradle-command: :sdks:python:test-suites:portable:py310:flinkCompatibilityMatrix${{ matrix.environment_type }} arguments: | -PpythonVersion=3.10 \ - name: Archive Python Test Results diff --git a/.github/workflows/beam_PostCommit_Python_ValidatesRunner_Flink.yml b/.github/workflows/beam_PostCommit_Python_ValidatesRunner_Flink.yml index 51006c079b7e..15f4cbc0a8cb 100644 --- a/.github/workflows/beam_PostCommit_Python_ValidatesRunner_Flink.yml +++ b/.github/workflows/beam_PostCommit_Python_ValidatesRunner_Flink.yml @@ -88,7 +88,7 @@ jobs: CLOUDSDK_CONFIG: ${{ env.KUBELET_GCLOUD_CONFIG_PATH}} uses: ./.github/actions/gradle-command-self-hosted-action with: - gradle-command: :sdks:python:test-suites:portable:py${{steps.set_py_ver_clean.outputs.py_ver_clean}}:flinkValidatesRunner + gradle-command: :sdks:python:test-suites:portable:py${{steps.set_py_ver_clean.outputs.py_ver_clean}}:flink1ValidatesRunner arguments: | -PpythonVersion=${{ matrix.python_version }} \ - name: Archive Python Test Results diff --git a/.github/workflows/beam_PreCommit_Python_PVR_Flink.yml b/.github/workflows/beam_PreCommit_Python_PVR_Flink.yml index 2c9dcf289552..05603cb6a210 100644 --- a/.github/workflows/beam_PreCommit_Python_PVR_Flink.yml +++ b/.github/workflows/beam_PreCommit_Python_PVR_Flink.yml @@ -107,7 +107,7 @@ jobs: CLOUDSDK_CONFIG: ${{ env.KUBELET_GCLOUD_CONFIG_PATH}} with: # Run Flink 2 tests. Flink 1.20 is covered by PostCommit_Python_ValidatesRunner_Flink - gradle-command: :sdks:python:test-suites:portable:py313:flink2ValidatesRunner + gradle-command: :sdks:python:test-suites:portable:py313:flinkValidatesRunner arguments: | -PpythonVersion=3.13 \ - name: Archive Python Test Results diff --git a/CHANGES.md b/CHANGES.md index 1bd807abe1e4..9a9d0d0b8784 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -61,8 +61,7 @@ * New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)). * New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)). -* Flink 2.0 support for Java Classic and Portable Flink Runners ([#36947](https://github.com/apache/beam/issues/36947)), - experimental support for other SDK languages including Python. +* Flink 2.0 support ([#36947](https://github.com/apache/beam/issues/36947)). ## I/Os diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 774674b545ed..ee8f977f1d3e 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -552,8 +552,7 @@ class BeamModulePlugin implements Plugin { project.ext.currentJavaVersion = getSupportedJavaVersion() project.ext.allFlinkVersions = project.flink_versions.split(',') - // TODO(https://github.com/apache/beam/issues/36947): Move to use project.ext.allFlinkVersions.last() when Flink 2 support completed - project.ext.latestFlinkVersion = '1.20' + project.ext.latestFlinkVersion = project.ext.allFlinkVersions.last() project.ext.nativeArchitecture = { // Best guess as to this system's normalized native architecture name. diff --git a/sdks/python/test-suites/portable/common.gradle b/sdks/python/test-suites/portable/common.gradle index 806e11a38419..8846fae153ff 100644 --- a/sdks/python/test-suites/portable/common.gradle +++ b/sdks/python/test-suites/portable/common.gradle @@ -22,9 +22,9 @@ import org.apache.tools.ant.taskdefs.condition.Os def pythonRootDir = "${rootDir}/sdks/python" def pythonVersionSuffix = project.ext.pythonVersion.replace('.', '') -// TODO(https://github.com/apache/beam/issues/36947): Remove when dropping Flink 1.x support def latestFlinkVersion = project.ext.latestFlinkVersion -def latestFlink2Version = '2.0' +// TODO(https://github.com/apache/beam/issues/36947): Remove when dropping Flink 1.x support +def latestFlink1Version = '1.20' def currentJavaVersion = project.ext.currentJavaVersion ext { @@ -37,8 +37,8 @@ def createFlinkRunnerTestTask(String workerType, String flinkVersion) { // project(":runners:flink:${flinkVersion}:job-server").shadowJar.archivePath is not resolvable until runtime, so hard-code it here. def jobServerJar = "${rootDir}/runners/flink/${flinkVersion}/job-server/build/libs/beam-runners-flink-${flinkVersion}-job-server-${version}.jar" def options = "--flink_job_server_jar=${jobServerJar} --environment_type=${workerType}" - if (flinkVersion.startsWith('2')) { - taskName = "flink2CompatibilityMatrix${workerType}" + if (flinkVersion.startsWith('1')) { + taskName = "flink1CompatibilityMatrix${workerType}" } else { taskName = "flinkCompatibilityMatrix${workerType}" } @@ -64,16 +64,16 @@ def createFlinkRunnerTestTask(String workerType, String flinkVersion) { createFlinkRunnerTestTask('DOCKER', latestFlinkVersion) createFlinkRunnerTestTask('PROCESS', latestFlinkVersion) createFlinkRunnerTestTask('LOOPBACK', latestFlinkVersion) -createFlinkRunnerTestTask('DOCKER', latestFlink2Version) -createFlinkRunnerTestTask('PROCESS', latestFlink2Version) -createFlinkRunnerTestTask('LOOPBACK', latestFlink2Version) +createFlinkRunnerTestTask('DOCKER', latestFlink1Version) +createFlinkRunnerTestTask('PROCESS', latestFlink1Version) +createFlinkRunnerTestTask('LOOPBACK', latestFlink1Version) -task flinkValidatesRunner() { - dependsOn 'flinkCompatibilityMatrixLOOPBACK' +task flink1ValidatesRunner() { + dependsOn 'flink1CompatibilityMatrixLOOPBACK' } -task flink2ValidatesRunner() { - dependsOn 'flink2CompatibilityMatrixLOOPBACK' +task flinkValidatesRunner() { + dependsOn 'flinkCompatibilityMatrixLOOPBACK' } // Verifies BEAM-10702. @@ -253,7 +253,7 @@ tasks.register("prismTriggerTranscript") { project.tasks.register("preCommitPy${pythonVersionSuffix}") { dependsOn = [":sdks:python:container:py${pythonVersionSuffix}:docker", - ":runners:flink:${latestFlinkVersion}:job-server:shadowJar", + ":runners:flink:${latestFlink1Version}:job-server:shadowJar", 'portableWordCountFlinkRunnerBatch', 'portableWordCountFlinkRunnerStreaming'] } @@ -272,7 +272,7 @@ project.tasks.register("flinkExamples") { dependsOn = [ 'setupVirtualenv', 'installGcpTest', - ":runners:flink:${latestFlinkVersion}:job-server:shadowJar" + ":runners:flink:${latestFlink1Version}:job-server:shadowJar" ] doLast { def testOpts = [ @@ -284,7 +284,7 @@ project.tasks.register("flinkExamples") { "--project=apache-beam-testing", "--environment_type=LOOPBACK", "--temp_location=gs://temp-storage-for-end-to-end-tests/temp-it", - "--flink_job_server_jar=${project(":runners:flink:${latestFlinkVersion}:job-server").shadowJar.archivePath}", + "--flink_job_server_jar=${project(":runners:flink:${latestFlink1Version}:job-server").shadowJar.archivePath}", "--flink_conf_dir=${flink_conf_dir}", '--sdk_harness_log_level_overrides=' + // suppress info level flink.runtime log flood @@ -384,7 +384,7 @@ project.tasks.register("postCommitPy${pythonVersionSuffix}IT") { dependsOn = [ 'setupVirtualenv', 'installGcpTest', - ":runners:flink:${latestFlinkVersion}:job-server:shadowJar", + ":runners:flink:${latestFlink1Version}:job-server:shadowJar", ":sdks:java:container:${fork_java_version}:docker", ':sdks:java:testing:kafka-service:buildTestKafkaServiceJar', ':sdks:java:io:expansion-service:shadowJar', @@ -408,7 +408,7 @@ project.tasks.register("postCommitPy${pythonVersionSuffix}IT") { "--project=apache-beam-testing", "--environment_type=LOOPBACK", "--temp_location=gs://temp-storage-for-end-to-end-tests/temp-it", - "--flink_job_server_jar=${project(":runners:flink:${latestFlinkVersion}:job-server").shadowJar.archivePath}", + "--flink_job_server_jar=${project(":runners:flink:${latestFlink1Version}:job-server").shadowJar.archivePath}", "--flink_conf_dir=${flink_conf_dir}", '--sdk_harness_log_level_overrides=' + // suppress info level flink.runtime log flood @@ -440,7 +440,7 @@ project.tasks.register("xlangSpannerIOIT") { dependsOn = [ 'setupVirtualenv', 'installGcpTest', - ":runners:flink:${latestFlinkVersion}:job-server:shadowJar", + ":runners:flink:${latestFlink1Version}:job-server:shadowJar", ":sdks:java:container:${currentJavaVersion}:docker", ':sdks:java:io:expansion-service:shadowJar', ':sdks:java:io:google-cloud-platform:expansion-service:shadowJar', @@ -459,7 +459,7 @@ project.tasks.register("xlangSpannerIOIT") { "--project=apache-beam-testing", "--environment_type=LOOPBACK", "--temp_location=gs://temp-storage-for-end-to-end-tests/temp-it", - "--flink_job_server_jar=${project(":runners:flink:${latestFlinkVersion}:job-server").shadowJar.archivePath}", + "--flink_job_server_jar=${project(":runners:flink:${latestFlink1Version}:job-server").shadowJar.archivePath}", '--sdk_harness_log_level_overrides=' + // suppress info level flink.runtime log flood '{\\"org.apache.flink.runtime\\":\\"WARN\\",' + @@ -504,20 +504,20 @@ def addTestJavaJarCreator(String runner, Task jobServerJarTask) { } // TODO(BEAM-11333) Update and test multiple Flink versions. -addTestJavaJarCreator("FlinkRunner", tasks.getByPath(":runners:flink:${latestFlinkVersion}:job-server:shadowJar")) +addTestJavaJarCreator("FlinkRunner", tasks.getByPath(":runners:flink:${latestFlink1Version}:job-server:shadowJar")) addTestJavaJarCreator("SparkRunner", tasks.getByPath(":runners:spark:3:job-server:shadowJar")) def addTestFlinkUberJar(boolean saveMainSession) { project.tasks.register("testUberJarFlinkRunner${saveMainSession ? 'SaveMainSession' : ''}") { - dependsOn ":runners:flink:${latestFlinkVersion}:job-server:shadowJar" - dependsOn ":runners:flink:${latestFlinkVersion}:job-server:miniCluster" + dependsOn ":runners:flink:${latestFlink1Version}:job-server:shadowJar" + dependsOn ":runners:flink:${latestFlink1Version}:job-server:miniCluster" dependsOn pythonContainerTask doLast{ exec { executable "sh" def options = [ - "--flink_job_server_jar ${tasks.getByPath(":runners:flink:${latestFlinkVersion}:job-server:shadowJar").archivePath}", - "--flink_mini_cluster_jar ${tasks.getByPath(":runners:flink:${latestFlinkVersion}:job-server:miniCluster").archivePath}", + "--flink_job_server_jar ${tasks.getByPath(":runners:flink:${latestFlink1Version}:job-server:shadowJar").archivePath}", + "--flink_mini_cluster_jar ${tasks.getByPath(":runners:flink:${latestFlink1Version}:job-server:miniCluster").archivePath}", "--env_dir ${project.rootProject.buildDir}/gradleenv/${project.path.hashCode()}", "--python_root_dir ${project.rootDir}/sdks/python", "--python_version ${project.ext.pythonVersion}", From d78249fe489cfa40a1493a47983461517e6addb4 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Fri, 13 Feb 2026 11:13:18 -0500 Subject: [PATCH 3/4] Fix PortableJar PostCommit --- sdks/python/test-suites/portable/common.gradle | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sdks/python/test-suites/portable/common.gradle b/sdks/python/test-suites/portable/common.gradle index 8846fae153ff..843d32705c2f 100644 --- a/sdks/python/test-suites/portable/common.gradle +++ b/sdks/python/test-suites/portable/common.gradle @@ -253,7 +253,7 @@ tasks.register("prismTriggerTranscript") { project.tasks.register("preCommitPy${pythonVersionSuffix}") { dependsOn = [":sdks:python:container:py${pythonVersionSuffix}:docker", - ":runners:flink:${latestFlink1Version}:job-server:shadowJar", + ":runners:flink:${latestFlinkVersion}:job-server:shadowJar", 'portableWordCountFlinkRunnerBatch', 'portableWordCountFlinkRunnerStreaming'] } @@ -509,15 +509,15 @@ addTestJavaJarCreator("SparkRunner", tasks.getByPath(":runners:spark:3:job-serve def addTestFlinkUberJar(boolean saveMainSession) { project.tasks.register("testUberJarFlinkRunner${saveMainSession ? 'SaveMainSession' : ''}") { - dependsOn ":runners:flink:${latestFlink1Version}:job-server:shadowJar" - dependsOn ":runners:flink:${latestFlink1Version}:job-server:miniCluster" + dependsOn ":runners:flink:${latestFlinkVersion}:job-server:shadowJar" + dependsOn ":runners:flink:${latestFlinkVersion}:job-server:miniCluster" dependsOn pythonContainerTask doLast{ exec { executable "sh" def options = [ - "--flink_job_server_jar ${tasks.getByPath(":runners:flink:${latestFlink1Version}:job-server:shadowJar").archivePath}", - "--flink_mini_cluster_jar ${tasks.getByPath(":runners:flink:${latestFlink1Version}:job-server:miniCluster").archivePath}", + "--flink_job_server_jar ${tasks.getByPath(":runners:flink:${latestFlinkVersion}:job-server:shadowJar").archivePath}", + "--flink_mini_cluster_jar ${tasks.getByPath(":runners:flink:${latestFlinkVersion}:job-server:miniCluster").archivePath}", "--env_dir ${project.rootProject.buildDir}/gradleenv/${project.path.hashCode()}", "--python_root_dir ${project.rootDir}/sdks/python", "--python_version ${project.ext.pythonVersion}", From d2e34c464ea5b5309ce7854b1ffd1e3ca70f7e66 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Fri, 13 Feb 2026 11:49:42 -0500 Subject: [PATCH 4/4] Still run Go VR on Flink 1.20 --- sdks/go/test/build.gradle | 17 +++++++++++++++-- sdks/go/test/run_validatesrunner_tests.sh | 6 +++++- .../runners/portability/flink_runner_test.py | 5 ++++- 3 files changed, 24 insertions(+), 4 deletions(-) diff --git a/sdks/go/test/build.gradle b/sdks/go/test/build.gradle index 5576c40c0aab..424b009fd12f 100644 --- a/sdks/go/test/build.gradle +++ b/sdks/go/test/build.gradle @@ -79,18 +79,31 @@ task dataflowValidatesRunnerARM64() { task flinkValidatesRunner { group = "Verification" + // TODO(https://github.com/apache/beam/issues/37600) use project.ext.latestFlinkVersion after resolved + def flinkVersion = '1.20' + dependsOn ":sdks:go:test:goBuild" dependsOn ":sdks:go:container:docker" dependsOn ":sdks:java:container:${project.ext.currentJavaVersion}:docker" - dependsOn ":runners:flink:${project.ext.latestFlinkVersion}:job-server:shadowJar" + dependsOn ":runners:flink:${flinkVersion}:job-server:shadowJar" dependsOn ":sdks:java:testing:expansion-service:buildTestExpansionServiceJar" + doFirst { + // Copy Flink conf file + copy { + from "${project.rootDir}/runners/flink/2.0/src/test/resources/flink-test-config.yaml" + into "${project.buildDir}/flink-conf" + + // Rename the file during the copy process + rename 'flink-test-config.yaml', 'config.yaml' + } + } doLast { def pipelineOptions = [ // Pipeline options piped directly to Go SDK flags. "--expansion_jar=test:${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}", ] def options = [ "--runner flink", - "--flink_job_server_jar ${project(":runners:flink:${project.ext.latestFlinkVersion}:job-server").shadowJar.archivePath}", + "--flink_job_server_jar ${project(":runners:flink:${flinkVersion}:job-server").shadowJar.archivePath}", "--pipeline_opts \"${pipelineOptions.join(' ')}\"", ] exec { diff --git a/sdks/go/test/run_validatesrunner_tests.sh b/sdks/go/test/run_validatesrunner_tests.sh index be7a795f01a5..972caef6a336 100755 --- a/sdks/go/test/run_validatesrunner_tests.sh +++ b/sdks/go/test/run_validatesrunner_tests.sh @@ -273,9 +273,13 @@ if [[ "$RUNNER" == "flink" || "$RUNNER" == "spark" || "$RUNNER" == "samza" || "$ echo "No endpoint specified; starting a new $RUNNER job server on $ENDPOINT" if [[ "$RUNNER" == "flink" ]]; then "$JAVA_CMD" \ + -Dslf4j.provider=org.slf4j.simple.SimpleServiceProvider \ + -Dorg.slf4j.simpleLogger.log.org.apache.flink.metrics=error \ + -Dorg.slf4j.simpleLogger.log.org.apache.flink.runtime=error \ + -Dorg.slf4j.simpleLogger.log.org.apache.flink.streaming=error \ -jar $FLINK_JOB_SERVER_JAR \ --flink-master [local] \ - --flink-conf-dir $CURRENT_DIRECTORY/../../../runners/flink/src/test/resources \ + --flink-conf-dir $CURRENT_DIRECTORY/build/flink-conf/ \ --job-port $JOB_PORT \ --expansion-port 0 \ --artifact-port 0 & diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py index d2cfb6575050..096645833ae5 100644 --- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py @@ -182,7 +182,10 @@ def _subprocess_command(cls, job_port, expansion_port): '--add-opens=java.base/java.lang=ALL-UNNAMED', '--add-opens=java.base/java.nio=ALL-UNNAMED', '--add-opens=java.base/java.util=ALL-UNNAMED', - '-Dorg.slf4j.simpleLogger.defaultLogLevel=warn', + '-Dslf4j.provider=org.slf4j.simple.SimpleServiceProvider', + '-Dorg.slf4j.simpleLogger.log.org.apache.flink.metrics=error', + '-Dorg.slf4j.simpleLogger.log.org.apache.flink.runtime=error', + '-Dorg.slf4j.simpleLogger.log.org.apache.flink.streaming=error', '-jar', cls.flink_job_server_jar, '--flink-master',