From 8ccb900af7fbc4c58051ea48a466b3f82f87fee3 Mon Sep 17 00:00:00 2001 From: "d.zharikhin" Date: Fri, 20 Aug 2021 16:27:42 +0300 Subject: [PATCH 1/2] start cache from specific index --- .../com/orbitz/consul/cache/ConsulCache.java | 107 +++++++++--------- .../orbitz/consul/cache/HealthCheckCache.java | 34 +++++- .../java/com/orbitz/consul/cache/KVCache.java | 34 +++++- .../consul/cache/NodesCatalogCache.java | 31 ++++- .../consul/cache/ServiceCatalogCache.java | 34 +++++- .../consul/cache/ServiceHealthCache.java | 39 ++++++- .../cache/IndexAwareStubCallbackConsumer.java | 32 ++++++ .../orbitz/consul/config/CacheConfigTest.java | 2 +- 8 files changed, 236 insertions(+), 77 deletions(-) create mode 100644 src/test/java/com/orbitz/consul/cache/IndexAwareStubCallbackConsumer.java diff --git a/src/main/java/com/orbitz/consul/cache/ConsulCache.java b/src/main/java/com/orbitz/consul/cache/ConsulCache.java index de33831f..f8263463 100644 --- a/src/main/java/com/orbitz/consul/cache/ConsulCache.java +++ b/src/main/java/com/orbitz/consul/cache/ConsulCache.java @@ -34,6 +34,9 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; +import java.util.function.Supplier; + +import javax.annotation.Nullable; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; @@ -49,7 +52,7 @@ enum State {latent, starting, started, stopped } private final static Logger LOGGER = LoggerFactory.getLogger(ConsulCache.class); - private final AtomicReference latestIndex = new AtomicReference<>(null); + private final AtomicReference latestIndex; private final AtomicLong lastContact = new AtomicLong(); private final AtomicBoolean isKnownLeader = new AtomicBoolean(); private final AtomicReference lastCacheInfo = new AtomicReference<>(null); @@ -72,9 +75,10 @@ protected ConsulCache( CallbackConsumer callbackConsumer, CacheConfig cacheConfig, ClientEventHandler eventHandler, - CacheDescriptor cacheDescriptor) { + CacheDescriptor cacheDescriptor, + @Nullable BigInteger initialIndex) { - this(keyConversion, callbackConsumer, cacheConfig, eventHandler, cacheDescriptor, createDefault()); + this(keyConversion, callbackConsumer, cacheConfig, eventHandler, cacheDescriptor, createDefault(), initialIndex); } protected ConsulCache( @@ -83,9 +87,12 @@ protected ConsulCache( CacheConfig cacheConfig, ClientEventHandler eventHandler, CacheDescriptor cacheDescriptor, - ScheduledExecutorService callbackScheduleExecutorService) { + ScheduledExecutorService callbackScheduleExecutorService, + @Nullable BigInteger initialIndex) { - this(keyConversion, callbackConsumer, cacheConfig, eventHandler, cacheDescriptor, new ExternalScheduler(callbackScheduleExecutorService)); + this(keyConversion, callbackConsumer, cacheConfig, eventHandler, cacheDescriptor, + new ExternalScheduler(callbackScheduleExecutorService), initialIndex + ); } protected ConsulCache( @@ -94,20 +101,14 @@ protected ConsulCache( CacheConfig cacheConfig, ClientEventHandler eventHandler, CacheDescriptor cacheDescriptor, - Scheduler callbackScheduler) { - if (keyConversion == null) { - Validate.notNull(keyConversion, "keyConversion must not be null"); - } - if (callbackConsumer == null) { - Validate.notNull(callbackConsumer, "callbackConsumer must not be null"); - } - if (cacheConfig == null) { - Validate.notNull(cacheConfig, "cacheConfig must not be null"); - } - if (eventHandler == null) { - Validate.notNull(eventHandler, "eventHandler must not be null"); - } - + Scheduler callbackScheduler, + @Nullable BigInteger initialIndex) { + Validate.notNull(keyConversion, "keyConversion must not be null"); + Validate.notNull(callbackConsumer, "callbackConsumer must not be null"); + Validate.notNull(cacheConfig, "cacheConfig must not be null"); + Validate.notNull(eventHandler, "eventHandler must not be null"); + + latestIndex = new AtomicReference<>(initialIndex); this.keyConversion = keyConversion; this.callBackConsumer = callbackConsumer; this.eventHandler = eventHandler; @@ -138,15 +139,8 @@ public void onComplete(ConsulResponse> consulResponse) { // metadata changes lastContact.set(consulResponse.getLastContact()); isKnownLeader.set(consulResponse.isKnownLeader()); - } - if (changed) { - Boolean locked = false; - if (state.get() == State.starting) { - listenersStartingLock.lock(); - locked = true; - } - try { + withStartingLock(() -> { for (Listener l : listeners) { try { l.notify(full); @@ -154,12 +148,8 @@ public void onComplete(ConsulResponse> consulResponse) { LOGGER.warn("ConsulCache Listener's notify method threw an exception.", e); } } - } - finally { - if (locked) { - listenersStartingLock.unlock(); - } - } + return null; + }); } if (state.compareAndSet(State.starting, State.started)) { @@ -173,8 +163,7 @@ public void onComplete(ConsulResponse> consulResponse) { } timeToWait = timeToWait.minusMillis(elapsedTime); - scheduler.schedule(ConsulCache.this::runCallback, - timeToWait.toMillis(), TimeUnit.MILLISECONDS); + scheduler.schedule(ConsulCache.this::runCallback, timeToWait.toMillis(), TimeUnit.MILLISECONDS); } else { onFailure(new ConsulException("Consul cluster has no elected leader")); @@ -198,6 +187,21 @@ public void onFailure(Throwable throwable) { }; } + private T withStartingLock(Supplier action) { + boolean wasInStartingState = state.get() == State.starting; + if (wasInStartingState) { + listenersStartingLock.lock(); + } + try { + return action.get(); + } + finally { + if (wasInStartingState) { + listenersStartingLock.unlock(); + } + } + } + static long computeBackOffDelayMs(CacheConfig cacheConfig) { return cacheConfig.getMinimumBackOffDelay().toMillis() + Math.round(Math.random() * (cacheConfig.getMaximumBackOffDelay().minus(cacheConfig.getMinimumBackOffDelay()).toMillis())); @@ -238,7 +242,8 @@ private void runCallback() { } private boolean isRunning() { - return state.get() == State.started || state.get() == State.starting; + State currentState = this.state.get(); + return currentState == State.started || currentState == State.starting; } public boolean awaitInitialized(long timeout, TimeUnit unit) throws InterruptedException { @@ -276,7 +281,8 @@ ImmutableMap convertToMap(final ConsulResponse> response) { private void updateIndex(ConsulResponse> consulResponse) { if (consulResponse != null && consulResponse.getIndex() != null) { - this.latestIndex.set(consulResponse.getIndex()); + BigInteger previousIndex = this.latestIndex.getAndSet(consulResponse.getIndex()); + LOGGER.trace("Updated cache index from {} to {}", previousIndex, latestIndex.get()); } } @@ -318,6 +324,7 @@ protected static Scheduler createExternal(ScheduledExecutorService executor) { * * @param */ + @FunctionalInterface protected interface CallbackConsumer { void consume(BigInteger index, ConsulResponseCallback> callback); } @@ -328,33 +335,23 @@ protected interface CallbackConsumer { * * @param */ + @FunctionalInterface public interface Listener { void notify(Map newValues); } public boolean addListener(Listener listener) { - Boolean locked = false; - boolean added; - if (state.get() == State.starting) { - listenersStartingLock.lock(); - locked = true; - } - try { - added = listeners.add(listener); - if (state.get() == State.started) { + return withStartingLock(() -> { + boolean added = listeners.add(listener); + if (this.state.get() == State.started) { try { listener.notify(lastResponse.get()); } catch (RuntimeException e) { LOGGER.warn("ConsulCache Listener's notify method threw an exception.", e); } } - } - finally { - if (locked) { - listenersStartingLock.unlock(); - } - } - return added; + return added; + }); } public List> getListeners() { @@ -396,9 +393,9 @@ public DefaultScheduler() { } } - private static class ExternalScheduler extends Scheduler { + private static final class ExternalScheduler extends Scheduler { - public ExternalScheduler(ScheduledExecutorService executor) { + private ExternalScheduler(ScheduledExecutorService executor) { super(executor); } diff --git a/src/main/java/com/orbitz/consul/cache/HealthCheckCache.java b/src/main/java/com/orbitz/consul/cache/HealthCheckCache.java index 0ac96dd5..612717a7 100644 --- a/src/main/java/com/orbitz/consul/cache/HealthCheckCache.java +++ b/src/main/java/com/orbitz/consul/cache/HealthCheckCache.java @@ -6,6 +6,7 @@ import com.orbitz.consul.model.health.HealthCheck; import com.orbitz.consul.option.QueryOptions; +import java.math.BigInteger; import java.util.concurrent.ScheduledExecutorService; import java.util.function.Function; @@ -16,7 +17,8 @@ private HealthCheckCache(HealthClient healthClient, int watchSeconds, QueryOptions queryOptions, Function keyExtractor, - Scheduler callbackScheduler) { + Scheduler callbackScheduler, + BigInteger initialIndex) { super(keyExtractor, (index, callback) -> { checkWatch(healthClient.getNetworkTimeoutConfig().getClientReadTimeoutMillis(), watchSeconds); @@ -26,7 +28,8 @@ private HealthCheckCache(HealthClient healthClient, healthClient.getConfig().getCacheConfig(), healthClient.getEventHandler(), new CacheDescriptor("health.state", state.getName()), - callbackScheduler); + callbackScheduler, + initialIndex); } /** @@ -47,7 +50,20 @@ public static HealthCheckCache newCache( final ScheduledExecutorService callbackExecutorService) { Scheduler callbackScheduler = createExternal(callbackExecutorService); - return new HealthCheckCache(healthClient, state, watchSeconds, queryOptions, keyExtractor, callbackScheduler); + return new HealthCheckCache(healthClient, state, watchSeconds, queryOptions, keyExtractor, callbackScheduler, null); + } + + public static HealthCheckCache newCache( + HealthClient healthClient, + com.orbitz.consul.model.State state, + int watchSeconds, + QueryOptions queryOptions, + BigInteger initialIndex, + Function keyExtractor, + ScheduledExecutorService callbackExecutorService) { + + Scheduler callbackScheduler = createExternal(callbackExecutorService); + return new HealthCheckCache(healthClient, state, watchSeconds, queryOptions, keyExtractor, callbackScheduler, initialIndex); } public static HealthCheckCache newCache( @@ -57,7 +73,7 @@ public static HealthCheckCache newCache( final QueryOptions queryOptions, final Function keyExtractor) { - return new HealthCheckCache(healthClient, state, watchSeconds, queryOptions, keyExtractor, createDefault()); + return new HealthCheckCache(healthClient, state, watchSeconds, queryOptions, keyExtractor, createDefault(), null); } public static HealthCheckCache newCache( final HealthClient healthClient, @@ -68,6 +84,15 @@ public static HealthCheckCache newCache( return newCache(healthClient, state, watchSeconds, queryOptions, HealthCheck::getCheckId); } + public static HealthCheckCache newCache( + HealthClient healthClient, + com.orbitz.consul.model.State state, + int watchSeconds, + QueryOptions queryOptions, BigInteger initialIndex) { + + return new HealthCheckCache(healthClient, state, watchSeconds, queryOptions, HealthCheck::getCheckId, createDefault(), initialIndex); + } + public static HealthCheckCache newCache( final HealthClient healthClient, final com.orbitz.consul.model.State state, @@ -76,6 +101,7 @@ public static HealthCheckCache newCache( return newCache(healthClient, state, watchSeconds, QueryOptions.BLANK); } + @Deprecated public static HealthCheckCache newCache(final HealthClient healthClient, final com.orbitz.consul.model.State state) { CacheConfig cacheConfig = healthClient.getConfig().getCacheConfig(); int watchSeconds = Ints.checkedCast(cacheConfig.getWatchDuration().getSeconds()); diff --git a/src/main/java/com/orbitz/consul/cache/KVCache.java b/src/main/java/com/orbitz/consul/cache/KVCache.java index aa67b12c..9ea3ca33 100644 --- a/src/main/java/com/orbitz/consul/cache/KVCache.java +++ b/src/main/java/com/orbitz/consul/cache/KVCache.java @@ -8,6 +8,7 @@ import com.orbitz.consul.model.kv.Value; import com.orbitz.consul.option.QueryOptions; +import java.math.BigInteger; import java.util.concurrent.ScheduledExecutorService; import java.util.function.Function; @@ -18,7 +19,8 @@ private KVCache(KeyValueClient kvClient, String keyPath, int watchSeconds, QueryOptions queryOptions, - Scheduler callbackScheduler) { + Scheduler callbackScheduler, + BigInteger initialIndex) { super(getKeyExtractorFunction(keyPath), (index, callback) -> { checkWatch(kvClient.getNetworkTimeoutConfig().getClientReadTimeoutMillis(), watchSeconds); @@ -28,7 +30,8 @@ private KVCache(KeyValueClient kvClient, kvClient.getConfig().getCacheConfig(), kvClient.getEventHandler(), new CacheDescriptor("keyvalue", rootPath), - callbackScheduler); + callbackScheduler, + initialIndex); } @VisibleForTesting @@ -56,7 +59,19 @@ public static KVCache newCache( final ScheduledExecutorService callbackExecutorService) { Scheduler scheduler = createExternal(callbackExecutorService); - return new KVCache(kvClient, rootPath, prepareRootPath(rootPath), watchSeconds, queryOptions, scheduler); + return new KVCache(kvClient, rootPath, prepareRootPath(rootPath), watchSeconds, queryOptions, scheduler, null); + } + + public static KVCache newCache( + KeyValueClient kvClient, + String rootPath, + int watchSeconds, + QueryOptions queryOptions, + BigInteger initialIndex, + ScheduledExecutorService callbackExecutorService) { + + Scheduler scheduler = createExternal(callbackExecutorService); + return new KVCache(kvClient, rootPath, prepareRootPath(rootPath), watchSeconds, queryOptions, scheduler, initialIndex); } public static KVCache newCache( @@ -64,7 +79,16 @@ public static KVCache newCache( final String rootPath, final int watchSeconds, final QueryOptions queryOptions) { - return new KVCache(kvClient, rootPath, prepareRootPath(rootPath), watchSeconds, queryOptions, createDefault()); + return new KVCache(kvClient, rootPath, prepareRootPath(rootPath), watchSeconds, queryOptions, createDefault(), null); + } + + public static KVCache newCache( + KeyValueClient kvClient, + String rootPath, + int watchSeconds, + BigInteger initialIndex, + QueryOptions queryOptions) { + return new KVCache(kvClient, rootPath, prepareRootPath(rootPath), watchSeconds, queryOptions, createDefault(), initialIndex); } @VisibleForTesting @@ -82,6 +106,7 @@ static String prepareRootPath(String rootPath) { * to be increased as well) * @return the cache object */ + @Deprecated public static KVCache newCache( final KeyValueClient kvClient, final String rootPath, @@ -97,6 +122,7 @@ public static KVCache newCache( * @param rootPath the root path * @return the cache object */ + @Deprecated public static KVCache newCache(final KeyValueClient kvClient, final String rootPath) { CacheConfig cacheConfig = kvClient.getConfig().getCacheConfig(); int watchSeconds = Ints.checkedCast(cacheConfig.getWatchDuration().getSeconds()); diff --git a/src/main/java/com/orbitz/consul/cache/NodesCatalogCache.java b/src/main/java/com/orbitz/consul/cache/NodesCatalogCache.java index 6b776460..713c6050 100644 --- a/src/main/java/com/orbitz/consul/cache/NodesCatalogCache.java +++ b/src/main/java/com/orbitz/consul/cache/NodesCatalogCache.java @@ -6,6 +6,7 @@ import com.orbitz.consul.model.health.Node; import com.orbitz.consul.option.QueryOptions; +import java.math.BigInteger; import java.util.concurrent.ScheduledExecutorService; public class NodesCatalogCache extends ConsulCache { @@ -13,7 +14,8 @@ public class NodesCatalogCache extends ConsulCache { private NodesCatalogCache(CatalogClient catalogClient, QueryOptions queryOptions, int watchSeconds, - Scheduler callbackScheduler) { + Scheduler callbackScheduler, + BigInteger initialIndex) { super(Node::getNode, (index, callback) -> { checkWatch(catalogClient.getNetworkTimeoutConfig().getClientReadTimeoutMillis(), watchSeconds); @@ -22,7 +24,8 @@ private NodesCatalogCache(CatalogClient catalogClient, catalogClient.getConfig().getCacheConfig(), catalogClient.getEventHandler(), new CacheDescriptor("catalog.nodes"), - callbackScheduler); + callbackScheduler, + initialIndex); } public static NodesCatalogCache newCache( @@ -32,16 +35,36 @@ public static NodesCatalogCache newCache( final ScheduledExecutorService callbackExecutorService) { Scheduler scheduler = createExternal(callbackExecutorService); - return new NodesCatalogCache(catalogClient, queryOptions, watchSeconds, scheduler); + return new NodesCatalogCache(catalogClient, queryOptions, watchSeconds, scheduler, null); + } + + public static NodesCatalogCache newCache( + CatalogClient catalogClient, + QueryOptions queryOptions, + int watchSeconds, + BigInteger initialIndex, + ScheduledExecutorService callbackExecutorService) { + + Scheduler scheduler = createExternal(callbackExecutorService); + return new NodesCatalogCache(catalogClient, queryOptions, watchSeconds, scheduler, initialIndex); } public static NodesCatalogCache newCache( final CatalogClient catalogClient, final QueryOptions queryOptions, final int watchSeconds) { - return new NodesCatalogCache(catalogClient, queryOptions, watchSeconds, createDefault()); + return new NodesCatalogCache(catalogClient, queryOptions, watchSeconds, createDefault(), null); + } + + public static NodesCatalogCache newCache( + CatalogClient catalogClient, + QueryOptions queryOptions, + int watchSeconds, + BigInteger initialIndex) { + return new NodesCatalogCache(catalogClient, queryOptions, watchSeconds, createDefault(), initialIndex); } + @Deprecated public static NodesCatalogCache newCache(final CatalogClient catalogClient) { CacheConfig cacheConfig = catalogClient.getConfig().getCacheConfig(); int watchSeconds = Ints.checkedCast(cacheConfig.getWatchDuration().getSeconds()); diff --git a/src/main/java/com/orbitz/consul/cache/ServiceCatalogCache.java b/src/main/java/com/orbitz/consul/cache/ServiceCatalogCache.java index 5c3b1bf9..0b7f7dc4 100644 --- a/src/main/java/com/orbitz/consul/cache/ServiceCatalogCache.java +++ b/src/main/java/com/orbitz/consul/cache/ServiceCatalogCache.java @@ -5,6 +5,7 @@ import com.orbitz.consul.config.CacheConfig; import com.orbitz.consul.model.catalog.CatalogService; import com.orbitz.consul.option.QueryOptions; +import java.math.BigInteger; import java.util.concurrent.ScheduledExecutorService; public class ServiceCatalogCache extends ConsulCache { @@ -13,7 +14,8 @@ private ServiceCatalogCache(CatalogClient catalogClient, String serviceName, QueryOptions queryOptions, int watchSeconds, - Scheduler callbackScheduler) { + Scheduler callbackScheduler, + BigInteger initialIndex) { super(CatalogService::getServiceId, (index, callback) -> { @@ -23,7 +25,8 @@ private ServiceCatalogCache(CatalogClient catalogClient, catalogClient.getConfig().getCacheConfig(), catalogClient.getEventHandler(), new CacheDescriptor("catalog.service", serviceName), - callbackScheduler); + callbackScheduler, + initialIndex); } public static ServiceCatalogCache newCache( @@ -34,7 +37,19 @@ public static ServiceCatalogCache newCache( final ScheduledExecutorService callbackExecutorService) { Scheduler scheduler = createExternal(callbackExecutorService); - return new ServiceCatalogCache(catalogClient, serviceName, queryOptions, watchSeconds, scheduler); + return new ServiceCatalogCache(catalogClient, serviceName, queryOptions, watchSeconds, scheduler, null); + } + + public static ServiceCatalogCache newCache( + CatalogClient catalogClient, + String serviceName, + QueryOptions queryOptions, + int watchSeconds, + BigInteger initialIndex, + ScheduledExecutorService callbackExecutorService) { + + Scheduler scheduler = createExternal(callbackExecutorService); + return new ServiceCatalogCache(catalogClient, serviceName, queryOptions, watchSeconds, scheduler, initialIndex); } public static ServiceCatalogCache newCache( @@ -43,9 +58,20 @@ public static ServiceCatalogCache newCache( final QueryOptions queryOptions, final int watchSeconds) { - return new ServiceCatalogCache(catalogClient, serviceName, queryOptions, watchSeconds, createDefault()); + return new ServiceCatalogCache(catalogClient, serviceName, queryOptions, watchSeconds, createDefault(), null); + } + + public static ServiceCatalogCache newCache( + CatalogClient catalogClient, + String serviceName, + QueryOptions queryOptions, + int watchSeconds, + BigInteger initialIndex) { + + return new ServiceCatalogCache(catalogClient, serviceName, queryOptions, watchSeconds, createDefault(), initialIndex); } + @Deprecated public static ServiceCatalogCache newCache(final CatalogClient catalogClient, final String serviceName) { CacheConfig cacheConfig = catalogClient.getConfig().getCacheConfig(); int watchSeconds = Ints.checkedCast(cacheConfig.getWatchDuration().getSeconds()); diff --git a/src/main/java/com/orbitz/consul/cache/ServiceHealthCache.java b/src/main/java/com/orbitz/consul/cache/ServiceHealthCache.java index ef7eda3f..81a58a45 100644 --- a/src/main/java/com/orbitz/consul/cache/ServiceHealthCache.java +++ b/src/main/java/com/orbitz/consul/cache/ServiceHealthCache.java @@ -7,8 +7,10 @@ import com.orbitz.consul.model.health.ServiceHealth; import com.orbitz.consul.option.QueryOptions; +import java.math.BigInteger; import java.util.concurrent.ScheduledExecutorService; import java.util.function.Function; +import javax.annotation.Nullable; public class ServiceHealthCache extends ConsulCache { @@ -18,7 +20,7 @@ private ServiceHealthCache(HealthClient healthClient, int watchSeconds, QueryOptions queryOptions, Function keyExtractor, - Scheduler callbackScheduler) { + Scheduler callbackScheduler, @Nullable BigInteger initialIndex) { super(keyExtractor, (index, callback) -> { checkWatch(healthClient.getNetworkTimeoutConfig().getClientReadTimeoutMillis(), watchSeconds); @@ -32,7 +34,8 @@ private ServiceHealthCache(HealthClient healthClient, healthClient.getConfig().getCacheConfig(), healthClient.getEventHandler(), new CacheDescriptor("health.service", serviceName), - callbackScheduler); + callbackScheduler, + initialIndex); } /** @@ -55,9 +58,23 @@ public static ServiceHealthCache newCache( final ScheduledExecutorService callbackExecutorService) { Scheduler scheduler = createExternal(callbackExecutorService); - return new ServiceHealthCache(healthClient, serviceName, passing, watchSeconds, queryOptions, keyExtractor, scheduler); + return new ServiceHealthCache(healthClient, serviceName, passing, watchSeconds, queryOptions, keyExtractor, scheduler, null); } + public static ServiceHealthCache newCache( + HealthClient healthClient, + String serviceName, + boolean passing, + int watchSeconds, + QueryOptions queryOptions, + BigInteger initialIndex, + Function keyExtractor, + ScheduledExecutorService callbackExecutorService) { + + Scheduler scheduler = createExternal(callbackExecutorService); + return new ServiceHealthCache(healthClient, serviceName, passing, watchSeconds, queryOptions, keyExtractor, scheduler, initialIndex); + } + public static ServiceHealthCache newCache( final HealthClient healthClient, final String serviceName, @@ -66,7 +83,7 @@ public static ServiceHealthCache newCache( final QueryOptions queryOptions, final Function keyExtractor) { - return new ServiceHealthCache(healthClient, serviceName, passing, watchSeconds, queryOptions, keyExtractor, createDefault()); + return new ServiceHealthCache(healthClient, serviceName, passing, watchSeconds, queryOptions, keyExtractor, createDefault(), null); } public static ServiceHealthCache newCache( @@ -78,7 +95,19 @@ public static ServiceHealthCache newCache( return newCache(healthClient, serviceName, passing, watchSeconds, queryOptions, ServiceHealthKey::fromServiceHealth); } - + + public static ServiceHealthCache newCache( + HealthClient healthClient, + String serviceName, + boolean passing, + int watchSeconds, + BigInteger initialIndex, + QueryOptions queryOptions) { + return new ServiceHealthCache(healthClient, serviceName, passing, watchSeconds, queryOptions, ServiceHealthKey::fromServiceHealth, + createDefault(), initialIndex + ); + } + public static ServiceHealthCache newCache( final HealthClient healthClient, final String serviceName, diff --git a/src/test/java/com/orbitz/consul/cache/IndexAwareStubCallbackConsumer.java b/src/test/java/com/orbitz/consul/cache/IndexAwareStubCallbackConsumer.java new file mode 100644 index 00000000..10b36224 --- /dev/null +++ b/src/test/java/com/orbitz/consul/cache/IndexAwareStubCallbackConsumer.java @@ -0,0 +1,32 @@ +package com.orbitz.consul.cache; + +import com.orbitz.consul.async.ConsulResponseCallback; +import com.orbitz.consul.model.ConsulResponse; +import com.orbitz.consul.model.kv.Value; + +import java.math.BigInteger; +import java.util.Collections; +import java.util.List; + +/** + * + */ +public class IndexAwareStubCallbackConsumer implements ConsulCache.CallbackConsumer { + + private final List result; + private BigInteger index; + + public IndexAwareStubCallbackConsumer(List result) { + this.result = Collections.unmodifiableList(result); + } + + @Override + public void consume(BigInteger index, ConsulResponseCallback> callback) { + this.index = index; + callback.onComplete(new ConsulResponse<>(result, 0, true, BigInteger.ZERO, null, null)); + } + + public BigInteger getIndex() { + return index; + } +} diff --git a/src/test/java/com/orbitz/consul/config/CacheConfigTest.java b/src/test/java/com/orbitz/consul/config/CacheConfigTest.java index d4283afc..7ed89e37 100644 --- a/src/test/java/com/orbitz/consul/config/CacheConfigTest.java +++ b/src/test/java/com/orbitz/consul/config/CacheConfigTest.java @@ -206,7 +206,7 @@ public void testMinDelayOnEmptyResultWithResults() throws InterruptedException { static class TestCache extends ConsulCache { private TestCache(Function keyConversion, CallbackConsumer callbackConsumer, CacheConfig cacheConfig, ClientEventHandler eventHandler, CacheDescriptor cacheDescriptor) { - super(keyConversion, callbackConsumer, cacheConfig, eventHandler, cacheDescriptor); + super(keyConversion, callbackConsumer, cacheConfig, eventHandler, cacheDescriptor, null); } static TestCache createCache(CacheConfig config, Supplier> res) { From aa385b224dc8f78c3b8e43742e916e6fee70eab7 Mon Sep 17 00:00:00 2001 From: "d.zharikhin" Date: Fri, 20 Aug 2021 16:27:56 +0300 Subject: [PATCH 2/2] test --- .../orbitz/consul/cache/ConsulCacheTest.java | 45 ++++++++++++++++--- ...ava => CountingCallsCallbackConsumer.java} | 4 +- 2 files changed, 40 insertions(+), 9 deletions(-) rename src/test/java/com/orbitz/consul/cache/{StubCallbackConsumer.java => CountingCallsCallbackConsumer.java} (81%) diff --git a/src/test/java/com/orbitz/consul/cache/ConsulCacheTest.java b/src/test/java/com/orbitz/consul/cache/ConsulCacheTest.java index 21a0dbd7..dd64550d 100644 --- a/src/test/java/com/orbitz/consul/cache/ConsulCacheTest.java +++ b/src/test/java/com/orbitz/consul/cache/ConsulCacheTest.java @@ -49,9 +49,11 @@ public void testDuplicateServicesDontCauseFailure() { CacheConfig cacheConfig = mock(CacheConfig.class); ClientEventHandler eventHandler = mock(ClientEventHandler.class); - final StubCallbackConsumer callbackConsumer = new StubCallbackConsumer(Collections.emptyList()); + final CountingCallsCallbackConsumer callbackConsumer = new CountingCallsCallbackConsumer(Collections.emptyList()); - final ConsulCache consulCache = new ConsulCache<>(keyExtractor, callbackConsumer, cacheConfig, eventHandler, new CacheDescriptor("")); + final ConsulCache consulCache = new ConsulCache<>(keyExtractor, callbackConsumer, cacheConfig, eventHandler, + new CacheDescriptor(""), null + ); final ConsulResponse> consulResponse = new ConsulResponse<>(response, 0, false, BigInteger.ONE, null, null); final ImmutableMap map = consulCache.convertToMap(consulResponse); assertNotNull(map); @@ -143,11 +145,11 @@ public void testListenerIsCalled() { .flags(0) .build(); final List result = Collections.singletonList(value); - final StubCallbackConsumer callbackConsumer = new StubCallbackConsumer( + final CountingCallsCallbackConsumer callbackConsumer = new CountingCallsCallbackConsumer( result); final ConsulCache cache = new ConsulCache<>(keyExtractor, callbackConsumer, cacheConfig, - eventHandler, new CacheDescriptor("")); + eventHandler, new CacheDescriptor(""), null); try { final StubListener listener = new StubListener(); @@ -167,6 +169,35 @@ public void testListenerIsCalled() { } } + @Test + public void testCacheStartsWithInitialIndex() { + final Function keyExtractor = Value::getKey; + final CacheConfig cacheConfig = CacheConfig.builder().build(); + ClientEventHandler eventHandler = mock(ClientEventHandler.class); + + final String key = "foo"; + final ImmutableValue value = ImmutableValue.builder() + .createIndex(1) + .modifyIndex(2) + .lockIndex(2) + .key(key) + .flags(0) + .build(); + final List result = Collections.singletonList(value); + final IndexAwareStubCallbackConsumer callbackConsumer = new IndexAwareStubCallbackConsumer(result); + + BigInteger expectedIndex = BigInteger.valueOf(23); + final ConsulCache cache = new ConsulCache<>(keyExtractor, callbackConsumer, cacheConfig, + eventHandler, new CacheDescriptor(""), expectedIndex + ); + try { + cache.start(); + assertEquals(expectedIndex, callbackConsumer.getIndex()); + } finally { + cache.stop(); + } + } + @Test public void testListenerThrowingExceptionIsIsolated() throws InterruptedException { final Function keyExtractor = Value::getKey; @@ -186,7 +217,7 @@ public void testListenerThrowingExceptionIsIsolated() throws InterruptedExceptio final List result = Collections.singletonList(value); try (final AsyncCallbackConsumer callbackConsumer = new AsyncCallbackConsumer(result)) { try (final ConsulCache cache = new ConsulCache<>(keyExtractor, callbackConsumer, cacheConfig, - eventHandler, new CacheDescriptor(""))) { + eventHandler, new CacheDescriptor(""), null)) { final StubListener goodListener = new StubListener(); final AlwaysThrowsListener badListener1 = new AlwaysThrowsListener(); @@ -232,11 +263,11 @@ public void testExceptionReceivedFromListenerWhenAlreadyStarted() { .flags(0) .build(); final List result = Collections.singletonList(value); - final StubCallbackConsumer callbackConsumer = new StubCallbackConsumer( + final CountingCallsCallbackConsumer callbackConsumer = new CountingCallsCallbackConsumer( result); try (final ConsulCache cache = new ConsulCache<>(keyExtractor, callbackConsumer, cacheConfig, - eventHandler, new CacheDescriptor(""))) { + eventHandler, new CacheDescriptor(""), null)) { final AlwaysThrowsListener badListener = new AlwaysThrowsListener(); diff --git a/src/test/java/com/orbitz/consul/cache/StubCallbackConsumer.java b/src/test/java/com/orbitz/consul/cache/CountingCallsCallbackConsumer.java similarity index 81% rename from src/test/java/com/orbitz/consul/cache/StubCallbackConsumer.java rename to src/test/java/com/orbitz/consul/cache/CountingCallsCallbackConsumer.java index 0e32e9ee..b5e14896 100644 --- a/src/test/java/com/orbitz/consul/cache/StubCallbackConsumer.java +++ b/src/test/java/com/orbitz/consul/cache/CountingCallsCallbackConsumer.java @@ -11,12 +11,12 @@ /** * */ -public class StubCallbackConsumer implements ConsulCache.CallbackConsumer { +public class CountingCallsCallbackConsumer implements ConsulCache.CallbackConsumer { private final List result; private int callCount; - public StubCallbackConsumer(List result) { + public CountingCallsCallbackConsumer(List result) { this.result = Collections.unmodifiableList(result); }