diff --git a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/pagemem/JmhBatchUpdatesInPreloadBenchmark.java b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/pagemem/JmhBatchUpdatesInPreloadBenchmark.java new file mode 100644 index 0000000000000..a25571a206215 --- /dev/null +++ b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/pagemem/JmhBatchUpdatesInPreloadBenchmark.java @@ -0,0 +1,544 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.benchmarks.jmh.pagemem; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Modifier; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.Ignition; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; +import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; +import org.apache.ignite.logger.NullLogger; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +/** + * Batch updates in pagemem through preloader. + * + * todo benchmark for internal testing purposes. + */ +@BenchmarkMode(Mode.AverageTime) +@Fork(value = 1, jvmArgsAppend = {"-Xms3g", "-Xmx3g", "-server", "-XX:+AggressiveOpts", "-XX:MaxMetaspaceSize=256m"}) +@Measurement(iterations = 5) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +@State(Scope.Benchmark) +@Threads(1) +@Warmup(iterations = 10) +public class JmhBatchUpdatesInPreloadBenchmark { + /** */ + private static final long DEF_REG_SIZE = 20 * 1024 * 1024 * 1024L; + + /** */ + private static final int BATCH_SIZE = 500; + + /** */ + private static final String REG_BATCH = "batch-region"; + + /** */ + private static final String REG_SINGLE = "single-region"; + + /** */ + private static final String CACHE_BATCH = "batch"; + + /** */ + private static final String CACHE_SINGLE = "single"; + + /** */ + private static final String NODE_NAME = "srv0"; + + /** */ + private static int iteration = 0; + + /** */ + public enum OBJECT_SIZE_RANGE { + /** */ + r0_4(0, 4), + + /** */ + r4_16(4, 16), + + /** */ + r16_64(16, 64), + + /** */ + r100_200(100, 200), + + /** */ + r200_500(200, 500), + + /** */ + r500_800(500, 800), + + /** */ + r800_1200(800, 1200), + + /** */ + r2000_3000(2_000, 3_000), + + /** */ + r1000_8000(1_000, 8_000), + + /** Large objects only. */ + r4000_16000(4_000, 16_000), + + /** Mixed objects, mostly large objects. */ + r100_32000(100, 32_000); + + /** */ + private final int min; + + /** */ + private final int max; + + /** */ + OBJECT_SIZE_RANGE(int min, int max) { + this.min = min; + this.max = max; + } + } + + /** + * Create Ignite configuration. + * + * @return Ignite configuration. + */ + private IgniteConfiguration getConfiguration(String cfgName) { + IgniteConfiguration cfg = new IgniteConfiguration(); + + cfg.setGridLogger(new NullLogger()); + + cfg.setIgniteInstanceName(cfgName); + + DataRegionConfiguration reg1 = new DataRegionConfiguration(); + reg1.setInitialSize(DEF_REG_SIZE); + reg1.setMaxSize(DEF_REG_SIZE); + reg1.setName(REG_BATCH); + + DataRegionConfiguration reg2 = new DataRegionConfiguration(); + reg2.setInitialSize(DEF_REG_SIZE); + reg2.setMaxSize(DEF_REG_SIZE); + reg2.setName(REG_SINGLE); + + DataStorageConfiguration storeCfg = new DataStorageConfiguration(); + + storeCfg.setDataRegionConfigurations(reg1, reg2); + + cfg.setDataStorageConfiguration(storeCfg); + + cfg.setCacheConfiguration(ccfg(false), ccfg(true)); + + return cfg; + } + + /** + * @return Cache configuration. + */ + private CacheConfiguration ccfg(boolean batch) { + return new CacheConfiguration(batch ? CACHE_BATCH : CACHE_SINGLE) + .setAffinity(new RendezvousAffinityFunction(false, 1)) + .setCacheMode(CacheMode.REPLICATED) + .setAtomicityMode(CacheAtomicityMode.ATOMIC) + .setDataRegionName(batch ? REG_BATCH : REG_SINGLE); + } + + /** + * Test single updates. + * + * @param data Data that will be preloaded. + * @param preloader Data preloader. + */ + @Benchmark + @Fork(jvmArgsAppend = "-D" + IgniteSystemProperties.IGNITE_DATA_STORAGE_BATCH_PAGE_WRITE + "=false") + public void checkSingle(Data data, Preloader preloader) { + preloader.demanderSingle.handleSupplyMessage(0, data.node.localNode().id(), data.singleData); + } + + /** + * Test batch updates. + * + * @param data Data that will be preloaded. + * @param preloader Data preloader. + */ + @Benchmark + @Fork(jvmArgsAppend = "-D" + IgniteSystemProperties.IGNITE_DATA_STORAGE_BATCH_PAGE_WRITE + "=true") + public void checkBatch(Data data, Preloader preloader) { + preloader.demanderBatch.handleSupplyMessage(0, data.node.localNode().id(), data.batchData); + } + + /** + * Start 2 servers and 1 client. + */ + @Setup(Level.Trial) + public void setup() { + IgniteEx node = (IgniteEx)Ignition.start(getConfiguration(NODE_NAME)); + + partitionState(node, CACHE_BATCH, GridDhtPartitionState.MOVING); + partitionState(node, CACHE_SINGLE, GridDhtPartitionState.MOVING); + } + + /** */ + private void partitionState(IgniteEx node, String name, GridDhtPartitionState state) { + for (GridDhtLocalPartition part : node.cachex(name).context().group().topology().localPartitions()) + part.setState(state); + } + + /** + * Stop all grids after tests. + */ + @TearDown(Level.Trial) + public void tearDown() { + Ignition.stopAll(true); + } + + /** + * Create streamer on client cache. + */ + @State(Scope.Benchmark) + public static class Preloader { + /** */ + final GridDhtPartitionDemander demanderBatch = demander(CACHE_BATCH); + + /** */ + final GridDhtPartitionDemander demanderSingle = demander(CACHE_SINGLE); + + /** */ + GridDhtPartitionDemander demander(String name) { + try { + GridCacheContext cctx = ((IgniteEx)Ignition.ignite(NODE_NAME)).cachex(name).context(); + GridDhtPreloader preloader = (GridDhtPreloader)cctx.group().preloader(); + AffinityTopologyVersion topVer = cctx.group().affinity().lastVersion(); + + GridDhtPartitionDemander.RebalanceFuture fut = + newInstance(GridDhtPartitionDemander.RebalanceFuture.class); + + setFieldValue(fut, "rebalanceId", 0); + setFieldValue(fut, "topVer", topVer); + + GridDhtPartitionDemander demander = getFieldValue(preloader, "demander"); + + setFieldValue(demander, "rebalanceFut", fut); + + setFieldValue(cctx.shared().exchange(), "rebTopVer", topVer); + + return demander; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + /** + * Prepare and clean collection with streaming data. + */ + @State(Scope.Thread) + public static class Data { + /** */ + @Param + private OBJECT_SIZE_RANGE range; + + /** */ + private int[] sizes; + + /** */ + private GridDhtPartitionSupplyMessage batchData; + + /** */ + private GridDhtPartitionSupplyMessage singleData; + + /** */ + private GridCacheContext cctxBatch = ((IgniteEx)Ignition.ignite(NODE_NAME)).cachex(CACHE_BATCH).context(); + + /** */ + private GridCacheContext cctxSingle = ((IgniteEx)Ignition.ignite(NODE_NAME)).cachex(CACHE_SINGLE).context(); + + /** */ + private IgniteEx node = ((IgniteEx)Ignition.ignite(NODE_NAME)); + + /** */ + int part = 0; + + /** */ + @Setup(Level.Trial) + public void setup() { + sizes = sizes(range.min, range.max, BATCH_SIZE); + } + + /** + * Prepare collection. + */ + @Setup(Level.Invocation) + public void prepare() { + int iter = iteration++; + int off = iter * BATCH_SIZE; + int rebalanceId = 0; + + batchData = prepareSupplyMessage(cctxBatch, rebalanceId, part, off, BATCH_SIZE, sizes); + singleData = prepareSupplyMessage(cctxSingle, rebalanceId, part, off, BATCH_SIZE, sizes); + } + + /** + * Clean collection after each test. + */ + @TearDown(Level.Iteration) + public void cleanCollection() { + batchData = null; + singleData = null; + } + + /** */ + int[] sizes(int minObjSize, int maxObjSize, int batchSize) { + int sizes[] = new int[batchSize]; + int minSize = maxObjSize; + int maxSize = minObjSize; + + int delta = maxObjSize - minObjSize; + + for (int i = 0; i < batchSize; i++) { + int size = sizes[i] = minObjSize + (delta > 0 ? ThreadLocalRandom.current().nextInt(delta) : 0); + + if (size < minSize) + minSize = size; + + if (size > maxSize) + maxSize = size; + } + + return sizes; + } + + /** */ + private GridDhtPartitionSupplyMessage prepareSupplyMessage( + GridCacheContext cctx, + int rebalanceId, + int p, + int off, + int cnt, + int[] sizes + ) { + GridDhtPartitionSupplyMessage msg = newInstance(GridDhtPartitionSupplyMessage.class); + + setFieldValue(msg, GridCacheGroupIdMessage.class, "grpId", cctx.group().groupId()); + setFieldValue(msg, "rebalanceId", rebalanceId); + setFieldValue(msg, "topVer", cctx.group().affinity().lastVersion()); + + List infos = new ArrayList<>(); + + for (int i = off; i < off + cnt; i++) { + int size = sizes[i - off]; + + KeyCacheObject key = cctx.toCacheKeyObject(i); + CacheObject val = cctx.toCacheObject(new byte[size]); + + GridCacheEntryInfo info = new GridCacheEntryInfo(); + info.key(key); + info.value(val); + info.cacheId(cctx.cacheId()); + info.version(cctx.shared().versions().startVersion()); + + infos.add(info); + } + + Map map = new HashMap<>(); + + map.put(p, new CacheEntryInfoCollection(infos)); + + setFieldValue(msg, "infos", map); + + return msg; + } + } + + /** */ + private static T newInstance(Class clazz) { + Constructor constructors[] = clazz.getDeclaredConstructors(); + + try { + for (Constructor constructor : constructors) { + if (constructor.getParameterTypes().length == 0) { + constructor.setAccessible(true); + + return (T)constructor.newInstance(); + } + } + } + catch (IllegalAccessException | InstantiationException | InvocationTargetException e) { + throw new RuntimeException(e); + } + + throw new RuntimeException("No default constructor"); + } + + /** + * Get object field value via reflection. + * + * @param obj Object or class to get field value from. + * @param fieldNames Field names to get value for: obj->field1->field2->...->fieldN. + * @param Expected field class. + * @return Field value. + * @throws IgniteException In case of error. + */ + private static T getFieldValue(Object obj, String... fieldNames) throws IgniteException { + assert obj != null; + assert fieldNames != null; + assert fieldNames.length >= 1; + + try { + for (String fieldName : fieldNames) { + Class cls = obj instanceof Class ? (Class)obj : obj.getClass(); + + try { + obj = findField(cls, obj, fieldName); + } + catch (NoSuchFieldException e) { + throw new RuntimeException(e); + } + } + + return (T)obj; + } + catch (IllegalAccessException e) { + throw new IgniteException("Failed to get object field [obj=" + obj + + ", fieldNames=" + Arrays.toString(fieldNames) + ']', e); + } + } + + /** + * @param cls Class for searching. + * @param obj Target object. + * @param fieldName Field name for search. + * @return Field from object if it was found. + */ + private static Object findField(Class cls, Object obj, + String fieldName) throws NoSuchFieldException, IllegalAccessException { + // Resolve inner field. + Field field = cls.getDeclaredField(fieldName); + + boolean accessible = field.isAccessible(); + + if (!accessible) + field.setAccessible(true); + + return field.get(obj); + } + + /** + * Set object field value via reflection. + * + * @param obj Object to set field value to. + * @param fieldName Field name to set value for. + * @param val New field value. + * @throws IgniteException In case of error. + */ + private static void setFieldValue(Object obj, String fieldName, Object val) throws IgniteException { + setFieldValue(obj, obj.getClass(), fieldName, val); + } + + /** + * Set object field value via reflection. + * + * @param obj Object to set field value to. + * @param cls Class to get field from. + * @param fieldName Field name to set value for. + * @param val New field value. + * @throws IgniteException In case of error. + */ + private static void setFieldValue(Object obj, Class cls, String fieldName, Object val) throws IgniteException { + assert fieldName != null; + + try { + Field field = cls.getDeclaredField(fieldName); + + boolean accessible = field.isAccessible(); + + if (!accessible) + field.setAccessible(true); + + boolean isFinal = (field.getModifiers() & Modifier.FINAL) != 0; + + if (isFinal) { + Field modifiersField = Field.class.getDeclaredField("modifiers"); + + modifiersField.setAccessible(true); + + modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL); + } + + field.set(obj, val); + } + catch (NoSuchFieldException | IllegalAccessException e) { + throw new IgniteException("Failed to set object field [obj=" + obj + ", field=" + fieldName + ']', e); + } + } + + /** + * Run benchmark. + * + * @param args Args. + */ + public static void main(String[] args) throws RunnerException { + final Options options = new OptionsBuilder() + .include(JmhBatchUpdatesInPreloadBenchmark.class.getSimpleName()) + .build(); + + new Runner(options).run(); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index f6eb650a092ad..8535e3763390b 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -1097,6 +1097,9 @@ public final class IgniteSystemProperties { */ public static final String IGNITE_DISCOVERY_DISABLE_CACHE_METRICS_UPDATE = "IGNITE_DISCOVERY_DISABLE_CACHE_METRICS_UPDATE"; + /** */ + public static final String IGNITE_DATA_STORAGE_BATCH_PAGE_WRITE = "IGNITE_DATA_STORAGE_BATCH_PAGE_WRITE"; + /** * Maximum number of different partitions to be extracted from between expression within sql query. * In case of limit exceeding all partitions will be used. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryInitialValuesBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryInitialValuesBatch.java new file mode 100644 index 0000000000000..bbe67006266f8 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryInitialValuesBatch.java @@ -0,0 +1,426 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.pagemem.wal.record.DataEntry; +import org.apache.ignite.internal.pagemem.wal.record.DataRecord; +import org.apache.ignite.internal.pagemem.wal.record.MvccDataEntry; +import org.apache.ignite.internal.pagemem.wal.record.MvccDataRecord; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils; +import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.dr.GridDrType; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgnitePredicate; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.internal.processors.cache.GridCacheMapEntry.ATOMIC_VER_COMPARATOR; +import static org.apache.ignite.internal.processors.cache.GridCacheMapEntry.IS_UNSWAPPED_MASK; +import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE; + +/** */ +public class CacheEntryInitialValuesBatch { + /** */ + private final GridCacheContext cctx; + + /** */ + private final GridDhtLocalPartition part; + + /** */ + public CacheEntryInitialValuesBatch(GridCacheContext cctx, int partId) { + this.cctx = cctx; + + part = cctx.topology().localPartition(partId, + cctx.topology().readyTopologyVersion(), true, true); + } + + /** */ + private List initialValues = new ArrayList<>(1); + + /** */ + class InitialValue extends GridCacheEntryInfo { + private final GridCacheMapEntry entry; +// private CacheObject val; +// private final GridCacheVersion ver; + // todo unable to use version aware + private final MvccVersion mvccVer; + private final MvccVersion newMvccVer; +// private final long ttl; +// private final long expireTime; + private final boolean preload; + private final AffinityTopologyVersion topVer; + private final GridDrType drType; + private final boolean fromStore; + + private Runnable unlockCb; + private boolean update; +// private boolean obsolete; + private long expTime; + private IgnitePredicate p; + + public InitialValue( + GridCacheMapEntry entry, + CacheObject val, + GridCacheVersion ver, + MvccVersion mvccVer, + MvccVersion newMvccVer, + long ttl, + long expireTime, + boolean preload, + AffinityTopologyVersion topVer, + GridDrType drType, + boolean fromStore + ) { + this.entry = entry; + + key(entry.key); + value(val); + version(ver); + ttl(ttl); + expireTime(expireTime); + + this.mvccVer = mvccVer; + + this.newMvccVer = newMvccVer; + + this.preload = preload; + this.topVer = topVer; + this.drType = drType; + this.fromStore = fromStore; + } + } + + /** */ + public CacheEntryInitialValuesBatch add( + GridCacheMapEntry entry, + CacheObject val, + GridCacheVersion ver, + MvccVersion mvccVer, + MvccVersion newMvccVer, + long ttl, + long expireTime, + boolean preload, + AffinityTopologyVersion topVer, + GridDrType drType, + boolean fromStore + ) { + initialValues.add(new InitialValue(entry, + val, + ver, + mvccVer, + newMvccVer, + ttl, + expireTime, + preload, + topVer, + drType, + fromStore)); + + return this; + } + + /** */ + public int initValues() throws IgniteCheckedException { + int initCnt = 0; + + cctx.shared().database().ensureFreeSpace(cctx.dataRegion()); + + cctx.group().listenerLock().readLock().lock(); + + boolean mvcc = cctx.mvccEnabled(); + + try { + Set skipped = new HashSet<>(); + + // lock stage + for (int i = 0; i < initialValues.size(); i++) { + InitialValue v = initialValues.get(i); + + v.entry.lockEntry(); + + v.expTime = v.expireTime() < 0 ? CU.toExpireTime(v.ttl()) : v.expireTime(); + + try { + if (!(v.update = prepareInitialValue(v))) + v.p = new InitialValuePredicate(v.entry, v.version(), v.preload); + else + if (mvcc) { + assert !v.preload; + + cctx.offheap().mvccInitialValue(v.entry, v.value(), v.version(), v.expTime, v.mvccVer, v.newMvccVer); + } + } + catch (GridCacheEntryRemovedException e) { + skipped.add(v.entry.key); + } + } + + if (!mvcc) { + if (initialValues.size() == 1) { + InitialValue v = initialValues.get(0); + + boolean update0 = v.entry.storeValue(v.value(), v.expTime, v.version(), v.p); + + if (update0 && v.p != null) + v.update = update0; + } + else { + List infos = new ArrayList<>(initialValues.size()); + + for (int i = 0; i < initialValues.size(); i++) { + InitialValue v = initialValues.get(i); + + if (!skipped.contains(v.entry.key)) + infos.add(v); + } + + cctx.offheap().updateAll(cctx, part, infos, (r, i) -> { + InitialValue iv = ((InitialValue)i); + + if (iv.p != null) + iv.update = iv.p.apply(r); + + return iv.p == null || iv.update; + }); + } + + } + + for (int i = 0; i < initialValues.size(); i++) { + InitialValue v = initialValues.get(i); + + if (v.update) { + finishInitialUpdate(v.entry, v.value(), v.expireTime(), v.ttl(), v.version(), v.topVer, v.drType, v.mvccVer, v.preload, v.fromStore); + + ++initCnt; + } + } + } finally { + for (InitialValue val : initialValues) { + val.entry.unlockEntry(); + + if (val.unlockCb != null) + val.unlockCb.run(); + } + + cctx.group().listenerLock().readLock().unlock(); + } + + return initCnt; + } + + /** */ + private boolean prepareInitialValue(InitialValue iv) throws IgniteCheckedException, GridCacheEntryRemovedException { + GridCacheMapEntry entry = iv.entry; + CacheObject val = iv.value(); + GridCacheVersion ver = iv.version(); + boolean preload = iv.preload; + + entry.checkObsolete(); + + iv.value(cctx.kernalContext().cacheObjects().prepareForCache(val, cctx)); + + final boolean unswapped = ((entry.flags & IS_UNSWAPPED_MASK) != 0); + + boolean update = false; + + if (unswapped || cctx.mvccEnabled()) { + if (!unswapped) + entry.unswap(false); + + if (update = new InitialValuePredicate(entry, ver, preload).apply(null)) { + // If entry is already unswapped and we are modifying it, we must run deletion callbacks for old value. + long oldExpTime = entry.expireTimeUnlocked(); + + if (oldExpTime > 0 && oldExpTime < U.currentTimeMillis()) { + if (entry.onExpired(entry.val, null)) { + if (cctx.deferredDelete()) { + final GridCacheVersion oldVer = entry.ver; + + iv.unlockCb = () -> cctx.onDeferredDelete(entry, oldVer); + } + else if (val == null) { + iv.unlockCb = () -> { + entry.onMarkedObsolete(); + + cctx.cache().removeEntry(entry); + }; + } + } + } + } + } + + return update; + } + + /** + * todo explain this and remove code duplication + * @param val New value. + * @param expireTime Expiration time. + * @param ttl Time to live. + * @param ver Version to use. + * @param topVer Topology version. + * @param drType DR type. + * @param mvccVer Mvcc version. + * @param preload Flag indicating whether entry is being preloaded. + * @throws IgniteCheckedException In case of error. + */ + protected void finishInitialUpdate( + GridCacheMapEntry entry, + @Nullable CacheObject val, + long expireTime, + long ttl, + GridCacheVersion ver, + AffinityTopologyVersion topVer, + GridDrType drType, + MvccVersion mvccVer, + boolean preload, + boolean fromStore + ) throws IgniteCheckedException { + boolean walEnabled = !cctx.isNear() && cctx.group().persistenceEnabled() && cctx.group().walEnabled(); + + entry.update(val, expireTime, ttl, ver, true); + + boolean skipQryNtf = false; + + if (val == null) { + skipQryNtf = true; + + if (cctx.deferredDelete() && !entry.deletedUnlocked() && !entry.isInternal()) + entry.deletedUnlocked(true); + } + else if (entry.deletedUnlocked()) + entry.deletedUnlocked(false); + + long updateCntr = 0; + + if (!preload) + // todo update counters is not applicable to cache entry and should be moved + updateCntr = entry.nextPartitionCounter(topVer, true, null); + + if (walEnabled) { + if (cctx.mvccEnabled()) { + cctx.shared().wal().log(new MvccDataRecord(new MvccDataEntry( + cctx.cacheId(), + entry.key, + val, + val == null ? DELETE : GridCacheOperation.CREATE, + null, + ver, + expireTime, + entry.partition(), + updateCntr, + mvccVer == null ? MvccUtils.INITIAL_VERSION : mvccVer + ))); + } else { + cctx.shared().wal().log(new DataRecord(new DataEntry( + cctx.cacheId(), + entry.key, + val, + val == null ? DELETE : GridCacheOperation.CREATE, + null, + ver, + expireTime, + entry.partition(), + updateCntr + ))); + } + } + + entry.drReplicate(drType, val, ver, topVer); + + if (!skipQryNtf) { + cctx.continuousQueries().onEntryUpdated( + entry.key, + val, + null, + entry.isInternal() || !entry.context().userCache(), + entry.partition(), + true, + preload, + updateCntr, + null, + topVer); + } + + entry.onUpdateFinished(updateCntr); + + if (!fromStore && cctx.store().isLocal()) { + if (val != null) + cctx.store().put(null, entry.key, val, ver); + } + } + + + /** */ + protected static class InitialValuePredicate implements IgnitePredicate { + /** */ + private final GridCacheMapEntry entry;; + + /** */ + private final boolean preload; + + /** */ + private final GridCacheVersion newVer; + + /** */ + InitialValuePredicate(GridCacheMapEntry entry, GridCacheVersion newVer, boolean preload) { + this.entry = entry; + this.preload = preload; + this.newVer = newVer; + } + + /** {@inheritDoc} */ + @Override public boolean apply(@Nullable CacheDataRow row) { + boolean update0; + + GridCacheVersion currentVer = row != null ? row.version() : entry.ver; + + GridCacheContext cctx = entry.cctx; + + boolean isStartVer = cctx.shared().versions().isStartVersion(currentVer); + + if (cctx.group().persistenceEnabled()) { + if (!isStartVer) { + if (cctx.atomic()) + update0 = ATOMIC_VER_COMPARATOR.compare(currentVer, newVer) < 0; + else + update0 = currentVer.compareTo(newVer) < 0; + } + else + update0 = true; + } + else + update0 = isStartVer; + + update0 |= (!preload && entry.deletedUnlocked()); + + return update0; + } + }; +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMapEntries.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMapEntries.java new file mode 100644 index 0000000000000..8e7788b8da528 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMapEntries.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.dr.GridDrType; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgnitePredicate; + +import static org.apache.ignite.internal.processors.cache.GridCacheMapEntry.ATOMIC_VER_COMPARATOR; + +/** + * Batch of cache map entries. + */ +public class CacheMapEntries { + /** + * + */ + private static class GridCacheEntryInfoEx extends GridCacheEntryInfo { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final GridCacheEntryInfo delegate; + + /** */ + private GridDhtCacheEntry cacheEntry; + + /** */ + private boolean update; + + /** */ + private GridCacheEntryInfoEx(GridCacheEntryInfo info) { + delegate = info; + } + + /** + * @return Key. + */ + @Override public KeyCacheObject key() { + return delegate.key(); + } + + /** + * @return Entry value. + */ + @Override public CacheObject value() { + return delegate.value(); + } + + /** + * @return Expire time. + */ + @Override public long expireTime() { + return delegate.expireTime(); + } + + /** + * @return Time to live. + */ + @Override public long ttl() { + return delegate.ttl(); + } + + /** + * @return Time to live. + */ + @Override public GridCacheVersion version() { + return delegate.version(); + } + } + + /** */ + public Collection> initialValues( + List infos, + AffinityTopologyVersion topVer, + GridCacheContext cctx, + int partId, + boolean preload, + GridDrType drType + ) throws IgniteCheckedException { + GridDhtLocalPartition part = cctx.topology().localPartition(partId, topVer, true, true); + + Set skipped = new HashSet<>(); + + Collection locked = lockEntries(cctx, topVer, infos); + + try { + IgniteBiPredicate pred = + new IgniteBiPredicate() { + @Override public boolean apply(CacheDataRow row, GridCacheEntryInfo info) { + GridCacheEntryInfoEx infoEx = (GridCacheEntryInfoEx)info; + + IgnitePredicate p = + new CacheEntryInitialValuesBatch.InitialValuePredicate(infoEx.cacheEntry, info.version(), preload); + + return infoEx.update = p.apply(row); + } + }; + + cctx.offheap().updateAll(cctx, part, locked, pred); + } finally { + unlockEntries(cctx, topVer, preload, drType, locked, skipped); + } + + return F.viewReadOnly(locked, v -> new T2<>(v, v.cacheEntry), v -> !skipped.contains(v.key())); + } + + /** */ + private Collection lockEntries( + GridCacheContext cctx, + AffinityTopologyVersion topVer, + List infos + ) { + List locked = new ArrayList<>(infos.size()); + + while (true) { + Map uniqueEntries = new LinkedHashMap<>(); + + for (GridCacheEntryInfo e : infos) { + KeyCacheObject key = e.key(); + + GridCacheEntryInfoEx entryEx = new GridCacheEntryInfoEx(e); + + GridCacheEntryInfoEx old = uniqueEntries.put(key, entryEx); + + assert old == null || ATOMIC_VER_COMPARATOR.compare(old.version(), e.version()) < 0 : + "Version order mismatch: prev=" + old.version() + ", current=" + e.version(); + + GridCacheEntryEx entry = cctx.cache().entryEx(key, topVer); + + locked.add(entry); + + assert entry instanceof GridDhtCacheEntry; + + entryEx.cacheEntry = (GridDhtCacheEntry)entry; + } + + boolean retry = false; + + for (int i = 0; i < locked.size(); i++) { + GridCacheEntryEx entry = locked.get(i); + + if (entry == null) + continue; + + entry.lockEntry(); + + GridCacheEntryInfo info = infos.get(i); + + info.value(cctx.kernalContext().cacheObjects().prepareForCache(info.value(), cctx)); + + if (entry.obsolete()) { + // Unlock all locked. + for (int j = 0; j <= i; j++) { + if (locked.get(j) != null) + locked.get(j).unlockEntry(); + } + + // Clear entries. + locked.clear(); + + // Retry. + retry = true; + + break; + } + } + + if (!retry) + return uniqueEntries.values(); + } + } + + /** */ + private void unlockEntries( + GridCacheContext cctx, + AffinityTopologyVersion topVer, + boolean preload, + GridDrType drType, + Collection infos, + Set skippedKeys + ) { + // Process deleted entries before locks release. + // todo + assert cctx.deferredDelete() : this; + + try { + for (GridCacheEntryInfoEx info : infos) { + KeyCacheObject key = info.key(); + GridCacheMapEntry entry = info.cacheEntry; + + assert entry != null : key; + + if (!info.update) + continue; + + if (skippedKeys.contains(key)) + continue; + + if (entry.deleted()) { + skippedKeys.add(key); + + continue; + } +// todo +// try { +// entry.proc.finishInitialUpdate(entry, info.value(), info.expireTime(), info.ttl(), info.version(), topVer, +// drType, null, preload, false); +// } catch (IgniteCheckedException ex) { +// cctx.logger(getClass()).error("Unable to finish initial update, skip " + key, ex); +// +// skippedKeys.add(key); +// } + } + } + finally { + // At least RuntimeException can be thrown by the code above when GridCacheContext is cleaned and there is + // an attempt to use cleaned resources. + // That's why releasing locks in the finally block.. + for (GridCacheEntryInfoEx info : infos) { + GridCacheMapEntry entry = info.cacheEntry; + + if (entry != null) + entry.unlockEntry(); + } + } + + // Try evict partitions. + for (GridCacheEntryInfoEx info : infos) { + GridCacheMapEntry entry = info.cacheEntry; + + if (entry != null) + entry.onUnlock(); + } + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java index 8ce21c59de3d1..aa88b2e1bb04e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java @@ -1163,6 +1163,15 @@ public void onRebalanceKeyReceived() { rebalancingKeysRate.onHit(); } + /** + * Rebalance entry store callback. + */ + public void onRebalanceKeysReceived(long batchSize) { + rebalancedKeys.addAndGet(batchSize); + + rebalancingKeysRate.onHits(batchSize); + } + /** * Rebalance supply message callback. * diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index 9aec3996c3204..8ab00495775c9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -33,7 +33,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion; -import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; @@ -769,8 +768,7 @@ default boolean initialValue(CacheObject val, AffinityTopologyVersion topVer, GridDrType drType, boolean fromStore) throws IgniteCheckedException, GridCacheEntryRemovedException { - return initialValue(val, ver, null, null, TxState.NA, TxState.NA, - ttl, expireTime, preload, topVer, drType, fromStore); + return initialValue(val, ver, null, null, ttl, expireTime, preload, topVer, drType, fromStore); } /** @@ -780,8 +778,6 @@ default boolean initialValue(CacheObject val, * @param ver Version to use. * @param mvccVer Mvcc version. * @param newMvccVer New mvcc version. - * @param mvccTxState Tx state hint for mvcc version. - * @param newMvccTxState Tx state hint for new mvcc version. * @param ttl Time to live. * @param expireTime Expiration time. * @param preload Flag indicating whether entry is being preloaded. @@ -796,8 +792,6 @@ public boolean initialValue(CacheObject val, GridCacheVersion ver, @Nullable MvccVersion mvccVer, @Nullable MvccVersion newMvccVer, - byte mvccTxState, - byte newMvccTxState, long ttl, long expireTime, boolean preload, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 352ca0c6e662b..ca72e2c43257a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -133,7 +133,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme private static final byte IS_DELETED_MASK = 0x01; /** */ - private static final byte IS_UNSWAPPED_MASK = 0x02; + static final byte IS_UNSWAPPED_MASK = 0x02; /** */ private static final byte IS_EVICT_DISABLED = 0x04; @@ -225,7 +225,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme /** */ @GridToStringExclude - private final ReentrantLock lock = new ReentrantLock(); + final ReentrantLock lock = new ReentrantLock(); /** Read Lock for continuous query listener */ @GridToStringExclude @@ -2623,7 +2623,7 @@ private GridTuple3 ttlAndExpireTime(IgniteCacheExpiryPolicy * @param topVer Topology version. * @throws IgniteCheckedException In case of exception. */ - private void drReplicate(GridDrType drType, @Nullable CacheObject val, GridCacheVersion ver, AffinityTopologyVersion topVer) + protected void drReplicate(GridDrType drType, @Nullable CacheObject val, GridCacheVersion ver, AffinityTopologyVersion topVer) throws IgniteCheckedException { if (cctx.isDrEnabled() && drType != DR_NONE && !isInternal()) cctx.dr().replicate(key, val, rawTtl(), rawExpireTime(), ver.conflictVersion(), drType, topVer); @@ -3313,8 +3313,6 @@ private boolean skipInterceptor(@Nullable GridCacheVersion explicitVer) { GridCacheVersion ver, MvccVersion mvccVer, MvccVersion newMvccVer, - byte mvccTxState, - byte newMvccTxState, long ttl, long expireTime, boolean preload, @@ -3322,208 +3320,17 @@ private boolean skipInterceptor(@Nullable GridCacheVersion explicitVer) { GridDrType drType, boolean fromStore ) throws IgniteCheckedException, GridCacheEntryRemovedException { - ensureFreeSpace(); - - boolean deferred = false; - boolean obsolete = false; - - GridCacheVersion oldVer = null; - - lockListenerReadLock(); - lockEntry(); - - try { - checkObsolete(); - - boolean walEnabled = !cctx.isNear() && cctx.group().persistenceEnabled() && cctx.group().walEnabled(); - - long expTime = expireTime < 0 ? CU.toExpireTime(ttl) : expireTime; - - val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx); - - final boolean unswapped = ((flags & IS_UNSWAPPED_MASK) != 0); - - boolean update; - - IgnitePredicate p = new IgnitePredicate() { - @Override public boolean apply(@Nullable CacheDataRow row) { - boolean update0; - - GridCacheVersion currentVer = row != null ? row.version() : GridCacheMapEntry.this.ver; - - boolean isStartVer = cctx.shared().versions().isStartVersion(currentVer); - - if (cctx.group().persistenceEnabled()) { - if (!isStartVer) { - if (cctx.atomic()) - update0 = ATOMIC_VER_COMPARATOR.compare(currentVer, ver) < 0; - else - update0 = currentVer.compareTo(ver) < 0; - } - else - update0 = true; - } - else - update0 = isStartVer; - - update0 |= (!preload && deletedUnlocked()); - - return update0; - } - }; - - if (unswapped) { - update = p.apply(null); - - if (update) { - // If entry is already unswapped and we are modifying it, we must run deletion callbacks for old value. - long oldExpTime = expireTimeUnlocked(); - - if (oldExpTime > 0 && oldExpTime < U.currentTimeMillis()) { - if (onExpired(this.val, null)) { - if (cctx.deferredDelete()) { - deferred = true; - oldVer = this.ver; - } - else if (val == null) - obsolete = true; - } - } - - if (cctx.mvccEnabled()) { - assert !preload; - - cctx.offheap().mvccInitialValue(this, val, ver, expTime, mvccVer, newMvccVer); - } - else - storeValue(val, expTime, ver); - } - } - else { - if (cctx.mvccEnabled()) { - // cannot identify whether the entry is exist on the fly - unswap(false); - - if (update = p.apply(null)) { - // If entry is already unswapped and we are modifying it, we must run deletion callbacks for old value. - long oldExpTime = expireTimeUnlocked(); - long delta = (oldExpTime == 0 ? 0 : oldExpTime - U.currentTimeMillis()); - - if (delta < 0) { - if (onExpired(this.val, null)) { - if (cctx.deferredDelete()) { - deferred = true; - oldVer = this.ver; - } - else if (val == null) - obsolete = true; - } - } - - assert !preload; - - cctx.offheap().mvccInitialValue(this, val, ver, expTime, mvccVer, newMvccVer); - } - } - else - // Optimization to access storage only once. - update = storeValue(val, expTime, ver, p); - } - - if (update) { - update(val, expTime, ttl, ver, true); - - boolean skipQryNtf = false; - - if (val == null) { - skipQryNtf = true; - - if (cctx.deferredDelete() && !deletedUnlocked() && !isInternal()) - deletedUnlocked(true); - } - else if (deletedUnlocked()) - deletedUnlocked(false); - - long updateCntr = 0; - - if (!preload) - updateCntr = nextPartitionCounter(topVer, true, null); - - if (walEnabled) { - if (cctx.mvccEnabled()) { - cctx.shared().wal().log(new MvccDataRecord(new MvccDataEntry( - cctx.cacheId(), - key, - val, - val == null ? DELETE : GridCacheOperation.CREATE, - null, - ver, - expireTime, - partition(), - updateCntr, - mvccVer == null ? MvccUtils.INITIAL_VERSION : mvccVer - ))); - } else { - cctx.shared().wal().log(new DataRecord(new DataEntry( - cctx.cacheId(), - key, - val, - val == null ? DELETE : GridCacheOperation.CREATE, - null, - ver, - expireTime, - partition(), - updateCntr - ))); - } - } - - drReplicate(drType, val, ver, topVer); - - if (!skipQryNtf) { - cctx.continuousQueries().onEntryUpdated( - key, - val, - null, - this.isInternal() || !this.context().userCache(), - this.partition(), - true, - preload, - updateCntr, - null, - topVer); - } - - onUpdateFinished(updateCntr); - - if (!fromStore && cctx.store().isLocal()) { - if (val != null) - cctx.store().put(null, key, val, ver); - } - - return true; - } - return false; - } - finally { - unlockEntry(); - unlockListenerReadLock(); - - // It is necessary to execute these callbacks outside of lock to avoid deadlocks. - - if (obsolete) { - onMarkedObsolete(); - - cctx.cache().removeEntry(this); - } - - if (deferred) { - assert oldVer != null; - - cctx.onDeferredDelete(this, oldVer); - } - } + return new CacheEntryInitialValuesBatch(cctx, partition()).add(this, val, + ver, + mvccVer, + newMvccVer, + ttl, + expireTime, + preload, + topVer, + drType, + fromStore).initValues() == 1; } /** @@ -4054,7 +3861,7 @@ private GridCacheVersion nextVersion() { * @return {@code True} if entry was marked as removed. * @throws IgniteCheckedException If failed. */ - private boolean onExpired(CacheObject expiredVal, GridCacheVersion obsoleteVer) throws IgniteCheckedException { + protected boolean onExpired(CacheObject expiredVal, GridCacheVersion obsoleteVer) throws IgniteCheckedException { assert expiredVal != null; boolean rmvd = false; @@ -4502,7 +4309,7 @@ protected void removeValue() throws IgniteCheckedException { /** * Evicts necessary number of data pages if per-page eviction is configured in current {@link DataRegion}. */ - private void ensureFreeSpace() throws IgniteCheckedException { + protected void ensureFreeSpace() throws IgniteCheckedException { // Deadlock alert: evicting data page causes removing (and locking) all entries on the page one by one. assert !lock.isHeldByCurrentThread(); @@ -5017,7 +4824,7 @@ private int extrasSize() { * in order to ensure that the entry update is completed and existing continuous * query notified before the next cache listener update */ - private void lockListenerReadLock() { + void lockListenerReadLock() { listenerLock.readLock().lock(); } @@ -5026,7 +4833,7 @@ private void lockListenerReadLock() { * * @see #lockListenerReadLock() */ - private void unlockListenerReadLock() { + void unlockListenerReadLock() { listenerLock.readLock().unlock(); } @@ -5684,7 +5491,7 @@ private LazyValueEntry(KeyCacheObject key, boolean keepBinary) { /** * */ - private static class UpdateClosure implements IgniteCacheOffheapManager.OffheapInvokeClosure { + static class UpdateClosure implements IgniteCacheOffheapManager.OffheapInvokeClosure { /** */ private final GridCacheMapEntry entry; @@ -5707,7 +5514,7 @@ private static class UpdateClosure implements IgniteCacheOffheapManager.OffheapI private CacheDataRow oldRow; /** */ - private IgniteTree.OperationType treeOp = IgniteTree.OperationType.PUT; + IgniteTree.OperationType treeOp = IgniteTree.OperationType.PUT; /** * @param entry Entry. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java index b7e8ec717fc38..8024c4e303485 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache; +import java.util.Collection; import java.util.List; import java.util.Map; import javax.cache.Cache; @@ -46,6 +47,7 @@ import org.apache.ignite.internal.util.lang.GridCursor; import org.apache.ignite.internal.util.lang.GridIterator; import org.apache.ignite.internal.util.lang.IgniteInClosure2X; +import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteBiTuple; import org.jetbrains.annotations.Nullable; @@ -188,6 +190,20 @@ public boolean expire(GridCacheContext cctx, IgniteInClosure2X entries, + IgniteBiPredicate pred + ) throws IgniteCheckedException; + /** * @param cctx Cache context. * @param key Key. @@ -718,6 +734,19 @@ void update( long expireTime, @Nullable CacheDataRow oldRow) throws IgniteCheckedException; + + /** + * @param cctx Cache context. + * @param entries Entries. + * @param pred Entry update predicate. + * @throws IgniteCheckedException If failed. + */ + public void updateAll( + GridCacheContext cctx, + Collection entries, + IgniteBiPredicate pred + ) throws IgniteCheckedException; + /** * @param cctx Cache context. * @param key Key. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index 04443be2cf7d7..189fedc24d040 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -36,7 +37,6 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; -import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.pagemem.FullPageId; import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageMvccMarkUpdatedRecord; @@ -103,9 +103,11 @@ import org.apache.ignite.internal.util.lang.GridIterator; import org.apache.ignite.internal.util.lang.IgniteInClosure2X; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteInClosure; @@ -434,6 +436,16 @@ private Iterator cacheData(boolean primary, boolean backup, Affi dataStore(part).invoke(cctx, key, c); } + /** {@inheritDoc} */ + @Override public void updateAll( + GridCacheContext cctx, + GridDhtLocalPartition part, + Collection entries, + IgniteBiPredicate pred + ) throws IgniteCheckedException { + dataStore(part).updateAll(cctx, entries, pred); + } + /** {@inheritDoc} */ @Override public void update( GridCacheContext cctx, @@ -1607,6 +1619,31 @@ private boolean canUpdateOldRow(GridCacheContext cctx, @Nullable CacheDataRow ol } } + + /** {@inheritDoc} */ + @Override public void updateAll( + GridCacheContext cctx, + Collection entries, + IgniteBiPredicate pred + ) throws IgniteCheckedException { + int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID; + + List searchRows = new ArrayList<>(entries.size()); + + boolean ordered = true; + + KeyCacheObject last = null; + + for (GridCacheEntryInfo entry : entries) { + searchRows.add(new SearchRow(cacheId, entry.key())); + + if (ordered && last != null && last.hashCode() >= entry.key().hashCode()) + ordered = false; + } + + updateAll0(cctx, searchRows, entries, pred, ordered); + } + /** * @param cctx Cache context. * @param row Search row. @@ -1646,6 +1683,153 @@ private void invoke0(GridCacheContext cctx, CacheSearchRow row, OffheapInvokeClo } } + /** + * @param cctx Cache context. + * @param keys Search rows. + * @param sorted Sorted flag. + * @param entries Entries. + * @param pred Entry update predicate. + * @throws IgniteCheckedException If failed. + */ + private void updateAll0( + GridCacheContext cctx, + List keys, + Collection entries, + IgniteBiPredicate pred, + boolean sorted + ) throws IgniteCheckedException { + if (!busyLock.enterBusy()) + throw new NodeStoppingException("Operation has been cancelled (node is stopping)."); + + try { + assert cctx.shared().database().checkpointLockIsHeldByThread(); + + int cacheId = cctx.group().storeCacheIdInDataPage() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID; + + IoStatisticsHolder statHolder = grp.statisticsHolderData(); + + List oldRows = findAll(keys, sorted); + + assert oldRows.size() == keys.size() : "Mismatch: expect=" + keys.size() + ", actual=" + oldRows.size(); + + Iterator oldRowsIter = oldRows.iterator(); + + // Old to new rows mapping. + List> resMapping = new ArrayList<>(8); + + for (GridCacheEntryInfo entry : entries) { + CacheDataRow oldRow = oldRowsIter.next(); + + KeyCacheObject key = entry.key(); + + if (!pred.apply(oldRow, entry)) + continue; + + CacheObject val = entry.value(); + + if (val == null) { + dataTree.removex(new SearchRow(cacheId, key)); + + finishRemove(cctx, key, oldRow); + + continue; + } + + CacheObjectContext coCtx = cctx.cacheObjectContext(); + + val.valueBytes(coCtx); + key.valueBytes(coCtx); + + DataRow row = makeDataRow(key, val, entry.version(), entry.expireTime(), cacheId); + + if (canUpdateOldRow(cctx, oldRow, row) && rowStore().updateRow(oldRow.link(), row, statHolder)) + continue; + + resMapping.add(new T2<>(oldRow, row)); + } + + if (!resMapping.isEmpty()) { + cctx.shared().database().ensureFreeSpace(cctx.dataRegion()); + + Collection newRows = F.viewReadOnly(resMapping, IgniteBiTuple::get2); + + rowStore().addRows(newRows, statHolder); + + if (cacheId == CU.UNDEFINED_CACHE_ID) { + // Set cacheId before store keys into tree. + for (DataRow row : newRows) + row.cacheId(cctx.cacheId()); + } + + for (T2 mapping : resMapping) { + CacheDataRow oldRow = mapping.get1(); + CacheDataRow newRow = mapping.get2(); + + assert newRow != null : mapping; + + dataTree.putx(newRow); + + finishUpdate(cctx, newRow, oldRow); + } + } + } + finally { + busyLock.leaveBusy(); + } + } + + /** + * @param keys Keys to search. + * @param sorted Sorted flag. + * @return List of found (and not found) rows, each value of which corresponds to the input key. + * @throws IgniteCheckedException If failed. + */ + private List findAll(List keys, boolean sorted) throws IgniteCheckedException { + assert keys != null && !keys.isEmpty(); + + List res = new ArrayList<>(keys.size()); + + if (!sorted) { + for (CacheSearchRow row : keys) + res.add(dataTree.findOne(row, null, CacheDataRowAdapter.RowData.NO_KEY)); + + return res; + } + + GridCursor cur = dataTree.find(keys.get(0), keys.get(keys.size() - 1)); + Iterator itr = keys.iterator(); + + CacheDataRow foundRow = null; + KeyCacheObject foundKey = null; + + KeyCacheObject key = itr.next().key(); + + while (cur.next()) { + foundRow = cur.get(); + foundKey = foundRow.key(); + + while (itr.hasNext() && key.hashCode() <= foundKey.hashCode()) { + boolean keyFound = false; + + while (key.hashCode() == foundKey.hashCode() && !(keyFound = key.equals(foundKey)) && cur.next()) { + foundRow = cur.get(); + foundKey = foundRow.key(); + } + + res.add(keyFound ? foundRow : null); + + key = itr.next().key(); + } + } + + res.add(key.equals(foundKey) ? foundRow : null); + + for (; itr.hasNext(); itr.next()) + res.add(null); + + return res; + } + /** {@inheritDoc} */ @Override public CacheDataRow createRow( GridCacheContext cctx, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index 4f3581acd1c52..003c0851465a7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -57,7 +57,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; import org.apache.ignite.internal.processors.cache.mvcc.MvccUpdateVersionAware; import org.apache.ignite.internal.processors.cache.mvcc.MvccVersionAware; -import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; @@ -1380,8 +1379,6 @@ void onResult(GridDhtLockResponse res) { info.version(), cctx.mvccEnabled() ? ((MvccVersionAware)info).mvccVersion() : null, cctx.mvccEnabled() ? ((MvccUpdateVersionAware)info).newMvccVersion() : null, - cctx.mvccEnabled() ? ((MvccVersionAware)entry).mvccTxState() : TxState.NA, - cctx.mvccEnabled() ? ((MvccUpdateVersionAware)entry).newMvccTxState() : TxState.NA, info.ttl(), info.expireTime(), true, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 97a1f03da25db..87e17521e61b6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -67,7 +67,6 @@ import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; import org.apache.ignite.internal.processors.cache.mvcc.MvccUpdateVersionAware; import org.apache.ignite.internal.processors.cache.mvcc.MvccVersionAware; -import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; @@ -75,7 +74,6 @@ import org.apache.ignite.internal.processors.dr.GridDrType; import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException; -import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; import org.apache.ignite.internal.util.GridLeanSet; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -1910,8 +1908,6 @@ void onResult(GridDhtTxPrepareResponse res) { info.version(), cacheCtx.mvccEnabled() ? ((MvccVersionAware)info).mvccVersion() : null, cacheCtx.mvccEnabled() ? ((MvccUpdateVersionAware)info).newMvccVersion() : null, - cacheCtx.mvccEnabled() ? ((MvccVersionAware)info).mvccTxState() : TxState.NA, - cacheCtx.mvccEnabled() ? ((MvccUpdateVersionAware)info).newMvccTxState() : TxState.NA, info.ttl(), info.expireTime(), true, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java index fba1d9da58b1b..a440e20a0e558 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java @@ -44,7 +44,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.mvcc.MvccUpdateVersionAware; import org.apache.ignite.internal.processors.cache.mvcc.MvccVersionAware; -import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState; import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.GridLeanSet; import org.apache.ignite.internal.util.future.GridCompoundFuture; @@ -542,8 +541,6 @@ void onResult(GridDhtForceKeysResponse res) { info.version(), cctx.mvccEnabled() ? ((MvccVersionAware)info).mvccVersion() : null, cctx.mvccEnabled() ? ((MvccUpdateVersionAware)info).newMvccVersion() : null, - cctx.mvccEnabled() ? ((MvccVersionAware)entry).mvccTxState() : TxState.NA, - cctx.mvccEnabled() ? ((MvccUpdateVersionAware)entry).newMvccTxState() : TxState.NA, info.ttl(), info.expireTime(), true, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 93602db7fb84b..75070a6a0c7fa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -31,9 +31,11 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.CacheRebalanceMode; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataPageEvictionMode; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.internal.IgniteInternalFuture; @@ -43,12 +45,14 @@ import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection; +import org.apache.ignite.internal.processors.cache.CacheEntryInitialValuesBatch; import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.CacheMetricsImpl; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; +import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; import org.apache.ignite.internal.processors.cache.GridCacheMvccEntryInfo; import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; @@ -57,7 +61,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.mvcc.MvccUpdateVersionAware; import org.apache.ignite.internal.processors.cache.mvcc.MvccVersionAware; -import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; import org.apache.ignite.internal.util.future.GridCompoundFuture; @@ -67,6 +70,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; @@ -88,6 +92,13 @@ * Thread pool for requesting partitions from other nodes and populating local cache. */ public class GridDhtPartitionDemander { + /** */ + private static final int CHECKPOINT_THRESHOLD = 100; + + /** */ + private static final boolean BATCH_PAGE_WRITE_ENABLED = + IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_DATA_STORAGE_BATCH_PAGE_WRITE, true); + /** */ private final GridCacheSharedContext ctx; @@ -859,6 +870,96 @@ public void handleSupplyMessage( } } + /** + * @param from Node which sent entry. + * @param p Partition id. + * @param infos Preloaded entries. + * @param topVer Topology version. + * @throws IgniteCheckedException If failed. + */ + private void preloadEntriesBatch(ClusterNode from, + int p, + Collection infos, + AffinityTopologyVersion topVer + ) throws IgniteCheckedException { + if (infos.isEmpty()) + return; + + grp.listenerLock().readLock().lock(); + + try { + Map> cctxs = new HashMap<>(); + + // Group by cache id. + for (GridCacheEntryInfo e : infos) { + if (log.isTraceEnabled()) + log.trace("Rebalancing key [key=" + e.key() + ", part=" + p + ", node=" + from.id() + ']'); + + cctxs.computeIfAbsent(e.cacheId(), v -> new ArrayList<>(8)).add(e); + } + + //CacheMapEntries cacheEntries = new CacheMapEntries(); + + + for (Map.Entry> cctxEntry : cctxs.entrySet()) { + GridCacheContext cctx = grp.sharedGroup() ? ctx.cacheContext(cctxEntry.getKey()) : grp.singleCacheContext(); + + if (cctx == null) + return; + + List cctxInfos = cctxEntry.getValue(); + + if (cctx.isNear()) + cctx = cctx.dhtCache().context(); + + CacheEntryInitialValuesBatch cacheEntries = new CacheEntryInitialValuesBatch(cctx, p); + + List> res = new ArrayList<>(cctxInfos.size()); + + for (GridCacheEntryInfo info : cctxInfos) { + GridCacheMapEntry cached = (GridCacheMapEntry)cctx.cache().entryEx(info.key(), topVer); + + cacheEntries.add(cached, info.value(), info.version(), null, null, info.ttl(), info.expireTime(), true, topVer, DR_PRELOAD, false); + + res.add(new T2<>(cached, info)); + } + + try { +// Iterable> cachedEntries = +// cacheEntries.initialValues(cctxInfos, topVer, cctx, p, true, DR_PRELOAD); + cacheEntries.initValues(); + + for (Map.Entry e : res) { + GridCacheMapEntry cached = e.getKey(); + + cached.touch(); + + if (!cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED) || cached.isInternal()) + continue; + + cctx.events().addEvent(p, cached.key(), cctx.localNodeId(), null, null, null, + EVT_CACHE_REBALANCE_OBJECT_LOADED, e.getValue().value(), true, null, + false, null, null, null, true); + } + } + catch (GridDhtInvalidPartitionException ignored) { + if (log.isDebugEnabled()) + log.debug("Partition became invalid during rebalancing (will ignore): " + p); + } + finally { + //TODO: IGNITE-11330: Update metrics for touched cache only. + for (GridCacheContext cctx0 : grp.caches()) { + if (cctx0.statisticsEnabled()) + cctx0.cache().metrics0().onRebalanceKeysReceived(cctxInfos.size()); + } + } + } + } + finally { + grp.listenerLock().readLock().unlock(); + } + } + /** * Adds mvcc entries with theirs history to partition p. * @@ -956,42 +1057,56 @@ private void mvccPreloadEntries(AffinityTopologyVersion topVer, ClusterNode node */ private void preloadEntries(AffinityTopologyVersion topVer, ClusterNode node, int p, Iterator infos) throws IgniteCheckedException { + GridCacheContext cctx = null; + boolean batchWriteEnabled = BATCH_PAGE_WRITE_ENABLED && (grp.dataRegion().config().isPersistenceEnabled() || + grp.dataRegion().config().getPageEvictionMode() == DataPageEvictionMode.DISABLED); + // Loop through all received entries and try to preload them. while (infos.hasNext()) { ctx.database().checkpointReadLock(); try { - for (int i = 0; i < 100; i++) { + List infosBatch = batchWriteEnabled ? new ArrayList<>(CHECKPOINT_THRESHOLD) : null; + + for (int i = 0; i < CHECKPOINT_THRESHOLD; i++) { if (!infos.hasNext()) break; GridCacheEntryInfo entry = infos.next(); - if (cctx == null || (grp.sharedGroup() && entry.cacheId() != cctx.cacheId())) { - cctx = grp.sharedGroup() ? grp.shared().cacheContext(entry.cacheId()) : grp.singleCacheContext(); - - if (cctx == null) - continue; - else if (cctx.isNear()) - cctx = cctx.dhtCache().context(); - } + if (batchWriteEnabled) + infosBatch.add(entry); + else { + if (cctx == null || (grp.sharedGroup() && entry.cacheId() != cctx.cacheId())) { + cctx = grp.sharedGroup() ? + grp.shared().cacheContext(entry.cacheId()) : grp.singleCacheContext(); + + if (cctx == null) + continue; + else if (cctx.isNear()) + cctx = cctx.dhtCache().context(); + } - if (!preloadEntry(node, p, entry, topVer, cctx)) { - if (log.isTraceEnabled()) - log.trace("Got entries for invalid partition during " + - "preloading (will skip) [p=" + p + ", entry=" + entry + ']'); + if (!preloadEntry(node, p, entry, topVer, cctx)) { + if (log.isTraceEnabled()) + log.trace("Got entries for invalid partition during " + + "preloading (will skip) [p=" + p + ", entry=" + entry + ']'); - return; - } + return; + } - //TODO: IGNITE-11330: Update metrics for touched cache only. - for (GridCacheContext ctx : grp.caches()) { - if (ctx.statisticsEnabled()) - ctx.cache().metrics0().onRebalanceKeyReceived(); + //TODO: IGNITE-11330: Update metrics for touched cache only. + for (GridCacheContext ctx : grp.caches()) { + if (ctx.statisticsEnabled()) + ctx.cache().metrics0().onRebalanceKeyReceived(); + } } } + + if (batchWriteEnabled) + preloadEntriesBatch(node, p, infosBatch, topVer); } finally { ctx.database().checkpointReadUnlock(); @@ -1034,8 +1149,6 @@ private boolean preloadEntry( entry.version(), cctx.mvccEnabled() ? ((MvccVersionAware)entry).mvccVersion() : null, cctx.mvccEnabled() ? ((MvccUpdateVersionAware)entry).newMvccVersion() : null, - cctx.mvccEnabled() ? ((MvccVersionAware)entry).mvccTxState() : TxState.NA, - cctx.mvccEnabled() ? ((MvccUpdateVersionAware)entry).newMvccTxState() : TxState.NA, entry.ttl(), entry.expireTime(), true, @@ -1395,10 +1508,11 @@ private void partitionDone(UUID nodeId, int p, boolean updateState) { int remainingRoutines = remaining.size() - 1; U.log(log, "Completed " + ((remainingRoutines == 0 ? "(final) " : "") + - "rebalancing [grp=" + grp.cacheOrGroupName() + - ", supplier=" + nodeId + - ", topVer=" + topologyVersion() + - ", progress=" + (routines - remainingRoutines) + "/" + routines + "]")); + "rebalancing [grp=" + grp.cacheOrGroupName() + + ", supplier=" + nodeId + + ", topVer=" + topologyVersion() + + ", progress=" + (routines - remainingRoutines) + "/" + routines + "," + + ", batch=" + BATCH_PAGE_WRITE_ENABLED + "]")); remaining.remove(nodeId); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStructure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStructure.java index 35dd3c46ee431..6bbe1316b8ca5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStructure.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStructure.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache.persistence; +import java.util.List; import java.util.Random; import java.util.concurrent.ThreadLocalRandom; import org.apache.ignite.IgniteCheckedException; @@ -307,6 +308,27 @@ protected final R write( return PageHandler.writePage(pageMem, grpId, pageId, this, h, init, wal, null, arg, intArg, lockFailed, statHolder); } + /** + * @param pageId Page ID. + * @param h Handler. + * @param init IO for new page initialization or {@code null} if it is an existing page. + * @param arg Argument. + * @param lockFailed Result in case of lock failure due to page recycling. + * @param statHolder Statistics holder to track IO operations. + * @return Handler result. + * @throws IgniteCheckedException If failed. + */ + protected final R write( + long pageId, + PageHandler h, + PageIO init, + List arg, + int intArg, + R lockFailed, + IoStatisticsHolder statHolder) throws IgniteCheckedException { + return PageHandler.writePageBatch(pageMem, grpId, pageId, this, h, init, wal, null, arg, intArg, lockFailed, statHolder); + } + /** * @param pageId Page ID. * @param h Handler. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java index ab48540f6dfc5..3d2cd4fa7c415 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache.persistence; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -55,6 +56,7 @@ import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; +import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; import org.apache.ignite.internal.processors.cache.GridCacheMvccEntryInfo; import org.apache.ignite.internal.processors.cache.GridCacheTtlManager; import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl; @@ -96,6 +98,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteBiTuple; import org.jetbrains.annotations.Nullable; @@ -1990,7 +1993,7 @@ private Metas getOrAllocatePartitionMetas() throws IgniteCheckedException { return delegate.mvccInitialValue(cctx, key, val, ver, expireTime, mvccVer, newMvccVer); } - + /** {@inheritDoc} */ @Override public boolean mvccApplyHistoryIfAbsent( GridCacheContext cctx, @@ -2130,6 +2133,19 @@ private Metas getOrAllocatePartitionMetas() throws IgniteCheckedException { delegate.invoke(cctx, key, c); } + /** {@inheritDoc} */ + @Override public void updateAll( + GridCacheContext cctx, + Collection entries, + IgniteBiPredicate pred + ) throws IgniteCheckedException { + assert ctx.database().checkpointLockIsHeldByThread(); + + CacheDataStore delegate = init0(false); + + delegate.updateAll(cctx, entries, pred); + } + /** {@inheritDoc} */ @Override public void remove(GridCacheContext cctx, KeyCacheObject key, int partId) throws IgniteCheckedException { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java index 4cc59f17e40b7..11eb0a92c2d28 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java @@ -250,7 +250,7 @@ protected void initPageMemoryDataStructures(DataStorageConfiguration dbCfg) thro boolean persistenceEnabled = memPlcCfg.isPersistenceEnabled(); CacheFreeListImpl freeList = new CacheFreeListImpl(0, - cctx.igniteInstanceName(), + memPlc.config().getName(), memMetrics, memPlc, null, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java index 91fd2070cc048..5b721710bde89 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache.persistence; +import java.util.Collection; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.processors.cache.CacheGroupContext; @@ -111,6 +112,25 @@ public void addRow(CacheDataRow row, IoStatisticsHolder statHolder) throws Ignit } } + /** + * @param rows Rows. + * @throws IgniteCheckedException If failed. + */ + public void addRows(Collection rows, IoStatisticsHolder statHolder) throws IgniteCheckedException { + if (!persistenceEnabled) + freeList.insertDataRows(rows, statHolder); + else { + ctx.database().checkpointReadLock(); + + try { + freeList.insertDataRows(rows, statHolder); + } + finally { + ctx.database().checkpointReadUnlock(); + } + } + } + /** * @param link Row link. * @param row New row data. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java index 60aefb927ce6f..bd81d757f03ac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java @@ -17,6 +17,9 @@ package org.apache.ignite.internal.processors.cache.persistence.freelist; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; import java.util.concurrent.atomic.AtomicReferenceArray; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; @@ -65,6 +68,9 @@ public abstract class AbstractFreeList extends PagesList imp /** */ private static final int MIN_PAGE_FREE_SPACE = 8; + /** */ + private static final int MAX_DATA_ROWS_PER_PAGE = 255; // Item index on data page has 1-byte length. + /** * Step between buckets in free list, measured in powers of two. * For example, for page size 4096 and 256 buckets, shift is 4 and step is 16 bytes. @@ -133,12 +139,15 @@ private final class UpdateRowHandler extends PageHandler { /** */ private final PageHandler writeRow = new WriteRowHandler(); + /** */ + private final PageHandler writeRows = new WriteRowsHandler(); + /** * */ - private final class WriteRowHandler extends PageHandler { - @Override public Integer run( - int cacheId, + private class WriteRowHandler extends PageHandler { + /** {@inheritDoc} */ + @Override public Integer run(int cacheId, long pageId, long page, long pageAddr, @@ -146,6 +155,33 @@ private final class WriteRowHandler extends PageHandler { Boolean walPlc, T row, int written, + IoStatisticsHolder statHolder + ) throws IgniteCheckedException { + written = run0(pageId, page, pageAddr, iox, row, written, statHolder); + + putPage((AbstractDataPageIO)iox, pageId, page, pageAddr, statHolder); + + return written; + } + + /** + * @param pageId Page ID. + * @param page Page absolute pointer. + * @param pageAddr Page address. + * @param iox IO. + * @param row Data row. + * @param written Count of bytes written. + * @param statHolder Statistics holder to track IO operations. + * @return Result. + * @throws IgniteCheckedException If failed. + */ + protected Integer run0( + long pageId, + long page, + long pageAddr, + PageIO iox, + T row, + int written, IoStatisticsHolder statHolder) throws IgniteCheckedException { AbstractDataPageIO io = (AbstractDataPageIO)iox; @@ -159,15 +195,6 @@ private final class WriteRowHandler extends PageHandler { written = (written == 0 && oldFreeSpace >= rowSize) ? addRow(pageId, page, pageAddr, io, row, rowSize) : addRowFragment(pageId, page, pageAddr, io, row, written, rowSize); - // Reread free space after update. - int newFreeSpace = io.getFreeSpace(pageAddr); - - if (newFreeSpace > MIN_PAGE_FREE_SPACE) { - int bucket = bucket(newFreeSpace, false); - - put(null, pageId, page, pageAddr, bucket, statHolder); - } - if (written == rowSize) evictionTracker.touchPage(pageId); @@ -185,7 +212,7 @@ private final class WriteRowHandler extends PageHandler { * @return Written size which is always equal to row size here. * @throws IgniteCheckedException If failed. */ - private int addRow( + protected int addRow( long pageId, long page, long pageAddr, @@ -225,7 +252,7 @@ private int addRow( * @return Updated written size. * @throws IgniteCheckedException If failed. */ - private int addRowFragment( + protected int addRowFragment( long pageId, long page, long pageAddr, @@ -254,6 +281,111 @@ private int addRowFragment( return written + payloadSize; } + + /** + * Put page into freelist if needed. + * + * @param iox IO. + * @param pageId Page ID. + * @param page Paege pointer. + * @param pageAddr Page address. + * @param statHolder Statistics holder to track IO operations. + */ + protected void putPage( + AbstractDataPageIO iox, + long pageId, + long page, + long pageAddr, + IoStatisticsHolder statHolder + ) throws IgniteCheckedException { + // Reread free space after update. + int newFreeSpace = ((AbstractDataPageIO)iox).getFreeSpace(pageAddr); + + if (newFreeSpace > MIN_PAGE_FREE_SPACE) { + int bucket = bucket(newFreeSpace, false); + + put(null, pageId, page, pageAddr, bucket, statHolder); + } + } + } + + /** + * + */ + private class WriteRowsHandler extends WriteRowHandler { + + /** {@inheritDoc} */ + @Override public Integer runAll( + int cacheId, + long pageId, + long page, + long pageAddr, + PageIO io, + Boolean walPlc, + List rows, + int writtenCnt, + IoStatisticsHolder statHolder + ) throws IgniteCheckedException { + AbstractDataPageIO iox = (AbstractDataPageIO)io; + + int idx = writtenCnt; + int remainSpace = iox.getFreeSpace(pageAddr); + int remainItems = MAX_DATA_ROWS_PER_PAGE - iox.getRowsCount(pageAddr); + + boolean pageIsEmpty = remainItems == MAX_DATA_ROWS_PER_PAGE; + + List rows0 = pageIsEmpty ? new ArrayList<>(8) : null; + + while (idx < rows.size() && remainItems > 0) { + T row = rows.get(idx); + + int size = row.size(); + int payloadSize = size % MIN_SIZE_FOR_DATA_PAGE; + + // If there is not enough space on page. + if (remainSpace < payloadSize) + break; + + if (pageIsEmpty) + rows0.add(row); + else { + int written = size > MIN_SIZE_FOR_DATA_PAGE ? + addRowFragment(pageId, page, pageAddr, iox, row, size - payloadSize, size) : + addRow(pageId, page, pageAddr, iox, row, size); + + assert written == size : "The object is not fully written into page: pageId=" + pageId + + ", written=" + written + ", size=" + row.size(); + } + + remainSpace -= getPageEntrySize(row, iox); + remainItems -= 1; + + idx += 1; + } + + // Update page counters only once. + if (pageIsEmpty) + iox.addRows(pageMem, pageId, pageAddr, rows0, pageSize()); + + assert idx != writtenCnt; + + evictionTracker.touchPage(pageId); + + putPage((AbstractDataPageIO)io, pageId, page, pageAddr, statHolder); + + return idx; + } + + /** */ + private int getPageEntrySize(T row, AbstractDataPageIO io) throws IgniteCheckedException { + int size = row.size(); + + int sizeSetup = size > MIN_SIZE_FOR_DATA_PAGE ? + AbstractDataPageIO.SHOW_PAYLOAD_LEN | AbstractDataPageIO.SHOW_LINK | AbstractDataPageIO.SHOW_ITEM : + AbstractDataPageIO.SHOW_PAYLOAD_LEN | AbstractDataPageIO.SHOW_ITEM; + + return io.getPageEntrySize(size % MIN_SIZE_FOR_DATA_PAGE, sizeSetup); + } } /** */ @@ -509,6 +641,96 @@ else if (PageIdUtils.tag(pageId) != PageIdAllocator.FLAG_DATA) while (written != COMPLETE); } + /** {@inheritDoc} */ + @Override public void insertDataRows(Collection rows, IoStatisticsHolder statHolder) throws IgniteCheckedException { + // Objects that don't fit into a single data page. + List largeRows = new ArrayList<>(); + + // Ordinary objects and the remaining parts of large objects. + List regularRows = new ArrayList<>(8); + + for (T dataRow : rows) { + int size = dataRow.size(); + + if (size < MIN_SIZE_FOR_DATA_PAGE) + regularRows.add(dataRow); + else { + largeRows.add(dataRow); + + if (size % MIN_SIZE_FOR_DATA_PAGE > 0) + regularRows.add(dataRow); + } + } + + for (T row : largeRows) { + int size = row.size(); + + int written = 0; + + do { + int remaining = size - written; + + if (remaining < MIN_SIZE_FOR_DATA_PAGE) + break; + + long pageId = takeEmptyPage(REUSE_BUCKET, ioVersions(), statHolder); + + AbstractDataPageIO initIo = null; + + if (pageId == 0L) { + pageId = allocateDataPage(row.partition()); + + initIo = ioVersions().latest(); + } + else if (PageIdUtils.tag(pageId) != PageIdAllocator.FLAG_DATA) + pageId = initReusedPage(pageId, row.partition(), statHolder); + else + pageId = PageIdUtils.changePartitionId(pageId, (row.partition())); + + written = write(pageId, writeRow, initIo, row, written, FAIL_I, statHolder); + + assert written != FAIL_I; // We can't fail here. + + memMetrics.incrementLargeEntriesPages(); + } + while (written != COMPLETE); + } + + for (int writtenCnt = 0; writtenCnt < regularRows.size(); ) { + T row = regularRows.get(writtenCnt); + + int size = row.size() % MIN_SIZE_FOR_DATA_PAGE; + + int minBucket = bucket(size, false); + + AbstractDataPageIO initIo = null; + + long pageId = 0; + + for (int b = REUSE_BUCKET - 1; b > minBucket; b--) { + pageId = takeEmptyPage(b, ioVersions(), statHolder); + + if (pageId != 0L) + break; + } + + if (pageId == 0) + pageId = takeEmptyPage(REUSE_BUCKET, ioVersions(), statHolder); + + if (pageId == 0) { + pageId = allocateDataPage(row.partition()); + + initIo = ioVersions().latest(); + } + else if (PageIdUtils.tag(pageId) != PageIdAllocator.FLAG_DATA) + pageId = initReusedPage(pageId, row.partition(), statHolder); + else + pageId = PageIdUtils.changePartitionId(pageId, row.partition()); + + writtenCnt = write(pageId, writeRows, initIo, regularRows, writtenCnt, FAIL_I, statHolder);; + } + } + /** * @param reusedPageId Reused page id. * @param partId Partition id. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeList.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeList.java index e28d421bdf063..894c1aa64faca 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeList.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeList.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache.persistence.freelist; +import java.util.Collection; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.processors.cache.persistence.Storable; @@ -32,6 +33,12 @@ public interface FreeList { */ public void insertDataRow(T row, IoStatisticsHolder statHolder) throws IgniteCheckedException; + /** + * @param rows Rows. + * @throws IgniteCheckedException If failed. + */ + public void insertDataRows(Collection rows, IoStatisticsHolder statHolder) throws IgniteCheckedException; + /** * @param link Row link. * @param row New row data. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/AbstractDataPageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/AbstractDataPageIO.java index 78752bbfefc84..96df8e3a2bb0a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/AbstractDataPageIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/AbstractDataPageIO.java @@ -22,6 +22,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.pagemem.PageIdUtils; @@ -31,6 +32,7 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler; import org.apache.ignite.internal.util.GridStringBuilder; import org.apache.ignite.internal.util.typedef.internal.SB; +import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.util.GridUnsafe.bufferAddress; @@ -41,13 +43,13 @@ public abstract class AbstractDataPageIO extends PageIO implements CompactablePageIO { /** */ - private static final int SHOW_ITEM = 0b0001; + public static final int SHOW_ITEM = 0b0001; /** */ - private static final int SHOW_PAYLOAD_LEN = 0b0010; + public static final int SHOW_PAYLOAD_LEN = 0b0010; /** */ - private static final int SHOW_LINK = 0b0100; + public static final int SHOW_LINK = 0b0100; /** */ private static final int FREE_LIST_PAGE_ID_OFF = COMMON_HEADER_END; @@ -147,7 +149,7 @@ private int getPageEntrySize(long pageAddr, int dataOff, int show) { * @param show What elements of data page entry to show in the result. * @return Data page entry size. */ - private int getPageEntrySize(int payloadLen, int show) { + public int getPageEntrySize(int payloadLen, int show) { assert payloadLen > 0 : payloadLen; int res = payloadLen; @@ -810,7 +812,7 @@ public void addRow( final int rowSize, final int pageSize ) throws IgniteCheckedException { - assert rowSize <= getFreeSpace(pageAddr) : "can't call addRow if not enough space for the whole row"; + assert rowSize <= getFreeSpace(pageAddr) : "can't call addRow if not enough space for the whole row (free=" + getFreeSpace(pageAddr) + ", required=" + rowSize + ")"; int fullEntrySize = getPageEntrySize(rowSize, SHOW_PAYLOAD_LEN | SHOW_ITEM); @@ -977,6 +979,67 @@ public void addRowFragment( addRowFragment(null, pageId, pageAddr, 0, 0, lastLink, null, payload, pageSize); } + /** + * @param pageMem Page memory. + * @param pageId Page ID to use to construct a link. + * @param pageAddr Page address. + * @param rows Data rows. + * @param pageSize Page size. + * @throws IgniteCheckedException If failed. + */ + public void addRows( + final PageMemory pageMem, + final long pageId, + final long pageAddr, + final Collection rows, + final int pageSize + ) throws IgniteCheckedException { + int maxPayloadSIze = pageSize - MIN_DATA_PAGE_OVERHEAD; + int dataOff = pageSize; + int written = 0; + int cnt = 0; + + for (T row : rows) { + int size = row.size(); + int payloadSize = size % maxPayloadSIze; + boolean fragment = size > maxPayloadSIze; + + int fullEntrySize = getPageEntrySize(payloadSize, fragment ? + SHOW_PAYLOAD_LEN | SHOW_ITEM | SHOW_LINK : SHOW_PAYLOAD_LEN | SHOW_ITEM); + + written += fullEntrySize; + dataOff -= (fullEntrySize - ITEM_SIZE); + + if (fragment) { + ByteBuffer buf = pageMem.pageBuffer(pageAddr); + + buf.position(dataOff); + + buf.putShort((short)(payloadSize | FRAGMENTED_FLAG)); + buf.putLong(row.link()); + + writeFragmentData(row, buf, 0, payloadSize); + } + else + writeRowData(pageAddr, dataOff, payloadSize, row, true); + + setItem(pageAddr, cnt, directItemFromOffset(dataOff)); + + assert checkIndex(cnt) : cnt; + assert getIndirectCount(pageAddr) <= getDirectCount(pageAddr); + + setLinkByPageId(row, pageId, cnt); + + cnt += 1; + } + + setDirectCount(pageAddr, cnt); + + setFirstEntryOffset(pageAddr, dataOff, pageSize); + + setRealFreeSpace(pageAddr, getRealFreeSpace(pageAddr) - written, pageSize); + } + /** * Adds maximum possible fragment of the given row to this data page and sets respective link to the row. * @@ -1112,6 +1175,7 @@ private int insertItem(long pageAddr, int dataOff, int directCnt, int indirectCn setItem(pageAddr, directCnt, directItemFromOffset(dataOff)); setDirectCount(pageAddr, directCnt + 1); + assert getDirectCount(pageAddr) == directCnt + 1; return directCnt; // Previous directCnt will be our itemId. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java index 5ab1bf38dbc18..07467f42c166e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.persistence.tree.util; import java.nio.ByteBuffer; +import java.util.List; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.pagemem.PageSupport; @@ -70,6 +71,32 @@ public abstract R run( ) throws IgniteCheckedException; + /** + * @param cacheId Cache ID. + * @param pageId Page ID. + * @param page Page absolute pointer. + * @param pageAddr Page address. + * @param io IO. + * @param walPlc Full page WAL record policy. + * @param args Arguments. + * @param statHolder Statistics holder to track IO operations. + * @return Result. + * @throws IgniteCheckedException If failed. + */ + public R runAll( + int cacheId, + long pageId, + long page, + long pageAddr, + PageIO io, + Boolean walPlc, + List args, + int intArg, + IoStatisticsHolder statHolder + ) throws IgniteCheckedException { + throw new UnsupportedOperationException(); + } + /** * @param cacheId Cache ID. * @param pageId Page ID. @@ -308,6 +335,75 @@ public static R writePage( } } + /** + * @param pageMem Page memory. + * @param grpId Group ID. + * @param pageId Page ID. + * @param lsnr Lock listener. + * @param h Handler. + * @param init IO for new page initialization or {@code null} if it is an existing page. + * @param wal Write ahead log. + * @param walPlc Full page WAL record policy. + * @param args Argument. + * @param lockFailed Result in case of lock failure due to page recycling. + * @param statHolder Statistics holder to track IO operations. + * @return Handler result. + * @throws IgniteCheckedException If failed. + */ + public static R writePageBatch( + PageMemory pageMem, + int grpId, + final long pageId, + PageLockListener lsnr, + PageHandler h, + PageIO init, + IgniteWriteAheadLogManager wal, + Boolean walPlc, + List args, + int intArg, + R lockFailed, + IoStatisticsHolder statHolder + ) throws IgniteCheckedException { + boolean releaseAfterWrite = true; + + long page = pageMem.acquirePage(grpId, pageId, statHolder); + + try { + long pageAddr = writeLock(pageMem, grpId, pageId, page, lsnr, false); + + if (pageAddr == 0L) + return lockFailed; + + boolean ok = false; + + try { + if (init != null) { + // It is a new page and we have to initialize it. + doInitPage(pageMem, grpId, pageId, page, pageAddr, init, wal); + walPlc = FALSE; + } + else + init = PageIO.getPageIO(pageAddr); + + R res = h.runAll(grpId, pageId, page, pageAddr, init, walPlc, args, intArg, statHolder); + + ok = true; + + return res; + } + finally { + assert PageIO.getCrc(pageAddr) == 0; //TODO GG-11480 + + if (releaseAfterWrite = h.releaseAfterWrite(grpId, pageId, page, pageAddr, null, 0)) + writeUnlock(pageMem, grpId, pageId, page, pageAddr, lsnr, walPlc, ok); + } + } + finally { + if (releaseAfterWrite) + pageMem.releasePage(grpId, pageId, page); + } + } + /** * @param pageMem Page memory. * @param grpId Group ID. diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java index 8d1ab878f2970..4d46ba6f5ad89 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java @@ -704,8 +704,6 @@ void recheckLock() { GridCacheVersion ver, MvccVersion mvccVer, MvccVersion newMvccVer, - byte mvccTxState, - byte newMvccTxState, long ttl, long expireTime, boolean preload, diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListPreloadWithBatchUpdatesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListPreloadWithBatchUpdatesTest.java new file mode 100644 index 0000000000000..bbe6f59a7258d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListPreloadWithBatchUpdatesTest.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.processors.database; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.testframework.junits.WithSystemProperty; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.ignite.IgniteSystemProperties.IGNITE_BASELINE_AUTO_ADJUST_ENABLED; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_DATA_STORAGE_BATCH_PAGE_WRITE; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD; +import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_PAGE_SIZE; + +/** + * + */ +@RunWith(Parameterized.class) +public class FreeListPreloadWithBatchUpdatesTest extends GridCommonAbstractTest { + /** */ + private static final long DEF_REG_SIZE_INIT = 3400 * 1024 * 1024L; + + /** */ + private static final long DEF_REG_SIZE = 6144 * 1024 * 1024L; + + /** */ + private static final int LARGE_PAGE = 16 * 1024; + + /** */ + private static final String DEF_CACHE_NAME = "some-cache"; + + /** */ + @Parameterized.Parameters(name = " atomicity={0}, persistence={1}, pageSize={2}") + public static Iterable setup() { + return Arrays.asList(new Object[][]{ + {CacheAtomicityMode.ATOMIC, false, DFLT_PAGE_SIZE}, + {CacheAtomicityMode.ATOMIC, true, DFLT_PAGE_SIZE}, + {CacheAtomicityMode.ATOMIC, false, LARGE_PAGE}, + {CacheAtomicityMode.ATOMIC, true, LARGE_PAGE}, + {CacheAtomicityMode.TRANSACTIONAL, false, DFLT_PAGE_SIZE}, + {CacheAtomicityMode.TRANSACTIONAL, true, DFLT_PAGE_SIZE}, + {CacheAtomicityMode.TRANSACTIONAL, false, LARGE_PAGE}, + {CacheAtomicityMode.TRANSACTIONAL, true, LARGE_PAGE}, + {CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT, false, DFLT_PAGE_SIZE}, + {CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT, true, DFLT_PAGE_SIZE} + }); + } + + /** */ + @Parameterized.Parameter + public CacheAtomicityMode cacheAtomicityMode; + + /** */ + @Parameterized.Parameter(1) + public boolean persistence; + + /** */ + @Parameterized.Parameter(2) + public Integer pageSize; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + DataRegionConfiguration def = new DataRegionConfiguration(); + def.setInitialSize(DEF_REG_SIZE_INIT); + def.setMaxSize(DEF_REG_SIZE); + def.setPersistenceEnabled(persistence); + + DataStorageConfiguration storeCfg = new DataStorageConfiguration(); + + storeCfg.setDefaultDataRegionConfiguration(def); + storeCfg.setPageSize(pageSize); + + if (persistence) { + storeCfg.setWalMode(WALMode.LOG_ONLY); + storeCfg.setMaxWalArchiveSize(Integer.MAX_VALUE); + } + + cfg.setDataStorageConfiguration(storeCfg); + + cfg.setCacheConfiguration(new CacheConfiguration(DEF_CACHE_NAME) + .setAffinity(new RendezvousAffinityFunction(false, 1)) + .setCacheMode(CacheMode.REPLICATED) + .setAtomicityMode(cacheAtomicityMode)); + + return cfg; + } + + /** + * + */ + @Before + public void before() throws Exception { + cleanPersistenceDir(); + } + + /** + * + */ + @After + public void after() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** + * @throws Exception If failed. + */ + @Test + @WithSystemProperty(key = IGNITE_DATA_STORAGE_BATCH_PAGE_WRITE, value = "true") + @WithSystemProperty(key = IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "100") + @WithSystemProperty(key = IGNITE_BASELINE_AUTO_ADJUST_ENABLED, value = "false") + public void testBatchRebalance() throws Exception { + Ignite node1 = startGrid(0); + + node1.cluster().active(true); + + int cnt = 50_000; + + int minSize = 0; + int maxSize = 16384; + + Map srcMap = new HashMap<>(); + + for (int i = 0; i < cnt; i++) { + int size = maxSize == minSize ? maxSize : minSize + ThreadLocalRandom.current().nextInt(maxSize - minSize); + + byte[] obj = new byte[size]; + + srcMap.put(i, obj); + } + + try (IgniteDataStreamer streamer = node1.dataStreamer(DEF_CACHE_NAME)) { + streamer.addData(srcMap); + } + + log.info("Data loaded."); + + if (persistence) + node1.cluster().active(false); + + final IgniteEx node2 = startGrid(1); + + if (persistence) { + node1.cluster().active(true); + + node1.cluster().setBaselineTopology(node1.cluster().forServers().nodes()); + } + + log.info("Await rebalance."); + + awaitPartitionMapExchange(); + + if (persistence) + forceCheckpoint(node1); + + node1.close(); + + IgniteCache cache = node2.cache(DEF_CACHE_NAME); + + validateCacheEntries(cache, srcMap); + + // Check WAL rebalance. + if (persistence) { + log.info("Updating values on node #1."); + + // Update existing extries. + for (int i = 100; i < 5_000; i++) { + if (i % 3 == 0) { + cache.remove(i); + + srcMap.remove(i); + } + else { + byte[] bytes = new byte[ThreadLocalRandom.current().nextInt(maxSize)]; + + Arrays.fill(bytes, (byte)1); + + srcMap.put(i, bytes); + cache.put(i, bytes); + } + } + + // New entries. + for (int i = cnt; i < cnt + 1_000; i++) { + byte[] bytes = new byte[ThreadLocalRandom.current().nextInt(maxSize)]; + + srcMap.put(i, bytes); + cache.put(i, bytes); + } + + forceCheckpoint(node2); + + log.info("Starting node #1"); + + node1 = startGrid(0); + + node1.cluster().active(true); + + node1.cluster().setBaselineTopology(node1.cluster().forServers().nodes()); + + log.info("Await rebalance on node #1."); + + awaitPartitionMapExchange(); + + log.info("Stop node #2."); + + node2.close(); + + validateCacheEntries(node1.cache(DEF_CACHE_NAME), srcMap); + } + } + + /** + * @param cache Cache. + * @param map Map. + */ + @SuppressWarnings("unchecked") + private void validateCacheEntries(IgniteCache cache, Map map) { + int size = cache.size(); + + assertEquals("Cache size mismatch.", map.size(), size); + + log.info("Validation " + cache.getName() + ", size=" + size); + + for (Map.Entry e : map.entrySet()) { + String idx = "key=" + e.getKey(); + + assertEquals(idx, e.getValue().length, ((byte[])cache.get(e.getKey())).length); + } + } +}