Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -1097,6 +1097,9 @@ public final class IgniteSystemProperties {
*/
public static final String IGNITE_DISCOVERY_DISABLE_CACHE_METRICS_UPDATE = "IGNITE_DISCOVERY_DISABLE_CACHE_METRICS_UPDATE";

/** */
public static final String IGNITE_DATA_STORAGE_BATCH_PAGE_WRITE = "IGNITE_DATA_STORAGE_BATCH_PAGE_WRITE";

/**
* Maximum number of different partitions to be extracted from between expression within sql query.
* In case of limit exceeding all partitions will be used.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1163,6 +1163,15 @@ public void onRebalanceKeyReceived() {
rebalancingKeysRate.onHit();
}

/**
* Rebalance entry store callback.
*/
public void onRebalanceKeysReceived(long batchSize) {
rebalancedKeys.addAndGet(batchSize);

rebalancingKeysRate.onHits(batchSize);
}

/**
* Rebalance supply message callback.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -792,6 +792,42 @@ default boolean initialValue(CacheObject val,
* @throws IgniteCheckedException In case of error.
* @throws GridCacheEntryRemovedException If entry was removed.
*/
default boolean initialValue(CacheObject val,
GridCacheVersion ver,
@Nullable MvccVersion mvccVer,
@Nullable MvccVersion newMvccVer,
byte mvccTxState,
byte newMvccTxState,
long ttl,
long expireTime,
boolean preload,
AffinityTopologyVersion topVer,
GridDrType drType,
boolean fromStore) throws IgniteCheckedException, GridCacheEntryRemovedException {
return initialValue(val, ver, null, null, TxState.NA, TxState.NA,
ttl, expireTime, preload, topVer, drType, fromStore, null);
}

/**
* Sets new value if current version is <tt>0</tt>
*
* @param val New value.
* @param ver Version to use.
* @param mvccVer Mvcc version.
* @param newMvccVer New mvcc version.
* @param mvccTxState Tx state hint for mvcc version.
* @param newMvccTxState Tx state hint for new mvcc version.
* @param ttl Time to live.
* @param expireTime Expiration time.
* @param preload Flag indicating whether entry is being preloaded.
* @param topVer Topology version.
* @param drType DR type.
* @param fromStore {@code True} if value was loaded from store.
* @param row Pre-created data row, associated with this cache entry.
* @return {@code True} if initial value was set.
* @throws IgniteCheckedException In case of error.
* @throws GridCacheEntryRemovedException If entry was removed.
*/
public boolean initialValue(CacheObject val,
GridCacheVersion ver,
@Nullable MvccVersion mvccVer,
Expand All @@ -803,7 +839,8 @@ public boolean initialValue(CacheObject val,
boolean preload,
AffinityTopologyVersion topVer,
GridDrType drType,
boolean fromStore) throws IgniteCheckedException, GridCacheEntryRemovedException;
boolean fromStore,
@Nullable CacheDataRow row) throws IgniteCheckedException, GridCacheEntryRemovedException;

/**
* Create versioned entry for this cache entry.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3320,7 +3320,8 @@ private boolean skipInterceptor(@Nullable GridCacheVersion explicitVer) {
boolean preload,
AffinityTopologyVersion topVer,
GridDrType drType,
boolean fromStore
boolean fromStore,
CacheDataRow row
) throws IgniteCheckedException, GridCacheEntryRemovedException {
ensureFreeSpace();

Expand Down Expand Up @@ -3427,7 +3428,7 @@ else if (val == null)
}
else
// Optimization to access storage only once.
update = storeValue(val, expTime, ver, p);
update = storeValue(val, expTime, ver, p, row);
}

if (update) {
Expand Down Expand Up @@ -4260,6 +4261,14 @@ protected boolean storeValue(@Nullable CacheObject val,
return storeValue(val, expireTime, ver, null);
}

protected boolean storeValue(
@Nullable CacheObject val,
long expireTime,
GridCacheVersion ver,
@Nullable IgnitePredicate<CacheDataRow> predicate) throws IgniteCheckedException {
return storeValue(val, expireTime, ver, predicate, null);
}

/**
* Stores value in off-heap.
*
Expand All @@ -4274,10 +4283,11 @@ protected boolean storeValue(
@Nullable CacheObject val,
long expireTime,
GridCacheVersion ver,
@Nullable IgnitePredicate<CacheDataRow> predicate) throws IgniteCheckedException {
@Nullable IgnitePredicate<CacheDataRow> predicate,
@Nullable CacheDataRow dataRow) throws IgniteCheckedException {
assert lock.isHeldByCurrentThread();

UpdateClosure closure = new UpdateClosure(this, val, ver, expireTime, predicate);
UpdateClosure closure = new UpdateClosure(this, val, ver, expireTime, predicate, dataRow);

cctx.offheap().invoke(cctx, key, localPartition(), closure);

Expand Down Expand Up @@ -5717,12 +5727,13 @@ private static class UpdateClosure implements IgniteCacheOffheapManager.OffheapI
* @param predicate Optional predicate.
*/
UpdateClosure(GridCacheMapEntry entry, @Nullable CacheObject val, GridCacheVersion ver, long expireTime,
@Nullable IgnitePredicate<CacheDataRow> predicate) {
@Nullable IgnitePredicate<CacheDataRow> predicate, @Nullable CacheDataRow newRow) {
this.entry = entry;
this.val = val;
this.ver = ver;
this.expireTime = expireTime;
this.predicate = predicate;
this.newRow = newRow;
}

/** {@inheritDoc} */
Expand All @@ -5742,6 +5753,12 @@ private static class UpdateClosure implements IgniteCacheOffheapManager.OffheapI
}

if (val != null) {
if (newRow != null) {
treeOp = IgniteTree.OperationType.PUT;

return;
}

newRow = entry.cctx.offheap().dataStore(entry.localPartition()).createRow(
entry.cctx,
entry.key,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.ignite.internal.processors.cache;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import javax.cache.Cache;
Expand Down Expand Up @@ -188,6 +189,18 @@ public boolean expire(GridCacheContext cctx, IgniteInClosure2X<GridCacheEntryEx,
public void invoke(GridCacheContext cctx, KeyCacheObject key, GridDhtLocalPartition part, OffheapInvokeClosure c)
throws IgniteCheckedException;

/**
* @param cctx Cache context.
* @param part Partition.
* @param entries Entries.
* @throws IgniteCheckedException If failed.
*/
public List<CacheDataRow> storeAll(
GridCacheContext cctx,
GridDhtLocalPartition part,
Collection<? extends GridCacheEntryInfo> entries
) throws IgniteCheckedException;

/**
* @param cctx Cache context.
* @param key Key.
Expand Down Expand Up @@ -718,6 +731,18 @@ void update(
long expireTime,
@Nullable CacheDataRow oldRow) throws IgniteCheckedException;


/**
* @param cctx Cache context.
* @param entries Entries.
* @return Created rows.
* @throws IgniteCheckedException If failed.
*/
public List<CacheDataRow> storeAll(
GridCacheContext cctx,
Collection<? extends GridCacheEntryInfo> entries
) throws IgniteCheckedException;

/**
* @param cctx Cache context.
* @param key Key.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -36,7 +37,6 @@
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageMvccMarkUpdatedRecord;
Expand Down Expand Up @@ -434,6 +434,15 @@ private Iterator<CacheDataStore> cacheData(boolean primary, boolean backup, Affi
dataStore(part).invoke(cctx, key, c);
}

/** {@inheritDoc} */
@Override public List<CacheDataRow> storeAll(
GridCacheContext cctx,
GridDhtLocalPartition part,
Collection<? extends GridCacheEntryInfo> entries
) throws IgniteCheckedException {
return dataStore(part).storeAll(cctx, entries);
}

/** {@inheritDoc} */
@Override public void update(
GridCacheContext cctx,
Expand Down Expand Up @@ -1617,6 +1626,8 @@ private void invoke0(GridCacheContext cctx, CacheSearchRow row, OffheapInvokeClo
throws IgniteCheckedException {
assert cctx.shared().database().checkpointLockIsHeldByThread();

boolean preCreated = c.newRow() != null;

dataTree.invoke(row, CacheDataRowAdapter.RowData.NO_KEY, c);

switch (c.operationType()) {
Expand All @@ -1639,13 +1650,59 @@ private void invoke0(GridCacheContext cctx, CacheSearchRow row, OffheapInvokeClo
}

case NOOP:
// Remove pre created row (preloading fallback).
if (preCreated)
rowStore.removeRow(c.newRow().link(), grp.statisticsHolderData());

break;

default:
assert false : c.operationType();
}
}

/** {@inheritDoc} */
@Override public List<CacheDataRow> storeAll(
GridCacheContext cctx,
Collection<? extends GridCacheEntryInfo> infos
) throws IgniteCheckedException {
if (!busyLock.enterBusy())
throw new NodeStoppingException("Operation has been cancelled (node is stopping).");

List<CacheDataRow> rows = new ArrayList<>(infos.size());

try {
assert cctx.shared().database().checkpointLockIsHeldByThread();

assert !cctx.mvccEnabled();

int cacheId = cctx.group().storeCacheIdInDataPage() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;

IoStatisticsHolder statHolder = grp.statisticsHolderData();

for (GridCacheEntryInfo info : infos) {
KeyCacheObject key = info.key();
CacheObject val = info.value();

CacheObjectContext coCtx = cctx.cacheObjectContext();

val.valueBytes(coCtx);
key.valueBytes(coCtx);

DataRow row = makeDataRow(key, val, info.version(), info.expireTime(), cacheId);

rows.add(row);
}

rowStore().addRows(rows, statHolder);
}
finally {
busyLock.leaveBusy();
}

return rows;
}

/** {@inheritDoc} */
@Override public CacheDataRow createRow(
GridCacheContext cctx,
Expand Down
Loading