diff --git a/.github/workflows/beam_PreCommit_Java_GCP_IO_Direct.yml b/.github/workflows/beam_PreCommit_Java_GCP_IO_Direct.yml index e3fa7afa2bdb..c97fa20f18d6 100644 --- a/.github/workflows/beam_PreCommit_Java_GCP_IO_Direct.yml +++ b/.github/workflows/beam_PreCommit_Java_GCP_IO_Direct.yml @@ -153,4 +153,4 @@ jobs: uses: codecov/codecov-action@v3 with: file: ${{ steps.jacoco_report_path.outputs.path }} - flags: java \ No newline at end of file + flags: java diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 774674b545ed..735016cdcc2b 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -613,8 +613,6 @@ class BeamModulePlugin implements Plugin { def google_ads_version = "33.0.0" def google_clients_version = "2.0.0" def google_cloud_bigdataoss_version = "2.2.26" - // [bomupgrader] determined by: com.google.cloud:google-cloud-spanner, consistent with: google_cloud_platform_libraries_bom - def google_cloud_spanner_version = "6.104.0" def google_code_gson_version = "2.10.1" def google_oauth_clients_version = "1.34.1" // [bomupgrader] determined by: io.grpc:grpc-netty, consistent with: google_cloud_platform_libraries_bom @@ -762,10 +760,7 @@ class BeamModulePlugin implements Plugin { // libraries-bom version on sdks/java/container/license_scripts/dep_urls_java.yaml google_cloud_platform_libraries_bom : "com.google.cloud:libraries-bom:26.75.0", google_cloud_secret_manager : "com.google.cloud:google-cloud-secretmanager", // google_cloud_platform_libraries_bom sets version - // TODO(#35868) remove pinned google_cloud_spanner_bom after tests or upstream fixed - google_cloud_spanner_bom : "com.google.cloud:google-cloud-spanner-bom:$google_cloud_spanner_version", google_cloud_spanner : "com.google.cloud:google-cloud-spanner", // google_cloud_platform_libraries_bom sets version - google_cloud_spanner_test : "com.google.cloud:google-cloud-spanner:$google_cloud_spanner_version:tests", google_cloud_tink : "com.google.crypto.tink:tink:1.19.0", google_cloud_vertexai : "com.google.cloud:google-cloud-vertexai", // google_cloud_platform_libraries_bom sets version google_code_gson : "com.google.code.gson:gson:$google_code_gson_version", diff --git a/sdks/java/bom/gcp/build.gradle b/sdks/java/bom/gcp/build.gradle index 5b62243c8454..ccd3b5405eb6 100644 --- a/sdks/java/bom/gcp/build.gradle +++ b/sdks/java/bom/gcp/build.gradle @@ -20,17 +20,7 @@ apply from: '../common.gradle' dependencies { api platform(project(":sdks:java:bom")) - api platform(project.library.java.google_cloud_spanner_bom) - api platform(project.library.java.google_cloud_platform_libraries_bom) { - // TODO(https://github.com/apache/beam/issues/37328) remove exclude and google_cloud_spanner_bom after upstream and/or tests fixed - exclude group: "com.google.cloud", module: "google-cloud-spanner" - exclude group: "com.google.api.grpc", module: "proto-google-cloud-spanner-v1" - exclude group: "com.google.api.grpc", module: "proto-google-cloud-spanner-admin-instance-v1" - exclude group: "com.google.api.grpc", module: "proto-google-cloud-spanner-admin-database-v1" - exclude group: "com.google.api.grpc", module: "grpc-google-cloud-spanner-v1" - exclude group: "com.google.api.grpc", module: "grpc-google-cloud-spanner-admin-instance-v1" - exclude group: "com.google.api.grpc", module: "grpc-google-cloud-spanner-admin-database-v1" - } + api platform(project.library.java.google_cloud_platform_libraries_bom) constraints { api project.library.java.guava } @@ -42,4 +32,4 @@ publishing { artifactId = 'beam-sdks-java-google-cloud-platform-bom' } } -} \ No newline at end of file +} diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle index 260740fbcf09..2686297c0001 100644 --- a/sdks/java/io/google-cloud-platform/build.gradle +++ b/sdks/java/io/google-cloud-platform/build.gradle @@ -31,17 +31,7 @@ description = "Apache Beam :: SDKs :: Java :: IO :: Google Cloud Platform" ext.summary = "IO library to read and write Google Cloud Platform systems from Beam." dependencies { - implementation(enforcedPlatform(library.java.google_cloud_platform_libraries_bom)) { - // TODO(https://github.com/apache/beam/issues/35868) remove exclude after upstream and/or tests fixed - exclude group: "com.google.cloud", module: "google-cloud-spanner" - exclude group: "com.google.api.grpc", module: "proto-google-cloud-spanner-v1" - exclude group: "com.google.api.grpc", module: "proto-google-cloud-spanner-admin-instance-v1" - exclude group: "com.google.api.grpc", module: "proto-google-cloud-spanner-admin-database-v1" - exclude group: "com.google.api.grpc", module: "grpc-google-cloud-spanner-v1" - exclude group: "com.google.api.grpc", module: "grpc-google-cloud-spanner-admin-instance-v1" - exclude group: "com.google.api.grpc", module: "grpc-google-cloud-spanner-admin-database-v1" - } - implementation(enforcedPlatform(library.java.google_cloud_spanner_bom)) + implementation(enforcedPlatform(library.java.google_cloud_platform_libraries_bom)) implementation project(path: ":model:pipeline", configuration: "shadow") implementation project(":runners:core-java") implementation project(path: ":sdks:java:core", configuration: "shadow") @@ -164,7 +154,7 @@ dependencies { testImplementation library.java.mockito_core testRuntimeOnly library.java.mockito_inline testImplementation library.java.joda_time - testImplementation library.java.google_cloud_spanner_test + testImplementation "com.google.cloud:google-cloud-spanner::tests" testImplementation library.java.google_cloud_bigtable_emulator testRuntimeOnly library.java.slf4j_jdk14 // everit_json is needed for Pubsub SchemaTransform that relies on JSON-schema translation. @@ -358,4 +348,4 @@ task postCommit { description = "Integration tests of GCP connectors using the DirectRunner." dependsOn integrationTest dependsOn integrationTestKms -} \ No newline at end of file +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java index 96ce735cad4a..b2d68154b4fe 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java @@ -30,6 +30,7 @@ import com.google.cloud.spanner.DatabaseAdminClient; import com.google.cloud.spanner.DatabaseClient; import com.google.cloud.spanner.DatabaseId; +import com.google.cloud.spanner.SessionPoolOptions; import com.google.cloud.spanner.Spanner; import com.google.cloud.spanner.SpannerOptions; import com.google.cloud.spanner.v1.stub.SpannerStubSettings; @@ -38,6 +39,7 @@ import com.google.spanner.v1.ExecuteSqlRequest; import com.google.spanner.v1.PartialResultSet; import java.util.HashSet; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.beam.sdk.options.ValueProvider; @@ -61,6 +63,8 @@ public class SpannerAccessor implements AutoCloseable { */ private static final String USER_AGENT_PREFIX = "Apache_Beam_Java"; + static final java.time.Duration DEFAULT_SESSION_WAIT_DURATION = java.time.Duration.ofMinutes(0); + /** Instance ID to use when connecting to an experimental host. */ public static final String EXPERIMENTAL_HOST_INSTANCE_ID = "default"; @@ -113,6 +117,11 @@ public static SpannerAccessor getOrCreate(SpannerConfig spannerConfig) { static SpannerOptions buildSpannerOptions(SpannerConfig spannerConfig) { SpannerOptions.Builder builder = SpannerOptions.newBuilder(); + // TODO(https://github.com/apache/beam/issues/37451) Disable gRPC gcp extension which was + // causing the application thread to stall. + // Remove this once Spanner fixes the hanging issue + builder.disableGrpcGcpExtension(); + Set retryableCodes = new HashSet<>(); if (spannerConfig.getRetryableCodes() != null) { retryableCodes.addAll(spannerConfig.getRetryableCodes()); @@ -265,6 +274,15 @@ static SpannerOptions buildSpannerOptions(SpannerConfig spannerConfig) { builder.setCredentials(credentials.get()); } + // ValueProvider waitForSessionCreationDuration = + // spannerConfig.getWaitForSessionCreationDuration(); + // java.time.Duration waitDuration = + // Optional.ofNullable(waitForSessionCreationDuration) + // .map(ValueProvider::get) + // .orElse(DEFAULT_SESSION_WAIT_DURATION); + // builder.setSessionPoolOption( + // SessionPoolOptions.newBuilder().setWaitForMinSessionsDuration(waitDuration).build()); + return builder.build(); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java index f52b8378cb6a..a17c851f38a0 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java @@ -104,6 +104,8 @@ public String getHostValue() { public abstract @Nullable ValueProvider getCredentials(); + public abstract @Nullable ValueProvider getWaitForSessionCreationDuration(); + abstract Builder toBuilder(); public static SpannerConfig create() { @@ -189,6 +191,9 @@ abstract Builder setExecuteStreamingSqlRetrySettings( abstract Builder setPlainText(ValueProvider plainText); + abstract Builder setWaitForSessionCreationDuration( + ValueProvider waitForSessionCreationDuration); + public abstract SpannerConfig build(); } @@ -389,4 +394,24 @@ public SpannerConfig withUsingPlainTextChannel(ValueProvider plainText) public SpannerConfig withUsingPlainTextChannel(boolean plainText) { return withUsingPlainTextChannel(ValueProvider.StaticValueProvider.of(plainText)); } + + /** + * Sets the wait time for a multiplexed session to be available when creating a database client. + * + *

Setting this will block the {@link com.google.cloud.spanner.DatabaseClient} creation. + * + * @param waitForSessionCreationDuration The duration to wait. Defaults to {@link + * SpannerAccessor#DEFAULT_SESSION_WAIT_DURATION}. + * @return {@link SpannerConfig} + */ + public SpannerConfig withWaitForSessionCreationDuration( + ValueProvider waitForSessionCreationDuration) { + return toBuilder().setWaitForSessionCreationDuration(waitForSessionCreationDuration).build(); + } + + public SpannerConfig withWaitForSessionCreationDuration( + java.time.Duration waitForSessionCreationDuration) { + return withWaitForSessionCreationDuration( + ValueProvider.StaticValueProvider.of(waitForSessionCreationDuration)); + } }