Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_Go_VR_Flink.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 1,
"modification": 2,
"https://github.com/apache/beam/pull/32440": "testing datastream optimizations",
"https://github.com/apache/beam/pull/32648": "testing addition of Flink 1.19 support"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"modification": "#37313"
}
5 changes: 3 additions & 2 deletions .github/workflows/beam_PostCommit_Python_Portable_Flink.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ jobs:
job_name: ["beam_PostCommit_Python_Portable_Flink"]
job_phrase: ["Run Python Portable Flink"]
# TODO: Enable PROCESS https://github.com/apache/beam/issues/35702
# environment_type: ['DOCKER', 'LOOPBACK', 'PROCESS']
environment_type: ['DOCKER', 'LOOPBACK']
# all environment_type: ['DOCKER', 'LOOPBACK', 'PROCESS']
# Run modes not covered by PreCommit_Python_PVR_Flink (i.e. other than 'LOOPBACK')
environment_type: ['DOCKER']
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LOOPBACK for both Flink 1.20 and Flink 2.0 already have coverage, dedup'd here

steps:
- uses: actions/checkout@v4
- name: Setup repository
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ jobs:
CLOUDSDK_CONFIG: ${{ env.KUBELET_GCLOUD_CONFIG_PATH}}
uses: ./.github/actions/gradle-command-self-hosted-action
with:
gradle-command: :sdks:python:test-suites:portable:py${{steps.set_py_ver_clean.outputs.py_ver_clean}}:flinkValidatesRunner
gradle-command: :sdks:python:test-suites:portable:py${{steps.set_py_ver_clean.outputs.py_ver_clean}}:flink1ValidatesRunner
arguments: |
-PpythonVersion=${{ matrix.python_version }} \
- name: Archive Python Test Results
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/beam_PreCommit_Python_PVR_Flink.yml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ jobs:
env:
CLOUDSDK_CONFIG: ${{ env.KUBELET_GCLOUD_CONFIG_PATH}}
with:
# Run Flink 2 tests. Flink 1.20 is covered by PostCommit_Python_ValidatesRunner_Flink
gradle-command: :sdks:python:test-suites:portable:py313:flinkValidatesRunner
arguments: |
-PpythonVersion=3.13 \
Expand Down
3 changes: 1 addition & 2 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@

* New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)).
* New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)).
* Flink 2.0 support for Java Classic and Portable Flink Runners ([#36947](https://github.com/apache/beam/issues/36947)),
experimental support for other SDK languages including Python.
* Flink 2.0 support ([#36947](https://github.com/apache/beam/issues/36947)).


## I/Os
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -552,8 +552,7 @@ class BeamModulePlugin implements Plugin<Project> {
project.ext.currentJavaVersion = getSupportedJavaVersion()

project.ext.allFlinkVersions = project.flink_versions.split(',')
// TODO(https://github.com/apache/beam/issues/36947): Move to use project.ext.allFlinkVersions.last() when Flink 2 support completed
project.ext.latestFlinkVersion = '1.20'
project.ext.latestFlinkVersion = project.ext.allFlinkVersions.last()

project.ext.nativeArchitecture = {
// Best guess as to this system's normalized native architecture name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
import org.apache.beam.sdk.values.WindowedValues.WindowedValueCoder;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BiMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.HashMultiset;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -987,8 +988,11 @@ private <T> void translateTestStream(
// stage
String sideInputTag = sideInputId.getLocalName();
String collectionId =
components
.getTransformsOrThrow(sideInputId.getTransformId())
MoreObjects.firstNonNull(
components.getTransformsOrDefault(sideInputId.getTransformId(), null),
// In the case of optimized pipeline, side input transform may not be found in
// component proto
stagePayload.getComponents().getTransformsOrThrow(sideInputId.getTransformId()))
.getInputsOrThrow(sideInputId.getLocalName());
RunnerApi.WindowingStrategy windowingStrategyProto =
components.getWindowingStrategiesOrThrow(
Expand Down Expand Up @@ -1045,8 +1049,11 @@ private TransformedSideInputs transformSideInputs(
tagToIntMapping.put(tag, count);
count++;
String collectionId =
components
.getTransformsOrThrow(sideInput.getKey().getTransformId())
MoreObjects.firstNonNull(
components.getTransformsOrDefault(sideInput.getKey().getTransformId(), null),
stagePayload
.getComponents()
.getTransformsOrThrow(sideInput.getKey().getTransformId()))
.getInputsOrThrow(sideInput.getKey().getLocalName());
DataStream<Object> sideInputStream = context.getDataStreamOrThrow(collectionId);
TypeInformation<Object> tpe = sideInputStream.getType();
Expand Down Expand Up @@ -1078,7 +1085,11 @@ private TransformedSideInputs transformSideInputs(
TupleTag<?> tag = sideInput.getValue().getTagInternal();
final int intTag = tagToIntMapping.get(tag);
RunnerApi.PTransform pTransform =
components.getTransformsOrThrow(sideInput.getKey().getTransformId());
MoreObjects.firstNonNull(
components.getTransformsOrDefault(sideInput.getKey().getTransformId(), null),
stagePayload
.getComponents()
.getTransformsOrThrow(sideInput.getKey().getTransformId()));
String collectionId = pTransform.getInputsOrThrow(sideInput.getKey().getLocalName());
DataStream<WindowedValue<?>> sideInputStream = context.getDataStreamOrThrow(collectionId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
import org.apache.beam.sdk.values.WindowedValues.WindowedValueCoder;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BiMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.HashMultiset;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -986,8 +987,11 @@ private <T> void translateTestStream(
// stage
String sideInputTag = sideInputId.getLocalName();
String collectionId =
components
.getTransformsOrThrow(sideInputId.getTransformId())
MoreObjects.firstNonNull(
components.getTransformsOrDefault(sideInputId.getTransformId(), null),
// In the case of optimized pipeline, side input transform may not be found in
// component proto
stagePayload.getComponents().getTransformsOrThrow(sideInputId.getTransformId()))
.getInputsOrThrow(sideInputId.getLocalName());
RunnerApi.WindowingStrategy windowingStrategyProto =
components.getWindowingStrategiesOrThrow(
Expand Down Expand Up @@ -1044,8 +1048,11 @@ private TransformedSideInputs transformSideInputs(
tagToIntMapping.put(tag, count);
count++;
String collectionId =
components
.getTransformsOrThrow(sideInput.getKey().getTransformId())
MoreObjects.firstNonNull(
components.getTransformsOrDefault(sideInput.getKey().getTransformId(), null),
stagePayload
.getComponents()
.getTransformsOrThrow(sideInput.getKey().getTransformId()))
.getInputsOrThrow(sideInput.getKey().getLocalName());
DataStream<Object> sideInputStream = context.getDataStreamOrThrow(collectionId);
TypeInformation<Object> tpe = sideInputStream.getType();
Expand Down Expand Up @@ -1077,7 +1084,11 @@ private TransformedSideInputs transformSideInputs(
TupleTag<?> tag = sideInput.getValue().getTagInternal();
final int intTag = tagToIntMapping.get(tag);
RunnerApi.PTransform pTransform =
components.getTransformsOrThrow(sideInput.getKey().getTransformId());
MoreObjects.firstNonNull(
components.getTransformsOrDefault(sideInput.getKey().getTransformId(), null),
stagePayload
.getComponents()
.getTransformsOrThrow(sideInput.getKey().getTransformId()));
String collectionId = pTransform.getInputsOrThrow(sideInput.getKey().getLocalName());
DataStream<WindowedValue<?>> sideInputStream = context.getDataStreamOrThrow(collectionId);

Expand Down
17 changes: 15 additions & 2 deletions sdks/go/test/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -79,18 +79,31 @@ task dataflowValidatesRunnerARM64() {
task flinkValidatesRunner {
group = "Verification"

// TODO(https://github.com/apache/beam/issues/37600) use project.ext.latestFlinkVersion after resolved
def flinkVersion = '1.20'

dependsOn ":sdks:go:test:goBuild"
dependsOn ":sdks:go:container:docker"
dependsOn ":sdks:java:container:${project.ext.currentJavaVersion}:docker"
dependsOn ":runners:flink:${project.ext.latestFlinkVersion}:job-server:shadowJar"
dependsOn ":runners:flink:${flinkVersion}:job-server:shadowJar"
dependsOn ":sdks:java:testing:expansion-service:buildTestExpansionServiceJar"
doFirst {
// Copy Flink conf file
copy {
from "${project.rootDir}/runners/flink/2.0/src/test/resources/flink-test-config.yaml"
into "${project.buildDir}/flink-conf"

// Rename the file during the copy process
rename 'flink-test-config.yaml', 'config.yaml'
}
}
doLast {
def pipelineOptions = [ // Pipeline options piped directly to Go SDK flags.
"--expansion_jar=test:${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}",
]
def options = [
"--runner flink",
"--flink_job_server_jar ${project(":runners:flink:${project.ext.latestFlinkVersion}:job-server").shadowJar.archivePath}",
"--flink_job_server_jar ${project(":runners:flink:${flinkVersion}:job-server").shadowJar.archivePath}",
"--pipeline_opts \"${pipelineOptions.join(' ')}\"",
]
exec {
Expand Down
6 changes: 5 additions & 1 deletion sdks/go/test/run_validatesrunner_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,13 @@ if [[ "$RUNNER" == "flink" || "$RUNNER" == "spark" || "$RUNNER" == "samza" || "$
echo "No endpoint specified; starting a new $RUNNER job server on $ENDPOINT"
if [[ "$RUNNER" == "flink" ]]; then
"$JAVA_CMD" \
-Dslf4j.provider=org.slf4j.simple.SimpleServiceProvider \
-Dorg.slf4j.simpleLogger.log.org.apache.flink.metrics=error \
-Dorg.slf4j.simpleLogger.log.org.apache.flink.runtime=error \
-Dorg.slf4j.simpleLogger.log.org.apache.flink.streaming=error \
-jar $FLINK_JOB_SERVER_JAR \
--flink-master [local] \
--flink-conf-dir $CURRENT_DIRECTORY/../../../runners/flink/src/test/resources \
--flink-conf-dir $CURRENT_DIRECTORY/build/flink-conf/ \
--job-port $JOB_PORT \
--expansion-port 0 \
--artifact-port 0 &
Expand Down
1 change: 0 additions & 1 deletion sdks/java/extensions/sql/expansion-service/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,4 @@ shadowJar {
manifest {
attributes(["Multi-Release": true])
}
outputs.upToDateWhen { false }
Copy link
Contributor Author

@Abacn Abacn Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a workaround leftover for corrupted remote Gradle cache, could cause elevated compile time.

}
29 changes: 24 additions & 5 deletions sdks/python/apache_beam/runners/portability/flink_runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import argparse
import logging
import platform
import shlex
import typing
import unittest
Expand Down Expand Up @@ -139,6 +140,7 @@ def _create_conf_dir(cls):
cls.test_metrics_path = path.join(cls.conf_dir, 'test-metrics.txt')

# path to write Flink configuration to
# Flink 1.x conf:
conf_path = path.join(cls.conf_dir, 'flink-conf.yaml')
file_reporter = 'org.apache.beam.runners.flink.metrics.FileReporter'
with open(conf_path, 'w') as f:
Expand All @@ -149,6 +151,19 @@ def _create_conf_dir(cls):
'metrics.reporter.file.path: %s' % cls.test_metrics_path,
'metrics.scope.operator: <operator_name>',
]))
# Flink 2.x conf:
conf_path_2 = path.join(cls.conf_dir, 'config.yaml')
with open(conf_path_2, 'w') as f:
f.write(
'''metrics:
reporters: file
reporter:
file:
class: %s
path: %s
scope:
operator: <operator_name>
''' % (file_reporter, cls.test_metrics_path))

@classmethod
def _subprocess_command(cls, job_port, expansion_port):
Expand All @@ -158,15 +173,19 @@ def _subprocess_command(cls, job_port, expansion_port):

cls._create_conf_dir()
cls.expansion_port = expansion_port

platform_specific_opts = []
if platform.system() == 'Linux':
# UseContainerSupport is supported in Linux and turned on by default
platform_specific_opts.append('-XX:-UseContainerSupport')
try:
return [
'java',
'-XX:-UseContainerSupport',
return ['java'] + platform_specific_opts + [
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix flink_runner_test on MacOS (and persumably windows)

'--add-opens=java.base/java.lang=ALL-UNNAMED',
'--add-opens=java.base/java.nio=ALL-UNNAMED',
'--add-opens=java.base/java.util=ALL-UNNAMED',
'-Dorg.slf4j.simpleLogger.defaultLogLevel=warn',
'-Dslf4j.provider=org.slf4j.simple.SimpleServiceProvider',
'-Dorg.slf4j.simpleLogger.log.org.apache.flink.metrics=error',
'-Dorg.slf4j.simpleLogger.log.org.apache.flink.runtime=error',
'-Dorg.slf4j.simpleLogger.log.org.apache.flink.streaming=error',
'-jar',
cls.flink_job_server_jar,
'--flink-master',
Expand Down
Loading
Loading