From 73a4d1a3e02ec8779ad384b7548e404330e2d7fa Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Fri, 29 Sep 2023 09:42:13 -0700 Subject: [PATCH 01/12] [Tiered Caching] Enabling serialization for IndicesRequestCache key object Signed-off-by: Sagar Upadhyaya --- .../indices/IndicesRequestCacheIT.java | 40 +++++++++++ .../index/OpenSearchDirectoryReader.java | 52 +++++++++++++- .../indices/IndicesRequestCache.java | 72 +++++++++++++------ .../opensearch/indices/IndicesService.java | 21 ++++-- .../indices/IndicesRequestCacheTests.java | 72 ++++++++++++++----- .../indices/IndicesServiceCloseTests.java | 7 ++ 6 files changed, 222 insertions(+), 42 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java index 98a22717019cf..a1815d9be2daf 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java @@ -634,6 +634,45 @@ public void testProfileDisableCache() throws Exception { } } + public void testCacheWithInvalidation() throws Exception { + Client client = client(); + assertAcked( + client.admin() + .indices() + .prepareCreate("index") + .setMapping("k", "type=keyword") + .setSettings( + Settings.builder() + .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ) + .get() + ); + indexRandom(true, client.prepareIndex("index").setSource("k", "hello")); + ensureSearchable("index"); + SearchResponse resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); + assertSearchResponse(resp); + OpenSearchAssertions.assertAllSuccessful(resp); + assertThat(resp.getHits().getTotalHits().value, equalTo(1L)); + + assertCacheState(client, "index", 0, 1); + // Index but don't refresh + indexRandom(false, client.prepareIndex("index").setSource("k", "hello2")); + resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); + assertSearchResponse(resp); + // Should expect hit as here as refresh didn't happen + assertCacheState(client, "index", 1, 1); + + // Explicit refresh would invalidate cache + refresh(); + // Hit same query again + resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); + assertSearchResponse(resp); + // Should expect miss as key has changed due to change in IndexReader.CacheKey (due to refresh) + assertCacheState(client, "index", 1, 2); + } + private static void assertCacheState(Client client, String index, long expectedHits, long expectedMisses) { RequestCacheStats requestCacheStats = client.admin() .indices() @@ -648,6 +687,7 @@ private static void assertCacheState(Client client, String index, long expectedH Arrays.asList(expectedHits, expectedMisses, 0L), Arrays.asList(requestCacheStats.getHitCount(), requestCacheStats.getMissCount(), requestCacheStats.getEvictions()) ); + } } diff --git a/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java b/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java index 77609822d3d90..e5038436012dd 100644 --- a/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java +++ b/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java @@ -39,6 +39,7 @@ import org.opensearch.core.index.shard.ShardId; import java.io.IOException; +import java.util.UUID; /** * A {@link org.apache.lucene.index.FilterDirectoryReader} that exposes @@ -51,11 +52,14 @@ public final class OpenSearchDirectoryReader extends FilterDirectoryReader { private final ShardId shardId; private final FilterDirectoryReader.SubReaderWrapper wrapper; + private DelegatingCacheHelper delegatingCacheHelper; + private OpenSearchDirectoryReader(DirectoryReader in, FilterDirectoryReader.SubReaderWrapper wrapper, ShardId shardId) throws IOException { super(in, wrapper); this.wrapper = wrapper; this.shardId = shardId; + this.delegatingCacheHelper = new DelegatingCacheHelper(in.getReaderCacheHelper()); } /** @@ -68,7 +72,53 @@ public ShardId shardId() { @Override public CacheHelper getReaderCacheHelper() { // safe to delegate since this reader does not alter the index - return in.getReaderCacheHelper(); + return this.delegatingCacheHelper; + } + + public DelegatingCacheHelper getDelegatingCacheHelper() { + return this.delegatingCacheHelper; + } + + public class DelegatingCacheHelper implements CacheHelper { + CacheHelper cacheHelper; + DelegatingCacheKey serializableCacheKey; + + DelegatingCacheHelper(CacheHelper cacheHelper) { + this.cacheHelper = cacheHelper; + this.serializableCacheKey = new DelegatingCacheKey(cacheHelper.getKey()); + } + + @Override + public CacheKey getKey() { + return this.cacheHelper.getKey(); + } + + public DelegatingCacheKey getDelegatingCacheKey() { + return this.serializableCacheKey; + } + + @Override + public void addClosedListener(ClosedListener listener) { + this.cacheHelper.addClosedListener(listener); + } + } + + public class DelegatingCacheKey { + CacheKey cacheKey; + private final UUID uniqueId; + + DelegatingCacheKey(CacheKey cacheKey) { + this.cacheKey = cacheKey; + this.uniqueId = UUID.randomUUID(); + } + + public CacheKey getCacheKey() { + return this.cacheKey; + } + + public UUID getId() { + return uniqueId; + } } @Override diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 629cea102a8b2..25a067bdc8d0c 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -51,6 +51,9 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.common.unit.ByteSizeValue; import java.io.Closeable; @@ -108,8 +111,9 @@ public final class IndicesRequestCache implements RemovalListener cache; + private final IndicesService indicesService; - IndicesRequestCache(Settings settings) { + IndicesRequestCache(Settings settings, IndicesService indicesService) { this.size = INDICES_CACHE_QUERY_SIZE.get(settings); this.expire = INDICES_CACHE_QUERY_EXPIRE.exists(settings) ? INDICES_CACHE_QUERY_EXPIRE.get(settings) : null; long sizeInBytes = size.getBytes(); @@ -121,6 +125,7 @@ public final class IndicesRequestCache implements RemovalListener { + protected static class Loader implements CacheLoader { private final CacheEntity entity; private final CheckedSupplier loader; @@ -207,7 +225,7 @@ public BytesReference load(Key key) throws Exception { /** * Basic interface to make this cache testable. */ - interface CacheEntity extends Accountable { + interface CacheEntity extends Accountable, Writeable { /** * Called after the value was loaded. @@ -240,6 +258,7 @@ interface CacheEntity extends Accountable { * Called when this entity instance is removed */ void onRemoval(RemovalNotification notification); + } /** @@ -247,17 +266,23 @@ interface CacheEntity extends Accountable { * * @opensearch.internal */ - static class Key implements Accountable { - private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Key.class); + class Key implements Accountable, Writeable { + private final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Key.class); public final CacheEntity entity; // use as identity equality - public final IndexReader.CacheKey readerCacheKey; + public final String readerCacheKeyUniqueId; public final BytesReference value; - Key(CacheEntity entity, IndexReader.CacheKey readerCacheKey, BytesReference value) { + Key(CacheEntity entity, BytesReference value, String readerCacheKeyUniqueId) { this.entity = entity; - this.readerCacheKey = Objects.requireNonNull(readerCacheKey); this.value = value; + this.readerCacheKeyUniqueId = Objects.requireNonNull(readerCacheKeyUniqueId); + } + + Key(StreamInput in) throws IOException { + this.entity = in.readOptionalWriteable(in1 -> indicesService.new IndexShardCacheEntity(in1)); + this.readerCacheKeyUniqueId = in.readOptionalString(); + this.value = in.readBytesReference(); } @Override @@ -276,7 +301,7 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Key key = (Key) o; - if (Objects.equals(readerCacheKey, key.readerCacheKey) == false) return false; + if (Objects.equals(readerCacheKeyUniqueId, key.readerCacheKeyUniqueId) == false) return false; if (!entity.getCacheIdentity().equals(key.entity.getCacheIdentity())) return false; if (!value.equals(key.value)) return false; return true; @@ -285,19 +310,26 @@ public boolean equals(Object o) { @Override public int hashCode() { int result = entity.getCacheIdentity().hashCode(); - result = 31 * result + readerCacheKey.hashCode(); + result = 31 * result + readerCacheKeyUniqueId.hashCode(); result = 31 * result + value.hashCode(); return result; } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalWriteable(entity); + out.writeOptionalString(readerCacheKeyUniqueId); + out.writeBytesReference(value); + } } private class CleanupKey implements IndexReader.ClosedListener { final CacheEntity entity; - final IndexReader.CacheKey readerCacheKey; + final String readerCacheKeyUniqueId; - private CleanupKey(CacheEntity entity, IndexReader.CacheKey readerCacheKey) { + private CleanupKey(CacheEntity entity, String readerCacheKeyUniqueId) { this.entity = entity; - this.readerCacheKey = readerCacheKey; + this.readerCacheKeyUniqueId = readerCacheKeyUniqueId; } @Override @@ -315,7 +347,7 @@ public boolean equals(Object o) { return false; } CleanupKey that = (CleanupKey) o; - if (Objects.equals(readerCacheKey, that.readerCacheKey) == false) return false; + if (Objects.equals(readerCacheKeyUniqueId, that.readerCacheKeyUniqueId) == false) return false; if (!entity.getCacheIdentity().equals(that.entity.getCacheIdentity())) return false; return true; } @@ -323,7 +355,7 @@ public boolean equals(Object o) { @Override public int hashCode() { int result = entity.getCacheIdentity().hashCode(); - result = 31 * result + Objects.hashCode(readerCacheKey); + result = 31 * result + Objects.hashCode(readerCacheKeyUniqueId); return result; } } @@ -336,7 +368,7 @@ synchronized void cleanCache() { for (Iterator iterator = keysToClean.iterator(); iterator.hasNext();) { CleanupKey cleanupKey = iterator.next(); iterator.remove(); - if (cleanupKey.readerCacheKey == null || cleanupKey.entity.isOpen() == false) { + if (cleanupKey.readerCacheKeyUniqueId == null || cleanupKey.entity.isOpen() == false) { // null indicates full cleanup, as does a closed shard currentFullClean.add(cleanupKey.entity.getCacheIdentity()); } else { @@ -349,7 +381,7 @@ synchronized void cleanCache() { if (currentFullClean.contains(key.entity.getCacheIdentity())) { iterator.remove(); } else { - if (currentKeysToClean.contains(new CleanupKey(key.entity, key.readerCacheKey))) { + if (currentKeysToClean.contains(new CleanupKey(key.entity, key.readerCacheKeyUniqueId))) { iterator.remove(); } } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index a72142e65c5e8..f5e71327b6e7b 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -391,7 +391,7 @@ public IndicesService( this.shardsClosedTimeout = settings.getAsTime(INDICES_SHARDS_CLOSED_TIMEOUT, new TimeValue(1, TimeUnit.DAYS)); this.analysisRegistry = analysisRegistry; this.indexNameExpressionResolver = indexNameExpressionResolver; - this.indicesRequestCache = new IndicesRequestCache(settings); + this.indicesRequestCache = new IndicesRequestCache(settings, this); this.indicesQueryCache = new IndicesQueryCache(settings); this.mapperRegistry = mapperRegistry; this.namedWriteableRegistry = namedWriteableRegistry; @@ -1746,14 +1746,21 @@ private BytesReference cacheShardLevelResult( * * @opensearch.internal */ - static final class IndexShardCacheEntity extends AbstractIndexShardCacheEntity { - private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(IndexShardCacheEntity.class); + public final class IndexShardCacheEntity extends AbstractIndexShardCacheEntity { + private final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(IndexShardCacheEntity.class); private final IndexShard indexShard; - protected IndexShardCacheEntity(IndexShard indexShard) { + public IndexShardCacheEntity(IndexShard indexShard) { this.indexShard = indexShard; } + public IndexShardCacheEntity(StreamInput in) throws IOException { + Index index = in.readOptionalWriteable(Index::new); + int shardId = in.readVInt(); + IndexService indexService = indices.get(index.getUUID()); + this.indexShard = Optional.ofNullable(indexService).map(indexService1 -> indexService1.getShard(shardId)).orElse(null); + } + @Override protected ShardRequestCache stats() { return indexShard.requestCache(); @@ -1775,6 +1782,12 @@ public long ramBytesUsed() { // across many entities return BASE_RAM_BYTES_USED; } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalWriteable(indexShard.shardId().getIndex()); + out.writeVInt(indexShard.shardId().id()); + } } @FunctionalInterface diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index 8494259c8fd8a..664865f21f3a8 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -52,23 +52,28 @@ import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.common.bytes.AbstractBytesReference; import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.core.xcontent.XContentHelper; +import org.opensearch.index.IndexService; import org.opensearch.index.cache.request.ShardRequestCache; import org.opensearch.index.query.TermQueryBuilder; -import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.test.OpenSearchSingleNodeTestCase; import java.io.IOException; import java.util.Arrays; +import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; -public class IndicesRequestCacheTests extends OpenSearchTestCase { +public class IndicesRequestCacheTests extends OpenSearchSingleNodeTestCase { public void testBasicOperationsCache() throws Exception { ShardRequestCache requestCacheStats = new ShardRequestCache(); - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY); + IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, getInstanceFromNode(IndicesService.class)); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -122,7 +127,7 @@ public void testBasicOperationsCache() throws Exception { } public void testCacheDifferentReaders() throws Exception { - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY); + IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, getInstanceFromNode(IndicesService.class)); AtomicBoolean indexShard = new AtomicBoolean(true); ShardRequestCache requestCacheStats = new ShardRequestCache(); Directory dir = newDirectory(); @@ -218,7 +223,7 @@ public void testCacheDifferentReaders() throws Exception { public void testEviction() throws Exception { final ByteSizeValue size; { - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY); + IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, getInstanceFromNode(IndicesService.class)); AtomicBoolean indexShard = new AtomicBoolean(true); ShardRequestCache requestCacheStats = new ShardRequestCache(); Directory dir = newDirectory(); @@ -244,7 +249,8 @@ public void testEviction() throws Exception { IOUtils.close(reader, secondReader, writer, dir, cache); } IndicesRequestCache cache = new IndicesRequestCache( - Settings.builder().put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), size.getBytes() + 1 + "b").build() + Settings.builder().put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), size.getBytes() + 1 + "b").build(), + null ); AtomicBoolean indexShard = new AtomicBoolean(true); ShardRequestCache requestCacheStats = new ShardRequestCache(); @@ -281,7 +287,7 @@ public void testEviction() throws Exception { } public void testClearAllEntityIdentity() throws Exception { - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY); + IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, getInstanceFromNode(IndicesService.class)); AtomicBoolean indexShard = new AtomicBoolean(true); ShardRequestCache requestCacheStats = new ShardRequestCache(); @@ -366,7 +372,7 @@ public BytesReference get() { public void testInvalidate() throws Exception { ShardRequestCache requestCacheStats = new ShardRequestCache(); - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY); + IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, getInstanceFromNode(IndicesService.class)); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -435,20 +441,23 @@ public void testInvalidate() throws Exception { public void testEqualsKey() throws IOException { AtomicBoolean trueBoolean = new AtomicBoolean(true); AtomicBoolean falseBoolean = new AtomicBoolean(false); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndicesRequestCache indicesRequestCache = indicesService.indicesRequestCache; Directory dir = newDirectory(); IndexWriterConfig config = newIndexWriterConfig(); IndexWriter writer = new IndexWriter(dir, config); - IndexReader reader1 = DirectoryReader.open(writer); - IndexReader.CacheKey rKey1 = reader1.getReaderCacheHelper().getKey(); + ShardId shardId = new ShardId("foo", "bar", 1); + IndexReader reader1 = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId); + String rKey1 = ((OpenSearchDirectoryReader) reader1).getDelegatingCacheHelper().getDelegatingCacheKey().getId().toString(); writer.addDocument(new Document()); - IndexReader reader2 = DirectoryReader.open(writer); - IndexReader.CacheKey rKey2 = reader2.getReaderCacheHelper().getKey(); + IndexReader reader2 = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId); + String rKey2 = ((OpenSearchDirectoryReader) reader2).getDelegatingCacheHelper().getDelegatingCacheKey().getId().toString(); IOUtils.close(reader1, reader2, writer, dir); - IndicesRequestCache.Key key1 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey1, new TestBytesReference(1)); - IndicesRequestCache.Key key2 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey1, new TestBytesReference(1)); - IndicesRequestCache.Key key3 = new IndicesRequestCache.Key(new TestEntity(null, falseBoolean), rKey1, new TestBytesReference(1)); - IndicesRequestCache.Key key4 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey2, new TestBytesReference(1)); - IndicesRequestCache.Key key5 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey1, new TestBytesReference(2)); + IndicesRequestCache.Key key1 = indicesRequestCache.new Key(new TestEntity(null, trueBoolean), new TestBytesReference(1), rKey1); + IndicesRequestCache.Key key2 = indicesRequestCache.new Key(new TestEntity(null, trueBoolean), new TestBytesReference(1), rKey1); + IndicesRequestCache.Key key3 = indicesRequestCache.new Key(new TestEntity(null, falseBoolean), new TestBytesReference(1), rKey1); + IndicesRequestCache.Key key4 = indicesRequestCache.new Key(new TestEntity(null, trueBoolean), new TestBytesReference(1), rKey2); + IndicesRequestCache.Key key5 = indicesRequestCache.new Key(new TestEntity(null, trueBoolean), new TestBytesReference(2), rKey2); String s = "Some other random object"; assertEquals(key1, key1); assertEquals(key1, key2); @@ -459,6 +468,32 @@ public void testEqualsKey() throws IOException { assertNotEquals(key1, key5); } + public void testSerializationDeserializationOfCacheKey() throws Exception { + TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); + BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); + ShardRequestCache shardRequestCache = new ShardRequestCache(); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndicesRequestCache indicesRequestCache = indicesService.indicesRequestCache; + IndexService indexService = createIndex("test"); + IndexShard indexShard = indexService.getShard(0); + IndicesService.IndexShardCacheEntity shardCacheEntity = indicesService.new IndexShardCacheEntity(indexShard); + String readerCacheKeyId = UUID.randomUUID().toString(); + IndicesRequestCache.Key key1 = indicesRequestCache.new Key(shardCacheEntity, termBytes, readerCacheKeyId); + BytesReference bytesReference = null; + try (BytesStreamOutput out = new BytesStreamOutput()) { + key1.writeTo(out); + bytesReference = out.bytes(); + } + StreamInput in = bytesReference.streamInput(); + + IndicesRequestCache.Key key2 = indicesRequestCache.new Key(in); + + assertEquals(readerCacheKeyId, key2.readerCacheKeyUniqueId); + assertEquals(shardCacheEntity.getCacheIdentity(), key2.entity.getCacheIdentity()); + assertEquals(termBytes, key2.value); + + } + private class TestBytesReference extends AbstractBytesReference { int dummyValue; @@ -538,5 +573,8 @@ public Object getCacheIdentity() { public long ramBytesUsed() { return 42; } + + @Override + public void writeTo(StreamOutput out) throws IOException {} } } diff --git a/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java b/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java index 415844dccb611..364c7a94cad54 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java @@ -41,6 +41,7 @@ import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; import org.opensearch.index.IndexModule; @@ -59,6 +60,7 @@ import org.opensearch.test.hamcrest.OpenSearchAssertions; import org.opensearch.transport.nio.MockNioTransportPlugin; +import java.io.IOException; import java.nio.file.Path; import java.util.Arrays; import java.util.Collections; @@ -315,6 +317,11 @@ public void testCloseWhileOngoingRequestUsesRequestCache() throws Exception { assertEquals(0L, cache.count()); IndicesRequestCache.CacheEntity cacheEntity = new IndicesRequestCache.CacheEntity() { + @Override + public void writeTo(StreamOutput out) throws IOException { + + } + @Override public long ramBytesUsed() { return 42; From db3d61d3418f7c12f3099bb22eaee3ed3c5edf6b Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Fri, 29 Sep 2023 10:28:53 -0700 Subject: [PATCH 02/12] Fixing javadoc issue Signed-off-by: Sagar Upadhyaya --- .../common/lucene/index/OpenSearchDirectoryReader.java | 8 ++++++++ .../java/org/opensearch/indices/IndicesRequestCache.java | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java b/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java index e5038436012dd..b2e21d2076cbb 100644 --- a/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java +++ b/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java @@ -79,6 +79,10 @@ public DelegatingCacheHelper getDelegatingCacheHelper() { return this.delegatingCacheHelper; } + /** + * Wraps existing IndexReader cache helper which internally provides a way to wrap CacheKey. + * @opensearch.internal + */ public class DelegatingCacheHelper implements CacheHelper { CacheHelper cacheHelper; DelegatingCacheKey serializableCacheKey; @@ -103,6 +107,10 @@ public void addClosedListener(ClosedListener listener) { } } + /** + * Wraps internal IndexReader.CacheKey and attaches a uniqueId to it which can be eventually be used instead of + * object itself for serialization purposes. + */ public class DelegatingCacheKey { CacheKey cacheKey; private final UUID uniqueId; diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 25a067bdc8d0c..e1d18b1172865 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -198,7 +198,7 @@ void invalidate(CacheEntity cacheEntity, DirectoryReader reader, BytesReference * * @opensearch.internal */ - protected static class Loader implements CacheLoader { + private static class Loader implements CacheLoader { private final CacheEntity entity; private final CheckedSupplier loader; From 29c5375457f015235e00b671d62cf732eece6d81 Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Mon, 9 Oct 2023 08:34:15 -0700 Subject: [PATCH 03/12] Tiered caching framework draft changes --- .../org/opensearch/common/cache/Cache.java | 112 ++++++------ .../common/cache/RemovalNotification.java | 12 ++ .../cache/request/ShardRequestCache.java | 48 +++-- .../AbstractIndexShardCacheEntity.java | 19 +- .../org/opensearch/indices/CachingTier.java | 39 ++++ .../opensearch/indices/DiskCachingTier.java | 13 ++ .../indices/DummyDiskCachingTier.java | 58 ++++++ .../indices/IndicesRequestCache.java | 92 ++++++---- .../opensearch/indices/OnHeapCachingTier.java | 11 ++ .../indices/OpenSearchOnHeapCache.java | 119 ++++++++++++ .../java/org/opensearch/indices/TierType.java | 15 ++ .../indices/TieredCacheEventListener.java | 22 +++ .../indices/TieredCacheHandler.java | 24 +++ .../opensearch/indices/TieredCacheLoader.java | 15 ++ .../TieredCacheSpilloverStrategyHandler.java | 172 ++++++++++++++++++ .../indices/IndicesServiceCloseTests.java | 6 +- 16 files changed, 661 insertions(+), 116 deletions(-) create mode 100644 server/src/main/java/org/opensearch/indices/CachingTier.java create mode 100644 server/src/main/java/org/opensearch/indices/DiskCachingTier.java create mode 100644 server/src/main/java/org/opensearch/indices/DummyDiskCachingTier.java create mode 100644 server/src/main/java/org/opensearch/indices/OnHeapCachingTier.java create mode 100644 server/src/main/java/org/opensearch/indices/OpenSearchOnHeapCache.java create mode 100644 server/src/main/java/org/opensearch/indices/TierType.java create mode 100644 server/src/main/java/org/opensearch/indices/TieredCacheEventListener.java create mode 100644 server/src/main/java/org/opensearch/indices/TieredCacheHandler.java create mode 100644 server/src/main/java/org/opensearch/indices/TieredCacheLoader.java create mode 100644 server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyHandler.java diff --git a/server/src/main/java/org/opensearch/common/cache/Cache.java b/server/src/main/java/org/opensearch/common/cache/Cache.java index 0ebef1556424b..c7c28e13496e5 100644 --- a/server/src/main/java/org/opensearch/common/cache/Cache.java +++ b/server/src/main/java/org/opensearch/common/cache/Cache.java @@ -422,68 +422,74 @@ public V computeIfAbsent(K key, CacheLoader loader) throws ExecutionExcept } }); if (value == null) { - // we need to synchronize loading of a value for a given key; however, holding the segment lock while - // invoking load can lead to deadlock against another thread due to dependent key loading; therefore, we - // need a mechanism to ensure that load is invoked at most once, but we are not invoking load while holding - // the segment lock; to do this, we atomically put a future in the map that can load the value, and then - // get the value from this future on the thread that won the race to place the future into the segment map - CacheSegment segment = getCacheSegment(key); - CompletableFuture> future; - CompletableFuture> completableFuture = new CompletableFuture<>(); + value = compute(key, loader); + } + return value; + } - try (ReleasableLock ignored = segment.writeLock.acquire()) { - future = segment.map.putIfAbsent(key, completableFuture); - } + public V compute(K key, CacheLoader loader) throws ExecutionException { + long now = now(); + // we need to synchronize loading of a value for a given key; however, holding the segment lock while + // invoking load can lead to deadlock against another thread due to dependent key loading; therefore, we + // need a mechanism to ensure that load is invoked at most once, but we are not invoking load while holding + // the segment lock; to do this, we atomically put a future in the map that can load the value, and then + // get the value from this future on the thread that won the race to place the future into the segment map + CacheSegment segment = getCacheSegment(key); + CompletableFuture> future; + CompletableFuture> completableFuture = new CompletableFuture<>(); - BiFunction, Throwable, ? extends V> handler = (ok, ex) -> { - if (ok != null) { - try (ReleasableLock ignored = lruLock.acquire()) { - promote(ok, now); - } - return ok.value; - } else { - try (ReleasableLock ignored = segment.writeLock.acquire()) { - CompletableFuture> sanity = segment.map.get(key); - if (sanity != null && sanity.isCompletedExceptionally()) { - segment.map.remove(key); - } - } - return null; - } - }; + try (ReleasableLock ignored = segment.writeLock.acquire()) { + future = segment.map.putIfAbsent(key, completableFuture); + } - CompletableFuture completableValue; - if (future == null) { - future = completableFuture; - completableValue = future.handle(handler); - V loaded; - try { - loaded = loader.load(key); - } catch (Exception e) { - future.completeExceptionally(e); - throw new ExecutionException(e); - } - if (loaded == null) { - NullPointerException npe = new NullPointerException("loader returned a null value"); - future.completeExceptionally(npe); - throw new ExecutionException(npe); - } else { - future.complete(new Entry<>(key, loaded, now)); + BiFunction, Throwable, ? extends V> handler = (ok, ex) -> { + if (ok != null) { + try (ReleasableLock ignored = lruLock.acquire()) { + promote(ok, now); } + return ok.value; } else { - completableValue = future.handle(handler); + try (ReleasableLock ignored = segment.writeLock.acquire()) { + CompletableFuture> sanity = segment.map.get(key); + if (sanity != null && sanity.isCompletedExceptionally()) { + segment.map.remove(key); + } + } + return null; } + }; + CompletableFuture completableValue; + if (future == null) { + future = completableFuture; + completableValue = future.handle(handler); + V loaded; try { - value = completableValue.get(); - // check to ensure the future hasn't been completed with an exception - if (future.isCompletedExceptionally()) { - future.get(); // call get to force the exception to be thrown for other concurrent callers - throw new IllegalStateException("the future was completed exceptionally but no exception was thrown"); - } - } catch (InterruptedException e) { - throw new IllegalStateException(e); + loaded = loader.load(key); + } catch (Exception e) { + future.completeExceptionally(e); + throw new ExecutionException(e); } + if (loaded == null) { + NullPointerException npe = new NullPointerException("loader returned a null value"); + future.completeExceptionally(npe); + throw new ExecutionException(npe); + } else { + future.complete(new Entry<>(key, loaded, now)); + } + } else { + completableValue = future.handle(handler); + } + V value; + try { + value = completableValue.get(); + // check to ensure the future hasn't been completed with an exception + if (future.isCompletedExceptionally()) { + future.get(); // call get to force the exception to be thrown for other concurrent callers + throw new IllegalStateException("the future was completed exceptionally but no exception was thrown"); + } + } catch (InterruptedException e) { + throw new IllegalStateException(e); } return value; } diff --git a/server/src/main/java/org/opensearch/common/cache/RemovalNotification.java b/server/src/main/java/org/opensearch/common/cache/RemovalNotification.java index 6d355b2122460..71e240064c6ae 100644 --- a/server/src/main/java/org/opensearch/common/cache/RemovalNotification.java +++ b/server/src/main/java/org/opensearch/common/cache/RemovalNotification.java @@ -32,6 +32,8 @@ package org.opensearch.common.cache; +import org.opensearch.indices.TierType; + /** * Notification when an element is removed from the cache * @@ -42,11 +44,17 @@ public class RemovalNotification { private final K key; private final V value; private final RemovalReason removalReason; + private final TierType tierType; public RemovalNotification(K key, V value, RemovalReason removalReason) { + this(key, value, removalReason, TierType.ON_HEAP); + } + + public RemovalNotification(K key, V value, RemovalReason removalReason, TierType tierType) { this.key = key; this.value = value; this.removalReason = removalReason; + this.tierType = tierType; } public K getKey() { @@ -60,4 +68,8 @@ public V getValue() { public RemovalReason getRemovalReason() { return removalReason; } + + public TierType getTierType() { + return tierType; + } } diff --git a/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java b/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java index b13eec79c2be8..1beef5217355f 100644 --- a/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java +++ b/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java @@ -35,6 +35,9 @@ import org.apache.lucene.util.Accountable; import org.opensearch.common.metrics.CounterMetric; import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.indices.TierType; + +import java.util.EnumMap; /** * Tracks the portion of the request cache in use for a particular shard. @@ -43,30 +46,39 @@ */ public final class ShardRequestCache { - final CounterMetric evictionsMetric = new CounterMetric(); - final CounterMetric totalMetric = new CounterMetric(); - final CounterMetric hitCount = new CounterMetric(); - final CounterMetric missCount = new CounterMetric(); + private EnumMap statsHolder = new EnumMap<>(TierType.class); + + public ShardRequestCache() { + for (TierType tierType : TierType.values()) { + statsHolder.put(tierType, new StatsHolder()); + } + } public RequestCacheStats stats() { - return new RequestCacheStats(totalMetric.count(), evictionsMetric.count(), hitCount.count(), missCount.count()); + // TODO: Change RequestCacheStats to support disk tier stats. + return new RequestCacheStats( + statsHolder.get(TierType.ON_HEAP).totalMetric.count(), + statsHolder.get(TierType.ON_HEAP).evictionsMetric.count(), + statsHolder.get(TierType.ON_HEAP).hitCount.count(), + statsHolder.get(TierType.ON_HEAP).missCount.count() + ); } - public void onHit() { - hitCount.inc(); + public void onHit(TierType tierType) { + statsHolder.get(tierType).hitCount.inc(); } - public void onMiss() { - missCount.inc(); + public void onMiss(TierType tierType) { + statsHolder.get(tierType).missCount.inc(); } - public void onCached(Accountable key, BytesReference value) { - totalMetric.inc(key.ramBytesUsed() + value.ramBytesUsed()); + public void onCached(Accountable key, BytesReference value, TierType tierType) { + statsHolder.get(tierType).totalMetric.inc(key.ramBytesUsed() + value.ramBytesUsed()); } - public void onRemoval(Accountable key, BytesReference value, boolean evicted) { + public void onRemoval(Accountable key, BytesReference value, boolean evicted, TierType tierType) { if (evicted) { - evictionsMetric.inc(); + statsHolder.get(tierType).evictionsMetric.inc(); } long dec = 0; if (key != null) { @@ -75,6 +87,14 @@ public void onRemoval(Accountable key, BytesReference value, boolean evicted) { if (value != null) { dec += value.ramBytesUsed(); } - totalMetric.dec(dec); + statsHolder.get(tierType).totalMetric.dec(dec); + } + + static class StatsHolder { + + final CounterMetric evictionsMetric = new CounterMetric(); + final CounterMetric totalMetric = new CounterMetric(); + final CounterMetric hitCount = new CounterMetric(); + final CounterMetric missCount = new CounterMetric(); } } diff --git a/server/src/main/java/org/opensearch/indices/AbstractIndexShardCacheEntity.java b/server/src/main/java/org/opensearch/indices/AbstractIndexShardCacheEntity.java index bb1201cb910a9..2eef16df2bb9a 100644 --- a/server/src/main/java/org/opensearch/indices/AbstractIndexShardCacheEntity.java +++ b/server/src/main/java/org/opensearch/indices/AbstractIndexShardCacheEntity.java @@ -51,22 +51,27 @@ abstract class AbstractIndexShardCacheEntity implements IndicesRequestCache.Cach protected abstract ShardRequestCache stats(); @Override - public final void onCached(IndicesRequestCache.Key key, BytesReference value) { - stats().onCached(key, value); + public final void onCached(IndicesRequestCache.Key key, BytesReference value, TierType tierType) { + stats().onCached(key, value, tierType); } @Override - public final void onHit() { - stats().onHit(); + public final void onHit(TierType tierType) { + stats().onHit(tierType); } @Override - public final void onMiss() { - stats().onMiss(); + public final void onMiss(TierType tierType) { + stats().onMiss(tierType); } @Override public final void onRemoval(RemovalNotification notification) { - stats().onRemoval(notification.getKey(), notification.getValue(), notification.getRemovalReason() == RemovalReason.EVICTED); + stats().onRemoval( + notification.getKey(), + notification.getValue(), + notification.getRemovalReason() == RemovalReason.EVICTED, + notification.getTierType() + ); } } diff --git a/server/src/main/java/org/opensearch/indices/CachingTier.java b/server/src/main/java/org/opensearch/indices/CachingTier.java new file mode 100644 index 0000000000000..85596929cfd6b --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/CachingTier.java @@ -0,0 +1,39 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices; + +import org.opensearch.common.cache.RemovalListener; + +/** + * asdsadssa + * @param + * @param + */ +public interface CachingTier { + + V get(K key); + + void put(K key, V value); + + V computeIfAbsent(K key, TieredCacheLoader loader) throws Exception; + + void invalidate(K key); + + V compute(K key, TieredCacheLoader loader) throws Exception; + + void setRemovalListener(RemovalListener removalListener); + + void invalidateAll(); + + Iterable keys(); + + int count(); + + TierType getTierType(); +} diff --git a/server/src/main/java/org/opensearch/indices/DiskCachingTier.java b/server/src/main/java/org/opensearch/indices/DiskCachingTier.java new file mode 100644 index 0000000000000..efd9a459cd338 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/DiskCachingTier.java @@ -0,0 +1,13 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices; + +public interface DiskCachingTier extends CachingTier { + +} diff --git a/server/src/main/java/org/opensearch/indices/DummyDiskCachingTier.java b/server/src/main/java/org/opensearch/indices/DummyDiskCachingTier.java new file mode 100644 index 0000000000000..26a78b6c61920 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/DummyDiskCachingTier.java @@ -0,0 +1,58 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices; + +import org.opensearch.common.cache.RemovalListener; + +import java.util.Collections; + +public class DummyDiskCachingTier implements CachingTier { + + @Override + public V get(K key) { + return null; + } + + @Override + public void put(K key, V value) {} + + @Override + public V computeIfAbsent(K key, TieredCacheLoader loader) throws Exception { + return null; + } + + @Override + public void invalidate(K key) {} + + @Override + public V compute(K key, TieredCacheLoader loader) throws Exception { + return null; + } + + @Override + public void setRemovalListener(RemovalListener removalListener) {} + + @Override + public void invalidateAll() {} + + @Override + public Iterable keys() { + return Collections::emptyIterator; + } + + @Override + public int count() { + return 0; + } + + @Override + public TierType getTierType() { + return TierType.DISK; + } +} diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index e1d18b1172865..279b6e38d6f9e 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -39,10 +39,6 @@ import org.apache.lucene.util.Accountable; import org.apache.lucene.util.RamUsageEstimator; import org.opensearch.common.CheckedSupplier; -import org.opensearch.common.cache.Cache; -import org.opensearch.common.cache.CacheBuilder; -import org.opensearch.common.cache.CacheLoader; -import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import org.opensearch.common.settings.Setting; @@ -81,7 +77,7 @@ * * @opensearch.internal */ -public final class IndicesRequestCache implements RemovalListener, Closeable { +public final class IndicesRequestCache implements TieredCacheEventListener, Closeable { private static final Logger logger = LogManager.getLogger(IndicesRequestCache.class); @@ -110,27 +106,35 @@ public final class IndicesRequestCache implements RemovalListener keysToClean = ConcurrentCollections.newConcurrentSet(); private final ByteSizeValue size; private final TimeValue expire; - private final Cache cache; - private final IndicesService indicesService; + // private final Cache cache; + + private final TieredCacheHandler tieredCacheHandler; IndicesRequestCache(Settings settings, IndicesService indicesService) { this.size = INDICES_CACHE_QUERY_SIZE.get(settings); this.expire = INDICES_CACHE_QUERY_EXPIRE.exists(settings) ? INDICES_CACHE_QUERY_EXPIRE.get(settings) : null; long sizeInBytes = size.getBytes(); - CacheBuilder cacheBuilder = CacheBuilder.builder() - .setMaximumWeight(sizeInBytes) - .weigher((k, v) -> k.ramBytesUsed() + v.ramBytesUsed()) - .removalListener(this); - if (expire != null) { - cacheBuilder.setExpireAfterAccess(expire); - } - cache = cacheBuilder.build(); - this.indicesService = indicesService; + // CacheBuilder cacheBuilder = CacheBuilder.builder() + // .setMaximumWeight(sizeInBytes) + // .weigher((k, v) -> k.ramBytesUsed() + v.ramBytesUsed()); + // //.removalListener(this); + // if (expire != null) { + // cacheBuilder.setExpireAfterAccess(expire); + // } + // cache = cacheBuilder.build(); + + OnHeapCachingTier openSearchOnHeapCache = new OpenSearchOnHeapCache.Builder().setWeigher( + (k, v) -> k.ramBytesUsed() + v.ramBytesUsed() + ).setMaximumWeight(sizeInBytes).setExpireAfterAccess(expire).build(); + + tieredCacheHandler = new TieredCacheSpilloverStrategyHandler.Builder().setOnHeapCachingTier( + openSearchOnHeapCache + ).setOnDiskCachingTier(new DummyDiskCachingTier<>()).setTieredCacheEventListener(this).build(); } @Override public void close() { - cache.invalidateAll(); + tieredCacheHandler.invalidateAll(); } void clear(CacheEntity entity) { @@ -138,11 +142,26 @@ void clear(CacheEntity entity) { cleanCache(); } + @Override + public void onMiss(Key key, TierType tierType) { + key.entity.onMiss(tierType); + } + @Override public void onRemoval(RemovalNotification notification) { notification.getKey().entity.onRemoval(notification); } + @Override + public void onHit(Key key, BytesReference value, TierType tierType) { + key.entity.onHit(tierType); + } + + @Override + public void onCached(Key key, BytesReference value, TierType tierType) { + key.entity.onCached(key, value, tierType); + } + BytesReference getOrCompute( CacheEntity cacheEntity, CheckedSupplier loader, @@ -158,9 +177,9 @@ BytesReference getOrCompute( assert readerCacheKeyUniqueId != null; final Key key = new Key(cacheEntity, cacheKey, readerCacheKeyUniqueId); Loader cacheLoader = new Loader(cacheEntity, loader); - BytesReference value = cache.computeIfAbsent(key, cacheLoader); + BytesReference value = tieredCacheHandler.computeIfAbsent(key, cacheLoader); if (cacheLoader.isLoaded()) { - key.entity.onMiss(); + // key.entity.onMiss(); // see if its the first time we see this reader, and make sure to register a cleanup key CleanupKey cleanupKey = new CleanupKey(cacheEntity, readerCacheKeyUniqueId); if (!registeredClosedListeners.containsKey(cleanupKey)) { @@ -169,9 +188,10 @@ BytesReference getOrCompute( OpenSearchDirectoryReader.addReaderCloseListener(reader, cleanupKey); } } - } else { - key.entity.onHit(); } + // else { + // key.entity.onHit(); + // } return value; } @@ -183,14 +203,8 @@ BytesReference getOrCompute( */ void invalidate(CacheEntity cacheEntity, DirectoryReader reader, BytesReference cacheKey) { assert reader.getReaderCacheHelper() != null; - String readerCacheKeyUniqueId = null; - if (reader instanceof OpenSearchDirectoryReader) { - IndexReader.CacheHelper cacheHelper = ((OpenSearchDirectoryReader) reader).getDelegatingCacheHelper(); - readerCacheKeyUniqueId = ((OpenSearchDirectoryReader.DelegatingCacheHelper) cacheHelper).getDelegatingCacheKey() - .getId() - .toString(); - } - cache.invalidate(new Key(cacheEntity, cacheKey, readerCacheKeyUniqueId)); + tieredCacheHandler.invalidate(new Key(cacheEntity, reader.getReaderCacheHelper().getKey(), cacheKey)); + // cache.invalidate(new Key(cacheEntity, reader.getReaderCacheHelper().getKey(), cacheKey)); } /** @@ -198,7 +212,7 @@ void invalidate(CacheEntity cacheEntity, DirectoryReader reader, BytesReference * * @opensearch.internal */ - private static class Loader implements CacheLoader { + private static class Loader implements org.opensearch.indices.TieredCacheLoader { private final CacheEntity entity; private final CheckedSupplier loader; @@ -216,7 +230,7 @@ public boolean isLoaded() { @Override public BytesReference load(Key key) throws Exception { BytesReference value = loader.get(); - entity.onCached(key, value); + // entity.onCached(key, value); loaded = true; return value; } @@ -230,7 +244,7 @@ interface CacheEntity extends Accountable, Writeable { /** * Called after the value was loaded. */ - void onCached(Key key, BytesReference value); + void onCached(Key key, BytesReference value, TierType tierType); /** * Returns true iff the resource behind this entity is still open ie. @@ -247,12 +261,12 @@ interface CacheEntity extends Accountable, Writeable { /** * Called each time this entity has a cache hit. */ - void onHit(); + void onHit(TierType tierType); /** * Called each time this entity has a cache miss. */ - void onMiss(); + void onMiss(TierType tierType); /** * Called when this entity instance is removed @@ -376,7 +390,7 @@ synchronized void cleanCache() { } } if (!currentKeysToClean.isEmpty() || !currentFullClean.isEmpty()) { - for (Iterator iterator = cache.keys().iterator(); iterator.hasNext();) { + for (Iterator iterator = tieredCacheHandler.getOnHeapCachingTier().keys().iterator(); iterator.hasNext();) { Key key = iterator.next(); if (currentFullClean.contains(key.entity.getCacheIdentity())) { iterator.remove(); @@ -387,15 +401,15 @@ synchronized void cleanCache() { } } } - - cache.refresh(); + // TODO + // cache.refresh(); } /** * Returns the current size of the cache */ - int count() { - return cache.count(); + long count() { + return tieredCacheHandler.count(); } int numRegisteredCloseListeners() { // for testing diff --git a/server/src/main/java/org/opensearch/indices/OnHeapCachingTier.java b/server/src/main/java/org/opensearch/indices/OnHeapCachingTier.java new file mode 100644 index 0000000000000..ea8bd79e1d445 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/OnHeapCachingTier.java @@ -0,0 +1,11 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices; + +public interface OnHeapCachingTier extends CachingTier {} diff --git a/server/src/main/java/org/opensearch/indices/OpenSearchOnHeapCache.java b/server/src/main/java/org/opensearch/indices/OpenSearchOnHeapCache.java new file mode 100644 index 0000000000000..cb250edfd7a24 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/OpenSearchOnHeapCache.java @@ -0,0 +1,119 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices; + +import org.opensearch.common.cache.Cache; +import org.opensearch.common.cache.CacheBuilder; +import org.opensearch.common.cache.RemovalListener; +import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.unit.TimeValue; + +import java.util.concurrent.ExecutionException; +import java.util.function.ToLongBiFunction; + +public class OpenSearchOnHeapCache implements OnHeapCachingTier, RemovalListener { + + private final Cache cache; + private RemovalListener removalListener; + + private OpenSearchOnHeapCache(long maxWeightInBytes, ToLongBiFunction weigher, TimeValue expireAfterAcess) { + CacheBuilder cacheBuilder = CacheBuilder.builder() + .setMaximumWeight(maxWeightInBytes) + .weigher(weigher) + .removalListener(this); + if (expireAfterAcess != null) { + cacheBuilder.setExpireAfterAccess(expireAfterAcess); + } + cache = cacheBuilder.build(); + } + + @Override + public void setRemovalListener(RemovalListener removalListener) { + this.removalListener = removalListener; + } + + @Override + public void invalidateAll() { + + } + + @Override + public Iterable keys() { + return this.cache.keys(); + } + + @Override + public int count() { + return cache.count(); + } + + @Override + public TierType getTierType() { + return TierType.ON_HEAP; + } + + @Override + public V get(K key) { + return cache.get(key); + } + + @Override + public void put(K key, V value) { + cache.put(key, value); + } + + @Override + public V computeIfAbsent(K key, TieredCacheLoader loader) throws ExecutionException { + return cache.computeIfAbsent(key, key1 -> loader.load(key)); + } + + @Override + public void invalidate(K key) { + cache.invalidate(key); + } + + @Override + public V compute(K key, TieredCacheLoader loader) throws Exception { + return cache.compute(key, key1 -> loader.load(key)); + } + + @Override + public void onRemoval(RemovalNotification notification) { + removalListener.onRemoval(notification); + } + + public static class Builder { + private long maxWeightInBytes; + + private ToLongBiFunction weigher; + + private TimeValue expireAfterAcess; + + public Builder() {} + + public Builder setMaximumWeight(long sizeInBytes) { + this.maxWeightInBytes = sizeInBytes; + return this; + } + + public Builder setWeigher(ToLongBiFunction weigher) { + this.weigher = weigher; + return this; + } + + public Builder setExpireAfterAccess(TimeValue expireAfterAcess) { + this.expireAfterAcess = expireAfterAcess; + return this; + } + + public OpenSearchOnHeapCache build() { + return new OpenSearchOnHeapCache(maxWeightInBytes, weigher, expireAfterAcess); + } + } +} diff --git a/server/src/main/java/org/opensearch/indices/TierType.java b/server/src/main/java/org/opensearch/indices/TierType.java new file mode 100644 index 0000000000000..9a286fd26151b --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/TierType.java @@ -0,0 +1,15 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices; + +public enum TierType { + + ON_HEAP, + DISK; +} diff --git a/server/src/main/java/org/opensearch/indices/TieredCacheEventListener.java b/server/src/main/java/org/opensearch/indices/TieredCacheEventListener.java new file mode 100644 index 0000000000000..084ac5a57e0d3 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/TieredCacheEventListener.java @@ -0,0 +1,22 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices; + +import org.opensearch.common.cache.RemovalNotification; + +public interface TieredCacheEventListener { + + void onMiss(K key, TierType tierType); + + void onRemoval(RemovalNotification notification); + + void onHit(K key, V value, TierType tierType); + + void onCached(K key, V value, TierType tierType); +} diff --git a/server/src/main/java/org/opensearch/indices/TieredCacheHandler.java b/server/src/main/java/org/opensearch/indices/TieredCacheHandler.java new file mode 100644 index 0000000000000..5fe41f5adce94 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/TieredCacheHandler.java @@ -0,0 +1,24 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices; + +public interface TieredCacheHandler { + + V computeIfAbsent(K key, TieredCacheLoader loader) throws Exception; + + V get(K key); + + void invalidate(K key); + + void invalidateAll(); + + long count(); + + CachingTier getOnHeapCachingTier(); +} diff --git a/server/src/main/java/org/opensearch/indices/TieredCacheLoader.java b/server/src/main/java/org/opensearch/indices/TieredCacheLoader.java new file mode 100644 index 0000000000000..f6bb1a74e973e --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/TieredCacheLoader.java @@ -0,0 +1,15 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices; + +public interface TieredCacheLoader { + V load(K key) throws Exception; + + boolean isLoaded(); +} diff --git a/server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyHandler.java b/server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyHandler.java new file mode 100644 index 0000000000000..1047f8a6dc2cc --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyHandler.java @@ -0,0 +1,172 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices; + +import org.opensearch.common.cache.RemovalListener; +import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.RemovalReason; + +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.function.Function; + +/** + * + * @param + * @param + */ +public class TieredCacheSpilloverStrategyHandler implements TieredCacheHandler, RemovalListener { + + private final OnHeapCachingTier onHeapCachingTier; + private final CachingTier diskCachingTier; + private final TieredCacheEventListener tieredCacheEventListener; + + /** + * Maintains caching tiers in order of get calls. + */ + private final List> cachingTierList; + + private TieredCacheSpilloverStrategyHandler( + OnHeapCachingTier onHeapCachingTier, + CachingTier diskCachingTier, + TieredCacheEventListener tieredCacheEventListener + ) { + this.onHeapCachingTier = Objects.requireNonNull(onHeapCachingTier); + this.diskCachingTier = Objects.requireNonNull(diskCachingTier); + this.tieredCacheEventListener = tieredCacheEventListener; + this.cachingTierList = Arrays.asList(onHeapCachingTier, diskCachingTier); + setRemovalListeners(); + } + + @Override + public V computeIfAbsent(K key, TieredCacheLoader loader) throws Exception { + CacheValue cacheValue = getValueFromTierCache().apply(key); + if (cacheValue == null) { + // Add the value to the onHeap cache. Any items if evicted will be moved to lower tier + V value = onHeapCachingTier.compute(key, loader); + tieredCacheEventListener.onCached(key, value, TierType.ON_HEAP); + return value; + } else { + tieredCacheEventListener.onHit(key, cacheValue.value, cacheValue.source); + } + return cacheValue.value; + } + + @Override + public V get(K key) { + CacheValue cacheValue = getValueFromTierCache().apply(key); + if (cacheValue == null) { + return null; + } + return cacheValue.value; + } + + @Override + public void invalidate(K key) { + // TODO + } + + @Override + public void invalidateAll() { + for (CachingTier cachingTier : cachingTierList) { + cachingTier.invalidateAll(); + } + } + + @Override + public long count() { + long totalCount = 0; + for (CachingTier cachingTier : cachingTierList) { + totalCount += cachingTier.count(); + } + return totalCount; + } + + @Override + public void onRemoval(RemovalNotification notification) { + if (RemovalReason.EVICTED.equals(notification.getRemovalReason())) { + switch (notification.getTierType()) { + case ON_HEAP: + diskCachingTier.put(notification.getKey(), notification.getValue()); + break; + default: + break; + } + } + tieredCacheEventListener.onRemoval(notification); + } + + @Override + public CachingTier getOnHeapCachingTier() { + return this.onHeapCachingTier; + } + + private void setRemovalListeners() { + for (CachingTier cachingTier : cachingTierList) { + cachingTier.setRemovalListener(this); + } + } + + private Function> getValueFromTierCache() { + return key -> { + for (CachingTier cachingTier : cachingTierList) { + V value = cachingTier.get(key); + if (value != null) { + tieredCacheEventListener.onHit(key, value, cachingTier.getTierType()); + return new CacheValue<>(value, cachingTier.getTierType()); + } + tieredCacheEventListener.onMiss(key, cachingTier.getTierType()); + } + return null; + }; + } + + public static class CacheValue { + V value; + TierType source; + + CacheValue(V value, TierType source) { + this.value = value; + this.source = source; + } + } + + public static class Builder { + private OnHeapCachingTier onHeapCachingTier; + private CachingTier diskCachingTier; + private TieredCacheEventListener tieredCacheEventListener; + + public Builder() {} + + public Builder setOnHeapCachingTier(OnHeapCachingTier onHeapCachingTier) { + this.onHeapCachingTier = onHeapCachingTier; + return this; + } + + public Builder setOnDiskCachingTier(CachingTier diskCachingTier) { + this.diskCachingTier = diskCachingTier; + return this; + } + + public Builder setTieredCacheEventListener(TieredCacheEventListener tieredCacheEventListener) { + this.tieredCacheEventListener = tieredCacheEventListener; + return this; + } + + public TieredCacheSpilloverStrategyHandler build() { + return new TieredCacheSpilloverStrategyHandler( + this.onHeapCachingTier, + this.diskCachingTier, + this.tieredCacheEventListener + ); + } + } + +} diff --git a/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java b/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java index 364c7a94cad54..766d80a81b097 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java @@ -328,7 +328,7 @@ public long ramBytesUsed() { } @Override - public void onCached(Key key, BytesReference value) {} + public void onCached(Key key, BytesReference value, TierType tierType) {} @Override public boolean isOpen() { @@ -341,10 +341,10 @@ public Object getCacheIdentity() { } @Override - public void onHit() {} + public void onHit(TierType tierType) {} @Override - public void onMiss() {} + public void onMiss(TierType tierType) {} @Override public void onRemoval(RemovalNotification notification) {} From 502938f86962fee6cb4f2f416e6e84e2bc9125c2 Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Wed, 11 Oct 2023 16:14:50 -0700 Subject: [PATCH 04/12] fixed double counting of on heap hit count --- .../opensearch/indices/TieredCacheSpilloverStrategyHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyHandler.java b/server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyHandler.java index 1047f8a6dc2cc..b1b68bbf93f79 100644 --- a/server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyHandler.java +++ b/server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyHandler.java @@ -54,7 +54,7 @@ public V computeIfAbsent(K key, TieredCacheLoader loader) throws Exception tieredCacheEventListener.onCached(key, value, TierType.ON_HEAP); return value; } else { - tieredCacheEventListener.onHit(key, cacheValue.value, cacheValue.source); + //tieredCacheEventListener.onHit(key, cacheValue.value, cacheValue.source); // this double counts, see line 122 } return cacheValue.value; } From f57e2b09ce046c3e572675d5d82dee477ed6899c Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Thu, 19 Oct 2023 10:20:13 -0700 Subject: [PATCH 05/12] [Tiered caching] Framework changes Signed-off-by: Sagar Upadhyaya --- .../org/opensearch/indices/CachingTier.java | 12 +- .../opensearch/indices/DiskCachingTier.java | 5 + .../indices/IndicesRequestCache.java | 39 +- .../opensearch/indices/OnHeapCachingTier.java | 5 + .../indices/OpenSearchOnHeapCache.java | 7 +- .../indices/TieredCacheService.java | 34 ++ .../TieredCacheSpilloverStrategyService.java | 231 +++++++++ ...redCacheSpilloverStrategyServiceTests.java | 458 ++++++++++++++++++ 8 files changed, 764 insertions(+), 27 deletions(-) create mode 100644 server/src/main/java/org/opensearch/indices/TieredCacheService.java create mode 100644 server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyService.java create mode 100644 server/src/test/java/org/opensearch/indices/TieredCacheSpilloverStrategyServiceTests.java diff --git a/server/src/main/java/org/opensearch/indices/CachingTier.java b/server/src/main/java/org/opensearch/indices/CachingTier.java index 85596929cfd6b..7bf7294467114 100644 --- a/server/src/main/java/org/opensearch/indices/CachingTier.java +++ b/server/src/main/java/org/opensearch/indices/CachingTier.java @@ -11,9 +11,10 @@ import org.opensearch.common.cache.RemovalListener; /** - * asdsadssa - * @param - * @param + * Caching tier interface. Can be implemented/extended by concrete classes to provide different flavors of cache like + * onHeap, disk etc. + * @param Type of key + * @param Type of value */ public interface CachingTier { @@ -36,4 +37,9 @@ public interface CachingTier { int count(); TierType getTierType(); + + /** + * Force any outstanding size-based and time-based evictions to occur + */ + default void refresh() {} } diff --git a/server/src/main/java/org/opensearch/indices/DiskCachingTier.java b/server/src/main/java/org/opensearch/indices/DiskCachingTier.java index efd9a459cd338..abc8952c0b8e2 100644 --- a/server/src/main/java/org/opensearch/indices/DiskCachingTier.java +++ b/server/src/main/java/org/opensearch/indices/DiskCachingTier.java @@ -8,6 +8,11 @@ package org.opensearch.indices; +/** + * This is specific to disk caching tier and can be used to add methods which are specific to disk tier. + * @param Type of key + * @param Type of value + */ public interface DiskCachingTier extends CachingTier { } diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 279b6e38d6f9e..0dc316311ca83 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -106,7 +106,7 @@ public final class IndicesRequestCache implements TieredCacheEventListener keysToClean = ConcurrentCollections.newConcurrentSet(); private final ByteSizeValue size; private final TimeValue expire; - // private final Cache cache; + private final TieredCacheService tieredCacheService; private final TieredCacheHandler tieredCacheHandler; @@ -114,27 +114,21 @@ public final class IndicesRequestCache implements TieredCacheEventListener cacheBuilder = CacheBuilder.builder() - // .setMaximumWeight(sizeInBytes) - // .weigher((k, v) -> k.ramBytesUsed() + v.ramBytesUsed()); - // //.removalListener(this); - // if (expire != null) { - // cacheBuilder.setExpireAfterAccess(expire); - // } - // cache = cacheBuilder.build(); + // Initialize onHeap cache tier first. OnHeapCachingTier openSearchOnHeapCache = new OpenSearchOnHeapCache.Builder().setWeigher( (k, v) -> k.ramBytesUsed() + v.ramBytesUsed() ).setMaximumWeight(sizeInBytes).setExpireAfterAccess(expire).build(); - tieredCacheHandler = new TieredCacheSpilloverStrategyHandler.Builder().setOnHeapCachingTier( + // Initialize tiered cache service. TODO: Enable Disk tier when tiered support is turned on. + tieredCacheService = new TieredCacheSpilloverStrategyService.Builder().setOnHeapCachingTier( openSearchOnHeapCache - ).setOnDiskCachingTier(new DummyDiskCachingTier<>()).setTieredCacheEventListener(this).build(); + ).setTieredCacheEventListener(this).build(); } @Override public void close() { - tieredCacheHandler.invalidateAll(); + tieredCacheService.invalidateAll(); } void clear(CacheEntity entity) { @@ -177,9 +171,8 @@ BytesReference getOrCompute( assert readerCacheKeyUniqueId != null; final Key key = new Key(cacheEntity, cacheKey, readerCacheKeyUniqueId); Loader cacheLoader = new Loader(cacheEntity, loader); - BytesReference value = tieredCacheHandler.computeIfAbsent(key, cacheLoader); + BytesReference value = tieredCacheService.computeIfAbsent(key, cacheLoader); if (cacheLoader.isLoaded()) { - // key.entity.onMiss(); // see if its the first time we see this reader, and make sure to register a cleanup key CleanupKey cleanupKey = new CleanupKey(cacheEntity, readerCacheKeyUniqueId); if (!registeredClosedListeners.containsKey(cleanupKey)) { @@ -203,8 +196,7 @@ BytesReference getOrCompute( */ void invalidate(CacheEntity cacheEntity, DirectoryReader reader, BytesReference cacheKey) { assert reader.getReaderCacheHelper() != null; - tieredCacheHandler.invalidate(new Key(cacheEntity, reader.getReaderCacheHelper().getKey(), cacheKey)); - // cache.invalidate(new Key(cacheEntity, reader.getReaderCacheHelper().getKey(), cacheKey)); + tieredCacheService.invalidate(new Key(cacheEntity, reader.getReaderCacheHelper().getKey(), cacheKey)); } /** @@ -230,7 +222,6 @@ public boolean isLoaded() { @Override public BytesReference load(Key key) throws Exception { BytesReference value = loader.get(); - // entity.onCached(key, value); loaded = true; return value; } @@ -280,8 +271,8 @@ interface CacheEntity extends Accountable, Writeable { * * @opensearch.internal */ - class Key implements Accountable, Writeable { - private final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Key.class); + public static class Key implements Accountable { + private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Key.class); public final CacheEntity entity; // use as identity equality public final String readerCacheKeyUniqueId; @@ -374,6 +365,9 @@ public int hashCode() { } } + /** + * Logic to clean up in-memory cache. + */ synchronized void cleanCache() { final Set currentKeysToClean = new HashSet<>(); final Set currentFullClean = new HashSet<>(); @@ -390,7 +384,7 @@ synchronized void cleanCache() { } } if (!currentKeysToClean.isEmpty() || !currentFullClean.isEmpty()) { - for (Iterator iterator = tieredCacheHandler.getOnHeapCachingTier().keys().iterator(); iterator.hasNext();) { + for (Iterator iterator = tieredCacheService.getOnHeapCachingTier().keys().iterator(); iterator.hasNext();) { Key key = iterator.next(); if (currentFullClean.contains(key.entity.getCacheIdentity())) { iterator.remove(); @@ -401,15 +395,14 @@ synchronized void cleanCache() { } } } - // TODO - // cache.refresh(); + tieredCacheService.getOnHeapCachingTier().refresh(); } /** * Returns the current size of the cache */ long count() { - return tieredCacheHandler.count(); + return tieredCacheService.count(); } int numRegisteredCloseListeners() { // for testing diff --git a/server/src/main/java/org/opensearch/indices/OnHeapCachingTier.java b/server/src/main/java/org/opensearch/indices/OnHeapCachingTier.java index ea8bd79e1d445..f7d68c27c1904 100644 --- a/server/src/main/java/org/opensearch/indices/OnHeapCachingTier.java +++ b/server/src/main/java/org/opensearch/indices/OnHeapCachingTier.java @@ -8,4 +8,9 @@ package org.opensearch.indices; +/** + * This is specific to onHeap caching tier and can be used to add methods which are specific to this tier. + * @param Type of key + * @param Type of value + */ public interface OnHeapCachingTier extends CachingTier {} diff --git a/server/src/main/java/org/opensearch/indices/OpenSearchOnHeapCache.java b/server/src/main/java/org/opensearch/indices/OpenSearchOnHeapCache.java index cb250edfd7a24..189b741ed1696 100644 --- a/server/src/main/java/org/opensearch/indices/OpenSearchOnHeapCache.java +++ b/server/src/main/java/org/opensearch/indices/OpenSearchOnHeapCache.java @@ -40,7 +40,7 @@ public void setRemovalListener(RemovalListener removalListener) { @Override public void invalidateAll() { - + cache.invalidateAll(); } @Override @@ -83,6 +83,11 @@ public V compute(K key, TieredCacheLoader loader) throws Exception { return cache.compute(key, key1 -> loader.load(key)); } + @Override + public void refresh() { + cache.refresh(); + } + @Override public void onRemoval(RemovalNotification notification) { removalListener.onRemoval(notification); diff --git a/server/src/main/java/org/opensearch/indices/TieredCacheService.java b/server/src/main/java/org/opensearch/indices/TieredCacheService.java new file mode 100644 index 0000000000000..59e5e0e00b6c1 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/TieredCacheService.java @@ -0,0 +1,34 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices; + +import java.util.Optional; + +/** + * This service encapsulates all logic to write/fetch to/from appropriate tiers. Can be implemented with different + * flavors like spillover etc. + * @param Type of key + * @param Type of value + */ +public interface TieredCacheService { + + V computeIfAbsent(K key, TieredCacheLoader loader) throws Exception; + + V get(K key); + + void invalidate(K key); + + void invalidateAll(); + + long count(); + + OnHeapCachingTier getOnHeapCachingTier(); + + Optional> getDiskCachingTier(); +} diff --git a/server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyService.java b/server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyService.java new file mode 100644 index 0000000000000..7799170a1ede9 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyService.java @@ -0,0 +1,231 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices; + +import org.opensearch.common.cache.RemovalListener; +import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.RemovalReason; + +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Function; + +/** + * This service spillover the evicted items from upper tier to lower tier. For now, we are spilling the in-memory + * cache items to disk tier cache. + * @param Type of key + * @param Type of value + */ +public class TieredCacheSpilloverStrategyService implements TieredCacheService, RemovalListener { + + private final OnHeapCachingTier onHeapCachingTier; + + /** + * Optional in case tiered caching is turned off. + */ + private final Optional> diskCachingTier; + private final TieredCacheEventListener tieredCacheEventListener; + + /** + * Maintains caching tiers in order of get calls. + */ + private final List> cachingTierList; + + private TieredCacheSpilloverStrategyService( + OnHeapCachingTier onHeapCachingTier, + DiskCachingTier diskCachingTier, + TieredCacheEventListener tieredCacheEventListener + ) { + this.onHeapCachingTier = Objects.requireNonNull(onHeapCachingTier); + this.diskCachingTier = Optional.ofNullable(diskCachingTier); + this.tieredCacheEventListener = Objects.requireNonNull(tieredCacheEventListener); + this.cachingTierList = this.diskCachingTier.map(diskTier -> Arrays.asList(onHeapCachingTier, diskTier)) + .orElse(List.of(onHeapCachingTier)); + setRemovalListeners(); + } + + /** + * This method logic is divided into 2 parts: + * 1. First check whether key is present or not in desired tier. If yes, return the value. + * 2. If the key is not present, then add the key/value pair to onHeap cache. + * @param key Key for lookup. + * @param loader Used to load value in case it is not present in any tier. + * @return value + * @throws Exception + */ + @Override + public V computeIfAbsent(K key, TieredCacheLoader loader) throws Exception { + CacheValue cacheValue = getValueFromTierCache(true).apply(key); + if (cacheValue == null) { + // Add the value to the onHeap cache. Any items if evicted will be moved to lower tier. + V value = onHeapCachingTier.compute(key, loader); + tieredCacheEventListener.onCached(key, value, TierType.ON_HEAP); + return value; + } + return cacheValue.value; + } + + @Override + public V get(K key) { + CacheValue cacheValue = getValueFromTierCache(true).apply(key); + if (cacheValue == null) { + return null; + } + return cacheValue.value; + } + + /** + * First fetches the tier type which has this key. And then invalidate accordingly. + * @param key + */ + @Override + public void invalidate(K key) { + // We don't need to track hits/misses in this case. + CacheValue cacheValue = getValueFromTierCache(false).apply(key); + if (cacheValue != null) { + switch (cacheValue.source) { + case ON_HEAP: + onHeapCachingTier.invalidate(key); + break; + case DISK: + diskCachingTier.ifPresent(diskTier -> diskTier.invalidate(key)); + break; + default: + break; + } + } + } + + @Override + public void invalidateAll() { + for (CachingTier cachingTier : cachingTierList) { + cachingTier.invalidateAll(); + } + } + + /** + * Returns the total count of items present in all cache tiers. + * @return total count of items in cache + */ + @Override + public long count() { + long totalCount = 0; + for (CachingTier cachingTier : cachingTierList) { + totalCount += cachingTier.count(); + } + return totalCount; + } + + /** + * Called whenever an item is evicted from any cache tier. If the item was evicted from onHeap cache, it is moved + * to disk tier cache. In case it was evicted from disk tier cache, it will discarded. + * @param notification Contains info about the removal like reason, key/value etc. + */ + @Override + public void onRemoval(RemovalNotification notification) { + if (RemovalReason.EVICTED.equals(notification.getRemovalReason())) { + switch (notification.getTierType()) { + case ON_HEAP: + diskCachingTier.ifPresent(diskTier -> { + diskTier.put(notification.getKey(), notification.getValue()); + tieredCacheEventListener.onCached(notification.getKey(), notification.getValue(), TierType.DISK); + }); + break; + default: + break; + } + } + tieredCacheEventListener.onRemoval(notification); + } + + @Override + public OnHeapCachingTier getOnHeapCachingTier() { + return this.onHeapCachingTier; + } + + @Override + public Optional> getDiskCachingTier() { + return this.diskCachingTier; + } + + /** + * Register this service as a listener to removal events from different caching tiers. + */ + private void setRemovalListeners() { + for (CachingTier cachingTier : cachingTierList) { + cachingTier.setRemovalListener(this); + } + } + + private Function> getValueFromTierCache(boolean trackStats) { + return key -> { + for (CachingTier cachingTier : cachingTierList) { + V value = cachingTier.get(key); + if (value != null) { + if (trackStats) { + tieredCacheEventListener.onHit(key, value, cachingTier.getTierType()); + } + return new CacheValue<>(value, cachingTier.getTierType()); + } + if (trackStats) { + tieredCacheEventListener.onMiss(key, cachingTier.getTierType()); + } + } + return null; + }; + } + + /** + * Represents a cache value along with its associated tier type where it is stored. + * @param Type of value. + */ + public static class CacheValue { + V value; + TierType source; + + CacheValue(V value, TierType source) { + this.value = value; + this.source = source; + } + } + + public static class Builder { + private OnHeapCachingTier onHeapCachingTier; + private DiskCachingTier diskCachingTier; + private TieredCacheEventListener tieredCacheEventListener; + + public Builder() {} + + public Builder setOnHeapCachingTier(OnHeapCachingTier onHeapCachingTier) { + this.onHeapCachingTier = onHeapCachingTier; + return this; + } + + public Builder setOnDiskCachingTier(DiskCachingTier diskCachingTier) { + this.diskCachingTier = diskCachingTier; + return this; + } + + public Builder setTieredCacheEventListener(TieredCacheEventListener tieredCacheEventListener) { + this.tieredCacheEventListener = tieredCacheEventListener; + return this; + } + + public TieredCacheSpilloverStrategyService build() { + return new TieredCacheSpilloverStrategyService( + this.onHeapCachingTier, + this.diskCachingTier, + this.tieredCacheEventListener + ); + } + } + +} diff --git a/server/src/test/java/org/opensearch/indices/TieredCacheSpilloverStrategyServiceTests.java b/server/src/test/java/org/opensearch/indices/TieredCacheSpilloverStrategyServiceTests.java new file mode 100644 index 0000000000000..4c4c7f195ba31 --- /dev/null +++ b/server/src/test/java/org/opensearch/indices/TieredCacheSpilloverStrategyServiceTests.java @@ -0,0 +1,458 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices; + +import org.opensearch.common.cache.RemovalListener; +import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.RemovalReason; +import org.opensearch.common.metrics.CounterMetric; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.ArrayList; +import java.util.EnumMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +public class TieredCacheSpilloverStrategyServiceTests extends OpenSearchTestCase { + + public void testComputeAndAbsentWithoutAnyOnHeapCacheEviction() throws Exception { + int onHeapCacheSize = randomIntBetween(10, 30); + MockTieredCacheEventListener eventListener = new MockTieredCacheEventListener(); + TieredCacheSpilloverStrategyService spilloverStrategyService = intializeTieredCacheService( + onHeapCacheSize, + randomIntBetween(1, 4), + eventListener + ); + int numOfItems1 = randomIntBetween(1, onHeapCacheSize / 2 - 1); + List keys = new ArrayList<>(); + // Put values in cache. + for (int iter = 0; iter < numOfItems1; iter++) { + String key = UUID.randomUUID().toString(); + keys.add(key); + TieredCacheLoader tieredCacheLoader = getTieredCacheLoader(); + spilloverStrategyService.computeIfAbsent(key, tieredCacheLoader); + } + assertEquals(numOfItems1, eventListener.enumMap.get(TierType.ON_HEAP).missCount.count()); + assertEquals(0, eventListener.enumMap.get(TierType.ON_HEAP).hitCount.count()); + assertEquals(0, eventListener.enumMap.get(TierType.ON_HEAP).evictionsMetric.count()); + + // Try to hit cache again with some randomization. + int numOfItems2 = randomIntBetween(1, onHeapCacheSize / 2 - 1); + int cacheHit = 0; + int cacheMiss = 0; + for (int iter = 0; iter < numOfItems2; iter++) { + if (randomBoolean()) { + // Hit cache with stored key + cacheHit++; + int index = randomIntBetween(0, keys.size() - 1); + spilloverStrategyService.computeIfAbsent(keys.get(index), getTieredCacheLoader()); + } else { + // Hit cache with randomized key which is expected to miss cache always. + spilloverStrategyService.computeIfAbsent(UUID.randomUUID().toString(), getTieredCacheLoader()); + cacheMiss++; + } + } + assertEquals(cacheHit, eventListener.enumMap.get(TierType.ON_HEAP).hitCount.count()); + assertEquals(numOfItems1 + cacheMiss, eventListener.enumMap.get(TierType.ON_HEAP).missCount.count()); + assertEquals(0, eventListener.enumMap.get(TierType.ON_HEAP).evictionsMetric.count()); + } + + public void testComputeAndAbsentWithEvictionsFromOnHeapCache() throws Exception { + int onHeapCacheSize = randomIntBetween(10, 30); + int diskCacheSize = randomIntBetween(60, 100); + int totalSize = onHeapCacheSize + diskCacheSize; + MockTieredCacheEventListener eventListener = new MockTieredCacheEventListener(); + TieredCacheSpilloverStrategyService spilloverStrategyService = intializeTieredCacheService( + onHeapCacheSize, + diskCacheSize, + eventListener + ); + + // Put values in cache more than it's size and cause evictions from onHeap. + int numOfItems1 = randomIntBetween(onHeapCacheSize + 1, totalSize); + List onHeapKeys = new ArrayList<>(); + List diskTierKeys = new ArrayList<>(); + for (int iter = 0; iter < numOfItems1; iter++) { + String key = UUID.randomUUID().toString(); + if (iter > (onHeapCacheSize - 1)) { + // All these are bound to go to disk based cache. + diskTierKeys.add(key); + } else { + onHeapKeys.add(key); + } + TieredCacheLoader tieredCacheLoader = getTieredCacheLoader(); + spilloverStrategyService.computeIfAbsent(key, tieredCacheLoader); + } + assertEquals(numOfItems1, eventListener.enumMap.get(TierType.ON_HEAP).missCount.count()); + assertEquals(0, eventListener.enumMap.get(TierType.ON_HEAP).hitCount.count()); + assertTrue(eventListener.enumMap.get(TierType.ON_HEAP).evictionsMetric.count() > 0); + + assertEquals( + eventListener.enumMap.get(TierType.ON_HEAP).evictionsMetric.count(), + eventListener.enumMap.get(TierType.DISK).cachedCount.count() + ); + assertEquals(diskTierKeys.size(), eventListener.enumMap.get(TierType.DISK).cachedCount.count()); + + // Try to hit cache again with some randomization. + int numOfItems2 = randomIntBetween(50, 200); + int onHeapCacheHit = 0; + int diskCacheHit = 0; + int cacheMiss = 0; + for (int iter = 0; iter < numOfItems2; iter++) { + if (randomBoolean()) { + if (randomBoolean()) { // Hit cache with key stored in onHeap cache. + onHeapCacheHit++; + int index = randomIntBetween(0, onHeapKeys.size() - 1); + spilloverStrategyService.computeIfAbsent(onHeapKeys.get(index), getTieredCacheLoader()); + } else { // Hit cache with key stored in disk cache. + diskCacheHit++; + int index = randomIntBetween(0, diskTierKeys.size() - 1); + spilloverStrategyService.computeIfAbsent(diskTierKeys.get(index), getTieredCacheLoader()); + } + } else { + // Hit cache with randomized key which is expected to miss cache always. + TieredCacheLoader tieredCacheLoader = getTieredCacheLoader(); + spilloverStrategyService.computeIfAbsent(UUID.randomUUID().toString(), tieredCacheLoader); + cacheMiss++; + } + } + // On heap cache misses would also include diskCacheHits as it means it missed onHeap cache. + assertEquals(numOfItems1 + cacheMiss + diskCacheHit, eventListener.enumMap.get(TierType.ON_HEAP).missCount.count()); + assertEquals(onHeapCacheHit, eventListener.enumMap.get(TierType.ON_HEAP).hitCount.count()); + assertEquals(cacheMiss + numOfItems1, eventListener.enumMap.get(TierType.DISK).missCount.count()); + assertEquals(diskCacheHit, eventListener.enumMap.get(TierType.DISK).hitCount.count()); + } + + public void testComputeAndAbsentWithEvictionsFromBothTier() throws Exception { + int onHeapCacheSize = randomIntBetween(10, 30); + int diskCacheSize = randomIntBetween(onHeapCacheSize + 1, 100); + int totalSize = onHeapCacheSize + diskCacheSize; + + MockTieredCacheEventListener eventListener = new MockTieredCacheEventListener(); + TieredCacheSpilloverStrategyService spilloverStrategyService = intializeTieredCacheService( + onHeapCacheSize, + diskCacheSize, + eventListener + ); + + int numOfItems = randomIntBetween(totalSize + 1, totalSize * 3); + for (int iter = 0; iter < numOfItems; iter++) { + TieredCacheLoader tieredCacheLoader = getTieredCacheLoader(); + spilloverStrategyService.computeIfAbsent(UUID.randomUUID().toString(), tieredCacheLoader); + } + assertTrue(eventListener.enumMap.get(TierType.ON_HEAP).evictionsMetric.count() > 0); + assertTrue(eventListener.enumMap.get(TierType.DISK).evictionsMetric.count() > 0); + } + + public void testGetAndCount() throws Exception { + int onHeapCacheSize = randomIntBetween(10, 30); + int diskCacheSize = randomIntBetween(onHeapCacheSize + 1, 100); + int totalSize = onHeapCacheSize + diskCacheSize; + + MockTieredCacheEventListener eventListener = new MockTieredCacheEventListener(); + TieredCacheSpilloverStrategyService spilloverStrategyService = intializeTieredCacheService( + onHeapCacheSize, + diskCacheSize, + eventListener + ); + + int numOfItems1 = randomIntBetween(onHeapCacheSize + 1, totalSize); + List onHeapKeys = new ArrayList<>(); + List diskTierKeys = new ArrayList<>(); + for (int iter = 0; iter < numOfItems1; iter++) { + String key = UUID.randomUUID().toString(); + if (iter > (onHeapCacheSize - 1)) { + // All these are bound to go to disk based cache. + diskTierKeys.add(key); + } else { + onHeapKeys.add(key); + } + TieredCacheLoader tieredCacheLoader = getTieredCacheLoader(); + spilloverStrategyService.computeIfAbsent(key, tieredCacheLoader); + } + + for (int iter = 0; iter < numOfItems1; iter++) { + if (randomBoolean()) { + if (randomBoolean()) { + int index = randomIntBetween(0, onHeapKeys.size() - 1); + assertNotNull(spilloverStrategyService.get(onHeapKeys.get(index))); + } else { + int index = randomIntBetween(0, diskTierKeys.size() - 1); + assertNotNull(spilloverStrategyService.get(diskTierKeys.get(index))); + } + } else { + assertNull(spilloverStrategyService.get(UUID.randomUUID().toString())); + } + } + assertEquals(numOfItems1, spilloverStrategyService.count()); + } + + public void testWithDiskTierNull() throws Exception { + int onHeapCacheSize = randomIntBetween(10, 30); + MockTieredCacheEventListener eventListener = new MockTieredCacheEventListener(); + TieredCacheSpilloverStrategyService spilloverStrategyService = new TieredCacheSpilloverStrategyService.Builder< + String, + String>().setOnHeapCachingTier(new MockOnHeapCacheTier<>(onHeapCacheSize)).setTieredCacheEventListener(eventListener).build(); + int numOfItems = randomIntBetween(onHeapCacheSize + 1, onHeapCacheSize * 3); + for (int iter = 0; iter < numOfItems; iter++) { + TieredCacheLoader tieredCacheLoader = getTieredCacheLoader(); + spilloverStrategyService.computeIfAbsent(UUID.randomUUID().toString(), tieredCacheLoader); + } + assertTrue(eventListener.enumMap.get(TierType.ON_HEAP).evictionsMetric.count() > 0); + assertEquals(0, eventListener.enumMap.get(TierType.DISK).cachedCount.count()); + assertEquals(0, eventListener.enumMap.get(TierType.DISK).evictionsMetric.count()); + assertEquals(0, eventListener.enumMap.get(TierType.DISK).missCount.count()); + } + + private TieredCacheLoader getTieredCacheLoader() { + return new TieredCacheLoader() { + boolean isLoaded = false; + + @Override + public String load(String key) { + isLoaded = true; + return UUID.randomUUID().toString(); + } + + @Override + public boolean isLoaded() { + return isLoaded; + } + }; + } + + private TieredCacheSpilloverStrategyService intializeTieredCacheService( + int onHeapCacheSize, + int diksCacheSize, + TieredCacheEventListener cacheEventListener + ) { + DiskCachingTier diskCache = new MockDiskCachingTier<>(diksCacheSize); + OnHeapCachingTier openSearchOnHeapCache = new MockOnHeapCacheTier<>(onHeapCacheSize); + return new TieredCacheSpilloverStrategyService.Builder().setOnHeapCachingTier(openSearchOnHeapCache) + .setOnDiskCachingTier(diskCache) + .setTieredCacheEventListener(cacheEventListener) + .build(); + } + + class MockOnHeapCacheTier implements OnHeapCachingTier, RemovalListener { + + Map onHeapCacheTier; + int maxSize; + private RemovalListener removalListener; + + MockOnHeapCacheTier(int size) { + maxSize = size; + this.onHeapCacheTier = new ConcurrentHashMap(); + } + + @Override + public V get(K key) { + return this.onHeapCacheTier.get(key); + } + + @Override + public void put(K key, V value) { + this.onHeapCacheTier.put(key, value); + } + + @Override + public V computeIfAbsent(K key, TieredCacheLoader loader) throws Exception { + if (this.onHeapCacheTier.size() > maxSize) { // If it exceeds, just notify for evict. + onRemoval(new RemovalNotification<>(key, loader.load(key), RemovalReason.EVICTED, TierType.ON_HEAP)); + return loader.load(key); + } + return this.onHeapCacheTier.computeIfAbsent(key, k -> { + try { + return loader.load(key); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @Override + public void invalidate(K key) { + this.onHeapCacheTier.remove(key); + } + + @Override + public V compute(K key, TieredCacheLoader loader) throws Exception { + if (this.onHeapCacheTier.size() >= maxSize) { // If it exceeds, just notify for evict. + onRemoval(new RemovalNotification<>(key, loader.load(key), RemovalReason.EVICTED, TierType.ON_HEAP)); + return loader.load(key); + } + return this.onHeapCacheTier.compute(key, ((k, v) -> { + try { + return loader.load(key); + } catch (Exception e) { + throw new RuntimeException(e); + } + })); + } + + @Override + public void setRemovalListener(RemovalListener removalListener) { + this.removalListener = removalListener; + } + + @Override + public void invalidateAll() { + this.onHeapCacheTier.clear(); + } + + @Override + public Iterable keys() { + return this.onHeapCacheTier.keySet(); + } + + @Override + public int count() { + return this.onHeapCacheTier.size(); + } + + @Override + public TierType getTierType() { + return TierType.ON_HEAP; + } + + @Override + public void onRemoval(RemovalNotification notification) { + removalListener.onRemoval(notification); + } + } + + class MockTieredCacheEventListener implements TieredCacheEventListener { + + EnumMap enumMap = new EnumMap<>(TierType.class); + + MockTieredCacheEventListener() { + for (TierType tierType : TierType.values()) { + enumMap.put(tierType, new TestStatsHolder()); + } + } + + @Override + public void onMiss(K key, TierType tierType) { + enumMap.get(tierType).missCount.inc(); + } + + @Override + public void onRemoval(RemovalNotification notification) { + if (notification.getRemovalReason().equals(RemovalReason.EVICTED)) { + enumMap.get(notification.getTierType()).evictionsMetric.inc(); + } + } + + @Override + public void onHit(K key, V value, TierType tierType) { + enumMap.get(tierType).hitCount.inc(); + } + + @Override + public void onCached(K key, V value, TierType tierType) { + enumMap.get(tierType).cachedCount.inc(); + } + + class TestStatsHolder { + final CounterMetric evictionsMetric = new CounterMetric(); + final CounterMetric hitCount = new CounterMetric(); + final CounterMetric missCount = new CounterMetric(); + + final CounterMetric cachedCount = new CounterMetric(); + } + } + + class MockDiskCachingTier implements DiskCachingTier, RemovalListener { + Map diskTier; + private RemovalListener removalListener; + int maxSize; + + MockDiskCachingTier(int size) { + this.maxSize = size; + diskTier = new ConcurrentHashMap(); + } + + @Override + public V get(K key) { + return this.diskTier.get(key); + } + + @Override + public void put(K key, V value) { + if (this.diskTier.size() >= maxSize) { // For simplification + onRemoval(new RemovalNotification<>(key, value, RemovalReason.EVICTED, TierType.DISK)); + return; + } + this.diskTier.put(key, value); + } + + @Override + public V computeIfAbsent(K key, TieredCacheLoader loader) throws Exception { + return this.diskTier.computeIfAbsent(key, k -> { + try { + return loader.load(k); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @Override + public void invalidate(K key) { + this.diskTier.remove(key); + } + + @Override + public V compute(K key, TieredCacheLoader loader) throws Exception { + if (this.diskTier.size() >= maxSize) { // If it exceeds, just notify for evict. + onRemoval(new RemovalNotification<>(key, loader.load(key), RemovalReason.EVICTED, TierType.DISK)); + return loader.load(key); + } + return this.diskTier.compute(key, (k, v) -> { + try { + return loader.load(key); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @Override + public void setRemovalListener(RemovalListener removalListener) { + this.removalListener = removalListener; + } + + @Override + public void invalidateAll() { + this.diskTier.clear(); + } + + @Override + public Iterable keys() { + return null; + } + + @Override + public int count() { + return this.diskTier.size(); + } + + @Override + public TierType getTierType() { + return TierType.DISK; + } + + @Override + public void onRemoval(RemovalNotification notification) { + this.removalListener.onRemoval(notification); + } + } +} From dfef98141acc52b9df442b8bfb186a7c3ce237f4 Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Thu, 26 Oct 2023 12:27:16 -0700 Subject: [PATCH 06/12] Addressing comments Signed-off-by: Sagar Upadhyaya --- .../index/OpenSearchDirectoryReader.java | 6 +-- .../indices/IndicesRequestCache.java | 44 +++++++++++-------- .../indices/IndicesRequestCacheTests.java | 6 +-- 3 files changed, 31 insertions(+), 25 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java b/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java index b2e21d2076cbb..c9da20f279a3c 100644 --- a/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java +++ b/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java @@ -113,18 +113,18 @@ public void addClosedListener(ClosedListener listener) { */ public class DelegatingCacheKey { CacheKey cacheKey; - private final UUID uniqueId; + private final String uniqueId; DelegatingCacheKey(CacheKey cacheKey) { this.cacheKey = cacheKey; - this.uniqueId = UUID.randomUUID(); + this.uniqueId = UUID.randomUUID().toString(); } public CacheKey getCacheKey() { return this.cacheKey; } - public UUID getId() { + public String getId() { return uniqueId; } } diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 0dc316311ca83..eb1c67e9cb877 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -167,14 +167,14 @@ BytesReference getOrCompute( OpenSearchDirectoryReader.DelegatingCacheHelper delegatingCacheHelper = (OpenSearchDirectoryReader.DelegatingCacheHelper) reader .getReaderCacheHelper(); - String readerCacheKeyUniqueId = delegatingCacheHelper.getDelegatingCacheKey().getId().toString(); - assert readerCacheKeyUniqueId != null; - final Key key = new Key(cacheEntity, cacheKey, readerCacheKeyUniqueId); + String readerCacheKeyId = delegatingCacheHelper.getDelegatingCacheKey().getId(); + assert readerCacheKeyId != null; + final Key key = new Key(cacheEntity, cacheKey, readerCacheKeyId); Loader cacheLoader = new Loader(cacheEntity, loader); BytesReference value = tieredCacheService.computeIfAbsent(key, cacheLoader); if (cacheLoader.isLoaded()) { // see if its the first time we see this reader, and make sure to register a cleanup key - CleanupKey cleanupKey = new CleanupKey(cacheEntity, readerCacheKeyUniqueId); + CleanupKey cleanupKey = new CleanupKey(cacheEntity, readerCacheKeyId); if (!registeredClosedListeners.containsKey(cleanupKey)) { Boolean previous = registeredClosedListeners.putIfAbsent(cleanupKey, Boolean.TRUE); if (previous == null) { @@ -196,7 +196,13 @@ BytesReference getOrCompute( */ void invalidate(CacheEntity cacheEntity, DirectoryReader reader, BytesReference cacheKey) { assert reader.getReaderCacheHelper() != null; - tieredCacheService.invalidate(new Key(cacheEntity, reader.getReaderCacheHelper().getKey(), cacheKey)); + String readerCacheKeyId = null; + if (reader instanceof OpenSearchDirectoryReader) { + IndexReader.CacheHelper cacheHelper = ((OpenSearchDirectoryReader) reader).getDelegatingCacheHelper(); + readerCacheKeyId = ((OpenSearchDirectoryReader.DelegatingCacheHelper) cacheHelper).getDelegatingCacheKey() + .getId(); + } + cache.invalidate(new Key(cacheEntity, cacheKey, readerCacheKeyId)); } /** @@ -275,18 +281,18 @@ public static class Key implements Accountable { private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Key.class); public final CacheEntity entity; // use as identity equality - public final String readerCacheKeyUniqueId; + public final String readerCacheKeyId; public final BytesReference value; - Key(CacheEntity entity, BytesReference value, String readerCacheKeyUniqueId) { + Key(CacheEntity entity, BytesReference value, String readerCacheKeyId) { this.entity = entity; this.value = value; - this.readerCacheKeyUniqueId = Objects.requireNonNull(readerCacheKeyUniqueId); + this.readerCacheKeyId = Objects.requireNonNull(readerCacheKeyId); } Key(StreamInput in) throws IOException { this.entity = in.readOptionalWriteable(in1 -> indicesService.new IndexShardCacheEntity(in1)); - this.readerCacheKeyUniqueId = in.readOptionalString(); + this.readerCacheKeyId = in.readOptionalString(); this.value = in.readBytesReference(); } @@ -306,7 +312,7 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Key key = (Key) o; - if (Objects.equals(readerCacheKeyUniqueId, key.readerCacheKeyUniqueId) == false) return false; + if (Objects.equals(readerCacheKeyId, key.readerCacheKeyId) == false) return false; if (!entity.getCacheIdentity().equals(key.entity.getCacheIdentity())) return false; if (!value.equals(key.value)) return false; return true; @@ -315,7 +321,7 @@ public boolean equals(Object o) { @Override public int hashCode() { int result = entity.getCacheIdentity().hashCode(); - result = 31 * result + readerCacheKeyUniqueId.hashCode(); + result = 31 * result + readerCacheKeyId.hashCode(); result = 31 * result + value.hashCode(); return result; } @@ -323,18 +329,18 @@ public int hashCode() { @Override public void writeTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(entity); - out.writeOptionalString(readerCacheKeyUniqueId); + out.writeOptionalString(readerCacheKeyId); out.writeBytesReference(value); } } private class CleanupKey implements IndexReader.ClosedListener { final CacheEntity entity; - final String readerCacheKeyUniqueId; + final String readerCacheKeyId; - private CleanupKey(CacheEntity entity, String readerCacheKeyUniqueId) { + private CleanupKey(CacheEntity entity, String readerCacheKeyId) { this.entity = entity; - this.readerCacheKeyUniqueId = readerCacheKeyUniqueId; + this.readerCacheKeyId = readerCacheKeyId; } @Override @@ -352,7 +358,7 @@ public boolean equals(Object o) { return false; } CleanupKey that = (CleanupKey) o; - if (Objects.equals(readerCacheKeyUniqueId, that.readerCacheKeyUniqueId) == false) return false; + if (Objects.equals(readerCacheKeyId, that.readerCacheKeyId) == false) return false; if (!entity.getCacheIdentity().equals(that.entity.getCacheIdentity())) return false; return true; } @@ -360,7 +366,7 @@ public boolean equals(Object o) { @Override public int hashCode() { int result = entity.getCacheIdentity().hashCode(); - result = 31 * result + Objects.hashCode(readerCacheKeyUniqueId); + result = 31 * result + Objects.hashCode(readerCacheKeyId); return result; } } @@ -376,7 +382,7 @@ synchronized void cleanCache() { for (Iterator iterator = keysToClean.iterator(); iterator.hasNext();) { CleanupKey cleanupKey = iterator.next(); iterator.remove(); - if (cleanupKey.readerCacheKeyUniqueId == null || cleanupKey.entity.isOpen() == false) { + if (cleanupKey.readerCacheKeyId == null || cleanupKey.entity.isOpen() == false) { // null indicates full cleanup, as does a closed shard currentFullClean.add(cleanupKey.entity.getCacheIdentity()); } else { @@ -389,7 +395,7 @@ synchronized void cleanCache() { if (currentFullClean.contains(key.entity.getCacheIdentity())) { iterator.remove(); } else { - if (currentKeysToClean.contains(new CleanupKey(key.entity, key.readerCacheKeyUniqueId))) { + if (currentKeysToClean.contains(new CleanupKey(key.entity, key.readerCacheKeyId))) { iterator.remove(); } } diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index 664865f21f3a8..18ec013711f22 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -448,10 +448,10 @@ public void testEqualsKey() throws IOException { IndexWriter writer = new IndexWriter(dir, config); ShardId shardId = new ShardId("foo", "bar", 1); IndexReader reader1 = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId); - String rKey1 = ((OpenSearchDirectoryReader) reader1).getDelegatingCacheHelper().getDelegatingCacheKey().getId().toString(); + String rKey1 = ((OpenSearchDirectoryReader) reader1).getDelegatingCacheHelper().getDelegatingCacheKey().getId(); writer.addDocument(new Document()); IndexReader reader2 = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId); - String rKey2 = ((OpenSearchDirectoryReader) reader2).getDelegatingCacheHelper().getDelegatingCacheKey().getId().toString(); + String rKey2 = ((OpenSearchDirectoryReader) reader2).getDelegatingCacheHelper().getDelegatingCacheKey().getId(); IOUtils.close(reader1, reader2, writer, dir); IndicesRequestCache.Key key1 = indicesRequestCache.new Key(new TestEntity(null, trueBoolean), new TestBytesReference(1), rKey1); IndicesRequestCache.Key key2 = indicesRequestCache.new Key(new TestEntity(null, trueBoolean), new TestBytesReference(1), rKey1); @@ -488,7 +488,7 @@ public void testSerializationDeserializationOfCacheKey() throws Exception { IndicesRequestCache.Key key2 = indicesRequestCache.new Key(in); - assertEquals(readerCacheKeyId, key2.readerCacheKeyUniqueId); + assertEquals(readerCacheKeyId, key2.readerCacheKeyId); assertEquals(shardCacheEntity.getCacheIdentity(), key2.entity.getCacheIdentity()); assertEquals(termBytes, key2.value); From dc6cf5b4bad7cabc96aa89a0c84d35ed4cbce4f6 Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Thu, 26 Oct 2023 12:48:24 -0700 Subject: [PATCH 07/12] Adding changelog Signed-off-by: Sagar Upadhyaya --- CHANGELOG.md | 19 +++++++++---------- .../indices/IndicesRequestCache.java | 3 +-- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 50922b85a0c0d..e1e20bc93ea11 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -82,15 +82,14 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## [Unreleased 2.x] ### Added -- Add coordinator level stats for search latency ([#8386](https://github.com/opensearch-project/OpenSearch/issues/8386)) -- Add metrics for thread_pool task wait time ([#9681](https://github.com/opensearch-project/OpenSearch/pull/9681)) -- Async blob read support for S3 plugin ([#9694](https://github.com/opensearch-project/OpenSearch/pull/9694)) -- [Telemetry-Otel] Added support for OtlpGrpcSpanExporter exporter ([#9666](https://github.com/opensearch-project/OpenSearch/pull/9666)) -- Async blob read support for encrypted containers ([#10131](https://github.com/opensearch-project/OpenSearch/pull/10131)) -- Add capability to restrict async durability mode for remote indexes ([#10189](https://github.com/opensearch-project/OpenSearch/pull/10189)) -- Add Doc Status Counter for Indexing Engine ([#4562](https://github.com/opensearch-project/OpenSearch/issues/4562)) -- Add unreferenced file cleanup count to merge stats ([#10204](https://github.com/opensearch-project/OpenSearch/pull/10204)) -- [Remote Store] Add support to restrict creation & deletion if system repository and mutation of immutable settings of system repository ([#9839](https://github.com/opensearch-project/OpenSearch/pull/9839)) +- Per request phase latency ([#10351](https://github.com/opensearch-project/OpenSearch/issues/10351)) +- [Remote Store] Add repository stats for remote store([#10567](https://github.com/opensearch-project/OpenSearch/pull/10567)) +- Add search query categorizer ([#10255](https://github.com/opensearch-project/OpenSearch/pull/10255)) +- Introduce ConcurrentQueryProfiler to profile query using concurrent segment search path and support concurrency during rewrite and create weight ([10352](https://github.com/opensearch-project/OpenSearch/pull/10352)) +- [Remote cluster state] Make index and global metadata upload timeout dynamic cluster settings ([#10814](https://github.com/opensearch-project/OpenSearch/pull/10814)) +- Added cluster setting cluster.restrict.index.replication_type to restrict setting of index setting replication type ([#10866](https://github.com/opensearch-project/OpenSearch/pull/10866)) +- Add cluster state stats ([#10670](https://github.com/opensearch-project/OpenSearch/pull/10670)) +- [Tiered caching] Enabling serialization for IndicesRequestCache key object ([#10275](https://github.com/opensearch-project/OpenSearch/pull/10275)) ### Dependencies - Bump `peter-evans/create-or-update-comment` from 2 to 3 ([#9575](https://github.com/opensearch-project/OpenSearch/pull/9575)) @@ -143,4 +142,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Security [Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD -[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.11...2.x \ No newline at end of file +[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.11...2.x diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index eb1c67e9cb877..2ddaf8dc93d78 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -199,8 +199,7 @@ void invalidate(CacheEntity cacheEntity, DirectoryReader reader, BytesReference String readerCacheKeyId = null; if (reader instanceof OpenSearchDirectoryReader) { IndexReader.CacheHelper cacheHelper = ((OpenSearchDirectoryReader) reader).getDelegatingCacheHelper(); - readerCacheKeyId = ((OpenSearchDirectoryReader.DelegatingCacheHelper) cacheHelper).getDelegatingCacheKey() - .getId(); + readerCacheKeyId = ((OpenSearchDirectoryReader.DelegatingCacheHelper) cacheHelper).getDelegatingCacheKey().getId(); } cache.invalidate(new Key(cacheEntity, cacheKey, readerCacheKeyId)); } From 9134543354f1f9fd3c023eb2e077468de487c570 Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Thu, 26 Oct 2023 14:42:08 -0700 Subject: [PATCH 08/12] Added javadoc for new files/packages Signed-off-by: Sagar Upadhyaya --- .../common/cache/RemovalNotification.java | 2 +- .../cache/tier}/CachingTier.java | 2 +- .../cache/tier}/DiskCachingTier.java | 2 +- .../cache/tier}/OnHeapCachingTier.java | 2 +- .../cache/tier}/OpenSearchOnHeapCache.java | 26 ++++++--- .../cache/tier}/TierType.java | 5 +- .../cache/tier}/TieredCacheEventListener.java | 7 ++- .../cache/tier}/TieredCacheLoader.java | 7 ++- .../cache/tier}/TieredCacheService.java | 2 +- .../TieredCacheSpilloverStrategyService.java | 25 ++++---- .../common/cache/tier/package-info.java | 10 ++++ .../cache/request/ShardRequestCache.java | 2 +- .../AbstractIndexShardCacheEntity.java | 1 + .../indices/DummyDiskCachingTier.java | 58 ------------------- .../indices/IndicesRequestCache.java | 9 ++- ...redCacheSpilloverStrategyServiceTests.java | 2 +- .../indices/IndicesServiceCloseTests.java | 1 + 17 files changed, 73 insertions(+), 90 deletions(-) rename server/src/main/java/org/opensearch/{indices => common/cache/tier}/CachingTier.java (96%) rename server/src/main/java/org/opensearch/{indices => common/cache/tier}/DiskCachingTier.java (91%) rename server/src/main/java/org/opensearch/{indices => common/cache/tier}/OnHeapCachingTier.java (91%) rename server/src/main/java/org/opensearch/{indices => common/cache/tier}/OpenSearchOnHeapCache.java (80%) rename server/src/main/java/org/opensearch/{indices => common/cache/tier}/TierType.java (77%) rename server/src/main/java/org/opensearch/{indices => common/cache/tier}/TieredCacheEventListener.java (76%) rename server/src/main/java/org/opensearch/{indices => common/cache/tier}/TieredCacheLoader.java (66%) rename server/src/main/java/org/opensearch/{indices => common/cache/tier}/TieredCacheService.java (94%) rename server/src/main/java/org/opensearch/{indices => common/cache/tier}/TieredCacheSpilloverStrategyService.java (92%) create mode 100644 server/src/main/java/org/opensearch/common/cache/tier/package-info.java delete mode 100644 server/src/main/java/org/opensearch/indices/DummyDiskCachingTier.java rename server/src/test/java/org/opensearch/{indices => common/cache/tier}/TieredCacheSpilloverStrategyServiceTests.java (99%) diff --git a/server/src/main/java/org/opensearch/common/cache/RemovalNotification.java b/server/src/main/java/org/opensearch/common/cache/RemovalNotification.java index 71e240064c6ae..2152c4917b62d 100644 --- a/server/src/main/java/org/opensearch/common/cache/RemovalNotification.java +++ b/server/src/main/java/org/opensearch/common/cache/RemovalNotification.java @@ -32,7 +32,7 @@ package org.opensearch.common.cache; -import org.opensearch.indices.TierType; +import org.opensearch.common.cache.tier.TierType; /** * Notification when an element is removed from the cache diff --git a/server/src/main/java/org/opensearch/indices/CachingTier.java b/server/src/main/java/org/opensearch/common/cache/tier/CachingTier.java similarity index 96% rename from server/src/main/java/org/opensearch/indices/CachingTier.java rename to server/src/main/java/org/opensearch/common/cache/tier/CachingTier.java index 7bf7294467114..48fd5deadc111 100644 --- a/server/src/main/java/org/opensearch/indices/CachingTier.java +++ b/server/src/main/java/org/opensearch/common/cache/tier/CachingTier.java @@ -6,7 +6,7 @@ * compatible open source license. */ -package org.opensearch.indices; +package org.opensearch.common.cache.tier; import org.opensearch.common.cache.RemovalListener; diff --git a/server/src/main/java/org/opensearch/indices/DiskCachingTier.java b/server/src/main/java/org/opensearch/common/cache/tier/DiskCachingTier.java similarity index 91% rename from server/src/main/java/org/opensearch/indices/DiskCachingTier.java rename to server/src/main/java/org/opensearch/common/cache/tier/DiskCachingTier.java index abc8952c0b8e2..4db71b6378a02 100644 --- a/server/src/main/java/org/opensearch/indices/DiskCachingTier.java +++ b/server/src/main/java/org/opensearch/common/cache/tier/DiskCachingTier.java @@ -6,7 +6,7 @@ * compatible open source license. */ -package org.opensearch.indices; +package org.opensearch.common.cache.tier; /** * This is specific to disk caching tier and can be used to add methods which are specific to disk tier. diff --git a/server/src/main/java/org/opensearch/indices/OnHeapCachingTier.java b/server/src/main/java/org/opensearch/common/cache/tier/OnHeapCachingTier.java similarity index 91% rename from server/src/main/java/org/opensearch/indices/OnHeapCachingTier.java rename to server/src/main/java/org/opensearch/common/cache/tier/OnHeapCachingTier.java index f7d68c27c1904..127fa6ee8e6b3 100644 --- a/server/src/main/java/org/opensearch/indices/OnHeapCachingTier.java +++ b/server/src/main/java/org/opensearch/common/cache/tier/OnHeapCachingTier.java @@ -6,7 +6,7 @@ * compatible open source license. */ -package org.opensearch.indices; +package org.opensearch.common.cache.tier; /** * This is specific to onHeap caching tier and can be used to add methods which are specific to this tier. diff --git a/server/src/main/java/org/opensearch/indices/OpenSearchOnHeapCache.java b/server/src/main/java/org/opensearch/common/cache/tier/OpenSearchOnHeapCache.java similarity index 80% rename from server/src/main/java/org/opensearch/indices/OpenSearchOnHeapCache.java rename to server/src/main/java/org/opensearch/common/cache/tier/OpenSearchOnHeapCache.java index 189b741ed1696..22d2f769507cf 100644 --- a/server/src/main/java/org/opensearch/indices/OpenSearchOnHeapCache.java +++ b/server/src/main/java/org/opensearch/common/cache/tier/OpenSearchOnHeapCache.java @@ -6,7 +6,7 @@ * compatible open source license. */ -package org.opensearch.indices; +package org.opensearch.common.cache.tier; import org.opensearch.common.cache.Cache; import org.opensearch.common.cache.CacheBuilder; @@ -14,21 +14,28 @@ import org.opensearch.common.cache.RemovalNotification; import org.opensearch.common.unit.TimeValue; +import java.util.Objects; import java.util.concurrent.ExecutionException; import java.util.function.ToLongBiFunction; +/** + * This variant of on-heap cache uses OpenSearch custom cache implementation. + * @param Type of key + * @param Type of value + */ public class OpenSearchOnHeapCache implements OnHeapCachingTier, RemovalListener { private final Cache cache; private RemovalListener removalListener; - private OpenSearchOnHeapCache(long maxWeightInBytes, ToLongBiFunction weigher, TimeValue expireAfterAcess) { + private OpenSearchOnHeapCache(Builder builder) { + Objects.requireNonNull(builder.weigher); CacheBuilder cacheBuilder = CacheBuilder.builder() - .setMaximumWeight(maxWeightInBytes) - .weigher(weigher) + .setMaximumWeight(builder.maxWeightInBytes) + .weigher(builder.weigher) .removalListener(this); - if (expireAfterAcess != null) { - cacheBuilder.setExpireAfterAccess(expireAfterAcess); + if (builder.expireAfterAcess != null) { + cacheBuilder.setExpireAfterAccess(builder.expireAfterAcess); } cache = cacheBuilder.build(); } @@ -93,6 +100,11 @@ public void onRemoval(RemovalNotification notification) { removalListener.onRemoval(notification); } + /** + * Builder object + * @param Type of key + * @param Type of value + */ public static class Builder { private long maxWeightInBytes; @@ -118,7 +130,7 @@ public Builder setExpireAfterAccess(TimeValue expireAfterAcess) { } public OpenSearchOnHeapCache build() { - return new OpenSearchOnHeapCache(maxWeightInBytes, weigher, expireAfterAcess); + return new OpenSearchOnHeapCache(this); } } } diff --git a/server/src/main/java/org/opensearch/indices/TierType.java b/server/src/main/java/org/opensearch/common/cache/tier/TierType.java similarity index 77% rename from server/src/main/java/org/opensearch/indices/TierType.java rename to server/src/main/java/org/opensearch/common/cache/tier/TierType.java index 9a286fd26151b..ca61b636c1dda 100644 --- a/server/src/main/java/org/opensearch/indices/TierType.java +++ b/server/src/main/java/org/opensearch/common/cache/tier/TierType.java @@ -6,8 +6,11 @@ * compatible open source license. */ -package org.opensearch.indices; +package org.opensearch.common.cache.tier; +/** + * Tier types in cache. + */ public enum TierType { ON_HEAP, diff --git a/server/src/main/java/org/opensearch/indices/TieredCacheEventListener.java b/server/src/main/java/org/opensearch/common/cache/tier/TieredCacheEventListener.java similarity index 76% rename from server/src/main/java/org/opensearch/indices/TieredCacheEventListener.java rename to server/src/main/java/org/opensearch/common/cache/tier/TieredCacheEventListener.java index 084ac5a57e0d3..05b59bf16b282 100644 --- a/server/src/main/java/org/opensearch/indices/TieredCacheEventListener.java +++ b/server/src/main/java/org/opensearch/common/cache/tier/TieredCacheEventListener.java @@ -6,10 +6,15 @@ * compatible open source license. */ -package org.opensearch.indices; +package org.opensearch.common.cache.tier; import org.opensearch.common.cache.RemovalNotification; +/** + * This can be used to listen to tiered caching events + * @param Type of key + * @param Type of value + */ public interface TieredCacheEventListener { void onMiss(K key, TierType tierType); diff --git a/server/src/main/java/org/opensearch/indices/TieredCacheLoader.java b/server/src/main/java/org/opensearch/common/cache/tier/TieredCacheLoader.java similarity index 66% rename from server/src/main/java/org/opensearch/indices/TieredCacheLoader.java rename to server/src/main/java/org/opensearch/common/cache/tier/TieredCacheLoader.java index f6bb1a74e973e..d720feade0609 100644 --- a/server/src/main/java/org/opensearch/indices/TieredCacheLoader.java +++ b/server/src/main/java/org/opensearch/common/cache/tier/TieredCacheLoader.java @@ -6,8 +6,13 @@ * compatible open source license. */ -package org.opensearch.indices; +package org.opensearch.common.cache.tier; +/** + * Used to load value in tiered cache if not present. + * @param Type of key + * @param Type of value + */ public interface TieredCacheLoader { V load(K key) throws Exception; diff --git a/server/src/main/java/org/opensearch/indices/TieredCacheService.java b/server/src/main/java/org/opensearch/common/cache/tier/TieredCacheService.java similarity index 94% rename from server/src/main/java/org/opensearch/indices/TieredCacheService.java rename to server/src/main/java/org/opensearch/common/cache/tier/TieredCacheService.java index 59e5e0e00b6c1..31d58510206f0 100644 --- a/server/src/main/java/org/opensearch/indices/TieredCacheService.java +++ b/server/src/main/java/org/opensearch/common/cache/tier/TieredCacheService.java @@ -6,7 +6,7 @@ * compatible open source license. */ -package org.opensearch.indices; +package org.opensearch.common.cache.tier; import java.util.Optional; diff --git a/server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyService.java b/server/src/main/java/org/opensearch/common/cache/tier/TieredCacheSpilloverStrategyService.java similarity index 92% rename from server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyService.java rename to server/src/main/java/org/opensearch/common/cache/tier/TieredCacheSpilloverStrategyService.java index 7799170a1ede9..c148145274546 100644 --- a/server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyService.java +++ b/server/src/main/java/org/opensearch/common/cache/tier/TieredCacheSpilloverStrategyService.java @@ -6,7 +6,7 @@ * compatible open source license. */ -package org.opensearch.indices; +package org.opensearch.common.cache.tier; import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; @@ -39,14 +39,10 @@ public class TieredCacheSpilloverStrategyService implements TieredCacheSer */ private final List> cachingTierList; - private TieredCacheSpilloverStrategyService( - OnHeapCachingTier onHeapCachingTier, - DiskCachingTier diskCachingTier, - TieredCacheEventListener tieredCacheEventListener - ) { - this.onHeapCachingTier = Objects.requireNonNull(onHeapCachingTier); - this.diskCachingTier = Optional.ofNullable(diskCachingTier); - this.tieredCacheEventListener = Objects.requireNonNull(tieredCacheEventListener); + private TieredCacheSpilloverStrategyService(Builder builder) { + this.onHeapCachingTier = Objects.requireNonNull(builder.onHeapCachingTier); + this.diskCachingTier = Optional.ofNullable(builder.diskCachingTier); + this.tieredCacheEventListener = Objects.requireNonNull(builder.tieredCacheEventListener); this.cachingTierList = this.diskCachingTier.map(diskTier -> Arrays.asList(onHeapCachingTier, diskTier)) .orElse(List.of(onHeapCachingTier)); setRemovalListeners(); @@ -197,6 +193,11 @@ public static class CacheValue { } } + /** + * Builder object + * @param Type of key + * @param Type of value + */ public static class Builder { private OnHeapCachingTier onHeapCachingTier; private DiskCachingTier diskCachingTier; @@ -220,11 +221,7 @@ public Builder setTieredCacheEventListener(TieredCacheEventListener } public TieredCacheSpilloverStrategyService build() { - return new TieredCacheSpilloverStrategyService( - this.onHeapCachingTier, - this.diskCachingTier, - this.tieredCacheEventListener - ); + return new TieredCacheSpilloverStrategyService(this); } } diff --git a/server/src/main/java/org/opensearch/common/cache/tier/package-info.java b/server/src/main/java/org/opensearch/common/cache/tier/package-info.java new file mode 100644 index 0000000000000..7ad81dbe3073c --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/tier/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Base package for cache tier support. */ +package org.opensearch.common.cache.tier; diff --git a/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java b/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java index 1beef5217355f..efad437804bef 100644 --- a/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java +++ b/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java @@ -33,9 +33,9 @@ package org.opensearch.index.cache.request; import org.apache.lucene.util.Accountable; +import org.opensearch.common.cache.tier.TierType; import org.opensearch.common.metrics.CounterMetric; import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.indices.TierType; import java.util.EnumMap; diff --git a/server/src/main/java/org/opensearch/indices/AbstractIndexShardCacheEntity.java b/server/src/main/java/org/opensearch/indices/AbstractIndexShardCacheEntity.java index 2eef16df2bb9a..d9c256b4b4a94 100644 --- a/server/src/main/java/org/opensearch/indices/AbstractIndexShardCacheEntity.java +++ b/server/src/main/java/org/opensearch/indices/AbstractIndexShardCacheEntity.java @@ -34,6 +34,7 @@ import org.opensearch.common.cache.RemovalNotification; import org.opensearch.common.cache.RemovalReason; +import org.opensearch.common.cache.tier.TierType; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.index.cache.request.ShardRequestCache; import org.opensearch.index.shard.IndexShard; diff --git a/server/src/main/java/org/opensearch/indices/DummyDiskCachingTier.java b/server/src/main/java/org/opensearch/indices/DummyDiskCachingTier.java deleted file mode 100644 index 26a78b6c61920..0000000000000 --- a/server/src/main/java/org/opensearch/indices/DummyDiskCachingTier.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.indices; - -import org.opensearch.common.cache.RemovalListener; - -import java.util.Collections; - -public class DummyDiskCachingTier implements CachingTier { - - @Override - public V get(K key) { - return null; - } - - @Override - public void put(K key, V value) {} - - @Override - public V computeIfAbsent(K key, TieredCacheLoader loader) throws Exception { - return null; - } - - @Override - public void invalidate(K key) {} - - @Override - public V compute(K key, TieredCacheLoader loader) throws Exception { - return null; - } - - @Override - public void setRemovalListener(RemovalListener removalListener) {} - - @Override - public void invalidateAll() {} - - @Override - public Iterable keys() { - return Collections::emptyIterator; - } - - @Override - public int count() { - return 0; - } - - @Override - public TierType getTierType() { - return TierType.DISK; - } -} diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 2ddaf8dc93d78..fe75e8ffb5f39 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -40,6 +40,13 @@ import org.apache.lucene.util.RamUsageEstimator; import org.opensearch.common.CheckedSupplier; import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.tier.OnHeapCachingTier; +import org.opensearch.common.cache.tier.OpenSearchOnHeapCache; +import org.opensearch.common.cache.tier.TierType; +import org.opensearch.common.cache.tier.TieredCacheEventListener; +import org.opensearch.common.cache.tier.TieredCacheLoader; +import org.opensearch.common.cache.tier.TieredCacheService; +import org.opensearch.common.cache.tier.TieredCacheSpilloverStrategyService; import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; @@ -209,7 +216,7 @@ void invalidate(CacheEntity cacheEntity, DirectoryReader reader, BytesReference * * @opensearch.internal */ - private static class Loader implements org.opensearch.indices.TieredCacheLoader { + private static class Loader implements TieredCacheLoader { private final CacheEntity entity; private final CheckedSupplier loader; diff --git a/server/src/test/java/org/opensearch/indices/TieredCacheSpilloverStrategyServiceTests.java b/server/src/test/java/org/opensearch/common/cache/tier/TieredCacheSpilloverStrategyServiceTests.java similarity index 99% rename from server/src/test/java/org/opensearch/indices/TieredCacheSpilloverStrategyServiceTests.java rename to server/src/test/java/org/opensearch/common/cache/tier/TieredCacheSpilloverStrategyServiceTests.java index 4c4c7f195ba31..3cd08df649f72 100644 --- a/server/src/test/java/org/opensearch/indices/TieredCacheSpilloverStrategyServiceTests.java +++ b/server/src/test/java/org/opensearch/common/cache/tier/TieredCacheSpilloverStrategyServiceTests.java @@ -6,7 +6,7 @@ * compatible open source license. */ -package org.opensearch.indices; +package org.opensearch.common.cache.tier; import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; diff --git a/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java b/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java index 766d80a81b097..5dd4eb504ec2f 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java @@ -37,6 +37,7 @@ import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.routing.allocation.DiskThresholdSettings; import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.tier.TierType; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.core.common.bytes.BytesArray; From 50fc1671084c303a4a2b4e46127e9d1dfe69df95 Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Thu, 26 Oct 2023 14:46:09 -0700 Subject: [PATCH 09/12] Added changelog Signed-off-by: Sagar Upadhyaya --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e1e20bc93ea11..7db82b23a256b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -89,7 +89,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Remote cluster state] Make index and global metadata upload timeout dynamic cluster settings ([#10814](https://github.com/opensearch-project/OpenSearch/pull/10814)) - Added cluster setting cluster.restrict.index.replication_type to restrict setting of index setting replication type ([#10866](https://github.com/opensearch-project/OpenSearch/pull/10866)) - Add cluster state stats ([#10670](https://github.com/opensearch-project/OpenSearch/pull/10670)) -- [Tiered caching] Enabling serialization for IndicesRequestCache key object ([#10275](https://github.com/opensearch-project/OpenSearch/pull/10275)) +- [Tiered caching] Framework changes ([#10753](https://github.com/opensearch-project/OpenSearch/pull/10753) ### Dependencies - Bump `peter-evans/create-or-update-comment` from 2 to 3 ([#9575](https://github.com/opensearch-project/OpenSearch/pull/9575)) From be1fe01de6e3804a96287296a5b838917774d18c Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Thu, 26 Oct 2023 16:30:39 -0700 Subject: [PATCH 10/12] Fixing javadoc warnings Signed-off-by: Sagar Upadhyaya --- .../cache/tier/TieredCacheSpilloverStrategyService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/cache/tier/TieredCacheSpilloverStrategyService.java b/server/src/main/java/org/opensearch/common/cache/tier/TieredCacheSpilloverStrategyService.java index c148145274546..153dbf9b330f5 100644 --- a/server/src/main/java/org/opensearch/common/cache/tier/TieredCacheSpilloverStrategyService.java +++ b/server/src/main/java/org/opensearch/common/cache/tier/TieredCacheSpilloverStrategyService.java @@ -55,7 +55,7 @@ private TieredCacheSpilloverStrategyService(Builder builder) { * @param key Key for lookup. * @param loader Used to load value in case it is not present in any tier. * @return value - * @throws Exception + * @throws Exception exception thrown */ @Override public V computeIfAbsent(K key, TieredCacheLoader loader) throws Exception { @@ -80,7 +80,7 @@ public V get(K key) { /** * First fetches the tier type which has this key. And then invalidate accordingly. - * @param key + * @param key key to invalidate */ @Override public void invalidate(K key) { From e33f43fa42ebce6459054e2c99ab229055a2580e Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Thu, 28 Dec 2023 15:55:24 -0800 Subject: [PATCH 11/12] Cleanup from separation cherry pick Signed-off-by: Peter Alfonsi --- .../indices/IndicesRequestCache.java | 17 +- .../indices/TieredCacheHandler.java | 24 --- .../TieredCacheSpilloverStrategyHandler.java | 172 ------------------ .../indices/IndicesRequestCacheTests.java | 26 --- 4 files changed, 5 insertions(+), 234 deletions(-) delete mode 100644 server/src/main/java/org/opensearch/indices/TieredCacheHandler.java delete mode 100644 server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyHandler.java diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index fe75e8ffb5f39..f3cac4cbaebcc 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -114,13 +114,13 @@ public final class IndicesRequestCache implements TieredCacheEventListener tieredCacheService; - - private final TieredCacheHandler tieredCacheHandler; + private final IndicesService indicesService; IndicesRequestCache(Settings settings, IndicesService indicesService) { this.size = INDICES_CACHE_QUERY_SIZE.get(settings); this.expire = INDICES_CACHE_QUERY_EXPIRE.exists(settings) ? INDICES_CACHE_QUERY_EXPIRE.get(settings) : null; long sizeInBytes = size.getBytes(); + this.indicesService = indicesService; // Initialize onHeap cache tier first. OnHeapCachingTier openSearchOnHeapCache = new OpenSearchOnHeapCache.Builder().setWeigher( @@ -208,7 +208,7 @@ void invalidate(CacheEntity cacheEntity, DirectoryReader reader, BytesReference IndexReader.CacheHelper cacheHelper = ((OpenSearchDirectoryReader) reader).getDelegatingCacheHelper(); readerCacheKeyId = ((OpenSearchDirectoryReader.DelegatingCacheHelper) cacheHelper).getDelegatingCacheKey().getId(); } - cache.invalidate(new Key(cacheEntity, cacheKey, readerCacheKeyId)); + tieredCacheService.invalidate(new Key(cacheEntity, cacheKey, readerCacheKeyId)); } /** @@ -283,8 +283,8 @@ interface CacheEntity extends Accountable, Writeable { * * @opensearch.internal */ - public static class Key implements Accountable { - private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Key.class); + public class Key implements Accountable { + private final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Key.class); public final CacheEntity entity; // use as identity equality public final String readerCacheKeyId; @@ -331,13 +331,6 @@ public int hashCode() { result = 31 * result + value.hashCode(); return result; } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeOptionalWriteable(entity); - out.writeOptionalString(readerCacheKeyId); - out.writeBytesReference(value); - } } private class CleanupKey implements IndexReader.ClosedListener { diff --git a/server/src/main/java/org/opensearch/indices/TieredCacheHandler.java b/server/src/main/java/org/opensearch/indices/TieredCacheHandler.java deleted file mode 100644 index 5fe41f5adce94..0000000000000 --- a/server/src/main/java/org/opensearch/indices/TieredCacheHandler.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.indices; - -public interface TieredCacheHandler { - - V computeIfAbsent(K key, TieredCacheLoader loader) throws Exception; - - V get(K key); - - void invalidate(K key); - - void invalidateAll(); - - long count(); - - CachingTier getOnHeapCachingTier(); -} diff --git a/server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyHandler.java b/server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyHandler.java deleted file mode 100644 index b1b68bbf93f79..0000000000000 --- a/server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyHandler.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.indices; - -import org.opensearch.common.cache.RemovalListener; -import org.opensearch.common.cache.RemovalNotification; -import org.opensearch.common.cache.RemovalReason; - -import java.util.Arrays; -import java.util.List; -import java.util.Objects; -import java.util.function.Function; - -/** - * - * @param - * @param - */ -public class TieredCacheSpilloverStrategyHandler implements TieredCacheHandler, RemovalListener { - - private final OnHeapCachingTier onHeapCachingTier; - private final CachingTier diskCachingTier; - private final TieredCacheEventListener tieredCacheEventListener; - - /** - * Maintains caching tiers in order of get calls. - */ - private final List> cachingTierList; - - private TieredCacheSpilloverStrategyHandler( - OnHeapCachingTier onHeapCachingTier, - CachingTier diskCachingTier, - TieredCacheEventListener tieredCacheEventListener - ) { - this.onHeapCachingTier = Objects.requireNonNull(onHeapCachingTier); - this.diskCachingTier = Objects.requireNonNull(diskCachingTier); - this.tieredCacheEventListener = tieredCacheEventListener; - this.cachingTierList = Arrays.asList(onHeapCachingTier, diskCachingTier); - setRemovalListeners(); - } - - @Override - public V computeIfAbsent(K key, TieredCacheLoader loader) throws Exception { - CacheValue cacheValue = getValueFromTierCache().apply(key); - if (cacheValue == null) { - // Add the value to the onHeap cache. Any items if evicted will be moved to lower tier - V value = onHeapCachingTier.compute(key, loader); - tieredCacheEventListener.onCached(key, value, TierType.ON_HEAP); - return value; - } else { - //tieredCacheEventListener.onHit(key, cacheValue.value, cacheValue.source); // this double counts, see line 122 - } - return cacheValue.value; - } - - @Override - public V get(K key) { - CacheValue cacheValue = getValueFromTierCache().apply(key); - if (cacheValue == null) { - return null; - } - return cacheValue.value; - } - - @Override - public void invalidate(K key) { - // TODO - } - - @Override - public void invalidateAll() { - for (CachingTier cachingTier : cachingTierList) { - cachingTier.invalidateAll(); - } - } - - @Override - public long count() { - long totalCount = 0; - for (CachingTier cachingTier : cachingTierList) { - totalCount += cachingTier.count(); - } - return totalCount; - } - - @Override - public void onRemoval(RemovalNotification notification) { - if (RemovalReason.EVICTED.equals(notification.getRemovalReason())) { - switch (notification.getTierType()) { - case ON_HEAP: - diskCachingTier.put(notification.getKey(), notification.getValue()); - break; - default: - break; - } - } - tieredCacheEventListener.onRemoval(notification); - } - - @Override - public CachingTier getOnHeapCachingTier() { - return this.onHeapCachingTier; - } - - private void setRemovalListeners() { - for (CachingTier cachingTier : cachingTierList) { - cachingTier.setRemovalListener(this); - } - } - - private Function> getValueFromTierCache() { - return key -> { - for (CachingTier cachingTier : cachingTierList) { - V value = cachingTier.get(key); - if (value != null) { - tieredCacheEventListener.onHit(key, value, cachingTier.getTierType()); - return new CacheValue<>(value, cachingTier.getTierType()); - } - tieredCacheEventListener.onMiss(key, cachingTier.getTierType()); - } - return null; - }; - } - - public static class CacheValue { - V value; - TierType source; - - CacheValue(V value, TierType source) { - this.value = value; - this.source = source; - } - } - - public static class Builder { - private OnHeapCachingTier onHeapCachingTier; - private CachingTier diskCachingTier; - private TieredCacheEventListener tieredCacheEventListener; - - public Builder() {} - - public Builder setOnHeapCachingTier(OnHeapCachingTier onHeapCachingTier) { - this.onHeapCachingTier = onHeapCachingTier; - return this; - } - - public Builder setOnDiskCachingTier(CachingTier diskCachingTier) { - this.diskCachingTier = diskCachingTier; - return this; - } - - public Builder setTieredCacheEventListener(TieredCacheEventListener tieredCacheEventListener) { - this.tieredCacheEventListener = tieredCacheEventListener; - return this; - } - - public TieredCacheSpilloverStrategyHandler build() { - return new TieredCacheSpilloverStrategyHandler( - this.onHeapCachingTier, - this.diskCachingTier, - this.tieredCacheEventListener - ); - } - } - -} diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index 18ec013711f22..0dc4669c27a18 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -468,32 +468,6 @@ public void testEqualsKey() throws IOException { assertNotEquals(key1, key5); } - public void testSerializationDeserializationOfCacheKey() throws Exception { - TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); - BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - ShardRequestCache shardRequestCache = new ShardRequestCache(); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndicesRequestCache indicesRequestCache = indicesService.indicesRequestCache; - IndexService indexService = createIndex("test"); - IndexShard indexShard = indexService.getShard(0); - IndicesService.IndexShardCacheEntity shardCacheEntity = indicesService.new IndexShardCacheEntity(indexShard); - String readerCacheKeyId = UUID.randomUUID().toString(); - IndicesRequestCache.Key key1 = indicesRequestCache.new Key(shardCacheEntity, termBytes, readerCacheKeyId); - BytesReference bytesReference = null; - try (BytesStreamOutput out = new BytesStreamOutput()) { - key1.writeTo(out); - bytesReference = out.bytes(); - } - StreamInput in = bytesReference.streamInput(); - - IndicesRequestCache.Key key2 = indicesRequestCache.new Key(in); - - assertEquals(readerCacheKeyId, key2.readerCacheKeyId); - assertEquals(shardCacheEntity.getCacheIdentity(), key2.entity.getCacheIdentity()); - assertEquals(termBytes, key2.value); - - } - private class TestBytesReference extends AbstractBytesReference { int dummyValue; From 0faf76185844ae5c60d886e14d4ed1d5d37da95c Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Tue, 2 Jan 2024 10:20:42 -0800 Subject: [PATCH 12/12] Removed stats updates which should be in stats PR Signed-off-by: Peter Alfonsi --- .../cache/request/ShardRequestCache.java | 47 ++++++------------- .../AbstractIndexShardCacheEntity.java | 17 ++++--- 2 files changed, 22 insertions(+), 42 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java b/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java index efad437804bef..795d585d88647 100644 --- a/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java +++ b/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java @@ -33,12 +33,9 @@ package org.opensearch.index.cache.request; import org.apache.lucene.util.Accountable; -import org.opensearch.common.cache.tier.TierType; import org.opensearch.common.metrics.CounterMetric; import org.opensearch.core.common.bytes.BytesReference; -import java.util.EnumMap; - /** * Tracks the portion of the request cache in use for a particular shard. * @@ -46,39 +43,31 @@ */ public final class ShardRequestCache { - private EnumMap statsHolder = new EnumMap<>(TierType.class); - - public ShardRequestCache() { - for (TierType tierType : TierType.values()) { - statsHolder.put(tierType, new StatsHolder()); - } - } + final CounterMetric evictionsMetric = new CounterMetric(); + final CounterMetric totalMetric = new CounterMetric(); + final CounterMetric hitCount = new CounterMetric(); + final CounterMetric missCount = new CounterMetric(); public RequestCacheStats stats() { // TODO: Change RequestCacheStats to support disk tier stats. - return new RequestCacheStats( - statsHolder.get(TierType.ON_HEAP).totalMetric.count(), - statsHolder.get(TierType.ON_HEAP).evictionsMetric.count(), - statsHolder.get(TierType.ON_HEAP).hitCount.count(), - statsHolder.get(TierType.ON_HEAP).missCount.count() - ); + return new RequestCacheStats(totalMetric.count(), evictionsMetric.count(), hitCount.count(), missCount.count()); } - public void onHit(TierType tierType) { - statsHolder.get(tierType).hitCount.inc(); + public void onHit() { + hitCount.inc(); } - public void onMiss(TierType tierType) { - statsHolder.get(tierType).missCount.inc(); + public void onMiss() { + missCount.inc(); } - public void onCached(Accountable key, BytesReference value, TierType tierType) { - statsHolder.get(tierType).totalMetric.inc(key.ramBytesUsed() + value.ramBytesUsed()); + public void onCached(Accountable key, BytesReference value) { + totalMetric.inc(key.ramBytesUsed() + value.ramBytesUsed()); } - public void onRemoval(Accountable key, BytesReference value, boolean evicted, TierType tierType) { + public void onRemoval(Accountable key, BytesReference value, boolean evicted) { if (evicted) { - statsHolder.get(tierType).evictionsMetric.inc(); + evictionsMetric.inc(); } long dec = 0; if (key != null) { @@ -87,14 +76,6 @@ public void onRemoval(Accountable key, BytesReference value, boolean evicted, Ti if (value != null) { dec += value.ramBytesUsed(); } - statsHolder.get(tierType).totalMetric.dec(dec); - } - - static class StatsHolder { - - final CounterMetric evictionsMetric = new CounterMetric(); - final CounterMetric totalMetric = new CounterMetric(); - final CounterMetric hitCount = new CounterMetric(); - final CounterMetric missCount = new CounterMetric(); + totalMetric.dec(dec); } } diff --git a/server/src/main/java/org/opensearch/indices/AbstractIndexShardCacheEntity.java b/server/src/main/java/org/opensearch/indices/AbstractIndexShardCacheEntity.java index d9c256b4b4a94..81d4f545e0fd9 100644 --- a/server/src/main/java/org/opensearch/indices/AbstractIndexShardCacheEntity.java +++ b/server/src/main/java/org/opensearch/indices/AbstractIndexShardCacheEntity.java @@ -53,26 +53,25 @@ abstract class AbstractIndexShardCacheEntity implements IndicesRequestCache.Cach @Override public final void onCached(IndicesRequestCache.Key key, BytesReference value, TierType tierType) { - stats().onCached(key, value, tierType); + // TODO: Handle tierType in stats + stats().onCached(key, value); } @Override public final void onHit(TierType tierType) { - stats().onHit(tierType); + // TODO: Handle tierType in stats + stats().onHit(); } @Override public final void onMiss(TierType tierType) { - stats().onMiss(tierType); + // TODO: Handle tierType in stats + stats().onMiss(); } @Override public final void onRemoval(RemovalNotification notification) { - stats().onRemoval( - notification.getKey(), - notification.getValue(), - notification.getRemovalReason() == RemovalReason.EVICTED, - notification.getTierType() - ); + // TODO: Handle tierType in stats + stats().onRemoval(notification.getKey(), notification.getValue(), notification.getRemovalReason() == RemovalReason.EVICTED); } }