From 2325d119e7a6177cba8139be6f77f1b961bd6cff Mon Sep 17 00:00:00 2001 From: Sakthivel Subramanian Date: Fri, 30 Jan 2026 02:00:18 +0530 Subject: [PATCH 1/6] Update spanner version and fix hanging issue --- .../beam/gradle/BeamModulePlugin.groovy | 5 ---- sdks/java/bom/gcp/build.gradle | 14 ++--------- .../io/google-cloud-platform/build.gradle | 16 +++--------- .../sdk/io/gcp/spanner/SpannerAccessor.java | 18 +++++++++++++ .../sdk/io/gcp/spanner/SpannerConfig.java | 25 +++++++++++++++++++ 5 files changed, 48 insertions(+), 30 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 774674b545ed..735016cdcc2b 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -613,8 +613,6 @@ class BeamModulePlugin implements Plugin { def google_ads_version = "33.0.0" def google_clients_version = "2.0.0" def google_cloud_bigdataoss_version = "2.2.26" - // [bomupgrader] determined by: com.google.cloud:google-cloud-spanner, consistent with: google_cloud_platform_libraries_bom - def google_cloud_spanner_version = "6.104.0" def google_code_gson_version = "2.10.1" def google_oauth_clients_version = "1.34.1" // [bomupgrader] determined by: io.grpc:grpc-netty, consistent with: google_cloud_platform_libraries_bom @@ -762,10 +760,7 @@ class BeamModulePlugin implements Plugin { // libraries-bom version on sdks/java/container/license_scripts/dep_urls_java.yaml google_cloud_platform_libraries_bom : "com.google.cloud:libraries-bom:26.75.0", google_cloud_secret_manager : "com.google.cloud:google-cloud-secretmanager", // google_cloud_platform_libraries_bom sets version - // TODO(#35868) remove pinned google_cloud_spanner_bom after tests or upstream fixed - google_cloud_spanner_bom : "com.google.cloud:google-cloud-spanner-bom:$google_cloud_spanner_version", google_cloud_spanner : "com.google.cloud:google-cloud-spanner", // google_cloud_platform_libraries_bom sets version - google_cloud_spanner_test : "com.google.cloud:google-cloud-spanner:$google_cloud_spanner_version:tests", google_cloud_tink : "com.google.crypto.tink:tink:1.19.0", google_cloud_vertexai : "com.google.cloud:google-cloud-vertexai", // google_cloud_platform_libraries_bom sets version google_code_gson : "com.google.code.gson:gson:$google_code_gson_version", diff --git a/sdks/java/bom/gcp/build.gradle b/sdks/java/bom/gcp/build.gradle index 5b62243c8454..ccd3b5405eb6 100644 --- a/sdks/java/bom/gcp/build.gradle +++ b/sdks/java/bom/gcp/build.gradle @@ -20,17 +20,7 @@ apply from: '../common.gradle' dependencies { api platform(project(":sdks:java:bom")) - api platform(project.library.java.google_cloud_spanner_bom) - api platform(project.library.java.google_cloud_platform_libraries_bom) { - // TODO(https://github.com/apache/beam/issues/37328) remove exclude and google_cloud_spanner_bom after upstream and/or tests fixed - exclude group: "com.google.cloud", module: "google-cloud-spanner" - exclude group: "com.google.api.grpc", module: "proto-google-cloud-spanner-v1" - exclude group: "com.google.api.grpc", module: "proto-google-cloud-spanner-admin-instance-v1" - exclude group: "com.google.api.grpc", module: "proto-google-cloud-spanner-admin-database-v1" - exclude group: "com.google.api.grpc", module: "grpc-google-cloud-spanner-v1" - exclude group: "com.google.api.grpc", module: "grpc-google-cloud-spanner-admin-instance-v1" - exclude group: "com.google.api.grpc", module: "grpc-google-cloud-spanner-admin-database-v1" - } + api platform(project.library.java.google_cloud_platform_libraries_bom) constraints { api project.library.java.guava } @@ -42,4 +32,4 @@ publishing { artifactId = 'beam-sdks-java-google-cloud-platform-bom' } } -} \ No newline at end of file +} diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle index 260740fbcf09..2686297c0001 100644 --- a/sdks/java/io/google-cloud-platform/build.gradle +++ b/sdks/java/io/google-cloud-platform/build.gradle @@ -31,17 +31,7 @@ description = "Apache Beam :: SDKs :: Java :: IO :: Google Cloud Platform" ext.summary = "IO library to read and write Google Cloud Platform systems from Beam." dependencies { - implementation(enforcedPlatform(library.java.google_cloud_platform_libraries_bom)) { - // TODO(https://github.com/apache/beam/issues/35868) remove exclude after upstream and/or tests fixed - exclude group: "com.google.cloud", module: "google-cloud-spanner" - exclude group: "com.google.api.grpc", module: "proto-google-cloud-spanner-v1" - exclude group: "com.google.api.grpc", module: "proto-google-cloud-spanner-admin-instance-v1" - exclude group: "com.google.api.grpc", module: "proto-google-cloud-spanner-admin-database-v1" - exclude group: "com.google.api.grpc", module: "grpc-google-cloud-spanner-v1" - exclude group: "com.google.api.grpc", module: "grpc-google-cloud-spanner-admin-instance-v1" - exclude group: "com.google.api.grpc", module: "grpc-google-cloud-spanner-admin-database-v1" - } - implementation(enforcedPlatform(library.java.google_cloud_spanner_bom)) + implementation(enforcedPlatform(library.java.google_cloud_platform_libraries_bom)) implementation project(path: ":model:pipeline", configuration: "shadow") implementation project(":runners:core-java") implementation project(path: ":sdks:java:core", configuration: "shadow") @@ -164,7 +154,7 @@ dependencies { testImplementation library.java.mockito_core testRuntimeOnly library.java.mockito_inline testImplementation library.java.joda_time - testImplementation library.java.google_cloud_spanner_test + testImplementation "com.google.cloud:google-cloud-spanner::tests" testImplementation library.java.google_cloud_bigtable_emulator testRuntimeOnly library.java.slf4j_jdk14 // everit_json is needed for Pubsub SchemaTransform that relies on JSON-schema translation. @@ -358,4 +348,4 @@ task postCommit { description = "Integration tests of GCP connectors using the DirectRunner." dependsOn integrationTest dependsOn integrationTestKms -} \ No newline at end of file +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java index 96ce735cad4a..51a5d616cb6c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java @@ -30,6 +30,7 @@ import com.google.cloud.spanner.DatabaseAdminClient; import com.google.cloud.spanner.DatabaseClient; import com.google.cloud.spanner.DatabaseId; +import com.google.cloud.spanner.SessionPoolOptions; import com.google.cloud.spanner.Spanner; import com.google.cloud.spanner.SpannerOptions; import com.google.cloud.spanner.v1.stub.SpannerStubSettings; @@ -38,6 +39,7 @@ import com.google.spanner.v1.ExecuteSqlRequest; import com.google.spanner.v1.PartialResultSet; import java.util.HashSet; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.beam.sdk.options.ValueProvider; @@ -61,6 +63,8 @@ public class SpannerAccessor implements AutoCloseable { */ private static final String USER_AGENT_PREFIX = "Apache_Beam_Java"; + static final java.time.Duration DEFAULT_SESSION_WAIT_DURATION = java.time.Duration.ofMinutes(5); + /** Instance ID to use when connecting to an experimental host. */ public static final String EXPERIMENTAL_HOST_INSTANCE_ID = "default"; @@ -113,6 +117,11 @@ public static SpannerAccessor getOrCreate(SpannerConfig spannerConfig) { static SpannerOptions buildSpannerOptions(SpannerConfig spannerConfig) { SpannerOptions.Builder builder = SpannerOptions.newBuilder(); + // TODO(https://github.com/apache/beam/issues/37451) Disable gRPC gcp extension which was + // causing the application thread to stall. + // Remove this once Spanner fixes the hanging issue + builder.disableGrpcGcpExtension(); + Set retryableCodes = new HashSet<>(); if (spannerConfig.getRetryableCodes() != null) { retryableCodes.addAll(spannerConfig.getRetryableCodes()); @@ -265,6 +274,15 @@ static SpannerOptions buildSpannerOptions(SpannerConfig spannerConfig) { builder.setCredentials(credentials.get()); } + ValueProvider waitForSessionCreationDuration = + spannerConfig.getWaitForSessionCreationDuration(); + java.time.Duration waitDuration = + Optional.ofNullable(waitForSessionCreationDuration) + .map(ValueProvider::get) + .orElse(DEFAULT_SESSION_WAIT_DURATION); + builder.setSessionPoolOption( + SessionPoolOptions.newBuilder().setWaitForMinSessionsDuration(waitDuration).build()); + return builder.build(); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java index f52b8378cb6a..a17c851f38a0 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java @@ -104,6 +104,8 @@ public String getHostValue() { public abstract @Nullable ValueProvider getCredentials(); + public abstract @Nullable ValueProvider getWaitForSessionCreationDuration(); + abstract Builder toBuilder(); public static SpannerConfig create() { @@ -189,6 +191,9 @@ abstract Builder setExecuteStreamingSqlRetrySettings( abstract Builder setPlainText(ValueProvider plainText); + abstract Builder setWaitForSessionCreationDuration( + ValueProvider waitForSessionCreationDuration); + public abstract SpannerConfig build(); } @@ -389,4 +394,24 @@ public SpannerConfig withUsingPlainTextChannel(ValueProvider plainText) public SpannerConfig withUsingPlainTextChannel(boolean plainText) { return withUsingPlainTextChannel(ValueProvider.StaticValueProvider.of(plainText)); } + + /** + * Sets the wait time for a multiplexed session to be available when creating a database client. + * + *

Setting this will block the {@link com.google.cloud.spanner.DatabaseClient} creation. + * + * @param waitForSessionCreationDuration The duration to wait. Defaults to {@link + * SpannerAccessor#DEFAULT_SESSION_WAIT_DURATION}. + * @return {@link SpannerConfig} + */ + public SpannerConfig withWaitForSessionCreationDuration( + ValueProvider waitForSessionCreationDuration) { + return toBuilder().setWaitForSessionCreationDuration(waitForSessionCreationDuration).build(); + } + + public SpannerConfig withWaitForSessionCreationDuration( + java.time.Duration waitForSessionCreationDuration) { + return withWaitForSessionCreationDuration( + ValueProvider.StaticValueProvider.of(waitForSessionCreationDuration)); + } } From 45e1d86dbb4062eac43018324243c92d5321889f Mon Sep 17 00:00:00 2001 From: Sakthivel Subramanian Date: Thu, 5 Feb 2026 13:34:13 +0530 Subject: [PATCH 2/6] add info logs --- .github/workflows/beam_PreCommit_Java_GCP_IO_Direct.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/beam_PreCommit_Java_GCP_IO_Direct.yml b/.github/workflows/beam_PreCommit_Java_GCP_IO_Direct.yml index e3fa7afa2bdb..1a090f272618 100644 --- a/.github/workflows/beam_PreCommit_Java_GCP_IO_Direct.yml +++ b/.github/workflows/beam_PreCommit_Java_GCP_IO_Direct.yml @@ -111,6 +111,7 @@ jobs: :sdks:java:io:google-cloud-platform:expansion-service:build \ :sdks:java:io:google-cloud-platform:postCommit \ arguments: | + --info \ -PdisableSpotlessCheck=true \ -PdisableCheckStyle=true \ -PenableJacocoReport \ @@ -153,4 +154,4 @@ jobs: uses: codecov/codecov-action@v3 with: file: ${{ steps.jacoco_report_path.outputs.path }} - flags: java \ No newline at end of file + flags: java From c4c9f69b4ddf3b85f58afe240e43c20396757585 Mon Sep 17 00:00:00 2001 From: Sakthivel Subramanian Date: Mon, 9 Feb 2026 14:42:53 +0530 Subject: [PATCH 3/6] disable retry --- .../beam/sdk/io/gcp/spanner/SpannerAccessor.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java index 51a5d616cb6c..b6571dcf827c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java @@ -274,14 +274,14 @@ static SpannerOptions buildSpannerOptions(SpannerConfig spannerConfig) { builder.setCredentials(credentials.get()); } - ValueProvider waitForSessionCreationDuration = - spannerConfig.getWaitForSessionCreationDuration(); - java.time.Duration waitDuration = - Optional.ofNullable(waitForSessionCreationDuration) - .map(ValueProvider::get) - .orElse(DEFAULT_SESSION_WAIT_DURATION); - builder.setSessionPoolOption( - SessionPoolOptions.newBuilder().setWaitForMinSessionsDuration(waitDuration).build()); + // ValueProvider waitForSessionCreationDuration = + // spannerConfig.getWaitForSessionCreationDuration(); + // java.time.Duration waitDuration = + // Optional.ofNullable(waitForSessionCreationDuration) + // .map(ValueProvider::get) + // .orElse(DEFAULT_SESSION_WAIT_DURATION); + // builder.setSessionPoolOption( + // SessionPoolOptions.newBuilder().setWaitForMinSessionsDuration(waitDuration).build()); return builder.build(); } From 7e13513c4dbdf858968ee792ff9bbbd8f5974c7b Mon Sep 17 00:00:00 2001 From: Sakthivel Subramanian Date: Mon, 9 Feb 2026 16:45:24 +0530 Subject: [PATCH 4/6] Set default session wait time to 0 --- .../sdk/io/gcp/spanner/SpannerAccessor.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java index b6571dcf827c..41f8e87c1bd2 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java @@ -63,7 +63,7 @@ public class SpannerAccessor implements AutoCloseable { */ private static final String USER_AGENT_PREFIX = "Apache_Beam_Java"; - static final java.time.Duration DEFAULT_SESSION_WAIT_DURATION = java.time.Duration.ofMinutes(5); + static final java.time.Duration DEFAULT_SESSION_WAIT_DURATION = java.time.Duration.ofMinutes(0); /** Instance ID to use when connecting to an experimental host. */ public static final String EXPERIMENTAL_HOST_INSTANCE_ID = "default"; @@ -274,14 +274,14 @@ static SpannerOptions buildSpannerOptions(SpannerConfig spannerConfig) { builder.setCredentials(credentials.get()); } - // ValueProvider waitForSessionCreationDuration = - // spannerConfig.getWaitForSessionCreationDuration(); - // java.time.Duration waitDuration = - // Optional.ofNullable(waitForSessionCreationDuration) - // .map(ValueProvider::get) - // .orElse(DEFAULT_SESSION_WAIT_DURATION); - // builder.setSessionPoolOption( - // SessionPoolOptions.newBuilder().setWaitForMinSessionsDuration(waitDuration).build()); + ValueProvider waitForSessionCreationDuration = + spannerConfig.getWaitForSessionCreationDuration(); + java.time.Duration waitDuration = + Optional.ofNullable(waitForSessionCreationDuration) + .map(ValueProvider::get) + .orElse(DEFAULT_SESSION_WAIT_DURATION); + builder.setSessionPoolOption( + SessionPoolOptions.newBuilder().setWaitForMinSessionsDuration(waitDuration).build()); return builder.build(); } From d116466294f44251dc88b7bd09fb9c988b534ea7 Mon Sep 17 00:00:00 2001 From: Sakthivel Subramanian Date: Mon, 9 Feb 2026 16:46:17 +0530 Subject: [PATCH 5/6] Remove info in the workflow --- .github/workflows/beam_PreCommit_Java_GCP_IO_Direct.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/beam_PreCommit_Java_GCP_IO_Direct.yml b/.github/workflows/beam_PreCommit_Java_GCP_IO_Direct.yml index 1a090f272618..c97fa20f18d6 100644 --- a/.github/workflows/beam_PreCommit_Java_GCP_IO_Direct.yml +++ b/.github/workflows/beam_PreCommit_Java_GCP_IO_Direct.yml @@ -111,7 +111,6 @@ jobs: :sdks:java:io:google-cloud-platform:expansion-service:build \ :sdks:java:io:google-cloud-platform:postCommit \ arguments: | - --info \ -PdisableSpotlessCheck=true \ -PdisableCheckStyle=true \ -PenableJacocoReport \ From d4759e163081bc23912ef92859f42146f035aef3 Mon Sep 17 00:00:00 2001 From: Sakthivel Subramanian Date: Mon, 9 Feb 2026 17:51:19 +0530 Subject: [PATCH 6/6] comment out setting session pool option --- .../beam/sdk/io/gcp/spanner/SpannerAccessor.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java index 41f8e87c1bd2..b2d68154b4fe 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java @@ -274,14 +274,14 @@ static SpannerOptions buildSpannerOptions(SpannerConfig spannerConfig) { builder.setCredentials(credentials.get()); } - ValueProvider waitForSessionCreationDuration = - spannerConfig.getWaitForSessionCreationDuration(); - java.time.Duration waitDuration = - Optional.ofNullable(waitForSessionCreationDuration) - .map(ValueProvider::get) - .orElse(DEFAULT_SESSION_WAIT_DURATION); - builder.setSessionPoolOption( - SessionPoolOptions.newBuilder().setWaitForMinSessionsDuration(waitDuration).build()); + // ValueProvider waitForSessionCreationDuration = + // spannerConfig.getWaitForSessionCreationDuration(); + // java.time.Duration waitDuration = + // Optional.ofNullable(waitForSessionCreationDuration) + // .map(ValueProvider::get) + // .orElse(DEFAULT_SESSION_WAIT_DURATION); + // builder.setSessionPoolOption( + // SessionPoolOptions.newBuilder().setWaitForMinSessionsDuration(waitDuration).build()); return builder.build(); }