@@ -38,21 +38,23 @@ private enum TokenState {
3838 // monthly downtime allowed by a 99.99% uptime SLA (~4.38 minutes) while increasing the likelihood
3939 // that the token is refreshed asynchronously if the auth server is down.
4040 private static final Duration MAX_STALE_DURATION = Duration .ofMinutes (20 );
41+ // Delay before another async refresh may be attempted after an async refresh failure.
42+ private static final Duration ASYNC_REFRESH_RETRY_BACKOFF = Duration .ofMinutes (1 );
4143 // Default additional buffer before expiry to consider a token as expired.
4244 // This is 40 seconds by default since Azure Databricks rejects tokens that are within 30 seconds
4345 // of expiry.
4446 private static final Duration DEFAULT_EXPIRY_BUFFER = Duration .ofSeconds (40 );
4547
46- // The token source to use for refreshing the token .
48+ // Underlying token source used to fetch replacement tokens .
4749 private final TokenSource tokenSource ;
4850 // Whether asynchronous refresh is enabled.
4951 private boolean asyncDisabled = false ;
5052 // The legacy duration before expiry to consider a token as 'stale'.
5153 private final Duration staticStaleDuration ;
5254 // Whether to use the dynamic stale duration computation or defer to the legacy duration.
53- private final boolean useDynamicStaleDuration ;
55+ private final boolean useLegacyStaleDuration ;
5456 // The dynamically computed duration before expiry to consider a token as 'stale'.
55- private volatile Duration dynamicStaleDuration ;
57+ private volatile Instant staleAfter ;
5658 // Additional buffer before expiry to consider a token as expired.
5759 private final Duration expiryBuffer ;
5860 // Clock supplier for current time.
@@ -62,23 +64,16 @@ private enum TokenState {
6264 protected volatile Token token ;
6365 // Whether a refresh is currently in progress (for async refresh).
6466 private boolean refreshInProgress = false ;
65- // Whether the last refresh attempt succeeded.
66- private boolean lastRefreshSucceeded = true ;
6767
6868 private CachedTokenSource (Builder builder ) {
6969 this .tokenSource = builder .tokenSource ;
7070 this .asyncDisabled = builder .asyncDisabled ;
7171 this .staticStaleDuration = builder .staleDuration ;
72- this .useDynamicStaleDuration = builder .useDynamicStaleDuration ;
72+ this .useLegacyStaleDuration = builder .useLegacyStaleDuration ;
7373 this .expiryBuffer = builder .expiryBuffer ;
7474 this .clockSupplier = builder .clockSupplier ;
75- this .token = builder .token ;
7675
77- if (this .useDynamicStaleDuration && this .token != null ) {
78- this .dynamicStaleDuration = computeStaleDuration (this .token );
79- } else {
80- this .dynamicStaleDuration = Duration .ofMinutes (0 );
81- }
76+ this .updateToken (builder .token );
8277 }
8378
8479 /**
@@ -91,7 +86,7 @@ public static class Builder {
9186 private final TokenSource tokenSource ;
9287 private boolean asyncDisabled = false ;
9388 private Duration staleDuration = DEFAULT_STALE_DURATION ;
94- private boolean useDynamicStaleDuration = true ;
89+ private boolean useLegacyStaleDuration = false ;
9590 private Duration expiryBuffer = DEFAULT_EXPIRY_BUFFER ;
9691 private ClockSupplier clockSupplier = new UtcClockSupplier ();
9792 private Token token ;
@@ -139,15 +134,18 @@ public Builder setAsyncDisabled(boolean asyncDisabled) {
139134 * Sets the duration before token expiry at which the token is considered stale.
140135 *
141136 * <p>When asynchronous refresh is enabled, tokens that are stale but not yet expired will
142- * trigger a background refresh while continuing to serve the current token.
137+ * trigger a background refresh while continuing to serve the current token. Calling this method
138+ * opts into the legacy fixed stale-duration behavior instead of the default dynamic stale
139+ * computation, preserving backward compatibility for callers that already provide a custom
140+ * stale duration.
143141 *
144142 * @param staleDuration The duration before expiry to consider a token stale. Must be greater
145143 * than the expiry buffer duration.
146144 * @return This builder instance for method chaining.
147145 */
148146 public Builder setStaleDuration (Duration staleDuration ) {
149147 this .staleDuration = staleDuration ;
150- this .useDynamicStaleDuration = false ;
148+ this .useLegacyStaleDuration = true ;
151149 return this ;
152150 }
153151
@@ -190,6 +188,77 @@ public CachedTokenSource build() {
190188 }
191189 }
192190
191+ /**
192+ * Replaces the cached token and recomputes the time after which it should be treated as stale.
193+ *
194+ * <p>Legacy mode uses the configured fixed stale duration. Dynamic mode derives the stale window
195+ * from the token's remaining TTL and caps it at {@link #MAX_STALE_DURATION}. The stale threshold
196+ * is written before the volatile token write so readers that observe the new token also observe
197+ * the matching {@code staleAfter} value.
198+ *
199+ * @param t The token to cache. May be null.
200+ */
201+ private void updateToken (Token t ) {
202+ if (t == null ) {
203+ this .staleAfter = null ;
204+ this .token = t ;
205+ return ;
206+ }
207+
208+ if (t .getExpiry () == null ) {
209+ this .staleAfter = null ;
210+ this .token = t ;
211+ return ;
212+ }
213+
214+ if (this .useLegacyStaleDuration ) {
215+ this .staleAfter = t .getExpiry ().minus (staticStaleDuration );
216+ } else {
217+ Duration ttl = Duration .between (Instant .now (clockSupplier .getClock ()), t .getExpiry ());
218+ Duration staleDuration = ttl .dividedBy (2 );
219+ if (staleDuration .compareTo (MAX_STALE_DURATION ) > 0 ) {
220+ staleDuration = MAX_STALE_DURATION ;
221+ }
222+ if (staleDuration .compareTo (Duration .ZERO ) <= 0 ) {
223+ staleDuration = Duration .ZERO ;
224+ }
225+
226+ this .staleAfter = t .getExpiry ().minus (staleDuration );
227+ }
228+
229+ // Publish the token after staleAfter so readers that observe the new token also observe the
230+ // stale threshold computed for that token.
231+ this .token = t ;
232+ }
233+
234+ /**
235+ * Delays the next async refresh attempt after an async refresh failure.
236+ *
237+ * <p>The cached token remains usable until it becomes expired. Moving {@code staleAfter} into the
238+ * future prevents callers from immediately retrying async refresh on every stale read while the
239+ * auth service is unhealthy.
240+ */
241+ private void handleFailedAsyncRefresh () {
242+ synchronized (this ) {
243+ if (this .staleAfter != null ) {
244+ Instant now = Instant .now (clockSupplier .getClock ());
245+ this .staleAfter = now .plus (ASYNC_REFRESH_RETRY_BACKOFF );
246+ }
247+ }
248+ }
249+
250+ /**
251+ * Returns {@code true} when the currently cached token has a later expiry than {@code candidate},
252+ * meaning the candidate should be discarded. This prevents an async refresh that was started
253+ * before a blocking refresh from overwriting the newer token obtained by the blocking path.
254+ */
255+ private boolean cachedTokenIsNewer (Token candidate ) {
256+ return token != null
257+ && token .getExpiry () != null
258+ && candidate .getExpiry () != null
259+ && token .getExpiry ().isAfter (candidate .getExpiry ());
260+ }
261+
193262 /**
194263 * Gets the current token, refreshing if necessary. If async refresh is enabled, may return a
195264 * stale token while a refresh is in progress.
@@ -206,21 +275,6 @@ public Token getToken() {
206275 return getTokenAsync ();
207276 }
208277
209- private Duration computeStaleDuration (Token t ) {
210- if (t .getExpiry () == null ) {
211- return Duration .ZERO ; // Tokens with no expiry are considered permanent.
212- }
213-
214- Duration ttl = Duration .between (Instant .now (clockSupplier .getClock ()), t .getExpiry ());
215-
216- if (ttl .compareTo (Duration .ZERO ) <= 0 ) {
217- return Duration .ZERO ;
218- }
219-
220- Duration halfTtl = ttl .dividedBy (2 );
221- return halfTtl .compareTo (MAX_STALE_DURATION ) > 0 ? MAX_STALE_DURATION : halfTtl ;
222- }
223-
224278 /**
225279 * Determine the state of the current token (fresh, stale, or expired).
226280 *
@@ -234,12 +288,11 @@ protected TokenState getTokenState(Token t) {
234288 return TokenState .FRESH ; // Tokens with no expiry are considered permanent.
235289 }
236290
237- Duration lifeTime = Duration . between ( Instant .now (clockSupplier .getClock ()), t . getExpiry ());
238- if (lifeTime . compareTo ( expiryBuffer ) <= 0 ) {
291+ Instant now = Instant .now (clockSupplier .getClock ());
292+ if (now . isAfter ( t . getExpiry (). minus ( expiryBuffer )) ) {
239293 return TokenState .EXPIRED ;
240294 }
241- Duration staleDuration = useDynamicStaleDuration ? dynamicStaleDuration : staticStaleDuration ;
242- if (lifeTime .compareTo (staleDuration ) <= 0 ) {
295+ if (now .isAfter (staleAfter )) {
243296 return TokenState .STALE ;
244297 }
245298 return TokenState .FRESH ;
@@ -265,23 +318,15 @@ protected Token getTokenBlocking() {
265318 if (getTokenState (token ) != TokenState .EXPIRED ) {
266319 return token ;
267320 }
268- lastRefreshSucceeded = false ;
269321 Token newToken ;
270322 try {
271323 newToken = tokenSource .getToken ();
272324 } catch (Exception e ) {
273325 logger .error ("Failed to refresh token synchronously" , e );
274326 throw e ;
275327 }
276- lastRefreshSucceeded = true ;
277328
278- // Write dynamicStaleDuration before publishing the new token via the volatile write,
279- // so unsynchronized readers that see the new token are guaranteed to also see the
280- // updated dynamicStaleDuration.
281- if (useDynamicStaleDuration && newToken != null ) {
282- dynamicStaleDuration = computeStaleDuration (newToken );
283- }
284- token = newToken ;
329+ updateToken (newToken );
285330 return token ;
286331 }
287332 }
@@ -318,31 +363,28 @@ protected Token getTokenAsync() {
318363 private synchronized void triggerAsyncRefresh () {
319364 // Check token state again inside the synchronized block to avoid triggering a refresh if
320365 // another thread updated the token in the meantime.
321- if (!refreshInProgress && lastRefreshSucceeded && getTokenState (token ) != TokenState .FRESH ) {
322- refreshInProgress = true ;
323- CompletableFuture .runAsync (
324- () -> {
325- try {
326- // Attempt to refresh the token in the background.
327- Token newToken = tokenSource .getToken ();
328- synchronized (this ) {
329- // Write dynamicStaleDuration before publishing the new token via the volatile
330- // write, so unsynchronized readers that see the new token are guaranteed to also
331- // see the updated dynamicStaleDuration.
332- if (useDynamicStaleDuration && newToken != null ) {
333- dynamicStaleDuration = computeStaleDuration (newToken );
334- }
335- token = newToken ;
336- refreshInProgress = false ;
337- }
338- } catch (Exception e ) {
339- synchronized (this ) {
340- lastRefreshSucceeded = false ;
341- refreshInProgress = false ;
342- logger .error ("Asynchronous token refresh failed" , e );
366+ if (refreshInProgress || getTokenState (token ) != TokenState .STALE ) {
367+ return ;
368+ }
369+
370+ refreshInProgress = true ;
371+ CompletableFuture .runAsync (
372+ () -> {
373+ try {
374+ Token newToken = tokenSource .getToken ();
375+ synchronized (this ) {
376+ if (!cachedTokenIsNewer (newToken )) {
377+ updateToken (newToken );
343378 }
379+ refreshInProgress = false ;
344380 }
345- });
346- }
381+ } catch (Exception e ) {
382+ synchronized (this ) {
383+ handleFailedAsyncRefresh ();
384+ refreshInProgress = false ;
385+ logger .error ("Asynchronous token refresh failed" , e );
386+ }
387+ }
388+ });
347389 }
348390}
0 commit comments