diff --git a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/pagemem/JmhCacheFreelistBenchmark.java b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/pagemem/JmhCacheFreelistBenchmark.java new file mode 100644 index 00000000000000..4f656494e1fa73 --- /dev/null +++ b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/pagemem/JmhCacheFreelistBenchmark.java @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.benchmarks.jmh.pagemem; + +import java.util.AbstractCollection; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider; +import org.apache.ignite.internal.pagemem.PageIdAllocator; +import org.apache.ignite.internal.pagemem.PageMemory; +import org.apache.ignite.internal.pagemem.impl.PageMemoryNoStoreImpl; +import org.apache.ignite.internal.processors.cache.CacheObjectImpl; +import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter; +import org.apache.ignite.internal.processors.cache.persistence.DataRegion; +import org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl; +import org.apache.ignite.internal.processors.cache.persistence.Storable; +import org.apache.ignite.internal.processors.cache.persistence.evict.NoOpPageEvictionTracker; +import org.apache.ignite.internal.processors.cache.persistence.freelist.CacheFreeList; +import org.apache.ignite.internal.processors.cache.persistence.freelist.FreeList; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.stat.IoStatisticsHolder; +import org.apache.ignite.internal.stat.IoStatisticsHolderNoOp; +import org.apache.ignite.logger.java.JavaLogger; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import 
org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +import static java.util.concurrent.TimeUnit.MICROSECONDS; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +/** + * Performance comparision between FreeList.insertRow(..) and FreeList.insertRows(..). + */ +@BenchmarkMode(Mode.AverageTime) +@Fork(value = 1, jvmArgsAppend = {"-Xms1g", "-server", "-XX:+AggressiveOpts", "-XX:MaxMetaspaceSize=256m", "-ea"}) +@OutputTimeUnit(MICROSECONDS) +@State(Scope.Benchmark) +@Threads(1) +@Warmup(iterations = 10, time = 200, timeUnit = MILLISECONDS) +@Measurement(iterations = 11, time = 200, timeUnit = MILLISECONDS) +public class JmhCacheFreelistBenchmark { + /** */ + private static final long MEMORY_REGION_SIZE = 10 * 1024 * 1024 * 1024L; // 10 GB + + /** */ + private static final int PAGE_SIZE = 4096; + + /** */ + private static final int ROWS_COUNT = 200; + + /** */ + public enum DATA_ROW_SIZE { + /** */ + r4_64(4, 64), + + /** */ + r100_300(100, 300), + + /** */ + r300_700(300, 700), + + /** */ + r700_1200(700, 1200), + + /** */ + r1200_3000(1_200, 3_000), + + /** */ + r1000_8000(1_000, 8_000), + + /** Large objects only. */ + r4000_16000(4_000, 16_000), + + /** Mixed objects, mostly large objects. 
*/ + r100_32000(100, 32_000); + + /** */ + private final int min; + + /** */ + private final int max; + + /** */ + DATA_ROW_SIZE(int min, int max) { + this.min = min; + this.max = max; + } + } + + /** + * Check {@link FreeList#insertDataRow(Storable, IoStatisticsHolder)} performance. + */ + @Benchmark + public void insertRow(FreeListProvider provider, Data rows) throws IgniteCheckedException { + for (CacheDataRow row : rows) + provider.freeList.insertDataRow(row, IoStatisticsHolderNoOp.INSTANCE); + } + + /** + * Check {@link FreeList#insertDataRows(Collection, IoStatisticsHolder)} performance. + */ + @Benchmark + public void insertRows(FreeListProvider provider, Data rows) throws IgniteCheckedException { + provider.freeList.insertDataRows(rows, IoStatisticsHolderNoOp.INSTANCE); + } + + /** */ + @State(Scope.Thread) + public static class Data extends AbstractCollection { + /** */ + @Param + private DATA_ROW_SIZE range; + + /** */ + private Collection rows = new ArrayList<>(ROWS_COUNT); + + /** */ + @Setup(Level.Invocation) + public void prepare() { + Random rnd = ThreadLocalRandom.current(); + + int randomRange = range.max - range.min; + + for (int i = 0; i < ROWS_COUNT; i++) { + int keySize = (range.min + rnd.nextInt(randomRange)) / 2; + int valSize = (range.min + rnd.nextInt(randomRange)) / 2; + + CacheDataRow row = new TestDataRow(keySize, valSize); + + rows.add(row); + } + } + + /** */ + @TearDown(Level.Invocation) + public void cleanup() { + rows.clear(); + } + + /** {@inheritDoc} */ + @Override public Iterator iterator() { + return rows.iterator(); + } + + /** {@inheritDoc} */ + @Override public int size() { + return rows.size(); + } + } + + /** */ + @State(Scope.Thread) + public static class FreeListProvider { + /** */ + private final DataRegionConfiguration plcCfg = + new DataRegionConfiguration().setInitialSize(MEMORY_REGION_SIZE).setMaxSize(MEMORY_REGION_SIZE); + + /** */ + private final JavaLogger log = new JavaLogger(); + + /** */ + private PageMemory 
pageMem; + + /** */ + private FreeList freeList; + + /** */ + @Setup(Level.Trial) + public void setup() throws IgniteCheckedException { + pageMem = createPageMemory(log, PAGE_SIZE, plcCfg); + + freeList = createFreeList(pageMem, plcCfg); + } + + /** */ + @TearDown(Level.Trial) + public void tearDown() { + pageMem.stop(true); + } + + /** + * @return Page memory. + */ + protected PageMemory createPageMemory(IgniteLogger log, int pageSize, DataRegionConfiguration plcCfg) { + PageMemory pageMem = new PageMemoryNoStoreImpl(log, + new UnsafeMemoryProvider(log), + null, + pageSize, + plcCfg, + new DataRegionMetricsImpl(plcCfg), + true); + + pageMem.start(); + + return pageMem; + } + + /** + * @param pageMem Page memory. + * @return Free list. + * @throws IgniteCheckedException If failed. + */ + private FreeList createFreeList( + PageMemory pageMem, + DataRegionConfiguration plcCfg + ) throws IgniteCheckedException { + long metaPageId = pageMem.allocatePage(1, 1, PageIdAllocator.FLAG_DATA); + + DataRegionMetricsImpl regionMetrics = new DataRegionMetricsImpl(plcCfg); + + DataRegion dataRegion = new DataRegion(pageMem, plcCfg, regionMetrics, new NoOpPageEvictionTracker()); + + return new CacheFreeList( + 1, + "freelist", + regionMetrics, + dataRegion, + null, + null, + metaPageId, + true, + null + ); + } + } + + /** */ + private static class TestDataRow extends CacheDataRowAdapter { + /** */ + private long link; + + /** + * @param keySize Key size. + * @param valSize Value size. + */ + private TestDataRow(int keySize, int valSize) { + super( + new KeyCacheObjectImpl(0, new byte[keySize], 0), + new CacheObjectImpl(0, new byte[valSize]), + new GridCacheVersion(keySize, valSize, 1), + 0 + ); + } + + /** {@inheritDoc} */ + @Override public long link() { + return link; + } + + /** {@inheritDoc} */ + @Override public void link(long link) { + this.link = link; + } + } + + /** + * Run benchmark. + * + * @param args Args. 
+ */ + public static void main(String[] args) throws RunnerException { + final Options options = new OptionsBuilder() + .include(JmhCacheFreelistBenchmark.class.getSimpleName()) + .build(); + + new Runner(options).run(); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index 9aec3996c32046..71702edf87ca22 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -792,6 +792,42 @@ default boolean initialValue(CacheObject val, * @throws IgniteCheckedException In case of error. * @throws GridCacheEntryRemovedException If entry was removed. */ + default boolean initialValue(CacheObject val, + GridCacheVersion ver, + @Nullable MvccVersion mvccVer, + @Nullable MvccVersion newMvccVer, + byte mvccTxState, + byte newMvccTxState, + long ttl, + long expireTime, + boolean preload, + AffinityTopologyVersion topVer, + GridDrType drType, + boolean fromStore) throws IgniteCheckedException, GridCacheEntryRemovedException { + return initialValue(val, ver, null, null, TxState.NA, TxState.NA, + ttl, expireTime, preload, topVer, drType, fromStore, null); + } + + /** + * Sets new value if current version is 0 + * + * @param val New value. + * @param ver Version to use. + * @param mvccVer Mvcc version. + * @param newMvccVer New mvcc version. + * @param mvccTxState Tx state hint for mvcc version. + * @param newMvccTxState Tx state hint for new mvcc version. + * @param ttl Time to live. + * @param expireTime Expiration time. + * @param preload Flag indicating whether entry is being preloaded. + * @param topVer Topology version. + * @param drType DR type. + * @param fromStore {@code True} if value was loaded from store. 
+ * @param row Pre-created data row, associated with this cache entry. + * @return {@code True} if initial value was set. + * @throws IgniteCheckedException In case of error. + * @throws GridCacheEntryRemovedException If entry was removed. + */ public boolean initialValue(CacheObject val, GridCacheVersion ver, @Nullable MvccVersion mvccVer, @@ -803,7 +839,8 @@ public boolean initialValue(CacheObject val, boolean preload, AffinityTopologyVersion topVer, GridDrType drType, - boolean fromStore) throws IgniteCheckedException, GridCacheEntryRemovedException; + boolean fromStore, + @Nullable CacheDataRow row) throws IgniteCheckedException, GridCacheEntryRemovedException; /** * Create versioned entry for this cache entry. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index f1b7ec76b2152f..02cc42b0dfe050 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -3310,7 +3310,8 @@ private boolean skipInterceptor(@Nullable GridCacheVersion explicitVer) { boolean preload, AffinityTopologyVersion topVer, GridDrType drType, - boolean fromStore + boolean fromStore, + CacheDataRow row ) throws IgniteCheckedException, GridCacheEntryRemovedException { ensureFreeSpace(); @@ -3386,7 +3387,7 @@ else if (val == null) cctx.offheap().mvccInitialValue(this, val, ver, expTime, mvccVer, newMvccVer); } else - storeValue(val, expTime, ver); + storeValue(val, expTime, ver, null, row); } } else { @@ -3417,7 +3418,7 @@ else if (val == null) } else // Optimization to access storage only once. 
- update = storeValue(val, expTime, ver, p); + update = storeValue(val, expTime, ver, p, row); } if (update) { @@ -4257,7 +4258,7 @@ private IgniteTxLocalAdapter currentTx() { protected boolean storeValue(@Nullable CacheObject val, long expireTime, GridCacheVersion ver) throws IgniteCheckedException { - return storeValue(val, expireTime, ver, null); + return storeValue(val, expireTime, ver, null, null); } /** @@ -4267,6 +4268,7 @@ protected boolean storeValue(@Nullable CacheObject val, * @param expireTime Expire time. * @param ver New entry version. * @param predicate Optional predicate. + * @param row Pre-created data row, associated with this cache entry. * @return {@code True} if storage was modified. * @throws IgniteCheckedException If update failed. */ @@ -4274,10 +4276,12 @@ protected boolean storeValue( @Nullable CacheObject val, long expireTime, GridCacheVersion ver, - @Nullable IgnitePredicate predicate) throws IgniteCheckedException { + @Nullable IgnitePredicate predicate, + @Nullable CacheDataRow row + ) throws IgniteCheckedException { assert lock.isHeldByCurrentThread(); - UpdateClosure closure = new UpdateClosure(this, val, ver, expireTime, predicate); + UpdateClosure closure = new UpdateClosure(this, val, ver, expireTime, predicate, row); cctx.offheap().invoke(cctx, key, localPartition(), closure); @@ -5716,16 +5720,19 @@ private static class UpdateClosure implements IgniteCacheOffheapManager.OffheapI * @param predicate Optional predicate. 
*/ UpdateClosure(GridCacheMapEntry entry, @Nullable CacheObject val, GridCacheVersion ver, long expireTime, - @Nullable IgnitePredicate predicate) { + @Nullable IgnitePredicate predicate, @Nullable CacheDataRow newRow) { this.entry = entry; this.val = val; this.ver = ver; this.expireTime = expireTime; this.predicate = predicate; + this.newRow = newRow; } /** {@inheritDoc} */ @Override public void call(@Nullable CacheDataRow oldRow) throws IgniteCheckedException { + assert newRow == null || val != null; + if (oldRow != null) { oldRow.key(entry.key); @@ -5741,6 +5748,14 @@ private static class UpdateClosure implements IgniteCacheOffheapManager.OffheapI } if (val != null) { + // If there is a pre created row, we cannot update the old one. + // The old row will be removed after the operation is completed, as usual. + if (newRow != null) { + treeOp = IgniteTree.OperationType.PUT; + + return; + } + newRow = entry.cctx.offheap().dataStore(entry.localPartition()).createRow( entry.cctx, entry.key, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index cc5dd42ef6cc92..83421f31211f7f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -54,6 +54,7 @@ import org.apache.ignite.cache.store.CacheStoreSessionListener; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataPageEvictionMode; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; @@ -65,6 +66,7 @@ import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException; import 
org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; +import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; @@ -72,6 +74,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxLog; +import org.apache.ignite.internal.processors.cache.persistence.DataRegion; import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; @@ -1998,6 +2001,38 @@ public static boolean isCacheTemplateName(String cacheName) { return cacheName.endsWith("*"); } + /** + * Calculates whether there is enough free space in a region to store a specified amount of data. + * + * @param memPlc Data region. + * @param size Data size in bytes. + * @return {@code True} if a specified amount of data can be stored in the memory region without evictions. + */ + public static boolean isEnoughSpaceForData(DataRegion memPlc, long size) { + DataRegionConfiguration plc = memPlc.config(); + + if (size <= 0 || plc.isPersistenceEnabled() || plc.getPageEvictionMode() == DataPageEvictionMode.DISABLED) + return true; + + PageMemory pageMem = memPlc.pageMemory(); + + int sysPageSize = pageMem.systemPageSize(); + + long pagesRequired = Math.round(size / (double)sysPageSize); + + long maxPages = plc.getMaxSize() / sysPageSize; + + // There are enough pages left. 
+ if (pagesRequired < maxPages - pageMem.loadedPages()) + return true; + + // Empty pages pool size restricted. + if (pagesRequired > plc.getEmptyPagesPoolSize()) + return false; + + return pagesRequired < Math.round(maxPages * (1.0d - plc.getEvictionThreshold())); + } + /** * */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java index ab8d338e46b051..6cdd0540e5591a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache; +import java.util.Collection; import java.util.List; import java.util.Map; import javax.cache.Cache; @@ -694,6 +695,22 @@ CacheDataRow createRow( long expireTime, @Nullable CacheDataRow oldRow) throws IgniteCheckedException; + /** + * Create data rows. + * + * @param infos Entry infos. + * @return Created rows. + * @throws IgniteCheckedException If failed. + */ + public Map createRows( + Collection infos + ) throws IgniteCheckedException; + + /** + * @param row Row removed from cache. + */ + public void removeRow(CacheDataRow row) throws IgniteCheckedException; + /** * @param cctx Cache context. * @param cleanupRows Rows to cleanup. 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index ff87d7059a03f8..56b7f5bef4e4e2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -18,13 +18,16 @@ package org.apache.ignite.internal.processors.cache; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; @@ -1724,6 +1727,53 @@ private void invoke0(GridCacheContext cctx, CacheSearchRow row, OffheapInvokeClo return dataRow; } + /** {@inheritDoc} */ + @Override public Map createRows( + Collection infos + ) throws IgniteCheckedException { + if (!busyLock.enterBusy()) + throw new NodeStoppingException("Operation has been cancelled (node is stopping)."); + + // todo check hashmap + Map map = new LinkedHashMap<>(U.capacity(infos.size())); + + for (GridCacheEntryInfo info : infos) { + DataRow row = info.value() == null ? null : + makeDataRow(info.key(), + info.value(), + info.version(), + info.expireTime(), + grp.storeCacheIdInDataPage() ? 
info.cacheId() : CU.UNDEFINED_CACHE_ID); + + map.put(info, row); + } + + try { + Collection rows = F.view(map.values(), Objects::nonNull); + + rowStore.addRows(rows, grp.statisticsHolderData()); + + Iterator iter = infos.iterator(); + + if (grp.sharedGroup() && !grp.storeCacheIdInDataPage()) { + for (DataRow row : rows) + row.cacheId(iter.next().cacheId()); + } + } + finally { + busyLock.leaveBusy(); + } + + return map; + } + + /** {@inheritDoc} */ + @Override public void removeRow(CacheDataRow row) throws IgniteCheckedException { + assert row != null; + + rowStore.removeRow(row.link(), grp.statisticsHolderData()); + } + /** * @param key Cache key. * @param val Cache value. @@ -2363,7 +2413,7 @@ private int cleanup0(GridCacheContext cctx, @Nullable List ctx; @@ -774,21 +780,30 @@ public void handleSupplyMessage( part.lock(); try { - Iterator infos = e.getValue().infos().iterator(); - - if (grp.mvccEnabled()) - mvccPreloadEntries(topVer, node, p, infos); - else - preloadEntries(topVer, node, p, infos); - - // If message was last for this partition, - // then we take ownership. - if (last) { - fut.partitionDone(nodeId, p, true); + List infos = e.getValue().infos(); + try { + if (grp.mvccEnabled()) + mvccPreloadEntries(topVer, node, p, infos); + else if (isEnoughSpaceForData(grp.dataRegion(), supplyMsg.messageSize() * 2)) + preloadEntriesBatched(topVer, node, p, infos); + else + preloadEntries(topVer, node, p, infos); + } + catch (GridDhtInvalidPartitionException ignored) { if (log.isDebugEnabled()) - log.debug("Finished rebalancing partition: " + - "[" + demandRoutineInfo(topicId, nodeId, supplyMsg) + ", p=" + p + "]"); + log.debug("Partition became invalid during rebalancing (will ignore): " + p); + } + finally { + // If message was last for this partition, + // then we take ownership. 
+ if (last) { + fut.partitionDone(nodeId, p, true); + + if (log.isDebugEnabled()) + log.debug("Finished rebalancing partition: " + + "[" + demandRoutineInfo(topicId, nodeId, supplyMsg) + ", p=" + p + "]"); + } } } finally { @@ -869,21 +884,23 @@ public void handleSupplyMessage( * @throws IgniteInterruptedCheckedException If interrupted. */ private void mvccPreloadEntries(AffinityTopologyVersion topVer, ClusterNode node, int p, - Iterator infos) throws IgniteCheckedException { - if (!infos.hasNext()) + Collection infos) throws IgniteCheckedException { + if (infos.isEmpty()) return; List entryHist = new ArrayList<>(); GridCacheContext cctx = grp.sharedGroup() ? null : grp.singleCacheContext(); + Iterator iter = infos.iterator(); + // Loop through all received entries and try to preload them. - while (infos.hasNext() || !entryHist.isEmpty()) { + while (iter.hasNext() || !entryHist.isEmpty()) { ctx.database().checkpointReadLock(); try { - for (int i = 0; i < 100; i++) { - boolean hasMore = infos.hasNext(); + for (int i = 0; i < CHECKPOINT_THRESHOLD; i++) { + boolean hasMore = iter.hasNext(); assert hasMore || !entryHist.isEmpty(); @@ -892,7 +909,7 @@ private void mvccPreloadEntries(AffinityTopologyVersion topVer, ClusterNode node boolean flushHistory; if (hasMore) { - entry = (GridCacheMvccEntryInfo)infos.next(); + entry = (GridCacheMvccEntryInfo)iter.next(); GridCacheMvccEntryInfo prev = entryHist.isEmpty() ? null : entryHist.get(0); @@ -914,14 +931,7 @@ private void mvccPreloadEntries(AffinityTopologyVersion topVer, ClusterNode node } if (cctx != null) { - if (!mvccPreloadEntry(cctx, node, entryHist, topVer, p)) { - if (log.isTraceEnabled()) - log.trace("Got entries for invalid partition during " + - "preloading (will skip) [p=" + p + - ", entry=" + entryHist.get(entryHist.size() - 1) + ']'); - - return; // Skip current partition. - } + mvccPreloadEntry(cctx, node, entryHist, topVer, p); //TODO: IGNITE-11330: Update metrics for touched cache only. 
for (GridCacheContext ctx : grp.caches()) { @@ -955,19 +965,21 @@ private void mvccPreloadEntries(AffinityTopologyVersion topVer, ClusterNode node * @throws IgniteInterruptedCheckedException If interrupted. */ private void preloadEntries(AffinityTopologyVersion topVer, ClusterNode node, int p, - Iterator infos) throws IgniteCheckedException { + Iterable infos) throws IgniteCheckedException { GridCacheContext cctx = null; + Iterator iter = infos.iterator(); + // Loop through all received entries and try to preload them. - while (infos.hasNext()) { + while (iter.hasNext()) { ctx.database().checkpointReadLock(); try { - for (int i = 0; i < 100; i++) { - if (!infos.hasNext()) + for (int i = 0; i < CHECKPOINT_THRESHOLD; i++) { + if (!iter.hasNext()) break; - GridCacheEntryInfo entry = infos.next(); + GridCacheEntryInfo entry = iter.next(); if (cctx == null || (grp.sharedGroup() && entry.cacheId() != cctx.cacheId())) { cctx = grp.sharedGroup() ? grp.shared().cacheContext(entry.cacheId()) : grp.singleCacheContext(); @@ -978,13 +990,7 @@ else if (cctx.isNear()) cctx = cctx.dhtCache().context(); } - if (!preloadEntry(node, p, entry, topVer, cctx)) { - if (log.isTraceEnabled()) - log.trace("Got entries for invalid partition during " + - "preloading (will skip) [p=" + p + ", entry=" + entry + ']'); - - return; - } + preloadEntry(node, p, entry, topVer, cctx, null); //TODO: IGNITE-11330: Update metrics for touched cache only. for (GridCacheContext ctx : grp.caches()) { @@ -999,6 +1005,78 @@ else if (cctx.isNear()) } } + /** + * @param topVer Topology version. + * @param from Node which sent entry. + * @param p Partition id. + * @param infos Preloaded entries. + * @throws IgniteCheckedException If failed. 
+ */ + private void preloadEntriesBatched( + AffinityTopologyVersion topVer, + ClusterNode from, + int p, + List infos + ) throws IgniteCheckedException { + int batchOff = 0; + + while (batchOff < infos.size()) { + List batch = + infos.subList(batchOff, Math.min(infos.size(), batchOff += CHECKPOINT_THRESHOLD)); + + Iterator> iter = null; + + ctx.database().checkpointReadLock(); + + try { + GridDhtLocalPartition part = grp.topology().localPartition(p); + + try { + // Create data rows on data pages before getting locks on cache entries. + iter = part.dataStore().createRows(batch).entrySet().iterator(); + + while (iter.hasNext()) { + Map.Entry e = iter.next(); + + GridCacheEntryInfo info = e.getKey(); + + CacheDataRow row = e.getValue(); + + GridCacheContext cctx = + grp.sharedGroup() ? ctx.cacheContext(info.cacheId()) : grp.singleCacheContext(); + + if (cctx != null && cctx.isNear()) + cctx = cctx.dhtCache().context(); + + if ((cctx == null || !preloadEntry(from, p, info, topVer, cctx, row)) && row != null) + part.dataStore().removeRow(row); + + if (cctx == null) + continue; + + //TODO: IGNITE-11330: Update metrics for touched cache only. + for (GridCacheContext cctx0 : grp.caches()) { + if (cctx0.statisticsEnabled()) + cctx0.cache().metrics0().onRebalanceKeyReceived(); + } + } + } + finally { + // Remove all unprocessed rows. + while (iter != null && iter.hasNext()) { + CacheDataRow row = iter.next().getValue(); + + if (row != null) + part.dataStore().removeRow(row); + } + } + } + finally { + ctx.database().checkpointReadUnlock(); + } + } + } + /** * Adds {@code entry} to partition {@code p}. * @@ -1007,7 +1085,8 @@ else if (cctx.isNear()) * @param entry Preloaded entry. * @param topVer Topology version. * @param cctx Cache context. - * @return {@code False} if partition has become invalid during preloading. + * @param row Pre-created data row, associated with this cache entry. + * @return {@code True} if the initial value was set for the specified cache entry. 
* @throws IgniteInterruptedCheckedException If interrupted. */ private boolean preloadEntry( @@ -1015,7 +1094,8 @@ private boolean preloadEntry( int p, GridCacheEntryInfo entry, AffinityTopologyVersion topVer, - GridCacheContext cctx + GridCacheContext cctx, + @Nullable CacheDataRow row ) throws IgniteCheckedException { assert ctx.database().checkpointLockIsHeldByThread(); @@ -1043,7 +1123,8 @@ private boolean preloadEntry( true, topVer, cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE, - false + false, + row )) { cached.touch(); // Start tracking. @@ -1051,6 +1132,8 @@ private boolean preloadEntry( cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(), null, null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, entry.value(), true, null, false, null, null, null, true); + + return true; } else { cached.touch(); // Start tracking. @@ -1060,20 +1143,12 @@ private boolean preloadEntry( ", part=" + p + ']'); } } - else if (log.isTraceEnabled()) - log.trace("Rebalance predicate evaluated to false for entry (will ignore): " + entry); } catch (GridCacheEntryRemovedException ignored) { if (log.isTraceEnabled()) log.trace("Entry has been concurrently removed while rebalancing (will ignore) [key=" + cached.key() + ", part=" + p + ']'); } - catch (GridDhtInvalidPartitionException ignored) { - if (log.isDebugEnabled()) - log.debug("Partition became invalid during rebalancing (will ignore): " + p); - - return false; - } } catch (IgniteInterruptedCheckedException e) { throw e; @@ -1083,7 +1158,7 @@ else if (log.isTraceEnabled()) ctx.localNode() + ", node=" + from.id() + ", key=" + entry.key() + ", part=" + p + ']', e); } - return true; + return false; } /** @@ -1094,7 +1169,7 @@ else if (log.isTraceEnabled()) * @param history Mvcc entry history. * @param topVer Topology version. * @param p Partition id. - * @return {@code False} if partition has become invalid during preloading. + * @return {@code True} if the initial value was set for the specified cache entry. 
* @throws IgniteInterruptedCheckedException If interrupted. */ private boolean mvccPreloadEntry( @@ -1127,6 +1202,8 @@ private boolean mvccPreloadEntry( cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(), null, null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, null, true, null, false, null, null, null, true); + + return true; } else { cached.touch(); // Start tracking. @@ -1141,12 +1218,6 @@ private boolean mvccPreloadEntry( log.trace("Entry has been concurrently removed while rebalancing (will ignore) [key=" + cached.key() + ", part=" + p + ']'); } - catch (GridDhtInvalidPartitionException ignored) { - if (log.isDebugEnabled()) - log.debug("Partition became invalid during rebalancing (will ignore): " + p); - - return false; - } } catch (IgniteInterruptedCheckedException | ClusterTopologyCheckedException e) { throw e; @@ -1156,7 +1227,7 @@ private boolean mvccPreloadEntry( ctx.localNode() + ", node=" + from.id() + ", key=" + info.key() + ", part=" + p + ']', e); } - return true; + return false; } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java index c5045baa24bbad..eb6ab252491f44 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.persistence; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -59,6 +60,7 @@ import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; 
+import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; import org.apache.ignite.internal.processors.cache.GridCacheMvccEntryInfo; import org.apache.ignite.internal.processors.cache.GridCacheTtlManager; import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl; @@ -2374,6 +2376,15 @@ private Metas getOrAllocatePartitionMetas() throws IgniteCheckedException { return delegate.createRow(cctx, key, val, ver, expireTime, oldRow); } + /** {@inheritDoc} */ + @Override public void removeRow(CacheDataRow row) throws IgniteCheckedException { + assert ctx.database().checkpointLockIsHeldByThread(); + + CacheDataStore delegate = init0(false); + + delegate.removeRow(row); + } + /** {@inheritDoc} */ @Override public int cleanup(GridCacheContext cctx, @Nullable List cleanupRows) throws IgniteCheckedException { @@ -2399,6 +2410,17 @@ private Metas getOrAllocatePartitionMetas() throws IgniteCheckedException { delegate.invoke(cctx, key, c); } + /** {@inheritDoc} */ + @Override public Map createRows( + Collection infos + ) throws IgniteCheckedException { + assert ctx.database().checkpointLockIsHeldByThread(); + + CacheDataStore delegate = init0(false); + + return delegate.createRows(infos); + } + /** {@inheritDoc} */ @Override public void remove(GridCacheContext cctx, KeyCacheObject key, int partId) throws IgniteCheckedException { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java index 9a3d9d28756deb..d172e13b308357 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache.persistence; +import java.util.Collection; import org.apache.ignite.IgniteCheckedException; import 
org.apache.ignite.internal.pagemem.PageIdUtils; import org.apache.ignite.internal.pagemem.PageMemory; @@ -117,6 +118,17 @@ public void addRow(CacheDataRow row, IoStatisticsHolder statHolder) throws Ignit ", link=" + U.hexLong(row.link()) + ']'; } + /** + * @param rows Rows. + * @param statHolder Statistics holder to track IO operations. + * @throws IgniteCheckedException If failed. + */ + public void addRows(Collection rows, IoStatisticsHolder statHolder) throws IgniteCheckedException { + assert ctx.database().checkpointLockIsHeldByThread(); + + freeList.insertDataRows(rows, statHolder); + } + /** * @param link Row link. * @param row New row data. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java index 22a79cc7a2061d..d6677de2961da2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.processors.cache.persistence.freelist; +import java.util.Collection; +import java.util.Iterator; import java.util.concurrent.atomic.AtomicReferenceArray; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; @@ -130,13 +132,25 @@ private final class UpdateRowHandler extends PageHandler { } } - /** */ - private final PageHandler writeRow = new WriteRowHandler(); + /** Write handler which puts memory page into the free list after an update. */ + private final PageHandler writeRow = new WriteRowHandler(true); - /** - * - */ + /** Write handler which doesn't put memory page into the free list after an update. 
*/ + private final PageHandler writeRowKeepPage = new WriteRowHandler(false); + + /** */ private final class WriteRowHandler extends PageHandler { + /** */ + private final boolean putPageIntoFreeList; + + /** + * @param putPageIntoFreeList Put page into the free list after an update. + */ + WriteRowHandler(boolean putPageIntoFreeList) { + this.putPageIntoFreeList = putPageIntoFreeList; + } + + /** {@inheritDoc} */ @Override public Integer run( int cacheId, long pageId, @@ -159,13 +173,15 @@ private final class WriteRowHandler extends PageHandler { written = (written == 0 && oldFreeSpace >= rowSize) ? addRow(pageId, page, pageAddr, io, row, rowSize) : addRowFragment(pageId, page, pageAddr, io, row, written, rowSize); - // Reread free space after update. - int newFreeSpace = io.getFreeSpace(pageAddr); + if (putPageIntoFreeList) { + // Reread free space after update. + int newFreeSpace = io.getFreeSpace(pageAddr); - if (newFreeSpace > MIN_PAGE_FREE_SPACE) { - int bucket = bucket(newFreeSpace, false); + if (newFreeSpace > MIN_PAGE_FREE_SPACE) { + int bucket = bucket(newFreeSpace, false); - put(null, pageId, page, pageAddr, bucket, statHolder); + put(null, pageId, page, pageAddr, bucket, statHolder); + } } if (written == rowSize) @@ -487,7 +503,7 @@ private long allocateDataPage(int part) throws IgniteCheckedException { long pageId = 0L; if (remaining < MIN_SIZE_FOR_DATA_PAGE) { - for (int b = bucket(remaining, false) + 1; b < BUCKETS - 1; b++) { + for (int b = bucket(remaining, false) + 1; b < REUSE_BUCKET; b++) { pageId = takeEmptyPage(b, row.ioVersions(), statHolder); if (pageId != 0L) @@ -510,7 +526,7 @@ private long allocateDataPage(int part) throws IgniteCheckedException { initIo = row.ioVersions().latest(); } else if (PageIdUtils.tag(pageId) != PageIdAllocator.FLAG_DATA) // Page is taken from reuse bucket. 
- pageId = initReusedPage(row, pageId, row.partition(), statHolder); + pageId = initReusedPage(row, pageId, statHolder); else // Page is taken from free space bucket. For in-memory mode partition must be changed. pageId = PageIdUtils.changePartitionId(pageId, (row.partition())); @@ -529,15 +545,182 @@ else if (PageIdUtils.tag(pageId) != PageIdAllocator.FLAG_DATA) // Page is taken } /** + * Reduces the workload on the free list by writing multiple rows into a single memory page at once.
+ *
+ * Rows are sequentially added to the page as long as there is enough free space on it. If the row is large then + * those fragments that occupy the whole memory page are written to other pages, and the remainder is added to the + * current one. + * + * @param rows Rows. + * @param statHolder Statistics holder to track IO operations. + * @throws IgniteCheckedException If failed. + */ + @Override public void insertDataRows(Collection rows, + IoStatisticsHolder statHolder) throws IgniteCheckedException { + try { + Iterator iter = rows.iterator(); + + T row = null; + + int written = COMPLETE; + + while (iter.hasNext() || written != COMPLETE) { + if (written == COMPLETE) { + row = iter.next(); + + // If the data row was completely written without remainder, proceed to the next. + if ((written = writeWholePages(row, statHolder)) == COMPLETE) + continue; + } + + int remaining = row.size() - written; + + long pageId = 0L; + + if (remaining < MIN_SIZE_FOR_DATA_PAGE) { + for (int b = bucket(remaining, false) + 1; b < REUSE_BUCKET; b++) { + pageId = takeEmptyPage(b, row.ioVersions(), statHolder); + + if (pageId != 0L) + break; + } + } + + if (pageId == 0L) { // Handle reuse bucket. + if (reuseList == this) + pageId = takeEmptyPage(REUSE_BUCKET, row.ioVersions(), statHolder); + else + pageId = reuseList.takeRecycledPage(); + } + + AbstractDataPageIO initIo = null; + + if (pageId == 0L) { + pageId = allocateDataPage(row.partition()); + + initIo = row.ioVersions().latest(); + } + else if (PageIdUtils.tag(pageId) != PageIdAllocator.FLAG_DATA) // Page is taken from reuse bucket. + pageId = initReusedPage(row, pageId, statHolder); + else // Page is taken from free space bucket. For in-memory mode partition must be changed. + pageId = PageIdUtils.changePartitionId(pageId, (row.partition())); + + // Acquire and lock page. 
+ long page = acquirePage(pageId, statHolder); + + try { + long pageAddr = writeLock(pageId, page); + + assert pageAddr != 0; + + boolean dirty = false; + + try { + AbstractDataPageIO io = initIo == null ? PageIO.getPageIO(pageAddr) : initIo; + + // Fill the page up to the end. + while (iter.hasNext() || written != COMPLETE) { + if (written == COMPLETE) { + row = iter.next(); + + // If the data row was completely written without remainder, proceed to the next. + if ((written = writeWholePages(row, statHolder)) == COMPLETE) + continue; + + if (io.getFreeSpace(pageAddr) < row.size() - written) + break; + } + + written = PageHandler.writePage(pageMem, grpId, pageId, page, pageAddr, lockLsnr, + writeRowKeepPage, initIo, wal, null, row, written, statHolder); + + initIo = null; + + dirty = true; + + assert written == COMPLETE; + } + + int freeSpace = io.getFreeSpace(pageAddr); + + // Put page into the free list if needed. + if (freeSpace > MIN_PAGE_FREE_SPACE) { + int bucket = bucket(freeSpace, false); + + put(null, pageId, page, pageAddr, bucket, statHolder); + } + + assert PageIO.getCrc(pageAddr) == 0; //TODO GG-11480 + } + finally { + // Should always unlock data page after an update. + assert writeRowKeepPage.releaseAfterWrite(grpId, pageId, page, pageAddr, row, 0); + + writeUnlock(pageId, page, pageAddr, dirty); + } + } + finally { + releasePage(pageId, page); + } + } + } + catch (RuntimeException e) { + throw new CorruptedFreeListException("Failed to insert data rows", e); + } + } + + /** + * Write fragments of the row, which occupy the whole memory page. + * + * @param row Row to process. + * @param statHolder Statistics holder to track IO operations. + * @return Number of bytes written, {@link #COMPLETE} if the row was fully written. + * @throws IgniteCheckedException If failed. 
+ */ + private int writeWholePages(T row, IoStatisticsHolder statHolder) throws IgniteCheckedException { + if (row.size() < MIN_SIZE_FOR_DATA_PAGE) + return 0; + + assert row.link() == 0 : row.link(); + + int written = 0; + + do { + long pageId = reuseList == this ? takeEmptyPage(REUSE_BUCKET, row.ioVersions(), statHolder) : + reuseList.takeRecycledPage(); + + AbstractDataPageIO initIo = null; + + if (pageId == 0L) { + pageId = allocateDataPage(row.partition()); + + initIo = row.ioVersions().latest(); + } + else if (PageIdUtils.tag(pageId) != PageIdAllocator.FLAG_DATA) // Page is taken from reuse bucket. + pageId = initReusedPage(row, pageId, statHolder); + else // Page is taken from free space bucket. For in-memory mode partition must be changed. + pageId = PageIdUtils.changePartitionId(pageId, (row.partition())); + + written = write(pageId, writeRow, initIo, row, written, FAIL_I, statHolder); + + assert written != FAIL_I; // We can't fail here. + + memMetrics.incrementLargeEntriesPages(); + } + while (row.size() - written >= MIN_SIZE_FOR_DATA_PAGE); + + return written; + } + + /** + * @param row Row. * @param reusedPageId Reused page id. - * @param partId Partition id. * @param statHolder Statistics holder to track IO operations. * @return Prepared page id. 
* * @see PagesList#initReusedPage(long, long, long, int, byte, PageIO) */ - private long initReusedPage(T row, long reusedPageId, int partId, - IoStatisticsHolder statHolder) throws IgniteCheckedException { + private long initReusedPage(T row, long reusedPageId, IoStatisticsHolder statHolder) throws IgniteCheckedException { long reusedPage = acquirePage(reusedPageId, statHolder); try { long reusedPageAddr = writeLock(reusedPageId, reusedPage); @@ -546,7 +729,7 @@ private long initReusedPage(T row, long reusedPageId, int partId, try { return initReusedPage(reusedPageId, reusedPage, reusedPageAddr, - partId, PageIdAllocator.FLAG_DATA, row.ioVersions().latest()); + row.partition(), PageIdAllocator.FLAG_DATA, row.ioVersions().latest()); } finally { writeUnlock(reusedPageId, reusedPage, reusedPageAddr, true); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeList.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeList.java index e28d421bdf063c..cefde1cd346fae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeList.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeList.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache.persistence.freelist; +import java.util.Collection; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.processors.cache.persistence.Storable; @@ -28,13 +29,22 @@ public interface FreeList { /** * @param row Row. + * @param statHolder Statistics holder to track IO operations. * @throws IgniteCheckedException If failed. */ public void insertDataRow(T row, IoStatisticsHolder statHolder) throws IgniteCheckedException; + /** + * @param rows Rows. + * @param statHolder Statistics holder to track IO operations. 
+ * @throws IgniteCheckedException If failed. + */ + public void insertDataRows(Collection rows, IoStatisticsHolder statHolder) throws IgniteCheckedException; + /** * @param link Row link. * @param row New row data. + * @param statHolder Statistics holder to track IO operations. * @return {@code True} if was able to update row. * @throws IgniteCheckedException If failed. */ @@ -46,6 +56,7 @@ public interface FreeList { * @param arg Handler argument. * @param Argument type. * @param Result type. + * @param statHolder Statistics holder to track IO operations. * @return Result. * @throws IgniteCheckedException If failed. */ @@ -54,6 +65,7 @@ public R updateDataRow(long link, PageHandler pageHnd, S arg, IoSta /** * @param link Row link. + * @param statHolder Statistics holder to track IO operations. * @throws IgniteCheckedException If failed. */ public void removeDataRowByLink(long link, IoStatisticsHolder statHolder) throws IgniteCheckedException; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java index ff980af918c752..07158243d3e3c3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java @@ -282,15 +282,8 @@ public static R writePage( boolean ok = false; try { - if (init != null) { - // It is a new page and we have to initialize it. 
- doInitPage(pageMem, grpId, pageId, page, pageAddr, init, wal); - walPlc = FALSE; - } - else - init = PageIO.getPageIO(pageAddr); - - R res = h.run(grpId, pageId, page, pageAddr, init, walPlc, arg, intArg, statHolder); + R res = writePage( + pageMem, grpId, pageId, page, pageAddr, lsnr, h, init, wal, walPlc, arg, intArg, statHolder); ok = true; @@ -309,6 +302,48 @@ public static R writePage( } } + /** + * @param pageMem Page memory. + * @param grpId Group ID. + * @param pageId Page ID. + * @param pageAddr Page address. + * @param lsnr Lock listener. + * @param h Handler. + * @param init IO for new page initialization or {@code null} if it is an existing page. + * @param wal Write ahead log. + * @param walPlc Full page WAL record policy. + * @param arg Argument. + * @param intArg Argument of type {@code int}. + * @param statHolder Statistics holder to track IO operations. + * @return Handler result. + * @throws IgniteCheckedException If failed. + */ + public static R writePage( + PageMemory pageMem, + int grpId, + long pageId, + long page, + long pageAddr, + PageLockListener lsnr, + PageHandler h, + PageIO init, + IgniteWriteAheadLogManager wal, + Boolean walPlc, + X arg, + int intArg, + IoStatisticsHolder statHolder + ) throws IgniteCheckedException { + if (init != null) { + // It is a new page and we have to initialize it. + doInitPage(pageMem, grpId, pageId, page, pageAddr, init, wal); + walPlc = FALSE; + } + else + init = PageIO.getPageIO(pageAddr); + + return h.run(grpId, pageId, page, pageAddr, init, walPlc, arg, intArg, statHolder); + } + /** * @param pageMem Page memory. * @param grpId Group ID. 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java index 8d1ab878f2970d..f6900b8401ad6c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java @@ -711,7 +711,8 @@ void recheckLock() { boolean preload, AffinityTopologyVersion topVer, GridDrType drType, - boolean fromStore + boolean fromStore, + CacheDataRow row ) throws IgniteCheckedException, GridCacheEntryRemovedException { assert false; diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MemoryLeakAfterRebalanceSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MemoryLeakAfterRebalanceSelfTest.java new file mode 100644 index 00000000000000..19809d674d46ea --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MemoryLeakAfterRebalanceSelfTest.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.pagemem; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import javax.cache.Cache; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage; +import org.apache.ignite.internal.processors.cache.IgniteInternalCache; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; +import org.apache.ignite.internal.util.lang.GridCloseableIterator; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * Check page memory consistency after preloading. 
+ */ +@RunWith(Parameterized.class) +public class MemoryLeakAfterRebalanceSelfTest extends GridCommonAbstractTest { + /** */ + @Parameterized.Parameter + public CacheAtomicityMode cacheAtomicityMode; + + /** */ + @Parameterized.Parameters(name = " [atomicity={0}]") + public static Iterable parameters() { + return Arrays.asList(new Object[][] {{CacheAtomicityMode.ATOMIC}, {CacheAtomicityMode.TRANSACTIONAL}}); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setCommunicationSpi(new TestRecordingCommunicationSpi()); + + cfg.setDataStorageConfiguration(new DataStorageConfiguration(). + setDefaultDataRegionConfiguration( + new DataRegionConfiguration() + .setMaxSize(200 * 1024 * 1024) + .setPersistenceEnabled(true))); + + CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME); + + ccfg.setCacheMode(CacheMode.REPLICATED); + ccfg.setAtomicityMode(cacheAtomicityMode); + ccfg.setAffinity(new RendezvousAffinityFunction(false, 16)); + + cfg.setCacheConfiguration(ccfg); + + cfg.setRebalanceThreadPoolSize(4); + + return cfg; + } + + /** Initialization. */ + @Before + public void before() throws Exception { + cleanPersistenceDir(); + } + + /** Clean up. */ + @After + public void after() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testPreloadingWithConcurrentUpdates() throws Exception { + int size = GridTestUtils.SF.applyLB(500_000, 5_000); + + // Prepare data. + Map data = new HashMap<>(U.capacity(size)); + + for (int i = 0; i < size; i++) + data.put(i, i + " v.1"); + + // Start the first node. + Ignite node0 = startGrid(0); + + node0.cluster().active(true); + + node0.cluster().baselineAutoAdjustTimeout(0); + + IgniteCache cache0 = node0.cache(DEFAULT_CACHE_NAME); + + // Load data. 
+ cache0.putAll(data); + + TestRecordingCommunicationSpi.spi(node0) + .blockMessages((node, msg) -> + msg instanceof GridDhtPartitionSupplyMessage + && ((GridCacheGroupIdMessage)msg).groupId() == groupIdForCache(node0, DEFAULT_CACHE_NAME) + ); + + // Start the second node. + IgniteEx node1 = startGrid(1); + + TestRecordingCommunicationSpi.spi(node0).waitForBlocked(); + + // Simulate concurrent updates during preloading. + for (int i = 0; i < size; i += 10) { + String val = i + " v.2"; + + cache0.put(i, val); + data.put(i, val); + + // Start preloading. + if (i == size / 2) + TestRecordingCommunicationSpi.spi(node0).stopBlock(); + } + + awaitPartitionMapExchange(); + + // Stop the first node (node0). + stopGrid(0); + + awaitPartitionMapExchange(); + + IgniteInternalCache cache1 = node1.cachex(DEFAULT_CACHE_NAME); + + assertEquals(data.size(), cache1.size()); + + GridCacheContext cctx = cache1.context(); + + // Make sure that there are no duplicate entries on the data pages in the page memory. + try (GridCloseableIterator> iter = cctx.offheap().cacheEntriesIterator( + cctx, + true, + true, + cctx.topology().readyTopologyVersion(), + false, + null, + true)) { + + while (iter.hasNext()) { + Cache.Entry entry = iter.next(); + + Integer key = entry.getKey(); + + String exp = data.remove(key); + + assertEquals(exp, entry.getValue()); + } + } + + assertTrue(data.isEmpty()); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListSelfTest.java index b4089e9e842bdc..7460b8a3fa15ed 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListSelfTest.java @@ -18,9 +18,11 @@ package org.apache.ignite.internal.processors.database; import java.nio.ByteBuffer; +import java.util.ArrayList; import 
java.util.Arrays; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Random; import java.util.concurrent.Callable; @@ -65,6 +67,9 @@ public class CacheFreeListSelfTest extends GridCommonAbstractTest { /** */ private static final long MB = 1024L * 1024L; + /** */ + private static final int BATCH_SIZE = 100; + /** */ private PageMemory pageMem; @@ -78,6 +83,46 @@ public class CacheFreeListSelfTest extends GridCommonAbstractTest { pageMem = null; } + /** + * @throws Exception if failed. + */ + @Test + public void testInsertDeleteSingleThreaded_batched_1024() throws Exception { + checkInsertDeleteSingleThreaded(1024, true); + } + + /** + * @throws Exception if failed. + */ + @Test + public void testInsertDeleteSingleThreaded_batched_2048() throws Exception { + checkInsertDeleteSingleThreaded(2048, true); + } + + /** + * @throws Exception if failed. + */ + @Test + public void testInsertDeleteSingleThreaded_batched_4096() throws Exception { + checkInsertDeleteSingleThreaded(4096, true); + } + + /** + * @throws Exception if failed. + */ + @Test + public void testInsertDeleteSingleThreaded_batched_8192() throws Exception { + checkInsertDeleteSingleThreaded(8192, true); + } + + /** + * @throws Exception if failed. + */ + @Test + public void testInsertDeleteSingleThreaded_batched_16384() throws Exception { + checkInsertDeleteSingleThreaded(16384, true); + } + /** * @throws Exception if failed. */ @@ -158,11 +203,60 @@ public void testInsertDeleteMultiThreaded_16384() throws Exception { checkInsertDeleteMultiThreaded(16384); } + /** + * @throws Exception if failed. + */ + @Test + public void testInsertDeleteMultiThreaded_batched_1024() throws Exception { + checkInsertDeleteMultiThreaded(1024, true); + } + + /** + * @throws Exception if failed. 
+ */ + @Test + public void testInsertDeleteMultiThreaded_batched_2048() throws Exception { + checkInsertDeleteMultiThreaded(2048, true); + } + + /** + * @throws Exception if failed. + */ + @Test + public void testInsertDeleteMultiThreaded_batched_4096() throws Exception { + checkInsertDeleteMultiThreaded(4096, true); + } + + /** + * @throws Exception if failed. + */ + @Test + public void testInsertDeleteMultiThreaded_batched_8192() throws Exception { + checkInsertDeleteMultiThreaded(8192, true); + } + + /** + * @throws Exception if failed. + */ + @Test + public void testInsertDeleteMultiThreaded_batched_16384() throws Exception { + checkInsertDeleteMultiThreaded(16384, true); + } + + /** + * @param pageSize Page size. + * @throws Exception If failed. + */ + protected void checkInsertDeleteMultiThreaded(int pageSize) throws Exception { + checkInsertDeleteMultiThreaded(pageSize, false); + } + /** * @param pageSize Page size. + * @param batched Batch mode flag. * @throws Exception If failed. 
*/ - protected void checkInsertDeleteMultiThreaded(final int pageSize) throws Exception { + protected void checkInsertDeleteMultiThreaded(final int pageSize, final boolean batched) throws Exception { final FreeList list = createFreeList(pageSize); Random rnd = new Random(); @@ -188,6 +282,8 @@ protected void checkInsertDeleteMultiThreaded(final int pageSize) throws Excepti GridTestUtils.runMultiThreaded(new Callable() { @Override public Object call() throws Exception { + List rows = new ArrayList<>(BATCH_SIZE); + Random rnd = ThreadLocalRandom.current(); for (int i = 0; i < 200_000; i++) { @@ -218,16 +314,34 @@ protected void checkInsertDeleteMultiThreaded(final int pageSize) throws Excepti TestDataRow row = new TestDataRow(keySize, valSize); - list.insertDataRow(row, IoStatisticsHolderNoOp.INSTANCE); + if (batched) + rows.add(row); + else { + list.insertDataRow(row, IoStatisticsHolderNoOp.INSTANCE); - assertTrue(row.link() != 0L); + assertTrue(row.link() != 0L); - TestDataRow old = stored.put(row.link(), row); + TestDataRow old = stored.put(row.link(), row); - assertNull(old); + assertNull(old); + } + + if (rows.size() == BATCH_SIZE) { + list.insertDataRows(rows, IoStatisticsHolderNoOp.INSTANCE); + + for (TestDataRow row0 : rows) { + assertTrue(row0.link() != 0L); + + TestDataRow old = stored.put(row0.link(), row0); + + assertNull(old); + } + + rows.clear(); + } } else { - while (true) { + while (!stored.isEmpty()) { Iterator it = stored.values().iterator(); if (it.hasNext()) { @@ -251,9 +365,19 @@ protected void checkInsertDeleteMultiThreaded(final int pageSize) throws Excepti } /** - * @throws Exception if failed. + * @param pageSize Page size. + * @throws Exception If failed. */ protected void checkInsertDeleteSingleThreaded(int pageSize) throws Exception { + checkInsertDeleteSingleThreaded(pageSize, false); + } + + /** + * @param pageSize Page size. + * @param batched Batch mode flag. + * @throws Exception if failed. 
+ */ + protected void checkInsertDeleteSingleThreaded(int pageSize, boolean batched) throws Exception { FreeList list = createFreeList(pageSize); Random rnd = new Random(); @@ -277,6 +401,8 @@ protected void checkInsertDeleteSingleThreaded(int pageSize) throws Exception { boolean grow = true; + List rows = new ArrayList<>(BATCH_SIZE); + for (int i = 0; i < 1_000_000; i++) { if (grow) { if (stored.size() > 20_000) { @@ -301,13 +427,31 @@ protected void checkInsertDeleteSingleThreaded(int pageSize) throws Exception { TestDataRow row = new TestDataRow(keySize, valSize); - list.insertDataRow(row, IoStatisticsHolderNoOp.INSTANCE); + if (batched) + rows.add(row); + else { + list.insertDataRow(row, IoStatisticsHolderNoOp.INSTANCE); + + assertTrue(row.link() != 0L); + + TestDataRow old = stored.put(row.link(), row); + + assertNull(old); + } + + if (rows.size() == BATCH_SIZE) { + list.insertDataRows(rows, IoStatisticsHolderNoOp.INSTANCE); + + for (TestDataRow row0 : rows) { + assertTrue(row0.link() != 0L); - assertTrue(row.link() != 0L); + TestDataRow old = stored.put(row0.link(), row0); - TestDataRow old = stored.put(row.link(), row); + assertNull(old); + } - assertNull(old); + rows.clear(); + } } else { Iterator it = stored.values().iterator(); diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite.java index 220c41ff9d6200..12efe5302489c1 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite.java @@ -35,6 +35,7 @@ import org.apache.ignite.internal.processors.cache.persistence.pagemem.BPlusTreeReuseListPageMemoryImplTest; import org.apache.ignite.internal.processors.cache.persistence.pagemem.FillFactorMetricTest; import org.apache.ignite.internal.processors.cache.persistence.pagemem.IndexStoragePageMemoryImplTest; +import 
org.apache.ignite.internal.processors.cache.persistence.pagemem.MemoryLeakAfterRebalanceSelfTest; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImplNoLoadTest; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImplTest; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryNoStoreLeakTest; @@ -83,6 +84,8 @@ public static List> suite() { ignoredTests.add(IgnitePdsDestroyCacheTest.class); ignoredTests.add(IgnitePdsDestroyCacheWithoutCheckpointsTest.class); + ignoredTests.add(MemoryLeakAfterRebalanceSelfTest.class); + return new ArrayList<>(IgnitePdsTestSuite.suite(ignoredTests)); } } diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java index f865bb8363967e..74e763b82c3781 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java @@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.persistence.pagemem.BPlusTreeReuseListPageMemoryImplTest; import org.apache.ignite.internal.processors.cache.persistence.pagemem.FillFactorMetricTest; import org.apache.ignite.internal.processors.cache.persistence.pagemem.IndexStoragePageMemoryImplTest; +import org.apache.ignite.internal.processors.cache.persistence.pagemem.MemoryLeakAfterRebalanceSelfTest; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImplNoLoadTest; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImplTest; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryLazyAllocationTest; @@ -98,6 +99,8 @@ public static List> suite(Collection ignoredTests) { //GridTestUtils.addTestIfNeeded(suite, PageIdDistributionTest.class, ignoredTests); //GridTestUtils.addTestIfNeeded(suite, 
TrackingPageIOTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, MemoryLeakAfterRebalanceSelfTest.class, ignoredTests); + // BTree tests with store page memory. GridTestUtils.addTestIfNeeded(suite, BPlusTreePageMemoryImplTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, BPlusTreeReuseListPageMemoryImplTest.class, ignoredTests);