diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java index a3777099bfcb..eb1860902c6f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java @@ -30,6 +30,7 @@ import com.google.cloud.spanner.DatabaseAdminClient; import com.google.cloud.spanner.DatabaseClient; import com.google.cloud.spanner.DatabaseId; +import com.google.cloud.spanner.SessionPoolOptions; import com.google.cloud.spanner.Spanner; import com.google.cloud.spanner.SpannerOptions; import com.google.cloud.spanner.v1.stub.SpannerStubSettings; @@ -38,6 +39,7 @@ import com.google.spanner.v1.ExecuteSqlRequest; import com.google.spanner.v1.PartialResultSet; import java.util.HashSet; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.beam.sdk.options.ValueProvider; @@ -61,6 +63,9 @@ public class SpannerAccessor implements AutoCloseable { */ private static final String USER_AGENT_PREFIX = "Apache_Beam_Java"; + // Default wait time for session creation + static final java.time.Duration DEFAULT_SESSION_WAIT_DURATION = java.time.Duration.ofMinutes(5); + /** Instance ID to use when connecting to an experimental host. */ public static final String EXPERIMENTAL_HOST_INSTANCE_ID = "default"; @@ -270,6 +275,15 @@ static SpannerOptions buildSpannerOptions(SpannerConfig spannerConfig) { builder.setCredentials(credentials.get()); } + ValueProvider waitForSessionCreationDuration = + spannerConfig.getWaitForSessionCreationDuration(); + java.time.Duration waitDuration = + Optional.ofNullable(waitForSessionCreationDuration) + .map(ValueProvider::get) + .orElse(DEFAULT_SESSION_WAIT_DURATION); + builder.setSessionPoolOption( + SessionPoolOptions.newBuilder().setWaitForMinSessionsDuration(waitDuration).build()); + return builder.build(); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java index f52b8378cb6a..a17c851f38a0 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java @@ -104,6 +104,8 @@ public String getHostValue() { public abstract @Nullable ValueProvider getCredentials(); + public abstract @Nullable ValueProvider getWaitForSessionCreationDuration(); + abstract Builder toBuilder(); public static SpannerConfig create() { @@ -189,6 +191,9 @@ abstract Builder setExecuteStreamingSqlRetrySettings( abstract Builder setPlainText(ValueProvider plainText); + abstract Builder setWaitForSessionCreationDuration( + ValueProvider waitForSessionCreationDuration); + public abstract SpannerConfig build(); } @@ -389,4 +394,24 @@ public SpannerConfig withUsingPlainTextChannel(ValueProvider plainText) public SpannerConfig withUsingPlainTextChannel(boolean plainText) { return withUsingPlainTextChannel(ValueProvider.StaticValueProvider.of(plainText)); } + + /** + * Sets the wait time for a multiplexed session to be available when creating a database client. + * + *

Setting this will block the {@link com.google.cloud.spanner.DatabaseClient} creation. + * + * @param waitForSessionCreationDuration The duration to wait. Defaults to {@link + * SpannerAccessor#DEFAULT_SESSION_WAIT_DURATION}. + * @return {@link SpannerConfig} + */ + public SpannerConfig withWaitForSessionCreationDuration( + ValueProvider waitForSessionCreationDuration) { + return toBuilder().setWaitForSessionCreationDuration(waitForSessionCreationDuration).build(); + } + + public SpannerConfig withWaitForSessionCreationDuration( + java.time.Duration waitForSessionCreationDuration) { + return withWaitForSessionCreationDuration( + ValueProvider.StaticValueProvider.of(waitForSessionCreationDuration)); + } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadIT.java index 512fd5998ddb..34c839d3e1e6 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadIT.java @@ -26,6 +26,7 @@ import com.google.cloud.spanner.DatabaseId; import com.google.cloud.spanner.Dialect; import com.google.cloud.spanner.Mutation; +import com.google.cloud.spanner.SessionPoolOptions; import com.google.cloud.spanner.Spanner; import com.google.cloud.spanner.SpannerOptions; import com.google.cloud.spanner.Struct; @@ -118,6 +119,10 @@ public void setUp() throws Exception { SpannerOptions.newBuilder() .setProjectId(project) .disableGrpcGcpExtension() + .setSessionPoolOption( + SessionPoolOptions.newBuilder() + .setWaitForMinSessionsDuration(java.time.Duration.ofMinutes(5)) + .build()) .build() .getService(); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java index 91fe3473be03..df23435d82ab 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java @@ -28,6 +28,7 @@ import com.google.cloud.spanner.Dialect; import com.google.cloud.spanner.Mutation; import com.google.cloud.spanner.ResultSet; +import com.google.cloud.spanner.SessionPoolOptions; import com.google.cloud.spanner.Spanner; import com.google.cloud.spanner.SpannerOptions; import com.google.cloud.spanner.Statement; @@ -125,6 +126,10 @@ public void setUp() throws Exception { SpannerOptions.newBuilder() .setProjectId(project) .disableGrpcGcpExtension() + .setSessionPoolOption( + SessionPoolOptions.newBuilder() + .setWaitForMinSessionsDuration(java.time.Duration.ofMinutes(5)) + .build()) .build() .getService(); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/IntegrationTestEnv.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/IntegrationTestEnv.java index 581b7f3cc2f5..b0ce19b9f003 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/IntegrationTestEnv.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/IntegrationTestEnv.java @@ -21,6 +21,7 @@ import com.google.cloud.spanner.DatabaseClient; import com.google.cloud.spanner.DatabaseId; import com.google.cloud.spanner.Dialect; +import com.google.cloud.spanner.SessionPoolOptions; import com.google.cloud.spanner.Spanner; import com.google.cloud.spanner.SpannerOptions; import java.util.ArrayList; @@ -81,6 +82,10 @@ protected void before() throws Throwable { .setProjectId(projectId) .setHost(host) .disableGrpcGcpExtension() + .setSessionPoolOption( + SessionPoolOptions.newBuilder() + .setWaitForMinSessionsDuration(java.time.Duration.ofMinutes(5)) + .build()) .build() .getService(); databaseAdminClient = spanner.getDatabaseAdminClient(); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.java index 21f6eef79362..5a05c9a5452f 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.java @@ -106,7 +106,7 @@ public void testTransactionBoundaries() { // Commit a initial transaction to get the timestamp to start reading from. List mutations = new ArrayList<>(); mutations.add(insertRecordMutation(0, "FirstName0", "LastName0")); - final long timeIncrementInSeconds = 2; + final long timeIncrementInSeconds = 10; final Timestamp startTimestamp = databaseClient.write(mutations); writeTransactionsToDatabase(); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedWithinKeyGloballyIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedWithinKeyGloballyIT.java index 2b2c134032b3..c5fdbe80c7a2 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedWithinKeyGloballyIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedWithinKeyGloballyIT.java @@ -99,7 +99,7 @@ public void testOrderedWithinKey() { .withDatabaseId(databaseId); // Get the time increment interval at which to flush data changes ordered by key. - final long timeIncrementInSeconds = 2; + final long timeIncrementInSeconds = 10; // Commit a initial transaction to get the timestamp to start reading from. List mutations = new ArrayList<>();