From 26ddd73bee7bf675675b6a1a1b2956fca8dc4270 Mon Sep 17 00:00:00 2001 From: Sakthivel Subramanian Date: Tue, 6 Jan 2026 13:36:39 +0530 Subject: [PATCH 1/2] fix(spanner): Retry creation of multiplexed session --- .../MultiplexedSessionDatabaseClient.java | 77 +++-- .../cloud/spanner/MockSpannerServiceImpl.java | 8 +- ...edSessionDatabaseClientMockServerTest.java | 265 ++++++++++++++++++ 3 files changed, 328 insertions(+), 22 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java index 11e83add517..da1b6d6eda0 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java @@ -40,6 +40,7 @@ import java.time.Duration; import java.time.Instant; import java.util.BitSet; +import java.util.EnumSet; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -262,6 +263,9 @@ public void close() { */ private static final Map CHANNEL_USAGE = new HashMap<>(); + private static final EnumSet RETRYABLE_ERROR_CODES = + EnumSet.of(ErrorCode.DEADLINE_EXCEEDED, ErrorCode.RESOURCE_EXHAUSTED, ErrorCode.UNAVAILABLE); + private final BitSet channelUsage; private final int numChannels; @@ -358,11 +362,19 @@ public void close() { SettableApiFuture.create(); this.readWriteBeginTransactionReferenceFuture = SettableApiFuture.create(); this.multiplexedSessionReference = new AtomicReference<>(initialSessionReferenceFuture); + asyncCreateMultiplexedSession(initialSessionReferenceFuture); + maybeWaitForSessionCreation( + sessionClient.getSpanner().getOptions().getSessionPoolOptions(), + initialSessionReferenceFuture); + } + + private void asyncCreateMultiplexedSession( + SettableApiFuture sessionReferenceFuture) { this.sessionClient.asyncCreateMultiplexedSession( new SessionConsumer() { @Override public void onSessionReady(SessionImpl session) { - initialSessionReferenceFuture.set(session.getSessionReference()); + sessionReferenceFuture.set(session.getSessionReference()); // only start the maintainer if we actually managed to create a session in the first // place. maintainer.start(); @@ -395,33 +407,62 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount // Mark multiplexes sessions as unimplemented and fall back to regular sessions if // UNIMPLEMENTED is returned. maybeMarkUnimplemented(t); - initialSessionReferenceFuture.setException(t); + sessionReferenceFuture.setException(t); } }); - maybeWaitForSessionCreation( - sessionClient.getSpanner().getOptions().getSessionPoolOptions(), - initialSessionReferenceFuture); } void setPool(SessionPool pool) { this.pool = pool; } - private static void maybeWaitForSessionCreation( - SessionPoolOptions sessionPoolOptions, ApiFuture future) { + private void maybeWaitForSessionCreation( + SessionPoolOptions sessionPoolOptions, + SettableApiFuture initialSessionReferenceFuture) { Duration waitDuration = sessionPoolOptions.getWaitForMinSessions(); if (waitDuration != null && !waitDuration.isZero()) { - long timeoutMillis = waitDuration.toMillis(); - try { - future.get(timeoutMillis, TimeUnit.MILLISECONDS); - } catch (ExecutionException executionException) { - throw SpannerExceptionFactory.asSpannerException(executionException.getCause()); - } catch (InterruptedException interruptedException) { - throw SpannerExceptionFactory.propagateInterrupt(interruptedException); - } catch (TimeoutException timeoutException) { - throw SpannerExceptionFactory.newSpannerException( - ErrorCode.DEADLINE_EXCEEDED, - "Timed out after waiting " + timeoutMillis + "ms for multiplexed session creation"); + + SpannerException lastException = null; + SettableApiFuture sessionReferenceFuture = initialSessionReferenceFuture; + Duration remainingTime; + + Instant endTime = Instant.now().plus(waitDuration); + while ((remainingTime = Duration.between(Instant.now(), endTime)).toMillis() > 0) { + // If any exception is thrown, then retry the multiplexed session creation + if (sessionReferenceFuture == null) { + sessionReferenceFuture = SettableApiFuture.create(); + asyncCreateMultiplexedSession(sessionReferenceFuture); + this.multiplexedSessionReference.set(sessionReferenceFuture); + } + try { + sessionReferenceFuture.get(remainingTime.toMillis(), TimeUnit.MILLISECONDS); + lastException = null; + break; + } catch (ExecutionException executionException) { + lastException = SpannerExceptionFactory.asSpannerException(executionException.getCause()); + } catch (InterruptedException interruptedException) { + lastException = SpannerExceptionFactory.propagateInterrupt(interruptedException); + } catch (TimeoutException timeoutException) { + lastException = + SpannerExceptionFactory.newSpannerException( + ErrorCode.DEADLINE_EXCEEDED, + "Timed out after waiting " + + waitDuration.toMillis() + + "ms for multiplexed session creation"); + } + // if any exception is thrown, then set the session reference to null to retry the + // multiplexed session creation only if the error code is DEADLINE EXCEEDED, UNAVAILABLE or + // RESOURCE_EXHAUSTED + if (RETRYABLE_ERROR_CODES.contains(lastException.getErrorCode())) { + sessionReferenceFuture = null; + } else { + break; + } + } + // if the wait time elapsed and multiplexed session fetch failed then throw the last exception + // that we have received + if (lastException != null) { + throw lastException; } } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java index a47aecdccc4..d495a6ff63c 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java @@ -542,16 +542,16 @@ void simulateExecutionTime( boolean stickyGlobalExceptions, CountDownLatch freezeLock) { Uninterruptibles.awaitUninterruptibly(freezeLock); - checkException(globalExceptions, stickyGlobalExceptions); - if (streamIndices.isEmpty()) { - checkException(this.exceptions, stickyException); - } if (minimumExecutionTime > 0 || randomExecutionTime > 0) { Uninterruptibles.sleepUninterruptibly( (randomExecutionTime == 0 ? 0 : RANDOM.nextInt(randomExecutionTime)) + minimumExecutionTime, TimeUnit.MILLISECONDS); } + checkException(globalExceptions, stickyGlobalExceptions); + if (streamIndices.isEmpty()) { + checkException(this.exceptions, stickyException); + } } private static void checkException(Queue exceptions, boolean keepException) { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java index 0448656475a..cc60b646132 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java @@ -210,6 +210,271 @@ public void testMaintainerMaintainsMultipleClients() { } } + @Test + public void testRetryWithTheSessionCreationWaitTime() { + mockSpanner.setCreateSessionExecutionTime( + SimulatedExecutionTime.ofExceptions( + Arrays.asList( + Status.DEADLINE_EXCEEDED + .withDescription( + "CallOptions deadline exceeded after 22.986872393s. " + + "Name resolution delay 6.911918521 seconds. [closed=[], " + + "open=[[connecting_and_lb_delay=32445014148ns, was_still_waiting]]]") + .asRuntimeException(), + Status.DEADLINE_EXCEEDED + .withDescription( + "CallOptions deadline exceeded after 22.986872393s. " + + "Name resolution delay 6.911918521 seconds. [closed=[], " + + "open=[[connecting_and_lb_delay=32445014148ns, was_still_waiting]]]") + .asRuntimeException()))); + + Spanner testSpanner = + SpannerOptions.newBuilder() + .setProjectId("test-project") + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption( + SessionPoolOptions.newBuilder() + .setUseMultiplexedSession(true) + .setUseMultiplexedSessionForRW(true) + .setUseMultiplexedSessionPartitionedOps(true) + .setWaitForMinSessionsDuration(Duration.ofSeconds(1)) + .setFailOnSessionLeak() + .build()) + .build() + .getService(); + + DatabaseClientImpl client = + (DatabaseClientImpl) testSpanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + + try (ResultSet resultSet = client.singleUse().executeQuery(STATEMENT)) { + //noinspection StatementWithEmptyBody + while (resultSet.next()) { + // ignore + } + } + + List createSessionRequests = + mockSpanner.getRequestsOfType(CreateSessionRequest.class); + assertEquals(3, createSessionRequests.size()); + + List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); + assertEquals(1, requests.size()); + + assertNotNull(client.multiplexedSessionDatabaseClient); + assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get()); + assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get()); + + testSpanner.close(); + } + + @Test + public void testRetryWithTheDatabaseNotFoundExceptionWithSessionCreationWaitTime() { + mockSpanner.setCreateSessionExecutionTime( + SimulatedExecutionTime.ofExceptions( + Collections.singletonList( + Status.NOT_FOUND.withDescription("Database not found.").asRuntimeException()))); + + Spanner testSpanner = + SpannerOptions.newBuilder() + .setProjectId("test-project") + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption( + SessionPoolOptions.newBuilder() + .setUseMultiplexedSession(true) + .setUseMultiplexedSessionForRW(true) + .setUseMultiplexedSessionPartitionedOps(true) + .setWaitForMinSessionsDuration(Duration.ofSeconds(1)) + .setFailOnSessionLeak() + .build()) + .build() + .getService(); + + assertThrows( + SpannerException.class, () -> testSpanner.getDatabaseClient(DatabaseId.of("p", "i", "d"))); + + List createSessionRequests = + mockSpanner.getRequestsOfType(CreateSessionRequest.class); + assertEquals(1, createSessionRequests.size()); + + List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); + assertEquals(0, requests.size()); + + testSpanner.close(); + } + + @Test + public void testRetryWithNoSessionCreationWaitTime() { + mockSpanner.setCreateSessionExecutionTime( + SimulatedExecutionTime.ofExceptions( + Collections.singletonList( + Status.DEADLINE_EXCEEDED + .withDescription( + "CallOptions deadline exceeded after 22.986872393s. " + + "Name resolution delay 6.911918521 seconds. [closed=[], " + + "open=[[connecting_and_lb_delay=32445014148ns, was_still_waiting]]]") + .asRuntimeException()))); + + Spanner testSpanner = + SpannerOptions.newBuilder() + .setProjectId("test-project") + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption( + SessionPoolOptions.newBuilder() + .setUseMultiplexedSession(true) + .setUseMultiplexedSessionForRW(true) + .setUseMultiplexedSessionPartitionedOps(true) + .setFailOnSessionLeak() + .build()) + .build() + .getService(); + + DatabaseClientImpl client = + (DatabaseClientImpl) testSpanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + + SpannerException spannerException = + assertThrows( + SpannerException.class, + () -> { + try (ResultSet resultSet = client.singleUse().executeQuery(STATEMENT)) { + //noinspection StatementWithEmptyBody + while (resultSet.next()) { + // ignore + } + } + }); + assertEquals(ErrorCode.DEADLINE_EXCEEDED, spannerException.getErrorCode()); + + List createSessionRequests = + mockSpanner.getRequestsOfType(CreateSessionRequest.class); + assertEquals(1, createSessionRequests.size()); + + List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); + assertEquals(0, requests.size()); + + testSpanner.close(); + } + + @Test + public void testRetryWithDelayedInResponseExceedsSessionCreationWaitTime() { + mockSpanner.setCreateSessionExecutionTime( + SimulatedExecutionTime.ofMinimumAndRandomTimeAndExceptions( + 600, + 0, + Arrays.asList( + Status.DEADLINE_EXCEEDED + .withDescription( + "CallOptions deadline exceeded after 22.986872393s. " + + "Name resolution delay 6.911918521 seconds. [closed=[], " + + "open=[[connecting_and_lb_delay=32445014148ns, was_still_waiting]]]") + .asRuntimeException(), + Status.DEADLINE_EXCEEDED + .withDescription( + "CallOptions deadline exceeded after 22.986872393s. " + + "Name resolution delay 6.911918521 seconds. [closed=[], " + + "open=[[connecting_and_lb_delay=32445014148ns, was_still_waiting]]]") + .asRuntimeException()))); + + Spanner testSpanner = + SpannerOptions.newBuilder() + .setProjectId("test-project") + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption( + SessionPoolOptions.newBuilder() + .setUseMultiplexedSession(true) + .setUseMultiplexedSessionForRW(true) + .setUseMultiplexedSessionPartitionedOps(true) + .setWaitForMinSessionsDuration(Duration.ofMillis(1000)) + .setFailOnSessionLeak() + .build()) + .build() + .getService(); + + SpannerException spannerException = + assertThrows( + SpannerException.class, + () -> { + DatabaseClientImpl client = + (DatabaseClientImpl) testSpanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + + try (ResultSet resultSet = client.singleUse().executeQuery(STATEMENT)) { + //noinspection StatementWithEmptyBody + while (resultSet.next()) { + // ignore + } + } + }); + assertEquals(ErrorCode.DEADLINE_EXCEEDED, spannerException.getErrorCode()); + + List createSessionRequests = + mockSpanner.getRequestsOfType(CreateSessionRequest.class); + assertEquals(2, createSessionRequests.size()); + + List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); + assertEquals(0, requests.size()); + + testSpanner.close(); + } + + @Test + public void testRetryWithDelayInExceptionWithInSessionCreationWaitTime() { + mockSpanner.setCreateSessionExecutionTime( + SimulatedExecutionTime.ofMinimumAndRandomTimeAndExceptions( + 200, + 0, + Arrays.asList( + Status.DEADLINE_EXCEEDED + .withDescription( + "CallOptions deadline exceeded after 22.986872393s. " + + "Name resolution delay 6.911918521 seconds. [closed=[], " + + "open=[[connecting_and_lb_delay=32445014148ns, was_still_waiting]]]") + .asRuntimeException(), + Status.DEADLINE_EXCEEDED + .withDescription( + "CallOptions deadline exceeded after 22.986872393s. " + + "Name resolution delay 6.911918521 seconds. [closed=[], " + + "open=[[connecting_and_lb_delay=32445014148ns, was_still_waiting]]]") + .asRuntimeException()))); + + Spanner testSpanner = + SpannerOptions.newBuilder() + .setProjectId("test-project") + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption( + SessionPoolOptions.newBuilder() + .setUseMultiplexedSession(true) + .setUseMultiplexedSessionForRW(true) + .setUseMultiplexedSessionPartitionedOps(true) + .setWaitForMinSessionsDuration(Duration.ofMillis(1000)) + .setFailOnSessionLeak() + .build()) + .build() + .getService(); + + DatabaseClientImpl client = + (DatabaseClientImpl) testSpanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + + try (ResultSet resultSet = client.singleUse().executeQuery(STATEMENT)) { + //noinspection StatementWithEmptyBody + while (resultSet.next()) { + // ignore + } + } + + List createSessionRequests = + mockSpanner.getRequestsOfType(CreateSessionRequest.class); + assertEquals(3, createSessionRequests.size()); + + List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); + assertEquals(1, requests.size()); + + testSpanner.close(); + } + @Test public void testUnimplementedErrorOnCreation_fallsBackToRegularSessions() { mockSpanner.setCreateSessionExecutionTime( From 103b5bf9805898aef35ce09a362b56fbeac9490b Mon Sep 17 00:00:00 2001 From: Sakthivel Subramanian Date: Wed, 7 Jan 2026 14:37:53 +0530 Subject: [PATCH 2/2] Reduce wait time to run tests faster --- ...ltiplexedSessionDatabaseClientMockServerTest.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java index cc60b646132..ee8c286da5f 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java @@ -285,7 +285,7 @@ public void testRetryWithTheDatabaseNotFoundExceptionWithSessionCreationWaitTime .setUseMultiplexedSession(true) .setUseMultiplexedSessionForRW(true) .setUseMultiplexedSessionPartitionedOps(true) - .setWaitForMinSessionsDuration(Duration.ofSeconds(1)) + .setWaitForMinSessionsDuration(Duration.ofMillis(200)) .setFailOnSessionLeak() .build()) .build() @@ -361,7 +361,7 @@ public void testRetryWithNoSessionCreationWaitTime() { public void testRetryWithDelayedInResponseExceedsSessionCreationWaitTime() { mockSpanner.setCreateSessionExecutionTime( SimulatedExecutionTime.ofMinimumAndRandomTimeAndExceptions( - 600, + 150, 0, Arrays.asList( Status.DEADLINE_EXCEEDED @@ -370,7 +370,7 @@ public void testRetryWithDelayedInResponseExceedsSessionCreationWaitTime() { + "Name resolution delay 6.911918521 seconds. [closed=[], " + "open=[[connecting_and_lb_delay=32445014148ns, was_still_waiting]]]") .asRuntimeException(), - Status.DEADLINE_EXCEEDED + Status.UNAVAILABLE .withDescription( "CallOptions deadline exceeded after 22.986872393s. " + "Name resolution delay 6.911918521 seconds. [closed=[], " @@ -387,7 +387,7 @@ public void testRetryWithDelayedInResponseExceedsSessionCreationWaitTime() { .setUseMultiplexedSession(true) .setUseMultiplexedSessionForRW(true) .setUseMultiplexedSessionPartitionedOps(true) - .setWaitForMinSessionsDuration(Duration.ofMillis(1000)) + .setWaitForMinSessionsDuration(Duration.ofMillis(200)) .setFailOnSessionLeak() .build()) .build() @@ -423,7 +423,7 @@ public void testRetryWithDelayedInResponseExceedsSessionCreationWaitTime() { public void testRetryWithDelayInExceptionWithInSessionCreationWaitTime() { mockSpanner.setCreateSessionExecutionTime( SimulatedExecutionTime.ofMinimumAndRandomTimeAndExceptions( - 200, + 50, 0, Arrays.asList( Status.DEADLINE_EXCEEDED @@ -449,7 +449,7 @@ public void testRetryWithDelayInExceptionWithInSessionCreationWaitTime() { .setUseMultiplexedSession(true) .setUseMultiplexedSessionForRW(true) .setUseMultiplexedSessionPartitionedOps(true) - .setWaitForMinSessionsDuration(Duration.ofMillis(1000)) + .setWaitForMinSessionsDuration(Duration.ofMillis(200)) .setFailOnSessionLeak() .build()) .build()