diff --git a/examples/config/example-rebalance.xml b/examples/config/example-rebalance.xml
new file mode 100644
index 00000000000000..10f3367d4c6e11
--- /dev/null
+++ b/examples/config/example-rebalance.xml
@@ -0,0 +1,147 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ 10.116.172.13:48601..48609
+ 10.116.172.15:48601..48609
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index fdb8ebc8b5e541..eea6db79b05601 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -36,6 +36,7 @@
import org.apache.ignite.internal.processors.cache.mvcc.MvccProcessor;
import org.apache.ignite.internal.stat.IoStatisticsManager;
import org.apache.ignite.internal.processors.compress.CompressionProcessor;
+import org.apache.ignite.internal.processors.diag.DiagnosticProcessor;
import org.apache.ignite.internal.worker.WorkersRegistry;
import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor;
import org.apache.ignite.internal.processors.authentication.IgniteAuthenticationProcessor;
@@ -467,6 +468,9 @@ public interface GridKernalContext extends Iterable {
*/
public FailureProcessor failure();
+ /** @return Diagnostic processor; {@code null} in contexts that do not provide one (e.g. StandaloneGridKernalContext). */
+ public DiagnosticProcessor diagnostic();
+
/**
* Print grid kernal memory stats (sizes of internal structures, etc.).
*
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 8131899af756f0..50b1e580a2e5e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -48,6 +48,10 @@
import org.apache.ignite.internal.managers.failover.GridFailoverManager;
import org.apache.ignite.internal.managers.indexing.GridIndexingManager;
import org.apache.ignite.internal.managers.loadbalancer.GridLoadBalancerManager;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccProcessor;
+import org.apache.ignite.internal.processors.compress.CompressionProcessor;
+import org.apache.ignite.internal.processors.diag.DiagnosticProcessor;
+import org.apache.ignite.internal.worker.WorkersRegistry;
import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor;
import org.apache.ignite.internal.processors.authentication.IgniteAuthenticationProcessor;
import org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager;
@@ -423,6 +427,9 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
/** Failure processor. */
private FailureProcessor failureProc;
+ /** */
+ private DiagnosticProcessor diagProc;
+
/** Recovery mode flag. Flag is set to {@code false} when discovery manager started. */
private boolean recoveryMode = true;
@@ -585,7 +592,8 @@ else if (comp instanceof GridEncryptionManager)
* Processors.
* ==========
*/
-
+ else if (comp instanceof DiagnosticProcessor)
+ diagProc = (DiagnosticProcessor)comp;
else if (comp instanceof FailureProcessor)
failureProc = (FailureProcessor)comp;
else if (comp instanceof GridTaskProcessor)
@@ -1196,6 +1204,11 @@ void disconnected(boolean disconnected) {
return failureProc;
}
+ /** {@inheritDoc} */
+ @Override public DiagnosticProcessor diagnostic() {
+ return diagProc; // Assigned when a DiagnosticProcessor component is registered; null until then.
+ }
+
/** {@inheritDoc} */
@Override public Thread.UncaughtExceptionHandler uncaughtExceptionHandler() {
return hnd;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 95bf49c9c02507..79d4821f9216bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -140,6 +140,7 @@
import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor;
import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor;
+import org.apache.ignite.internal.processors.diag.DiagnosticProcessor;
import org.apache.ignite.internal.processors.failure.FailureProcessor;
import org.apache.ignite.internal.processors.hadoop.Hadoop;
import org.apache.ignite.internal.processors.hadoop.HadoopProcessorAdapter;
@@ -963,6 +964,8 @@ public void start(
startProcessor(new FailureProcessor(ctx));
+ startProcessor(new DiagnosticProcessor(ctx));
+
startProcessor(new PoolProcessor(ctx));
// Closure processor should be started before all others
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 881e8688411a0c..1c38b7cced0087 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -76,6 +76,7 @@
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
+import org.apache.ignite.internal.processors.diag.DiagnosticTopics;
import org.apache.ignite.internal.processors.dr.GridDrType;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheFilter;
@@ -121,6 +122,7 @@
import static org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter.RowData.NO_KEY;
import static org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode.DUPLICATE_KEY;
import static org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode.TRANSACTION_SERIALIZATION_ERROR;
+import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.PRELOAD_UPDATED;
import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
/**
@@ -2978,6 +2980,8 @@ protected final void update(@Nullable CacheObject val, long expireTime, long ttl
assert lock.isHeldByCurrentThread();
assert ttl != CU.TTL_ZERO && ttl != CU.TTL_NOT_CHANGED && ttl >= 0 : ttl;
+ cctx.kernalContext().diagnostic().beginTrack(PRELOAD_UPDATED);
+
boolean trackNear = addTracked && isNear() && cctx.config().isEagerTtl();
long oldExpireTime = expireTimeExtras();
@@ -2994,6 +2998,8 @@ protected final void update(@Nullable CacheObject val, long expireTime, long ttl
if (trackNear && expireTime != 0 && (expireTime != oldExpireTime || isStartVersion()))
cctx.ttl().addTrackedEntry((GridNearCacheEntry)this);
+
+ cctx.kernalContext().diagnostic().endTrack(PRELOAD_UPDATED);
}
/**
@@ -3479,6 +3485,7 @@ else if (deletedUnlocked())
mvccVer == null ? MvccUtils.INITIAL_VERSION : mvccVer
)));
} else {
+ cctx.kernalContext().diagnostic().beginTrack(DiagnosticTopics.PRELOAD_ON_WAL_LOG);
cctx.shared().wal().log(new DataRecord(new DataEntry(
cctx.cacheId(),
key,
@@ -3490,12 +3497,14 @@ else if (deletedUnlocked())
partition(),
updateCntr
)));
+ cctx.kernalContext().diagnostic().endTrack(DiagnosticTopics.PRELOAD_ON_WAL_LOG);
}
}
drReplicate(drType, val, ver, topVer);
if (!skipQryNtf) {
+ cctx.kernalContext().diagnostic().beginTrack(DiagnosticTopics.PRELOAD_ON_ENTRY_UPDATED);
cctx.continuousQueries().onEntryUpdated(
key,
val,
@@ -3507,6 +3516,7 @@ else if (deletedUnlocked())
updateCntr,
null,
topVer);
+ cctx.kernalContext().diagnostic().endTrack(DiagnosticTopics.PRELOAD_ON_ENTRY_UPDATED);
}
onUpdateFinished(updateCntr);
@@ -4295,10 +4305,14 @@ protected boolean storeValue(
@Nullable IgnitePredicate predicate) throws IgniteCheckedException {
assert lock.isHeldByCurrentThread();
+ cctx.kernalContext().diagnostic().beginTrack(DiagnosticTopics.PRELOAD_OFFHEAP_INVOKE);
+
UpdateClosure closure = new UpdateClosure(this, val, ver, expireTime, predicate);
cctx.offheap().invoke(cctx, key, localPartition(), closure);
+ cctx.kernalContext().diagnostic().endTrack(DiagnosticTopics.PRELOAD_OFFHEAP_INVOKE);
+
return closure.treeOp != IgniteTree.OperationType.NOOP;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 364950c6b42d2a..cecf317b8e2be4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -138,6 +138,7 @@
import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.nextDumpTimeout;
import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader.DFLT_PRELOAD_RESEND_TIMEOUT;
+import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.TOTAL;
/**
* Partition exchange manager.
@@ -3035,6 +3036,8 @@ else if (task instanceof ForceRebalanceExchangeTask) {
if (task instanceof ForceRebalanceExchangeTask)
forcedRebFut = ((ForceRebalanceExchangeTask)task).forcedRebalanceFuture();
+ cctx.kernalContext().diagnostic().beginTrack(TOTAL);
+
for (Integer order : orderMap.descendingKeySet()) {
for (Integer grpId : orderMap.get(order)) {
CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 0976f637a32b14..33b5b33e93712b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -128,6 +128,11 @@
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.unexpectedStateException;
import static org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager.EMPTY_CURSOR;
import static org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO.MVCC_INFO_SIZE;
+import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.PRELOAD_PENDING_TREE_PUT;
+import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.PRELOAD_PENDING_TREE_REMOVE;
+import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.PRELOAD_TREE_ADD_ROW;
+import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.PRELOAD_TREE_FINISH_UPDATE;
+import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.PRELOAD_TREE_INVOKE;
import static org.apache.ignite.internal.util.IgniteTree.OperationType.NOOP;
import static org.apache.ignite.internal.util.IgniteTree.OperationType.PUT;
@@ -1660,15 +1665,20 @@ private void invoke0(GridCacheContext cctx, CacheSearchRow row, OffheapInvokeClo
try {
assert cctx.shared().database().checkpointLockIsHeldByThread();
+ ctx.kernalContext().diagnostic().beginTrack(PRELOAD_TREE_INVOKE);
+
dataTree.invoke(row, CacheDataRowAdapter.RowData.NO_KEY, c);
+ ctx.kernalContext().diagnostic().endTrack(PRELOAD_TREE_INVOKE);
+
switch (c.operationType()) {
case PUT: {
assert c.newRow() != null : c;
CacheDataRow oldRow = c.oldRow();
-
+ ctx.kernalContext().diagnostic().beginTrack(PRELOAD_TREE_FINISH_UPDATE);
finishUpdate(cctx, c.newRow(), oldRow);
+ ctx.kernalContext().diagnostic().endTrack(PRELOAD_TREE_FINISH_UPDATE);
break;
}
@@ -1705,6 +1715,8 @@ private void invoke0(GridCacheContext cctx, CacheSearchRow row, OffheapInvokeClo
DataRow dataRow = makeDataRow(key, val, ver, expireTime, cacheId);
+ ctx.kernalContext().diagnostic().beginTrack(PRELOAD_TREE_ADD_ROW);
+
if (canUpdateOldRow(cctx, oldRow, dataRow) && rowStore.updateRow(oldRow.link(), dataRow, grp.statisticsHolderData()))
dataRow.link(oldRow.link());
else {
@@ -1715,6 +1727,7 @@ private void invoke0(GridCacheContext cctx, CacheSearchRow row, OffheapInvokeClo
rowStore.addRow(dataRow, grp.statisticsHolderData());
}
+ ctx.kernalContext().diagnostic().endTrack(PRELOAD_TREE_ADD_ROW);
assert dataRow.link() != 0 : dataRow;
@@ -2687,13 +2700,17 @@ private void updatePendingEntries(GridCacheContext cctx, CacheDataRow newRow, @N
if (oldRow != null) {
assert oldRow.link() != 0 : oldRow;
- if (pendingTree() != null && oldRow.expireTime() != 0)
+ if (pendingTree() != null && oldRow.expireTime() != 0) {
+ cctx.kernalContext().diagnostic().beginTrack(PRELOAD_PENDING_TREE_REMOVE);
pendingTree().removex(new PendingRow(cacheId, oldRow.expireTime(), oldRow.link()));
+ cctx.kernalContext().diagnostic().endTrack(PRELOAD_PENDING_TREE_REMOVE);
+ }
}
if (pendingTree() != null && expireTime != 0) {
+ cctx.kernalContext().diagnostic().beginTrack(PRELOAD_PENDING_TREE_PUT);
pendingTree().putx(new PendingRow(cacheId, expireTime, newRow.link()));
-
+ cctx.kernalContext().diagnostic().endTrack(PRELOAD_PENDING_TREE_PUT);
hasPendingEntries = true;
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index ddbb3b143fd4c4..e73ebd4c6ed473 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -78,6 +78,10 @@
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING;
+import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.PRELOAD_ENTRY;
+import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.SEND_DEMAND;
+import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.SEND_RECEIVE;
+import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.TOTAL;
import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRELOAD;
@@ -372,6 +376,12 @@ Runnable addAssignments(
log.debug(e.getMessage());
}
});
+ else
+ fut.listen(f -> {
+ ctx.kernalContext().diagnostic().endTrack(TOTAL);
+
+ ctx.kernalContext().diagnostic().printStats();
+ });
requestPartitions(fut, assignments);
};
@@ -510,9 +520,13 @@ private void requestPartitions(final RebalanceFuture fut, GridDhtPreloaderAssign
return;
try {
+ ctx.kernalContext().diagnostic().beginTrack(SEND_DEMAND);
+
ctx.io().sendOrderedMessage(node, rebalanceTopics.get(topicId),
demandMsg.convertIfNeeded(node.version()), grp.ioPolicy(), demandMsg.timeout());
+ ctx.kernalContext().diagnostic().beginTrack(SEND_RECEIVE);
+
// Cleanup required in case partitions demanded in parallel with cancellation.
synchronized (fut) {
if (fut.isDone())
@@ -540,6 +554,9 @@ private void requestPartitions(final RebalanceFuture fut, GridDhtPreloaderAssign
fut.cancel();
}
+
+ ctx.kernalContext().diagnostic().endTrack(SEND_DEMAND);
+
}, true));
}
}
@@ -661,6 +678,8 @@ public void handleSupplyMessage(
final UUID nodeId,
final GridDhtPartitionSupplyMessage supplyMsg
) {
+ ctx.kernalContext().diagnostic().endTrack(SEND_RECEIVE);
+
AffinityTopologyVersion topVer = supplyMsg.topologyVersion();
final RebalanceFuture fut = rebalanceFut;
@@ -858,9 +877,13 @@ public void handleSupplyMessage(
if (!topologyChanged(fut) && !fut.isDone()) {
// Send demand message.
try {
+ ctx.kernalContext().diagnostic().beginTrack(SEND_DEMAND);
+
ctx.io().sendOrderedMessage(node, rebalanceTopics.get(topicId),
d.convertIfNeeded(node.version()), grp.ioPolicy(), grp.config().getRebalanceTimeout());
+ ctx.kernalContext().diagnostic().beginTrack(SEND_RECEIVE);
+
if (log.isDebugEnabled())
log.debug("Send next demand message [" + demandRoutineInfo(topicId, nodeId, supplyMsg) + "]");
}
@@ -869,6 +892,9 @@ public void handleSupplyMessage(
log.debug("Supplier has left [" + demandRoutineInfo(topicId, nodeId, supplyMsg) +
", errMsg=" + e.getMessage() + ']');
}
+
+ ctx.kernalContext().diagnostic().endTrack(SEND_DEMAND);
+
}
else {
if (log.isDebugEnabled())
@@ -901,6 +927,8 @@ private boolean preloadEntry(
assert ctx.database().checkpointLockIsHeldByThread();
try {
+ ctx.kernalContext().diagnostic().beginTrack(PRELOAD_ENTRY);
+
GridCacheEntryEx cached = null;
try {
@@ -969,6 +997,9 @@ else if (log.isTraceEnabled())
throw new IgniteCheckedException("Failed to cache rebalanced entry (will stop rebalancing) [local=" +
ctx.localNode() + ", node=" + from.id() + ", key=" + entry.key() + ", part=" + p + ']', e);
}
+ finally {
+ ctx.kernalContext().diagnostic().endTrack(PRELOAD_ENTRY);
+ }
return true;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index d26d68ff20d0e9..a0637ac75bc798 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -53,6 +53,8 @@
import org.apache.ignite.spi.IgniteSpiException;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
+import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.SUPPLIER_PROCESS_MSG;
+import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.TOTAL;
/**
* Class for supplying partitions to demanding nodes.
@@ -253,6 +255,8 @@ public void handleDemandMessage(int topicId, UUID nodeId, GridDhtPartitionDemand
long maxBatchesCnt = grp.config().getRebalanceBatchesPrefetchCount();
if (sctx == null) {
+ grp.singleCacheContext().kernalContext().diagnostic().beginTrack(TOTAL);
+
if (log.isDebugEnabled())
log.debug("Starting supplying rebalancing [" + supplyRoutineInfo(topicId, nodeId, demandMsg) +
", fullPartitions=" + S.compact(demandMsg.partitions().fullSet()) +
@@ -314,6 +318,8 @@ public void handleDemandMessage(int topicId, UUID nodeId, GridDhtPartitionDemand
long batchesCnt = 0;
+ grp.singleCacheContext().kernalContext().diagnostic().beginTrack(SUPPLIER_PROCESS_MSG);
+
while (iter.hasNext()) {
if (supplyMsg.messageSize() >= msgMaxSize) {
if (++batchesCnt >= maxBatchesCnt) {
@@ -442,6 +448,10 @@ else if (iter.isPartitionMissing(p)) {
reply(topicId, demanderNode, demandMsg, supplyMsg, contextId);
+ // Print statistics for the Supplier.
+ grp.singleCacheContext().kernalContext().diagnostic().endTrack(TOTAL);
+ grp.singleCacheContext().kernalContext().diagnostic().printStats();
+
if (log.isInfoEnabled())
log.info("Finished supplying rebalancing [" + supplyRoutineInfo(topicId, nodeId, demandMsg) + "]");
}
@@ -512,6 +522,9 @@ private boolean reply(
GridDhtPartitionSupplyMessage supplyMsg,
T3 contextId
) throws IgniteCheckedException {
+
+ grp.singleCacheContext().kernalContext().diagnostic().endTrack(SUPPLIER_PROCESS_MSG);
+
try {
if (log.isDebugEnabled())
log.debug("Send next supply message [" + supplyRoutineInfo(topicId, demander.id(), demandMsg) + "]");
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java
index 91fd2070cc0481..fe82904720e569 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java
@@ -27,6 +27,8 @@
import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner;
import org.apache.ignite.internal.stat.IoStatisticsHolder;
+import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.PRELOAD_FREELIST_REMOVE;
+
/**
* Data store for H2 rows.
*/
@@ -80,11 +82,12 @@ public void removeRow(long link, IoStatisticsHolder statHolder) throws IgniteChe
freeList.removeDataRowByLink(link, statHolder);
else {
ctx.database().checkpointReadLock();
-
+ ctx.kernalContext().diagnostic().beginTrack(PRELOAD_FREELIST_REMOVE);
try {
freeList.removeDataRowByLink(link, statHolder);
}
finally {
+ ctx.kernalContext().diagnostic().endTrack(PRELOAD_FREELIST_REMOVE);
ctx.database().checkpointReadUnlock();
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index a38af3a172466e..79fef035b28db0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -1769,8 +1769,8 @@ private SegmentArchiveResult archiveSegment(long absIdx) throws StorageException
File dstFile = new File(walArchiveDir, name);
- if (log.isInfoEnabled())
- log.info("Starting to copy WAL segment [absIdx=" + absIdx + ", segIdx=" + segIdx +
+ if (log.isDebugEnabled())
+ log.debug("Starting to copy WAL segment [absIdx=" + absIdx + ", segIdx=" + segIdx +
", origFile=" + origFile.getAbsolutePath() + ", dstFile=" + dstFile.getAbsolutePath() + ']');
try {
@@ -1792,8 +1792,8 @@ private SegmentArchiveResult archiveSegment(long absIdx) throws StorageException
", dstFile=" + dstTmpFile.getAbsolutePath() + ']', e);
}
- if (log.isInfoEnabled())
- log.info("Copied file [src=" + origFile.getAbsolutePath() +
+ if (log.isDebugEnabled())
+ log.debug("Copied file [src=" + origFile.getAbsolutePath() +
", dst=" + dstFile.getAbsolutePath() + ']');
return new SegmentArchiveResult(absIdx, origFile, dstFile);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
index e56b05baefaa2c..3b3f7178fb9463 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
@@ -61,6 +61,7 @@
import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor;
import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor;
+import org.apache.ignite.internal.processors.diag.DiagnosticProcessor;
import org.apache.ignite.internal.processors.failure.FailureProcessor;
import org.apache.ignite.internal.processors.hadoop.HadoopHelper;
import org.apache.ignite.internal.processors.hadoop.HadoopProcessorAdapter;
@@ -495,6 +496,11 @@ protected IgniteConfiguration prepareIgniteConfiguration() {
return null;
}
+ /** {@inheritDoc} */
+ @Override public DiagnosticProcessor diagnostic() {
+ return null; // NOTE(review): instrumented code calls diagnostic().beginTrack(..) unguarded; NPE if reached with this context - confirm.
+ }
+
/** {@inheritDoc} */
@Override public void printMemoryStats() {
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 8949a2f5dd1add..95091d7832710c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -137,6 +137,8 @@
import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL;
import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.TEXT;
+import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.PRELOAD_INDEXING_REMOVE;
+import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.PRELOAD_INDEXING_STORE;
/**
* Query and index manager.
@@ -388,6 +390,8 @@ public void store(CacheDataRow newRow, @Nullable CacheDataRow prevRow,
if (!enterBusy())
throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
+ cctx.kernalContext().diagnostic().beginTrack(PRELOAD_INDEXING_STORE);
+
try {
if (isIndexingSpiEnabled()) {
CacheObjectContext coctx = cctx.cacheObjectContext();
@@ -403,6 +407,8 @@ public void store(CacheDataRow newRow, @Nullable CacheDataRow prevRow,
qryProc.store(cctx, newRow, prevRow, prevRowAvailable);
}
finally {
+ cctx.kernalContext().diagnostic().endTrack(PRELOAD_INDEXING_STORE);
+
invalidateResultCache();
leaveBusy();
@@ -422,6 +428,8 @@ public void remove(KeyCacheObject key, @Nullable CacheDataRow prevRow)
if (!enterBusy())
return; // Ignore index update when node is stopping.
+ cctx.kernalContext().diagnostic().beginTrack(PRELOAD_INDEXING_REMOVE);
+
try {
if (isIndexingSpiEnabled()) {
Object key0 = unwrapIfNeeded(key, cctx.cacheObjectContext());
@@ -434,6 +442,8 @@ public void remove(KeyCacheObject key, @Nullable CacheDataRow prevRow)
qryProc.remove(cctx, prevRow);
}
finally {
+ cctx.kernalContext().diagnostic().endTrack(PRELOAD_INDEXING_REMOVE);
+
invalidateResultCache();
leaveBusy();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticProcessor.java
new file mode 100644
index 00000000000000..5394c0fe202905
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticProcessor.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.diag;
+
+import java.util.Comparator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static org.apache.ignite.internal.processors.diag.DiagnosticTopics.TOTAL;
+
+/**
+ * Rebalance diagnostic processor: accumulates elapsed time and hit counts per {@link DiagnosticTopics} topic.
+ */
+public class DiagnosticProcessor extends GridProcessorAdapter {
+    /** Accumulated elapsed time per topic name, in milliseconds. */
+    private final ConcurrentMap<String, LongAdder> timings = new ConcurrentHashMap<>();
+
+    /** Number of completed begin/end pairs per topic name. */
+    private final ConcurrentMap<String, LongAdder> counts = new ConcurrentHashMap<>();
+
+    /** Start timestamps (ms) of currently open tracks by topic name; at most one open track per topic. */
+    private final ConcurrentMap<String, Long> tracks = new ConcurrentHashMap<>();
+
+    /** Tracking flag: raised by beginning the {@code TOTAL} topic, cleared by ending it. */
+    private volatile boolean enabled;
+
+    /**
+     * @param ctx Context.
+     */
+    public DiagnosticProcessor(GridKernalContext ctx) {
+        super(ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() throws IgniteCheckedException {
+        for (DiagnosticTopics topic : DiagnosticTopics.values()) {
+            timings.put(topic.getName(), new LongAdder());
+            counts.put(topic.getName(), new LongAdder());
+        }
+
+        U.quietAndInfo(log, "DiagnosticProcessor started");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop(boolean cancel) throws IgniteCheckedException {
+        super.stop(cancel);
+
+        resetCounts();
+    }
+
+    /** Enum-typed shortcut for {@link #beginTrack(String)}. */
+    public void beginTrack(DiagnosticTopics topic) {
+        beginTrack(topic.getName());
+    }
+
+    /** Enum-typed shortcut for {@link #endTrack(String)}. */
+    public void endTrack(DiagnosticTopics topic) {
+        endTrack(topic.getName());
+    }
+
+    /** Opens a track for {@code topic} (no-op if one is already open); the {@code TOTAL} topic enables tracking. */
+    public synchronized void beginTrack(String topic) {
+        if (TOTAL.getName().equals(topic))
+            enabled = true;
+
+        if (!enabled)
+            return;
+
+        tracks.putIfAbsent(topic, U.currentTimeMillis());
+    }
+
+    /** Closes an open track and accounts its duration; no-op for unregistered names. {@code TOTAL} disables tracking. */
+    public synchronized void endTrack(String topic) {
+        if (!enabled)
+            return;
+
+        if (TOTAL.getName().equals(topic))
+            enabled = false;
+
+        Long startTs = tracks.remove(topic);
+        LongAdder timing = timings.get(topic);
+
+        if (startTs == null || timing == null)
+            return;
+
+        timing.add(U.currentTimeMillis() - startTs);
+        counts.get(topic).increment();
+    }
+
+    /** Logs per-topic time (ms), share of the {@code TOTAL} time (percent) and hit count, then resets all statistics. */
+    public synchronized void printStats() {
+        long total = timings.get(TOTAL.getName()).longValue();
+
+        String out = timings.entrySet()
+            .stream()
+            .filter(e -> e.getValue().longValue() != 0)
+            .sorted(Comparator.comparingInt(e -> DiagnosticTopics.get(e.getKey()).ordinal()))
+            .map(e -> String.format("# %s : %s ms : %.2f : %s",
+                e.getKey(),
+                e.getValue().longValue(),
+                total == 0 ? 0f : (float)e.getValue().longValue() / total * 100,
+                counts.get(e.getKey()).longValue()))
+            .collect(Collectors.joining("\n"));
+
+        log.info("\n# Diagnostic processor info: \n" + out);
+
+        resetCounts();
+
+        if (!tracks.isEmpty()) {
+            String str = tracks.entrySet()
+                .stream()
+                .map(e -> "# " + e.getKey() + " : " + e.getValue())
+                .collect(Collectors.joining("\n"));
+
+            log.info("\n# Unfinished tracks: \n" + str);
+        }
+
+        tracks.clear();
+    }
+
+    /** Zeroes all accumulated timings and counts; open tracks are left untouched. */
+    public synchronized void resetCounts() {
+        for (Map.Entry<String, LongAdder> e : timings.entrySet())
+            e.getValue().reset();
+
+        for (Map.Entry<String, LongAdder> c : counts.entrySet())
+            c.getValue().reset();
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticTopics.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticTopics.java
new file mode 100644
index 00000000000000..c650239b8e4d4c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/DiagnosticTopics.java
@@ -0,0 +1,74 @@
+package org.apache.ignite.internal.processors.diag;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Timed code sections tracked by {@link DiagnosticProcessor}; leading '#' marks in a name presumably show nesting. */
+public enum DiagnosticTopics {
+    /** Root topic; beginning/ending it switches tracking in {@link DiagnosticProcessor} on/off. */
+    TOTAL("# cache rebalance total"),
+    /** GridDhtPartitionDemander#preloadEntry(..) */
+    PRELOAD_ENTRY("# # preload on demander"),
+    /** GridCacheMapEntry#storeValue(..) */
+    PRELOAD_OFFHEAP_INVOKE("# # # offheap().invoke(..)"),
+    /** CacheDataStoreImpl#invoke0(..) */
+    PRELOAD_TREE_INVOKE("# # # # dataTree.invoke(..)"),
+    /** rowStore.updateRow(..) / rowStore.addRow(..) */
+    PRELOAD_TREE_ADD_ROW("# # # # # FreeList.insertDataRow(..)"),
+    /** finishUpdate(..) call on the PUT branch of CacheDataStoreImpl#invoke0(..) */
+    PRELOAD_TREE_FINISH_UPDATE("# # # # CacheDataStoreImpl.finishUpdate(..)"),
+    /** CacheDataStoreImpl.finishUpdate(..): GridCacheQueryManager#store(..) */
+    PRELOAD_INDEXING_STORE("# # # # # indexing().store(..)"),
+    /** CacheDataStoreImpl.finishUpdate(..): pendingTree().removex(..) for the old row */
+    PRELOAD_PENDING_TREE_REMOVE("# # # # # pendingTree().removex(..)"),
+    /** CacheDataStoreImpl.finishUpdate(..): pendingTree().putx(..) for the new row */
+    PRELOAD_PENDING_TREE_PUT("# # # # # pendingTree().putx(..)"),
+    /** CacheDataStoreImpl#finishRemove(..): GridCacheQueryManager#remove(..) */
+    PRELOAD_INDEXING_REMOVE("# # # # finishRemove -> indexing().remove(..)"),
+    /** CacheDataStoreImpl#finishRemove(..): freeList.removeDataRowByLink(..) in RowStore#removeRow(..) */
+    PRELOAD_FREELIST_REMOVE("# # # # finishRemove -> freeList.removeDataRowByLink(..)"),
+    /** Body of GridCacheMapEntry#update(..) */
+    PRELOAD_UPDATED("# # # ttl().addTrackedEntry(..)"),
+    /** GridCacheMapEntry: cctx.shared().wal().log(..) of the data record */
+    PRELOAD_ON_WAL_LOG("# # # wal.log(..)"),
+    /** GridCacheMapEntry: cctx.continuousQueries().onEntryUpdated(..) */
+    PRELOAD_ON_ENTRY_UPDATED("# # # continuousQueries().onEntryUpdated(..)"),
+    /** GridDhtPartitionDemander: sending a demand message via ctx.io().sendOrderedMessage(..) */
+    SEND_DEMAND("# message serialization"),
+    /** From a demand message being sent until GridDhtPartitionDemander#handleSupplyMessage(..) is entered */
+    SEND_RECEIVE("# network delay between nodes"),
+    /** GridDhtPartitionSupplier#handleDemandMessage(..): from the start of batch iteration until reply(..) is entered */
+    SUPPLIER_PROCESS_MSG("# make batch on supplier handleDemandMessage(..)");
+
+    /** Reverse lookup from display name to topic; filled once from the names the constants are declared with. */
+    private static final Map<String, DiagnosticTopics> lookup = new HashMap<>();
+
+    static {
+        for (DiagnosticTopics t : DiagnosticTopics.values())
+            lookup.put(t.getName(), t);
+    }
+
+    /** Display name; also the key under which {@link DiagnosticProcessor} stores statistics of the topic. */
+    private String name;
+
+    /** @param name Display name of the topic. */
+    DiagnosticTopics(String name) {
+        this.name = name;
+    }
+
+    /** @return Topic declared with the given display name, or {@code null} if there is none. */
+    public static DiagnosticTopics get(String topic) {
+        return lookup.get(topic);
+    }
+
+    /** @return Display name of the topic. */
+    public String getName() {
+        return name;
+    }
+
+    /** Renames the topic and returns {@code this}; NOTE: {@link #get(String)} keeps resolving the original name only. */
+    public DiagnosticTopics setName(String name) {
+        this.name = name;
+        return this;
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/package-info.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/package-info.java
new file mode 100644
index 00000000000000..0c90411c184162
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/diag/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ *
+ * Diagnostic processor.
+ */
+package org.apache.ignite.internal.processors.diag;
\ No newline at end of file